diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 487309f10..8e1e58c55 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -339,7 +339,24 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) if err != nil { - return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, nil, opts) + m, err := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, nil, ObjectOptions{ + VersionID: versionID, + }) + errs = make([]error, len(errs)) + for i := range errs { + errs[i] = err + } + if err == nil { + // Dangling object successfully purged, size is '0' + m.Size = 0 + } + // Generate file/version not found with default heal result + err = errFileNotFound + if versionID != "" { + err = errFileVersionNotFound + } + return er.defaultHealResult(m, storageDisks, storageEndpoints, + errs, bucket, object, versionID), err } result.ParityBlocks = result.DiskCount - readQuorum @@ -392,14 +409,12 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // data state and a list of outdated disks on which data needs // to be healed. outDatedDisks := make([]StorageAPI, len(storageDisks)) - numAvailableDisks := 0 disksToHealCount := 0 for i, v := range availableDisks { driveState := "" switch { case v != nil: driveState = madmin.DriveStateOk - numAvailableDisks++ // If data is sane on any one disk, we can // extract the correct object size. result.ObjectSize = partsMetadata[i].Size @@ -451,12 +466,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s bucket, object, versionID), err } - // If less than read quorum number of disks have all the parts - // of the data, we can't reconstruct the erasure-coded data. - if numAvailableDisks < readQuorum { - return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, dataErrs, opts) - } - if disksToHealCount == 0 { // Nothing to heal! return result, nil @@ -879,32 +888,6 @@ func isObjectDirDangling(errs []error) (ok bool) { return found < notFound && found > 0 } -func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object, versionID string, - metaArr []FileInfo, errs []error, dataErrs []error, opts madmin.HealOpts) (madmin.HealResultItem, error, -) { - storageDisks := er.getDisks() - storageEndpoints := er.getEndpoints() - - m, err := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, dataErrs, ObjectOptions{ - VersionID: versionID, - }) - errs = make([]error, len(errs)) - for i := range errs { - errs[i] = err - } - if err == nil { - // Dangling object successfully purged, size is '0' - m.Size = 0 - } - // Generate file/version not found with default heal result - err = errFileNotFound - if versionID != "" { - err = errFileVersionNotFound - } - return er.defaultHealResult(m, storageDisks, storageEndpoints, - errs, bucket, object, versionID), err -} - // Object is considered dangling/corrupted if any only // if total disks - a combination of corrupted and missing // files is lesser than number of data blocks. diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 8a8036955..f62ac08e5 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -25,6 +25,7 @@ import ( "io" "net/http" "path" + "runtime" "strconv" "strings" "sync" @@ -439,14 +440,10 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return er.getObjectInfo(ctx, bucket, object, opts) } -func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID string, pool, set, objectParity int) { +func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID string, tags map[string]interface{}) { if len(logger.AuditTargets()) == 0 { return } - tags := make(map[string]interface{}) - tags["pool"] = pool - tags["set"] = set - tags["objectParity"] = objectParity opts := AuditLogOptions{ Event: "DeleteDanglingObject", @@ -460,10 +457,19 @@ func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID } func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object string, metaArr []FileInfo, errs []error, dataErrs []error, opts ObjectOptions) (FileInfo, error) { + _, file, line, cok := runtime.Caller(1) var err error m, ok := isObjectDangling(metaArr, errs, dataErrs) if ok { - defer auditDanglingObjectDeletion(ctx, bucket, object, m.VersionID, er.poolIndex, er.setIndex, m.Erasure.ParityBlocks) + tags := make(map[string]interface{}, 4) + tags["set"] = er.setIndex + tags["pool"] = er.poolIndex + tags["parity"] = m.Erasure.ParityBlocks + if cok { + tags["caller"] = fmt.Sprintf("%s:%d", file, line) + } + + defer auditDanglingObjectDeletion(ctx, bucket, object, m.VersionID, tags) err = errFileNotFound if opts.VersionID != "" {