From 21bf5b4db73731a592ff990550385d531ed424c6 Mon Sep 17 00:00:00 2001 From: Poorna Date: Tue, 9 Aug 2022 15:00:24 -0700 Subject: [PATCH] replication: heal proactively upon access (#15501) Queue failed/pending replication for healing during listing and GET/HEAD API calls. This includes healing of existing objects that were never replicated or those in the middle of a resync operation. This PR also fixes a bug in ListObjectVersions where lifecycle filtering should be done. --- cmd/bucket-replication.go | 87 ++++++++++++++++++++++++++++++++++++ cmd/data-scanner.go | 66 +++------------------------ cmd/erasure-server-pool.go | 35 ++++++--------- cmd/erasure-single-drive.go | 33 ++++++-------- cmd/metacache-server-pool.go | 45 ++++++++++--------- cmd/metacache-set.go | 17 +++++++ cmd/object-handlers.go | 19 +++++--- 7 files changed, 173 insertions(+), 129 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index f6393ab9f..296413ea1 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2386,3 +2386,90 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, }() return diffCh, nil } + +// QueueReplicationHeal is a wrapper for queueReplicationHeal +func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) { + // un-versioned case + if oi.VersionID == "" { + return + } + rcfg, _, _ := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket) + tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + queueReplicationHeal(ctx, bucket, oi, replicationConfig{ + Config: rcfg, + remotes: tgts, + }) +} + +// queueReplicationHeal enqueues objects that failed replication OR eligible for resyncing through +// an ongoing resync operation or via existing objects replication configuration setting. +func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) { + // un-versioned case + if oi.VersionID == "" { + return roi + } + + if rcfg.Config == nil || rcfg.remotes == nil { + return roi + } + roi = getHealReplicateObjectInfo(oi, rcfg) + if !roi.Dsc.ReplicateAny() { + return + } + // early return if replication already done, otherwise we need to determine if this + // version is an existing object that needs healing. 
+ if oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty() && !roi.ExistingObjResync.mustResync() { + return + } + + if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() { + versionID := "" + dmVersionID := "" + if roi.VersionPurgeStatus.Empty() { + dmVersionID = roi.VersionID + } else { + versionID = roi.VersionID + } + + dv := DeletedObjectReplicationInfo{ + DeletedObject: DeletedObject{ + ObjectName: roi.Name, + DeleteMarkerVersionID: dmVersionID, + VersionID: versionID, + ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true), + DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, + DeleteMarker: roi.DeleteMarker, + }, + Bucket: roi.Bucket, + OpType: replication.HealReplicationType, + EventType: ReplicateHealDelete, + } + // heal delete marker replication failure or versioned delete replication failure + if roi.ReplicationStatus == replication.Pending || + roi.ReplicationStatus == replication.Failed || + roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending { + globalReplicationPool.queueReplicaDeleteTask(dv) + return + } + // if replication status is Complete on DeleteMarker and existing object resync required + if roi.ExistingObjResync.mustResync() && (roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()) { + queueReplicateDeletesWrapper(dv, roi.ExistingObjResync) + return + } + return + } + if roi.ExistingObjResync.mustResync() { + roi.OpType = replication.ExistingObjectReplicationType + } + switch roi.ReplicationStatus { + case replication.Pending, replication.Failed: + roi.EventType = ReplicateHeal + globalReplicationPool.queueReplicaTask(roi) + return + } + if roi.ExistingObjResync.mustResync() { + roi.EventType = ReplicateExisting + globalReplicationPool.queueReplicaTask(roi) + } + return +} diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 23dde8266..b95d62f80 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1225,29 +1225,16 @@ func (i *scannerItem) objectPath() string { // healReplication will heal a scanned item that has failed replication. 
func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) { - roi := getHealReplicateObjectInfo(oi, i.replication) - if !roi.Dsc.ReplicateAny() { + if oi.VersionID == "" { return } - + if i.replication.Config == nil { + return + } + roi := queueReplicationHeal(ctx, oi.Bucket, oi, i.replication) if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { - // heal delete marker replication failure or versioned delete replication failure - if oi.ReplicationStatus == replication.Pending || - oi.ReplicationStatus == replication.Failed || - oi.VersionPurgeStatus == Failed || oi.VersionPurgeStatus == Pending { - i.healReplicationDeletes(ctx, o, roi) - return - } - // if replication status is Complete on DeleteMarker and existing object resync required - if roi.ExistingObjResync.mustResync() && (oi.ReplicationStatus == replication.Completed || oi.ReplicationStatus.Empty()) { - i.healReplicationDeletes(ctx, o, roi) - return - } return } - if roi.ExistingObjResync.mustResync() { - roi.OpType = replication.ExistingObjectReplicationType - } if sizeS.replTargetStats == nil && len(roi.TargetStatuses) > 0 { sizeS.replTargetStats = make(map[string]replTargetSizeSummary) @@ -1277,52 +1264,9 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj } switch oi.ReplicationStatus { - case replication.Pending, replication.Failed: - roi.EventType = ReplicateHeal - globalReplicationPool.queueReplicaTask(roi) - return case replication.Replica: sizeS.replicaSize += oi.Size } - if roi.ExistingObjResync.mustResync() { - roi.EventType = ReplicateExisting - globalReplicationPool.queueReplicaTask(roi) - } -} - -// healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes. -func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, roi ReplicateObjectInfo) { - // handle soft delete and permanent delete failures here. 
- if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() { - versionID := "" - dmVersionID := "" - if roi.VersionPurgeStatus.Empty() { - dmVersionID = roi.VersionID - } else { - versionID = roi.VersionID - } - - doi := DeletedObjectReplicationInfo{ - DeletedObject: DeletedObject{ - ObjectName: roi.Name, - DeleteMarkerVersionID: dmVersionID, - VersionID: versionID, - ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true), - DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, - DeleteMarker: roi.DeleteMarker, - }, - Bucket: roi.Bucket, - OpType: replication.HealReplicationType, - EventType: ReplicateHealDelete, - } - if roi.ExistingObjResync.mustResync() { - doi.OpType = replication.ExistingObjectReplicationType - doi.EventType = ReplicateExistingDelete - queueReplicateDeletesWrapper(doi, roi.ExistingObjResync) - return - } - globalReplicationPool.queueReplicaDeleteTask(doi) - } } type dynamicSleeper struct { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index a47b5ac23..e2a3d217e 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1178,7 +1178,6 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre if marker == "" && versionMarker != "" { return loi, NotImplemented{} } - opts := listPathOptions{ Bucket: bucket, Prefix: prefix, @@ -1189,6 +1188,8 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre AskDisks: globalAPIConfig.getListQuorum(), Versioned: true, } + // set bucket metadata in opts + opts.setBucketMeta(ctx) merged, err := z.listPath(ctx, &opts) if err != nil && err != io.EOF { @@ -1235,12 +1236,16 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int { func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { var loi ListObjectsInfo - - // Automatically remove the object/version is an expiry lifecycle rule can be applied - lc, _ := globalLifecycleSys.Get(bucket) - - // Check if bucket is object locked. - rcfg, _ := globalBucketObjectLockSys.Get(bucket) + opts := listPathOptions{ + Bucket: bucket, + Prefix: prefix, + Separator: delimiter, + Limit: maxKeysPlusOne(maxKeys, marker != ""), + Marker: marker, + InclDeleted: false, + AskDisks: globalAPIConfig.getListQuorum(), + } + opts.setBucketMeta(ctx) if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" { // Optimization for certain applications like @@ -1252,8 +1257,8 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma // to avoid the need for ListObjects(). 
objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true}) if err == nil { - if lc != nil { - action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false) + if opts.Lifecycle != nil { + action := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo, false) switch action { case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: fallthrough @@ -1266,18 +1271,6 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma } } - opts := listPathOptions{ - Bucket: bucket, - Prefix: prefix, - Separator: delimiter, - Limit: maxKeysPlusOne(maxKeys, marker != ""), - Marker: marker, - InclDeleted: false, - AskDisks: globalAPIConfig.getListQuorum(), - Lifecycle: lc, - Retention: rcfg, - } - merged, err := z.listPath(ctx, &opts) if err != nil && err != io.EOF { if !isErrBucketNotFound(err) { diff --git a/cmd/erasure-single-drive.go b/cmd/erasure-single-drive.go index 4f4ec4eac..4df8e8dcc 100644 --- a/cmd/erasure-single-drive.go +++ b/cmd/erasure-single-drive.go @@ -2859,11 +2859,16 @@ func (es *erasureSingle) AbortMultipartUpload(ctx context.Context, bucket, objec func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { var loi ListObjectsInfo - // Automatically remove the object/version is an expiry lifecycle rule can be applied - lc, _ := globalLifecycleSys.Get(bucket) - - // Check if bucket is object locked. - rcfg, _ := globalBucketObjectLockSys.Get(bucket) + opts := listPathOptions{ + Bucket: bucket, + Prefix: prefix, + Separator: delimiter, + Limit: maxKeysPlusOne(maxKeys, marker != ""), + Marker: marker, + InclDeleted: false, + AskDisks: globalAPIConfig.getListQuorum(), + } + opts.setBucketMeta(ctx) if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" { // Optimization for certain applications like @@ -2875,8 +2880,8 @@ func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker // to avoid the need for ListObjects(). 
objInfo, err := es.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true}) if err == nil { - if lc != nil { - action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false) + if opts.Lifecycle != nil { + action := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo, false) switch action { case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: fallthrough @@ -2889,18 +2894,6 @@ func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker } } - opts := listPathOptions{ - Bucket: bucket, - Prefix: prefix, - Separator: delimiter, - Limit: maxKeysPlusOne(maxKeys, marker != ""), - Marker: marker, - InclDeleted: false, - AskDisks: globalAPIConfig.getListQuorum(), - Lifecycle: lc, - Retention: rcfg, - } - merged, err := es.listPath(ctx, &opts) if err != nil && err != io.EOF { if !isErrBucketNotFound(err) { @@ -2959,7 +2952,6 @@ func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix, if marker == "" && versionMarker != "" { return loi, NotImplemented{} } - opts := listPathOptions{ Bucket: bucket, Prefix: prefix, @@ -2970,6 +2962,7 @@ func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix, AskDisks: "strict", Versioned: true, } + opts.setBucketMeta(ctx) merged, err := es.listPath(ctx, &opts) if err != nil && err != io.EOF { diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 8a3469123..26c19b748 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -29,7 +29,6 @@ import ( "time" "github.com/minio/minio/internal/bucket/lifecycle" - "github.com/minio/minio/internal/bucket/object/lock" "github.com/minio/minio/internal/logger" ) @@ -486,9 +485,9 @@ func (es *erasureSingle) listMerged(ctx context.Context, o listPathOptions, resu mu.Unlock() // Do lifecycle filtering. - if o.Lifecycle != nil { + if o.Lifecycle != nil || o.Replication.Config != nil { filterIn := make(chan metaCacheEntry, 10) - go filterLifeCycle(ctx, o.Bucket, *o.Lifecycle, o.Retention, filterIn, results) + go applyBucketActions(ctx, o, filterIn, results) // Replace results. results = filterIn } @@ -572,9 +571,9 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, mu.Unlock() // Do lifecycle filtering. - if o.Lifecycle != nil { + if o.Lifecycle != nil || o.Replication.Config != nil { filterIn := make(chan metaCacheEntry, 10) - go filterLifeCycle(ctx, o.Bucket, *o.Lifecycle, o.Retention, filterIn, results) + go applyBucketActions(ctx, o, filterIn, results) // Replace results. results = filterIn } @@ -635,15 +634,16 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, return nil } -// filterLifeCycle will filter out objects if the most recent -// version should be deleted by lifecycle. +// applyBucketActions applies lifecycle and replication actions on the listing +// It will filter out objects if the most recent version should be deleted by lifecycle. +// Entries that failed replication will be queued if no lifecycle rules got applied. // out will be closed when there are no more results. // When 'in' is closed or the context is canceled the // function closes 'out' and exits. 
-func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, lr lock.Retention, in <-chan metaCacheEntry, out chan<- metaCacheEntry) { +func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) { defer close(out) - vcfg, _ := globalBucketVersioningSys.Get(bucket) + vcfg, _ := globalBucketVersioningSys.Get(o.Bucket) for { var obj metaCacheEntry var ok bool @@ -656,29 +656,32 @@ func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, } } - fi, err := obj.fileInfo(bucket) + fi, err := obj.fileInfo(o.Bucket) if err != nil { continue } versioned := vcfg != nil && vcfg.Versioned(obj.name) - objInfo := fi.ToObjectInfo(bucket, obj.name, versioned) - action := evalActionFromLifecycle(ctx, lc, lr, objInfo, false) - switch action { - case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: - globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction) - // Skip this entry. - continue - case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: - globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction) - // Skip this entry. - continue + objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned) + if o.Lifecycle != nil { + action := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo, false) + switch action { + case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: + globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction) + // Skip this entry. + continue + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction) + // Skip this entry. + continue + } } select { case <-ctx.Done(): return case out <- obj: + queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication) } } } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 4803d8e76..6b8a2d2e6 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -102,6 +102,8 @@ type listPathOptions struct { // Retention configuration, needed to be passed along with lifecycle if set. Retention lock.Retention + // Replication configuration + Replication replicationConfig // pool and set of where the cache is located. pool, set int } @@ -110,6 +112,21 @@ func init() { gob.Register(listPathOptions{}) } +func (o *listPathOptions) setBucketMeta(ctx context.Context) { + lc, _ := globalLifecycleSys.Get(o.Bucket) + + // Check if bucket is object locked. + rcfg, _ := globalBucketObjectLockSys.Get(o.Bucket) + replCfg, _, _ := globalBucketMetadataSys.GetReplicationConfig(ctx, o.Bucket) + tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, o.Bucket) + o.Lifecycle = lc + o.Replication = replicationConfig{ + Config: replCfg, + remotes: tgts, + } + o.Retention = rcfg +} + // newMetacache constructs a new metacache from the options. 
func (o listPathOptions) newMetacache() metacache { return metacache{ diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index eaaf47360..610baec3a 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -417,12 +417,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj return checkPreconditions(ctx, w, r, oi, opts) } - + var proxy proxyResult gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts) if err != nil { var ( reader *GetObjectReader - proxy proxyResult perr error ) proxytgts := getProxyTargets(ctx, bucket, object, opts) @@ -491,6 +490,10 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj } } + // Queue failed/pending replication automatically + if !proxy.Proxy { + QueueReplicationHeal(ctx, bucket, objInfo) + } // filter object lock metadata if permission does not permit getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object) legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object) @@ -659,11 +662,9 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob rangeHeader := r.Header.Get(xhttp.Range) objInfo, err := getObjectInfo(ctx, bucket, object, opts) + var proxy proxyResult if err != nil { - var ( - proxy proxyResult - oi ObjectInfo - ) + var oi ObjectInfo // proxy HEAD to replication target if active-active replication configured on bucket proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { @@ -690,6 +691,7 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)} } } + QueueReplicationHeal(ctx, bucket, objInfo) writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) return } @@ -714,6 +716,11 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob } } + // Queue failed/pending replication automatically + if !proxy.Proxy { + QueueReplicationHeal(ctx, bucket, objInfo) + } + // filter object lock metadata if permission does not permit getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object) legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
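
The read-path hook added in cmd/object-handlers.go can be pictured with a small standalone sketch. This is not MinIO code: ObjectInfo, proxyResult, QueueReplicationHeal and serveHead below are simplified stand-ins, used only to show the intended call pattern described in the commit message: after a GET/HEAD that was served locally rather than proxied to a remote replication target, the version is handed to QueueReplicationHeal, which is a no-op for un-versioned objects.

package main

// Minimal sketch, assuming simplified stand-in types, of the read-path hook:
// a successfully served (non-proxied) GET/HEAD queues the object version for
// replication healing.

import (
	"context"
	"fmt"
)

// ObjectInfo is a pared-down stand-in for cmd.ObjectInfo.
type ObjectInfo struct {
	Bucket            string
	Name              string
	VersionID         string
	ReplicationStatus string
}

// proxyResult mirrors the flag the handlers consult before queueing a heal.
type proxyResult struct {
	Proxy bool
}

// QueueReplicationHeal stands in for the exported wrapper added by the patch:
// it returns immediately for un-versioned objects and otherwise enqueues the
// version if it still needs replication.
func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
	if oi.VersionID == "" { // un-versioned case: nothing to heal
		return
	}
	fmt.Printf("queue heal: %s/%s (version %s, status %q)\n",
		bucket, oi.Name, oi.VersionID, oi.ReplicationStatus)
}

func serveHead(ctx context.Context, bucket string, oi ObjectInfo, proxy proxyResult) {
	// ... write response headers for oi ...

	// Queue failed/pending replication automatically, but only when the object
	// was served from this cluster rather than proxied to a remote target.
	if !proxy.Proxy {
		QueueReplicationHeal(ctx, bucket, oi)
	}
}

func main() {
	serveHead(context.Background(), "mybucket",
		ObjectInfo{Bucket: "mybucket", Name: "a.txt", VersionID: "v1", ReplicationStatus: "FAILED"},
		proxyResult{Proxy: false})
}

Running the sketch prints the queue message for the FAILED version; with proxy.Proxy set, nothing is queued, which matches the `if !proxy.Proxy` guard added to both getObjectHandler and headObjectHandler.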
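
The listing-side flow can be sketched the same way. Again these are simplified stand-ins, not the real listPathOptions, lifecycle evaluation, or metacache entries: setBucketMeta is represented by literal option values, and applyBucketActions keeps only the ordering that matters in the patch, lifecycle filtering first, then forwarding the entry and queueing it for replication healing.

package main

// Minimal sketch, assuming simplified stand-in types, of the listing flow:
// listPathOptions carries the bucket's lifecycle and replication configuration,
// applyBucketActions drops entries that lifecycle would expire and queues the
// surviving ones for replication healing as they are listed.

import (
	"context"
	"fmt"
	"strings"
)

type lifecycleConfig struct{ expirePrefix string }

type replicationCfg struct{ enabled bool }

type objectEntry struct {
	Name              string
	ReplicationStatus string
}

type listPathOptions struct {
	Bucket      string
	Lifecycle   *lifecycleConfig
	Replication *replicationCfg
}

// queueReplicationHeal stands in for the helper added in cmd/bucket-replication.go:
// only entries that still need replication are queued.
func queueReplicationHeal(ctx context.Context, bucket string, e objectEntry, r *replicationCfg) {
	if r == nil || !r.enabled {
		return
	}
	if e.ReplicationStatus == "PENDING" || e.ReplicationStatus == "FAILED" {
		fmt.Printf("heal queued for %s/%s\n", bucket, e.Name)
	}
}

// applyBucketActions mirrors the shape of the renamed filter in cmd/metacache-server-pool.go:
// lifecycle may expire an entry (it is then skipped); otherwise the entry is sent to the
// caller and queued for replication healing as a side effect of being listed.
func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan objectEntry, out chan<- objectEntry) {
	defer close(out)
	for e := range in {
		if o.Lifecycle != nil && strings.HasPrefix(e.Name, o.Lifecycle.expirePrefix) {
			continue // would be expired by lifecycle: neither listed nor healed
		}
		select {
		case <-ctx.Done():
			return
		case out <- e:
			queueReplicationHeal(ctx, o.Bucket, e, o.Replication)
		}
	}
}

func main() {
	in := make(chan objectEntry, 2)
	out := make(chan objectEntry, 2)
	in <- objectEntry{Name: "logs/old.txt"}
	in <- objectEntry{Name: "data/a.txt", ReplicationStatus: "FAILED"}
	close(in)

	o := listPathOptions{
		Bucket:      "mybucket",
		Lifecycle:   &lifecycleConfig{expirePrefix: "logs/"},
		Replication: &replicationCfg{enabled: true},
	}
	go applyBucketActions(context.Background(), o, in, out)
	for e := range out {
		fmt.Println("listed:", e.Name)
	}
}

Here logs/old.txt is dropped by the lifecycle check and never healed, while data/a.txt is both listed and queued, matching the patch's behavior of only healing entries that survive lifecycle filtering.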