From 45a7eab804b375e648395513b32c14b876e1b74a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 27 Jun 2015 15:02:49 -0700 Subject: [PATCH] An attempt to bring in memory layer into donut driver --- commands.go | 55 +++- pkg/api/api_test.go | 32 +- pkg/server/server.go | 6 +- pkg/storage/donut/bucket.go | 3 +- pkg/storage/donut/donut.go | 2 +- pkg/storage/donut/interfaces.go | 2 +- pkg/storage/drivers/api_testsuite.go | 9 - pkg/storage/drivers/donut/donut.go | 312 +++++++++++++----- pkg/storage/drivers/donut/donut_test.go | 3 +- pkg/storage/drivers/donut/multipart.go | 419 ++++++++++++++++++++++++ pkg/storage/drivers/memory/memory.go | 2 +- 11 files changed, 713 insertions(+), 132 deletions(-) create mode 100644 pkg/storage/drivers/donut/multipart.go diff --git a/commands.go b/commands.go index 22c0d1158..810121ec8 100644 --- a/commands.go +++ b/commands.go @@ -154,6 +154,8 @@ func runMemory(c *cli.Context) { } func runDonut(c *cli.Context) { + var err error + u, err := user.Current() if err != nil { Fatalf("Unable to determine current user. Reason: %s\n", err) @@ -161,6 +163,53 @@ func runDonut(c *cli.Context) { if len(c.Args()) < 1 { cli.ShowCommandHelpAndExit(c, "donut", 1) // last argument is exit code } + var maxMemory uint64 + maxMemorySet := false + + var expiration time.Duration + expirationSet := false + + args := c.Args() + for len(args) > 0 { + switch args.First() { + case "limit": + { + if maxMemorySet { + Fatalln("Limit should be set only once") + } + args = args.Tail() + maxMemory, err = humanize.ParseBytes(args.First()) + if err != nil { + Fatalf("Invalid memory size [%s] passed. Reason: %s\n", args.First(), iodine.New(err, nil)) + } + if maxMemory < 1024*1024*10 { + Fatalf("Invalid memory size [%s] passed. Should be greater than 10M\n", args.First()) + } + args = args.Tail() + maxMemorySet = true + } + case "expire": + { + if expirationSet { + Fatalln("Expiration should be set only once") + } + args = args.Tail() + expiration, err = time.ParseDuration(args.First()) + if err != nil { + Fatalf("Invalid expiration time [%s] passed. 
Reason: %s\n", args.First(), iodine.New(err, nil)) + } + args = args.Tail() + expirationSet = true + } + default: + { + cli.ShowCommandHelpAndExit(c, "donut", 1) // last argument is exit code + } + } + } + if maxMemorySet == false { + Fatalln("Memory limit must be set") + } // supporting multiple paths var paths []string if strings.TrimSpace(c.Args().First()) == "" { @@ -173,8 +222,10 @@ func runDonut(c *cli.Context) { } apiServerConfig := getAPIServerConfig(c) donutDriver := server.DonutFactory{ - Config: apiServerConfig, - Paths: paths, + Config: apiServerConfig, + Paths: paths, + MaxMemory: maxMemory, + Expiration: expiration, } apiServer := donutDriver.GetStartServerFunc() // webServer := getWebServerConfigFunc(c) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 0c4052e9d..d59a51808 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -59,7 +59,7 @@ var _ = Suite(&MySuite{ var _ = Suite(&MySuite{ initDriver: func() (drivers.Driver, string) { - _, _, driver := memory.Start(1000, 3*time.Hour) + _, _, driver := memory.Start(10000, 3*time.Hour) return driver, "" }, }) @@ -69,7 +69,7 @@ var _ = Suite(&MySuite{ root, _ := ioutil.TempDir(os.TempDir(), "minio-api") var roots []string roots = append(roots, root) - _, _, driver := donut.Start(roots) + _, _, driver := donut.Start(roots, 10000, 3*time.Hour) return driver, root }, }) @@ -1451,13 +1451,6 @@ func (s *MySuite) TestObjectMultipartAbort(c *C) { { driver.AssertExpectations(c) } - default: - // Donut doesn't have multipart support yet - { - if reflect.TypeOf(driver).String() == "*donut.donutDriver" { - return - } - } } driver := s.Driver typedDriver := s.MockDriver @@ -1534,13 +1527,6 @@ func (s *MySuite) TestBucketMultipartList(c *C) { { driver.AssertExpectations(c) } - default: - // Donut doesn't have multipart support yet - { - if reflect.TypeOf(driver).String() == "*donut.donutDriver" { - return - } - } } driver := s.Driver typedDriver := s.MockDriver @@ -1623,13 +1609,6 @@ func (s *MySuite) TestObjectMultipartList(c *C) { { driver.AssertExpectations(c) } - default: - // Donut doesn't have multipart support yet - { - if reflect.TypeOf(driver).String() == "*donut.donutDriver" { - return - } - } } driver := s.Driver typedDriver := s.MockDriver @@ -1707,13 +1686,6 @@ func (s *MySuite) TestObjectMultipart(c *C) { { driver.AssertExpectations(c) } - default: - // Donut doesn't have multipart support yet - { - if reflect.TypeOf(driver).String() == "*donut.donutDriver" { - return - } - } } driver := s.Driver typedDriver := s.MockDriver diff --git a/pkg/server/server.go b/pkg/server/server.go index 0f9f72fd0..ace14ac87 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -83,13 +83,15 @@ func (f WebFactory) GetStartServerFunc() StartServerFunc { // DonutFactory is used to build donut api server type DonutFactory struct { httpserver.Config - Paths []string + Paths []string + MaxMemory uint64 + Expiration time.Duration } // GetStartServerFunc DonutFactory builds donut api server func (f DonutFactory) GetStartServerFunc() StartServerFunc { return func() (chan<- string, <-chan error) { - _, _, driver := donut.Start(f.Paths) + _, _, driver := donut.Start(f.Paths, f.MaxMemory, f.Expiration) conf := api.Config{RateLimit: f.RateLimit} conf.SetDriver(driver) ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config) diff --git a/pkg/storage/donut/bucket.go b/pkg/storage/donut/bucket.go index 8e0e1e5ca..9c5d54225 100644 --- a/pkg/storage/donut/bucket.go +++ b/pkg/storage/donut/bucket.go @@ -377,14 +377,13 @@ 
func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) // writeEncodedData - func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) { - chunks := split.Stream(objectData, 10*1024*1024) encoder, err := newEncoder(k, m, "Cauchy") if err != nil { return 0, 0, iodine.New(err, nil) } chunkCount := 0 totalLength := 0 - for chunk := range chunks { + for chunk := range split.Stream(objectData, 10*1024*1024) { if chunk.Err == nil { totalLength = totalLength + len(chunk.Data) encodedBlocks, _ := encoder.Encode(chunk.Data) diff --git a/pkg/storage/donut/donut.go b/pkg/storage/donut/donut.go index 7925fb874..34e16d712 100644 --- a/pkg/storage/donut/donut.go +++ b/pkg/storage/donut/donut.go @@ -175,7 +175,7 @@ func (dt donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys in } // PutObject - put object -func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) { +func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (string, error) { dt.lock.Lock() defer dt.lock.Unlock() errParams := map[string]string{ diff --git a/pkg/storage/donut/interfaces.go b/pkg/storage/donut/interfaces.go index c5e78e96f..0b961aa17 100644 --- a/pkg/storage/donut/interfaces.go +++ b/pkg/storage/donut/interfaces.go @@ -40,7 +40,7 @@ type ObjectStorage interface { // Object operations GetObject(bucket, object string) (io.ReadCloser, int64, error) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) - PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) + PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (string, error) } // Management is a donut management system interface diff --git a/pkg/storage/drivers/api_testsuite.go b/pkg/storage/drivers/api_testsuite.go index b20cae176..a8d7fb899 100644 --- a/pkg/storage/drivers/api_testsuite.go +++ b/pkg/storage/drivers/api_testsuite.go @@ -22,7 +22,6 @@ import ( "encoding/base64" "encoding/hex" "math/rand" - "reflect" "strconv" "time" @@ -59,10 +58,6 @@ func testCreateBucket(c *check.C, create func() Driver) { func testMultipartObjectCreation(c *check.C, create func() Driver) { drivers := create() - switch { - case reflect.TypeOf(drivers).String() == "*donut.donutDriver": - return - } err := drivers.CreateBucket("bucket", "") c.Assert(err, check.IsNil) uploadID, err := drivers.NewMultipartUpload("bucket", "key", "") @@ -97,10 +92,6 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) { func testMultipartObjectAbort(c *check.C, create func() Driver) { drivers := create() - switch { - case reflect.TypeOf(drivers).String() == "*donut.donutDriver": - return - } err := drivers.CreateBucket("bucket", "") c.Assert(err, check.IsNil) uploadID, err := drivers.NewMultipartUpload("bucket", "key", "") diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 069d095fb..4b8833a6c 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -17,29 +17,55 @@ package donut import ( + "bytes" "encoding/base64" "encoding/hex" "io" "os" "path/filepath" + "runtime/debug" "sort" "strconv" "strings" "sync" + "time" "io/ioutil" "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/storage/donut" "github.com/minio/minio/pkg/storage/drivers" + 
"github.com/minio/minio/pkg/storage/trove" "github.com/minio/minio/pkg/utils/log" ) +type storedBucket struct { + bucketMetadata drivers.BucketMetadata + objectMetadata map[string]drivers.ObjectMetadata + partMetadata map[string]drivers.PartMetadata + multiPartSession map[string]multiPartSession +} + +type multiPartSession struct { + totalParts int + uploadID string + initiated time.Time +} + +const ( + totalBuckets = 100 +) + // donutDriver - creates a new single disk drivers driver using donut type donutDriver struct { - donut donut.Donut - paths []string - lock *sync.RWMutex + donut donut.Donut + paths []string + lock *sync.RWMutex + storedBuckets map[string]storedBucket + objects *trove.Cache + multiPartObjects *trove.Cache + maxSize uint64 + expiration time.Duration } // This is a dummy nodeDiskMap which is going to be deprecated soon @@ -82,41 +108,72 @@ func createNodeDiskMapFromSlice(paths []string) map[string][]string { } // Start a single disk subsystem -func Start(paths []string) (chan<- string, <-chan error, drivers.Driver) { +func Start(paths []string, maxSize uint64, expiration time.Duration) (chan<- string, <-chan error, drivers.Driver) { ctrlChannel := make(chan string) errorChannel := make(chan error) + driver := new(donutDriver) + driver.storedBuckets = make(map[string]storedBucket) + driver.objects = trove.NewCache(maxSize, expiration) + driver.maxSize = maxSize + driver.expiration = expiration + driver.multiPartObjects = trove.NewCache(0, time.Duration(0)) + driver.lock = new(sync.RWMutex) + + driver.objects.OnExpired = driver.expiredObject + driver.multiPartObjects.OnExpired = driver.expiredPart + + // set up memory expiration + driver.objects.ExpireObjects(time.Second * 5) + // Soon to be user configurable, when Management API is available // we should remove "default" to something which is passed down // from configuration paramters - var d donut.Donut - var err error - if len(paths) == 1 { - d, err = donut.NewDonut("default", createNodeDiskMap(paths[0])) + switch { + case len(paths) == 1: + d, err := donut.NewDonut("default", createNodeDiskMap(paths[0])) if err != nil { err = iodine.New(err, nil) log.Error.Println(err) } - } else { - d, err = donut.NewDonut("default", createNodeDiskMapFromSlice(paths)) + driver.donut = d + default: + d, err := donut.NewDonut("default", createNodeDiskMapFromSlice(paths)) if err != nil { err = iodine.New(err, nil) log.Error.Println(err) } + driver.donut = d } - s := new(donutDriver) - s.donut = d - s.paths = paths - s.lock = new(sync.RWMutex) + driver.paths = paths + driver.lock = new(sync.RWMutex) - go start(ctrlChannel, errorChannel, s) - return ctrlChannel, errorChannel, s + go start(ctrlChannel, errorChannel, driver) + return ctrlChannel, errorChannel, driver } -func start(ctrlChannel <-chan string, errorChannel chan<- error, s *donutDriver) { +func start(ctrlChannel <-chan string, errorChannel chan<- error, driver *donutDriver) { close(errorChannel) } +func (d donutDriver) expiredObject(a ...interface{}) { + cacheStats := d.objects.Stats() + log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d", + cacheStats.Bytes, cacheStats.Items, cacheStats.Expired) + key := a[0].(string) + // loop through all buckets + for bucket, storedBucket := range d.storedBuckets { + delete(storedBucket.objectMetadata, key) + // remove bucket if no objects found anymore + if len(storedBucket.objectMetadata) == 0 { + if time.Since(d.storedBuckets[bucket].bucketMetadata.Created) > d.expiration { + delete(d.storedBuckets, bucket) + } + } + } 
+ go debug.FreeOSMemory() +} + // byBucketName is a type for sorting bucket metadata by bucket name type byBucketName []drivers.BucketMetadata @@ -126,8 +183,6 @@ func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name } // ListBuckets returns a list of buckets func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) { - d.lock.RLock() - defer d.lock.RUnlock() if d.donut == nil { return nil, iodine.New(drivers.InternalError{}, nil) } @@ -135,11 +190,17 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) if err != nil { return nil, err } - for name, metadata := range buckets { + for bucketName, metadata := range buckets { result := drivers.BucketMetadata{ - Name: name, + Name: metadata.Name, Created: metadata.Created, + ACL: drivers.BucketACL(metadata.ACL), } + d.lock.Lock() + storedBucket := d.storedBuckets[bucketName] + storedBucket.bucketMetadata = result + d.storedBuckets[bucketName] = storedBucket + d.lock.Unlock() results = append(results, result) } sort.Sort(byBucketName(results)) @@ -150,6 +211,9 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) func (d donutDriver) CreateBucket(bucketName, acl string) error { d.lock.Lock() defer d.lock.Unlock() + if len(d.storedBuckets) == totalBuckets { + return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil) + } if d.donut == nil { return iodine.New(drivers.InternalError{}, nil) } @@ -167,6 +231,11 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error { } return iodine.New(err, nil) } + var newBucket = storedBucket{} + newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) + newBucket.multiPartSession = make(map[string]multiPartSession) + newBucket.partMetadata = make(map[string]drivers.PartMetadata) + d.storedBuckets[bucketName] = newBucket return nil } return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) @@ -179,9 +248,12 @@ func (d donutDriver) GetBucketMetadata(bucketName string) (drivers.BucketMetadat if d.donut == nil { return drivers.BucketMetadata{}, iodine.New(drivers.InternalError{}, nil) } - if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") { + if !drivers.IsValidBucket(bucketName) { return drivers.BucketMetadata{}, drivers.BucketNameInvalid{Bucket: bucketName} } + if d.storedBuckets[bucketName].bucketMetadata.Name != "" { + return d.storedBuckets[bucketName].bucketMetadata, nil + } metadata, err := d.donut.GetBucketMetadata(bucketName) if err != nil { return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) @@ -213,11 +285,14 @@ func (d donutDriver) SetBucketMetadata(bucketName, acl string) error { if err != nil { return iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) } + storedBucket := d.storedBuckets[bucketName] + storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl) + d.storedBuckets[bucketName] = storedBucket return nil } // GetObject retrieves an object and writes it to a writer -func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string) (int64, error) { +func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int64, error) { d.lock.RLock() defer d.lock.RUnlock() if d.donut == nil { @@ -229,15 +304,30 @@ func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string) if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { return 0, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) } - reader, size, 
err := d.donut.GetObject(bucketName, objectName) - if err != nil { - return 0, iodine.New(drivers.ObjectNotFound{ - Bucket: bucketName, - Object: objectName, - }, nil) + if _, ok := d.storedBuckets[bucketName]; ok == false { + return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) } - n, err := io.CopyN(target, reader, size) - return n, iodine.New(err, nil) + objectKey := bucketName + "/" + objectName + data, ok := d.objects.Get(objectKey) + if !ok { + reader, size, err := d.donut.GetObject(bucketName, objectName) + if err != nil { + return 0, iodine.New(drivers.ObjectNotFound{ + Bucket: bucketName, + Object: objectName, + }, nil) + } + n, err := io.CopyN(w, reader, size) + if err != nil { + return 0, iodine.New(err, nil) + } + return n, nil + } + written, err := io.Copy(w, bytes.NewBuffer(data)) + if err != nil { + return 0, iodine.New(err, nil) + } + return written, nil } // GetPartialObject retrieves an object range and writes it to a writer @@ -265,29 +355,38 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string Length: length, }, errParams) } - reader, size, err := d.donut.GetObject(bucketName, objectName) - if err != nil { - return 0, iodine.New(drivers.ObjectNotFound{ - Bucket: bucketName, - Object: objectName, - }, nil) + if _, ok := d.storedBuckets[bucketName]; ok == false { + return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) } - defer reader.Close() - if start > size || (start+length-1) > size { - return 0, iodine.New(drivers.InvalidRange{ - Start: start, - Length: length, - }, errParams) + objectKey := bucketName + "/" + objectName + data, ok := d.objects.Get(objectKey) + if !ok { + reader, size, err := d.donut.GetObject(bucketName, objectName) + if err != nil { + return 0, iodine.New(drivers.ObjectNotFound{ + Bucket: bucketName, + Object: objectName, + }, nil) + } + defer reader.Close() + if start > size || (start+length-1) > size { + return 0, iodine.New(drivers.InvalidRange{ + Start: start, + Length: length, + }, errParams) + } + _, err = io.CopyN(ioutil.Discard, reader, start) + if err != nil { + return 0, iodine.New(err, errParams) + } + n, err := io.CopyN(w, reader, length) + if err != nil { + return 0, iodine.New(err, errParams) + } + return n, nil } - _, err = io.CopyN(ioutil.Discard, reader, start) - if err != nil { - return 0, iodine.New(err, errParams) - } - n, err := io.CopyN(w, reader, length) - if err != nil { - return 0, iodine.New(err, errParams) - } - return n, nil + written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) + return written, iodine.New(err, nil) } // GetObjectMetadata retrieves an object's metadata @@ -302,12 +401,20 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O if d.donut == nil { return drivers.ObjectMetadata{}, iodine.New(drivers.InternalError{}, errParams) } - if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") { + if !drivers.IsValidBucket(bucketName) { return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, errParams) } - if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { + if !drivers.IsValidObjectName(objectName) { return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams) } + if _, ok := d.storedBuckets[bucketName]; !ok { + return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) + } + storedBucket := d.storedBuckets[bucketName] + objectKey := bucketName + "/" 
+ objectName + if object, ok := storedBucket.objectMetadata[objectKey]; ok { + return object, nil + } metadata, err := d.donut.GetObjectMetadata(bucketName, objectName) if err != nil { return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{ @@ -370,6 +477,28 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso return results, resources, nil } +type proxyReader struct { + io.Reader + readBytes []byte +} + +func (r *proxyReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + if err == io.EOF || err == io.ErrUnexpectedEOF { + r.readBytes = append(r.readBytes, p[0:n]...) + return + } + if err != nil { + return + } + r.readBytes = append(r.readBytes, p[0:n]...) + return +} + +func newProxyReader(r io.Reader) *proxyReader { + return &proxyReader{r, nil} +} + // CreateObject creates a new object func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) { d.lock.Lock() @@ -379,56 +508,73 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM "objectName": objectName, "contentType": contentType, } + if size > int64(d.maxSize) { + generic := drivers.GenericObjectError{Bucket: bucketName, Object: objectName} + return "", iodine.New(drivers.EntityTooLarge{ + GenericObjectError: generic, + Size: strconv.FormatInt(size, 10), + MaxSize: strconv.FormatUint(d.maxSize, 10), + }, nil) + } if d.donut == nil { return "", iodine.New(drivers.InternalError{}, errParams) } - if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") { + if !drivers.IsValidBucket(bucketName) { return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) } - if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { + if !drivers.IsValidObjectName(objectName) { return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) } + storedBucket := d.storedBuckets[bucketName] + // get object key + objectKey := bucketName + "/" + objectName + if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { + return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil) + } if strings.TrimSpace(contentType) == "" { contentType = "application/octet-stream" } metadata := make(map[string]string) metadata["contentType"] = strings.TrimSpace(contentType) metadata["contentLength"] = strconv.FormatInt(size, 10) - if strings.TrimSpace(expectedMD5Sum) != "" { expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if err != nil { - return "", iodine.New(err, nil) + return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil) } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - calculatedMD5Sum, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, ioutil.NopCloser(reader), metadata) + newReader := newProxyReader(reader) + calculatedMD5Sum, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, newReader, metadata) if err != nil { + switch iodine.ToError(err).(type) { + case donut.BadDigest: + return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucketName, Key: objectName}, nil) + } return "", iodine.New(err, errParams) } + // get object key + ok := d.objects.Set(objectKey, newReader.readBytes) + // setting up for de-allocation + newReader.readBytes = nil + go debug.FreeOSMemory() + if !ok { + return "", iodine.New(drivers.InternalError{}, nil) + } + objectMetadata, err := 
d.donut.GetObjectMetadata(bucketName, objectName) + if err != nil { + return "", iodine.New(err, nil) + } + newObject := drivers.ObjectMetadata{ + Bucket: bucketName, + Key: objectName, + + ContentType: objectMetadata.Metadata["contentType"], + Created: objectMetadata.Created, + Md5: calculatedMD5Sum, + Size: objectMetadata.Size, + } + storedBucket.objectMetadata[objectKey] = newObject + d.storedBuckets[bucketName] = storedBucket return calculatedMD5Sum, nil } - -func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListMultipartUploads"}, nil) -} - -func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { - return "", iodine.New(drivers.APINotImplemented{API: "NewMultipartUpload"}, nil) -} - -func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - return "", iodine.New(drivers.APINotImplemented{API: "CreateObjectPart"}, nil) -} - -func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { - return "", iodine.New(drivers.APINotImplemented{API: "CompleteMultipartUpload"}, nil) -} - -func (d donutDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListObjectParts"}, nil) -} - -func (d donutDriver) AbortMultipartUpload(bucket, key, uploadID string) error { - return iodine.New(drivers.APINotImplemented{API: "AbortMultipartUpload"}, nil) -} diff --git a/pkg/storage/drivers/donut/donut_test.go b/pkg/storage/drivers/donut/donut_test.go index 98e497a1b..02a86e35a 100644 --- a/pkg/storage/drivers/donut/donut_test.go +++ b/pkg/storage/drivers/donut/donut_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "os" "testing" + "time" . "github.com/minio/check" "github.com/minio/minio/pkg/storage/drivers" @@ -39,7 +40,7 @@ func (s *MySuite) TestAPISuite(c *C) { c.Check(err, IsNil) storageList = append(storageList, p) paths = append(paths, p) - _, _, store := Start(paths) + _, _, store := Start(paths, 1000000, 3*time.Hour) return store } drivers.APITestSuite(c, create) diff --git a/pkg/storage/drivers/donut/multipart.go b/pkg/storage/drivers/donut/multipart.go new file mode 100644 index 000000000..dd13f673d --- /dev/null +++ b/pkg/storage/drivers/donut/multipart.go @@ -0,0 +1,419 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package donut + +import ( + "bytes" + "crypto/md5" + "crypto/sha512" + "encoding/base64" + "encoding/hex" + "errors" + "io" + "math/rand" + "runtime/debug" + "sort" + "strconv" + "strings" + "time" + + "github.com/minio/minio/pkg/iodine" + "github.com/minio/minio/pkg/storage/drivers" +) + +// isMD5SumEqual - returns error if md5sum mismatches, success its `nil` +func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { + if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" { + expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum) + if err != nil { + return iodine.New(err, nil) + } + actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum) + if err != nil { + return iodine.New(err, nil) + } + if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) { + return iodine.New(errors.New("bad digest, md5sum mismatch"), nil) + } + return nil + } + return iodine.New(errors.New("invalid argument"), nil) +} + +func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { + d.lock.RLock() + if !drivers.IsValidBucket(bucket) { + d.lock.RUnlock() + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + d.lock.RUnlock() + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + if _, ok := d.storedBuckets[bucket]; ok == false { + d.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := d.storedBuckets[bucket] + objectKey := bucket + "/" + key + if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { + d.lock.RUnlock() + return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) + } + d.lock.RUnlock() + + d.lock.Lock() + id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) + uploadIDSum := sha512.Sum512(id) + uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] + + d.storedBuckets[bucket].multiPartSession[key] = multiPartSession{ + uploadID: uploadID, + initiated: time.Now(), + totalParts: 0, + } + d.lock.Unlock() + + return uploadID, nil +} + +func (d donutDriver) AbortMultipartUpload(bucket, key, uploadID string) error { + d.lock.RLock() + storedBucket := d.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + d.lock.RUnlock() + return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + d.lock.RUnlock() + + d.cleanupMultiparts(bucket, key, uploadID) + d.cleanupMultipartSession(bucket, key, uploadID) + return nil +} + +func getMultipartKey(key string, uploadID string, partNumber int) string { + return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) +} + +func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + // Verify upload id + d.lock.RLock() + storedBucket := d.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + d.lock.RUnlock() + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + d.lock.RUnlock() + + etag, err := d.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) + if err != nil { + return "", iodine.New(err, nil) + } + // free + debug.FreeOSMemory() + return etag, nil +} + +// createObject - PUT object to memory buffer +func (d donutDriver) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data 
io.Reader) (string, error) { + d.lock.RLock() + if !drivers.IsValidBucket(bucket) { + d.lock.RUnlock() + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + d.lock.RUnlock() + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + if _, ok := d.storedBuckets[bucket]; ok == false { + d.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := d.storedBuckets[bucket] + // get object key + partKey := bucket + "/" + getMultipartKey(key, uploadID, partID) + if _, ok := storedBucket.partMetadata[partKey]; ok == true { + d.lock.RUnlock() + return storedBucket.partMetadata[partKey].ETag, nil + } + d.lock.RUnlock() + + if contentType == "" { + contentType = "application/octet-stream" + } + contentType = strings.TrimSpace(contentType) + if strings.TrimSpace(expectedMD5Sum) != "" { + expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) + if err != nil { + // pro-actively close the connection + return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil) + } + expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) + } + + // calculate md5 + hash := md5.New() + var readBytes []byte + + var err error + var length int + for err == nil { + byteBuffer := make([]byte, 1024*1024) + length, err = data.Read(byteBuffer) + // While hash.Write() wouldn't mind a Nil byteBuffer + // It is necessary for us to verify this and break + if length == 0 { + break + } + hash.Write(byteBuffer[0:length]) + readBytes = append(readBytes, byteBuffer[0:length]...) + } + if err != io.EOF { + return "", iodine.New(err, nil) + } + go debug.FreeOSMemory() + md5SumBytes := hash.Sum(nil) + totalLength := int64(len(readBytes)) + + d.lock.Lock() + d.multiPartObjects.Set(partKey, readBytes) + d.lock.Unlock() + // setting up for de-allocation + readBytes = nil + + md5Sum := hex.EncodeToString(md5SumBytes) + // Verify if the written object is equal to what is expected, only if it is requested as such + if strings.TrimSpace(expectedMD5Sum) != "" { + if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { + return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) + } + } + newPart := drivers.PartMetadata{ + PartNumber: partID, + LastModified: time.Now().UTC(), + ETag: md5Sum, + Size: totalLength, + } + + d.lock.Lock() + storedBucket.partMetadata[partKey] = newPart + multiPartSession := storedBucket.multiPartSession[key] + multiPartSession.totalParts++ + storedBucket.multiPartSession[key] = multiPartSession + d.storedBuckets[bucket] = storedBucket + d.lock.Unlock() + + return md5Sum, nil +} + +func (d donutDriver) cleanupMultipartSession(bucket, key, uploadID string) { + d.lock.Lock() + defer d.lock.Unlock() + delete(d.storedBuckets[bucket].multiPartSession, key) +} + +func (d donutDriver) cleanupMultiparts(bucket, key, uploadID string) { + for i := 1; i <= d.storedBuckets[bucket].multiPartSession[key].totalParts; i++ { + objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) + d.multiPartObjects.Delete(objectKey) + } +} + +func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { + if !drivers.IsValidBucket(bucket) { + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + // Verify upload id + d.lock.RLock() 
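createObjectPart above feeds the MD5 hash while buffering the part in 1 MB reads, treating a zero-length read as end of input. The same buffer-while-hashing pattern can be written with io.TeeReader; readPart below is a hypothetical standalone helper, not part of this patch:

    // Sketch: buffer a part in memory while computing its MD5.
    package main

    import (
        "bytes"
        "crypto/md5"
        "encoding/hex"
        "fmt"
        "io"
        "strings"
    )

    func readPart(data io.Reader) (md5Sum string, part []byte, err error) {
        hash := md5.New()
        var buf bytes.Buffer
        // TeeReader mirrors everything read by io.Copy into buf,
        // equivalent to the manual 1 MB read loop above.
        if _, err = io.Copy(hash, io.TeeReader(data, &buf)); err != nil {
            return "", nil, err
        }
        return hex.EncodeToString(hash.Sum(nil)), buf.Bytes(), nil
    }

    func main() {
        sum, part, err := readPart(strings.NewReader("hello, part"))
        if err != nil {
            panic(err)
        }
        fmt.Println(sum, len(part))
    }

Either form holds a complete copy of the part in memory, which is exactly what the multiPartObjects cache stores until the upload completes or aborts.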
+    if _, ok := d.storedBuckets[bucket]; ok == false {
+        d.lock.RUnlock()
+        return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
+    }
+    storedBucket := d.storedBuckets[bucket]
+    if storedBucket.multiPartSession[key].uploadID != uploadID {
+        d.lock.RUnlock()
+        return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
+    }
+    d.lock.RUnlock()
+
+    d.lock.Lock()
+    var size int64
+    var fullObject bytes.Buffer
+    for i := 1; i <= len(parts); i++ {
+        recvMD5 := parts[i]
+        object, ok := d.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
+        if ok == false {
+            d.lock.Unlock()
+            return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
+        }
+        size += int64(len(object))
+        calcMD5Bytes := md5.Sum(object)
+        // complete multi part request header md5sum per part is hex encoded
+        recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
+        if err != nil {
+            return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil)
+        }
+        if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
+            return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil)
+        }
+        _, err = io.Copy(&fullObject, bytes.NewBuffer(object))
+        if err != nil {
+            return "", iodine.New(err, nil)
+        }
+        object = nil
+        go debug.FreeOSMemory()
+    }
+    d.lock.Unlock()
+
+    md5sumSlice := md5.Sum(fullObject.Bytes())
+    // this is needed for final verification inside CreateObject, do not convert this to hex
+    md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
+    etag, err := d.CreateObject(bucket, key, "", md5sum, size, &fullObject)
+    if err != nil {
+        // No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
+        // which would in-turn cleanup properly in accordance with S3 Spec
+        return "", iodine.New(err, nil)
+    }
+    fullObject.Reset()
+    d.cleanupMultiparts(bucket, key, uploadID)
+    d.cleanupMultipartSession(bucket, key, uploadID)
+    return etag, nil
+}
+
+// byKey is a sortable interface for UploadMetadata slice
+type byKey []*drivers.UploadMetadata
+
+func (a byKey) Len() int           { return len(a) }
+func (a byKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
+
+func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
+    // TODO handle delimiter
+    d.lock.RLock()
+    defer d.lock.RUnlock()
+    if _, ok := d.storedBuckets[bucket]; ok == false {
+        return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
+    }
+    storedBucket := d.storedBuckets[bucket]
+    var uploads []*drivers.UploadMetadata
+
+    for key, session := range storedBucket.multiPartSession {
+        if strings.HasPrefix(key, resources.Prefix) {
+            if len(uploads) > resources.MaxUploads {
+                sort.Sort(byKey(uploads))
+                resources.Upload = uploads
+                resources.NextKeyMarker = key
+                resources.NextUploadIDMarker = session.uploadID
+                resources.IsTruncated = true
+                return resources, nil
+            }
+            // uploadIDMarker is ignored if KeyMarker is empty
+            switch {
+            case resources.KeyMarker != "" && resources.UploadIDMarker == "":
+                if key > resources.KeyMarker {
+                    upload := new(drivers.UploadMetadata)
+                    upload.Key = key
+                    upload.UploadID = session.uploadID
+                    upload.Initiated = session.initiated
+                    uploads = append(uploads, upload)
+                }
+            case resources.KeyMarker != "" && resources.UploadIDMarker != "":
+                if session.uploadID > resources.UploadIDMarker {
+                    if key >= resources.KeyMarker {
+                        upload := new(drivers.UploadMetadata)
+                        upload.Key = key
+                        upload.UploadID = session.uploadID
+                        upload.Initiated = session.initiated
+                        uploads = append(uploads, upload)
+                    }
+                }
+            default:
+                upload := new(drivers.UploadMetadata)
+                upload.Key = key
+                upload.UploadID = session.uploadID
+                upload.Initiated = session.initiated
+                uploads = append(uploads, upload)
+            }
+        }
+    }
+    sort.Sort(byKey(uploads))
+    resources.Upload = uploads
+    return resources, nil
+}
+
+// partNumber is a sortable interface for Part slice
+type partNumber []*drivers.PartMetadata
+
+func (a partNumber) Len() int           { return len(a) }
+func (a partNumber) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
+
+func (d donutDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
+    // Verify upload id
+    d.lock.RLock()
+    defer d.lock.RUnlock()
+    if _, ok := d.storedBuckets[bucket]; ok == false {
+        return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
+    }
+    storedBucket := d.storedBuckets[bucket]
+    if _, ok := storedBucket.multiPartSession[key]; ok == false {
+        return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
+    }
+    if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
+        return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
+    }
+    objectResourcesMetadata := resources
+    objectResourcesMetadata.Bucket = bucket
+    objectResourcesMetadata.Key = key
+    var parts []*drivers.PartMetadata
+    var startPartNumber int
+    switch {
+    case objectResourcesMetadata.PartNumberMarker == 0:
+        startPartNumber = 1
+    default:
+        startPartNumber = objectResourcesMetadata.PartNumberMarker
+    }
+    for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ {
+        if len(parts) > objectResourcesMetadata.MaxParts {
+            sort.Sort(partNumber(parts))
+            objectResourcesMetadata.IsTruncated = true
+            objectResourcesMetadata.Part = parts
+            objectResourcesMetadata.NextPartNumberMarker = i
+            return objectResourcesMetadata, nil
+        }
+        part, ok := storedBucket.partMetadata[bucket+"/"+getMultipartKey(key, resources.UploadID, i)]
+        if !ok {
+            return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
+        }
+        parts = append(parts, &part)
+    }
+    sort.Sort(partNumber(parts))
+    objectResourcesMetadata.Part = parts
+    return objectResourcesMetadata, nil
+}
+
+func (d donutDriver) expiredPart(a ...interface{}) {
+    key := a[0].(string)
+    // loop through all buckets
+    for _, storedBucket := range d.storedBuckets {
+        delete(storedBucket.partMetadata, key)
+    }
+    debug.FreeOSMemory()
+}
diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go
index ad05fa479..ed866a27a 100644
--- a/pkg/storage/drivers/memory/memory.go
+++ b/pkg/storage/drivers/memory/memory.go
@@ -321,7 +321,7 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
 func (memory *memoryDriver) CreateBucket(bucketName, acl string) error {
     memory.lock.RLock()
     if len(memory.storedBuckets) == totalBuckets {
-        memory.lock.RLock()
+        memory.lock.RUnlock()
         return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil)
     }
     if !drivers.IsValidBucket(bucketName) {
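With the pieces above in place, the multipart path can be exercised end to end, much as testMultipartObjectCreation in api_testsuite.go now does for every driver. The sketch below runs against the post-patch driver API; the temporary directory, part sizes, and panic-style error handling are illustrative only:

    // Sketch: end-to-end multipart upload against the donut driver.
    package main

    import (
        "bytes"
        "crypto/md5"
        "encoding/base64"
        "fmt"
        "io/ioutil"
        "time"

        "github.com/minio/minio/pkg/storage/drivers/donut"
    )

    func main() {
        root, _ := ioutil.TempDir("", "minio-donut")
        // paths, 512 MB memory limit, 3 hour expiration
        _, _, driver := donut.Start([]string{root}, 512*1024*1024, 3*time.Hour)

        if err := driver.CreateBucket("bucket", ""); err != nil {
            panic(err)
        }
        uploadID, err := driver.NewMultipartUpload("bucket", "key", "")
        if err != nil {
            panic(err)
        }

        parts := make(map[int]string)
        for i := 1; i <= 2; i++ {
            data := bytes.Repeat([]byte{'a' + byte(i)}, 1024*1024)
            sum := md5.Sum(data)
            // CreateObjectPart expects the client-supplied md5 as base64 of
            // the raw digest; it returns the part's hex etag, which is what
            // CompleteMultipartUpload verifies per part.
            expected := base64.StdEncoding.EncodeToString(sum[:])
            etag, err := driver.CreateObjectPart("bucket", "key", uploadID, i,
                "", expected, int64(len(data)), bytes.NewReader(data))
            if err != nil {
                panic(err)
            }
            parts[i] = etag
        }

        finalETag, err := driver.CompleteMultipartUpload("bucket", "key", uploadID, parts)
        if err != nil {
            panic(err)
        }
        fmt.Println("completed upload, etag:", finalETag)
    }

ListObjectParts and AbortMultipartUpload round out the same session for resumption and cleanup.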