From 8f61d6b6befb0a89fa0fecafe3b8125ae7157440 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 29 Jun 2015 20:48:23 -0700 Subject: [PATCH] Move memory code out, add it as layer on top of existing donut code Just like how http.Handlers can be overlaid on top of each other with each implementing ServeHTTP(). drivers.Driver can be overlaid on top of each other in a similar manner which would implement the drivers.Driver interface. API <----> cache <----> donut <----> donut(format) --- commands.go | 2 +- pkg/api/api_test.go | 10 +- pkg/server/server.go | 26 +- pkg/storage/drivers/api_testsuite.go | 9 + pkg/storage/drivers/cache/cache.go | 110 +++++- pkg/storage/drivers/cache/cache_test.go | 3 +- pkg/storage/drivers/donut/donut-multipart.go | 395 +------------------ pkg/storage/drivers/donut/donut.go | 301 +++----------- pkg/storage/drivers/donut/donut_test.go | 3 +- 9 files changed, 192 insertions(+), 667 deletions(-) diff --git a/commands.go b/commands.go index c9b9ccbec..27cd5efa2 100644 --- a/commands.go +++ b/commands.go @@ -135,7 +135,7 @@ func runDonut(c *cli.Context) { Fatalln("Path must be set") } apiServerConfig := getAPIServerConfig(c) - donutDriver := server.DonutFactory{ + donutDriver := server.Factory{ Config: apiServerConfig, Paths: paths, MaxMemory: maxMemory, diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index bcc762d5a..e53a1cbe6 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -56,19 +56,13 @@ var _ = Suite(&MySuite{ }, }) -var _ = Suite(&MySuite{ - initDriver: func() (drivers.Driver, string) { - driver, _ := cache.NewDriver(10000, 3*time.Hour) - return driver, "" - }, -}) - var _ = Suite(&MySuite{ initDriver: func() (drivers.Driver, string) { root, _ := ioutil.TempDir(os.TempDir(), "minio-api") var roots []string roots = append(roots, root) - driver, _ := donut.NewDriver(roots, 10000, 3*time.Hour) + driver, _ := donut.NewDriver(roots) + driver, _ = cache.NewDriver(10000, 3*time.Hour, driver) return driver, root }, })
diff --git a/pkg/server/server.go b/pkg/server/server.go index 757f8eb43..1913ce66a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -26,6 +26,8 @@ import ( "github.com/minio/minio/pkg/api/web" "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/server/httpserver" + "github.com/minio/minio/pkg/storage/drivers" + "github.com/minio/minio/pkg/storage/drivers/cache" "github.com/minio/minio/pkg/storage/drivers/donut" "github.com/minio/minio/pkg/utils/log" ) @@ -43,22 +45,30 @@ func (f WebFactory) GetStartServerFunc() StartServerFunc { } } -// DonutFactory is used to build donut api server -type DonutFactory struct { +// Factory is used to build api server +type Factory struct { httpserver.Config Paths []string MaxMemory uint64 Expiration time.Duration } -// GetStartServerFunc DonutFactory builds donut api server -func (f DonutFactory) GetStartServerFunc() StartServerFunc { +// GetStartServerFunc Factory builds api server +func (f Factory) GetStartServerFunc() StartServerFunc { return func() (chan<- string, <-chan error) { - driver, err := donut.NewDriver(f.Paths, f.MaxMemory, f.Expiration) - if err != nil { - log.Fatalln(err) - } conf := api.Config{RateLimit: f.RateLimit} + var driver drivers.Driver + var err error + if len(f.Paths) != 0 { + driver, err = donut.NewDriver(f.Paths) + if err != nil { + log.Fatalln(err) + } + driver, err = cache.NewDriver(f.MaxMemory, f.Expiration, driver) + if err != nil { + log.Fatalln(err) + } + } conf.SetDriver(driver) ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config) return ctrl, status diff --git a/pkg/storage/drivers/api_testsuite.go b/pkg/storage/drivers/api_testsuite.go index a8d7fb899..b20cae176 100644 --- a/pkg/storage/drivers/api_testsuite.go +++ b/pkg/storage/drivers/api_testsuite.go @@ -22,6 +22,7 @@ import ( "encoding/base64" "encoding/hex" "math/rand" + "reflect" "strconv" "time" @@ -58,6 +59,10 @@ func testCreateBucket(c *check.C, create func() Driver) { func 
testMultipartObjectCreation(c *check.C, create func() Driver) { drivers := create() + switch { + case reflect.TypeOf(drivers).String() == "*donut.donutDriver": + return + } err := drivers.CreateBucket("bucket", "") c.Assert(err, check.IsNil) uploadID, err := drivers.NewMultipartUpload("bucket", "key", "") @@ -92,6 +97,10 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) { func testMultipartObjectAbort(c *check.C, create func() Driver) { drivers := create() + switch { + case reflect.TypeOf(drivers).String() == "*donut.donutDriver": + return + } err := drivers.CreateBucket("bucket", "") c.Assert(err, check.IsNil) uploadID, err := drivers.NewMultipartUpload("bucket", "key", "") diff --git a/pkg/storage/drivers/cache/cache.go b/pkg/storage/drivers/cache/cache.go index 527babced..4b4bb21ce 100644 --- a/pkg/storage/drivers/cache/cache.go +++ b/pkg/storage/drivers/cache/cache.go @@ -45,8 +45,12 @@ type cacheDriver struct { multiPartObjects *trove.Cache maxSize uint64 expiration time.Duration + + // stacked driver + driver drivers.Driver } +// storedBucket saved bucket type storedBucket struct { bucketMetadata drivers.BucketMetadata objectMetadata map[string]drivers.ObjectMetadata @@ -54,23 +58,43 @@ type storedBucket struct { multiPartSession map[string]multiPartSession } +// multiPartSession multipart session type multiPartSession struct { totalParts int uploadID string initiated time.Time } +// total Number of buckets allowed const ( totalBuckets = 100 ) +type proxyWriter struct { + writer io.Writer + writtenBytes []byte +} + +func (r *proxyWriter) Write(p []byte) (n int, err error) { + n, err = r.writer.Write(p) + if err != nil { + return + } + r.writtenBytes = append(r.writtenBytes, p[0:n]...) 
+ return +} + +func newProxyWriter(w io.Writer) *proxyWriter { + return &proxyWriter{writer: w, writtenBytes: nil} +} + // NewDriver instantiate a new cache driver -func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error) { +func NewDriver(maxSize uint64, expiration time.Duration, driver drivers.Driver) (drivers.Driver, error) { cache := new(cacheDriver) cache.storedBuckets = make(map[string]storedBucket) - cache.objects = trove.NewCache(maxSize, expiration) cache.maxSize = maxSize cache.expiration = expiration + cache.objects = trove.NewCache(maxSize, expiration) cache.multiPartObjects = trove.NewCache(0, time.Duration(0)) cache.lock = new(sync.RWMutex) @@ -85,27 +109,29 @@ func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error) // GetObject - GET object from cache buffer func (cache *cacheDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) { cache.lock.RLock() + defer cache.lock.RUnlock() if !drivers.IsValidBucket(bucket) { - cache.lock.RUnlock() return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } if !drivers.IsValidObjectName(object) { - cache.lock.RUnlock() return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil) } if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } objectKey := bucket + "/" + object data, ok := cache.objects.Get(objectKey) if !ok { - cache.lock.RUnlock() + if cache.driver != nil { + return cache.driver.GetObject(w, bucket, object) + } return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) } written, err := io.Copy(w, bytes.NewBuffer(data)) - cache.lock.RUnlock() - return written, iodine.New(err, nil) + if err != nil { + return 0, iodine.New(err, nil) + } + return written, nil } // GetPartialObject - GET object from cache buffer range @@ -117,12 +143,11 @@ func (cache *cacheDriver) GetPartialObject(w io.Writer, bucket, 
object string, s "length": strconv.FormatInt(length, 10), } cache.lock.RLock() + defer cache.lock.RUnlock() if !drivers.IsValidBucket(bucket) { - cache.lock.RUnlock() return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, errParams) } if !drivers.IsValidObjectName(object) { - cache.lock.RUnlock() return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, errParams) } if start < 0 { @@ -134,24 +159,43 @@ func (cache *cacheDriver) GetPartialObject(w io.Writer, bucket, object string, s objectKey := bucket + "/" + object data, ok := cache.objects.Get(objectKey) if !ok { - cache.lock.RUnlock() - return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams) + if cache.driver != nil { + return cache.driver.GetPartialObject(w, bucket, object, start, length) + } + return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) } written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) - cache.lock.RUnlock() - return written, iodine.New(err, nil) + if err != nil { + return 0, iodine.New(err, nil) + } + return written, nil } // GetBucketMetadata - func (cache *cacheDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) { cache.lock.RLock() - defer cache.lock.RUnlock() if !drivers.IsValidBucket(bucket) { + cache.lock.RUnlock() return drivers.BucketMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } if _, ok := cache.storedBuckets[bucket]; ok == false { - return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + if cache.driver == nil { + cache.lock.RUnlock() + return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + bucketMetadata, err := cache.driver.GetBucketMetadata(bucket) + if err != nil { + cache.lock.RUnlock() + return drivers.BucketMetadata{}, iodine.New(err, nil) + } + storedBucket := cache.storedBuckets[bucket] + cache.lock.RUnlock() + cache.lock.Lock() + storedBucket.bucketMetadata = 
bucketMetadata + cache.storedBuckets[bucket] = storedBucket + cache.lock.Unlock() } + cache.lock.RUnlock() return cache.storedBuckets[bucket].bucketMetadata, nil } @@ -171,10 +215,15 @@ func (cache *cacheDriver) SetBucketMetadata(bucket, acl string) error { } cache.lock.RUnlock() cache.lock.Lock() - defer cache.lock.Unlock() + if cache.driver != nil { + if err := cache.driver.SetBucketMetadata(bucket, acl); err != nil { + return iodine.New(err, nil) + } + } storedBucket := cache.storedBuckets[bucket] storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl) cache.storedBuckets[bucket] = storedBucket + cache.lock.Unlock() return nil } @@ -332,6 +381,11 @@ func (cache *cacheDriver) CreateBucket(bucketName, acl string) error { // default is private acl = "private" } + if cache.driver != nil { + if err := cache.driver.CreateBucket(bucketName, acl); err != nil { + return iodine.New(err, nil) + } + } var newBucket = storedBucket{} newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) newBucket.multiPartSession = make(map[string]multiPartSession) @@ -475,22 +529,38 @@ func (cache *cacheDriver) ListBuckets() ([]drivers.BucketMetadata, error) { // GetObjectMetadata - get object metadata from cache func (cache *cacheDriver) GetObjectMetadata(bucket, key string) (drivers.ObjectMetadata, error) { cache.lock.RLock() - defer cache.lock.RUnlock() // check if bucket exists if !drivers.IsValidBucket(bucket) { + cache.lock.RUnlock() return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } if !drivers.IsValidObjectName(key) { + cache.lock.RUnlock() return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) } if _, ok := cache.storedBuckets[bucket]; ok == false { + cache.lock.RUnlock() return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } storedBucket := cache.storedBuckets[bucket] objectKey := bucket + "/" + key - if object, ok := storedBucket.objectMetadata[objectKey]; 
ok == true { - return object, nil + if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true { + cache.lock.RUnlock() + return objMetadata, nil } + if cache.driver != nil { + objMetadata, err := cache.driver.GetObjectMetadata(bucket, key) + cache.lock.RUnlock() + if err != nil { + return drivers.ObjectMetadata{}, iodine.New(err, nil) + } + // update + cache.lock.Lock() + storedBucket.objectMetadata[objectKey] = objMetadata + cache.lock.Unlock() + return objMetadata, nil + } + cache.lock.RUnlock() return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) } diff --git a/pkg/storage/drivers/cache/cache_test.go b/pkg/storage/drivers/cache/cache_test.go index 9a9ba25eb..30326b3f4 100644 --- a/pkg/storage/drivers/cache/cache_test.go +++ b/pkg/storage/drivers/cache/cache_test.go @@ -32,7 +32,8 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { create := func() drivers.Driver { - store, err := NewDriver(1000000, 3*time.Hour) + var driver drivers.Driver + store, err := NewDriver(1000000, 3*time.Hour, driver) c.Check(err, IsNil) return store } diff --git a/pkg/storage/drivers/donut/donut-multipart.go b/pkg/storage/drivers/donut/donut-multipart.go index 2fe388dc3..a418aea95 100644 --- a/pkg/storage/drivers/donut/donut-multipart.go +++ b/pkg/storage/drivers/donut/donut-multipart.go @@ -17,414 +17,31 @@ package donut import ( - "bytes" - "crypto/md5" - "crypto/sha512" - "encoding/base64" - "encoding/hex" - "errors" "io" - "math/rand" - "runtime/debug" - "sort" - "strconv" - "strings" - "time" "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/storage/drivers" ) -// isMD5SumEqual - returns error if md5sum mismatches, success its `nil` -func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { - if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" { - expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum) - if err != nil { - return iodine.New(err, 
nil) - } - actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum) - if err != nil { - return iodine.New(err, nil) - } - if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) { - return iodine.New(errors.New("bad digest, md5sum mismatch"), nil) - } - return nil - } - return iodine.New(errors.New("invalid argument"), nil) -} - func (d donutDriver) NewMultipartUpload(bucketName, objectName, contentType string) (string, error) { - d.lock.RLock() - if !drivers.IsValidBucket(bucketName) { - d.lock.RUnlock() - return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) - } - if !drivers.IsValidObjectName(objectName) { - d.lock.RUnlock() - return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) - } - if _, ok := d.storedBuckets[bucketName]; ok == false { - d.lock.RUnlock() - return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - } - storedBucket := d.storedBuckets[bucketName] - objectKey := bucketName + "/" + objectName - if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { - d.lock.RUnlock() - return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil) - } - d.lock.RUnlock() - - d.lock.Lock() - id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucketName + objectName + time.Now().String()) - uploadIDSum := sha512.Sum512(id) - uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] - - d.storedBuckets[bucketName].multiPartSession[objectName] = multiPartSession{ - uploadID: uploadID, - initiated: time.Now().UTC(), - totalParts: 0, - } - d.lock.Unlock() - return uploadID, nil + return "", iodine.New(drivers.APINotImplemented{API: "NewMultipartUpload"}, nil) } func (d donutDriver) AbortMultipartUpload(bucketName, objectName, uploadID string) error { - d.lock.RLock() - storedBucket := d.storedBuckets[bucketName] - if storedBucket.multiPartSession[objectName].uploadID != uploadID { - d.lock.RUnlock() - return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } 
- d.lock.RUnlock() - - d.cleanupMultiparts(bucketName, objectName, uploadID) - d.cleanupMultipartSession(bucketName, objectName, uploadID) - return nil -} - -func getMultipartKey(key string, uploadID string, partNumber int) string { - return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) + return iodine.New(drivers.APINotImplemented{API: "AbortMultipartUpload"}, nil) } func (d donutDriver) CreateObjectPart(bucketName, objectName, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - // Verify upload id - d.lock.RLock() - storedBucket := d.storedBuckets[bucketName] - if storedBucket.multiPartSession[objectName].uploadID != uploadID { - d.lock.RUnlock() - return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } - d.lock.RUnlock() - - etag, err := d.createObjectPart(bucketName, objectName, uploadID, partID, "", expectedMD5Sum, size, data) - if err != nil { - return "", iodine.New(err, nil) - } - // free - debug.FreeOSMemory() - return etag, nil -} - -// createObject - PUT object to memory buffer -func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - d.lock.RLock() - if !drivers.IsValidBucket(bucketName) { - d.lock.RUnlock() - return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) - } - if !drivers.IsValidObjectName(objectName) { - d.lock.RUnlock() - return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) - } - if _, ok := d.storedBuckets[bucketName]; ok == false { - d.lock.RUnlock() - return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - } - storedBucket := d.storedBuckets[bucketName] - // get object key - partKey := bucketName + "/" + getMultipartKey(objectName, uploadID, partID) - if _, ok := storedBucket.partMetadata[partKey]; ok == true { - d.lock.RUnlock() - return 
storedBucket.partMetadata[partKey].ETag, nil - } - d.lock.RUnlock() - - if contentType == "" { - contentType = "application/octet-stream" - } - contentType = strings.TrimSpace(contentType) - if strings.TrimSpace(expectedMD5Sum) != "" { - expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) - if err != nil { - // pro-actively close the connection - return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil) - } - expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) - } - - // calculate md5 - hash := md5.New() - var readBytes []byte - - var err error - var length int - for err == nil { - byteBuffer := make([]byte, 1024*1024) - length, err = data.Read(byteBuffer) - // While hash.Write() wouldn't mind a Nil byteBuffer - // It is necessary for us to verify this and break - if length == 0 { - break - } - hash.Write(byteBuffer[0:length]) - readBytes = append(readBytes, byteBuffer[0:length]...) - } - if err != io.EOF { - return "", iodine.New(err, nil) - } - go debug.FreeOSMemory() - md5SumBytes := hash.Sum(nil) - totalLength := int64(len(readBytes)) - - d.lock.Lock() - d.multiPartObjects.Set(partKey, readBytes) - d.lock.Unlock() - // setting up for de-allocation - readBytes = nil - go debug.FreeOSMemory() - - md5Sum := hex.EncodeToString(md5SumBytes) - // Verify if the written object is equal to what is expected, only if it is requested as such - if strings.TrimSpace(expectedMD5Sum) != "" { - if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { - return "", iodine.New(drivers.BadDigest{ - Md5: expectedMD5Sum, - Bucket: bucketName, - Key: objectName, - }, nil) - } - } - newPart := drivers.PartMetadata{ - PartNumber: partID, - LastModified: time.Now().UTC(), - ETag: md5Sum, - Size: totalLength, - } - - d.lock.Lock() - storedBucket.partMetadata[partKey] = newPart - multiPartSession := storedBucket.multiPartSession[objectName] - multiPartSession.totalParts++ - 
storedBucket.multiPartSession[objectName] = multiPartSession - d.storedBuckets[bucketName] = storedBucket - d.lock.Unlock() - - return md5Sum, nil -} - -func (d donutDriver) cleanupMultipartSession(bucketName, objectName, uploadID string) { - d.lock.Lock() - defer d.lock.Unlock() - delete(d.storedBuckets[bucketName].multiPartSession, objectName) -} - -func (d donutDriver) cleanupMultiparts(bucketName, objectName, uploadID string) { - for i := 1; i <= d.storedBuckets[bucketName].multiPartSession[objectName].totalParts; i++ { - objectKey := bucketName + "/" + getMultipartKey(objectName, uploadID, i) - d.multiPartObjects.Delete(objectKey) - } + return "", iodine.New(drivers.APINotImplemented{API: "CreateObjectPart"}, nil) } func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID string, parts map[int]string) (string, error) { - if !drivers.IsValidBucket(bucketName) { - return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) - } - if !drivers.IsValidObjectName(objectName) { - return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) - } - // Verify upload id - d.lock.RLock() - if _, ok := d.storedBuckets[bucketName]; ok == false { - d.lock.RUnlock() - return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - } - storedBucket := d.storedBuckets[bucketName] - if storedBucket.multiPartSession[objectName].uploadID != uploadID { - d.lock.RUnlock() - return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } - d.lock.RUnlock() - - d.lock.Lock() - var size int64 - fullHasher := md5.New() - var fullObject bytes.Buffer - for i := 1; i <= len(parts); i++ { - recvMD5 := parts[i] - object, ok := d.multiPartObjects.Get(bucketName + "/" + getMultipartKey(objectName, uploadID, i)) - if ok == false { - d.lock.Unlock() - return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) - } - size += int64(len(object)) - calcMD5Bytes := md5.Sum(object) - // complete multi part request header 
md5sum per part is hex encoded - recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) - if err != nil { - return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) - } - if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { - return "", iodine.New(drivers.BadDigest{ - Md5: recvMD5, - Bucket: bucketName, - Key: getMultipartKey(objectName, uploadID, i), - }, nil) - } - mw := io.MultiWriter(&fullObject, fullHasher) - _, err = io.Copy(mw, bytes.NewReader(object)) - if err != nil { - return "", iodine.New(err, nil) - } - object = nil - go debug.FreeOSMemory() - } - d.lock.Unlock() - - md5sumSlice := fullHasher.Sum(nil) - // this is needed for final verification inside CreateObject, do not convert this to hex - md5sum := base64.StdEncoding.EncodeToString(md5sumSlice) - etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject) - if err != nil { - // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() - // which would in-turn cleanup properly in accordance with S3 Spec - return "", iodine.New(err, nil) - } - fullObject.Reset() - go debug.FreeOSMemory() - - d.cleanupMultiparts(bucketName, objectName, uploadID) - d.cleanupMultipartSession(bucketName, objectName, uploadID) - return etag, nil + return "", iodine.New(drivers.APINotImplemented{API: "CompleteMultipartUpload"}, nil) } -// byKey is a sortable interface for UploadMetadata slice -type byKey []*drivers.UploadMetadata - -func (a byKey) Len() int { return len(a) } -func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } - func (d donutDriver) ListMultipartUploads(bucketName string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { - d.lock.RLock() - defer d.lock.RUnlock() - if _, ok := d.storedBuckets[bucketName]; ok == false { - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: 
bucketName}, nil) - } - storedBucket := d.storedBuckets[bucketName] - var uploads []*drivers.UploadMetadata - - for key, session := range storedBucket.multiPartSession { - if strings.HasPrefix(key, resources.Prefix) { - if len(uploads) > resources.MaxUploads { - sort.Sort(byKey(uploads)) - resources.Upload = uploads - resources.NextKeyMarker = key - resources.NextUploadIDMarker = session.uploadID - resources.IsTruncated = true - return resources, nil - } - // uploadIDMarker is ignored if KeyMarker is empty - switch { - case resources.KeyMarker != "" && resources.UploadIDMarker == "": - if key > resources.KeyMarker { - upload := new(drivers.UploadMetadata) - upload.Key = key - upload.UploadID = session.uploadID - upload.Initiated = session.initiated - uploads = append(uploads, upload) - } - case resources.KeyMarker != "" && resources.UploadIDMarker != "": - if session.uploadID > resources.UploadIDMarker { - if key >= resources.KeyMarker { - upload := new(drivers.UploadMetadata) - upload.Key = key - upload.UploadID = session.uploadID - upload.Initiated = session.initiated - uploads = append(uploads, upload) - } - } - default: - upload := new(drivers.UploadMetadata) - upload.Key = key - upload.UploadID = session.uploadID - upload.Initiated = session.initiated - uploads = append(uploads, upload) - } - } - } - sort.Sort(byKey(uploads)) - resources.Upload = uploads - return resources, nil + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListMultipartUploads"}, nil) } - -// partNumber is a sortable interface for Part slice -type partNumber []*drivers.PartMetadata - -func (a partNumber) Len() int { return len(a) } -func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } - func (d donutDriver) ListObjectParts(bucketName, objectName string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { - // Verify 
upload id - d.lock.RLock() - defer d.lock.RUnlock() - if _, ok := d.storedBuckets[bucketName]; ok == false { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - } - storedBucket := d.storedBuckets[bucketName] - if _, ok := storedBucket.multiPartSession[objectName]; ok == false { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucketName, Object: objectName}, nil) - } - if storedBucket.multiPartSession[objectName].uploadID != resources.UploadID { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) - } - objectResourcesMetadata := resources - objectResourcesMetadata.Bucket = bucketName - objectResourcesMetadata.Key = objectName - var parts []*drivers.PartMetadata - var startPartNumber int - switch { - case objectResourcesMetadata.PartNumberMarker == 0: - startPartNumber = 1 - default: - startPartNumber = objectResourcesMetadata.PartNumberMarker - } - for i := startPartNumber; i <= storedBucket.multiPartSession[objectName].totalParts; i++ { - if len(parts) > objectResourcesMetadata.MaxParts { - sort.Sort(partNumber(parts)) - objectResourcesMetadata.IsTruncated = true - objectResourcesMetadata.Part = parts - objectResourcesMetadata.NextPartNumberMarker = i - return objectResourcesMetadata, nil - } - part, ok := storedBucket.partMetadata[bucketName+"/"+getMultipartKey(objectName, resources.UploadID, i)] - if !ok { - return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) - } - parts = append(parts, &part) - } - sort.Sort(partNumber(parts)) - objectResourcesMetadata.Part = parts - return objectResourcesMetadata, nil -} - -func (d donutDriver) expiredPart(a ...interface{}) { - key := a[0].(string) - // loop through all buckets - for _, storedBucket := range d.storedBuckets { - delete(storedBucket.partMetadata, key) - } - go debug.FreeOSMemory() + return 
drivers.ObjectResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListObjectParts"}, nil) } diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 0b5ff27f6..b23fd925b 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -17,55 +17,28 @@ package donut import ( - "bytes" "encoding/base64" "encoding/hex" "io" "os" "path/filepath" - "runtime/debug" "sort" "strconv" "strings" "sync" - "time" "io/ioutil" "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/storage/donut" "github.com/minio/minio/pkg/storage/drivers" - "github.com/minio/minio/pkg/storage/trove" - "github.com/minio/minio/pkg/utils/log" -) - -type storedBucket struct { - bucketMetadata drivers.BucketMetadata - objectMetadata map[string]drivers.ObjectMetadata - partMetadata map[string]drivers.PartMetadata - multiPartSession map[string]multiPartSession -} - -type multiPartSession struct { - totalParts int - uploadID string - initiated time.Time -} - -const ( - totalBuckets = 100 ) // donutDriver - creates a new single disk drivers driver using donut type donutDriver struct { - donut donut.Donut - paths []string - lock *sync.RWMutex - storedBuckets map[string]storedBucket - objects *trove.Cache - multiPartObjects *trove.Cache - maxSize uint64 - expiration time.Duration + donut donut.Donut + paths []string + lock *sync.RWMutex } // This is a dummy nodeDiskMap which is going to be deprecated soon @@ -101,83 +74,18 @@ func createNodeDiskMap(paths []string) map[string][]string { return nodes } -func initialize(d *donutDriver) error { +// NewDriver instantiate a donut driver +func NewDriver(paths []string) (drivers.Driver, error) { + driver := new(donutDriver) + driver.paths = paths + driver.lock = new(sync.RWMutex) + // Soon to be user configurable, when Management API is available // we should remove "default" to something which is passed down // from configuration paramters var err error - d.donut, err = 
donut.NewDonut("default", createNodeDiskMap(d.paths)) - if err != nil { - return iodine.New(err, nil) - } - buckets, err := d.donut.ListBuckets() - if err != nil { - return iodine.New(err, nil) - } - for bucketName, metadata := range buckets { - d.lock.RLock() - storedBucket := d.storedBuckets[bucketName] - d.lock.RUnlock() - if len(storedBucket.multiPartSession) == 0 { - storedBucket.multiPartSession = make(map[string]multiPartSession) - } - if len(storedBucket.objectMetadata) == 0 { - storedBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) - } - if len(storedBucket.partMetadata) == 0 { - storedBucket.partMetadata = make(map[string]drivers.PartMetadata) - } - storedBucket.bucketMetadata = drivers.BucketMetadata{ - Name: metadata.Name, - Created: metadata.Created, - ACL: drivers.BucketACL(metadata.ACL), - } - d.lock.Lock() - d.storedBuckets[bucketName] = storedBucket - d.lock.Unlock() - } - return nil -} - -// NewDriver instantiate a donut driver -func NewDriver(paths []string, maxSize uint64, expiration time.Duration) (drivers.Driver, error) { - driver := new(donutDriver) - driver.storedBuckets = make(map[string]storedBucket) - driver.objects = trove.NewCache(maxSize, expiration) - driver.maxSize = maxSize - driver.expiration = expiration - driver.multiPartObjects = trove.NewCache(0, time.Duration(0)) - driver.lock = new(sync.RWMutex) - - driver.objects.OnExpired = driver.expiredObject - driver.multiPartObjects.OnExpired = driver.expiredPart - - // set up memory expiration - driver.objects.ExpireObjects(time.Second * 5) - - driver.paths = paths - driver.lock = new(sync.RWMutex) - - err := initialize(driver) - return driver, err -} - -func (d donutDriver) expiredObject(a ...interface{}) { - cacheStats := d.objects.Stats() - log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d", - cacheStats.Bytes, cacheStats.Items, cacheStats.Expired) - key := a[0].(string) - // loop through all buckets - for bucket, storedBucket := range 
d.storedBuckets { - delete(storedBucket.objectMetadata, key) - // remove bucket if no objects found anymore - if len(storedBucket.objectMetadata) == 0 { - if time.Since(d.storedBuckets[bucket].bucketMetadata.Created) > d.expiration { - delete(d.storedBuckets, bucket) - } - } - } - go debug.FreeOSMemory() + driver.donut, err = donut.NewDonut("default", createNodeDiskMap(driver.paths)) + return driver, iodine.New(err, nil) } // byBucketName is a type for sorting bucket metadata by bucket name @@ -192,8 +100,17 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) if d.donut == nil { return nil, iodine.New(drivers.InternalError{}, nil) } - for _, storedBucket := range d.storedBuckets { - results = append(results, storedBucket.bucketMetadata) + buckets, err := d.donut.ListBuckets() + if err != nil { + return nil, iodine.New(err, nil) + } + for _, metadata := range buckets { + result := drivers.BucketMetadata{ + Name: metadata.Name, + Created: metadata.Created, + ACL: drivers.BucketACL(metadata.ACL), + } + results = append(results, result) } sort.Sort(byBucketName(results)) return results, nil @@ -203,9 +120,6 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) func (d donutDriver) CreateBucket(bucketName, acl string) error { d.lock.Lock() defer d.lock.Unlock() - if len(d.storedBuckets) == totalBuckets { - return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil) - } if d.donut == nil { return iodine.New(drivers.InternalError{}, nil) } @@ -223,20 +137,6 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error { } return iodine.New(err, nil) } - var newBucket = storedBucket{} - newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) - newBucket.multiPartSession = make(map[string]multiPartSession) - newBucket.partMetadata = make(map[string]drivers.PartMetadata) - metadata, err := d.donut.GetBucketMetadata(bucketName) - if err != nil { - return iodine.New(err, nil) - } - 
newBucket.bucketMetadata = drivers.BucketMetadata{ - Name: metadata.Name, - Created: metadata.Created, - ACL: drivers.BucketACL(metadata.ACL), - } - d.storedBuckets[bucketName] = newBucket return nil } return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) @@ -252,9 +152,6 @@ func (d donutDriver) GetBucketMetadata(bucketName string) (drivers.BucketMetadat if !drivers.IsValidBucket(bucketName) { return drivers.BucketMetadata{}, drivers.BucketNameInvalid{Bucket: bucketName} } - if d.storedBuckets[bucketName].bucketMetadata.Name != "" { - return d.storedBuckets[bucketName].bucketMetadata, nil - } metadata, err := d.donut.GetBucketMetadata(bucketName) if err != nil { return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) @@ -286,9 +183,6 @@ func (d donutDriver) SetBucketMetadata(bucketName, acl string) error { if err != nil { return iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) } - storedBucket := d.storedBuckets[bucketName] - storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl) - d.storedBuckets[bucketName] = storedBucket return nil } @@ -303,41 +197,23 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6 if !drivers.IsValidObjectName(objectName) { return 0, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) } - if _, ok := d.storedBuckets[bucketName]; ok == false { - return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - } d.lock.RLock() defer d.lock.RUnlock() - objectKey := bucketName + "/" + objectName - data, ok := d.objects.Get(objectKey) - if !ok { - reader, size, err := d.donut.GetObject(bucketName, objectName) - if err != nil { - switch iodine.ToError(err).(type) { - case donut.BucketNotFound: - return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - case donut.ObjectNotFound: - return 0, iodine.New(drivers.ObjectNotFound{ - Bucket: bucketName, - Object: objectName, - }, nil) - default: - return 0, 
iodine.New(drivers.InternalError{}, nil) - } + reader, size, err := d.donut.GetObject(bucketName, objectName) + if err != nil { + switch iodine.ToError(err).(type) { + case donut.BucketNotFound: + return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) + case donut.ObjectNotFound: + return 0, iodine.New(drivers.ObjectNotFound{ + Bucket: bucketName, + Object: objectName, + }, nil) + default: + return 0, iodine.New(drivers.InternalError{}, nil) } - pw := newProxyWriter(w) - n, err := io.CopyN(pw, reader, size) - if err != nil { - return 0, iodine.New(err, nil) - } - // Save in memory for future reads - d.objects.Set(objectKey, pw.writtenBytes) - // free up - pw.writtenBytes = nil - go debug.FreeOSMemory() - return n, nil } - written, err := io.Copy(w, bytes.NewBuffer(data)) + written, err := io.CopyN(w, reader, size) if err != nil { return 0, iodine.New(err, nil) } @@ -369,45 +245,36 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string Length: length, }, errParams) } - if _, ok := d.storedBuckets[bucketName]; ok == false { - return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) + reader, size, err := d.donut.GetObject(bucketName, objectName) + if err != nil { + switch iodine.ToError(err).(type) { + case donut.BucketNotFound: + return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) + case donut.ObjectNotFound: + return 0, iodine.New(drivers.ObjectNotFound{ + Bucket: bucketName, + Object: objectName, + }, nil) + default: + return 0, iodine.New(drivers.InternalError{}, nil) + } } - objectKey := bucketName + "/" + objectName - data, ok := d.objects.Get(objectKey) - if !ok { - reader, size, err := d.donut.GetObject(bucketName, objectName) - if err != nil { - switch iodine.ToError(err).(type) { - case donut.BucketNotFound: - return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) - case donut.ObjectNotFound: - return 0, iodine.New(drivers.ObjectNotFound{ - Bucket: bucketName, - 
Object: objectName, - }, nil) - default: - return 0, iodine.New(drivers.InternalError{}, nil) - } - } - defer reader.Close() - if start > size || (start+length-1) > size { - return 0, iodine.New(drivers.InvalidRange{ - Start: start, - Length: length, - }, errParams) - } - _, err = io.CopyN(ioutil.Discard, reader, start) - if err != nil { - return 0, iodine.New(err, errParams) - } - n, err := io.CopyN(w, reader, length) - if err != nil { - return 0, iodine.New(err, errParams) - } - return n, nil + defer reader.Close() + if start > size || (start+length-1) > size { + return 0, iodine.New(drivers.InvalidRange{ + Start: start, + Length: length, + }, errParams) } - written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) - return written, iodine.New(err, nil) + _, err = io.CopyN(ioutil.Discard, reader, start) + if err != nil { + return 0, iodine.New(err, errParams) + } + n, err := io.CopyN(w, reader, length) + if err != nil { + return 0, iodine.New(err, errParams) + } + return n, nil } // GetObjectMetadata retrieves an object's metadata @@ -428,13 +295,6 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O if !drivers.IsValidObjectName(objectName) { return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams) } - if _, ok := d.storedBuckets[bucketName]; ok { - storedBucket := d.storedBuckets[bucketName] - objectKey := bucketName + "/" + objectName - if object, ok := storedBucket.objectMetadata[objectKey]; ok { - return object, nil - } - } metadata, err := d.donut.GetObjectMetadata(bucketName, objectName) if err != nil { return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{ @@ -498,24 +358,6 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso return results, resources, nil } -type proxyWriter struct { - writer io.Writer - writtenBytes []byte -} - -func (r *proxyWriter) Write(p []byte) (n int, err error) { - n, err = r.writer.Write(p) - if err != 
nil { - return - } - r.writtenBytes = append(r.writtenBytes, p[0:n]...) - return -} - -func newProxyWriter(w io.Writer) *proxyWriter { - return &proxyWriter{writer: w, writtenBytes: nil} -} - // CreateObject creates a new object func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) { d.lock.Lock() @@ -528,27 +370,12 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM if d.donut == nil { return "", iodine.New(drivers.InternalError{}, errParams) } - // TODO - Should be able to write bigger than cache - if size > int64(d.maxSize) { - generic := drivers.GenericObjectError{Bucket: bucketName, Object: objectName} - return "", iodine.New(drivers.EntityTooLarge{ - GenericObjectError: generic, - Size: strconv.FormatInt(size, 10), - MaxSize: strconv.FormatUint(d.maxSize, 10), - }, nil) - } if !drivers.IsValidBucket(bucketName) { return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) } if !drivers.IsValidObjectName(objectName) { return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) } - storedBucket := d.storedBuckets[bucketName] - // get object key - objectKey := bucketName + "/" + objectName - if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { - return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil) - } if strings.TrimSpace(contentType) == "" { contentType = "application/octet-stream" } @@ -579,7 +406,5 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM Md5: objMetadata.MD5Sum, Size: objMetadata.Size, } - storedBucket.objectMetadata[objectKey] = newObject - d.storedBuckets[bucketName] = storedBucket return newObject.Md5, nil } diff --git a/pkg/storage/drivers/donut/donut_test.go b/pkg/storage/drivers/donut/donut_test.go index c0d5f50b2..1d823e6f0 100644 --- a/pkg/storage/drivers/donut/donut_test.go +++ 
b/pkg/storage/drivers/donut/donut_test.go @@ -20,7 +20,6 @@ import ( "io/ioutil" "os" "testing" - "time" . "github.com/minio/check" "github.com/minio/minio/pkg/storage/drivers" @@ -40,7 +39,7 @@ func (s *MySuite) TestAPISuite(c *C) { c.Check(err, IsNil) storageList = append(storageList, p) paths = append(paths, p) - store, err := NewDriver(paths, 1000000, 3*time.Hour) + store, err := NewDriver(paths) c.Check(err, IsNil) return store }