diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go index 51cb5e1d8..c949d054d 100644 --- a/cmd/erasure-healing-common.go +++ b/cmd/erasure-healing-common.go @@ -189,6 +189,35 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err return latestFileInfo, nil } +// fileInfoConsistent whether all fileinfos are consistent with each other. +// Will return false if any fileinfo mismatches. +func fileInfoConsistent(ctx context.Context, partsMetadata []FileInfo, errs []error) bool { + // There should be atleast half correct entries, if not return failure + if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, len(partsMetadata)/2); reducedErr != nil { + return false + } + if len(partsMetadata) == 1 { + return true + } + // Reference + ref := partsMetadata[0] + if !ref.IsValid() { + return false + } + for _, meta := range partsMetadata[1:] { + if !meta.IsValid() { + return false + } + if !meta.ModTime.Equal(ref.ModTime) { + return false + } + if meta.DataDir != ref.DataDir { + return false + } + } + return true +} + // disksWithAllParts - This function needs to be called with // []StorageAPI returned by listOnlineDisks. Returns, // diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index b82f8578f..e09d6dbbb 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -254,9 +254,23 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s DataBlocks: len(storageDisks) - er.defaultParityCount, } - // Re-read with data enabled + if !opts.NoLock { + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return result, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + } + + // Re-read when we have lock... partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true) + _, err = getLatestFileInfo(ctx, partsMetadata, errs) + if err != nil { + return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts) + } // List of disks having latest version of the object er.meta // (by modtime). _, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) @@ -679,6 +693,15 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints }) } + if !lfi.IsValid() { + // Default to most common configuration for erasure blocks. + result.ParityBlocks = defaultParityCount + result.DataBlocks = len(storageDisks) - defaultParityCount + } else { + result.ParityBlocks = lfi.Erasure.ParityBlocks + result.DataBlocks = lfi.Erasure.DataBlocks + } + return result } @@ -867,34 +890,23 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version } else { newReqInfo = logger.NewReqInfo("", "", globalDeploymentID, "", "Heal", bucket, object) } - - // Inherit context from the global context to avoid cancellation healCtx := logger.SetReqInfo(GlobalContext, newReqInfo) - storageDisks := er.getDisks() - storageEndpoints := er.getEndpoints() - - if !opts.NoLock { - lk := er.NewNSLock(bucket, object) - lkctx, err := lk.GetLock(healCtx, globalOperationTimeout) - if err != nil { - return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, nil, bucket, object, versionID, er.defaultParityCount), err - } - healCtx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) - } - // Healing directories handle it separately. if HasSuffix(object, SlashSeparator) { return er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove) } + storageDisks := er.getDisks() + storageEndpoints := er.getEndpoints() + // When versionID is empty, we read directly from the `null` versionID for healing. if versionID == "" { versionID = nullVersionID } - // Read metadata files from all the disks + // Perform quick read without lock. + // This allows to quickly check if all is ok or all are missing. partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) if isAllNotFound(errs) { err = toObjectErr(errFileNotFound, bucket, object) @@ -905,9 +917,12 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err } - _, err = getLatestFileInfo(healCtx, partsMetadata, errs) - if err != nil { - return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts) + // Return early if all ok and not deep scanning. + if opts.ScanMode == madmin.HealNormalScan && fileInfoConsistent(ctx, partsMetadata, errs) { + fi, err := getLatestFileInfo(ctx, partsMetadata, errs) + if err == nil && fi.VersionID == versionID { + return defaultHealResult(fi, storageDisks, storageEndpoints, errs, bucket, object, versionID, fi.Erasure.ParityBlocks), nil + } } // Heal the object.