diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 24ab1ea98..ad2b0bc36 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -357,7 +357,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s err = toObjectErr(errFileVersionNotFound, bucket, object, versionID) } // File is fully gone, fileInfo is empty. - return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err + return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, + bucket, object, versionID), err } // If less than read quorum number of disks have all the parts @@ -642,30 +643,25 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin // Populates default heal result item entries with possible values when we are returning prematurely. // This is to ensure that in any circumstance we are not returning empty arrays with wrong values. -func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object, versionID string, defaultParityCount int) madmin.HealResultItem { +func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object, versionID string) madmin.HealResultItem { // Initialize heal result object result := madmin.HealResultItem{ - Type: madmin.HealItemObject, - Bucket: bucket, - Object: object, - VersionID: versionID, - DiskCount: len(storageDisks), + Type: madmin.HealItemObject, + Bucket: bucket, + Object: object, + ObjectSize: lfi.Size, + VersionID: versionID, + DiskCount: len(storageDisks), } if lfi.IsValid() { - result.ObjectSize = lfi.Size result.ParityBlocks = lfi.Erasure.ParityBlocks } else { // Default to most common configuration for erasure blocks. - result.ParityBlocks = defaultParityCount + result.ParityBlocks = er.defaultParityCount } result.DataBlocks = len(storageDisks) - result.ParityBlocks - if errs == nil { - // No disks related errors are provided, quit - return result - } - for index, disk := range storageDisks { if disk == nil { result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ @@ -684,6 +680,8 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints switch errs[index] { case errFileNotFound, errVolumeNotFound: driveState = madmin.DriveStateMissing + case nil: + driveState = madmin.DriveStateOk } result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ UUID: "", @@ -697,15 +695,6 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints }) } - if !lfi.IsValid() { - // Default to most common configuration for erasure blocks. - result.ParityBlocks = defaultParityCount - result.DataBlocks = len(storageDisks) - defaultParityCount - } else { - result.ParityBlocks = lfi.Erasure.ParityBlocks - result.DataBlocks = lfi.Erasure.DataBlocks - } - return result } @@ -820,15 +809,18 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object if versionID != "" { err = toObjectErr(errFileVersionNotFound, bucket, object, versionID) } - return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err + return er.defaultHealResult(m, storageDisks, storageEndpoints, + errs, bucket, object, versionID), err } - return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), toObjectErr(err, bucket, object, versionID) + return er.defaultHealResult(m, storageDisks, storageEndpoints, + errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID) } readQuorum := len(storageDisks) - er.defaultParityCount - err := toObjectErr(reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum), bucket, object, versionID) - return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err + err := toObjectErr(reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum), + bucket, object, versionID) + return er.defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), err } // Object is considered dangling/corrupted if any only @@ -921,14 +913,16 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version err = toObjectErr(errFileVersionNotFound, bucket, object, versionID) } // Nothing to do, file is already gone. - return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err + return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, + errs, bucket, object, versionID), err } // Return early if all ok and not deep scanning. if opts.ScanMode == madmin.HealNormalScan && fileInfoConsistent(ctx, partsMetadata, errs) { fi, err := getLatestFileInfo(ctx, partsMetadata, errs) if err == nil && fi.VersionID == versionID { - return defaultHealResult(fi, storageDisks, storageEndpoints, errs, bucket, object, versionID, fi.Erasure.ParityBlocks), nil + return er.defaultHealResult(fi, storageDisks, storageEndpoints, + errs, bucket, object, versionID), nil } } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 4b1bef5e4..9acd510f0 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -442,7 +442,9 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } // if missing metadata can be reconstructed, attempt to reconstruct. - if missingBlocks > 0 && missingBlocks < readQuorum { + // additionally do not heal delete markers inline, let them be + // healed upon regular heal process. + if !fi.Deleted && missingBlocks > 0 && missingBlocks < readQuorum { if _, healing := er.getOnlineDisksWithHealing(); !healing { go healObject(bucket, object, fi.VersionID, madmin.HealNormalScan) } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 841607cc3..4aa05721b 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -906,8 +906,12 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { var combinedErr []string for i, err := range errs { if err != nil { - combinedErr = append(combinedErr, - fmt.Sprintf("disk %s returned: %s", disks[i], err)) + if disks[i] != nil { + combinedErr = append(combinedErr, + fmt.Sprintf("disk %s returned: %s", disks[i], err)) + } else { + combinedErr = append(combinedErr, err.Error()) + } } } return errors.New(strings.Join(combinedErr, ", "))