mirror of
https://github.com/minio/minio.git
synced 2025-04-04 11:50:36 -04:00
fix: allow configuring excess versions alerting (#19028)
Bonus: enable audit alerts for object versions beyond the configured value, default is '100' versions per object beyond which scanner will alert for each such objects.
This commit is contained in:
parent
e3fbac9e24
commit
afd19de5a9
@ -1395,7 +1395,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||||||
// Notify object created events.
|
// Notify object created events.
|
||||||
sendEvent(eventArgsList[i])
|
sendEvent(eventArgsList[i])
|
||||||
|
|
||||||
if eventArgsList[i].Object.NumVersions > dataScannerExcessiveVersionsThreshold {
|
if eventArgsList[i].Object.NumVersions > int(scannerExcessObjectVersions.Load()) {
|
||||||
// Send events for excessive versions.
|
// Send events for excessive versions.
|
||||||
sendEvent(eventArgs{
|
sendEvent(eventArgs{
|
||||||
EventName: event.ObjectManyVersions,
|
EventName: event.ObjectManyVersions,
|
||||||
@ -1406,6 +1406,15 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||||||
UserAgent: r.UserAgent() + " " + "MinIO-Fan-Out",
|
UserAgent: r.UserAgent() + " " + "MinIO-Fan-Out",
|
||||||
Host: handlers.GetSourceIP(r),
|
Host: handlers.GetSourceIP(r),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
auditLogInternal(context.Background(), AuditLogOptions{
|
||||||
|
Event: "scanner:manyversions",
|
||||||
|
APIName: "PostPolicyBucket",
|
||||||
|
Bucket: eventArgsList[i].Object.Bucket,
|
||||||
|
Object: eventArgsList[i].Object.Name,
|
||||||
|
VersionID: eventArgsList[i].Object.VersionID,
|
||||||
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1461,7 +1470,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||||||
Host: handlers.GetSourceIP(r),
|
Host: handlers.GetSourceIP(r),
|
||||||
})
|
})
|
||||||
|
|
||||||
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) {
|
||||||
defer sendEvent(eventArgs{
|
defer sendEvent(eventArgs{
|
||||||
EventName: event.ObjectManyVersions,
|
EventName: event.ObjectManyVersions,
|
||||||
BucketName: objInfo.Bucket,
|
BucketName: objInfo.Bucket,
|
||||||
@ -1471,6 +1480,15 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||||||
UserAgent: r.UserAgent(),
|
UserAgent: r.UserAgent(),
|
||||||
Host: handlers.GetSourceIP(r),
|
Host: handlers.GetSourceIP(r),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
auditLogInternal(context.Background(), AuditLogOptions{
|
||||||
|
Event: "scanner:manyversions",
|
||||||
|
APIName: "PostPolicyBucket",
|
||||||
|
Bucket: objInfo.Bucket,
|
||||||
|
Object: objInfo.Name,
|
||||||
|
VersionID: objInfo.VersionID,
|
||||||
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if redirectURL != nil { // success_action_redirect is valid and set.
|
if redirectURL != nil { // success_action_redirect is valid and set.
|
||||||
|
@ -602,6 +602,8 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
|||||||
// update dynamic scanner values.
|
// update dynamic scanner values.
|
||||||
scannerIdleMode.Store(scannerCfg.IdleMode)
|
scannerIdleMode.Store(scannerCfg.IdleMode)
|
||||||
scannerCycle.Store(scannerCfg.Cycle)
|
scannerCycle.Store(scannerCfg.Cycle)
|
||||||
|
scannerExcessObjectVersions.Store(scannerCfg.ExcessVersions)
|
||||||
|
scannerExcessFolders.Store(scannerCfg.ExcessFolders)
|
||||||
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
||||||
case config.LoggerWebhookSubSys:
|
case config.LoggerWebhookSubSys:
|
||||||
loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.LoggerWebhookSubSys)
|
loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.LoggerWebhookSubSys)
|
||||||
|
@ -57,18 +57,17 @@ const (
|
|||||||
|
|
||||||
healDeleteDangling = true
|
healDeleteDangling = true
|
||||||
healObjectSelectProb = 1024 // Overall probability of a file being scanned; one in n.
|
healObjectSelectProb = 1024 // Overall probability of a file being scanned; one in n.
|
||||||
|
|
||||||
dataScannerExcessiveVersionsThreshold = 100 // Issue a warning when a single object has more versions than this
|
|
||||||
dataScannerExcessiveFoldersThreshold = 50000 // Issue a warning when a folder has more subfolders than this in a *set*
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
globalHealConfig heal.Config
|
globalHealConfig heal.Config
|
||||||
|
|
||||||
// Sleeper values are updated when config is loaded.
|
// Sleeper values are updated when config is loaded.
|
||||||
scannerSleeper = newDynamicSleeper(2, time.Second, true) // Keep defaults same as config defaults
|
scannerSleeper = newDynamicSleeper(2, time.Second, true) // Keep defaults same as config defaults
|
||||||
scannerCycle = uatomic.NewDuration(dataScannerStartDelay)
|
scannerCycle = uatomic.NewDuration(dataScannerStartDelay)
|
||||||
scannerIdleMode = uatomic.NewInt32(0) // default is throttled when idle
|
scannerIdleMode = uatomic.NewInt32(0) // default is throttled when idle
|
||||||
|
scannerExcessObjectVersions = uatomic.NewInt64(100)
|
||||||
|
scannerExcessFolders = uatomic.NewInt64(50000)
|
||||||
)
|
)
|
||||||
|
|
||||||
// initDataScanner will start the scanner in the background.
|
// initDataScanner will start the scanner in the background.
|
||||||
@ -530,8 +529,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||||||
len(existingFolders)+len(newFolders) >= dataScannerCompactAtFolders ||
|
len(existingFolders)+len(newFolders) >= dataScannerCompactAtFolders ||
|
||||||
len(existingFolders)+len(newFolders) >= dataScannerForceCompactAtFolders
|
len(existingFolders)+len(newFolders) >= dataScannerForceCompactAtFolders
|
||||||
|
|
||||||
if len(existingFolders)+len(newFolders) > dataScannerExcessiveFoldersThreshold {
|
if len(existingFolders)+len(newFolders) > int(scannerExcessFolders.Load()) {
|
||||||
// Notify object accessed via a GET request.
|
|
||||||
sendEvent(eventArgs{
|
sendEvent(eventArgs{
|
||||||
EventName: event.PrefixManyFolders,
|
EventName: event.PrefixManyFolders,
|
||||||
BucketName: f.root,
|
BucketName: f.root,
|
||||||
@ -1101,7 +1099,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if we have many versions after applyNewerNoncurrentVersionLimit.
|
// Check if we have many versions after applyNewerNoncurrentVersionLimit.
|
||||||
if len(objInfos) > dataScannerExcessiveVersionsThreshold {
|
if len(objInfos) > int(scannerExcessObjectVersions.Load()) {
|
||||||
// Notify object accessed via a GET request.
|
// Notify object accessed via a GET request.
|
||||||
sendEvent(eventArgs{
|
sendEvent(eventArgs{
|
||||||
EventName: event.ObjectManyVersions,
|
EventName: event.ObjectManyVersions,
|
||||||
@ -1113,6 +1111,16 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
|
|||||||
Host: globalLocalNodeName,
|
Host: globalLocalNodeName,
|
||||||
RespElements: map[string]string{"x-minio-versions": strconv.Itoa(len(fivs))},
|
RespElements: map[string]string{"x-minio-versions": strconv.Itoa(len(fivs))},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
auditLogInternal(context.Background(), AuditLogOptions{
|
||||||
|
Event: "scanner:manyversions",
|
||||||
|
APIName: ILMExpiry,
|
||||||
|
Bucket: i.bucket,
|
||||||
|
Object: i.objectPath(),
|
||||||
|
Tags: map[string]interface{}{
|
||||||
|
"x-minio-versions": strconv.Itoa(len(fivs)),
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return objInfos, nil
|
return objInfos, nil
|
||||||
|
@ -2360,9 +2360,18 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
Host: handlers.GetSourceIP(r),
|
Host: handlers.GetSourceIP(r),
|
||||||
}
|
}
|
||||||
sendEvent(evt)
|
sendEvent(evt)
|
||||||
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) {
|
||||||
evt.EventName = event.ObjectManyVersions
|
evt.EventName = event.ObjectManyVersions
|
||||||
sendEvent(evt)
|
sendEvent(evt)
|
||||||
|
|
||||||
|
auditLogInternal(context.Background(), AuditLogOptions{
|
||||||
|
Event: "scanner:manyversions",
|
||||||
|
APIName: "PutObject",
|
||||||
|
Bucket: objInfo.Bucket,
|
||||||
|
Object: objInfo.Name,
|
||||||
|
VersionID: objInfo.VersionID,
|
||||||
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do not send checksums in events to avoid leaks.
|
// Do not send checksums in events to avoid leaks.
|
||||||
|
@ -19,6 +19,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -1035,9 +1036,18 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
|||||||
Metadata: cleanReservedKeys(objInfo.UserDefined),
|
Metadata: cleanReservedKeys(objInfo.UserDefined),
|
||||||
})
|
})
|
||||||
|
|
||||||
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) {
|
||||||
evt.EventName = event.ObjectManyVersions
|
evt.EventName = event.ObjectManyVersions
|
||||||
sendEvent(evt)
|
sendEvent(evt)
|
||||||
|
|
||||||
|
auditLogInternal(context.Background(), AuditLogOptions{
|
||||||
|
Event: "scanner:manyversions",
|
||||||
|
APIName: "CompleteMultipartUpload",
|
||||||
|
Bucket: objInfo.Bucket,
|
||||||
|
Object: objInfo.Name,
|
||||||
|
VersionID: objInfo.VersionID,
|
||||||
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the transitioned object whose object version is being overwritten.
|
// Remove the transitioned object whose object version is being overwritten.
|
||||||
|
@ -28,9 +28,21 @@ var (
|
|||||||
Help = config.HelpKVS{
|
Help = config.HelpKVS{
|
||||||
config.HelpKV{
|
config.HelpKV{
|
||||||
Key: Speed,
|
Key: Speed,
|
||||||
Description: `scanner speed` + defaultHelpPostfix(Speed),
|
Description: `customize scanner speed (default|slowest|slow|fast|fastest)` + defaultHelpPostfix(Speed),
|
||||||
Optional: true,
|
Optional: true,
|
||||||
Type: "default|slowest|slow|fast|fastest",
|
Type: "string",
|
||||||
|
},
|
||||||
|
config.HelpKV{
|
||||||
|
Key: ExcessVersions,
|
||||||
|
Description: `alert per object beyond this many versions` + defaultHelpPostfix(ExcessVersions),
|
||||||
|
Optional: true,
|
||||||
|
Type: "int",
|
||||||
|
},
|
||||||
|
config.HelpKV{
|
||||||
|
Key: ExcessFolders,
|
||||||
|
Description: `alert beyond this many sub-folders per folder in an erasure set` + defaultHelpPostfix(ExcessFolders),
|
||||||
|
Optional: true,
|
||||||
|
Type: "int",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -34,6 +34,12 @@ const (
|
|||||||
IdleSpeed = "idle_speed"
|
IdleSpeed = "idle_speed"
|
||||||
EnvIdleSpeed = "MINIO_SCANNER_IDLE_SPEED"
|
EnvIdleSpeed = "MINIO_SCANNER_IDLE_SPEED"
|
||||||
|
|
||||||
|
ExcessVersions = "alert_excess_versions"
|
||||||
|
EnvExcessVersions = "MINIO_SCANNER_ALERT_EXCESS_VERSIONS"
|
||||||
|
|
||||||
|
ExcessFolders = "alert_excess_folders"
|
||||||
|
EnvExcessFolders = "MINIO_SCANNER_ALERT_EXCESS_FOLDERS"
|
||||||
|
|
||||||
// All below are deprecated in October 2022 and
|
// All below are deprecated in October 2022 and
|
||||||
// replaced them with a single speed parameter
|
// replaced them with a single speed parameter
|
||||||
Delay = "delay"
|
Delay = "delay"
|
||||||
@ -50,8 +56,16 @@ const (
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
// Delay is the sleep multiplier.
|
// Delay is the sleep multiplier.
|
||||||
Delay float64 `json:"delay"`
|
Delay float64 `json:"delay"`
|
||||||
// Behavior of the scanner when there is no other parallel S3 requests
|
|
||||||
IdleMode int32 // 0 => throttling, 1 => full speed
|
// Sleep always or based on incoming S3 requests.
|
||||||
|
IdleMode int32 // 0 => on, 1 => off
|
||||||
|
|
||||||
|
// Alert upon this many excess object versions
|
||||||
|
ExcessVersions int64 // 100
|
||||||
|
|
||||||
|
// Alert upon this many excess sub-folders per folder in an erasure set.
|
||||||
|
ExcessFolders int64 // 50000
|
||||||
|
|
||||||
// MaxWait is maximum wait time between operations
|
// MaxWait is maximum wait time between operations
|
||||||
MaxWait time.Duration
|
MaxWait time.Duration
|
||||||
// Cycle is the time.Duration between each scanner cycles
|
// Cycle is the time.Duration between each scanner cycles
|
||||||
@ -69,6 +83,15 @@ var DefaultKVS = config.KVS{
|
|||||||
Value: "",
|
Value: "",
|
||||||
HiddenIfEmpty: true,
|
HiddenIfEmpty: true,
|
||||||
},
|
},
|
||||||
|
config.KV{
|
||||||
|
Key: ExcessVersions,
|
||||||
|
Value: "100",
|
||||||
|
},
|
||||||
|
config.KV{
|
||||||
|
Key: ExcessFolders,
|
||||||
|
Value: "50000",
|
||||||
|
},
|
||||||
|
|
||||||
// Deprecated Oct 2022
|
// Deprecated Oct 2022
|
||||||
config.KV{
|
config.KV{
|
||||||
Key: Delay,
|
Key: Delay,
|
||||||
@ -119,15 +142,27 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch idleSpeed := env.Get(EnvIdleSpeed, kvs.GetWithDefault(IdleSpeed, DefaultKVS)); idleSpeed {
|
switch idleSpeed := env.Get(EnvIdleSpeed, kvs.GetWithDefault(IdleSpeed, DefaultKVS)); idleSpeed {
|
||||||
case "", "throttled": // Empty is the default mode;
|
case "", config.EnableOn:
|
||||||
cfg.IdleMode = 0
|
cfg.IdleMode = 0
|
||||||
case "full":
|
case config.EnableOff:
|
||||||
cfg.IdleMode = 1
|
cfg.IdleMode = 1
|
||||||
default:
|
default:
|
||||||
return cfg, fmt.Errorf("unknown value: '%s'", idleSpeed)
|
return cfg, fmt.Errorf("unknown value: '%s'", idleSpeed)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
excessVersions, err := strconv.ParseInt(env.Get(EnvExcessVersions, kvs.GetWithDefault(ExcessVersions, DefaultKVS)), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return cfg, err
|
||||||
|
}
|
||||||
|
cfg.ExcessVersions = excessVersions
|
||||||
|
|
||||||
|
excessFolders, err := strconv.ParseInt(env.Get(EnvExcessFolders, kvs.GetWithDefault(ExcessFolders, DefaultKVS)), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return cfg, err
|
||||||
|
}
|
||||||
|
cfg.ExcessFolders = excessFolders
|
||||||
|
|
||||||
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func lookupDeprecatedScannerConfig(kvs config.KVS) (cfg Config, err error) {
|
func lookupDeprecatedScannerConfig(kvs config.KVS) (cfg Config, err error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user