diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 990736dd9..16c82a27e 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -19,7 +19,11 @@ package cmd

 import (
 	"context"
+	"fmt"
 	"sync"
+
+	"github.com/minio/minio/internal/logger"
+	"github.com/minio/minio/internal/sync/errgroup"
 )

 func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
@@ -119,3 +123,84 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
 	// Return disks which have maximum disk usage common.
 	return newDisks[max]
 }
+
+// readMultipleFiles reads raw data for all specified files from all disks.
+func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultipleReq, readQuorum int) ([]ReadMultipleResp, error) {
+	resps := make([]chan ReadMultipleResp, len(disks))
+	for i := range resps {
+		resps[i] = make(chan ReadMultipleResp, len(req.Files))
+	}
+	g := errgroup.WithNErrs(len(disks))
+	// Read files in parallel across disks.
+	for index := range disks {
+		index := index
+		g.Go(func() (err error) {
+			if disks[index] == nil {
+				return errDiskNotFound
+			}
+			return disks[index].ReadMultiple(ctx, req, resps[index])
+		}, index)
+	}
+
+	dataArray := make([]ReadMultipleResp, 0, len(req.Files))
+	// Merge results. They should arrive in order from each disk.
+	for _, wantFile := range req.Files {
+		quorum := 0
+		toAdd := ReadMultipleResp{
+			Bucket: req.Bucket,
+			Prefix: req.Prefix,
+			File:   wantFile,
+		}
+		for i := range resps {
+			if disks[i] == nil {
+				continue
+			}
+			select {
+			case <-ctx.Done():
+			case gotFile, ok := <-resps[i]:
+				if !ok {
+					continue
+				}
+				if gotFile.Error != "" || !gotFile.Exists {
+					continue
+				}
+				if gotFile.File != wantFile || gotFile.Bucket != req.Bucket || gotFile.Prefix != req.Prefix {
+					continue
+				}
+				quorum++
+				if toAdd.Modtime.After(gotFile.Modtime) || len(gotFile.Data) < len(toAdd.Data) {
+					// Pick latest, or largest to avoid possible truncated entries.
+					continue
+				}
+				toAdd = gotFile
+			}
+		}
+		if quorum < readQuorum {
+			toAdd.Exists = false
+			toAdd.Error = errErasureReadQuorum.Error()
+			toAdd.Data = nil
+		}
+		dataArray = append(dataArray, toAdd)
+	}
+
+	errs := g.Wait()
+	for index, err := range errs {
+		if err == nil {
+			continue
+		}
+		if !IsErr(err, []error{
+			errFileNotFound,
+			errVolumeNotFound,
+			errFileVersionNotFound,
+			errDiskNotFound,
+			errUnformattedDisk,
+		}...) {
+			logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
+				disks[index], req.Bucket, req.Prefix, err),
+				disks[index].String())
+		}
+	}
+
+	// Return all the metadata.
+	return dataArray, nil
+}
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index 5c1449ad4..ed84a0bc0 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -580,7 +580,7 @@ func TestHealCorrectQuorum(t *testing.T) {

 	_, err = objLayer.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
 	if err != nil {
-		t.Fatalf("Failed to complete multipart upload - %v", err)
+		t.Fatalf("Failed to complete multipart upload - got: %v", err)
 	}

 	cfgFile := pathJoin(bucketMetaPrefix, bucket, ".test.bin")
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 2d2ef6e31..54aa892cf 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -76,6 +76,30 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
 	return err
 }

+// Removes part.meta given by partName belonging to a multipart upload from minioMetaBucket
+func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string, partNumber int) {
+	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
+	curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
+	storageDisks := er.getDisks()
+
+	g := errgroup.WithNErrs(len(storageDisks))
+	for index, disk := range storageDisks {
+		if disk == nil {
+			continue
+		}
+		index := index
+		g.Go(func() error {
+			_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
+				Recursive: false,
+				Force:     false,
+			})
+
+			return nil
+		}, index)
+	}
+	g.Wait()
+}
+
 // Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
 func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
 	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
@@ -96,6 +120,11 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
 				Recursive: false,
 				Force:     false,
 			})
+			_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
+				Recursive: false,
+				Force:     false,
+			})
+
 			return nil
 		}, index)
 	}
@@ -453,6 +482,47 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds
 	return evalDisks(disks, errs), err
 }

+// writeAllDisks - writes 'b' to all provided disks.
+// If write cannot reach quorum, the files will be deleted from all disks.
+func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry string, b []byte, writeQuorum int) ([]StorageAPI, error) {
+	g := errgroup.WithNErrs(len(disks))
+
+	// Write file to all underlying storage disks.
+	for index := range disks {
+		index := index
+		g.Go(func() error {
+			if disks[index] == nil {
+				return errDiskNotFound
+			}
+			return disks[index].WriteAll(ctx, dstBucket, dstEntry, b)
+		}, index)
+	}
+
+	// Wait for all writes to finish.
+	errs := g.Wait()
+
+	// We can safely allow WriteAll errors up to len(disks) - writeQuorum,
+	// otherwise return failure. Clean up successful writes.
+	err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
+	if err == errErasureWriteQuorum {
+		// Remove all written entries.
+		g := errgroup.WithNErrs(len(disks))
+		for index := range disks {
+			if disks[index] == nil || errs[index] != nil {
+				continue
+			}
+			index := index
+			g.Go(func() error {
+				return disks[index].Delete(ctx, dstBucket, dstEntry, DeleteOptions{Force: true})
+			}, index)
+		}
+		// Ignore these errors.
+ g.WaitErr() + } + + return evalDisks(disks, errs), err +} + // PutObjectPart - reads incoming stream and internally erasure codes // them. This call is similar to single put operation but it is part // of the multipart transaction. @@ -479,11 +549,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo return PartInfo{}, err } rctx := rlkctx.Context() - defer func() { - if uploadIDRLock != nil { - uploadIDRLock.RUnlock(rlkctx.Cancel) - } - }() + defer uploadIDRLock.RUnlock(rlkctx.Cancel) data := r.Reader // Validate input data size and it can never be less than zero. @@ -507,10 +573,6 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) - // Unlock upload id locks before, so others can get it. - uploadIDRLock.RUnlock(rlkctx.Cancel) - uploadIDRLock = nil - // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount) if err != nil { @@ -617,50 +679,21 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } } - // Acquire write lock to update metadata. - uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout) - if err != nil { - return PartInfo{}, err - } - wctx := wlkctx.Context() - defer uploadIDWLock.Unlock(wlkctx.Cancel) - - // Validates if upload ID exists. - if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { - return pi, toObjectErr(err, bucket, object, uploadID) - } - // Rename temporary part file to its final location. partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix) - onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) + onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - // Read metadata again because it might be updated with parallel upload of another part. - partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false) - reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) - if reducedErr == errErasureWriteQuorum { - return pi, toObjectErr(reducedErr, bucket, object) - } - - // Get current highest version based on re-read partsMetadata. - onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) - - // Pick one from the first valid metadata. - fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) - if err != nil { - return pi, err - } - - // Once part is successfully committed, proceed with updating erasure metadata. - fi.ModTime = UTCNow() - md5hex := r.MD5CurrentHexString() if opts.PreserveETag != "" { md5hex = opts.PreserveETag } + + // Once part is successfully committed, proceed with saving erasure metadata for part. + fi.ModTime = UTCNow() + var index []byte if opts.IndexCB != nil { index = opts.IndexCB() @@ -669,27 +702,18 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Add the current part. 
fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), index) - for i, disk := range onlineDisks { - if disk == OfflineDisk { - continue - } - partsMetadata[i].Size = fi.Size - partsMetadata[i].ModTime = fi.ModTime - partsMetadata[i].Parts = fi.Parts - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ - PartNumber: partID, - Algorithm: DefaultBitrotAlgorithm, - Hash: bitrotWriterSum(writers[i]), - }) + // Save part info as partPath+".meta" + fiMsg, err := fi.MarshalMsg(nil) + if err != nil { + return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - // Writes update `xl.meta` format for each disk. - if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { - return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + // Write part metadata to all disks. + onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", fiMsg, writeQuorum) + if err != nil { + return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - online = countOnlineDisks(onlineDisks) - // Return success. return PartInfo{ PartNumber: partID, @@ -704,6 +728,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // by callers to verify object states // - encrypted // - compressed +// Does not contain currently uploaded parts by design. func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { auditObjectErasureSet(ctx, object, &er) @@ -795,7 +820,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) } - _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) @@ -803,6 +828,24 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, err } + // Read Part info for all parts + partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" + req := ReadMultipleReq{ + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + } + + // Parts are 1 based, so index 0 is part one, etc. + for i := 1; i <= maxPartsList; i++ { + req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i)) + } + + partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum) + if err != nil { + return result, err + } + // Populate the result stub. result.Bucket = bucket result.Object = object @@ -812,7 +855,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up result.UserDefined = cloneMSS(fi.Metadata) // For empty number of parts or maxParts as zero, return right here. - if len(fi.Parts) == 0 || maxParts == 0 { + if len(partInfoFiles) == 0 || maxParts == 0 { return result, nil } @@ -821,6 +864,28 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up maxParts = maxPartsList } + var partFI FileInfo + for i, part := range partInfoFiles { + if part.Error != "" || !part.Exists { + continue + } + _, err := partFI.UnmarshalMsg(part.Data) + if err != nil { + // Maybe crash or similar. 
+ logger.LogIf(ctx, err) + continue + } + + if len(partFI.Parts) != 1 { + logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts))) + continue + } + + addPart := partFI.Parts[0] + // Add the current part. + fi.AddObjectPart(i+1, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index) + } + // Only parts with higher part numbers will be listed. partIdx := objectPartIndex(fi.Parts, partNumberMarker) parts := fi.Parts @@ -860,17 +925,17 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { auditObjectErasureSet(ctx, object, &er) - // Hold read-locks to verify uploaded parts, also disallows - // parallel part uploads as well. + // Hold write locks to verify uploaded parts, also disallows any + // parallel PutObjectPart() requests. uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout) if err != nil { return oi, err } - rctx := rlkctx.Context() - defer uploadIDLock.RUnlock(rlkctx.Cancel) + wctx := wlkctx.Context() + defer uploadIDLock.Unlock(wlkctx.Cancel) - if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { return oi, toObjectErr(err, bucket, object, uploadID) } @@ -879,15 +944,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) + partsMetadata, errs := readAllFileInfo(wctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount) + _, writeQuorum, err := objectQuorumFromMeta(wctx, partsMetadata, errs, er.defaultParityCount) if err != nil { return oi, toObjectErr(err, bucket, object) } - reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return oi, toObjectErr(reducedErr, bucket, object) } @@ -895,11 +960,59 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum) + fi, err := pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) if err != nil { return oi, err } + // Read Part info for all parts + partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" + req := ReadMultipleReq{ + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. 
+	}
+	for _, part := range parts {
+		req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))
+	}
+	partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum)
+	if err != nil {
+		return oi, err
+	}
+	if len(partInfoFiles) != len(parts) {
+		// Should only happen through an internal error.
+		err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts))
+		logger.LogIf(ctx, err)
+		return oi, toObjectErr(err, bucket, object)
+	}
+
+	var partFI FileInfo
+	for i, part := range partInfoFiles {
+		partID := parts[i].PartNumber
+		if part.Error != "" || !part.Exists {
+			return oi, InvalidPart{
+				PartNumber: partID,
+			}
+		}
+		_, err := partFI.UnmarshalMsg(part.Data)
+		if err != nil {
+			// Maybe crash or similar.
+			logger.LogIf(ctx, err)
+			return oi, InvalidPart{
+				PartNumber: partID,
+			}
+		}
+		if len(partFI.Parts) != 1 {
+			logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts)))
+			return oi, InvalidPart{
+				PartNumber: partID,
+			}
+		}
+		addPart := partFI.Parts[0]
+		// Add the current part.
+		fi.AddObjectPart(partID, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index)
+	}
+
 	// Calculate full object size.
 	var objectSize int64

@@ -939,7 +1052,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 			return oi, invp
 		}

-		// All parts except the last part has to be atleast 5MB.
+		// All parts except the last part have to be at least 5MB.
 		if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
 			return oi, PartTooSmall{
 				PartNumber: part.PartNumber,
@@ -1018,6 +1131,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 		}
 	}

+	// Remove part.meta which is not needed anymore.
+	for _, part := range fi.Parts {
+		er.removePartMeta(bucket, object, uploadID, fi.DataDir, part.Number)
+	}
+
 	// Rename the multipart object to final location.
 	if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, bucket, object, writeQuorum); err != nil {
diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go
index 29df634f9..fed294a62 100644
--- a/cmd/naughty-disk_test.go
+++ b/cmd/naughty-disk_test.go
@@ -301,3 +301,11 @@ func (d *naughtyDisk) StatInfoFile(ctx context.Context, volume, path string, glo
 	}
 	return d.disk.StatInfoFile(ctx, volume, path, glob)
 }
+
+func (d *naughtyDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
+	if err := d.calcError(); err != nil {
+		close(resp)
+		return err
+	}
+	return d.disk.ReadMultiple(ctx, req, resp)
+}
diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go
index e18fd1dba..1ae9bbf94 100644
--- a/cmd/object-api-multipart_test.go
+++ b/cmd/object-api-multipart_test.go
@@ -26,6 +26,7 @@ import (
 	"testing"

 	humanize "github.com/dustin/go-humanize"
+	"github.com/minio/minio/internal/config/storageclass"
 	"github.com/minio/minio/internal/hash"
 )

@@ -1181,6 +1182,15 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
 	objectNames := []string{"minio-object-1.txt"}
 	uploadIDs := []string{}

+	globalStorageClass = storageclass.Config{
+		RRS: storageclass.StorageClass{
+			Parity: 2,
+		},
+		Standard: storageclass.StorageClass{
+			Parity: 4,
+		},
+	}
+
 	// bucketnames[0].
 	// objectNames[0].
 	// uploadIds [0].
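
Reviewer note on the design: PutObjectPart no longer re-reads and rewrites the shared xl.meta under a second lock. Each part instead persists its own part.N.meta blob, a msgp-encoded FileInfo holding exactly one entry in Parts, and ListObjectParts/CompleteMultipartUpload rebuild the part list by fetching those blobs back through readMultipleFiles at write quorum. Below is a minimal sketch of that round-trip, using the package-internal FileInfo helpers visible in this patch; encodePartMeta and decodePartMeta are hypothetical names for illustration, not functions in the patch.

// Sketch only: encode one part's metadata the way PutObjectPart persists it
// to part.N.meta (hypothetical helper; assumes package cmd and "fmt").
func encodePartMeta(partID int, etag string, size, actualSize int64, idx []byte) ([]byte, error) {
	var fi FileInfo
	fi.ModTime = UTCNow()
	// Exactly one part per part.N.meta; readers reject any other count.
	fi.AddObjectPart(partID, etag, size, actualSize, idx)
	return fi.MarshalMsg(nil)
}

// Sketch only: decode a part.N.meta blob the way ListObjectParts and
// CompleteMultipartUpload consume it (hypothetical helper).
func decodePartMeta(data []byte) (ObjectPartInfo, error) {
	var fi FileInfo
	if _, err := fi.UnmarshalMsg(data); err != nil {
		return ObjectPartInfo{}, err // corrupted or truncated entry
	}
	if len(fi.Parts) != 1 {
		return ObjectPartInfo{}, fmt.Errorf("unexpected part count: %d", len(fi.Parts))
	}
	return fi.Parts[0], nil
}

The tradeoff: concurrent PutObjectPart calls no longer race on a single xl.meta (which is why the read-lock release and write-lock reacquisition are removed above), at the cost of one small quorum read per part at list and complete time.
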
diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index bd12cead6..45cab9471 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -18,10 +18,8 @@ package cmd import ( - "bufio" "context" "encoding/hex" - "encoding/xml" "errors" "fmt" "io" @@ -29,7 +27,6 @@ import ( "net/http/httptest" "net/url" "os" - "sort" "strconv" "strings" "sync" @@ -2218,151 +2215,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h writeSuccessResponseHeadersOnly(w) } -// Multipart objectAPIHandlers - -// NewMultipartUploadHandler - New multipart upload. -// Notice: The S3 client can send secret keys in headers for encryption related jobs, -// the handler should ensure to remove these keys before sending them to the object layer. -// Currently these keys are: -// - X-Amz-Server-Side-Encryption-Customer-Key -// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key -func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "NewMultipartUpload") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if crypto.Requested(r.Header) { - if globalIsGateway { - if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } else { - if !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } - } - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - // Check if bucket encryption is enabled - sseConfig, _ := globalBucketSSEConfigSys.Get(bucket) - sseConfig.Apply(r.Header, sse.ApplyOptions{ - AutoEncrypt: globalAutoEncryption, - Passthrough: globalIsGateway && globalGatewayName == S3BackendGateway, - }) - - // Validate storage class metadata if present - if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { - if !storageclass.IsValid(sc) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL) - return - } - } - - encMetadata := map[string]string{} - - if objectAPI.IsEncryptionSupported() { - if crypto.Requested(r.Header) { - if err = setEncryptionMetadata(r, bucket, object, encMetadata); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - // Set this for multipart only operations, we need to differentiate during - // decryption if the file was actually multipart or not. - encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = "" - } - } - - // Extract metadata that needs to be saved. 
- metadata, err := extractMetadata(ctx, r) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction) - holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction) - - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - - retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) - if s3Err == ErrNone && retentionMode.Valid() { - metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) - metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) - } - if s3Err == ErrNone && legalHold.Status.Valid() { - metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status) - } - if s3Err != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) - return - } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() { - metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) - metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() - } - // We need to preserve the encryption headers set in EncryptRequest, - // so we do not want to override them, copy them instead. - for k, v := range encMetadata { - metadata[k] = v - } - - // Ensure that metadata does not contain sensitive information - crypto.RemoveSensitiveEntries(metadata) - - if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) { - // Storing the compression metadata. - metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 - } - - opts, err := putOpts(ctx, r, bucket, object, metadata) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - newMultipartUpload := objectAPI.NewMultipartUpload - if api.CacheAPI() != nil { - newMultipartUpload = api.CacheAPI().NewMultipartUpload - } - - uploadID, err := newMultipartUpload(ctx, bucket, object, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - response := generateInitiateMultipartUploadResponse(bucket, object, uploadID) - encodedSuccessResponse := encodeResponse(response) - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) -} - // CopyObjectPartHandler - uploads a part by copying data from an existing object as data source. func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "CopyObjectPart") @@ -2701,710 +2553,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt writeSuccessResponseXML(w, encodedSuccessResponse) } -// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. 
-func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "PutObjectPart") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if crypto.Requested(r.Header) { - if globalIsGateway { - if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } else { - if !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - } - } - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // X-Amz-Copy-Source shouldn't be set for this call. - if _, ok := r.Header[xhttp.AmzCopySource]; ok { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL) - return - } - - clientETag, err := etag.FromContentMD5(r.Header) - if err != nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL) - return - } - - // if Content-Length is unknown/missing, throw away - size := r.ContentLength - - rAuthType := getRequestAuthType(r) - // For auth type streaming signature, we need to gather a different content length. - if rAuthType == authTypeStreamingSigned { - if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok { - if sizeStr[0] == "" { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) - return - } - size, err = strconv.ParseInt(sizeStr[0], 10, 64) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - } - if size == -1 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) - return - } - - // maximum Upload size for multipart objects in a single operation - if isMaxAllowedPartSize(size) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) - return - } - - uploadID := r.Form.Get(xhttp.UploadID) - partIDString := r.Form.Get(xhttp.PartNumber) - - partID, err := strconv.Atoi(partIDString) - if err != nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL) - return - } - - // check partID with maximum part ID for multipart objects - if isMaxPartID(partID) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) - return - } - - var ( - md5hex = clientETag.String() - sha256hex = "" - reader io.Reader = r.Body - s3Error APIErrorCode - ) - if s3Error = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - switch rAuthType { - case authTypeStreamingSigned: - // Initialize stream signature verifier. 
- reader, s3Error = newSignV4ChunkedReader(r) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - case authTypeSignedV2, authTypePresignedV2: - if s3Error = isReqAuthenticatedV2(r); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - case authTypePresigned, authTypeSigned: - if s3Error = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - if !skipContentSha256Cksum(r) { - sha256hex = getContentSha256Cksum(r, serviceS3) - } - } - - if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - actualSize := size - - // get encryption options - var opts ObjectOptions - if crypto.SSEC.IsRequested(r.Header) { - opts, err = getOpts(ctx, r, bucket, object) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Read compression metadata preserved in the init multipart for the decision. - _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] - - var idxCb func() []byte - if objectAPI.IsCompressionSupported() && isCompressed { - actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Set compression metrics. - wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header) - s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption) - idxCb = cb - defer s2c.Close() - reader = etag.Wrap(s2c, actualReader) - size = -1 // Since compressed size is un-predictable. - md5hex = "" // Do not try to verify the content. - sha256hex = "" - } - - hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - rawReader := hashReader - pReader := NewPutObjReader(rawReader) - - _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) - var objectEncryptionKey crypto.ObjectKey - if objectAPI.IsEncryptionSupported() && isEncrypted { - if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) - return - } - - opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - var key []byte - if crypto.SSEC.IsRequested(r.Header) { - key, err = ParseSSECustomerRequest(r) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - // Calculating object encryption key - key, err = decryptObjectInfo(key, bucket, object, mi.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - copy(objectEncryptionKey[:], key) - - partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID)) - in := io.Reader(hashReader) - if size > encryptBufferThreshold { - // The encryption reads in blocks of 64KB. - // We add a buffer on bigger files to reduce the number of syscalls upstream. 
- in = bufio.NewReaderSize(hashReader, encryptBufferSize) - } - reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - wantSize := int64(-1) - if size >= 0 { - info := ObjectInfo{Size: size} - wantSize = info.EncryptedSize() - } - // do not try to verify encrypted content - hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - if idxCb != nil { - idxCb = compressionIndexEncrypter(objectEncryptionKey, idxCb) - } - } - opts.IndexCB = idxCb - - putObjectPart := objectAPI.PutObjectPart - if api.CacheAPI() != nil { - putObjectPart = api.CacheAPI().PutObjectPart - } - - partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts) - if err != nil { - // Verify if the underlying error is signature mismatch. - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - etag := partInfo.ETag - if kind, encrypted := crypto.IsEncrypted(mi.UserDefined); encrypted { - switch kind { - case crypto.S3KMS: - w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS) - w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, mi.KMSKeyID()) - if kmsCtx, ok := mi.UserDefined[crypto.MetaContext]; ok { - w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx) - } - if len(etag) >= 32 && strings.Count(etag, "-") != 1 { - etag = etag[len(etag)-32:] - } - case crypto.S3: - w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES) - etag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: etag}) - case crypto.SSEC: - w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm)) - w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5)) - - if len(etag) >= 32 && strings.Count(etag, "-") != 1 { - etag = etag[len(etag)-32:] - } - } - } - - // We must not use the http.Header().Set method here because some (broken) - // clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive). - // Therefore, we have to set the ETag directly as map entry. 
- w.Header()[xhttp.ETag] = []string{"\"" + etag + "\""} - - writeSuccessResponseHeadersOnly(w) -} - -// AbortMultipartUploadHandler - Abort multipart upload -func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "AbortMultipartUpload") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - abortMultipartUpload := objectAPI.AbortMultipartUpload - if api.CacheAPI() != nil { - abortMultipartUpload = api.CacheAPI().AbortMultipartUpload - } - - if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - uploadID, _, _, _, s3Error := getObjectResources(r.Form) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - opts := ObjectOptions{} - if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - writeSuccessNoContent(w) -} - -// ListObjectPartsHandler - List object parts -func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ListObjectParts") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.ListMultipartUploadPartsAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - uploadID, partNumberMarker, maxParts, encodingType, s3Error := getObjectResources(r.Form) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - if partNumberMarker < 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartNumberMarker), r.URL) - return - } - if maxParts < 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) - return - } - - opts := ObjectOptions{} - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // We have to adjust the size of encrypted parts since encrypted parts - // are slightly larger due to encryption overhead. - // Further, we have to adjust the ETags of parts when using SSE-S3. - // Due to AWS S3, SSE-S3 encrypted parts return the plaintext ETag - // being the content MD5 of that particular part. This is not the - // case for SSE-C and SSE-KMS objects. 
- if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok && objectAPI.IsEncryptionSupported() { - var objectEncryptionKey []byte - if kind == crypto.S3 { - objectEncryptionKey, err = decryptObjectInfo(nil, bucket, object, listPartsInfo.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - for i, p := range listPartsInfo.Parts { - listPartsInfo.Parts[i].ETag = tryDecryptETag(objectEncryptionKey, p.ETag, kind != crypto.S3) - size, err := sio.DecryptedSize(uint64(p.Size)) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - listPartsInfo.Parts[i].Size = int64(size) - } - } - - response := generateListPartsResponse(listPartsInfo, encodingType) - encodedSuccessResponse := encodeResponse(response) - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) -} - -type whiteSpaceWriter struct { - http.ResponseWriter - http.Flusher - written bool -} - -func (w *whiteSpaceWriter) Write(b []byte) (n int, err error) { - n, err = w.ResponseWriter.Write(b) - w.written = true - return -} - -func (w *whiteSpaceWriter) WriteHeader(statusCode int) { - if !w.written { - w.ResponseWriter.WriteHeader(statusCode) - } -} - -// Send empty whitespaces every 10 seconds to the client till completeMultiPartUpload() is -// done so that the client does not time out. Downside is we might send 200 OK and -// then send error XML. But accoording to S3 spec the client is supposed to check -// for error XML even if it received 200 OK. But for erasure this is not a problem -// as completeMultiPartUpload() is quick. Even For FS, it would not be an issue as -// we do background append as and when the parts arrive and completeMultiPartUpload -// is quick. Only in a rare case where parts would be out of order will -// FS:completeMultiPartUpload() take a longer time. -func sendWhiteSpace(ctx context.Context, w http.ResponseWriter) <-chan bool { - doneCh := make(chan bool) - go func() { - defer close(doneCh) - ticker := time.NewTicker(time.Second * 10) - defer ticker.Stop() - headerWritten := false - for { - select { - case <-ticker.C: - // Write header if not written yet. - if !headerWritten { - _, err := w.Write([]byte(xml.Header)) - headerWritten = err == nil - } - - // Once header is written keep writing empty spaces - // which are ignored by client SDK XML parsers. - // This occurs when server takes long time to completeMultiPartUpload() - _, err := w.Write([]byte(" ")) - if err != nil { - return - } - w.(http.Flusher).Flush() - case doneCh <- headerWritten: - return - case <-ctx.Done(): - return - } - } - }() - return doneCh -} - -// CompleteMultipartUploadHandler - Complete multipart upload. 
-func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "CompleteMultipartUpload") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - vars := mux.Vars(r) - bucket := vars["bucket"] - object, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - // Content-Length is required and should be non-zero - if r.ContentLength <= 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) - return - } - - // Get upload id. - uploadID, _, _, _, s3Error := getObjectResources(r.Form) - if s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - complMultipartUpload := &CompleteMultipartUpload{} - if err = xmlDecoder(r.Body, complMultipartUpload, r.ContentLength); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - if len(complMultipartUpload.Parts) == 0 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL) - return - } - if !sort.IsSorted(CompletedParts(complMultipartUpload.Parts)) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartOrder), r.URL) - return - } - - // Reject retention or governance headers if set, CompleteMultipartUpload spec - // does not use these headers, and should not be passed down to checkPutObjectLockAllowed - if objectlock.IsObjectLockRequested(r.Header) || objectlock.IsObjectLockGovernanceBypassSet(r.Header) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) - return - } - - if _, _, _, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, ErrNone, ErrNone); s3Err != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) - return - } - - completeMultiPartUpload := objectAPI.CompleteMultipartUpload - if api.CacheAPI() != nil { - completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload - } - // This code is specifically to handle the requirements for slow - // complete multipart upload operations on FS mode. - writeErrorResponseWithoutXMLHeader := func(ctx context.Context, w http.ResponseWriter, err APIError, reqURL *url.URL) { - switch err.Code { - case "SlowDown", "XMinioServerNotInitialized", "XMinioReadQuorum", "XMinioWriteQuorum": - // Set retxry-after header to indicate user-agents to retry request after 120secs. - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After - w.Header().Set(xhttp.RetryAfter, "120") - } - - // Generate error response. 
- errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path, - w.Header().Get(xhttp.AmzRequestID), globalDeploymentID) - encodedErrorResponse, _ := xml.Marshal(errorResponse) - setCommonHeaders(w) - w.Header().Set(xhttp.ContentType, string(mimeXML)) - w.Write(encodedErrorResponse) - } - - versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) - suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) - os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) - if !globalTierConfigMgr.Empty() { - // Get appropriate object info to identify the remote object to delete - goiOpts := os.GetOpts() - if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { - os.SetTransitionState(goi.TransitionedObject) - } - } - - setEventStreamHeaders(w) - - opts, err := completeMultipartOpts(ctx, r, bucket, object) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // First, we compute the ETag of the multipart object. - // The ETag of a multi-part object is always: - // ETag := MD5(ETag_p1, ETag_p2, ...)+"-N" (N being the number of parts) - // - // This is independent of encryption. An encrypted multipart - // object also has an ETag that is the MD5 of its part ETags. - // The fact the in case of encryption the ETag of a part is - // not the MD5 of the part content does not change that. - var completeETags []etag.ETag - for _, part := range complMultipartUpload.Parts { - ETag, err := etag.Parse(part.ETag) - if err != nil { - continue - } - completeETags = append(completeETags, ETag) - } - multipartETag := etag.Multipart(completeETags...) - opts.UserDefined["etag"] = multipartETag.String() - - // However, in case of encryption, the persisted part ETags don't match - // what we have sent to the client during PutObjectPart. The reason is - // that ETags are encrypted. Hence, the client will send a list of complete - // part ETags of which non can match the ETag of any part. For example - // ETag (client): 30902184f4e62dd8f98f0aaff810c626 - // ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626 - // - // Therefore, we adjust all ETags sent by the client to match what is stored - // on the backend. 
- if objectAPI.IsEncryptionSupported() { - mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if _, ok := crypto.IsEncrypted(mi.UserDefined); ok { - const MaxParts = 10000 - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, MaxParts, ObjectOptions{}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - sort.Slice(listPartsInfo.Parts, func(i, j int) bool { - return listPartsInfo.Parts[i].PartNumber < listPartsInfo.Parts[j].PartNumber - }) - sort.Slice(complMultipartUpload.Parts, func(i, j int) bool { - return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber - }) - for i := range listPartsInfo.Parts { - for j := range complMultipartUpload.Parts { - if listPartsInfo.Parts[i].PartNumber == complMultipartUpload.Parts[j].PartNumber { - complMultipartUpload.Parts[j].ETag = listPartsInfo.Parts[i].ETag - continue - } - } - } - } - } - - w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} - completeDoneCh := sendWhiteSpace(ctx, w) - objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts) - // Stop writing white spaces to the client. Note that close(doneCh) style is not used as it - // can cause white space to be written after we send XML response in a race condition. - headerWritten := <-completeDoneCh - if err != nil { - if headerWritten { - writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) - } else { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - } - return - } - - // Get object location. - location := getObjectLocation(r, globalDomainNames, bucket, object) - // Generate complete multipart response. - response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.ETag) - var encodedSuccessResponse []byte - if !headerWritten { - encodedSuccessResponse = encodeResponse(response) - } else { - encodedSuccessResponse, err = xml.Marshal(response) - if err != nil { - writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - if r.Header.Get(xMinIOExtract) == "true" && strings.HasSuffix(object, archiveExt) { - opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime} - if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - - setPutObjHeaders(w, objInfo, false) - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) - } - if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { - actualSize, _ := objInfo.GetActualSize() - defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) - } - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) - - // Notify object created event. - sendEvent(eventArgs{ - EventName: event.ObjectCreatedCompleteMultipartUpload, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }) - - // Remove the transitioned object whose object version is being overwritten. 
-	if !globalTierConfigMgr.Empty() {
-		// Schedule object for immediate transition if eligible.
-		enqueueTransitionImmediate(objInfo)
-		logger.LogIf(ctx, os.Sweep())
-	}
-}
-
 // Delete objectAPIHandlers

 // DeleteObjectHandler - delete an object
diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go
new file mode 100644
index 000000000..ab9ffd00c
--- /dev/null
+++ b/cmd/object-multipart-handlers.go
@@ -0,0 +1,897 @@
+// Copyright (c) 2015-2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cmd
+
+import (
+	"bufio"
+	"context"
+	"encoding/xml"
+	"io"
+	"net/http"
+	"net/url"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/gorilla/mux"
+	sse "github.com/minio/minio/internal/bucket/encryption"
+	objectlock "github.com/minio/minio/internal/bucket/object/lock"
+	"github.com/minio/minio/internal/bucket/replication"
+	"github.com/minio/minio/internal/config/storageclass"
+	"github.com/minio/minio/internal/crypto"
+	"github.com/minio/minio/internal/etag"
+	"github.com/minio/minio/internal/event"
+	"github.com/minio/minio/internal/fips"
+	"github.com/minio/minio/internal/handlers"
+	"github.com/minio/minio/internal/hash"
+	xhttp "github.com/minio/minio/internal/http"
+	"github.com/minio/minio/internal/logger"
+	"github.com/minio/pkg/bucket/policy"
+	iampolicy "github.com/minio/pkg/iam/policy"
+	"github.com/minio/sio"
+)
+
+// Multipart objectAPIHandlers
+
+// NewMultipartUploadHandler - New multipart upload.
+// Notice: The S3 client can send secret keys in headers for encryption related jobs,
+// the handler should ensure to remove these keys before sending them to the object layer.
+// Currently these keys are: +// - X-Amz-Server-Side-Encryption-Customer-Key +// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key +func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "NewMultipartUpload") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if crypto.Requested(r.Header) { + if globalIsGateway { + if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } else { + if !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + // Check if bucket encryption is enabled + sseConfig, _ := globalBucketSSEConfigSys.Get(bucket) + sseConfig.Apply(r.Header, sse.ApplyOptions{ + AutoEncrypt: globalAutoEncryption, + Passthrough: globalIsGateway && globalGatewayName == S3BackendGateway, + }) + + // Validate storage class metadata if present + if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { + if !storageclass.IsValid(sc) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL) + return + } + } + + encMetadata := map[string]string{} + + if objectAPI.IsEncryptionSupported() { + if crypto.Requested(r.Header) { + if err = setEncryptionMetadata(r, bucket, object, encMetadata); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Set this for multipart only operations, we need to differentiate during + // decryption if the file was actually multipart or not. + encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = "" + } + } + + // Extract metadata that needs to be saved. 
+ metadata, err := extractMetadata(ctx, r) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction) + holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction) + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) + if s3Err == ErrNone && retentionMode.Valid() { + metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) + metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) + } + if s3Err == ErrNone && legalHold.Status.Valid() { + metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status) + } + if s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) + return + } + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() { + metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) + metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() + } + // We need to preserve the encryption headers set in EncryptRequest, + // so we do not want to override them, copy them instead. + for k, v := range encMetadata { + metadata[k] = v + } + + // Ensure that metadata does not contain sensitive information + crypto.RemoveSensitiveEntries(metadata) + + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) { + // Storing the compression metadata. + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 + } + + opts, err := putOpts(ctx, r, bucket, object, metadata) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + newMultipartUpload := objectAPI.NewMultipartUpload + if api.CacheAPI() != nil { + newMultipartUpload = api.CacheAPI().NewMultipartUpload + } + + uploadID, err := newMultipartUpload(ctx, bucket, object, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + response := generateInitiateMultipartUploadResponse(bucket, object, uploadID) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) +} + +// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. 
+func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutObjectPart") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if crypto.Requested(r.Header) { + if globalIsGateway { + if crypto.SSEC.IsRequested(r.Header) && !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } else { + if !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + } + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // X-Amz-Copy-Source shouldn't be set for this call. + if _, ok := r.Header[xhttp.AmzCopySource]; ok { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL) + return + } + + clientETag, err := etag.FromContentMD5(r.Header) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL) + return + } + + // if Content-Length is unknown/missing, throw away + size := r.ContentLength + + rAuthType := getRequestAuthType(r) + // For auth type streaming signature, we need to gather a different content length. + if rAuthType == authTypeStreamingSigned { + if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok { + if sizeStr[0] == "" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) + return + } + size, err = strconv.ParseInt(sizeStr[0], 10, 64) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + } + if size == -1 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) + return + } + + // maximum Upload size for multipart objects in a single operation + if isMaxAllowedPartSize(size) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) + return + } + + uploadID := r.Form.Get(xhttp.UploadID) + partIDString := r.Form.Get(xhttp.PartNumber) + + partID, err := strconv.Atoi(partIDString) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL) + return + } + + // check partID with maximum part ID for multipart objects + if isMaxPartID(partID) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) + return + } + + var ( + md5hex = clientETag.String() + sha256hex = "" + reader io.Reader = r.Body + s3Error APIErrorCode + ) + if s3Error = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + switch rAuthType { + case authTypeStreamingSigned: + // Initialize stream signature verifier. 
+ reader, s3Error = newSignV4ChunkedReader(r) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + case authTypeSignedV2, authTypePresignedV2: + if s3Error = isReqAuthenticatedV2(r); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + case authTypePresigned, authTypeSigned: + if s3Error = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + if !skipContentSha256Cksum(r) { + sha256hex = getContentSha256Cksum(r, serviceS3) + } + } + + if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + actualSize := size + + // Get encryption options. + var opts ObjectOptions + if crypto.SSEC.IsRequested(r.Header) { + opts, err = getOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Read the compression metadata preserved during multipart init to decide whether this part should be compressed. + _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] + + var idxCb func() []byte + if objectAPI.IsCompressionSupported() && isCompressed { + actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Set up S2 compression on the incoming data. + wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header) + s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption) + idxCb = cb + defer s2c.Close() + reader = etag.Wrap(s2c, actualReader) + size = -1 // Since compressed size is unpredictable. + md5hex = "" // Do not try to verify the content. + sha256hex = "" + } + + hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + rawReader := hashReader + pReader := NewPutObjReader(rawReader) + + _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) + var objectEncryptionKey crypto.ObjectKey + if objectAPI.IsEncryptionSupported() && isEncrypted { + if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) + return + } + + opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + var key []byte + if crypto.SSEC.IsRequested(r.Header) { + key, err = ParseSSECustomerRequest(r) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + // Calculate the object encryption key. + key, err = decryptObjectInfo(key, bucket, object, mi.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + copy(objectEncryptionKey[:], key) + + partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID)) + in := io.Reader(hashReader) + if size > encryptBufferThreshold { + // The encryption reads in blocks of 64KB. + // We add a buffer on bigger files to reduce the number of syscalls upstream. 
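	// (Illustrative aside, not part of this patch: sio consumes its input in
	// 64 KiB packages, so a 1 GiB part read unbuffered would trigger roughly
	// 16384 small upstream reads; wrapping the reader in bufio batches those
	// into far fewer reads of encryptBufferSize bytes each.)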
+ in = bufio.NewReaderSize(hashReader, encryptBufferSize) + } + reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + wantSize := int64(-1) + if size >= 0 { + info := ObjectInfo{Size: size} + wantSize = info.EncryptedSize() + } + // do not try to verify encrypted content + hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + if idxCb != nil { + idxCb = compressionIndexEncrypter(objectEncryptionKey, idxCb) + } + } + opts.IndexCB = idxCb + + putObjectPart := objectAPI.PutObjectPart + if api.CacheAPI() != nil { + putObjectPart = api.CacheAPI().PutObjectPart + } + + partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts) + if err != nil { + // Verify if the underlying error is signature mismatch. + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + etag := partInfo.ETag + if kind, encrypted := crypto.IsEncrypted(mi.UserDefined); encrypted { + switch kind { + case crypto.S3KMS: + w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS) + w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, mi.KMSKeyID()) + if kmsCtx, ok := mi.UserDefined[crypto.MetaContext]; ok { + w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx) + } + if len(etag) >= 32 && strings.Count(etag, "-") != 1 { + etag = etag[len(etag)-32:] + } + case crypto.S3: + w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES) + etag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: etag}) + case crypto.SSEC: + w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm)) + w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5)) + + if len(etag) >= 32 && strings.Count(etag, "-") != 1 { + etag = etag[len(etag)-32:] + } + } + } + + // We must not use the http.Header().Set method here because some (broken) + // clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive). + // Therefore, we have to set the ETag directly as map entry. + w.Header()[xhttp.ETag] = []string{"\"" + etag + "\""} + + writeSuccessResponseHeadersOnly(w) +} + +// CompleteMultipartUploadHandler - Complete multipart upload. 
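Before the CompleteMultipartUpload handler below, a short aside on the ETag header note above: Go's http.Header.Set canonicalizes keys via net/textproto, turning "ETag" into "Etag", while a direct map assignment preserves the exact spelling. A minimal, self-contained demonstration:

	package main

	import (
		"fmt"
		"net/http"
	)

	func main() {
		h := http.Header{}

		// Set canonicalizes the key: "ETag" is stored as "Etag".
		h.Set("ETag", `"d41d8cd98f00b204e9800998ecf8427e"`)

		// Assigning the map entry directly keeps the literal key "ETag",
		// which is what some case-sensitive clients require.
		h["ETag"] = []string{`"d41d8cd98f00b204e9800998ecf8427e"`}

		for k := range h {
			fmt.Println(k) // prints both "Etag" and "ETag"
		}
	}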
+func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "CompleteMultipartUpload") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + // Content-Length is required and should be non-zero. + if r.ContentLength <= 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL) + return + } + + // Get upload id. + uploadID, _, _, _, s3Error := getObjectResources(r.Form) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + complMultipartUpload := &CompleteMultipartUpload{} + if err = xmlDecoder(r.Body, complMultipartUpload, r.ContentLength); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + if len(complMultipartUpload.Parts) == 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL) + return + } + if !sort.IsSorted(CompletedParts(complMultipartUpload.Parts)) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartOrder), r.URL) + return + } + + // Reject retention or governance headers if set; the CompleteMultipartUpload spec + // does not use these headers, and they should not be passed down to checkPutObjectLockAllowed. + if objectlock.IsObjectLockRequested(r.Header) || objectlock.IsObjectLockGovernanceBypassSet(r.Header) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) + return + } + + if _, _, _, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, ErrNone, ErrNone); s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) + return + } + + completeMultiPartUpload := objectAPI.CompleteMultipartUpload + if api.CacheAPI() != nil { + completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload + } + // This code is specifically to handle the requirements for slow + // complete multipart upload operations in FS mode. + writeErrorResponseWithoutXMLHeader := func(ctx context.Context, w http.ResponseWriter, err APIError, reqURL *url.URL) { + switch err.Code { + case "SlowDown", "XMinioServerNotInitialized", "XMinioReadQuorum", "XMinioWriteQuorum": + // Set the Retry-After header to indicate user-agents should retry the request after 120 seconds. + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After + w.Header().Set(xhttp.RetryAfter, "120") + } + + // Generate error response. 
+ errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path, + w.Header().Get(xhttp.AmzRequestID), globalDeploymentID) + encodedErrorResponse, _ := xml.Marshal(errorResponse) + setCommonHeaders(w) + w.Header().Set(xhttp.ContentType, string(mimeXML)) + w.Write(encodedErrorResponse) + } + + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) + os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) + if !globalTierConfigMgr.Empty() { + // Get appropriate object info to identify the remote object to delete + goiOpts := os.GetOpts() + if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { + os.SetTransitionState(goi.TransitionedObject) + } + } + + setEventStreamHeaders(w) + + opts, err := completeMultipartOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // First, we compute the ETag of the multipart object. + // The ETag of a multi-part object is always: + // ETag := MD5(ETag_p1, ETag_p2, ...)+"-N" (N being the number of parts) + // + // This is independent of encryption. An encrypted multipart + // object also has an ETag that is the MD5 of its part ETags. + // The fact that, in case of encryption, the ETag of a part is + // not the MD5 of the part content does not change that. + var completeETags []etag.ETag + for _, part := range complMultipartUpload.Parts { + ETag, err := etag.Parse(part.ETag) + if err != nil { + continue + } + completeETags = append(completeETags, ETag) + } + multipartETag := etag.Multipart(completeETags...) + opts.UserDefined["etag"] = multipartETag.String() + + // However, in case of encryption, the persisted part ETags don't match + // what we have sent to the client during PutObjectPart. The reason is + // that ETags are encrypted. Hence, the client will send a list of complete + // part ETags of which none can match the ETag of any part. For example + // ETag (client): 30902184f4e62dd8f98f0aaff810c626 + // ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626 + // + // Therefore, we adjust all ETags sent by the client to match what is stored + // on the backend. 
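To make the rule above concrete, here is a hedged, self-contained sketch of the computation; multipartETag is our illustrative helper, while the handler itself relies on etag.Multipart:

	package main

	import (
		"crypto/md5"
		"encoding/hex"
		"fmt"
		"strings"
	)

	// multipartETag mirrors the rule above: MD5 over the decoded binary
	// part ETags, suffixed with "-N" where N is the number of parts.
	func multipartETag(partETags []string) (string, error) {
		h := md5.New()
		for _, e := range partETags {
			b, err := hex.DecodeString(strings.Trim(e, `"`))
			if err != nil {
				return "", err
			}
			h.Write(b)
		}
		return fmt.Sprintf("%s-%d", hex.EncodeToString(h.Sum(nil)), len(partETags)), nil
	}

	func main() {
		s, _ := multipartETag([]string{
			"30902184f4e62dd8f98f0aaff810c626",
			"30902184f4e62dd8f98f0aaff810c626",
		})
		fmt.Println(s) // <32 hex chars>-2
	}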
+ if objectAPI.IsEncryptionSupported() { + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if _, ok := crypto.IsEncrypted(mi.UserDefined); ok { + const MaxParts = 10000 + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, MaxParts, ObjectOptions{}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + sort.Slice(listPartsInfo.Parts, func(i, j int) bool { + return listPartsInfo.Parts[i].PartNumber < listPartsInfo.Parts[j].PartNumber + }) + sort.Slice(complMultipartUpload.Parts, func(i, j int) bool { + return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber + }) + for i := range listPartsInfo.Parts { + for j := range complMultipartUpload.Parts { + if listPartsInfo.Parts[i].PartNumber == complMultipartUpload.Parts[j].PartNumber { + complMultipartUpload.Parts[j].ETag = listPartsInfo.Parts[i].ETag + continue + } + } + } + } + } + + w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} + completeDoneCh := sendWhiteSpace(ctx, w) + objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts) + // Stop writing white spaces to the client. Note that close(doneCh) style is not used as it + // can cause white space to be written after we send XML response in a race condition. + headerWritten := <-completeDoneCh + if err != nil { + if headerWritten { + writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) + } else { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + } + return + } + + // Get object location. + location := getObjectLocation(r, globalDomainNames, bucket, object) + // Generate complete multipart response. + response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.ETag) + var encodedSuccessResponse []byte + if !headerWritten { + encodedSuccessResponse = encodeResponse(response) + } else { + encodedSuccessResponse, err = xml.Marshal(response) + if err != nil { + writeErrorResponseWithoutXMLHeader(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + if r.Header.Get(xMinIOExtract) == "true" && strings.HasSuffix(object, archiveExt) { + opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime} + if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + setPutObjHeaders(w, objInfo, false) + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) + } + if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { + actualSize, _ := objInfo.GetActualSize() + defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) + } + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) + + // Notify object created event. + sendEvent(eventArgs{ + EventName: event.ObjectCreatedCompleteMultipartUpload, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + RespElements: extractRespElements(w), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + + // Remove the transitioned object whose object version is being overwritten. 
+ if !globalTierConfigMgr.Empty() { + // Schedule object for immediate transition if eligible. + enqueueTransitionImmediate(objInfo) + logger.LogIf(ctx, os.Sweep()) + } +} + +// AbortMultipartUploadHandler - Abort multipart upload +func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "AbortMultipartUpload") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + abortMultipartUpload := objectAPI.AbortMultipartUpload + if api.CacheAPI() != nil { + abortMultipartUpload = api.CacheAPI().AbortMultipartUpload + } + + if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + uploadID, _, _, _, s3Error := getObjectResources(r.Form) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + opts := ObjectOptions{} + if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + writeSuccessNoContent(w) +} + +// ListObjectPartsHandler - List object parts +func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ListObjectParts") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.ListMultipartUploadPartsAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + uploadID, partNumberMarker, maxParts, encodingType, s3Error := getObjectResources(r.Form) + if s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + if partNumberMarker < 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartNumberMarker), r.URL) + return + } + if maxParts < 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) + return + } + + opts := ObjectOptions{} + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // We have to adjust the size of encrypted parts since encrypted parts + // are slightly larger due to encryption overhead. + // Further, we have to adjust the ETags of parts when using SSE-S3. + // To match AWS S3 behavior, SSE-S3 encrypted parts return the plaintext ETag, + // i.e. the content MD5 of that particular part. This is not the + // case for SSE-C and SSE-KMS objects. 
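For intuition about the size adjustment just described, note that github.com/minio/sio exposes both directions of the computation; the sketch below is illustrative only (DARE adds a fixed header and authentication tag to every 64 KiB package, so ciphertext is slightly larger than plaintext):

	package main

	import (
		"fmt"

		"github.com/minio/sio"
	)

	func main() {
		const plaintext = 1 << 20 // 1 MiB part

		// Size after DARE encryption: slightly larger, since every 64 KiB
		// package carries a fixed header and authentication tag.
		enc, err := sio.EncryptedSize(plaintext)
		if err != nil {
			panic(err)
		}

		// DecryptedSize inverts the computation; this is what the handler
		// below uses to report the original part size to the client.
		dec, err := sio.DecryptedSize(enc)
		if err != nil {
			panic(err)
		}
		fmt.Println(enc, dec) // enc > plaintext, dec == plaintext
	}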
+ if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok && objectAPI.IsEncryptionSupported() { + var objectEncryptionKey []byte + if kind == crypto.S3 { + objectEncryptionKey, err = decryptObjectInfo(nil, bucket, object, listPartsInfo.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + for i, p := range listPartsInfo.Parts { + listPartsInfo.Parts[i].ETag = tryDecryptETag(objectEncryptionKey, p.ETag, kind != crypto.S3) + size, err := sio.DecryptedSize(uint64(p.Size)) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + listPartsInfo.Parts[i].Size = int64(size) + } + } + + response := generateListPartsResponse(listPartsInfo, encodingType) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) +} + +type whiteSpaceWriter struct { + http.ResponseWriter + http.Flusher + written bool +} + +func (w *whiteSpaceWriter) Write(b []byte) (n int, err error) { + n, err = w.ResponseWriter.Write(b) + w.written = true + return +} + +func (w *whiteSpaceWriter) WriteHeader(statusCode int) { + if !w.written { + w.ResponseWriter.WriteHeader(statusCode) + } +} + +// Send empty whitespace every 10 seconds to the client until completeMultiPartUpload() is +// done so that the client does not time out. The downside is that we might send 200 OK and +// then send error XML. But according to the S3 spec the client is supposed to check +// for error XML even if it received 200 OK. For erasure this is not a problem +// as completeMultiPartUpload() is quick. Even for FS, it would not be an issue as +// we do background appends as the parts arrive and completeMultiPartUpload +// is quick. Only in a rare case where parts arrive out of order will +// FS:completeMultiPartUpload() take a longer time. +func sendWhiteSpace(ctx context.Context, w http.ResponseWriter) <-chan bool { + doneCh := make(chan bool) + go func() { + defer close(doneCh) + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + headerWritten := false + for { + select { + case <-ticker.C: + // Write the header if not written yet. + if !headerWritten { + _, err := w.Write([]byte(xml.Header)) + headerWritten = err == nil + } + + // Once the header is written, keep writing empty spaces, + // which are ignored by client SDK XML parsers. + // This happens when the server takes a long time to completeMultiPartUpload(). + _, err := w.Write([]byte(" ")) + if err != nil { + return + } + w.(http.Flusher).Flush() + case doneCh <- headerWritten: + return + case <-ctx.Done(): + return + } + } + }() + return doneCh +} diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index b421b0894..b12028664 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -276,3 +276,24 @@ func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) { } return fi } + +// ReadMultipleReq contains information about multiple files to read from disk. +type ReadMultipleReq struct { + Bucket string // Bucket. Can be empty if multiple buckets. + Prefix string // Shared prefix of all files. Can be empty. Will be joined to filename without modification. + Files []string // Individual files to read. + MaxSize int64 // Return an error if this size is exceeded. + MetadataOnly bool // Read as XL meta and truncate data. + AbortOn404 bool // Stop reading after first file not found. +} + +// ReadMultipleResp contains a single response from a ReadMultipleReq. 
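For orientation, here is a hedged sketch of how a caller might drive the new call against a single disk; readXLMetaFiles is a hypothetical helper, not part of this patch, and the response type it consumes follows below (the snippet assumes the cmd package context for StorageAPI and a "context" import):

	// readXLMetaFiles asks one disk for the xl.meta of several objects
	// under a bucket and drains the responses, which arrive in request order.
	func readXLMetaFiles(ctx context.Context, disk StorageAPI, bucket string, objects []string) error {
		req := ReadMultipleReq{
			Bucket:       bucket,
			Files:        objects, // e.g. []string{"obj1/xl.meta", "obj2/xl.meta"}
			MaxSize:      1 << 20, // refuse unexpectedly large payloads
			MetadataOnly: true,    // read as XL meta, truncating inline data
		}
		resps := make(chan ReadMultipleResp, len(req.Files))
		errCh := make(chan error, 1)
		go func() {
			// ReadMultiple closes resps before it returns.
			errCh <- disk.ReadMultiple(ctx, req, resps)
		}()
		for r := range resps {
			if !r.Exists || r.Error != "" {
				continue // missing or unreadable on this disk
			}
			_ = r.Data // parse xl.meta here
		}
		return <-errCh
	}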
+type ReadMultipleResp struct { + Bucket string // Bucket as given by request. + Prefix string // Prefix as given by request. + File string // File name as given in request. + Exists bool // Returns whether the file existed on disk. + Error string // Returns any error when reading. + Data []byte // Contains all data of file. + Modtime time.Time // Modtime of file on disk. +} diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 3ce8fde35..92125605f 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -1678,6 +1678,527 @@ func (z *RawFileInfo) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "Files": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Files") + return + } + if cap(z.Files) >= int(zb0002) { + z.Files = (z.Files)[:zb0002] + } else { + z.Files = make([]string, zb0002) + } + for za0001 := range z.Files { + z.Files[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Files", za0001) + return + } + } + case "MaxSize": + z.MaxSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "MaxSize") + return + } + case "MetadataOnly": + z.MetadataOnly, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "MetadataOnly") + return + } + case "AbortOn404": + z.AbortOn404, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "AbortOn404") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReadMultipleReq) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 + // write "Bucket" + err = en.Append(0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Prefix" + err = en.Append(0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + if err != nil { + return + } + err = en.WriteString(z.Prefix) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + // write "Files" + err = en.Append(0xa5, 0x46, 0x69, 0x6c, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Files))) + if err != nil { + err = msgp.WrapError(err, "Files") + return + } + for za0001 := range z.Files { + err = en.WriteString(z.Files[za0001]) + if err != nil { + err = msgp.WrapError(err, "Files", za0001) + return + } + } + // write "MaxSize" + err = en.Append(0xa7, 0x4d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.MaxSize) + if err != nil { + err = msgp.WrapError(err, "MaxSize") + return + } + // write "MetadataOnly" + err = en.Append(0xac, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x4f, 0x6e, 0x6c, 0x79) + if err != nil { + return + } + err = en.WriteBool(z.MetadataOnly) + if err != nil 
{ + err = msgp.WrapError(err, "MetadataOnly") + return + } + // write "AbortOn404" + err = en.Append(0xaa, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x4f, 0x6e, 0x34, 0x30, 0x34) + if err != nil { + return + } + err = en.WriteBool(z.AbortOn404) + if err != nil { + err = msgp.WrapError(err, "AbortOn404") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReadMultipleReq) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "Bucket" + o = append(o, 0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Prefix" + o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + o = msgp.AppendString(o, z.Prefix) + // string "Files" + o = append(o, 0xa5, 0x46, 0x69, 0x6c, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Files))) + for za0001 := range z.Files { + o = msgp.AppendString(o, z.Files[za0001]) + } + // string "MaxSize" + o = append(o, 0xa7, 0x4d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendInt64(o, z.MaxSize) + // string "MetadataOnly" + o = append(o, 0xac, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x4f, 0x6e, 0x6c, 0x79) + o = msgp.AppendBool(o, z.MetadataOnly) + // string "AbortOn404" + o = append(o, 0xaa, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x4f, 0x6e, 0x34, 0x30, 0x34) + o = msgp.AppendBool(o, z.AbortOn404) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReadMultipleReq) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "Files": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Files") + return + } + if cap(z.Files) >= int(zb0002) { + z.Files = (z.Files)[:zb0002] + } else { + z.Files = make([]string, zb0002) + } + for za0001 := range z.Files { + z.Files[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Files", za0001) + return + } + } + case "MaxSize": + z.MaxSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "MaxSize") + return + } + case "MetadataOnly": + z.MetadataOnly, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "MetadataOnly") + return + } + case "AbortOn404": + z.AbortOn404, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "AbortOn404") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReadMultipleReq) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Files { + s += msgp.StringPrefixSize + len(z.Files[za0001]) + } + s += 8 + msgp.Int64Size + 13 + msgp.BoolSize + 11 + msgp.BoolSize + return +} + +// 
DecodeMsg implements msgp.Decodable +func (z *ReadMultipleResp) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "File": + z.File, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "File") + return + } + case "Exists": + z.Exists, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Exists") + return + } + case "Error": + z.Error, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + case "Data": + z.Data, err = dc.ReadBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + case "Modtime": + z.Modtime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "Modtime") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReadMultipleResp) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 7 + // write "Bucket" + err = en.Append(0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Prefix" + err = en.Append(0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + if err != nil { + return + } + err = en.WriteString(z.Prefix) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + // write "File" + err = en.Append(0xa4, 0x46, 0x69, 0x6c, 0x65) + if err != nil { + return + } + err = en.WriteString(z.File) + if err != nil { + err = msgp.WrapError(err, "File") + return + } + // write "Exists" + err = en.Append(0xa6, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteBool(z.Exists) + if err != nil { + err = msgp.WrapError(err, "Exists") + return + } + // write "Error" + err = en.Append(0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + if err != nil { + return + } + err = en.WriteString(z.Error) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + // write "Data" + err = en.Append(0xa4, 0x44, 0x61, 0x74, 0x61) + if err != nil { + return + } + err = en.WriteBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + // write "Modtime" + err = en.Append(0xa7, 0x4d, 0x6f, 0x64, 0x74, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.Modtime) + if err != nil { + err = msgp.WrapError(err, "Modtime") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReadMultipleResp) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 7 + // string "Bucket" + o = append(o, 0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Prefix" + o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + o = msgp.AppendString(o, z.Prefix) + // string "File" + o = append(o, 0xa4, 0x46, 0x69, 0x6c, 0x65) + o = msgp.AppendString(o, z.File) + // string "Exists" + o = append(o, 0xa6, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73) + o = 
msgp.AppendBool(o, z.Exists) + // string "Error" + o = append(o, 0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + o = msgp.AppendString(o, z.Error) + // string "Data" + o = append(o, 0xa4, 0x44, 0x61, 0x74, 0x61) + o = msgp.AppendBytes(o, z.Data) + // string "Modtime" + o = append(o, 0xa7, 0x4d, 0x6f, 0x64, 0x74, 0x69, 0x6d, 0x65) + o = msgp.AppendTime(o, z.Modtime) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReadMultipleResp) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "File": + z.File, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "File") + return + } + case "Exists": + z.Exists, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Exists") + return + } + case "Error": + z.Error, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + case "Data": + z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } + case "Modtime": + z.Modtime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Modtime") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReadMultipleResp) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 5 + msgp.StringPrefixSize + len(z.File) + 7 + msgp.BoolSize + 6 + msgp.StringPrefixSize + len(z.Error) + 5 + msgp.BytesPrefixSize + len(z.Data) + 8 + msgp.TimeSize + return +} + // DecodeMsg implements msgp.Decodable func (z *VolInfo) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index a6b5f9343..ffc08f690 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -687,6 +687,232 @@ func BenchmarkDecodeRawFileInfo(b *testing.B) { } } +func TestMarshalUnmarshalReadMultipleReq(t *testing.T) { + v := ReadMultipleReq{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + 
b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReadMultipleReq(t *testing.T) { + v := ReadMultipleReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReadMultipleReq Msgsize() is inaccurate") + } + + vn := ReadMultipleReq{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReadMultipleReq(b *testing.B) { + v := ReadMultipleReq{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalReadMultipleResp(t *testing.T) { + v := ReadMultipleResp{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReadMultipleResp(t *testing.T) { + v := ReadMultipleResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReadMultipleResp Msgsize() is inaccurate") + } + + vn := ReadMultipleResp{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func 
BenchmarkDecodeReadMultipleResp(b *testing.B) { + v := ReadMultipleResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalVolInfo(t *testing.T) { v := VolInfo{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 901864c7c..2fee13043 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -98,6 +98,7 @@ type StorageAPI interface { Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) + ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error // Write all data, syncs the data to disk. // Should be used for smaller payloads. @@ -273,3 +274,8 @@ func (p *unrecognizedDisk) ReadAll(ctx context.Context, volume string, path stri func (p *unrecognizedDisk) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) { return nil, errDiskNotFound } + +func (p *unrecognizedDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { + close(resp) + return errDiskNotFound +} diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 2880bfbe2..c72c78913 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -229,7 +229,10 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC rr.CloseWithError(err) return cache, err } - updates <- update + select { + case <-ctx.Done(): + case updates <- update: + } } var newCache dataUsageCache err = newCache.DecodeMsg(ms) @@ -718,6 +721,43 @@ func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path return stat, err } +// ReadMultiple will read multiple files and send each back as response. +// Files are read and returned in the given order. +// The resp channel is closed before the call returns. +// Only a canceled context or a network error returns an error. +func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { + defer close(resp) + body, err := req.MarshalMsg(nil) + if err != nil { + return err + } + respBody, err := client.call(ctx, storageRESTMethodReadMultiple, nil, bytes.NewReader(body), int64(len(body))) + if err != nil { + return err + } + defer xhttp.DrainBody(respBody) + pr, pw := io.Pipe() + go func() { + pw.CloseWithError(waitForHTTPStream(respBody, pw)) + }() + mr := msgp.NewReader(pr) + for { + var file ReadMultipleResp + if err := file.DecodeMsg(mr); err != nil { + if errors.Is(err, io.EOF) { + err = nil + } + pr.CloseWithError(err) + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case resp <- file: + } + } +} + // Close - marks the client as closed. 
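The NSScanner hunk above replaces a bare channel send with a select against ctx.Done(), so a scanner whose consumer has gone away can no longer block forever. The pattern in isolation, as a minimal self-contained sketch:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// trySend delivers v on ch unless ctx is canceled first, so a producer
	// can never block forever on a consumer that has stopped receiving.
	func trySend(ctx context.Context, ch chan<- int, v int) bool {
		select {
		case <-ctx.Done():
			return false
		case ch <- v:
			return true
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		defer cancel()

		ch := make(chan int) // unbuffered, never read: a bare send would block forever
		fmt.Println(trySend(ctx, ch, 42)) // false: the context expired first
	}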
func (client *storageRESTClient) Close() error { client.restClient.Close() diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index dd857e35a..fe71face8 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - storageRESTVersion = "v46" // Added MinIO version to FileInfo + storageRESTVersion = "v47" // Added ReadMultiple storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -53,6 +53,7 @@ const ( storageRESTMethodVerifyFile = "/verifyfile" storageRESTMethodWalkDir = "/walkdir" storageRESTMethodStatInfoFile = "/statfile" + storageRESTMethodReadMultiple = "/readmultiple" ) const ( diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index cf48da2dd..ca82efbd4 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -1244,6 +1244,47 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) } } +// ReadMultiple returns multiple files +func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + rw := streamHTTPResponse(w) + defer func() { + if r := recover(); r != nil { + debug.PrintStack() + rw.CloseWithError(fmt.Errorf("panic: %v", r)) + } + }() + + var req ReadMultipleReq + mr := msgpNewReader(r.Body) + err := req.DecodeMsg(mr) + if err != nil { + rw.CloseWithError(err) + return + } + + mw := msgp.NewWriter(rw) + responses := make(chan ReadMultipleResp, len(req.Files)) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for resp := range responses { + err := resp.EncodeMsg(mw) + if err != nil { + rw.CloseWithError(err) + return + } + mw.Flush() + } + }() + err = s.storage.ReadMultiple(r.Context(), req, responses) + wg.Wait() + rw.CloseWithError(err) +} + // registerStorageRPCRouter - register storage rpc router. 
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) { storageDisks := make([][]*xlStorage, len(endpointServerPools)) @@ -1315,6 +1356,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile)) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(httpTraceHdrs(server.ReadMultiple)) } } } diff --git a/cmd/storagemetric_string.go b/cmd/storagemetric_string.go index 71f196d77..a01cf54d4 100644 --- a/cmd/storagemetric_string.go +++ b/cmd/storagemetric_string.go @@ -33,12 +33,13 @@ func _() { _ = x[storageMetricReadXL-22] _ = x[storageMetricReadAll-23] _ = x[storageMetricStatInfoFile-24] - _ = x[storageMetricLast-25] + _ = x[storageMetricReadMultiple-25] + _ = x[storageMetricLast-26] } -const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileLast" +const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleLast" -var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 246} +var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 258} func (i storageMetric) String() string { if i >= storageMetric(len(_storageMetric_index)-1) { diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 6b6266866..b96dbb083 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -64,6 +64,7 @@ const ( storageMetricReadXL storageMetricReadAll storageMetricStatInfoFile + storageMetricReadMultiple // .... add more @@ -510,6 +511,21 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st return p.storage.StatInfoFile(ctx, volume, path, glob) } +// ReadMultiple will read multiple files and send each back as response. +// Files are read and returned in the given order. +// The resp channel is closed before the call returns. +// Only a canceled context will return an error. 
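A quick note on the regenerated stringer above, before the ReadMultiple wrapper that follows: appending "ReadMultiple" pushes the combined metric-name string past 255 bytes, which is why the offset table widened from uint8 to uint16. A trivial check, for illustration only:

	package main

	import "fmt"

	func main() {
		// The final offset in the regenerated table is 258; uint8 tops out at 255.
		fmt.Println(258 > 255) // true, hence []uint16 for _storageMetric_index
	}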
+func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { + ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadMultiple, req.Bucket, req.Prefix) + if err != nil { + close(resp) + return err + } + defer done(&err) + + return p.storage.ReadMultiple(ctx, req, resp) +} + func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo { return madmin.TraceInfo{ TraceType: madmin.TraceStorage, diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 8d18654a2..6bbe82289 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -2565,6 +2565,66 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File return nil } +// ReadMultiple will read multiple files and send each back as response. +// Files are read and returned in the given order. +// The resp channel is closed before the call returns. +// Only a canceled context will return an error. +func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error { + defer close(resp) + + volumeDir := pathJoin(s.diskPath, req.Bucket) + + for _, f := range req.Files { + if contextCanceled(ctx) { + return ctx.Err() + } + r := ReadMultipleResp{ + Bucket: req.Bucket, + Prefix: req.Prefix, + File: f, + } + var data []byte + var mt time.Time + var err error + fullPath := pathJoin(volumeDir, req.Prefix, f) + if req.MetadataOnly { + data, mt, err = s.readMetadataWithDMTime(ctx, fullPath) + } else { + data, mt, err = s.readAllData(ctx, volumeDir, fullPath) + } + + if err != nil { + if !IsErr(err, errFileNotFound, errVolumeNotFound) { + r.Exists = true + r.Error = err.Error() + } + select { + case <-ctx.Done(): + return ctx.Err() + case resp <- r: + } + if req.AbortOn404 && !r.Exists { + // We stop at first file not found. + // We have already reported the error, return nil. + return nil + } + continue + } + diskHealthCheckOK(ctx, nil) + if req.MaxSize > 0 && int64(len(data)) > req.MaxSize { + r.Exists = true + r.Error = fmt.Sprintf("max size (%d) exceeded: %d", req.MaxSize, len(data)) + resp <- r + continue + } + r.Exists = true + r.Data = data + r.Modtime = mt + resp <- r + } + return nil +} + func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) { volumeDir, err := s.getVolDir(volume) if err != nil {