mirror of https://github.com/minio/minio.git
avoid too much auditing during decom/rebalance make it more robust (#19174)
There can be a sudden spike in tiny allocations due to too much auditing being done; also, don't hang on `h.logCh <- entry` after initializing workers if there is no way to dequeue for some reason.
parent c26b8d4eb8
commit 74ccee6619
@@ -505,7 +505,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 //
 // Implements S3 compatible initiate multipart API.
 func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     return er.newMultipartUpload(ctx, bucket, object, opts)
 }

@@ -584,7 +586,9 @@ func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry
 //
 // Implements S3 compatible Upload Part API.
 func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     data := r.Reader
     // Validate input data size and it can never be less than zero.

@@ -783,7 +787,9 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 // - compressed
 // Does not contain currently uploaded parts by design.
 func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     result := MultipartInfo{
         Bucket: bucket,

@@ -819,7 +825,9 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 // ListPartsInfo structure is marshaled directly into XML and
 // replied back to the client.
 func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
     lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)

@@ -975,7 +983,9 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 //
 // Implements S3 compatible Complete multipart API.
 func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     // Hold write locks to verify uploaded parts, also disallows any
     // parallel PutObjectPart() requests.

@@ -1342,7 +1352,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 // would be removed from the system, rollback is not possible on this
 // operation.
 func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
     lkctx, err := lk.GetLock(ctx, globalOperationTimeout)

@@ -67,7 +67,9 @@ func countOnlineDisks(onlineDisks []StorageAPI) (online int) {
 // if source object and destination object are same we only
 // update metadata.
 func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
-    auditObjectErasureSet(ctx, dstObject, &er)
+    if !dstOpts.NoAuditLog {
+        auditObjectErasureSet(ctx, dstObject, &er)
+    }
 
     // This call shouldn't be used for anything other than metadata updates or adding self referential versions.
     if !srcInfo.metadataOnly {

@@ -189,7 +191,9 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
 // GetObjectNInfo - returns object info and an object
 // Read(Closer). When err != nil, the returned reader is always nil.
 func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     var unlockOnDefer bool
     nsUnlocker := func() {}

@@ -420,7 +424,9 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
 
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     if !opts.NoLock {
         // Lock the object before reading.

@@ -1254,7 +1260,9 @@ func healObjectVersionsDisparity(bucket string, entry metaCacheEntry, scanMode m
 
 // putObject wrapper for erasureObjects PutObject
 func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     data := r.Reader
 

@@ -1611,8 +1619,10 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
 // into smaller bulks if some object names are found to be duplicated in the delete list, splitting
 // into smaller bulks will avoid holding twice the write lock of the duplicated object names.
 func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
-    for _, obj := range objects {
-        auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er)
+    if !opts.NoAuditLog {
+        for _, obj := range objects {
+            auditObjectErasureSet(ctx, obj.ObjectV.ObjectName, &er)
+        }
     }
 
     errs := make([]error, len(objects))

@@ -1813,7 +1823,9 @@ func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string
 // any error as it is not necessary for the handler to reply back a
 // response to the client request.
 func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
-    auditObjectErasureSet(ctx, object, &er)
+    if !opts.NoAuditLog {
+        auditObjectErasureSet(ctx, object, &er)
+    }
 
     if opts.DeletePrefix {
         if globalCacheConfig.Enabled() {
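Every object-layer entry point above gets the same two-line guard: the per-object audit fires only when the caller has not opted out. A minimal, standalone sketch of the pattern follows; `Options`, `audit`, and `getObject` are hypothetical stand-ins, not MinIO identifiers.

```go
package main

import "fmt"

// Options stands in for the option struct used above; only the NoAuditLog
// field matters for this sketch (hypothetical type).
type Options struct {
	NoAuditLog bool // set by internal callers (decom/rebalance) to suppress audits
}

// audit stands in for auditObjectErasureSet (hypothetical helper).
func audit(object string) { fmt.Println("audit:", object) }

// getObject shows the guard: client requests are audited, internal ones are not.
func getObject(object string, opts Options) {
	if !opts.NoAuditLog {
		audit(object)
	}
	// ... the actual object handling would follow here ...
}

func main() {
	getObject("bucket/key", Options{})                 // audited
	getObject("bucket/key", Options{NoAuditLog: true}) // silent, e.g. decom traffic
}
```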
@@ -26,7 +26,6 @@ import (
     "math/rand"
     "net/http"
     "sort"
-    "strconv"
     "strings"
     "time"
 

@@ -620,11 +619,12 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
         res, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
             VersionID:   objInfo.VersionID,
             UserDefined: objInfo.UserDefined,
+            NoAuditLog:  true,
         })
         if err != nil {
             return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err)
         }
-        defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{})
+        defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{NoAuditLog: true})
         parts := make([]CompletePart, len(objInfo.Parts))
         for i, part := range objInfo.Parts {
             hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)

@@ -639,6 +639,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
                 IndexCB: func() []byte {
                     return part.Index // Preserve part Index to ensure decompression works.
                 },
+                NoAuditLog: true,
             })
             if err != nil {
                 return fmt.Errorf("decommissionObject: PutObjectPart() %w", err)

@@ -655,6 +656,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
         _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{
             DataMovement: true,
             MTime:        objInfo.ModTime,
+            NoAuditLog:   true,
         })
         if err != nil {
             err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err)

@@ -680,6 +682,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
         IndexCB: func() []byte {
             return objInfo.Parts[0].Index // Preserve part Index to ensure decompression works.
         },
+        NoAuditLog: true,
     })
     if err != nil {
         err = fmt.Errorf("decommissionObject: PutObject() %w", err)

@@ -699,17 +702,18 @@ func (v versionsSorter) reverse() {
 }
 
 func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi decomBucketInfo, fn func(entry metaCacheEntry)) error {
-    disks := set.getOnlineDisks()
+    disks, _ := set.getOnlineDisksWithHealing(false)
     if len(disks) == 0 {
         return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
     }
 
-    listQuorum := (len(disks) + 1) / 2
+    // However many we ask, versions must exist on ~50%
+    listingQuorum := (set.setDriveCount + 1) / 2
 
     // How to resolve partial results.
     resolver := metadataResolutionParams{
-        dirQuorum: listQuorum, // make sure to capture all quorum ratios
-        objQuorum: listQuorum, // make sure to capture all quorum ratios
+        dirQuorum: listingQuorum, // make sure to capture all quorum ratios
+        objQuorum: listingQuorum, // make sure to capture all quorum ratios
         bucket:    bi.Name,
     }
 
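The listing quorum for decommission is now derived from the fixed erasure-set drive count rather than from however many drives happen to be online, so it no longer shrinks when drives are offline or healing. A quick illustration of the difference; the 12-drive set and 7 online drives are made-up numbers, not values from the diff:

```go
package main

import "fmt"

func main() {
	setDriveCount := 12 // hypothetical erasure-set size
	onlineDisks := 7    // hypothetical: some drives offline or healing

	oldQuorum := (onlineDisks + 1) / 2   // old behaviour: shrinks with outages -> 4
	newQuorum := (setDriveCount + 1) / 2 // new behaviour: stable ~50% of the set -> 6

	fmt.Println("old:", oldQuorum, "new:", newQuorum)
}
```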
@@ -719,7 +723,7 @@ func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi dec
         path:           bi.Prefix,
         recursive:      true,
         forwardTo:      "",
-        minDisks:       listQuorum,
+        minDisks:       listingQuorum,
         reportNotFound: false,
         agreed:         fn,
         partial: func(entries metaCacheEntries, _ []error) {

@@ -736,14 +740,15 @@ func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi dec
 func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
     ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
 
-    wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
-    workerSize, err := strconv.Atoi(wStr)
+    const envDecomWorkers = "_MINIO_DECOMMISSION_WORKERS"
+    workerSize, err := env.GetInt(envDecomWorkers, len(pool.sets))
     if err != nil {
-        return err
+        logger.LogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets)))
+        workerSize = len(pool.sets)
     }
 
-    // each set get its own thread separate from the concurrent
-    // objects/versions being decommissioned.
+    // Each decom worker needs one List() goroutine/worker
+    // add that many extra workers.
     workerSize += len(pool.sets)
 
     wk, err := workers.New(workerSize)
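Both the decommission and rebalance workers are now sized via `env.GetInt`, and a bad value only logs and falls back to the default instead of aborting the whole operation. A small sketch of that pattern; the environment variable name comes from the diff, the default of 4 is illustrative, and the exact unset/parse-failure semantics of `env.GetInt` are assumed from how the diff uses it:

```go
package main

import (
	"fmt"

	"github.com/minio/pkg/v2/env"
)

func main() {
	defaultWorkers := 4 // illustrative; the server defaults to len(pool.sets)

	// When the variable is unset the default is returned; when it is set but
	// does not parse, an error comes back and we fall back explicitly,
	// mirroring the error branch in the diff.
	workerSize, err := env.GetInt("_MINIO_DECOMMISSION_WORKERS", defaultWorkers)
	if err != nil {
		fmt.Printf("invalid workers value err: %v, defaulting to %d\n", err, defaultWorkers)
		workerSize = defaultWorkers
	}
	fmt.Println("workers:", workerSize)
}
```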
@@ -839,6 +844,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
                         DeleteReplication:  version.ReplicationState,
                         DeleteMarker:       true, // make sure we create a delete marker
                         SkipDecommissioned: true, // make sure we skip the decommissioned pool
+                        NoAuditLog:         true,
                     })
                     var failure bool
                     if err != nil {

@@ -858,6 +864,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
                         // Success keep a count.
                         decommissioned++
                     }
+                    auditLogDecom(ctx, "DecomCopyDeleteMarker", bi.Name, version.Name, versionID, err)
                     continue
                 }
 

@@ -888,6 +895,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
                     VersionID:    versionID,
                     NoDecryption: true,
                     NoLock:       true,
+                    NoAuditLog:   true,
                 })
                 if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
                     // object deleted by the application, nothing to do here we move on.

@@ -941,6 +949,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
                     ObjectOptions{
                         DeletePrefix:       true, // use prefix delete to delete all versions at once.
                         DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
+                        NoAuditLog:         true,
                     },
                 )
                 stopFn(err)

@@ -965,6 +974,10 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
         // We will perpetually retry listing if it fails, since we cannot
         // possibly give up in this matter
         for {
+            if contextCanceled(ctx) {
+                break
+            }
+
             err := set.listObjectsToDecommission(ctx, bi,
                 func(entry metaCacheEntry) {
                     wk.Take()
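The perpetual listing retry loop now bails out once the context is cancelled instead of spinning forever. `contextCanceled` is a MinIO-internal helper; the sketch below uses plain `ctx.Err()` and a hypothetical `listOnce` plus an illustrative back-off to show the same shape:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// listOnce stands in for one listObjectsToDecommission attempt (hypothetical,
// and deliberately always failing so the loop has something to retry).
func listOnce(ctx context.Context) error {
	return errors.New("transient listing failure")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Retry forever on failure, but stop as soon as the context is done,
	// mirroring the `if contextCanceled(ctx) { break }` check in the diff.
	for {
		if ctx.Err() != nil {
			fmt.Println("stopping retries:", ctx.Err())
			break
		}
		if err := listOnce(ctx); err == nil {
			break
		}
		time.Sleep(10 * time.Millisecond) // small illustrative back-off
	}
}
```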
@@ -26,17 +26,17 @@ import (
     "math"
     "math/rand"
     "net/http"
-    "strconv"
     "strings"
-    "sync"
     "time"
 
+    "github.com/dustin/go-humanize"
     "github.com/lithammer/shortuuid/v4"
     "github.com/minio/madmin-go/v3"
     "github.com/minio/minio/internal/hash"
     xioutil "github.com/minio/minio/internal/ioutil"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/v2/env"
+    "github.com/minio/pkg/v2/workers"
 )
 
 //go:generate msgp -file $GOFILE -unexported

@@ -485,6 +485,41 @@ func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
     return false
 }
 
+func (set *erasureObjects) listObjectsToRebalance(ctx context.Context, bucketName string, fn func(entry metaCacheEntry)) error {
+    disks, _ := set.getOnlineDisksWithHealing(false)
+    if len(disks) == 0 {
+        return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
+    }
+
+    // However many we ask, versions must exist on ~50%
+    listingQuorum := (set.setDriveCount + 1) / 2
+
+    // How to resolve partial results.
+    resolver := metadataResolutionParams{
+        dirQuorum: listingQuorum, // make sure to capture all quorum ratios
+        objQuorum: listingQuorum, // make sure to capture all quorum ratios
+        bucket:    bucketName,
+    }
+
+    err := listPathRaw(ctx, listPathRawOptions{
+        disks:          disks,
+        bucket:         bucketName,
+        recursive:      true,
+        forwardTo:      "",
+        minDisks:       listingQuorum,
+        reportNotFound: false,
+        agreed:         fn,
+        partial: func(entries metaCacheEntries, _ []error) {
+            entry, ok := entries.resolve(&resolver)
+            if ok {
+                fn(*entry)
+            }
+        },
+        finished: nil,
+    })
+    return err
+}
+
 // rebalanceBucket rebalances objects under bucket in poolIdx pool
 func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
     ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
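The new listObjectsToRebalance mirrors listObjectsToDecommission: entries that every queried drive agrees on go straight to `fn`, while partial results are pushed through the quorum resolver first. The toy model below imitates that agreed/partial split with plain slices and maps; none of its names are MinIO APIs and the resolver is reduced to a simple count-against-quorum check.

```go
package main

import "fmt"

// entry is a toy stand-in for metaCacheEntry.
type entry struct{ name string }

// listToy imitates the agreed/partial callbacks of the listing: names present
// on every drive are reported as "agreed", the rest as "partial" together with
// how many drives had them.
func listToy(perDrive [][]string, agreed func(entry), partial func(entry, int)) {
	counts := map[string]int{}
	for _, drive := range perDrive {
		for _, name := range drive {
			counts[name]++
		}
	}
	for name, n := range counts {
		if n == len(perDrive) {
			agreed(entry{name})
		} else {
			partial(entry{name}, n)
		}
	}
}

func main() {
	perDrive := [][]string{
		{"a.txt", "b.txt"},
		{"a.txt", "b.txt", "c.txt"},
		{"a.txt", "b.txt"},
		{"a.txt"},
	}
	listingQuorum := (len(perDrive) + 1) / 2 // ~50%, like the quorum in the diff

	fn := func(e entry) { fmt.Println("rebalance:", e.name) }
	listToy(perDrive,
		fn, // agreed entries are forwarded directly
		func(e entry, n int) {
			// partial: the "resolver" here is just a count-vs-quorum check.
			if n >= listingQuorum {
				fn(e)
			}
		})
}
```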
@@ -496,23 +531,25 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
     rcfg, _ := getReplicationConfig(ctx, bucket)
 
     pool := z.serverPools[poolIdx]
 
     const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
-    wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
-    workerSize, err := strconv.Atoi(wStr)
+    workerSize, err := env.GetInt(envRebalanceWorkers, len(pool.sets))
     if err != nil {
-        logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
+        logger.LogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets)))
         workerSize = len(pool.sets)
     }
-    workers := make(chan struct{}, workerSize)
-    var wg sync.WaitGroup
-    for _, set := range pool.sets {
+
+    // Each decom worker needs one List() goroutine/worker
+    // add that many extra workers.
+    workerSize += len(pool.sets)
+
+    wk, err := workers.New(workerSize)
+    if err != nil {
+        return err
+    }
+
+    for setIdx, set := range pool.sets {
         set := set
-        disks := set.getOnlineDisks()
-        if len(disks) == 0 {
-            logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
-                set.getEndpoints()))
-            continue
-        }
 
         filterLifecycle := func(bucket, object string, fi FileInfo) bool {
             if lc == nil {

@@ -531,10 +568,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
         }
 
         rebalanceEntry := func(entry metaCacheEntry) {
-            defer func() {
-                <-workers
-                wg.Done()
-            }()
+            defer wk.Give()
 
             if entry.isDir() {
                 return

@@ -592,6 +626,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
                     DeleteReplication: version.ReplicationState,
                     DeleteMarker:      true, // make sure we create a delete marker
                     SkipRebalancing:   true, // make sure we skip the decommissioned pool
+                    NoAuditLog:        true,
                 })
                 var failure bool
                 if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {

@@ -603,6 +638,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
                     z.updatePoolStats(poolIdx, bucket, version)
                     rebalanced++
                 }
+                auditLogRebalance(ctx, "Rebalance:DeleteMarker", bucket, version.Name, versionID, err)
                 continue
             }
 

@@ -619,6 +655,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
                 VersionID:    versionID,
                 NoDecryption: true,
                 NoLock:       true,
+                NoAuditLog:   true,
             })
             if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
                 // object deleted by the application, nothing to do here we move on.

@@ -663,6 +700,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
                 ObjectOptions{
                     DeletePrefix:       true, // use prefix delete to delete all versions at once.
                     DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
+                    NoAuditLog:         true,
                 },
             )
             stopFn(err)

@@ -673,44 +711,24 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
             }
         }
 
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-
-            listQuorum := (len(disks) + 1) / 2
-
-            // How to resolve partial results.
-            resolver := metadataResolutionParams{
-                dirQuorum: listQuorum, // make sure to capture all quorum ratios
-                objQuorum: listQuorum, // make sure to capture all quorum ratios
-                bucket:    bucket,
-            }
-            err := listPathRaw(ctx, listPathRawOptions{
-                disks:          disks,
-                bucket:         bucket,
-                recursive:      true,
-                forwardTo:      "",
-                minDisks:       listQuorum,
-                reportNotFound: false,
-                agreed: func(entry metaCacheEntry) {
-                    workers <- struct{}{}
-                    wg.Add(1)
+        wk.Take()
+        go func(setIdx int) {
+            defer wk.Give()
+            err := set.listObjectsToRebalance(ctx, bucket,
+                func(entry metaCacheEntry) {
+                    wk.Take()
                     go rebalanceEntry(entry)
                 },
-                partial: func(entries metaCacheEntries, _ []error) {
-                    entry, ok := entries.resolve(&resolver)
-                    if ok {
-                        workers <- struct{}{}
-                        wg.Add(1)
-                        go rebalanceEntry(*entry)
-                    }
-                },
-                finished: nil,
-            })
-            logger.LogIf(ctx, err)
-        }()
+            )
+            if err == nil || errors.Is(err, context.Canceled) {
+                return
+            }
+            setN := humanize.Ordinal(setIdx + 1)
+            logger.LogOnceIf(ctx, fmt.Errorf("listing objects from %s set failed with %v", setN, err), "rebalance-listing-failed"+setN)
+        }(setIdx)
     }
-    wg.Wait()
+
+    wk.Wait()
     return nil
 }
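The rebalance path drops its hand-rolled `chan struct{}` throttle plus `sync.WaitGroup` in favour of `github.com/minio/pkg/v2/workers`, the same bounded-concurrency helper the decommission path already uses. A minimal sketch of the Take/Give/Wait calls seen in the diff; the job loop and the pool size of 4 are illustrative:

```go
package main

import (
	"fmt"

	"github.com/minio/pkg/v2/workers"
)

func main() {
	// Allow at most 4 jobs in flight at once (illustrative size; the server
	// sizes the pool from the worker count plus one slot per listing goroutine).
	wk, err := workers.New(4)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 16; i++ {
		i := i
		wk.Take() // blocks until a slot is free, bounding concurrency
		go func() {
			defer wk.Give() // release the slot when the job finishes
			fmt.Println("processed job", i)
		}()
	}

	wk.Wait() // block until every Take has been matched by a Give
}
```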
|
@ -783,11 +801,12 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
|
||||||
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
|
res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
|
||||||
VersionID: oi.VersionID,
|
VersionID: oi.VersionID,
|
||||||
UserDefined: oi.UserDefined,
|
UserDefined: oi.UserDefined,
|
||||||
|
NoAuditLog: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
|
return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
|
||||||
}
|
}
|
||||||
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})
|
defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{NoAuditLog: true})
|
||||||
|
|
||||||
parts := make([]CompletePart, len(oi.Parts))
|
parts := make([]CompletePart, len(oi.Parts))
|
||||||
for i, part := range oi.Parts {
|
for i, part := range oi.Parts {
|
||||||
|
@ -803,6 +822,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
|
||||||
IndexCB: func() []byte {
|
IndexCB: func() []byte {
|
||||||
return part.Index // Preserve part Index to ensure decompression works.
|
return part.Index // Preserve part Index to ensure decompression works.
|
||||||
},
|
},
|
||||||
|
NoAuditLog: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
|
return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
|
||||||
|
@ -815,6 +835,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
|
||||||
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
||||||
DataMovement: true,
|
DataMovement: true,
|
||||||
MTime: oi.ModTime,
|
MTime: oi.ModTime,
|
||||||
|
NoAuditLog: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
||||||
|
@ -840,6 +861,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
|
||||||
IndexCB: func() []byte {
|
IndexCB: func() []byte {
|
||||||
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
|
return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
|
||||||
},
|
},
|
||||||
|
NoAuditLog: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
|
err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
|
||||||
|
|
|
@ -129,6 +129,7 @@ type ObjectOptions struct {
|
||||||
EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject.
|
EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject.
|
||||||
|
|
||||||
FastGetObjInfo bool // Only for S3 Head/Get Object calls for now
|
FastGetObjInfo bool // Only for S3 Head/Get Object calls for now
|
||||||
|
NoAuditLog bool // Only set for decom, rebalance, to avoid double audits.
|
||||||
}
|
}
|
||||||
|
|
||||||
// WalkOptions provides filtering, marker and other Walk() specific options.
|
// WalkOptions provides filtering, marker and other Walk() specific options.
|
||||||
|
|
cmd/utils.go

@@ -984,18 +984,19 @@ func auditLogInternal(ctx context.Context, opts AuditLogOptions) {
     entry.API.Bucket = opts.Bucket
     entry.API.Objects = []pkgAudit.ObjectVersion{{ObjectName: opts.Object, VersionID: opts.VersionID}}
     entry.API.Status = opts.Status
-    entry.Tags = opts.Tags
+    if len(opts.Tags) > 0 {
+        entry.Tags = make(map[string]interface{}, len(opts.Tags))
+        for k, v := range opts.Tags {
+            entry.Tags[k] = v
+        }
+    } else {
+        entry.Tags = make(map[string]interface{})
+    }
 
     // Merge tag information if found - this is currently needed for tags
     // set during decommissioning.
     if reqInfo := logger.GetReqInfo(ctx); reqInfo != nil {
-        if tags := reqInfo.GetTagsMap(); len(tags) > 0 {
-            if entry.Tags == nil {
-                entry.Tags = make(map[string]interface{}, len(tags))
-            }
-            for k, v := range tags {
-                entry.Tags[k] = v
-            }
-        }
+        reqInfo.PopulateTagsMap(entry.Tags)
     }
     ctx = logger.SetAuditEntry(ctx, &entry)
     logger.AuditLog(ctx, nil, nil, nil)
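auditLogInternal now always gives the audit entry its own tag map instead of aliasing `opts.Tags`, so merging the request-scoped tags further down can no longer write into the caller's map, and a nil map can no longer be handed to the merge. The snippet below only illustrates that aliasing-versus-copying difference; the tag keys are made up:

```go
package main

import "fmt"

func main() {
	callerTags := map[string]interface{}{"source": "decom"} // hypothetical tag

	// Aliasing (the old `entry.Tags = opts.Tags`): writes through entryTags
	// also land in the caller's map.
	entryTags := callerTags
	entryTags["pool"] = 1
	fmt.Println(len(callerTags)) // 2 - caller's map was mutated

	// Copying (the new behaviour): the entry owns its map.
	callerTags = map[string]interface{}{"source": "decom"}
	entryTags = make(map[string]interface{}, len(callerTags))
	for k, v := range callerTags {
		entryTags[k] = v
	}
	entryTags["pool"] = 1
	fmt.Println(len(callerTags)) // 1 - unchanged
}
```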
@@ -41,7 +41,6 @@ var (
 )
 
 func init() {
-
     flag.DurationVar(&less, "less", 0, "goroutine waiting less than the specified time")
     flag.DurationVar(&goTime, "time", 0, "goroutine waiting for exactly the specified time")
     flag.DurationVar(&margin, "margin", 0, "margin time")

@@ -26,12 +26,12 @@ import (
     "path/filepath"
     "reflect"
     "runtime"
+    "sort"
     "strings"
     "time"
 
     "github.com/minio/highwayhash"
     "github.com/minio/madmin-go/v3"
-    "github.com/minio/minio-go/v7/pkg/set"
     xhttp "github.com/minio/minio/internal/http"
     "github.com/minio/pkg/v2/logger/message/log"
 )

@@ -104,15 +104,32 @@ func RegisterError(f func(string, error, bool) string) {
     errorFmtFunc = f
 }
 
-// Remove any duplicates and return unique entries.
-func uniqueEntries(paths []string) []string {
-    m := make(set.StringSet)
-    for _, p := range paths {
-        if !m.Contains(p) {
-            m.Add(p)
+// uniq swaps away duplicate elements in data, returning the size of the
+// unique set. data is expected to be pre-sorted, and the resulting set in
+// the range [0:size] will remain in sorted order. Uniq, following a
+// sort.Sort call, can be used to prepare arbitrary inputs for use as sets.
+func uniq(data sort.Interface) (size int) {
+    p, l := 0, data.Len()
+    if l <= 1 {
+        return l
+    }
+    for i := 1; i < l; i++ {
+        if !data.Less(p, i) {
+            continue
+        }
+        p++
+        if p < i {
+            data.Swap(p, i)
         }
     }
-    return m.ToSlice()
+    return p + 1
+}
+
+// Remove any duplicates and return unique entries.
+func uniqueEntries(paths []string) []string {
+    sort.Strings(paths)
+    n := uniq(sort.StringSlice(paths))
+    return paths[:n]
 }
 
 // Init sets the trimStrings to possible GOPATHs
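uniqueEntries now sorts the slice in place and compacts duplicates with the new `sort.Interface`-based `uniq` helper, rather than building a `set.StringSet` and converting it back to a slice. A short, self-contained usage example (the helper is copied from the diff, the input paths are made up):

```go
package main

import (
	"fmt"
	"sort"
)

// uniq compacts a pre-sorted sort.Interface in place and returns the size of
// the unique prefix, as the helper added in the diff does.
func uniq(data sort.Interface) (size int) {
	p, l := 0, data.Len()
	if l <= 1 {
		return l
	}
	for i := 1; i < l; i++ {
		if !data.Less(p, i) {
			continue
		}
		p++
		if p < i {
			data.Swap(p, i)
		}
	}
	return p + 1
}

func main() {
	paths := []string{"/a", "/b", "/a", "/c", "/b"}
	sort.Strings(paths)
	n := uniq(sort.StringSlice(paths))
	fmt.Println(paths[:n]) // [/a /b /c]
}
```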
@@ -359,9 +376,11 @@ func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) {
     if DisableErrorLog {
         return
     }
+    if err == nil {
+        return
+    }
     if consoleTgt != nil {
-        entry := errToEntry(ctx, err, errKind...)
-        consoleTgt.Send(ctx, entry)
+        consoleTgt.Send(ctx, errToEntry(ctx, err, errKind...))
     }
 }
 

@@ -134,6 +134,22 @@ func (r *ReqInfo) GetTagsMap() map[string]interface{} {
     return m
 }
 
+// PopulateTagsMap - returns the user defined tags in a map structure
+func (r *ReqInfo) PopulateTagsMap(tagsMap map[string]interface{}) {
+    if r == nil {
+        return
+    }
+    if tagsMap == nil {
+        return
+    }
+    r.RLock()
+    defer r.RUnlock()
+    for _, t := range r.tags {
+        tagsMap[t.Key] = t.Val
+    }
+    return
+}
+
 // SetReqInfo sets ReqInfo in the context.
 func SetReqInfo(ctx context.Context, req *ReqInfo) context.Context {
     if ctx == nil {
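PopulateTagsMap is the fill-in-place counterpart to GetTagsMap: instead of allocating and returning a fresh map, it copies the request tags into a map the caller already owns (the entry.Tags map built in auditLogInternal). The sketch below shows the same shape with a simplified stand-in for `ReqInfo`; `kv`, `reqInfo`, and `populateTagsMap` are illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

// kv and reqInfo are simplified stand-ins for the logger package's tag and
// ReqInfo types; only what the pattern needs is included.
type kv struct {
	Key string
	Val interface{}
}

type reqInfo struct {
	sync.RWMutex
	tags []kv
}

// populateTagsMap mirrors the new PopulateTagsMap: nil-safe, read-locked, and
// writing into the caller's map instead of allocating a new one.
func (r *reqInfo) populateTagsMap(tagsMap map[string]interface{}) {
	if r == nil || tagsMap == nil {
		return
	}
	r.RLock()
	defer r.RUnlock()
	for _, t := range r.tags {
		tagsMap[t.Key] = t.Val
	}
}

func main() {
	ri := &reqInfo{tags: []kv{{Key: "source", Val: "decom"}}}

	entryTags := make(map[string]interface{}) // owned by the audit entry
	ri.populateTagsMap(entryTags)
	fmt.Println(entryTags) // map[source:decom]
}
```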
@@ -416,7 +416,14 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
                 go h.startHTTPLogger(ctx)
             }
         }
-        h.logCh <- entry
+        select {
+        case h.logCh <- entry:
+        case <-ctx.Done():
+            // return error only for context timedout.
+            if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+                return ctx.Err()
+            }
+        }
         return nil
     }
     atomic.AddInt64(&h.totalMessages, 1)
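This is the hang called out in the commit message: once the workers are started, a bare `h.logCh <- entry` blocks the caller forever if nothing is draining the channel. Wrapping the send in a `select` on the caller's context gives it an exit, and only a real deadline is surfaced as an error. A stdlib-only sketch of the same pattern; `send` and the channel here are illustrative, not the MinIO target type:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// send mirrors the new Target.Send logic: try to enqueue, but give up when
// the caller's context ends, and only surface an error on a real deadline.
func send(ctx context.Context, logCh chan<- string, entry string) error {
	select {
	case logCh <- entry:
	case <-ctx.Done():
		// return error only for context timedout.
		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	logCh := make(chan string, 1)
	logCh <- "fill the buffer" // nobody is dequeuing in this sketch

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Would previously block forever; now returns once the deadline fires.
	fmt.Println(send(ctx, logCh, "audit entry")) // context deadline exceeded
}
```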