diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index baa1f4f5f..7c1af59cc 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -174,6 +174,7 @@ type BatchJobReplicateResourceType string func (t BatchJobReplicateResourceType) Validate() error { switch t { case BatchJobReplicateResourceMinIO: + case BatchJobReplicateResourceS3: default: return errInvalidArgument } @@ -183,6 +184,8 @@ func (t BatchJobReplicateResourceType) Validate() error { // Different types of batch jobs.. const ( BatchJobReplicateResourceMinIO BatchJobReplicateResourceType = "minio" + BatchJobReplicateResourceS3 BatchJobReplicateResourceType = "s3" + // add future targets ) @@ -297,10 +300,13 @@ func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api Objec versioned := globalBucketVersioningSys.PrefixEnabled(tgtBucket, tgtObject) versionSuspended := globalBucketVersioningSys.PrefixSuspended(tgtBucket, tgtObject) - + versionID := srcObjInfo.VersionID + if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { + versionID = "" + } if srcObjInfo.DeleteMarker { _, err := api.DeleteObject(ctx, tgtBucket, tgtObject, ObjectOptions{ - VersionID: srcObjInfo.VersionID, + VersionID: versionID, VersionSuspended: versionSuspended, Versioned: versioned, MTime: srcObjInfo.ModTime, @@ -318,6 +324,9 @@ func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api Objec PreserveETag: srcObjInfo.ETag, UserDefined: srcObjInfo.UserDefined, } + if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { + opts.VersionID = "" + } if crypto.S3.IsEncrypted(srcObjInfo.UserDefined) { opts.ServerSideEncryption = encrypt.NewSSE() } @@ -358,7 +367,9 @@ func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, a if r.Target.Prefix != "" { tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name) } - + if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { + opts.VersionID = "" + } var uploadedParts []CompletePart res, err := api.NewMultipartUpload(context.Background(), tgtBucket, tgtObject, opts) if err != nil { @@ -539,19 +550,47 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts - + // one of source/target is s3, skip delete marker and all versions under the same object name. + s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 + minioSrc := r.Source.Type == BatchJobReplicateResourceMinIO ctx, cancel := context.WithCancel(ctx) objInfoCh := c.ListObjects(ctx, r.Source.Bucket, minio.ListObjectsOptions{ Prefix: r.Source.Prefix, - WithVersions: true, + WithVersions: minioSrc, Recursive: true, WithMetadata: true, }) + prevObj := "" + skipReplicate := false + for obj := range objInfoCh { oi := toObjectInfo(r.Source.Bucket, obj.Key, obj) + if !minioSrc { + oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, minio.StatObjectOptions{}) + if err == nil { + oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2) + } else { + if isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) || + isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { + continue + } + logger.LogIf(ctx, err) + cancel() + return err + } + } if skip(oi) { continue } + if obj.Key != prevObj { + prevObj = obj.Key + // skip replication of delete marker and all versions under the same object name if one of source or target is s3. + skipReplicate = obj.IsDeleteMarker && s3Type + } + if skipReplicate { + continue + } + wk.Take() go func() { defer wk.Give() @@ -646,9 +685,10 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL tgtBucket := r.Target.Bucket tgtPrefix := r.Target.Prefix srcObject := srcObjInfo.Name + s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 if srcObjInfo.DeleteMarker || !srcObjInfo.VersionPurgeStatus.Empty() { - if retry { + if retry && !s3Type { if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.StatObjectOptions{ VersionID: srcObjInfo.VersionID, Internal: miniogo.AdvancedGetOptions{ @@ -664,7 +704,10 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL if srcObjInfo.VersionPurgeStatus.Empty() { dmVersionID = srcObjInfo.VersionID } - + if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { + dmVersionID = "" + versionID = "" + } return c.RemoveObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.RemoveObjectOptions{ VersionID: versionID, Internal: miniogo.AdvancedRemoveOptions{ @@ -676,7 +719,7 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL }) } - if retry { // when we are retrying avoid copying if necessary. + if retry && !s3Type { // when we are retrying avoid copying if necessary. gopts := miniogo.GetObjectOptions{} if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil { return err @@ -710,7 +753,9 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL if err != nil { return err } - + if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { + putOpts.Internal = miniogo.AdvancedPutOptions{} + } if objInfo.isMultipart() { if err := replicateObjectWithMultipart(ctx, c, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, objInfo, putOpts); err != nil { return err @@ -1021,6 +1066,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // None of the provided metadata filters match skip the object. return false } + // if one of source or target is non MinIO, just replicate the top most version like `mc mirror` + if (r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3) && !info.IsLatest { + return false + } return true } @@ -1059,6 +1108,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba attempts := attempts ctx, cancel := context.WithCancel(ctx) + // one of source/target is s3, skip delete marker and all versions under the same object name. + s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 results := make(chan ObjectInfo, 100) if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{ @@ -1070,8 +1121,18 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba return err } + prevObj := "" + + skipReplicate := false for result := range results { result := result + if result.Name != prevObj { + prevObj = result.Name + skipReplicate = result.DeleteMarker && s3Type + } + if skipReplicate { + continue + } wk.Take() go func() { defer wk.Give() @@ -1265,8 +1326,10 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, } return err } + // if both source and target are minio instances + minioType := r.Target.Type == BatchJobReplicateResourceMinIO && r.Source.Type == BatchJobReplicateResourceMinIO // If source has versioning enabled, target must have versioning enabled - if (info.Versioning && !vcfg.Enabled() && !isRemoteToLocal) || (!info.Versioning && vcfg.Enabled() && isRemoteToLocal) { + if minioType && ((info.Versioning && !vcfg.Enabled() && !isRemoteToLocal) || (!info.Versioning && vcfg.Enabled() && isRemoteToLocal)) { return batchReplicationJobError{ Code: "InvalidBucketState", Description: fmt.Sprintf("The source '%s' has versioning enabled, target '%s' must have versioning enabled",