Allow null versions to be replicated (#13310)

for pre-existing objects present in a bucket
prior to enabling existing object replication.

Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
This commit is contained in:
Poorna Krishnamoorthy
2021-09-28 13:26:12 -04:00
committed by GitHub
parent 38027c8f52
commit 7f6ed35347
5 changed files with 22 additions and 7 deletions

View File

@@ -732,7 +732,12 @@ func equals(k1 string, keys ...string) bool {
}
// returns replicationAction by comparing metadata between source and target
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo, opType replication.Type) replicationAction {
// Avoid resyncing null versions created prior to enabling replication if target has a newer copy
if opType == replication.ExistingObjectReplicationType &&
oi1.ModTime.Unix() > oi2.LastModified.Unix() && oi1.VersionID == nullVersionID {
return replicateNone
}
// needs full replication
if oi1.ETag != oi2.ETag ||
oi1.VersionID != oi2.VersionID ||
@@ -1053,9 +1058,19 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
ReplicationProxyRequest: "false",
}})
if cerr == nil {
rAction = getReplicationAction(objInfo, oi)
rAction = getReplicationAction(objInfo, oi, ri.OpType)
rinfo.ReplicationStatus = replication.Completed
if rAction == replicateNone {
if ri.OpType == replication.ExistingObjectReplicationType &&
objInfo.ModTime.Unix() > oi.LastModified.Unix() && objInfo.VersionID == nullVersionID {
logger.LogIf(ctx, fmt.Errorf("Unable to replicate %s/%s (null). Newer version exists on target", bucket, object))
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
}
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed
if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
@@ -1651,7 +1666,7 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *Repli
return
}
// existing object replication does not apply to un-versioned objects
if oi.VersionID == "" || oi.VersionID == nullVersionID {
if oi.VersionID == "" {
return
}