mirror of
https://github.com/minio/minio.git
synced 2025-04-29 06:08:00 -04:00
fix: error out if an object is found after a full decom (#16277)
This commit is contained in:
parent
9815dac48f
commit
475a88b555
@ -686,6 +686,39 @@ func (v versionsSorter) reverse() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi decomBucketInfo, fn func(entry metaCacheEntry)) error {
|
||||||
|
disks := set.getOnlineDisks()
|
||||||
|
if len(disks) == 0 {
|
||||||
|
return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
|
||||||
|
}
|
||||||
|
|
||||||
|
// How to resolve partial results.
|
||||||
|
resolver := metadataResolutionParams{
|
||||||
|
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
||||||
|
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
||||||
|
bucket: bi.Name,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := listPathRaw(ctx, listPathRawOptions{
|
||||||
|
disks: disks,
|
||||||
|
bucket: bi.Name,
|
||||||
|
path: bi.Prefix,
|
||||||
|
recursive: true,
|
||||||
|
forwardTo: "",
|
||||||
|
minDisks: len(disks) / 2, // to capture all quorum ratios
|
||||||
|
reportNotFound: false,
|
||||||
|
agreed: fn,
|
||||||
|
partial: func(entries metaCacheEntries, _ []error) {
|
||||||
|
entry, ok := entries.resolve(&resolver)
|
||||||
|
if ok {
|
||||||
|
fn(*entry)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
finished: nil,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
|
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
|
||||||
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
||||||
|
|
||||||
@ -700,12 +733,6 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
|
|
||||||
for _, set := range pool.sets {
|
for _, set := range pool.sets {
|
||||||
set := set
|
set := set
|
||||||
disks := set.getOnlineDisks()
|
|
||||||
if len(disks) == 0 {
|
|
||||||
logger.LogIf(GlobalContext, fmt.Errorf("no online drives found for set with endpoints %s",
|
|
||||||
set.getEndpoints()))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
vc, _ := globalBucketVersioningSys.Get(bi.Name)
|
vc, _ := globalBucketVersioningSys.Get(bi.Name)
|
||||||
|
|
||||||
@ -879,39 +906,16 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
z.poolMetaMutex.Unlock()
|
z.poolMetaMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// How to resolve partial results.
|
|
||||||
resolver := metadataResolutionParams{
|
|
||||||
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
||||||
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
|
|
||||||
bucket: bi.Name,
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
err := listPathRaw(ctx, listPathRawOptions{
|
err := set.listObjectsToDecommission(ctx, bi,
|
||||||
disks: disks,
|
func(entry metaCacheEntry) {
|
||||||
bucket: bi.Name,
|
|
||||||
path: bi.Prefix,
|
|
||||||
recursive: true,
|
|
||||||
forwardTo: "",
|
|
||||||
minDisks: len(disks) / 2, // to capture all quorum ratios
|
|
||||||
reportNotFound: false,
|
|
||||||
agreed: func(entry metaCacheEntry) {
|
|
||||||
parallelWorkers <- struct{}{}
|
parallelWorkers <- struct{}{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go decommissionEntry(entry)
|
go decommissionEntry(entry)
|
||||||
},
|
},
|
||||||
partial: func(entries metaCacheEntries, _ []error) {
|
)
|
||||||
entry, ok := entries.resolve(&resolver)
|
|
||||||
if ok {
|
|
||||||
parallelWorkers <- struct{}{}
|
|
||||||
wg.Add(1)
|
|
||||||
go decommissionEntry(*entry)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
finished: nil,
|
|
||||||
})
|
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -992,6 +996,33 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error {
|
||||||
|
buckets, err := z.getBucketsToDecommission(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := z.serverPools[idx]
|
||||||
|
for _, set := range pool.sets {
|
||||||
|
for _, bi := range buckets {
|
||||||
|
var objectsFound int
|
||||||
|
err := set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) {
|
||||||
|
if entry.isObject() {
|
||||||
|
objectsFound++
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if objectsFound > 0 {
|
||||||
|
return fmt.Errorf("at least %d objects were found in bucket `%s` after decommissioning", objectsFound, bi.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int) {
|
func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int) {
|
||||||
z.poolMetaMutex.Lock()
|
z.poolMetaMutex.Lock()
|
||||||
var dctx context.Context
|
var dctx context.Context
|
||||||
@ -1011,6 +1042,15 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
|
|||||||
failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0
|
failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0
|
||||||
z.poolMetaMutex.Unlock()
|
z.poolMetaMutex.Unlock()
|
||||||
|
|
||||||
|
if !failed {
|
||||||
|
logger.Info("Decommissioning almost complete - checking for left over objects")
|
||||||
|
err := z.checkAfterDecom(dctx, idx)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
|
failed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if failed {
|
if failed {
|
||||||
// Decommission failed indicate as such.
|
// Decommission failed indicate as such.
|
||||||
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
|
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
|
||||||
@ -1193,6 +1233,33 @@ func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (z *erasureServerPools) getBucketsToDecommission(ctx context.Context) ([]decomBucketInfo, error) {
|
||||||
|
buckets, err := z.ListBuckets(ctx, BucketOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
decomBuckets := make([]decomBucketInfo, len(buckets))
|
||||||
|
for i := range buckets {
|
||||||
|
decomBuckets[i] = decomBucketInfo{
|
||||||
|
Name: buckets[i].Name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Buckets data are dispersed in multiple zones/sets, make
|
||||||
|
// sure to decommission the necessary metadata.
|
||||||
|
decomBuckets = append(decomBuckets, decomBucketInfo{
|
||||||
|
Name: minioMetaBucket,
|
||||||
|
Prefix: minioConfigPrefix,
|
||||||
|
})
|
||||||
|
decomBuckets = append(decomBuckets, decomBucketInfo{
|
||||||
|
Name: minioMetaBucket,
|
||||||
|
Prefix: bucketMetaPrefix,
|
||||||
|
})
|
||||||
|
|
||||||
|
return decomBuckets, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) {
|
func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) {
|
||||||
if idx < 0 {
|
if idx < 0 {
|
||||||
return errInvalidArgument
|
return errInvalidArgument
|
||||||
@ -1202,25 +1269,15 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
|
|||||||
return errInvalidArgument
|
return errInvalidArgument
|
||||||
}
|
}
|
||||||
|
|
||||||
buckets, err := z.ListBuckets(ctx, BucketOptions{})
|
decomBuckets, err := z.getBucketsToDecommission(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure to heal the buckets to ensure the new
|
for _, bucket := range decomBuckets {
|
||||||
// pool has the new buckets, this is to avoid
|
|
||||||
// failures later.
|
|
||||||
for _, bucket := range buckets {
|
|
||||||
z.HealBucket(ctx, bucket.Name, madmin.HealOpts{})
|
z.HealBucket(ctx, bucket.Name, madmin.HealOpts{})
|
||||||
}
|
}
|
||||||
|
|
||||||
decomBuckets := make([]decomBucketInfo, len(buckets))
|
|
||||||
for i := range buckets {
|
|
||||||
decomBuckets[i] = decomBucketInfo{
|
|
||||||
Name: buckets[i].Name,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Support decommissioning transition tiers.
|
// TODO: Support decommissioning transition tiers.
|
||||||
for _, bucket := range decomBuckets {
|
for _, bucket := range decomBuckets {
|
||||||
if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil {
|
if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil {
|
||||||
@ -1247,17 +1304,6 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Buckets data are dispersed in multiple zones/sets, make
|
|
||||||
// sure to decommission the necessary metadata.
|
|
||||||
decomBuckets = append(decomBuckets, decomBucketInfo{
|
|
||||||
Name: minioMetaBucket,
|
|
||||||
Prefix: minioConfigPrefix,
|
|
||||||
})
|
|
||||||
decomBuckets = append(decomBuckets, decomBucketInfo{
|
|
||||||
Name: minioMetaBucket,
|
|
||||||
Prefix: bucketMetaPrefix,
|
|
||||||
})
|
|
||||||
|
|
||||||
var pool *erasureSets
|
var pool *erasureSets
|
||||||
for pidx := range z.serverPools {
|
for pidx := range z.serverPools {
|
||||||
if pidx == idx {
|
if pidx == idx {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user