serialize queueMRFHeal(), add timeouts and avoid normal build-ups (#17886)

we expect a certain level of IOPS and latency from the drives, so serializing these disk reads is okay.

fixes other miscellaneous bugs:

- hanging on the mrfCh <- send when the context is canceled (see the sketch below)
- queuing MRF heal even when the context is already canceled
- the unused saveStateCh channel, now removed
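
The first fix is the classic context-aware send: a default: branch that performs a blocking mrfCh <- e can hang forever once the receiver stops reading, even after the context is canceled, whereas making the send its own select case lets cancellation win. Below is a minimal runnable sketch of the pattern; the names produce and item are illustrative only, not part of this commit.

// Sketch: context-aware channel send, the same shape as the
// GetReplicationMRF fix at the end of this commit.
package main

import (
    "context"
    "fmt"
    "time"
)

func produce(ctx context.Context, out chan<- int, item int) error {
    select {
    case <-ctx.Done():
        // Receiver is gone or caller gave up: do not block on the send.
        return ctx.Err()
    case out <- item:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    ch := make(chan int) // unbuffered: a bare ch <- item would hang with no reader
    if err := produce(ctx, ch, 42); err != nil {
        fmt.Println("send abandoned:", err) // fires once the timeout expires
    }
}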
Harshavardhana 2023-08-21 16:44:50 -07:00 committed by GitHub
parent 3a0125fa1f
commit 1c5af7c31a
2 changed files with 41 additions and 57 deletions

cmd/bucket-replication.go

@@ -27,9 +27,7 @@ import (
     "math/rand"
     "net/http"
     "net/url"
-    "os"
     "path"
-    "path/filepath"
     "reflect"
     "strings"
     "sync"
@@ -1631,6 +1629,7 @@ type ReplicationPool struct {
     ctx      context.Context
     priority string
     mu       sync.RWMutex
+    mrfMU    sync.Mutex
     resyncer *replicationResyncer
 
     // workers:
@@ -1644,7 +1643,6 @@ type ReplicationPool struct {
     mrfSaveCh     chan MRFReplicateEntry
     mrfStopCh     chan struct{}
     mrfWorkerSize int
-    saveStateCh   chan struct{}
 }
 
 // ReplicationWorkerOperation is a shared interface of replication operations.
@@ -1704,7 +1702,6 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
         resyncer:    newresyncer(),
         mrfSaveCh:   make(chan MRFReplicateEntry, 100000),
         mrfStopCh:   make(chan struct{}, 1),
-        saveStateCh: make(chan struct{}, 1),
         ctx:         ctx,
         objLayer:    o,
         priority:    priority,
@@ -3035,13 +3032,11 @@ func (p *ReplicationPool) persistMRF() {
         return
     }
 
-    var mu sync.Mutex
     entries := make(map[string]MRFReplicateEntry)
     mTimer := time.NewTimer(mrfSaveInterval)
     defer mTimer.Stop()
+
     saveMRFToDisk := func(drain bool) {
-        mu.Lock()
-        defer mu.Unlock()
         if len(entries) == 0 {
             return
         }
@@ -3053,12 +3048,16 @@ func (p *ReplicationPool) persistMRF() {
                 entries[e.versionID] = e
             }
         }
 
         // queue all entries for healing before overwriting the node mrf file
-        p.queueMRFHeal()
+        if !contextCanceled(p.ctx) {
+            p.queueMRFHeal()
+        }
+
         if err := p.saveMRFEntries(cctx, entries); err != nil {
             logger.LogOnceIf(p.ctx, fmt.Errorf("unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
         }
+
         entries = make(map[string]MRFReplicateEntry)
     }
 
     for {
@@ -3071,21 +3070,14 @@
             close(p.mrfSaveCh)
             saveMRFToDisk(true)
             return
-        case <-p.saveStateCh:
-            saveMRFToDisk(true)
-            return
         case e, ok := <-p.mrfSaveCh:
             if !ok {
                 return
             }
-            var cnt int
-            mu.Lock()
-            entries[e.versionID] = e
-            cnt = len(entries)
-            mu.Unlock()
-            if cnt >= mrfMaxEntries {
+            if len(entries) >= mrfMaxEntries {
                 saveMRFToDisk(true)
             }
+            entries[e.versionID] = e
         }
     }
 }
@@ -3119,6 +3111,7 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
     if len(entries) == 0 {
         return nil
     }
+
     v := MRFReplicateEntries{
         Entries: entries,
         Version: mrfMetaVersionV1,
@@ -3134,41 +3127,19 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
         return err
     }
 
-    for _, diskPath := range globalEndpoints.LocalDisksPaths() {
-        // write to first drive
-        mrfDir := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir)
-        mrfFileName := filepath.Join(mrfDir, globalLocalNodeNameHex+".bin")
-        if err := os.MkdirAll(mrfDir, 0o777); err != nil {
-            return err
-        }
-        file, err := OpenFile(mrfFileName, os.O_CREATE|os.O_WRONLY|writeMode, 0o666)
-        if err != nil {
-            continue
-        }
-        defer file.Close()
-        if _, err = file.Write(buf); err != nil {
-            return err
-        }
-        break
+    for _, localDrive := range globalLocalDrives {
+        if err := localDrive.WriteAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), buf); err == nil {
+            break
+        }
     }
 
     return nil
 }
 
 // load mrf entries from disk
-func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) {
+func (p *ReplicationPool) loadMRF(data []byte) (re MRFReplicateEntries, e error) {
     if !p.initialized() {
         return re, nil
     }
-    file, err := Open(fileName)
-    if err != nil {
-        return re, err
-    }
-    defer file.Close()
-    data, err := io.ReadAll(file)
-    if err != nil {
-        return re, err
-    }
+
     if len(data) == 0 {
         // Seems to be empty.
         return re, nil
@@ -3188,7 +3159,7 @@ func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e er
         return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
     }
 
     // OK, parse data.
-    if _, err = re.UnmarshalMsg(data[4:]); err != nil {
+    if _, err := re.UnmarshalMsg(data[4:]); err != nil {
         return re, err
     }
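
Note that loadMRF now parses a plain byte slice: the first four bytes carry a little-endian format and version pair, and the rest is the msgp-encoded entries. A small sketch of that header check, assuming the 2+2 byte layout implied by the data[2:4] read above; parseMRFHeader and the constant values are hypothetical, not from this commit.

// Sketch of the 4-byte header check implied by loadMRF above.
// parseMRFHeader is a hypothetical helper; the real constants are
// mrfMetaFormat/mrfMetaVersionV1, assumed here to equal 1.
package main

import (
    "encoding/binary"
    "fmt"
)

func parseMRFHeader(data []byte) ([]byte, error) {
    if len(data) == 0 {
        return nil, nil // empty file: nothing to heal
    }
    if len(data) < 4 {
        return nil, fmt.Errorf("replication mrf: short header: %d bytes", len(data))
    }
    format := binary.LittleEndian.Uint16(data[0:2])
    version := binary.LittleEndian.Uint16(data[2:4])
    if format != 1 || version != 1 {
        return nil, fmt.Errorf("replication mrf: unknown format/version: %d/%d", format, version)
    }
    return data[4:], nil // remainder would be the msgp-encoded MRFReplicateEntries
}

func main() {
    payload, err := parseMRFHeader([]byte{1, 0, 1, 0, 0xde, 0xad})
    fmt.Println(payload, err) // [222 173] <nil>
}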
@@ -3233,29 +3204,40 @@ func (p *ReplicationPool) processMRF() {
 // process sends error logs to the heal channel for an attempt to heal replication.
 func (p *ReplicationPool) queueMRFHeal() error {
+    p.mrfMU.Lock()
+    defer p.mrfMU.Unlock()
+
     if !p.initialized() {
         return errServerNotInitialized
     }
 
-    for _, diskPath := range globalEndpoints.LocalDisksPaths() {
-        fileName := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin")
-        mrfRec, err := p.loadMRF(fileName)
+    for _, localDrive := range globalLocalDrives {
+        buf, err := localDrive.ReadAll(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
         if err != nil {
-            return err
+            continue
         }
+
+        mrfRec, err := p.loadMRF(buf)
+        if err != nil {
+            continue
+        }
+
         // finally delete the file after processing mrf entries
-        os.Remove(fileName)
+        localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{})
 
         // queue replication heal in a goroutine to avoid holding up mrf save routine
         go func(mrfRec MRFReplicateEntries) {
             for vID, e := range mrfRec.Entries {
-                oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
+                ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this.
+                oi, err := p.objLayer.GetObjectInfo(ctx, e.Bucket, e.Object, ObjectOptions{
                     VersionID: vID,
                 })
+                cancel()
                 if err != nil {
                     continue
                 }
 
                 QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
             }
         }(mrfRec)
@@ -3351,11 +3333,14 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan ma
     mrfCh := make(chan madmin.ReplicationMRF, 100)
     go func() {
         defer close(mrfCh)
-        for _, diskPath := range globalEndpoints.LocalDisksPaths() {
-            file := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin")
-            mrfRec, err := p.loadMRF(file)
+        for _, localDrive := range globalLocalDrives {
+            buf, err := localDrive.ReadAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
             if err != nil {
-                break
+                continue
+            }
+
+            mrfRec, err := p.loadMRF(buf)
+            if err != nil {
+                continue
             }
             for vID, e := range mrfRec.Entries {
                 if e.Bucket != bucket && bucket != "" {
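
Taken together, the queueMRFHeal() changes above serialize the whole heal pass behind the new mrfMU mutex and bound each GetObjectInfo call to one second, so a slow drive delays MRF healing instead of piling up stuck goroutines. A runnable sketch of that serialize-plus-timeout shape; healer, healAll and lookup are hypothetical stand-ins, not minio APIs.

// Sketch of the serialize-plus-timeout shape used by queueMRFHeal().
// All names below are hypothetical stand-ins.
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type healer struct {
    mu  sync.Mutex      // serializes heal passes, like ReplicationPool.mrfMU
    ctx context.Context // long-lived pool context, like ReplicationPool.ctx
}

// healAll resolves each entry with a short per-entry deadline, so one slow
// lookup costs at most a second instead of hanging the whole pass.
func (h *healer) healAll(entries []string, lookup func(context.Context, string) error) {
    h.mu.Lock() // concurrent callers queue up here instead of racing on disk
    defer h.mu.Unlock()

    for _, e := range entries {
        ctx, cancel := context.WithTimeout(h.ctx, time.Second)
        err := lookup(ctx, e)
        cancel()
        if err != nil {
            continue // skip entries that cannot be resolved quickly
        }
        fmt.Println("queued heal for", e) // stand-in for QueueReplicationHeal
    }
}

func main() {
    h := &healer{ctx: context.Background()}
    h.healAll([]string{"bucket/object"}, func(ctx context.Context, e string) error {
        select {
        case <-time.After(5 * time.Second): // simulate a slow drive
            return nil
        case <-ctx.Done():
            return ctx.Err() // the one-second deadline wins
        }
    })
}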

cmd/notification.go

@@ -1520,15 +1520,14 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node
             select {
             case <-ctx.Done():
                 return err
-            default:
-                mrfCh <- e
+            case mrfCh <- e:
             }
         }
         return nil
     }(mrfCh)
 
     go func(wg *sync.WaitGroup) {
         wg.Wait()
-        defer close(mrfCh)
+        close(mrfCh)
     }(&wg)
     return mrfCh, nil
 }