diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index afaa5a844..a50019bb3 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -28,7 +28,7 @@ import ( "time" "github.com/minio/madmin-go" - minio "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7" miniogo "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/tags" @@ -316,6 +316,28 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj return } + // Lock the object name before starting replication operation. + // Use separate lock that doesn't collide with regular objects. + lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn)) + sendEvent(eventArgs{ + BucketName: bucket, + Object: ObjectInfo{ + Bucket: bucket, + Name: dobj.ObjectName, + VersionID: versionID, + DeleteMarker: dobj.DeleteMarker, + }, + Host: "Internal: [Replication]", + EventName: event.ObjectReplicationNotTracked, + }) + return + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{ VersionID: versionID, Internal: miniogo.AdvancedRemoveOptions{ @@ -669,6 +691,24 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje }) return } + + // Lock the object name before starting replication. + // Use separate lock that doesn't collide with regular objects. + lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn)) + return + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + var closeOnDefer bool gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ VersionID: objInfo.VersionID,