allow force delete on decom pool (#15302)

Bonus:

- skip suspended pool from being
  considered for multipart uploads

- add more context for decomErrors()
This commit is contained in:
Harshavardhana 2022-07-14 20:44:22 -07:00 committed by GitHub
parent 236ef03dbd
commit 1b339ea062
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 18 deletions

View File

@ -237,7 +237,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
} }
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false) fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
if err != nil { if err != nil {
return result, toObjectErr(err, bucket, object) return result, toObjectErr(err, bucket, object, uploadID)
} }
populatedUploadIds.Add(uploadID) populatedUploadIds.Add(uploadID)
uploads = append(uploads, MultipartInfo{ uploads = append(uploads, MultipartInfo{

View File

@ -587,21 +587,21 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
UserDefined: objInfo.UserDefined, UserDefined: objInfo.UserDefined,
}) })
if err != nil { if err != nil {
return err return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err)
} }
defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{}) defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
parts := make([]CompletePart, len(objInfo.Parts)) parts := make([]CompletePart, len(objInfo.Parts))
for i, part := range objInfo.Parts { for i, part := range objInfo.Parts {
hr, err := hash.NewReader(gr, part.Size, "", "", part.Size) hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
if err != nil { if err != nil {
return err return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
} }
pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID, pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
part.Number, part.Number,
NewPutObjReader(hr), NewPutObjReader(hr),
ObjectOptions{}) ObjectOptions{})
if err != nil { if err != nil {
return err return fmt.Errorf("decommissionObject: PutObjectPart() %w", err)
} }
parts[i] = CompletePart{ parts[i] = CompletePart{
ETag: pi.ETag, ETag: pi.ETag,
@ -611,11 +611,14 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{ _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{
MTime: objInfo.ModTime, MTime: objInfo.ModTime,
}) })
if err != nil {
err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err)
}
return err return err
} }
hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size) hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size)
if err != nil { if err != nil {
return err return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
} }
_, err = z.PutObject(ctx, _, err = z.PutObject(ctx,
bucket, bucket,
@ -626,6 +629,9 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
MTime: objInfo.ModTime, MTime: objInfo.ModTime,
UserDefined: objInfo.UserDefined, UserDefined: objInfo.UserDefined,
}) })
if err != nil {
err = fmt.Errorf("decommissionObject: PutObject() %w", err)
}
return err return err
} }

View File

@ -976,13 +976,8 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
} }
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error { func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
for idx, pool := range z.serverPools { for _, pool := range z.serverPools {
if z.IsSuspended(idx) { if _, err := pool.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true}); err != nil {
logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1))
continue
}
_, err := pool.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
if err != nil {
return err return err
} }
} }
@ -1325,7 +1320,10 @@ func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, p
poolResult.KeyMarker = keyMarker poolResult.KeyMarker = keyMarker
poolResult.Prefix = prefix poolResult.Prefix = prefix
poolResult.Delimiter = delimiter poolResult.Delimiter = delimiter
for _, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
result, err := pool.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, result, err := pool.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker,
delimiter, maxUploads) delimiter, maxUploads)
if err != nil { if err != nil {
@ -1350,6 +1348,9 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
} }
for idx, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList) result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
if err != nil { if err != nil {
return "", err return "", err
@ -1392,7 +1393,10 @@ func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object,
return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
} }
for _, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
_, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil { if err == nil {
return pool.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) return pool.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
@ -1421,7 +1425,10 @@ func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, objec
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts) return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts)
} }
for _, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
mi, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) mi, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil { if err == nil {
return mi, nil return mi, nil
@ -1450,7 +1457,10 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
} }
for _, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
_, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil { if err == nil {
return pool.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) return pool.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
@ -1478,7 +1488,10 @@ func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, o
return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts) return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
} }
for _, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
_, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil { if err == nil {
return pool.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) return pool.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
@ -1507,7 +1520,10 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket
return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
} }
for _, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
_, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil { if err == nil {
return pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)