From 63f3e5c3fcee6f5bc35636d16b6ff587e9115d50 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 23 Aug 2021 17:16:18 +0200 Subject: [PATCH] replication: Lock object while replicating (#13014) Introduce a replication lock that will ensure that only one replication operation will run for any given object at any time. Fixes #13013 --- cmd/bucket-replication.go | 42 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index afaa5a844..a50019bb3 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -28,7 +28,7 @@ import ( "time" "github.com/minio/madmin-go" - minio "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7" miniogo "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/tags" @@ -316,6 +316,28 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj return } + // Lock the object name before starting replication operation. + // Use separate lock that doesn't collide with regular objects. + lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn)) + sendEvent(eventArgs{ + BucketName: bucket, + Object: ObjectInfo{ + Bucket: bucket, + Name: dobj.ObjectName, + VersionID: versionID, + DeleteMarker: dobj.DeleteMarker, + }, + Host: "Internal: [Replication]", + EventName: event.ObjectReplicationNotTracked, + }) + return + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{ VersionID: versionID, Internal: miniogo.AdvancedRemoveOptions{ @@ -669,6 +691,24 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje }) return } + + // Lock the object name before starting replication. + // Use separate lock that doesn't collide with regular objects. + lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn)) + return + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + var closeOnDefer bool gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ VersionID: objInfo.VersionID,