optimize MRF replication queue to avoid memory leaks (#18007)

Harshavardhana 2023-09-11 20:59:11 -07:00 committed by GitHub
parent 9f7044aed0
commit 1df5e31706
2 changed files with 132 additions and 118 deletions


@@ -49,6 +49,7 @@ import (
 	"github.com/minio/minio/internal/hash"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
+	"github.com/tinylib/msgp/msgp"
 	"github.com/zeebo/xxh3"
 )
@@ -3051,7 +3052,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 const (
 	mrfSaveInterval = 5 * time.Minute
-	mrfQueueInterval = 6 * time.Minute
+	mrfQueueInterval = mrfSaveInterval + time.Minute // A minute higher than save interval
 	mrfRetryLimit = 3 // max number of retries before letting scanner catch up on this object version
 	mrfMaxEntries = 1000000
@@ -3066,46 +3067,37 @@ func (p *ReplicationPool) persistMRF() {
 	mTimer := time.NewTimer(mrfSaveInterval)
 	defer mTimer.Stop()

-	saveMRFToDisk := func(drain bool) {
+	saveMRFToDisk := func() {
 		if len(entries) == 0 {
 			return
 		}
-		cctx := p.ctx
-		if drain {
-			cctx = context.Background()
-			// drain all mrf entries and save to disk
-			for e := range p.mrfSaveCh {
-				entries[e.versionID] = e
-			}
-		}
 		// queue all entries for healing before overwriting the node mrf file
 		if !contextCanceled(p.ctx) {
 			p.queueMRFHeal()
 		}
-		if err := p.saveMRFEntries(cctx, entries); err != nil {
-			logger.LogOnceIf(p.ctx, fmt.Errorf("unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
-		}
+		p.saveMRFEntries(p.ctx, entries)
 		entries = make(map[string]MRFReplicateEntry)
 	}
 	for {
 		select {
 		case <-mTimer.C:
-			saveMRFToDisk(false)
+			saveMRFToDisk()
 			mTimer.Reset(mrfSaveInterval)
 		case <-p.ctx.Done():
 			p.mrfStopCh <- struct{}{}
 			close(p.mrfSaveCh)
-			saveMRFToDisk(true)
+			// We try to save if possible, but we don't care beyond that.
+			saveMRFToDisk()
 			return
 		case e, ok := <-p.mrfSaveCh:
 			if !ok {
 				return
 			}
 			if len(entries) >= mrfMaxEntries {
-				saveMRFToDisk(true)
+				saveMRFToDisk()
 			}
 			entries[e.versionID] = e
 		}
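The persistMRF loop above now funnels the timer tick, the size limit, and shutdown through a single flush helper instead of a special drain mode. A self-contained sketch of that accumulate-and-flush pattern follows; the entry type, interval, and limit are made up for illustration and this is not MinIO's code:

package main

import (
	"context"
	"fmt"
	"time"
)

type entry struct{ versionID string }

// flushLoop accumulates entries from ch and flushes them on a timer, when the
// in-memory map grows past limit, or once on shutdown. No draining of ch is
// attempted when the context is canceled.
func flushLoop(ctx context.Context, ch <-chan entry, interval time.Duration, limit int) {
	entries := make(map[string]entry)
	flush := func() {
		if len(entries) == 0 {
			return
		}
		fmt.Println("flushing", len(entries), "entries")
		entries = make(map[string]entry)
	}

	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			flush()
			t.Reset(interval)
		case <-ctx.Done():
			flush() // best effort on shutdown
			return
		case e, ok := <-ch:
			if !ok {
				return
			}
			if len(entries) >= limit {
				flush()
			}
			entries[e.versionID] = e
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ch := make(chan entry, 8)
	for i := 0; i < 5; i++ {
		ch <- entry{versionID: fmt.Sprint(i)}
	}
	flushLoop(ctx, ch, 20*time.Millisecond, 3)
}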
@@ -3135,14 +3127,45 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
 	}
 }

+func (p *ReplicationPool) persistToDrive(ctx context.Context, v MRFReplicateEntries, data []byte) {
+	newReader := func() io.ReadCloser {
+		r, w := io.Pipe()
+		go func() {
+			mw := msgp.NewWriter(w)
+			n, err := mw.Write(data)
+			if err != nil {
+				w.CloseWithError(err)
+				return
+			}
+			if n != len(data) {
+				w.CloseWithError(io.ErrShortWrite)
+				return
+			}
+			err = v.EncodeMsg(mw)
+			mw.Flush()
+			w.CloseWithError(err)
+		}()
+		return r
+	}
+
+	for _, localDrive := range globalLocalDrives {
+		r := newReader()
+		err := localDrive.CreateFile(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), -1, r)
+		r.Close()
+		if err == nil {
+			break
+		}
+	}
+}
+
 // save mrf entries to nodenamehex.bin
-func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
+func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) {
 	if !p.initialized() {
-		return nil
+		return
 	}
 	atomic.StoreUint64(&globalReplicationStats.mrfStats.LastFailedCount, uint64(len(entries)))
 	if len(entries) == 0 {
-		return nil
+		return
 	}
 	v := MRFReplicateEntries{
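The new persistToDrive hands each drive the read end of an io.Pipe and encodes the header plus msgp payload into the write end from a goroutine, so entries are streamed instead of marshalled into one large buffer as the old WriteAll path required. A minimal sketch of that pipe-based streaming pattern, with encoding/gob standing in for msgp and io.Copy standing in for a drive's CreateFile (record, newReader, and the header bytes are illustrative, not MinIO's API):

package main

import (
	"encoding/gob"
	"fmt"
	"io"
)

// record stands in for MRFReplicateEntries; the real code encodes with msgp.
type record struct {
	Bucket, Object string
}

// newReader returns a reader that yields a small header followed by the
// encoded record, produced lazily through an io.Pipe so the full payload is
// never materialized in a single buffer.
func newReader(header []byte, v record) io.ReadCloser {
	r, w := io.Pipe()
	go func() {
		if _, err := w.Write(header); err != nil {
			w.CloseWithError(err)
			return
		}
		// Encode straight into the pipe; the consumer reads concurrently.
		w.CloseWithError(gob.NewEncoder(w).Encode(v))
	}()
	return r
}

func main() {
	r := newReader([]byte{1, 0, 1, 0}, record{Bucket: "bkt", Object: "obj"})
	defer r.Close()
	// A drive's CreateFile would consume this reader; io.Copy stands in here.
	n, err := io.Copy(io.Discard, r)
	fmt.Printf("streamed %d bytes, err=%v\n", n, err)
}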
@@ -3155,30 +3178,24 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
 	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
 	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)

-	buf, err := v.MarshalMsg(data)
-	if err != nil {
-		return err
-	}
-	for _, localDrive := range globalLocalDrives {
-		if err := localDrive.WriteAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), buf); err == nil {
-			break
-		}
-	}
-	return nil
+	p.persistToDrive(ctx, v, data)
 }

 // load mrf entries from disk
-func (p *ReplicationPool) loadMRF(data []byte) (re MRFReplicateEntries, e error) {
+func (p *ReplicationPool) loadMRF() (mrfRec MRFReplicateEntries, err error) {
+	loadMRF := func(rc io.ReadCloser) (re MRFReplicateEntries, err error) {
+		defer rc.Close()
 		if !p.initialized() {
 			return re, nil
 		}
-	if len(data) == 0 {
-		// Seems to be empty.
-		return re, nil
-	}
-	if len(data) <= 4 {
-		return re, fmt.Errorf("replication mrf: no data")
-	}
+		data := make([]byte, 4)
+		n, err := rc.Read(data)
+		if err != nil {
+			return re, err
+		}
+		if n != len(data) {
+			return re, errors.New("replication mrf: no data")
+		}
 		// Read resync meta header
 		switch binary.LittleEndian.Uint16(data[0:2]) {
@@ -3191,17 +3208,30 @@ func (p *ReplicationPool) loadMRF(data []byte) (re MRFReplicateEntries, e error)
 		default:
 			return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
 		}

 		// OK, parse data.
-	if _, err := re.UnmarshalMsg(data[4:]); err != nil {
-		return re, err
-	}
-	switch re.Version {
-	case mrfMetaVersionV1:
-	default:
-		return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
-	}
-	return re, nil
+		// ignore any parsing errors, we do not care this file is generated again anyways.
+		re.DecodeMsg(msgp.NewReader(rc))
+		return re, nil
+	}
+
+	for _, localDrive := range globalLocalDrives {
+		rc, err := localDrive.ReadFileStream(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), 0, -1)
+		if err != nil {
+			continue
+		}
+
+		mrfRec, err = loadMRF(rc)
+		if err != nil {
+			continue
+		}
+
+		// finally delete the file after processing mrf entries
+		localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{})
+		break
+	}
+
+	return mrfRec, nil
 }

 func (p *ReplicationPool) processMRF() {
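The reworked loadMRF reads only the 4-byte format/version header up front and hands the remainder of the stream to a msgp reader, deleting the file once it has been consumed. A rough equivalent of that header-check-then-stream-decode step, again with gob standing in for msgp; the record type, format/version values, and decodeStream helper are all made up for the sketch, and io.ReadFull is used where the patch checks the count from a single Read:

package main

import (
	"bytes"
	"encoding/binary"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
)

type record struct{ Bucket, Object string }

// decodeStream validates a 4-byte format/version header and then decodes the
// rest of the stream without buffering it first.
func decodeStream(rc io.ReadCloser) (rec record, err error) {
	defer rc.Close()

	var hdr [4]byte
	if _, err = io.ReadFull(rc, hdr[:]); err != nil {
		return rec, err
	}
	if binary.LittleEndian.Uint16(hdr[0:2]) != 1 {
		return rec, errors.New("unknown format")
	}
	if binary.LittleEndian.Uint16(hdr[2:4]) != 1 {
		return rec, errors.New("unknown version")
	}
	// The real code passes the reader to msgp; gob stands in here.
	err = gob.NewDecoder(rc).Decode(&rec)
	return rec, err
}

func main() {
	var buf bytes.Buffer
	buf.Write([]byte{1, 0, 1, 0})
	gob.NewEncoder(&buf).Encode(record{Bucket: "b", Object: "o"})

	rec, err := decodeStream(io.NopCloser(&buf))
	fmt.Println(rec, err)
}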
@@ -3244,22 +3274,13 @@ func (p *ReplicationPool) queueMRFHeal() error {
 		return errServerNotInitialized
 	}

-	for _, localDrive := range globalLocalDrives {
-		buf, err := localDrive.ReadAll(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
-		if err != nil {
-			continue
-		}
-		mrfRec, err := p.loadMRF(buf)
-		if err != nil {
-			continue
-		}
-		// finally delete the file after processing mrf entries
-		localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{})
+	mrfRec, err := p.loadMRF()
+	if err != nil {
+		return err
+	}

 	// queue replication heal in a goroutine to avoid holding up mrf save routine
-	go func(mrfRec MRFReplicateEntries) {
+	go func() {
 		for vID, e := range mrfRec.Entries {
 			ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this.
@@ -3273,9 +3294,7 @@ func (p *ReplicationPool) queueMRFHeal() error {
 			QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
 		}
-	}(mrfRec)
-		break
-	}
+	}()

 	return nil
 }
@@ -3286,18 +3305,14 @@ func (p *ReplicationPool) initialized() bool {
 // getMRF returns MRF entries for this node.
 func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan madmin.ReplicationMRF, err error) {
+	mrfRec, err := p.loadMRF()
+	if err != nil {
+		return nil, err
+	}
+
 	mrfCh := make(chan madmin.ReplicationMRF, 100)
 	go func() {
 		defer close(mrfCh)
-		for _, localDrive := range globalLocalDrives {
-			buf, err := localDrive.ReadAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
-			if err != nil {
-				continue
-			}
-			mrfRec, err := p.loadMRF(buf)
-			if err != nil {
-				continue
-			}
 		for vID, e := range mrfRec.Entries {
 			if e.Bucket != bucket && bucket != "" {
 				continue
@@ -3314,7 +3329,6 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan ma
 				return
 			}
 		}
-		}
 	}()

 	return mrfCh, nil


@@ -1790,7 +1790,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
 		return nil, err
 	}

-	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect
+	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect && length >= 0

 	var file *os.File
 	if odirectEnabled {
@@ -1822,6 +1822,10 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
 		}
 	}

+	if length < 0 {
+		return file, nil
+	}
+
 	st, err := file.Stat()
 	if err != nil {
 		file.Close()
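With a negative length, the patched ReadFileStream now returns the open file directly for streaming to EOF, skipping the Stat and the O_DIRECT path, which is kept only when the length is known. A simplified sketch of that branch; openStream and its parameters are illustrative and omit the real method's O_DIRECT handling, verification, and error mapping:

package main

import (
	"fmt"
	"io"
	"os"
)

// openStream returns a reader positioned at offset. A negative length means
// "stream the rest of the file", so the raw *os.File is returned without a
// Stat or a length-limited wrapper.
func openStream(path string, offset, length int64) (io.ReadCloser, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		f.Close()
		return nil, err
	}
	if length < 0 {
		return f, nil // unknown length: read to EOF
	}
	// Known length: bound the reader so callers cannot read past it.
	return struct {
		io.Reader
		io.Closer
	}{io.LimitReader(f, length), f}, nil
}

func main() {
	f, err := os.CreateTemp("", "stream-*.txt")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer os.Remove(f.Name())
	f.WriteString("hello, streaming reads")
	f.Close()

	rc, err := openStream(f.Name(), 7, -1) // negative length: read to EOF
	if err != nil {
		fmt.Println(err)
		return
	}
	defer rc.Close()
	b, _ := io.ReadAll(rc)
	fmt.Printf("%q\n", b)
}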
@@ -1889,10 +1893,6 @@ func (c closeWrapper) Close() error {
 // CreateFile - creates the file.
 func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSize int64, r io.Reader) (err error) {
-	if fileSize < -1 {
-		return errInvalidArgument
-	}
-
 	volumeDir, err := s.getVolDir(volume)
 	if err != nil {
 		return err
@@ -1929,7 +1929,7 @@ func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSiz
 		return osErrToFileErr(err)
 	}

-	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect
+	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect && fileSize > 0

 	var w *os.File
 	if odirectEnabled {