Revert "Revert "Add delete marker replication support (#10396)""

This reverts commit 267d7bf0a9.
This commit is contained in:
Harshavardhana
2020-11-19 18:43:58 -08:00
parent b9e3a8b5ac
commit 9a34fd5c4a
25 changed files with 950 additions and 222 deletions

View File

@@ -83,7 +83,7 @@ func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object strin
if permErr != ErrNone {
return false
}
return mustReplicater(ctx, r, bucket, object, meta, replStatus)
return mustReplicater(ctx, bucket, object, meta, replStatus)
}
// mustReplicate returns true if object meets replication criteria.
@@ -91,11 +91,11 @@ func mustReplicate(ctx context.Context, r *http.Request, bucket, object string,
if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
return false
}
return mustReplicater(ctx, r, bucket, object, meta, replStatus)
return mustReplicater(ctx, bucket, object, meta, replStatus)
}
// mustReplicater returns true if object meets replication criteria.
func mustReplicater(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) bool {
if globalIsGateway {
return false
}
@@ -120,6 +120,127 @@ func mustReplicater(ctx context.Context, r *http.Request, bucket, object string,
return cfg.Replicate(opts)
}
// returns true if any of the objects being deleted qualifies for replication.
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
c, err := getReplicationConfig(ctx, bucket)
if err != nil || c == nil {
return false
}
for _, obj := range objects {
if c.HasActiveRules(obj.ObjectName, true) {
return true
}
}
return false
}
// returns whether object version is a deletemarker and if object qualifies for replication
func checkReplicateDelete(ctx context.Context, getObjectInfoFn GetObjectInfoFn, bucket string, dobj ObjectToDelete) (dm, replicate bool) {
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
return false, false
}
oi, err := getObjectInfoFn(ctx, bucket, dobj.ObjectName, ObjectOptions{VersionID: dobj.VersionID})
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
// GetObjectInfo returns extra information even though it returns errFileNotFound
if err != nil {
validReplStatus := false
switch oi.ReplicationStatus {
case replication.Pending, replication.Complete, replication.Failed:
validReplStatus = true
}
if oi.DeleteMarker && validReplStatus {
return oi.DeleteMarker, true
}
return oi.DeleteMarker, false
}
opts := replication.ObjectOpts{
Name: dobj.ObjectName,
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
UserTags: oi.UserTags,
DeleteMarker: true,
VersionID: dobj.VersionID,
}
return oi.DeleteMarker, rcfg.Replicate(opts)
}
// replicate deletes to the designated replication target if replication configuration
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
// is specified) enabled.
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectAPI ObjectLayer) {
bucket := dobj.Bucket
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
return
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
if tgt == nil {
return
}
versionID := dobj.DeleteMarkerVersionID
if versionID == "" {
versionID = dobj.VersionID
}
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
VersionID: versionID,
Internal: miniogo.AdvancedRemoveOptions{
ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
ReplicationMTime: dobj.DeleteMarkerMTime,
ReplicationStatus: miniogo.ReplicationStatusReplica,
},
})
replicationStatus := dobj.DeleteMarkerReplicationStatus
versionPurgeStatus := dobj.VersionPurgeStatus
if rmErr != nil {
if dobj.VersionID == "" {
replicationStatus = string(replication.Failed)
} else {
versionPurgeStatus = Failed
}
} else {
if dobj.VersionID == "" {
replicationStatus = string(replication.Complete)
} else {
versionPurgeStatus = Complete
}
}
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
objInfo := ObjectInfo{
Name: dobj.ObjectName,
DeleteMarker: dobj.DeleteMarker,
VersionID: versionID,
ReplicationStatus: replication.StatusType(dobj.DeleteMarkerReplicationStatus),
}
eventArg := &eventArgs{
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationFailed,
}
sendEvent(*eventArg)
}
// Update metadata on the delete marker or purge permanent delete if replication success.
if _, err = objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
DeleteMarkerReplicationStatus: replicationStatus,
Versioned: globalBucketVersioningSys.Enabled(bucket),
VersionPurgeStatus: versionPurgeStatus,
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
}); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s %s: %w", bucket, dobj.ObjectName, dobj.VersionID, err))
}
}
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
meta := make(map[string]string)
for k, v := range objInfo.UserDefined {
@@ -304,18 +425,37 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri
return dst
}
// DeletedObjectVersionInfo has info on deleted object
type DeletedObjectVersionInfo struct {
DeletedObject
Bucket string
}
type replicationState struct {
// add future metrics here
replicaCh chan ObjectInfo
replicaCh chan ObjectInfo
replicaDeleteCh chan DeletedObjectVersionInfo
}
func (r *replicationState) queueReplicaTask(oi ObjectInfo) {
if r == nil {
return
}
select {
case r.replicaCh <- oi:
default:
}
}
func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
if r == nil {
return
}
select {
case r.replicaDeleteCh <- doi:
default:
}
}
var (
globalReplicationState *replicationState
// TODO: currently keeping it conservative
@@ -332,11 +472,13 @@ func newReplicationState() *replicationState {
globalReplicationConcurrent = 1
}
rs := &replicationState{
replicaCh: make(chan ObjectInfo, 10000),
replicaCh: make(chan ObjectInfo, 10000),
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
}
go func() {
<-GlobalContext.Done()
close(rs.replicaCh)
close(rs.replicaDeleteCh)
}()
return rs
}
@@ -354,6 +496,11 @@ func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer)
return
}
replicateObject(ctx, oi, objectAPI)
case doi, ok := <-r.replicaDeleteCh:
if !ok {
return
}
replicateDelete(ctx, doi, objectAPI)
}
}
}()