Update ReplicationStatus if metadata not updated correctly (#12191)

There can be situations where replication completed but the
`X-Amz-Replication-Status` metadata update failed such as
when the server returns 503 under high load. This object version will
continue to be picked up by the scanner and replicateObject would perform
no action since the versions match between source and target.
The metadata would never reflect that replication was successful
without this fix, leading to repeated re-queuing.
This commit is contained in:
Poorna Krishnamoorthy 2021-04-29 16:46:26 -07:00 committed by GitHub
parent c4b21ac7fa
commit 90112b5644
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -646,6 +646,33 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
if rtype == replicateNone {
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed
if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed {
// if metadata is not updated for some reason after replication, such as 503 encountered while updating metadata - make sure
// to set ReplicationStatus as Completed.Note that replication Stats would have been updated despite metadata update failure.
z, ok := objectAPI.(*erasureServerPools)
if !ok {
return
}
// This lower level implementation is necessary to avoid write locks from CopyObject.
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
} else {
fi := FileInfo{}
fi.VersionID = objInfo.VersionID
fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
for k, v := range objInfo.UserDefined {
fi.Metadata[k] = v
}
fi.Metadata[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
if objInfo.UserTags != "" {
fi.Metadata[xhttp.AmzObjectTagging] = objInfo.UserTags
}
if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
}
}
}
return
}
}