results must be a single channel to avoid overwriting healing.bin (#19702)

Harshavardhana
2024-05-09 10:15:03 -07:00
committed by GitHub
parent f5e3eedf34
commit 3549e583a6
7 changed files with 259 additions and 231 deletions
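For context before the per-file diffs: the core change is in healErasureSet, where the results channel and its collector goroutine are now created once, before the bucket loop, and closed via defer, so a single goroutine is responsible for updating the healing tracker (and persisting .healing.bin) for the whole erasure set. Previously a fresh channel and collector were set up for each bucket pass, which is what the commit title refers to: overlapping collectors could overwrite healing.bin. A minimal, self-contained sketch of the single-collector pattern (healEntryResult trimmed to two fields, the tracker replaced by a print statement; an illustration, not MinIO code):

```go
package main

import (
	"fmt"
	"sync"
)

// healEntryResult is a trimmed-down stand-in for the struct of the same
// name in the diff below.
type healEntryResult struct {
	bytes   uint64
	success bool
}

func main() {
	// One results channel for the whole erasure set, created once before
	// iterating buckets; a single goroutine drains it, so there is only
	// ever one writer of tracker state.
	results := make(chan healEntryResult, 1000)
	done := make(chan struct{})

	go func() {
		defer close(done)
		for res := range results {
			// The real collector calls tracker.updateProgress(...) and
			// periodically persists the tracker to .healing.bin.
			fmt.Printf("healed %d bytes (success=%v)\n", res.bytes, res.success)
		}
	}()

	// Heal workers across all buckets only send results; they never touch
	// the tracker directly.
	var wg sync.WaitGroup
	for _, bucket := range []string{"bucket-a", "bucket-b"} {
		wg.Add(1)
		go func(b string) {
			defer wg.Done()
			results <- healEntryResult{bytes: 128, success: true}
		}(bucket)
	}
	wg.Wait()

	close(results) // all senders are done; let the collector drain and exit
	<-done
}
```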

View File

@@ -121,10 +121,6 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets
// we ignore disk not found errors
return nil
}
if storageDisks[index].Healing() != nil {
// we ignore disks under healing
return nil
}
volsInfo, err := storageDisks[index].ListVols(ctx)
if err != nil {
return err
@@ -216,7 +212,7 @@ func (fi FileInfo) DataMov() bool {
return ok
}
func auditHealObject(ctx context.Context, bucket, object, versionID string, result madmin.HealResultItem, err error) {
func (er *erasureObjects) auditHealObject(ctx context.Context, bucket, object, versionID string, result madmin.HealResultItem, err error) {
if len(logger.AuditTargets()) == 0 {
return
}
@@ -231,8 +227,14 @@ func auditHealObject(ctx context.Context, bucket, object, versionID string, resu
opts.Error = err.Error()
}
if result.After.Drives != nil {
opts.Tags = map[string]interface{}{"drives-result": result.After.Drives}
opts.Tags = map[string]interface{}{
"healResult": result,
"objectLocation": auditObjectOp{
Name: decodeDirObject(object),
Pool: er.poolIndex + 1,
Set: er.setIndex + 1,
Drives: er.getEndpointStrings(),
},
}
auditLogInternal(ctx, opts)
@@ -247,7 +249,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
storageEndpoints := er.getEndpoints()
defer func() {
auditHealObject(ctx, bucket, object, versionID, result, err)
er.auditHealObject(ctx, bucket, object, versionID, result, err)
}()
if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
@@ -289,21 +291,18 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
if err != nil {
m, err := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, nil, ObjectOptions{
m, derr := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, nil, ObjectOptions{
VersionID: versionID,
})
errs = make([]error, len(errs))
for i := range errs {
errs[i] = err
}
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
// Generate file/version not found with default heal result
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
if derr == nil {
derr = errFileNotFound
if versionID != "" {
derr = errFileVersionNotFound
}
// We did find a new dangling object
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), derr
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
@@ -360,11 +359,10 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
switch {
case v != nil:
driveState = madmin.DriveStateOk
case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound:
case errors.Is(errs[i], errDiskNotFound), errors.Is(dataErrs[i], errDiskNotFound):
driveState = madmin.DriveStateOffline
case errs[i] == errFileNotFound, errs[i] == errFileVersionNotFound, errs[i] == errVolumeNotFound:
fallthrough
case dataErrs[i] == errFileNotFound, dataErrs[i] == errFileVersionNotFound, dataErrs[i] == errVolumeNotFound:
case IsErr(errs[i], errFileNotFound, errFileVersionNotFound, errVolumeNotFound),
IsErr(dataErrs[i], errFileNotFound, errFileVersionNotFound, errVolumeNotFound):
driveState = madmin.DriveStateMissing
default:
// all remaining cases imply corrupt data/metadata
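The drive-state classification above switches from direct == comparisons to errors.Is for errDiskNotFound and to a variadic IsErr helper for the not-found family, removing the fallthrough. As an illustration of what such a variadic matcher looks like (a sketch, not necessarily MinIO's exact implementation):

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors used in the switch above.
var (
	errFileNotFound        = errors.New("file not found")
	errFileVersionNotFound = errors.New("file version not found")
	errVolumeNotFound      = errors.New("volume not found")
)

// IsErr reports whether err matches any of the target errors, including
// wrapped errors, which plain == comparisons would miss.
func IsErr(err error, targets ...error) bool {
	for _, target := range targets {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}

func main() {
	err := fmt.Errorf("drive 3: %w", errFileVersionNotFound)
	fmt.Println(IsErr(err, errFileNotFound, errFileVersionNotFound, errVolumeNotFound)) // true
}
```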
@@ -417,18 +415,18 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
VersionID: versionID,
})
errs = make([]error, len(errs))
if err == nil {
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
// We did find a new dangling object
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
}
for i := range errs {
errs[i] = err
}
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
// Generate file/version not found with default heal result
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
}
@@ -641,6 +639,7 @@ func (er *erasureObjects) checkAbandonedParts(ctx context.Context, bucket string
if !opts.Remove || opts.DryRun {
return nil
}
if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
startTime := time.Now()
defer func() {
@@ -983,12 +982,12 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
// However this requires a bit of a rewrite, leave this up for
// future work.
if notFoundMetaErrs > 0 && notFoundMetaErrs > validMeta.Erasure.ParityBlocks {
// All xl.meta is beyond data blocks missing, this is dangling
// All xl.meta is beyond parity blocks missing, this is dangling
return validMeta, true
}
if !validMeta.IsRemote() && notFoundPartsErrs > 0 && notFoundPartsErrs > validMeta.Erasure.ParityBlocks {
// All data-dir is beyond data blocks missing, this is dangling
// All data-dir is beyond parity blocks missing, this is dangling
return validMeta, true
}
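The two comment fixes above also pin down the heuristic: an object is treated as dangling once the number of drives reporting not-found exceeds the parity count, because the write could then never have satisfied write quorum. A small worked example under an assumed EC 8+4 layout (8 data, 4 parity, 12 drives):

```go
package main

import "fmt"

func main() {
	// Assumed layout: 12-drive erasure set, EC 8+4, write quorum = 8.
	dataBlocks, parityBlocks := 8, 4
	notFoundMetaErrs := 5 // drives that returned not-found for xl.meta

	// More than parityBlocks copies missing means fewer than dataBlocks
	// drives can possibly hold the object, so write quorum was never met
	// and the object is considered dangling.
	dangling := notFoundMetaErrs > 0 && notFoundMetaErrs > parityBlocks
	fmt.Printf("data=%d parity=%d missing=%d dangling=%v\n",
		dataBlocks, parityBlocks, notFoundMetaErrs, dangling) // dangling=true
}
```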
@@ -1069,8 +1068,7 @@ func healTrace(funcName healingMetric, startTime time.Time, bucket, object strin
}
if err != nil {
tr.Error = err.Error()
} else {
tr.HealResult = result
}
tr.HealResult = result
globalTrace.Publish(tr)
}

View File

@@ -1497,7 +1497,13 @@ func TestHealObjectErasure(t *testing.T) {
er.getDisks = func() []StorageAPI {
// Nil more than half the disks, to remove write quorum.
for i := 0; i <= len(erasureDisks)/2; i++ {
erasureDisks[i] = nil
err := erasureDisks[i].Delete(context.Background(), bucket, object, DeleteOptions{
Recursive: true,
Immediate: false,
})
if err != nil {
t.Fatalf("Failed to delete a file - %v", err)
}
}
return erasureDisks
}

View File

@@ -103,8 +103,12 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
if err != nil {
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(srcBucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(context.Background(), srcBucket, srcObject, metaArr, errs, nil, srcOpts)
if derr != nil {
err = derr
if derr == nil {
if srcOpts.VersionID != "" {
err = errFileVersionNotFound
} else {
err = errFileNotFound
}
}
}
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
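The same caller-side pattern recurs below in getObjectFileInfo, PutObjectMetadata, and PutObjectTags: when deleteIfDangling reports a successful purge (derr == nil), the read-quorum error is downgraded to a not-found error so the client receives a clean 404. A condensed sketch of that mapping, with the error values declared locally for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errErasureReadQuorum   = errors.New("read quorum not met")
	errFileNotFound        = errors.New("file not found")
	errFileVersionNotFound = errors.New("file version not found")
)

// mapDanglingErr condenses the pattern repeated at these call sites: when
// the dangling object was purged (derr == nil), downgrade the quorum error
// to a not-found error; otherwise keep the original error.
func mapDanglingErr(derr error, versionID string, orig error) error {
	if derr != nil {
		return orig
	}
	if versionID != "" {
		return errFileVersionNotFound
	}
	return errFileNotFound
}

func main() {
	fmt.Println(mapDanglingErr(nil, "", errErasureReadQuorum))             // file not found
	fmt.Println(mapDanglingErr(nil, "v1", errErasureReadQuorum))           // file version not found
	fmt.Println(mapDanglingErr(errors.New("x"), "", errErasureReadQuorum)) // read quorum not met
}
```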
@@ -485,85 +489,82 @@ func joinErrs(errs []error) []string {
}
func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object string, metaArr []FileInfo, errs []error, dataErrs []error, opts ObjectOptions) (FileInfo, error) {
var err error
m, ok := isObjectDangling(metaArr, errs, dataErrs)
if ok {
tags := make(map[string]interface{}, 4)
tags["set"] = er.setIndex
tags["pool"] = er.poolIndex
tags["merrs"] = joinErrs(errs)
tags["derrs"] = joinErrs(dataErrs)
if m.IsValid() {
tags["size"] = m.Size
tags["mtime"] = m.ModTime.Format(http.TimeFormat)
tags["data"] = m.Erasure.DataBlocks
tags["parity"] = m.Erasure.ParityBlocks
} else {
tags["invalid-meta"] = true
tags["data"] = er.setDriveCount - er.defaultParityCount
tags["parity"] = er.defaultParityCount
}
// count the number of offline disks
offline := 0
for i := 0; i < max(len(errs), len(dataErrs)); i++ {
if i < len(errs) && errors.Is(errs[i], errDiskNotFound) || i < len(dataErrs) && errors.Is(dataErrs[i], errDiskNotFound) {
offline++
}
}
if offline > 0 {
tags["offline"] = offline
}
_, file, line, cok := runtime.Caller(1)
if cok {
tags["caller"] = fmt.Sprintf("%s:%d", file, line)
}
defer auditDanglingObjectDeletion(ctx, bucket, object, m.VersionID, tags)
err = errFileNotFound
if opts.VersionID != "" {
err = errFileVersionNotFound
}
fi := FileInfo{
VersionID: m.VersionID,
}
if opts.VersionID != "" {
fi.VersionID = opts.VersionID
}
fi.SetTierFreeVersionID(mustGetUUID())
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].DeleteVersion(ctx, bucket, object, fi, false, DeleteOptions{})
}, index)
}
rmDisks := make(map[string]string, len(disks))
for index, err := range g.Wait() {
var errStr, diskName string
if err != nil {
errStr = err.Error()
} else {
errStr = "<nil>"
}
if disks[index] != nil {
diskName = disks[index].String()
} else {
diskName = fmt.Sprintf("disk-%d", index)
}
rmDisks[diskName] = errStr
}
tags["cleanupResult"] = rmDisks
if !ok {
// We only come here if we cannot figure out if the object
// can be deleted safely, in such a scenario return ReadQuorum error.
return FileInfo{}, errErasureReadQuorum
}
return m, err
tags := make(map[string]interface{}, 4)
tags["set"] = er.setIndex
tags["pool"] = er.poolIndex
tags["merrs"] = joinErrs(errs)
tags["derrs"] = joinErrs(dataErrs)
if m.IsValid() {
tags["size"] = m.Size
tags["mtime"] = m.ModTime.Format(http.TimeFormat)
tags["data"] = m.Erasure.DataBlocks
tags["parity"] = m.Erasure.ParityBlocks
} else {
tags["invalid-meta"] = true
tags["data"] = er.setDriveCount - er.defaultParityCount
tags["parity"] = er.defaultParityCount
}
// count the number of offline disks
offline := 0
for i := 0; i < max(len(errs), len(dataErrs)); i++ {
if i < len(errs) && errors.Is(errs[i], errDiskNotFound) || i < len(dataErrs) && errors.Is(dataErrs[i], errDiskNotFound) {
offline++
}
}
if offline > 0 {
tags["offline"] = offline
}
_, file, line, cok := runtime.Caller(1)
if cok {
tags["caller"] = fmt.Sprintf("%s:%d", file, line)
}
defer auditDanglingObjectDeletion(ctx, bucket, object, m.VersionID, tags)
fi := FileInfo{
VersionID: m.VersionID,
}
if opts.VersionID != "" {
fi.VersionID = opts.VersionID
}
fi.SetTierFreeVersionID(mustGetUUID())
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].DeleteVersion(ctx, bucket, object, fi, false, DeleteOptions{})
}, index)
}
rmDisks := make(map[string]string, len(disks))
for index, err := range g.Wait() {
var errStr, diskName string
if err != nil {
errStr = err.Error()
} else {
errStr = "<nil>"
}
if disks[index] != nil {
diskName = disks[index].String()
} else {
diskName = fmt.Sprintf("disk-%d", index)
}
rmDisks[diskName] = errStr
}
tags["cleanupResult"] = rmDisks
return m, nil
}
func fileInfoFromRaw(ri RawFileInfo, bucket, object string, readData, inclFreeVers, allParts bool) (FileInfo, error) {
@@ -925,8 +926,12 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
not we simply ignore it, since we can't tell for sure if it's a dangling object.
if totalResp == er.setDriveCount && shouldCheckForDangling(err, errs, bucket) {
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
if derr == nil {
if opts.VersionID != "" {
err = errFileVersionNotFound
} else {
err = errFileNotFound
}
}
}
return fi, nil, nil, toObjectErr(err, bucket, object)
@@ -2141,8 +2146,12 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
if err != nil {
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
if derr == nil {
if opts.VersionID != "" {
err = errFileVersionNotFound
} else {
err = errFileNotFound
}
}
}
return ObjectInfo{}, toObjectErr(err, bucket, object)
@@ -2214,8 +2223,12 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
if err != nil {
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
if derr == nil {
if opts.VersionID != "" {
err = errFileVersionNotFound
} else {
err = errFileNotFound
}
}
}
return ObjectInfo{}, toObjectErr(err, bucket, object)

View File

@@ -554,10 +554,10 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context) {
}
type auditObjectOp struct {
Name string `json:"name"`
Pool int `json:"poolId"`
Set int `json:"setId"`
Disks []string `json:"disks"`
Name string `json:"name"`
Pool int `json:"poolId"`
Set int `json:"setId"`
Drives []string `json:"drives"`
}
// Add erasure set information to the current context
@@ -567,10 +567,10 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec
}
op := auditObjectOp{
Name: decodeDirObject(object),
Pool: set.poolIndex + 1,
Set: set.setIndex + 1,
Disks: set.getEndpointStrings(),
Name: decodeDirObject(object),
Pool: set.poolIndex + 1,
Set: set.setIndex + 1,
Drives: set.getEndpointStrings(),
}
logger.GetReqInfo(ctx).AppendTags("objectLocation", op)
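The struct above renames the audit field from disks to drives, both in the Go field and its JSON tag, so audit-log consumers should expect the new key. Purely to show the resulting JSON shape, with made-up values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// auditObjectOp mirrors the struct shown above, after the rename.
type auditObjectOp struct {
	Name   string   `json:"name"`
	Pool   int      `json:"poolId"`
	Set    int      `json:"setId"`
	Drives []string `json:"drives"`
}

func main() {
	op := auditObjectOp{
		Name:   "photos/2024/cat.png", // hypothetical object name
		Pool:   1,
		Set:    3,
		Drives: []string{"node1:/data1", "node2:/data1"}, // hypothetical drive endpoints
	}
	b, _ := json.MarshalIndent(op, "", "  ")
	fmt.Println(string(b)) // emits "drives": [...] where audit logs previously had "disks"
}
```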

View File

@@ -34,7 +34,6 @@ import (
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/storageclass"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/wildcard"
@@ -141,6 +140,14 @@ func getLocalBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.Bg
return status, true
}
type healEntryResult struct {
bytes uint64
success bool
skipped bool
entryDone bool
name string
}
// healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
scanMode := madmin.HealNormalScan
@@ -187,21 +194,68 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
jt, _ := workers.New(int(numHealers))
healEntryDone := func(name string) healEntryResult {
return healEntryResult{
entryDone: true,
name: name,
}
}
healEntrySuccess := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
success: true,
}
}
healEntryFailure := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
}
}
healEntrySkipped := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
skipped: true,
}
}
// Collect updates to tracker from concurrent healEntry calls
results := make(chan healEntryResult, 1000)
defer close(results)
go func() {
for res := range results {
if res.entryDone {
tracker.setObject(res.name)
if time.Since(tracker.getLastUpdate()) > time.Minute {
healingLogIf(ctx, tracker.update(ctx))
}
continue
}
tracker.updateProgress(res.success, res.skipped, res.bytes)
}
}()
var retErr error
// Heal all buckets with all objects
for _, bucket := range healBuckets {
if tracker.isHealed(bucket) {
continue
}
var forwardTo string
// If we resume to the same bucket, forward to last known item.
if b := tracker.getBucket(); b != "" {
if b == bucket {
forwardTo = tracker.getObject()
} else {
// Reset to where last bucket ended if resuming.
tracker.resume()
}
b := tracker.getBucket()
if b == bucket {
forwardTo = tracker.getObject()
}
if b != "" {
// Reset to where last bucket ended if resuming.
tracker.resume()
}
tracker.setObject("")
tracker.setBucket(bucket)
@@ -280,37 +334,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
fallbackDisks := disks[expectedDisks:]
disks = disks[:expectedDisks]
type healEntryResult struct {
bytes uint64
success bool
skipped bool
entryDone bool
name string
}
healEntryDone := func(name string) healEntryResult {
return healEntryResult{
entryDone: true,
name: name,
}
}
healEntrySuccess := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
success: true,
}
}
healEntryFailure := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
}
}
healEntrySkipped := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
skipped: true,
}
}
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
if lc == nil {
return false
@@ -331,22 +354,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
}
// Collect updates to tracker from concurrent healEntry calls
results := make(chan healEntryResult, 1000)
go func() {
for res := range results {
if res.entryDone {
tracker.setObject(res.name)
if time.Since(tracker.getLastUpdate()) > time.Minute {
healingLogIf(ctx, tracker.update(ctx))
}
continue
}
tracker.updateProgress(res.success, res.skipped, res.bytes)
}
}()
send := func(result healEntryResult) bool {
select {
case <-ctx.Done():
@@ -393,7 +400,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
var result healEntryResult
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
_, err := er.HealObject(ctx, bucket, encodedEntryName, "",
res, err := er.HealObject(ctx, bucket, encodedEntryName, "",
madmin.HealOpts{
ScanMode: scanMode,
Remove: healDeleteDangling,
@@ -407,7 +414,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
result = healEntryFailure(0)
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
} else {
result = healEntrySuccess(0)
result = healEntrySuccess(uint64(res.ObjectSize))
}
send(result)
@@ -430,11 +437,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
continue
}
if _, err := er.HealObject(ctx, bucket, encodedEntryName,
res, err := er.HealObject(ctx, bucket, encodedEntryName,
version.VersionID, madmin.HealOpts{
ScanMode: scanMode,
Remove: healDeleteDangling,
}); err != nil {
})
if err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
// queueing happens across namespace, ignore
// objects that are not found.
@@ -449,22 +457,20 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, version.Name, err))
}
} else {
result = healEntrySuccess(uint64(version.Size))
result = healEntrySuccess(uint64(res.ObjectSize))
}
if !send(result) {
return
}
}
// All versions resulted in 'ObjectNotFound/VersionNotFound'
if versionNotFound == len(fivs.Versions) {
return
}
select {
case <-ctx.Done():
return
case results <- healEntryDone(entry.name):
}
send(healEntryDone(entry.name))
// Wait and proceed if there are active requests
waitForLowHTTPReq()
@@ -502,7 +508,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
finished: nil,
})
jt.Wait() // synchronize all the concurrent heal jobs
xioutil.SafeClose(results)
if err != nil {
// Set this such that when we return this function
// we let the caller retry this disk again for the

View File

@@ -417,6 +417,10 @@ func (s *xlStorage) Healing() *healingTracker {
if err != nil {
return nil
}
if len(b) == 0 {
// 'healing.bin' might be truncated
return nil
}
h := newHealingTracker()
_, err = h.UnmarshalMsg(b)
bugLogIf(GlobalContext, err)
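The new guard makes Healing() treat a zero-length .healing.bin as "not healing" instead of handing an empty buffer to the MessagePack decoder. A standalone sketch of the same defensive read, with the path and return value simplified for illustration:

```go
package main

import (
	"fmt"
	"os"
)

// isHealing is a simplified stand-in for (*xlStorage).Healing: it reports
// whether a usable healing tracker exists at trackerPath.
func isHealing(trackerPath string) bool {
	b, err := os.ReadFile(trackerPath)
	if err != nil {
		return false // no tracker file: the drive is not under healing
	}
	if len(b) == 0 {
		return false // truncated or empty tracker: ignore rather than fail to decode
	}
	// The real code unmarshals a healingTracker (MessagePack) from b here.
	return true
}

func main() {
	fmt.Println(isHealing("/tmp/.healing.bin")) // false unless such a file exists and is non-empty
}
```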