fix: newMultipartUpload should go to same pool (#12106)

avoid potential for duplicates under multi-pool
setup, additionally also make sure CompleteMultipart
is using a more optimal API for uploadID lookup
and never delete the object there is a potential
to create a delete marker during complete multipart.

Signed-off-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
Harshavardhana 2021-04-21 10:57:36 -07:00 committed by GitHub
parent 31dab4e7ff
commit 4a41222310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 14 deletions

View File

@ -1035,10 +1035,23 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts) return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
} }
// We don't know the exact size, so we ask for at least 1GiB file. for idx, pool := range z.serverPools {
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
if err != nil { if err != nil {
return "", err return "", err
}
// If there is a multipart upload with the same bucket/object name,
// create the new multipart in the same pool, this will avoid
// creating two multiparts uploads in two different pools
if len(result.Uploads) != 0 {
return z.serverPools[idx].NewMultipartUpload(ctx, bucket, object, opts)
}
}
// We multiply the size by 2 to account for erasure coding.
idx := z.getAvailablePoolIdx(ctx, (1<<30)*2)
if idx < 0 {
return "", toObjectErr(errDiskFull)
} }
return z.serverPools[idx].NewMultipartUpload(ctx, bucket, object, opts) return z.serverPools[idx].NewMultipartUpload(ctx, bucket, object, opts)
@ -1180,20 +1193,13 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket
return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
} }
// Purge any existing object.
for _, pool := range z.serverPools { for _, pool := range z.serverPools {
pool.DeleteObject(ctx, bucket, object, opts) _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
} if err == nil {
for _, pool := range z.serverPools {
result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
if err != nil {
return objInfo, err
}
if result.Lookup(uploadID) {
return pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
} }
} }
return objInfo, InvalidUploadID{ return objInfo, InvalidUploadID{
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,