Move memory code out, add it as layer on top of existing donut code

Just like how http.Handlers can be overlayed on top of each other
with each implementing ServeHTTP().

drivers.Driver can be overlayed on top of each other in similar manner
which would implement the drivers.Driver interface.

   API <----> cache <----> donut <----> donut(format)
This commit is contained in:
Harshavardhana 2015-06-29 20:48:23 -07:00
parent fe3c618cc7
commit 8f61d6b6be
9 changed files with 192 additions and 667 deletions

View File

@ -135,7 +135,7 @@ func runDonut(c *cli.Context) {
Fatalln("Path must be set")
}
apiServerConfig := getAPIServerConfig(c)
donutDriver := server.DonutFactory{
donutDriver := server.Factory{
Config: apiServerConfig,
Paths: paths,
MaxMemory: maxMemory,

View File

@ -56,19 +56,13 @@ var _ = Suite(&MySuite{
},
})
var _ = Suite(&MySuite{
initDriver: func() (drivers.Driver, string) {
driver, _ := cache.NewDriver(10000, 3*time.Hour)
return driver, ""
},
})
var _ = Suite(&MySuite{
initDriver: func() (drivers.Driver, string) {
root, _ := ioutil.TempDir(os.TempDir(), "minio-api")
var roots []string
roots = append(roots, root)
driver, _ := donut.NewDriver(roots, 10000, 3*time.Hour)
driver, _ := donut.NewDriver(roots)
driver, _ = cache.NewDriver(10000, 3*time.Hour, driver)
return driver, root
},
})

View File

@ -26,6 +26,8 @@ import (
"github.com/minio/minio/pkg/api/web"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/server/httpserver"
"github.com/minio/minio/pkg/storage/drivers"
"github.com/minio/minio/pkg/storage/drivers/cache"
"github.com/minio/minio/pkg/storage/drivers/donut"
"github.com/minio/minio/pkg/utils/log"
)
@ -43,22 +45,30 @@ func (f WebFactory) GetStartServerFunc() StartServerFunc {
}
}
// DonutFactory is used to build donut api server
type DonutFactory struct {
// Factory is used to build api server
type Factory struct {
httpserver.Config
Paths []string
MaxMemory uint64
Expiration time.Duration
}
// GetStartServerFunc DonutFactory builds donut api server
func (f DonutFactory) GetStartServerFunc() StartServerFunc {
// GetStartServerFunc Factory builds api server
func (f Factory) GetStartServerFunc() StartServerFunc {
return func() (chan<- string, <-chan error) {
driver, err := donut.NewDriver(f.Paths, f.MaxMemory, f.Expiration)
conf := api.Config{RateLimit: f.RateLimit}
var driver drivers.Driver
var err error
if len(f.Paths) != 0 {
driver, err = donut.NewDriver(f.Paths)
if err != nil {
log.Fatalln(err)
}
conf := api.Config{RateLimit: f.RateLimit}
driver, err = cache.NewDriver(f.MaxMemory, f.Expiration, driver)
if err != nil {
log.Fatalln(err)
}
}
conf.SetDriver(driver)
ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config)
return ctrl, status

View File

@ -22,6 +22,7 @@ import (
"encoding/base64"
"encoding/hex"
"math/rand"
"reflect"
"strconv"
"time"
@ -58,6 +59,10 @@ func testCreateBucket(c *check.C, create func() Driver) {
func testMultipartObjectCreation(c *check.C, create func() Driver) {
drivers := create()
switch {
case reflect.TypeOf(drivers).String() == "*donut.donutDriver":
return
}
err := drivers.CreateBucket("bucket", "")
c.Assert(err, check.IsNil)
uploadID, err := drivers.NewMultipartUpload("bucket", "key", "")
@ -92,6 +97,10 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) {
func testMultipartObjectAbort(c *check.C, create func() Driver) {
drivers := create()
switch {
case reflect.TypeOf(drivers).String() == "*donut.donutDriver":
return
}
err := drivers.CreateBucket("bucket", "")
c.Assert(err, check.IsNil)
uploadID, err := drivers.NewMultipartUpload("bucket", "key", "")

View File

@ -45,8 +45,12 @@ type cacheDriver struct {
multiPartObjects *trove.Cache
maxSize uint64
expiration time.Duration
// stacked driver
driver drivers.Driver
}
// storedBucket saved bucket
type storedBucket struct {
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
@ -54,23 +58,43 @@ type storedBucket struct {
multiPartSession map[string]multiPartSession
}
// multiPartSession multipart session
type multiPartSession struct {
totalParts int
uploadID string
initiated time.Time
}
// total Number of buckets allowed
const (
totalBuckets = 100
)
type proxyWriter struct {
writer io.Writer
writtenBytes []byte
}
func (r *proxyWriter) Write(p []byte) (n int, err error) {
n, err = r.writer.Write(p)
if err != nil {
return
}
r.writtenBytes = append(r.writtenBytes, p[0:n]...)
return
}
func newProxyWriter(w io.Writer) *proxyWriter {
return &proxyWriter{writer: w, writtenBytes: nil}
}
// NewDriver instantiate a new cache driver
func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error) {
func NewDriver(maxSize uint64, expiration time.Duration, driver drivers.Driver) (drivers.Driver, error) {
cache := new(cacheDriver)
cache.storedBuckets = make(map[string]storedBucket)
cache.objects = trove.NewCache(maxSize, expiration)
cache.maxSize = maxSize
cache.expiration = expiration
cache.objects = trove.NewCache(maxSize, expiration)
cache.multiPartObjects = trove.NewCache(0, time.Duration(0))
cache.lock = new(sync.RWMutex)
@ -85,27 +109,29 @@ func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error)
// GetObject - GET object from cache buffer
func (cache *cacheDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) {
cache.lock.RLock()
defer cache.lock.RUnlock()
if !drivers.IsValidBucket(bucket) {
cache.lock.RUnlock()
return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObjectName(object) {
cache.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
objectKey := bucket + "/" + object
data, ok := cache.objects.Get(objectKey)
if !ok {
cache.lock.RUnlock()
if cache.driver != nil {
return cache.driver.GetObject(w, bucket, object)
}
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
}
written, err := io.Copy(w, bytes.NewBuffer(data))
cache.lock.RUnlock()
return written, iodine.New(err, nil)
if err != nil {
return 0, iodine.New(err, nil)
}
return written, nil
}
// GetPartialObject - GET object from cache buffer range
@ -117,12 +143,11 @@ func (cache *cacheDriver) GetPartialObject(w io.Writer, bucket, object string, s
"length": strconv.FormatInt(length, 10),
}
cache.lock.RLock()
defer cache.lock.RUnlock()
if !drivers.IsValidBucket(bucket) {
cache.lock.RUnlock()
return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, errParams)
}
if !drivers.IsValidObjectName(object) {
cache.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, errParams)
}
if start < 0 {
@ -134,24 +159,43 @@ func (cache *cacheDriver) GetPartialObject(w io.Writer, bucket, object string, s
objectKey := bucket + "/" + object
data, ok := cache.objects.Get(objectKey)
if !ok {
cache.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams)
if cache.driver != nil {
return cache.driver.GetPartialObject(w, bucket, object, start, length)
}
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
}
written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
cache.lock.RUnlock()
return written, iodine.New(err, nil)
if err != nil {
return 0, iodine.New(err, nil)
}
return written, nil
}
// GetBucketMetadata -
func (cache *cacheDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) {
cache.lock.RLock()
defer cache.lock.RUnlock()
if !drivers.IsValidBucket(bucket) {
cache.lock.RUnlock()
return drivers.BucketMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
if cache.driver == nil {
cache.lock.RUnlock()
return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
bucketMetadata, err := cache.driver.GetBucketMetadata(bucket)
if err != nil {
cache.lock.RUnlock()
return drivers.BucketMetadata{}, iodine.New(err, nil)
}
storedBucket := cache.storedBuckets[bucket]
cache.lock.RUnlock()
cache.lock.Lock()
storedBucket.bucketMetadata = bucketMetadata
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
}
cache.lock.RUnlock()
return cache.storedBuckets[bucket].bucketMetadata, nil
}
@ -171,10 +215,15 @@ func (cache *cacheDriver) SetBucketMetadata(bucket, acl string) error {
}
cache.lock.RUnlock()
cache.lock.Lock()
defer cache.lock.Unlock()
if cache.driver != nil {
if err := cache.driver.SetBucketMetadata(bucket, acl); err != nil {
return iodine.New(err, nil)
}
}
storedBucket := cache.storedBuckets[bucket]
storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl)
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
return nil
}
@ -332,6 +381,11 @@ func (cache *cacheDriver) CreateBucket(bucketName, acl string) error {
// default is private
acl = "private"
}
if cache.driver != nil {
if err := cache.driver.CreateBucket(bucketName, acl); err != nil {
return iodine.New(err, nil)
}
}
var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
newBucket.multiPartSession = make(map[string]multiPartSession)
@ -475,22 +529,38 @@ func (cache *cacheDriver) ListBuckets() ([]drivers.BucketMetadata, error) {
// GetObjectMetadata - get object metadata from cache
func (cache *cacheDriver) GetObjectMetadata(bucket, key string) (drivers.ObjectMetadata, error) {
cache.lock.RLock()
defer cache.lock.RUnlock()
// check if bucket exists
if !drivers.IsValidBucket(bucket) {
cache.lock.RUnlock()
return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObjectName(key) {
cache.lock.RUnlock()
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
objectKey := bucket + "/" + key
if object, ok := storedBucket.objectMetadata[objectKey]; ok == true {
return object, nil
if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true {
cache.lock.RUnlock()
return objMetadata, nil
}
if cache.driver != nil {
objMetadata, err := cache.driver.GetObjectMetadata(bucket, key)
cache.lock.RUnlock()
if err != nil {
return drivers.ObjectMetadata{}, iodine.New(err, nil)
}
// update
cache.lock.Lock()
storedBucket.objectMetadata[objectKey] = objMetadata
cache.lock.Unlock()
return objMetadata, nil
}
cache.lock.RUnlock()
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
}

View File

@ -32,7 +32,8 @@ var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) {
create := func() drivers.Driver {
store, err := NewDriver(1000000, 3*time.Hour)
var driver drivers.Driver
store, err := NewDriver(1000000, 3*time.Hour, driver)
c.Check(err, IsNil)
return store
}

View File

@ -17,414 +17,31 @@
package donut
import (
"bytes"
"crypto/md5"
"crypto/sha512"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"math/rand"
"runtime/debug"
"sort"
"strconv"
"strings"
"time"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/storage/drivers"
)
// isMD5SumEqual - returns error if md5sum mismatches, success its `nil`
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
}
return nil
}
return iodine.New(errors.New("invalid argument"), nil)
}
func (d donutDriver) NewMultipartUpload(bucketName, objectName, contentType string) (string, error) {
d.lock.RLock()
if !drivers.IsValidBucket(bucketName) {
d.lock.RUnlock()
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
}
if !drivers.IsValidObjectName(objectName) {
d.lock.RUnlock()
return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
}
if _, ok := d.storedBuckets[bucketName]; ok == false {
d.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
objectKey := bucketName + "/" + objectName
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
d.lock.RUnlock()
return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil)
}
d.lock.RUnlock()
d.lock.Lock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucketName + objectName + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
d.storedBuckets[bucketName].multiPartSession[objectName] = multiPartSession{
uploadID: uploadID,
initiated: time.Now().UTC(),
totalParts: 0,
}
d.lock.Unlock()
return uploadID, nil
return "", iodine.New(drivers.APINotImplemented{API: "NewMultipartUpload"}, nil)
}
func (d donutDriver) AbortMultipartUpload(bucketName, objectName, uploadID string) error {
d.lock.RLock()
storedBucket := d.storedBuckets[bucketName]
if storedBucket.multiPartSession[objectName].uploadID != uploadID {
d.lock.RUnlock()
return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
d.lock.RUnlock()
d.cleanupMultiparts(bucketName, objectName, uploadID)
d.cleanupMultipartSession(bucketName, objectName, uploadID)
return nil
}
func getMultipartKey(key string, uploadID string, partNumber int) string {
return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
return iodine.New(drivers.APINotImplemented{API: "AbortMultipartUpload"}, nil)
}
func (d donutDriver) CreateObjectPart(bucketName, objectName, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
d.lock.RLock()
storedBucket := d.storedBuckets[bucketName]
if storedBucket.multiPartSession[objectName].uploadID != uploadID {
d.lock.RUnlock()
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
d.lock.RUnlock()
etag, err := d.createObjectPart(bucketName, objectName, uploadID, partID, "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
// free
debug.FreeOSMemory()
return etag, nil
}
// createObject - PUT object to memory buffer
func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
d.lock.RLock()
if !drivers.IsValidBucket(bucketName) {
d.lock.RUnlock()
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
}
if !drivers.IsValidObjectName(objectName) {
d.lock.RUnlock()
return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
}
if _, ok := d.storedBuckets[bucketName]; ok == false {
d.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
// get object key
partKey := bucketName + "/" + getMultipartKey(objectName, uploadID, partID)
if _, ok := storedBucket.partMetadata[partKey]; ok == true {
d.lock.RUnlock()
return storedBucket.partMetadata[partKey].ETag, nil
}
d.lock.RUnlock()
if contentType == "" {
contentType = "application/octet-stream"
}
contentType = strings.TrimSpace(contentType)
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
// calculate md5
hash := md5.New()
var readBytes []byte
var err error
var length int
for err == nil {
byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer)
// While hash.Write() wouldn't mind a Nil byteBuffer
// It is necessary for us to verify this and break
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
readBytes = append(readBytes, byteBuffer[0:length]...)
}
if err != io.EOF {
return "", iodine.New(err, nil)
}
go debug.FreeOSMemory()
md5SumBytes := hash.Sum(nil)
totalLength := int64(len(readBytes))
d.lock.Lock()
d.multiPartObjects.Set(partKey, readBytes)
d.lock.Unlock()
// setting up for de-allocation
readBytes = nil
go debug.FreeOSMemory()
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
return "", iodine.New(drivers.BadDigest{
Md5: expectedMD5Sum,
Bucket: bucketName,
Key: objectName,
}, nil)
}
}
newPart := drivers.PartMetadata{
PartNumber: partID,
LastModified: time.Now().UTC(),
ETag: md5Sum,
Size: totalLength,
}
d.lock.Lock()
storedBucket.partMetadata[partKey] = newPart
multiPartSession := storedBucket.multiPartSession[objectName]
multiPartSession.totalParts++
storedBucket.multiPartSession[objectName] = multiPartSession
d.storedBuckets[bucketName] = storedBucket
d.lock.Unlock()
return md5Sum, nil
}
func (d donutDriver) cleanupMultipartSession(bucketName, objectName, uploadID string) {
d.lock.Lock()
defer d.lock.Unlock()
delete(d.storedBuckets[bucketName].multiPartSession, objectName)
}
func (d donutDriver) cleanupMultiparts(bucketName, objectName, uploadID string) {
for i := 1; i <= d.storedBuckets[bucketName].multiPartSession[objectName].totalParts; i++ {
objectKey := bucketName + "/" + getMultipartKey(objectName, uploadID, i)
d.multiPartObjects.Delete(objectKey)
}
return "", iodine.New(drivers.APINotImplemented{API: "CreateObjectPart"}, nil)
}
func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID string, parts map[int]string) (string, error) {
if !drivers.IsValidBucket(bucketName) {
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
return "", iodine.New(drivers.APINotImplemented{API: "CompleteMultipartUpload"}, nil)
}
if !drivers.IsValidObjectName(objectName) {
return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
}
// Verify upload id
d.lock.RLock()
if _, ok := d.storedBuckets[bucketName]; ok == false {
d.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
if storedBucket.multiPartSession[objectName].uploadID != uploadID {
d.lock.RUnlock()
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
d.lock.RUnlock()
d.lock.Lock()
var size int64
fullHasher := md5.New()
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
recvMD5 := parts[i]
object, ok := d.multiPartObjects.Get(bucketName + "/" + getMultipartKey(objectName, uploadID, i))
if ok == false {
d.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
size += int64(len(object))
calcMD5Bytes := md5.Sum(object)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil)
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return "", iodine.New(drivers.BadDigest{
Md5: recvMD5,
Bucket: bucketName,
Key: getMultipartKey(objectName, uploadID, i),
}, nil)
}
mw := io.MultiWriter(&fullObject, fullHasher)
_, err = io.Copy(mw, bytes.NewReader(object))
if err != nil {
return "", iodine.New(err, nil)
}
object = nil
go debug.FreeOSMemory()
}
d.lock.Unlock()
md5sumSlice := fullHasher.Sum(nil)
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice)
etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject)
if err != nil {
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return "", iodine.New(err, nil)
}
fullObject.Reset()
go debug.FreeOSMemory()
d.cleanupMultiparts(bucketName, objectName, uploadID)
d.cleanupMultipartSession(bucketName, objectName, uploadID)
return etag, nil
}
// byKey is a sortable interface for UploadMetadata slice
type byKey []*drivers.UploadMetadata
func (a byKey) Len() int { return len(a) }
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func (d donutDriver) ListMultipartUploads(bucketName string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
d.lock.RLock()
defer d.lock.RUnlock()
if _, ok := d.storedBuckets[bucketName]; ok == false {
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListMultipartUploads"}, nil)
}
storedBucket := d.storedBuckets[bucketName]
var uploads []*drivers.UploadMetadata
for key, session := range storedBucket.multiPartSession {
if strings.HasPrefix(key, resources.Prefix) {
if len(uploads) > resources.MaxUploads {
sort.Sort(byKey(uploads))
resources.Upload = uploads
resources.NextKeyMarker = key
resources.NextUploadIDMarker = session.uploadID
resources.IsTruncated = true
return resources, nil
}
// uploadIDMarker is ignored if KeyMarker is empty
switch {
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
if key > resources.KeyMarker {
upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
if session.uploadID > resources.UploadIDMarker {
if key >= resources.KeyMarker {
upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
}
default:
upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
}
}
sort.Sort(byKey(uploads))
resources.Upload = uploads
return resources, nil
}
// partNumber is a sortable interface for Part slice
type partNumber []*drivers.PartMetadata
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
func (d donutDriver) ListObjectParts(bucketName, objectName string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
// Verify upload id
d.lock.RLock()
defer d.lock.RUnlock()
if _, ok := d.storedBuckets[bucketName]; ok == false {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
if _, ok := storedBucket.multiPartSession[objectName]; ok == false {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucketName, Object: objectName}, nil)
}
if storedBucket.multiPartSession[objectName].uploadID != resources.UploadID {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
}
objectResourcesMetadata := resources
objectResourcesMetadata.Bucket = bucketName
objectResourcesMetadata.Key = objectName
var parts []*drivers.PartMetadata
var startPartNumber int
switch {
case objectResourcesMetadata.PartNumberMarker == 0:
startPartNumber = 1
default:
startPartNumber = objectResourcesMetadata.PartNumberMarker
}
for i := startPartNumber; i <= storedBucket.multiPartSession[objectName].totalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts))
objectResourcesMetadata.IsTruncated = true
objectResourcesMetadata.Part = parts
objectResourcesMetadata.NextPartNumberMarker = i
return objectResourcesMetadata, nil
}
part, ok := storedBucket.partMetadata[bucketName+"/"+getMultipartKey(objectName, resources.UploadID, i)]
if !ok {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
parts = append(parts, &part)
}
sort.Sort(partNumber(parts))
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}
func (d donutDriver) expiredPart(a ...interface{}) {
key := a[0].(string)
// loop through all buckets
for _, storedBucket := range d.storedBuckets {
delete(storedBucket.partMetadata, key)
}
go debug.FreeOSMemory()
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListObjectParts"}, nil)
}

View File

@ -17,43 +17,21 @@
package donut
import (
"bytes"
"encoding/base64"
"encoding/hex"
"io"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
"io/ioutil"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/storage/donut"
"github.com/minio/minio/pkg/storage/drivers"
"github.com/minio/minio/pkg/storage/trove"
"github.com/minio/minio/pkg/utils/log"
)
type storedBucket struct {
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
partMetadata map[string]drivers.PartMetadata
multiPartSession map[string]multiPartSession
}
type multiPartSession struct {
totalParts int
uploadID string
initiated time.Time
}
const (
totalBuckets = 100
)
// donutDriver - creates a new single disk drivers driver using donut
@ -61,11 +39,6 @@ type donutDriver struct {
donut donut.Donut
paths []string
lock *sync.RWMutex
storedBuckets map[string]storedBucket
objects *trove.Cache
multiPartObjects *trove.Cache
maxSize uint64
expiration time.Duration
}
// This is a dummy nodeDiskMap which is going to be deprecated soon
@ -101,83 +74,18 @@ func createNodeDiskMap(paths []string) map[string][]string {
return nodes
}
func initialize(d *donutDriver) error {
// NewDriver instantiate a donut driver
func NewDriver(paths []string) (drivers.Driver, error) {
driver := new(donutDriver)
driver.paths = paths
driver.lock = new(sync.RWMutex)
// Soon to be user configurable, when Management API is available
// we should remove "default" to something which is passed down
// from configuration paramters
var err error
d.donut, err = donut.NewDonut("default", createNodeDiskMap(d.paths))
if err != nil {
return iodine.New(err, nil)
}
buckets, err := d.donut.ListBuckets()
if err != nil {
return iodine.New(err, nil)
}
for bucketName, metadata := range buckets {
d.lock.RLock()
storedBucket := d.storedBuckets[bucketName]
d.lock.RUnlock()
if len(storedBucket.multiPartSession) == 0 {
storedBucket.multiPartSession = make(map[string]multiPartSession)
}
if len(storedBucket.objectMetadata) == 0 {
storedBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
}
if len(storedBucket.partMetadata) == 0 {
storedBucket.partMetadata = make(map[string]drivers.PartMetadata)
}
storedBucket.bucketMetadata = drivers.BucketMetadata{
Name: metadata.Name,
Created: metadata.Created,
ACL: drivers.BucketACL(metadata.ACL),
}
d.lock.Lock()
d.storedBuckets[bucketName] = storedBucket
d.lock.Unlock()
}
return nil
}
// NewDriver instantiate a donut driver
func NewDriver(paths []string, maxSize uint64, expiration time.Duration) (drivers.Driver, error) {
driver := new(donutDriver)
driver.storedBuckets = make(map[string]storedBucket)
driver.objects = trove.NewCache(maxSize, expiration)
driver.maxSize = maxSize
driver.expiration = expiration
driver.multiPartObjects = trove.NewCache(0, time.Duration(0))
driver.lock = new(sync.RWMutex)
driver.objects.OnExpired = driver.expiredObject
driver.multiPartObjects.OnExpired = driver.expiredPart
// set up memory expiration
driver.objects.ExpireObjects(time.Second * 5)
driver.paths = paths
driver.lock = new(sync.RWMutex)
err := initialize(driver)
return driver, err
}
func (d donutDriver) expiredObject(a ...interface{}) {
cacheStats := d.objects.Stats()
log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d",
cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
key := a[0].(string)
// loop through all buckets
for bucket, storedBucket := range d.storedBuckets {
delete(storedBucket.objectMetadata, key)
// remove bucket if no objects found anymore
if len(storedBucket.objectMetadata) == 0 {
if time.Since(d.storedBuckets[bucket].bucketMetadata.Created) > d.expiration {
delete(d.storedBuckets, bucket)
}
}
}
go debug.FreeOSMemory()
driver.donut, err = donut.NewDonut("default", createNodeDiskMap(driver.paths))
return driver, iodine.New(err, nil)
}
// byBucketName is a type for sorting bucket metadata by bucket name
@ -192,8 +100,17 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error)
if d.donut == nil {
return nil, iodine.New(drivers.InternalError{}, nil)
}
for _, storedBucket := range d.storedBuckets {
results = append(results, storedBucket.bucketMetadata)
buckets, err := d.donut.ListBuckets()
if err != nil {
return nil, iodine.New(err, nil)
}
for _, metadata := range buckets {
result := drivers.BucketMetadata{
Name: metadata.Name,
Created: metadata.Created,
ACL: drivers.BucketACL(metadata.ACL),
}
results = append(results, result)
}
sort.Sort(byBucketName(results))
return results, nil
@ -203,9 +120,6 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error)
func (d donutDriver) CreateBucket(bucketName, acl string) error {
d.lock.Lock()
defer d.lock.Unlock()
if len(d.storedBuckets) == totalBuckets {
return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil)
}
if d.donut == nil {
return iodine.New(drivers.InternalError{}, nil)
}
@ -223,20 +137,6 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error {
}
return iodine.New(err, nil)
}
var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
newBucket.multiPartSession = make(map[string]multiPartSession)
newBucket.partMetadata = make(map[string]drivers.PartMetadata)
metadata, err := d.donut.GetBucketMetadata(bucketName)
if err != nil {
return iodine.New(err, nil)
}
newBucket.bucketMetadata = drivers.BucketMetadata{
Name: metadata.Name,
Created: metadata.Created,
ACL: drivers.BucketACL(metadata.ACL),
}
d.storedBuckets[bucketName] = newBucket
return nil
}
return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
@ -252,9 +152,6 @@ func (d donutDriver) GetBucketMetadata(bucketName string) (drivers.BucketMetadat
if !drivers.IsValidBucket(bucketName) {
return drivers.BucketMetadata{}, drivers.BucketNameInvalid{Bucket: bucketName}
}
if d.storedBuckets[bucketName].bucketMetadata.Name != "" {
return d.storedBuckets[bucketName].bucketMetadata, nil
}
metadata, err := d.donut.GetBucketMetadata(bucketName)
if err != nil {
return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
@ -286,9 +183,6 @@ func (d donutDriver) SetBucketMetadata(bucketName, acl string) error {
if err != nil {
return iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl)
d.storedBuckets[bucketName] = storedBucket
return nil
}
@ -303,14 +197,8 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6
if !drivers.IsValidObjectName(objectName) {
return 0, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
}
if _, ok := d.storedBuckets[bucketName]; ok == false {
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
d.lock.RLock()
defer d.lock.RUnlock()
objectKey := bucketName + "/" + objectName
data, ok := d.objects.Get(objectKey)
if !ok {
reader, size, err := d.donut.GetObject(bucketName, objectName)
if err != nil {
switch iodine.ToError(err).(type) {
@ -325,19 +213,7 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6
return 0, iodine.New(drivers.InternalError{}, nil)
}
}
pw := newProxyWriter(w)
n, err := io.CopyN(pw, reader, size)
if err != nil {
return 0, iodine.New(err, nil)
}
// Save in memory for future reads
d.objects.Set(objectKey, pw.writtenBytes)
// free up
pw.writtenBytes = nil
go debug.FreeOSMemory()
return n, nil
}
written, err := io.Copy(w, bytes.NewBuffer(data))
written, err := io.CopyN(w, reader, size)
if err != nil {
return 0, iodine.New(err, nil)
}
@ -369,12 +245,6 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string
Length: length,
}, errParams)
}
if _, ok := d.storedBuckets[bucketName]; ok == false {
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
objectKey := bucketName + "/" + objectName
data, ok := d.objects.Get(objectKey)
if !ok {
reader, size, err := d.donut.GetObject(bucketName, objectName)
if err != nil {
switch iodine.ToError(err).(type) {
@ -406,9 +276,6 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string
}
return n, nil
}
written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
return written, iodine.New(err, nil)
}
// GetObjectMetadata retrieves an object's metadata
func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.ObjectMetadata, error) {
@ -428,13 +295,6 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
if !drivers.IsValidObjectName(objectName) {
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams)
}
if _, ok := d.storedBuckets[bucketName]; ok {
storedBucket := d.storedBuckets[bucketName]
objectKey := bucketName + "/" + objectName
if object, ok := storedBucket.objectMetadata[objectKey]; ok {
return object, nil
}
}
metadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
if err != nil {
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{
@ -498,24 +358,6 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
return results, resources, nil
}
type proxyWriter struct {
writer io.Writer
writtenBytes []byte
}
func (r *proxyWriter) Write(p []byte) (n int, err error) {
n, err = r.writer.Write(p)
if err != nil {
return
}
r.writtenBytes = append(r.writtenBytes, p[0:n]...)
return
}
func newProxyWriter(w io.Writer) *proxyWriter {
return &proxyWriter{writer: w, writtenBytes: nil}
}
// CreateObject creates a new object
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) {
d.lock.Lock()
@ -528,27 +370,12 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
if d.donut == nil {
return "", iodine.New(drivers.InternalError{}, errParams)
}
// TODO - Should be able to write bigger than cache
if size > int64(d.maxSize) {
generic := drivers.GenericObjectError{Bucket: bucketName, Object: objectName}
return "", iodine.New(drivers.EntityTooLarge{
GenericObjectError: generic,
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(d.maxSize, 10),
}, nil)
}
if !drivers.IsValidBucket(bucketName) {
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
}
if !drivers.IsValidObjectName(objectName) {
return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
// get object key
objectKey := bucketName + "/" + objectName
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil)
}
if strings.TrimSpace(contentType) == "" {
contentType = "application/octet-stream"
}
@ -579,7 +406,5 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
Md5: objMetadata.MD5Sum,
Size: objMetadata.Size,
}
storedBucket.objectMetadata[objectKey] = newObject
d.storedBuckets[bucketName] = storedBucket
return newObject.Md5, nil
}

View File

@ -20,7 +20,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"
. "github.com/minio/check"
"github.com/minio/minio/pkg/storage/drivers"
@ -40,7 +39,7 @@ func (s *MySuite) TestAPISuite(c *C) {
c.Check(err, IsNil)
storageList = append(storageList, p)
paths = append(paths, p)
store, err := NewDriver(paths, 1000000, 3*time.Hour)
store, err := NewDriver(paths)
c.Check(err, IsNil)
return store
}