site replication: fix healing of bucket deletes. (#15377)

This PR changes the handling of bucket deletes for site 
replicated setups to hold on to deleted bucket state until 
it syncs to all the clusters participating in site replication.
This commit is contained in:
Poorna
2022-07-25 17:51:32 -07:00
committed by GitHub
parent e4b51235f8
commit 426c902b87
55 changed files with 1946 additions and 320 deletions

View File

@@ -27,6 +27,7 @@ import (
"net/http"
"reflect"
"sort"
"strings"
"sync"
"time"
@@ -689,7 +690,7 @@ func (s *erasureSets) Shutdown(ctx context.Context) error {
// MakeBucketWithLocation - creates a new bucket across all sets simultaneously,
// then returns the first encountered error.
func (s *erasureSets) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
func (s *erasureSets) MakeBucketWithLocation(ctx context.Context, bucket string, opts MakeBucketOptions) error {
g := errgroup.WithNErrs(len(s.sets))
// Create buckets in parallel across all sets.
@@ -760,8 +761,8 @@ func (s *erasureSets) getHashedSet(input string) (set *erasureObjects) {
}
// GetBucketInfo - returns bucket info from one of the erasure coded set.
func (s *erasureSets) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
return s.getHashedSet("").GetBucketInfo(ctx, bucket)
func (s *erasureSets) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bucketInfo BucketInfo, err error) {
return s.getHashedSet("").GetBucketInfo(ctx, bucket, opts)
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
@@ -825,7 +826,7 @@ func undoDeleteBucketSets(ctx context.Context, bucket string, sets []*erasureObj
index := index
g.Go(func() error {
if errs[index] == nil {
return sets[index].MakeBucketWithLocation(ctx, bucket, BucketOptions{})
return sets[index].MakeBucketWithLocation(ctx, bucket, MakeBucketOptions{})
}
return nil
}, index)
@@ -837,7 +838,7 @@ func undoDeleteBucketSets(ctx context.Context, bucket string, sets []*erasureObj
// List all buckets from one of the set, we are not doing merge
// sort here just for simplification. As per design it is assumed
// that all buckets are present on all sets.
func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
func (s *erasureSets) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
var listBuckets []BucketInfo
healBuckets := map[string]VolInfo{}
for _, set := range s.sets {
@@ -847,8 +848,34 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
}
}
// include deleted buckets in listBuckets output
deletedBuckets := map[string]VolInfo{}
if opts.Deleted {
for _, set := range s.sets {
// lists all unique buckets across drives.
if err := listDeletedBuckets(ctx, set.getDisks(), deletedBuckets, s.defaultParityCount); err != nil {
return nil, err
}
}
}
for _, v := range healBuckets {
listBuckets = append(listBuckets, BucketInfo(v))
bi := BucketInfo{
Name: v.Name,
Created: v.Created,
}
if vi, ok := deletedBuckets[v.Name]; ok {
bi.Deleted = vi.Created
}
listBuckets = append(listBuckets, bi)
}
for _, v := range deletedBuckets {
if _, ok := healBuckets[v.Name]; !ok {
listBuckets = append(listBuckets, BucketInfo{
Name: v.Name,
Deleted: v.Created,
})
}
}
sort.Slice(listBuckets, func(i, j int) bool {
@@ -858,6 +885,45 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
return listBuckets, nil
}
// listDeletedBuckets lists deleted buckets from all disks.
//
// Every non-nil disk in storageDisks is queried concurrently for the entries
// under the deleted-buckets prefix of the minio meta bucket. Each entry that is
// not yet present in delBuckets is stat'ed on that disk and stored in
// delBuckets keyed by its bare bucket name (prefix and surrounding slashes
// trimmed). delBuckets is mutated in place and may already contain entries
// collected from other sets.
//
// A nil disk, or a disk whose listing fails with errFileNotFound, contributes
// nothing and is not treated as an error. All other per-disk errors are
// reduced through reduceReadQuorumErrs using bucketMetadataOpIgnoredErrs and
// readQuorum, and the reduced error (if any) is returned.
func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBuckets map[string]VolInfo, readQuorum int) error {
	g := errgroup.WithNErrs(len(storageDisks))

	// mu guards delBuckets, which is shared by all per-disk goroutines.
	var mu sync.Mutex

	// recordDeleted adds volName (as seen on disk) to delBuckets unless it is
	// already recorded. The lock is released via defer so that a StatVol
	// failure cannot leave the mutex held and stall the remaining goroutines.
	recordDeleted := func(disk StorageAPI, volName string) error {
		mu.Lock()
		defer mu.Unlock()

		// NOTE(review): the lookup uses the raw listing entry while the map is
		// keyed by the trimmed bucket name below; if ListDir returns entries
		// with a trailing slash this check never matches — confirm.
		if _, ok := delBuckets[volName]; ok {
			return nil
		}

		vi, err := disk.StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, volName))
		if err != nil {
			return err
		}

		// Reduce the full meta-bucket path to the bare bucket name.
		bkt := strings.TrimPrefix(vi.Name, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix))
		bkt = strings.TrimPrefix(bkt, slashSeparator)
		bkt = strings.TrimSuffix(bkt, slashSeparator)
		vi.Name = bkt
		delBuckets[bkt] = vi
		return nil
	}

	for index := range storageDisks {
		index := index
		g.Go(func() error {
			disk := storageDisks[index]
			if disk == nil {
				// we ignore disk not found errors
				return nil
			}

			volsInfo, err := disk.ListDir(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix), -1)
			if err != nil {
				if err == errFileNotFound {
					// Nothing has been recorded as deleted on this disk.
					return nil
				}
				return err
			}

			for _, volName := range volsInfo {
				if err := recordDeleted(disk, volName); err != nil {
					return err
				}
			}
			return nil
		}, index)
	}

	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum)
}
// --- Object Operations ---
// GetObjectNInfo - returns object info and locked object ReadCloser