parallelize renameData() cleanup upon error (#18591)

This commit is contained in:
Harshavardhana 2023-12-04 14:54:34 -08:00 committed by GitHub
parent 05bb655efc
commit 45b7253f39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -99,7 +99,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(srcBucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(ctx, srcBucket, srcObject, metaArr, errs, nil, srcOpts)
_, derr := er.deleteIfDangling(context.Background(), srcBucket, srcObject, metaArr, errs, nil, srcOpts)
if derr != nil {
err = derr
}
@ -910,7 +910,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
if err != nil {
if shouldCheckForDangling(err, errs, bucket) {
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
}
@ -1037,15 +1037,21 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if err != nil {
dg := errgroup.WithNErrs(len(disks))
for index, nerr := range errs {
if nerr != nil {
continue
}
index := index
// When we are going to return error, attempt to delete success
// on some of the drives, if we cannot we do not have to notify
// caller this dangling object will be now scheduled to be removed
// via active healing.
if nerr == nil {
disks[index].DeleteVersion(ctx, dstBucket, dstEntry, metadata[index], false)
}
dg.Go(func() error {
return disks[index].DeleteVersion(context.Background(), dstBucket, dstEntry, metadata[index], false)
}, index)
}
dg.Wait()
}
if err == nil {
versions := reduceCommonVersions(diskVersions, writeQuorum)
@ -2042,7 +2048,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
}
@ -2115,7 +2121,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
}