handle the locks properly for multi-pool callers (#20495)

- PutObjectMetadata()
- PutObjectTags()
- DeleteObjectTags()
- TransitionObject()
- RestoreTransitionedObject()

Also improve the behavior of multipart code across
pools: hold the lock only once per upload ID for

- CompleteMultipartUpload()
- AbortMultipartUpload()
- ListObjectParts() (read-lock)
- GetMultipartInfo() (read-lock)
- PutObjectPart() (read-lock)

This avoids needless lock attempts across pools, which
otherwise grow as O(n) when there are n pools.
This commit is contained in:
Harshavardhana 2024-09-29 15:40:36 -07:00 committed by GitHub
parent e8b457e8a6
commit 6186d11761
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 134 additions and 70 deletions

View File

@ -577,19 +577,9 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
return pi, toObjectErr(errInvalidArgument) return pi, toObjectErr(errInvalidArgument)
} }
// Read lock for upload id.
// Only held while reading the upload metadata.
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
rctx := rlkctx.Context()
defer uploadIDRLock.RUnlock(rlkctx)
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
// Validates if upload ID exists. // Validates if upload ID exists.
fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true) fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
if err != nil { if err != nil {
if errors.Is(err, errVolumeNotFound) { if errors.Is(err, errVolumeNotFound) {
return pi, toObjectErr(err, bucket) return pi, toObjectErr(err, bucket)
@ -744,10 +734,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
} }
// Write lock for this part ID, only hold it if we are planning to read from the // Serialize concurrent part uploads.
// stream avoid any concurrent updates.
//
// Must be held throughout this call.
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil { if err != nil {
@ -801,14 +788,6 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
UploadID: uploadID, UploadID: uploadID,
} }
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return MultipartInfo{}, err
}
ctx = lkctx.Context()
defer uploadIDLock.RUnlock(lkctx)
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
if err != nil { if err != nil {
if errors.Is(err, errVolumeNotFound) { if errors.Is(err, errVolumeNotFound) {
@ -888,14 +867,6 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
auditObjectErasureSet(ctx, "ListObjectParts", object, &er) auditObjectErasureSet(ctx, "ListObjectParts", object, &er)
} }
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ListPartsInfo{}, err
}
ctx = lkctx.Context()
defer uploadIDLock.RUnlock(lkctx)
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
if err != nil { if err != nil {
return result, toObjectErr(err, bucket, object, uploadID) return result, toObjectErr(err, bucket, object, uploadID)
@ -1118,16 +1089,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
} }
} }
// Hold write locks to verify uploaded parts, also disallows any
// parallel PutObjectPart() requests.
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
ctx = wlkctx.Context()
defer uploadIDLock.Unlock(wlkctx)
fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true) fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
if err != nil { if err != nil {
if errors.Is(err, errVolumeNotFound) { if errors.Is(err, errVolumeNotFound) {
@ -1494,14 +1455,6 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
auditObjectErasureSet(ctx, "AbortMultipartUpload", object, &er) auditObjectErasureSet(ctx, "AbortMultipartUpload", object, &er)
} }
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
// Validates if upload ID exists. // Validates if upload ID exists.
if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil { if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil {
if errors.Is(err, errVolumeNotFound) { if errors.Is(err, errVolumeNotFound) {

View File

@ -2192,6 +2192,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
// PutObjectTags - replace or add tags to an existing object // PutObjectTags - replace or add tags to an existing object
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
if !opts.NoLock {
// Lock the object before updating tags. // Lock the object before updating tags.
lk := er.NewNSLock(bucket, object) lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout) lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
@ -2200,6 +2201,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
} }
ctx = lkctx.Context() ctx = lkctx.Context()
defer lk.Unlock(lkctx) defer lk.Unlock(lkctx)
}
disks := er.getDisks() disks := er.getDisks()
@ -2310,6 +2312,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
return err return err
} }
if !opts.NoLock {
// Acquire write lock before starting to transition the object. // Acquire write lock before starting to transition the object.
lk := er.NewNSLock(bucket, object) lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
@ -2318,6 +2321,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
} }
ctx = lkctx.Context() ctx = lkctx.Context()
defer lk.Unlock(lkctx) defer lk.Unlock(lkctx)
}
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil { if err != nil {

View File

@ -1858,6 +1858,16 @@ func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object,
return PartInfo{}, err return PartInfo{}, err
} }
// Read lock for upload id.
// Only held while reading the upload metadata.
uploadIDRLock := z.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
ctx = rlkctx.Context()
defer uploadIDRLock.RUnlock(rlkctx)
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
} }
@ -1890,9 +1900,18 @@ func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, objec
return MultipartInfo{}, err return MultipartInfo{}, err
} }
uploadIDLock := z.NewNSLock(bucket, pathJoin(object, uploadID))
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return MultipartInfo{}, err
}
ctx = lkctx.Context()
defer uploadIDLock.RUnlock(lkctx)
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts) return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts)
} }
for idx, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) { if z.IsSuspended(idx) {
continue continue
@ -1908,6 +1927,7 @@ func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, objec
// any other unhandled error return right here. // any other unhandled error return right here.
return MultipartInfo{}, err return MultipartInfo{}, err
} }
return MultipartInfo{}, InvalidUploadID{ return MultipartInfo{}, InvalidUploadID{
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,
@ -1921,9 +1941,18 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object
return ListPartsInfo{}, err return ListPartsInfo{}, err
} }
uploadIDLock := z.NewNSLock(bucket, pathJoin(object, uploadID))
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ListPartsInfo{}, err
}
ctx = lkctx.Context()
defer uploadIDLock.RUnlock(lkctx)
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
} }
for idx, pool := range z.serverPools { for idx, pool := range z.serverPools {
if z.IsSuspended(idx) { if z.IsSuspended(idx) {
continue continue
@ -1937,6 +1966,7 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object
} }
return ListPartsInfo{}, err return ListPartsInfo{}, err
} }
return ListPartsInfo{}, InvalidUploadID{ return ListPartsInfo{}, InvalidUploadID{
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,
@ -1957,6 +1987,14 @@ func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, o
} }
}() }()
lk := z.NewNSLock(bucket, pathJoin(object, uploadID))
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts) return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
} }
@ -1995,6 +2033,16 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket
} }
}() }()
// Hold write locks to verify uploaded parts, also disallows any
// parallel PutObjectPart() requests.
uploadIDLock := z.NewNSLock(bucket, pathJoin(object, uploadID))
wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return objInfo, err
}
ctx = wlkctx.Context()
defer uploadIDLock.Unlock(wlkctx)
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
} }
@ -2774,7 +2822,19 @@ func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, obje
return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts) return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts)
} }
if !opts.NoLock {
// Lock the object before updating metadata.
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
}
opts.MetadataChg = true opts.MetadataChg = true
opts.NoLock = true
// We don't know the size here set 1GiB at least. // We don't know the size here set 1GiB at least.
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil { if err != nil {
@ -2791,7 +2851,19 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s
return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts) return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts)
} }
if !opts.NoLock {
// Lock the object before updating tags.
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
}
opts.MetadataChg = true opts.MetadataChg = true
opts.NoLock = true
// We don't know the size here set 1GiB at least. // We don't know the size here set 1GiB at least.
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
@ -2809,8 +2881,19 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts) return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
} }
opts.MetadataChg = true if !opts.NoLock {
// Lock the object before deleting tags.
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
}
opts.MetadataChg = true
opts.NoLock = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil { if err != nil {
return ObjectInfo{}, err return ObjectInfo{}, err
@ -2841,8 +2924,20 @@ func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, objec
return z.serverPools[0].TransitionObject(ctx, bucket, object, opts) return z.serverPools[0].TransitionObject(ctx, bucket, object, opts)
} }
if !opts.NoLock {
// Acquire write lock before starting to transition the object.
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
}
// Avoid transitioning an object from a pool being decommissioned. // Avoid transitioning an object from a pool being decommissioned.
opts.SkipDecommissioned = true opts.SkipDecommissioned = true
opts.NoLock = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil { if err != nil {
return err return err
@ -2858,8 +2953,20 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck
return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts) return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts)
} }
if !opts.NoLock {
// Acquire write lock before restoring transitioned object
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
}
// Avoid restoring object from a pool being decommissioned. // Avoid restoring object from a pool being decommissioned.
opts.SkipDecommissioned = true opts.SkipDecommissioned = true
opts.NoLock = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil { if err != nil {
return err return err

View File

@ -229,6 +229,7 @@ type localLockInstance struct {
// path. The returned lockInstance object encapsulates the nsLockMap, // path. The returned lockInstance object encapsulates the nsLockMap,
// volume, path and operation ID. // volume, path and operation ID.
func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker { func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
sort.Strings(paths)
opsID := mustGetUUID() opsID := mustGetUUID()
if n.isDistErasure { if n.isDistErasure {
drwmutex := dsync.NewDRWMutex(&dsync.Dsync{ drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
@ -237,7 +238,6 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
}, pathsJoinPrefix(volume, paths...)...) }, pathsJoinPrefix(volume, paths...)...)
return &distLockInstance{drwmutex, opsID} return &distLockInstance{drwmutex, opsID}
} }
sort.Strings(paths)
return &localLockInstance{n, volume, paths, opsID} return &localLockInstance{n, volume, paths, opsID}
} }

View File

@ -26,8 +26,8 @@ cleanup
export MINIO_CI_CD=1 export MINIO_CI_CD=1
export MINIO_BROWSER=off export MINIO_BROWSER=off
export MINIO_ROOT_USER="minio"
export MINIO_ROOT_PASSWORD="minio123" make install-race
# Start MinIO instances # Start MinIO instances
echo -n "Starting MinIO instances ..." echo -n "Starting MinIO instances ..."
@ -48,8 +48,8 @@ if [ ! -f ./mc ]; then
chmod +x mc chmod +x mc
fi fi
export MC_HOST_sitea=http://minio:minio123@127.0.0.1:9001 export MC_HOST_sitea=http://minioadmin:minioadmin@127.0.0.1:9001
export MC_HOST_siteb=http://minio:minio123@127.0.0.1:9004 export MC_HOST_siteb=http://minioadmin:minioadmin@127.0.0.1:9004
./mc ready sitea ./mc ready sitea
./mc ready siteb ./mc ready siteb
@ -65,7 +65,7 @@ export MC_HOST_siteb=http://minio:minio123@127.0.0.1:9004
# Run the test to make sure proxying of DEL marker doesn't happen # Run the test to make sure proxying of DEL marker doesn't happen
loop_count=0 loop_count=0
while true; do while true; do
if [ $loop_count -eq 100 ]; then if [ $loop_count -eq 1000 ]; then
break break
fi fi
echo "Hello World" | ./mc pipe sitea/bucket/obj$loop_count echo "Hello World" | ./mc pipe sitea/bucket/obj$loop_count