mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: discarding results do not attempt in-memory metacache writer (#11163)
Optimizations include - do not write the metacache block if the size of the block is '0' and it is the first block - where listing is attempted for a transient prefix, this helps to avoid creating lots of empty metacache entries for `minioMetaBucket` - avoid the entire initialization sequence of cacheCh , metacacheBlockWriter if we are simply going to skip them when discardResults is set to true. - No need to hold write locks while writing metacache blocks - each block is unique, per bucket, per prefix and also is written by a single node.
This commit is contained in:
parent
45ea161f8d
commit
027e17468a
@ -570,8 +570,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
bucket: bucket,
|
||||
object: entry.name,
|
||||
versionID: "",
|
||||
opts: &madmin.HealOpts{
|
||||
Remove: true,
|
||||
},
|
||||
}, madmin.HealItemObject)
|
||||
logger.LogIf(ctx, err)
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
foundObjs = foundObjs || err == nil
|
||||
return
|
||||
}
|
||||
@ -583,8 +588,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
bucket: bucket,
|
||||
object: fiv.Name,
|
||||
versionID: ver.VersionID,
|
||||
opts: &madmin.HealOpts{
|
||||
Remove: true,
|
||||
},
|
||||
}, madmin.HealItemObject)
|
||||
logger.LogIf(ctx, err)
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
foundObjs = foundObjs || err == nil
|
||||
}
|
||||
},
|
||||
|
@ -674,11 +674,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
|
||||
}
|
||||
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
if !opts.NoLock {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
}
|
||||
defer lk.Unlock()
|
||||
|
||||
for i, w := range writers {
|
||||
if w == nil {
|
||||
|
@ -130,7 +130,9 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
|
||||
bucket: minioMetaBucket,
|
||||
object: backendEncryptedFile,
|
||||
}, madmin.HealItemMetadata); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Heal all buckets with all objects
|
||||
@ -139,7 +141,9 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
|
||||
if err := bgSeq.queueHealTask(healSource{
|
||||
bucket: bucket.Name,
|
||||
}, madmin.HealItemBucket); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
var entryChs []FileInfoVersionsCh
|
||||
@ -179,7 +183,9 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
|
||||
object: version.Name,
|
||||
versionID: version.VersionID,
|
||||
}, madmin.HealItemObject); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -615,13 +615,18 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
||||
}
|
||||
|
||||
// Create output for our results.
|
||||
cacheCh := make(chan metaCacheEntry, metacacheBlockSize)
|
||||
var cacheCh chan metaCacheEntry
|
||||
if !o.discardResult {
|
||||
cacheCh = make(chan metaCacheEntry, metacacheBlockSize)
|
||||
}
|
||||
|
||||
// Create filter for results.
|
||||
filterCh := make(chan metaCacheEntry, 100)
|
||||
filteredResults := o.gatherResults(filterCh)
|
||||
closeChannels := func() {
|
||||
close(cacheCh)
|
||||
if !o.discardResult {
|
||||
close(cacheCh)
|
||||
}
|
||||
close(filterCh)
|
||||
}
|
||||
|
||||
@ -657,54 +662,62 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
||||
}()
|
||||
|
||||
const retryDelay = 200 * time.Millisecond
|
||||
const maxTries = 10
|
||||
const maxTries = 5
|
||||
|
||||
// Write results to disk.
|
||||
bw := newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
|
||||
if o.discardResult {
|
||||
// Don't save single object listings.
|
||||
return nil
|
||||
}
|
||||
o.debugln("listPath: saving block", b.n, "to", o.objectPath(b.n))
|
||||
r, err := hash.NewReader(bytes.NewBuffer(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
|
||||
logger.LogIf(ctx, err)
|
||||
custom := b.headerKV()
|
||||
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{UserDefined: custom})
|
||||
if err != nil {
|
||||
metaMu.Lock()
|
||||
if meta.error != "" {
|
||||
meta.status = scanStateError
|
||||
meta.error = err.Error()
|
||||
var bw *metacacheBlockWriter
|
||||
// Don't save single object listings.
|
||||
if !o.discardResult {
|
||||
// Write results to disk.
|
||||
bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
|
||||
// if the block is 0 bytes and its a first block skip it.
|
||||
// skip only this for Transient caches.
|
||||
if len(b.data) == 0 && b.n == 0 && o.Transient {
|
||||
return nil
|
||||
}
|
||||
metaMu.Unlock()
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
if b.n == 0 {
|
||||
return nil
|
||||
}
|
||||
// Update block 0 metadata.
|
||||
var retries int
|
||||
for {
|
||||
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
return err
|
||||
case InsufficientReadQuorum:
|
||||
default:
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
if retries >= maxTries {
|
||||
o.debugln("listPath: saving block", b.n, "to", o.objectPath(b.n))
|
||||
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
|
||||
logger.LogIf(ctx, err)
|
||||
custom := b.headerKV()
|
||||
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{
|
||||
UserDefined: custom,
|
||||
NoLock: true, // No need to hold namespace lock, each prefix caches uniquely.
|
||||
})
|
||||
if err != nil {
|
||||
metaMu.Lock()
|
||||
if meta.error != "" {
|
||||
meta.status = scanStateError
|
||||
meta.error = err.Error()
|
||||
}
|
||||
metaMu.Unlock()
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
retries++
|
||||
time.Sleep(retryDelay)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if b.n == 0 {
|
||||
return nil
|
||||
}
|
||||
// Update block 0 metadata.
|
||||
var retries int
|
||||
for {
|
||||
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
return err
|
||||
case InsufficientReadQuorum:
|
||||
default:
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
if retries >= maxTries {
|
||||
return err
|
||||
}
|
||||
retries++
|
||||
time.Sleep(retryDelay)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// How to resolve results.
|
||||
resolver := metadataResolutionParams{
|
||||
@ -721,14 +734,18 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
||||
filterPrefix: o.FilterPrefix,
|
||||
minDisks: listingQuorum,
|
||||
agreed: func(entry metaCacheEntry) {
|
||||
cacheCh <- entry
|
||||
if !o.discardResult {
|
||||
cacheCh <- entry
|
||||
}
|
||||
filterCh <- entry
|
||||
},
|
||||
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
||||
// Results Disagree :-(
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if ok {
|
||||
cacheCh <- *entry
|
||||
if !o.discardResult {
|
||||
cacheCh <- *entry
|
||||
}
|
||||
filterCh <- *entry
|
||||
}
|
||||
},
|
||||
@ -749,12 +766,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
||||
metaMu.Unlock()
|
||||
|
||||
closeChannels()
|
||||
if err := bw.Close(); err != nil {
|
||||
metaMu.Lock()
|
||||
meta.error = err.Error()
|
||||
meta.status = scanStateError
|
||||
meta, err = o.updateMetacacheListing(meta, rpc)
|
||||
metaMu.Unlock()
|
||||
if !o.discardResult {
|
||||
if err := bw.Close(); err != nil {
|
||||
metaMu.Lock()
|
||||
meta.error = err.Error()
|
||||
meta.status = scanStateError
|
||||
meta, err = o.updateMetacacheListing(meta, rpc)
|
||||
metaMu.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -51,6 +51,7 @@ type ObjectOptions struct {
|
||||
DeleteMarkerReplicationStatus string // Is only set in DELETE operations
|
||||
VersionPurgeStatus VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted.
|
||||
TransitionStatus string // status of the transition
|
||||
NoLock bool // indicates to lower layers if the caller is expecting to hold locks.
|
||||
}
|
||||
|
||||
// BucketOptions represents bucket options for ObjectLayer bucket operations
|
||||
|
Loading…
Reference in New Issue
Block a user