replication: perform bucket resync in parallel (#16707)

Default the number of parallel resync operations per bucket to 10
to speed up resync.
Poorna Krishnamoorthy, 2023-02-24 12:07:34 -08:00 (committed by Harshavardhana)
parent c9e87f0548
commit f986b0c493


@@ -49,6 +49,7 @@ import (
 	"github.com/minio/minio/internal/hash"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
+	"github.com/minio/pkg/workers"
 	"github.com/zeebo/xxh3"
 )
@@ -2307,7 +2308,10 @@ func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI ObjectLayer) {
 	}
 }

-const resyncWorkerCnt = 50 // limit of number of bucket resyncs is progress at any given time
+const (
+	resyncWorkerCnt        = 10 // limit of number of bucket resyncs is progress at any given time
+	resyncParallelRoutines = 10 // number of parallel resync ops per bucket
+)

 func newresyncer() *replicationResyncer {
 	rs := replicationResyncer{
@@ -2322,6 +2326,36 @@ func newresyncer() *replicationResyncer {
 	return &rs
 }

+// mark status of replication resync on remote target for the bucket
+func (s *replicationResyncer) markStatus(status ResyncStatusType, opts resyncOpts) {
+	s.Lock()
+	defer s.Unlock()
+
+	m := s.statusMap[opts.bucket]
+	st := m.TargetsMap[opts.arn]
+	st.LastUpdate = UTCNow()
+	st.ResyncStatus = status
+	m.TargetsMap[opts.arn] = st
+	m.LastUpdate = UTCNow()
+	s.statusMap[opts.bucket] = m
+}
+
+// update replication resync stats for bucket's remote target
+func (s *replicationResyncer) incStats(ts TargetReplicationResyncStatus, opts resyncOpts) {
+	s.Lock()
+	defer s.Unlock()
+
+	m := s.statusMap[opts.bucket]
+	st := m.TargetsMap[opts.arn]
+	st.Object = ts.Object
+	st.ReplicatedCount += ts.ReplicatedCount
+	st.FailedCount += ts.FailedCount
+	st.ReplicatedSize += ts.ReplicatedSize
+	st.FailedSize += ts.FailedSize
+	m.TargetsMap[opts.arn] = st
+	m.LastUpdate = UTCNow()
+	s.statusMap[opts.bucket] = m
+}
+
 // resyncBucket resyncs all qualifying objects as per replication rules for the target
 // ARN
 func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
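
The markStatus and incStats helpers added above centralize the lock/update/unlock sequences that were previously open-coded at each call site. Once resync fans out across goroutines (final hunk below), every status-map mutation goes through a single mutex-guarded path, and each worker reports a private TargetReplicationResyncStatus delta instead of mutating shared counters in place. A minimal sketch of that delta-merge pattern, with hypothetical statsSink/delta names standing in for the resyncer's status maps:

package main

import (
	"fmt"
	"sync"
)

// delta is a worker-local accumulator; building it requires no locking.
type delta struct{ replicated, failed int64 }

// statsSink plays the role of replicationResyncer's status maps.
type statsSink struct {
	sync.Mutex
	replicated, failed int64
}

// incStats folds one worker's delta into the shared totals under the lock,
// mirroring how the real incStats merges a TargetReplicationResyncStatus.
func (s *statsSink) incStats(d delta) {
	s.Lock()
	defer s.Unlock()
	s.replicated += d.replicated
	s.failed += d.failed
}

func main() {
	var sink statsSink
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d := delta{replicated: 1} // accumulated lock-free per object
			sink.incStats(d)          // one short critical section per object
		}()
	}
	wg.Wait()
	fmt.Println(sink.replicated, sink.failed) // 10 0
}

Keeping the critical section to a single fold per object means lock contention stays roughly constant as resyncParallelRoutines grows.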
@@ -2333,15 +2367,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
 	resyncStatus := ResyncFailed
 	defer func() {
-		s.Lock()
-		m := s.statusMap[opts.bucket]
-		st := m.TargetsMap[opts.arn]
-		st.LastUpdate = UTCNow()
-		st.ResyncStatus = resyncStatus
-		m.TargetsMap[opts.arn] = st
-		m.LastUpdate = UTCNow()
-		s.statusMap[opts.bucket] = m
-		s.Unlock()
+		s.markStatus(resyncStatus, opts)
 		globalSiteResyncMetrics.incBucket(opts, resyncStatus)
 		s.workerCh <- struct{}{}
 	}()
@@ -2377,15 +2403,9 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
 	}
 	// mark resync status as resync started
 	if !heal {
-		s.Lock()
-		m := s.statusMap[opts.bucket]
-		st := m.TargetsMap[opts.arn]
-		st.ResyncStatus = ResyncStarted
-		m.TargetsMap[opts.arn] = st
-		m.LastUpdate = UTCNow()
-		s.statusMap[opts.bucket] = m
-		s.Unlock()
+		s.markStatus(ResyncStarted, opts)
 	}
 	// Walk through all object versions - Walk() is always in ascending order needed to ensure
 	// delete marker replicated to target after object version is first created.
 	if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil {
@@ -2401,80 +2421,85 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
 	if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
 		lastCheckpoint = st.Object
 	}
+	workers, err := workers.New(resyncParallelRoutines)
 	for obj := range objInfoCh {
 		select {
 		case <-s.resyncCancelCh:
 			resyncStatus = ResyncCanceled
 			return
+		case <-ctx.Done():
+			return
 		default:
 		}
 		if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
 			continue
 		}
 		lastCheckpoint = ""
-		roi := getHealReplicateObjectInfo(obj, rcfg)
-		if !roi.ExistingObjResync.mustResync() {
-			continue
-		}
-		traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
-		if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
-			versionID := ""
-			dmVersionID := ""
-			if roi.VersionPurgeStatus.Empty() {
-				dmVersionID = roi.VersionID
-			} else {
-				versionID = roi.VersionID
-			}
-			doi := DeletedObjectReplicationInfo{
-				DeletedObject: DeletedObject{
-					ObjectName:            roi.Name,
-					DeleteMarkerVersionID: dmVersionID,
-					VersionID:             versionID,
-					ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
-					DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
-					DeleteMarker:          roi.DeleteMarker,
-				},
-				Bucket:    roi.Bucket,
-				OpType:    replication.ExistingObjectReplicationType,
-				EventType: ReplicateExistingDelete,
-			}
-			replicateDelete(ctx, doi, objectAPI)
-		} else {
-			roi.OpType = replication.ExistingObjectReplicationType
-			roi.EventType = ReplicateExisting
-			replicateObject(ctx, roi, objectAPI)
-		}
-		_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
-			VersionID: roi.VersionID,
-			Internal: minio.AdvancedGetOptions{
-				ReplicationProxyRequest: "false",
-			},
-		})
-		s.Lock()
-		m = s.statusMap[opts.bucket]
-		st = m.TargetsMap[opts.arn]
-		st.Object = roi.Name
-		success := true
-		if err != nil {
-			if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
-				st.ReplicatedCount++
-			} else {
-				st.FailedCount++
-				success = false
-			}
-		} else {
-			st.ReplicatedCount++
-			st.ReplicatedSize += roi.Size
-		}
-		m.TargetsMap[opts.arn] = st
-		m.LastUpdate = UTCNow()
-		s.statusMap[opts.bucket] = m
-		s.Unlock()
-		traceFn(err)
-		globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
+		obj := obj
+		workers.Take()
+		go func() {
+			defer workers.Give()
+			roi := getHealReplicateObjectInfo(obj, rcfg)
+			if !roi.ExistingObjResync.mustResync() {
+				return
+			}
+			traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
+			if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
+				versionID := ""
+				dmVersionID := ""
+				if roi.VersionPurgeStatus.Empty() {
+					dmVersionID = roi.VersionID
+				} else {
+					versionID = roi.VersionID
+				}
+				doi := DeletedObjectReplicationInfo{
+					DeletedObject: DeletedObject{
+						ObjectName:            roi.Name,
+						DeleteMarkerVersionID: dmVersionID,
+						VersionID:             versionID,
+						ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
+						DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
+						DeleteMarker:          roi.DeleteMarker,
+					},
+					Bucket:    roi.Bucket,
+					OpType:    replication.ExistingObjectReplicationType,
+					EventType: ReplicateExistingDelete,
+				}
+				replicateDelete(ctx, doi, objectAPI)
+			} else {
+				roi.OpType = replication.ExistingObjectReplicationType
+				roi.EventType = ReplicateExisting
+				replicateObject(ctx, roi, objectAPI)
+			}
+			_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
+				VersionID: roi.VersionID,
+				Internal: minio.AdvancedGetOptions{
+					ReplicationProxyRequest: "false",
+				},
+			})
+			st := TargetReplicationResyncStatus{
+				Object: roi.Name,
+				Bucket: roi.Bucket,
+			}
+			success := true
+			if err != nil {
+				if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
+					st.ReplicatedCount++
+				} else {
+					st.FailedCount++
+					success = false
+				}
+			} else {
+				st.ReplicatedCount++
+				st.ReplicatedSize += roi.Size
+			}
+			s.incStats(st, opts)
+			traceFn(err)
+			globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
+		}()
 	}
+	workers.Wait()
 	resyncStatus = ResyncCompleted