More donut, cache, api cleanup

parent dc0df3dc0e
commit 335c7827eb
@@ -67,7 +67,7 @@ func runServer(c *cli.Context) {
         cli.ShowCommandHelpAndExit(c, "server", 1) // last argument is exit code
     }
     apiServerConfig := getAPIServerConfig(c)
-    if err := api.Start(apiServerConfig); err != nil {
+    if err := api.StartServer(apiServerConfig); err != nil {
         Fatalln(err)
     }
 }
main.go (2 changes)
@@ -71,7 +71,7 @@ func init() {
     }
 }
 
-func getAPIServerConfig(c *cli.Context) httpserver.Config {
+func getAPIServerConfig(c *cli.Context) api.Config {
     certFile := c.GlobalString("cert")
     keyFile := c.GlobalString("key")
     if (certFile != "" && keyFile == "") || (certFile == "" && keyFile != "") {
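Note: the two server-wiring hunks above route startup through the api package: configuration is built as an api.Config and handed to api.StartServer. A minimal, self-contained sketch of the same pattern; the Config field names and the TLS check are assumptions suggested by the cert/key flags above, not minio's actual definitions:

package main

import (
    "fmt"
    "log"
)

// Illustrative stand-in for api.Config; field names are assumptions.
type Config struct {
    CertFile string
    KeyFile  string
}

// StartServer mirrors the renamed entry point: build a Config first,
// then hand it to a single start function.
func StartServer(cfg Config) error {
    if (cfg.CertFile != "") != (cfg.KeyFile != "") {
        return fmt.Errorf("both cert and key are required for TLS")
    }
    fmt.Println("serving, TLS enabled:", cfg.CertFile != "")
    return nil
}

func main() {
    cfg := Config{} // populated from CLI flags in the real code
    if err := StartServer(cfg); err != nil {
        log.Fatalln(err)
    }
}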
@@ -81,12 +81,12 @@ func generateListObjectsResponse(bucket string, objects []donut.ObjectMetadata,
 
     for _, object := range objects {
         var content = &Object{}
-        if object.Key == "" {
+        if object.Object == "" {
             continue
         }
-        content.Key = object.Key
+        content.Key = object.Object
         content.LastModified = object.Created.Format(iso8601Format)
-        content.ETag = "\"" + object.Md5 + "\""
+        content.ETag = "\"" + object.MD5Sum + "\""
         content.Size = object.Size
         content.StorageClass = "STANDARD"
         content.Owner = owner
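Note: this hunk tracks the donut.ObjectMetadata field renames (Key becomes Object, Md5 becomes MD5Sum) in the list-objects response generator. A self-contained sketch of the mapping with trimmed-down stand-in types; the iso8601Format constant value is an assumption:

package main

import (
    "fmt"
    "time"
)

// Stand-ins for donut.ObjectMetadata and the response entry; only the
// fields exercised here are included.
type ObjectMetadata struct {
    Object  string // was "Key" before this commit
    MD5Sum  string // was "Md5"
    Created time.Time
    Size    int64
}

type Object struct {
    Key          string
    LastModified string
    ETag         string
    Size         int64
}

const iso8601Format = "2006-01-02T15:04:05.000Z" // assumed layout

func main() {
    meta := ObjectMetadata{Object: "photos/a.png", MD5Sum: "abc123", Created: time.Now().UTC(), Size: 42}
    content := Object{
        Key:          meta.Object,
        LastModified: meta.Created.Format(iso8601Format),
        ETag:         "\"" + meta.MD5Sum + "\"", // S3-style ETags are quoted
        Size:         meta.Size,
    }
    fmt.Printf("%+v\n", content)
}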
@@ -65,16 +65,16 @@ func encodeErrorResponse(response interface{}, acceptsType contentType) []byte {
 func setObjectHeaders(w http.ResponseWriter, metadata donut.ObjectMetadata) {
     lastModified := metadata.Created.Format(http.TimeFormat)
     // common headers
-    setCommonHeaders(w, metadata.ContentType, int(metadata.Size))
+    setCommonHeaders(w, metadata.Metadata["contentType"], int(metadata.Size))
     // object related headers
-    w.Header().Set("ETag", "\""+metadata.Md5+"\"")
+    w.Header().Set("ETag", "\""+metadata.MD5Sum+"\"")
     w.Header().Set("Last-Modified", lastModified)
 }
 
 // Write range object header
 func setRangeObjectHeaders(w http.ResponseWriter, metadata donut.ObjectMetadata, contentRange *httpRange) {
     // set common headers
-    setCommonHeaders(w, metadata.ContentType, int(metadata.Size))
+    setCommonHeaders(w, metadata.Metadata["contentType"], int(metadata.Size))
     // set object headers
     setObjectHeaders(w, metadata)
     // set content range
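Note: with the dedicated ContentType field folded into a generic Metadata map, header writing now looks up the "contentType" key. A runnable sketch of the header-setting pattern; writeObjectHeaders is an illustrative helper, not the real setObjectHeaders:

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "time"
)

// Content type comes from a free-form metadata map ("contentType" is the
// key convention this commit adopts); the ETag is the quoted MD5 sum.
func writeObjectHeaders(w http.ResponseWriter, md5Sum string, metadata map[string]string, created time.Time) {
    w.Header().Set("Content-Type", metadata["contentType"])
    w.Header().Set("ETag", "\""+md5Sum+"\"")
    w.Header().Set("Last-Modified", created.Format(http.TimeFormat))
}

func main() {
    rec := httptest.NewRecorder()
    writeObjectHeaders(rec, "abc123", map[string]string{"contentType": "image/png"}, time.Now().UTC())
    fmt.Println(rec.Header())
}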
@@ -73,7 +73,7 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
     metadata := BucketMetadata{}
     metadata.Version = bucketMetadataVersion
     metadata.Name = bucketName
-    metadata.ACL = aclType
+    metadata.ACL = BucketACL(aclType)
     metadata.Created = t
     metadata.Metadata = make(map[string]string)
     metadata.BucketObjects = make(map[string]interface{})
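Note: ACLs move from bare strings to the typed BucketACL here and throughout the commit. A minimal sketch of the typed-string pattern; String and IsPublicReadWrite are visible in the diff further down, IsPrivate is an assumed companion helper:

package main

import "fmt"

// BucketACL as a typed string: raw strings are converted once at the
// boundary with BucketACL(aclType), then compared through helpers instead
// of string literals scattered around.
type BucketACL string

func (b BucketACL) String() string          { return string(b) }
func (b BucketACL) IsPrivate() bool         { return b == BucketACL("private") } // assumed helper
func (b BucketACL) IsPublicReadWrite() bool { return b == BucketACL("public-read-write") }

func main() {
    acl := BucketACL("private") // from an untyped request parameter
    fmt.Println(acl.String(), acl.IsPrivate(), acl.IsPublicReadWrite())
}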
@@ -34,7 +34,8 @@ import (
     "github.com/minio/minio/pkg/iodine"
 )
 
-func (cache donut) NewMultipartUpload(bucket, key, contentType string) (string, error) {
+// NewMultipartUpload -
+func (cache Cache) NewMultipartUpload(bucket, key, contentType string) (string, error) {
     cache.lock.RLock()
     if !IsValidBucket(bucket) {
         cache.lock.RUnlock()
@@ -52,7 +53,7 @@ func (cache donut) NewMultipartUpload(bucket, key, contentType string) (string,
     objectKey := bucket + "/" + key
     if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
         cache.lock.RUnlock()
-        return "", iodine.New(ObjectExists{Bucket: bucket, Object: key}, nil)
+        return "", iodine.New(ObjectExists{Object: key}, nil)
     }
     cache.lock.RUnlock()
 
@@ -71,7 +72,8 @@ func (cache donut) NewMultipartUpload(bucket, key, contentType string) (string,
     return uploadID, nil
 }
 
-func (cache donut) AbortMultipartUpload(bucket, key, uploadID string) error {
+// AbortMultipartUpload -
+func (cache Cache) AbortMultipartUpload(bucket, key, uploadID string) error {
     cache.lock.RLock()
     storedBucket := cache.storedBuckets[bucket]
     if storedBucket.multiPartSession[key].uploadID != uploadID {
@@ -89,7 +91,8 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
     return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
 }
 
-func (cache donut) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
+// CreateObjectPart -
+func (cache Cache) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
     // Verify upload id
     cache.lock.RLock()
     storedBucket := cache.storedBuckets[bucket]
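Note: getMultipartKey (context lines above) is the cache's naming scheme for part data: each part is stored under a synthetic key that embeds the upload ID and part number, so parts never collide with whole objects or with other uploads of the same key. The function body below is verbatim from the diff; the demo around it is illustrative:

package main

import (
    "fmt"
    "strconv"
)

// getMultipartKey builds the per-part storage key.
func getMultipartKey(key string, uploadID string, partNumber int) string {
    return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
}

func main() {
    // bucket-scoped object key plus the multipart suffix
    objectKey := "mybucket" + "/" + getMultipartKey("video.mp4", "upload-1234", 3)
    fmt.Println(objectKey) // mybucket/video.mp4?uploadId=upload-1234&partNumber=3
}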
@@ -109,7 +112,7 @@ func (cache donut) CreateObjectPart(bucket, key, uploadID string, partID int, co
 }
 
 // createObject - PUT object to cache buffer
-func (cache donut) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
+func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
     cache.lock.RLock()
     if !IsValidBucket(bucket) {
         cache.lock.RUnlock()
@@ -179,7 +182,7 @@ func (cache donut) createObjectPart(bucket, key, uploadID string, partID int, co
     // Verify if the written object is equal to what is expected, only if it is requested as such
     if strings.TrimSpace(expectedMD5Sum) != "" {
         if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
-            return "", iodine.New(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
+            return "", iodine.New(BadDigest{}, nil)
         }
     }
     newPart := PartMetadata{
@@ -200,20 +203,21 @@ func (cache donut) createObjectPart(bucket, key, uploadID string, partID int, co
     return md5Sum, nil
 }
 
-func (cache donut) cleanupMultipartSession(bucket, key, uploadID string) {
+func (cache Cache) cleanupMultipartSession(bucket, key, uploadID string) {
     cache.lock.Lock()
     defer cache.lock.Unlock()
     delete(cache.storedBuckets[bucket].multiPartSession, key)
 }
 
-func (cache donut) cleanupMultiparts(bucket, key, uploadID string) {
+func (cache Cache) cleanupMultiparts(bucket, key, uploadID string) {
     for i := 1; i <= cache.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
         objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
         cache.multiPartObjects.Delete(objectKey)
     }
 }
 
-func (cache donut) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
+// CompleteMultipartUpload -
+func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
     if !IsValidBucket(bucket) {
         return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
     }
@@ -251,7 +255,7 @@ func (cache donut) CompleteMultipartUpload(bucket, key, uploadID string, parts m
         return "", iodine.New(InvalidDigest{Md5: recvMD5}, nil)
     }
     if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
-        return "", iodine.New(BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil)
+        return "", iodine.New(BadDigest{}, nil)
     }
     _, err = io.Copy(&fullObject, bytes.NewBuffer(object))
     if err != nil {
@@ -284,7 +288,8 @@ func (a byKey) Len() int { return len(a) }
 func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
 
-func (cache donut) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
+// ListMultipartUploads -
+func (cache Cache) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
     // TODO handle delimiter
     cache.lock.RLock()
     defer cache.lock.RUnlock()
@@ -345,7 +350,8 @@ func (a partNumber) Len() int { return len(a) }
 func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
 
-func (cache donut) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
+// ListObjectParts -
+func (cache Cache) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
     // Verify upload id
     cache.lock.RLock()
     defer cache.lock.RUnlock()
@@ -354,7 +360,7 @@ func (cache donut) ListObjectParts(bucket, key string, resources ObjectResources
     }
     storedBucket := cache.storedBuckets[bucket]
     if _, ok := storedBucket.multiPartSession[key]; ok == false {
-        return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Bucket: bucket, Object: key}, nil)
+        return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
     }
     if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
         return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil)
@@ -389,7 +395,7 @@ func (cache donut) ListObjectParts(bucket, key string, resources ObjectResources
     return objectResourcesMetadata, nil
 }
 
-func (cache donut) expiredPart(a ...interface{}) {
+func (cache Cache) expiredPart(a ...interface{}) {
     key := a[0].(string)
     // loop through all buckets
     for _, storedBucket := range cache.storedBuckets {
@@ -24,14 +24,17 @@ import (
     "encoding/hex"
     "errors"
     "io"
+    "io/ioutil"
     "log"
     "runtime/debug"
     "sort"
     "strconv"
     "strings"
     "sync"
     "time"
 
     "github.com/minio/minio/pkg/iodine"
+    "github.com/minio/minio/pkg/storage/donut/trove"
 )
 
 // total Number of buckets allowed
@@ -39,6 +42,32 @@ const (
     totalBuckets = 100
 )
 
+// Cache - local variables
+type Cache struct {
+    storedBuckets    map[string]storedBucket
+    lock             *sync.RWMutex
+    objects          *trove.Cache
+    multiPartObjects *trove.Cache
+    maxSize          uint64
+    expiration       time.Duration
+    donut            Donut
+}
+
+// storedBucket saved bucket
+type storedBucket struct {
+    bucketMetadata   BucketMetadata
+    objectMetadata   map[string]ObjectMetadata
+    partMetadata     map[string]PartMetadata
+    multiPartSession map[string]multiPartSession
+}
+
+// multiPartSession multipart session
+type multiPartSession struct {
+    totalParts int
+    uploadID   string
+    initiated  time.Time
+}
+
 type proxyWriter struct {
     writer       io.Writer
     writtenBytes []byte
@@ -57,8 +86,23 @@ func newProxyWriter(w io.Writer) *proxyWriter {
     return &proxyWriter{writer: w, writtenBytes: nil}
 }
 
+// NewCache new cache
+func NewCache(maxSize uint64, expiration time.Duration, donutName string, nodeDiskMap map[string][]string) Cache {
+    c := Cache{}
+    c.storedBuckets = make(map[string]storedBucket)
+    c.objects = trove.NewCache(maxSize, expiration)
+    c.multiPartObjects = trove.NewCache(0, time.Duration(0))
+    c.objects.OnExpired = c.expiredObject
+    c.multiPartObjects.OnExpired = c.expiredPart
+
+    // set up cache expiration
+    c.objects.ExpireObjects(time.Second * 5)
+    c.donut, _ = NewDonut(donutName, nodeDiskMap)
+    return c
+}
+
 // GetObject - GET object from cache buffer
-func (cache donut) GetObject(w io.Writer, bucket string, object string) (int64, error) {
+func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, error) {
     cache.lock.RLock()
     defer cache.lock.RUnlock()
     if !IsValidBucket(bucket) {
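Note: NewCache inverts the old arrangement. Instead of donut owning a cache, Cache now owns a Donut and falls through to it on a miss. A minimal read-through sketch of that shape, with stand-in types (the real code uses trove.Cache and iodine-wrapped errors):

package main

import (
    "bytes"
    "fmt"
    "io"
    "strings"
    "sync"
)

// backend is the role the Donut plays for the cache.
type backend interface {
    GetObject(bucket, object string) (io.Reader, int64, error)
}

type memBackend map[string]string

func (m memBackend) GetObject(bucket, object string) (io.Reader, int64, error) {
    data, ok := m[bucket+"/"+object]
    if !ok {
        return nil, 0, fmt.Errorf("object not found: %s/%s", bucket, object)
    }
    return strings.NewReader(data), int64(len(data)), nil
}

type cache struct {
    lock    sync.RWMutex
    objects map[string][]byte
    donut   backend // fallback, may be nil
}

func (c *cache) GetObject(w io.Writer, bucket, object string) (int64, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    // cache first
    if data, ok := c.objects[bucket+"/"+object]; ok {
        return io.Copy(w, bytes.NewBuffer(data))
    }
    // fall through to the backing store on a miss
    if c.donut != nil {
        reader, size, err := c.donut.GetObject(bucket, object)
        if err != nil {
            return 0, err
        }
        return io.CopyN(w, reader, size) // stream exactly size bytes through
    }
    return 0, fmt.Errorf("object not found: %s/%s", bucket, object)
}

func main() {
    c := &cache{objects: map[string][]byte{}, donut: memBackend{"b/k": "hello"}}
    var buf bytes.Buffer
    n, err := c.GetObject(&buf, "b", "k")
    fmt.Println(n, err, buf.String())
}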
@@ -73,10 +117,18 @@ func (cache donut) GetObject(w io.Writer, bucket string, object string) (int64,
     objectKey := bucket + "/" + object
     data, ok := cache.objects.Get(objectKey)
     if !ok {
-        if cache.driver != nil {
-            return cache.driver.GetObject(w, bucket, object)
+        if cache.donut != nil {
+            reader, size, err := cache.donut.GetObject(bucket, object)
+            if err != nil {
+                return 0, iodine.New(err, nil)
+            }
+            written, err := io.CopyN(w, reader, size)
+            if err != nil {
+                return 0, iodine.New(err, nil)
+            }
+            return written, nil
         }
-        return 0, iodine.New(ObjectNotFound{Bucket: bucket, Object: object}, nil)
+        return 0, iodine.New(ObjectNotFound{Object: object}, nil)
     }
     written, err := io.Copy(w, bytes.NewBuffer(data))
     if err != nil {
@@ -86,7 +138,7 @@ func (cache donut) GetObject(w io.Writer, bucket string, object string) (int64,
 }
 
 // GetPartialObject - GET object from cache buffer range
-func (cache donut) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
+func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
     errParams := map[string]string{
         "bucket": bucket,
         "object": object,
@@ -110,10 +162,21 @@ func (cache donut) GetPartialObject(w io.Writer, bucket, object string, start, l
     objectKey := bucket + "/" + object
     data, ok := cache.objects.Get(objectKey)
     if !ok {
-        if cache.driver != nil {
-            return cache.driver.GetPartialObject(w, bucket, object, start, length)
+        if cache.donut != nil {
+            reader, _, err := cache.donut.GetObject(bucket, object)
+            if err != nil {
+                return 0, iodine.New(err, nil)
+            }
+            if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil {
+                return 0, iodine.New(err, nil)
+            }
+            written, err := io.CopyN(w, reader, length)
+            if err != nil {
+                return 0, iodine.New(err, nil)
+            }
+            return written, nil
         }
-        return 0, iodine.New(ObjectNotFound{Bucket: bucket, Object: object}, nil)
+        return 0, iodine.New(ObjectNotFound{Object: object}, nil)
     }
     written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
     if err != nil {
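Note: the donut backend only exposes a whole-object reader here, so the new range path serves bytes [start, start+length) by discarding the first start bytes and then copying exactly length more. A standalone demonstration of the skip-then-copy idiom:

package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "strings"
)

// copyRange skips the prefix, then copies exactly length bytes to w.
func copyRange(w io.Writer, r io.Reader, start, length int64) (int64, error) {
    // ioutil.Discard swallows the prefix without buffering output
    if _, err := io.CopyN(ioutil.Discard, r, start); err != nil {
        return 0, err
    }
    return io.CopyN(w, r, length)
}

func main() {
    r := strings.NewReader("0123456789")
    if _, err := copyRange(os.Stdout, r, 3, 4); err != nil { // prints "3456"
        fmt.Println(err)
    }
    fmt.Println()
}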
@@ -123,18 +186,18 @@ func (cache donut) GetPartialObject(w io.Writer, bucket, object string, start, l
 }
 
 // GetBucketMetadata -
-func (cache donut) GetBucketMetadata(bucket string) (BucketMetadata, error) {
+func (cache Cache) GetBucketMetadata(bucket string) (BucketMetadata, error) {
     cache.lock.RLock()
     if !IsValidBucket(bucket) {
         cache.lock.RUnlock()
         return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
     }
     if _, ok := cache.storedBuckets[bucket]; ok == false {
-        if cache.driver == nil {
+        if cache.donut == nil {
             cache.lock.RUnlock()
             return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
         }
-        bucketMetadata, err := cache.driver.GetBucketMetadata(bucket)
+        bucketMetadata, err := cache.donut.GetBucketMetadata(bucket)
         if err != nil {
             cache.lock.RUnlock()
             return BucketMetadata{}, iodine.New(err, nil)
@@ -151,7 +214,7 @@ func (cache donut) GetBucketMetadata(bucket string) (BucketMetadata, error) {
 }
 
 // SetBucketMetadata -
-func (cache donut) SetBucketMetadata(bucket, acl string) error {
+func (cache Cache) SetBucketMetadata(bucket, acl string) error {
     cache.lock.RLock()
     if !IsValidBucket(bucket) {
         cache.lock.RUnlock()
@@ -166,8 +229,10 @@ func (cache donut) SetBucketMetadata(bucket, acl string) error {
     }
     cache.lock.RUnlock()
     cache.lock.Lock()
-    if cache.driver != nil {
-        if err := cache.driver.SetBucketMetadata(bucket, acl); err != nil {
+    m := make(map[string]string)
+    m["acl"] = acl
+    if cache.donut != nil {
+        if err := cache.donut.SetBucketMetadata(bucket, m); err != nil {
             return iodine.New(err, nil)
         }
     }
@@ -197,7 +262,8 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
     return iodine.New(errors.New("invalid argument"), nil)
 }
 
-func (cache donut) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
+// CreateObject -
+func (cache Cache) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
     if size > int64(cache.maxSize) {
         generic := GenericObjectError{Bucket: bucket, Object: key}
         return "", iodine.New(EntityTooLarge{
@@ -213,7 +279,7 @@ func (cache donut) CreateObject(bucket, key, contentType, expectedMD5Sum string,
 }
 
 // createObject - PUT object to cache buffer
-func (cache donut) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
+func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
     cache.lock.RLock()
     if !IsValidBucket(bucket) {
         cache.lock.RUnlock()
@@ -232,7 +298,7 @@ func (cache donut) createObject(bucket, key, contentType, expectedMD5Sum string,
     objectKey := bucket + "/" + key
     if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
         cache.lock.RUnlock()
-        return "", iodine.New(ObjectExists{Bucket: bucket, Object: key}, nil)
+        return "", iodine.New(ObjectExists{Object: key}, nil)
     }
     cache.lock.RUnlock()
 
@@ -286,17 +352,19 @@ func (cache donut) createObject(bucket, key, contentType, expectedMD5Sum string,
     // Verify if the written object is equal to what is expected, only if it is requested as such
     if strings.TrimSpace(expectedMD5Sum) != "" {
         if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
-            return "", iodine.New(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
+            return "", iodine.New(BadDigest{}, nil)
         }
     }
 
+    m := make(map[string]string)
+    m["contentType"] = contentType
     newObject := ObjectMetadata{
         Bucket: bucket,
-        Key:    key,
+        Object: key,
 
-        ContentType: contentType,
+        Metadata:    m,
         Created:     time.Now().UTC(),
-        Md5:         md5Sum,
+        MD5Sum:      md5Sum,
         Size:        int64(totalLength),
     }
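Note: createObject buffers the payload, computes its MD5, and compares it against the client-supplied sum when one was given (isMD5SumEqual works on hex strings via encoding/hex); the error detail is now the bare BadDigest{}. A minimal sketch of that verification; writeAndVerify is illustrative, the real code hashes incrementally while writing into the trove cache:

package main

import (
    "bytes"
    "crypto/md5"
    "encoding/hex"
    "errors"
    "fmt"
    "io"
    "strings"
)

// writeAndVerify hashes the payload while buffering it, then hex-compares
// against the caller-supplied sum if one was provided.
func writeAndVerify(data io.Reader, expectedMD5Sum string) ([]byte, string, error) {
    var buf bytes.Buffer
    hash := md5.New()
    if _, err := io.Copy(io.MultiWriter(&buf, hash), data); err != nil {
        return nil, "", err
    }
    md5Sum := hex.EncodeToString(hash.Sum(nil))
    if strings.TrimSpace(expectedMD5Sum) != "" {
        expected, err := hex.DecodeString(strings.TrimSpace(expectedMD5Sum))
        if err != nil {
            return nil, "", err
        }
        if !bytes.Equal(expected, hash.Sum(nil)) {
            return nil, "", errors.New("bad digest") // BadDigest{} in the real code
        }
    }
    return buf.Bytes(), md5Sum, nil
}

func main() {
    payload := strings.NewReader("hello")
    _, sum, err := writeAndVerify(payload, "5d41402abc4b2a76b9719d911017c592") // md5("hello")
    fmt.Println(sum, err)
}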
@@ -304,11 +372,11 @@ func (cache donut) createObject(bucket, key, contentType, expectedMD5Sum string,
     storedBucket.objectMetadata[objectKey] = newObject
     cache.storedBuckets[bucket] = storedBucket
     cache.lock.Unlock()
-    return newObject.Md5, nil
+    return newObject.MD5Sum, nil
 }
 
 // CreateBucket - create bucket in cache
-func (cache donut) CreateBucket(bucketName, acl string) error {
+func (cache Cache) CreateBucket(bucketName, acl string) error {
     cache.lock.RLock()
     if len(cache.storedBuckets) == totalBuckets {
         cache.lock.RUnlock()
@@ -332,8 +400,8 @@ func (cache donut) CreateBucket(bucketName, acl string) error {
         // default is private
         acl = "private"
     }
-    if cache.driver != nil {
-        if err := cache.driver.CreateBucket(bucketName, acl); err != nil {
+    if cache.donut != nil {
+        if err := cache.donut.MakeBucket(bucketName, BucketACL(acl)); err != nil {
             return iodine.New(err, nil)
         }
     }
@@ -369,7 +437,7 @@ func appendUniq(slice []string, i string) []string {
     return append(slice, i)
 }
 
-func (cache donut) filterDelimiterPrefix(keys []string, key, delim string, r BucketResourcesMetadata) ([]string, BucketResourcesMetadata) {
+func (cache Cache) filterDelimiterPrefix(keys []string, key, delim string, r BucketResourcesMetadata) ([]string, BucketResourcesMetadata) {
     switch true {
     case key == r.Prefix:
         keys = appendUniq(keys, key)
@@ -382,7 +450,7 @@ func (cache donut) filterDelimiterPrefix(keys []string, key, delim string, r Buc
     return keys, r
 }
 
-func (cache donut) listObjects(keys []string, key string, r BucketResourcesMetadata) ([]string, BucketResourcesMetadata) {
+func (cache Cache) listObjects(keys []string, key string, r BucketResourcesMetadata) ([]string, BucketResourcesMetadata) {
     switch true {
     // Prefix absent, delimit object key based on delimiter
     case r.IsDelimiterSet():
@@ -411,7 +479,7 @@ func (cache donut) listObjects(keys []string, key string, r BucketResourcesMetad
 }
 
 // ListObjects - list objects from cache
-func (cache donut) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
+func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
     cache.lock.RLock()
     defer cache.lock.RUnlock()
     if !IsValidBucket(bucket) {
@@ -448,7 +516,7 @@ func (cache donut) ListObjects(bucket string, resources BucketResourcesMetadata)
     if len(results) == resources.Maxkeys {
         resources.IsTruncated = true
         if resources.IsTruncated && resources.IsDelimiterSet() {
-            resources.NextMarker = results[len(results)-1].Key
+            resources.NextMarker = results[len(results)-1].Object
         }
         return results, resources, nil
     }
@@ -466,7 +534,7 @@ func (b byBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
 func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
 
 // ListBuckets - List buckets from cache
-func (cache donut) ListBuckets() ([]BucketMetadata, error) {
+func (cache Cache) ListBuckets() ([]BucketMetadata, error) {
     cache.lock.RLock()
     defer cache.lock.RUnlock()
     var results []BucketMetadata
@@ -478,7 +546,7 @@ func (cache donut) ListBuckets() ([]BucketMetadata, error) {
 }
 
 // GetObjectMetadata - get object metadata from cache
-func (cache donut) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
+func (cache Cache) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
     cache.lock.RLock()
     // check if bucket exists
     if !IsValidBucket(bucket) {
@@ -499,8 +567,8 @@ func (cache donut) GetObjectMetadata(bucket, key string) (ObjectMetadata, error)
         cache.lock.RUnlock()
         return objMetadata, nil
     }
-    if cache.driver != nil {
-        objMetadata, err := cache.driver.GetObjectMetadata(bucket, key)
+    if cache.donut != nil {
+        objMetadata, err := cache.donut.GetObjectMetadata(bucket, key)
         cache.lock.RUnlock()
         if err != nil {
             return ObjectMetadata{}, iodine.New(err, nil)
@@ -512,10 +580,10 @@ func (cache donut) GetObjectMetadata(bucket, key string) (ObjectMetadata, error)
         return objMetadata, nil
     }
     cache.lock.RUnlock()
-    return ObjectMetadata{}, iodine.New(ObjectNotFound{Bucket: bucket, Object: key}, nil)
+    return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
 }
 
-func (cache donut) expiredObject(a ...interface{}) {
+func (cache Cache) expiredObject(a ...interface{}) {
     cacheStats := cache.objects.Stats()
     log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d",
         cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
@@ -59,7 +59,7 @@ type AllBuckets struct {
 type BucketMetadata struct {
     Version       string                 `json:"version"`
     Name          string                 `json:"name"`
-    ACL           string                 `json:"acl"`
+    ACL           BucketACL              `json:"acl"`
     Created       time.Time              `json:"created"`
     Metadata      map[string]string      `json:"metadata"`
     BucketObjects map[string]interface{} `json:"objects"`
@@ -25,10 +25,8 @@ import (
     "strconv"
     "strings"
     "sync"
-    "time"
 
     "github.com/minio/minio/pkg/iodine"
-    "github.com/minio/minio/pkg/storage/donut/trove"
 )
 
 // donut struct internal data
@@ -37,32 +35,6 @@ type donut struct {
     buckets map[string]bucket
     nodes   map[string]node
     lock    *sync.RWMutex
-    cache   cache
 }
 
-// cache - local variables
-type cache struct {
-    storedBuckets    map[string]storedBucket
-    lock             *sync.RWMutex
-    objects          *trove.Cache
-    multiPartObjects *trove.Cache
-    maxSize          uint64
-    expiration       time.Duration
-}
-
-// storedBucket saved bucket
-type storedBucket struct {
-    bucketMetadata   BucketMetadata
-    objectMetadata   map[string]ObjectMetadata
-    partMetadata     map[string]PartMetadata
-    multiPartSession map[string]multiPartSession
-}
-
-// multiPartSession multipart session
-type multiPartSession struct {
-    totalParts int
-    uploadID   string
-    initiated  time.Time
-}
-
 // config files used inside Donut
@@ -110,26 +82,17 @@ func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error)
             return nil, iodine.New(err, nil)
         }
     }
-    d.cache.storedBuckets = make(map[string]storedBucket)
-    d.cache.objects = trove.NewCache(maxSize, expiration)
-    d.cache.multiPartObjects = trove.NewCache(0, time.Duration(0))
-
-    d.cache.objects.OnExpired = d.expiredObject
-    d.cache.multiPartObjects.OnExpired = d.expiredPart
-
-    // set up cache expiration
-    d.cache.objects.ExpireObjects(time.Second * 5)
     return d, nil
 }
 
 // MakeBucket - make a new bucket
-func (dt donut) MakeBucket(bucket, acl string) error {
+func (dt donut) MakeBucket(bucket string, acl BucketACL) error {
     dt.lock.Lock()
     defer dt.lock.Unlock()
     if bucket == "" || strings.TrimSpace(bucket) == "" {
         return iodine.New(InvalidArgument{}, nil)
     }
-    return dt.makeDonutBucket(bucket, acl)
+    return dt.makeDonutBucket(bucket, acl.String())
 }
 
 // GetBucketMetadata - get bucket metadata
@@ -165,7 +128,7 @@ func (dt donut) SetBucketMetadata(bucketName string, bucketMetadata map[string]s
     if !ok {
         return iodine.New(InvalidArgument{}, nil)
     }
-    oldBucketMetadata.ACL = acl
+    oldBucketMetadata.ACL = BucketACL(acl)
     metadata.Buckets[bucketName] = oldBucketMetadata
     return dt.setDonutBucketMetadata(metadata)
 }
|
@ -89,7 +89,7 @@ func (s *MySuite) TestEmptyBucket(c *C) {
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Assert(donut.MakeBucket("foo", "private"), IsNil)
|
||||
c.Assert(donut.MakeBucket("foo", BucketACL("private")), IsNil)
|
||||
// check if bucket is empty
|
||||
listObjects, err := donut.ListObjects("foo", "", "", "", 1)
|
||||
c.Assert(err, IsNil)
|
||||
@ -106,14 +106,14 @@ func (s *MySuite) TestMakeBucketAndList(c *C) {
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
// create bucket
|
||||
err = donut.MakeBucket("foo", "private")
|
||||
err = donut.MakeBucket("foo", BucketACL("private"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// check bucket exists
|
||||
buckets, err := donut.ListBuckets()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(buckets), Equals, 1)
|
||||
c.Assert(buckets["foo"].ACL, Equals, "private")
|
||||
c.Assert(buckets["foo"].ACL, Equals, BucketACL("private"))
|
||||
}
|
||||
|
||||
// test re-create bucket
|
||||
@ -123,10 +123,10 @@ func (s *MySuite) TestMakeBucketWithSameNameFails(c *C) {
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
err = donut.MakeBucket("foo", "private")
|
||||
err = donut.MakeBucket("foo", BucketACL("private"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.MakeBucket("foo", "private")
|
||||
err = donut.MakeBucket("foo", BucketACL("private"))
|
||||
c.Assert(err, Not(IsNil))
|
||||
}
|
||||
|
||||
@ -138,10 +138,10 @@ func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) {
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
// add a second bucket
|
||||
err = donut.MakeBucket("foo", "private")
|
||||
err = donut.MakeBucket("foo", BucketACL("private"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.MakeBucket("bar", "private")
|
||||
err = donut.MakeBucket("bar", BucketACL("private"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
buckets, err := donut.ListBuckets()
|
||||
@ -152,7 +152,7 @@ func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) {
|
||||
_, ok = buckets["bar"]
|
||||
c.Assert(ok, Equals, true)
|
||||
|
||||
err = donut.MakeBucket("foobar", "private")
|
||||
err = donut.MakeBucket("foobar", BucketACL("private"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
buckets, err = donut.ListBuckets()
|
||||
@ -264,7 +264,7 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Assert(donut.MakeBucket("foo", "private"), IsNil)
|
||||
c.Assert(donut.MakeBucket("foo", BucketACL("private")), IsNil)
|
||||
|
||||
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
|
||||
metadata := make(map[string]string)
|
||||
|
@@ -32,7 +32,7 @@ type ObjectStorage interface {
     GetBucketMetadata(bucket string) (BucketMetadata, error)
     SetBucketMetadata(bucket string, metadata map[string]string) error
     ListBuckets() (map[string]BucketMetadata, error)
-    MakeBucket(bucket, acl string) error
+    MakeBucket(bucket string, acl BucketACL) error
 
     // Bucket operations
     ListObjects(bucket, prefix, marker, delim string, maxKeys int) (ListObjects, error)
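Note: the ObjectStorage interface change is what ripples through every implementation and test above. A compile-checkable sketch of the new signature with a stub implementation (only the relevant slice of the interface is shown; fakeStorage is illustrative):

package main

import "fmt"

type BucketACL string

// ObjectStorage, reduced to the method whose signature changed: MakeBucket
// now takes the typed BucketACL instead of a bare string, so callers must
// convert explicitly.
type ObjectStorage interface {
    MakeBucket(bucket string, acl BucketACL) error
}

type fakeStorage struct{}

func (fakeStorage) MakeBucket(bucket string, acl BucketACL) error {
    fmt.Println("make bucket", bucket, "with acl", acl)
    return nil
}

func main() {
    var s ObjectStorage = fakeStorage{}
    _ = s.MakeBucket("foo", BucketACL("private")) // matches the updated tests
}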
@@ -36,24 +36,6 @@ func (b BucketACL) IsPublicReadWrite() bool {
     return b == BucketACL("public-read-write")
 }
 
-// BucketMetadata - name and create date
-type BucketMetadata struct {
-    Name    string
-    Created time.Time
-    ACL     BucketACL
-}
-
-// ObjectMetadata - object key and its relevant metadata
-type ObjectMetadata struct {
-    Bucket string
-    Key    string
-
-    ContentType string
-    Created     time.Time
-    Md5         string
-    Size        int64
-}
-
 // FilterMode type
 type FilterMode int
|
Loading…
x
Reference in New Issue
Block a user