diff --git a/buildscripts/verify-healing.sh b/buildscripts/verify-healing.sh index 6ede11d99..4a7be9e48 100755 --- a/buildscripts/verify-healing.sh +++ b/buildscripts/verify-healing.sh @@ -83,7 +83,7 @@ function start_minio_3_node() { } function check_online() { - if grep -q 'Unable to initialize sub-systems' ${WORK_DIR}/dist-minio-*.log; then + if ! grep -q 'Status:' ${WORK_DIR}/dist-minio-*.log; then echo "1" fi } @@ -109,6 +109,7 @@ function perform_test() { rm -rf ${WORK_DIR}/${1}/*/ + set -x start_minio_3_node 120 $2 rv=$(check_online) diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 5568d0b5d..f4affaed9 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -102,7 +102,6 @@ func waitForLowHTTPReq() { func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { // Run the background healer - globalBackgroundHealRoutine = newHealRoutine() for i := 0; i < globalBackgroundHealRoutine.workers; i++ { go globalBackgroundHealRoutine.AddWorker(ctx, objAPI) } diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index fba8c1799..a12641e90 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -376,26 +376,8 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) { var newDiskHealingTimeout = newDynamicTimeout(30*time.Second, 10*time.Second) func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint) error { - disk, format, _, err := connectEndpoint(endpoint) - if err != nil { - return fmt.Errorf("Error: %w, %s", err, endpoint) - } - defer disk.Close() - poolIdx := globalEndpoints.GetLocalPoolIdx(disk.Endpoint()) - if poolIdx < 0 { - return fmt.Errorf("unexpected pool index (%d) found for %s", poolIdx, disk.Endpoint()) - } - - // Calculate the set index where the current endpoint belongs - z.serverPools[poolIdx].erasureDisksMu.RLock() - setIdx, _, err := findDiskIndex(z.serverPools[poolIdx].format, format) - z.serverPools[poolIdx].erasureDisksMu.RUnlock() - if err != nil { - return err - } - if setIdx < 0 { - return fmt.Errorf("unexpected set index (%d) found for %s", setIdx, disk.Endpoint()) - } + poolIdx, setIdx := endpoint.PoolIdx, endpoint.SetIdx + disk := getStorageViaEndpoint(endpoint) // Prevent parallel erasure set healing locker := z.NewNSLock(minioMetaBucket, fmt.Sprintf("new-drive-healing/%d/%d", poolIdx, setIdx)) diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index f5270d380..2f4a50258 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -145,7 +145,7 @@ func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []Object } } -var globalExpiryState *expiryState +var globalExpiryState = newExpiryState() func newExpiryState() *expiryState { return &expiryState{ @@ -155,8 +155,6 @@ func newExpiryState() *expiryState { } func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { - globalExpiryState = newExpiryState() - workerSize, _ := strconv.Atoi(env.Get("_MINIO_ILM_EXPIRY_WORKERS", strconv.Itoa((runtime.GOMAXPROCS(0)+1)/2))) if workerSize == 0 { workerSize = 4 @@ -185,6 +183,7 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { } ewk.Wait() }() + go func() { for t := range globalExpiryState.byNewerNoncurrentCh { nwk.Take() diff --git a/cmd/common-main.go b/cmd/common-main.go index 32abee71b..7cda7b023 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -88,8 +88,6 @@ func init() { logger.Init(GOPATH, GOROOT) logger.RegisterError(config.FmtError) - initGlobalContext() - globalBatchJobsMetrics = batchJobMetrics{metrics: make(map[string]*batchJobInfo)} go globalBatchJobsMetrics.purgeJobMetrics() diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 86ed7e883..8ea6613e0 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1020,7 +1020,7 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O // applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured. // Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return. -func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) { +func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) { done := globalScannerMetrics.time(scannerMetricApplyNonCurrent) defer done() @@ -1087,14 +1087,14 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob }) } - globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel, event) + expState.enqueueByNewerNoncurrent(i.bucket, toDel, event) return objectInfos, nil } // applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain // after applying lifecycle checks configured. -func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) { - objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs) +func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) { + objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs, expState) if err != nil { return nil, err } diff --git a/cmd/data-scanner_test.go b/cmd/data-scanner_test.go index 92b444313..464feb805 100644 --- a/cmd/data-scanner_test.go +++ b/cmd/data-scanner_test.go @@ -39,13 +39,13 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { globalBucketMetadataSys = NewBucketMetadataSys() globalBucketObjectLockSys = &BucketObjectLockSys{} globalBucketVersioningSys = &BucketVersioningSys{} - globalExpiryState = newExpiryState() + expiryState := newExpiryState() var wg sync.WaitGroup wg.Add(1) expired := make([]ObjectToDelete, 0, 5) go func() { defer wg.Done() - for t := range globalExpiryState.byNewerNoncurrentCh { + for t := range expiryState.byNewerNoncurrentCh { expired = append(expired, t.versions...) } }() @@ -116,7 +116,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { for i, fi := range fivs[:2] { wants[i] = fi.ToObjectInfo(bucket, obj, versioned) } - gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs) + gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, expiryState) if err != nil { t.Fatalf("Failed with err: %v", err) } @@ -125,7 +125,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { } // Close expiry state's channel to inspect object versions enqueued for expiration - close(globalExpiryState.byNewerNoncurrentCh) + close(expiryState.byNewerNoncurrentCh) wg.Wait() for _, obj := range expired { switch obj.ObjectV.VersionID { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 0e39b7854..2884a9e45 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -175,8 +175,29 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ z.poolMeta = newPoolMeta(z, poolMeta{}) z.poolMeta.dontSave = true + bootstrapTrace("newSharedLock", func() { + globalLeaderLock = newSharedLock(GlobalContext, z, "leader.lock") + }) + + // Enable background operations on + // + // - Disk auto healing + // - MRF (most recently failed) healing + // - Background expiration routine for lifecycle policies + bootstrapTrace("initAutoHeal", func() { + initAutoHeal(GlobalContext, z) + }) + + bootstrapTrace("initHealMRF", func() { + go globalMRFState.healRoutine(z) + }) + + bootstrapTrace("initBackgroundExpiry", func() { + initBackgroundExpiry(GlobalContext, z) + }) + // initialize the object layer. - setObjectLayer(z) + defer setObjectLayer(z) r := rand.New(rand.NewSource(time.Now().UnixNano())) attempt := 1 diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 47e7ab2e5..7d311244f 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -224,7 +224,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, disks, _ := er.getOnlineDisksWithHealing(false) if len(disks) == 0 { - logger.LogIf(ctx, fmt.Errorf("no online disks found to heal the bucket `%s`", bucket)) + // No object healing necessary + tracker.bucketDone(bucket) + logger.LogIf(ctx, tracker.update(ctx)) continue } @@ -472,9 +474,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, func healBucket(bucket string, scan madmin.HealScanMode) error { // Get background heal sequence to send elements to heal - globalHealStateLK.Lock() bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - globalHealStateLK.Unlock() if ok { return bgSeq.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket) } @@ -484,9 +484,7 @@ func healBucket(bucket string, scan madmin.HealScanMode) error { // healObject sends the given object/version to the background healing workers func healObject(bucket, object, versionID string, scan madmin.HealScanMode) error { // Get background heal sequence to send elements to heal - globalHealStateLK.Lock() bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - globalHealStateLK.Unlock() if ok { return bgSeq.queueHealTask(healSource{ bucket: bucket, diff --git a/cmd/globals.go b/cmd/globals.go index 24734e7e6..bcd1e6b57 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -379,13 +379,15 @@ var ( return *ptr } - globalAllHealState *allHealState + globalAllHealState = newHealState(GlobalContext, true) // The always present healing routine ready to heal objects - globalBackgroundHealRoutine *healRoutine - globalBackgroundHealState *allHealState + globalBackgroundHealRoutine = newHealRoutine() + globalBackgroundHealState = newHealState(GlobalContext, false) - globalMRFState mrfState + globalMRFState = mrfState{ + opCh: make(chan partialOperation, mrfOpsQueueSize), + } // If writes to FS backend should be O_SYNC. globalFSOSync bool diff --git a/cmd/mrf.go b/cmd/mrf.go index 7eccda884..bd82863e5 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -19,7 +19,6 @@ package cmd import ( "context" - "sync" "time" "github.com/minio/madmin-go/v3" @@ -44,37 +43,15 @@ type partialOperation struct { // mrfState sncapsulates all the information // related to the global background MRF. type mrfState struct { - ctx context.Context - pools *erasureServerPools - - mu sync.RWMutex opCh chan partialOperation } -// Initialize healing MRF subsystem -func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) { - m.mu.Lock() - defer m.mu.Unlock() - - m.ctx = ctx - m.opCh = make(chan partialOperation, mrfOpsQueueSize) - - var ok bool - m.pools, ok = objAPI.(*erasureServerPools) - if ok { - go m.healRoutine() - } -} - // Add a partial S3 operation (put/delete) when one or more disks are offline. func (m *mrfState) addPartialOp(op partialOperation) { if m == nil { return } - m.mu.RLock() - defer m.mu.RUnlock() - select { case m.opCh <- op: default: @@ -86,10 +63,10 @@ var healSleeper = newDynamicSleeper(5, time.Second, false) // healRoutine listens to new disks reconnection events and // issues healing requests for queued objects belonging to the // corresponding erasure set -func (m *mrfState) healRoutine() { +func (m *mrfState) healRoutine(z *erasureServerPools) { for { select { - case <-m.ctx.Done(): + case <-GlobalContext.Done(): return case u, ok := <-m.opCh: if !ok { @@ -115,7 +92,7 @@ func (m *mrfState) healRoutine() { healBucket(u.bucket, scan) } else { if u.allVersions { - m.pools.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity) + z.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity) } else { healObject(u.bucket, u.object, u.versionID, scan) } @@ -125,8 +102,3 @@ func (m *mrfState) healRoutine() { } } } - -// Initialize healing MRF -func initHealMRF(ctx context.Context, obj ObjectLayer) { - globalMRFState.init(ctx, obj) -} diff --git a/cmd/server-main.go b/cmd/server-main.go index 9001dc02b..01e21cb0d 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -30,7 +30,6 @@ import ( "os/signal" "runtime" "strings" - "sync" "syscall" "time" @@ -372,15 +371,7 @@ func serverHandleCmdArgs(ctxt serverCtxt) { globalConnWriteDeadline = ctxt.ConnWriteDeadline } -var globalHealStateLK sync.RWMutex - func initAllSubsystems(ctx context.Context) { - globalHealStateLK.Lock() - // New global heal state - globalAllHealState = newHealState(ctx, true) - globalBackgroundHealState = newHealState(ctx, false) - globalHealStateLK.Unlock() - // Initialize notification peer targets globalNotificationSys = NewNotificationSys(globalEndpoints) @@ -814,27 +805,6 @@ func serverMain(ctx *cli.Context) { globalNodeNamesHex[hex.EncodeToString(nodeNameSum[:])] = struct{}{} } - bootstrapTrace("newSharedLock", func() { - globalLeaderLock = newSharedLock(GlobalContext, newObject, "leader.lock") - }) - - // Enable background operations on - // - // - Disk auto healing - // - MRF (most recently failed) healing - // - Background expiration routine for lifecycle policies - bootstrapTrace("initAutoHeal", func() { - initAutoHeal(GlobalContext, newObject) - }) - - bootstrapTrace("initHealMRF", func() { - initHealMRF(GlobalContext, newObject) - }) - - bootstrapTrace("initBackgroundExpiry", func() { - initBackgroundExpiry(GlobalContext, newObject) - }) - var err error bootstrapTrace("initServerConfig", func() { if err = initServerConfig(GlobalContext, newObject); err != nil { diff --git a/cmd/service.go b/cmd/service.go index ebe7394b5..5561cfc9f 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -40,18 +40,11 @@ const ( ) // Global service signal channel. -var globalServiceSignalCh chan serviceSignal +var globalServiceSignalCh = make(chan serviceSignal) // GlobalContext context that is canceled when server is requested to shut down. -var GlobalContext context.Context - // cancelGlobalContext can be used to indicate server shutdown. -var cancelGlobalContext context.CancelFunc - -func initGlobalContext() { - GlobalContext, cancelGlobalContext = context.WithCancel(context.Background()) - globalServiceSignalCh = make(chan serviceSignal) -} +var GlobalContext, cancelGlobalContext = context.WithCancel(context.Background()) // restartProcess starts a new process passing it the active fd's. It // doesn't fork, but starts a new process using the same environment and diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 7f57074fe..cd4d3da5d 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -54,7 +54,7 @@ var errDiskStale = errors.New("drive stale") // To abstract a disk over network. type storageRESTServer struct { - poolIndex, setIndex, diskIndex int + endpoint Endpoint } var ( @@ -74,10 +74,14 @@ var ( storageListDirRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1) ) -func (s *storageRESTServer) getStorage() StorageAPI { +func getStorageViaEndpoint(endpoint Endpoint) StorageAPI { globalLocalDrivesMu.RLock() defer globalLocalDrivesMu.RUnlock() - return globalLocalSetDrives[s.poolIndex][s.setIndex][s.diskIndex] + return globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] +} + +func (s *storageRESTServer) getStorage() StorageAPI { + return getStorageViaEndpoint(s.endpoint) } func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { @@ -1287,9 +1291,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin } server := &storageRESTServer{ - poolIndex: endpoint.PoolIdx, - setIndex: endpoint.SetIdx, - diskIndex: endpoint.DiskIdx, + endpoint: endpoint, } subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter() diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index b74de4328..c84437e38 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -581,7 +581,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } done := globalScannerMetrics.time(scannerMetricApplyAll) - objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions) + objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions, globalExpiryState) done() if err != nil {