diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index b9402a5aa..1af92237d 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -869,13 +869,8 @@ func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly if !h.settings.Recursive { if h.object != "" { - // Check if an object named as the objPrefix exists, - // and if so heal it. - oi, err := objAPI.GetObjectInfo(h.ctx, bucket, h.object, ObjectOptions{}) - if err == nil { - if err = h.healObject(bucket, h.object, oi.VersionID); err != nil { - return err - } + if err := h.healObject(bucket, h.object, ""); err != nil { + return err } } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 1435109ea..f52df3678 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1227,7 +1227,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str if disk != nil && disk.IsOnline() { continue } - er.addPartial(bucket, object, fi.VersionID, fi.Size) + er.addPartial(bucket, object, fi.VersionID) break } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index dc5a56465..5da03a30f 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -29,6 +29,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/klauspost/readahead" "github.com/minio/madmin-go/v2" @@ -1299,7 +1300,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st continue } - er.addPartial(bucket, object, fi.VersionID, fi.Size) + er.addPartial(bucket, object, fi.VersionID) break } @@ -1491,7 +1492,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec if errs[i] != nil && !isErrVersionNotFound(errs[i]) && !isErrObjectNotFound(errs[i]) { // all other direct versionId references we should // ensure no dangling file is left over. - er.addPartial(bucket, dobj.ObjectName, dobj.VersionID, -1) + er.addPartial(bucket, dobj.ObjectName, dobj.VersionID) continue } @@ -1504,7 +1505,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec // all other direct versionId references we should // ensure no dangling file is left over. - er.addPartial(bucket, dobj.ObjectName, dobj.VersionID, -1) + er.addPartial(bucket, dobj.ObjectName, dobj.VersionID) break } } @@ -1667,7 +1668,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if disk != nil && disk.IsOnline() { continue } - er.addPartial(bucket, object, opts.VersionID, -1) + er.addPartial(bucket, object, opts.VersionID) break } }() @@ -1724,14 +1725,12 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string // Send the successful but partial upload/delete, however ignore // if the channel is blocked by other items. -func (er erasureObjects) addPartial(bucket, object, versionID string, size int64) { +func (er erasureObjects) addPartial(bucket, object, versionID string) { globalMRFState.addPartialOp(partialOperation{ bucket: bucket, object: object, versionID: versionID, - size: size, - setIndex: er.setIndex, - poolIndex: er.poolIndex, + queued: time.Now(), }) } @@ -1998,7 +1997,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st if disk != nil && disk.IsOnline() { continue } - er.addPartial(bucket, object, opts.VersionID, -1) + er.addPartial(bucket, object, opts.VersionID) break } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 1bbb7e9e6..db9738cec 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -190,7 +190,6 @@ func (s *erasureSets) connectDisks() { var wg sync.WaitGroup diskMap := s.getDiskMap() - setsJustConnected := make([]bool, s.setCount) for _, endpoint := range s.endpoints.Endpoints { cdisk := diskMap[endpoint] if cdisk != nil && cdisk.IsOnline() { @@ -203,8 +202,6 @@ func (s *erasureSets) connectDisks() { // putting it back into the s.erasureDisks by re-placing the disk again. _, setIndex, _ := cdisk.GetDiskLoc() if setIndex != -1 { - // Recently disconnected disks must go to MRF - setsJustConnected[setIndex] = cdisk.LastConn().After(s.lastConnectDisksOpTime) continue } } @@ -260,21 +257,11 @@ func (s *erasureSets) connectDisks() { s.erasureDisks[setIndex][diskIndex] = disk } disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex) - setsJustConnected[setIndex] = true // disk just went online we treat it is as MRF event s.erasureDisksMu.Unlock() }(endpoint) } wg.Wait() - - go func() { - for setIndex, justConnected := range setsJustConnected { - if !justConnected { - continue - } - globalMRFState.newSetReconnected(s.poolIndex, setIndex) - } - }() } // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected diff --git a/cmd/global-heal.go b/cmd/global-heal.go index d92e2ab57..b2e3d166d 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -81,12 +81,6 @@ func getLocalBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.Bg ScannedItemsCount: bgSeq.getScannedItemsCount(), } - if globalMRFState.initialized() { - status.MRF = map[string]madmin.MRFStatus{ - globalLocalNodeName: globalMRFState.getCurrentMRFRoundInfo(), - } - } - healDisksMap := map[string]struct{}{} for _, ep := range getLocalDisksToHeal() { healDisksMap[ep.String()] = struct{}{} @@ -421,15 +415,25 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, return retErr } -// healObject sends the given object/version to the background healing workers -// and only returns when healing of the object is done. -func healObject(bucket, object, versionID string, scan madmin.HealScanMode) { +func healBucket(bucket string, scan madmin.HealScanMode) error { // Get background heal sequence to send elements to heal globalHealStateLK.Lock() bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) globalHealStateLK.Unlock() if ok { - bgSeq.queueHealTask(healSource{ + return bgSeq.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket) + } + return nil +} + +// healObject sends the given object/version to the background healing workers +func healObject(bucket, object, versionID string, scan madmin.HealScanMode) error { + // Get background heal sequence to send elements to heal + globalHealStateLK.Lock() + bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) + globalHealStateLK.Unlock() + if ok { + return bgSeq.queueHealTask(healSource{ bucket: bucket, object: object, versionID: versionID, @@ -440,4 +444,5 @@ func healObject(bucket, object, versionID string, scan madmin.HealScanMode) { }, }, madmin.HealItemObject) } + return nil } diff --git a/cmd/mrf.go b/cmd/mrf.go index f3f79ab1f..16434ef48 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -20,16 +20,13 @@ package cmd import ( "context" "sync" - "sync/atomic" "time" "github.com/minio/madmin-go/v2" - "github.com/minio/minio/internal/logger" ) const ( - mrfInfoResetInterval = 10 * time.Second - mrfOpsQueueSize = 10000 + mrfOpsQueueSize = 100000 ) // partialOperation is a successful upload/delete of an object @@ -38,35 +35,17 @@ type partialOperation struct { bucket string object string versionID string - size int64 - setIndex int - poolIndex int -} - -type setInfo struct { - index, pool int + queued time.Time } // mrfState sncapsulates all the information // related to the global background MRF. type mrfState struct { - ready int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG - _ int32 // For 64 bits alignment - ctx context.Context objectAPI ObjectLayer - mu sync.Mutex - opCh chan partialOperation - pendingOps map[partialOperation]setInfo - setReconnectEvent chan setInfo - - itemsHealed uint64 - bytesHealed uint64 - pendingItems uint64 - pendingBytes uint64 - - triggeredAt time.Time + mu sync.Mutex + opCh chan partialOperation } // Initialize healing MRF subsystem @@ -75,24 +54,15 @@ func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) { defer m.mu.Unlock() m.ctx = ctx - m.objectAPI = objAPI m.opCh = make(chan partialOperation, mrfOpsQueueSize) - m.pendingOps = make(map[partialOperation]setInfo) - m.setReconnectEvent = make(chan setInfo) + m.objectAPI = objAPI - go globalMRFState.maintainMRFList() go globalMRFState.healRoutine() - - atomic.StoreInt32(&m.ready, 1) -} - -func (m *mrfState) initialized() bool { - return atomic.LoadInt32(&m.ready) != 0 } // Add a partial S3 operation (put/delete) when one or more disks are offline. func (m *mrfState) addPartialOp(op partialOperation) { - if !m.initialized() { + if m == nil { return } @@ -102,140 +72,39 @@ func (m *mrfState) addPartialOp(op partialOperation) { } } -// Receive the new set (disk) reconnection event -func (m *mrfState) newSetReconnected(pool, set int) { - if !m.initialized() { - return - } - - idler := time.NewTimer(100 * time.Millisecond) - defer idler.Stop() - - select { - case m.setReconnectEvent <- setInfo{index: set, pool: pool}: - case <-idler.C: - } -} - -// Get current MRF stats of the last MRF activity -func (m *mrfState) getCurrentMRFRoundInfo() madmin.MRFStatus { - m.mu.Lock() - triggeredAt := m.triggeredAt - itemsHealed := m.itemsHealed - bytesHealed := m.bytesHealed - pendingItems := m.pendingItems - pendingBytes := m.pendingBytes - m.mu.Unlock() - - if pendingItems == 0 { - return madmin.MRFStatus{} - } - - return madmin.MRFStatus{ - Started: triggeredAt, - ItemsHealed: itemsHealed, - BytesHealed: bytesHealed, - TotalItems: itemsHealed + pendingItems, - TotalBytes: bytesHealed + pendingBytes, - } -} - -// maintainMRFList gathers the list of successful partial uploads -// from all underlying er.sets and puts them in a global map which -// should not have more than 10000 entries. -func (m *mrfState) maintainMRFList() { - for fOp := range m.opCh { - m.mu.Lock() - if len(m.pendingOps) > mrfOpsQueueSize { - m.mu.Unlock() - continue - } - - m.pendingOps[fOp] = setInfo{index: fOp.setIndex, pool: fOp.poolIndex} - m.pendingItems++ - if fOp.size > 0 { - m.pendingBytes += uint64(fOp.size) - } - - m.mu.Unlock() - } -} - -// Reset current MRF stats -func (m *mrfState) resetMRFInfoIfNoPendingOps() { - m.mu.Lock() - defer m.mu.Unlock() - - if m.pendingItems > 0 { - return - } - - m.itemsHealed = 0 - m.bytesHealed = 0 - m.pendingItems = 0 - m.pendingBytes = 0 - m.triggeredAt = time.Time{} -} +var healSleeper = newDynamicSleeper(5, time.Second, false) // healRoutine listens to new disks reconnection events and // issues healing requests for queued objects belonging to the // corresponding erasure set func (m *mrfState) healRoutine() { - idler := time.NewTimer(mrfInfoResetInterval) - defer idler.Stop() - - mrfHealingOpts := madmin.HealOpts{ - ScanMode: madmin.HealNormalScan, - Remove: healDeleteDangling, - } - for { select { case <-m.ctx.Done(): return - case <-idler.C: - m.resetMRFInfoIfNoPendingOps() - idler.Reset(mrfInfoResetInterval) - case setInfo := <-m.setReconnectEvent: - // Get the list of objects related the er.set - // to which the connected disk belongs. - var mrfOperations []partialOperation - m.mu.Lock() - for k, v := range m.pendingOps { - if v == setInfo { - mrfOperations = append(mrfOperations, k) - } - } - m.mu.Unlock() - - if len(mrfOperations) == 0 { - continue + case u, ok := <-m.opCh: + if !ok { + return } - m.mu.Lock() - m.triggeredAt = time.Now().UTC() - m.mu.Unlock() - - // Heal objects - for _, u := range mrfOperations { - _, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts) - m.mu.Lock() - if err == nil { - m.itemsHealed++ - m.bytesHealed += uint64(u.size) - } - m.pendingItems-- - m.pendingBytes -= uint64(u.size) - delete(m.pendingOps, u) - m.mu.Unlock() - - if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { - // Log healing error if any - logger.LogIf(m.ctx, err) - } + now := time.Now() + if now.Sub(u.queued) < time.Second { + // let recently failed networks to reconnect + // making MRF wait for 1s before retrying, + // i.e 4 reconnect attempts. + time.Sleep(1 * time.Second) } - waitForLowHTTPReq() + // wait on timer per heal + wait := healSleeper.Timer(context.Background()) + + if u.object == "" { + healBucket(u.bucket, madmin.HealNormalScan) + } else { + healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan) + } + + wait() } } } diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index e24bb0141..2bc831d66 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -225,6 +225,19 @@ func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBu quorum := (len(sys.allPeerClients) / 2) + 1 err := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum) + + // Perform MRF on missing buckets for temporary errors. + for _, err := range errs { + if err == nil { + continue + } + if errors.Is(err, errPeerOffline) || errors.Is(err, errDiskNotFound) || + isNetworkError(err) { + globalMRFState.addPartialOp(partialOperation{ + bucket: bucket, + }) + } + } return toObjectErr(err, bucket) }