diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 57b19f132..022672d21 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"strings"
 	"sync"
 	"time"
 
@@ -66,6 +67,75 @@ func (fi FileInfo) DataShardFixed() bool {
 	return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
 }
 
+// listAndHeal lists the entries of bucket that match prefix on the drives
+// returned by getOnlineDisksWithHealing and invokes healEntry on each of them.
+// A healEntry error is logged and cancels the listing context derived from ctx.
+func (er erasureObjects) listAndHeal(ctx context.Context, bucket, prefix string, healEntry func(string, metaCacheEntry) error) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	disks, _ := er.getOnlineDisksWithHealing()
+	if len(disks) == 0 {
+		return errors.New("listAndHeal: No non-healing drives found")
+	}
+
+	// How to resolve partial results.
+	resolver := metadataResolutionParams{
+		dirQuorum: 1,
+		objQuorum: 1,
+		bucket:    bucket,
+		strict:    false, // Allow less strict matching.
+	}
+
+	path := baseDirFromPrefix(prefix)
+	filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
+	if path == prefix {
+		filterPrefix = ""
+	}
+
+	lopts := listPathRawOptions{
+		disks:          disks,
+		bucket:         bucket,
+		path:           path,
+		filterPrefix:   filterPrefix,
+		recursive:      true,
+		forwardTo:      "",
+		minDisks:       1,
+		reportNotFound: false,
+		agreed: func(entry metaCacheEntry) {
+			if err := healEntry(bucket, entry); err != nil {
+				logger.LogIf(ctx, err)
+				cancel()
+			}
+		},
+		partial: func(entries metaCacheEntries, _ []error) {
+			entry, ok := entries.resolve(&resolver)
+			if !ok {
+				// check if we can get one entry at least
+				// proceed to heal nonetheless.
+				entry, _ = entries.firstFound()
+			}
+			if entry == nil {
+				// No entry could be picked at all; nothing to heal here.
+				return
+			}
+
+			if err := healEntry(bucket, *entry); err != nil {
+				logger.LogIf(ctx, err)
+				cancel()
+				return
+			}
+		},
+		finished: nil,
+	}
+
+	if err := listPathRaw(ctx, lopts); err != nil {
+		return fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
+	}
+
+	return nil
+}
+
 // HealBucket heals a bucket if it doesn't exist on one of the disks, additionally
 // also heals the missing entries for bucket metadata files
 // `policy.json, notification.xml, listeners.json`.
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index d2cc798c6..77788ed34 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -1249,7 +1249,14 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	}
 
 	if !opts.Speedtest && versionsDisparity {
-		listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
+		globalMRFState.addPartialOp(partialOperation{
+			bucket:      bucket,
+			object:      object,
+			queued:      time.Now(),
+			allVersions: true,
+			setIndex:    er.setIndex,
+			poolIndex:   er.poolIndex,
+		})
 	}
 
 	// Check if there is any offline disk and add it to the MRF list
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 9e675f769..d42abd116 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -1349,7 +1349,14 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		}
 
 		if versionsDisparity {
-			listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
+			globalMRFState.addPartialOp(partialOperation{
+				bucket:      bucket,
+				object:      object,
+				queued:      time.Now(),
+				allVersions: true,
+				setIndex:    er.setIndex,
+				poolIndex:   er.poolIndex,
+			})
 		}
 	}
 
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index eec8034fb..3cb8fde7a 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1905,68 +1905,6 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
 // HealObjectFn closure function heals the object.
 type HealObjectFn func(bucket, object, versionID string) error
 
-func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(string, metaCacheEntry) error) error {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	disks, _ := set.getOnlineDisksWithHealing()
-	if len(disks) == 0 {
-		return errors.New("listAndHeal: No non-healing drives found")
-	}
-
-	// How to resolve partial results.
-	resolver := metadataResolutionParams{
-		dirQuorum: 1,
-		objQuorum: 1,
-		bucket:    bucket,
-		strict:    false, // Allow less strict matching.
-	}
-
-	path := baseDirFromPrefix(prefix)
-	filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
-	if path == prefix {
-		filterPrefix = ""
-	}
-
-	lopts := listPathRawOptions{
-		disks:          disks,
-		bucket:         bucket,
-		path:           path,
-		filterPrefix:   filterPrefix,
-		recursive:      true,
-		forwardTo:      "",
-		minDisks:       1,
-		reportNotFound: false,
-		agreed: func(entry metaCacheEntry) {
-			if err := healEntry(bucket, entry); err != nil {
-				logger.LogIf(ctx, err)
-				cancel()
-			}
-		},
-		partial: func(entries metaCacheEntries, _ []error) {
-			entry, ok := entries.resolve(&resolver)
-			if !ok {
-				// check if we can get one entry atleast
-				// proceed to heal nonetheless.
-				entry, _ = entries.firstFound()
-			}
-
-			if err := healEntry(bucket, *entry); err != nil {
-				logger.LogIf(ctx, err)
-				cancel()
-				return
-			}
-		},
-		finished: nil,
-	}
-
-	if err := listPathRaw(ctx, lopts); err != nil {
-		return fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
-	}
-
-	return nil
-}
-
 func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
 	healEntry := func(bucket string, entry metaCacheEntry) error {
 		if entry.isDir() {
@@ -2024,7 +1962,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
 			go func(idx int, set *erasureObjects) {
 				defer wg.Done()
 
-				errs[idx] = listAndHeal(ctx, bucket, prefix, set, healEntry)
+				errs[idx] = set.listAndHeal(ctx, bucket, prefix, healEntry)
 			}(idx, set)
 		}
 		wg.Wait()
diff --git a/cmd/mrf.go b/cmd/mrf.go
index 5b798276a..02c1c767d 100644
--- a/cmd/mrf.go
+++ b/cmd/mrf.go
@@ -32,17 +32,19 @@ const (
 // partialOperation is a successful upload/delete of an object
 // but not written in all disks (having quorum)
 type partialOperation struct {
-	bucket    string
-	object    string
-	versionID string
-	queued    time.Time
+	bucket              string
+	object              string
+	versionID           string
+	allVersions         bool
+	setIndex, poolIndex int
+	queued              time.Time
 }
 
 // mrfState sncapsulates all the information
 // related to the global background MRF.
 type mrfState struct {
-	ctx       context.Context
-	objectAPI ObjectLayer
+	ctx   context.Context
+	pools *erasureServerPools
 
 	mu   sync.Mutex
 	opCh chan partialOperation
@@ -55,9 +57,12 @@ func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) {
 
 	m.ctx = ctx
 	m.opCh = make(chan partialOperation, mrfOpsQueueSize)
-	m.objectAPI = objAPI
 
-	go globalMRFState.healRoutine()
+	var ok bool
+	m.pools, ok = objAPI.(*erasureServerPools)
+	if ok {
+		go m.healRoutine()
+	}
 }
 
 // Add a partial S3 operation (put/delete) when one or more disks are offline.
@@ -101,7 +106,13 @@ func (m *mrfState) healRoutine() {
 			if u.object == "" {
 				healBucket(u.bucket, madmin.HealNormalScan)
 			} else {
-				healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan)
+				if u.allVersions {
+					// Heal all versions on the set that queued the operation. The
+					// returned error is dropped; healEntry failures are logged by listAndHeal.
+					m.pools.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(m.ctx, u.bucket, u.object, healObjectVersionsDisparity)
+				} else {
+					healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan)
+				}
 			}
 
 			wait()