avoid locks for internal and invalid buckets in MakeBucket() (#16302)

This commit is contained in:
Harshavardhana
2022-12-23 07:46:00 -08:00
committed by GitHub
parent de0b43de32
commit b882310e2b
23 changed files with 122 additions and 146 deletions

View File

@@ -33,6 +33,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio-go/v7/pkg/s3utils"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bucket/lifecycle"
@@ -687,20 +688,29 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
return firstErr
}
// MakeBucketWithLocation - creates a new bucket across all serverPools simultaneously
// MakeBucket - creates a new bucket across all serverPools simultaneously
// even if one of the sets fail to create buckets, we proceed all the successful
// operations.
func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket string, opts MakeBucketOptions) error {
func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
defer NSUpdated(bucket, slashSeparator)
g := errgroup.WithNErrs(len(z.serverPools))
// Lock the bucket name before creating.
lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
if !isMinioMetaBucketName(bucket) {
// Verify if bucket is valid.
if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
return BucketNameInvalid{Bucket: bucket}
}
// Lock the bucket name before creating.
lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
// Create buckets in parallel across all sets.
for index := range z.serverPools {
@@ -709,7 +719,7 @@ func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket
if z.IsSuspended(index) {
return nil
}
return z.serverPools[index].MakeBucketWithLocation(ctx, bucket, opts)
return z.serverPools[index].MakeBucket(ctx, bucket, opts)
}, index)
}
@@ -1644,6 +1654,8 @@ func (z *erasureServerPools) IsTaggingSupported() bool {
// even if one of the serverPools fail to delete buckets, we proceed to
// undo a successful operation.
func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
defer NSUpdated(bucket, slashSeparator)
g := errgroup.WithNErrs(len(z.serverPools))
// Delete buckets in parallel across all serverPools.
@@ -1724,7 +1736,7 @@ func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools
index := index
g.Go(func() error {
if errs[index] == nil {
return serverPools[index].MakeBucketWithLocation(ctx, bucket, MakeBucketOptions{})
return serverPools[index].MakeBucket(ctx, bucket, MakeBucketOptions{})
}
return nil
}, index)