Merge pull request #682 from harshavardhana/pr_out_an_attempt_to_bring_in_memory_layer_into_donut_driver

This commit is contained in:
Harshavardhana 2015-06-28 00:25:27 +00:00
commit c65969077d
11 changed files with 713 additions and 132 deletions

View File

@ -154,6 +154,8 @@ func runMemory(c *cli.Context) {
} }
func runDonut(c *cli.Context) { func runDonut(c *cli.Context) {
var err error
u, err := user.Current() u, err := user.Current()
if err != nil { if err != nil {
Fatalf("Unable to determine current user. Reason: %s\n", err) Fatalf("Unable to determine current user. Reason: %s\n", err)
@ -161,6 +163,53 @@ func runDonut(c *cli.Context) {
if len(c.Args()) < 1 { if len(c.Args()) < 1 {
cli.ShowCommandHelpAndExit(c, "donut", 1) // last argument is exit code cli.ShowCommandHelpAndExit(c, "donut", 1) // last argument is exit code
} }
// Optional trailing CLI arguments configure the in-memory cache layer:
//   donut ... limit <size> expire <duration>
// Each flag may appear at most once; "limit" is mandatory (checked below).
var maxMemory uint64
maxMemorySet := false
var expiration time.Duration
expirationSet := false
args := c.Args()
for len(args) > 0 {
switch args.First() {
case "limit":
{
if maxMemorySet {
Fatalln("Limit should be set only once")
}
// consume the "limit" token; the next token is the size value
args = args.Tail()
maxMemory, err = humanize.ParseBytes(args.First())
if err != nil {
Fatalf("Invalid memory size [%s] passed. Reason: %s\n", args.First(), iodine.New(err, nil))
}
// enforce a floor of 10 MiB for the cache size
if maxMemory < 1024*1024*10 {
Fatalf("Invalid memory size [%s] passed. Should be greater than 10M\n", args.First())
}
args = args.Tail()
maxMemorySet = true
}
case "expire":
{
if expirationSet {
Fatalln("Expiration should be set only once")
}
// consume the "expire" token; the next token is a Go duration string
args = args.Tail()
expiration, err = time.ParseDuration(args.First())
if err != nil {
Fatalf("Invalid expiration time [%s] passed. Reason: %s\n", args.First(), iodine.New(err, nil))
}
args = args.Tail()
expirationSet = true
}
default:
{
// unrecognized token: show usage and exit
// NOTE(review): path arguments also land here — confirm paths are
// expected to precede/replace these flags in c.Args().
cli.ShowCommandHelpAndExit(c, "donut", 1) // last argument is exit code
}
}
}
// a memory limit is required; expiration stays zero (no expiry) if unset
if maxMemorySet == false {
Fatalln("Memory limit must be set")
}
// supporting multiple paths // supporting multiple paths
var paths []string var paths []string
if strings.TrimSpace(c.Args().First()) == "" { if strings.TrimSpace(c.Args().First()) == "" {
@ -175,6 +224,8 @@ func runDonut(c *cli.Context) {
donutDriver := server.DonutFactory{ donutDriver := server.DonutFactory{
Config: apiServerConfig, Config: apiServerConfig,
Paths: paths, Paths: paths,
MaxMemory: maxMemory,
Expiration: expiration,
} }
apiServer := donutDriver.GetStartServerFunc() apiServer := donutDriver.GetStartServerFunc()
// webServer := getWebServerConfigFunc(c) // webServer := getWebServerConfigFunc(c)

View File

@ -59,7 +59,7 @@ var _ = Suite(&MySuite{
var _ = Suite(&MySuite{ var _ = Suite(&MySuite{
initDriver: func() (drivers.Driver, string) { initDriver: func() (drivers.Driver, string) {
_, _, driver := memory.Start(1000, 3*time.Hour) _, _, driver := memory.Start(10000, 3*time.Hour)
return driver, "" return driver, ""
}, },
}) })
@ -69,7 +69,7 @@ var _ = Suite(&MySuite{
root, _ := ioutil.TempDir(os.TempDir(), "minio-api") root, _ := ioutil.TempDir(os.TempDir(), "minio-api")
var roots []string var roots []string
roots = append(roots, root) roots = append(roots, root)
_, _, driver := donut.Start(roots) _, _, driver := donut.Start(roots, 10000, 3*time.Hour)
return driver, root return driver, root
}, },
}) })
@ -1451,13 +1451,6 @@ func (s *MySuite) TestObjectMultipartAbort(c *C) {
{ {
driver.AssertExpectations(c) driver.AssertExpectations(c)
} }
default:
// Donut doesn't have multipart support yet
{
if reflect.TypeOf(driver).String() == "*donut.donutDriver" {
return
}
}
} }
driver := s.Driver driver := s.Driver
typedDriver := s.MockDriver typedDriver := s.MockDriver
@ -1534,13 +1527,6 @@ func (s *MySuite) TestBucketMultipartList(c *C) {
{ {
driver.AssertExpectations(c) driver.AssertExpectations(c)
} }
default:
// Donut doesn't have multipart support yet
{
if reflect.TypeOf(driver).String() == "*donut.donutDriver" {
return
}
}
} }
driver := s.Driver driver := s.Driver
typedDriver := s.MockDriver typedDriver := s.MockDriver
@ -1623,13 +1609,6 @@ func (s *MySuite) TestObjectMultipartList(c *C) {
{ {
driver.AssertExpectations(c) driver.AssertExpectations(c)
} }
default:
// Donut doesn't have multipart support yet
{
if reflect.TypeOf(driver).String() == "*donut.donutDriver" {
return
}
}
} }
driver := s.Driver driver := s.Driver
typedDriver := s.MockDriver typedDriver := s.MockDriver
@ -1707,13 +1686,6 @@ func (s *MySuite) TestObjectMultipart(c *C) {
{ {
driver.AssertExpectations(c) driver.AssertExpectations(c)
} }
default:
// Donut doesn't have multipart support yet
{
if reflect.TypeOf(driver).String() == "*donut.donutDriver" {
return
}
}
} }
driver := s.Driver driver := s.Driver
typedDriver := s.MockDriver typedDriver := s.MockDriver

View File

@ -84,12 +84,14 @@ func (f WebFactory) GetStartServerFunc() StartServerFunc {
type DonutFactory struct { type DonutFactory struct {
httpserver.Config httpserver.Config
Paths []string Paths []string
MaxMemory uint64
Expiration time.Duration
} }
// GetStartServerFunc DonutFactory builds donut api server // GetStartServerFunc DonutFactory builds donut api server
func (f DonutFactory) GetStartServerFunc() StartServerFunc { func (f DonutFactory) GetStartServerFunc() StartServerFunc {
return func() (chan<- string, <-chan error) { return func() (chan<- string, <-chan error) {
_, _, driver := donut.Start(f.Paths) _, _, driver := donut.Start(f.Paths, f.MaxMemory, f.Expiration)
conf := api.Config{RateLimit: f.RateLimit} conf := api.Config{RateLimit: f.RateLimit}
conf.SetDriver(driver) conf.SetDriver(driver)
ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config) ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config)

View File

@ -377,14 +377,13 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error)
// writeEncodedData - // writeEncodedData -
func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) { func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) {
chunks := split.Stream(objectData, 10*1024*1024)
encoder, err := newEncoder(k, m, "Cauchy") encoder, err := newEncoder(k, m, "Cauchy")
if err != nil { if err != nil {
return 0, 0, iodine.New(err, nil) return 0, 0, iodine.New(err, nil)
} }
chunkCount := 0 chunkCount := 0
totalLength := 0 totalLength := 0
for chunk := range chunks { for chunk := range split.Stream(objectData, 10*1024*1024) {
if chunk.Err == nil { if chunk.Err == nil {
totalLength = totalLength + len(chunk.Data) totalLength = totalLength + len(chunk.Data)
encodedBlocks, _ := encoder.Encode(chunk.Data) encodedBlocks, _ := encoder.Encode(chunk.Data)

View File

@ -175,7 +175,7 @@ func (dt donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys in
} }
// PutObject - put object // PutObject - put object
func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) { func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (string, error) {
dt.lock.Lock() dt.lock.Lock()
defer dt.lock.Unlock() defer dt.lock.Unlock()
errParams := map[string]string{ errParams := map[string]string{

View File

@ -40,7 +40,7 @@ type ObjectStorage interface {
// Object operations // Object operations
GetObject(bucket, object string) (io.ReadCloser, int64, error) GetObject(bucket, object string) (io.ReadCloser, int64, error)
GetObjectMetadata(bucket, object string) (ObjectMetadata, error) GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (string, error)
} }
// Management is a donut management system interface // Management is a donut management system interface

View File

@ -22,7 +22,6 @@ import (
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"math/rand" "math/rand"
"reflect"
"strconv" "strconv"
"time" "time"
@ -59,10 +58,6 @@ func testCreateBucket(c *check.C, create func() Driver) {
func testMultipartObjectCreation(c *check.C, create func() Driver) { func testMultipartObjectCreation(c *check.C, create func() Driver) {
drivers := create() drivers := create()
switch {
case reflect.TypeOf(drivers).String() == "*donut.donutDriver":
return
}
err := drivers.CreateBucket("bucket", "") err := drivers.CreateBucket("bucket", "")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
uploadID, err := drivers.NewMultipartUpload("bucket", "key", "") uploadID, err := drivers.NewMultipartUpload("bucket", "key", "")
@ -97,10 +92,6 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) {
func testMultipartObjectAbort(c *check.C, create func() Driver) { func testMultipartObjectAbort(c *check.C, create func() Driver) {
drivers := create() drivers := create()
switch {
case reflect.TypeOf(drivers).String() == "*donut.donutDriver":
return
}
err := drivers.CreateBucket("bucket", "") err := drivers.CreateBucket("bucket", "")
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
uploadID, err := drivers.NewMultipartUpload("bucket", "key", "") uploadID, err := drivers.NewMultipartUpload("bucket", "key", "")

View File

@ -17,29 +17,55 @@
package donut package donut
import ( import (
"bytes"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"runtime/debug"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"io/ioutil" "io/ioutil"
"github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/storage/donut" "github.com/minio/minio/pkg/storage/donut"
"github.com/minio/minio/pkg/storage/drivers" "github.com/minio/minio/pkg/storage/drivers"
"github.com/minio/minio/pkg/storage/trove"
"github.com/minio/minio/pkg/utils/log" "github.com/minio/minio/pkg/utils/log"
) )
// storedBucket is the in-memory cache record kept per bucket: its
// metadata plus object, part, and multipart-session bookkeeping maps,
// keyed by "<bucket>/<object>"-style keys.
type storedBucket struct {
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
partMetadata map[string]drivers.PartMetadata
multiPartSession map[string]multiPartSession
}
// multiPartSession tracks one in-progress multipart upload for a key.
type multiPartSession struct {
totalParts int
uploadID string
initiated time.Time
}
const (
// totalBuckets caps how many buckets the driver will track in memory.
totalBuckets = 100
)
// donutDriver - creates a new single disk drivers driver using donut // donutDriver - creates a new single disk drivers driver using donut
type donutDriver struct { type donutDriver struct {
donut donut.Donut donut donut.Donut
paths []string paths []string
lock *sync.RWMutex lock *sync.RWMutex
storedBuckets map[string]storedBucket
objects *trove.Cache
multiPartObjects *trove.Cache
maxSize uint64
expiration time.Duration
} }
// This is a dummy nodeDiskMap which is going to be deprecated soon // This is a dummy nodeDiskMap which is going to be deprecated soon
@ -82,41 +108,72 @@ func createNodeDiskMapFromSlice(paths []string) map[string][]string {
} }
// Start a single disk subsystem // Start a single disk subsystem
func Start(paths []string) (chan<- string, <-chan error, drivers.Driver) { func Start(paths []string, maxSize uint64, expiration time.Duration) (chan<- string, <-chan error, drivers.Driver) {
ctrlChannel := make(chan string) ctrlChannel := make(chan string)
errorChannel := make(chan error) errorChannel := make(chan error)
driver := new(donutDriver)
driver.storedBuckets = make(map[string]storedBucket)
driver.objects = trove.NewCache(maxSize, expiration)
driver.maxSize = maxSize
driver.expiration = expiration
driver.multiPartObjects = trove.NewCache(0, time.Duration(0))
driver.lock = new(sync.RWMutex)
driver.objects.OnExpired = driver.expiredObject
driver.multiPartObjects.OnExpired = driver.expiredPart
// set up memory expiration
driver.objects.ExpireObjects(time.Second * 5)
// Soon to be user configurable, when Management API is available // Soon to be user configurable, when Management API is available
// we should remove "default" to something which is passed down // we should remove "default" to something which is passed down
// from configuration parameters // from configuration parameters
var d donut.Donut switch {
var err error case len(paths) == 1:
if len(paths) == 1 { d, err := donut.NewDonut("default", createNodeDiskMap(paths[0]))
d, err = donut.NewDonut("default", createNodeDiskMap(paths[0]))
if err != nil { if err != nil {
err = iodine.New(err, nil) err = iodine.New(err, nil)
log.Error.Println(err) log.Error.Println(err)
} }
} else { driver.donut = d
d, err = donut.NewDonut("default", createNodeDiskMapFromSlice(paths)) default:
d, err := donut.NewDonut("default", createNodeDiskMapFromSlice(paths))
if err != nil { if err != nil {
err = iodine.New(err, nil) err = iodine.New(err, nil)
log.Error.Println(err) log.Error.Println(err)
} }
driver.donut = d
} }
s := new(donutDriver) driver.paths = paths
s.donut = d driver.lock = new(sync.RWMutex)
s.paths = paths
s.lock = new(sync.RWMutex)
go start(ctrlChannel, errorChannel, s) go start(ctrlChannel, errorChannel, driver)
return ctrlChannel, errorChannel, s return ctrlChannel, errorChannel, driver
} }
func start(ctrlChannel <-chan string, errorChannel chan<- error, s *donutDriver) { func start(ctrlChannel <-chan string, errorChannel chan<- error, driver *donutDriver) {
close(errorChannel) close(errorChannel)
} }
// expiredObject is the trove cache OnExpired callback: when a cached
// object's bytes expire, drop its metadata from every stored bucket and
// prune buckets that have become empty past the expiration window.
// a[0] is the expired cache key ("<bucket>/<object>").
//
// NOTE(review): d.storedBuckets is read and mutated here without taking
// d.lock — racy if the cache expiry goroutine runs concurrently with
// driver calls. Locking was likely omitted to avoid deadlocking with the
// cache's own lock; confirm trove's lock ordering before changing.
func (d donutDriver) expiredObject(a ...interface{}) {
cacheStats := d.objects.Stats()
log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d",
cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
key := a[0].(string)
// loop through all buckets
for bucket, storedBucket := range d.storedBuckets {
delete(storedBucket.objectMetadata, key)
// remove bucket if no objects found anymore
if len(storedBucket.objectMetadata) == 0 {
if time.Since(d.storedBuckets[bucket].bucketMetadata.Created) > d.expiration {
delete(d.storedBuckets, bucket)
}
}
}
// return freed memory to the OS off the callback path
go debug.FreeOSMemory()
}
// byBucketName is a type for sorting bucket metadata by bucket name // byBucketName is a type for sorting bucket metadata by bucket name
type byBucketName []drivers.BucketMetadata type byBucketName []drivers.BucketMetadata
@ -126,8 +183,6 @@ func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
// ListBuckets returns a list of buckets // ListBuckets returns a list of buckets
func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) { func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) {
d.lock.RLock()
defer d.lock.RUnlock()
if d.donut == nil { if d.donut == nil {
return nil, iodine.New(drivers.InternalError{}, nil) return nil, iodine.New(drivers.InternalError{}, nil)
} }
@ -135,11 +190,17 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for name, metadata := range buckets { for bucketName, metadata := range buckets {
result := drivers.BucketMetadata{ result := drivers.BucketMetadata{
Name: name, Name: metadata.Name,
Created: metadata.Created, Created: metadata.Created,
ACL: drivers.BucketACL(metadata.ACL),
} }
d.lock.Lock()
storedBucket := d.storedBuckets[bucketName]
storedBucket.bucketMetadata = result
d.storedBuckets[bucketName] = storedBucket
d.lock.Unlock()
results = append(results, result) results = append(results, result)
} }
sort.Sort(byBucketName(results)) sort.Sort(byBucketName(results))
@ -150,6 +211,9 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error)
func (d donutDriver) CreateBucket(bucketName, acl string) error { func (d donutDriver) CreateBucket(bucketName, acl string) error {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock() defer d.lock.Unlock()
if len(d.storedBuckets) == totalBuckets {
return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil)
}
if d.donut == nil { if d.donut == nil {
return iodine.New(drivers.InternalError{}, nil) return iodine.New(drivers.InternalError{}, nil)
} }
@ -167,6 +231,11 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error {
} }
return iodine.New(err, nil) return iodine.New(err, nil)
} }
var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
newBucket.multiPartSession = make(map[string]multiPartSession)
newBucket.partMetadata = make(map[string]drivers.PartMetadata)
d.storedBuckets[bucketName] = newBucket
return nil return nil
} }
return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
@ -179,9 +248,12 @@ func (d donutDriver) GetBucketMetadata(bucketName string) (drivers.BucketMetadat
if d.donut == nil { if d.donut == nil {
return drivers.BucketMetadata{}, iodine.New(drivers.InternalError{}, nil) return drivers.BucketMetadata{}, iodine.New(drivers.InternalError{}, nil)
} }
if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") { if !drivers.IsValidBucket(bucketName) {
return drivers.BucketMetadata{}, drivers.BucketNameInvalid{Bucket: bucketName} return drivers.BucketMetadata{}, drivers.BucketNameInvalid{Bucket: bucketName}
} }
if d.storedBuckets[bucketName].bucketMetadata.Name != "" {
return d.storedBuckets[bucketName].bucketMetadata, nil
}
metadata, err := d.donut.GetBucketMetadata(bucketName) metadata, err := d.donut.GetBucketMetadata(bucketName)
if err != nil { if err != nil {
return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
@ -213,11 +285,14 @@ func (d donutDriver) SetBucketMetadata(bucketName, acl string) error {
if err != nil { if err != nil {
return iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil) return iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
} }
storedBucket := d.storedBuckets[bucketName]
storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl)
d.storedBuckets[bucketName] = storedBucket
return nil return nil
} }
// GetObject retrieves an object and writes it to a writer // GetObject retrieves an object and writes it to a writer
func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string) (int64, error) { func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int64, error) {
d.lock.RLock() d.lock.RLock()
defer d.lock.RUnlock() defer d.lock.RUnlock()
if d.donut == nil { if d.donut == nil {
@ -229,6 +304,12 @@ func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string)
if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" {
return 0, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) return 0, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
} }
if _, ok := d.storedBuckets[bucketName]; ok == false {
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
objectKey := bucketName + "/" + objectName
data, ok := d.objects.Get(objectKey)
if !ok {
reader, size, err := d.donut.GetObject(bucketName, objectName) reader, size, err := d.donut.GetObject(bucketName, objectName)
if err != nil { if err != nil {
return 0, iodine.New(drivers.ObjectNotFound{ return 0, iodine.New(drivers.ObjectNotFound{
@ -236,8 +317,17 @@ func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string)
Object: objectName, Object: objectName,
}, nil) }, nil)
} }
n, err := io.CopyN(target, reader, size) n, err := io.CopyN(w, reader, size)
return n, iodine.New(err, nil) if err != nil {
return 0, iodine.New(err, nil)
}
return n, nil
}
written, err := io.Copy(w, bytes.NewBuffer(data))
if err != nil {
return 0, iodine.New(err, nil)
}
return written, nil
} }
// GetPartialObject retrieves an object range and writes it to a writer // GetPartialObject retrieves an object range and writes it to a writer
@ -265,6 +355,12 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string
Length: length, Length: length,
}, errParams) }, errParams)
} }
if _, ok := d.storedBuckets[bucketName]; ok == false {
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
objectKey := bucketName + "/" + objectName
data, ok := d.objects.Get(objectKey)
if !ok {
reader, size, err := d.donut.GetObject(bucketName, objectName) reader, size, err := d.donut.GetObject(bucketName, objectName)
if err != nil { if err != nil {
return 0, iodine.New(drivers.ObjectNotFound{ return 0, iodine.New(drivers.ObjectNotFound{
@ -288,6 +384,9 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string
return 0, iodine.New(err, errParams) return 0, iodine.New(err, errParams)
} }
return n, nil return n, nil
}
written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
return written, iodine.New(err, nil)
} }
// GetObjectMetadata retrieves an object's metadata // GetObjectMetadata retrieves an object's metadata
@ -302,12 +401,20 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
if d.donut == nil { if d.donut == nil {
return drivers.ObjectMetadata{}, iodine.New(drivers.InternalError{}, errParams) return drivers.ObjectMetadata{}, iodine.New(drivers.InternalError{}, errParams)
} }
if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") { if !drivers.IsValidBucket(bucketName) {
return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, errParams) return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, errParams)
} }
if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { if !drivers.IsValidObjectName(objectName) {
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams) return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams)
} }
if _, ok := d.storedBuckets[bucketName]; !ok {
return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
}
storedBucket := d.storedBuckets[bucketName]
objectKey := bucketName + "/" + objectName
if object, ok := storedBucket.objectMetadata[objectKey]; ok {
return object, nil
}
metadata, err := d.donut.GetObjectMetadata(bucketName, objectName) metadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
if err != nil { if err != nil {
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{ return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{
@ -370,6 +477,28 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
return results, resources, nil return results, resources, nil
} }
type proxyReader struct {
io.Reader
readBytes []byte
}
func (r *proxyReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
if err == io.EOF || err == io.ErrUnexpectedEOF {
r.readBytes = append(r.readBytes, p[0:n]...)
return
}
if err != nil {
return
}
r.readBytes = append(r.readBytes, p[0:n]...)
return
}
func newProxyReader(r io.Reader) *proxyReader {
return &proxyReader{r, nil}
}
// CreateObject creates a new object // CreateObject creates a new object
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) { func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) {
d.lock.Lock() d.lock.Lock()
@ -379,56 +508,73 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
"objectName": objectName, "objectName": objectName,
"contentType": contentType, "contentType": contentType,
} }
if size > int64(d.maxSize) {
generic := drivers.GenericObjectError{Bucket: bucketName, Object: objectName}
return "", iodine.New(drivers.EntityTooLarge{
GenericObjectError: generic,
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(d.maxSize, 10),
}, nil)
}
if d.donut == nil { if d.donut == nil {
return "", iodine.New(drivers.InternalError{}, errParams) return "", iodine.New(drivers.InternalError{}, errParams)
} }
if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") { if !drivers.IsValidBucket(bucketName) {
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil) return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
} }
if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { if !drivers.IsValidObjectName(objectName) {
return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil) return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
} }
storedBucket := d.storedBuckets[bucketName]
// get object key
objectKey := bucketName + "/" + objectName
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil)
}
if strings.TrimSpace(contentType) == "" { if strings.TrimSpace(contentType) == "" {
contentType = "application/octet-stream" contentType = "application/octet-stream"
} }
metadata := make(map[string]string) metadata := make(map[string]string)
metadata["contentType"] = strings.TrimSpace(contentType) metadata["contentType"] = strings.TrimSpace(contentType)
metadata["contentLength"] = strconv.FormatInt(size, 10) metadata["contentLength"] = strconv.FormatInt(size, 10)
if strings.TrimSpace(expectedMD5Sum) != "" { if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
} }
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
} }
calculatedMD5Sum, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, ioutil.NopCloser(reader), metadata) newReader := newProxyReader(reader)
calculatedMD5Sum, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, newReader, metadata)
if err != nil { if err != nil {
switch iodine.ToError(err).(type) {
case donut.BadDigest:
return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucketName, Key: objectName}, nil)
}
return "", iodine.New(err, errParams) return "", iodine.New(err, errParams)
} }
// get object key
ok := d.objects.Set(objectKey, newReader.readBytes)
// setting up for de-allocation
newReader.readBytes = nil
go debug.FreeOSMemory()
if !ok {
return "", iodine.New(drivers.InternalError{}, nil)
}
objectMetadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
if err != nil {
return "", iodine.New(err, nil)
}
newObject := drivers.ObjectMetadata{
Bucket: bucketName,
Key: objectName,
ContentType: objectMetadata.Metadata["contentType"],
Created: objectMetadata.Created,
Md5: calculatedMD5Sum,
Size: objectMetadata.Size,
}
storedBucket.objectMetadata[objectKey] = newObject
d.storedBuckets[bucketName] = storedBucket
return calculatedMD5Sum, nil return calculatedMD5Sum, nil
} }
// The multipart API surface below is stubbed out: every entry point
// reports APINotImplemented for the donut driver.
func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListMultipartUploads"}, nil)
}
// NewMultipartUpload - not implemented for the donut driver.
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
return "", iodine.New(drivers.APINotImplemented{API: "NewMultipartUpload"}, nil)
}
// CreateObjectPart - not implemented for the donut driver.
func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
return "", iodine.New(drivers.APINotImplemented{API: "CreateObjectPart"}, nil)
}
// CompleteMultipartUpload - not implemented for the donut driver.
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
return "", iodine.New(drivers.APINotImplemented{API: "CompleteMultipartUpload"}, nil)
}
// ListObjectParts - not implemented for the donut driver.
func (d donutDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListObjectParts"}, nil)
}
// AbortMultipartUpload - not implemented for the donut driver.
func (d donutDriver) AbortMultipartUpload(bucket, key, uploadID string) error {
return iodine.New(drivers.APINotImplemented{API: "AbortMultipartUpload"}, nil)
}

View File

@ -20,6 +20,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time"
. "github.com/minio/check" . "github.com/minio/check"
"github.com/minio/minio/pkg/storage/drivers" "github.com/minio/minio/pkg/storage/drivers"
@ -39,7 +40,7 @@ func (s *MySuite) TestAPISuite(c *C) {
c.Check(err, IsNil) c.Check(err, IsNil)
storageList = append(storageList, p) storageList = append(storageList, p)
paths = append(paths, p) paths = append(paths, p)
_, _, store := Start(paths) _, _, store := Start(paths, 1000000, 3*time.Hour)
return store return store
} }
drivers.APITestSuite(c, create) drivers.APITestSuite(c, create)

View File

@ -0,0 +1,419 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"crypto/md5"
"crypto/sha512"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"math/rand"
"runtime/debug"
"sort"
"strconv"
"strings"
"time"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/storage/drivers"
)
// isMD5SumEqual compares two hex-encoded md5 sums. It returns nil when
// both are non-blank and their decoded digests match; it returns an
// error when either sum is blank ("invalid argument"), is not valid
// hex, or the digests differ ("bad digest, md5sum mismatch").
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
	// blank input on either side is rejected up front
	if strings.TrimSpace(expectedMD5Sum) == "" || strings.TrimSpace(actualMD5Sum) == "" {
		return iodine.New(errors.New("invalid argument"), nil)
	}
	expectedBytes, err := hex.DecodeString(expectedMD5Sum)
	if err != nil {
		return iodine.New(err, nil)
	}
	actualBytes, err := hex.DecodeString(actualMD5Sum)
	if err != nil {
		return iodine.New(err, nil)
	}
	if !bytes.Equal(expectedBytes, actualBytes) {
		return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
	}
	return nil
}
// NewMultipartUpload validates the target bucket/key and starts a new
// multipart session for it, returning a freshly generated upload ID.
// It fails when the bucket name or key is invalid, the bucket is not
// tracked in memory, or an object already exists under the key.
//
// The write lock is held for the entire operation. The previous code
// validated under RLock, released it, and then re-acquired the write
// lock to insert the session, leaving a window in which a concurrent
// caller could create the object (or a competing session) between the
// check and the insert.
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
	d.lock.Lock()
	defer d.lock.Unlock()
	if !drivers.IsValidBucket(bucket) {
		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
	}
	if !drivers.IsValidObjectName(key) {
		return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
	}
	storedBucket, ok := d.storedBuckets[bucket]
	if !ok {
		return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
	}
	objectKey := bucket + "/" + key
	if _, ok := storedBucket.objectMetadata[objectKey]; ok {
		return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
	}
	// Derive a hard-to-guess upload ID from randomness, the target key,
	// and the current time; the base64 form is truncated to 47 chars to
	// keep the original ID format.
	id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
	uploadIDSum := sha512.Sum512(id)
	uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
	// multiPartSession is a map, so writing through the local copy of the
	// storedBucket struct updates the shared map in d.storedBuckets.
	storedBucket.multiPartSession[key] = multiPartSession{
		uploadID:   uploadID,
		initiated:  time.Now(),
		totalParts: 0,
	}
	return uploadID, nil
}
// AbortMultipartUpload - discards an in-progress multipart upload, dropping
// every cached part and the session bookkeeping for (bucket, key).
func (d donutDriver) AbortMultipartUpload(bucket, key, uploadID string) error {
	// snapshot the session under the read lock, then validate the upload id
	d.lock.RLock()
	session := d.storedBuckets[bucket].multiPartSession[key]
	d.lock.RUnlock()
	if session.uploadID != uploadID {
		return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
	}
	d.cleanupMultiparts(bucket, key, uploadID)
	d.cleanupMultipartSession(bucket, key, uploadID)
	return nil
}
// getMultipartKey - derives the cache key under which one uploaded part is
// stored: "<key>?uploadId=<uploadID>&partNumber=<partNumber>".
func getMultipartKey(key string, uploadID string, partNumber int) string {
	part := strconv.Itoa(partNumber)
	return strings.Join([]string{key, "?uploadId=", uploadID, "&partNumber=", part}, "")
}
// CreateObjectPart - uploads (or idempotently re-uploads) one part of an
// active multipart upload after validating the upload id. Returns the part's
// ETag (hex md5sum).
func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
	// Verify upload id
	d.lock.RLock()
	storedBucket := d.storedBuckets[bucket]
	if storedBucket.multiPartSession[key].uploadID != uploadID {
		d.lock.RUnlock()
		return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
	}
	d.lock.RUnlock()
	// BUG FIX: forward the caller-supplied contentType instead of discarding
	// it with a hard-coded "".
	etag, err := d.createObjectPart(bucket, key, uploadID, partID, contentType, expectedMD5Sum, size, data)
	if err != nil {
		return "", iodine.New(err, nil)
	}
	// parts can be large; eagerly hand freed memory back to the OS
	debug.FreeOSMemory()
	return etag, nil
}
// createObjectPart - streams one part of a multipart upload into the
// in-memory part cache and records its PartMetadata under the bucket.
// Returns the part's hex md5sum (ETag). If the same part was already
// uploaded, the cached ETag is returned without re-reading data.
// expectedMD5Sum, when present, arrives base64 encoded (Content-MD5 header).
// NOTE(review): contentType is normalized but not persisted anywhere here.
func (d donutDriver) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
	d.lock.RLock()
	if !drivers.IsValidBucket(bucket) {
		d.lock.RUnlock()
		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
	}
	if !drivers.IsValidObjectName(key) {
		d.lock.RUnlock()
		return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
	}
	if _, ok := d.storedBuckets[bucket]; !ok {
		d.lock.RUnlock()
		return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
	}
	storedBucket := d.storedBuckets[bucket]
	// get object key
	partKey := bucket + "/" + getMultipartKey(key, uploadID, partID)
	if _, ok := storedBucket.partMetadata[partKey]; ok {
		// part already uploaded - treat as an idempotent retry
		d.lock.RUnlock()
		return storedBucket.partMetadata[partKey].ETag, nil
	}
	d.lock.RUnlock()
	if contentType == "" {
		contentType = "application/octet-stream"
	}
	contentType = strings.TrimSpace(contentType)
	if strings.TrimSpace(expectedMD5Sum) != "" {
		expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
		if err != nil {
			// pro-actively close the connection
			return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
		}
		expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
	}
	// Read the part once, hashing while buffering. io.Copy correctly handles
	// readers that return (0, nil) or (n, io.EOF); the previous manual read
	// loop returned a spurious error for a legal (0, nil) read.
	hash := md5.New()
	var body bytes.Buffer
	if _, err := io.Copy(io.MultiWriter(hash, &body), data); err != nil {
		return "", iodine.New(err, nil)
	}
	go debug.FreeOSMemory()
	readBytes := body.Bytes()
	totalLength := int64(len(readBytes))
	d.lock.Lock()
	d.multiPartObjects.Set(partKey, readBytes)
	d.lock.Unlock()
	md5Sum := hex.EncodeToString(hash.Sum(nil))
	// Verify if the written object is equal to what is expected, only if it is requested as such
	if strings.TrimSpace(expectedMD5Sum) != "" {
		if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
			return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
		}
	}
	newPart := drivers.PartMetadata{
		PartNumber: partID,
		LastModified: time.Now().UTC(),
		ETag: md5Sum,
		Size: totalLength,
	}
	d.lock.Lock()
	storedBucket.partMetadata[partKey] = newPart
	multiPartSession := storedBucket.multiPartSession[key]
	multiPartSession.totalParts++
	storedBucket.multiPartSession[key] = multiPartSession
	d.storedBuckets[bucket] = storedBucket
	d.lock.Unlock()
	return md5Sum, nil
}
// cleanupMultipartSession - removes the in-progress session bookkeeping for
// key from its bucket, under the writer lock. uploadID is unused here.
func (d donutDriver) cleanupMultipartSession(bucket, key, uploadID string) {
	d.lock.Lock()
	defer d.lock.Unlock()
	delete(d.storedBuckets[bucket].multiPartSession, key)
}
// cleanupMultiparts - evicts every cached part body belonging to the given
// upload from the part cache.
// NOTE(review): reads storedBuckets/multiPartSession without holding d.lock -
// appears to rely on callers for synchronization; confirm before adding call
// sites.
func (d donutDriver) cleanupMultiparts(bucket, key, uploadID string) {
	for i := 1; i <= d.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
		objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
		d.multiPartObjects.Delete(objectKey)
	}
}
// CompleteMultipartUpload - stitches the uploaded parts (1..len(parts), each
// entry a hex md5sum as sent in the complete request) into the final object
// via CreateObject, verifying each part's md5sum, then tears down the
// session. Returns the final object's ETag.
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
	if !drivers.IsValidBucket(bucket) {
		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
	}
	if !drivers.IsValidObjectName(key) {
		return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
	}
	// Verify upload id
	d.lock.RLock()
	if _, ok := d.storedBuckets[bucket]; !ok {
		d.lock.RUnlock()
		return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
	}
	storedBucket := d.storedBuckets[bucket]
	if storedBucket.multiPartSession[key].uploadID != uploadID {
		d.lock.RUnlock()
		return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
	}
	d.lock.RUnlock()
	d.lock.Lock()
	var size int64
	var fullObject bytes.Buffer
	for i := 1; i <= len(parts); i++ {
		recvMD5 := parts[i]
		object, ok := d.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
		if !ok {
			d.lock.Unlock()
			return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
		}
		size += int64(len(object))
		calcMD5Bytes := md5.Sum(object)
		// complete multi part request header md5sum per part is hex encoded
		recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
		if err != nil {
			// BUG FIX: release the writer lock before returning; previously
			// this path returned while still holding d.lock, deadlocking the
			// driver on the next lock acquisition.
			d.lock.Unlock()
			return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil)
		}
		if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
			// BUG FIX: release the writer lock before returning (see above).
			d.lock.Unlock()
			return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil)
		}
		if _, err = fullObject.Write(object); err != nil {
			d.lock.Unlock()
			return "", iodine.New(err, nil)
		}
		object = nil
		go debug.FreeOSMemory()
	}
	d.lock.Unlock()
	md5sumSlice := md5.Sum(fullObject.Bytes())
	// this is needed for final verification inside CreateObject, do not convert this to hex
	md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
	etag, err := d.CreateObject(bucket, key, "", md5sum, size, &fullObject)
	if err != nil {
		// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
		// which would in-turn cleanup properly in accordance with S3 Spec
		return "", iodine.New(err, nil)
	}
	fullObject.Reset()
	d.cleanupMultiparts(bucket, key, uploadID)
	d.cleanupMultipartSession(bucket, key, uploadID)
	return etag, nil
}
// byKey is a sortable interface for UploadMetadata slice
type byKey []*drivers.UploadMetadata
// Len, Swap and Less implement sort.Interface, ordering uploads by object key.
func (a byKey) Len() int { return len(a) }
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// ListMultipartUploads - lists in-progress multipart sessions in bucket whose
// keys match resources.Prefix, honoring the key/upload-id markers and
// MaxUploads pagination. Results are returned sorted by object key.
// NOTE(review): map iteration order is random, so the truncation point and
// Next*Marker values are nondeterministic across calls - confirm callers
// tolerate this.
func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
	// TODO handle delimiter
	d.lock.RLock()
	defer d.lock.RUnlock()
	if _, ok := d.storedBuckets[bucket]; ok == false {
		return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
	}
	storedBucket := d.storedBuckets[bucket]
	var uploads []*drivers.UploadMetadata
	for key, session := range storedBucket.multiPartSession {
		if strings.HasPrefix(key, resources.Prefix) {
			// NOTE(review): truncation fires only once len(uploads) EXCEEDS
			// MaxUploads, so a page can carry MaxUploads+1 entries - confirm
			// against intended S3 semantics.
			if len(uploads) > resources.MaxUploads {
				sort.Sort(byKey(uploads))
				resources.Upload = uploads
				resources.NextKeyMarker = key
				resources.NextUploadIDMarker = session.uploadID
				resources.IsTruncated = true
				return resources, nil
			}
			// uploadIDMarker is ignored if KeyMarker is empty
			switch {
			case resources.KeyMarker != "" && resources.UploadIDMarker == "":
				// resume strictly after the marker key
				if key > resources.KeyMarker {
					upload := new(drivers.UploadMetadata)
					upload.Key = key
					upload.UploadID = session.uploadID
					upload.Initiated = session.initiated
					uploads = append(uploads, upload)
				}
			case resources.KeyMarker != "" && resources.UploadIDMarker != "":
				// resume after the upload-id marker, at or after the key marker
				if session.uploadID > resources.UploadIDMarker {
					if key >= resources.KeyMarker {
						upload := new(drivers.UploadMetadata)
						upload.Key = key
						upload.UploadID = session.uploadID
						upload.Initiated = session.initiated
						uploads = append(uploads, upload)
					}
				}
			default:
				// no markers - include every matching session
				upload := new(drivers.UploadMetadata)
				upload.Key = key
				upload.UploadID = session.uploadID
				upload.Initiated = session.initiated
				uploads = append(uploads, upload)
			}
		}
	}
	sort.Sort(byKey(uploads))
	resources.Upload = uploads
	return resources, nil
}
// partNumber is a sortable interface for Part slice
type partNumber []*drivers.PartMetadata
// Len, Swap and Less implement sort.Interface, ordering parts by part number.
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// ListObjectParts - lists the parts recorded so far for an active multipart
// upload, paginated by PartNumberMarker/MaxParts and sorted by part number.
// Errors if the bucket, session, or upload id do not match.
func (d donutDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
	// Verify upload id
	d.lock.RLock()
	defer d.lock.RUnlock()
	if _, ok := d.storedBuckets[bucket]; ok == false {
		return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
	}
	storedBucket := d.storedBuckets[bucket]
	if _, ok := storedBucket.multiPartSession[key]; ok == false {
		return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
	}
	if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
		return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
	}
	objectResourcesMetadata := resources
	objectResourcesMetadata.Bucket = bucket
	objectResourcesMetadata.Key = key
	var parts []*drivers.PartMetadata
	var startPartNumber int
	switch {
	case objectResourcesMetadata.PartNumberMarker == 0:
		// no marker - part numbers start at 1
		startPartNumber = 1
	default:
		startPartNumber = objectResourcesMetadata.PartNumberMarker
	}
	for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ {
		// NOTE(review): truncation fires only once len(parts) EXCEEDS
		// MaxParts, so a page can carry MaxParts+1 entries - confirm
		// against intended S3 semantics.
		if len(parts) > objectResourcesMetadata.MaxParts {
			sort.Sort(partNumber(parts))
			objectResourcesMetadata.IsTruncated = true
			objectResourcesMetadata.Part = parts
			objectResourcesMetadata.NextPartNumberMarker = i
			return objectResourcesMetadata, nil
		}
		part, ok := storedBucket.partMetadata[bucket+"/"+getMultipartKey(key, resources.UploadID, i)]
		if !ok {
			return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
		}
		parts = append(parts, &part)
	}
	sort.Sort(partNumber(parts))
	objectResourcesMetadata.Part = parts
	return objectResourcesMetadata, nil
}
// expiredPart - eviction callback invoked when a cached part body expires;
// a[0] is the expired cache key. Removes the matching part metadata from
// every bucket and returns freed memory to the OS.
// NOTE(review): iterates storedBuckets and mutates partMetadata without
// taking d.lock - presumably the part cache serializes its expiry callbacks;
// confirm before relying on this, as taking d.lock here could also invert
// lock order with callers of multiPartObjects.
func (d donutDriver) expiredPart(a ...interface{}) {
	key := a[0].(string)
	// loop through all buckets
	for _, storedBucket := range d.storedBuckets {
		delete(storedBucket.partMetadata, key)
	}
	debug.FreeOSMemory()
}

View File

@ -321,7 +321,7 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { func (memory *memoryDriver) CreateBucket(bucketName, acl string) error {
memory.lock.RLock() memory.lock.RLock()
if len(memory.storedBuckets) == totalBuckets { if len(memory.storedBuckets) == totalBuckets {
memory.lock.RLock() memory.lock.RUnlock()
return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil) return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil)
} }
if !drivers.IsValidBucket(bucketName) { if !drivers.IsValidBucket(bucketName) {