Mirror of https://github.com/minio/minio.git (synced 2025-11-07 21:02:58 -05:00)
replication: Simplify mrf requeueing and add backlog handler (#17171)
Simplify MRF queueing and add a backlog handler:

- Limit retries to 3 to avoid repeated re-queueing; entries that fall off are retried when the scanner next visits the object, or upon access.
- Change MRF so that each node processes only its own MRF entries.
- Collect the MRF backlog per node, to give visibility into the current backlog.
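For readers skimming the diff, a minimal runnable sketch of the retry cap described above (the MRFReplicateEntry fields and mrfRetryLimit constant follow this PR; the channel-backed queue and main function are hypothetical stand-ins for the pool's internals):

package main

import "fmt"

// Trimmed-down stand-in for the PR's MRFReplicateEntry.
type MRFReplicateEntry struct {
	Bucket     string
	Object     string
	RetryCount int
}

// Mirrors the constant introduced by this PR: after 3 retries the entry is
// dropped and left for the scanner (or an object access) to requeue later.
const mrfRetryLimit = 3

func queueMRFSave(saveCh chan<- MRFReplicateEntry, e MRFReplicateEntry) {
	if e.RetryCount > mrfRetryLimit {
		return // fall off: no more immediate re-queueing
	}
	saveCh <- e
}

func main() {
	saveCh := make(chan MRFReplicateEntry, 8)
	queueMRFSave(saveCh, MRFReplicateEntry{Bucket: "b", Object: "o1", RetryCount: 1}) // queued
	queueMRFSave(saveCh, MRFReplicateEntry{Bucket: "b", Object: "o2", RetryCount: 4}) // dropped
	fmt.Println("queued:", len(saveCh)) // queued: 1
}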
@@ -27,7 +27,9 @@ import (
 	"math/rand"
 	"net/http"
 	"net/url"
+	"os"
 	"path"
+	"path/filepath"
 	"reflect"
 	"strings"
 	"sync"
@@ -1127,7 +1129,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
 	}

 	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
-		logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN), "replication-target-offline-obj-"+tgt.ARN)
+		logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s retry:%d", bucket, tgt.ARN, ri.RetryCount), "replication-target-offline"+tgt.ARN)
 		sendEvent(eventArgs{
 			EventName:  event.ObjectReplicationNotTracked,
 			BucketName: bucket,
@@ -1277,7 +1279,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
 	}

 	if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
-		logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN), "replication-target-offline-all-"+tgt.ARN)
+		logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s retry:%d", bucket, tgt.ARN, ri.RetryCount), "replication-target-offline-heal"+tgt.ARN)
 		sendEvent(eventArgs{
 			EventName:  event.ObjectReplicationNotTracked,
 			BucketName: bucket,
@@ -2853,7 +2855,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
 }

 // QueueReplicationHeal is a wrapper for queueReplicationHeal
-func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
+func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, retryCount int) {
 	// un-versioned or a prefix
 	if oi.VersionID == "" || oi.ModTime.IsZero() {
 		return
@@ -2863,12 +2865,12 @@ func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) {
 	queueReplicationHeal(ctx, bucket, oi, replicationConfig{
 		Config:  rcfg,
 		remotes: tgts,
-	})
+	}, retryCount)
 }

 // queueReplicationHeal enqueues objects that failed replication OR eligible for resyncing through
 // an ongoing resync operation or via existing objects replication configuration setting.
-func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) {
+func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig, retryCount int) (roi ReplicateObjectInfo) {
 	// un-versioned or a prefix
 	if oi.VersionID == "" || oi.ModTime.IsZero() {
 		return roi
@@ -2878,6 +2880,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 		return roi
 	}
 	roi = getHealReplicateObjectInfo(oi, rcfg)
+	roi.RetryCount = uint32(retryCount)
 	if !roi.Dsc.ReplicateAny() {
 		return
 	}
@@ -2939,7 +2942,13 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 	return
 }

-const mrfTimeInterval = 5 * time.Minute
+const (
+	mrfSaveInterval  = 5 * time.Minute
+	mrfQueueInterval = 6 * time.Minute
+
+	mrfRetryLimit = 3 // max number of retries before letting scanner catch up on this object version
+	mrfMaxEntries = 1000000
+)

 func (p *ReplicationPool) persistMRF() {
 	if !p.initialized() {
@@ -2948,7 +2957,7 @@ func (p *ReplicationPool) persistMRF() {

 	var mu sync.Mutex
 	entries := make(map[string]MRFReplicateEntry)
-	mTimer := time.NewTimer(mrfTimeInterval)
+	mTimer := time.NewTimer(mrfSaveInterval)
 	defer mTimer.Stop()
 	saveMRFToDisk := func(drain bool) {
 		mu.Lock()
@@ -2964,8 +2973,11 @@ func (p *ReplicationPool) persistMRF() {
 				entries[e.versionID] = e
 			}
 		}
+		// queue all entries for healing before overwriting the node mrf file
+		p.queueMRFHeal()
+
 		if err := p.saveMRFEntries(cctx, entries); err != nil {
-			logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
+			logger.LogOnceIf(p.ctx, fmt.Errorf("unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
 		}
 		entries = make(map[string]MRFReplicateEntry)
 	}
@@ -2973,7 +2985,7 @@ func (p *ReplicationPool) persistMRF() {
 		select {
 		case <-mTimer.C:
 			saveMRFToDisk(false)
-			mTimer.Reset(mrfTimeInterval)
+			mTimer.Reset(mrfSaveInterval)
 		case <-p.ctx.Done():
 			p.mrfStopCh <- struct{}{}
 			close(p.mrfSaveCh)
@@ -2991,7 +3003,7 @@ func (p *ReplicationPool) persistMRF() {
 			entries[e.versionID] = e
 			cnt = len(entries)
 			mu.Unlock()
-			if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) {
+			if cnt >= mrfMaxEntries {
 				saveMRFToDisk(true)
 			}
 		}
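Taken together, the persistMRF changes amount to a timer-driven batching loop: entries arrive on a channel, are deduplicated by version ID into a map, and are flushed to disk on each mrfSaveInterval tick or as soon as the map reaches mrfMaxEntries. A generic, self-contained sketch of that pattern (all names hypothetical, not the PR's code):

package main

import (
	"fmt"
	"time"
)

type entry struct{ versionID string }

// batcher accumulates entries and hands them to flush on a timer tick,
// when the map hits maxEntries, or once the input channel closes.
func batcher(in <-chan entry, flushEvery time.Duration, maxEntries int, flush func(map[string]entry)) {
	entries := make(map[string]entry)
	t := time.NewTimer(flushEvery)
	defer t.Stop()
	for {
		select {
		case e, ok := <-in:
			if !ok {
				flush(entries) // drain on shutdown
				return
			}
			entries[e.versionID] = e // dedupe by version ID
			if len(entries) >= maxEntries {
				flush(entries)
				entries = make(map[string]entry)
			}
		case <-t.C:
			flush(entries)
			entries = make(map[string]entry)
			t.Reset(flushEvery)
		}
	}
}

func main() {
	in := make(chan entry)
	go func() {
		for _, id := range []string{"v1", "v2", "v1"} {
			in <- entry{versionID: id}
		}
		close(in)
	}()
	batcher(in, time.Second, 1000, func(m map[string]entry) {
		fmt.Println("flushing", len(m), "entries") // prints "flushing 2 entries": v1 deduped
	})
}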
@@ -3002,6 +3014,9 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
 	if !p.initialized() {
 		return
 	}
+	if entry.RetryCount > mrfRetryLimit {
+		return
+	}
 	select {
 	case <-GlobalContext.Done():
 		return
@@ -3015,7 +3030,7 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
 	}
 }

-// save mrf entries to mrf_<uuid>.bin
+// save mrf entries to nodenamehex.bin
 func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
 	if !p.initialized() {
 		return nil
@@ -3031,17 +3046,32 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
 	data := make([]byte, 4, v.Msgsize()+4)

 	// Initialize the resync meta header.
-	binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat)
-	binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion)
+	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
+	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)

 	buf, err := v.MarshalMsg(data)
 	if err != nil {
 		return err
 	}

-	configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin")
-	err = saveConfig(ctx, p.objLayer, configFile, buf)
-	return err
+	for _, diskPath := range globalEndpoints.LocalDisksPaths() {
+		// write to first drive
+		mrfDir := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir)
+		mrfFileName := filepath.Join(mrfDir, globalLocalNodeNameHex+".bin")
+		if err := os.MkdirAll(mrfDir, 0o777); err != nil {
+			return err
+		}
+		file, err := OpenFile(mrfFileName, os.O_CREATE|os.O_WRONLY|writeMode, 0o666)
+		if err != nil {
+			continue
+		}
+		defer file.Close()
+		if _, err = file.Write(buf); err != nil {
+			return err
+		}
+		break
+	}
+	return nil
 }
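The new on-disk file starts with a 4-byte header, two little-endian uint16s for format and version, followed by the msgp-encoded entries. A sketch of that layout follows; the concrete values of mrfMetaFormat and mrfMetaVersion are assumptions here, only the layout comes from the hunk above:

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	mrfMetaFormat  = 1 // assumed value
	mrfMetaVersion = 1 // assumed value
)

// encodeHeader prepends the 4-byte format/version header to a payload.
func encodeHeader(payload []byte) []byte {
	data := make([]byte, 4, 4+len(payload))
	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
	return append(data, payload...)
}

// decodeHeader splits a file image back into header fields and payload.
func decodeHeader(data []byte) (format, version uint16, payload []byte, err error) {
	if len(data) < 4 {
		return 0, 0, nil, fmt.Errorf("short mrf file: %d bytes", len(data))
	}
	return binary.LittleEndian.Uint16(data[0:2]), binary.LittleEndian.Uint16(data[2:4]), data[4:], nil
}

func main() {
	buf := encodeHeader([]byte("msgp-bytes-here"))
	f, v, p, _ := decodeHeader(buf)
	fmt.Println(f, v, string(p)) // 1 1 msgp-bytes-here
}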
 // load mrf entries from disk
@@ -3049,9 +3079,14 @@ func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e er
 	if !p.initialized() {
 		return re, nil
 	}
+	file, err := Open(fileName)
+	if err != nil {
+		return re, err
+	}
+	defer file.Close()

-	data, err := readConfig(p.ctx, p.objLayer, fileName)
-	if err != nil && err != errConfigNotFound {
+	data, err := io.ReadAll(file)
+	if err != nil {
 		return re, err
 	}
 	if len(data) == 0 {
@@ -3089,7 +3124,7 @@ func (p *ReplicationPool) processMRF() {
 	if !p.initialized() {
 		return
 	}
-	pTimer := time.NewTimer(mrfTimeInterval)
+	pTimer := time.NewTimer(mrfQueueInterval)
 	defer pTimer.Stop()
 	for {
 		select {
@@ -3103,24 +3138,13 @@ func (p *ReplicationPool) processMRF() {
 				}
 			}
 			if len(tgts) == offlineCnt {
-				pTimer.Reset(mrfTimeInterval)
+				pTimer.Reset(mrfQueueInterval)
 				continue
 			}
-			objCh := make(chan ObjectInfo)
-			cctx, cancelFn := context.WithCancel(p.ctx)
-			if err := p.objLayer.Walk(cctx, minioMetaBucket, replicationMRFDir, objCh, ObjectOptions{}); err != nil {
-				pTimer.Reset(mrfTimeInterval)
-				cancelFn()
+			if err := p.queueMRFHeal(); err != nil && !osIsNotExist(err) {
 				logger.LogIf(p.ctx, err)
-				continue
 			}
-			for item := range objCh {
-				if err := p.queueMRFHeal(item.Name); err == nil {
-					p.objLayer.DeleteObject(p.ctx, minioMetaBucket, item.Name, ObjectOptions{})
-				}
-			}
-			pTimer.Reset(mrfTimeInterval)
-			cancelFn()
+			pTimer.Reset(mrfQueueInterval)
 		case <-p.ctx.Done():
 			return
 		}
@@ -3128,24 +3152,36 @@ func (p *ReplicationPool) processMRF() {
 }

 // process sends error logs to the heal channel for an attempt to heal replication.
-func (p *ReplicationPool) queueMRFHeal(file string) error {
+func (p *ReplicationPool) queueMRFHeal() error {
 	if !p.initialized() {
 		return errServerNotInitialized
 	}

-	mrfRec, err := p.loadMRF(file)
-	if err != nil {
-		return err
-	}
-	for vID, e := range mrfRec.Entries {
-		oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
-			VersionID: vID,
-		})
-		if err != nil {
-			continue
-		}
-		QueueReplicationHeal(p.ctx, e.Bucket, oi)
-	}
+	for _, diskPath := range globalEndpoints.LocalDisksPaths() {
+		fileName := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin")
+		mrfRec, err := p.loadMRF(fileName)
+		if err != nil {
+			return err
+		}
+		// finally delete the file after processing mrf entries
+		os.Remove(fileName)
+
+		// queue replication heal in a goroutine to avoid holding up mrf save routine
+		go func(mrfRec MRFReplicateEntries) {
+			for vID, e := range mrfRec.Entries {
+				oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
+					VersionID: vID,
+				})
+				if err != nil {
+					continue
+				}
+				QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
+			}
+		}(mrfRec)
+		break
+	}

 	return nil
 }
@@ -3229,3 +3265,36 @@ func (p *ReplicationPool) saveStats(ctx context.Context) error {
 	}
 	return saveConfig(ctx, p.objLayer, getReplicationStatsPath(), data)
 }
+
+// getMRF returns MRF entries for this node.
+func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan madmin.ReplicationMRF, err error) {
+	mrfCh := make(chan madmin.ReplicationMRF, 100)
+	go func() {
+		defer close(mrfCh)
+		for _, diskPath := range globalEndpoints.LocalDisksPaths() {
+			file := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin")
+			mrfRec, err := p.loadMRF(file)
+			if err != nil {
+				break
+			}
+			for vID, e := range mrfRec.Entries {
+				if e.Bucket != bucket && bucket != "" {
+					continue
+				}
+				select {
+				case mrfCh <- madmin.ReplicationMRF{
+					NodeName:   globalLocalNodeName,
+					Object:     e.Object,
+					VersionID:  vID,
+					Bucket:     e.Bucket,
+					RetryCount: e.RetryCount,
+				}:
+				case <-ctx.Done():
+					return
+				}
+			}
+		}
+	}()
+
+	return mrfCh, nil
+}
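A caller of the new getMRF would drain the returned channel until it closes or the context ends. A hedged usage sketch, with a local ReplicationMRF stand-in for the madmin type and a fake producer in place of the real pool method:

package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-in mirroring the madmin.ReplicationMRF fields used above.
type ReplicationMRF struct {
	NodeName   string
	Bucket     string
	Object     string
	VersionID  string
	RetryCount int
}

// fakeGetMRF mimics getMRF's contract: a buffered channel the producer closes
// once it has streamed every entry for the requested bucket ("" = all buckets).
func fakeGetMRF(ctx context.Context, bucket string) (chan ReplicationMRF, error) {
	ch := make(chan ReplicationMRF, 100)
	go func() {
		defer close(ch)
		select {
		case ch <- ReplicationMRF{NodeName: "node1", Bucket: bucket, Object: "a.jpg", VersionID: "v1", RetryCount: 2}:
		case <-ctx.Done():
		}
	}()
	return ch, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	ch, err := fakeGetMRF(ctx, "mybucket")
	if err != nil {
		fmt.Println(err)
		return
	}
	for e := range ch {
		fmt.Printf("node=%s %s/%s@%s retries=%d\n", e.NodeName, e.Bucket, e.Object, e.VersionID, e.RetryCount)
	}
}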