Add support for syncing replica modifications (#11104)

when bidirectional replication is set up.

If ReplicaModifications is enabled in the replication
configuration, sync metadata updates to source if
replication rules are met. By default, if this
configuration is unset, MinIO automatically sync's
metadata updates on replica back to the source.
This commit is contained in:
Poorna Krishnamoorthy
2021-05-13 19:20:45 -07:00
committed by GitHub
parent 397391c89f
commit 951acf561c
8 changed files with 230 additions and 31 deletions

View File

@@ -1285,7 +1285,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" {
srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs
}
if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()); ok {
if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String(), srcInfo.metadataOnly); ok {
srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
// Store the preserved compression metadata.
@@ -1387,7 +1387,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String(), objInfo.metadataOnly); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
@@ -1634,7 +1634,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok {
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok {
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
@@ -1721,7 +1721,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
objInfo.ETag = objInfo.ETag + "-1"
}
}
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, "", false); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
@@ -1960,7 +1960,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
return
}
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok {
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok {
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
@@ -2016,8 +2016,9 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
return
}
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, "", false); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
}
@@ -2130,7 +2131,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok {
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok {
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
// We need to preserve the encryption headers set in EncryptRequest,
@@ -3120,7 +3121,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
setPutObjHeaders(w, objInfo, false)
if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), false); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
@@ -3381,7 +3382,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), true)
if replicate {
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
@@ -3560,7 +3561,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = ""
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = ""
}
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), true)
if replicate {
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
@@ -3752,7 +3753,12 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
return
}
replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, "")
objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, objInfo.ReplicationStatus.String(), true)
if replicate {
opts.UserDefined = make(map[string]string)
opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
@@ -3761,7 +3767,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
tagsStr := tags.String()
// Put object tags
objInfo, err := objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts)
objInfo, err = objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
@@ -3829,7 +3835,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, oi.ReplicationStatus.String(), true)
if replicate {
opts.UserDefined = make(map[string]string)
opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()