mirror of
https://github.com/minio/minio.git
synced 2025-01-25 21:53:16 -05:00
update() stale rebalance stats() object during pool expansion (#18882)
it is entirely possible that a rebalance process which was running when it was asked to "stop" it failed to write its last statistics to the disk. After this a pool expansion can cause disruption and all S3 API calls would fail at IsPoolRebalancing() function. This PRs makes sure that we update rebalance.bin under such conditions to avoid any runtime crashes.
This commit is contained in:
parent
c51f9ef940
commit
32e668eb94
@ -490,6 +490,10 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
|
||||
}
|
||||
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
t, err := loadHealingTracker(ctx, disk)
|
||||
if err != nil {
|
||||
if !errors.Is(err, errFileNotFound) {
|
||||
|
@ -936,7 +936,8 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
||||
bi.Name,
|
||||
encodeDirObject(entry.name),
|
||||
ObjectOptions{
|
||||
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
||||
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
||||
DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
|
||||
},
|
||||
)
|
||||
stopFn(err)
|
||||
|
@ -114,12 +114,60 @@ func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
|
||||
}
|
||||
|
||||
z.rebalMu.Lock()
|
||||
z.rebalMeta = r
|
||||
if len(r.PoolStats) == len(z.serverPools) {
|
||||
z.rebalMeta = r
|
||||
} else {
|
||||
z.updateRebalanceStats(ctx)
|
||||
}
|
||||
z.rebalMu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updates rebalance.bin from let's say 2 pool setup in the middle
|
||||
// of a rebalance, was expanded can cause z.rebalMeta to be outdated
|
||||
// due to a missing new pool. This function tries to handle this
|
||||
// scenario, albeit rare it seems to have occurred in the wild.
|
||||
//
|
||||
// since we do not explicitly disallow it, but it is okay for them
|
||||
// expand and then we continue to rebalance.
|
||||
func (z *erasureServerPools) updateRebalanceStats(ctx context.Context) error {
|
||||
var ok bool
|
||||
for i := range z.serverPools {
|
||||
if z.findIndex(i) == -1 {
|
||||
// Also ensure to initialize rebalanceStats to indicate
|
||||
// its a new pool that can receive rebalanced data.
|
||||
z.rebalMeta.PoolStats = append(z.rebalMeta.PoolStats, &rebalanceStats{})
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
|
||||
lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
|
||||
return err
|
||||
}
|
||||
defer lock.Unlock(lkCtx)
|
||||
|
||||
ctx = lkCtx.Context()
|
||||
|
||||
noLockOpts := ObjectOptions{NoLock: true}
|
||||
return z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) findIndex(index int) int {
|
||||
for i := 0; i < len(z.rebalMeta.PoolStats); i++ {
|
||||
if i == index {
|
||||
return index
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// initRebalanceMeta initializes rebalance metadata for a new rebalance
|
||||
// operation and saves it in the object store.
|
||||
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
|
||||
@ -510,7 +558,6 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
|
||||
|
||||
// Apply lifecycle rules on the objects that are expired.
|
||||
if filterLifecycle(bucket, version.Name, version) {
|
||||
rebalanced++
|
||||
expired++
|
||||
continue
|
||||
}
|
||||
@ -609,7 +656,8 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
|
||||
bucket,
|
||||
encodeDirObject(entry.name),
|
||||
ObjectOptions{
|
||||
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
||||
DeletePrefix: true, // use prefix delete to delete all versions at once.
|
||||
DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
|
||||
},
|
||||
)
|
||||
stopFn(err)
|
||||
@ -695,8 +743,7 @@ func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int
|
||||
}
|
||||
z.rebalMeta = r
|
||||
|
||||
err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
||||
return err
|
||||
return z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
|
||||
}
|
||||
|
||||
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user