mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
remove more duplicate bloom filter trackers (#12302)
At some places bloom filter tracker was getting updated for `.minio.sys/tmp` bucket, there is no reason to update bloom filters for those. And add a missing bloom filter update for MakeBucket() Bonus: purge unused function deleteEmptyDir()
This commit is contained in:
parent
ce3d9dc9fa
commit
f1e479d274
@ -506,23 +506,18 @@ func (d *dataUpdateTracker) startCollector(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// markDirty adds the supplied path to the current bloom filter.
|
// markDirty adds the supplied path to the current bloom filter.
|
||||||
func (d *dataUpdateTracker) markDirty(in string) {
|
func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
|
||||||
bucket, _ := path2BucketObjectWithBasePath("", in)
|
|
||||||
dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
|
dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
|
||||||
if bucket == "" {
|
if bucket == "" && d.debug {
|
||||||
if d.debug && len(in) > 0 {
|
console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n")
|
||||||
console.Debugf(dateUpdateTrackerLogPrefix+" no bucket (%s)\n", in)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if isReservedOrInvalidBucket(bucket, false) {
|
if isReservedOrInvalidBucket(bucket, false) && d.debug {
|
||||||
if d.debug && false {
|
console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, prefix)
|
||||||
console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, in)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
split := splitPathDeterministic(in)
|
split := splitPathDeterministic(pathJoin(bucket, prefix))
|
||||||
|
|
||||||
// Add all paths until done.
|
// Add all paths until done.
|
||||||
d.mu.Lock()
|
d.mu.Lock()
|
||||||
@ -679,10 +674,10 @@ type bloomFilterResponse struct {
|
|||||||
Filter []byte
|
Filter []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectPathUpdated indicates a path has been updated.
|
// NSUpdated indicates namespace has been updated.
|
||||||
// The function will block until the entry has been picked up.
|
// The function will block until the entry has been picked up.
|
||||||
func ObjectPathUpdated(s string) {
|
func NSUpdated(bucket, prefix string) {
|
||||||
if intDataUpdateTracker != nil {
|
if intDataUpdateTracker != nil {
|
||||||
intDataUpdateTracker.markDirty(s)
|
intDataUpdateTracker.markDirty(bucket, prefix)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,8 @@ var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
|
|||||||
|
|
||||||
// MakeBucket - make a bucket.
|
// MakeBucket - make a bucket.
|
||||||
func (er erasureObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
|
func (er erasureObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
|
||||||
|
defer NSUpdated(bucket, slashSeparator)
|
||||||
|
|
||||||
// Verify if bucket is valid.
|
// Verify if bucket is valid.
|
||||||
if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
|
if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
|
||||||
return BucketNameInvalid{Bucket: bucket}
|
return BucketNameInvalid{Bucket: bucket}
|
||||||
@ -159,7 +161,8 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs
|
|||||||
// DeleteBucket - deletes a bucket.
|
// DeleteBucket - deletes a bucket.
|
||||||
func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
||||||
// Collect if all disks report volume not found.
|
// Collect if all disks report volume not found.
|
||||||
defer ObjectPathUpdated(bucket + slashSeparator)
|
defer NSUpdated(bucket, slashSeparator)
|
||||||
|
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
|
|
||||||
g := errgroup.WithNErrs(len(storageDisks))
|
g := errgroup.WithNErrs(len(storageDisks))
|
||||||
|
@ -37,7 +37,7 @@ import (
|
|||||||
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
|
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
|
||||||
result madmin.HealResultItem, err error) {
|
result madmin.HealResultItem, err error) {
|
||||||
if !opts.DryRun {
|
if !opts.DryRun {
|
||||||
defer ObjectPathUpdated(bucket)
|
defer NSUpdated(bucket, slashSeparator)
|
||||||
}
|
}
|
||||||
|
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
@ -234,6 +234,9 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t
|
|||||||
|
|
||||||
// Heals an object by re-writing corrupt/missing erasure blocks.
|
// Heals an object by re-writing corrupt/missing erasure blocks.
|
||||||
func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
|
func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
|
||||||
|
if !opts.DryRun {
|
||||||
|
defer NSUpdated(bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
dryRun := opts.DryRun
|
dryRun := opts.DryRun
|
||||||
scanMode := opts.ScanMode
|
scanMode := opts.ScanMode
|
||||||
@ -376,7 +379,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return result, toObjectErr(err, bucket, object, versionID)
|
return result, toObjectErr(err, bucket, object, versionID)
|
||||||
}
|
}
|
||||||
defer ObjectPathUpdated(pathJoin(bucket, object))
|
|
||||||
|
|
||||||
cleanFileInfo := func(fi FileInfo) FileInfo {
|
cleanFileInfo := func(fi FileInfo) FileInfo {
|
||||||
// Returns a copy of the 'fi' with checksums and parts nil'ed.
|
// Returns a copy of the 'fi' with checksums and parts nil'ed.
|
||||||
@ -584,7 +586,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
|
|||||||
}(index, disk)
|
}(index, disk)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
ObjectPathUpdated(pathJoin(bucket, object))
|
NSUpdated(bucket, object)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -756,8 +756,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
return oi, toObjectErr(errFileParentIsFile, bucket, object)
|
return oi, toObjectErr(errFileParentIsFile, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer ObjectPathUpdated(pathJoin(bucket, object))
|
|
||||||
|
|
||||||
// Calculate s3 compatible md5sum for complete multipart.
|
// Calculate s3 compatible md5sum for complete multipart.
|
||||||
s3MD5 := getCompleteMultipartMD5(parts)
|
s3MD5 := getCompleteMultipartMD5(parts)
|
||||||
|
|
||||||
|
@ -64,7 +64,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
|||||||
return oi, NotImplemented{}
|
return oi, NotImplemented{}
|
||||||
}
|
}
|
||||||
|
|
||||||
defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))
|
defer NSUpdated(dstBucket, dstObject)
|
||||||
|
|
||||||
if !dstOpts.NoLock {
|
if !dstOpts.NoLock {
|
||||||
lk := er.NewNSLock(dstBucket, dstObject)
|
lk := er.NewNSLock(dstBucket, dstObject)
|
||||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
@ -526,7 +527,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
|
|||||||
|
|
||||||
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
|
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
|
||||||
func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
|
func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
|
||||||
defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry))
|
defer NSUpdated(dstBucket, dstEntry)
|
||||||
|
|
||||||
g := errgroup.WithNErrs(len(disks))
|
g := errgroup.WithNErrs(len(disks))
|
||||||
|
|
||||||
@ -566,7 +567,7 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
|
|||||||
dstEntry = retainSlash(dstEntry)
|
dstEntry = retainSlash(dstEntry)
|
||||||
srcEntry = retainSlash(srcEntry)
|
srcEntry = retainSlash(srcEntry)
|
||||||
}
|
}
|
||||||
defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry))
|
defer NSUpdated(dstBucket, dstEntry)
|
||||||
|
|
||||||
g := errgroup.WithNErrs(len(disks))
|
g := errgroup.WithNErrs(len(disks))
|
||||||
|
|
||||||
@ -827,7 +828,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
|
func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
|
||||||
defer ObjectPathUpdated(pathJoin(bucket, object))
|
|
||||||
disks := er.getDisks()
|
disks := er.getDisks()
|
||||||
g := errgroup.WithNErrs(len(disks))
|
g := errgroup.WithNErrs(len(disks))
|
||||||
for index := range disks {
|
for index := range disks {
|
||||||
@ -843,37 +843,11 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
|
|||||||
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum)
|
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum)
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteEmptyDir knows only how to remove an empty directory (not the empty object with a
|
|
||||||
// trailing slash), this is called for the healing code to remove such directories.
|
|
||||||
func (er erasureObjects) deleteEmptyDir(ctx context.Context, bucket, object string) error {
|
|
||||||
defer ObjectPathUpdated(pathJoin(bucket, object))
|
|
||||||
|
|
||||||
if bucket == minioMetaTmpBucket {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
disks := er.getDisks()
|
|
||||||
g := errgroup.WithNErrs(len(disks))
|
|
||||||
for index := range disks {
|
|
||||||
index := index
|
|
||||||
g.Go(func() error {
|
|
||||||
if disks[index] == nil {
|
|
||||||
return errDiskNotFound
|
|
||||||
}
|
|
||||||
return disks[index].Delete(ctx, bucket, object, false)
|
|
||||||
}, index)
|
|
||||||
}
|
|
||||||
|
|
||||||
// return errors if any during deletion
|
|
||||||
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, len(disks)/2+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteObject - wrapper for delete object, deletes an object from
|
// deleteObject - wrapper for delete object, deletes an object from
|
||||||
// all the disks in parallel, including `xl.meta` associated with the
|
// all the disks in parallel, including `xl.meta` associated with the
|
||||||
// object.
|
// object.
|
||||||
func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int) error {
|
func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int) error {
|
||||||
var err error
|
var err error
|
||||||
defer ObjectPathUpdated(pathJoin(bucket, object))
|
|
||||||
|
|
||||||
disks := er.getDisks()
|
disks := er.getDisks()
|
||||||
tmpObj := mustGetUUID()
|
tmpObj := mustGetUUID()
|
||||||
@ -999,7 +973,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
|
|||||||
}
|
}
|
||||||
|
|
||||||
if errs[objIndex] == nil {
|
if errs[objIndex] == nil {
|
||||||
ObjectPathUpdated(pathJoin(bucket, objects[objIndex].ObjectName))
|
NSUpdated(bucket, objects[objIndex].ObjectName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if versions[objIndex].Deleted {
|
if versions[objIndex].Deleted {
|
||||||
@ -1059,6 +1033,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||||||
return objInfo, gerr
|
return objInfo, gerr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
// Acquire a write lock before deleting the object.
|
// Acquire a write lock before deleting the object.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
||||||
@ -1319,6 +1296,8 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
// Acquire write lock before starting to transition the object.
|
// Acquire write lock before starting to transition the object.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
||||||
|
@ -558,7 +558,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
|||||||
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
|
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
|
||||||
return oi, toObjectErr(err, bucket)
|
return oi, toObjectErr(err, bucket)
|
||||||
}
|
}
|
||||||
defer ObjectPathUpdated(pathutil.Join(bucket, object))
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
|
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
|
||||||
// Just check if the uploadID exists to avoid copy if it doesn't.
|
// Just check if the uploadID exists to avoid copy if it doesn't.
|
||||||
|
14
cmd/fs-v1.go
14
cmd/fs-v1.go
@ -407,7 +407,8 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket string,
|
|||||||
return BucketNameInvalid{Bucket: bucket}
|
return BucketNameInvalid{Bucket: bucket}
|
||||||
}
|
}
|
||||||
|
|
||||||
defer ObjectPathUpdated(bucket + slashSeparator)
|
defer NSUpdated(bucket, slashSeparator)
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddInt64(&fs.activeIOCount, -1)
|
atomic.AddInt64(&fs.activeIOCount, -1)
|
||||||
@ -553,6 +554,8 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
|
|||||||
// DeleteBucket - delete a bucket and all the metadata associated
|
// DeleteBucket - delete a bucket and all the metadata associated
|
||||||
// with the bucket including pending multipart, object metadata.
|
// with the bucket including pending multipart, object metadata.
|
||||||
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
||||||
|
defer NSUpdated(bucket, slashSeparator)
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddInt64(&fs.activeIOCount, -1)
|
atomic.AddInt64(&fs.activeIOCount, -1)
|
||||||
@ -606,7 +609,7 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
|
|||||||
}
|
}
|
||||||
|
|
||||||
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
|
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
|
||||||
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
|
defer NSUpdated(dstBucket, dstObject)
|
||||||
|
|
||||||
if !cpSrcDstSame {
|
if !cpSrcDstSame {
|
||||||
objectDWLock := fs.NewNSLock(dstBucket, dstObject)
|
objectDWLock := fs.NewNSLock(dstBucket, dstObject)
|
||||||
@ -1079,6 +1082,8 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
|
|||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
// Lock the object.
|
// Lock the object.
|
||||||
lk := fs.NewNSLock(bucket, object)
|
lk := fs.NewNSLock(bucket, object)
|
||||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
@ -1088,7 +1093,6 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
|
|||||||
}
|
}
|
||||||
ctx = lkctx.Context()
|
ctx = lkctx.Context()
|
||||||
defer lk.Unlock(lkctx.Cancel)
|
defer lk.Unlock(lkctx.Cancel)
|
||||||
defer ObjectPathUpdated(path.Join(bucket, object))
|
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -1256,6 +1260,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
// Acquire a write lock before deleting the object.
|
// Acquire a write lock before deleting the object.
|
||||||
lk := fs.NewNSLock(bucket, object)
|
lk := fs.NewNSLock(bucket, object)
|
||||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
@ -1269,8 +1275,6 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
|
|||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer ObjectPathUpdated(path.Join(bucket, object))
|
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddInt64(&fs.activeIOCount, -1)
|
atomic.AddInt64(&fs.activeIOCount, -1)
|
||||||
|
Loading…
Reference in New Issue
Block a user