mirror of
https://github.com/minio/minio.git
synced 2025-02-03 01:46:00 -05:00
Fix various poolmeta races (#18230)
There is a fundamental race condition in `newErasureServerPools`, where setObjectLayer is called before the poolMeta has been loaded/populated. We add a placeholder value to this field but disable all saving of the value, so we don't risk overwriting the value on disk. Once the value has been loaded or created, it is replaced with the proper value, which will also be saved. Also fixes various accesses of `poolMeta` that were done without locks. We make the `poolMeta.IsSuspended` return false, even if we shouldn't risk out-of-bounds reads anymore.
This commit is contained in:
parent
409c391850
commit
9a877734b2
@ -160,6 +160,9 @@ func (ps PoolStatus) Clone() PoolStatus {
|
||||
type poolMeta struct {
|
||||
Version int `msg:"v"`
|
||||
Pools []PoolStatus `msg:"pls"`
|
||||
|
||||
// Value should not be saved when we have not loaded anything yet.
|
||||
dontSave bool `msg:"-"`
|
||||
}
|
||||
|
||||
// A decommission resumable tells us if decommission is worth
|
||||
@ -308,6 +311,10 @@ func (p *poolMeta) Decommission(idx int, pi poolSpaceInfo) error {
|
||||
}
|
||||
|
||||
func (p poolMeta) IsSuspended(idx int) bool {
|
||||
if idx >= len(p.Pools) {
|
||||
// We don't really know if the pool is suspended or not, since it doesn't exist.
|
||||
return false
|
||||
}
|
||||
return p.Pools[idx].Decommission != nil
|
||||
}
|
||||
|
||||
@ -445,6 +452,9 @@ func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSet
|
||||
}
|
||||
|
||||
func (p poolMeta) save(ctx context.Context, pools []*erasureSets) error {
|
||||
if p.dontSave {
|
||||
return nil
|
||||
}
|
||||
data := make([]byte, 4, p.Msgsize()+4)
|
||||
|
||||
// Initialize the header.
|
||||
@ -499,36 +509,17 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
|
||||
|
||||
// if no update is needed return right away.
|
||||
if !update {
|
||||
z.poolMetaMutex.Lock()
|
||||
z.poolMeta = meta
|
||||
z.poolMetaMutex.Unlock()
|
||||
} else {
|
||||
newMeta := poolMeta{} // to update write poolMeta fresh.
|
||||
// looks like new pool was added we need to update,
|
||||
// or this is a fresh installation (or an existing
|
||||
// installation with pool removed)
|
||||
newMeta.Version = poolMetaVersion
|
||||
for idx, pool := range z.serverPools {
|
||||
var skip bool
|
||||
for _, currentPool := range meta.Pools {
|
||||
// Preserve any current pool status.
|
||||
if currentPool.CmdLine == pool.endpoints.CmdLine {
|
||||
newMeta.Pools = append(newMeta.Pools, currentPool)
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
newMeta.Pools = append(newMeta.Pools, PoolStatus{
|
||||
CmdLine: pool.endpoints.CmdLine,
|
||||
ID: idx,
|
||||
LastUpdate: UTCNow(),
|
||||
})
|
||||
}
|
||||
newMeta := newPoolMeta(z, meta)
|
||||
if err = newMeta.save(ctx, z.serverPools); err != nil {
|
||||
return err
|
||||
}
|
||||
z.poolMetaMutex.Lock()
|
||||
z.poolMeta = newMeta
|
||||
z.poolMetaMutex.Unlock()
|
||||
}
|
||||
|
||||
pools := meta.returnResumablePools()
|
||||
@ -568,6 +559,34 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newPoolMeta(z *erasureServerPools, prevMeta poolMeta) poolMeta {
|
||||
newMeta := poolMeta{} // to update write poolMeta fresh.
|
||||
// looks like new pool was added we need to update,
|
||||
// or this is a fresh installation (or an existing
|
||||
// installation with pool removed)
|
||||
newMeta.Version = poolMetaVersion
|
||||
for idx, pool := range z.serverPools {
|
||||
var skip bool
|
||||
for _, currentPool := range prevMeta.Pools {
|
||||
// Preserve any current pool status.
|
||||
if currentPool.CmdLine == pool.endpoints.CmdLine {
|
||||
newMeta.Pools = append(newMeta.Pools, currentPool)
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
newMeta.Pools = append(newMeta.Pools, PoolStatus{
|
||||
CmdLine: pool.endpoints.CmdLine,
|
||||
ID: idx,
|
||||
LastUpdate: UTCNow(),
|
||||
})
|
||||
}
|
||||
return newMeta
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) IsDecommissionRunning() bool {
|
||||
z.poolMetaMutex.RLock()
|
||||
defer z.poolMetaMutex.RUnlock()
|
||||
@ -1003,8 +1022,15 @@ func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(err
|
||||
|
||||
func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
|
||||
pool := z.serverPools[idx]
|
||||
for _, bucket := range z.poolMeta.PendingBuckets(idx) {
|
||||
if z.poolMeta.isBucketDecommissioned(idx, bucket.String()) {
|
||||
z.poolMetaMutex.RLock()
|
||||
pending := z.poolMeta.PendingBuckets(idx)
|
||||
z.poolMetaMutex.RUnlock()
|
||||
|
||||
for _, bucket := range pending {
|
||||
z.poolMetaMutex.RLock()
|
||||
isDecommissioned := z.poolMeta.isBucketDecommissioned(idx, bucket.String())
|
||||
z.poolMetaMutex.RUnlock()
|
||||
if isDecommissioned {
|
||||
if serverDebugLog {
|
||||
console.Debugln("decommission: already done, moving on", bucket)
|
||||
}
|
||||
|
@ -143,6 +143,12 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
|
||||
|
||||
z.decommissionCancelers = make([]context.CancelFunc, len(z.serverPools))
|
||||
|
||||
// Initialize the pool meta, but set it to not save.
|
||||
// When z.Init below has loaded the poolmeta will be initialized,
|
||||
// and allowed to save.
|
||||
z.poolMeta = newPoolMeta(z, poolMeta{})
|
||||
z.poolMeta.dontSave = true
|
||||
|
||||
// initialize the object layer.
|
||||
setObjectLayer(z)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user