From c28a4beeb7da5833dea1eb16de6fdc1f6287fa5f Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 12 Sep 2024 05:24:04 -0700
Subject: [PATCH] multipart support etag and pre-read small objects (#20423)

---
 cmd/bucket-handlers.go                        | 99 +++++++++++++++++--
 cmd/erasure-multipart.go                      |  2 +
 cmd/erasure-object.go                         | 31 ++----
 internal/config/storageclass/storage-class.go | 21 ++++
 4 files changed, 119 insertions(+), 34 deletions(-)

diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go
index 2d4ab5d4d..41a57e025 100644
--- a/cmd/bucket-handlers.go
+++ b/cmd/bucket-handlers.go
@@ -28,6 +28,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "mime"
     "mime/multipart"
     "net/http"
     "net/textproto"
@@ -53,6 +54,7 @@ import (
     "github.com/minio/minio/internal/bucket/replication"
     "github.com/minio/minio/internal/config/dns"
     "github.com/minio/minio/internal/crypto"
+    "github.com/minio/minio/internal/etag"
     "github.com/minio/minio/internal/event"
     "github.com/minio/minio/internal/handlers"
     "github.com/minio/minio/internal/hash"
@@ -887,6 +889,30 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
     })
 }
 
+// multipartReader is just like https://pkg.go.dev/net/http#Request.MultipartReader but
+// rejects multipart/mixed as it's not supported in the S3 API.
+func multipartReader(r *http.Request) (*multipart.Reader, error) {
+    v := r.Header.Get("Content-Type")
+    if v == "" {
+        return nil, http.ErrNotMultipart
+    }
+    if r.Body == nil {
+        return nil, errors.New("missing form body")
+    }
+    d, params, err := mime.ParseMediaType(v)
+    if err != nil {
+        return nil, http.ErrNotMultipart
+    }
+    if d != "multipart/form-data" {
+        return nil, http.ErrNotMultipart
+    }
+    boundary, ok := params["boundary"]
+    if !ok {
+        return nil, http.ErrMissingBoundary
+    }
+    return multipart.NewReader(r.Body, boundary), nil
+}
+
 // PostPolicyBucketHandler - POST policy
 // ----------
 // This implementation of the POST operation handles object creation with a specified
@@ -920,9 +946,14 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
         return
     }
 
+    if r.ContentLength <= 0 {
+        writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL)
+        return
+    }
+
     // Here the parameter is the size of the form data that should
     // be loaded in memory, the remaining being put in temporary files.
-    mp, err := r.MultipartReader()
+    mp, err := multipartReader(r)
     if err != nil {
         apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
         apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
@@ -934,7 +965,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
 
     var (
         reader io.Reader
-        fileSize int64 = -1
+        actualSize int64 = -1
         fileName string
         fanOutEntries = make([]minio.PutObjectFanOutEntry, 0, 100)
     )
     maxParts := 1000
     // Canonicalize the form values into http.Header.
     formValues := make(http.Header)
+    var headerLen int64
     for {
         part, err := mp.NextRawPart()
         if errors.Is(err, io.EOF) {
             break
         }
@@ -983,7 +1015,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
             return
         }
 
-        var b bytes.Buffer
+        headerLen += int64(len(name)) + int64(len(fileName))
         if name != "file" {
             if http.CanonicalHeaderKey(name) == http.CanonicalHeaderKey("x-minio-fanout-list") {
                 dec := json.NewDecoder(part)
@@ -994,7 +1026,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
                 if err := dec.Decode(&m); err != nil {
                     part.Close()
                     apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
-                    apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, multipart.ErrMessageTooLarge)
+                    apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
                     writeErrorResponse(ctx, w, apiErr, r.URL)
                     return
                 }
@@ -1004,8 +1036,12 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
             continue
         }
 
+        buf := bytebufferpool.Get()
         // value, store as string in memory
-        n, err := io.CopyN(&b, part, maxMemoryBytes+1)
+        n, err := io.CopyN(buf, part, maxMemoryBytes+1)
+        value := buf.String()
+        buf.Reset()
+        bytebufferpool.Put(buf)
         part.Close()
 
         if err != nil && err != io.EOF {
@@ -1027,7 +1063,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
             writeErrorResponse(ctx, w, apiErr, r.URL)
             return
         }
-        formValues[http.CanonicalHeaderKey(name)] = append(formValues[http.CanonicalHeaderKey(name)], b.String())
+        headerLen += n
+        formValues[http.CanonicalHeaderKey(name)] = append(formValues[http.CanonicalHeaderKey(name)], value)
         continue
     }
@@ -1036,6 +1073,21 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
         // The file or text content must be the last field in the form.
         // You cannot upload more than one file at a time.
         reader = part
+
+        possibleShardSize := (r.ContentLength - headerLen)
+        if globalStorageClass.ShouldInline(possibleShardSize, false) { // keep versioned false for this check
+            var b bytes.Buffer
+            n, err := io.Copy(&b, reader)
+            if err != nil {
+                apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
+                apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
+                writeErrorResponse(ctx, w, apiErr, r.URL)
+                return
+            }
+            reader = &b
+            actualSize = n
+        }
+
         // we have found the File part of the request we are done processing multipart-form
         break
     }
@@ -1137,11 +1189,33 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
         return
     }
 
-    hashReader, err := hash.NewReader(ctx, reader, fileSize, "", "", fileSize)
+    clientETag, err := etag.FromContentMD5(formValues)
+    if err != nil {
+        writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
+        return
+    }
+
+    var forceMD5 []byte
+    // Optimization: if SSE-KMS or SSE-C is requested and the client did not send Content-Md5, use a UUID as the etag. Optionally enable this also
+    // for servers started with `--no-compat`.
+    kind, _ := crypto.IsRequested(formValues)
+    if !etag.ContentMD5Requested(formValues) && (kind == crypto.SSEC || kind == crypto.S3KMS || !globalServerCtxt.StrictS3Compat) {
+        forceMD5 = mustGetUUIDBytes()
+    }
+
+    hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
+        Size: actualSize,
+        MD5Hex: clientETag.String(),
+        SHA256Hex: "",
+        ActualSize: actualSize,
+        DisableMD5: false,
+        ForceMD5: forceMD5,
+    })
     if err != nil {
         writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
         return
     }
+
     if checksum != nil && checksum.Valid() {
         if err = hashReader.AddChecksumNoTrailer(formValues, false); err != nil {
             writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -1201,7 +1275,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
     opts.WantChecksum = checksum
 
     fanOutOpts := fanOutOptions{Checksum: checksum}
-
     if crypto.Requested(formValues) {
         if crypto.SSECopy.IsRequested(r.Header) {
             writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
@@ -1246,8 +1319,15 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
             writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
             return
         }
+
+        wantSize := int64(-1)
+        if actualSize >= 0 {
+            info := ObjectInfo{Size: actualSize}
+            wantSize = info.EncryptedSize()
+        }
+
         // do not try to verify encrypted content/
-        hashReader, err = hash.NewReader(ctx, reader, -1, "", "", -1)
+        hashReader, err = hash.NewReader(ctx, reader, wantSize, "", "", actualSize)
         if err != nil {
             writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
             return
@@ -1327,7 +1407,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
                 Key: objInfo.Name,
                 Error: errs[i].Error(),
             })
-
             eventArgsList = append(eventArgsList, eventArgs{
                 EventName: event.ObjectCreatedPost,
                 BucketName: objInfo.Bucket,
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 1b34c6fba..36d675fbe 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -711,6 +711,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
     switch {
     case compressed:
         // ... nothing changes for compressed stream.
+        // if actualSize is -1 we have no known way to
+        // determine what the actualSize is.
     case encrypted:
         decSize, err := sio.DecryptedSize(uint64(n))
         if err == nil {
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index f0aeb0742..5414a1c88 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -32,7 +32,6 @@ import (
     "sync"
     "time"
 
-    "github.com/dustin/go-humanize"
     "github.com/klauspost/readahead"
     "github.com/minio/madmin-go/v3"
     "github.com/minio/minio-go/v7/pkg/tags"
@@ -1379,30 +1378,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
     defer er.deleteAll(context.Background(), minioMetaTmpBucket, tempObj)
 
-    shardFileSize := erasure.ShardFileSize(data.Size())
-    inlineBlock := globalStorageClass.InlineBlock()
-    if inlineBlock <= 0 {
-        inlineBlock = 128 * humanize.KiByte
+    var inlineBuffers []*bytes.Buffer
+    if globalStorageClass.ShouldInline(erasure.ShardFileSize(data.ActualSize()), opts.Versioned) {
+        inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
     }
 
+    shardFileSize := erasure.ShardFileSize(data.Size())
     writers := make([]io.Writer, len(onlineDisks))
-    var inlineBuffers []*bytes.Buffer
-    if shardFileSize >= 0 {
-        if !opts.Versioned && shardFileSize <= inlineBlock {
-            inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
-        } else if shardFileSize < inlineBlock/8 {
-            inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
-        }
-    } else {
-        // If compressed, use actual size to determine.
-        if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 {
-            if !opts.Versioned && sz <= inlineBlock {
-                inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
-            } else if sz < inlineBlock/8 {
-                inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
-            }
-        }
-    }
     for i, disk := range onlineDisks {
         if disk == nil {
             continue
@@ -1469,13 +1451,15 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
         modTime = UTCNow()
     }
 
+    kind, encrypted := crypto.IsEncrypted(userDefined)
     actualSize := data.ActualSize()
     if actualSize < 0 {
-        _, encrypted := crypto.IsEncrypted(fi.Metadata)
         compressed := fi.IsCompressed()
         switch {
         case compressed:
             // ... nothing changes for compressed stream.
+            // if actualSize is -1 we have no known way to
+            // determine what the actualSize is.
         case encrypted:
             decSize, err := sio.DecryptedSize(uint64(n))
             if err == nil {
@@ -1502,7 +1486,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
     }
 
     userDefined["etag"] = r.MD5CurrentHexString()
-    kind, _ := crypto.IsEncrypted(userDefined)
     if opts.PreserveETag != "" {
         if !opts.ReplicationRequest {
             userDefined["etag"] = opts.PreserveETag
diff --git a/internal/config/storageclass/storage-class.go b/internal/config/storageclass/storage-class.go
index baa26231a..b3b3b26f9 100644
--- a/internal/config/storageclass/storage-class.go
+++ b/internal/config/storageclass/storage-class.go
@@ -272,6 +272,27 @@ func (sCfg *Config) GetParityForSC(sc string) (parity int) {
     }
 }
 
+// ShouldInline returns true if the shardSize is small enough to be inlined;
+// if versioned is true then 1/8th of the inline block size is chosen
+// to satisfy the same constraints.
+func (sCfg *Config) ShouldInline(shardSize int64, versioned bool) bool {
+    if shardSize < 0 {
+        return false
+    }
+
+    ConfigLock.RLock()
+    inlineBlock := int64(128 * humanize.KiByte)
+    if sCfg.initialized {
+        inlineBlock = sCfg.inlineBlock
+    }
+    ConfigLock.RUnlock()
+
+    if versioned {
+        return shardSize <= inlineBlock/8
+    }
+    return shardSize <= inlineBlock
+}
+
 // InlineBlock indicates the size of the block which will be used to inline
 // an erasure shard and written along with xl.meta on the drive, on a versioned
 // bucket this value is automatically chosen to 1/8th of this value, make
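
For reference, a minimal standalone sketch (not part of the patch) of the inlining rule that the new ShouldInline helper encodes and that putObject and PostPolicyBucketHandler call above. The shouldInline function, the defaultInlineBlock constant, and the sample sizes below are illustrative assumptions, not code from the patch.

// inline_sketch.go -- illustrative only; mirrors the decision made by
// (*storageclass.Config).ShouldInline in the patch above.
package main

import "fmt"

// defaultInlineBlock mirrors the 128 KiB default used when the storage-class
// config is not initialized.
const defaultInlineBlock int64 = 128 << 10

// shouldInline is a hypothetical stand-in for the new helper: a shard of
// unknown size is never inlined, and on versioned buckets only shards up to
// 1/8th of the inline block qualify.
func shouldInline(shardSize int64, versioned bool, inlineBlock int64) bool {
	if shardSize < 0 {
		return false
	}
	if versioned {
		return shardSize <= inlineBlock/8
	}
	return shardSize <= inlineBlock
}

func main() {
	for _, tc := range []struct {
		shard     int64
		versioned bool
	}{
		{16 << 10, false},  // 16 KiB, unversioned -> inlined into xl.meta
		{16 << 10, true},   // 16 KiB, versioned   -> inlined (at the 1/8th limit)
		{64 << 10, true},   // 64 KiB, versioned   -> written as a separate part file
		{256 << 10, false}, // 256 KiB             -> written as a separate part file
		{-1, false},        // unknown size        -> never inlined
	} {
		fmt.Printf("shard=%d versioned=%v inline=%v\n",
			tc.shard, tc.versioned, shouldInline(tc.shard, tc.versioned, defaultInlineBlock))
	}
}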