update quorum requirement to list all objects (#14201)

some upgraded objects might not get listed due
to different quorum ratios across objects.

make sure to list all objects that satisfy the
maximum possible quorum.
This commit is contained in:
Harshavardhana 2022-01-27 17:00:15 -08:00 committed by GitHub
parent c3d9c45f58
commit aaea94a48d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 9 additions and 15 deletions

View File

@ -289,7 +289,7 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
if poolIdx < len(objAPI.serverPools) && setIdx < len(objAPI.serverPools[poolIdx].sets) { if poolIdx < len(objAPI.serverPools) && setIdx < len(objAPI.serverPools[poolIdx].sets) {
// Pass the disks belonging to the set. // Pass the disks belonging to the set.
s.disks = objAPI.serverPools[poolIdx].sets[setIdx].getDisks() s.disks = objAPI.serverPools[poolIdx].sets[setIdx].getDisks()
s.disksQuorum = objAPI.serverPools[poolIdx].sets[setIdx].defaultRQuorum() s.disksQuorum = len(s.disks) / 2
} else { } else {
logger.LogIf(ctx, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))) logger.LogIf(ctx, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1)))
} }

View File

@ -1037,7 +1037,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
// class for objects which have reduced quorum // class for objects which have reduced quorum
// storage class only needs to be honored for // storage class only needs to be honored for
// Read() requests alone which we already do. // Read() requests alone which we already do.
writeQuorums[i] = er.defaultWQuorum() writeQuorums[i] = len(storageDisks)/2 + 1
} }
versionsMap := make(map[string]FileInfoVersions, len(objects)) versionsMap := make(map[string]FileInfoVersions, len(objects))
@ -1153,10 +1153,8 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName) errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
} }
if errs[objIndex] == nil {
defer NSUpdated(bucket, objects[objIndex].ObjectName) defer NSUpdated(bucket, objects[objIndex].ObjectName)
} }
}
// Check failed deletes across multiple objects // Check failed deletes across multiple objects
for i, dobj := range dobjects { for i, dobj := range dobjects {

View File

@ -682,8 +682,8 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
// How to resolve partial results. // How to resolve partial results.
resolver := metadataResolutionParams{ resolver := metadataResolutionParams{
dirQuorum: set.defaultRQuorum(), dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
objQuorum: set.defaultRQuorum(), objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
bucket: bName, bucket: bName,
} }
@ -692,7 +692,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
bucket: bName, bucket: bName,
recursive: true, recursive: true,
forwardTo: forwardTo, forwardTo: forwardTo,
minDisks: len(disks), minDisks: len(disks) / 2, // to capture all quorum ratios
reportNotFound: false, reportNotFound: false,
agreed: decommissionEntry, agreed: decommissionEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) { partial: func(entries metaCacheEntries, nAgreed int, errs []error) {

View File

@ -542,13 +542,9 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
var fallbackDisks []StorageAPI var fallbackDisks []StorageAPI
// Special case: ask all disks if the drive count is 4 // Special case: ask all disks if the drive count is 4
if askDisks == -1 || er.setDriveCount == 4 { if askDisks <= 0 || er.setDriveCount == 4 {
askDisks = len(disks) // with 'strict' quorum list on all online disks. askDisks = len(disks) // with 'strict' quorum list on all online disks.
listingQuorum = er.defaultRQuorum() listingQuorum = len(disks) / 2 // keep this such that we can list all objects with different quorum ratio.
}
if askDisks == 0 {
askDisks = globalAPIConfig.getListQuorum()
listingQuorum = askDisks
} }
if askDisks > 0 && len(disks) > askDisks { if askDisks > 0 && len(disks) > askDisks {
rand.Shuffle(len(disks), func(i, j int) { rand.Shuffle(len(disks), func(i, j int) {