mirror of
https://github.com/minio/minio.git
synced 2025-11-21 18:26:04 -05:00
support deleteMarkers and all versions in batch replication (#15858)
This commit is contained in:
@@ -272,17 +272,53 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
|
||||
tgtPrefix := r.Target.Prefix
|
||||
srcObject := srcObjInfo.Name
|
||||
|
||||
if srcObjInfo.DeleteMarker || !srcObjInfo.VersionPurgeStatus.Empty() {
|
||||
if retry {
|
||||
if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.StatObjectOptions{
|
||||
VersionID: srcObjInfo.VersionID,
|
||||
Internal: miniogo.AdvancedGetOptions{
|
||||
ReplicationProxyRequest: "false",
|
||||
},
|
||||
}); isErrMethodNotAllowed(ErrorRespToObjectError(err, tgtBucket, pathJoin(tgtPrefix, srcObject))) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
versionID := srcObjInfo.VersionID
|
||||
dmVersionID := ""
|
||||
if srcObjInfo.VersionPurgeStatus.Empty() {
|
||||
dmVersionID = srcObjInfo.VersionID
|
||||
}
|
||||
|
||||
return c.RemoveObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.RemoveObjectOptions{
|
||||
VersionID: versionID,
|
||||
Internal: miniogo.AdvancedRemoveOptions{
|
||||
ReplicationDeleteMarker: dmVersionID != "",
|
||||
ReplicationMTime: srcObjInfo.ModTime,
|
||||
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
||||
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if retry { // when we are retrying avoid copying if necessary.
|
||||
gopts := miniogo.GetObjectOptions{}
|
||||
if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := c.StatObject(ctx, tgtBucket, srcObject, gopts); err == nil {
|
||||
if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), gopts); err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
opts := ObjectOptions{}
|
||||
versioned := globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject)
|
||||
versionSuspended := globalBucketVersioningSys.PrefixSuspended(srcBucket, srcObject)
|
||||
|
||||
opts := ObjectOptions{
|
||||
VersionID: srcObjInfo.VersionID,
|
||||
Versioned: versioned,
|
||||
VersionSuspended: versionSuspended,
|
||||
}
|
||||
rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, readLock, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -331,10 +367,12 @@ type batchJobInfo struct {
|
||||
Object string `json:"-" msg:"lobj"`
|
||||
|
||||
// Verbose information
|
||||
Objects int64 `json:"objects" msg:"ob"`
|
||||
ObjectsFailed int64 `json:"objectsFailed" msg:"obf"`
|
||||
BytesTransferred int64 `json:"bytesTransferred" msg:"bt"`
|
||||
BytesFailed int64 `json:"bytesFailed" msg:"bf"`
|
||||
Objects int64 `json:"objects" msg:"ob"`
|
||||
DeleteMarkers int64 `json:"deleteMarkers" msg:"dm"`
|
||||
ObjectsFailed int64 `json:"objectsFailed" msg:"obf"`
|
||||
DeleteMarkersFailed int64 `json:"deleteMarkersFailed" msg:"dmf"`
|
||||
BytesTransferred int64 `json:"bytesTransferred" msg:"bt"`
|
||||
BytesFailed int64 `json:"bytesFailed" msg:"bf"`
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -431,16 +469,24 @@ func (ri batchJobInfo) save(ctx context.Context, api ObjectLayer, jobLocation st
|
||||
return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf)
|
||||
}
|
||||
|
||||
func (ri *batchJobInfo) countItem(size int64, success bool) {
|
||||
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
|
||||
if ri == nil {
|
||||
return
|
||||
}
|
||||
if success {
|
||||
ri.Objects++
|
||||
ri.BytesTransferred += size
|
||||
if dmarker {
|
||||
ri.DeleteMarkers++
|
||||
} else {
|
||||
ri.Objects++
|
||||
ri.BytesTransferred += size
|
||||
}
|
||||
} else {
|
||||
ri.ObjectsFailed++
|
||||
ri.BytesFailed += size
|
||||
if dmarker {
|
||||
ri.DeleteMarkersFailed++
|
||||
} else {
|
||||
ri.ObjectsFailed++
|
||||
ri.BytesFailed += size
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -466,7 +512,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,
|
||||
}
|
||||
ri.Bucket = bucket
|
||||
ri.Object = info.Name
|
||||
ri.countItem(info.Size, failed)
|
||||
ri.countItem(info.Size, info.DeleteMarker, failed)
|
||||
}
|
||||
|
||||
// Start start the batch replication job, resumes if there was a pending job via "job.ID"
|
||||
@@ -544,15 +590,15 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
|
||||
}
|
||||
|
||||
for result := range results {
|
||||
if result.DeleteMarker {
|
||||
// delete-markers will never be 'replicated'
|
||||
continue
|
||||
}
|
||||
|
||||
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
|
||||
success := true
|
||||
if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
|
||||
stopFn(err)
|
||||
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
|
||||
// object must be deleted concurrently, allow
|
||||
// these failures but do not count them
|
||||
continue
|
||||
}
|
||||
logger.LogIf(ctx, err)
|
||||
success = false
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user