diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 85cc94fc7..c7e1e4f82 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -20,6 +20,7 @@ package cmd import ( "context" "fmt" + "io" "net/http" "reflect" "strings" @@ -36,6 +37,7 @@ import ( "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" iampolicy "github.com/minio/pkg/iam/policy" @@ -768,9 +770,17 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje newCtx, cancel := context.WithTimeout(ctx, globalOperationTimeout.Timeout()) defer cancel() r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts) - if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil { - replicationStatus = replication.Failed - logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) + if len(objInfo.Parts) > 1 { + if uploadID, err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object, r, objInfo, putOpts); err != nil { + replicationStatus = replication.Failed + logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) + defer c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID) + } + } else { + if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil { + replicationStatus = replication.Failed + logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) + } } } gr.Close() @@ -839,6 +849,40 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje } } +func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (uploadID string, err error) { + var uploadedParts []miniogo.CompletePart + uploadID, err = c.NewMultipartUpload(context.Background(), bucket, object, opts) + if err != nil { + return + } + var ( + hr *hash.Reader + pInfo miniogo.ObjectPart + ) + for _, partInfo := range objInfo.Parts { + hr, err = hash.NewReader(r, partInfo.Size, "", "", partInfo.Size) + if err != nil { + return + } + pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.Size, "", "", opts.ServerSideEncryption) + if err != nil { + return + } + if pInfo.Size != partInfo.Size { + return uploadID, fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.Size) + } + uploadedParts = append(uploadedParts, miniogo.CompletePart{ + PartNumber: pInfo.PartNumber, + ETag: pInfo.ETag, + }) + } + _, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{ + SourceMTime: objInfo.ModTime, + ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside + }}) + return +} + // filterReplicationStatusMetadata filters replication status metadata for COPY func filterReplicationStatusMetadata(metadata map[string]string) map[string]string { // Copy on write diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 493551e8d..bbce5e971 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -664,7 +664,7 @@ func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, obj // CompleteMultipartUpload completes ongoing multipart upload and finalizes object func (l *s3Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) { - etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts)) + etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts), miniogo.PutObjectOptions{}) if err != nil { return oi, minio.ErrorRespToObjectError(err, bucket, object) } diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index 4790eba98..c2e90f291 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -336,3 +336,21 @@ func copySrcOpts(ctx context.Context, r *http.Request, bucket, object string) (O } return opts, nil } + +// get ObjectOptions for CompleteMultipart calls +func completeMultipartOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) { + mtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceMTime)) + mtime := UTCNow() + if mtimeStr != "" { + mtime, err = time.Parse(time.RFC3339, mtimeStr) + if err != nil { + return opts, InvalidArgument{ + Bucket: bucket, + Object: object, + Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceMTime, err), + } + } + } + opts.MTime = mtime + return opts, nil +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index e626d5c1f..8f4c596bd 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3115,9 +3115,15 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite setEventStreamHeaders(w) + opts, err := completeMultipartOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} completeDoneCh := sendWhiteSpace(w) - objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, ObjectOptions{}) + objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts) // Stop writing white spaces to the client. Note that close(doneCh) style is not used as it // can cause white space to be written after we send XML response in a race condition. headerWritten := <-completeDoneCh diff --git a/go.mod b/go.mod index 20bba5943..a927da478 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.14.0 github.com/minio/madmin-go v1.0.12 - github.com/minio/minio-go/v7 v7.0.12-0.20210617160455-b7103728fb87 + github.com/minio/minio-go/v7 v7.0.12-0.20210622001910-0823af6c707c github.com/minio/parquet-go v1.0.0 github.com/minio/pkg v1.0.8 github.com/minio/rpc v1.0.0 diff --git a/go.sum b/go.sum index 9eae4f205..5dbff1f3d 100644 --- a/go.sum +++ b/go.sum @@ -1023,6 +1023,8 @@ github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh github.com/minio/minio-go/v7 v7.0.11-0.20210607181445-e162fdb8e584/go.mod h1:WoyW+ySKAKjY98B9+7ZbI8z8S3jaxaisdcvj9TGlazA= github.com/minio/minio-go/v7 v7.0.12-0.20210617160455-b7103728fb87 h1:BnFzAooBfXMLroRcXEot1tHNa2s7Sa5avBcIG2q85+8= github.com/minio/minio-go/v7 v7.0.12-0.20210617160455-b7103728fb87/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs= +github.com/minio/minio-go/v7 v7.0.12-0.20210622001910-0823af6c707c h1:wg0ywTE1zsFtf3CP9UDOw/SYv6BCN3SrkH41gbk3rgc= +github.com/minio/minio-go/v7 v7.0.12-0.20210622001910-0823af6c707c/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs= github.com/minio/operator v0.0.0-20210616045941-65f31f5f78ae h1:GONmqbjCi/KTEc1CGujnS/m1qeJeghcQ8dUBLh19qQo= github.com/minio/operator v0.0.0-20210616045941-65f31f5f78ae/go.mod h1:8/mIXK+CFdL6VqyxRn1SwD+PEX0jsN8uqjoadaw/Np0= github.com/minio/operator/logsearchapi v0.0.0-20210604224119-7e256f98cf90 h1:Qu6j6oE7+QNuq7Kr2DLyVYq3fqMdqFd/T8NAeNp47og=