mirror of https://github.com/minio/minio.git
1472 lines
45 KiB
Go
1472 lines
45 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/klauspost/readahead"
|
|
"github.com/minio/minio-go/v7/pkg/set"
|
|
"github.com/minio/minio/internal/config/storageclass"
|
|
"github.com/minio/minio/internal/crypto"
|
|
"github.com/minio/minio/internal/hash"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/v3/mimedb"
|
|
"github.com/minio/pkg/v3/sync/errgroup"
|
|
"github.com/minio/sio"
|
|
)
|
|
|
|
func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {
|
|
uploadUUID := uploadID
|
|
uploadBytes, err := base64.RawURLEncoding.DecodeString(uploadID)
|
|
if err == nil {
|
|
slc := strings.SplitN(string(uploadBytes), ".", 2)
|
|
if len(slc) == 2 {
|
|
uploadUUID = slc[1]
|
|
}
|
|
}
|
|
return pathJoin(er.getMultipartSHADir(bucket, object), uploadUUID)
|
|
}
|
|
|
|
func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
|
|
return getSHA256Hash([]byte(pathJoin(bucket, object)))
|
|
}
|
|
|
|
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
|
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string, write bool) (fi FileInfo, metArr []FileInfo, err error) {
|
|
defer func() {
|
|
if errors.Is(err, errFileNotFound) {
|
|
err = errUploadIDNotFound
|
|
}
|
|
}()
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := er.getDisks()
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, minioMetaMultipartBucket,
|
|
uploadIDPath, "", false, false)
|
|
|
|
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return fi, nil, err
|
|
}
|
|
|
|
if readQuorum < 0 {
|
|
return fi, nil, errErasureReadQuorum
|
|
}
|
|
|
|
if writeQuorum < 0 {
|
|
return fi, nil, errErasureWriteQuorum
|
|
}
|
|
|
|
quorum := readQuorum
|
|
if write {
|
|
quorum = writeQuorum
|
|
}
|
|
|
|
// List all online disks.
|
|
_, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, quorum)
|
|
|
|
if write {
|
|
err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
} else {
|
|
err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
|
|
}
|
|
if err != nil {
|
|
return fi, nil, err
|
|
}
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, etag, quorum)
|
|
return fi, partsMetadata, err
|
|
}
|
|
|
|
// cleanupMultipartPath removes all extraneous files and parts from the multipart folder, this is used per CompleteMultipart.
|
|
// do not use this function outside of completeMultipartUpload()
|
|
func (er erasureObjects) cleanupMultipartPath(ctx context.Context, paths ...string) {
|
|
storageDisks := er.getDisks()
|
|
|
|
g := errgroup.WithNErrs(len(storageDisks))
|
|
for index, disk := range storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
index := index
|
|
g.Go(func() error {
|
|
_ = storageDisks[index].DeleteBulk(ctx, minioMetaMultipartBucket, paths...)
|
|
return nil
|
|
}, index)
|
|
}
|
|
g.Wait()
|
|
}
|
|
|
|
// Clean-up the old multipart uploads. Should be run in a Go routine.
|
|
func (er erasureObjects) cleanupStaleUploads(ctx context.Context) {
|
|
// run multiple cleanup's local to this server.
|
|
var wg sync.WaitGroup
|
|
for _, disk := range er.getLocalDisks() {
|
|
if disk != nil {
|
|
wg.Add(1)
|
|
go func(disk StorageAPI) {
|
|
defer wg.Done()
|
|
er.cleanupStaleUploadsOnDisk(ctx, disk)
|
|
}(disk)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
|
|
var wg sync.WaitGroup
|
|
for _, disk := range er.getDisks() {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(disk StorageAPI) {
|
|
defer wg.Done()
|
|
disk.Delete(ctx, bucket, prefix, DeleteOptions{
|
|
Recursive: true,
|
|
Immediate: false,
|
|
})
|
|
}(disk)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// Remove the old multipart uploads on the given disk.
|
|
func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI) {
|
|
drivePath := disk.Endpoint().Path
|
|
|
|
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
|
|
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
|
|
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
|
var modTime time.Time
|
|
// Upload IDs are of the form base64_url(<UUID>x<UnixNano>), we can extract the time from the UUID.
|
|
if b64, err := base64.RawURLEncoding.DecodeString(uploadIDDir); err == nil {
|
|
if split := strings.Split(string(b64), "x"); len(split) == 2 {
|
|
t, err := strconv.ParseInt(split[1], 10, 64)
|
|
if err == nil {
|
|
modTime = time.Unix(0, t)
|
|
}
|
|
}
|
|
}
|
|
// Fallback for older uploads without time in the ID.
|
|
if modTime.IsZero() {
|
|
wait := deleteMultipartCleanupSleeper.Timer(ctx)
|
|
fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
modTime = fi.ModTime
|
|
wait()
|
|
}
|
|
if time.Since(modTime) < globalAPIConfig.getStaleUploadsExpiry() {
|
|
return nil
|
|
}
|
|
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
|
|
return w.Run(func() error {
|
|
wait := deleteMultipartCleanupSleeper.Timer(ctx)
|
|
pathUUID := mustGetUUID()
|
|
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
|
|
renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket))
|
|
wait()
|
|
return nil
|
|
})
|
|
})
|
|
// Get the modtime of the shaDir.
|
|
vi, err := disk.StatVol(ctx, pathJoin(minioMetaMultipartBucket, shaDir))
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
// Modtime is returned in the Created field. See (*xlStorage).StatVol
|
|
if time.Since(vi.Created) < globalAPIConfig.getStaleUploadsExpiry() {
|
|
return nil
|
|
}
|
|
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
|
|
return w.Run(func() error {
|
|
wait := deleteMultipartCleanupSleeper.Timer(ctx)
|
|
pathUUID := mustGetUUID()
|
|
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
|
|
|
|
// We are not deleting shaDir recursively here, if shaDir is empty
|
|
// and its older then we can happily delete it.
|
|
Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath)
|
|
wait()
|
|
return nil
|
|
})
|
|
})
|
|
|
|
readDirFn(pathJoin(drivePath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
|
|
if strings.HasPrefix(tmpDir, ".trash") {
|
|
// do not remove .trash/ here, it has its own routines
|
|
return nil
|
|
}
|
|
vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
|
|
return w.Run(func() error {
|
|
wait := deleteMultipartCleanupSleeper.Timer(ctx)
|
|
if time.Since(vi.Created) > globalAPIConfig.getStaleUploadsExpiry() {
|
|
pathUUID := mustGetUUID()
|
|
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
|
|
|
|
renameAll(pathJoin(drivePath, minioMetaTmpBucket, tmpDir), targetPath, pathJoin(drivePath, minioMetaBucket))
|
|
}
|
|
wait()
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
// ListMultipartUploads - lists all the pending multipart
|
|
// uploads for a particular object in a bucket.
|
|
//
|
|
// Implements minimal S3 compatible ListMultipartUploads API. We do
|
|
// not support prefix based listing, this is a deliberate attempt
|
|
// towards simplification of multipart APIs.
|
|
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
|
|
func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
|
auditObjectErasureSet(ctx, "ListMultipartUploads", object, &er)
|
|
|
|
result.MaxUploads = maxUploads
|
|
result.KeyMarker = keyMarker
|
|
result.Prefix = object
|
|
result.Delimiter = delimiter
|
|
|
|
var uploadIDs []string
|
|
var disk StorageAPI
|
|
disks := er.getOnlineLocalDisks()
|
|
if len(disks) == 0 {
|
|
// If no local, get non-healing disks.
|
|
var ok bool
|
|
if disks, ok = er.getOnlineDisksWithHealing(false); !ok {
|
|
disks = er.getOnlineDisks()
|
|
}
|
|
}
|
|
|
|
for _, disk = range disks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
if !disk.IsOnline() {
|
|
continue
|
|
}
|
|
uploadIDs, err = disk.ListDir(ctx, bucket, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
|
|
if err != nil {
|
|
if errors.Is(err, errDiskNotFound) {
|
|
continue
|
|
}
|
|
if errors.Is(err, errFileNotFound) {
|
|
return result, nil
|
|
}
|
|
return result, toObjectErr(err, bucket, object)
|
|
}
|
|
break
|
|
}
|
|
|
|
for i := range uploadIDs {
|
|
uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
|
|
}
|
|
|
|
// S3 spec says uploadIDs should be sorted based on initiated time, we need
|
|
// to read the metadata entry.
|
|
var uploads []MultipartInfo
|
|
|
|
populatedUploadIDs := set.NewStringSet()
|
|
|
|
for _, uploadID := range uploadIDs {
|
|
if populatedUploadIDs.Contains(uploadID) {
|
|
continue
|
|
}
|
|
// If present, use time stored in ID.
|
|
startTime := time.Now()
|
|
if split := strings.Split(uploadID, "x"); len(split) == 2 {
|
|
t, err := strconv.ParseInt(split[1], 10, 64)
|
|
if err == nil {
|
|
startTime = time.Unix(0, t)
|
|
}
|
|
}
|
|
uploads = append(uploads, MultipartInfo{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
UploadID: base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadID))),
|
|
Initiated: startTime,
|
|
})
|
|
populatedUploadIDs.Add(uploadID)
|
|
}
|
|
|
|
sort.Slice(uploads, func(i int, j int) bool {
|
|
return uploads[i].Initiated.Before(uploads[j].Initiated)
|
|
})
|
|
|
|
uploadIndex := 0
|
|
if uploadIDMarker != "" {
|
|
for uploadIndex < len(uploads) {
|
|
if uploads[uploadIndex].UploadID != uploadIDMarker {
|
|
uploadIndex++
|
|
continue
|
|
}
|
|
if uploads[uploadIndex].UploadID == uploadIDMarker {
|
|
uploadIndex++
|
|
break
|
|
}
|
|
uploadIndex++
|
|
}
|
|
}
|
|
for uploadIndex < len(uploads) {
|
|
result.Uploads = append(result.Uploads, uploads[uploadIndex])
|
|
result.NextUploadIDMarker = uploads[uploadIndex].UploadID
|
|
uploadIndex++
|
|
if len(result.Uploads) == maxUploads {
|
|
break
|
|
}
|
|
}
|
|
|
|
result.IsTruncated = uploadIndex < len(uploads)
|
|
|
|
if !result.IsTruncated {
|
|
result.NextKeyMarker = ""
|
|
result.NextUploadIDMarker = ""
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// newMultipartUpload - wrapper for initializing a new multipart
|
|
// request; returns a unique upload id.
|
|
//
|
|
// Internally this function creates 'uploads.json' associated for the
|
|
// incoming object at
|
|
// '.minio.sys/multipart/bucket/object/uploads.json' on all the
|
|
// disks. `uploads.json` carries metadata regarding on-going multipart
|
|
// operation(s) on the object.
|
|
func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
|
|
if opts.CheckPrecondFn != nil {
|
|
if !opts.NoLock {
|
|
ns := er.NewNSLock(bucket, object)
|
|
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer ns.Unlock(lkctx)
|
|
opts.NoLock = true
|
|
}
|
|
|
|
obj, err := er.getObjectInfo(ctx, bucket, object, opts)
|
|
if err == nil && opts.CheckPrecondFn(obj) {
|
|
return nil, PreConditionFailed{}
|
|
}
|
|
if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
userDefined := cloneMSS(opts.UserDefined)
|
|
if opts.PreserveETag != "" {
|
|
userDefined["etag"] = opts.PreserveETag
|
|
}
|
|
onlineDisks := er.getDisks()
|
|
|
|
// Get parity and data drive count based on storage class metadata
|
|
parityDrives := globalStorageClass.GetParityForSC(userDefined[xhttp.AmzStorageClass])
|
|
if parityDrives < 0 {
|
|
parityDrives = er.defaultParityCount
|
|
}
|
|
|
|
if globalStorageClass.AvailabilityOptimized() {
|
|
// If we have offline disks upgrade the number of erasure codes for this object.
|
|
parityOrig := parityDrives
|
|
|
|
var offlineDrives int
|
|
for _, disk := range onlineDisks {
|
|
if disk == nil || !disk.IsOnline() {
|
|
parityDrives++
|
|
offlineDrives++
|
|
continue
|
|
}
|
|
}
|
|
|
|
if offlineDrives >= (len(onlineDisks)+1)/2 {
|
|
// if offline drives are more than 50% of the drives
|
|
// we have no quorum, we shouldn't proceed just
|
|
// fail at that point.
|
|
return nil, toObjectErr(errErasureWriteQuorum, bucket, object)
|
|
}
|
|
|
|
if parityDrives >= len(onlineDisks)/2 {
|
|
parityDrives = len(onlineDisks) / 2
|
|
}
|
|
|
|
if parityOrig != parityDrives {
|
|
userDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
|
|
}
|
|
}
|
|
|
|
dataDrives := len(onlineDisks) - parityDrives
|
|
|
|
// we now know the number of blocks this object needs for data and parity.
|
|
// establish the writeQuorum using this data
|
|
writeQuorum := dataDrives
|
|
if dataDrives == parityDrives {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Initialize parts metadata
|
|
partsMetadata := make([]FileInfo, len(onlineDisks))
|
|
|
|
fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
|
|
fi.VersionID = opts.VersionID
|
|
if opts.Versioned && fi.VersionID == "" {
|
|
fi.VersionID = mustGetUUID()
|
|
}
|
|
fi.DataDir = mustGetUUID()
|
|
|
|
if ckSum := userDefined[ReplicationSsecChecksumHeader]; ckSum != "" {
|
|
v, err := base64.StdEncoding.DecodeString(ckSum)
|
|
if err == nil {
|
|
fi.Checksum = v
|
|
}
|
|
delete(userDefined, ReplicationSsecChecksumHeader)
|
|
}
|
|
|
|
// Initialize erasure metadata.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index] = fi
|
|
}
|
|
|
|
// Guess content-type from the extension if possible.
|
|
if userDefined["content-type"] == "" {
|
|
userDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
|
|
}
|
|
|
|
// if storageClass is standard no need to save it as part of metadata.
|
|
if userDefined[xhttp.AmzStorageClass] == storageclass.STANDARD {
|
|
delete(userDefined, xhttp.AmzStorageClass)
|
|
}
|
|
|
|
if opts.WantChecksum != nil && opts.WantChecksum.Type.IsSet() {
|
|
userDefined[hash.MinIOMultipartChecksum] = opts.WantChecksum.Type.String()
|
|
}
|
|
|
|
modTime := opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
modTime = UTCNow()
|
|
}
|
|
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi)
|
|
|
|
// Fill all the necessary metadata.
|
|
// Update `xl.meta` content on each disks.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].Fresh = true
|
|
partsMetadata[index].ModTime = modTime
|
|
partsMetadata[index].Metadata = userDefined
|
|
}
|
|
uploadUUID := fmt.Sprintf("%sx%d", mustGetUUID(), modTime.UnixNano())
|
|
uploadID := base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s.%s", globalDeploymentID(), uploadUUID)))
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)
|
|
|
|
// Write updated `xl.meta` to all disks.
|
|
if _, err := writeAllMetadata(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
|
return nil, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
return &NewMultipartUploadResult{
|
|
UploadID: uploadID,
|
|
ChecksumAlgo: userDefined[hash.MinIOMultipartChecksum],
|
|
}, nil
|
|
}
|
|
|
|
// NewMultipartUpload - initialize a new multipart upload, returns a
|
|
// unique id. The unique id returned here is of UUID form, for each
|
|
// subsequent request each UUID is unique.
|
|
//
|
|
// Implements S3 compatible initiate multipart API.
|
|
func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
|
|
if !opts.NoAuditLog {
|
|
auditObjectErasureSet(ctx, "NewMultipartUpload", object, &er)
|
|
}
|
|
|
|
return er.newMultipartUpload(ctx, bucket, object, opts)
|
|
}
|
|
|
|
// renamePart - renames multipart part to its relevant location under uploadID.
|
|
func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, optsMeta []byte, writeQuorum int) ([]StorageAPI, error) {
|
|
paths := []string{
|
|
dstEntry,
|
|
dstEntry + ".meta",
|
|
}
|
|
|
|
// cleanup existing paths first across all drives.
|
|
er.cleanupMultipartPath(ctx, paths...)
|
|
|
|
g := errgroup.WithNErrs(len(disks))
|
|
|
|
// Rename file on all underlying storage disks.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
return disks[index].RenamePart(ctx, srcBucket, srcEntry, dstBucket, dstEntry, optsMeta)
|
|
}, index)
|
|
}
|
|
|
|
// Wait for all renames to finish.
|
|
errs := g.Wait()
|
|
|
|
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if err != nil {
|
|
er.cleanupMultipartPath(ctx, paths...)
|
|
}
|
|
|
|
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
|
|
// otherwise return failure. Cleanup successful renames.
|
|
return evalDisks(disks, errs), err
|
|
}
|
|
|
|
// PutObjectPart - reads incoming stream and internally erasure codes
|
|
// them. This call is similar to single put operation but it is part
|
|
// of the multipart transaction.
|
|
//
|
|
// Implements S3 compatible Upload Part API.
|
|
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
|
if !opts.NoAuditLog {
|
|
auditObjectErasureSet(ctx, "PutObjectPart", object, &er)
|
|
}
|
|
|
|
data := r.Reader
|
|
// Validate input data size and it can never be less than zero.
|
|
if data.Size() < -1 {
|
|
bugLogIf(ctx, errInvalidArgument, logger.ErrorKind)
|
|
return pi, toObjectErr(errInvalidArgument)
|
|
}
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
// Validates if upload ID exists.
|
|
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
|
|
if err != nil {
|
|
if errors.Is(err, errVolumeNotFound) {
|
|
return pi, toObjectErr(err, bucket)
|
|
}
|
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
onlineDisks := er.getDisks()
|
|
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
|
|
|
|
if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" {
|
|
if r.ContentCRCType().String() != cs {
|
|
return pi, InvalidArgument{
|
|
Bucket: bucket,
|
|
Object: fi.Name,
|
|
Err: fmt.Errorf("checksum missing, want %q, got %q", cs, r.ContentCRCType().String()),
|
|
}
|
|
}
|
|
}
|
|
onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution)
|
|
|
|
// Need a unique name for the part being written in minioMetaBucket to
|
|
// accommodate concurrent PutObjectPart requests
|
|
|
|
partSuffix := fmt.Sprintf("part.%d", partID)
|
|
// Random UUID and timestamp for temporary part file.
|
|
tmpPart := fmt.Sprintf("%sx%d", mustGetUUID(), time.Now().UnixNano())
|
|
tmpPartPath := pathJoin(tmpPart, partSuffix)
|
|
|
|
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
|
|
defer func() {
|
|
if countOnlineDisks(onlineDisks) != len(onlineDisks) {
|
|
er.deleteAll(context.Background(), minioMetaTmpBucket, tmpPart)
|
|
}
|
|
}()
|
|
|
|
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
|
|
var buffer []byte
|
|
switch size := data.Size(); {
|
|
case size == 0:
|
|
buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
|
|
case size == -1:
|
|
if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
|
|
// Account for padding and forced compression overhead and encryption.
|
|
buffer = make([]byte, data.ActualSize()+256+32+32, data.ActualSize()*2+512)
|
|
} else {
|
|
buffer = globalBytePoolCap.Load().Get()
|
|
defer globalBytePoolCap.Load().Put(buffer)
|
|
}
|
|
case size >= fi.Erasure.BlockSize:
|
|
buffer = globalBytePoolCap.Load().Get()
|
|
defer globalBytePoolCap.Load().Put(buffer)
|
|
case size < fi.Erasure.BlockSize:
|
|
// No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller.
|
|
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
|
}
|
|
|
|
if len(buffer) > int(fi.Erasure.BlockSize) {
|
|
buffer = buffer[:fi.Erasure.BlockSize]
|
|
}
|
|
writers := make([]io.Writer, len(onlineDisks))
|
|
for i, disk := range onlineDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
|
}
|
|
|
|
toEncode := io.Reader(data)
|
|
if data.Size() > bigFileThreshold {
|
|
// Add input readahead.
|
|
// We use 2 buffers, so we always have a full buffer of input.
|
|
pool := globalBytePoolCap.Load()
|
|
bufA := pool.Get()
|
|
bufB := pool.Get()
|
|
defer pool.Put(bufA)
|
|
defer pool.Put(bufB)
|
|
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
|
if err == nil {
|
|
toEncode = ra
|
|
defer ra.Close()
|
|
}
|
|
}
|
|
|
|
n, err := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
|
|
closeBitrotWriters(writers)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
|
// than specified in request header.
|
|
if n < data.Size() {
|
|
return pi, IncompleteBody{Bucket: bucket, Object: object}
|
|
}
|
|
|
|
for i := range writers {
|
|
if writers[i] == nil {
|
|
onlineDisks[i] = nil
|
|
}
|
|
}
|
|
|
|
// Rename temporary part file to its final location.
|
|
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
|
|
|
md5hex := r.MD5CurrentHexString()
|
|
if opts.PreserveETag != "" {
|
|
md5hex = opts.PreserveETag
|
|
}
|
|
|
|
var index []byte
|
|
if opts.IndexCB != nil {
|
|
index = opts.IndexCB()
|
|
}
|
|
|
|
actualSize := data.ActualSize()
|
|
if actualSize < 0 {
|
|
_, encrypted := crypto.IsEncrypted(fi.Metadata)
|
|
compressed := fi.IsCompressed()
|
|
switch {
|
|
case compressed:
|
|
// ... nothing changes for compressed stream.
|
|
// if actualSize is -1 we have no known way to
|
|
// determine what is the actualSize.
|
|
case encrypted:
|
|
decSize, err := sio.DecryptedSize(uint64(n))
|
|
if err == nil {
|
|
actualSize = int64(decSize)
|
|
}
|
|
default:
|
|
actualSize = n
|
|
}
|
|
}
|
|
|
|
partInfo := ObjectPartInfo{
|
|
Number: partID,
|
|
ETag: md5hex,
|
|
Size: n,
|
|
ActualSize: actualSize,
|
|
ModTime: UTCNow(),
|
|
Index: index,
|
|
Checksums: r.ContentCRC(),
|
|
}
|
|
|
|
partFI, err := partInfo.MarshalMsg(nil)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
|
}
|
|
|
|
// Serialize concurrent part uploads.
|
|
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
|
plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
|
|
ctx = plkctx.Context()
|
|
defer partIDLock.Unlock(plkctx)
|
|
|
|
onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum)
|
|
if err != nil {
|
|
if errors.Is(err, errFileNotFound) {
|
|
// An in-quorum errFileNotFound means that client stream
|
|
// prematurely closed and we do not find any xl.meta or
|
|
// part.1's - in such a scenario we must return as if client
|
|
// disconnected. This means that erasure.Encode() CreateFile()
|
|
// did not do anything.
|
|
return pi, IncompleteBody{Bucket: bucket, Object: object}
|
|
}
|
|
|
|
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
|
}
|
|
|
|
// Return success.
|
|
return PartInfo{
|
|
PartNumber: partInfo.Number,
|
|
ETag: partInfo.ETag,
|
|
LastModified: partInfo.ModTime,
|
|
Size: partInfo.Size,
|
|
ActualSize: partInfo.ActualSize,
|
|
ChecksumCRC32: partInfo.Checksums["CRC32"],
|
|
ChecksumCRC32C: partInfo.Checksums["CRC32C"],
|
|
ChecksumSHA1: partInfo.Checksums["SHA1"],
|
|
ChecksumSHA256: partInfo.Checksums["SHA256"],
|
|
}, nil
|
|
}
|
|
|
|
// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
|
|
// by callers to verify object states
|
|
// - encrypted
|
|
// - compressed
|
|
// Does not contain currently uploaded parts by design.
|
|
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
|
if !opts.NoAuditLog {
|
|
auditObjectErasureSet(ctx, "GetMultipartInfo", object, &er)
|
|
}
|
|
|
|
result := MultipartInfo{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
UploadID: uploadID,
|
|
}
|
|
|
|
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
|
|
if err != nil {
|
|
if errors.Is(err, errVolumeNotFound) {
|
|
return result, toObjectErr(err, bucket)
|
|
}
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
result.UserDefined = cloneMSS(fi.Metadata)
|
|
return result, nil
|
|
}
|
|
|
|
func (er erasureObjects) listParts(ctx context.Context, onlineDisks []StorageAPI, partPath string, readQuorum int) ([]int, error) {
|
|
g := errgroup.WithNErrs(len(onlineDisks))
|
|
|
|
objectParts := make([][]string, len(onlineDisks))
|
|
// List uploaded parts from drives.
|
|
for index := range onlineDisks {
|
|
index := index
|
|
g.Go(func() (err error) {
|
|
if onlineDisks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
objectParts[index], err = onlineDisks[index].ListDir(ctx, minioMetaMultipartBucket, minioMetaMultipartBucket, partPath, -1)
|
|
return err
|
|
}, index)
|
|
}
|
|
|
|
if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
partQuorumMap := make(map[int]int)
|
|
for _, driveParts := range objectParts {
|
|
partsWithMetaCount := make(map[int]int, len(driveParts))
|
|
// part files can be either part.N or part.N.meta
|
|
for _, partPath := range driveParts {
|
|
var partNum int
|
|
if _, err := fmt.Sscanf(partPath, "part.%d", &partNum); err == nil {
|
|
partsWithMetaCount[partNum]++
|
|
continue
|
|
}
|
|
if _, err := fmt.Sscanf(partPath, "part.%d.meta", &partNum); err == nil {
|
|
partsWithMetaCount[partNum]++
|
|
}
|
|
}
|
|
// Include only part.N.meta files with corresponding part.N
|
|
for partNum, cnt := range partsWithMetaCount {
|
|
if cnt < 2 {
|
|
continue
|
|
}
|
|
partQuorumMap[partNum]++
|
|
}
|
|
}
|
|
|
|
var partNums []int
|
|
for partNum, count := range partQuorumMap {
|
|
if count < readQuorum {
|
|
continue
|
|
}
|
|
partNums = append(partNums, partNum)
|
|
}
|
|
|
|
sort.Ints(partNums)
|
|
return partNums, nil
|
|
}
|
|
|
|
// ListObjectParts - lists all previously uploaded parts for a given
|
|
// object and uploadID. Takes additional input of part-number-marker
|
|
// to indicate where the listing should begin from.
|
|
//
|
|
// Implements S3 compatible ListObjectParts API. The resulting
|
|
// ListPartsInfo structure is marshaled directly into XML and
|
|
// replied back to the client.
|
|
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
|
if !opts.NoAuditLog {
|
|
auditObjectErasureSet(ctx, "ListObjectParts", object, &er)
|
|
}
|
|
|
|
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
|
|
if err != nil {
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
if partNumberMarker < 0 {
|
|
partNumberMarker = 0
|
|
}
|
|
|
|
// Limit output to maxPartsList.
|
|
if maxParts > maxPartsList {
|
|
maxParts = maxPartsList
|
|
}
|
|
|
|
// Populate the result stub.
|
|
result.Bucket = bucket
|
|
result.Object = object
|
|
result.UploadID = uploadID
|
|
result.MaxParts = maxParts
|
|
result.PartNumberMarker = partNumberMarker
|
|
result.UserDefined = cloneMSS(fi.Metadata)
|
|
result.ChecksumAlgorithm = fi.Metadata[hash.MinIOMultipartChecksum]
|
|
|
|
if maxParts == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
onlineDisks := er.getDisks()
|
|
readQuorum := fi.ReadQuorum(er.defaultRQuorum())
|
|
// Read Part info for all parts
|
|
partPath := pathJoin(uploadIDPath, fi.DataDir) + SlashSeparator
|
|
|
|
// List parts in quorum
|
|
partNums, err := er.listParts(ctx, onlineDisks, partPath, readQuorum)
|
|
if err != nil {
|
|
// This means that fi.DataDir, is not yet populated so we
|
|
// return an empty response.
|
|
if errors.Is(err, errFileNotFound) {
|
|
return result, nil
|
|
}
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
if len(partNums) == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
start := objectPartIndexNums(partNums, partNumberMarker)
|
|
if start != -1 {
|
|
partNums = partNums[start+1:]
|
|
}
|
|
|
|
result.Parts = make([]PartInfo, 0, len(partNums))
|
|
partMetaPaths := make([]string, len(partNums))
|
|
for i, part := range partNums {
|
|
partMetaPaths[i] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part))
|
|
}
|
|
|
|
// Read parts in quorum
|
|
objParts, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths,
|
|
partNums, readQuorum)
|
|
if err != nil {
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
count := maxParts
|
|
for _, objPart := range objParts {
|
|
result.Parts = append(result.Parts, PartInfo{
|
|
PartNumber: objPart.Number,
|
|
LastModified: objPart.ModTime,
|
|
ETag: objPart.ETag,
|
|
Size: objPart.Size,
|
|
ActualSize: objPart.ActualSize,
|
|
ChecksumCRC32: objPart.Checksums["CRC32"],
|
|
ChecksumCRC32C: objPart.Checksums["CRC32C"],
|
|
ChecksumSHA1: objPart.Checksums["SHA1"],
|
|
ChecksumSHA256: objPart.Checksums["SHA256"],
|
|
})
|
|
count--
|
|
if count == 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(objParts) > len(result.Parts) {
|
|
result.IsTruncated = true
|
|
// Make sure to fill next part number marker if IsTruncated is true for subsequent listing.
|
|
result.NextPartNumberMarker = result.Parts[len(result.Parts)-1].PartNumber
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaPaths []string, partNumbers []int, readQuorum int) ([]ObjectPartInfo, error) {
|
|
g := errgroup.WithNErrs(len(disks))
|
|
|
|
objectPartInfos := make([][]*ObjectPartInfo, len(disks))
|
|
// Rename file on all underlying storage disks.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() (err error) {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
objectPartInfos[index], err = disks[index].ReadParts(ctx, bucket, partMetaPaths...)
|
|
return err
|
|
}, index)
|
|
}
|
|
|
|
if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
partInfosInQuorum := make([]ObjectPartInfo, len(partMetaPaths))
|
|
for pidx := range partMetaPaths {
|
|
// partMetaQuorumMap uses
|
|
// - path/to/part.N as key to collate errors from failed drives.
|
|
// - part ETag to collate part metadata
|
|
partMetaQuorumMap := make(map[string]int, len(partNumbers))
|
|
var pinfos []*ObjectPartInfo
|
|
for idx := range disks {
|
|
if len(objectPartInfos[idx]) != len(partMetaPaths) {
|
|
partMetaQuorumMap[partMetaPaths[pidx]]++
|
|
continue
|
|
}
|
|
|
|
pinfo := objectPartInfos[idx][pidx]
|
|
if pinfo != nil && pinfo.ETag != "" {
|
|
pinfos = append(pinfos, pinfo)
|
|
partMetaQuorumMap[pinfo.ETag]++
|
|
continue
|
|
}
|
|
partMetaQuorumMap[partMetaPaths[pidx]]++
|
|
}
|
|
|
|
var maxQuorum int
|
|
var maxETag string
|
|
var maxPartMeta string
|
|
for etag, quorum := range partMetaQuorumMap {
|
|
if maxQuorum < quorum {
|
|
maxQuorum = quorum
|
|
maxETag = etag
|
|
maxPartMeta = etag
|
|
}
|
|
}
|
|
// found is a representative ObjectPartInfo which either has the maximally occurring ETag or an error.
|
|
var found *ObjectPartInfo
|
|
for _, pinfo := range pinfos {
|
|
if pinfo == nil {
|
|
continue
|
|
}
|
|
if maxETag != "" && pinfo.ETag == maxETag {
|
|
found = pinfo
|
|
break
|
|
}
|
|
if pinfo.ETag == "" && maxPartMeta != "" && path.Base(maxPartMeta) == fmt.Sprintf("part.%d.meta", pinfo.Number) {
|
|
found = pinfo
|
|
break
|
|
}
|
|
}
|
|
|
|
if found != nil && found.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum {
|
|
partInfosInQuorum[pidx] = *found
|
|
continue
|
|
}
|
|
partInfosInQuorum[pidx] = ObjectPartInfo{
|
|
Number: partNumbers[pidx],
|
|
Error: InvalidPart{
|
|
PartNumber: partNumbers[pidx],
|
|
}.Error(),
|
|
}
|
|
|
|
}
|
|
return partInfosInQuorum, nil
|
|
}
|
|
|
|
func objPartToPartErr(part ObjectPartInfo) error {
|
|
if strings.Contains(part.Error, "file not found") {
|
|
return InvalidPart{PartNumber: part.Number}
|
|
}
|
|
if strings.Contains(part.Error, "Specified part could not be found") {
|
|
return InvalidPart{PartNumber: part.Number}
|
|
}
|
|
if strings.Contains(part.Error, errErasureReadQuorum.Error()) {
|
|
return errErasureReadQuorum
|
|
}
|
|
return errors.New(part.Error)
|
|
}
|
|
|
|
// CompleteMultipartUpload - completes an ongoing multipart
|
|
// transaction after receiving all the parts indicated by the client.
|
|
// Returns an md5sum calculated by concatenating all the individual
|
|
// md5sums of all the parts.
|
|
//
|
|
// Implements S3 compatible Complete multipart API.
|
|
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
|
|
if !opts.NoAuditLog {
|
|
auditObjectErasureSet(ctx, "CompleteMultipartUpload", object, &er)
|
|
}
|
|
|
|
if opts.CheckPrecondFn != nil {
|
|
if !opts.NoLock {
|
|
ns := er.NewNSLock(bucket, object)
|
|
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer ns.Unlock(lkctx)
|
|
opts.NoLock = true
|
|
}
|
|
|
|
obj, err := er.getObjectInfo(ctx, bucket, object, opts)
|
|
if err == nil && opts.CheckPrecondFn(obj) {
|
|
return ObjectInfo{}, PreConditionFailed{}
|
|
}
|
|
if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) {
|
|
return ObjectInfo{}, err
|
|
}
|
|
}
|
|
|
|
fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
|
|
if err != nil {
|
|
if errors.Is(err, errVolumeNotFound) {
|
|
return oi, toObjectErr(err, bucket)
|
|
}
|
|
return oi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
onlineDisks := er.getDisks()
|
|
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
|
|
readQuorum := fi.ReadQuorum(er.defaultRQuorum())
|
|
|
|
// Read Part info for all parts
|
|
partPath := pathJoin(uploadIDPath, fi.DataDir) + SlashSeparator
|
|
partMetaPaths := make([]string, len(parts))
|
|
partNumbers := make([]int, len(parts))
|
|
for idx, part := range parts {
|
|
partMetaPaths[idx] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part.PartNumber))
|
|
partNumbers[idx] = part.PartNumber
|
|
}
|
|
|
|
partInfoFiles, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths, partNumbers, readQuorum)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
|
|
if len(partInfoFiles) != len(parts) {
|
|
// Should only happen through internal error
|
|
err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts))
|
|
bugLogIf(ctx, err)
|
|
return oi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Checksum type set when upload started.
|
|
var checksumType hash.ChecksumType
|
|
if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" {
|
|
checksumType = hash.NewChecksumType(cs)
|
|
if opts.WantChecksum != nil && !opts.WantChecksum.Type.Is(checksumType) {
|
|
return oi, InvalidArgument{
|
|
Bucket: bucket,
|
|
Object: fi.Name,
|
|
Err: fmt.Errorf("checksum type mismatch"),
|
|
}
|
|
}
|
|
}
|
|
|
|
var checksumCombined []byte
|
|
|
|
// However, in case of encryption, the persisted part ETags don't match
|
|
// what we have sent to the client during PutObjectPart. The reason is
|
|
// that ETags are encrypted. Hence, the client will send a list of complete
|
|
// part ETags of which may not match the ETag of any part. For example
|
|
// ETag (client): 30902184f4e62dd8f98f0aaff810c626
|
|
// ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626
|
|
//
|
|
// Therefore, we adjust all ETags sent by the client to match what is stored
|
|
// on the backend.
|
|
kind, _ := crypto.IsEncrypted(fi.Metadata)
|
|
|
|
var objectEncryptionKey []byte
|
|
switch kind {
|
|
case crypto.SSEC:
|
|
if checksumType.IsSet() {
|
|
if opts.EncryptFn == nil {
|
|
return oi, crypto.ErrMissingCustomerKey
|
|
}
|
|
baseKey := opts.EncryptFn("", nil)
|
|
if len(baseKey) != 32 {
|
|
return oi, crypto.ErrInvalidCustomerKey
|
|
}
|
|
objectEncryptionKey, err = decryptObjectMeta(baseKey, bucket, object, fi.Metadata)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
}
|
|
case crypto.S3, crypto.S3KMS:
|
|
objectEncryptionKey, err = decryptObjectMeta(nil, bucket, object, fi.Metadata)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
}
|
|
if len(objectEncryptionKey) == 32 {
|
|
var key crypto.ObjectKey
|
|
copy(key[:], objectEncryptionKey)
|
|
opts.EncryptFn = metadataEncrypter(key)
|
|
}
|
|
|
|
for idx, part := range partInfoFiles {
|
|
if part.Error != "" {
|
|
err = objPartToPartErr(part)
|
|
bugLogIf(ctx, err)
|
|
return oi, err
|
|
}
|
|
|
|
if parts[idx].PartNumber != part.Number {
|
|
internalLogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", parts[idx].PartNumber, parts[idx].PartNumber, part.Number))
|
|
return oi, InvalidPart{
|
|
PartNumber: part.Number,
|
|
}
|
|
}
|
|
|
|
// Add the current part.
|
|
fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
|
|
}
|
|
|
|
// Calculate full object size.
|
|
var objectSize int64
|
|
|
|
// Calculate consolidated actual size.
|
|
var objectActualSize int64
|
|
|
|
// Order online disks in accordance with distribution order.
|
|
// Order parts metadata in accordance with distribution order.
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)
|
|
|
|
// Save current erasure metadata for validation.
|
|
currentFI := fi
|
|
|
|
// Allocate parts similar to incoming slice.
|
|
fi.Parts = make([]ObjectPartInfo, len(parts))
|
|
|
|
// Validate each part and then commit to disk.
|
|
for i, part := range parts {
|
|
partIdx := objectPartIndex(currentFI.Parts, part.PartNumber)
|
|
// All parts should have same part number.
|
|
if partIdx == -1 {
|
|
invp := InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
GotETag: part.ETag,
|
|
}
|
|
return oi, invp
|
|
}
|
|
expPart := currentFI.Parts[partIdx]
|
|
|
|
// ensure that part ETag is canonicalized to strip off extraneous quotes
|
|
part.ETag = canonicalizeETag(part.ETag)
|
|
expETag := tryDecryptETag(objectEncryptionKey, expPart.ETag, kind == crypto.S3)
|
|
if expETag != part.ETag {
|
|
invp := InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
ExpETag: expETag,
|
|
GotETag: part.ETag,
|
|
}
|
|
return oi, invp
|
|
}
|
|
|
|
if checksumType.IsSet() {
|
|
crc := expPart.Checksums[checksumType.String()]
|
|
if crc == "" {
|
|
return oi, InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
}
|
|
}
|
|
wantCS := map[string]string{
|
|
hash.ChecksumCRC32.String(): part.ChecksumCRC32,
|
|
hash.ChecksumCRC32C.String(): part.ChecksumCRC32C,
|
|
hash.ChecksumSHA1.String(): part.ChecksumSHA1,
|
|
hash.ChecksumSHA256.String(): part.ChecksumSHA256,
|
|
}
|
|
if wantCS[checksumType.String()] != crc {
|
|
return oi, InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
ExpETag: wantCS[checksumType.String()],
|
|
GotETag: crc,
|
|
}
|
|
}
|
|
cs := hash.NewChecksumString(checksumType.String(), crc)
|
|
if !cs.Valid() {
|
|
return oi, InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
}
|
|
}
|
|
checksumCombined = append(checksumCombined, cs.Raw...)
|
|
}
|
|
|
|
// All parts except the last part has to be at least 5MB.
|
|
if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
|
|
return oi, PartTooSmall{
|
|
PartNumber: part.PartNumber,
|
|
PartSize: expPart.ActualSize,
|
|
PartETag: part.ETag,
|
|
}
|
|
}
|
|
|
|
// Save for total object size.
|
|
objectSize += expPart.Size
|
|
|
|
// Save the consolidated actual size.
|
|
objectActualSize += expPart.ActualSize
|
|
|
|
// Add incoming parts.
|
|
fi.Parts[i] = ObjectPartInfo{
|
|
Number: part.PartNumber,
|
|
Size: expPart.Size,
|
|
ActualSize: expPart.ActualSize,
|
|
ModTime: expPart.ModTime,
|
|
Index: expPart.Index,
|
|
Checksums: nil, // Not transferred since we do not need it.
|
|
}
|
|
}
|
|
|
|
if opts.WantChecksum != nil {
|
|
err := opts.WantChecksum.Matches(checksumCombined, len(parts))
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
}
|
|
|
|
// Accept encrypted checksum from incoming request.
|
|
if opts.UserDefined[ReplicationSsecChecksumHeader] != "" {
|
|
if v, err := base64.StdEncoding.DecodeString(opts.UserDefined[ReplicationSsecChecksumHeader]); err == nil {
|
|
fi.Checksum = v
|
|
}
|
|
delete(opts.UserDefined, ReplicationSsecChecksumHeader)
|
|
}
|
|
|
|
if checksumType.IsSet() {
|
|
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
|
|
var cs *hash.Checksum
|
|
cs = hash.NewChecksumFromData(checksumType, checksumCombined)
|
|
fi.Checksum = cs.AppendTo(nil, checksumCombined)
|
|
if opts.EncryptFn != nil {
|
|
fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
|
|
}
|
|
}
|
|
delete(fi.Metadata, hash.MinIOMultipartChecksum) // Not needed in final object.
|
|
|
|
// Save the final object size and modtime.
|
|
fi.Size = objectSize
|
|
fi.ModTime = opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
fi.ModTime = UTCNow()
|
|
}
|
|
|
|
// Save successfully calculated md5sum.
|
|
// for replica, newMultipartUpload would have already sent the replication ETag
|
|
if fi.Metadata["etag"] == "" {
|
|
if opts.UserDefined["etag"] != "" {
|
|
fi.Metadata["etag"] = opts.UserDefined["etag"]
|
|
} else { // fallback if not already calculated in handler.
|
|
fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
|
|
}
|
|
}
|
|
|
|
// Save the consolidated actual size.
|
|
if opts.ReplicationRequest {
|
|
if v := opts.UserDefined[ReservedMetadataPrefix+"Actual-Object-Size"]; v != "" {
|
|
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = v
|
|
}
|
|
} else {
|
|
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
|
}
|
|
|
|
if opts.DataMovement {
|
|
fi.SetDataMov()
|
|
}
|
|
|
|
// Update all erasure metadata, make sure to not modify fields like
|
|
// checksum which are different on each disks.
|
|
for index := range partsMetadata {
|
|
if partsMetadata[index].IsValid() {
|
|
partsMetadata[index].Size = fi.Size
|
|
partsMetadata[index].ModTime = fi.ModTime
|
|
partsMetadata[index].Metadata = fi.Metadata
|
|
partsMetadata[index].Parts = fi.Parts
|
|
partsMetadata[index].Checksum = fi.Checksum
|
|
partsMetadata[index].Versioned = opts.Versioned || opts.VersionSuspended
|
|
}
|
|
}
|
|
|
|
paths := make([]string, 0, len(currentFI.Parts))
|
|
// Remove parts that weren't present in CompleteMultipartUpload request.
|
|
for _, curpart := range currentFI.Parts {
|
|
paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d.meta", curpart.Number)))
|
|
|
|
if objectPartIndex(fi.Parts, curpart.Number) == -1 {
|
|
// Delete the missing part files. e.g,
|
|
// Request 1: NewMultipart
|
|
// Request 2: PutObjectPart 1
|
|
// Request 3: PutObjectPart 2
|
|
// Request 4: CompleteMultipartUpload --part 2
|
|
// N.B. 1st part is not present. This part should be removed from the storage.
|
|
paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d", curpart.Number)))
|
|
}
|
|
}
|
|
|
|
if !opts.NoLock {
|
|
lk := er.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx)
|
|
}
|
|
|
|
er.cleanupMultipartPath(ctx, paths...) // cleanup all part.N.meta, and skipped part.N's before final rename().
|
|
|
|
defer func() {
|
|
if err == nil {
|
|
er.deleteAll(context.Background(), minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
}()
|
|
|
|
// Rename the multipart object to final location.
|
|
onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
|
partsMetadata, bucket, object, writeQuorum)
|
|
if err != nil {
|
|
return oi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks, writeQuorum); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
if !opts.Speedtest && len(versions) > 0 {
|
|
globalMRFState.addPartialOp(PartialOperation{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
Queued: time.Now(),
|
|
Versions: versions,
|
|
SetIndex: er.setIndex,
|
|
PoolIndex: er.poolIndex,
|
|
})
|
|
}
|
|
|
|
if !opts.Speedtest && len(versions) == 0 {
|
|
// Check if there is any offline disk and add it to the MRF list
|
|
for _, disk := range onlineDisks {
|
|
if disk != nil && disk.IsOnline() {
|
|
continue
|
|
}
|
|
er.addPartial(bucket, object, fi.VersionID)
|
|
break
|
|
}
|
|
}
|
|
|
|
for i := 0; i < len(onlineDisks); i++ {
|
|
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
|
// Object info is the same in all disks, so we can pick
|
|
// the first meta from online disk
|
|
fi = partsMetadata[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
// we are adding a new version to this object under the namespace lock, so this is the latest version.
|
|
fi.IsLatest = true
|
|
|
|
// Success, return object info.
|
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
|
|
// AbortMultipartUpload - aborts an ongoing multipart operation
|
|
// signified by the input uploadID. This is an atomic operation
|
|
// doesn't require clients to initiate multiple such requests.
|
|
//
|
|
// All parts are purged from all disks and reference to the uploadID
|
|
// would be removed from the system, rollback is not possible on this
|
|
// operation.
|
|
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
|
if !opts.NoAuditLog {
|
|
auditObjectErasureSet(ctx, "AbortMultipartUpload", object, &er)
|
|
}
|
|
|
|
// Validates if upload ID exists.
|
|
if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil {
|
|
if errors.Is(err, errVolumeNotFound) {
|
|
return toObjectErr(err, bucket)
|
|
}
|
|
return toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Cleanup all uploaded parts.
|
|
er.deleteAll(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID))
|
|
|
|
// Successfully purged.
|
|
return nil
|
|
}
|