From 82dcbf262dd4b7570ea57713f02015eda18875b7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 25 Jun 2015 10:43:36 -0700 Subject: [PATCH] Add simple locking for donut API for now - fixes #671 --- pkg/storage/donut/bucket.go | 25 +- pkg/storage/donut/donut.go | 395 ++++++++++++++++++++++++++++ pkg/storage/donut/objectstorage.go | 397 ----------------------------- pkg/storage/drivers/donut/donut.go | 23 +- 4 files changed, 434 insertions(+), 406 deletions(-) delete mode 100644 pkg/storage/donut/objectstorage.go diff --git a/pkg/storage/donut/bucket.go b/pkg/storage/donut/bucket.go index 1fc2dc01b..2f9c9e70f 100644 --- a/pkg/storage/donut/bucket.go +++ b/pkg/storage/donut/bucket.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "crypto/md5" @@ -41,7 +42,7 @@ type bucket struct { time time.Time donutName string nodes map[string]Node - objects map[string]object + lock *sync.RWMutex } // newBucket - instantiate a new bucket @@ -63,14 +64,17 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]Node) (bu b.acl = aclType b.time = t b.donutName = donutName - b.objects = make(map[string]object) b.nodes = nodes + b.lock = new(sync.RWMutex) return b, bucketMetadata, nil } // ListObjects - list all objects func (b bucket) ListObjects() (map[string]object, error) { + b.lock.RLock() + defer b.lock.RUnlock() nodeSlice := 0 + objects := make(map[string]object) for _, node := range b.nodes { disks, err := node.ListDisks() if err != nil { @@ -79,12 +83,12 @@ func (b bucket) ListObjects() (map[string]object, error) { for order, disk := range disks { bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order) bucketPath := filepath.Join(b.donutName, bucketSlice) - objects, err := disk.ListDir(bucketPath) + files, err := disk.ListDir(bucketPath) if err != nil { return nil, iodine.New(err, nil) } - for _, object := range objects { - newObject, err := newObject(object.Name(), filepath.Join(disk.GetPath(), bucketPath)) + for _, file := range files { + newObject, err := newObject(file.Name(), filepath.Join(disk.GetPath(), bucketPath)) if err != nil { return nil, iodine.New(err, nil) } @@ -94,18 +98,20 @@ func (b bucket) ListObjects() (map[string]object, error) { } objectName, ok := newObjectMetadata["object"] if !ok { - return nil, iodine.New(ObjectCorrupted{Object: object.Name()}, nil) + return nil, iodine.New(ObjectCorrupted{Object: newObject.name}, nil) } - b.objects[objectName] = newObject + objects[objectName] = newObject } } nodeSlice = nodeSlice + 1 } - return b.objects, nil + return objects, nil } // ReadObject - open an object to read func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, err error) { + b.lock.RLock() + defer b.lock.RUnlock() reader, writer := io.Pipe() // get list of objects objects, err := b.ListObjects() @@ -141,6 +147,9 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, // WriteObject - write a new object into bucket func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) (string, error) { + b.lock.Lock() + defer b.lock.Unlock() + if objectName == "" || objectData == nil { return "", iodine.New(InvalidArgument{}, nil) } diff --git a/pkg/storage/donut/donut.go b/pkg/storage/donut/donut.go index cbdcb0093..eec08de9d 100644 --- a/pkg/storage/donut/donut.go +++ b/pkg/storage/donut/donut.go @@ -17,6 +17,16 @@ package donut import ( + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/storage/donut/disk" ) @@ -26,6 +36,7 @@ type donut struct { name string buckets map[string]bucket nodes map[string]Node + lock *sync.RWMutex } // config files used inside Donut @@ -81,6 +92,7 @@ func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error) name: donutName, nodes: nodes, buckets: buckets, + lock: new(sync.RWMutex), } for k, v := range nodeDiskMap { if len(v) == 0 { @@ -93,3 +105,386 @@ func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error) } return d, nil } + +// MakeBucket - make a new bucket +func (d donut) MakeBucket(bucket, acl string) error { + d.lock.Lock() + defer d.lock.Unlock() + if bucket == "" || strings.TrimSpace(bucket) == "" { + return iodine.New(InvalidArgument{}, nil) + } + return d.makeDonutBucket(bucket, acl) +} + +// GetBucketMetadata - get bucket metadata +func (d donut) GetBucketMetadata(bucket string) (map[string]string, error) { + d.lock.RLock() + defer d.lock.RUnlock() + err := d.getDonutBuckets() + if err != nil { + return nil, iodine.New(err, nil) + } + if _, ok := d.buckets[bucket]; !ok { + return nil, iodine.New(BucketNotFound{Bucket: bucket}, nil) + } + metadata, err := d.getDonutBucketMetadata() + if err != nil { + return nil, iodine.New(err, nil) + } + return metadata[bucket], nil +} + +// SetBucketMetadata - set bucket metadata +func (d donut) SetBucketMetadata(bucket string, bucketMetadata map[string]string) error { + d.lock.Lock() + defer d.lock.Unlock() + err := d.getDonutBuckets() + if err != nil { + return iodine.New(err, nil) + } + metadata, err := d.getDonutBucketMetadata() + if err != nil { + return iodine.New(err, nil) + } + oldBucketMetadata := metadata[bucket] + // TODO ignore rest of the keys for now, only mutable data is "acl" + oldBucketMetadata["acl"] = bucketMetadata["acl"] + metadata[bucket] = oldBucketMetadata + return d.setDonutBucketMetadata(metadata) +} + +// ListBuckets - return list of buckets +func (d donut) ListBuckets() (metadata map[string]map[string]string, err error) { + d.lock.RLock() + defer d.lock.RUnlock() + err = d.getDonutBuckets() + if err != nil { + return nil, iodine.New(err, nil) + } + dummyMetadata := make(map[string]map[string]string) + metadata, err = d.getDonutBucketMetadata() + if err != nil { + // intentionally left out the error when Donut is empty + // but we need to revisit this area in future - since we need + // to figure out between acceptable and unacceptable errors + return dummyMetadata, nil + } + return metadata, nil +} + +// ListObjects - return list of objects +func (d donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys int) ([]string, []string, bool, error) { + d.lock.RLock() + defer d.lock.RUnlock() + errParams := map[string]string{ + "bucket": bucket, + "prefix": prefix, + "marker": marker, + "delimiter": delimiter, + "maxkeys": strconv.Itoa(maxkeys), + } + err := d.getDonutBuckets() + if err != nil { + return nil, nil, false, iodine.New(err, errParams) + } + if _, ok := d.buckets[bucket]; !ok { + return nil, nil, false, iodine.New(BucketNotFound{Bucket: bucket}, errParams) + } + objectList, err := d.buckets[bucket].ListObjects() + if err != nil { + return nil, nil, false, iodine.New(err, errParams) + } + var donutObjects []string + for objectName := range objectList { + donutObjects = append(donutObjects, objectName) + } + if maxkeys <= 0 { + maxkeys = 1000 + } + if strings.TrimSpace(prefix) != "" { + donutObjects = filterPrefix(donutObjects, prefix) + donutObjects = removePrefix(donutObjects, prefix) + } + + var actualObjects []string + var actualPrefixes []string + var isTruncated bool + if strings.TrimSpace(delimiter) != "" { + actualObjects = filterDelimited(donutObjects, delimiter) + actualPrefixes = filterNotDelimited(donutObjects, delimiter) + actualPrefixes = extractDir(actualPrefixes, delimiter) + actualPrefixes = uniqueObjects(actualPrefixes) + } else { + actualObjects = donutObjects + } + + sort.Strings(actualObjects) + var newActualObjects []string + switch { + case marker != "": + for _, objectName := range actualObjects { + if objectName > marker { + newActualObjects = append(newActualObjects, objectName) + } + } + default: + newActualObjects = actualObjects + } + + var results []string + var commonPrefixes []string + for _, objectName := range newActualObjects { + if len(results) >= maxkeys { + isTruncated = true + break + } + results = appendUniq(results, prefix+objectName) + } + for _, commonPrefix := range actualPrefixes { + commonPrefixes = appendUniq(commonPrefixes, prefix+commonPrefix) + } + sort.Strings(results) + sort.Strings(commonPrefixes) + return results, commonPrefixes, isTruncated, nil +} + +// PutObject - put object +func (d donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) { + d.lock.Lock() + defer d.lock.Unlock() + errParams := map[string]string{ + "bucket": bucket, + "object": object, + } + if bucket == "" || strings.TrimSpace(bucket) == "" { + return "", iodine.New(InvalidArgument{}, errParams) + } + if object == "" || strings.TrimSpace(object) == "" { + return "", iodine.New(InvalidArgument{}, errParams) + } + err := d.getDonutBuckets() + if err != nil { + return "", iodine.New(err, errParams) + } + if _, ok := d.buckets[bucket]; !ok { + return "", iodine.New(BucketNotFound{Bucket: bucket}, nil) + } + objectList, err := d.buckets[bucket].ListObjects() + if err != nil { + return "", iodine.New(err, nil) + } + for objectName := range objectList { + if objectName == object { + return "", iodine.New(ObjectExists{Object: object}, nil) + } + } + md5sum, err := d.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata) + if err != nil { + return "", iodine.New(err, errParams) + } + return md5sum, nil +} + +// GetObject - get object +func (d donut) GetObject(bucket, object string) (reader io.ReadCloser, size int64, err error) { + d.lock.RLock() + defer d.lock.RUnlock() + errParams := map[string]string{ + "bucket": bucket, + "object": object, + } + if bucket == "" || strings.TrimSpace(bucket) == "" { + return nil, 0, iodine.New(InvalidArgument{}, errParams) + } + if object == "" || strings.TrimSpace(object) == "" { + return nil, 0, iodine.New(InvalidArgument{}, errParams) + } + err = d.getDonutBuckets() + if err != nil { + return nil, 0, iodine.New(err, nil) + } + if _, ok := d.buckets[bucket]; !ok { + return nil, 0, iodine.New(BucketNotFound{Bucket: bucket}, errParams) + } + return d.buckets[bucket].ReadObject(object) +} + +// GetObjectMetadata - get object metadata +func (d donut) GetObjectMetadata(bucket, object string) (map[string]string, error) { + d.lock.RLock() + defer d.lock.RUnlock() + errParams := map[string]string{ + "bucket": bucket, + "object": object, + } + err := d.getDonutBuckets() + if err != nil { + return nil, iodine.New(err, errParams) + } + if _, ok := d.buckets[bucket]; !ok { + return nil, iodine.New(BucketNotFound{Bucket: bucket}, errParams) + } + objectList, err := d.buckets[bucket].ListObjects() + if err != nil { + return nil, iodine.New(err, errParams) + } + donutObject, ok := objectList[object] + if !ok { + return nil, iodine.New(ObjectNotFound{Object: object}, errParams) + } + return donutObject.GetObjectMetadata() +} + +// getDiskWriters - +func (d donut) getBucketMetadataWriters() ([]io.WriteCloser, error) { + var writers []io.WriteCloser + for _, node := range d.nodes { + disks, err := node.ListDisks() + if err != nil { + return nil, iodine.New(err, nil) + } + writers = make([]io.WriteCloser, len(disks)) + for order, disk := range disks { + bucketMetaDataWriter, err := disk.CreateFile(filepath.Join(d.name, bucketMetadataConfig)) + if err != nil { + return nil, iodine.New(err, nil) + } + writers[order] = bucketMetaDataWriter + } + } + return writers, nil +} + +func (d donut) getBucketMetadataReaders() ([]io.ReadCloser, error) { + var readers []io.ReadCloser + for _, node := range d.nodes { + disks, err := node.ListDisks() + if err != nil { + return nil, iodine.New(err, nil) + } + readers = make([]io.ReadCloser, len(disks)) + for order, disk := range disks { + bucketMetaDataReader, err := disk.OpenFile(filepath.Join(d.name, bucketMetadataConfig)) + if err != nil { + return nil, iodine.New(err, nil) + } + readers[order] = bucketMetaDataReader + } + } + return readers, nil +} + +// +func (d donut) setDonutBucketMetadata(metadata map[string]map[string]string) error { + writers, err := d.getBucketMetadataWriters() + if err != nil { + return iodine.New(err, nil) + } + for _, writer := range writers { + defer writer.Close() + } + for _, writer := range writers { + jenc := json.NewEncoder(writer) + if err := jenc.Encode(metadata); err != nil { + return iodine.New(err, nil) + } + } + return nil +} + +func (d donut) getDonutBucketMetadata() (map[string]map[string]string, error) { + metadata := make(map[string]map[string]string) + readers, err := d.getBucketMetadataReaders() + if err != nil { + return nil, iodine.New(err, nil) + } + for _, reader := range readers { + defer reader.Close() + } + for _, reader := range readers { + jenc := json.NewDecoder(reader) + if err := jenc.Decode(&metadata); err != nil { + return nil, iodine.New(err, nil) + } + } + return metadata, nil +} + +func (d donut) makeDonutBucket(bucketName, acl string) error { + err := d.getDonutBuckets() + if err != nil { + return iodine.New(err, nil) + } + if _, ok := d.buckets[bucketName]; ok { + return iodine.New(BucketExists{Bucket: bucketName}, nil) + } + bucket, bucketMetadata, err := newBucket(bucketName, acl, d.name, d.nodes) + if err != nil { + return iodine.New(err, nil) + } + nodeNumber := 0 + d.buckets[bucketName] = bucket + for _, node := range d.nodes { + disks, err := node.ListDisks() + if err != nil { + return iodine.New(err, nil) + } + for order, disk := range disks { + bucketSlice := fmt.Sprintf("%s$%d$%d", bucketName, nodeNumber, order) + err := disk.MakeDir(filepath.Join(d.name, bucketSlice)) + if err != nil { + return iodine.New(err, nil) + } + } + nodeNumber = nodeNumber + 1 + } + metadata, err := d.getDonutBucketMetadata() + if err != nil { + err = iodine.ToError(err) + if os.IsNotExist(err) { + metadata := make(map[string]map[string]string) + metadata[bucketName] = bucketMetadata + err = d.setDonutBucketMetadata(metadata) + if err != nil { + return iodine.New(err, nil) + } + return nil + } + return iodine.New(err, nil) + } + metadata[bucketName] = bucketMetadata + err = d.setDonutBucketMetadata(metadata) + if err != nil { + return iodine.New(err, nil) + } + return nil +} + +func (d donut) getDonutBuckets() error { + for _, node := range d.nodes { + disks, err := node.ListDisks() + if err != nil { + return iodine.New(err, nil) + } + for _, disk := range disks { + dirs, err := disk.ListDir(d.name) + if err != nil { + return iodine.New(err, nil) + } + for _, dir := range dirs { + splitDir := strings.Split(dir.Name(), "$") + if len(splitDir) < 3 { + return iodine.New(CorruptedBackend{Backend: dir.Name()}, nil) + } + bucketName := splitDir[0] + // we dont need this NewBucket once we cache from makeDonutBucket() + bucket, _, err := newBucket(bucketName, "private", d.name, d.nodes) + if err != nil { + return iodine.New(err, nil) + } + d.buckets[bucketName] = bucket + } + } + } + return nil +} diff --git a/pkg/storage/donut/objectstorage.go b/pkg/storage/donut/objectstorage.go deleted file mode 100644 index b27929291..000000000 --- a/pkg/storage/donut/objectstorage.go +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Minimalist Object Storage, (C) 2015 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package donut - -import ( - "encoding/json" - "fmt" - "io" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - - "github.com/minio/minio/pkg/iodine" -) - -// MakeBucket - make a new bucket -func (d donut) MakeBucket(bucket, acl string) error { - if bucket == "" || strings.TrimSpace(bucket) == "" { - return iodine.New(InvalidArgument{}, nil) - } - return d.makeDonutBucket(bucket, acl) -} - -// GetBucketMetadata - get bucket metadata -func (d donut) GetBucketMetadata(bucket string) (map[string]string, error) { - err := d.getDonutBuckets() - if err != nil { - return nil, iodine.New(err, nil) - } - if _, ok := d.buckets[bucket]; !ok { - return nil, iodine.New(BucketNotFound{Bucket: bucket}, nil) - } - metadata, err := d.getDonutBucketMetadata() - if err != nil { - return nil, iodine.New(err, nil) - } - return metadata[bucket], nil -} - -// SetBucketMetadata - set bucket metadata -func (d donut) SetBucketMetadata(bucket string, bucketMetadata map[string]string) error { - err := d.getDonutBuckets() - if err != nil { - return iodine.New(err, nil) - } - metadata, err := d.getDonutBucketMetadata() - if err != nil { - return iodine.New(err, nil) - } - oldBucketMetadata := metadata[bucket] - // TODO ignore rest of the keys for now, only mutable data is "acl" - oldBucketMetadata["acl"] = bucketMetadata["acl"] - metadata[bucket] = oldBucketMetadata - return d.setDonutBucketMetadata(metadata) -} - -// ListBuckets - return list of buckets -func (d donut) ListBuckets() (metadata map[string]map[string]string, err error) { - err = d.getDonutBuckets() - if err != nil { - return nil, iodine.New(err, nil) - } - dummyMetadata := make(map[string]map[string]string) - metadata, err = d.getDonutBucketMetadata() - if err != nil { - // intentionally left out the error when Donut is empty - // but we need to revisit this area in future - since we need - // to figure out between acceptable and unacceptable errors - return dummyMetadata, nil - } - return metadata, nil -} - -// ListObjects - return list of objects -func (d donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys int) ([]string, []string, bool, error) { - errParams := map[string]string{ - "bucket": bucket, - "prefix": prefix, - "marker": marker, - "delimiter": delimiter, - "maxkeys": strconv.Itoa(maxkeys), - } - err := d.getDonutBuckets() - if err != nil { - return nil, nil, false, iodine.New(err, errParams) - } - if _, ok := d.buckets[bucket]; !ok { - return nil, nil, false, iodine.New(BucketNotFound{Bucket: bucket}, errParams) - } - objectList, err := d.buckets[bucket].ListObjects() - if err != nil { - return nil, nil, false, iodine.New(err, errParams) - } - var donutObjects []string - for objectName := range objectList { - donutObjects = append(donutObjects, objectName) - } - if maxkeys <= 0 { - maxkeys = 1000 - } - if strings.TrimSpace(prefix) != "" { - donutObjects = filterPrefix(donutObjects, prefix) - donutObjects = removePrefix(donutObjects, prefix) - } - - var actualObjects []string - var actualPrefixes []string - var isTruncated bool - if strings.TrimSpace(delimiter) != "" { - actualObjects = filterDelimited(donutObjects, delimiter) - actualPrefixes = filterNotDelimited(donutObjects, delimiter) - actualPrefixes = extractDir(actualPrefixes, delimiter) - actualPrefixes = uniqueObjects(actualPrefixes) - } else { - actualObjects = donutObjects - } - - sort.Strings(actualObjects) - var newActualObjects []string - switch { - case marker != "": - for _, objectName := range actualObjects { - if objectName > marker { - newActualObjects = append(newActualObjects, objectName) - } - } - default: - newActualObjects = actualObjects - } - - var results []string - var commonPrefixes []string - for _, objectName := range newActualObjects { - if len(results) >= maxkeys { - isTruncated = true - break - } - results = appendUniq(results, prefix+objectName) - } - for _, commonPrefix := range actualPrefixes { - commonPrefixes = appendUniq(commonPrefixes, prefix+commonPrefix) - } - sort.Strings(results) - sort.Strings(commonPrefixes) - return results, commonPrefixes, isTruncated, nil -} - -// PutObject - put object -func (d donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) { - errParams := map[string]string{ - "bucket": bucket, - "object": object, - } - if bucket == "" || strings.TrimSpace(bucket) == "" { - return "", iodine.New(InvalidArgument{}, errParams) - } - if object == "" || strings.TrimSpace(object) == "" { - return "", iodine.New(InvalidArgument{}, errParams) - } - err := d.getDonutBuckets() - if err != nil { - return "", iodine.New(err, errParams) - } - if _, ok := d.buckets[bucket]; !ok { - return "", iodine.New(BucketNotFound{Bucket: bucket}, nil) - } - objectList, err := d.buckets[bucket].ListObjects() - if err != nil { - return "", iodine.New(err, nil) - } - for objectName := range objectList { - if objectName == object { - return "", iodine.New(ObjectExists{Object: object}, nil) - } - } - md5sum, err := d.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata) - if err != nil { - return "", iodine.New(err, errParams) - } - return md5sum, nil -} - -// GetObject - get object -func (d donut) GetObject(bucket, object string) (reader io.ReadCloser, size int64, err error) { - errParams := map[string]string{ - "bucket": bucket, - "object": object, - } - if bucket == "" || strings.TrimSpace(bucket) == "" { - return nil, 0, iodine.New(InvalidArgument{}, errParams) - } - if object == "" || strings.TrimSpace(object) == "" { - return nil, 0, iodine.New(InvalidArgument{}, errParams) - } - err = d.getDonutBuckets() - if err != nil { - return nil, 0, iodine.New(err, nil) - } - if _, ok := d.buckets[bucket]; !ok { - return nil, 0, iodine.New(BucketNotFound{Bucket: bucket}, errParams) - } - return d.buckets[bucket].ReadObject(object) -} - -// GetObjectMetadata - get object metadata -func (d donut) GetObjectMetadata(bucket, object string) (map[string]string, error) { - errParams := map[string]string{ - "bucket": bucket, - "object": object, - } - err := d.getDonutBuckets() - if err != nil { - return nil, iodine.New(err, errParams) - } - if _, ok := d.buckets[bucket]; !ok { - return nil, iodine.New(BucketNotFound{Bucket: bucket}, errParams) - } - objectList, err := d.buckets[bucket].ListObjects() - if err != nil { - return nil, iodine.New(err, errParams) - } - donutObject, ok := objectList[object] - if !ok { - return nil, iodine.New(ObjectNotFound{Object: object}, errParams) - } - return donutObject.GetObjectMetadata() -} - -// getDiskWriters - -func (d donut) getBucketMetadataWriters() ([]io.WriteCloser, error) { - var writers []io.WriteCloser - for _, node := range d.nodes { - disks, err := node.ListDisks() - if err != nil { - return nil, iodine.New(err, nil) - } - writers = make([]io.WriteCloser, len(disks)) - for order, disk := range disks { - bucketMetaDataWriter, err := disk.CreateFile(filepath.Join(d.name, bucketMetadataConfig)) - if err != nil { - return nil, iodine.New(err, nil) - } - writers[order] = bucketMetaDataWriter - } - } - return writers, nil -} - -func (d donut) getBucketMetadataReaders() ([]io.ReadCloser, error) { - var readers []io.ReadCloser - for _, node := range d.nodes { - disks, err := node.ListDisks() - if err != nil { - return nil, iodine.New(err, nil) - } - readers = make([]io.ReadCloser, len(disks)) - for order, disk := range disks { - bucketMetaDataReader, err := disk.OpenFile(filepath.Join(d.name, bucketMetadataConfig)) - if err != nil { - return nil, iodine.New(err, nil) - } - readers[order] = bucketMetaDataReader - } - } - return readers, nil -} - -// -func (d donut) setDonutBucketMetadata(metadata map[string]map[string]string) error { - writers, err := d.getBucketMetadataWriters() - if err != nil { - return iodine.New(err, nil) - } - for _, writer := range writers { - defer writer.Close() - } - for _, writer := range writers { - jenc := json.NewEncoder(writer) - if err := jenc.Encode(metadata); err != nil { - return iodine.New(err, nil) - } - } - return nil -} - -func (d donut) getDonutBucketMetadata() (map[string]map[string]string, error) { - metadata := make(map[string]map[string]string) - readers, err := d.getBucketMetadataReaders() - if err != nil { - return nil, iodine.New(err, nil) - } - for _, reader := range readers { - defer reader.Close() - } - for _, reader := range readers { - jenc := json.NewDecoder(reader) - if err := jenc.Decode(&metadata); err != nil { - return nil, iodine.New(err, nil) - } - } - return metadata, nil -} - -func (d donut) makeDonutBucket(bucketName, acl string) error { - err := d.getDonutBuckets() - if err != nil { - return iodine.New(err, nil) - } - if _, ok := d.buckets[bucketName]; ok { - return iodine.New(BucketExists{Bucket: bucketName}, nil) - } - bucket, bucketMetadata, err := newBucket(bucketName, acl, d.name, d.nodes) - if err != nil { - return iodine.New(err, nil) - } - nodeNumber := 0 - d.buckets[bucketName] = bucket - for _, node := range d.nodes { - disks, err := node.ListDisks() - if err != nil { - return iodine.New(err, nil) - } - for order, disk := range disks { - bucketSlice := fmt.Sprintf("%s$%d$%d", bucketName, nodeNumber, order) - err := disk.MakeDir(filepath.Join(d.name, bucketSlice)) - if err != nil { - return iodine.New(err, nil) - } - } - nodeNumber = nodeNumber + 1 - } - metadata, err := d.getDonutBucketMetadata() - if err != nil { - err = iodine.ToError(err) - if os.IsNotExist(err) { - metadata := make(map[string]map[string]string) - metadata[bucketName] = bucketMetadata - err = d.setDonutBucketMetadata(metadata) - if err != nil { - return iodine.New(err, nil) - } - return nil - } - return iodine.New(err, nil) - } - metadata[bucketName] = bucketMetadata - err = d.setDonutBucketMetadata(metadata) - if err != nil { - return iodine.New(err, nil) - } - return nil -} - -func (d donut) getDonutBuckets() error { - for _, node := range d.nodes { - disks, err := node.ListDisks() - if err != nil { - return iodine.New(err, nil) - } - for _, disk := range disks { - dirs, err := disk.ListDir(d.name) - if err != nil { - return iodine.New(err, nil) - } - for _, dir := range dirs { - splitDir := strings.Split(dir.Name(), "$") - if len(splitDir) < 3 { - return iodine.New(CorruptedBackend{Backend: dir.Name()}, nil) - } - bucketName := splitDir[0] - // we dont need this NewBucket once we cache from makeDonutBucket() - bucket, _, err := newBucket(bucketName, "private", d.name, d.nodes) - if err != nil { - return iodine.New(err, nil) - } - d.buckets[bucketName] = bucket - } - } - } - return nil -} diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 33f20ea6d..48572df7e 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "io/ioutil" @@ -39,6 +40,7 @@ import ( type donutDriver struct { donut donut.Donut paths []string + lock *sync.RWMutex } const ( @@ -110,6 +112,7 @@ func Start(paths []string) (chan<- string, <-chan error, drivers.Driver) { s := new(donutDriver) s.donut = d s.paths = paths + s.lock = new(sync.RWMutex) go start(ctrlChannel, errorChannel, s) return ctrlChannel, errorChannel, s @@ -128,6 +131,8 @@ func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name } // ListBuckets returns a list of buckets func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) { + d.lock.RLock() + defer d.lock.RUnlock() if d.donut == nil { return nil, iodine.New(drivers.InternalError{}, nil) } @@ -152,6 +157,8 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) // CreateBucket creates a new bucket func (d donutDriver) CreateBucket(bucketName, acl string) error { + d.lock.Lock() + defer d.lock.Unlock() if d.donut == nil { return iodine.New(drivers.InternalError{}, nil) } @@ -176,6 +183,8 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error { // GetBucketMetadata retrieves an bucket's metadata func (d donutDriver) GetBucketMetadata(bucketName string) (drivers.BucketMetadata, error) { + d.lock.RLock() + defer d.lock.RUnlock() if d.donut == nil { return drivers.BucketMetadata{}, iodine.New(drivers.InternalError{}, nil) } @@ -204,6 +213,8 @@ func (d donutDriver) GetBucketMetadata(bucketName string) (drivers.BucketMetadat // SetBucketMetadata sets bucket's metadata func (d donutDriver) SetBucketMetadata(bucketName, acl string) error { + d.lock.Lock() + defer d.lock.Unlock() if d.donut == nil { return iodine.New(drivers.InternalError{}, nil) } @@ -224,6 +235,8 @@ func (d donutDriver) SetBucketMetadata(bucketName, acl string) error { // GetObject retrieves an object and writes it to a writer func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string) (int64, error) { + d.lock.RLock() + defer d.lock.RUnlock() if d.donut == nil { return 0, iodine.New(drivers.InternalError{}, nil) } @@ -246,10 +259,11 @@ func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string) // GetPartialObject retrieves an object range and writes it to a writer func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string, start, length int64) (int64, error) { + d.lock.RLock() + defer d.lock.RUnlock() if d.donut == nil { return 0, iodine.New(drivers.InternalError{}, nil) } - // TODO more efficient get partial object with proper donut support errParams := map[string]string{ "bucketName": bucketName, "objectName": objectName, @@ -295,6 +309,9 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string // GetObjectMetadata retrieves an object's metadata func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.ObjectMetadata, error) { + d.lock.RLock() + defer d.lock.RUnlock() + errParams := map[string]string{ "bucketName": bucketName, "objectName": objectName, @@ -343,6 +360,8 @@ func (b byObjectKey) Less(i, j int) bool { return b[i].Key < b[j].Key } // ListObjects - returns list of objects func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) { + d.lock.RLock() + defer d.lock.RUnlock() errParams := map[string]string{ "bucketName": bucketName, } @@ -392,6 +411,8 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso // CreateObject creates a new object func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) { + d.lock.Lock() + defer d.lock.Unlock() errParams := map[string]string{ "bucketName": bucketName, "objectName": objectName,