mirror of
https://github.com/minio/minio.git
synced 2025-11-07 21:02:58 -05:00
Refactor replication resync to be an active process (#14266)
When resync is triggered, walk the bucket namespace and resync objects that are unreplicated. This PR also adds an API to report resync progress.
This commit is contained in:
@@ -19,10 +19,13 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -998,6 +1001,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
|
||||
OpType: ri.OpType,
|
||||
ReplicationAction: rAction,
|
||||
}
|
||||
|
||||
if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
|
||||
rinfo.ReplicationStatus = replication.Completed
|
||||
rinfo.ReplicationResynced = true
|
||||
@@ -1057,6 +1061,13 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
|
||||
})
|
||||
return rinfo
|
||||
}
|
||||
defer func() {
|
||||
if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
|
||||
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
|
||||
rinfo.ReplicationResynced = true
|
||||
}
|
||||
rinfo.Duration = time.Since(startTime)
|
||||
}()
|
||||
|
||||
rAction = replicateAll
|
||||
oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, miniogo.StatObjectOptions{
|
||||
@@ -1089,13 +1100,8 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
|
||||
// Note: Replication Stats would have been updated despite metadata update failure.
|
||||
gr.Close()
|
||||
closeOnDefer = false
|
||||
return replicatedTargetInfo{
|
||||
ReplicationStatus: replication.Completed,
|
||||
Size: sz,
|
||||
Arn: tgt.ARN,
|
||||
ReplicationAction: rAction,
|
||||
PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
|
||||
}
|
||||
rinfo.ReplicationAction = rAction
|
||||
rinfo.ReplicationStatus = replication.Completed
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1103,13 +1109,6 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
|
||||
rinfo.ReplicationStatus = replication.Completed
|
||||
rinfo.Size = size
|
||||
rinfo.ReplicationAction = rAction
|
||||
defer func() {
|
||||
if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
|
||||
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
|
||||
rinfo.ReplicationResynced = true
|
||||
}
|
||||
rinfo.Duration = time.Since(startTime)
|
||||
}()
|
||||
// use core client to avoid doing multipart on PUT
|
||||
c := &miniogo.Core{Client: tgt.Client}
|
||||
if rAction != replicateAll {
|
||||
@@ -1308,6 +1307,7 @@ type ReplicationPool struct {
|
||||
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
|
||||
workerSize int
|
||||
mrfWorkerSize int
|
||||
resyncState replicationResyncState
|
||||
workerWg sync.WaitGroup
|
||||
mrfWorkerWg sync.WaitGroup
|
||||
once sync.Once
|
||||
@@ -1324,6 +1324,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
|
||||
mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers),
|
||||
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
|
||||
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
|
||||
resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
|
||||
ctx: ctx,
|
||||
objLayer: o,
|
||||
}
|
||||
@@ -1331,6 +1332,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
|
||||
pool.ResizeWorkers(opts.Workers)
|
||||
pool.ResizeFailedWorkers(opts.FailedWorkers)
|
||||
go pool.AddExistingObjectReplicateWorker()
|
||||
go pool.periodicResyncMetaSave(ctx, o)
|
||||
return pool
|
||||
}
|
||||
|
||||
@@ -1870,3 +1872,333 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
|
||||
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
|
||||
return s
|
||||
}
|
||||
|
||||
const resyncTimeInterval = time.Minute * 10
|
||||
|
||||
// periodicResyncMetaSave saves in-memory resync meta stats to disk in periodic intervals
|
||||
func (p *ReplicationPool) periodicResyncMetaSave(ctx context.Context, objectAPI ObjectLayer) {
|
||||
resyncTimer := time.NewTimer(resyncTimeInterval)
|
||||
defer resyncTimer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-resyncTimer.C:
|
||||
resyncTimer.Reset(resyncTimeInterval)
|
||||
now := UTCNow()
|
||||
p.resyncState.RLock()
|
||||
for bucket, brs := range p.resyncState.statusMap {
|
||||
var updt bool
|
||||
for _, st := range brs.TargetsMap {
|
||||
// if resync in progress or just ended, needs to save to disk
|
||||
if st.EndTime.Equal(timeSentinel) || now.Sub(st.EndTime) <= resyncTimeInterval {
|
||||
updt = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if updt {
|
||||
brs.LastUpdate = now
|
||||
if err := saveResyncStatus(ctx, bucket, brs, objectAPI); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Could not save resync metadata to disk for %s - %w", bucket, err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
p.resyncState.RUnlock()
|
||||
case <-ctx.Done():
|
||||
// server could be restarting - need
|
||||
// to exit immediately
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// resyncBucket resyncs all qualifying objects as per replication rules for the target
|
||||
// ARN
|
||||
func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI ObjectLayer) {
|
||||
resyncStatus := ResyncFailed
|
||||
defer func() {
|
||||
globalReplicationPool.resyncState.Lock()
|
||||
m := globalReplicationPool.resyncState.statusMap[bucket]
|
||||
st := m.TargetsMap[arn]
|
||||
st.EndTime = UTCNow()
|
||||
st.ResyncStatus = resyncStatus
|
||||
m.TargetsMap[arn] = st
|
||||
globalReplicationPool.resyncState.Unlock()
|
||||
}()
|
||||
// Allocate new results channel to receive ObjectInfo.
|
||||
objInfoCh := make(chan ObjectInfo)
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", bucket, arn, err))
|
||||
return
|
||||
}
|
||||
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", bucket, arn, err))
|
||||
return
|
||||
}
|
||||
rcfg := replicationConfig{
|
||||
Config: cfg,
|
||||
remotes: tgts,
|
||||
}
|
||||
tgtArns := cfg.FilterTargetArns(
|
||||
replication.ObjectOpts{
|
||||
OpType: replication.ResyncReplicationType,
|
||||
TargetArn: arn,
|
||||
})
|
||||
if len(tgtArns) != 1 {
|
||||
logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", bucket, arn))
|
||||
return
|
||||
}
|
||||
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn)
|
||||
if tgt == nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", bucket, arn))
|
||||
return
|
||||
}
|
||||
|
||||
// Walk through all object versions - note ascending order of walk needed to ensure delete marker replicated to
|
||||
// target after object version is first created.
|
||||
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkAscending: true}); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
globalReplicationPool.resyncState.RLock()
|
||||
m := globalReplicationPool.resyncState.statusMap[bucket]
|
||||
st := m.TargetsMap[arn]
|
||||
globalReplicationPool.resyncState.RUnlock()
|
||||
var lastCheckpoint string
|
||||
if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
|
||||
lastCheckpoint = st.Object
|
||||
}
|
||||
for obj := range objInfoCh {
|
||||
if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
|
||||
continue
|
||||
}
|
||||
lastCheckpoint = ""
|
||||
|
||||
roi := getHealReplicateObjectInfo(obj, rcfg)
|
||||
if !roi.ExistingObjResync.mustResync() {
|
||||
continue
|
||||
}
|
||||
|
||||
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
|
||||
|
||||
versionID := ""
|
||||
dmVersionID := ""
|
||||
if roi.VersionPurgeStatus.Empty() {
|
||||
dmVersionID = roi.VersionID
|
||||
} else {
|
||||
versionID = roi.VersionID
|
||||
}
|
||||
|
||||
doi := DeletedObjectReplicationInfo{
|
||||
DeletedObject: DeletedObject{
|
||||
ObjectName: roi.Name,
|
||||
DeleteMarkerVersionID: dmVersionID,
|
||||
VersionID: versionID,
|
||||
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
|
||||
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
|
||||
DeleteMarker: roi.DeleteMarker,
|
||||
},
|
||||
Bucket: roi.Bucket,
|
||||
OpType: replication.ExistingObjectReplicationType,
|
||||
}
|
||||
replicateDelete(ctx, doi, objectAPI, ReplicateDelete)
|
||||
} else {
|
||||
roi.OpType = replication.ExistingObjectReplicationType
|
||||
replicateObject(ctx, roi, objectAPI, ReplicateExisting)
|
||||
}
|
||||
_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, miniogo.StatObjectOptions{
|
||||
VersionID: roi.VersionID,
|
||||
Internal: miniogo.AdvancedGetOptions{
|
||||
ReplicationProxyRequest: "false",
|
||||
},
|
||||
})
|
||||
globalReplicationPool.resyncState.Lock()
|
||||
m = globalReplicationPool.resyncState.statusMap[bucket]
|
||||
st = m.TargetsMap[arn]
|
||||
st.Object = roi.Name
|
||||
if err != nil {
|
||||
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, bucket, roi.Name)) {
|
||||
st.ReplicatedCount++
|
||||
} else {
|
||||
st.FailedCount++
|
||||
}
|
||||
} else {
|
||||
st.ReplicatedCount++
|
||||
st.ReplicatedSize += roi.Size
|
||||
}
|
||||
m.TargetsMap[arn] = st
|
||||
globalReplicationPool.resyncState.Unlock()
|
||||
}
|
||||
resyncStatus = ResyncCompleted
|
||||
}
|
||||
|
||||
// start replication resync for the remote target ARN specified
|
||||
func startReplicationResync(ctx context.Context, bucket, arn, resyncID string, resyncBeforeDate time.Time, objAPI ObjectLayer) error {
|
||||
if bucket == "" {
|
||||
return fmt.Errorf("bucket name is empty")
|
||||
}
|
||||
if arn == "" {
|
||||
return fmt.Errorf("target ARN specified for resync is empty")
|
||||
}
|
||||
// Check if the current bucket has quota restrictions, if not skip it
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tgtArns := cfg.FilterTargetArns(
|
||||
replication.ObjectOpts{
|
||||
OpType: replication.ResyncReplicationType,
|
||||
TargetArn: arn,
|
||||
})
|
||||
|
||||
if len(tgtArns) == 0 {
|
||||
return fmt.Errorf("arn %s specified for resync not found in replication config", arn)
|
||||
}
|
||||
|
||||
data, err := loadBucketResyncMetadata(ctx, bucket, objAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// validate if resync is in progress for this arn
|
||||
for tArn, st := range data.TargetsMap {
|
||||
if arn == tArn && st.ResyncStatus == ResyncStarted {
|
||||
return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", bucket, arn)
|
||||
}
|
||||
}
|
||||
|
||||
status := TargetReplicationResyncStatus{
|
||||
ResyncID: resyncID,
|
||||
ResyncBeforeDate: resyncBeforeDate,
|
||||
StartTime: UTCNow(),
|
||||
ResyncStatus: ResyncStarted,
|
||||
Bucket: bucket,
|
||||
}
|
||||
data.TargetsMap[arn] = status
|
||||
if err = saveResyncStatus(ctx, bucket, data, objAPI); err != nil {
|
||||
return err
|
||||
}
|
||||
globalReplicationPool.resyncState.Lock()
|
||||
defer globalReplicationPool.resyncState.Unlock()
|
||||
brs, ok := globalReplicationPool.resyncState.statusMap[bucket]
|
||||
if !ok {
|
||||
brs = BucketReplicationResyncStatus{
|
||||
Version: resyncMetaVersion,
|
||||
TargetsMap: make(map[string]TargetReplicationResyncStatus),
|
||||
}
|
||||
}
|
||||
brs.TargetsMap[arn] = status
|
||||
globalReplicationPool.resyncState.statusMap[bucket] = brs
|
||||
go resyncBucket(GlobalContext, bucket, arn, false, objAPI)
|
||||
return nil
|
||||
}
|
||||
|
||||
// delete resync metadata from replication resync state in memory
|
||||
func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
p.resyncState.Lock()
|
||||
delete(p.resyncState.statusMap, bucket)
|
||||
defer p.resyncState.Unlock()
|
||||
}
|
||||
|
||||
// initResync - initializes bucket replication resync for all buckets.
|
||||
func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
// replication applies only to erasure coded setups
|
||||
if !globalIsErasure {
|
||||
return nil
|
||||
}
|
||||
// Load bucket metadata sys in background
|
||||
go p.loadResync(ctx, buckets, objAPI)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Loads bucket replication resync statuses into memory.
|
||||
func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
|
||||
for index := range buckets {
|
||||
meta, err := loadBucketResyncMetadata(ctx, buckets[index].Name, objAPI)
|
||||
if err != nil {
|
||||
if errors.Is(err, errVolumeNotFound) {
|
||||
meta = newBucketResyncStatus(buckets[index].Name)
|
||||
} else {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
p.resyncState.statusMap[buckets[index].Name] = meta
|
||||
}
|
||||
for index := range buckets {
|
||||
bucket := buckets[index].Name
|
||||
m, ok := p.resyncState.statusMap[bucket]
|
||||
if ok {
|
||||
for arn, st := range m.TargetsMap {
|
||||
if st.ResyncStatus == ResyncFailed || st.ResyncStatus == ResyncStarted {
|
||||
go resyncBucket(ctx, bucket, arn, true, objAPI)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// load bucket resync metadata from disk
|
||||
func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) {
|
||||
brs = newBucketResyncStatus(bucket)
|
||||
|
||||
resyncDirPath := path.Join(bucketMetaPrefix, bucket, replicationDir)
|
||||
data, err := readConfig(GlobalContext, objAPI, pathJoin(resyncDirPath, resyncFileName))
|
||||
if err != nil && err != errConfigNotFound {
|
||||
return brs, err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
// Seems to be empty.
|
||||
return brs, nil
|
||||
}
|
||||
if len(data) <= 4 {
|
||||
return brs, fmt.Errorf("replication resync: no data")
|
||||
}
|
||||
// Read resync meta header
|
||||
switch binary.LittleEndian.Uint16(data[0:2]) {
|
||||
case resyncMetaFormat:
|
||||
default:
|
||||
return brs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
|
||||
}
|
||||
switch binary.LittleEndian.Uint16(data[2:4]) {
|
||||
case resyncMetaVersion:
|
||||
default:
|
||||
return brs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
|
||||
}
|
||||
// OK, parse data.
|
||||
if _, err = brs.UnmarshalMsg(data[4:]); err != nil {
|
||||
return brs, err
|
||||
}
|
||||
|
||||
switch brs.Version {
|
||||
case resyncMetaVersionV1:
|
||||
default:
|
||||
return brs, fmt.Errorf("unexpected resync meta version: %d", brs.Version)
|
||||
}
|
||||
return brs, nil
|
||||
}
|
||||
|
||||
// save resync status to resync.bin
|
||||
func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationResyncStatus, objectAPI ObjectLayer) error {
|
||||
data := make([]byte, 4, brs.Msgsize()+4)
|
||||
|
||||
// Initialize the resync meta header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat)
|
||||
binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion)
|
||||
|
||||
buf, err := brs.MarshalMsg(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
configFile := path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName)
|
||||
return saveConfig(ctx, objectAPI, configFile, buf)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user