Avoid notification event for replicas (#11683)

Creating notification events for replica creation
is not particularly useful to send as the notification
event generated at source already includes replication
completion events.

For applications using replica cluster as failover, avoiding
duplicate notifications for replica event will allow seamless
failover.
This commit is contained in:
Poorna Krishnamoorthy
2021-03-03 11:13:31 -08:00
committed by GitHub
parent 039f59b552
commit 690434514d
6 changed files with 28 additions and 8 deletions

View File

@@ -268,6 +268,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
ReplicationMTime: dobj.DeleteMarkerMTime.Time,
ReplicationStatus: miniogo.ReplicationStatusReplica,
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
},
})
@@ -412,10 +413,11 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
ContentEncoding: objInfo.ContentEncoding,
StorageClass: sc,
Internal: miniogo.AdvancedPutOptions{
SourceVersionID: objInfo.VersionID,
ReplicationStatus: miniogo.ReplicationStatusReplica,
SourceMTime: objInfo.ModTime,
SourceETag: objInfo.ETag,
SourceVersionID: objInfo.VersionID,
ReplicationStatus: miniogo.ReplicationStatusReplica,
SourceMTime: objInfo.ModTime,
SourceETag: objInfo.ETag,
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
},
}
if objInfo.UserTags != "" {
@@ -653,7 +655,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
Bucket: dest.Bucket,
Object: object,
VersionID: objInfo.VersionID}
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
dstOpts := miniogo.PutObjectOptions{
Internal: miniogo.AdvancedPutOptions{
SourceVersionID: objInfo.VersionID,
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
}}
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), srcOpts, dstOpts); err != nil {
replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))

View File

@@ -234,12 +234,16 @@ func extractReqParams(r *http.Request) map[string]string {
cred := getReqAccessCred(r, region)
// Success.
return map[string]string{
m := map[string]string{
"region": region,
"accessKey": cred.AccessKey,
"sourceIPAddress": handlers.GetSourceIP(r),
// Add more fields here.
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
m[xhttp.MinIOSourceReplicationRequest] = ""
}
return m
}
// Extract response elements to be sent with event notifiation.

View File

@@ -165,6 +165,8 @@ const (
MinIODeleteMarkerReplicationStatus = "X-Minio-Replication-DeleteMarker-Status"
// Header indicates if its a GET/HEAD proxy request for active-active replication
MinIOSourceProxyRequest = "X-Minio-Source-Proxy-Request"
// Header indicates that this request is a replication request to create a REPLICA
MinIOSourceReplicationRequest = "X-Minio-Source-Replication-Request"
)
// Common http query params S3 API

View File

@@ -33,6 +33,7 @@ import (
"github.com/klauspost/compress/zip"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
bandwidth "github.com/minio/minio/pkg/bandwidth"
bucketBandwidth "github.com/minio/minio/pkg/bucket/bandwidth"
@@ -1387,6 +1388,10 @@ func (args eventArgs) ToEvent(escape bool) event.Event {
func sendEvent(args eventArgs) {
args.Object.Size, _ = args.Object.GetActualSize()
// avoid generating a notification for REPLICA creation event.
if _, ok := args.ReqParams[xhttp.MinIOSourceReplicationRequest]; ok {
return
}
// remove sensitive encryption entries in metadata.
crypto.RemoveSensitiveEntries(args.Object.UserDefined)
crypto.RemoveInternalEntries(args.Object.UserDefined)