mirror of
https://github.com/minio/minio.git
synced 2025-02-04 10:26:01 -05:00
9dca46e156
Signature calculation has now moved out from being a package to top-level as a layered mechanism. In case of payload calculation with body, go-routines are initiated to simultaneously write and calculate shasum. Errors are sent over the writer so that the lower layer removes the temporary files properly.
667 lines
20 KiB
Go
667 lines
20 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package fs
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio/pkg/atomic"
|
|
"github.com/minio/minio/pkg/crypto/sha512"
|
|
"github.com/minio/minio/pkg/disk"
|
|
"github.com/minio/minio/pkg/mimedb"
|
|
"github.com/minio/minio/pkg/probe"
|
|
)
|
|
|
|
// isValidUploadID - is upload id.
|
|
func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) {
|
|
fs.rwLock.RLock()
|
|
defer fs.rwLock.RUnlock()
|
|
_, ok = fs.multiparts.ActiveSession[uploadID]
|
|
if !ok {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// byObjectInfoKey is a sortable interface for UploadMetadata slice
|
|
type byUploadMetadataKey []*UploadMetadata
|
|
|
|
func (b byUploadMetadataKey) Len() int { return len(b) }
|
|
func (b byUploadMetadataKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
|
func (b byUploadMetadataKey) Less(i, j int) bool { return b[i].Object < b[j].Object }
|
|
|
|
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
|
|
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
|
|
// Input validation.
|
|
if !IsValidBucketName(bucket) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(e)
|
|
}
|
|
var uploads []*UploadMetadata
|
|
fs.rwLock.RLock()
|
|
defer fs.rwLock.RUnlock()
|
|
for uploadID, session := range fs.multiparts.ActiveSession {
|
|
objectName := session.ObjectName
|
|
if strings.HasPrefix(objectName, resources.Prefix) {
|
|
if len(uploads) > resources.MaxUploads {
|
|
sort.Sort(byUploadMetadataKey(uploads))
|
|
resources.Upload = uploads
|
|
resources.NextKeyMarker = session.ObjectName
|
|
resources.NextUploadIDMarker = uploadID
|
|
resources.IsTruncated = true
|
|
return resources, nil
|
|
}
|
|
// UploadIDMarker is ignored if KeyMarker is empty.
|
|
switch {
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
|
|
if objectName > resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Object = objectName
|
|
upload.UploadID = uploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
|
|
if session.UploadID > resources.UploadIDMarker {
|
|
if objectName >= resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Object = objectName
|
|
upload.UploadID = uploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
default:
|
|
upload := new(UploadMetadata)
|
|
upload.Object = objectName
|
|
upload.UploadID = uploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(byUploadMetadataKey(uploads))
|
|
resources.Upload = uploads
|
|
return resources, nil
|
|
}
|
|
|
|
// verify if parts sent over the network do really match with what we
|
|
// have for the session.
|
|
func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
|
|
if parts == nil || savedParts == nil {
|
|
return false
|
|
}
|
|
if len(parts) != len(savedParts) {
|
|
return false
|
|
}
|
|
// Range of incoming parts and compare them with saved parts.
|
|
for i, part := range parts {
|
|
if strings.Trim(part.ETag, "\"") != savedParts[i].ETag {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Create an s3 compatible MD5sum for complete multipart transaction.
|
|
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
|
|
var finalMD5Bytes []byte
|
|
for _, md5Str := range md5Strs {
|
|
md5Bytes, e := hex.DecodeString(md5Str)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
|
|
}
|
|
md5Hasher := md5.New()
|
|
md5Hasher.Write(finalMD5Bytes)
|
|
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs))
|
|
return s3MD5, nil
|
|
}
|
|
|
|
type multiCloser struct {
|
|
Closers []io.Closer
|
|
}
|
|
|
|
func (m multiCloser) Close() error {
|
|
for _, c := range m.Closers {
|
|
if e := c.Close(); e != nil {
|
|
return e
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MultiCloser - returns a Closer that's the logical
|
|
// concatenation of the provided input closers. They're closed
|
|
// sequentially. If any of the closers return a non-nil error, Close
|
|
// will return that error.
|
|
func MultiCloser(closers ...io.Closer) io.Closer {
|
|
return multiCloser{closers}
|
|
}
|
|
|
|
// removeParts - remove all parts.
|
|
func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
|
|
for _, part := range parts {
|
|
// We are on purpose ignoring the return values here, since
|
|
// another thread would have purged these entries.
|
|
os.Remove(partPathPrefix + part.ETag + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// saveParts - concantenate and save all parts.
|
|
func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe.Error {
|
|
var partReaders []io.Reader
|
|
var partClosers []io.Closer
|
|
for _, part := range parts {
|
|
// Trim prefix
|
|
md5Sum := strings.TrimPrefix(part.ETag, "\"")
|
|
// Trim suffix
|
|
md5Sum = strings.TrimSuffix(md5Sum, "\"")
|
|
partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
|
|
if e != nil {
|
|
if !os.IsNotExist(e) {
|
|
return probe.NewError(e)
|
|
}
|
|
// Some clients do not set Content-Md5, so we would have
|
|
// created part files without 'ETag' in them.
|
|
partFile, e = os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
|
|
if e != nil {
|
|
return probe.NewError(e)
|
|
}
|
|
}
|
|
partReaders = append(partReaders, partFile)
|
|
partClosers = append(partClosers, partFile)
|
|
}
|
|
// Concatenate a list of closers and close upon return.
|
|
closer := MultiCloser(partClosers...)
|
|
defer closer.Close()
|
|
|
|
reader := io.MultiReader(partReaders...)
|
|
readBufferSize := 8 * 1024 * 1024 // 8MiB
|
|
readBuffer := make([]byte, readBufferSize) // Allocate 8MiB buffer.
|
|
if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil {
|
|
return probe.NewError(e)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewMultipartUpload - initiate a new multipart session
|
|
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
|
|
di, e := disk.GetInfo(fs.path)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
// Remove 5% from total space for cumulative disk space used for
|
|
// journalling, inodes etc.
|
|
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
|
}
|
|
|
|
// Input validation.
|
|
if !IsValidBucketName(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(object) {
|
|
return "", probe.NewError(ObjectNameInvalid{Object: object})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e = os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
objectDir := filepath.Dir(objectPath)
|
|
if _, e = os.Stat(objectDir); e != nil {
|
|
if !os.IsNotExist(e) {
|
|
return "", probe.NewError(e)
|
|
}
|
|
e = os.MkdirAll(objectDir, 0700)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
}
|
|
|
|
// Generate new upload id.
|
|
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String())
|
|
uploadIDSum := sha512.Sum512(id)
|
|
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
defer fs.rwLock.Unlock()
|
|
// Initialize multipart session.
|
|
mpartSession := &MultipartSession{}
|
|
mpartSession.TotalParts = 0
|
|
mpartSession.ObjectName = object
|
|
mpartSession.UploadID = uploadID
|
|
mpartSession.Initiated = time.Now().UTC()
|
|
// Multipart has maximum of 10000 parts.
|
|
var parts []PartMetadata
|
|
mpartSession.Parts = parts
|
|
|
|
fs.multiparts.ActiveSession[uploadID] = mpartSession
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
return "", err.Trace(objectPath)
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// Remove all duplicated parts based on the latest time of their upload.
|
|
func removeDuplicateParts(parts []PartMetadata) []PartMetadata {
|
|
length := len(parts) - 1
|
|
for i := 0; i < length; i++ {
|
|
for j := i + 1; j <= length; j++ {
|
|
if parts[i].PartNumber == parts[j].PartNumber {
|
|
if parts[i].LastModified.Sub(parts[j].LastModified) > 0 {
|
|
parts[i] = parts[length]
|
|
} else {
|
|
parts[j] = parts[length]
|
|
}
|
|
parts = parts[0:length]
|
|
length--
|
|
j--
|
|
}
|
|
}
|
|
}
|
|
return parts
|
|
}
|
|
|
|
// partNumber is a sortable interface for Part slice.
|
|
type partNumber []PartMetadata
|
|
|
|
func (a partNumber) Len() int { return len(a) }
|
|
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
|
|
|
// CreateObjectPart - create a part in a multipart session
|
|
func (fs Filesystem) CreateObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Bytes []byte) (string, *probe.Error) {
|
|
di, err := disk.GetInfo(fs.path)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
|
|
// Remove 5% from total space for cumulative disk space used for
|
|
// journalling, inodes etc.
|
|
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
|
}
|
|
|
|
// Part id cannot be negative.
|
|
if partID <= 0 {
|
|
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
|
|
}
|
|
|
|
// Check bucket name valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path legal.
|
|
if !IsValidObjectName(object) {
|
|
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
// Verify upload is valid for the incoming object.
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
// md5Hex representation.
|
|
var md5Hex string
|
|
if len(md5Bytes) != 0 {
|
|
md5Hex = hex.EncodeToString(md5Bytes)
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
partPathPrefix := objectPath + uploadID
|
|
partPath := partPathPrefix + md5Hex + fmt.Sprintf("$%d-$multiparts", partID)
|
|
partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts")
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
defer partFile.Close()
|
|
|
|
// Initialize md5 writer.
|
|
md5Writer := md5.New()
|
|
|
|
// Create a multiwriter.
|
|
multiWriter := io.MultiWriter(md5Writer, partFile)
|
|
|
|
if _, e = io.CopyN(multiWriter, data, size); e != nil {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
// Finalize new md5.
|
|
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
|
if len(md5Bytes) != 0 {
|
|
if newMD5Hex != md5Hex {
|
|
return "", probe.NewError(BadDigest{md5Hex, newMD5Hex})
|
|
}
|
|
}
|
|
|
|
// Stat the file to get the latest information.
|
|
fi, e := os.Stat(partFile.Name())
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
partMetadata := PartMetadata{}
|
|
partMetadata.PartNumber = partID
|
|
partMetadata.ETag = newMD5Hex
|
|
partMetadata.Size = fi.Size()
|
|
partMetadata.LastModified = fi.ModTime()
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
|
|
fs.rwLock.RUnlock()
|
|
if !ok {
|
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
// Add all incoming parts.
|
|
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata)
|
|
|
|
// Remove duplicate parts based on the most recent uploaded.
|
|
deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts)
|
|
// Save total parts uploaded.
|
|
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
|
|
|
|
// Sort by part number before saving.
|
|
sort.Sort(partNumber(deserializedMultipartSession.Parts))
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
defer fs.rwLock.Unlock()
|
|
|
|
fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
return "", err.Trace(partPathPrefix)
|
|
}
|
|
return newMD5Hex, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
|
func (fs Filesystem) CompleteMultipartUpload(bucket string, object string, uploadID string, completeMultipartBytes []byte) (ObjectInfo, *probe.Error) {
|
|
// Check bucket name is valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path is legal.
|
|
if !IsValidObjectName(object) {
|
|
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
// Verify if valid upload for incoming object.
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return ObjectInfo{}, probe.NewError(InternalError{})
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
|
|
if e != nil {
|
|
return ObjectInfo{}, probe.NewError(e)
|
|
}
|
|
|
|
completeMultipartUpload := &CompleteMultipartUpload{}
|
|
if e = xml.Unmarshal(completeMultipartBytes, completeMultipartUpload); e != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectInfo{}, probe.NewError(MalformedXML{})
|
|
}
|
|
if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectInfo{}, probe.NewError(InvalidPartOrder{})
|
|
}
|
|
|
|
// Save parts for verification.
|
|
parts := completeMultipartUpload.Part
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
|
|
fs.rwLock.RUnlock()
|
|
|
|
if !doPartsMatch(parts, savedParts) {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectInfo{}, probe.NewError(InvalidPart{})
|
|
}
|
|
|
|
// Parts successfully validated, save all the parts.
|
|
partPathPrefix := objectPath + uploadID
|
|
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectInfo{}, err.Trace(partPathPrefix)
|
|
}
|
|
var md5Strs []string
|
|
for _, part := range savedParts {
|
|
md5Strs = append(md5Strs, part.ETag)
|
|
}
|
|
// Save the s3 md5.
|
|
s3MD5, err := makeS3MD5(md5Strs...)
|
|
if err != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectInfo{}, err.Trace(md5Strs...)
|
|
}
|
|
|
|
// Successfully saved multipart, remove all parts in a routine.
|
|
go removeParts(partPathPrefix, savedParts)
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
delete(fs.multiparts.ActiveSession, uploadID)
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
fs.rwLock.Unlock()
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectInfo{}, err.Trace(partPathPrefix)
|
|
}
|
|
if e = objectWriter.Close(); e != nil {
|
|
fs.rwLock.Unlock()
|
|
return ObjectInfo{}, probe.NewError(e)
|
|
}
|
|
fs.rwLock.Unlock()
|
|
|
|
// Send stat again to get object metadata.
|
|
st, e := os.Stat(objectPath)
|
|
if e != nil {
|
|
return ObjectInfo{}, probe.NewError(e)
|
|
}
|
|
|
|
contentType := "application/octet-stream"
|
|
if objectExt := filepath.Ext(objectPath); objectExt != "" {
|
|
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
|
|
if ok {
|
|
contentType = content.ContentType
|
|
}
|
|
}
|
|
newObject := ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: object,
|
|
ModifiedTime: st.ModTime(),
|
|
Size: st.Size(),
|
|
ContentType: contentType,
|
|
MD5Sum: s3MD5,
|
|
}
|
|
return newObject, nil
|
|
}
|
|
|
|
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
|
|
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
|
|
// Check bucket name is valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path legal.
|
|
if !IsValidObjectName(object) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
// Save upload id.
|
|
uploadID := resources.UploadID
|
|
|
|
// Verify if upload id is valid for incoming object.
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
objectResourcesMetadata := resources
|
|
objectResourcesMetadata.Bucket = bucket
|
|
objectResourcesMetadata.Object = object
|
|
var startPartNumber int
|
|
switch {
|
|
case objectResourcesMetadata.PartNumberMarker == 0:
|
|
startPartNumber = 1
|
|
default:
|
|
startPartNumber = objectResourcesMetadata.PartNumberMarker
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return ObjectResourcesMetadata{}, probe.NewError(e)
|
|
}
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
|
|
fs.rwLock.RUnlock()
|
|
if !ok {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
|
|
}
|
|
var parts []PartMetadata
|
|
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
|
|
if len(parts) > objectResourcesMetadata.MaxParts {
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.IsTruncated = true
|
|
objectResourcesMetadata.Part = parts
|
|
objectResourcesMetadata.NextPartNumberMarker = i
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
parts = append(parts, deserializedMultipartSession.Parts[i-1])
|
|
}
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.Part = parts
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
|
|
// AbortMultipartUpload - abort an incomplete multipart session
|
|
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
|
|
// Check bucket name valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path legal.
|
|
if !IsValidObjectName(object) {
|
|
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return probe.NewError(e)
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
partPathPrefix := objectPath + uploadID
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
|
|
fs.rwLock.RUnlock()
|
|
|
|
// Remove all parts.
|
|
if err := removeParts(partPathPrefix, savedParts); err != nil {
|
|
return err.Trace(partPathPrefix)
|
|
}
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
defer fs.rwLock.Unlock()
|
|
|
|
delete(fs.multiparts.ActiveSession, uploadID)
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
return err.Trace(partPathPrefix)
|
|
}
|
|
return nil
|
|
}
|