lock on multi pool object creation (#12541)

Create write lock on PutObject and CopyObject when on multi-pool setup.

Use the same lock as NewMultipartUpload so all creation calls share the same lock.
This commit is contained in:
Klaus Post 2021-06-21 09:25:10 -07:00 committed by GitHub
parent 8f1fe3b761
commit 14bb969782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -783,6 +783,16 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
}
return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
}
if !opts.NoLock {
ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx.Cancel)
opts.NoLock = true
}
idx, err := z.getPoolIdx(ctx, bucket, object, data.Size())
if err != nil {
@ -894,6 +904,17 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if !dstOpts.NoLock {
ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(dstBucket, dstObject, "newMultipartObject.lck"))
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx.Cancel)
dstOpts.NoLock = true
}
poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, srcInfo.Size)
if err != nil {
return objInfo, err