diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 2e3d49aa8..17301851b 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "encoding/base64" "errors" "fmt" "io" @@ -41,7 +42,15 @@ import ( ) func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string { - return pathJoin(er.getMultipartSHADir(bucket, object), uploadID) + uploadUUID := uploadID + uploadBytes, err := base64.StdEncoding.DecodeString(uploadID) + if err == nil { + slc := strings.SplitN(string(uploadBytes), ".", 2) + if len(slc) == 2 { + uploadUUID = slc[1] + } + } + return pathJoin(er.getMultipartSHADir(bucket, object), uploadUUID) } func (er erasureObjects) getMultipartSHADir(bucket, object string) string { @@ -436,9 +445,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, partsMetadata[index].ModTime = modTime partsMetadata[index].Metadata = userDefined } - - uploadID := mustGetUUID() - uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) + uploadUUID := mustGetUUID() + uploadID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID, uploadUUID))) + uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID) // Write updated `xl.meta` to all disks. 
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { diff --git a/cmd/erasure-single-drive.go b/cmd/erasure-single-drive.go index c597749d3..0bd4a0323 100644 --- a/cmd/erasure-single-drive.go +++ b/cmd/erasure-single-drive.go @@ -20,6 +20,7 @@ package cmd import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "io" @@ -1943,7 +1944,15 @@ func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket s } func (es *erasureSingle) getUploadIDDir(bucket, object, uploadID string) string { - return pathJoin(es.getMultipartSHADir(bucket, object), uploadID) + uploadUUID := uploadID + uploadBytes, err := base64.StdEncoding.DecodeString(uploadID) + if err == nil { + slc := strings.SplitN(string(uploadBytes), ".", 2) + if len(slc) == 2 { + uploadUUID = slc[1] + } + } + return pathJoin(es.getMultipartSHADir(bucket, object), uploadUUID) } func (es *erasureSingle) getMultipartSHADir(bucket, object string) string { @@ -2190,9 +2199,9 @@ func (es *erasureSingle) newMultipartUpload(ctx context.Context, bucket string, partsMetadata[index].ModTime = modTime partsMetadata[index].Metadata = opts.UserDefined } - - uploadID := mustGetUUID() - uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) + uploadUUID := mustGetUUID() + uploadID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID, uploadUUID))) + uploadIDPath := es.getUploadIDDir(bucket, object, uploadUUID) // Write updated `xl.meta` to all disks. 
// getDeplIDFromUpload extracts the deployment ID embedded in an uploadID.
// uploadIDs are base64(StdEncoding) of "deploymentID.uploadUUID"; both a
// failed base64 decode and a missing '.' separator are reported as errors
// so callers can treat the uploadID as unknown.
func getDeplIDFromUpload(uploadID string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(uploadID)
	if err != nil {
		return "", fmt.Errorf("error parsing uploadID %s (%w)", uploadID, err)
	}
	decoded := string(raw)
	sep := strings.Index(decoded, ".")
	if sep < 0 {
		return "", fmt.Errorf("uploadID %s has incorrect format", uploadID)
	}
	// Everything before the first '.' is the deployment ID.
	return decoded[:sep], nil
}
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchUpload), r.URL) + return + } + remote, self := globalSiteReplicationSys.getPeerForUpload(deplID) + if self { + h.ServeHTTP(w, r) + return + } + // forward request to peer handling this upload + if globalBucketTargetSys.isOffline(remote.EndpointURL) { + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrReplicationRemoteConnectionError), r.URL) + return + } + + r.URL.Scheme = remote.EndpointURL.Scheme + r.URL.Host = remote.EndpointURL.Host + // Make sure we remove any existing headers before + // proxying the request to another node. + for k := range w.Header() { + w.Header().Del(k) + } + globalForwarder.ServeHTTP(w, r) + return + } + h.ServeHTTP(w, r) + }) +} diff --git a/cmd/object-api-input-checks.go b/cmd/object-api-input-checks.go index 8dfc6212d..9cffdedb1 100644 --- a/cmd/object-api-input-checks.go +++ b/cmd/object-api-input-checks.go @@ -19,10 +19,10 @@ package cmd import ( "context" + "encoding/base64" "runtime" "strings" - "github.com/google/uuid" "github.com/minio/minio-go/v7/pkg/s3utils" "github.com/minio/minio/internal/logger" ) @@ -112,7 +112,8 @@ func checkListMultipartArgs(ctx context.Context, bucket, prefix, keyMarker, uplo KeyMarker: keyMarker, } } - if _, err := uuid.Parse(uploadIDMarker); err != nil { + _, err := base64.StdEncoding.DecodeString(uploadIDMarker) + if err != nil { logger.LogIf(ctx, err) return MalformedUploadID{ UploadID: uploadIDMarker, diff --git a/cmd/routers.go b/cmd/routers.go index d04dac112..3d59afc43 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -61,6 +61,8 @@ var globalHandlers = []mux.MiddlewareFunc{ setRequestValidityHandler, // set x-amz-request-id header. addCustomHeaders, + // Add upload forwarding handler for site replication + setUploadForwardingHandler, // Add bucket forwarding handler setBucketForwardingHandler, // Add new handlers here. 
diff --git a/cmd/site-replication.go b/cmd/site-replication.go index b52e7ae50..3125af66d 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -4843,3 +4843,27 @@ func isUserInfoEqual(u1, u2 madmin.UserInfo) bool { func isPolicyMappingEqual(p1, p2 srPolicyMapping) bool { return p1.Policy == p2.Policy && p1.IsGroup == p2.IsGroup && p1.UserOrGroup == p2.UserOrGroup } + +type srPeerInfo struct { + madmin.PeerInfo + EndpointURL *url.URL +} + +// getPeerForUpload returns the site replication peer handling this upload. Defaults to local cluster otherwise +func (c *SiteReplicationSys) getPeerForUpload(deplID string) (pi srPeerInfo, local bool) { + ci, _ := c.GetClusterInfo(GlobalContext) + if !ci.Enabled { + return pi, true + } + for _, site := range ci.Sites { + if deplID == site.DeploymentID { + ep, _ := url.Parse(site.Endpoint) + pi = srPeerInfo{ + PeerInfo: site, + EndpointURL: ep, + } + return pi, site.DeploymentID == globalDeploymentID + } + } + return pi, true +}