mirror of https://github.com/minio/minio.git
multipart support etag and pre-read small objects (#20423)
This commit is contained in:
parent
15ab0808b3
commit
c28a4beeb7
|
@ -28,6 +28,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
|
@ -53,6 +54,7 @@ import (
|
|||
"github.com/minio/minio/internal/bucket/replication"
|
||||
"github.com/minio/minio/internal/config/dns"
|
||||
"github.com/minio/minio/internal/crypto"
|
||||
"github.com/minio/minio/internal/etag"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/handlers"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
|
@ -887,6 +889,30 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
|
|||
})
|
||||
}
|
||||
|
||||
// multipartReader is just like https://pkg.go.dev/net/http#Request.MultipartReader but
|
||||
// rejects multipart/mixed as its not supported in S3 API.
|
||||
func multipartReader(r *http.Request) (*multipart.Reader, error) {
|
||||
v := r.Header.Get("Content-Type")
|
||||
if v == "" {
|
||||
return nil, http.ErrNotMultipart
|
||||
}
|
||||
if r.Body == nil {
|
||||
return nil, errors.New("missing form body")
|
||||
}
|
||||
d, params, err := mime.ParseMediaType(v)
|
||||
if err != nil {
|
||||
return nil, http.ErrNotMultipart
|
||||
}
|
||||
if d != "multipart/form-data" {
|
||||
return nil, http.ErrNotMultipart
|
||||
}
|
||||
boundary, ok := params["boundary"]
|
||||
if !ok {
|
||||
return nil, http.ErrMissingBoundary
|
||||
}
|
||||
return multipart.NewReader(r.Body, boundary), nil
|
||||
}
|
||||
|
||||
// PostPolicyBucketHandler - POST policy
|
||||
// ----------
|
||||
// This implementation of the POST operation handles object creation with a specified
|
||||
|
@ -920,9 +946,14 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
return
|
||||
}
|
||||
|
||||
if r.ContentLength <= 0 {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Here the parameter is the size of the form data that should
|
||||
// be loaded in memory, the remaining being put in temporary files.
|
||||
mp, err := r.MultipartReader()
|
||||
mp, err := multipartReader(r)
|
||||
if err != nil {
|
||||
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
|
||||
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
|
||||
|
@ -934,7 +965,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
|
||||
var (
|
||||
reader io.Reader
|
||||
fileSize int64 = -1
|
||||
actualSize int64 = -1
|
||||
fileName string
|
||||
fanOutEntries = make([]minio.PutObjectFanOutEntry, 0, 100)
|
||||
)
|
||||
|
@ -942,6 +973,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
maxParts := 1000
|
||||
// Canonicalize the form values into http.Header.
|
||||
formValues := make(http.Header)
|
||||
var headerLen int64
|
||||
for {
|
||||
part, err := mp.NextRawPart()
|
||||
if errors.Is(err, io.EOF) {
|
||||
|
@ -983,7 +1015,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
return
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
headerLen += int64(len(name)) + int64(len(fileName))
|
||||
if name != "file" {
|
||||
if http.CanonicalHeaderKey(name) == http.CanonicalHeaderKey("x-minio-fanout-list") {
|
||||
dec := json.NewDecoder(part)
|
||||
|
@ -994,7 +1026,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
if err := dec.Decode(&m); err != nil {
|
||||
part.Close()
|
||||
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
|
||||
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, multipart.ErrMessageTooLarge)
|
||||
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
|
||||
writeErrorResponse(ctx, w, apiErr, r.URL)
|
||||
return
|
||||
}
|
||||
|
@ -1004,8 +1036,12 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
continue
|
||||
}
|
||||
|
||||
buf := bytebufferpool.Get()
|
||||
// value, store as string in memory
|
||||
n, err := io.CopyN(&b, part, maxMemoryBytes+1)
|
||||
n, err := io.CopyN(buf, part, maxMemoryBytes+1)
|
||||
value := buf.String()
|
||||
buf.Reset()
|
||||
bytebufferpool.Put(buf)
|
||||
part.Close()
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
|
@ -1027,7 +1063,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
writeErrorResponse(ctx, w, apiErr, r.URL)
|
||||
return
|
||||
}
|
||||
formValues[http.CanonicalHeaderKey(name)] = append(formValues[http.CanonicalHeaderKey(name)], b.String())
|
||||
headerLen += n
|
||||
formValues[http.CanonicalHeaderKey(name)] = append(formValues[http.CanonicalHeaderKey(name)], value)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1036,6 +1073,21 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
// The file or text content must be the last field in the form.
|
||||
// You cannot upload more than one file at a time.
|
||||
reader = part
|
||||
|
||||
possibleShardSize := (r.ContentLength - headerLen)
|
||||
if globalStorageClass.ShouldInline(possibleShardSize, false) { // keep versioned false for this check
|
||||
var b bytes.Buffer
|
||||
n, err := io.Copy(&b, reader)
|
||||
if err != nil {
|
||||
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
|
||||
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
|
||||
writeErrorResponse(ctx, w, apiErr, r.URL)
|
||||
return
|
||||
}
|
||||
reader = &b
|
||||
actualSize = n
|
||||
}
|
||||
|
||||
// we have found the File part of the request we are done processing multipart-form
|
||||
break
|
||||
}
|
||||
|
@ -1137,11 +1189,33 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
return
|
||||
}
|
||||
|
||||
hashReader, err := hash.NewReader(ctx, reader, fileSize, "", "", fileSize)
|
||||
clientETag, err := etag.FromContentMD5(formValues)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
var forceMD5 []byte
|
||||
// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag. Optionally enable this also
|
||||
// for server that is started with `--no-compat`.
|
||||
kind, _ := crypto.IsRequested(formValues)
|
||||
if !etag.ContentMD5Requested(formValues) && (kind == crypto.SSEC || kind == crypto.S3KMS || !globalServerCtxt.StrictS3Compat) {
|
||||
forceMD5 = mustGetUUIDBytes()
|
||||
}
|
||||
|
||||
hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
|
||||
Size: actualSize,
|
||||
MD5Hex: clientETag.String(),
|
||||
SHA256Hex: "",
|
||||
ActualSize: actualSize,
|
||||
DisableMD5: false,
|
||||
ForceMD5: forceMD5,
|
||||
})
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if checksum != nil && checksum.Valid() {
|
||||
if err = hashReader.AddChecksumNoTrailer(formValues, false); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
|
@ -1201,7 +1275,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
opts.WantChecksum = checksum
|
||||
|
||||
fanOutOpts := fanOutOptions{Checksum: checksum}
|
||||
|
||||
if crypto.Requested(formValues) {
|
||||
if crypto.SSECopy.IsRequested(r.Header) {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
|
||||
|
@ -1246,8 +1319,15 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
wantSize := int64(-1)
|
||||
if actualSize >= 0 {
|
||||
info := ObjectInfo{Size: actualSize}
|
||||
wantSize = info.EncryptedSize()
|
||||
}
|
||||
|
||||
// do not try to verify encrypted content/
|
||||
hashReader, err = hash.NewReader(ctx, reader, -1, "", "", -1)
|
||||
hashReader, err = hash.NewReader(ctx, reader, wantSize, "", "", actualSize)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
|
@ -1327,7 +1407,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
|||
Key: objInfo.Name,
|
||||
Error: errs[i].Error(),
|
||||
})
|
||||
|
||||
eventArgsList = append(eventArgsList, eventArgs{
|
||||
EventName: event.ObjectCreatedPost,
|
||||
BucketName: objInfo.Bucket,
|
||||
|
|
|
@ -711,6 +711,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
switch {
|
||||
case compressed:
|
||||
// ... nothing changes for compressed stream.
|
||||
// if actualSize is -1 we have no known way to
|
||||
// determine what is the actualSize.
|
||||
case encrypted:
|
||||
decSize, err := sio.DecryptedSize(uint64(n))
|
||||
if err == nil {
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/klauspost/readahead"
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
|
@ -1379,30 +1378,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
|
||||
defer er.deleteAll(context.Background(), minioMetaTmpBucket, tempObj)
|
||||
|
||||
shardFileSize := erasure.ShardFileSize(data.Size())
|
||||
inlineBlock := globalStorageClass.InlineBlock()
|
||||
if inlineBlock <= 0 {
|
||||
inlineBlock = 128 * humanize.KiByte
|
||||
var inlineBuffers []*bytes.Buffer
|
||||
if globalStorageClass.ShouldInline(erasure.ShardFileSize(data.ActualSize()), opts.Versioned) {
|
||||
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
||||
}
|
||||
|
||||
shardFileSize := erasure.ShardFileSize(data.Size())
|
||||
writers := make([]io.Writer, len(onlineDisks))
|
||||
var inlineBuffers []*bytes.Buffer
|
||||
if shardFileSize >= 0 {
|
||||
if !opts.Versioned && shardFileSize <= inlineBlock {
|
||||
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
||||
} else if shardFileSize < inlineBlock/8 {
|
||||
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
||||
}
|
||||
} else {
|
||||
// If compressed, use actual size to determine.
|
||||
if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 {
|
||||
if !opts.Versioned && sz <= inlineBlock {
|
||||
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
||||
} else if sz < inlineBlock/8 {
|
||||
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
||||
}
|
||||
}
|
||||
}
|
||||
for i, disk := range onlineDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
|
@ -1469,13 +1451,15 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
modTime = UTCNow()
|
||||
}
|
||||
|
||||
kind, encrypted := crypto.IsEncrypted(userDefined)
|
||||
actualSize := data.ActualSize()
|
||||
if actualSize < 0 {
|
||||
_, encrypted := crypto.IsEncrypted(fi.Metadata)
|
||||
compressed := fi.IsCompressed()
|
||||
switch {
|
||||
case compressed:
|
||||
// ... nothing changes for compressed stream.
|
||||
// if actualSize is -1 we have no known way to
|
||||
// determine what is the actualSize.
|
||||
case encrypted:
|
||||
decSize, err := sio.DecryptedSize(uint64(n))
|
||||
if err == nil {
|
||||
|
@ -1502,7 +1486,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
}
|
||||
|
||||
userDefined["etag"] = r.MD5CurrentHexString()
|
||||
kind, _ := crypto.IsEncrypted(userDefined)
|
||||
if opts.PreserveETag != "" {
|
||||
if !opts.ReplicationRequest {
|
||||
userDefined["etag"] = opts.PreserveETag
|
||||
|
|
|
@ -272,6 +272,27 @@ func (sCfg *Config) GetParityForSC(sc string) (parity int) {
|
|||
}
|
||||
}
|
||||
|
||||
// ShouldInline returns true if the shardSize is worthy of inline
|
||||
// if versioned is true then we chosen 1/8th inline block size
|
||||
// to satisfy the same constraints.
|
||||
func (sCfg *Config) ShouldInline(shardSize int64, versioned bool) bool {
|
||||
if shardSize < 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
ConfigLock.RLock()
|
||||
inlineBlock := int64(128 * humanize.KiByte)
|
||||
if sCfg.initialized {
|
||||
inlineBlock = sCfg.inlineBlock
|
||||
}
|
||||
ConfigLock.RUnlock()
|
||||
|
||||
if versioned {
|
||||
return shardSize <= inlineBlock/8
|
||||
}
|
||||
return shardSize <= inlineBlock
|
||||
}
|
||||
|
||||
// InlineBlock indicates the size of the block which will be used to inline
|
||||
// an erasure shard and written along with xl.meta on the drive, on a versioned
|
||||
// bucket this value is automatically chosen to 1/8th of the this value, make
|
||||
|
|
Loading…
Reference in New Issue