flatten out audit tags, do not send as free-form (#20256)

move away from map[string]interface{} to map[string]string
to simplify the audit, and also provide concise information.

avoids large allocations under load(), reduces the amount
of audit information generated, as the current implementation
was a bit free-form. instead all datastructures must be
flattened.
This commit is contained in:
Harshavardhana 2024-08-13 15:22:04 -07:00 committed by GitHub
parent 516af01a12
commit e7a56f35b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 100 additions and 109 deletions

View File

@ -17,7 +17,11 @@
package cmd
import "github.com/minio/minio/internal/bucket/lifecycle"
import (
"strconv"
"github.com/minio/minio/internal/bucket/lifecycle"
)
//go:generate stringer -type lcEventSrc -trimprefix lcEventSrc_ $GOFILE
type lcEventSrc uint8
@ -43,7 +47,7 @@ type lcAuditEvent struct {
source lcEventSrc
}
func (lae lcAuditEvent) Tags() map[string]interface{} {
func (lae lcAuditEvent) Tags() map[string]string {
event := lae.Event
src := lae.source
const (
@ -55,7 +59,7 @@ func (lae lcAuditEvent) Tags() map[string]interface{} {
ilmNewerNoncurrentVersions = "ilm-newer-noncurrent-versions"
ilmNoncurrentDays = "ilm-noncurrent-days"
)
tags := make(map[string]interface{}, 5)
tags := make(map[string]string, 5)
if src > lcEventSrc_None {
tags[ilmSrc] = src.String()
}
@ -63,7 +67,7 @@ func (lae lcAuditEvent) Tags() map[string]interface{} {
tags[ilmRuleID] = event.RuleID
if !event.Due.IsZero() {
tags[ilmDue] = event.Due
tags[ilmDue] = event.Due.Format(iso8601Format)
}
// rule with Transition/NoncurrentVersionTransition in effect
@ -73,10 +77,10 @@ func (lae lcAuditEvent) Tags() map[string]interface{} {
// rule with NewernoncurrentVersions in effect
if event.NewerNoncurrentVersions > 0 {
tags[ilmNewerNoncurrentVersions] = event.NewerNoncurrentVersions
tags[ilmNewerNoncurrentVersions] = strconv.Itoa(event.NewerNoncurrentVersions)
}
if event.NoncurrentDays > 0 {
tags[ilmNoncurrentDays] = event.NoncurrentDays
tags[ilmNoncurrentDays] = strconv.Itoa(event.NoncurrentDays)
}
return tags
}

View File

@ -71,7 +71,7 @@ func NewLifecycleSys() *LifecycleSys {
return &LifecycleSys{}
}
func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event string) madmin.TraceInfo {
func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event string, metadata map[string]string) madmin.TraceInfo {
sz, _ := oi.GetActualSize()
return madmin.TraceInfo{
TraceType: madmin.TraceILM,
@ -83,16 +83,16 @@ func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event
Bytes: sz,
Error: "",
Message: getSource(4),
Custom: map[string]string{"version-id": oi.VersionID},
Custom: metadata,
}
}
func (sys *LifecycleSys) trace(oi ObjectInfo) func(event string) {
func (sys *LifecycleSys) trace(oi ObjectInfo) func(event string, metadata map[string]string) {
startTime := time.Now()
return func(event string) {
return func(event string, metadata map[string]string) {
duration := time.Since(startTime)
if globalTrace.NumSubscribers(madmin.TraceILM) > 0 {
globalTrace.Publish(ilmTrace(startTime, duration, oi, event))
globalTrace.Publish(ilmTrace(startTime, duration, oi, event, metadata))
}
}
}
@ -707,6 +707,11 @@ type auditTierOp struct {
Error string `json:"error,omitempty"`
}
func (op auditTierOp) String() string {
// flattening the auditTierOp{} for audit
return fmt.Sprintf("tier:%s,respNS:%d,tx:%d,err:%s", op.Tier, op.TimeToResponseNS, op.OutputBytes, op.Error)
}
func auditTierActions(ctx context.Context, tier string, bytes int64) func(err error) {
startTime := time.Now()
return func(err error) {
@ -730,7 +735,7 @@ func auditTierActions(ctx context.Context, tier string, bytes int64) func(err er
globalTierMetrics.logFailure(tier)
}
logger.GetReqInfo(ctx).AppendTags("tierStats", op)
logger.GetReqInfo(ctx).AppendTags("tierStats", op.String())
}
}

View File

@ -569,7 +569,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
APIName: "Scanner",
Bucket: f.root,
Object: prefixName,
Tags: map[string]interface{}{
Tags: map[string]string{
"x-minio-prefixes-total": strconv.Itoa(totalFolders),
},
})
@ -1138,7 +1138,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
APIName: "Scanner",
Bucket: i.bucket,
Object: i.objectPath(),
Tags: map[string]interface{}{
Tags: map[string]string{
"x-minio-versions": strconv.Itoa(len(objInfos)),
},
})
@ -1170,7 +1170,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
APIName: "Scanner",
Bucket: i.bucket,
Object: i.objectPath(),
Tags: map[string]interface{}{
Tags: map[string]string{
"x-minio-versions-count": strconv.Itoa(len(objInfos)),
"x-minio-versions-size": strconv.FormatInt(cumulativeSize, 10),
},
@ -1336,6 +1336,8 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay
}
tags := newLifecycleAuditEvent(src, lcEvent).Tags()
tags["version-id"] = dobj.VersionID
// Send audit for the lifecycle delete operation
auditLogLifecycle(ctx, dobj, ILMExpiry, tags, traceFn)
@ -1547,7 +1549,7 @@ const (
ILMTransition = " ilm:transition"
)
func auditLogLifecycle(ctx context.Context, oi ObjectInfo, event string, tags map[string]interface{}, traceFn func(event string)) {
func auditLogLifecycle(ctx context.Context, oi ObjectInfo, event string, tags map[string]string, traceFn func(event string, metadata map[string]string)) {
var apiName string
switch event {
case ILMExpiry:
@ -1565,5 +1567,5 @@ func auditLogLifecycle(ctx context.Context, oi ObjectInfo, event string, tags ma
VersionID: oi.VersionID,
Tags: tags,
})
traceFn(event)
traceFn(event, tags)
}

View File

@ -226,21 +226,28 @@ func (er *erasureObjects) auditHealObject(ctx context.Context, bucket, object, v
opts := AuditLogOptions{
Event: "HealObject",
Bucket: bucket,
Object: object,
Object: decodeDirObject(object),
VersionID: versionID,
}
if err != nil {
opts.Error = err.Error()
}
opts.Tags = map[string]interface{}{
"healResult": result,
"objectLocation": auditObjectOp{
Name: decodeDirObject(object),
b, a := result.GetCorruptedCounts()
if b == a {
opts.Error = fmt.Sprintf("unable to heal %d corrupted blocks on drives", b)
}
b, a = result.GetMissingCounts()
if b == a {
opts.Error = fmt.Sprintf("unable to heal %d missing blocks on drives", b)
}
opts.Tags = map[string]string{
"healObject": auditObjectOp{
Name: opts.Object,
Pool: er.poolIndex + 1,
Set: er.setIndex + 1,
Drives: er.getEndpointStrings(),
},
}.String(),
}
auditLogInternal(ctx, opts)

View File

@ -258,7 +258,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
// towards simplification of multipart APIs.
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "ListMultipartUploads", object, &er)
result.MaxUploads = maxUploads
result.KeyMarker = keyMarker
@ -514,7 +514,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
// Implements S3 compatible initiate multipart API.
func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "NewMultipartUpload", object, &er)
}
return er.newMultipartUpload(ctx, bucket, object, opts)
@ -560,7 +560,7 @@ func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, src
// Implements S3 compatible Upload Part API.
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "PutObjectPart", object, &er)
}
data := r.Reader
@ -766,7 +766,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Does not contain currently uploaded parts by design.
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "GetMultipartInfo", object, &er)
}
result := MultipartInfo{
@ -804,7 +804,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
// replied back to the client.
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "ListObjectParts", object, &er)
}
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
@ -1058,7 +1058,7 @@ func errStrToPartErr(errStr string) error {
// Implements S3 compatible Complete multipart API.
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "CompleteMultipartUpload", object, &er)
}
if opts.CheckPrecondFn != nil {
@ -1455,7 +1455,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// operation.
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "AbortMultipartUpload", object, &er)
}
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))

View File

@ -70,7 +70,7 @@ func countOnlineDisks(onlineDisks []StorageAPI) (online int) {
// update metadata.
func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
if !dstOpts.NoAuditLog {
auditObjectErasureSet(ctx, dstObject, &er)
auditObjectErasureSet(ctx, "CopyObject", dstObject, &er)
}
// This call shouldn't be used for anything other than metadata updates or adding self referential versions.
@ -199,7 +199,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
// Read(Closer). When err != nil, the returned reader is always nil.
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "GetObject", object, &er)
}
var unlockOnDefer bool
@ -439,7 +439,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "GetObjectInfo", object, &er)
}
if !opts.NoLock {
@ -456,7 +456,7 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return er.getObjectInfo(ctx, bucket, object, opts)
}
func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID string, tags map[string]interface{}) {
func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID string, tags map[string]string) {
if len(logger.AuditTargets()) == 0 {
return
}
@ -472,13 +472,16 @@ func auditDanglingObjectDeletion(ctx context.Context, bucket, object, versionID
auditLogInternal(ctx, opts)
}
func joinErrs(errs []error) []string {
s := make([]string, len(errs))
func joinErrs(errs []error) string {
var s string
for i := range s {
if s != "" {
s += ","
}
if errs[i] == nil {
s[i] = "<nil>"
s += "<nil>"
} else {
s[i] = errs[i].Error()
s += errs[i].Error()
}
}
return s
@ -491,20 +494,18 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
// can be deleted safely, in such a scenario return ReadQuorum error.
return FileInfo{}, errErasureReadQuorum
}
tags := make(map[string]interface{}, 4)
tags["set"] = er.setIndex
tags["pool"] = er.poolIndex
tags := make(map[string]string, 16)
tags["set"] = strconv.Itoa(er.setIndex)
tags["pool"] = strconv.Itoa(er.poolIndex)
tags["merrs"] = joinErrs(errs)
tags["derrs"] = dataErrsByPart
tags["derrs"] = fmt.Sprintf("%v", dataErrsByPart)
if m.IsValid() {
tags["size"] = m.Size
tags["mtime"] = m.ModTime.Format(http.TimeFormat)
tags["data"] = m.Erasure.DataBlocks
tags["parity"] = m.Erasure.ParityBlocks
tags["sz"] = strconv.FormatInt(m.Size, 10)
tags["mt"] = m.ModTime.Format(iso8601Format)
tags["d:p"] = fmt.Sprintf("%d:%d", m.Erasure.DataBlocks, m.Erasure.ParityBlocks)
} else {
tags["invalid-meta"] = true
tags["data"] = er.setDriveCount - er.defaultParityCount
tags["parity"] = er.defaultParityCount
tags["invalid"] = "1"
tags["d:p"] = fmt.Sprintf("%d:%d", er.setDriveCount-er.defaultParityCount, er.defaultParityCount)
}
// count the number of offline disks
@ -527,7 +528,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
}
}
if offline > 0 {
tags["offline"] = offline
tags["offline"] = strconv.Itoa(offline)
}
_, file, line, cok := runtime.Caller(1)
@ -556,22 +557,16 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
}, index)
}
rmDisks := make(map[string]string, len(disks))
for index, err := range g.Wait() {
var errStr, diskName string
var errStr string
if err != nil {
errStr = err.Error()
} else {
errStr = "<nil>"
}
if disks[index] != nil {
diskName = disks[index].String()
} else {
diskName = fmt.Sprintf("disk-%d", index)
tags[fmt.Sprintf("ddisk-%d", index)] = errStr
}
rmDisks[diskName] = errStr
}
tags["cleanupResult"] = rmDisks
return m, nil
}
@ -1251,7 +1246,7 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
// putObject wrapper for erasureObjects PutObject
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "PutObject", object, &er)
}
data := r.Reader
@ -1626,7 +1621,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
if !opts.NoAuditLog {
for _, obj := range objects {
auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er)
auditObjectErasureSet(ctx, "DeleteObjects", obj.ObjectV.ObjectName, &er)
}
}
@ -1846,7 +1841,7 @@ func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string
// response to the client request.
func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if !opts.NoAuditLog {
auditObjectErasureSet(ctx, object, &er)
auditObjectErasureSet(ctx, "DeleteObject", object, &er)
}
var lc *lifecycle.Lifecycle

View File

@ -540,11 +540,15 @@ type auditObjectOp struct {
Name string `json:"name"`
Pool int `json:"poolId"`
Set int `json:"setId"`
Drives []string `json:"drives"`
}
func (op auditObjectOp) String() string {
// Flatten the auditObjectOp
return fmt.Sprintf("name=%s,pool=%d,set=%d", op.Name, op.Pool, op.Set)
}
// Add erasure set information to the current context
func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) {
func auditObjectErasureSet(ctx context.Context, api, object string, set *erasureObjects) {
if len(logger.AuditTargets()) == 0 {
return
}
@ -553,10 +557,9 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec
Name: decodeDirObject(object),
Pool: set.poolIndex + 1,
Set: set.setIndex + 1,
Drives: set.getEndpointStrings(),
}
logger.GetReqInfo(ctx).AppendTags("objectLocation", op)
logger.GetReqInfo(ctx).AppendTags(api, op.String())
}
// NewNSLock - initialize a new namespace RWLocker instance.

View File

@ -15,8 +15,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build fmtgen
package cmd
import (
@ -51,6 +49,7 @@ var fmtGenCmd = cli.Command{
Usage: "Generate format.json files for an erasure server pool",
Flags: append(fmtGenFlags, GlobalFlags...),
Action: fmtGenMain,
Hidden: true,
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}

View File

@ -1,24 +0,0 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// # This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build !fmtgen
package cmd
import "github.com/minio/cli"
var fmtGenCmd cli.Command

View File

@ -910,7 +910,7 @@ type AuditLogOptions struct {
Object string
VersionID string
Error string
Tags map[string]interface{}
Tags map[string]string
}
// sends audit logs for internal subsystem activity
@ -935,7 +935,7 @@ func auditLogInternal(ctx context.Context, opts AuditLogOptions) {
// Merge tag information if found - this is currently needed for tags
// set during decommissioning.
if reqInfo := logger.GetReqInfo(ctx); reqInfo != nil {
reqInfo.PopulateTagsMap(entry.Tags)
reqInfo.PopulateTagsMap(opts.Tags)
}
ctx = logger.SetAuditEntry(ctx, &entry)
logger.AuditLog(ctx, nil, nil, nil)

View File

@ -33,7 +33,7 @@ const contextLogKey = contextKeyType("miniolog")
// KeyVal - appended to ReqInfo.Tags
type KeyVal struct {
Key string
Val interface{}
Val string
}
// ObjectVersion object version key/versionId
@ -77,7 +77,7 @@ func NewReqInfo(remoteHost, userAgent, deploymentID, requestID, api, bucket, obj
}
// AppendTags - appends key/val to ReqInfo.tags
func (r *ReqInfo) AppendTags(key string, val interface{}) *ReqInfo {
func (r *ReqInfo) AppendTags(key, val string) *ReqInfo {
if r == nil {
return nil
}
@ -88,7 +88,7 @@ func (r *ReqInfo) AppendTags(key string, val interface{}) *ReqInfo {
}
// SetTags - sets key/val to ReqInfo.tags
func (r *ReqInfo) SetTags(key string, val interface{}) *ReqInfo {
func (r *ReqInfo) SetTags(key, val string) *ReqInfo {
if r == nil {
return nil
}
@ -121,13 +121,13 @@ func (r *ReqInfo) GetTags() []KeyVal {
}
// GetTagsMap - returns the user defined tags in a map structure
func (r *ReqInfo) GetTagsMap() map[string]interface{} {
func (r *ReqInfo) GetTagsMap() map[string]string {
if r == nil {
return nil
}
r.RLock()
defer r.RUnlock()
m := make(map[string]interface{}, len(r.tags))
m := make(map[string]string, len(r.tags))
for _, t := range r.tags {
m[t.Key] = t.Val
}
@ -135,7 +135,7 @@ func (r *ReqInfo) GetTagsMap() map[string]interface{} {
}
// PopulateTagsMap - returns the user defined tags in a map structure
func (r *ReqInfo) PopulateTagsMap(tagsMap map[string]interface{}) {
func (r *ReqInfo) PopulateTagsMap(tagsMap map[string]string) {
if r == nil {
return
}