mirror of
https://github.com/minio/minio.git
synced 2024-12-25 22:55:54 -05:00
33cee9f38a
Each multipart upload is holding a read lock for the entire upload duration of each part. This makes it impossible for other parts to complete until all currently uploading parts have released their locks. It will also make it impossible for new parts to start as long as the write lock is still being requested, essentially deadlocking uploads until all that may have been granted a read lock has been completed. Refactor to only hold the upload id lock while reading and writing the metadata, but hold a part id lock while the part is being uploaded.
1005 lines
32 KiB
Go
1005 lines
32 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio-go/v7/pkg/set"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/minio/internal/sync/errgroup"
|
|
"github.com/minio/pkg/mimedb"
|
|
)
|
|
|
|
func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {
|
|
return pathJoin(er.getMultipartSHADir(bucket, object), uploadID)
|
|
}
|
|
|
|
func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
|
|
return getSHA256Hash([]byte(pathJoin(bucket, object)))
|
|
}
|
|
|
|
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
|
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
|
|
defer func() {
|
|
if err == errFileNotFound {
|
|
err = errUploadIDNotFound
|
|
}
|
|
}()
|
|
|
|
disks := er.getDisks()
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false)
|
|
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
|
return reducedErr
|
|
}
|
|
|
|
// List all online disks.
|
|
_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
|
|
|
|
// Pick latest valid metadata.
|
|
_, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
|
|
return err
|
|
}
|
|
|
|
// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
|
|
func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
|
|
storageDisks := er.getDisks()
|
|
|
|
g := errgroup.WithNErrs(len(storageDisks))
|
|
for index, disk := range storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
index := index
|
|
g.Go(func() error {
|
|
// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
|
|
// requests. xl.meta is the authoritative source of truth on which parts constitute
|
|
// the object. The presence of parts that don't belong in the object doesn't affect correctness.
|
|
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath, false)
|
|
return nil
|
|
}, index)
|
|
}
|
|
g.Wait()
|
|
}
|
|
|
|
// Clean-up the old multipart uploads. Should be run in a Go routine.
|
|
func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
|
|
// run multiple cleanup's local to this server.
|
|
var wg sync.WaitGroup
|
|
for _, disk := range er.getLoadBalancedLocalDisks() {
|
|
if disk != nil {
|
|
wg.Add(1)
|
|
go func(disk StorageAPI) {
|
|
defer wg.Done()
|
|
er.cleanupStaleUploadsOnDisk(ctx, disk, expiry)
|
|
}(disk)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (er erasureObjects) renameAll(ctx context.Context, bucket, prefix string) {
|
|
var wg sync.WaitGroup
|
|
for _, disk := range er.getDisks() {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(disk StorageAPI) {
|
|
defer wg.Done()
|
|
disk.RenameFile(ctx, bucket, prefix, minioMetaTmpBucket, mustGetUUID())
|
|
}(disk)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
|
|
var wg sync.WaitGroup
|
|
for _, disk := range er.getDisks() {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(disk StorageAPI) {
|
|
defer wg.Done()
|
|
disk.Delete(ctx, bucket, prefix, true)
|
|
}(disk)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// Remove the old multipart uploads on the given disk.
|
|
func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
|
|
now := time.Now()
|
|
diskPath := disk.Endpoint().Path
|
|
|
|
readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
|
|
return readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
|
|
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
|
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
wait := er.deletedCleanupSleeper.Timer(ctx)
|
|
if now.Sub(fi.ModTime) > expiry {
|
|
er.renameAll(ctx, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
wait()
|
|
return nil
|
|
})
|
|
})
|
|
|
|
readDirFn(pathJoin(diskPath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
|
|
if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
|
|
return nil
|
|
}
|
|
vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
wait := er.deletedCleanupSleeper.Timer(ctx)
|
|
if now.Sub(vi.Created) > expiry {
|
|
er.deleteAll(ctx, minioMetaTmpBucket, tmpDir)
|
|
}
|
|
wait()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ListMultipartUploads - lists all the pending multipart
|
|
// uploads for a particular object in a bucket.
|
|
//
|
|
// Implements minimal S3 compatible ListMultipartUploads API. We do
|
|
// not support prefix based listing, this is a deliberate attempt
|
|
// towards simplification of multipart APIs.
|
|
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
|
|
func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
|
result.MaxUploads = maxUploads
|
|
result.KeyMarker = keyMarker
|
|
result.Prefix = object
|
|
result.Delimiter = delimiter
|
|
|
|
var uploadIDs []string
|
|
var disk StorageAPI
|
|
for _, disk = range er.getLoadBalancedDisks(true) {
|
|
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
|
|
if err != nil {
|
|
if err == errDiskNotFound {
|
|
continue
|
|
}
|
|
if err == errFileNotFound {
|
|
return result, nil
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return result, toObjectErr(err, bucket, object)
|
|
}
|
|
break
|
|
}
|
|
|
|
for i := range uploadIDs {
|
|
uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
|
|
}
|
|
|
|
// S3 spec says uploadIDs should be sorted based on initiated time, we need
|
|
// to read the metadata entry.
|
|
var uploads []MultipartInfo
|
|
|
|
populatedUploadIds := set.NewStringSet()
|
|
|
|
for _, uploadID := range uploadIDs {
|
|
if populatedUploadIds.Contains(uploadID) {
|
|
continue
|
|
}
|
|
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
|
|
if err != nil {
|
|
return result, toObjectErr(err, bucket, object)
|
|
}
|
|
populatedUploadIds.Add(uploadID)
|
|
uploads = append(uploads, MultipartInfo{
|
|
Object: object,
|
|
UploadID: uploadID,
|
|
Initiated: fi.ModTime,
|
|
})
|
|
}
|
|
|
|
sort.Slice(uploads, func(i int, j int) bool {
|
|
return uploads[i].Initiated.Before(uploads[j].Initiated)
|
|
})
|
|
|
|
uploadIndex := 0
|
|
if uploadIDMarker != "" {
|
|
for uploadIndex < len(uploads) {
|
|
if uploads[uploadIndex].UploadID != uploadIDMarker {
|
|
uploadIndex++
|
|
continue
|
|
}
|
|
if uploads[uploadIndex].UploadID == uploadIDMarker {
|
|
uploadIndex++
|
|
break
|
|
}
|
|
uploadIndex++
|
|
}
|
|
}
|
|
for uploadIndex < len(uploads) {
|
|
result.Uploads = append(result.Uploads, uploads[uploadIndex])
|
|
result.NextUploadIDMarker = uploads[uploadIndex].UploadID
|
|
uploadIndex++
|
|
if len(result.Uploads) == maxUploads {
|
|
break
|
|
}
|
|
}
|
|
|
|
result.IsTruncated = uploadIndex < len(uploads)
|
|
|
|
if !result.IsTruncated {
|
|
result.NextKeyMarker = ""
|
|
result.NextUploadIDMarker = ""
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// newMultipartUpload - wrapper for initializing a new multipart
|
|
// request; returns a unique upload id.
|
|
//
|
|
// Internally this function creates 'uploads.json' associated for the
|
|
// incoming object at
|
|
// '.minio.sys/multipart/bucket/object/uploads.json' on all the
|
|
// disks. `uploads.json` carries metadata regarding on-going multipart
|
|
// operation(s) on the object.
|
|
func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) {
|
|
onlineDisks := er.getDisks()
|
|
parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
|
|
if parityDrives <= 0 {
|
|
parityDrives = er.defaultParityCount
|
|
}
|
|
|
|
parityOrig := parityDrives
|
|
for _, disk := range onlineDisks {
|
|
if parityDrives >= len(onlineDisks)/2 {
|
|
parityDrives = len(onlineDisks) / 2
|
|
break
|
|
}
|
|
if disk == nil {
|
|
parityDrives++
|
|
continue
|
|
}
|
|
di, err := disk.DiskInfo(ctx)
|
|
if err != nil || di.ID == "" {
|
|
parityDrives++
|
|
}
|
|
}
|
|
if parityOrig != parityDrives {
|
|
opts.UserDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
|
|
}
|
|
|
|
dataDrives := len(onlineDisks) - parityDrives
|
|
|
|
// we now know the number of blocks this object needs for data and parity.
|
|
// establish the writeQuorum using this data
|
|
writeQuorum := dataDrives
|
|
if dataDrives == parityDrives {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Initialize parts metadata
|
|
partsMetadata := make([]FileInfo, len(onlineDisks))
|
|
|
|
fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
|
|
fi.VersionID = opts.VersionID
|
|
if opts.Versioned && fi.VersionID == "" {
|
|
fi.VersionID = mustGetUUID()
|
|
}
|
|
fi.DataDir = mustGetUUID()
|
|
|
|
// Initialize erasure metadata.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index] = fi
|
|
}
|
|
|
|
// Guess content-type from the extension if possible.
|
|
if opts.UserDefined["content-type"] == "" {
|
|
opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
|
|
}
|
|
|
|
modTime := opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
modTime = UTCNow()
|
|
}
|
|
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi)
|
|
|
|
// Fill all the necessary metadata.
|
|
// Update `xl.meta` content on each disks.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].Metadata = opts.UserDefined
|
|
partsMetadata[index].ModTime = modTime
|
|
}
|
|
|
|
uploadID := mustGetUUID()
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
// Write updated `xl.meta` to all disks.
|
|
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
|
return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
// Return success.
|
|
return uploadID, nil
|
|
}
|
|
|
|
// NewMultipartUpload - initialize a new multipart upload, returns a
|
|
// unique id. The unique id returned here is of UUID form, for each
|
|
// subsequent request each UUID is unique.
|
|
//
|
|
// Implements S3 compatible initiate multipart API.
|
|
func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
|
|
// No metadata is set, allocate a new one.
|
|
if opts.UserDefined == nil {
|
|
opts.UserDefined = make(map[string]string)
|
|
}
|
|
return er.newMultipartUpload(ctx, bucket, object, opts)
|
|
}
|
|
|
|
// CopyObjectPart - reads incoming stream and internally erasure codes
|
|
// them. This call is similar to put object part operation but the source
|
|
// data is read from an existing object.
|
|
//
|
|
// Implements S3 compatible Upload Part Copy API.
|
|
func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
|
|
partInfo, err := er.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, dstBucket, dstObject)
|
|
}
|
|
|
|
// Success.
|
|
return partInfo, nil
|
|
}
|
|
|
|
// PutObjectPart - reads incoming stream and internally erasure codes
|
|
// them. This call is similar to single put operation but it is part
|
|
// of the multipart transaction.
|
|
//
|
|
// Implements S3 compatible Upload Part API.
|
|
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
|
// Write lock for this part ID.
|
|
// Held throughout the operation.
|
|
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
|
plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
pctx := plkctx.Context()
|
|
defer partIDLock.Unlock(plkctx.Cancel)
|
|
|
|
// Read lock for upload id.
|
|
// Only held while reading the upload metadata.
|
|
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
rctx := rlkctx.Context()
|
|
defer func() {
|
|
if uploadIDRLock != nil {
|
|
uploadIDRLock.RUnlock(rlkctx.Cancel)
|
|
}
|
|
}()
|
|
|
|
data := r.Reader
|
|
// Validate input data size and it can never be less than zero.
|
|
if data.Size() < -1 {
|
|
logger.LogIf(rctx, errInvalidArgument, logger.Application)
|
|
return pi, toObjectErr(errInvalidArgument)
|
|
}
|
|
|
|
var partsMetadata []FileInfo
|
|
var errs []error
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
// Validates if upload ID exists.
|
|
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
storageDisks := er.getDisks()
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
|
uploadIDPath, "", false)
|
|
|
|
// Unlock upload id locks before, so others can get it.
|
|
uploadIDRLock.RUnlock(rlkctx.Cancel)
|
|
uploadIDRLock = nil
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return pi, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
// List all online disks.
|
|
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum)
|
|
if err != nil {
|
|
return pi, err
|
|
}
|
|
|
|
onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution)
|
|
|
|
// Need a unique name for the part being written in minioMetaBucket to
|
|
// accommodate concurrent PutObjectPart requests
|
|
|
|
partSuffix := fmt.Sprintf("part.%d", partID)
|
|
tmpPart := mustGetUUID()
|
|
tmpPartPath := pathJoin(tmpPart, partSuffix)
|
|
|
|
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
|
|
var online int
|
|
defer func() {
|
|
if online != len(onlineDisks) {
|
|
er.deleteObject(context.Background(), minioMetaTmpBucket, tmpPart, writeQuorum)
|
|
}
|
|
}()
|
|
|
|
erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
|
|
var buffer []byte
|
|
switch size := data.Size(); {
|
|
case size == 0:
|
|
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
|
case size == -1:
|
|
if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
|
|
buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512)
|
|
} else {
|
|
buffer = er.bp.Get()
|
|
defer er.bp.Put(buffer)
|
|
}
|
|
case size >= fi.Erasure.BlockSize:
|
|
buffer = er.bp.Get()
|
|
defer er.bp.Put(buffer)
|
|
case size < fi.Erasure.BlockSize:
|
|
// No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller.
|
|
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
|
}
|
|
|
|
if len(buffer) > int(fi.Erasure.BlockSize) {
|
|
buffer = buffer[:fi.Erasure.BlockSize]
|
|
}
|
|
writers := make([]io.Writer, len(onlineDisks))
|
|
for i, disk := range onlineDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
|
}
|
|
|
|
n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum)
|
|
closeBitrotWriters(writers)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
|
// than specified in request header.
|
|
if n < data.Size() {
|
|
return pi, IncompleteBody{Bucket: bucket, Object: object}
|
|
}
|
|
|
|
for i := range writers {
|
|
if writers[i] == nil {
|
|
onlineDisks[i] = nil
|
|
}
|
|
}
|
|
|
|
// Acquire write lock to update metadata.
|
|
uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
wctx := wlkctx.Context()
|
|
defer uploadIDWLock.Unlock(wlkctx.Cancel)
|
|
|
|
// Validates if upload ID exists.
|
|
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Rename temporary part file to its final location.
|
|
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
|
onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
|
}
|
|
|
|
// Read metadata again because it might be updated with parallel upload of another part.
|
|
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return pi, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
// Get current highest version based on re-read partsMetadata.
|
|
onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, dataDir, writeQuorum)
|
|
if err != nil {
|
|
return pi, err
|
|
}
|
|
|
|
// Once part is successfully committed, proceed with updating erasure metadata.
|
|
fi.ModTime = UTCNow()
|
|
|
|
md5hex := r.MD5CurrentHexString()
|
|
|
|
// Add the current part.
|
|
fi.AddObjectPart(partID, md5hex, n, data.ActualSize())
|
|
|
|
for i, disk := range onlineDisks {
|
|
if disk == OfflineDisk {
|
|
continue
|
|
}
|
|
partsMetadata[i].Size = fi.Size
|
|
partsMetadata[i].ModTime = fi.ModTime
|
|
partsMetadata[i].Parts = fi.Parts
|
|
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
|
|
PartNumber: partID,
|
|
Algorithm: DefaultBitrotAlgorithm,
|
|
Hash: bitrotWriterSum(writers[i]),
|
|
})
|
|
}
|
|
|
|
// Writes update `xl.meta` format for each disk.
|
|
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
|
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
online = countOnlineDisks(onlineDisks)
|
|
|
|
// Return success.
|
|
return PartInfo{
|
|
PartNumber: partID,
|
|
ETag: md5hex,
|
|
LastModified: fi.ModTime,
|
|
Size: n,
|
|
ActualSize: data.ActualSize(),
|
|
}, nil
|
|
}
|
|
|
|
// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
|
|
// by callers to verify object states
|
|
// - encrypted
|
|
// - compressed
|
|
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
|
result := MultipartInfo{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
UploadID: uploadID,
|
|
}
|
|
|
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return MultipartInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
|
|
|
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := er.getDisks()
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false)
|
|
|
|
// get Quorum for this object
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
|
|
if reducedErr == errErasureReadQuorum {
|
|
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, readQuorum)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
result.UserDefined = cloneMSS(fi.Metadata)
|
|
return result, nil
|
|
}
|
|
|
|
// ListObjectParts - lists all previously uploaded parts for a given
|
|
// object and uploadID. Takes additional input of part-number-marker
|
|
// to indicate where the listing should begin from.
|
|
//
|
|
// Implements S3 compatible ListObjectParts API. The resulting
|
|
// ListPartsInfo structure is marshaled directly into XML and
|
|
// replied back to the client.
|
|
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ListPartsInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
|
|
|
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := er.getDisks()
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
// Populate the result stub.
|
|
result.Bucket = bucket
|
|
result.Object = object
|
|
result.UploadID = uploadID
|
|
result.MaxParts = maxParts
|
|
result.PartNumberMarker = partNumberMarker
|
|
result.UserDefined = cloneMSS(fi.Metadata)
|
|
|
|
// For empty number of parts or maxParts as zero, return right here.
|
|
if len(fi.Parts) == 0 || maxParts == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
// Limit output to maxPartsList.
|
|
if maxParts > maxPartsList {
|
|
maxParts = maxPartsList
|
|
}
|
|
|
|
// Only parts with higher part numbers will be listed.
|
|
partIdx := objectPartIndex(fi.Parts, partNumberMarker)
|
|
parts := fi.Parts
|
|
if partIdx != -1 {
|
|
parts = fi.Parts[partIdx+1:]
|
|
}
|
|
count := maxParts
|
|
for _, part := range parts {
|
|
result.Parts = append(result.Parts, PartInfo{
|
|
PartNumber: part.Number,
|
|
ETag: part.ETag,
|
|
LastModified: fi.ModTime,
|
|
Size: part.Size,
|
|
})
|
|
count--
|
|
if count == 0 {
|
|
break
|
|
}
|
|
}
|
|
// If listed entries are more than maxParts, we set IsTruncated as true.
|
|
if len(parts) > len(result.Parts) {
|
|
result.IsTruncated = true
|
|
// Make sure to fill next part number marker if IsTruncated is
|
|
// true for subsequent listing.
|
|
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
|
|
result.NextPartNumberMarker = nextPartNumberMarker
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload - completes an ongoing multipart
|
|
// transaction after receiving all the parts indicated by the client.
|
|
// Returns an md5sum calculated by concatenating all the individual
|
|
// md5sums of all the parts.
|
|
//
|
|
// Implements S3 compatible Complete multipart API.
|
|
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
|
|
// Hold read-locks to verify uploaded parts, also disallows
|
|
// parallel part uploads as well.
|
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
rctx := rlkctx.Context()
|
|
defer uploadIDLock.RUnlock(rlkctx.Cancel)
|
|
|
|
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
|
return oi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Check if an object is present as one of the parent dir.
|
|
// -- FIXME. (needs a new kind of lock).
|
|
if opts.ParentIsObject != nil && opts.ParentIsObject(rctx, bucket, path.Dir(object)) {
|
|
return oi, toObjectErr(errFileParentIsFile, bucket, object)
|
|
}
|
|
|
|
// Calculate s3 compatible md5sum for complete multipart.
|
|
s3MD5 := getCompleteMultipartMD5(parts)
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := er.getDisks()
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return oi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return oi, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
|
|
// Calculate full object size.
|
|
var objectSize int64
|
|
|
|
// Calculate consolidated actual size.
|
|
var objectActualSize int64
|
|
|
|
// Order online disks in accordance with distribution order.
|
|
// Order parts metadata in accordance with distribution order.
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)
|
|
|
|
// Save current erasure metadata for validation.
|
|
var currentFI = fi
|
|
|
|
// Allocate parts similar to incoming slice.
|
|
fi.Parts = make([]ObjectPartInfo, len(parts))
|
|
|
|
// Validate each part and then commit to disk.
|
|
for i, part := range parts {
|
|
partIdx := objectPartIndex(currentFI.Parts, part.PartNumber)
|
|
// All parts should have same part number.
|
|
if partIdx == -1 {
|
|
invp := InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
GotETag: part.ETag,
|
|
}
|
|
return oi, invp
|
|
}
|
|
|
|
// ensure that part ETag is canonicalized to strip off extraneous quotes
|
|
part.ETag = canonicalizeETag(part.ETag)
|
|
if currentFI.Parts[partIdx].ETag != part.ETag {
|
|
invp := InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
ExpETag: currentFI.Parts[partIdx].ETag,
|
|
GotETag: part.ETag,
|
|
}
|
|
return oi, invp
|
|
}
|
|
|
|
// All parts except the last part has to be atleast 5MB.
|
|
if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
|
|
return oi, PartTooSmall{
|
|
PartNumber: part.PartNumber,
|
|
PartSize: currentFI.Parts[partIdx].ActualSize,
|
|
PartETag: part.ETag,
|
|
}
|
|
}
|
|
|
|
// Save for total object size.
|
|
objectSize += currentFI.Parts[partIdx].Size
|
|
|
|
// Save the consolidated actual size.
|
|
objectActualSize += currentFI.Parts[partIdx].ActualSize
|
|
|
|
// Add incoming parts.
|
|
fi.Parts[i] = ObjectPartInfo{
|
|
Number: part.PartNumber,
|
|
Size: currentFI.Parts[partIdx].Size,
|
|
ActualSize: currentFI.Parts[partIdx].ActualSize,
|
|
}
|
|
}
|
|
|
|
// Save the final object size and modtime.
|
|
fi.Size = objectSize
|
|
fi.ModTime = opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
fi.ModTime = UTCNow()
|
|
}
|
|
|
|
// Save successfully calculated md5sum.
|
|
fi.Metadata["etag"] = s3MD5
|
|
if opts.UserDefined["etag"] != "" { // preserve ETag if set
|
|
fi.Metadata["etag"] = opts.UserDefined["etag"]
|
|
}
|
|
|
|
// Save the consolidated actual size.
|
|
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
|
|
|
// Update all erasure metadata, make sure to not modify fields like
|
|
// checksum which are different on each disks.
|
|
for index := range partsMetadata {
|
|
if partsMetadata[index].IsValid() {
|
|
partsMetadata[index].Size = fi.Size
|
|
partsMetadata[index].ModTime = fi.ModTime
|
|
partsMetadata[index].Metadata = fi.Metadata
|
|
partsMetadata[index].Parts = fi.Parts
|
|
}
|
|
}
|
|
|
|
// Hold namespace to complete the transaction
|
|
lk := er.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
// Write final `xl.meta` at uploadID location
|
|
onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum)
|
|
if err != nil {
|
|
return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
// Remove parts that weren't present in CompleteMultipartUpload request.
|
|
for _, curpart := range currentFI.Parts {
|
|
if objectPartIndex(fi.Parts, curpart.Number) == -1 {
|
|
// Delete the missing part files. e.g,
|
|
// Request 1: NewMultipart
|
|
// Request 2: PutObjectPart 1
|
|
// Request 3: PutObjectPart 2
|
|
// Request 4: CompleteMultipartUpload --part 2
|
|
// N.B. 1st part is not present. This part should be removed from the storage.
|
|
er.removeObjectPart(bucket, object, uploadID, fi.DataDir, curpart.Number)
|
|
}
|
|
}
|
|
|
|
// Rename the multipart object to final location.
|
|
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
|
partsMetadata, bucket, object, writeQuorum); err != nil {
|
|
return oi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Check if there is any offline disk and add it to the MRF list
|
|
for _, disk := range onlineDisks {
|
|
if disk != nil && disk.IsOnline() {
|
|
continue
|
|
}
|
|
er.addPartial(bucket, object, fi.VersionID)
|
|
break
|
|
}
|
|
|
|
for i := 0; i < len(onlineDisks); i++ {
|
|
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
|
// Object info is the same in all disks, so we can pick
|
|
// the first meta from online disk
|
|
fi = partsMetadata[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
// Success, return object info.
|
|
return fi.ToObjectInfo(bucket, object), nil
|
|
}
|
|
|
|
// AbortMultipartUpload - aborts an ongoing multipart operation
|
|
// signified by the input uploadID. This is an atomic operation
|
|
// doesn't require clients to initiate multiple such requests.
|
|
//
|
|
// All parts are purged from all disks and reference to the uploadID
|
|
// would be removed from the system, rollback is not possible on this
|
|
// operation.
|
|
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
|
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
// Validates if upload ID exists.
|
|
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
|
return toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
|
if err != nil {
|
|
return toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Cleanup all uploaded parts.
|
|
if err = er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, writeQuorum); err != nil {
|
|
return toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Successfully purged.
|
|
return nil
|
|
}
|