proxy multipart to peers via multipart uploadID (#15926)

This commit is contained in:
Poorna 2022-10-25 10:52:29 -07:00 committed by GitHub
parent 1673778633
commit ce8456a1a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 114 additions and 10 deletions

View File

@ -19,6 +19,7 @@ package cmd
import ( import (
"context" "context"
"encoding/base64"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -41,7 +42,15 @@ import (
) )
func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string { func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {
return pathJoin(er.getMultipartSHADir(bucket, object), uploadID) uploadUUID := uploadID
uploadBytes, err := base64.StdEncoding.DecodeString(uploadID)
if err == nil {
slc := strings.SplitN(string(uploadBytes), ".", 2)
if len(slc) == 2 {
uploadUUID = slc[1]
}
}
return pathJoin(er.getMultipartSHADir(bucket, object), uploadUUID)
} }
func (er erasureObjects) getMultipartSHADir(bucket, object string) string { func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
@ -436,9 +445,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
partsMetadata[index].ModTime = modTime partsMetadata[index].ModTime = modTime
partsMetadata[index].Metadata = userDefined partsMetadata[index].Metadata = userDefined
} }
uploadUUID := mustGetUUID()
uploadID := mustGetUUID() uploadID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID, uploadUUID)))
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)
// Write updated `xl.meta` to all disks. // Write updated `xl.meta` to all disks.
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {

View File

@ -20,6 +20,7 @@ package cmd
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -1943,7 +1944,15 @@ func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket s
} }
func (es *erasureSingle) getUploadIDDir(bucket, object, uploadID string) string { func (es *erasureSingle) getUploadIDDir(bucket, object, uploadID string) string {
return pathJoin(es.getMultipartSHADir(bucket, object), uploadID) uploadUUID := uploadID
uploadBytes, err := base64.StdEncoding.DecodeString(uploadID)
if err == nil {
slc := strings.SplitN(string(uploadBytes), ".", 2)
if len(slc) == 2 {
uploadUUID = slc[1]
}
}
return pathJoin(es.getMultipartSHADir(bucket, object), uploadUUID)
} }
func (es *erasureSingle) getMultipartSHADir(bucket, object string) string { func (es *erasureSingle) getMultipartSHADir(bucket, object string) string {
@ -2190,9 +2199,9 @@ func (es *erasureSingle) newMultipartUpload(ctx context.Context, bucket string,
partsMetadata[index].ModTime = modTime partsMetadata[index].ModTime = modTime
partsMetadata[index].Metadata = opts.UserDefined partsMetadata[index].Metadata = opts.UserDefined
} }
uploadUUID := mustGetUUID()
uploadID := mustGetUUID() uploadID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID, uploadUUID)))
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) uploadIDPath := es.getUploadIDDir(bucket, object, uploadUUID)
// Write updated `xl.meta` to all disks. // Write updated `xl.meta` to all disks.
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {

View File

@ -20,8 +20,10 @@ package cmd
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"fmt" "fmt"
"io" "io"
"strings"
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
@ -117,3 +119,16 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
// Success. // Success.
return totalWritten, nil return totalWritten, nil
} }
// returns deploymentID from uploadID
func getDeplIDFromUpload(uploadID string) (string, error) {
uploadBytes, err := base64.StdEncoding.DecodeString(uploadID)
if err != nil {
return "", fmt.Errorf("error parsing uploadID %s (%w)", uploadID, err)
}
slc := strings.SplitN(string(uploadBytes), ".", 2)
if len(slc) != 2 {
return "", fmt.Errorf("uploadID %s has incorrect format", uploadID)
}
return slc[0], nil
}

View File

@ -576,3 +576,47 @@ func setCriticalErrorHandler(h http.Handler) http.Handler {
h.ServeHTTP(w, r) h.ServeHTTP(w, r)
}) })
} }
// setUploadForwardingHandler middleware forwards multiparts requests
// in a site replication setup to peer that initiated the upload
func setUploadForwardingHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !globalSiteReplicationSys.isEnabled() ||
guessIsHealthCheckReq(r) || guessIsMetricsReq(r) ||
guessIsRPCReq(r) || guessIsLoginSTSReq(r) || isAdminReq(r) {
h.ServeHTTP(w, r)
return
}
bucket, object := request2BucketObjectName(r)
uploadID := r.Form.Get(xhttp.UploadID)
if bucket != "" && object != "" && uploadID != "" {
deplID, err := getDeplIDFromUpload(uploadID)
if err != nil {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchUpload), r.URL)
return
}
remote, self := globalSiteReplicationSys.getPeerForUpload(deplID)
if self {
h.ServeHTTP(w, r)
return
}
// forward request to peer handling this upload
if globalBucketTargetSys.isOffline(remote.EndpointURL) {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrReplicationRemoteConnectionError), r.URL)
return
}
r.URL.Scheme = remote.EndpointURL.Scheme
r.URL.Host = remote.EndpointURL.Host
// Make sure we remove any existing headers before
// proxying the request to another node.
for k := range w.Header() {
w.Header().Del(k)
}
globalForwarder.ServeHTTP(w, r)
return
}
h.ServeHTTP(w, r)
})
}

View File

@ -19,10 +19,10 @@ package cmd
import ( import (
"context" "context"
"encoding/base64"
"runtime" "runtime"
"strings" "strings"
"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/s3utils" "github.com/minio/minio-go/v7/pkg/s3utils"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
) )
@ -112,7 +112,8 @@ func checkListMultipartArgs(ctx context.Context, bucket, prefix, keyMarker, uplo
KeyMarker: keyMarker, KeyMarker: keyMarker,
} }
} }
if _, err := uuid.Parse(uploadIDMarker); err != nil { _, err := base64.StdEncoding.DecodeString(uploadIDMarker)
if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return MalformedUploadID{ return MalformedUploadID{
UploadID: uploadIDMarker, UploadID: uploadIDMarker,

View File

@ -61,6 +61,8 @@ var globalHandlers = []mux.MiddlewareFunc{
setRequestValidityHandler, setRequestValidityHandler,
// set x-amz-request-id header. // set x-amz-request-id header.
addCustomHeaders, addCustomHeaders,
// Add upload forwarding handler for site replication
setUploadForwardingHandler,
// Add bucket forwarding handler // Add bucket forwarding handler
setBucketForwardingHandler, setBucketForwardingHandler,
// Add new handlers here. // Add new handlers here.

View File

@ -4843,3 +4843,27 @@ func isUserInfoEqual(u1, u2 madmin.UserInfo) bool {
func isPolicyMappingEqual(p1, p2 srPolicyMapping) bool { func isPolicyMappingEqual(p1, p2 srPolicyMapping) bool {
return p1.Policy == p2.Policy && p1.IsGroup == p2.IsGroup && p1.UserOrGroup == p2.UserOrGroup return p1.Policy == p2.Policy && p1.IsGroup == p2.IsGroup && p1.UserOrGroup == p2.UserOrGroup
} }
type srPeerInfo struct {
madmin.PeerInfo
EndpointURL *url.URL
}
// getPeerForUpload returns the site replication peer handling this upload. Defaults to local cluster otherwise
func (c *SiteReplicationSys) getPeerForUpload(deplID string) (pi srPeerInfo, local bool) {
ci, _ := c.GetClusterInfo(GlobalContext)
if !ci.Enabled {
return pi, true
}
for _, site := range ci.Sites {
if deplID == site.DeploymentID {
ep, _ := url.Parse(site.Endpoint)
pi = srPeerInfo{
PeerInfo: site,
EndpointURL: ep,
}
return pi, site.DeploymentID == globalDeploymentID
}
}
return pi, true
}