allow decommissioned pools to be removed while others are finishing (#17221)

This commit is contained in:
Harshavardhana 2023-05-16 16:00:57 -07:00 committed by GitHub
parent 2131046427
commit 06557fe8be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 111 deletions

View File

@ -8,8 +8,9 @@ export SECRET_KEY="$3"
export JOB_NAME="$4" export JOB_NAME="$4"
export MINT_MODE="full" export MINT_MODE="full"
docker system prune -f docker system prune -f || true
docker volume prune -f docker volume prune -f || true
docker volume rm $(docker volume ls -f dangling=true) || true
## change working directory ## change working directory
cd .github/workflows/mint cd .github/workflows/mint
@ -45,6 +46,7 @@ sleep 10s
docker system prune -f || true docker system prune -f || true
docker volume prune -f || true docker volume prune -f || true
docker volume rm $(docker volume ls -f dangling=true) || true
## change working directory ## change working directory
cd ../../../ cd ../../../

View File

@ -301,81 +301,31 @@ func (p *poolMeta) validate(pools []*erasureSets) (bool, error) {
specifiedPools[pool.endpoints.CmdLine] = idx specifiedPools[pool.endpoints.CmdLine] = idx
} }
replaceScheme := func(k string) string {
// This is needed as fallback when users are updating
// from http->https or https->http, we need to verify
// both because MinIO remembers the command-line in
// "exact" order - as long as this order is not disturbed
// we allow changing the "scheme" i.e internode communication
// from plain-text to TLS or from TLS to plain-text.
if strings.HasPrefix(k, "http://") {
k = strings.ReplaceAll(k, "http://", "https://")
} else if strings.HasPrefix(k, "https://") {
k = strings.ReplaceAll(k, "https://", "http://")
}
return k
}
var update bool var update bool
// Check if specified pools need to remove decommissioned pool. // Check if specified pools need to be removed from decommissioned pool.
for k := range specifiedPools { for k := range specifiedPools {
pi, ok := rememberedPools[k] pi, ok := rememberedPools[k]
if !ok { if !ok {
pi, ok = rememberedPools[replaceScheme(k)] // we do not have the pool anymore that we previously remembered, since all
if ok { // the CLI checks out we can allow updates since we are mostly adding a pool here.
update = true // Looks like user is changing from http->https or https->http update = true
}
} }
if ok && pi.completed { if ok && pi.completed {
return false, fmt.Errorf("pool(%s) = %s is decommissioned, please remove from server command line", humanize.Ordinal(pi.position+1), k) return false, fmt.Errorf("pool(%s) = %s is decommissioned, please remove from server command line", humanize.Ordinal(pi.position+1), k)
} }
} }
// check if remembered pools are in right position or missing from command line. if len(specifiedPools) == len(rememberedPools) {
for k, pi := range rememberedPools {
if pi.completed {
continue
}
_, ok := specifiedPools[k]
if !ok {
_, ok = specifiedPools[replaceScheme(k)]
if ok {
update = true // Looks like user is changing from http->https or https->http
}
}
if !ok {
update = true
}
}
// check when remembered pools and specified pools are same they are at the expected position
if len(rememberedPools) == len(specifiedPools) {
for k, pi := range rememberedPools { for k, pi := range rememberedPools {
pos, ok := specifiedPools[k] pos, ok := specifiedPools[k]
if !ok {
pos, ok = specifiedPools[replaceScheme(k)]
if ok {
update = true // Looks like user is changing from http->https or https->http
}
}
if !ok {
update = true
}
if ok && pos != pi.position { if ok && pos != pi.position {
return false, fmt.Errorf("pool order change detected for %s, expected position is (%s) but found (%s)", k, humanize.Ordinal(pi.position+1), humanize.Ordinal(pos+1)) update = true // pool order is changing, its okay to allow it.
} }
} }
} }
if !update { if !update {
update = len(rememberedPools) != len(specifiedPools) update = len(specifiedPools) != len(rememberedPools)
}
if update {
for k, pi := range rememberedPools {
if pi.decomStarted && !pi.completed {
return false, fmt.Errorf("pool(%s) = %s is being decommissioned, No changes should be made to the command line arguments. Please complete the decommission in progress", humanize.Ordinal(pi.position+1), k)
}
}
} }
return update, nil return update, nil
@ -507,60 +457,59 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
// if no update is needed return right away. // if no update is needed return right away.
if !update { if !update {
z.poolMeta = meta z.poolMeta = meta
} else {
pools := meta.returnResumablePools() meta = poolMeta{} // to update write poolMeta fresh.
poolIndices := make([]int, 0, len(pools)) // looks like new pool was added we need to update,
for _, pool := range pools { // or this is a fresh installation (or an existing
idx := globalEndpoints.GetPoolIdx(pool.CmdLine) // installation with pool removed)
if idx == -1 { meta.Version = poolMetaVersion
return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine) for idx, pool := range z.serverPools {
} meta.Pools = append(meta.Pools, PoolStatus{
poolIndices = append(poolIndices, idx) CmdLine: pool.endpoints.CmdLine,
ID: idx,
LastUpdate: UTCNow(),
})
} }
if err = meta.save(ctx, z.serverPools); err != nil {
return err
}
z.poolMeta = meta
}
if len(poolIndices) > 0 && globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal { pools := meta.returnResumablePools()
go func() { poolIndices := make([]int, 0, len(pools))
r := rand.New(rand.NewSource(time.Now().UnixNano())) for _, pool := range pools {
for { idx := globalEndpoints.GetPoolIdx(pool.CmdLine)
if err := z.Decommission(ctx, poolIndices...); err != nil { if idx == -1 {
if errors.Is(err, errDecommissionAlreadyRunning) { return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine)
// A previous decommission running found restart it. }
for _, idx := range poolIndices { poolIndices = append(poolIndices, idx)
z.doDecommissionInRoutine(ctx, idx) }
}
return if len(poolIndices) > 0 && globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal {
go func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
if err := z.Decommission(ctx, poolIndices...); err != nil {
if errors.Is(err, errDecommissionAlreadyRunning) {
// A previous decommission running found restart it.
for _, idx := range poolIndices {
z.doDecommissionInRoutine(ctx, idx)
} }
if configRetriableErrors(err) {
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pools %v: %w: retrying..", pools, err))
time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
continue
}
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pools, err))
return return
} }
if configRetriableErrors(err) {
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pools %v: %w: retrying..", pools, err))
time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
continue
}
logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pools, err))
return
} }
}() }
} }()
return nil
} }
meta = poolMeta{} // to update write poolMeta fresh.
// looks like new pool was added we need to update,
// or this is a fresh installation (or an existing
// installation with pool removed)
meta.Version = poolMetaVersion
for idx, pool := range z.serverPools {
meta.Pools = append(meta.Pools, PoolStatus{
CmdLine: pool.endpoints.CmdLine,
ID: idx,
LastUpdate: UTCNow(),
})
}
if err = meta.save(ctx, z.serverPools); err != nil {
return err
}
z.poolMeta = meta
return nil return nil
} }

View File

@ -127,8 +127,8 @@ func TestPoolMetaValidate(t *testing.T) {
meta: meta, meta: meta,
pools: orderChangePools, pools: orderChangePools,
name: "Invalid-Orderchange", name: "Invalid-Orderchange",
expectedErr: true, expectedErr: false,
expectedUpdate: false, expectedUpdate: true,
}, },
{ {
meta: nmeta1, meta: nmeta1,
@ -148,8 +148,8 @@ func TestPoolMetaValidate(t *testing.T) {
meta: nmeta2, meta: nmeta2,
pools: reducedPools, pools: reducedPools,
name: "Invalid-Decom-Pending-Pool-Removal", name: "Invalid-Decom-Pending-Pool-Removal",
expectedErr: true, expectedErr: false,
expectedUpdate: false, expectedUpdate: true,
}, },
{ {
meta: nmeta1, meta: nmeta1,
@ -169,8 +169,8 @@ func TestPoolMetaValidate(t *testing.T) {
meta: nmeta2, meta: nmeta2,
pools: orderChangePools, pools: orderChangePools,
name: "Invalid-Orderchange-Decom", name: "Invalid-Orderchange-Decom",
expectedErr: true, expectedErr: false,
expectedUpdate: false, expectedUpdate: true,
}, },
} }