vectorize cluster-wide calls such as bucket operations (#16313)
parent 72394a8319
commit f1bbb7fef5
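In short: bucket create, delete, and stat calls are vectorized at node granularity. A new internal "peer S3" service (cmd/peer-s3-client.go and cmd/peer-s3-server.go, added below) fans each call out once per node and reduces the per-node results against read/write quorums, replacing the old cascade of per-server-pool, per-erasure-set, and per-disk loops together with their undo helpers. MakeBucketOptions and DeleteBucketOptions gain NoLock flags so the pool layer takes the namespace lock exactly once, and NoRecreate now means a partially failed cluster-wide delete is not rolled back by recreating the bucket.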
@@ -114,8 +114,7 @@ func (a adminAPIHandlers) SRPeerBucketOps(w http.ResponseWriter, r *http.Request
 	default:
 		err = errSRInvalidRequest(errInvalidArgument)
 	case madmin.MakeWithVersioningBktOp:
-		createdAtStr := strings.TrimSpace(r.Form.Get("createdAt"))
-		createdAt, cerr := time.Parse(time.RFC3339Nano, createdAtStr)
+		createdAt, cerr := time.Parse(time.RFC3339Nano, strings.TrimSpace(r.Form.Get("createdAt")))
 		if cerr != nil {
 			createdAt = timeSentinel
 		}
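For context, time.RFC3339Nano accepts timestamps such as 2022-12-25T06:35:56.123456789Z, and an empty or malformed createdAt degrades to timeSentinel instead of aborting the request. A standalone sketch of the same fallback pattern (the timeSentinel value here is an assumption for illustration, not the value MinIO defines):

	package main

	import (
		"fmt"
		"strings"
		"time"
	)

	// timeSentinel stands in for MinIO's sentinel; a zero Unix time is assumed here.
	var timeSentinel = time.Unix(0, 0).UTC()

	func parseCreatedAt(raw string) time.Time {
		createdAt, err := time.Parse(time.RFC3339Nano, strings.TrimSpace(raw))
		if err != nil {
			// Malformed or missing input falls back to the sentinel rather than failing.
			return timeSentinel
		}
		return createdAt
	}

	func main() {
		fmt.Println(parseCreatedAt("2022-12-25T06:35:56.123456789Z")) // parsed value
		fmt.Println(parseCreatedAt(""))                               // sentinel
	}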
@@ -132,7 +131,6 @@ func (a adminAPIHandlers) SRPeerBucketOps(w http.ResponseWriter, r *http.Request
 	case madmin.DeleteBucketBktOp, madmin.ForceDeleteBucketBktOp:
 		err = globalSiteReplicationSys.PeerBucketDeleteHandler(ctx, bucket, DeleteBucketOptions{
 			Force:      operation == madmin.ForceDeleteBucketBktOp,
-			NoRecreate: r.Form.Get("noRecreate") == "true",
 			SRDeleteOp: getSRBucketDeleteOp(true),
 		})
 	case madmin.PurgeDeletedBucketOp:
@@ -1332,7 +1332,6 @@ func makeObjectPerfBucket(ctx context.Context, objectAPI ObjectLayer, bucketName
 func deleteObjectPerfBucket(objectAPI ObjectLayer) {
 	objectAPI.DeleteBucket(context.Background(), globalObjectPerfBucket, DeleteBucketOptions{
 		Force:      true,
-		NoRecreate: true,
 		SRDeleteOp: getSRBucketDeleteOp(globalSiteReplicationSys.isEnabled()),
 	})
 }
@@ -791,8 +791,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
 
 	if err = globalDNSConfig.Put(bucket); err != nil {
 		objectAPI.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{
-			Force:      false,
-			NoRecreate: true,
+			Force:      true,
 			SRDeleteOp: getSRBucketDeleteOp(globalSiteReplicationSys.isEnabled()),
 		})
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -66,7 +66,7 @@ func (sys *BucketMetadataSys) Remove(bucket string) {
 // so they should be replaced atomically and not appended to, etc.
 // Data is not persisted to disk.
 func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
-	if bucket != minioMetaBucket {
+	if !isMinioMetaBucketName(bucket) {
 		sys.Lock()
 		sys.metadataMap[bucket] = meta
 		sys.Unlock()
@@ -79,7 +79,7 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string,
 		return updatedAt, errServerNotInitialized
 	}
 
-	if bucket == minioMetaBucket {
+	if isMinioMetaBucketName(bucket) {
 		return updatedAt, errInvalidArgument
 	}
 
@@ -164,7 +164,7 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF
 // For all other bucket specific metadata, use the relevant
 // calls implemented specifically for each of those features.
 func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
-	if bucket == minioMetaBucket {
+	if isMinioMetaBucketName(bucket) {
 		return newBucketMetadata(bucket), errConfigNotFound
 	}
 
@@ -345,7 +345,7 @@ func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (Buc
 		return newBucketMetadata(bucket), errServerNotInitialized
 	}
 
-	if bucket == minioMetaBucket {
+	if isMinioMetaBucketName(bucket) {
 		return newBucketMetadata(bucket), errInvalidArgument
 	}
 
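These call sites replace a direct comparison against minioMetaBucket with the isMinioMetaBucketName helper, so every reserved bucket name is treated uniformly. The helper itself is not part of this diff; a plausible sketch of such a predicate follows (the exact set of reserved names is an assumption, only minioMetaBucket and minioMetaTmpBucket appear in the surrounding hunks):

	// isMinioMetaBucketName - sketch of the predicate the call sites above rely on.
	// The reserved-bucket list here is assumed; the diff only shows the callers.
	func isMinioMetaBucketName(bucket string) bool {
		switch bucket {
		case minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket:
			return true
		}
		return false
	}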
@@ -135,7 +135,7 @@ func (b *BucketMetadata) SetCreatedAt(createdAt time.Time) {
 func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) error {
 	if name == "" {
 		logger.LogIf(ctx, errors.New("bucket name cannot be empty"))
-		return errors.New("bucket name cannot be empty")
+		return errInvalidArgument
 	}
 	configFile := path.Join(bucketMetaPrefix, name, bucketMetadataFile)
 	data, err := readConfig(ctx, api, configFile)
@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 
-	"github.com/minio/minio/internal/logger"
 	"github.com/minio/minio/internal/sync/errgroup"
 )
 
@@ -31,169 +30,6 @@ var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform
 // list all errors that can be ignored in a bucket metadata operation.
 var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
 
-// Bucket operations
-
-// MakeBucket - make a bucket.
-func (er erasureObjects) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
-	storageDisks := er.getDisks()
-
-	g := errgroup.WithNErrs(len(storageDisks))
-
-	// Make a volume entry on all underlying storage disks.
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if storageDisks[index] != nil {
-				if err := storageDisks[index].MakeVol(ctx, bucket); err != nil {
-					if opts.ForceCreate && errors.Is(err, errVolumeExists) {
-						// No need to return error when force create was
-						// requested.
-						return nil
-					}
-					if !errors.Is(err, errVolumeExists) {
-						logger.LogIf(ctx, err)
-					}
-					return err
-				}
-				return nil
-			}
-			return errDiskNotFound
-		}, index)
-	}
-
-	err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, er.defaultWQuorum())
-	return toObjectErr(err, bucket)
-}
-
-func undoDeleteBucket(storageDisks []StorageAPI, bucket string) {
-	g := errgroup.WithNErrs(len(storageDisks))
-	// Undo previous make bucket entry on all underlying storage disks.
-	for index := range storageDisks {
-		if storageDisks[index] == nil {
-			continue
-		}
-		index := index
-		g.Go(func() error {
-			_ = storageDisks[index].MakeVol(context.Background(), bucket)
-			return nil
-		}, index)
-	}
-
-	// Wait for all make vol to finish.
-	g.Wait()
-}
-
-// getBucketInfo - returns the BucketInfo from one of the load balanced disks.
-func (er erasureObjects) getBucketInfo(ctx context.Context, bucketName string, opts BucketOptions) (bucketInfo BucketInfo, err error) {
-	storageDisks := er.getDisks()
-
-	g := errgroup.WithNErrs(len(storageDisks))
-	bucketsInfo := make([]BucketInfo, len(storageDisks))
-	// Undo previous make bucket entry on all underlying storage disks.
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if storageDisks[index] == nil {
-				return errDiskNotFound
-			}
-			volInfo, err := storageDisks[index].StatVol(ctx, bucketName)
-			if err != nil {
-				if opts.Deleted {
-					dvi, derr := storageDisks[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucketName))
-					if derr != nil {
-						return err
-					}
-					bucketsInfo[index] = BucketInfo{Name: bucketName, Deleted: dvi.Created}
-					return nil
-				}
-				return err
-			}
-			bucketsInfo[index] = BucketInfo{Name: volInfo.Name, Created: volInfo.Created}
-			return nil
-		}, index)
-	}
-
-	errs := g.Wait()
-
-	for i, err := range errs {
-		if err == nil {
-			return bucketsInfo[i], nil
-		}
-	}
-
-	// If all our errors were ignored, then we try to
-	// reduce to one error based on read quorum.
-	// `nil` is deliberately passed for ignoredErrs
-	// because these errors were already ignored.
-	return BucketInfo{}, reduceReadQuorumErrs(ctx, errs, nil, er.defaultRQuorum())
-}
-
-// GetBucketInfo - returns BucketInfo for a bucket.
-func (er erasureObjects) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bi BucketInfo, e error) {
-	bucketInfo, err := er.getBucketInfo(ctx, bucket, opts)
-	if err != nil {
-		return bi, toObjectErr(err, bucket)
-	}
-	return bucketInfo, nil
-}
-
-// DeleteBucket - deletes a bucket.
-func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
-	storageDisks := er.getDisks()
-
-	g := errgroup.WithNErrs(len(storageDisks))
-
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if storageDisks[index] != nil {
-				return storageDisks[index].DeleteVol(ctx, bucket, opts.Force)
-			}
-			return errDiskNotFound
-		}, index)
-	}
-
-	// Wait for all the delete vols to finish.
-	dErrs := g.Wait()
-
-	if opts.Force {
-		for _, err := range dErrs {
-			if err != nil {
-				undoDeleteBucket(storageDisks, bucket)
-				return toObjectErr(err, bucket)
-			}
-		}
-
-		return nil
-	}
-
-	err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, er.defaultWQuorum())
-	if err == errErasureWriteQuorum && !opts.NoRecreate {
-		undoDeleteBucket(storageDisks, bucket)
-	}
-
-	if err == nil || errors.Is(err, errVolumeNotFound) {
-		var purgedDangling bool
-		// At this point we have `err == nil` but some errors might be `errVolumeNotEmpty`
-		// we should proceed to attempt a force delete of such buckets.
-		for index, err := range dErrs {
-			if err == errVolumeNotEmpty && storageDisks[index] != nil {
-				storageDisks[index].RenameFile(ctx, bucket, "", minioMetaTmpDeletedBucket, mustGetUUID())
-				purgedDangling = true
-			}
-		}
-		// if we purged dangling buckets, ignore errVolumeNotFound error.
-		if purgedDangling {
-			err = nil
-		}
-		if opts.SRDeleteOp == MarkDelete {
-			er.markDelete(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket))
-		}
-	}
-
-	return toObjectErr(err, bucket)
-}
-
 // markDelete creates a vol entry in .minio.sys/buckets/.deleted until site replication
 // syncs the delete to peers
 func (er erasureObjects) markDelete(ctx context.Context, bucket, prefix string) error {
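The deleted block above is the per-disk fan-out that this commit lifts one level up: the identical errgroup.WithNErrs pattern reappears in the new cmd/peer-s3-server.go, applied to globalLocalDrives on each node. Reduced to a skeleton (a sketch reusing identifiers from the surrounding code, not itself part of the commit), the pattern keys errors by worker index so a quorum reducer can inspect them positionally afterwards:

	// Skeleton of the fan-out used throughout this commit: one goroutine per
	// disk, errors collected positionally, then reduced against a quorum.
	func fanOut(ctx context.Context, disks []StorageAPI, op func(StorageAPI) error) []error {
		g := errgroup.WithNErrs(len(disks))
		for index := range disks {
			index := index // capture for the closure (required before Go 1.22)
			g.Go(func() error {
				if disks[index] == nil {
					return errDiskNotFound
				}
				return op(disks[index])
			}, index)
		}
		return g.Wait() // errs[i] corresponds to disks[i]
	}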
@@ -214,22 +214,19 @@ func TestDeleteObjectsVersioned(t *testing.T) {
 func TestErasureDeleteObjectsErasureSet(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	var objs []*erasureObjects
-	for i := 0; i < 32; i++ {
-		obj, fsDirs, err := prepareErasure(ctx, 16)
-		if err != nil {
-			t.Fatal("Unable to initialize 'Erasure' object layer.", err)
-		}
-		// Remove all dirs.
-		for _, dir := range fsDirs {
-			defer os.RemoveAll(dir)
-		}
-		z := obj.(*erasureServerPools)
-		xl := z.serverPools[0].sets[0]
-		objs = append(objs, xl)
-	}
-
-	erasureSets := &erasureSets{sets: objs, distributionAlgo: "CRCMOD"}
+	obj, fsDirs, err := prepareErasureSets32(ctx)
+	if err != nil {
+		t.Fatal("Unable to initialize 'Erasure' object layer.", err)
+	}
+
+	setObjectLayer(obj)
+	initConfigSubsystem(ctx, obj)
+
+	// Remove all dirs.
+	for _, dir := range fsDirs {
+		defer os.RemoveAll(dir)
+	}
 
 	type testCaseType struct {
 		bucket string
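The test now drives the full server-pools object layer returned by prepareErasureSets32, with setObjectLayer and initConfigSubsystem wiring up the globals that the new peer-S3 path consults, instead of hand-assembling an erasureSets value from 32 independently prepared sets; the following hunks in this file swap the erasureSets receiver for obj accordingly.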
@@ -244,13 +241,12 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
 		{bucketName, "obj_4"},
 	}
 
-	err := erasureSets.MakeBucket(ctx, bucketName, MakeBucketOptions{})
-	if err != nil {
+	if err = obj.MakeBucket(ctx, bucketName, MakeBucketOptions{}); err != nil {
 		t.Fatal(err)
 	}
 
 	for _, testCase := range testCases {
-		_, err = erasureSets.PutObject(ctx, testCase.bucket, testCase.object,
+		_, err = obj.PutObject(ctx, testCase.bucket, testCase.object,
 			mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{})
 		if err != nil {
 			t.Fatalf("Erasure Object upload failed: <ERROR> %s", err)
@@ -270,7 +266,7 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
 	}
 
 	objectNames := toObjectNames(testCases)
-	_, delErrs := erasureSets.DeleteObjects(ctx, bucketName, objectNames, ObjectOptions{})
+	_, delErrs := obj.DeleteObjects(ctx, bucketName, objectNames, ObjectOptions{})
 
 	for i := range delErrs {
 		if delErrs[i] != nil {
@@ -279,7 +275,7 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
 	}
 
 	for _, test := range testCases {
-		_, statErr := erasureSets.GetObjectInfo(ctx, test.bucket, test.object, ObjectOptions{})
+		_, statErr := obj.GetObjectInfo(ctx, test.bucket, test.object, ObjectOptions{})
 		switch statErr.(type) {
 		case ObjectNotFound:
 		default:
@@ -61,6 +61,8 @@ type erasureServerPools struct {
 
 	// Active decommission canceler
 	decommissionCancelers []context.CancelFunc
+
+	s3Peer *S3PeerSys
 }
 
 func (z *erasureServerPools) SinglePool() bool {
@@ -79,6 +81,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 		storageDisks = make([][]StorageAPI, len(endpointServerPools))
 		z            = &erasureServerPools{
 			serverPools: make([]*erasureSets, len(endpointServerPools)),
+			s3Peer:      NewS3PeerSys(endpointServerPools),
 		}
 	)
 
@@ -710,14 +713,13 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
 func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
 	defer NSUpdated(bucket, slashSeparator)
 
-	g := errgroup.WithNErrs(len(z.serverPools))
-
-	if !isMinioMetaBucketName(bucket) {
-		// Verify if bucket is valid.
+	// Verify if bucket is valid.
+	if !isMinioMetaBucketName(bucket) {
 		if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
 			return BucketNameInvalid{Bucket: bucket}
 		}
 
+		if !opts.NoLock {
 		// Lock the bucket name before creating.
 		lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")
 		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
@@ -728,32 +730,18 @@ func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts
 		ctx = lkctx.Context()
 		defer lk.Unlock(lkctx)
 	}
-
-	// Create buckets in parallel across all sets.
-	for index := range z.serverPools {
-		index := index
-		g.Go(func() error {
-			if z.IsSuspended(index) {
-				return nil
-			}
-			return z.serverPools[index].MakeBucket(ctx, bucket, opts)
-		}, index)
-	}
-
-	errs := g.Wait()
-	// Return the first encountered error
-	for _, err := range errs {
-		if err != nil {
-			if _, ok := err.(BucketExists); !ok {
-				// Delete created buckets, ignoring errors.
-				z.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{
-					Force:      false,
-					NoRecreate: true,
-				})
-			}
-			return err
-		}
-	}
+	}
+
+	if err := z.s3Peer.MakeBucket(ctx, bucket, opts); err != nil {
+		if _, ok := err.(BucketExists); !ok {
+			// Delete created buckets, ignoring errors.
+			z.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{
+				NoLock:     true,
+				NoRecreate: true,
+			})
+		}
+		return err
+	}
 
 	// If it doesn't exist we get a new, so ignore errors
 	meta := newBucketMetadata(bucket)
@@ -1609,11 +1597,11 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket
 
 // GetBucketInfo - returns bucket info from one of the erasure coded serverPools.
 func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bucketInfo BucketInfo, err error) {
-	if z.SinglePool() {
-		bucketInfo, err = z.serverPools[0].GetBucketInfo(ctx, bucket, opts)
-		if err != nil {
-			return bucketInfo, err
-		}
+	bucketInfo, err = z.s3Peer.GetBucketInfo(ctx, bucket, opts)
+	if err != nil {
+		return bucketInfo, toObjectErr(err, bucket)
+	}
 
-		meta, err := globalBucketMetadataSys.Get(bucket)
-		if err == nil {
-			bucketInfo.Created = meta.Created
+	meta, err := globalBucketMetadataSys.Get(bucket)
+	if err == nil {
+		bucketInfo.Created = meta.Created
@@ -1622,26 +1610,6 @@ func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, o
 	}
 	return bucketInfo, nil
 }
-	for _, pool := range z.serverPools {
-		bucketInfo, err = pool.GetBucketInfo(ctx, bucket, opts)
-		if err != nil {
-			if isErrBucketNotFound(err) {
-				continue
-			}
-			return bucketInfo, err
-		}
-		meta, err := globalBucketMetadataSys.Get(bucket)
-		if err == nil {
-			bucketInfo.Created = meta.Created
-			bucketInfo.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket)
-			bucketInfo.ObjectLocking = meta.LockEnabled
-		}
-		return bucketInfo, nil
-	}
-	return bucketInfo, BucketNotFound{
-		Bucket: bucket,
-	}
-}
 
 // IsNotificationSupported returns whether bucket notification is applicable for this layer.
 func (z *erasureServerPools) IsNotificationSupported() bool {
@@ -1671,43 +1639,48 @@ func (z *erasureServerPools) IsTaggingSupported() bool {
 // even if one of the serverPools fail to delete buckets, we proceed to
 // undo a successful operation.
 func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
+	if isMinioMetaBucketName(bucket) {
+		return BucketNameInvalid{Bucket: bucket}
+	}
+
+	// Verify if bucket is valid.
+	if err := s3utils.CheckValidBucketName(bucket); err != nil {
+		return BucketNameInvalid{Bucket: bucket}
+	}
+
 	defer NSUpdated(bucket, slashSeparator)
+	if !opts.NoLock {
+		// Lock the bucket name before creating.
+		lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")
+		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
+		if err != nil {
+			return err
+		}
+		ctx = lkctx.Context()
+		defer lk.Unlock(lkctx)
+	}
 
-	g := errgroup.WithNErrs(len(z.serverPools))
-
-	// Delete buckets in parallel across all serverPools.
-	for index := range z.serverPools {
-		index := index
-		g.Go(func() error {
-			if z.IsSuspended(index) {
-				return nil
-			}
-			return z.serverPools[index].DeleteBucket(ctx, bucket, opts)
-		}, index)
-	}
-
-	errs := g.Wait()
-
-	// For any write quorum failure, we undo all the delete
-	// buckets operation by creating all the buckets again.
-	for _, err := range errs {
-		if err != nil {
-			if !z.SinglePool() && !opts.NoRecreate {
-				undoDeleteBucketServerPools(context.Background(), bucket, z.serverPools, errs)
-			}
-			return err
-		}
-	}
-
-	// Purge the entire bucket metadata entirely.
-	z.deleteAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket))
-	// If site replication is configured, hold on to deleted bucket state until sites sync
-	switch opts.SRDeleteOp {
-	case MarkDelete:
-		z.markDelete(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket))
-	}
-	// Success.
-	return nil
+	err := z.s3Peer.DeleteBucket(ctx, bucket, opts)
+	if err == nil || errors.Is(err, errVolumeNotFound) {
+		// If site replication is configured, hold on to deleted bucket state until sites sync
+		switch opts.SRDeleteOp {
+		case MarkDelete:
+			z.markDelete(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket))
+		}
+	}
+
+	if err != nil && !errors.Is(err, errVolumeNotFound) {
+		if !opts.NoRecreate {
+			z.s3Peer.MakeBucket(ctx, bucket, MakeBucketOptions{})
+		}
+	}
+
+	if err == nil {
+		// Purge the entire bucket metadata entirely.
+		z.deleteAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket))
+	}
+
+	return toObjectErr(err, bucket)
 }
 
 // deleteAll will rename bucket+prefix unconditionally across all disks to
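Net effect of the rewrite: name validation and locking happen once at the top of the pool layer, the delete itself is a single vectorized s3Peer.DeleteBucket across nodes, errVolumeNotFound is tolerated so repeated deletes stay idempotent, a failed delete recreates the bucket on peers unless NoRecreate is set, and bucket metadata is purged only after a fully successful delete, with MarkDelete keeping a tombstone until site replication syncs.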
@@ -1744,31 +1717,10 @@ func (z *erasureServerPools) purgeDelete(ctx context.Context, bucket, prefix str
 	}
 }
 
-// This function is used to undo a successful DeleteBucket operation.
-func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools []*erasureSets, errs []error) {
-	g := errgroup.WithNErrs(len(serverPools))
-
-	// Undo previous delete bucket on all underlying serverPools.
-	for index := range serverPools {
-		index := index
-		g.Go(func() error {
-			if errs[index] == nil {
-				return serverPools[index].MakeBucket(ctx, bucket, MakeBucketOptions{})
-			}
-			return nil
-		}, index)
-	}
-
-	g.Wait()
-}
-
 // List all buckets from one of the serverPools, we are not doing merge
 // sort here just for simplification. As per design it is assumed
 // that all buckets are present on all serverPools.
 func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
-	if z.SinglePool() {
-		buckets, err = z.serverPools[0].ListBuckets(ctx, opts)
-	} else {
 	for idx, pool := range z.serverPools {
 		if z.IsSuspended(idx) {
 			continue
@@ -1780,7 +1732,6 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
 		}
 		break
 	}
-	}
 	if err != nil {
 		return nil, err
 	}
@@ -677,32 +677,6 @@ func (s *erasureSets) Shutdown(ctx context.Context) error {
 	return nil
 }
 
-// MakeBucketLocation - creates a new bucket across all sets simultaneously,
-// then return the first encountered error
-func (s *erasureSets) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
-	g := errgroup.WithNErrs(len(s.sets))
-
-	// Create buckets in parallel across all sets.
-	for index := range s.sets {
-		index := index
-		g.Go(func() error {
-			return s.sets[index].MakeBucket(ctx, bucket, opts)
-		}, index)
-	}
-
-	errs := g.Wait()
-
-	// Return the first encountered error
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-
-	// Success.
-	return nil
-}
-
 // hashes the key returning an integer based on the input algorithm.
 // This function currently supports
 // - CRCMOD
@@ -749,11 +723,6 @@ func (s *erasureSets) getHashedSet(input string) (set *erasureObjects) {
 	return s.sets[s.getHashedSetIndex(input)]
 }
 
-// GetBucketInfo - returns bucket info from one of the erasure coded set.
-func (s *erasureSets) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bucketInfo BucketInfo, err error) {
-	return s.getHashedSet("").GetBucketInfo(ctx, bucket, opts)
-}
-
 // IsNotificationSupported returns whether bucket notification is applicable for this layer.
 func (s *erasureSets) IsNotificationSupported() bool {
 	return s.getHashedSet("").IsNotificationSupported()
@@ -778,52 +747,6 @@ func (s *erasureSets) IsTaggingSupported() bool {
 	return true
 }
 
-// DeleteBucket - deletes a bucket on all sets simultaneously,
-// even if one of the sets fail to delete buckets, we proceed to
-// undo a successful operation.
-func (s *erasureSets) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
-	g := errgroup.WithNErrs(len(s.sets))
-
-	// Delete buckets in parallel across all sets.
-	for index := range s.sets {
-		index := index
-		g.Go(func() error {
-			return s.sets[index].DeleteBucket(ctx, bucket, opts)
-		}, index)
-	}
-
-	errs := g.Wait()
-	// For any failure, we attempt undo all the delete buckets operation
-	// by creating buckets again on all sets which were successfully deleted.
-	for _, err := range errs {
-		if err != nil && !opts.NoRecreate {
-			undoDeleteBucketSets(ctx, bucket, s.sets, errs)
-			return err
-		}
-	}
-
-	// Success.
-	return nil
-}
-
-// This function is used to undo a successful DeleteBucket operation.
-func undoDeleteBucketSets(ctx context.Context, bucket string, sets []*erasureObjects, errs []error) {
-	g := errgroup.WithNErrs(len(sets))
-
-	// Undo previous delete bucket on all underlying sets.
-	for index := range sets {
-		index := index
-		g.Go(func() error {
-			if errs[index] == nil {
-				return sets[index].MakeBucket(ctx, bucket, MakeBucketOptions{})
-			}
-			return nil
-		}, index)
-	}
-
-	g.Wait()
-}
-
 // List all buckets from one of the set, we are not doing merge
 // sort here just for simplification. As per design it is assumed
 // that all buckets are present on all sets.
@@ -89,10 +89,6 @@ func (er erasureObjects) defaultWQuorum() int {
 	return dataCount
 }
 
-func (er erasureObjects) defaultRQuorum() int {
-	return er.setDriveCount - er.defaultParityCount
-}
-
 // byDiskTotal is a collection satisfying sort.Interface.
 type byDiskTotal []madmin.Disk
 
@@ -121,12 +121,14 @@ type MakeBucketOptions struct {
 	VersioningEnabled bool
 	ForceCreate       bool      // Create buckets even if they are already created.
 	CreatedAt         time.Time // only for site replication
+	NoLock            bool      // does not lock the make bucket call if set to 'true'
 }
 
 // DeleteBucketOptions provides options for DeleteBucket calls.
 type DeleteBucketOptions struct {
+	NoLock     bool             // does not lock the delete bucket call if set to 'true'
+	NoRecreate bool             // do not recreate bucket on delete failures
 	Force      bool             // Force deletion
-	NoRecreate bool             // Do not recreate on delete failures
 	SRDeleteOp SRBucketDeleteOp // only when site replication is enabled
 }
 
cmd/peer-s3-client.go (new file, 278 lines)
@@ -0,0 +1,278 @@
+// Copyright (c) 2015-2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package cmd
+
+import (
+	"context"
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"io"
+	"net/url"
+	"strconv"
+
+	xhttp "github.com/minio/minio/internal/http"
+	"github.com/minio/minio/internal/logger"
+	"github.com/minio/minio/internal/rest"
+	"github.com/minio/minio/internal/sync/errgroup"
+	xnet "github.com/minio/pkg/net"
+)
+
+var errPeerOffline = errors.New("peer is offline")
+
+// client to talk to peer Nodes.
+type peerS3Client struct {
+	host       *xnet.Host
+	restClient *rest.Client
+}
+
+// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
+// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
+// after verifying format.json
+func (client *peerS3Client) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
+	return client.callWithContext(GlobalContext, method, values, body, length)
+}
+
+// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
+// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
+// after verifying format.json
+func (client *peerS3Client) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
+	if values == nil {
+		values = make(url.Values)
+	}
+
+	respBody, err = client.restClient.Call(ctx, method, values, body, length)
+	if err == nil {
+		return respBody, nil
+	}
+
+	err = toStorageErr(err)
+	return nil, err
+}
+
+// S3PeerSys - S3 peer call system.
+type S3PeerSys struct {
+	peerClients    []*peerS3Client // Excludes self
+	allPeerClients []*peerS3Client // Includes nil client for self
+}
+
+// NewS3PeerSys - creates new S3 peer calls.
+func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
+	remote, all := newPeerS3Clients(endpoints)
+	return &S3PeerSys{
+		peerClients:    remote,
+		allPeerClients: all,
+	}
+}
+
+// GetBucketInfo returns bucket stat info about bucket on disk across all peers
+func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (binfo BucketInfo, err error) {
+	g := errgroup.WithNErrs(len(sys.peerClients))
+
+	bucketInfos := make([]BucketInfo, len(sys.peerClients)+1)
+
+	bucketInfo, err := getBucketInfoLocal(ctx, bucket, opts)
+	if err != nil {
+		return BucketInfo{}, err
+	}
+
+	errs := []error{nil}
+	bucketInfos[0] = bucketInfo
+
+	for idx, client := range sys.peerClients {
+		idx := idx
+		client := client
+		g.Go(func() error {
+			if client == nil {
+				return errPeerOffline
+			}
+			bucketInfo, err := client.GetBucketInfo(ctx, bucket, opts)
+			if err != nil {
+				return err
+			}
+			bucketInfos[idx] = bucketInfo
+			return nil
+		}, idx)
+	}
+
+	errs = append(errs, g.Wait()...)
+
+	quorum := (len(sys.allPeerClients) / 2)
+	if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil {
+		return BucketInfo{}, err
+	}
+
+	for i, err := range errs {
+		if err == nil {
+			bucketInfo = bucketInfos[i]
+			break
+		}
+	}
+
+	return bucketInfo, nil
+}
+
+// GetBucketInfo returns bucket stat info from a peer
+func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
+	v := url.Values{}
+	v.Set(peerS3Bucket, bucket)
+	v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted))
+
+	respBody, err := client.call(peerS3MethodGetBucketInfo, v, nil, -1)
+	if err != nil {
+		return BucketInfo{}, err
+	}
+	defer xhttp.DrainBody(respBody)
+
+	var bucketInfo BucketInfo
+	err = gob.NewDecoder(respBody).Decode(&bucketInfo)
+	return bucketInfo, err
+}
+
+// MakeBucket creates bucket across all peers
+func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
+	g := errgroup.WithNErrs(len(sys.peerClients))
+
+	for idx, client := range sys.peerClients {
+		client := client
+		g.Go(func() error {
+			if client == nil {
+				return errPeerOffline
+			}
+			return client.MakeBucket(ctx, bucket, opts)
+		}, idx)
+	}
+
+	errs := g.Wait()
+	errs = append(errs, makeBucketLocal(ctx, bucket, opts))
+
+	quorum := (len(sys.allPeerClients) / 2) + 1
+	err := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum)
+	return toObjectErr(err, bucket)
+}
+
+// MakeBucket creates a bucket on a peer
+func (client *peerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
+	v := url.Values{}
+	v.Set(peerS3Bucket, bucket)
+	v.Set(peerS3BucketForceCreate, strconv.FormatBool(opts.ForceCreate))
+
+	respBody, err := client.call(peerS3MethodMakeBucket, v, nil, -1)
+	if err != nil {
+		return err
+	}
+	defer xhttp.DrainBody(respBody)
+
+	return nil
+}
+
+// DeleteBucket deletes bucket across all peers
+func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
+	g := errgroup.WithNErrs(len(sys.peerClients))
+
+	for idx, client := range sys.peerClients {
+		client := client
+		g.Go(func() error {
+			if client == nil {
+				return errPeerOffline
+			}
+			return client.DeleteBucket(ctx, bucket, opts)
+		}, idx)
+	}
+
+	errs := g.Wait()
+	errs = append(errs, deleteBucketLocal(ctx, bucket, opts))
+
+	for _, err := range errs {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// DeleteBucket deletes bucket on a peer
+func (client *peerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
+	v := url.Values{}
+	v.Set(peerS3Bucket, bucket)
+	v.Set(peerS3BucketForceDelete, strconv.FormatBool(opts.Force))
+
+	respBody, err := client.call(peerS3MethodDeleteBucket, v, nil, -1)
+	if err != nil {
+		return err
+	}
+	defer xhttp.DrainBody(respBody)
+
+	return nil
+}
+
+// newPeerS3Clients creates new peer clients.
+// The two slices will point to the same clients,
+// but 'all' will contain nil entry for local client.
+// The 'all' slice will be in the same order across the cluster.
+func newPeerS3Clients(endpoints EndpointServerPools) (remote, all []*peerS3Client) {
+	if !globalIsDistErasure {
+		// Only useful in distributed setups
+		return nil, nil
+	}
+	hosts := endpoints.hostsSorted()
+	remote = make([]*peerS3Client, 0, len(hosts))
+	all = make([]*peerS3Client, len(hosts))
+	for i, host := range hosts {
+		if host == nil {
+			continue
+		}
+		all[i] = newPeerS3Client(host)
+		remote = append(remote, all[i])
+	}
+	if len(all) != len(remote)+1 {
+		logger.LogIf(context.Background(), fmt.Errorf("WARNING: Expected number of all hosts (%v) to be remote +1 (%v)", len(all), len(remote)))
+	}
+	return remote, all
+}
+
+// Returns a peer S3 client.
+func newPeerS3Client(peer *xnet.Host) *peerS3Client {
+	scheme := "http"
+	if globalIsTLS {
+		scheme = "https"
+	}
+
+	serverURL := &url.URL{
+		Scheme: scheme,
+		Host:   peer.String(),
+		Path:   peerS3Path,
+	}
+
+	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
+	// Use a separate client to avoid recursive calls.
+	healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
+	healthClient.ExpectTimeouts = true
+	healthClient.NoMetrics = true
+
+	// Construct a new health function.
+	restClient.HealthCheckFn = func() bool {
+		ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout)
+		defer cancel()
+		respBody, err := healthClient.Call(ctx, peerS3MethodHealth, nil, nil, -1)
+		xhttp.DrainBody(respBody)
+		return !isNetworkError(err)
+	}
+
+	return &peerS3Client{host: peer, restClient: restClient}
+}
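Note the asymmetric quorums in this file: reads (GetBucketInfo) are reduced with len(allPeerClients)/2 agreeing responses, while writes (MakeBucket) demand a strict majority of len(allPeerClients)/2 + 1. A throwaway sketch of the arithmetic:

	package main

	import "fmt"

	func main() {
		// With 4 nodes: reads need 2 successes, writes need 3.
		for _, nodes := range []int{1, 2, 3, 4, 5} {
			fmt.Printf("nodes=%d read quorum=%d write quorum=%d\n",
				nodes, nodes/2, nodes/2+1)
		}
	}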
cmd/peer-s3-server.go (new file, 256 lines)
@@ -0,0 +1,256 @@
+// Copyright (c) 2015-2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package cmd
+
+import (
+	"context"
+	"encoding/gob"
+	"errors"
+	"net/http"
+
+	"github.com/gorilla/mux"
+	"github.com/minio/minio/internal/logger"
+	"github.com/minio/minio/internal/sync/errgroup"
+)
+
+const (
+	peerS3Version = "v1" // First implementation
+
+	peerS3VersionPrefix = SlashSeparator + peerS3Version
+	peerS3Prefix        = minioReservedBucketPath + "/peer"
+	peerS3Path          = peerS3Prefix + peerS3VersionPrefix
+)
+
+const (
+	peerS3MethodHealth        = "/health"
+	peerS3MethodMakeBucket    = "/make-bucket"
+	peerS3MethodGetBucketInfo = "/get-bucket-info"
+	peerS3MethodDeleteBucket  = "/delete-bucket"
+)
+
+const (
+	peerS3Bucket            = "bucket"
+	peerS3BucketDeleted     = "bucket-deleted"
+	peerS3BucketForceCreate = "force-create"
+	peerS3BucketForceDelete = "force-delete"
+)
+
+type peerS3Server struct{}
+
+func (s *peerS3Server) writeErrorResponse(w http.ResponseWriter, err error) {
+	w.WriteHeader(http.StatusForbidden)
+	w.Write([]byte(err.Error()))
+}
+
+// IsValid - To authenticate and verify the time difference.
+func (s *peerS3Server) IsValid(w http.ResponseWriter, r *http.Request) bool {
+	objAPI := newObjectLayerFn()
+	if objAPI == nil {
+		s.writeErrorResponse(w, errServerNotInitialized)
+		return false
+	}
+
+	if err := storageServerRequestValidate(r); err != nil {
+		s.writeErrorResponse(w, err)
+		return false
+	}
+	return true
+}
+
+// HealthHandler - returns true if healthy
+func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) {
+	s.IsValid(w, r)
+}
+
+func getBucketInfoLocal(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
+	g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32)
+	bucketsInfo := make([]BucketInfo, len(globalLocalDrives))
+
+	// Make a volume entry on all underlying storage disks.
+	for index := range globalLocalDrives {
+		index := index
+		g.Go(func() error {
+			if globalLocalDrives[index] == nil {
+				return errDiskNotFound
+			}
+			volInfo, err := globalLocalDrives[index].StatVol(ctx, bucket)
+			if err != nil {
+				if opts.Deleted {
+					dvi, derr := globalLocalDrives[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket))
+					if derr != nil {
+						return err
+					}
+					bucketsInfo[index] = BucketInfo{Name: bucket, Deleted: dvi.Created}
+					return nil
+				}
+				return err
+			}
+
+			bucketsInfo[index] = BucketInfo{Name: bucket, Created: volInfo.Created}
+			return nil
+		}, index)
+	}
+
+	errs := g.Wait()
+	if err := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives) / 2)); err != nil {
+		return BucketInfo{}, err
+	}
+
+	var bucketInfo BucketInfo
+	for i, err := range errs {
+		if err == nil {
+			bucketInfo = bucketsInfo[i]
+			break
+		}
+	}
+
+	return bucketInfo, nil
+}
+
+func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
+	g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32)
+
+	// Make a volume entry on all underlying storage disks.
+	for index := range globalLocalDrives {
+		index := index
+		g.Go(func() error {
+			if globalLocalDrives[index] == nil {
+				return errDiskNotFound
+			}
+			return globalLocalDrives[index].DeleteVol(ctx, bucket, opts.Force)
+		}, index)
+	}
+
+	var recreate bool
+	errs := g.Wait()
+	for index, err := range errs {
+		if errors.Is(err, errVolumeNotEmpty) {
+			recreate = true
+		}
+		if err == nil && recreate {
+			// ignore any errors
+			globalLocalDrives[index].MakeVol(ctx, bucket)
+		}
+	}
+
+	for _, err := range errs {
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions) error {
+	g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32)
+
+	// Make a volume entry on all underlying storage disks.
+	for index := range globalLocalDrives {
+		index := index
+		g.Go(func() error {
+			if globalLocalDrives[index] == nil {
+				return errDiskNotFound
+			}
+			err := globalLocalDrives[index].MakeVol(ctx, bucket)
+			if opts.ForceCreate && errors.Is(err, errVolumeExists) {
+				// No need to return error when force create was
+				// requested.
+				return nil
+			}
+			if err != nil && !errors.Is(err, errVolumeExists) {
+				logger.LogIf(ctx, err)
+			}
+			return err
+		}, index)
+	}
+
+	errs := g.Wait()
+	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives)/2)+1)
+}
+
+// GetBucketInfoHandler implements peer BucketInfo call, returns bucket create date.
+func (s *peerS3Server) GetBucketInfoHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+
+	bucket := r.Form.Get(peerS3Bucket)
+	bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true"
+	bucketInfo, err := getBucketInfoLocal(r.Context(), bucket, BucketOptions{
+		Deleted: bucketDeleted,
+	})
+	if err != nil {
+		s.writeErrorResponse(w, err)
+		return
+	}
+
+	logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(bucketInfo))
+}
+
+// DeleteBucketHandler implements peer delete bucket call.
+func (s *peerS3Server) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+
+	bucket := r.Form.Get(peerS3Bucket)
+	if isMinioMetaBucket(bucket) {
+		s.writeErrorResponse(w, errInvalidArgument)
+		return
+	}
+
+	forceDelete := r.Form.Get(peerS3BucketForceDelete) == "true"
+
+	err := deleteBucketLocal(r.Context(), bucket, DeleteBucketOptions{
+		Force: forceDelete,
+	})
+	if err != nil {
+		s.writeErrorResponse(w, err)
+		return
+	}
+}
+
+// MakeBucketHandler implements peer create bucket call.
+func (s *peerS3Server) MakeBucketHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+
+	bucket := r.Form.Get(peerS3Bucket)
+	forceCreate := r.Form.Get(peerS3BucketForceCreate) == "true"
+
+	err := makeBucketLocal(r.Context(), bucket, MakeBucketOptions{
+		ForceCreate: forceCreate,
+	})
+	if err != nil {
+		s.writeErrorResponse(w, err)
+		return
+	}
+}
+
+// registerPeerS3Handlers - register peer s3 router.
+func registerPeerS3Handlers(router *mux.Router) {
+	server := &peerS3Server{}
+	subrouter := router.PathPrefix(peerS3Prefix).Subrouter()
+
+	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealth).HandlerFunc(httpTraceHdrs(server.HealthHandler))
+	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodMakeBucket).HandlerFunc(httpTraceHdrs(server.MakeBucketHandler))
+	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodDeleteBucket).HandlerFunc(httpTraceHdrs(server.DeleteBucketHandler))
+	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodGetBucketInfo).HandlerFunc(httpTraceHdrs(server.GetBucketInfoHandler))
+}
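Putting the route constants together: peerS3Prefix is minioReservedBucketPath + "/peer" and peerS3VersionPrefix is "/v1", so assuming minioReservedBucketPath resolves to "/minio" (defined elsewhere in cmd, not shown in this diff), MakeBucketHandler is served at POST /minio/peer/v1/make-bucket, with bucket and force-create passed as form values.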
@@ -31,6 +31,9 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint
 	// Register peer REST router only if its a distributed setup.
 	registerPeerRESTHandlers(router)
 
+	// Register peer S3 router only if its a distributed setup.
+	registerPeerS3Handlers(router)
+
 	// Register bootstrap REST router for distributed setups.
 	registerBootstrapRESTHandlers(router)
 
@@ -59,12 +59,14 @@ type check struct {
 
 // Assert - checks if gotValue is same as expectedValue, if not fails the test.
 func (c *check) Assert(gotValue interface{}, expectedValue interface{}) {
+	c.Helper()
 	if !reflect.DeepEqual(gotValue, expectedValue) {
-		c.Fatalf("Test %s:%s expected %v, got %v", getSource(2), c.testType, expectedValue, gotValue)
+		c.Fatalf("Test %s expected %v, got %v", c.testType, expectedValue, gotValue)
 	}
 }
 
 func verifyError(c *check, response *http.Response, code, description string, statusCode int) {
+	c.Helper()
 	data, err := io.ReadAll(response.Body)
 	c.Assert(err, nil)
 	errorResponse := APIErrorResponse{}
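The added c.Helper() calls (via the embedded testing.T) mark Assert and verifyError as test helpers, so failures are attributed to the caller's line, which is what makes the getSource(2) prefix in the old Fatalf format redundant. A minimal demonstration of the same idea, in a _test.go file:

	package example

	import "testing"

	// assertEqual is marked as a helper, so a failure is reported at the
	// caller's line rather than inside this function.
	func assertEqual(t *testing.T, got, want int) {
		t.Helper()
		if got != want {
			t.Fatalf("expected %d, got %d", want, got)
		}
	}

	func TestSum(t *testing.T) {
		assertEqual(t, 1+1, 2)
	}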