fix: dangling objects honor parityBlocks instead of dataBlocks (#19019)

Bonus: do not recreate buckets if NoRecreate is asked.
This commit is contained in:
Harshavardhana 2024-02-08 15:22:16 -08:00 committed by GitHub
parent 6005ad3d48
commit 404d8b3084
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 11 additions and 20 deletions

View File

@ -961,24 +961,18 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
return validMeta, notFoundMetaErrs > dataBlocks return validMeta, notFoundMetaErrs > dataBlocks
} }
quorum := validMeta.Erasure.DataBlocks
if validMeta.Erasure.DataBlocks == validMeta.Erasure.ParityBlocks {
quorum++
}
// TODO: It is possible to replay the object via just single // TODO: It is possible to replay the object via just single
// xl.meta file, considering quorum number of data-dirs are still // xl.meta file, considering quorum number of data-dirs are still
// present on other drives. // present on other drives.
// //
// However this requires a bit of a rewrite, leave this up for // However this requires a bit of a rewrite, leave this up for
// future work. // future work.
if notFoundMetaErrs > 0 && notFoundMetaErrs > validMeta.Erasure.ParityBlocks {
if notFoundMetaErrs > 0 && notFoundMetaErrs >= quorum {
// All xl.meta is beyond data blocks missing, this is dangling // All xl.meta is beyond data blocks missing, this is dangling
return validMeta, true return validMeta, true
} }
if !validMeta.IsRemote() && notFoundPartsErrs > 0 && notFoundPartsErrs >= quorum { if !validMeta.IsRemote() && notFoundPartsErrs > 0 && notFoundPartsErrs > validMeta.Erasure.ParityBlocks {
// All data-dir is beyond data blocks missing, this is dangling // All data-dir is beyond data blocks missing, this is dangling
return validMeta, true return validMeta, true
} }

View File

@ -301,10 +301,10 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
// to read the metadata entry. // to read the metadata entry.
var uploads []MultipartInfo var uploads []MultipartInfo
populatedUploadIds := set.NewStringSet() populatedUploadIDs := set.NewStringSet()
for _, uploadID := range uploadIDs { for _, uploadID := range uploadIDs {
if populatedUploadIds.Contains(uploadID) { if populatedUploadIDs.Contains(uploadID) {
continue continue
} }
// If present, use time stored in ID. // If present, use time stored in ID.
@ -321,7 +321,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
UploadID: base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadID))), UploadID: base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadID))),
Initiated: startTime, Initiated: startTime,
}) })
populatedUploadIds.Add(uploadID) populatedUploadIDs.Add(uploadID)
} }
sort.Slice(uploads, func(i int, j int) bool { sort.Slice(uploads, func(i int, j int) bool {

View File

@ -1840,12 +1840,6 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, op
} }
} }
if err != nil && !isErrBucketNotFound(err) {
if !opts.NoRecreate {
z.s3Peer.MakeBucket(ctx, bucket, MakeBucketOptions{})
}
}
if err == nil { if err == nil {
// Purge the entire bucket metadata entirely. // Purge the entire bucket metadata entirely.
z.deleteAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket)) z.deleteAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket))

View File

@ -474,9 +474,12 @@ func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts Dele
perPoolErrs = append(perPoolErrs, errs[i]) perPoolErrs = append(perPoolErrs, errs[i])
} }
} }
if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil && poolErr != errVolumeNotFound { poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1)
// re-create successful deletes, since we are return an error. if poolErr != nil && !errors.Is(poolErr, errVolumeNotFound) {
sys.MakeBucket(ctx, bucket, MakeBucketOptions{}) if !opts.NoRecreate {
// re-create successful deletes, since we are return an error.
sys.MakeBucket(ctx, bucket, MakeBucketOptions{})
}
return toObjectErr(poolErr, bucket) return toObjectErr(poolErr, bucket)
} }
} }