remove numAvailableDisks check as it doesn't serve any purpose (#15954)

This commit is contained in:
Harshavardhana 2022-10-27 09:05:24 -07:00 committed by GitHub
parent ec77d28e62
commit 136d41775f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 41 deletions

View File

@ -339,7 +339,24 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
if err != nil {
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, nil, opts)
m, err := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, nil, ObjectOptions{
VersionID: versionID,
})
errs = make([]error, len(errs))
for i := range errs {
errs[i] = err
}
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
// Generate file/version not found with default heal result
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
}
result.ParityBlocks = result.DiskCount - readQuorum
@ -392,14 +409,12 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// data state and a list of outdated disks on which data needs
// to be healed.
outDatedDisks := make([]StorageAPI, len(storageDisks))
numAvailableDisks := 0
disksToHealCount := 0
for i, v := range availableDisks {
driveState := ""
switch {
case v != nil:
driveState = madmin.DriveStateOk
numAvailableDisks++
// If data is sane on any one disk, we can
// extract the correct object size.
result.ObjectSize = partsMetadata[i].Size
@ -451,12 +466,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
bucket, object, versionID), err
}
// If less than read quorum number of disks have all the parts
// of the data, we can't reconstruct the erasure-coded data.
if numAvailableDisks < readQuorum {
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, dataErrs, opts)
}
if disksToHealCount == 0 {
// Nothing to heal!
return result, nil
@ -879,32 +888,6 @@ func isObjectDirDangling(errs []error) (ok bool) {
return found < notFound && found > 0
}
func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object, versionID string,
metaArr []FileInfo, errs []error, dataErrs []error, opts madmin.HealOpts) (madmin.HealResultItem, error,
) {
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
m, err := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, dataErrs, ObjectOptions{
VersionID: versionID,
})
errs = make([]error, len(errs))
for i := range errs {
errs[i] = err
}
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
// Generate file/version not found with default heal result
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
}
// Object is considered dangling/corrupted if any only
// if total disks - a combination of corrupted and missing
// files is lesser than number of data blocks.

View File

@ -25,6 +25,7 @@ import (
"io"
"net/http"
"path"
"runtime"
"strconv"
"strings"
"sync"
@ -439,14 +440,10 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return er.getObjectInfo(ctx, bucket, object, opts)
}
func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID string, pool, set, objectParity int) {
func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID string, tags map[string]interface{}) {
if len(logger.AuditTargets()) == 0 {
return
}
tags := make(map[string]interface{})
tags["pool"] = pool
tags["set"] = set
tags["objectParity"] = objectParity
opts := AuditLogOptions{
Event: "DeleteDanglingObject",
@ -460,10 +457,19 @@ func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID
}
func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object string, metaArr []FileInfo, errs []error, dataErrs []error, opts ObjectOptions) (FileInfo, error) {
_, file, line, cok := runtime.Caller(1)
var err error
m, ok := isObjectDangling(metaArr, errs, dataErrs)
if ok {
defer auditDanglingObjectDeletion(ctx, bucket, object, m.VersionID, er.poolIndex, er.setIndex, m.Erasure.ParityBlocks)
tags := make(map[string]interface{}, 4)
tags["set"] = er.setIndex
tags["pool"] = er.poolIndex
tags["parity"] = m.Erasure.ParityBlocks
if cok {
tags["caller"] = fmt.Sprintf("%s:%d", file, line)
}
defer auditDanglingObjectDeletion(ctx, bucket, object, m.VersionID, tags)
err = errFileNotFound
if opts.VersionID != "" {