diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go index fb33e44c3..245e9cc0a 100644 --- a/cmd/data-update-tracker.go +++ b/cmd/data-update-tracker.go @@ -506,23 +506,18 @@ func (d *dataUpdateTracker) startCollector(ctx context.Context) { } // markDirty adds the supplied path to the current bloom filter. -func (d *dataUpdateTracker) markDirty(in string) { - bucket, _ := path2BucketObjectWithBasePath("", in) +func (d *dataUpdateTracker) markDirty(bucket, prefix string) { dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:") - if bucket == "" { - if d.debug && len(in) > 0 { - console.Debugf(dateUpdateTrackerLogPrefix+" no bucket (%s)\n", in) - } + if bucket == "" && d.debug { + console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n") return } - if isReservedOrInvalidBucket(bucket, false) { - if d.debug && false { - console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, in) - } + if isReservedOrInvalidBucket(bucket, false) && d.debug { + console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, prefix) return } - split := splitPathDeterministic(in) + split := splitPathDeterministic(pathJoin(bucket, prefix)) // Add all paths until done. d.mu.Lock() @@ -679,10 +674,10 @@ type bloomFilterResponse struct { Filter []byte } -// ObjectPathUpdated indicates a path has been updated. +// NSUpdated indicates namespace has been updated. // The function will block until the entry has been picked up. -func ObjectPathUpdated(s string) { +func NSUpdated(bucket, prefix string) { if intDataUpdateTracker != nil { - intDataUpdateTracker.markDirty(s) + intDataUpdateTracker.markDirty(bucket, prefix) } } diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index fcf3fc11c..6876ba5c3 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -36,6 +36,8 @@ var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound) // MakeBucket - make a bucket. func (er erasureObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error { + defer NSUpdated(bucket, slashSeparator) + // Verify if bucket is valid. if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil { return BucketNameInvalid{Bucket: bucket} @@ -159,7 +161,8 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs // DeleteBucket - deletes a bucket. func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { // Collect if all disks report volume not found. - defer ObjectPathUpdated(bucket + slashSeparator) + defer NSUpdated(bucket, slashSeparator) + storageDisks := er.getDisks() g := errgroup.WithNErrs(len(storageDisks)) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 077f92f87..5485c48d9 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -37,7 +37,7 @@ import ( func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) ( result madmin.HealResultItem, err error) { if !opts.DryRun { - defer ObjectPathUpdated(bucket) + defer NSUpdated(bucket, slashSeparator) } storageDisks := er.getDisks() @@ -234,6 +234,9 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t // Heals an object by re-writing corrupt/missing erasure blocks. func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) { + if !opts.DryRun { + defer NSUpdated(bucket, object) + } dryRun := opts.DryRun scanMode := opts.ScanMode @@ -376,7 +379,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s if err != nil { return result, toObjectErr(err, bucket, object, versionID) } - defer ObjectPathUpdated(pathJoin(bucket, object)) cleanFileInfo := func(fi FileInfo) FileInfo { // Returns a copy of the 'fi' with checksums and parts nil'ed. @@ -584,7 +586,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin }(index, disk) } wg.Wait() - ObjectPathUpdated(pathJoin(bucket, object)) + NSUpdated(bucket, object) } } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 48c0af785..935d68e47 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -756,8 +756,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str return oi, toObjectErr(errFileParentIsFile, bucket, object) } - defer ObjectPathUpdated(pathJoin(bucket, object)) - // Calculate s3 compatible md5sum for complete multipart. s3MD5 := getCompleteMultipartMD5(parts) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 5d5eaa7c0..5aadde4dd 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -64,7 +64,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d return oi, NotImplemented{} } - defer ObjectPathUpdated(pathJoin(dstBucket, dstObject)) + defer NSUpdated(dstBucket, dstObject) + if !dstOpts.NoLock { lk := er.NewNSLock(dstBucket, dstObject) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) @@ -526,7 +527,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str // Similar to rename but renames data from srcEntry to dstEntry at dataDir func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) { - defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry)) + defer NSUpdated(dstBucket, dstEntry) g := errgroup.WithNErrs(len(disks)) @@ -566,7 +567,7 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc dstEntry = retainSlash(dstEntry) srcEntry = retainSlash(srcEntry) } - defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry)) + defer NSUpdated(dstBucket, dstEntry) g := errgroup.WithNErrs(len(disks)) @@ -827,7 +828,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error { - defer ObjectPathUpdated(pathJoin(bucket, object)) disks := er.getDisks() g := errgroup.WithNErrs(len(disks)) for index := range disks { @@ -843,37 +843,11 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum) } -// deleteEmptyDir knows only how to remove an empty directory (not the empty object with a -// trailing slash), this is called for the healing code to remove such directories. -func (er erasureObjects) deleteEmptyDir(ctx context.Context, bucket, object string) error { - defer ObjectPathUpdated(pathJoin(bucket, object)) - - if bucket == minioMetaTmpBucket { - return nil - } - - disks := er.getDisks() - g := errgroup.WithNErrs(len(disks)) - for index := range disks { - index := index - g.Go(func() error { - if disks[index] == nil { - return errDiskNotFound - } - return disks[index].Delete(ctx, bucket, object, false) - }, index) - } - - // return errors if any during deletion - return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, len(disks)/2+1) -} - // deleteObject - wrapper for delete object, deletes an object from // all the disks in parallel, including `xl.meta` associated with the // object. func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int) error { var err error - defer ObjectPathUpdated(pathJoin(bucket, object)) disks := er.getDisks() tmpObj := mustGetUUID() @@ -999,7 +973,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec } if errs[objIndex] == nil { - ObjectPathUpdated(pathJoin(bucket, objects[objIndex].ObjectName)) + NSUpdated(bucket, objects[objIndex].ObjectName) } if versions[objIndex].Deleted { @@ -1059,6 +1033,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string return objInfo, gerr } } + + defer NSUpdated(bucket, object) + // Acquire a write lock before deleting the object. lk := er.NewNSLock(bucket, object) lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) @@ -1319,6 +1296,8 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st if err != nil { return err } + defer NSUpdated(bucket, object) + // Acquire write lock before starting to transition the object. lk := er.NewNSLock(bucket, object) lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 37af70141..4e0b40f5e 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -558,7 +558,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, if _, err := fs.statBucketDir(ctx, bucket); err != nil { return oi, toObjectErr(err, bucket) } - defer ObjectPathUpdated(pathutil.Join(bucket, object)) + defer NSUpdated(bucket, object) uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) // Just check if the uploadID exists to avoid copy if it doesn't. diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 7bcb17b19..d7a14fe27 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -407,7 +407,8 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket string, return BucketNameInvalid{Bucket: bucket} } - defer ObjectPathUpdated(bucket + slashSeparator) + defer NSUpdated(bucket, slashSeparator) + atomic.AddInt64(&fs.activeIOCount, 1) defer func() { atomic.AddInt64(&fs.activeIOCount, -1) @@ -553,6 +554,8 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) { // DeleteBucket - delete a bucket and all the metadata associated // with the bucket including pending multipart, object metadata. func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { + defer NSUpdated(bucket, slashSeparator) + atomic.AddInt64(&fs.activeIOCount, 1) defer func() { atomic.AddInt64(&fs.activeIOCount, -1) @@ -606,7 +609,7 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu } cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) - defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) + defer NSUpdated(dstBucket, dstObject) if !cpSrcDstSame { objectDWLock := fs.NewNSLock(dstBucket, dstObject) @@ -1079,6 +1082,8 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string return ObjectInfo{}, err } + defer NSUpdated(bucket, object) + // Lock the object. lk := fs.NewNSLock(bucket, object) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) @@ -1088,7 +1093,6 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string } ctx = lkctx.Context() defer lk.Unlock(lkctx.Cancel) - defer ObjectPathUpdated(path.Join(bucket, object)) atomic.AddInt64(&fs.activeIOCount, 1) defer func() { @@ -1256,6 +1260,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op } } + defer NSUpdated(bucket, object) + // Acquire a write lock before deleting the object. lk := fs.NewNSLock(bucket, object) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) @@ -1269,8 +1275,6 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op return objInfo, err } - defer ObjectPathUpdated(path.Join(bucket, object)) - atomic.AddInt64(&fs.activeIOCount, 1) defer func() { atomic.AddInt64(&fs.activeIOCount, -1)