diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 8461aa531..a8457d6da 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -45,11 +45,6 @@ import ( // setsDsyncLockers is encapsulated type for Close() type setsDsyncLockers [][]dsync.NetLocker -// Information of a new disk connection -type diskConnectInfo struct { - setIndex int -} - const envMinioDeleteCleanupInterval = "MINIO_DELETE_CLEANUP_INTERVAL" // erasureSets implements ObjectLayer combining a static list of erasure coded @@ -89,7 +84,9 @@ type erasureSets struct { poolIndex int - disksConnectEvent chan diskConnectInfo + // A channel to send the set index to the MRF when + // any disk belonging to that set is connected + setReconnectEvent chan int // Distribution algorithm of choice. distributionAlgo string @@ -199,6 +196,7 @@ func findDiskIndex(refFormat, format *formatErasureV3) (int, int, error) { // and re-arranges the disks in proper position. func (s *erasureSets) connectDisks() { var wg sync.WaitGroup + var setsJustConnected = make([]bool, s.setCount) diskMap := s.getDiskMap() for _, endpoint := range s.endpoints { diskPath := endpoint.String() @@ -253,19 +251,29 @@ func (s *erasureSets) connectDisks() { disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex) s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String() s.erasureDisksMu.Unlock() - go func(setIndex int) { - idler := time.NewTimer(100 * time.Millisecond) - defer idler.Stop() - - // Send a new disk connect event with a timeout - select { - case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}: - case <-idler.C: - } - }(setIndex) + setsJustConnected[setIndex] = true }(endpoint) } + wg.Wait() + + go func() { + idler := time.NewTimer(100 * time.Millisecond) + defer idler.Stop() + + for setIndex, justConnected := range setsJustConnected { + if !justConnected { + continue + } + + // Send a new set connect event with a timeout + idler.Reset(100 * time.Millisecond) + select { + case s.setReconnectEvent <- setIndex: + case <-idler.C: + } + } + }() } // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected @@ -354,7 +362,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto setDriveCount: setDriveCount, defaultParityCount: defaultParityCount, format: format, - disksConnectEvent: make(chan diskConnectInfo), + setReconnectEvent: make(chan int), distributionAlgo: format.Erasure.DistributionAlgo, deploymentID: uuid.MustParse(format.ID), mrfOperations: make(map[healSource]int), @@ -659,12 +667,12 @@ func (s *erasureSets) Shutdown(ctx context.Context) error { } } select { - case _, ok := <-s.disksConnectEvent: + case _, ok := <-s.setReconnectEvent: if ok { - close(s.disksConnectEvent) + close(s.setReconnectEvent) } default: - close(s.disksConnectEvent) + close(s.setReconnectEvent) } return nil } @@ -1353,47 +1361,25 @@ func (s *erasureSets) maintainMRFList() { bucket: fOp.bucket, object: fOp.object, versionID: fOp.versionID, + opts: &madmin.HealOpts{Remove: true}, }] = fOp.failedSet s.mrfMU.Unlock() } } -func toSourceChTimed(t *time.Timer, sourceCh chan healSource, u healSource) { - t.Reset(100 * time.Millisecond) - - // No defer, as we don't know which - // case will be selected - - select { - case sourceCh <- u: - case <-t.C: - return - } - - // We still need to check the return value - // of Stop, because t could have fired - // between the send on sourceCh and this line. - if !t.Stop() { - <-t.C - } -} - // healMRFRoutine monitors new disks connection, sweep the MRF list // to find objects related to the new disk that needs to be healed. func (s *erasureSets) healMRFRoutine() { // Wait until background heal state is initialized bgSeq := mustGetHealSequence(GlobalContext) - idler := time.NewTimer(100 * time.Millisecond) - defer idler.Stop() - - for e := range s.disksConnectEvent { + for setIndex := range s.setReconnectEvent { // Get the list of objects related the er.set // to which the connected disk belongs. var mrfOperations []healSource s.mrfMU.Lock() for k, v := range s.mrfOperations { - if v == e.setIndex { + if v == setIndex { mrfOperations = append(mrfOperations, k) } } @@ -1401,8 +1387,10 @@ func (s *erasureSets) healMRFRoutine() { // Heal objects for _, u := range mrfOperations { + waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) + // Send an object to background heal - toSourceChTimed(idler, bgSeq.sourceCh, u) + bgSeq.sourceCh <- u s.mrfMU.Lock() delete(s.mrfOperations, u)