diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 067853ef3..3dd1fa261 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -272,17 +272,53 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL tgtPrefix := r.Target.Prefix srcObject := srcObjInfo.Name + if srcObjInfo.DeleteMarker || !srcObjInfo.VersionPurgeStatus.Empty() { + if retry { + if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.StatObjectOptions{ + VersionID: srcObjInfo.VersionID, + Internal: miniogo.AdvancedGetOptions{ + ReplicationProxyRequest: "false", + }, + }); isErrMethodNotAllowed(ErrorRespToObjectError(err, tgtBucket, pathJoin(tgtPrefix, srcObject))) { + return nil + } + } + + versionID := srcObjInfo.VersionID + dmVersionID := "" + if srcObjInfo.VersionPurgeStatus.Empty() { + dmVersionID = srcObjInfo.VersionID + } + + return c.RemoveObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.RemoveObjectOptions{ + VersionID: versionID, + Internal: miniogo.AdvancedRemoveOptions{ + ReplicationDeleteMarker: dmVersionID != "", + ReplicationMTime: srcObjInfo.ModTime, + ReplicationStatus: miniogo.ReplicationStatusReplica, + ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside + }, + }) + } + if retry { // when we are retrying avoid copying if necessary. gopts := miniogo.GetObjectOptions{} if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil { return err } - if _, err := c.StatObject(ctx, tgtBucket, srcObject, gopts); err == nil { + if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), gopts); err == nil { return nil } } - opts := ObjectOptions{} + versioned := globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) + versionSuspended := globalBucketVersioningSys.PrefixSuspended(srcBucket, srcObject) + + opts := ObjectOptions{ + VersionID: srcObjInfo.VersionID, + Versioned: versioned, + VersionSuspended: versionSuspended, + } rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, readLock, opts) if err != nil { return err @@ -331,10 +367,12 @@ type batchJobInfo struct { Object string `json:"-" msg:"lobj"` // Verbose information - Objects int64 `json:"objects" msg:"ob"` - ObjectsFailed int64 `json:"objectsFailed" msg:"obf"` - BytesTransferred int64 `json:"bytesTransferred" msg:"bt"` - BytesFailed int64 `json:"bytesFailed" msg:"bf"` + Objects int64 `json:"objects" msg:"ob"` + DeleteMarkers int64 `json:"deleteMarkers" msg:"dm"` + ObjectsFailed int64 `json:"objectsFailed" msg:"obf"` + DeleteMarkersFailed int64 `json:"deleteMarkersFailed" msg:"dmf"` + BytesTransferred int64 `json:"bytesTransferred" msg:"bt"` + BytesFailed int64 `json:"bytesFailed" msg:"bf"` } const ( @@ -431,16 +469,24 @@ func (ri batchJobInfo) save(ctx context.Context, api ObjectLayer, jobLocation st return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf) } -func (ri *batchJobInfo) countItem(size int64, success bool) { +func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) { if ri == nil { return } if success { - ri.Objects++ - ri.BytesTransferred += size + if dmarker { + ri.DeleteMarkers++ + } else { + ri.Objects++ + ri.BytesTransferred += size + } } else { - ri.ObjectsFailed++ - ri.BytesFailed += size + if dmarker { + ri.DeleteMarkersFailed++ + } else { + ri.ObjectsFailed++ + ri.BytesFailed += size + } } } @@ -466,7 +512,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, } ri.Bucket = bucket ri.Object = info.Name - ri.countItem(info.Size, failed) + ri.countItem(info.Size, info.DeleteMarker, failed) } // Start start the batch replication job, resumes if there was a pending job via "job.ID" @@ -544,15 +590,15 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba } for result := range results { - if result.DeleteMarker { - // delete-markers will never be 'replicated' - continue - } - stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result) success := true if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil { stopFn(err) + if isErrVersionNotFound(err) || isErrObjectNotFound(err) { + // object must be deleted concurrently, allow + // these failures but do not count them + continue + } logger.LogIf(ctx, err) success = false } else { diff --git a/cmd/batch-handlers_gen.go b/cmd/batch-handlers_gen.go index 39054c790..20ec1cd75 100644 --- a/cmd/batch-handlers_gen.go +++ b/cmd/batch-handlers_gen.go @@ -2531,12 +2531,24 @@ func (z *batchJobInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Objects") return } + case "dm": + z.DeleteMarkers, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "DeleteMarkers") + return + } case "obf": z.ObjectsFailed, err = dc.ReadInt64() if err != nil { err = msgp.WrapError(err, "ObjectsFailed") return } + case "dmf": + z.DeleteMarkersFailed, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "DeleteMarkersFailed") + return + } case "bt": z.BytesTransferred, err = dc.ReadInt64() if err != nil { @@ -2562,9 +2574,9 @@ func (z *batchJobInfo) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 14 + // map header, size 16 // write "v" - err = en.Append(0x8e, 0xa1, 0x76) + err = en.Append(0xde, 0x0, 0x10, 0xa1, 0x76) if err != nil { return } @@ -2673,6 +2685,16 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Objects") return } + // write "dm" + err = en.Append(0xa2, 0x64, 0x6d) + if err != nil { + return + } + err = en.WriteInt64(z.DeleteMarkers) + if err != nil { + err = msgp.WrapError(err, "DeleteMarkers") + return + } // write "obf" err = en.Append(0xa3, 0x6f, 0x62, 0x66) if err != nil { @@ -2683,6 +2705,16 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ObjectsFailed") return } + // write "dmf" + err = en.Append(0xa3, 0x64, 0x6d, 0x66) + if err != nil { + return + } + err = en.WriteInt64(z.DeleteMarkersFailed) + if err != nil { + err = msgp.WrapError(err, "DeleteMarkersFailed") + return + } // write "bt" err = en.Append(0xa2, 0x62, 0x74) if err != nil { @@ -2709,9 +2741,9 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 14 + // map header, size 16 // string "v" - o = append(o, 0x8e, 0xa1, 0x76) + o = append(o, 0xde, 0x0, 0x10, 0xa1, 0x76) o = msgp.AppendInt(o, z.Version) // string "jid" o = append(o, 0xa3, 0x6a, 0x69, 0x64) @@ -2743,9 +2775,15 @@ func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) { // string "ob" o = append(o, 0xa2, 0x6f, 0x62) o = msgp.AppendInt64(o, z.Objects) + // string "dm" + o = append(o, 0xa2, 0x64, 0x6d) + o = msgp.AppendInt64(o, z.DeleteMarkers) // string "obf" o = append(o, 0xa3, 0x6f, 0x62, 0x66) o = msgp.AppendInt64(o, z.ObjectsFailed) + // string "dmf" + o = append(o, 0xa3, 0x64, 0x6d, 0x66) + o = msgp.AppendInt64(o, z.DeleteMarkersFailed) // string "bt" o = append(o, 0xa2, 0x62, 0x74) o = msgp.AppendInt64(o, z.BytesTransferred) @@ -2839,12 +2877,24 @@ func (z *batchJobInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Objects") return } + case "dm": + z.DeleteMarkers, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "DeleteMarkers") + return + } case "obf": z.ObjectsFailed, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { err = msgp.WrapError(err, "ObjectsFailed") return } + case "dmf": + z.DeleteMarkersFailed, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "DeleteMarkersFailed") + return + } case "bt": z.BytesTransferred, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { @@ -2871,6 +2921,6 @@ func (z *batchJobInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *batchJobInfo) Msgsize() (s int) { - s = 1 + 2 + msgp.IntSize + 4 + msgp.StringPrefixSize + len(z.JobID) + 3 + msgp.StringPrefixSize + len(z.JobType) + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.IntSize + 4 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + s = 3 + 2 + msgp.IntSize + 4 + msgp.StringPrefixSize + len(z.JobID) + 3 + msgp.StringPrefixSize + len(z.JobType) + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.IntSize + 4 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size return }