mirror of
https://github.com/minio/minio.git
synced 2025-01-25 21:53:16 -05:00
fix: avoid races in NewMultipartUpload under multiple pools (#12233)
It is possible in some scenarios that in multiple pools, two concurrent calls for the same object as a multipart operation can lead to duplicate entries on two different pools. This PR fixes this - hold locks to serialize multiple callers so that we don't race. - make sure to look for existing objects on the namespace as well not just for existing uploadIDs
This commit is contained in:
parent
1aa5858543
commit
361940706d
@ -1082,9 +1082,9 @@ var errorCodes = errorCodeMap{
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrKMSNotConfigured: {
|
||||
Code: "InvalidArgument",
|
||||
Code: "NotImplemented",
|
||||
Description: "Server side encryption specified but KMS is not configured",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
},
|
||||
ErrNoAccessKey: {
|
||||
Code: "AccessDenied",
|
||||
|
@ -268,47 +268,6 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i, err := range errs {
|
||||
if err == nil {
|
||||
return i, nil
|
||||
}
|
||||
if isErrObjectNotFound(err) {
|
||||
// No object exists or its a delete marker,
|
||||
// check objInfo to confirm.
|
||||
if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
|
||||
return i, nil
|
||||
}
|
||||
|
||||
// objInfo is not valid, truly the object doesn't
|
||||
// exist proceed to next pool.
|
||||
continue
|
||||
}
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return -1, toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
|
||||
// getPoolIdx returns the found previous object and its corresponding pool idx,
|
||||
// if none are found falls back to most available space pool.
|
||||
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
|
||||
if z.SinglePool() {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
errs := make([]error, len(z.serverPools))
|
||||
objInfos := make([]ObjectInfo, len(z.serverPools))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i, pool := range z.serverPools {
|
||||
wg.Add(1)
|
||||
go func(i int, pool *erasureSets) {
|
||||
defer wg.Done()
|
||||
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
|
||||
}(i, pool)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i, err := range errs {
|
||||
if err != nil && !isErrObjectNotFound(err) {
|
||||
return -1, err
|
||||
@ -319,19 +278,33 @@ func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object stri
|
||||
if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
|
||||
return i, nil
|
||||
}
|
||||
|
||||
// objInfo is not valid, truly the object doesn't
|
||||
// exist proceed to next pool.
|
||||
continue
|
||||
}
|
||||
// object exists at this pool.
|
||||
return i, nil
|
||||
}
|
||||
|
||||
// We multiply the size by 2 to account for erasure coding.
|
||||
idx = z.getAvailablePoolIdx(ctx, size*2)
|
||||
if idx < 0 {
|
||||
return -1, toObjectErr(errDiskFull)
|
||||
return -1, toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
|
||||
// getPoolIdx returns the found previous object and its corresponding pool idx,
|
||||
// if none are found falls back to most available space pool.
|
||||
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
|
||||
idx, err = z.getPoolIdxExisting(ctx, bucket, object)
|
||||
if err != nil && !isErrObjectNotFound(err) {
|
||||
return idx, err
|
||||
}
|
||||
|
||||
if isErrObjectNotFound(err) {
|
||||
// We multiply the size by 2 to account for erasure coding.
|
||||
idx = z.getAvailablePoolIdx(ctx, size*2)
|
||||
if idx < 0 {
|
||||
return -1, toObjectErr(errDiskFull)
|
||||
}
|
||||
}
|
||||
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
@ -1039,6 +1012,14 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
|
||||
return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
|
||||
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer ns.Unlock(lkctx.Cancel)
|
||||
|
||||
for idx, pool := range z.serverPools {
|
||||
result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
||||
if err != nil {
|
||||
@ -1052,10 +1033,9 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
|
||||
}
|
||||
}
|
||||
|
||||
// We multiply the size by 2 to account for erasure coding.
|
||||
idx := z.getAvailablePoolIdx(ctx, (1<<30)*2)
|
||||
if idx < 0 {
|
||||
return "", toObjectErr(errDiskFull)
|
||||
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return z.serverPools[idx].NewMultipartUpload(ctx, bucket, object, opts)
|
||||
|
Loading…
x
Reference in New Issue
Block a user