support fan-out objects via PostUpload() (#17233)

This commit is contained in:
Harshavardhana
2023-05-24 22:51:07 -07:00
committed by GitHub
parent 66156b8230
commit d0a0eb9738
6 changed files with 367 additions and 69 deletions

View File

@@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
@@ -30,6 +31,7 @@ import (
"net/textproto"
"net/url"
"path"
"runtime"
"sort"
"strconv"
"strings"
@@ -37,8 +39,10 @@ import (
"github.com/google/uuid"
"github.com/minio/mux"
"github.com/valyala/bytebufferpool"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/auth"
@@ -51,6 +55,7 @@ import (
"github.com/minio/minio/internal/handlers"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/bucket/policy"
@@ -889,14 +894,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
}
bucket := mux.Vars(r)["bucket"]
// Require Content-Length to be set in the request
size := r.ContentLength
if size < 0 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
return
}
resource, err := getResource(r.URL.Path, r.Host, globalDomainNames)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
@@ -911,7 +908,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
// Here the parameter is the size of the form data that should
// be loaded in memory, the remaining being put in temporary files.
reader, err := r.MultipartReader()
mp, err := r.MultipartReader()
if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
@@ -919,16 +916,20 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
const mapEntryOverhead = 200
var (
fileBody io.ReadCloser
fileName string
reader io.Reader
fileSize int64 = -1
fileName string
fanOutEntries = make([]minio.PutObjectFanOutEntry, 0, 100)
)
maxParts := 1000
// Canonicalize the form values into http.Header.
formValues := make(http.Header)
for {
part, err := reader.NextRawPart()
part, err := mp.NextRawPart()
if errors.Is(err, io.EOF) {
break
}
@@ -956,7 +957,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
// Multiple values for the same key (one map entry, longer slice) are cheaper
// than the same number of values for different keys (many map entries), but
// using a consistent per-value cost for overhead is simpler.
const mapEntryOverhead = 200
maxMemoryBytes := 2 * int64(10<<20)
maxMemoryBytes -= int64(len(name))
maxMemoryBytes -= mapEntryOverhead
@@ -971,9 +971,29 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
var b bytes.Buffer
if fileName == "" {
if http.CanonicalHeaderKey(name) == http.CanonicalHeaderKey("x-minio-fanout-list") {
dec := json.NewDecoder(part)
// while the array contains values
for dec.More() {
var m minio.PutObjectFanOutEntry
if err := dec.Decode(&m); err != nil {
part.Close()
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, multipart.ErrMessageTooLarge)
writeErrorResponse(ctx, w, apiErr, r.URL)
return
}
fanOutEntries = append(fanOutEntries, m)
}
part.Close()
continue
}
// value, store as string in memory
n, err := io.CopyN(&b, part, maxMemoryBytes+1)
part.Close()
if err != nil && err != io.EOF {
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
@@ -1001,8 +1021,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
// The file or text content.
// The file or text content must be the last field in the form.
// You cannot upload more than one file at a time.
fileBody = part // we have found the File part of the request breakout
defer part.Close()
reader = part
// we have found the File part of the request we are done processing multipart-form
break
}
@@ -1012,6 +1032,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
writeErrorResponse(ctx, w, apiErr, r.URL)
return
}
if fileName == "" {
apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest)
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, errors.New("The file or text content is missing"))
@@ -1059,20 +1080,38 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
// Once signature is validated, check if the user has
// explicit permissions for the user.
if !globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: cred.AccessKey,
Groups: cred.Groups,
Action: iampolicy.PutObjectAction,
ConditionValues: getConditionValues(r, "", cred),
BucketName: bucket,
ObjectName: object,
IsOwner: globalActiveCred.AccessKey == cred.AccessKey,
Claims: cred.Claims,
}) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL)
return
if len(fanOutEntries) > 0 {
// Once signature is validated, check if the user has
// explicit permissions for the user.
if !globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: cred.AccessKey,
Groups: cred.Groups,
Action: iampolicy.PutObjectFanOutAction,
ConditionValues: getConditionValues(r, "", cred),
BucketName: bucket,
ObjectName: object,
IsOwner: globalActiveCred.AccessKey == cred.AccessKey,
Claims: cred.Claims,
}) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL)
return
}
} else {
// Once signature is validated, check if the user has
// explicit permissions for the user.
if !globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: cred.AccessKey,
Groups: cred.Groups,
Action: iampolicy.PutObjectAction,
ConditionValues: getConditionValues(r, "", cred),
BucketName: bucket,
ObjectName: object,
IsOwner: globalActiveCred.AccessKey == cred.AccessKey,
Claims: cred.Claims,
}) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL)
return
}
}
policyBytes, err := base64.StdEncoding.DecodeString(formValues.Get("Policy"))
@@ -1081,15 +1120,14 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
hashReader, err := hash.NewReader(fileBody, -1, "", "", -1)
hashReader, err := hash.NewReader(reader, fileSize, "", "", fileSize)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if checksum != nil && checksum.Valid() {
err = hashReader.AddChecksum(r, false)
if err != nil {
if err = hashReader.AddChecksumNoTrailer(formValues, false); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
@@ -1134,7 +1172,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
// Check if bucket encryption is enabled
sseConfig, _ := globalBucketSSEConfigSys.Get(bucket)
sseConfig.Apply(r.Header, sse.ApplyOptions{
sseConfig.Apply(formValues, sse.ApplyOptions{
AutoEncrypt: globalAutoEncryption,
})
@@ -1145,6 +1183,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
fanOutOpts := fanOutOptions{Checksum: checksum}
if crypto.Requested(formValues) {
if crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
@@ -1177,29 +1217,137 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
}
reader, objectEncryptionKey, err = newEncryptReader(ctx, hashReader, kind, keyID, key, bucket, object, metadata, kmsCtx)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// do not try to verify encrypted content/
hashReader, err = hash.NewReader(reader, -1, "", "", -1)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if checksum != nil && checksum.Valid() {
err = hashReader.AddChecksum(r, true)
if len(fanOutEntries) == 0 {
reader, objectEncryptionKey, err = newEncryptReader(ctx, hashReader, kind, keyID, key, bucket, object, metadata, kmsCtx)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// do not try to verify encrypted content/
hashReader, err = hash.NewReader(reader, -1, "", "", -1)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if checksum != nil && checksum.Valid() {
if err = hashReader.AddChecksumNoTrailer(formValues, true); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
} else {
fanOutOpts = fanOutOptions{
Key: key,
Kind: kind,
KeyID: keyID,
KmsCtx: kmsCtx,
Checksum: checksum,
}
}
pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
}
if len(fanOutEntries) > 0 {
// Fan-out requires no copying, and must be carried from original source
// https://en.wikipedia.org/wiki/Copy_protection so the incoming stream
// is always going to be in-memory as we cannot re-read from what we
// wrote to disk - since that amounts to "copying" from a "copy"
// instead of "copying" from source, we need the stream to be seekable
// to ensure that we can make fan-out calls concurrently.
buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf)
// Maximum allowed fan-out object size.
const maxFanOutSize = 16 << 20
n, err := io.Copy(buf, ioutil.HardLimitReader(pReader, maxFanOutSize))
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
concurrentSize := 100
if runtime.GOMAXPROCS(0) < concurrentSize {
concurrentSize = runtime.GOMAXPROCS(0)
}
fanOutResp := make([]minio.PutObjectFanOutResponse, 0, len(fanOutEntries))
eventArgsList := make([]eventArgs, 0, len(fanOutEntries))
for {
var objInfos []ObjectInfo
var errs []error
var done bool
if len(fanOutEntries) < concurrentSize {
objInfos, errs = fanOutPutObject(ctx, bucket, objectAPI, fanOutEntries, buf.Bytes()[:n], fanOutOpts)
done = true
} else {
objInfos, errs = fanOutPutObject(ctx, bucket, objectAPI, fanOutEntries[:concurrentSize], buf.Bytes()[:n], fanOutOpts)
fanOutEntries = fanOutEntries[concurrentSize:]
}
for i, objInfo := range objInfos {
if errs[i] != nil {
fanOutResp = append(fanOutResp, minio.PutObjectFanOutResponse{
Key: objInfo.Name,
Error: errs[i],
})
continue
}
fanOutResp = append(fanOutResp, minio.PutObjectFanOutResponse{
Key: objInfo.Name,
ETag: getDecryptedETag(formValues, objInfo, false),
VersionID: objInfo.VersionID,
LastModified: &objInfo.ModTime,
})
eventArgsList = append(eventArgsList, eventArgs{
EventName: event.ObjectCreatedPost,
BucketName: objInfo.Bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent() + " " + "MinIO-Fan-Out",
Host: handlers.GetSourceIP(r),
})
}
if done {
break
}
}
enc := json.NewEncoder(w)
for i, fanOutResp := range fanOutResp {
if err = enc.Encode(&fanOutResp); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Notify object created events.
sendEvent(eventArgsList[i])
if eventArgsList[i].Object.NumVersions > dataScannerExcessiveVersionsThreshold {
// Send events for excessive versions.
sendEvent(eventArgs{
EventName: event.ObjectManyVersions,
BucketName: eventArgsList[i].Object.Bucket,
Object: eventArgsList[i].Object,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent() + " " + "MinIO-Fan-Out",
Host: handlers.GetSourceIP(r),
})
}
}
return
}
objInfo, err := objectAPI.PutObject(ctx, bucket, object, pReader, opts)
@@ -1232,6 +1380,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
defer sendEvent(eventArgs{
EventName: event.ObjectManyVersions,