From f939d1c1831c71f4b1c14df6d9cd62b12ccce7a3 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Tue, 19 Jul 2022 08:35:29 -0700 Subject: [PATCH] Independent Multipart Uploads (#15346) Do completely independent multipart uploads. In distributed mode, a lock was held to merge each multipart upload as it was added. This lock was highly contested and retries are expensive (timewise) in distributed mode. Instead, each part adds its metadata information uniquely. This eliminates the per object lock required for each to merge. The metadata is read back and merged by "CompleteMultipartUpload" without locks when constructing final object. Co-authored-by: Harshavardhana --- cmd/erasure-common.go | 85 +++ cmd/erasure-healing_test.go | 2 +- cmd/erasure-multipart.go | 264 ++++++--- cmd/naughty-disk_test.go | 8 + cmd/object-api-multipart_test.go | 10 + cmd/object-handlers.go | 852 ---------------------------- cmd/object-multipart-handlers.go | 897 ++++++++++++++++++++++++++++++ cmd/storage-datatypes.go | 21 + cmd/storage-datatypes_gen.go | 521 +++++++++++++++++ cmd/storage-datatypes_gen_test.go | 226 ++++++++ cmd/storage-interface.go | 6 + cmd/storage-rest-client.go | 45 +- cmd/storage-rest-common.go | 3 +- cmd/storage-rest-server.go | 42 ++ cmd/storagemetric_string.go | 7 +- cmd/xl-storage-disk-id-check.go | 16 + cmd/xl-storage.go | 60 ++ 17 files changed, 2134 insertions(+), 931 deletions(-) create mode 100644 cmd/object-multipart-handlers.go diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 990736dd9..16c82a27e 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -19,7 +19,11 @@ package cmd import ( "context" + "fmt" "sync" + + "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/sync/errgroup" ) func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) { @@ -119,3 +123,84 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI { // Return disks which have maximum disk usage common. 
return newDisks[max] } + +// readMultipleFiles Reads raw data from all specified files from all disks. +func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultipleReq, readQuorum int) ([]ReadMultipleResp, error) { + resps := make([]chan ReadMultipleResp, len(disks)) + for i := range resps { + resps[i] = make(chan ReadMultipleResp, len(req.Files)) + } + g := errgroup.WithNErrs(len(disks)) + // Read files in parallel across disks. + for index := range disks { + index := index + g.Go(func() (err error) { + if disks[index] == nil { + return errDiskNotFound + } + return disks[index].ReadMultiple(ctx, req, resps[index]) + }, index) + } + + dataArray := make([]ReadMultipleResp, 0, len(req.Files)) + // Merge results. They should come in order from each. + for _, wantFile := range req.Files { + quorum := 0 + toAdd := ReadMultipleResp{ + Bucket: req.Bucket, + Prefix: req.Prefix, + File: wantFile, + } + for i := range resps { + if disks[i] == nil { + continue + } + select { + case <-ctx.Done(): + case gotFile, ok := <-resps[i]: + if !ok { + continue + } + if gotFile.Error != "" || !gotFile.Exists { + continue + } + if gotFile.File != wantFile || gotFile.Bucket != req.Bucket || gotFile.Prefix != req.Prefix { + continue + } + quorum++ + if toAdd.Modtime.After(gotFile.Modtime) || len(gotFile.Data) < len(toAdd.Data) { + // Pick latest, or largest to avoid possible truncated entries. + continue + } + toAdd = gotFile + } + } + if quorum < readQuorum { + toAdd.Exists = false + toAdd.Error = errErasureReadQuorum.Error() + toAdd.Data = nil + } + dataArray = append(dataArray, toAdd) + } + + errs := g.Wait() + for index, err := range errs { + if err == nil { + continue + } + if !IsErr(err, []error{ + errFileNotFound, + errVolumeNotFound, + errFileVersionNotFound, + errDiskNotFound, + errUnformattedDisk, + }...) 
{ + logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", + disks[index], req.Bucket, req.Prefix, err), + disks[index].String()) + } + } + + // Return all the metadata. + return dataArray, nil +} diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 5c1449ad4..ed84a0bc0 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -580,7 +580,7 @@ func TestHealCorrectQuorum(t *testing.T) { _, err = objLayer.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{}) if err != nil { - t.Fatalf("Failed to complete multipart upload - %v", err) + t.Fatalf("Failed to complete multipart upload - got: %v", err) } cfgFile := pathJoin(bucketMetaPrefix, bucket, ".test.bin") diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 2d2ef6e31..54aa892cf 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -76,6 +76,30 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object return err } +// Removes part.meta given by partNumber belonging to a multipart upload from minioMetaMultipartBucket +func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string, partNumber int) { + uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) + curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber)) + storageDisks := er.getDisks() + + g := errgroup.WithNErrs(len(storageDisks)) + for index, disk := range storageDisks { + if disk == nil { + continue + } + index := index + g.Go(func() error { + _ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{ + Recursive: false, + Force: false, + }) + + return nil + }, index) + } + g.Wait() +} + // Removes part given by partName belonging to a mulitpart upload from minioMetaBucket func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) { uploadIDPath := 
er.getUploadIDDir(bucket, object, uploadID) @@ -96,6 +120,11 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri Recursive: false, Force: false, }) + _ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{ + Recursive: false, + Force: false, + }) + return nil }, index) } @@ -453,6 +482,47 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds return evalDisks(disks, errs), err } +// writeAllDisks - writes 'b' to all provided disks. +// If write cannot reach quorum, the files will be deleted from all disks. +func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry string, b []byte, writeQuorum int) ([]StorageAPI, error) { + g := errgroup.WithNErrs(len(disks)) + + // Write file to all underlying storage disks. + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + return disks[index].WriteAll(ctx, dstBucket, dstEntry, b) + }, index) + } + + // Wait for all writes to finish. + errs := g.Wait() + + // We can safely allow WriteAll errors up to len(disks) - writeQuorum + // otherwise return failure. Cleanup successful writes. + err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + if err == errErasureWriteQuorum { + // Remove all written + g := errgroup.WithNErrs(len(disks)) + for index := range disks { + if disks[index] == nil || errs[index] != nil { + continue + } + index := index + g.Go(func() error { + return disks[index].Delete(ctx, dstBucket, dstEntry, DeleteOptions{Force: true}) + }, index) + } + // Ignore these errors. + g.WaitErr() + } + + return evalDisks(disks, errs), err +} + // PutObjectPart - reads incoming stream and internally erasure codes // them. This call is similar to single put operation but it is part // of the multipart transaction. 
@@ -479,11 +549,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo return PartInfo{}, err } rctx := rlkctx.Context() - defer func() { - if uploadIDRLock != nil { - uploadIDRLock.RUnlock(rlkctx.Cancel) - } - }() + defer uploadIDRLock.RUnlock(rlkctx.Cancel) data := r.Reader // Validate input data size and it can never be less than zero. @@ -507,10 +573,6 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) - // Unlock upload id locks before, so others can get it. - uploadIDRLock.RUnlock(rlkctx.Cancel) - uploadIDRLock = nil - // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount) if err != nil { @@ -617,50 +679,21 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } } - // Acquire write lock to update metadata. - uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout) - if err != nil { - return PartInfo{}, err - } - wctx := wlkctx.Context() - defer uploadIDWLock.Unlock(wlkctx.Cancel) - - // Validates if upload ID exists. - if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { - return pi, toObjectErr(err, bucket, object, uploadID) - } - // Rename temporary part file to its final location. partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix) - onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) + onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - // Read metadata again because it might be updated with parallel upload of another part. 
- partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false) - reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) - if reducedErr == errErasureWriteQuorum { - return pi, toObjectErr(reducedErr, bucket, object) - } - - // Get current highest version based on re-read partsMetadata. - onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) - - // Pick one from the first valid metadata. - fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) - if err != nil { - return pi, err - } - - // Once part is successfully committed, proceed with updating erasure metadata. - fi.ModTime = UTCNow() - md5hex := r.MD5CurrentHexString() if opts.PreserveETag != "" { md5hex = opts.PreserveETag } + + // Once part is successfully committed, proceed with saving erasure metadata for part. + fi.ModTime = UTCNow() + var index []byte if opts.IndexCB != nil { index = opts.IndexCB() @@ -669,27 +702,18 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Add the current part. fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), index) - for i, disk := range onlineDisks { - if disk == OfflineDisk { - continue - } - partsMetadata[i].Size = fi.Size - partsMetadata[i].ModTime = fi.ModTime - partsMetadata[i].Parts = fi.Parts - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ - PartNumber: partID, - Algorithm: DefaultBitrotAlgorithm, - Hash: bitrotWriterSum(writers[i]), - }) + // Save part info as partPath+".meta" + fiMsg, err := fi.MarshalMsg(nil) + if err != nil { + return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - // Writes update `xl.meta` format for each disk. - if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { - return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + // Write part metadata to all disks. 
+ onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", fiMsg, writeQuorum) + if err != nil { + return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - online = countOnlineDisks(onlineDisks) - // Return success. return PartInfo{ PartNumber: partID, @@ -704,6 +728,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // by callers to verify object states // - encrypted // - compressed +// Does not contain currently uploaded parts by design. func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { auditObjectErasureSet(ctx, object, &er) @@ -795,7 +820,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) } - _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) @@ -803,6 +828,24 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, err } + // Read Part info for all parts + partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" + req := ReadMultipleReq{ + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + } + + // Parts are 1 based, so index 0 is part one, etc. + for i := 1; i <= maxPartsList; i++ { + req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i)) + } + + partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum) + if err != nil { + return result, err + } + // Populate the result stub. 
result.Bucket = bucket result.Object = object @@ -812,7 +855,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up result.UserDefined = cloneMSS(fi.Metadata) // For empty number of parts or maxParts as zero, return right here. - if len(fi.Parts) == 0 || maxParts == 0 { + if len(partInfoFiles) == 0 || maxParts == 0 { return result, nil } @@ -821,6 +864,28 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up maxParts = maxPartsList } + var partFI FileInfo + for i, part := range partInfoFiles { + if part.Error != "" || !part.Exists { + continue + } + _, err := partFI.UnmarshalMsg(part.Data) + if err != nil { + // Maybe crash or similar. + logger.LogIf(ctx, err) + continue + } + + if len(partFI.Parts) != 1 { + logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts))) + continue + } + + addPart := partFI.Parts[0] + // Add the current part. + fi.AddObjectPart(i+1, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index) + } + // Only parts with higher part numbers will be listed. partIdx := objectPartIndex(fi.Parts, partNumberMarker) parts := fi.Parts @@ -860,17 +925,17 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { auditObjectErasureSet(ctx, object, &er) - // Hold read-locks to verify uploaded parts, also disallows - // parallel part uploads as well. + // Hold write locks to verify uploaded parts, also disallows any + // parallel PutObjectPart() requests. 
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout) if err != nil { return oi, err } - rctx := rlkctx.Context() - defer uploadIDLock.RUnlock(rlkctx.Cancel) + wctx := wlkctx.Context() + defer uploadIDLock.Unlock(wlkctx.Cancel) - if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { return oi, toObjectErr(err, bucket, object, uploadID) } @@ -879,15 +944,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) + partsMetadata, errs := readAllFileInfo(wctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount) + _, writeQuorum, err := objectQuorumFromMeta(wctx, partsMetadata, errs, er.defaultParityCount) if err != nil { return oi, toObjectErr(err, bucket, object) } - reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return oi, toObjectErr(reducedErr, bucket, object) } @@ -895,11 +960,59 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. 
- fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum) + fi, err := pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) if err != nil { return oi, err } + // Read Part info for all parts + partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" + req := ReadMultipleReq{ + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + } + for _, part := range parts { + req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber)) + } + partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum) + if err != nil { + return oi, err + } + if len(partInfoFiles) != len(parts) { + // Should only happen through internal error + err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts)) + logger.LogIf(ctx, err) + return oi, toObjectErr(err, bucket, object) + } + + var partFI FileInfo + for i, part := range partInfoFiles { + partID := parts[i].PartNumber + if part.Error != "" || !part.Exists { + return oi, InvalidPart{ + PartNumber: partID, + } + } + _, err := partFI.UnmarshalMsg(part.Data) + if err != nil { + // Maybe crash or similar. + logger.LogIf(ctx, err) + return oi, InvalidPart{ + PartNumber: partID, + } + } + if len(partFI.Parts) != 1 { + logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts))) + return oi, InvalidPart{ + PartNumber: partID, + } + } + addPart := partFI.Parts[0] + // Add the current part. + fi.AddObjectPart(partID, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index) + } + // Calculate full object size. var objectSize int64 @@ -939,7 +1052,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str return oi, invp } - // All parts except the last part has to be atleast 5MB. + // All parts except the last part has to be at least 5MB. 
if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) { return oi, PartTooSmall{ PartNumber: part.PartNumber, @@ -1018,6 +1131,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } + // Remove part.meta which is not needed anymore. + for _, part := range fi.Parts { + er.removePartMeta(bucket, object, uploadID, fi.DataDir, part.Number) + } + // Rename the multipart object to final location. if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, bucket, object, writeQuorum); err != nil { diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 29df634f9..fed294a62 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -301,3 +301,11 @@ func (d *naughtyDisk) StatInfoFile(ctx context.Context, volume, path string, glo } return d.disk.StatInfoFile(ctx, volume, path, glob) } + +func (d *naughtyDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { + if err := d.calcError(); err != nil { + close(resp) + return err + } + return d.disk.ReadMultiple(ctx, req, resp) +} diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index e18fd1dba..1ae9bbf94 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -26,6 +26,7 @@ import ( "testing" humanize "github.com/dustin/go-humanize" + "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/hash" ) @@ -1181,6 +1182,15 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks objectNames := []string{"minio-object-1.txt"} uploadIDs := []string{} + globalStorageClass = storageclass.Config{ + RRS: storageclass.StorageClass{ + Parity: 2, + }, + Standard: storageclass.StorageClass{ + Parity: 4, + }, + } + // bucketnames[0]. // objectNames[0]. // uploadIds [0]. 
diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index bd12cead6..45cab9471 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -18,10 +18,8 @@ package cmd import ( - "bufio" "context" "encoding/hex" - "encoding/xml" "errors" "fmt" "io" @@ -29,7 +27,6 @@ import ( "net/http/httptest" "net/url" "os" - "sort" "strconv" "strings" "sync" @@ -2218,151 +2215,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h writeSuccessResponseHeadersOnly(w) } -// Multipart objectAPIHandlers - -// NewMultipartUploadHandler - New multipart upload. -// Notice: The S3 client can send secret keys in headers for encryption related jobs, -// the handler should ensure to remove these keys before sending them to the object layer. -// Currently these keys are: -// - X-Amz-Server-Side-Encryption-Customer-Key -// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key -func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "NewMultipartUpload") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if crypto.Requested(r.Header) { - if globalIsGateway { - if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } else { - if !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } - } - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, 
errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - // Check if bucket encryption is enabled - sseConfig, _ := globalBucketSSEConfigSys.Get(bucket) - sseConfig.Apply(r.Header, sse.ApplyOptions{ - AutoEncrypt: globalAutoEncryption, - Passthrough: globalIsGateway && globalGatewayName == S3BackendGateway, - }) - - // Validate storage class metadata if present - if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { - if !storageclass.IsValid(sc) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL) - return - } - } - - encMetadata := map[string]string{} - - if objectAPI.IsEncryptionSupported() { - if crypto.Requested(r.Header) { - if err = setEncryptionMetadata(r, bucket, object, encMetadata); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - // Set this for multipart only operations, we need to differentiate during - // decryption if the file was actually multipart or not. - encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = "" - } - } - - // Extract metadata that needs to be saved. 
- metadata, err := extractMetadata(ctx, r) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction) - holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction) - - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - - retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) - if s3Err == ErrNone && retentionMode.Valid() { - metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) - metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) - } - if s3Err == ErrNone && legalHold.Status.Valid() { - metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status) - } - if s3Err != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) - return - } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() { - metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) - metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() - } - // We need to preserve the encryption headers set in EncryptRequest, - // so we do not want to override them, copy them instead. - for k, v := range encMetadata { - metadata[k] = v - } - - // Ensure that metadata does not contain sensitive information - crypto.RemoveSensitiveEntries(metadata) - - if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) { - // Storing the compression metadata. 
- metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 - } - - opts, err := putOpts(ctx, r, bucket, object, metadata) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - newMultipartUpload := objectAPI.NewMultipartUpload - if api.CacheAPI() != nil { - newMultipartUpload = api.CacheAPI().NewMultipartUpload - } - - uploadID, err := newMultipartUpload(ctx, bucket, object, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - response := generateInitiateMultipartUploadResponse(bucket, object, uploadID) - encodedSuccessResponse := encodeResponse(response) - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) -} - // CopyObjectPartHandler - uploads a part by copying data from an existing object as data source. func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "CopyObjectPart") @@ -2701,710 +2553,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt writeSuccessResponseXML(w, encodedSuccessResponse) } -// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. 
-func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "PutObjectPart") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if crypto.Requested(r.Header) { - if globalIsGateway { - if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } else { - if !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } - } - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // X-Amz-Copy-Source shouldn't be set for this call. - if _, ok := r.Header[xhttp.AmzCopySource]; ok { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL) - return - } - - clientETag, err := etag.FromContentMD5(r.Header) - if err != nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL) - return - } - - // if Content-Length is unknown/missing, throw away - size := r.ContentLength - - rAuthType := getRequestAuthType(r) - // For auth type streaming signature, we need to gather a different content length. 
- if rAuthType == authTypeStreamingSigned { - if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok { - if sizeStr[0] == "" { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) - return - } - size, err = strconv.ParseInt(sizeStr[0], 10, 64) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - } - if size == -1 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) - return - } - - // maximum Upload size for multipart objects in a single operation - if isMaxAllowedPartSize(size) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) - return - } - - uploadID := r.Form.Get(xhttp.UploadID) - partIDString := r.Form.Get(xhttp.PartNumber) - - partID, err := strconv.Atoi(partIDString) - if err != nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL) - return - } - - // check partID with maximum part ID for multipart objects - if isMaxPartID(partID) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) - return - } - - var ( - md5hex = clientETag.String() - sha256hex = "" - reader io.Reader = r.Body - s3Error APIErrorCode - ) - if s3Error = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - switch rAuthType { - case authTypeStreamingSigned: - // Initialize stream signature verifier. 
- reader, s3Error = newSignV4ChunkedReader(r) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - case authTypeSignedV2, authTypePresignedV2: - if s3Error = isReqAuthenticatedV2(r); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - case authTypePresigned, authTypeSigned: - if s3Error = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - if !skipContentSha256Cksum(r) { - sha256hex = getContentSha256Cksum(r, serviceS3) - } - } - - if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - actualSize := size - - // get encryption options - var opts ObjectOptions - if crypto.SSEC.IsRequested(r.Header) { - opts, err = getOpts(ctx, r, bucket, object) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Read compression metadata preserved in the init multipart for the decision. - _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] - - var idxCb func() []byte - if objectAPI.IsCompressionSupported() && isCompressed { - actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Set compression metrics. - wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header) - s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption) - idxCb = cb - defer s2c.Close() - reader = etag.Wrap(s2c, actualReader) - size = -1 // Since compressed size is un-predictable. - md5hex = "" // Do not try to verify the content. 
- sha256hex = "" - } - - hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - rawReader := hashReader - pReader := NewPutObjReader(rawReader) - - _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) - var objectEncryptionKey crypto.ObjectKey - if objectAPI.IsEncryptionSupported() && isEncrypted { - if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) - return - } - - opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - var key []byte - if crypto.SSEC.IsRequested(r.Header) { - key, err = ParseSSECustomerRequest(r) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - // Calculating object encryption key - key, err = decryptObjectInfo(key, bucket, object, mi.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - copy(objectEncryptionKey[:], key) - - partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID)) - in := io.Reader(hashReader) - if size > encryptBufferThreshold { - // The encryption reads in blocks of 64KB. - // We add a buffer on bigger files to reduce the number of syscalls upstream. 
- in = bufio.NewReaderSize(hashReader, encryptBufferSize) - } - reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - wantSize := int64(-1) - if size >= 0 { - info := ObjectInfo{Size: size} - wantSize = info.EncryptedSize() - } - // do not try to verify encrypted content - hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - if idxCb != nil { - idxCb = compressionIndexEncrypter(objectEncryptionKey, idxCb) - } - } - opts.IndexCB = idxCb - - putObjectPart := objectAPI.PutObjectPart - if api.CacheAPI() != nil { - putObjectPart = api.CacheAPI().PutObjectPart - } - - partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts) - if err != nil { - // Verify if the underlying error is signature mismatch. 
- writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - etag := partInfo.ETag - if kind, encrypted := crypto.IsEncrypted(mi.UserDefined); encrypted { - switch kind { - case crypto.S3KMS: - w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS) - w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, mi.KMSKeyID()) - if kmsCtx, ok := mi.UserDefined[crypto.MetaContext]; ok { - w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx) - } - if len(etag) >= 32 && strings.Count(etag, "-") != 1 { - etag = etag[len(etag)-32:] - } - case crypto.S3: - w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES) - etag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: etag}) - case crypto.SSEC: - w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm)) - w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5)) - - if len(etag) >= 32 && strings.Count(etag, "-") != 1 { - etag = etag[len(etag)-32:] - } - } - } - - // We must not use the http.Header().Set method here because some (broken) - // clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive). - // Therefore, we have to set the ETag directly as map entry. 
- w.Header()[xhttp.ETag] = []string{"\"" + etag + "\""} - - writeSuccessResponseHeadersOnly(w) -} - -// AbortMultipartUploadHandler - Abort multipart upload -func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "AbortMultipartUpload") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - abortMultipartUpload := objectAPI.AbortMultipartUpload - if api.CacheAPI() != nil { - abortMultipartUpload = api.CacheAPI().AbortMultipartUpload - } - - if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - uploadID, _, _, _, s3Error := getObjectResources(r.Form) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - opts := ObjectOptions{} - if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - writeSuccessNoContent(w) -} - -// ListObjectPartsHandler - List object parts -func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ListObjectParts") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, 
errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.ListMultipartUploadPartsAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - uploadID, partNumberMarker, maxParts, encodingType, s3Error := getObjectResources(r.Form) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - if partNumberMarker < 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartNumberMarker), r.URL) - return - } - if maxParts < 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) - return - } - - opts := ObjectOptions{} - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // We have to adjust the size of encrypted parts since encrypted parts - // are slightly larger due to encryption overhead. - // Further, we have to adjust the ETags of parts when using SSE-S3. - // Due to AWS S3, SSE-S3 encrypted parts return the plaintext ETag - // being the content MD5 of that particular part. This is not the - // case for SSE-C and SSE-KMS objects. 
- if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok && objectAPI.IsEncryptionSupported() { - var objectEncryptionKey []byte - if kind == crypto.S3 { - objectEncryptionKey, err = decryptObjectInfo(nil, bucket, object, listPartsInfo.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - for i, p := range listPartsInfo.Parts { - listPartsInfo.Parts[i].ETag = tryDecryptETag(objectEncryptionKey, p.ETag, kind != crypto.S3) - size, err := sio.DecryptedSize(uint64(p.Size)) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - listPartsInfo.Parts[i].Size = int64(size) - } - } - - response := generateListPartsResponse(listPartsInfo, encodingType) - encodedSuccessResponse := encodeResponse(response) - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) -} - -type whiteSpaceWriter struct { - http.ResponseWriter - http.Flusher - written bool -} - -func (w *whiteSpaceWriter) Write(b []byte) (n int, err error) { - n, err = w.ResponseWriter.Write(b) - w.written = true - return -} - -func (w *whiteSpaceWriter) WriteHeader(statusCode int) { - if !w.written { - w.ResponseWriter.WriteHeader(statusCode) - } -} - -// Send empty whitespaces every 10 seconds to the client till completeMultiPartUpload() is -// done so that the client does not time out. Downside is we might send 200 OK and -// then send error XML. But accoording to S3 spec the client is supposed to check -// for error XML even if it received 200 OK. But for erasure this is not a problem -// as completeMultiPartUpload() is quick. Even For FS, it would not be an issue as -// we do background append as and when the parts arrive and completeMultiPartUpload -// is quick. Only in a rare case where parts would be out of order will -// FS:completeMultiPartUpload() take a longer time. 
-func sendWhiteSpace(ctx context.Context, w http.ResponseWriter) <-chan bool { - doneCh := make(chan bool) - go func() { - defer close(doneCh) - ticker := time.NewTicker(time.Second * 10) - defer ticker.Stop() - headerWritten := false - for { - select { - case <-ticker.C: - // Write header if not written yet. - if !headerWritten { - _, err := w.Write([]byte(xml.Header)) - headerWritten = err == nil - } - - // Once header is written keep writing empty spaces - // which are ignored by client SDK XML parsers. - // This occurs when server takes long time to completeMultiPartUpload() - _, err := w.Write([]byte(" ")) - if err != nil { - return - } - w.(http.Flusher).Flush() - case doneCh <- headerWritten: - return - case <-ctx.Done(): - return - } - } - }() - return doneCh -} - -// CompleteMultipartUploadHandler - Complete multipart upload. -func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "CompleteMultipartUpload") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - // Content-Length is required and should be non-zero - if r.ContentLength <= 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) - return - } - - // Get upload id. 
- uploadID, _, _, _, s3Error := getObjectResources(r.Form) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - complMultipartUpload := &CompleteMultipartUpload{} - if err = xmlDecoder(r.Body, complMultipartUpload, r.ContentLength); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - if len(complMultipartUpload.Parts) == 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL) - return - } - if !sort.IsSorted(CompletedParts(complMultipartUpload.Parts)) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartOrder), r.URL) - return - } - - // Reject retention or governance headers if set, CompleteMultipartUpload spec - // does not use these headers, and should not be passed down to checkPutObjectLockAllowed - if objectlock.IsObjectLockRequested(r.Header) || objectlock.IsObjectLockGovernanceBypassSet(r.Header) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) - return - } - - if _, _, _, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, ErrNone, ErrNone); s3Err != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) - return - } - - completeMultiPartUpload := objectAPI.CompleteMultipartUpload - if api.CacheAPI() != nil { - completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload - } - // This code is specifically to handle the requirements for slow - // complete multipart upload operations on FS mode. - writeErrorResponseWithoutXMLHeader := func(ctx context.Context, w http.ResponseWriter, err APIError, reqURL *url.URL) { - switch err.Code { - case "SlowDown", "XMinioServerNotInitialized", "XMinioReadQuorum", "XMinioWriteQuorum": - // Set retxry-after header to indicate user-agents to retry request after 120secs. 
- // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After - w.Header().Set(xhttp.RetryAfter, "120") - } - - // Generate error response. - errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path, - w.Header().Get(xhttp.AmzRequestID), globalDeploymentID) - encodedErrorResponse, _ := xml.Marshal(errorResponse) - setCommonHeaders(w) - w.Header().Set(xhttp.ContentType, string(mimeXML)) - w.Write(encodedErrorResponse) - } - - versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) - suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) - os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) - if !globalTierConfigMgr.Empty() { - // Get appropriate object info to identify the remote object to delete - goiOpts := os.GetOpts() - if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { - os.SetTransitionState(goi.TransitionedObject) - } - } - - setEventStreamHeaders(w) - - opts, err := completeMultipartOpts(ctx, r, bucket, object) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // First, we compute the ETag of the multipart object. - // The ETag of a multi-part object is always: - // ETag := MD5(ETag_p1, ETag_p2, ...)+"-N" (N being the number of parts) - // - // This is independent of encryption. An encrypted multipart - // object also has an ETag that is the MD5 of its part ETags. - // The fact the in case of encryption the ETag of a part is - // not the MD5 of the part content does not change that. - var completeETags []etag.ETag - for _, part := range complMultipartUpload.Parts { - ETag, err := etag.Parse(part.ETag) - if err != nil { - continue - } - completeETags = append(completeETags, ETag) - } - multipartETag := etag.Multipart(completeETags...) - opts.UserDefined["etag"] = multipartETag.String() - - // However, in case of encryption, the persisted part ETags don't match - // what we have sent to the client during PutObjectPart. 
The reason is - // that ETags are encrypted. Hence, the client will send a list of complete - // part ETags of which non can match the ETag of any part. For example - // ETag (client): 30902184f4e62dd8f98f0aaff810c626 - // ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626 - // - // Therefore, we adjust all ETags sent by the client to match what is stored - // on the backend. - if objectAPI.IsEncryptionSupported() { - mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if _, ok := crypto.IsEncrypted(mi.UserDefined); ok { - const MaxParts = 10000 - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, MaxParts, ObjectOptions{}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - sort.Slice(listPartsInfo.Parts, func(i, j int) bool { - return listPartsInfo.Parts[i].PartNumber < listPartsInfo.Parts[j].PartNumber - }) - sort.Slice(complMultipartUpload.Parts, func(i, j int) bool { - return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber - }) - for i := range listPartsInfo.Parts { - for j := range complMultipartUpload.Parts { - if listPartsInfo.Parts[i].PartNumber == complMultipartUpload.Parts[j].PartNumber { - complMultipartUpload.Parts[j].ETag = listPartsInfo.Parts[i].ETag - continue - } - } - } - } - } - - w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} - completeDoneCh := sendWhiteSpace(ctx, w) - objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts) - // Stop writing white spaces to the client. Note that close(doneCh) style is not used as it - // can cause white space to be written after we send XML response in a race condition. 
- headerWritten := <-completeDoneCh - if err != nil { - if headerWritten { - writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) - } else { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - } - return - } - - // Get object location. - location := getObjectLocation(r, globalDomainNames, bucket, object) - // Generate complete multipart response. - response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.ETag) - var encodedSuccessResponse []byte - if !headerWritten { - encodedSuccessResponse = encodeResponse(response) - } else { - encodedSuccessResponse, err = xml.Marshal(response) - if err != nil { - writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - if r.Header.Get(xMinIOExtract) == "true" && strings.HasSuffix(object, archiveExt) { - opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime} - if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - setPutObjHeaders(w, objInfo, false) - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) - } - if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { - actualSize, _ := objInfo.GetActualSize() - defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) - } - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) - - // Notify object created event. 
- sendEvent(eventArgs{ - EventName: event.ObjectCreatedCompleteMultipartUpload, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }) - - // Remove the transitioned object whose object version is being overwritten. - if !globalTierConfigMgr.Empty() { - // Schedule object for immediate transition if eligible. - enqueueTransitionImmediate(objInfo) - logger.LogIf(ctx, os.Sweep()) - } -} - // Delete objectAPIHandlers // DeleteObjectHandler - delete an object diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go new file mode 100644 index 000000000..ab9ffd00c --- /dev/null +++ b/cmd/object-multipart-handlers.go @@ -0,0 +1,897 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>.
+ +package cmd + +import ( + "bufio" + "context" + "encoding/xml" + "io" + "net/http" + "net/url" + "sort" + "strconv" + "strings" + "time" + + "github.com/gorilla/mux" + sse "github.com/minio/minio/internal/bucket/encryption" + objectlock "github.com/minio/minio/internal/bucket/object/lock" + "github.com/minio/minio/internal/bucket/replication" + "github.com/minio/minio/internal/config/storageclass" + "github.com/minio/minio/internal/crypto" + "github.com/minio/minio/internal/etag" + "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/fips" + "github.com/minio/minio/internal/handlers" + "github.com/minio/minio/internal/hash" + xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/bucket/policy" + iampolicy "github.com/minio/pkg/iam/policy" + "github.com/minio/sio" +) + +// Multipart objectAPIHandlers + +// NewMultipartUploadHandler - New multipart upload. +// Notice: The S3 client can send secret keys in headers for encryption related jobs, +// the handler should ensure to remove these keys before sending them to the object layer. 
+// Currently these keys are: +// - X-Amz-Server-Side-Encryption-Customer-Key +// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key +func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "NewMultipartUpload") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if crypto.Requested(r.Header) { + if globalIsGateway { + if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } else { + if !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + // Check if bucket encryption is enabled + sseConfig, _ := globalBucketSSEConfigSys.Get(bucket) + sseConfig.Apply(r.Header, sse.ApplyOptions{ + AutoEncrypt: globalAutoEncryption, + Passthrough: globalIsGateway && globalGatewayName == S3BackendGateway, + }) + + // Validate storage class metadata if present + if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { + if !storageclass.IsValid(sc) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL) + return + } + } + + encMetadata := map[string]string{} + + if objectAPI.IsEncryptionSupported() { + if crypto.Requested(r.Header) { + if err = setEncryptionMetadata(r, bucket, object, encMetadata); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), 
r.URL) + return + } + // Set this for multipart only operations, we need to differentiate during + // decryption if the file was actually multipart or not. + encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = "" + } + } + + // Extract metadata that needs to be saved. + metadata, err := extractMetadata(ctx, r) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction) + holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction) + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) + if s3Err == ErrNone && retentionMode.Valid() { + metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) + metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) + } + if s3Err == ErrNone && legalHold.Status.Valid() { + metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status) + } + if s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) + return + } + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() { + metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) + metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() + } + // We need to preserve the encryption headers set in EncryptRequest, + // so we do not want to override them, copy them instead. 
+ for k, v := range encMetadata { + metadata[k] = v + } + + // Ensure that metadata does not contain sensitive information + crypto.RemoveSensitiveEntries(metadata) + + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) { + // Storing the compression metadata. + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 + } + + opts, err := putOpts(ctx, r, bucket, object, metadata) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + newMultipartUpload := objectAPI.NewMultipartUpload + if api.CacheAPI() != nil { + newMultipartUpload = api.CacheAPI().NewMultipartUpload + } + + uploadID, err := newMultipartUpload(ctx, bucket, object, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + response := generateInitiateMultipartUploadResponse(bucket, object, uploadID) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) +} + +// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. 
+func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutObjectPart") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if crypto.Requested(r.Header) { + if globalIsGateway { + if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } else { + if !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // X-Amz-Copy-Source shouldn't be set for this call. + if _, ok := r.Header[xhttp.AmzCopySource]; ok { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL) + return + } + + clientETag, err := etag.FromContentMD5(r.Header) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL) + return + } + + // if Content-Length is unknown/missing, throw away + size := r.ContentLength + + rAuthType := getRequestAuthType(r) + // For auth type streaming signature, we need to gather a different content length. 
+ if rAuthType == authTypeStreamingSigned { + if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok { + if sizeStr[0] == "" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) + return + } + size, err = strconv.ParseInt(sizeStr[0], 10, 64) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + } + if size == -1 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) + return + } + + // maximum Upload size for multipart objects in a single operation + if isMaxAllowedPartSize(size) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) + return + } + + uploadID := r.Form.Get(xhttp.UploadID) + partIDString := r.Form.Get(xhttp.PartNumber) + + partID, err := strconv.Atoi(partIDString) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL) + return + } + + // check partID with maximum part ID for multipart objects + if isMaxPartID(partID) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) + return + } + + var ( + md5hex = clientETag.String() + sha256hex = "" + reader io.Reader = r.Body + s3Error APIErrorCode + ) + if s3Error = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + switch rAuthType { + case authTypeStreamingSigned: + // Initialize stream signature verifier. 
+ reader, s3Error = newSignV4ChunkedReader(r) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + case authTypeSignedV2, authTypePresignedV2: + if s3Error = isReqAuthenticatedV2(r); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + case authTypePresigned, authTypeSigned: + if s3Error = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + if !skipContentSha256Cksum(r) { + sha256hex = getContentSha256Cksum(r, serviceS3) + } + } + + if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + actualSize := size + + // get encryption options + var opts ObjectOptions + if crypto.SSEC.IsRequested(r.Header) { + opts, err = getOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Read compression metadata preserved in the init multipart for the decision. + _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] + + var idxCb func() []byte + if objectAPI.IsCompressionSupported() && isCompressed { + actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Set compression metrics. + wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header) + s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption) + idxCb = cb + defer s2c.Close() + reader = etag.Wrap(s2c, actualReader) + size = -1 // Since compressed size is un-predictable. + md5hex = "" // Do not try to verify the content. 
+ sha256hex = "" + } + + hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + rawReader := hashReader + pReader := NewPutObjReader(rawReader) + + _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) + var objectEncryptionKey crypto.ObjectKey + if objectAPI.IsEncryptionSupported() && isEncrypted { + if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) + return + } + + opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + var key []byte + if crypto.SSEC.IsRequested(r.Header) { + key, err = ParseSSECustomerRequest(r) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + // Calculating object encryption key + key, err = decryptObjectInfo(key, bucket, object, mi.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + copy(objectEncryptionKey[:], key) + + partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID)) + in := io.Reader(hashReader) + if size > encryptBufferThreshold { + // The encryption reads in blocks of 64KB. + // We add a buffer on bigger files to reduce the number of syscalls upstream. 
+ in = bufio.NewReaderSize(hashReader, encryptBufferSize) + } + reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + wantSize := int64(-1) + if size >= 0 { + info := ObjectInfo{Size: size} + wantSize = info.EncryptedSize() + } + // do not try to verify encrypted content + hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + if idxCb != nil { + idxCb = compressionIndexEncrypter(objectEncryptionKey, idxCb) + } + } + opts.IndexCB = idxCb + + putObjectPart := objectAPI.PutObjectPart + if api.CacheAPI() != nil { + putObjectPart = api.CacheAPI().PutObjectPart + } + + partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts) + if err != nil { + // Verify if the underlying error is signature mismatch. 
+ writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + etag := partInfo.ETag + if kind, encrypted := crypto.IsEncrypted(mi.UserDefined); encrypted { + switch kind { + case crypto.S3KMS: + w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS) + w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, mi.KMSKeyID()) + if kmsCtx, ok := mi.UserDefined[crypto.MetaContext]; ok { + w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx) + } + if len(etag) >= 32 && strings.Count(etag, "-") != 1 { + etag = etag[len(etag)-32:] + } + case crypto.S3: + w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES) + etag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: etag}) + case crypto.SSEC: + w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm)) + w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5)) + + if len(etag) >= 32 && strings.Count(etag, "-") != 1 { + etag = etag[len(etag)-32:] + } + } + } + + // We must not use the http.Header().Set method here because some (broken) + // clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive). + // Therefore, we have to set the ETag directly as map entry. + w.Header()[xhttp.ETag] = []string{"\"" + etag + "\""} + + writeSuccessResponseHeadersOnly(w) +} + +// CompleteMultipartUploadHandler - Complete multipart upload. 
+func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "CompleteMultipartUpload") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + // Content-Length is required and should be non-zero + if r.ContentLength <= 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) + return + } + + // Get upload id. + uploadID, _, _, _, s3Error := getObjectResources(r.Form) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + complMultipartUpload := &CompleteMultipartUpload{} + if err = xmlDecoder(r.Body, complMultipartUpload, r.ContentLength); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + if len(complMultipartUpload.Parts) == 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL) + return + } + if !sort.IsSorted(CompletedParts(complMultipartUpload.Parts)) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartOrder), r.URL) + return + } + + // Reject retention or governance headers if set, CompleteMultipartUpload spec + // does not use these headers, and should not be passed down to checkPutObjectLockAllowed + if objectlock.IsObjectLockRequested(r.Header) || objectlock.IsObjectLockGovernanceBypassSet(r.Header) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) + return + } + + if _, _, _, 
s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, ErrNone, ErrNone); s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) + return + } + + completeMultiPartUpload := objectAPI.CompleteMultipartUpload + if api.CacheAPI() != nil { + completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload + } + // This code is specifically to handle the requirements for slow + // complete multipart upload operations on FS mode. + writeErrorResponseWithoutXMLHeader := func(ctx context.Context, w http.ResponseWriter, err APIError, reqURL *url.URL) { + switch err.Code { + case "SlowDown", "XMinioServerNotInitialized", "XMinioReadQuorum", "XMinioWriteQuorum": + // Set Retry-After header to indicate user-agents to retry request after 120secs. + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After + w.Header().Set(xhttp.RetryAfter, "120") + } + + // Generate error response. + errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path, + w.Header().Get(xhttp.AmzRequestID), globalDeploymentID) + encodedErrorResponse, _ := xml.Marshal(errorResponse) + setCommonHeaders(w) + w.Header().Set(xhttp.ContentType, string(mimeXML)) + w.Write(encodedErrorResponse) + } + + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) + os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) + if !globalTierConfigMgr.Empty() { + // Get appropriate object info to identify the remote object to delete + goiOpts := os.GetOpts() + if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { + os.SetTransitionState(goi.TransitionedObject) + } + } + + setEventStreamHeaders(w) + + opts, err := completeMultipartOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // First, we compute the ETag of the multipart object.
+ // The ETag of a multi-part object is always: + // ETag := MD5(ETag_p1, ETag_p2, ...)+"-N" (N being the number of parts) + // + // This is independent of encryption. An encrypted multipart + // object also has an ETag that is the MD5 of its part ETags. + // The fact that in case of encryption the ETag of a part is + // not the MD5 of the part content does not change that. + var completeETags []etag.ETag + for _, part := range complMultipartUpload.Parts { + ETag, err := etag.Parse(part.ETag) + if err != nil { + continue + } + completeETags = append(completeETags, ETag) + } + multipartETag := etag.Multipart(completeETags...) + opts.UserDefined["etag"] = multipartETag.String() + + // However, in case of encryption, the persisted part ETags don't match + // what we have sent to the client during PutObjectPart. The reason is + // that ETags are encrypted. Hence, the client will send a list of complete + // part ETags of which none can match the ETag of any part. For example + // ETag (client): 30902184f4e62dd8f98f0aaff810c626 + // ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626 + // + // Therefore, we adjust all ETags sent by the client to match what is stored + // on the backend.
+ if objectAPI.IsEncryptionSupported() { + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if _, ok := crypto.IsEncrypted(mi.UserDefined); ok { + const MaxParts = 10000 + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, MaxParts, ObjectOptions{}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + sort.Slice(listPartsInfo.Parts, func(i, j int) bool { + return listPartsInfo.Parts[i].PartNumber < listPartsInfo.Parts[j].PartNumber + }) + sort.Slice(complMultipartUpload.Parts, func(i, j int) bool { + return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber + }) + for i := range listPartsInfo.Parts { + for j := range complMultipartUpload.Parts { + if listPartsInfo.Parts[i].PartNumber == complMultipartUpload.Parts[j].PartNumber { + complMultipartUpload.Parts[j].ETag = listPartsInfo.Parts[i].ETag + continue + } + } + } + } + } + + w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} + completeDoneCh := sendWhiteSpace(ctx, w) + objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts) + // Stop writing white spaces to the client. Note that close(doneCh) style is not used as it + // can cause white space to be written after we send XML response in a race condition. + headerWritten := <-completeDoneCh + if err != nil { + if headerWritten { + writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) + } else { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + } + return + } + + // Get object location. + location := getObjectLocation(r, globalDomainNames, bucket, object) + // Generate complete multipart response. 
+ response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.ETag) + var encodedSuccessResponse []byte + if !headerWritten { + encodedSuccessResponse = encodeResponse(response) + } else { + encodedSuccessResponse, err = xml.Marshal(response) + if err != nil { + writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + if r.Header.Get(xMinIOExtract) == "true" && strings.HasSuffix(object, archiveExt) { + opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime} + if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + setPutObjHeaders(w, objInfo, false) + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) + } + if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { + actualSize, _ := objInfo.GetActualSize() + defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) + } + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) + + // Notify object created event. + sendEvent(eventArgs{ + EventName: event.ObjectCreatedCompleteMultipartUpload, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + RespElements: extractRespElements(w), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + + // Remove the transitioned object whose object version is being overwritten. + if !globalTierConfigMgr.Empty() { + // Schedule object for immediate transition if eligible. 
+ enqueueTransitionImmediate(objInfo) + logger.LogIf(ctx, os.Sweep()) + } +} + +// AbortMultipartUploadHandler - Abort multipart upload +func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "AbortMultipartUpload") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + abortMultipartUpload := objectAPI.AbortMultipartUpload + if api.CacheAPI() != nil { + abortMultipartUpload = api.CacheAPI().AbortMultipartUpload + } + + if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + uploadID, _, _, _, s3Error := getObjectResources(r.Form) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + opts := ObjectOptions{} + if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + writeSuccessNoContent(w) +} + +// ListObjectPartsHandler - List object parts +func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ListObjectParts") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + 
return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.ListMultipartUploadPartsAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + uploadID, partNumberMarker, maxParts, encodingType, s3Error := getObjectResources(r.Form) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + if partNumberMarker < 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartNumberMarker), r.URL) + return + } + if maxParts < 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) + return + } + + opts := ObjectOptions{} + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // We have to adjust the size of encrypted parts since encrypted parts + // are slightly larger due to encryption overhead. + // Further, we have to adjust the ETags of parts when using SSE-S3. + // Due to AWS S3, SSE-S3 encrypted parts return the plaintext ETag + // being the content MD5 of that particular part. This is not the + // case for SSE-C and SSE-KMS objects. 
+ if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok && objectAPI.IsEncryptionSupported() { + var objectEncryptionKey []byte + if kind == crypto.S3 { + objectEncryptionKey, err = decryptObjectInfo(nil, bucket, object, listPartsInfo.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + for i, p := range listPartsInfo.Parts { + listPartsInfo.Parts[i].ETag = tryDecryptETag(objectEncryptionKey, p.ETag, kind != crypto.S3) + size, err := sio.DecryptedSize(uint64(p.Size)) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + listPartsInfo.Parts[i].Size = int64(size) + } + } + + response := generateListPartsResponse(listPartsInfo, encodingType) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) +} + +type whiteSpaceWriter struct { + http.ResponseWriter + http.Flusher + written bool +} + +func (w *whiteSpaceWriter) Write(b []byte) (n int, err error) { + n, err = w.ResponseWriter.Write(b) + w.written = true + return +} + +func (w *whiteSpaceWriter) WriteHeader(statusCode int) { + if !w.written { + w.ResponseWriter.WriteHeader(statusCode) + } +} + +// Send empty whitespaces every 10 seconds to the client till completeMultiPartUpload() is +// done so that the client does not time out. Downside is we might send 200 OK and +// then send error XML. But according to S3 spec the client is supposed to check +// for error XML even if it received 200 OK. But for erasure this is not a problem +// as completeMultiPartUpload() is quick. Even for FS, it would not be an issue as +// we do background append as and when the parts arrive and completeMultiPartUpload +// is quick. Only in a rare case where parts would be out of order will +// FS:completeMultiPartUpload() take a longer time.
+func sendWhiteSpace(ctx context.Context, w http.ResponseWriter) <-chan bool { + doneCh := make(chan bool) + go func() { + defer close(doneCh) + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + headerWritten := false + for { + select { + case <-ticker.C: + // Write header if not written yet. + if !headerWritten { + _, err := w.Write([]byte(xml.Header)) + headerWritten = err == nil + } + + // Once header is written keep writing empty spaces + // which are ignored by client SDK XML parsers. + // This occurs when server takes long time to completeMultiPartUpload() + _, err := w.Write([]byte(" ")) + if err != nil { + return + } + w.(http.Flusher).Flush() + case doneCh <- headerWritten: + return + case <-ctx.Done(): + return + } + } + }() + return doneCh +} diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index b421b0894..b12028664 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -276,3 +276,24 @@ func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) { } return fi } + +// ReadMultipleReq contains information of multiple files to read from disk. +type ReadMultipleReq struct { + Bucket string // Bucket. Can be empty if multiple buckets. + Prefix string // Shared prefix of all files. Can be empty. Will be joined to filename without modification. + Files []string // Individual files to read. + MaxSize int64 // Return error if size is exceed. + MetadataOnly bool // Read as XL meta and truncate data. + AbortOn404 bool // Stop reading after first file not found. +} + +// ReadMultipleResp contains a single response from a ReadMultipleReq. +type ReadMultipleResp struct { + Bucket string // Bucket as given by request. + Prefix string // Prefix as given by request. + File string // File name as given in request. + Exists bool // Returns whether the file existed on disk. + Error string // Returns any error when reading. + Data []byte // Contains all data of file. 
+ Modtime time.Time // Modtime of file on disk. +} diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 3ce8fde35..92125605f 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -1678,6 +1678,527 @@ func (z *RawFileInfo) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "Files": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Files") + return + } + if cap(z.Files) >= int(zb0002) { + z.Files = (z.Files)[:zb0002] + } else { + z.Files = make([]string, zb0002) + } + for za0001 := range z.Files { + z.Files[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Files", za0001) + return + } + } + case "MaxSize": + z.MaxSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "MaxSize") + return + } + case "MetadataOnly": + z.MetadataOnly, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "MetadataOnly") + return + } + case "AbortOn404": + z.AbortOn404, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "AbortOn404") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReadMultipleReq) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 + // 
write "Bucket" + err = en.Append(0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Prefix" + err = en.Append(0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + if err != nil { + return + } + err = en.WriteString(z.Prefix) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + // write "Files" + err = en.Append(0xa5, 0x46, 0x69, 0x6c, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Files))) + if err != nil { + err = msgp.WrapError(err, "Files") + return + } + for za0001 := range z.Files { + err = en.WriteString(z.Files[za0001]) + if err != nil { + err = msgp.WrapError(err, "Files", za0001) + return + } + } + // write "MaxSize" + err = en.Append(0xa7, 0x4d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.MaxSize) + if err != nil { + err = msgp.WrapError(err, "MaxSize") + return + } + // write "MetadataOnly" + err = en.Append(0xac, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x4f, 0x6e, 0x6c, 0x79) + if err != nil { + return + } + err = en.WriteBool(z.MetadataOnly) + if err != nil { + err = msgp.WrapError(err, "MetadataOnly") + return + } + // write "AbortOn404" + err = en.Append(0xaa, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x4f, 0x6e, 0x34, 0x30, 0x34) + if err != nil { + return + } + err = en.WriteBool(z.AbortOn404) + if err != nil { + err = msgp.WrapError(err, "AbortOn404") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReadMultipleReq) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "Bucket" + o = append(o, 0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Prefix" + o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + o = msgp.AppendString(o, z.Prefix) + // string "Files" + o = append(o, 0xa5, 
0x46, 0x69, 0x6c, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Files))) + for za0001 := range z.Files { + o = msgp.AppendString(o, z.Files[za0001]) + } + // string "MaxSize" + o = append(o, 0xa7, 0x4d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendInt64(o, z.MaxSize) + // string "MetadataOnly" + o = append(o, 0xac, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x4f, 0x6e, 0x6c, 0x79) + o = msgp.AppendBool(o, z.MetadataOnly) + // string "AbortOn404" + o = append(o, 0xaa, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x4f, 0x6e, 0x34, 0x30, 0x34) + o = msgp.AppendBool(o, z.AbortOn404) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReadMultipleReq) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "Files": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Files") + return + } + if cap(z.Files) >= int(zb0002) { + z.Files = (z.Files)[:zb0002] + } else { + z.Files = make([]string, zb0002) + } + for za0001 := range z.Files { + z.Files[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Files", za0001) + return + } + } + case "MaxSize": + z.MaxSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "MaxSize") + return + } + case "MetadataOnly": + z.MetadataOnly, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { 
+ err = msgp.WrapError(err, "MetadataOnly") + return + } + case "AbortOn404": + z.AbortOn404, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "AbortOn404") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReadMultipleReq) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Files { + s += msgp.StringPrefixSize + len(z.Files[za0001]) + } + s += 8 + msgp.Int64Size + 13 + msgp.BoolSize + 11 + msgp.BoolSize + return +} + +// DecodeMsg implements msgp.Decodable +func (z *ReadMultipleResp) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "File": + z.File, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "File") + return + } + case "Exists": + z.Exists, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Exists") + return + } + case "Error": + z.Error, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + case "Data": + z.Data, err = dc.ReadBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + case "Modtime": + z.Modtime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, 
"Modtime") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReadMultipleResp) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 7 + // write "Bucket" + err = en.Append(0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Prefix" + err = en.Append(0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + if err != nil { + return + } + err = en.WriteString(z.Prefix) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + // write "File" + err = en.Append(0xa4, 0x46, 0x69, 0x6c, 0x65) + if err != nil { + return + } + err = en.WriteString(z.File) + if err != nil { + err = msgp.WrapError(err, "File") + return + } + // write "Exists" + err = en.Append(0xa6, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteBool(z.Exists) + if err != nil { + err = msgp.WrapError(err, "Exists") + return + } + // write "Error" + err = en.Append(0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + if err != nil { + return + } + err = en.WriteString(z.Error) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + // write "Data" + err = en.Append(0xa4, 0x44, 0x61, 0x74, 0x61) + if err != nil { + return + } + err = en.WriteBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + // write "Modtime" + err = en.Append(0xa7, 0x4d, 0x6f, 0x64, 0x74, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.Modtime) + if err != nil { + err = msgp.WrapError(err, "Modtime") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReadMultipleResp) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 7 + // string "Bucket" + o = append(o, 0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o 
= msgp.AppendString(o, z.Bucket) + // string "Prefix" + o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + o = msgp.AppendString(o, z.Prefix) + // string "File" + o = append(o, 0xa4, 0x46, 0x69, 0x6c, 0x65) + o = msgp.AppendString(o, z.File) + // string "Exists" + o = append(o, 0xa6, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73) + o = msgp.AppendBool(o, z.Exists) + // string "Error" + o = append(o, 0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + o = msgp.AppendString(o, z.Error) + // string "Data" + o = append(o, 0xa4, 0x44, 0x61, 0x74, 0x61) + o = msgp.AppendBytes(o, z.Data) + // string "Modtime" + o = append(o, 0xa7, 0x4d, 0x6f, 0x64, 0x74, 0x69, 0x6d, 0x65) + o = msgp.AppendTime(o, z.Modtime) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReadMultipleResp) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "File": + z.File, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "File") + return + } + case "Exists": + z.Exists, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Exists") + return + } + case "Error": + z.Error, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + case "Data": + z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + case "Modtime": + z.Modtime, bts, err = 
msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Modtime") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReadMultipleResp) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 5 + msgp.StringPrefixSize + len(z.File) + 7 + msgp.BoolSize + 6 + msgp.StringPrefixSize + len(z.Error) + 5 + msgp.BytesPrefixSize + len(z.Data) + 8 + msgp.TimeSize + return +} + // DecodeMsg implements msgp.Decodable func (z *VolInfo) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index a6b5f9343..ffc08f690 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -687,6 +687,232 @@ func BenchmarkDecodeRawFileInfo(b *testing.B) { } } +func TestMarshalUnmarshalReadMultipleReq(t *testing.T) { + v := ReadMultipleReq{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func 
BenchmarkUnmarshalReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReadMultipleReq(t *testing.T) { + v := ReadMultipleReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReadMultipleReq Msgsize() is inaccurate") + } + + vn := ReadMultipleReq{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalReadMultipleResp(t *testing.T) { + v := ReadMultipleResp{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + b.ReportAllocs() + 
b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReadMultipleResp(t *testing.T) { + v := ReadMultipleResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReadMultipleResp Msgsize() is inaccurate") + } + + vn := ReadMultipleResp{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalVolInfo(t *testing.T) { v := VolInfo{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 901864c7c..2fee13043 100644 --- 
a/cmd/storage-interface.go
+++ b/cmd/storage-interface.go
@@ -98,6 +98,7 @@ type StorageAPI interface {
 	Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error)
 	VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
 	StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error)
+	ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error
 
 	// Write all data, syncs the data to disk.
 	// Should be used for smaller payloads.
@@ -273,3 +274,8 @@ func (p *unrecognizedDisk) ReadAll(ctx context.Context, volume string, path stri
 func (p *unrecognizedDisk) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
 	return nil, errDiskNotFound
 }
+
+func (p *unrecognizedDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
+	defer close(resp) // nothing is ever sent, but the channel is still closed for the receiver
+	return errDiskNotFound
+}
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 2880bfbe2..c72c78913 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -229,7 +229,10 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 			rr.CloseWithError(err)
 			return cache, err
 		}
-		updates <- update
+		select {
+		case <-ctx.Done():
+		case updates <- update:
+		}
 	}
 	var newCache dataUsageCache
 	err = newCache.DecodeMsg(ms)
@@ -718,6 +721,46 @@ func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path
 	return stat, err
 }
 
+// ReadMultiple requests multiple files from the remote disk and forwards each decoded response on resp.
+// Responses are forwarded in the order they arrive on the stream.
+// The resp channel is closed before the call returns.
+// A non-nil error means the context was canceled, the request failed, or the stream could not be decoded.
+func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
+	defer close(resp)
+	body, err := req.MarshalMsg(nil)
+	if err != nil {
+		return err
+	}
+	respBody, err := client.call(ctx, storageRESTMethodReadMultiple, nil, bytes.NewReader(body), int64(len(body)))
+	if err != nil {
+		return err
+	}
+	defer xhttp.DrainBody(respBody)
+	pr, pw := io.Pipe()
+	// Close the read half on every return path (cancellation included) so the
+	// copy goroutine below can never stay blocked forever in a pipe write.
+	defer pr.Close()
+	go func() {
+		pw.CloseWithError(waitForHTTPStream(respBody, pw))
+	}()
+	mr := msgp.NewReader(pr)
+	for {
+		var file ReadMultipleResp
+		if err := file.DecodeMsg(mr); err != nil {
+			if errors.Is(err, io.EOF) {
+				err = nil // a clean end of stream is not an error
+			}
+			pr.CloseWithError(err)
+			return err
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case resp <- file:
+		}
+	}
+}
+
 // Close - marks the client as closed.
 func (client *storageRESTClient) Close() error {
 	client.restClient.Close()
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index dd857e35a..fe71face8 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -18,7 +18,7 @@
 package cmd
 
 const (
-	storageRESTVersion       = "v46" // Added MinIO version to FileInfo
+	storageRESTVersion       = "v47" // Added ReadMultiple
 	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 	storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )
@@ -53,6 +53,7 @@ const (
 	storageRESTMethodVerifyFile   = "/verifyfile"
 	storageRESTMethodWalkDir      = "/walkdir"
 	storageRESTMethodStatInfoFile = "/statfile"
+	storageRESTMethodReadMultiple = "/readmultiple"
 )
 
 const (
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index cf48da2dd..ca82efbd4 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -1244,6 +1244,47 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request)
 	}
 }
 
+// ReadMultiple decodes a ReadMultipleReq from the request body and streams back one msgp-encoded ReadMultipleResp per result.
+func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+	rw := streamHTTPResponse(w)
+	defer func() {
+		if r := recover(); r != nil {
+			debug.PrintStack()
+			rw.CloseWithError(fmt.Errorf("panic: %v", r))
+		}
+	}()
+
+	var req ReadMultipleReq
+	mr := msgpNewReader(r.Body)
+	err := req.DecodeMsg(mr)
+	if err != nil {
+		rw.CloseWithError(err)
+		return
+	}
+
+	mw := msgp.NewWriter(rw)
+	responses := make(chan ReadMultipleResp, len(req.Files)) // buffered with one slot per requested file
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for resp := range responses {
+			err := resp.EncodeMsg(mw)
+			if err != nil {
+				rw.CloseWithError(err)
+				return
+			}
+			mw.Flush() // push each response out as soon as it is encoded
+		}
+	}()
+	err = s.storage.ReadMultiple(r.Context(), req, responses)
+	wg.Wait() // let the encoder finish before the stream is closed
+	rw.CloseWithError(err)
+}
+
 // registerStorageRPCRouter - register storage rpc router.
 func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
 	storageDisks := make([][]*xlStorage, len(endpointServerPools))
@@ -1315,6 +1356,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFileHandler))
 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler))
 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile))
+			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(httpTraceHdrs(server.ReadMultiple))
 		}
 	}
 }
diff --git a/cmd/storagemetric_string.go b/cmd/storagemetric_string.go
index 71f196d77..a01cf54d4 100644
--- a/cmd/storagemetric_string.go
+++ b/cmd/storagemetric_string.go
@@ -33,12 +33,13 @@ func _() {
 	_ = x[storageMetricReadXL-22]
 	_ = x[storageMetricReadAll-23]
 	_ = x[storageMetricStatInfoFile-24]
-	_ = x[storageMetricLast-25]
+	_ = x[storageMetricReadMultiple-25]
+	_
= x[storageMetricLast-26] } -const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileLast" +const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleLast" -var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 246} +var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 258} func (i storageMetric) String() string { if i >= storageMetric(len(_storageMetric_index)-1) { diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 6b6266866..b96dbb083 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -64,6 +64,7 @@ const ( storageMetricReadXL storageMetricReadAll storageMetricStatInfoFile + storageMetricReadMultiple // .... add more @@ -510,6 +511,21 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st return p.storage.StatInfoFile(ctx, volume, path, glob) } +// ReadMultiple will read multiple files and send each back as response. +// Files are read and returned in the given order. +// The resp channel is closed before the call returns. +// Only a canceled context will return an error. 
+func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) (err error) {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadMultiple, req.Bucket, req.Prefix)
+	if err != nil {
+		close(resp)
+		return err
+	}
+	// err is a named result, so the deferred done sees the wrapped call's final error.
+	defer done(&err)
+	return p.storage.ReadMultiple(ctx, req, resp)
+}
+
 func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
 	return madmin.TraceInfo{
 		TraceType: madmin.TraceStorage,
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 8d18654a2..6bbe82289 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -2565,6 +2565,66 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
 	return nil
 }
 
+// ReadMultiple reads each file in req.Files under req.Bucket/req.Prefix and sends one response per file on resp.
+// Files are read and reported in the given order; with req.AbortOn404 the call stops after the first missing file.
+// The resp channel is closed before the call returns.
+// Only a canceled context will return an error; per-file failures are reported in the response itself.
+func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
+	defer close(resp)
+
+	volumeDir := pathJoin(s.diskPath, req.Bucket)
+
+	for _, f := range req.Files {
+		if contextCanceled(ctx) {
+			return ctx.Err()
+		}
+		r := ReadMultipleResp{
+			Bucket: req.Bucket,
+			Prefix: req.Prefix,
+			File:   f,
+		}
+		var data []byte
+		var mt time.Time
+		var err error
+		fullPath := pathJoin(volumeDir, req.Prefix, f)
+		if req.MetadataOnly {
+			data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
+		} else {
+			data, mt, err = s.readAllData(ctx, volumeDir, fullPath)
+		}
+
+		switch {
+		case err != nil:
+			// A missing file or volume keeps Exists false and carries no error text.
+			if !IsErr(err, errFileNotFound, errVolumeNotFound) {
+				r.Exists = true
+				r.Error = err.Error()
+			}
+		case req.MaxSize > 0 && int64(len(data)) > req.MaxSize:
+			diskHealthCheckOK(ctx, nil)
+			r.Exists = true
+			r.Error = fmt.Sprintf("max size (%d) exceeded: %d", req.MaxSize, len(data))
+		default:
+			diskHealthCheckOK(ctx, nil)
+			r.Exists = true
+			r.Data = data
+			r.Modtime = mt
+		}
+		// Honor cancellation on every send so a stalled receiver cannot block this call.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case resp <- r:
+		}
+		if req.AbortOn404 && !r.Exists {
+			// We stop at first file not found.
+			// We have already reported the error, return nil.
+			return nil
+		}
+	}
+	return nil
+}
+
 func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
 	volumeDir, err := s.getVolDir(volume)
 	if err != nil {