mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
initialize the disk healer early on (#19143)
This PR fixes a bug that perhaps has been long introduced, with no visible workarounds. In any deployment, if an entire erasure set is deleted, there is no way the cluster recovers.
This commit is contained in:
parent
0aae0180fb
commit
9a012a53ef
@ -83,7 +83,7 @@ function start_minio_3_node() {
|
||||
}
|
||||
|
||||
function check_online() {
|
||||
if grep -q 'Unable to initialize sub-systems' ${WORK_DIR}/dist-minio-*.log; then
|
||||
if ! grep -q 'Status:' ${WORK_DIR}/dist-minio-*.log; then
|
||||
echo "1"
|
||||
fi
|
||||
}
|
||||
@ -109,6 +109,7 @@ function perform_test() {
|
||||
|
||||
rm -rf ${WORK_DIR}/${1}/*/
|
||||
|
||||
set -x
|
||||
start_minio_3_node 120 $2
|
||||
|
||||
rv=$(check_online)
|
||||
|
@ -102,7 +102,6 @@ func waitForLowHTTPReq() {
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
for i := 0; i < globalBackgroundHealRoutine.workers; i++ {
|
||||
go globalBackgroundHealRoutine.AddWorker(ctx, objAPI)
|
||||
}
|
||||
|
@ -376,26 +376,8 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
|
||||
var newDiskHealingTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
||||
|
||||
func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint) error {
|
||||
disk, format, _, err := connectEndpoint(endpoint)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error: %w, %s", err, endpoint)
|
||||
}
|
||||
defer disk.Close()
|
||||
poolIdx := globalEndpoints.GetLocalPoolIdx(disk.Endpoint())
|
||||
if poolIdx < 0 {
|
||||
return fmt.Errorf("unexpected pool index (%d) found for %s", poolIdx, disk.Endpoint())
|
||||
}
|
||||
|
||||
// Calculate the set index where the current endpoint belongs
|
||||
z.serverPools[poolIdx].erasureDisksMu.RLock()
|
||||
setIdx, _, err := findDiskIndex(z.serverPools[poolIdx].format, format)
|
||||
z.serverPools[poolIdx].erasureDisksMu.RUnlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if setIdx < 0 {
|
||||
return fmt.Errorf("unexpected set index (%d) found for %s", setIdx, disk.Endpoint())
|
||||
}
|
||||
poolIdx, setIdx := endpoint.PoolIdx, endpoint.SetIdx
|
||||
disk := getStorageViaEndpoint(endpoint)
|
||||
|
||||
// Prevent parallel erasure set healing
|
||||
locker := z.NewNSLock(minioMetaBucket, fmt.Sprintf("new-drive-healing/%d/%d", poolIdx, setIdx))
|
||||
|
@ -145,7 +145,7 @@ func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []Object
|
||||
}
|
||||
}
|
||||
|
||||
var globalExpiryState *expiryState
|
||||
var globalExpiryState = newExpiryState()
|
||||
|
||||
func newExpiryState() *expiryState {
|
||||
return &expiryState{
|
||||
@ -155,8 +155,6 @@ func newExpiryState() *expiryState {
|
||||
}
|
||||
|
||||
func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
|
||||
globalExpiryState = newExpiryState()
|
||||
|
||||
workerSize, _ := strconv.Atoi(env.Get("_MINIO_ILM_EXPIRY_WORKERS", strconv.Itoa((runtime.GOMAXPROCS(0)+1)/2)))
|
||||
if workerSize == 0 {
|
||||
workerSize = 4
|
||||
@ -185,6 +183,7 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
|
||||
}
|
||||
ewk.Wait()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for t := range globalExpiryState.byNewerNoncurrentCh {
|
||||
nwk.Take()
|
||||
|
@ -88,8 +88,6 @@ func init() {
|
||||
logger.Init(GOPATH, GOROOT)
|
||||
logger.RegisterError(config.FmtError)
|
||||
|
||||
initGlobalContext()
|
||||
|
||||
globalBatchJobsMetrics = batchJobMetrics{metrics: make(map[string]*batchJobInfo)}
|
||||
go globalBatchJobsMetrics.purgeJobMetrics()
|
||||
|
||||
|
@ -1020,7 +1020,7 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O
|
||||
|
||||
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
|
||||
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
|
||||
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) {
|
||||
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) {
|
||||
done := globalScannerMetrics.time(scannerMetricApplyNonCurrent)
|
||||
defer done()
|
||||
|
||||
@ -1087,14 +1087,14 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
|
||||
})
|
||||
}
|
||||
|
||||
globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel, event)
|
||||
expState.enqueueByNewerNoncurrent(i.bucket, toDel, event)
|
||||
return objectInfos, nil
|
||||
}
|
||||
|
||||
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
|
||||
// after applying lifecycle checks configured.
|
||||
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) {
|
||||
objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
|
||||
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) {
|
||||
objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs, expState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -39,13 +39,13 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
|
||||
globalBucketMetadataSys = NewBucketMetadataSys()
|
||||
globalBucketObjectLockSys = &BucketObjectLockSys{}
|
||||
globalBucketVersioningSys = &BucketVersioningSys{}
|
||||
globalExpiryState = newExpiryState()
|
||||
expiryState := newExpiryState()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
expired := make([]ObjectToDelete, 0, 5)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for t := range globalExpiryState.byNewerNoncurrentCh {
|
||||
for t := range expiryState.byNewerNoncurrentCh {
|
||||
expired = append(expired, t.versions...)
|
||||
}
|
||||
}()
|
||||
@ -116,7 +116,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
|
||||
for i, fi := range fivs[:2] {
|
||||
wants[i] = fi.ToObjectInfo(bucket, obj, versioned)
|
||||
}
|
||||
gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs)
|
||||
gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, expiryState)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed with err: %v", err)
|
||||
}
|
||||
@ -125,7 +125,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
|
||||
}
|
||||
|
||||
// Close expiry state's channel to inspect object versions enqueued for expiration
|
||||
close(globalExpiryState.byNewerNoncurrentCh)
|
||||
close(expiryState.byNewerNoncurrentCh)
|
||||
wg.Wait()
|
||||
for _, obj := range expired {
|
||||
switch obj.ObjectV.VersionID {
|
||||
|
@ -175,8 +175,29 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
|
||||
z.poolMeta = newPoolMeta(z, poolMeta{})
|
||||
z.poolMeta.dontSave = true
|
||||
|
||||
bootstrapTrace("newSharedLock", func() {
|
||||
globalLeaderLock = newSharedLock(GlobalContext, z, "leader.lock")
|
||||
})
|
||||
|
||||
// Enable background operations on
|
||||
//
|
||||
// - Disk auto healing
|
||||
// - MRF (most recently failed) healing
|
||||
// - Background expiration routine for lifecycle policies
|
||||
bootstrapTrace("initAutoHeal", func() {
|
||||
initAutoHeal(GlobalContext, z)
|
||||
})
|
||||
|
||||
bootstrapTrace("initHealMRF", func() {
|
||||
go globalMRFState.healRoutine(z)
|
||||
})
|
||||
|
||||
bootstrapTrace("initBackgroundExpiry", func() {
|
||||
initBackgroundExpiry(GlobalContext, z)
|
||||
})
|
||||
|
||||
// initialize the object layer.
|
||||
setObjectLayer(z)
|
||||
defer setObjectLayer(z)
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
attempt := 1
|
||||
|
@ -224,7 +224,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
|
||||
disks, _ := er.getOnlineDisksWithHealing(false)
|
||||
if len(disks) == 0 {
|
||||
logger.LogIf(ctx, fmt.Errorf("no online disks found to heal the bucket `%s`", bucket))
|
||||
// No object healing necessary
|
||||
tracker.bucketDone(bucket)
|
||||
logger.LogIf(ctx, tracker.update(ctx))
|
||||
continue
|
||||
}
|
||||
|
||||
@ -472,9 +474,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
|
||||
func healBucket(bucket string, scan madmin.HealScanMode) error {
|
||||
// Get background heal sequence to send elements to heal
|
||||
globalHealStateLK.Lock()
|
||||
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
globalHealStateLK.Unlock()
|
||||
if ok {
|
||||
return bgSeq.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket)
|
||||
}
|
||||
@ -484,9 +484,7 @@ func healBucket(bucket string, scan madmin.HealScanMode) error {
|
||||
// healObject sends the given object/version to the background healing workers
|
||||
func healObject(bucket, object, versionID string, scan madmin.HealScanMode) error {
|
||||
// Get background heal sequence to send elements to heal
|
||||
globalHealStateLK.Lock()
|
||||
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
globalHealStateLK.Unlock()
|
||||
if ok {
|
||||
return bgSeq.queueHealTask(healSource{
|
||||
bucket: bucket,
|
||||
|
@ -379,13 +379,15 @@ var (
|
||||
return *ptr
|
||||
}
|
||||
|
||||
globalAllHealState *allHealState
|
||||
globalAllHealState = newHealState(GlobalContext, true)
|
||||
|
||||
// The always present healing routine ready to heal objects
|
||||
globalBackgroundHealRoutine *healRoutine
|
||||
globalBackgroundHealState *allHealState
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
globalBackgroundHealState = newHealState(GlobalContext, false)
|
||||
|
||||
globalMRFState mrfState
|
||||
globalMRFState = mrfState{
|
||||
opCh: make(chan partialOperation, mrfOpsQueueSize),
|
||||
}
|
||||
|
||||
// If writes to FS backend should be O_SYNC.
|
||||
globalFSOSync bool
|
||||
|
34
cmd/mrf.go
34
cmd/mrf.go
@ -19,7 +19,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/madmin-go/v3"
|
||||
@ -44,37 +43,15 @@ type partialOperation struct {
|
||||
// mrfState sncapsulates all the information
|
||||
// related to the global background MRF.
|
||||
type mrfState struct {
|
||||
ctx context.Context
|
||||
pools *erasureServerPools
|
||||
|
||||
mu sync.RWMutex
|
||||
opCh chan partialOperation
|
||||
}
|
||||
|
||||
// Initialize healing MRF subsystem
|
||||
func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.ctx = ctx
|
||||
m.opCh = make(chan partialOperation, mrfOpsQueueSize)
|
||||
|
||||
var ok bool
|
||||
m.pools, ok = objAPI.(*erasureServerPools)
|
||||
if ok {
|
||||
go m.healRoutine()
|
||||
}
|
||||
}
|
||||
|
||||
// Add a partial S3 operation (put/delete) when one or more disks are offline.
|
||||
func (m *mrfState) addPartialOp(op partialOperation) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
select {
|
||||
case m.opCh <- op:
|
||||
default:
|
||||
@ -86,10 +63,10 @@ var healSleeper = newDynamicSleeper(5, time.Second, false)
|
||||
// healRoutine listens to new disks reconnection events and
|
||||
// issues healing requests for queued objects belonging to the
|
||||
// corresponding erasure set
|
||||
func (m *mrfState) healRoutine() {
|
||||
func (m *mrfState) healRoutine(z *erasureServerPools) {
|
||||
for {
|
||||
select {
|
||||
case <-m.ctx.Done():
|
||||
case <-GlobalContext.Done():
|
||||
return
|
||||
case u, ok := <-m.opCh:
|
||||
if !ok {
|
||||
@ -115,7 +92,7 @@ func (m *mrfState) healRoutine() {
|
||||
healBucket(u.bucket, scan)
|
||||
} else {
|
||||
if u.allVersions {
|
||||
m.pools.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity)
|
||||
z.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity)
|
||||
} else {
|
||||
healObject(u.bucket, u.object, u.versionID, scan)
|
||||
}
|
||||
@ -125,8 +102,3 @@ func (m *mrfState) healRoutine() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize healing MRF
|
||||
func initHealMRF(ctx context.Context, obj ObjectLayer) {
|
||||
globalMRFState.init(ctx, obj)
|
||||
}
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@ -372,15 +371,7 @@ func serverHandleCmdArgs(ctxt serverCtxt) {
|
||||
globalConnWriteDeadline = ctxt.ConnWriteDeadline
|
||||
}
|
||||
|
||||
var globalHealStateLK sync.RWMutex
|
||||
|
||||
func initAllSubsystems(ctx context.Context) {
|
||||
globalHealStateLK.Lock()
|
||||
// New global heal state
|
||||
globalAllHealState = newHealState(ctx, true)
|
||||
globalBackgroundHealState = newHealState(ctx, false)
|
||||
globalHealStateLK.Unlock()
|
||||
|
||||
// Initialize notification peer targets
|
||||
globalNotificationSys = NewNotificationSys(globalEndpoints)
|
||||
|
||||
@ -814,27 +805,6 @@ func serverMain(ctx *cli.Context) {
|
||||
globalNodeNamesHex[hex.EncodeToString(nodeNameSum[:])] = struct{}{}
|
||||
}
|
||||
|
||||
bootstrapTrace("newSharedLock", func() {
|
||||
globalLeaderLock = newSharedLock(GlobalContext, newObject, "leader.lock")
|
||||
})
|
||||
|
||||
// Enable background operations on
|
||||
//
|
||||
// - Disk auto healing
|
||||
// - MRF (most recently failed) healing
|
||||
// - Background expiration routine for lifecycle policies
|
||||
bootstrapTrace("initAutoHeal", func() {
|
||||
initAutoHeal(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
bootstrapTrace("initHealMRF", func() {
|
||||
initHealMRF(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
bootstrapTrace("initBackgroundExpiry", func() {
|
||||
initBackgroundExpiry(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
var err error
|
||||
bootstrapTrace("initServerConfig", func() {
|
||||
if err = initServerConfig(GlobalContext, newObject); err != nil {
|
||||
|
@ -40,18 +40,11 @@ const (
|
||||
)
|
||||
|
||||
// Global service signal channel.
|
||||
var globalServiceSignalCh chan serviceSignal
|
||||
var globalServiceSignalCh = make(chan serviceSignal)
|
||||
|
||||
// GlobalContext context that is canceled when server is requested to shut down.
|
||||
var GlobalContext context.Context
|
||||
|
||||
// cancelGlobalContext can be used to indicate server shutdown.
|
||||
var cancelGlobalContext context.CancelFunc
|
||||
|
||||
func initGlobalContext() {
|
||||
GlobalContext, cancelGlobalContext = context.WithCancel(context.Background())
|
||||
globalServiceSignalCh = make(chan serviceSignal)
|
||||
}
|
||||
var GlobalContext, cancelGlobalContext = context.WithCancel(context.Background())
|
||||
|
||||
// restartProcess starts a new process passing it the active fd's. It
|
||||
// doesn't fork, but starts a new process using the same environment and
|
||||
|
@ -54,7 +54,7 @@ var errDiskStale = errors.New("drive stale")
|
||||
|
||||
// To abstract a disk over network.
|
||||
type storageRESTServer struct {
|
||||
poolIndex, setIndex, diskIndex int
|
||||
endpoint Endpoint
|
||||
}
|
||||
|
||||
var (
|
||||
@ -74,10 +74,14 @@ var (
|
||||
storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1)
|
||||
)
|
||||
|
||||
func (s *storageRESTServer) getStorage() StorageAPI {
|
||||
func getStorageViaEndpoint(endpoint Endpoint) StorageAPI {
|
||||
globalLocalDrivesMu.RLock()
|
||||
defer globalLocalDrivesMu.RUnlock()
|
||||
return globalLocalSetDrives[s.poolIndex][s.setIndex][s.diskIndex]
|
||||
return globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx]
|
||||
}
|
||||
|
||||
func (s *storageRESTServer) getStorage() StorageAPI {
|
||||
return getStorageViaEndpoint(s.endpoint)
|
||||
}
|
||||
|
||||
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
|
||||
@ -1287,9 +1291,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||
}
|
||||
|
||||
server := &storageRESTServer{
|
||||
poolIndex: endpoint.PoolIdx,
|
||||
setIndex: endpoint.SetIdx,
|
||||
diskIndex: endpoint.DiskIdx,
|
||||
endpoint: endpoint,
|
||||
}
|
||||
|
||||
subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
|
||||
|
@ -581,7 +581,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
||||
}
|
||||
|
||||
done := globalScannerMetrics.time(scannerMetricApplyAll)
|
||||
objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions)
|
||||
objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions, globalExpiryState)
|
||||
done()
|
||||
|
||||
if err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user