audit: Fix merrs and derrs object dangling message (#18714)

The merrs and derrs audit tags are empty when a dangling object is deleted. Fix the bug, and add data/parity block counts to the audit entry when the object metadata is invalid (invalid-meta).
Anis Eleuch 2023-12-27 22:27:04 -08:00 committed by GitHub
parent fbd8dfe60f
commit 8a0ba093dd
4 changed files with 50 additions and 28 deletions
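
Note on the fix (illustrative, not part of the commit): the removed joinErrors helper collapses an all-nil error slice into a single nil error, so the merrs/derrs audit tags could come out empty even though per-drive information existed; the new joinErrs always returns one string per drive, using "<nil>" as a placeholder. A minimal standalone sketch of the difference, with hypothetical names (joinErrorsOld stands in for the removed helper via errors.Join, joinErrsNew mirrors the new one):

    package main

    import (
        "errors"
        "fmt"
    )

    // joinErrorsOld stands in for the removed helper: like errors.Join,
    // an all-nil input collapses to a single nil error.
    func joinErrorsOld(errs ...error) error {
        return errors.Join(errs...)
    }

    // joinErrsNew mirrors the new helper: one string per drive,
    // with "<nil>" placeholders for drives that reported no error.
    func joinErrsNew(errs []error) []string {
        s := make([]string, len(errs))
        for i, err := range errs {
            if err == nil {
                s[i] = "<nil>"
            } else {
                s[i] = err.Error()
            }
        }
        return s
    }

    func main() {
        merrs := []error{nil, nil, nil, nil} // e.g. metadata readable on all four drives
        fmt.Println(joinErrorsOld(merrs...)) // <nil>  -> nothing useful in the audit tag
        fmt.Println(joinErrsNew(merrs))      // [<nil> <nil> <nil> <nil>]
    }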


@@ -23,7 +23,6 @@ import (
     "errors"
     "fmt"
     "io"
-    "math"
     "math/rand"
     "net/http"
     "net/url"
@@ -1973,10 +1972,10 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
         workers = WorkerAutoDefault
         mrfWorkers = MRFWorkerAutoDefault
         if len(p.workers) < WorkerAutoDefault {
-            workers = int(math.Min(float64(len(p.workers)+1), WorkerAutoDefault))
+            workers = min(len(p.workers)+1, WorkerAutoDefault)
         }
         if p.mrfWorkerSize < MRFWorkerAutoDefault {
-            mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
+            mrfWorkers = min(p.mrfWorkerSize+1, MRFWorkerAutoDefault)
         }
     }
     if maxWorkers > 0 && workers > maxWorkers {
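
The change here and in the next two hunks is mechanical: int(math.Min(float64(x), y)) becomes a plain integer min. A throwaway sketch of why the integer form is preferable (assumes Go 1.21+ for the built-in min and a 64-bit int; the large value is far beyond any real worker count and only illustrates the float64 rounding the old form is exposed to):

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // For realistic worker counts the two forms agree.
        fmt.Println(int(math.Min(float64(7+1), 10)), min(7+1, 10)) // 8 8

        // The float round-trip can lose precision for very large integers.
        big := 9007199254740993 // 2^53 + 1 is not representable as float64
        fmt.Println(int(math.Min(float64(big), 1e18))) // 9007199254740992
        fmt.Println(min(big, 1000000000000000000))     // 9007199254740993
    }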
@@ -2069,18 +2068,18 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
     case "slow":
         logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
     default:
-        maxWorkers = int(math.Min(float64(maxWorkers), WorkerMaxLimit))
+        maxWorkers = min(maxWorkers, WorkerMaxLimit)
         if p.ActiveWorkers() < maxWorkers {
             p.mu.RLock()
-            workers := int(math.Min(float64(len(p.workers)+1), float64(maxWorkers)))
+            workers := min(len(p.workers)+1, maxWorkers)
             existing := len(p.workers)
             p.mu.RUnlock()
             p.ResizeWorkers(workers, existing)
         }
-        maxMRFWorkers := int(math.Min(float64(maxWorkers), MRFWorkerMaxLimit))
+        maxMRFWorkers := min(maxWorkers, MRFWorkerMaxLimit)
         if p.ActiveMRFWorkers() < maxMRFWorkers {
             p.mu.RLock()
-            workers := int(math.Min(float64(p.mrfWorkerSize+1), float64(maxMRFWorkers)))
+            workers := min(p.mrfWorkerSize+1, maxMRFWorkers)
             p.mu.RUnlock()
             p.ResizeFailedWorkers(workers)
         }
@@ -2126,10 +2125,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
     case "slow":
         logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
     default:
-        maxWorkers = int(math.Min(float64(maxWorkers), WorkerMaxLimit))
+        maxWorkers = min(maxWorkers, WorkerMaxLimit)
         if p.ActiveWorkers() < maxWorkers {
             p.mu.RLock()
-            workers := int(math.Min(float64(len(p.workers)+1), float64(maxWorkers)))
+            workers := min(len(p.workers)+1, maxWorkers)
             existing := len(p.workers)
             p.mu.RUnlock()
             p.ResizeWorkers(workers, existing)
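
Both queueReplicaTask above and queueReplicaDeleteTask here size the pool the same way: each congested queue attempt grows it by at most one worker, capped at the configured limit. A simplified standalone sketch of that pattern (pool, resizeWorkers and maxWorkers are hypothetical stand-ins, not the ReplicationPool API; assumes Go 1.21+ for the built-in min):

    package main

    import "fmt"

    // pool is a toy stand-in; the real ResizeWorkers also starts and stops goroutines.
    type pool struct {
        workers []chan struct{}
    }

    func (p *pool) resizeWorkers(n int) {
        for len(p.workers) < n {
            p.workers = append(p.workers, make(chan struct{}))
        }
    }

    func main() {
        p := &pool{}
        const maxWorkers = 4 // stand-in for the computed cap
        for attempt := 0; attempt < 6; attempt++ {
            // grow by at most one per attempt, never past the cap
            p.resizeWorkers(min(len(p.workers)+1, maxWorkers))
        }
        fmt.Println(len(p.workers)) // 4
    }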


@@ -476,40 +476,50 @@ func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID
     auditLogInternal(ctx, opts)
 }
-func joinErrors(errs ...error) error {
-    s := make([]string, 0, len(errs))
-    nonNilErrs := make([]any, 0, len(errs))
-    for _, err := range errs {
-        if err == nil {
-            continue
+func joinErrs(errs []error) []string {
+    s := make([]string, len(errs))
+    for i := range s {
+        if errs[i] == nil {
+            s[i] = "<nil>"
+        } else {
+            s[i] = errs[i].Error()
         }
-        s = append(s, "[%w]")
-        nonNilErrs = append(nonNilErrs, err)
     }
-    // If all the errors were nil, return nil.
-    if len(nonNilErrs) == 0 {
-        return nil
-    }
-    allErrs := strings.Join(s, "\n")
-    return fmt.Errorf(allErrs, nonNilErrs...)
+    return s
 }
 func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object string, metaArr []FileInfo, errs []error, dataErrs []error, opts ObjectOptions) (FileInfo, error) {
-    _, file, line, cok := runtime.Caller(1)
     var err error
     m, ok := isObjectDangling(metaArr, errs, dataErrs)
     if ok {
         tags := make(map[string]interface{}, 4)
         tags["set"] = er.setIndex
         tags["pool"] = er.poolIndex
-        tags["merrs"] = joinErrors(errs...) // errors.Join(errs...)
-        tags["derrs"] = joinErrors(dataErrs...) // errors.Join(dataErrs...)
+        tags["merrs"] = joinErrs(errs)
+        tags["derrs"] = joinErrs(dataErrs)
         if m.IsValid() {
             tags["size"] = m.Size
             tags["mtime"] = m.ModTime.Format(http.TimeFormat)
             tags["data"] = m.Erasure.DataBlocks
             tags["parity"] = m.Erasure.ParityBlocks
         } else {
             tags["invalid-meta"] = true
+            tags["data"] = er.setDriveCount - er.defaultParityCount
+            tags["parity"] = er.defaultParityCount
         }
+        // count the number of offline disks
+        offline := 0
+        for i := 0; i < max(len(errs), len(dataErrs)); i++ {
+            if i < len(errs) && errs[i] == errDiskNotFound || i < len(dataErrs) && dataErrs[i] == errDiskNotFound {
+                offline++
+            }
+        }
+        if offline > 0 {
+            tags["offline"] = offline
+        }
+        _, file, line, cok := runtime.Caller(1)
         if cok {
             tags["caller"] = fmt.Sprintf("%s:%d", file, line)
         }
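
The added offline count walks the metadata and data error slices in parallel; they may have different lengths, hence the max bound, and a drive is counted once if either read failed with a disk-not-found error. A self-contained sketch (errDiskNotFound is a local stand-in for MinIO's sentinel of the same name; assumes Go 1.21+ for the built-in max):

    package main

    import (
        "errors"
        "fmt"
    )

    var errDiskNotFound = errors.New("drive not found")

    func countOffline(errs, dataErrs []error) int {
        offline := 0
        for i := 0; i < max(len(errs), len(dataErrs)); i++ {
            if i < len(errs) && errs[i] == errDiskNotFound || i < len(dataErrs) && dataErrs[i] == errDiskNotFound {
                offline++
            }
        }
        return offline
    }

    func main() {
        merrs := []error{nil, errDiskNotFound, nil, nil}
        derrs := []error{nil, errDiskNotFound, errDiskNotFound, nil}
        fmt.Println(countOffline(merrs, derrs)) // 2
    }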


@@ -19,7 +19,6 @@ package cmd
 import (
     "fmt"
-    "math"
     "runtime"
     "strings"
     "time"
@@ -64,7 +63,7 @@ func colorizeUpdateMessage(updateString string, newerThan string) string {
     line2InColor := fmt.Sprintf(msgLine2Fmt, color.CyanBold(updateString))
     // calculate the rectangular box size.
-    maxContentWidth := int(math.Max(float64(line1Length), float64(line2Length)))
+    maxContentWidth := max(line1Length, line2Length)
     // termWidth is set to a default one to use when we are
     // not able to calculate terminal width via OS syscalls

@@ -1273,3 +1273,17 @@ func stringsHasPrefixFold(s, prefix string) bool {
 func ptr[T any](a T) *T {
     return &a
 }
+func max(a, b int) int {
+    if a > b {
+        return a
+    }
+    return b
+}
+func min(a, b int) int {
+    if a < b {
+        return a
+    }
+    return b
+}
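
A hedged test sketch (not part of the commit; assumes it sits next to these helpers in the cmd package) checking that the new helpers agree with the math.Min/math.Max expressions they replace for the integer inputs used above:

    package cmd

    import (
        "math"
        "testing"
    )

    func TestMinMaxHelpers(t *testing.T) {
        cases := [][2]int{{0, 0}, {1, 2}, {2, 1}, {-3, 5}, {1024, 16}}
        for _, c := range cases {
            if got, want := min(c[0], c[1]), int(math.Min(float64(c[0]), float64(c[1]))); got != want {
                t.Fatalf("min(%d, %d) = %d, want %d", c[0], c[1], got, want)
            }
            if got, want := max(c[0], c[1]), int(math.Max(float64(c[0]), float64(c[1]))); got != want {
                t.Fatalf("max(%d, %d) = %d, want %d", c[0], c[1], got, want)
            }
        }
    }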