replication: set context timeout for NewMultipartUpload calls (#17807)

This commit is contained in:
Poorna 2023-08-05 12:27:07 -07:00 committed by GitHub
parent a436fd513b
commit 26c23b30f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1491,7 +1491,9 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
var uploadedParts []minio.CompletePart var uploadedParts []minio.CompletePart
// new multipart must not set mtime as it may lead to erroneous cleanups at various intervals. // new multipart must not set mtime as it may lead to erroneous cleanups at various intervals.
opts.Internal.SourceMTime = time.Time{} // this value is saved properly in CompleteMultipartUpload() opts.Internal.SourceMTime = time.Time{} // this value is saved properly in CompleteMultipartUpload()
uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts) nctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
uploadID, err := c.NewMultipartUpload(nctx, bucket, object, opts)
if err != nil { if err != nil {
return err return err
} }
@ -1501,12 +1503,15 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
// block and abort remote upload upon failure. // block and abort remote upload upon failure.
attempts := 1 attempts := 1
for attempts <= 3 { for attempts <= 3 {
aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID) actx, acancel := context.WithTimeout(ctx, time.Minute)
aerr := c.AbortMultipartUpload(actx, bucket, object, uploadID)
if aerr == nil { if aerr == nil {
acancel()
return return
} }
logger.LogIf(ctx, acancel()
fmt.Errorf("Trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster", logger.LogIf(actx,
fmt.Errorf("trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster",
humanize.Ordinal(attempts), uploadID, bucket, object, aerr)) humanize.Ordinal(attempts), uploadID, bucket, object, aerr))
attempts++ attempts++
time.Sleep(time.Second) time.Sleep(time.Second)
@ -1541,7 +1546,9 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
ETag: pInfo.ETag, ETag: pInfo.ETag,
}) })
} }
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{ cctx, ccancel := context.WithTimeout(ctx, 10*time.Minute)
defer ccancel()
_, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
Internal: minio.AdvancedPutOptions{ Internal: minio.AdvancedPutOptions{
SourceMTime: objInfo.ModTime, SourceMTime: objInfo.ModTime,
// always set this to distinguish between `mc mirror` replication and serverside // always set this to distinguish between `mc mirror` replication and serverside