mirror of
https://github.com/minio/minio.git
synced 2025-01-22 20:23:14 -05:00
Add detailed scanner trace output and notifications (#16668)
This commit is contained in:
parent
8a08861dd9
commit
fd6622458b
@ -1142,6 +1142,17 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
})
|
||||
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
||||
defer sendEvent(eventArgs{
|
||||
EventName: event.ObjectManyVersions,
|
||||
BucketName: objInfo.Bucket,
|
||||
Object: objInfo,
|
||||
ReqParams: extractReqParams(r),
|
||||
RespElements: extractRespElements(w),
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
})
|
||||
}
|
||||
|
||||
if redirectURL != nil { // success_action_redirect is valid and set.
|
||||
v := redirectURL.Query()
|
||||
|
@ -69,9 +69,9 @@ const (
|
||||
|
||||
// log scanner action.
|
||||
// Use for s > scannerMetricStartTrace
|
||||
func (p *scannerMetrics) log(s scannerMetric, paths ...string) func() {
|
||||
func (p *scannerMetrics) log(s scannerMetric, paths ...string) func(custom map[string]string) {
|
||||
startTime := time.Now()
|
||||
return func() {
|
||||
return func(custom map[string]string) {
|
||||
duration := time.Since(startTime)
|
||||
|
||||
atomic.AddUint64(&p.operations[s], 1)
|
||||
@ -80,7 +80,7 @@ func (p *scannerMetrics) log(s scannerMetric, paths ...string) func() {
|
||||
}
|
||||
|
||||
if s > scannerMetricStartTrace && globalTrace.NumSubscribers(madmin.TraceScanner) > 0 {
|
||||
globalTrace.Publish(scannerTrace(s, startTime, duration, strings.Join(paths, " ")))
|
||||
globalTrace.Publish(scannerTrace(s, startTime, duration, strings.Join(paths, " "), custom))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -59,6 +60,9 @@ const (
|
||||
healDeleteDangling = true
|
||||
healFolderIncludeProb = 32 // Include a clean folder one in n cycles.
|
||||
healObjectSelectProb = 512 // Overall probability of a file being scanned; one in n.
|
||||
|
||||
dataScannerExcessiveVersionsThreshold = 1000 // Issue a warning when a single object has more versions than this
|
||||
dataScannerExcessiveFoldersThreshold = 50000 // Issue a warning when a folder has more subfolders than this in a *set*
|
||||
)
|
||||
|
||||
var (
|
||||
@ -207,7 +211,11 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
||||
logger.LogIf(ctx, err)
|
||||
err = objAPI.NSScanner(ctx, bf, results, uint32(cycleInfo.current), scanMode)
|
||||
logger.LogIf(ctx, err)
|
||||
stopFn()
|
||||
res := map[string]string{"cycle": fmt.Sprint(cycleInfo.current)}
|
||||
if err != nil {
|
||||
res["error"] = err.Error()
|
||||
}
|
||||
stopFn(res)
|
||||
if err == nil {
|
||||
// Store new cycle...
|
||||
cycleInfo.next++
|
||||
@ -566,6 +574,19 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
len(existingFolders)+len(newFolders) >= dataScannerCompactAtFolders ||
|
||||
len(existingFolders)+len(newFolders) >= dataScannerForceCompactAtFolders
|
||||
|
||||
if len(existingFolders)+len(newFolders) > dataScannerExcessiveFoldersThreshold {
|
||||
// Notify object accessed via a GET request.
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.PrefixManyFolders,
|
||||
BucketName: f.root,
|
||||
Object: ObjectInfo{
|
||||
Name: strings.TrimSuffix(folder.name, "/") + "/",
|
||||
Size: int64(len(existingFolders) + len(newFolders)),
|
||||
},
|
||||
UserAgent: "scanner",
|
||||
Host: globalMinioHost,
|
||||
})
|
||||
}
|
||||
if !into.Compacted && shouldCompact {
|
||||
into.Compacted = true
|
||||
newFolders = append(newFolders, existingFolders...)
|
||||
@ -632,7 +653,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
f.updateCurrentPath(folder.name)
|
||||
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name)
|
||||
scanFolder(folder)
|
||||
stopFn()
|
||||
stopFn(map[string]string{"type": "new"})
|
||||
|
||||
// Add new folders if this is new and we don't have existing.
|
||||
if !into.Compacted {
|
||||
parent := f.updateCache.find(thisHash.Key())
|
||||
@ -661,9 +683,9 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
}
|
||||
}
|
||||
f.updateCurrentPath(folder.name)
|
||||
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name, "EXISTING")
|
||||
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name)
|
||||
scanFolder(folder)
|
||||
stopFn()
|
||||
stopFn(map[string]string{"type": "existing"})
|
||||
}
|
||||
|
||||
// Scan for healing
|
||||
@ -803,7 +825,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: 1}
|
||||
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, this.name, "HEALED")
|
||||
scanFolder(this)
|
||||
stopFn()
|
||||
stopFn(map[string]string{"type": "healed"})
|
||||
}
|
||||
}
|
||||
break
|
||||
@ -817,9 +839,6 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
flat.Compacted = true
|
||||
var compact bool
|
||||
if flat.Objects < dataScannerCompactLeastObject {
|
||||
if flat.Objects > 1 {
|
||||
globalScannerMetrics.log(scannerMetricCompactFolder, folder.name)
|
||||
}
|
||||
compact = true
|
||||
} else {
|
||||
// Compact if we only have objects as children...
|
||||
@ -832,13 +851,20 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
}
|
||||
}
|
||||
}
|
||||
if compact {
|
||||
globalScannerMetrics.log(scannerMetricCompactFolder, folder.name)
|
||||
}
|
||||
|
||||
}
|
||||
if compact {
|
||||
stop := globalScannerMetrics.log(scannerMetricCompactFolder, folder.name)
|
||||
f.newCache.deleteRecursive(thisHash)
|
||||
f.newCache.replaceHashed(thisHash, folder.parent, *flat)
|
||||
total := map[string]string{
|
||||
"objects": fmt.Sprint(flat.Objects),
|
||||
"size": fmt.Sprint(flat.Size),
|
||||
}
|
||||
if flat.Versions > 0 {
|
||||
total["versions"] = fmt.Sprint(flat.Versions)
|
||||
}
|
||||
stop(total)
|
||||
}
|
||||
|
||||
}
|
||||
@ -1082,8 +1108,27 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
|
||||
}
|
||||
}
|
||||
}
|
||||
fivs, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
|
||||
if err != nil {
|
||||
return fivs, err
|
||||
}
|
||||
|
||||
return i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
|
||||
// Check if we have many versions after applyNewerNoncurrentVersionLimit.
|
||||
if len(fivs) > dataScannerExcessiveVersionsThreshold {
|
||||
// Notify object accessed via a GET request.
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.ObjectManyVersions,
|
||||
BucketName: i.bucket,
|
||||
Object: ObjectInfo{
|
||||
Name: i.objectPath(),
|
||||
},
|
||||
UserAgent: "scanner",
|
||||
Host: globalMinioHost,
|
||||
RespElements: map[string]string{"x-minio-versions": strconv.Itoa(len(fivs))},
|
||||
})
|
||||
}
|
||||
|
||||
return fivs, nil
|
||||
}
|
||||
|
||||
// applyActions will apply lifecycle checks on to a scanned item.
|
||||
|
@ -66,7 +66,7 @@ func (fi FileInfo) DataShardFixed() bool {
|
||||
return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
|
||||
}
|
||||
|
||||
// Heals a bucket if it doesn't exist on one of the disks, additionally
|
||||
// HealBucket heals a bucket if it doesn't exist on one of the disks, additionally
|
||||
// also heals the missing entries for bucket metadata files
|
||||
// `policy.json, notification.xml, listeners.json`.
|
||||
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
|
||||
@ -1062,7 +1062,12 @@ func healTrace(funcName healingMetric, startTime time.Time, bucket, object strin
|
||||
Path: pathJoin(bucket, decodeDirObject(object)),
|
||||
}
|
||||
if opts != nil {
|
||||
tr.Message = fmt.Sprintf("dry:%v, rm:%v, recreate:%v mode:%v", opts.DryRun, opts.Remove, opts.Recreate, opts.ScanMode)
|
||||
tr.Custom = map[string]string{
|
||||
"dry": fmt.Sprint(opts.DryRun),
|
||||
"remove": fmt.Sprint(opts.Remove),
|
||||
"recreate": fmt.Sprint(opts.Recreate),
|
||||
"mode": fmt.Sprint(opts.ScanMode),
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
tr.Error = err.Error()
|
||||
|
@ -1918,7 +1918,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
|
||||
// Notify object created event.
|
||||
sendEvent(eventArgs{
|
||||
evt := eventArgs{
|
||||
EventName: event.ObjectCreatedPut,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
@ -1926,7 +1926,12 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
RespElements: extractRespElements(w),
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
})
|
||||
}
|
||||
sendEvent(evt)
|
||||
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
||||
evt.EventName = event.ObjectManyVersions
|
||||
sendEvent(evt)
|
||||
}
|
||||
|
||||
// Remove the transitioned object whose object version is being overwritten.
|
||||
if !globalTierConfigMgr.Empty() {
|
||||
|
@ -1019,7 +1019,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
writeSuccessResponseXML(w, encodedSuccessResponse)
|
||||
|
||||
// Notify object created event.
|
||||
sendEvent(eventArgs{
|
||||
evt := eventArgs{
|
||||
EventName: event.ObjectCreatedCompleteMultipartUpload,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
@ -1027,7 +1027,13 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
RespElements: extractRespElements(w),
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
})
|
||||
}
|
||||
sendEvent(evt)
|
||||
|
||||
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
||||
evt.EventName = event.ObjectManyVersions
|
||||
sendEvent(evt)
|
||||
}
|
||||
|
||||
// Remove the transitioned object whose object version is being overwritten.
|
||||
if !globalTierConfigMgr.Empty() {
|
||||
|
@ -555,7 +555,7 @@ func storageTrace(s storageMetric, startTime time.Time, duration time.Duration,
|
||||
}
|
||||
}
|
||||
|
||||
func scannerTrace(s scannerMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
|
||||
func scannerTrace(s scannerMetric, startTime time.Time, duration time.Duration, path string, custom map[string]string) madmin.TraceInfo {
|
||||
return madmin.TraceInfo{
|
||||
TraceType: madmin.TraceScanner,
|
||||
Time: startTime,
|
||||
@ -563,6 +563,7 @@ func scannerTrace(s scannerMetric, startTime time.Time, duration time.Duration,
|
||||
FuncName: "scanner." + s.String(),
|
||||
Duration: duration,
|
||||
Path: path,
|
||||
Custom: custom,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -492,11 +492,15 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
||||
return sizeSummary{}, errSkipFile
|
||||
}
|
||||
stopFn := globalScannerMetrics.log(scannerMetricScanObject, s.diskPath, pathJoin(item.bucket, item.objectPath()))
|
||||
defer stopFn()
|
||||
res := make(map[string]string, 8)
|
||||
defer func() {
|
||||
stopFn(res)
|
||||
}()
|
||||
|
||||
doneSz := globalScannerMetrics.timeSize(scannerMetricReadMetadata)
|
||||
buf, err := s.readMetadata(ctx, item.Path)
|
||||
doneSz(len(buf))
|
||||
res["metasize"] = fmt.Sprint(len(buf))
|
||||
if err != nil {
|
||||
if intDataUpdateTracker.debug {
|
||||
console.Debugf(color.Green("scannerBucket:")+" object path missing: %v: %w\n", item.Path, err)
|
||||
@ -515,6 +519,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
||||
}
|
||||
return sizeSummary{}, errSkipFile
|
||||
}
|
||||
|
||||
sizeS := sizeSummary{}
|
||||
var noTiers bool
|
||||
if noTiers = globalTierConfigMgr.Empty(); !noTiers {
|
||||
@ -560,12 +565,45 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
||||
}
|
||||
|
||||
// apply tier sweep action on free versions
|
||||
if len(fivs.FreeVersions) > 0 {
|
||||
res["free-versions"] = fmt.Sprint(len(fivs.FreeVersions))
|
||||
}
|
||||
for _, freeVersion := range fivs.FreeVersions {
|
||||
oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned)
|
||||
done = globalScannerMetrics.time(scannerMetricTierObjSweep)
|
||||
item.applyTierObjSweep(ctx, objAPI, oi)
|
||||
done()
|
||||
}
|
||||
|
||||
// These are rather expensive. Skip if nobody listens.
|
||||
if globalTrace.NumSubscribers(madmin.TraceScanner) > 0 {
|
||||
if sizeS.versions > 0 {
|
||||
res["versions"] = fmt.Sprint()
|
||||
}
|
||||
res["size"] = fmt.Sprint(sizeS.totalSize)
|
||||
if len(sizeS.tiers) > 0 {
|
||||
for name, tier := range sizeS.tiers {
|
||||
res["size-"+name] = fmt.Sprint(tier.TotalSize)
|
||||
res["versions-"+name] = fmt.Sprint(tier.NumVersions)
|
||||
}
|
||||
}
|
||||
if sizeS.failedCount > 0 {
|
||||
res["repl-failed"] = fmt.Sprintf("%d versions, %d bytes", sizeS.failedCount, sizeS.failedSize)
|
||||
}
|
||||
if sizeS.pendingCount > 0 {
|
||||
res["repl-pending"] = fmt.Sprintf("%d versions, %d bytes", sizeS.pendingCount, sizeS.pendingSize)
|
||||
}
|
||||
for tgt, st := range sizeS.replTargetStats {
|
||||
res["repl-size-"+tgt] = fmt.Sprint(st.replicatedSize)
|
||||
if st.failedCount > 0 {
|
||||
res["repl-failed-"+tgt] = fmt.Sprintf("%d versions, %d bytes", st.failedCount, st.failedSize)
|
||||
}
|
||||
if st.pendingCount > 0 {
|
||||
res["repl-pending-"+tgt] = fmt.Sprintf("%d versions, %d bytes", st.pendingCount, st.pendingSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return sizeS, nil
|
||||
}, scanMode)
|
||||
if err != nil {
|
||||
|
2
go.mod
2
go.mod
@ -48,7 +48,7 @@ require (
|
||||
github.com/minio/dperf v0.4.2
|
||||
github.com/minio/highwayhash v1.0.2
|
||||
github.com/minio/kes-go v0.1.0
|
||||
github.com/minio/madmin-go/v2 v2.0.12
|
||||
github.com/minio/madmin-go/v2 v2.0.13-0.20230220143547-e6641ef0b8d5
|
||||
github.com/minio/minio-go/v7 v7.0.48
|
||||
github.com/minio/mux v1.9.0
|
||||
github.com/minio/pkg v1.6.1
|
||||
|
4
go.sum
4
go.sum
@ -776,8 +776,8 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT
|
||||
github.com/minio/kes-go v0.1.0 h1:h201DyOYP5sTqajkxFGxmXz/kPbT8HQNX1uh3Yx2PFc=
|
||||
github.com/minio/kes-go v0.1.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo=
|
||||
github.com/minio/madmin-go v1.6.6/go.mod h1:ATvkBOLiP3av4D++2v1UEHC/QzsGtgXD5kYvvRYzdKs=
|
||||
github.com/minio/madmin-go/v2 v2.0.12 h1:3PKFAhiZOMjeShxhtY10INw1kDrY/2VwH55FAat3hM4=
|
||||
github.com/minio/madmin-go/v2 v2.0.12/go.mod h1:5aFi/VLWBHC2DEFfGIlUmAeJhaF4ZAjuYpEWZFU14Zw=
|
||||
github.com/minio/madmin-go/v2 v2.0.13-0.20230220143547-e6641ef0b8d5 h1:IZa7nXFFDdOigg2wufK51C6oTB5q9c6LCV969u3L0YY=
|
||||
github.com/minio/madmin-go/v2 v2.0.13-0.20230220143547-e6641ef0b8d5/go.mod h1:5aFi/VLWBHC2DEFfGIlUmAeJhaF4ZAjuYpEWZFU14Zw=
|
||||
github.com/minio/mc v0.0.0-20230216192011-54e2edd1be94 h1:umgaS1OMD7qHLca/GH/eoox8RL44tQenBp34ZV1aWkU=
|
||||
github.com/minio/mc v0.0.0-20230216192011-54e2edd1be94/go.mod h1:4tBWo9BLlIUajqXqo5HdxxpQAdXktlOLSVvHaHFNYjE=
|
||||
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
|
||||
|
@ -57,6 +57,8 @@ const (
|
||||
ObjectRestoreCompleted
|
||||
ObjectTransitionFailed
|
||||
ObjectTransitionComplete
|
||||
ObjectManyVersions
|
||||
PrefixManyFolders
|
||||
|
||||
objectSingleTypesEnd
|
||||
// Start Compound types that require expansion:
|
||||
@ -67,6 +69,7 @@ const (
|
||||
ObjectReplicationAll
|
||||
ObjectRestoreAll
|
||||
ObjectTransitionAll
|
||||
ObjectScannerAll
|
||||
Everything
|
||||
)
|
||||
|
||||
@ -113,6 +116,11 @@ func (name Name) Expand() []Name {
|
||||
ObjectTransitionFailed,
|
||||
ObjectTransitionComplete,
|
||||
}
|
||||
case ObjectScannerAll:
|
||||
return []Name{
|
||||
ObjectManyVersions,
|
||||
PrefixManyFolders,
|
||||
}
|
||||
case Everything:
|
||||
res := make([]Name, objectSingleTypesEnd-1)
|
||||
for i := range res {
|
||||
@ -202,6 +210,10 @@ func (name Name) String() string {
|
||||
return "s3:ObjectTransition:Failed"
|
||||
case ObjectTransitionComplete:
|
||||
return "s3:ObjectTransition:Complete"
|
||||
case ObjectManyVersions:
|
||||
return "s3:Scanner:ManyVersions"
|
||||
case PrefixManyFolders:
|
||||
return "s3:Scanner:BigPrefix"
|
||||
}
|
||||
|
||||
return ""
|
||||
@ -314,6 +326,10 @@ func ParseName(s string) (Name, error) {
|
||||
return ObjectTransitionComplete, nil
|
||||
case "s3:ObjectTransition:*":
|
||||
return ObjectTransitionAll, nil
|
||||
case "s3:Scanner:ManyVersions":
|
||||
return ObjectManyVersions, nil
|
||||
case "s3:Scanner:BigPrefix":
|
||||
return PrefixManyFolders, nil
|
||||
default:
|
||||
return 0, &ErrInvalidEventName{s}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user