diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 258e75975..c427bbb78 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -549,7 +549,7 @@ func getListQuorum(quorum string, driveCount int) int { case "reduced": return 2 case "strict": - return -1 + return driveCount } // Defaults to (driveCount+1)/2 drives per set, defaults to "optimal" value if driveCount > 0 { @@ -563,16 +563,19 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul defer close(results) o.debugf(color.Green("listPath:")+" with options: %#v", o) + // get non-healing disks for listing + disks, _ := er.getOnlineDisksWithHealing() askDisks := getListQuorum(o.AskDisks, er.setDriveCount) - listingQuorum := askDisks - 1 - disks := er.getDisks() var fallbackDisks []StorageAPI // Special case: ask all disks if the drive count is 4 - if askDisks <= 0 || er.setDriveCount == 4 { - askDisks = len(disks) // with 'strict' quorum list on all drives. - listingQuorum = (len(disks) + 1) / 2 // keep this such that we can list all objects with different quorum ratio. + if er.setDriveCount == 4 || askDisks > len(disks) { + askDisks = len(disks) // use all available drives } + + // However many we ask, versions must exist on ~50% + listingQuorum := (askDisks + 1) / 2 + if askDisks > 0 && len(disks) > askDisks { rand.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] diff --git a/cmd/xl-storage-format-v2_test.go b/cmd/xl-storage-format-v2_test.go index 5516dbe0f..fb399f33e 100644 --- a/cmd/xl-storage-format-v2_test.go +++ b/cmd/xl-storage-format-v2_test.go @@ -21,10 +21,13 @@ import ( "bufio" "bytes" "compress/gzip" + "encoding/base64" "encoding/json" "fmt" "io" + "math/rand" "os" + "reflect" "sort" "testing" "time" @@ -825,3 +828,128 @@ func Test_mergeXLV2Versions(t *testing.T) { }) } } + +func Test_mergeXLV2Versions2(t *testing.T) { + vDelMarker := xlMetaV2ShallowVersion{header: xlMetaV2VersionHeader{ + VersionID: [16]byte{2}, + ModTime: 1500, + Signature: [4]byte{5, 6, 7, 8}, + Type: DeleteType, + Flags: 0, + }} + vDelMarker.meta, _ = base64.StdEncoding.DecodeString("gqRUeXBlAqZEZWxPYmqDoklExBCvwGEaY+BAO4B4vyG5ERorpU1UaW1l0xbgJlsWE9IHp01ldGFTeXOA") + + vObj := xlMetaV2ShallowVersion{header: xlMetaV2VersionHeader{ + VersionID: [16]byte{1}, + ModTime: 1000, + Signature: [4]byte{1, 2, 3, 4}, + Type: ObjectType, + Flags: xlFlagUsesDataDir | xlFlagInlineData, + }} + vObj.meta, _ = base64.StdEncoding.DecodeString("gqRUeXBlAaVWMk9iat4AEaJJRMQQEkaOteYCSrWB3nqppSIKTqRERGlyxBAO8fXSJ5RI+YEtsp8KneVVpkVjQWxnbwGjRWNNDKNFY04Ep0VjQlNpemXSABAAAKdFY0luZGV4BaZFY0Rpc3TcABAFBgcICQoLDA0ODxABAgMEqENTdW1BbGdvAahQYXJ0TnVtc5EBqVBhcnRFVGFnc8CpUGFydFNpemVzkdEBL6pQYXJ0QVNpemVzkdEBL6RTaXpl0QEvpU1UaW1l0xbgJhIa6ABvp01ldGFTeXOBvHgtbWluaW8taW50ZXJuYWwtaW5saW5lLWRhdGHEBHRydWWnTWV0YVVzcoKsY29udGVudC10eXBluGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbaRldGFn2SBlYTIxMDE2MmVlYjRhZGMzMWZmOTg0Y2I3NDRkNmFmNg==") + + testCases := []struct { + name string + input [][]xlMetaV2ShallowVersion + quorum int + reqVersions int + want []xlMetaV2ShallowVersion + }{ + { + name: "obj-on-one", + input: [][]xlMetaV2ShallowVersion{ + 0: {vDelMarker, vObj}, // disk 0 + 1: {vDelMarker}, // disk 1 + 2: {vDelMarker}, // disk 2 + }, + quorum: 2, + reqVersions: 0, + want: []xlMetaV2ShallowVersion{vDelMarker}, + }, + { + name: "obj-on-two", + input: [][]xlMetaV2ShallowVersion{ + 0: {vDelMarker, vObj}, // disk 0 + 1: {vDelMarker, vObj}, // disk 1 + 2: {vDelMarker}, // disk 2 + }, + quorum: 2, + reqVersions: 0, + want: []xlMetaV2ShallowVersion{vDelMarker, vObj}, + }, + { + name: "obj-on-all", + input: [][]xlMetaV2ShallowVersion{ + 0: {vDelMarker, vObj}, // disk 0 + 1: {vDelMarker, vObj}, // disk 1 + 2: {vDelMarker, vObj}, // disk 2 + }, + quorum: 2, + reqVersions: 0, + want: []xlMetaV2ShallowVersion{vDelMarker, vObj}, + }, + { + name: "del-on-one", + input: [][]xlMetaV2ShallowVersion{ + 0: {vDelMarker, vObj}, // disk 0 + 1: {vObj}, // disk 1 + 2: {vObj}, // disk 2 + }, + quorum: 2, + reqVersions: 0, + want: []xlMetaV2ShallowVersion{vObj}, + }, + { + name: "del-on-two", + input: [][]xlMetaV2ShallowVersion{ + 0: {vDelMarker, vObj}, // disk 0 + 1: {vDelMarker, vObj}, // disk 1 + 2: {vObj}, // disk 2 + }, + quorum: 2, + reqVersions: 0, + want: []xlMetaV2ShallowVersion{vDelMarker, vObj}, + }, + { + name: "del-on-two-16stripe", + input: [][]xlMetaV2ShallowVersion{ + 0: {vObj}, // disk 0 + 1: {vDelMarker, vObj}, // disk 1 + 2: {vDelMarker, vObj}, // disk 2 + 3: {vDelMarker, vObj}, // disk 3 + 4: {vDelMarker, vObj}, // disk 4 + 5: {vDelMarker, vObj}, // disk 5 + 6: {vDelMarker, vObj}, // disk 6 + 7: {vDelMarker, vObj}, // disk 7 + 8: {vDelMarker, vObj}, // disk 8 + 9: {vDelMarker, vObj}, // disk 9 + 10: {vObj}, // disk 10 + 11: {vDelMarker, vObj}, // disk 11 + 12: {vDelMarker, vObj}, // disk 12 + 13: {vDelMarker, vObj}, // disk 13 + 14: {vDelMarker, vObj}, // disk 14 + 15: {vDelMarker, vObj}, // disk 15 + }, + quorum: 7, + reqVersions: 0, + want: []xlMetaV2ShallowVersion{vDelMarker, vObj}, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + // Run multiple times, shuffling the input order. + for i := int64(0); i < 50; i++ { + t.Run(fmt.Sprint(i), func(t *testing.T) { + rng := rand.New(rand.NewSource(i)) + rng.Shuffle(len(test.input), func(i, j int) { + test.input[i], test.input[j] = test.input[j], test.input[i] + }) + got := mergeXLV2Versions(test.quorum, true, 0, test.input...) + if !reflect.DeepEqual(test.want, got) { + t.Errorf("want %v != got %v", test.want, got) + } + }) + } + }) + } +}