mirror of
https://github.com/minio/minio.git
synced 2025-11-09 21:49:46 -05:00
Use multipart call for replication (#12535)
if object was uploaded with multipart. This is to ensure that GetObject calls with partNumber in URI request parameters have same behavior on source and replication target.
This commit is contained in:
committed by
GitHub
parent
a6ad965799
commit
a3f0288262
@@ -20,6 +20,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
@@ -36,6 +37,7 @@ import (
|
||||
"github.com/minio/minio/internal/config/storageclass"
|
||||
"github.com/minio/minio/internal/crypto"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
iampolicy "github.com/minio/pkg/iam/policy"
|
||||
@@ -768,9 +770,17 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
||||
newCtx, cancel := context.WithTimeout(ctx, globalOperationTimeout.Timeout())
|
||||
defer cancel()
|
||||
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
||||
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
||||
replicationStatus = replication.Failed
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
if len(objInfo.Parts) > 1 {
|
||||
if uploadID, err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object, r, objInfo, putOpts); err != nil {
|
||||
replicationStatus = replication.Failed
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
defer c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID)
|
||||
}
|
||||
} else {
|
||||
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
||||
replicationStatus = replication.Failed
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
gr.Close()
|
||||
@@ -839,6 +849,40 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
||||
}
|
||||
}
|
||||
|
||||
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (uploadID string, err error) {
|
||||
var uploadedParts []miniogo.CompletePart
|
||||
uploadID, err = c.NewMultipartUpload(context.Background(), bucket, object, opts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var (
|
||||
hr *hash.Reader
|
||||
pInfo miniogo.ObjectPart
|
||||
)
|
||||
for _, partInfo := range objInfo.Parts {
|
||||
hr, err = hash.NewReader(r, partInfo.Size, "", "", partInfo.Size)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.Size, "", "", opts.ServerSideEncryption)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if pInfo.Size != partInfo.Size {
|
||||
return uploadID, fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.Size)
|
||||
}
|
||||
uploadedParts = append(uploadedParts, miniogo.CompletePart{
|
||||
PartNumber: pInfo.PartNumber,
|
||||
ETag: pInfo.ETag,
|
||||
})
|
||||
}
|
||||
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{
|
||||
SourceMTime: objInfo.ModTime,
|
||||
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
||||
}})
|
||||
return
|
||||
}
|
||||
|
||||
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
||||
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
||||
// Copy on write
|
||||
|
||||
@@ -664,7 +664,7 @@ func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, obj
|
||||
|
||||
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
|
||||
func (l *s3Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
|
||||
etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts))
|
||||
etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts), miniogo.PutObjectOptions{})
|
||||
if err != nil {
|
||||
return oi, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
@@ -336,3 +336,21 @@ func copySrcOpts(ctx context.Context, r *http.Request, bucket, object string) (O
|
||||
}
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
// get ObjectOptions for CompleteMultipart calls
|
||||
func completeMultipartOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) {
|
||||
mtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceMTime))
|
||||
mtime := UTCNow()
|
||||
if mtimeStr != "" {
|
||||
mtime, err = time.Parse(time.RFC3339, mtimeStr)
|
||||
if err != nil {
|
||||
return opts, InvalidArgument{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceMTime, err),
|
||||
}
|
||||
}
|
||||
}
|
||||
opts.MTime = mtime
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
@@ -3115,9 +3115,15 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
|
||||
setEventStreamHeaders(w)
|
||||
|
||||
opts, err := completeMultipartOpts(ctx, r, bucket, object)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)}
|
||||
completeDoneCh := sendWhiteSpace(w)
|
||||
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, ObjectOptions{})
|
||||
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts)
|
||||
// Stop writing white spaces to the client. Note that close(doneCh) style is not used as it
|
||||
// can cause white space to be written after we send XML response in a race condition.
|
||||
headerWritten := <-completeDoneCh
|
||||
|
||||
Reference in New Issue
Block a user