mirror of
https://github.com/minio/minio.git
synced 2025-11-09 13:39:46 -05:00
Adding multipart support
This commit is contained in:
@@ -29,6 +29,7 @@ import (
|
||||
|
||||
"io/ioutil"
|
||||
|
||||
"errors"
|
||||
"github.com/minio-io/minio/pkg/iodine"
|
||||
"github.com/minio-io/minio/pkg/storage/donut"
|
||||
"github.com/minio-io/minio/pkg/storage/drivers"
|
||||
@@ -397,3 +398,15 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
|
||||
}
|
||||
return calculatedMD5Sum, nil
|
||||
}
|
||||
|
||||
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
|
||||
return "", iodine.New(errors.New("Not Implemented"), nil)
|
||||
}
|
||||
|
||||
func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
|
||||
return "", iodine.New(errors.New("Not Implemented"), nil)
|
||||
}
|
||||
|
||||
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error {
|
||||
return iodine.New(errors.New("Not Implemented"), nil)
|
||||
}
|
||||
|
||||
@@ -38,6 +38,11 @@ type Driver interface {
|
||||
GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error)
|
||||
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
|
||||
CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error)
|
||||
|
||||
// Object Multipart Operations
|
||||
NewMultipartUpload(bucket string, key string, contentType string) (string, error)
|
||||
CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
|
||||
CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error
|
||||
}
|
||||
|
||||
// BucketACL - bucket level access control
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"crypto/sha512"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
@@ -32,6 +33,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"math/rand"
|
||||
|
||||
"github.com/minio-io/minio/pkg/iodine"
|
||||
"github.com/minio-io/minio/pkg/storage/drivers"
|
||||
)
|
||||
@@ -222,7 +225,7 @@ func getMD5AndData(reader io.Reader) ([]byte, []byte, error) {
|
||||
return hash.Sum(nil), data, nil
|
||||
}
|
||||
|
||||
// CreateObject - PUT object to memory buffer
|
||||
// createObject - PUT object to memory buffer
|
||||
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
|
||||
memory.lock.RLock()
|
||||
if !drivers.IsValidBucket(bucket) {
|
||||
@@ -507,3 +510,69 @@ func (memory *memoryDriver) evictObject(a ...interface{}) {
|
||||
}
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
|
||||
func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
|
||||
// TODO verify object doesn't exist
|
||||
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
|
||||
uploadIDSum := sha512.Sum512(id)
|
||||
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])
|
||||
md5sumBytes := md5.Sum([]byte(uploadID))
|
||||
md5sum := hex.EncodeToString(md5sumBytes[:])
|
||||
memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
|
||||
return uploadID, nil
|
||||
}
|
||||
|
||||
func getMultipartKey(key string, uploadID string, partNumber int) string {
|
||||
return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
|
||||
}
|
||||
|
||||
func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
|
||||
// TODO verify upload id exists
|
||||
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
|
||||
}
|
||||
|
||||
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error {
|
||||
// TODO verify upload id exists
|
||||
memory.lock.Lock()
|
||||
size := int64(0)
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
if _, ok := parts[i]; ok {
|
||||
if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true {
|
||||
size += object.Size
|
||||
}
|
||||
} else {
|
||||
memory.lock.Unlock()
|
||||
return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
|
||||
}
|
||||
}
|
||||
|
||||
var fullObject bytes.Buffer
|
||||
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
if _, ok := parts[i]; ok {
|
||||
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true {
|
||||
obj := object.([]byte)
|
||||
io.Copy(&fullObject, bytes.NewBuffer(obj))
|
||||
} else {
|
||||
log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i))
|
||||
}
|
||||
} else {
|
||||
memory.lock.Unlock()
|
||||
return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
if _, ok := parts[i]; ok {
|
||||
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
|
||||
memory.objects.Delete(objectKey)
|
||||
}
|
||||
}
|
||||
|
||||
memory.lock.Unlock()
|
||||
|
||||
md5sumSlice := md5.Sum(fullObject.Bytes())
|
||||
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
|
||||
_, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -124,3 +124,32 @@ func (m *Driver) CreateObject(bucket string, key string, contentType string, md5
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// NewMultipartUpload is a mock
|
||||
func (m *Driver) NewMultipartUpload(bucket string, key string, contentType string) (string, error) {
|
||||
ret := m.Called(bucket, key, contentType)
|
||||
|
||||
r0 := ret.Get(0).(string)
|
||||
r1 := ret.Error(1)
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// CreateObjectPart is a mock
|
||||
func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
|
||||
ret := m.Called(bucket, key, uploadID, partID, contentType, md5sum, size, data)
|
||||
|
||||
r0 := ret.Get(0).(string)
|
||||
r1 := ret.Error(1)
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// CompleteMultipartUpload is a mock
|
||||
func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error {
|
||||
ret := m.Called(bucket, key, uploadID, parts)
|
||||
|
||||
r0 := ret.Error(0)
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user