From 1b339ea062b423f1c6fbeb02116d020d18418917 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 14 Jul 2022 20:44:22 -0700 Subject: [PATCH] allow force delete on decom pool (#15302) Bonus: - skip suspended pool from being considered for multipart uploads - add more context for decomErrors() --- cmd/erasure-multipart.go | 2 +- cmd/erasure-server-pool-decom.go | 14 ++++++++--- cmd/erasure-server-pool.go | 42 ++++++++++++++++++++++---------- 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 12465be38..258d010c6 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -237,7 +237,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec } fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false) if err != nil { - return result, toObjectErr(err, bucket, object) + return result, toObjectErr(err, bucket, object, uploadID) } populatedUploadIds.Add(uploadID) uploads = append(uploads, MultipartInfo{ diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 28b68811a..97ac94b03 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -587,21 +587,21 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri UserDefined: objInfo.UserDefined, }) if err != nil { - return err + return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err) } defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{}) parts := make([]CompletePart, len(objInfo.Parts)) for i, part := range objInfo.Parts { hr, err := hash.NewReader(gr, part.Size, "", "", part.Size) if err != nil { - return err + return fmt.Errorf("decommissionObject: hash.NewReader() %w", err) } pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID, part.Number, NewPutObjReader(hr), ObjectOptions{}) if err != nil { - return err + return fmt.Errorf("decommissionObject: PutObjectPart() %w", err) } parts[i] = CompletePart{ ETag: pi.ETag, @@ -611,11 +611,14 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{ MTime: objInfo.ModTime, }) + if err != nil { + err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err) + } return err } hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size) if err != nil { - return err + return fmt.Errorf("decommissionObject: hash.NewReader() %w", err) } _, err = z.PutObject(ctx, bucket, @@ -626,6 +629,9 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri MTime: objInfo.ModTime, UserDefined: objInfo.UserDefined, }) + if err != nil { + err = fmt.Errorf("decommissionObject: PutObject() %w", err) + } return err } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 77d922e62..e427e8c3b 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -976,13 +976,8 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec } func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error { - for idx, pool := range z.serverPools { - if z.IsSuspended(idx) { - logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1)) - continue - } - _, err := pool.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true}) - if err != nil { + for _, pool := range z.serverPools { + if _, err := pool.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true}); err != nil { return err } } @@ -1325,7 +1320,10 @@ func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, p poolResult.KeyMarker = keyMarker poolResult.Prefix = prefix poolResult.Delimiter = delimiter - for _, pool := range z.serverPools { + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } result, err := pool.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) if err != nil { @@ -1350,6 +1348,9 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj } for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList) if err != nil { return "", err @@ -1392,7 +1393,10 @@ func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object, return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } - for _, pool := range z.serverPools { + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err == nil { return pool.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) @@ -1421,7 +1425,10 @@ func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, objec if z.SinglePool() { return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts) } - for _, pool := range z.serverPools { + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } mi, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err == nil { return mi, nil @@ -1450,7 +1457,10 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object if z.SinglePool() { return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } - for _, pool := range z.serverPools { + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err == nil { return pool.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) @@ -1478,7 +1488,10 @@ func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, o return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } - for _, pool := range z.serverPools { + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err == nil { return pool.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) @@ -1507,7 +1520,10 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) } - for _, pool := range z.serverPools { + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue + } _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err == nil { return pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)