mirror of
https://github.com/minio/minio.git
synced 2025-01-29 23:46:00 -05:00
9dca46e156
Signature calculation has now moved out from being a package to top-level as a layered mechanism. In case of payload calculation with body, go-routines are initiated to simultaneously write and calculate shasum. Errors are sent over the writer so that the lower layer removes the temporary files properly.
517 lines
17 KiB
Go
517 lines
17 KiB
Go
// +build ignore
|
|
|
|
/*
|
|
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package xl
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/md5"
|
|
"crypto/sha512"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/xml"
|
|
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio/pkg/crypto/sha256"
|
|
"github.com/minio/minio/pkg/probe"
|
|
"github.com/minio/minio/pkg/s3/signature4"
|
|
"github.com/minio/minio/pkg/xl/cache/data"
|
|
)
|
|
|
|
/// V2 API functions
|
|
|
|
// NewMultipartUpload - initiate a new multipart session
|
|
func (xl API) NewMultipartUpload(bucket, key, contentType string) (string, *probe.Error) {
|
|
xl.lock.Lock()
|
|
defer xl.lock.Unlock()
|
|
|
|
if !IsValidBucket(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(key) {
|
|
return "", probe.NewError(ObjectNameInvalid{Object: key})
|
|
}
|
|
// if len(xl.config.NodeDiskMap) > 0 {
|
|
// return xl.newMultipartUpload(bucket, key, contentType)
|
|
// }
|
|
if !xl.storedBuckets.Exists(bucket) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
storedBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
objectKey := bucket + "/" + key
|
|
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
|
|
return "", probe.NewError(ObjectExists{Object: key})
|
|
}
|
|
id := []byte(strconv.Itoa(rand.Int()) + bucket + key + time.Now().UTC().String())
|
|
uploadIDSum := sha512.Sum512(id)
|
|
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
|
|
|
|
storedBucket.multiPartSession[key] = MultiPartSession{
|
|
UploadID: uploadID,
|
|
Initiated: time.Now().UTC(),
|
|
TotalParts: 0,
|
|
}
|
|
storedBucket.partMetadata[key] = make(map[int]PartMetadata)
|
|
multiPartCache := data.NewCache(0)
|
|
multiPartCache.OnEvicted = xl.evictedPart
|
|
xl.multiPartObjects[uploadID] = multiPartCache
|
|
xl.storedBuckets.Set(bucket, storedBucket)
|
|
return uploadID, nil
|
|
}
|
|
|
|
// AbortMultipartUpload - abort an incomplete multipart session
|
|
func (xl API) AbortMultipartUpload(bucket, key, uploadID string) *probe.Error {
|
|
xl.lock.Lock()
|
|
defer xl.lock.Unlock()
|
|
|
|
if !IsValidBucket(bucket) {
|
|
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(key) {
|
|
return probe.NewError(ObjectNameInvalid{Object: key})
|
|
}
|
|
// TODO: multipart support for xl is broken, since we haven't finalized the format in which
|
|
// it can be stored, disabling this for now until we get the underlying layout stable.
|
|
//
|
|
// if len(xl.config.NodeDiskMap) > 0 {
|
|
// return xl.abortMultipartUpload(bucket, key, uploadID)
|
|
// }
|
|
if !xl.storedBuckets.Exists(bucket) {
|
|
return probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
storedBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
if storedBucket.multiPartSession[key].UploadID != uploadID {
|
|
return probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
xl.cleanupMultipartSession(bucket, key, uploadID)
|
|
return nil
|
|
}
|
|
|
|
// CreateObjectPart - create a part in a multipart session
|
|
func (xl API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *signature4.Sign) (string, *probe.Error) {
|
|
xl.lock.Lock()
|
|
etag, err := xl.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data, signature)
|
|
xl.lock.Unlock()
|
|
// possible free
|
|
debug.FreeOSMemory()
|
|
|
|
return etag, err.Trace()
|
|
}
|
|
|
|
// createObject - internal wrapper function called by CreateObjectPart
|
|
func (xl API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *signature4.Sign) (string, *probe.Error) {
|
|
if !IsValidBucket(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(key) {
|
|
return "", probe.NewError(ObjectNameInvalid{Object: key})
|
|
}
|
|
// TODO: multipart support for xl is broken, since we haven't finalized the format in which
|
|
// it can be stored, disabling this for now until we get the underlying layout stable.
|
|
//
|
|
/*
|
|
if len(xl.config.NodeDiskMap) > 0 {
|
|
metadata := make(map[string]string)
|
|
if contentType == "" {
|
|
contentType = "application/octet-stream"
|
|
}
|
|
contentType = strings.TrimSpace(contentType)
|
|
metadata["contentType"] = contentType
|
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
|
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
|
if err != nil {
|
|
// pro-actively close the connection
|
|
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
|
|
}
|
|
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
|
}
|
|
partMetadata, err := xl.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, size, metadata, signature)
|
|
if err != nil {
|
|
return "", err.Trace()
|
|
}
|
|
return partMetadata.ETag, nil
|
|
}
|
|
*/
|
|
if !xl.storedBuckets.Exists(bucket) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
strBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
// Verify upload id
|
|
if strBucket.multiPartSession[key].UploadID != uploadID {
|
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
// get object key
|
|
parts := strBucket.partMetadata[key]
|
|
if _, ok := parts[partID]; ok {
|
|
return parts[partID].ETag, nil
|
|
}
|
|
|
|
if contentType == "" {
|
|
contentType = "application/octet-stream"
|
|
}
|
|
contentType = strings.TrimSpace(contentType)
|
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
|
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
|
if err != nil {
|
|
// pro-actively close the connection
|
|
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
|
|
}
|
|
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
|
}
|
|
|
|
// calculate md5
|
|
hash := md5.New()
|
|
sha256hash := sha256.New()
|
|
|
|
var totalLength int64
|
|
var err error
|
|
for err == nil {
|
|
var length int
|
|
byteBuffer := make([]byte, 1024*1024)
|
|
length, err = data.Read(byteBuffer) // do not read error return error here, we will handle this error later
|
|
if length != 0 {
|
|
hash.Write(byteBuffer[0:length])
|
|
sha256hash.Write(byteBuffer[0:length])
|
|
ok := xl.multiPartObjects[uploadID].Append(partID, byteBuffer[0:length])
|
|
if !ok {
|
|
return "", probe.NewError(InternalError{})
|
|
}
|
|
totalLength += int64(length)
|
|
go debug.FreeOSMemory()
|
|
}
|
|
}
|
|
if totalLength != size {
|
|
xl.multiPartObjects[uploadID].Delete(partID)
|
|
return "", probe.NewError(IncompleteBody{Bucket: bucket, Object: key})
|
|
}
|
|
if err != io.EOF {
|
|
return "", probe.NewError(err)
|
|
}
|
|
|
|
md5SumBytes := hash.Sum(nil)
|
|
md5Sum := hex.EncodeToString(md5SumBytes)
|
|
// Verify if the written object is equal to what is expected, only if it is requested as such
|
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
|
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
|
|
return "", err.Trace()
|
|
}
|
|
}
|
|
|
|
if signature != nil {
|
|
{
|
|
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256hash.Sum(nil)))
|
|
if err != nil {
|
|
return "", err.Trace()
|
|
}
|
|
if !ok {
|
|
return "", probe.NewError(SignDoesNotMatch{})
|
|
}
|
|
}
|
|
}
|
|
|
|
newPart := PartMetadata{
|
|
PartNumber: partID,
|
|
LastModified: time.Now().UTC(),
|
|
ETag: md5Sum,
|
|
Size: totalLength,
|
|
}
|
|
|
|
parts[partID] = newPart
|
|
strBucket.partMetadata[key] = parts
|
|
multiPartSession := strBucket.multiPartSession[key]
|
|
multiPartSession.TotalParts++
|
|
strBucket.multiPartSession[key] = multiPartSession
|
|
xl.storedBuckets.Set(bucket, strBucket)
|
|
return md5Sum, nil
|
|
}
|
|
|
|
// cleanupMultipartSession invoked during an abort or complete multipart session to cleanup session from memory
|
|
func (xl API) cleanupMultipartSession(bucket, key, uploadID string) {
|
|
storedBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
for i := 1; i <= storedBucket.multiPartSession[key].TotalParts; i++ {
|
|
xl.multiPartObjects[uploadID].Delete(i)
|
|
}
|
|
delete(storedBucket.multiPartSession, key)
|
|
delete(storedBucket.partMetadata, key)
|
|
xl.storedBuckets.Set(bucket, storedBucket)
|
|
}
|
|
|
|
func (xl API) mergeMultipart(parts *CompleteMultipartUpload, uploadID string, fullObjectWriter *io.PipeWriter) {
|
|
for _, part := range parts.Part {
|
|
recvMD5 := part.ETag
|
|
object, ok := xl.multiPartObjects[uploadID].Get(part.PartNumber)
|
|
if ok == false {
|
|
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(InvalidPart{})))
|
|
return
|
|
}
|
|
calcMD5Bytes := md5.Sum(object)
|
|
// complete multi part request header md5sum per part is hex encoded
|
|
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
|
|
if err != nil {
|
|
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(InvalidDigest{Md5: recvMD5})))
|
|
return
|
|
}
|
|
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
|
|
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(BadDigest{})))
|
|
return
|
|
}
|
|
|
|
if _, err := io.Copy(fullObjectWriter, bytes.NewReader(object)); err != nil {
|
|
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(err)))
|
|
return
|
|
}
|
|
object = nil
|
|
}
|
|
fullObjectWriter.Close()
|
|
return
|
|
}
|
|
|
|
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
|
func (xl API) CompleteMultipartUpload(bucket, key, uploadID string, data io.Reader, signature *signature4.Sign) (ObjectMetadata, *probe.Error) {
|
|
xl.lock.Lock()
|
|
defer xl.lock.Unlock()
|
|
size := int64(xl.multiPartObjects[uploadID].Stats().Bytes)
|
|
fullObjectReader, err := xl.completeMultipartUploadV2(bucket, key, uploadID, data, signature)
|
|
if err != nil {
|
|
return ObjectMetadata{}, err.Trace()
|
|
}
|
|
objectMetadata, err := xl.createObject(bucket, key, "", "", size, fullObjectReader, nil)
|
|
if err != nil {
|
|
// No need to call internal cleanup functions here, caller should call AbortMultipartUpload()
|
|
// which would in-turn cleanup properly in accordance with S3 Spec
|
|
return ObjectMetadata{}, err.Trace()
|
|
}
|
|
xl.cleanupMultipartSession(bucket, key, uploadID)
|
|
return objectMetadata, nil
|
|
}
|
|
|
|
func (xl API) completeMultipartUploadV2(bucket, key, uploadID string, data io.Reader, signature *signature4.Sign) (io.Reader, *probe.Error) {
|
|
if !IsValidBucket(bucket) {
|
|
return nil, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(key) {
|
|
return nil, probe.NewError(ObjectNameInvalid{Object: key})
|
|
}
|
|
|
|
// TODO: multipart support for xl is broken, since we haven't finalized the format in which
|
|
// it can be stored, disabling this for now until we get the underlying layout stable.
|
|
//
|
|
// if len(xl.config.NodeDiskMap) > 0 {
|
|
// xl.lock.Unlock()
|
|
// return xl.completeMultipartUpload(bucket, key, uploadID, data, signature)
|
|
// }
|
|
|
|
if !xl.storedBuckets.Exists(bucket) {
|
|
return nil, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
storedBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
// Verify upload id
|
|
if storedBucket.multiPartSession[key].UploadID != uploadID {
|
|
return nil, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
partBytes, err := ioutil.ReadAll(data)
|
|
if err != nil {
|
|
return nil, probe.NewError(err)
|
|
}
|
|
if signature != nil {
|
|
partHashBytes := sha256.Sum256(partBytes)
|
|
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(partHashBytes[:]))
|
|
if err != nil {
|
|
return nil, err.Trace()
|
|
}
|
|
if !ok {
|
|
return nil, probe.NewError(SignDoesNotMatch{})
|
|
}
|
|
}
|
|
parts := &CompleteMultipartUpload{}
|
|
if err := xml.Unmarshal(partBytes, parts); err != nil {
|
|
return nil, probe.NewError(MalformedXML{})
|
|
}
|
|
if !sort.IsSorted(completedParts(parts.Part)) {
|
|
return nil, probe.NewError(InvalidPartOrder{})
|
|
}
|
|
|
|
fullObjectReader, fullObjectWriter := io.Pipe()
|
|
go xl.mergeMultipart(parts, uploadID, fullObjectWriter)
|
|
|
|
return fullObjectReader, nil
|
|
}
|
|
|
|
// byKey is a sortable interface for UploadMetadata slice
|
|
type byKey []*UploadMetadata
|
|
|
|
func (a byKey) Len() int { return len(a) }
|
|
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
|
|
|
|
// ListMultipartUploads - list incomplete multipart sessions for a given bucket
|
|
func (xl API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
|
|
// TODO handle delimiter, low priority
|
|
xl.lock.Lock()
|
|
defer xl.lock.Unlock()
|
|
|
|
if !IsValidBucket(bucket) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// TODO: multipart support for xl is broken, since we haven't finalized the format in which
|
|
// it can be stored, disabling this for now until we get the underlying layout stable.
|
|
//
|
|
// if len(xl.config.NodeDiskMap) > 0 {
|
|
// return xl.listMultipartUploads(bucket, resources)
|
|
// }
|
|
|
|
if !xl.storedBuckets.Exists(bucket) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
|
|
storedBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
var uploads []*UploadMetadata
|
|
|
|
for key, session := range storedBucket.multiPartSession {
|
|
if strings.HasPrefix(key, resources.Prefix) {
|
|
if len(uploads) > resources.MaxUploads {
|
|
sort.Sort(byKey(uploads))
|
|
resources.Upload = uploads
|
|
resources.NextKeyMarker = key
|
|
resources.NextUploadIDMarker = session.UploadID
|
|
resources.IsTruncated = true
|
|
return resources, nil
|
|
}
|
|
// uploadIDMarker is ignored if KeyMarker is empty
|
|
switch {
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
|
|
if key > resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Key = key
|
|
upload.UploadID = session.UploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
|
|
if session.UploadID > resources.UploadIDMarker {
|
|
if key >= resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Key = key
|
|
upload.UploadID = session.UploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
default:
|
|
upload := new(UploadMetadata)
|
|
upload.Key = key
|
|
upload.UploadID = session.UploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(byKey(uploads))
|
|
resources.Upload = uploads
|
|
return resources, nil
|
|
}
|
|
|
|
// partNumber is a sortable interface for Part slice
|
|
type partNumber []*PartMetadata
|
|
|
|
func (a partNumber) Len() int { return len(a) }
|
|
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
|
|
|
// ListObjectParts - list parts from incomplete multipart session for a given object
|
|
func (xl API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
|
|
// Verify upload id
|
|
xl.lock.Lock()
|
|
defer xl.lock.Unlock()
|
|
|
|
if !IsValidBucket(bucket) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(key) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Object: key})
|
|
}
|
|
|
|
// TODO: multipart support for xl is broken, since we haven't finalized the format in which
|
|
// it can be stored, disabling this for now until we get the underlying layout stable.
|
|
//
|
|
// if len(xl.config.NodeDiskMap) > 0 {
|
|
// return xl.listObjectParts(bucket, key, resources)
|
|
// }
|
|
|
|
if !xl.storedBuckets.Exists(bucket) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
storedBucket := xl.storedBuckets.Get(bucket).(storedBucket)
|
|
if _, ok := storedBucket.multiPartSession[key]; ok == false {
|
|
return ObjectResourcesMetadata{}, probe.NewError(ObjectNotFound{Object: key})
|
|
}
|
|
if storedBucket.multiPartSession[key].UploadID != resources.UploadID {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
|
|
}
|
|
storedParts := storedBucket.partMetadata[key]
|
|
objectResourcesMetadata := resources
|
|
objectResourcesMetadata.Bucket = bucket
|
|
objectResourcesMetadata.Key = key
|
|
var parts []*PartMetadata
|
|
var startPartNumber int
|
|
switch {
|
|
case objectResourcesMetadata.PartNumberMarker == 0:
|
|
startPartNumber = 1
|
|
default:
|
|
startPartNumber = objectResourcesMetadata.PartNumberMarker
|
|
}
|
|
for i := startPartNumber; i <= storedBucket.multiPartSession[key].TotalParts; i++ {
|
|
if len(parts) > objectResourcesMetadata.MaxParts {
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.IsTruncated = true
|
|
objectResourcesMetadata.Part = parts
|
|
objectResourcesMetadata.NextPartNumberMarker = i
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
part, ok := storedParts[i]
|
|
if !ok {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidPart{})
|
|
}
|
|
parts = append(parts, &part)
|
|
}
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.Part = parts
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
|
|
// evictedPart - call back function called by caching module during individual cache evictions
|
|
func (xl API) evictedPart(a ...interface{}) {
|
|
// loop through all buckets
|
|
buckets := xl.storedBuckets.GetAll()
|
|
for bucketName, bucket := range buckets {
|
|
b := bucket.(storedBucket)
|
|
xl.storedBuckets.Set(bucketName, b)
|
|
}
|
|
debug.FreeOSMemory()
|
|
}
|