Merge pull request #576 from fkautz/pr_out_adding_multipart_support

This commit is contained in:
Frederick F. Kautz IV 2015-05-07 20:00:20 -07:00
commit 9dc7b82b39
9 changed files with 314 additions and 6 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio-io/cli"
"github.com/minio-io/minio/pkg/featureflags"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/server"
"github.com/minio-io/minio/pkg/server/httpserver"
@ -295,6 +296,9 @@ func getSystemData() map[string]string {
var Version = mustHashBinarySelf()
func main() {
// enable features
featureflags.Enable(featureflags.MultipartPutObject)
// set up iodine
iodine.SetGlobalState("minio.version", Version)
iodine.SetGlobalState("minio.starttime", time.Now().Format(time.RFC3339))

View File

@ -95,6 +95,26 @@ type Owner struct {
DisplayName string
}
// InitiateMultipartUploadResult - Returns upload id to use
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 InitiateMultipartUploadResult" json:"-"`
Bucket string
Key string
UploadID string `xml:"UploadId"`
}
// CompleteMultipartUpload - Construct object from uploaded parts
type CompleteMultipartUpload struct {
Part []Part
}
// Part - Description of a multipart part
type Part struct {
PartNumber int
ETag string
}
// List of not implemented bucket queries
var notimplementedBucketResourceNames = map[string]bool{
"policy": true,
@ -113,7 +133,6 @@ var notimplementedBucketResourceNames = map[string]bool{
// List of not implemented object queries
var notimplementedObjectResourceNames = map[string]bool{
"uploadId": true,
"torrent": true,
"uploads": true,
}

View File

@ -20,6 +20,7 @@ import (
"net/http"
"strconv"
"encoding/xml"
"github.com/gorilla/mux"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
@ -173,8 +174,8 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
return
}
// ignoring error here, TODO find a way to reply back if we can
sizeInt, _ := strconv.ParseInt(size, 10, 64)
calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt, req.Body)
sizeInt64, _ := strconv.ParseInt(size, 10, 64)
calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt64, req.Body)
switch err := iodine.ToError(err).(type) {
case nil:
{
@ -210,3 +211,156 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
}
}
}
func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
if acceptsContentType == unknownContentType {
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
return
}
// handle ACL's here at bucket level
if !server.isValidOp(w, req, acceptsContentType) {
return
}
var object, bucket string
vars := mux.Vars(req)
bucket = vars["bucket"]
object = vars["object"]
var uploadID string
var err error
if uploadID, err = server.driver.NewMultipartUpload(bucket, object, ""); err != nil {
log.Println(iodine.New(err, nil))
writeErrorResponse(w, req, NotImplemented, acceptsContentType, req.URL.Path)
return
}
response := generateInitiateMultipartUploadResult(bucket, object, uploadID)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType))
// set content-length to the size of the body
w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse)))
w.WriteHeader(http.StatusOK)
// write body
w.Write(encodedSuccessResponse)
}
func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
if acceptsContentType == unknownContentType {
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
return
}
// handle ACL's here at bucket level
if !server.isValidOp(w, req, acceptsContentType) {
return
}
// get Content-MD5 sent by client and verify if valid
md5 := req.Header.Get("Content-MD5")
if !isValidMD5(md5) {
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
return
}
/// if Content-Length missing, throw away
size := req.Header.Get("Content-Length")
if size == "" {
writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path)
return
}
/// maximum Upload size for objects in a single operation
if isMaxObjectSize(size) {
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
return
}
/// minimum Upload size for objects in a single operation
if isMinObjectSize(size) {
writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path)
return
}
// ignoring error here, TODO find a way to reply back if we can
sizeInt64, _ := strconv.ParseInt(size, 10, 64)
var object, bucket string
vars := mux.Vars(req)
bucket = vars["bucket"]
object = vars["object"]
uploadID := vars["uploadID"]
partIDString := vars["partNumber"]
partID, err := strconv.Atoi(partIDString)
if err != nil {
// TODO find the write value for this error
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
}
calculatedMD5, err := server.driver.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body)
switch err := iodine.ToError(err).(type) {
case nil:
{
w.Header().Set("ETag", calculatedMD5)
writeSuccessResponse(w, acceptsContentType)
}
case drivers.ObjectExists:
{
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
}
case drivers.BadDigest:
{
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
}
case drivers.EntityTooLarge:
{
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
}
case drivers.InvalidDigest:
{
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
}
case drivers.ImplementationError:
{
log.Error.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
default:
{
log.Error.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
}
func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
if acceptsContentType == unknownContentType {
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
return
}
decoder := xml.NewDecoder(req.Body)
parts := &CompleteMultipartUpload{}
err := decoder.Decode(parts)
if err != nil {
log.Error.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
partMap := make(map[int]string)
vars := mux.Vars(req)
bucket := vars["bucket"]
object := vars["object"]
uploadID := vars["uploadID"]
for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag
}
err = server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
}
func (server *minioAPI) notImplementedHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
writeErrorResponse(w, req, NotImplemented, acceptsContentType, req.URL.Path)
}

View File

@ -112,6 +112,14 @@ func generateListObjectsResponse(bucket string, objects []drivers.ObjectMetadata
return data
}
func generateInitiateMultipartUploadResult(bucket, key, uploadID string) InitiateMultipartUploadResult {
return InitiateMultipartUploadResult{
Bucket: bucket,
Key: key,
UploadID: uploadID,
}
}
// writeSuccessResponse - write success headers
func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
setCommonHeaders(w, getContentTypeString(acceptsContentType))

View File

@ -24,6 +24,7 @@ import (
"github.com/minio-io/minio/pkg/api/config"
"github.com/minio-io/minio/pkg/api/logging"
"github.com/minio-io/minio/pkg/api/quota"
"github.com/minio-io/minio/pkg/featureflags"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
)
@ -46,6 +47,12 @@ func HTTPHandler(driver drivers.Driver) http.Handler {
mux.HandleFunc("/{bucket}", api.headBucketHandler).Methods("HEAD")
mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.headObjectHandler).Methods("HEAD")
if featureflags.Get(featureflags.MultipartPutObject) {
log.Println("Enabling feature", featureflags.MultipartPutObject)
mux.HandleFunc("/{bucket}/{object:.*}?uploads", api.newMultipartUploadHandler).Methods("POST")
mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT")
mux.HandleFunc("/{bucket}/{object:.*}", api.completeMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST")
}
mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectHandler).Methods("PUT")
var conf = config.Config{}

View File

@ -29,6 +29,7 @@ import (
"io/ioutil"
"errors"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/donut"
"github.com/minio-io/minio/pkg/storage/drivers"
@ -397,3 +398,15 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
}
return calculatedMD5Sum, nil
}
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
return "", iodine.New(errors.New("Not Implemented"), nil)
}
func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
return "", iodine.New(errors.New("Not Implemented"), nil)
}
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error {
return iodine.New(errors.New("Not Implemented"), nil)
}

View File

@ -38,6 +38,11 @@ type Driver interface {
GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error)
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error)
// Object Multipart Operations
NewMultipartUpload(bucket string, key string, contentType string) (string, error)
CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error
}
// BucketACL - bucket level access control

View File

@ -20,6 +20,7 @@ import (
"bufio"
"bytes"
"crypto/md5"
"crypto/sha512"
"encoding/base64"
"encoding/hex"
"errors"
@ -32,6 +33,8 @@ import (
"sync"
"time"
"math/rand"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
)
@ -222,7 +225,7 @@ func getMD5AndData(reader io.Reader) ([]byte, []byte, error) {
return hash.Sum(nil), data, nil
}
// CreateObject - PUT object to memory buffer
// createObject - PUT object to memory buffer
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
memory.lock.RLock()
if !drivers.IsValidBucket(bucket) {
@ -507,3 +510,69 @@ func (memory *memoryDriver) evictObject(a ...interface{}) {
}
debug.FreeOSMemory()
}
func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
// TODO verify object doesn't exist
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])
md5sumBytes := md5.Sum([]byte(uploadID))
md5sum := hex.EncodeToString(md5sumBytes[:])
memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
return uploadID, nil
}
func getMultipartKey(key string, uploadID string, partNumber int) string {
return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
}
func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// TODO verify upload id exists
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
}
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error {
// TODO verify upload id exists
memory.lock.Lock()
size := int64(0)
for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok {
if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true {
size += object.Size
}
} else {
memory.lock.Unlock()
return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
}
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok {
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true {
obj := object.([]byte)
io.Copy(&fullObject, bytes.NewBuffer(obj))
} else {
log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i))
}
} else {
memory.lock.Unlock()
return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
}
for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
memory.objects.Delete(objectKey)
}
}
memory.lock.Unlock()
md5sumSlice := md5.Sum(fullObject.Bytes())
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
_, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
return err
}

View File

@ -124,3 +124,32 @@ func (m *Driver) CreateObject(bucket string, key string, contentType string, md5
return r0, r1
}
// NewMultipartUpload is a mock
func (m *Driver) NewMultipartUpload(bucket string, key string, contentType string) (string, error) {
ret := m.Called(bucket, key, contentType)
r0 := ret.Get(0).(string)
r1 := ret.Error(1)
return r0, r1
}
// CreateObjectPart is a mock
func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
ret := m.Called(bucket, key, uploadID, partID, contentType, md5sum, size, data)
r0 := ret.Get(0).(string)
r1 := ret.Error(1)
return r0, r1
}
// CompleteMultipartUpload is a mock
func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error {
ret := m.Called(bucket, key, uploadID, parts)
r0 := ret.Error(0)
return r0
}