replication: heal proactively upon access (#15501)

Queue failed/pending replication for healing during listing and GET/HEAD
API calls. This includes healing of existing objects that were never
replicated or those in the middle of a resync operation.

This PR also fixes a bug in ListObjectVersions where lifecycle filtering
was not being applied.
Poorna 2022-08-09 15:00:24 -07:00 committed by GitHub
parent a406bb0288
commit 21bf5b4db7
7 changed files with 173 additions and 129 deletions
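
At a high level, the change adds a heal hook on the read path: when a GET or HEAD is served locally rather than proxied to a remote replication target, the object is queued for replication healing. A minimal sketch of that hook, assuming the surrounding MinIO cmd package types (ObjectInfo, proxyResult, QueueReplicationHeal); the function name serveRead is a placeholder, not an actual handler:

// Sketch only: queue the object for replication healing after serving it
// locally. QueueReplicationHeal is a no-op for unversioned objects and for
// buckets without a replication config or remote targets, so the call is
// safe to make unconditionally on GET/HEAD.
func serveRead(ctx context.Context, bucket string, objInfo ObjectInfo, proxy proxyResult) {
	if !proxy.Proxy {
		QueueReplicationHeal(ctx, bucket, objInfo)
	}
}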


@ -2386,3 +2386,90 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
}()
return diffCh, nil
}
// QueueReplicationHeal is a wrapper for queueReplicationHeal
func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
// un-versioned case
if oi.VersionID == "" {
return
}
rcfg, _, _ := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
queueReplicationHeal(ctx, bucket, oi, replicationConfig{
Config: rcfg,
remotes: tgts,
})
}
// queueReplicationHeal enqueues objects that failed replication or are eligible for resyncing through
// an ongoing resync operation or via the existing objects replication configuration setting.
func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) {
// un-versioned case
if oi.VersionID == "" {
return roi
}
if rcfg.Config == nil || rcfg.remotes == nil {
return roi
}
roi = getHealReplicateObjectInfo(oi, rcfg)
if !roi.Dsc.ReplicateAny() {
return
}
// early return if replication already done, otherwise we need to determine if this
// version is an existing object that needs healing.
if oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty() && !roi.ExistingObjResync.mustResync() {
return
}
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
versionID := ""
dmVersionID := ""
if roi.VersionPurgeStatus.Empty() {
dmVersionID = roi.VersionID
} else {
versionID = roi.VersionID
}
dv := DeletedObjectReplicationInfo{
DeletedObject: DeletedObject{
ObjectName: roi.Name,
DeleteMarkerVersionID: dmVersionID,
VersionID: versionID,
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},
Bucket: roi.Bucket,
OpType: replication.HealReplicationType,
EventType: ReplicateHealDelete,
}
// heal delete marker replication failure or versioned delete replication failure
if roi.ReplicationStatus == replication.Pending ||
roi.ReplicationStatus == replication.Failed ||
roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending {
globalReplicationPool.queueReplicaDeleteTask(dv)
return
}
// if replication status is Complete on DeleteMarker and existing object resync required
if roi.ExistingObjResync.mustResync() && (roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()) {
queueReplicateDeletesWrapper(dv, roi.ExistingObjResync)
return
}
return
}
if roi.ExistingObjResync.mustResync() {
roi.OpType = replication.ExistingObjectReplicationType
}
switch roi.ReplicationStatus {
case replication.Pending, replication.Failed:
roi.EventType = ReplicateHeal
globalReplicationPool.queueReplicaTask(roi)
return
}
if roi.ExistingObjResync.mustResync() {
roi.EventType = ReplicateExisting
globalReplicationPool.queueReplicaTask(roi)
}
return
}
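
A quick usage note on the guards above: callers do not need their own precondition checks. A hedged example, where bucket, objInfo, and rcfg stand in for whatever the caller already has:

// With an unversioned object, or a bucket that has no replication config or
// remote targets, queueReplicationHeal returns the zero ReplicateObjectInfo
// and schedules nothing, which keeps it cheap on hot paths such as listing.
roi := queueReplicationHeal(ctx, bucket, objInfo, rcfg)
_ = roi // zero value when nothing was queued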


@ -1225,29 +1225,16 @@ func (i *scannerItem) objectPath() string {
// healReplication will heal a scanned item that has failed replication.
func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
roi := getHealReplicateObjectInfo(oi, i.replication)
if !roi.Dsc.ReplicateAny() {
if oi.VersionID == "" {
return
}
if i.replication.Config == nil {
return
}
roi := queueReplicationHeal(ctx, oi.Bucket, oi, i.replication)
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
// heal delete marker replication failure or versioned delete replication failure
if oi.ReplicationStatus == replication.Pending ||
oi.ReplicationStatus == replication.Failed ||
oi.VersionPurgeStatus == Failed || oi.VersionPurgeStatus == Pending {
i.healReplicationDeletes(ctx, o, roi)
return
}
// if replication status is Complete on DeleteMarker and existing object resync required
if roi.ExistingObjResync.mustResync() && (oi.ReplicationStatus == replication.Completed || oi.ReplicationStatus.Empty()) {
i.healReplicationDeletes(ctx, o, roi)
return
}
return
}
if roi.ExistingObjResync.mustResync() {
roi.OpType = replication.ExistingObjectReplicationType
}
if sizeS.replTargetStats == nil && len(roi.TargetStatuses) > 0 {
sizeS.replTargetStats = make(map[string]replTargetSizeSummary)
@ -1277,52 +1264,9 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
}
switch oi.ReplicationStatus {
case replication.Pending, replication.Failed:
roi.EventType = ReplicateHeal
globalReplicationPool.queueReplicaTask(roi)
return
case replication.Replica:
sizeS.replicaSize += oi.Size
}
if roi.ExistingObjResync.mustResync() {
roi.EventType = ReplicateExisting
globalReplicationPool.queueReplicaTask(roi)
}
}
// healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, roi ReplicateObjectInfo) {
// handle soft delete and permanent delete failures here.
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
versionID := ""
dmVersionID := ""
if roi.VersionPurgeStatus.Empty() {
dmVersionID = roi.VersionID
} else {
versionID = roi.VersionID
}
doi := DeletedObjectReplicationInfo{
DeletedObject: DeletedObject{
ObjectName: roi.Name,
DeleteMarkerVersionID: dmVersionID,
VersionID: versionID,
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
DeleteMarker: roi.DeleteMarker,
},
Bucket: roi.Bucket,
OpType: replication.HealReplicationType,
EventType: ReplicateHealDelete,
}
if roi.ExistingObjResync.mustResync() {
doi.OpType = replication.ExistingObjectReplicationType
doi.EventType = ReplicateExistingDelete
queueReplicateDeletesWrapper(doi, roi.ExistingObjResync)
return
}
globalReplicationPool.queueReplicaDeleteTask(doi)
}
}
type dynamicSleeper struct {


@ -1178,7 +1178,6 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
if marker == "" && versionMarker != "" {
return loi, NotImplemented{}
}
opts := listPathOptions{
Bucket: bucket,
Prefix: prefix,
@ -1189,6 +1188,8 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
AskDisks: globalAPIConfig.getListQuorum(),
Versioned: true,
}
// set bucket metadata in opts
opts.setBucketMeta(ctx)
merged, err := z.listPath(ctx, &opts)
if err != nil && err != io.EOF {
@ -1235,12 +1236,16 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var loi ListObjectsInfo
// Automatically remove the object/version if an expiry lifecycle rule can be applied
lc, _ := globalLifecycleSys.Get(bucket)
// Check if bucket is object locked.
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
opts := listPathOptions{
Bucket: bucket,
Prefix: prefix,
Separator: delimiter,
Limit: maxKeysPlusOne(maxKeys, marker != ""),
Marker: marker,
InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(),
}
opts.setBucketMeta(ctx)
if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
// Optimization for certain applications like
@ -1252,8 +1257,8 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
// to avoid the need for ListObjects().
objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
if err == nil {
if lc != nil {
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
if opts.Lifecycle != nil {
action := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo, false)
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
fallthrough
@ -1266,18 +1271,6 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
}
}
opts := listPathOptions{
Bucket: bucket,
Prefix: prefix,
Separator: delimiter,
Limit: maxKeysPlusOne(maxKeys, marker != ""),
Marker: marker,
InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(),
Lifecycle: lc,
Retention: rcfg,
}
merged, err := z.listPath(ctx, &opts)
if err != nil && err != io.EOF {
if !isErrBucketNotFound(err) {


@ -2859,11 +2859,16 @@ func (es *erasureSingle) AbortMultipartUpload(ctx context.Context, bucket, objec
func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var loi ListObjectsInfo
// Automatically remove the object/version if an expiry lifecycle rule can be applied
lc, _ := globalLifecycleSys.Get(bucket)
// Check if bucket is object locked.
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
opts := listPathOptions{
Bucket: bucket,
Prefix: prefix,
Separator: delimiter,
Limit: maxKeysPlusOne(maxKeys, marker != ""),
Marker: marker,
InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(),
}
opts.setBucketMeta(ctx)
if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
// Optimization for certain applications like
@ -2875,8 +2880,8 @@ func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker
// to avoid the need for ListObjects().
objInfo, err := es.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
if err == nil {
if lc != nil {
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
if opts.Lifecycle != nil {
action := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo, false)
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
fallthrough
@ -2889,18 +2894,6 @@ func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker
}
}
opts := listPathOptions{
Bucket: bucket,
Prefix: prefix,
Separator: delimiter,
Limit: maxKeysPlusOne(maxKeys, marker != ""),
Marker: marker,
InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(),
Lifecycle: lc,
Retention: rcfg,
}
merged, err := es.listPath(ctx, &opts)
if err != nil && err != io.EOF {
if !isErrBucketNotFound(err) {
@ -2959,7 +2952,6 @@ func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix,
if marker == "" && versionMarker != "" {
return loi, NotImplemented{}
}
opts := listPathOptions{
Bucket: bucket,
Prefix: prefix,
@ -2970,6 +2962,7 @@ func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix,
AskDisks: "strict",
Versioned: true,
}
opts.setBucketMeta(ctx)
merged, err := es.listPath(ctx, &opts)
if err != nil && err != io.EOF {


@ -29,7 +29,6 @@ import (
"time"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/logger"
)
@ -486,9 +485,9 @@ func (es *erasureSingle) listMerged(ctx context.Context, o listPathOptions, resu
mu.Unlock()
// Do lifecycle filtering.
if o.Lifecycle != nil {
if o.Lifecycle != nil || o.Replication.Config != nil {
filterIn := make(chan metaCacheEntry, 10)
go filterLifeCycle(ctx, o.Bucket, *o.Lifecycle, o.Retention, filterIn, results)
go applyBucketActions(ctx, o, filterIn, results)
// Replace results.
results = filterIn
}
@ -572,9 +571,9 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
mu.Unlock()
// Do lifecycle filtering.
if o.Lifecycle != nil {
if o.Lifecycle != nil || o.Replication.Config != nil {
filterIn := make(chan metaCacheEntry, 10)
go filterLifeCycle(ctx, o.Bucket, *o.Lifecycle, o.Retention, filterIn, results)
go applyBucketActions(ctx, o, filterIn, results)
// Replace results.
results = filterIn
}
@ -635,15 +634,16 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
return nil
}
// filterLifeCycle will filter out objects if the most recent
// version should be deleted by lifecycle.
// applyBucketActions applies lifecycle and replication actions on the listing.
// It will filter out objects if the most recent version should be deleted by lifecycle.
// Entries that failed replication are queued if no lifecycle rule was applied.
// out will be closed when there are no more results.
// When 'in' is closed or the context is canceled the
// function closes 'out' and exits.
func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, lr lock.Retention, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
defer close(out)
vcfg, _ := globalBucketVersioningSys.Get(bucket)
vcfg, _ := globalBucketVersioningSys.Get(o.Bucket)
for {
var obj metaCacheEntry
var ok bool
@ -656,29 +656,32 @@ func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle,
}
}
fi, err := obj.fileInfo(bucket)
fi, err := obj.fileInfo(o.Bucket)
if err != nil {
continue
}
versioned := vcfg != nil && vcfg.Versioned(obj.name)
objInfo := fi.ToObjectInfo(bucket, obj.name, versioned)
action := evalActionFromLifecycle(ctx, lc, lr, objInfo, false)
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
// Skip this entry.
continue
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
// Skip this entry.
continue
objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
action := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo, false)
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
// Skip this entry.
continue
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
// Skip this entry.
continue
}
}
select {
case <-ctx.Done():
return
case out <- obj:
queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication)
}
}
}


@ -102,6 +102,8 @@ type listPathOptions struct {
// Retention configuration, needed to be passed along with lifecycle if set.
Retention lock.Retention
// Replication configuration
Replication replicationConfig
// pool and set of where the cache is located.
pool, set int
}
@ -110,6 +112,21 @@ func init() {
gob.Register(listPathOptions{})
}
func (o *listPathOptions) setBucketMeta(ctx context.Context) {
lc, _ := globalLifecycleSys.Get(o.Bucket)
// Check if bucket is object locked.
rcfg, _ := globalBucketObjectLockSys.Get(o.Bucket)
replCfg, _, _ := globalBucketMetadataSys.GetReplicationConfig(ctx, o.Bucket)
tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, o.Bucket)
o.Lifecycle = lc
o.Replication = replicationConfig{
Config: replCfg,
remotes: tgts,
}
o.Retention = rcfg
}
// newMetacache constructs a new metacache from the options.
func (o listPathOptions) newMetacache() metacache {
return metacache{

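The new setBucketMeta helper is what lets every list entry point pick up lifecycle, retention, and replication settings the same way; ListObjectVersions previously built its options without the lifecycle config, which appears to be the bug mentioned in the description. A sketch of the caller pattern, taken from the ListObjects hunks above:

opts := listPathOptions{
	Bucket:      bucket,
	Prefix:      prefix,
	Separator:   delimiter,
	Limit:       maxKeysPlusOne(maxKeys, marker != ""),
	Marker:      marker,
	InclDeleted: false,
	AskDisks:    globalAPIConfig.getListQuorum(),
}
// Attach Lifecycle, Retention and Replication from bucket metadata so that
// listPath/applyBucketActions can both expire entries and queue heals.
opts.setBucketMeta(ctx)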

@ -417,12 +417,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
return checkPreconditions(ctx, w, r, oi, opts)
}
var proxy proxyResult
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
if err != nil {
var (
reader *GetObjectReader
proxy proxyResult
perr error
)
proxytgts := getProxyTargets(ctx, bucket, object, opts)
@ -491,6 +490,10 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
}
}
// Queue failed/pending replication automatically
if !proxy.Proxy {
QueueReplicationHeal(ctx, bucket, objInfo)
}
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
@ -659,11 +662,9 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
rangeHeader := r.Header.Get(xhttp.Range)
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
var proxy proxyResult
if err != nil {
var (
proxy proxyResult
oi ObjectInfo
)
var oi ObjectInfo
// proxy HEAD to replication target if active-active replication configured on bucket
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
@ -690,6 +691,7 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
}
QueueReplicationHeal(ctx, bucket, objInfo)
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
@ -714,6 +716,11 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
}
}
// Queue failed/pending replication automatically
if !proxy.Proxy {
QueueReplicationHeal(ctx, bucket, objInfo)
}
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)