From 74ccee66198da4ef1cc331924ab477da1fdfe63e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 6 Mar 2024 03:43:16 -0800 Subject: [PATCH] avoid too much auditing during decom/rebalance make it more robust (#19174) there can be a sudden spike in tiny allocations, due to too much auditing being done, also don't hang on the ``` h.logCh <- entry ``` after initializing workers if you do not have a way to dequeue for some reason. --- cmd/erasure-multipart.go | 24 +++-- cmd/erasure-object.go | 26 ++++-- cmd/erasure-server-pool-decom.go | 37 +++++--- cmd/erasure-server-pool-rebalance.go | 130 ++++++++++++++++----------- cmd/object-api-interface.go | 1 + cmd/utils.go | 19 ++-- docs/debugging/pprofgoparser/main.go | 1 - internal/logger/logger.go | 39 +++++--- internal/logger/reqinfo.go | 16 ++++ internal/logger/target/http/http.go | 9 +- 10 files changed, 202 insertions(+), 100 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 6e7412db9..21c48c592 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -505,7 +505,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, // // Implements S3 compatible initiate multipart API. func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } return er.newMultipartUpload(ctx, bucket, object, opts) } @@ -584,7 +586,9 @@ func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry // // Implements S3 compatible Upload Part API. func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } data := r.Reader // Validate input data size and it can never be less than zero. @@ -783,7 +787,9 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // - compressed // Does not contain currently uploaded parts by design. func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } result := MultipartInfo{ Bucket: bucket, @@ -819,7 +825,9 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u // ListPartsInfo structure is marshaled directly into XML and // replied back to the client. func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) @@ -975,7 +983,9 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up // // Implements S3 compatible Complete multipart API. func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } // Hold write locks to verify uploaded parts, also disallows any // parallel PutObjectPart() requests. @@ -1342,7 +1352,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // would be removed from the system, rollback is not possible on this // operation. func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } lk := er.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index e7e50f3f7..a22d0416e 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -67,7 +67,9 @@ func countOnlineDisks(onlineDisks []StorageAPI) (online int) { // if source object and destination object are same we only // update metadata. func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) { - auditObjectErasureSet(ctx, dstObject, &er) + if !dstOpts.NoAuditLog { + auditObjectErasureSet(ctx, dstObject, &er) + } // This call shouldn't be used for anything other than metadata updates or adding self referential versions. if !srcInfo.metadataOnly { @@ -189,7 +191,9 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // GetObjectNInfo - returns object info and an object // Read(Closer). When err != nil, the returned reader is always nil. func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } var unlockOnDefer bool nsUnlocker := func() {} @@ -420,7 +424,9 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } if !opts.NoLock { // Lock the object before reading. @@ -1254,7 +1260,9 @@ func healObjectVersionsDisparity(bucket string, entry metaCacheEntry, scanMode m // putObject wrapper for erasureObjects PutObject func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } data := r.Reader @@ -1611,8 +1619,10 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object // into smaller bulks if some object names are found to be duplicated in the delete list, splitting // into smaller bulks will avoid holding twice the write lock of the duplicated object names. func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) { - for _, obj := range objects { - auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er) + if !opts.NoAuditLog { + for _, obj := range objects { + auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er) + } } errs := make([]error, len(objects)) @@ -1813,7 +1823,9 @@ func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string // any error as it is not necessary for the handler to reply back a // response to the client request. func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - auditObjectErasureSet(ctx, object, &er) + if !opts.NoAuditLog { + auditObjectErasureSet(ctx, object, &er) + } if opts.DeletePrefix { if globalCacheConfig.Enabled() { diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 65d68e62f..26ff091be 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -26,7 +26,6 @@ import ( "math/rand" "net/http" "sort" - "strconv" "strings" "time" @@ -620,11 +619,12 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri res, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{ VersionID: objInfo.VersionID, UserDefined: objInfo.UserDefined, + NoAuditLog: true, }) if err != nil { return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err) } - defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{}) + defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{NoAuditLog: true}) parts := make([]CompletePart, len(objInfo.Parts)) for i, part := range objInfo.Parts { hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize) @@ -639,6 +639,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri IndexCB: func() []byte { return part.Index // Preserve part Index to ensure decompression works. }, + NoAuditLog: true, }) if err != nil { return fmt.Errorf("decommissionObject: PutObjectPart() %w", err) @@ -655,6 +656,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{ DataMovement: true, MTime: objInfo.ModTime, + NoAuditLog: true, }) if err != nil { err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err) @@ -680,6 +682,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri IndexCB: func() []byte { return objInfo.Parts[0].Index // Preserve part Index to ensure decompression works. }, + NoAuditLog: true, }) if err != nil { err = fmt.Errorf("decommissionObject: PutObject() %w", err) @@ -699,17 +702,18 @@ func (v versionsSorter) reverse() { } func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi decomBucketInfo, fn func(entry metaCacheEntry)) error { - disks := set.getOnlineDisks() + disks, _ := set.getOnlineDisksWithHealing(false) if len(disks) == 0 { return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints()) } - listQuorum := (len(disks) + 1) / 2 + // However many we ask, versions must exist on ~50% + listingQuorum := (set.setDriveCount + 1) / 2 // How to resolve partial results. resolver := metadataResolutionParams{ - dirQuorum: listQuorum, // make sure to capture all quorum ratios - objQuorum: listQuorum, // make sure to capture all quorum ratios + dirQuorum: listingQuorum, // make sure to capture all quorum ratios + objQuorum: listingQuorum, // make sure to capture all quorum ratios bucket: bi.Name, } @@ -719,7 +723,7 @@ func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi dec path: bi.Prefix, recursive: true, forwardTo: "", - minDisks: listQuorum, + minDisks: listingQuorum, reportNotFound: false, agreed: fn, partial: func(entries metaCacheEntries, _ []error) { @@ -736,14 +740,15 @@ func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi dec func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error { ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{}) - wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets))) - workerSize, err := strconv.Atoi(wStr) + const envDecomWorkers = "_MINIO_DECOMMISSION_WORKERS" + workerSize, err := env.GetInt(envDecomWorkers, len(pool.sets)) if err != nil { - return err + logger.LogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets))) + workerSize = len(pool.sets) } - // each set get its own thread separate from the concurrent - // objects/versions being decommissioned. + // Each decom worker needs one List() goroutine/worker + // add that many extra workers. workerSize += len(pool.sets) wk, err := workers.New(workerSize) @@ -839,6 +844,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool DeleteReplication: version.ReplicationState, DeleteMarker: true, // make sure we create a delete marker SkipDecommissioned: true, // make sure we skip the decommissioned pool + NoAuditLog: true, }) var failure bool if err != nil { @@ -858,6 +864,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool // Success keep a count. decommissioned++ } + auditLogDecom(ctx, "DecomCopyDeleteMarker", bi.Name, version.Name, versionID, err) continue } @@ -888,6 +895,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool VersionID: versionID, NoDecryption: true, NoLock: true, + NoAuditLog: true, }) if isErrObjectNotFound(err) || isErrVersionNotFound(err) { // object deleted by the application, nothing to do here we move on. @@ -941,6 +949,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool ObjectOptions{ DeletePrefix: true, // use prefix delete to delete all versions at once. DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls) + NoAuditLog: true, }, ) stopFn(err) @@ -965,6 +974,10 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool // We will perpetually retry listing if it fails, since we cannot // possibly give up in this matter for { + if contextCanceled(ctx) { + break + } + err := set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) { wk.Take() diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go index 3b7e8c13f..17c1d05fd 100644 --- a/cmd/erasure-server-pool-rebalance.go +++ b/cmd/erasure-server-pool-rebalance.go @@ -26,17 +26,17 @@ import ( "math" "math/rand" "net/http" - "strconv" "strings" - "sync" "time" + "github.com/dustin/go-humanize" "github.com/lithammer/shortuuid/v4" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/hash" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/env" + "github.com/minio/pkg/v2/workers" ) //go:generate msgp -file $GOFILE -unexported @@ -485,6 +485,41 @@ func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool { return false } +func (set *erasureObjects) listObjectsToRebalance(ctx context.Context, bucketName string, fn func(entry metaCacheEntry)) error { + disks, _ := set.getOnlineDisksWithHealing(false) + if len(disks) == 0 { + return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints()) + } + + // However many we ask, versions must exist on ~50% + listingQuorum := (set.setDriveCount + 1) / 2 + + // How to resolve partial results. + resolver := metadataResolutionParams{ + dirQuorum: listingQuorum, // make sure to capture all quorum ratios + objQuorum: listingQuorum, // make sure to capture all quorum ratios + bucket: bucketName, + } + + err := listPathRaw(ctx, listPathRawOptions{ + disks: disks, + bucket: bucketName, + recursive: true, + forwardTo: "", + minDisks: listingQuorum, + reportNotFound: false, + agreed: fn, + partial: func(entries metaCacheEntries, _ []error) { + entry, ok := entries.resolve(&resolver) + if ok { + fn(*entry) + } + }, + finished: nil, + }) + return err +} + // rebalanceBucket rebalances objects under bucket in poolIdx pool func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error { ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{}) @@ -496,23 +531,25 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, rcfg, _ := getReplicationConfig(ctx, bucket) pool := z.serverPools[poolIdx] + const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS" - wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets))) - workerSize, err := strconv.Atoi(wStr) + workerSize, err := env.GetInt(envRebalanceWorkers, len(pool.sets)) if err != nil { - logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets))) + logger.LogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets))) workerSize = len(pool.sets) } - workers := make(chan struct{}, workerSize) - var wg sync.WaitGroup - for _, set := range pool.sets { + + // Each decom worker needs one List() goroutine/worker + // add that many extra workers. + workerSize += len(pool.sets) + + wk, err := workers.New(workerSize) + if err != nil { + return err + } + + for setIdx, set := range pool.sets { set := set - disks := set.getOnlineDisks() - if len(disks) == 0 { - logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s", - set.getEndpoints())) - continue - } filterLifecycle := func(bucket, object string, fi FileInfo) bool { if lc == nil { @@ -531,10 +568,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, } rebalanceEntry := func(entry metaCacheEntry) { - defer func() { - <-workers - wg.Done() - }() + defer wk.Give() if entry.isDir() { return @@ -592,6 +626,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, DeleteReplication: version.ReplicationState, DeleteMarker: true, // make sure we create a delete marker SkipRebalancing: true, // make sure we skip the decommissioned pool + NoAuditLog: true, }) var failure bool if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { @@ -603,6 +638,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, z.updatePoolStats(poolIdx, bucket, version) rebalanced++ } + auditLogRebalance(ctx, "Rebalance:DeleteMarker", bucket, version.Name, versionID, err) continue } @@ -619,6 +655,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, VersionID: versionID, NoDecryption: true, NoLock: true, + NoAuditLog: true, }) if isErrObjectNotFound(err) || isErrVersionNotFound(err) { // object deleted by the application, nothing to do here we move on. @@ -663,6 +700,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, ObjectOptions{ DeletePrefix: true, // use prefix delete to delete all versions at once. DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls) + NoAuditLog: true, }, ) stopFn(err) @@ -673,44 +711,24 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, } } - wg.Add(1) - go func() { - defer wg.Done() - - listQuorum := (len(disks) + 1) / 2 - - // How to resolve partial results. - resolver := metadataResolutionParams{ - dirQuorum: listQuorum, // make sure to capture all quorum ratios - objQuorum: listQuorum, // make sure to capture all quorum ratios - bucket: bucket, - } - err := listPathRaw(ctx, listPathRawOptions{ - disks: disks, - bucket: bucket, - recursive: true, - forwardTo: "", - minDisks: listQuorum, - reportNotFound: false, - agreed: func(entry metaCacheEntry) { - workers <- struct{}{} - wg.Add(1) + wk.Take() + go func(setIdx int) { + defer wk.Give() + err := set.listObjectsToRebalance(ctx, bucket, + func(entry metaCacheEntry) { + wk.Take() go rebalanceEntry(entry) }, - partial: func(entries metaCacheEntries, _ []error) { - entry, ok := entries.resolve(&resolver) - if ok { - workers <- struct{}{} - wg.Add(1) - go rebalanceEntry(*entry) - } - }, - finished: nil, - }) - logger.LogIf(ctx, err) - }() + ) + if err == nil || errors.Is(err, context.Canceled) { + return + } + setN := humanize.Ordinal(setIdx + 1) + logger.LogOnceIf(ctx, fmt.Errorf("listing objects from %s set failed with %v", setN, err), "rebalance-listing-failed"+setN) + }(setIdx) } - wg.Wait() + + wk.Wait() return nil } @@ -783,11 +801,12 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{ VersionID: oi.VersionID, UserDefined: oi.UserDefined, + NoAuditLog: true, }) if err != nil { return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err) } - defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{}) + defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{NoAuditLog: true}) parts := make([]CompletePart, len(oi.Parts)) for i, part := range oi.Parts { @@ -803,6 +822,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, IndexCB: func() []byte { return part.Index // Preserve part Index to ensure decompression works. }, + NoAuditLog: true, }) if err != nil { return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err) @@ -815,6 +835,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, _, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{ DataMovement: true, MTime: oi.ModTime, + NoAuditLog: true, }) if err != nil { err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err) @@ -840,6 +861,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, IndexCB: func() []byte { return oi.Parts[0].Index // Preserve part Index to ensure decompression works. }, + NoAuditLog: true, }) if err != nil { err = fmt.Errorf("rebalanceObject: PutObject() %w", err) diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 127cc75dd..2758b775f 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -129,6 +129,7 @@ type ObjectOptions struct { EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject. FastGetObjInfo bool // Only for S3 Head/Get Object calls for now + NoAuditLog bool // Only set for decom, rebalance, to avoid double audits. } // WalkOptions provides filtering, marker and other Walk() specific options. diff --git a/cmd/utils.go b/cmd/utils.go index 3c13fe9e8..c50a27f97 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -984,18 +984,19 @@ func auditLogInternal(ctx context.Context, opts AuditLogOptions) { entry.API.Bucket = opts.Bucket entry.API.Objects = []pkgAudit.ObjectVersion{{ObjectName: opts.Object, VersionID: opts.VersionID}} entry.API.Status = opts.Status - entry.Tags = opts.Tags + if len(opts.Tags) > 0 { + entry.Tags = make(map[string]interface{}, len(opts.Tags)) + for k, v := range opts.Tags { + entry.Tags[k] = v + } + } else { + entry.Tags = make(map[string]interface{}) + } + // Merge tag information if found - this is currently needed for tags // set during decommissioning. if reqInfo := logger.GetReqInfo(ctx); reqInfo != nil { - if tags := reqInfo.GetTagsMap(); len(tags) > 0 { - if entry.Tags == nil { - entry.Tags = make(map[string]interface{}, len(tags)) - } - for k, v := range tags { - entry.Tags[k] = v - } - } + reqInfo.PopulateTagsMap(entry.Tags) } ctx = logger.SetAuditEntry(ctx, &entry) logger.AuditLog(ctx, nil, nil, nil) diff --git a/docs/debugging/pprofgoparser/main.go b/docs/debugging/pprofgoparser/main.go index b35f25638..74c5e9385 100644 --- a/docs/debugging/pprofgoparser/main.go +++ b/docs/debugging/pprofgoparser/main.go @@ -41,7 +41,6 @@ var ( ) func init() { - flag.DurationVar(&less, "less", 0, "goroutine waiting less than the specified time") flag.DurationVar(&goTime, "time", 0, "goroutine waiting for exactly the specified time") flag.DurationVar(&margin, "margin", 0, "margin time") diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 879b95f5e..3ef2f8fb1 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -26,12 +26,12 @@ import ( "path/filepath" "reflect" "runtime" + "sort" "strings" "time" "github.com/minio/highwayhash" "github.com/minio/madmin-go/v3" - "github.com/minio/minio-go/v7/pkg/set" xhttp "github.com/minio/minio/internal/http" "github.com/minio/pkg/v2/logger/message/log" ) @@ -104,15 +104,32 @@ func RegisterError(f func(string, error, bool) string) { errorFmtFunc = f } -// Remove any duplicates and return unique entries. -func uniqueEntries(paths []string) []string { - m := make(set.StringSet) - for _, p := range paths { - if !m.Contains(p) { - m.Add(p) +// uniq swaps away duplicate elements in data, returning the size of the +// unique set. data is expected to be pre-sorted, and the resulting set in +// the range [0:size] will remain in sorted order. Uniq, following a +// sort.Sort call, can be used to prepare arbitrary inputs for use as sets. +func uniq(data sort.Interface) (size int) { + p, l := 0, data.Len() + if l <= 1 { + return l + } + for i := 1; i < l; i++ { + if !data.Less(p, i) { + continue + } + p++ + if p < i { + data.Swap(p, i) } } - return m.ToSlice() + return p + 1 +} + +// Remove any duplicates and return unique entries. +func uniqueEntries(paths []string) []string { + sort.Strings(paths) + n := uniq(sort.StringSlice(paths)) + return paths[:n] } // Init sets the trimStrings to possible GOPATHs @@ -359,9 +376,11 @@ func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) { if DisableErrorLog { return } + if err == nil { + return + } if consoleTgt != nil { - entry := errToEntry(ctx, err, errKind...) - consoleTgt.Send(ctx, entry) + consoleTgt.Send(ctx, errToEntry(ctx, err, errKind...)) } } diff --git a/internal/logger/reqinfo.go b/internal/logger/reqinfo.go index ccc094393..10d71a5d7 100644 --- a/internal/logger/reqinfo.go +++ b/internal/logger/reqinfo.go @@ -134,6 +134,22 @@ func (r *ReqInfo) GetTagsMap() map[string]interface{} { return m } +// PopulateTagsMap - returns the user defined tags in a map structure +func (r *ReqInfo) PopulateTagsMap(tagsMap map[string]interface{}) { + if r == nil { + return + } + if tagsMap == nil { + return + } + r.RLock() + defer r.RUnlock() + for _, t := range r.tags { + tagsMap[t.Key] = t.Val + } + return +} + // SetReqInfo sets ReqInfo in the context. func SetReqInfo(ctx context.Context, req *ReqInfo) context.Context { if ctx == nil { diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index cc58c4c83..9aa3377bf 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -416,7 +416,14 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { go h.startHTTPLogger(ctx) } } - h.logCh <- entry + select { + case h.logCh <- entry: + case <-ctx.Done(): + // return error only for context timedout. + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return ctx.Err() + } + } return nil } atomic.AddInt64(&h.totalMessages, 1)