replication resync: fix queueing (#17775)
Assign the resync of all versions of an object to the same worker to avoid locking contention. Fixes the parallel resync implementation introduced in #16707.
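For context, the queueing fix routes every version of an object to one fixed worker by hashing its bucket and object name, so versions of the same object are resynced serially by a single goroutine instead of contending for the same object lock. Below is a minimal, self-contained sketch of that dispatch pattern, assuming illustrative names (task, numWorkers, and the printed "resync" step) rather than MinIO's actual types:

package main

import (
	"fmt"
	"sync"

	"github.com/zeebo/xxh3"
)

// task is a stand-in for a queued replication item (hypothetical type).
type task struct{ bucket, object, versionID string }

func main() {
	const numWorkers = 4
	workers := make([]chan task, numWorkers)
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		workers[i] = make(chan task, 100)
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			for t := range workers[idx] {
				// All versions of t.bucket/t.object land on this one channel,
				// so they are processed one after another by the same worker.
				fmt.Printf("worker %d: resync %s/%s (%s)\n", idx, t.bucket, t.object, t.versionID)
			}
		}(i)
	}

	for _, t := range []task{
		{"photos", "a.jpg", "v1"}, {"photos", "a.jpg", "v2"}, {"photos", "b.jpg", "v1"},
	} {
		// Hash bucket+object (not the version ID): same object => same worker.
		h := xxh3.HashString(t.bucket + t.object)
		workers[h%uint64(numWorkers)] <- t
	}

	for i := 0; i < numWorkers; i++ {
		close(workers[i])
	}
	wg.Wait()
}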
@@ -51,7 +51,6 @@ import (
 	"github.com/minio/minio/internal/hash"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
-	"github.com/minio/pkg/workers"
 	"github.com/zeebo/xxh3"
 )
 
@@ -2440,12 +2439,12 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 	objInfoCh := make(chan ObjectInfo)
 	cfg, err := getReplicationConfig(ctx, opts.bucket)
 	if err != nil {
-		logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
+		logger.LogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
 		return
 	}
 	tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket)
 	if err != nil {
-		logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
+		logger.LogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
 		return
 	}
 	rcfg := replicationConfig{
@@ -2458,12 +2457,12 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 		TargetArn: opts.arn,
 	})
 	if len(tgtArns) != 1 {
-		logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
+		logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
 		return
 	}
 	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, opts.arn)
 	if tgt == nil {
-		logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
+		logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
 		return
 	}
 	// mark resync status as resync started
@@ -2486,7 +2485,83 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 	if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
 		lastCheckpoint = st.Object
 	}
-	workers, err := workers.New(resyncParallelRoutines)
+	workers := make([]chan ReplicateObjectInfo, resyncParallelRoutines)
+	resultCh := make(chan TargetReplicationResyncStatus, 1)
+	defer close(resultCh)
+	var wg sync.WaitGroup
+	for i := 0; i < resyncParallelRoutines; i++ {
+		wg.Add(1)
+		workers[i] = make(chan ReplicateObjectInfo, 100)
+		i := i
+		go func(ctx context.Context, idx int) {
+			defer wg.Done()
+			for roi := range workers[idx] {
+				select {
+				case <-ctx.Done():
+					return
+				case <-s.resyncCancelCh:
+				default:
+				}
+				traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
+				if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
+					versionID := ""
+					dmVersionID := ""
+					if roi.VersionPurgeStatus.Empty() {
+						dmVersionID = roi.VersionID
+					} else {
+						versionID = roi.VersionID
+					}
+
+					doi := DeletedObjectReplicationInfo{
+						DeletedObject: DeletedObject{
+							ObjectName:            roi.Name,
+							DeleteMarkerVersionID: dmVersionID,
+							VersionID:             versionID,
+							ReplicationState:      roi.getReplicationState(),
+							DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
+							DeleteMarker:          roi.DeleteMarker,
+						},
+						Bucket:    roi.Bucket,
+						OpType:    replication.ExistingObjectReplicationType,
+						EventType: ReplicateExistingDelete,
+					}
+					replicateDelete(ctx, doi, objectAPI)
+				} else {
+					roi.OpType = replication.ExistingObjectReplicationType
+					roi.EventType = ReplicateExisting
+					replicateObject(ctx, roi, objectAPI)
+				}
+				_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
+					VersionID: roi.VersionID,
+					Internal: minio.AdvancedGetOptions{
+						ReplicationProxyRequest: "false",
+					},
+				})
+				st := TargetReplicationResyncStatus{
+					Object: roi.Name,
+					Bucket: roi.Bucket,
+				}
+				if err != nil {
+					if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
+						st.ReplicatedCount++
+					} else {
+						st.FailedCount++
+					}
+				} else {
+					st.ReplicatedCount++
+					st.ReplicatedSize += roi.Size
+				}
+				traceFn(err)
+				select {
+				case <-ctx.Done():
+					return
+				case <-s.resyncCancelCh:
+					return
+				case resultCh <- st:
+				}
+			}
+		}(ctx, i)
+	}
 	for obj := range objInfoCh {
 		select {
 		case <-s.resyncCancelCh:
@@ -2500,71 +2575,30 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 			continue
 		}
 		lastCheckpoint = ""
-		obj := obj
-		workers.Take()
-		go func() {
-			defer workers.Give()
-			roi := getHealReplicateObjectInfo(obj, rcfg)
-			if !roi.ExistingObjResync.mustResync() {
-				return
-			}
-			traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
-			if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
-				versionID := ""
-				dmVersionID := ""
-				if roi.VersionPurgeStatus.Empty() {
-					dmVersionID = roi.VersionID
-				} else {
-					versionID = roi.VersionID
-				}
-
-				doi := DeletedObjectReplicationInfo{
-					DeletedObject: DeletedObject{
-						ObjectName:            roi.Name,
-						DeleteMarkerVersionID: dmVersionID,
-						VersionID:             versionID,
-						ReplicationState:      roi.getReplicationState(),
-						DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
-						DeleteMarker:          roi.DeleteMarker,
-					},
-					Bucket:    roi.Bucket,
-					OpType:    replication.ExistingObjectReplicationType,
-					EventType: ReplicateExistingDelete,
-				}
-				replicateDelete(ctx, doi, objectAPI)
-			} else {
-				roi.OpType = replication.ExistingObjectReplicationType
-				roi.EventType = ReplicateExisting
-				replicateObject(ctx, roi, objectAPI)
-			}
-			_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
-				VersionID: roi.VersionID,
-				Internal: minio.AdvancedGetOptions{
-					ReplicationProxyRequest: "false",
-				},
-			})
-			st := TargetReplicationResyncStatus{
-				Object: roi.Name,
-				Bucket: roi.Bucket,
-			}
-			success := true
-			if err != nil {
-				if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
-					st.ReplicatedCount++
-				} else {
-					st.FailedCount++
-					success = false
-				}
-			} else {
-				st.ReplicatedCount++
-				st.ReplicatedSize += roi.Size
-			}
-			s.incStats(st, opts)
-			traceFn(err)
-			globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
-		}()
+		roi := getHealReplicateObjectInfo(obj, rcfg)
+		if !roi.ExistingObjResync.mustResync() {
+			continue
+		}
+		select {
+		case <-s.resyncCancelCh:
+			return
+		case <-ctx.Done():
+			return
+		default:
+			h := xxh3.HashString(roi.Bucket + roi.Name)
+			workers[h%uint64(resyncParallelRoutines)] <- roi
+		}
 	}
-	workers.Wait()
+	for i := 0; i < resyncParallelRoutines; i++ {
+		close(workers[i])
+	}
+	go func() {
+		for r := range resultCh {
+			s.incStats(r, opts)
+			globalSiteResyncMetrics.updateMetric(r, opts.resyncID)
+		}
+	}()
+	wg.Wait()
 	resyncStatus = ResyncCompleted
 }
 