mirror of
https://github.com/minio/minio.git
synced 2025-11-06 20:33:07 -05:00
Migrate to external Donut package
This commit is contained in:
@@ -19,6 +19,8 @@ package donut
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -26,9 +28,9 @@ import (
|
||||
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/minio-io/donut"
|
||||
"github.com/minio-io/iodine"
|
||||
"github.com/minio-io/minio/pkg/drivers"
|
||||
"github.com/minio-io/minio/pkg/storage/donut"
|
||||
"github.com/minio-io/minio/pkg/utils/log"
|
||||
)
|
||||
|
||||
@@ -41,20 +43,44 @@ const (
|
||||
blockSize = 10 * 1024 * 1024
|
||||
)
|
||||
|
||||
// This is a dummy nodeDiskMap which is going to be deprecated soon
|
||||
// once the Management API is standardized, this map is useful for now
|
||||
// to show multi disk API correctness behavior.
|
||||
//
|
||||
// This should be obtained from donut configuration file
|
||||
func createNodeDiskMap(p string) map[string][]string {
|
||||
nodes := make(map[string][]string)
|
||||
nodes["localhost"] = make([]string, 16)
|
||||
for i := 0; i < len(nodes["localhost"]); i++ {
|
||||
diskPath := path.Join(p, strconv.Itoa(i))
|
||||
if _, err := os.Stat(diskPath); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
os.MkdirAll(diskPath, 0700)
|
||||
}
|
||||
}
|
||||
nodes["localhost"][i] = diskPath
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// Start a single disk subsystem
|
||||
func Start(path string) (chan<- string, <-chan error, drivers.Driver) {
|
||||
ctrlChannel := make(chan string)
|
||||
errorChannel := make(chan error)
|
||||
s := new(donutDriver)
|
||||
errParams := map[string]string{"path": path}
|
||||
|
||||
// TODO donut driver should be passed in as Start param and driven by config
|
||||
var err error
|
||||
s.donut, err = donut.NewDonut(path)
|
||||
err = iodine.New(err, map[string]string{"path": path})
|
||||
// Soon to be user configurable, when Management API
|
||||
// is finished we remove "default" to something
|
||||
// which is passed down from configuration
|
||||
donut, err := donut.NewDonut("default", createNodeDiskMap(path))
|
||||
if err != nil {
|
||||
err = iodine.New(err, errParams)
|
||||
log.Error.Println(err)
|
||||
}
|
||||
|
||||
s := new(donutDriver)
|
||||
s.donut = donut
|
||||
|
||||
go start(ctrlChannel, errorChannel, s)
|
||||
return ctrlChannel, errorChannel, s
|
||||
}
|
||||
@@ -63,26 +89,37 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error, s *donutDriver)
|
||||
close(errorChannel)
|
||||
}
|
||||
|
||||
// byBucketName is a type for sorting bucket metadata by bucket name
|
||||
type byBucketName []drivers.BucketMetadata
|
||||
|
||||
func (b byBucketName) Len() int { return len(b) }
|
||||
func (b byBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
||||
func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
|
||||
|
||||
// ListBuckets returns a list of buckets
|
||||
func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error) {
|
||||
buckets, err := d.donut.ListBuckets()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, bucket := range buckets {
|
||||
for name := range buckets {
|
||||
result := drivers.BucketMetadata{
|
||||
Name: bucket,
|
||||
Name: name,
|
||||
// TODO Add real created date
|
||||
Created: time.Now(),
|
||||
}
|
||||
results = append(results, result)
|
||||
}
|
||||
sort.Sort(byBucketName(results))
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket
|
||||
func (d donutDriver) CreateBucket(bucket string) error {
|
||||
return d.donut.CreateBucket(bucket)
|
||||
func (d donutDriver) CreateBucket(bucketName string) error {
|
||||
if drivers.IsValidBucket(bucketName) && !strings.Contains(bucketName, ".") {
|
||||
return d.donut.MakeBucket(bucketName)
|
||||
}
|
||||
return errors.New("Invalid bucket")
|
||||
}
|
||||
|
||||
// GetBucketMetadata retrieves an bucket's metadata
|
||||
@@ -101,30 +138,75 @@ func (d donutDriver) GetBucketPolicy(bucket string) (drivers.BucketPolicy, error
|
||||
}
|
||||
|
||||
// GetObject retrieves an object and writes it to a writer
|
||||
func (d donutDriver) GetObject(target io.Writer, bucket, key string) (int64, error) {
|
||||
reader, err := d.donut.GetObjectReader(bucket, key)
|
||||
if err != nil {
|
||||
return 0, drivers.ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: key,
|
||||
}
|
||||
func (d donutDriver) GetObject(target io.Writer, bucketName, objectName string) (int64, error) {
|
||||
errParams := map[string]string{
|
||||
"bucketName": bucketName,
|
||||
"objectName": objectName,
|
||||
}
|
||||
return io.Copy(target, reader)
|
||||
if bucketName == "" || strings.TrimSpace(bucketName) == "" {
|
||||
return 0, iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
if objectName == "" || strings.TrimSpace(objectName) == "" {
|
||||
return 0, iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
buckets, err := d.donut.ListBuckets()
|
||||
if err != nil {
|
||||
return 0, iodine.New(err, nil)
|
||||
}
|
||||
if _, ok := buckets[bucketName]; !ok {
|
||||
return 0, iodine.New(errors.New("bucket does not exist"), errParams)
|
||||
}
|
||||
reader, size, err := buckets[bucketName].GetObject(objectName)
|
||||
if os.IsNotExist(err) {
|
||||
return 0, drivers.ObjectNotFound{
|
||||
Bucket: bucketName,
|
||||
Object: objectName,
|
||||
}
|
||||
} else if err != nil {
|
||||
return 0, iodine.New(err, errParams)
|
||||
}
|
||||
n, err := io.CopyN(target, reader, size)
|
||||
return n, iodine.New(err, errParams)
|
||||
}
|
||||
|
||||
// GetPartialObject retrieves an object range and writes it to a writer
|
||||
func (d donutDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
|
||||
func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string, start, length int64) (int64, error) {
|
||||
// TODO more efficient get partial object with proper donut support
|
||||
errParams := map[string]string{
|
||||
"bucket": bucket,
|
||||
"object": object,
|
||||
"start": strconv.FormatInt(start, 10),
|
||||
"length": strconv.FormatInt(length, 10),
|
||||
"bucketName": bucketName,
|
||||
"objectName": objectName,
|
||||
"start": strconv.FormatInt(start, 10),
|
||||
"length": strconv.FormatInt(length, 10),
|
||||
}
|
||||
reader, err := d.donut.GetObjectReader(bucket, object)
|
||||
|
||||
if bucketName == "" || strings.TrimSpace(bucketName) == "" {
|
||||
return 0, iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
if objectName == "" || strings.TrimSpace(objectName) == "" {
|
||||
return 0, iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
if start < 0 {
|
||||
return 0, iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
buckets, err := d.donut.ListBuckets()
|
||||
if err != nil {
|
||||
return 0, iodine.New(err, nil)
|
||||
}
|
||||
if _, ok := buckets[bucketName]; !ok {
|
||||
return 0, iodine.New(errors.New("bucket does not exist"), errParams)
|
||||
}
|
||||
reader, size, err := buckets[bucketName].GetObject(objectName)
|
||||
if os.IsNotExist(err) {
|
||||
return 0, drivers.ObjectNotFound{
|
||||
Bucket: bucketName,
|
||||
Object: objectName,
|
||||
}
|
||||
} else if err != nil {
|
||||
return 0, iodine.New(err, errParams)
|
||||
}
|
||||
if start > size || (start+length-1) > size {
|
||||
return 0, iodine.New(errors.New("invalid range"), errParams)
|
||||
}
|
||||
_, err = io.CopyN(ioutil.Discard, reader, start)
|
||||
if err != nil {
|
||||
return 0, iodine.New(err, errParams)
|
||||
@@ -134,156 +216,160 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucket, object string, start,
|
||||
}
|
||||
|
||||
// GetObjectMetadata retrieves an object's metadata
|
||||
func (d donutDriver) GetObjectMetadata(bucket, key string, prefix string) (drivers.ObjectMetadata, error) {
|
||||
metadata, err := d.donut.GetObjectMetadata(bucket, key)
|
||||
func (d donutDriver) GetObjectMetadata(bucketName, objectName, prefixName string) (drivers.ObjectMetadata, error) {
|
||||
errParams := map[string]string{
|
||||
"bucketName": bucketName,
|
||||
"objectName": objectName,
|
||||
"prefixName": prefixName,
|
||||
}
|
||||
buckets, err := d.donut.ListBuckets()
|
||||
if err != nil {
|
||||
return drivers.ObjectMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
if _, ok := buckets[bucketName]; !ok {
|
||||
return drivers.ObjectMetadata{}, iodine.New(errors.New("bucket does not exist"), errParams)
|
||||
}
|
||||
objectList, err := buckets[bucketName].ListObjects()
|
||||
if err != nil {
|
||||
return drivers.ObjectMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
donutObjectMetadata, err := objectList[objectName].GetDonutObjectMetadata()
|
||||
if os.IsNotExist(err) {
|
||||
// return ObjectNotFound quickly on an error, API needs this to handle invalid requests
|
||||
return drivers.ObjectMetadata{}, drivers.ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: key,
|
||||
Bucket: bucketName,
|
||||
Object: objectName,
|
||||
}
|
||||
} else if err != nil {
|
||||
return drivers.ObjectMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
created, err := time.Parse(time.RFC3339Nano, metadata["sys.created"])
|
||||
objectMetadata, err := objectList[objectName].GetObjectMetadata()
|
||||
if os.IsNotExist(err) {
|
||||
// return ObjectNotFound quickly on an error, API needs this to handle invalid requests
|
||||
return drivers.ObjectMetadata{}, drivers.ObjectNotFound{
|
||||
Bucket: bucketName,
|
||||
Object: objectName,
|
||||
}
|
||||
} else if err != nil {
|
||||
return drivers.ObjectMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
created, err := time.Parse(time.RFC3339Nano, donutObjectMetadata["created"])
|
||||
if err != nil {
|
||||
return drivers.ObjectMetadata{}, err
|
||||
return drivers.ObjectMetadata{}, iodine.New(err, nil)
|
||||
}
|
||||
size, err := strconv.ParseInt(metadata["sys.size"], 10, 64)
|
||||
size, err := strconv.ParseInt(donutObjectMetadata["size"], 10, 64)
|
||||
if err != nil {
|
||||
return drivers.ObjectMetadata{}, err
|
||||
return drivers.ObjectMetadata{}, iodine.New(err, nil)
|
||||
}
|
||||
objectMetadata := drivers.ObjectMetadata{
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
driversObjectMetadata := drivers.ObjectMetadata{
|
||||
Bucket: bucketName,
|
||||
Key: objectName,
|
||||
|
||||
ContentType: metadata["contentType"],
|
||||
ContentType: objectMetadata["contentType"],
|
||||
Created: created,
|
||||
Md5: metadata["sys.md5"],
|
||||
Md5: donutObjectMetadata["md5"],
|
||||
Size: size,
|
||||
}
|
||||
return objectMetadata, nil
|
||||
return driversObjectMetadata, nil
|
||||
}
|
||||
|
||||
type byObjectKey []drivers.ObjectMetadata
|
||||
|
||||
func (b byObjectKey) Len() int { return len(b) }
|
||||
func (b byObjectKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
||||
func (b byObjectKey) Less(i, j int) bool { return b[i].Key < b[j].Key }
|
||||
|
||||
// ListObjects - returns list of objects
|
||||
func (d donutDriver) ListObjects(bucket string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) {
|
||||
// TODO Fix IsPrefixSet && IsDelimiterSet and use them
|
||||
objects, err := d.donut.ListObjects(bucket)
|
||||
func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) {
|
||||
errParams := map[string]string{
|
||||
"bucketName": bucketName,
|
||||
}
|
||||
buckets, err := d.donut.ListBuckets()
|
||||
if err != nil {
|
||||
return nil, drivers.BucketResourcesMetadata{}, err
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
if _, ok := buckets[bucketName]; !ok {
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(errors.New("bucket does not exist"), errParams)
|
||||
}
|
||||
objectList, err := buckets[bucketName].ListObjects()
|
||||
if err != nil {
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
var objects []string
|
||||
for key := range objectList {
|
||||
objects = append(objects, key)
|
||||
}
|
||||
sort.Strings(objects)
|
||||
if resources.Prefix != "" {
|
||||
objects = filterPrefix(objects, resources.Prefix)
|
||||
objects = removePrefix(objects, resources.Prefix)
|
||||
}
|
||||
|
||||
if resources.Maxkeys <= 0 || resources.Maxkeys > 1000 {
|
||||
resources.Maxkeys = 1000
|
||||
}
|
||||
|
||||
var actualObjects []string
|
||||
var commonPrefixes []string
|
||||
if strings.TrimSpace(resources.Delimiter) != "" {
|
||||
actualObjects = filterDelimited(objects, resources.Delimiter)
|
||||
commonPrefixes = filterNotDelimited(objects, resources.Delimiter)
|
||||
commonPrefixes = extractDir(commonPrefixes, resources.Delimiter)
|
||||
commonPrefixes = uniqueObjects(commonPrefixes)
|
||||
resources.CommonPrefixes = commonPrefixes
|
||||
} else {
|
||||
actualObjects = objects
|
||||
}
|
||||
// Populate filtering mode
|
||||
resources.Mode = drivers.GetMode(resources)
|
||||
// filter objects based on resources.Prefix and resources.Delimiter
|
||||
actualObjects, commonPrefixes := d.filter(objects, resources)
|
||||
resources.CommonPrefixes = commonPrefixes
|
||||
|
||||
var results []drivers.ObjectMetadata
|
||||
for _, object := range actualObjects {
|
||||
for _, objectName := range actualObjects {
|
||||
if len(results) >= resources.Maxkeys {
|
||||
resources.IsTruncated = true
|
||||
break
|
||||
}
|
||||
metadata, err := d.GetObjectMetadata(bucket, resources.Prefix+object, "")
|
||||
if _, ok := objectList[objectName]; !ok {
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(errors.New("object corrupted"), errParams)
|
||||
}
|
||||
objectMetadata, err := objectList[objectName].GetDonutObjectMetadata()
|
||||
if err != nil {
|
||||
return nil, drivers.BucketResourcesMetadata{}, err
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams)
|
||||
}
|
||||
t, err := time.Parse(time.RFC3339Nano, objectMetadata["created"])
|
||||
if err != nil {
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, nil)
|
||||
}
|
||||
size, err := strconv.ParseInt(objectMetadata["size"], 10, 64)
|
||||
if err != nil {
|
||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, nil)
|
||||
}
|
||||
metadata := drivers.ObjectMetadata{
|
||||
Key: objectName,
|
||||
Created: t,
|
||||
Size: size,
|
||||
}
|
||||
results = append(results, metadata)
|
||||
}
|
||||
sort.Sort(byObjectKey(results))
|
||||
return results, resources, nil
|
||||
}
|
||||
|
||||
func filterPrefix(objects []string, prefix string) []string {
|
||||
var results []string
|
||||
for _, object := range objects {
|
||||
if strings.HasPrefix(object, prefix) {
|
||||
results = append(results, object)
|
||||
}
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func removePrefix(objects []string, prefix string) []string {
|
||||
var results []string
|
||||
for _, object := range objects {
|
||||
results = append(results, strings.TrimPrefix(object, prefix))
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func filterDelimited(objects []string, delim string) []string {
|
||||
var results []string
|
||||
for _, object := range objects {
|
||||
if !strings.Contains(object, delim) {
|
||||
results = append(results, object)
|
||||
}
|
||||
}
|
||||
return results
|
||||
}
|
||||
func filterNotDelimited(objects []string, delim string) []string {
|
||||
var results []string
|
||||
for _, object := range objects {
|
||||
if strings.Contains(object, delim) {
|
||||
results = append(results, object)
|
||||
}
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func extractDir(objects []string, delim string) []string {
|
||||
var results []string
|
||||
for _, object := range objects {
|
||||
parts := strings.Split(object, delim)
|
||||
results = append(results, parts[0]+"/")
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func uniqueObjects(objects []string) []string {
|
||||
objectMap := make(map[string]string)
|
||||
for _, v := range objects {
|
||||
objectMap[v] = v
|
||||
}
|
||||
var results []string
|
||||
for k := range objectMap {
|
||||
results = append(results, k)
|
||||
}
|
||||
sort.Strings(results)
|
||||
return results
|
||||
}
|
||||
|
||||
// CreateObject creates a new object
|
||||
func (d donutDriver) CreateObject(bucketKey, objectKey, contentType, expectedMd5sum string, reader io.Reader) error {
|
||||
writer, err := d.donut.GetObjectWriter(bucketKey, objectKey)
|
||||
if err != nil {
|
||||
return err
|
||||
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMd5sum string, reader io.Reader) error {
|
||||
errParams := map[string]string{
|
||||
"bucketName": bucketName,
|
||||
"objectName": objectName,
|
||||
"contentType": contentType,
|
||||
}
|
||||
if _, err := io.Copy(writer, reader); err != nil {
|
||||
return err
|
||||
if bucketName == "" || strings.TrimSpace(bucketName) == "" {
|
||||
return iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
if objectName == "" || strings.TrimSpace(objectName) == "" {
|
||||
return iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
buckets, err := d.donut.ListBuckets()
|
||||
if err != nil {
|
||||
return iodine.New(err, errParams)
|
||||
}
|
||||
if _, ok := buckets[bucketName]; !ok {
|
||||
return iodine.New(errors.New("bucket does not exist"), errParams)
|
||||
}
|
||||
if contentType == "" {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
contentType = strings.TrimSpace(contentType)
|
||||
metadata := make(map[string]string)
|
||||
metadata["bucket"] = bucketKey
|
||||
metadata["object"] = objectKey
|
||||
metadata["contentType"] = contentType
|
||||
if err = writer.SetMetadata(metadata); err != nil {
|
||||
return err
|
||||
err = buckets[bucketName].PutObject(objectName, contentType, reader)
|
||||
if err != nil {
|
||||
return iodine.New(err, errParams)
|
||||
}
|
||||
return writer.Close()
|
||||
// handle expectedMd5sum
|
||||
return nil
|
||||
}
|
||||
|
||||
68
pkg/drivers/donut/donut_filter.go
Normal file
68
pkg/drivers/donut/donut_filter.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"strings"
|
||||
|
||||
"github.com/minio-io/minio/pkg/drivers"
|
||||
)
|
||||
|
||||
func delimiter(object, delimiter string) string {
|
||||
readBuffer := bytes.NewBufferString(object)
|
||||
reader := bufio.NewReader(readBuffer)
|
||||
stringReader := strings.NewReader(delimiter)
|
||||
delimited, _ := stringReader.ReadByte()
|
||||
delimitedStr, _ := reader.ReadString(delimited)
|
||||
return delimitedStr
|
||||
}
|
||||
|
||||
func appendUniq(slice []string, i string) []string {
|
||||
for _, ele := range slice {
|
||||
if ele == i {
|
||||
return slice
|
||||
}
|
||||
}
|
||||
return append(slice, i)
|
||||
}
|
||||
|
||||
func (d donutDriver) filter(objects []string, resources drivers.BucketResourcesMetadata) ([]string, []string) {
|
||||
var actualObjects []string
|
||||
var commonPrefixes []string
|
||||
for _, name := range objects {
|
||||
switch true {
|
||||
// Both delimiter and Prefix is present
|
||||
case resources.IsDelimiterPrefixSet():
|
||||
if strings.HasPrefix(name, resources.Prefix) {
|
||||
trimmedName := strings.TrimPrefix(name, resources.Prefix)
|
||||
delimitedName := delimiter(trimmedName, resources.Delimiter)
|
||||
if delimitedName != "" {
|
||||
if delimitedName == resources.Delimiter {
|
||||
commonPrefixes = appendUniq(commonPrefixes, resources.Prefix+delimitedName)
|
||||
} else {
|
||||
commonPrefixes = appendUniq(commonPrefixes, delimitedName)
|
||||
}
|
||||
if trimmedName == delimitedName {
|
||||
actualObjects = appendUniq(actualObjects, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Delimiter present and Prefix is absent
|
||||
case resources.IsDelimiterSet():
|
||||
delimitedName := delimiter(name, resources.Delimiter)
|
||||
switch true {
|
||||
case delimitedName == name:
|
||||
actualObjects = appendUniq(actualObjects, name)
|
||||
case delimitedName != "":
|
||||
commonPrefixes = appendUniq(commonPrefixes, delimitedName)
|
||||
}
|
||||
case resources.IsPrefixSet():
|
||||
if strings.HasPrefix(name, resources.Prefix) {
|
||||
actualObjects = appendUniq(actualObjects, name)
|
||||
}
|
||||
case resources.IsDefault():
|
||||
return objects, nil
|
||||
}
|
||||
}
|
||||
return actualObjects, commonPrefixes
|
||||
}
|
||||
@@ -17,6 +17,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/user"
|
||||
"path"
|
||||
"reflect"
|
||||
@@ -187,7 +188,8 @@ func Start(configs []Config) {
|
||||
for _, ch := range ctrlChans {
|
||||
close(ch)
|
||||
}
|
||||
log.Fatal(value.Interface())
|
||||
msg := fmt.Sprintf("%q", value.Interface())
|
||||
log.Fatal(iodine.New(errors.New(msg), nil))
|
||||
}
|
||||
case false:
|
||||
// Channel closed, remove from list
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type donutBucket struct {
|
||||
nodes []string
|
||||
objects map[string][]byte
|
||||
}
|
||||
|
||||
// GetNodes - get list of associated nodes for a given bucket
|
||||
func (b donutBucket) GetNodes() ([]string, error) {
|
||||
var nodes []string
|
||||
for _, node := range b.nodes {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// AddNode - adds a node to a bucket
|
||||
func (b donutBucket) AddNode(nodeID, bucketID string) error {
|
||||
errParams := map[string]string{"node": nodeID, "bucketID": bucketID}
|
||||
tokens := strings.Split(bucketID, ":")
|
||||
if len(tokens) != 3 {
|
||||
return iodine.New(errors.New("Bucket ID malformeD: "+bucketID), errParams)
|
||||
|
||||
}
|
||||
// bucketName := tokens[0]
|
||||
// aggregate := tokens[1]
|
||||
// aggregate := "0"
|
||||
part, err := strconv.Atoi(tokens[2])
|
||||
if err != nil {
|
||||
return iodine.New(errors.New("Part malformed: "+tokens[2]), errParams)
|
||||
}
|
||||
b.nodes[part] = nodeID
|
||||
return nil
|
||||
}
|
||||
@@ -1,196 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type donut struct {
|
||||
buckets map[string]Bucket
|
||||
nodes map[string]Node
|
||||
}
|
||||
|
||||
// NewDonut - instantiate new donut driver
|
||||
func NewDonut(root string) (Donut, error) {
|
||||
nodes := make(map[string]Node)
|
||||
nodes["localhost"] = &localDirectoryNode{root: root}
|
||||
driver := &donut{
|
||||
buckets: make(map[string]Bucket),
|
||||
nodes: nodes,
|
||||
}
|
||||
for nodeID, node := range nodes {
|
||||
bucketIDs, err := node.GetBuckets()
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, map[string]string{"root": root})
|
||||
}
|
||||
for _, bucketID := range bucketIDs {
|
||||
tokens := strings.Split(bucketID, ":")
|
||||
if _, ok := driver.buckets[tokens[0]]; !ok {
|
||||
bucket := donutBucket{
|
||||
nodes: make([]string, 16),
|
||||
}
|
||||
// TODO catch errors
|
||||
driver.buckets[tokens[0]] = bucket
|
||||
}
|
||||
if err = driver.buckets[tokens[0]].AddNode(nodeID, bucketID); err != nil {
|
||||
return nil, iodine.New(err, map[string]string{"root": root})
|
||||
}
|
||||
}
|
||||
}
|
||||
return driver, nil
|
||||
}
|
||||
|
||||
// CreateBucket - create a new bucket
|
||||
func (d donut) CreateBucket(bucketName string) error {
|
||||
if _, ok := d.buckets[bucketName]; ok == false {
|
||||
bucketName = strings.TrimSpace(bucketName)
|
||||
if bucketName == "" {
|
||||
return iodine.New(errors.New("Cannot create bucket with no name"), map[string]string{"bucket": bucketName})
|
||||
}
|
||||
// assign nodes
|
||||
// TODO assign other nodes
|
||||
nodes := make([]string, 16)
|
||||
for i := 0; i < 16; i++ {
|
||||
nodes[i] = "localhost"
|
||||
if node, ok := d.nodes["localhost"]; ok {
|
||||
err := node.CreateBucket(bucketName + ":0:" + strconv.Itoa(i))
|
||||
if err != nil {
|
||||
return iodine.New(err, map[string]string{"node": nodes[i], "bucket": bucketName})
|
||||
}
|
||||
}
|
||||
}
|
||||
bucket := donutBucket{
|
||||
nodes: nodes,
|
||||
}
|
||||
d.buckets[bucketName] = bucket
|
||||
return nil
|
||||
}
|
||||
return iodine.New(errors.New("Bucket exists"), map[string]string{"bucket": bucketName})
|
||||
}
|
||||
|
||||
// ListBuckets - list all buckets
|
||||
func (d donut) ListBuckets() ([]string, error) {
|
||||
var buckets []string
|
||||
for bucket := range d.buckets {
|
||||
buckets = append(buckets, bucket)
|
||||
}
|
||||
sort.Strings(buckets)
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
// GetObjectWriter - get a new writer interface for a new object
|
||||
func (d donut) GetObjectWriter(bucketName, objectName string) (ObjectWriter, error) {
|
||||
if bucket, ok := d.buckets[bucketName]; ok == true {
|
||||
writers := make([]Writer, 16)
|
||||
nodes, err := bucket.GetNodes()
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, map[string]string{"bucket": bucketName, "object": objectName})
|
||||
}
|
||||
for i, nodeID := range nodes {
|
||||
if node, ok := d.nodes[nodeID]; ok == true {
|
||||
bucketID := bucketName + ":0:" + strconv.Itoa(i)
|
||||
writer, err := node.GetWriter(bucketID, objectName)
|
||||
if err != nil {
|
||||
for _, writerToClose := range writers {
|
||||
if writerToClose != nil {
|
||||
writerToClose.CloseWithError(iodine.New(err, nil))
|
||||
}
|
||||
}
|
||||
return nil, iodine.New(err, map[string]string{"bucketid": bucketID})
|
||||
}
|
||||
writers[i] = writer
|
||||
}
|
||||
}
|
||||
return newErasureWriter(writers), nil
|
||||
}
|
||||
return nil, iodine.New(errors.New("Bucket not found"), map[string]string{"bucket": bucketName})
|
||||
}
|
||||
|
||||
// GetObjectReader - get a new reader interface for a new object
|
||||
func (d donut) GetObjectReader(bucketName, objectName string) (io.ReadCloser, error) {
|
||||
errParams := map[string]string{"bucket": bucketName, "object": objectName}
|
||||
r, w := io.Pipe()
|
||||
if bucket, ok := d.buckets[bucketName]; ok == true {
|
||||
readers := make([]io.ReadCloser, 16)
|
||||
nodes, err := bucket.GetNodes()
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
var metadata map[string]string
|
||||
for i, nodeID := range nodes {
|
||||
if node, ok := d.nodes[nodeID]; ok == true {
|
||||
bucketID := bucketName + ":0:" + strconv.Itoa(i)
|
||||
reader, err := node.GetReader(bucketID, objectName)
|
||||
if err != nil {
|
||||
errParams["node"] = nodeID
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
readers[i] = reader
|
||||
if metadata == nil {
|
||||
metadata, err = node.GetDonutMetadata(bucketID, objectName)
|
||||
if err != nil {
|
||||
errParams["node"] = nodeID
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
go erasureReader(readers, metadata, w)
|
||||
return r, nil
|
||||
}
|
||||
return nil, iodine.New(errors.New("Bucket not found"), errParams)
|
||||
}
|
||||
|
||||
// GetObjectMetadata returns metadata for a given object in a bucket
|
||||
func (d donut) GetObjectMetadata(bucketName, object string) (map[string]string, error) {
|
||||
errParams := map[string]string{"bucket": bucketName, "object": object}
|
||||
if bucket, ok := d.buckets[bucketName]; ok {
|
||||
nodes, err := bucket.GetNodes()
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
if node, ok := d.nodes[nodes[0]]; ok {
|
||||
bucketID := bucketName + ":0:0"
|
||||
metadata, err := node.GetMetadata(bucketID, object)
|
||||
if err != nil {
|
||||
errParams["bucketID"] = bucketID
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
donutMetadata, err := node.GetDonutMetadata(bucketID, object)
|
||||
if err != nil {
|
||||
errParams["bucketID"] = bucketID
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
metadata["sys.created"] = donutMetadata["created"]
|
||||
metadata["sys.md5"] = donutMetadata["md5"]
|
||||
metadata["sys.size"] = donutMetadata["size"]
|
||||
return metadata, nil
|
||||
}
|
||||
errParams["node"] = nodes[0]
|
||||
return nil, iodine.New(errors.New("Cannot connect to node: "+nodes[0]), errParams)
|
||||
}
|
||||
return nil, errors.New("Bucket not found")
|
||||
}
|
||||
|
||||
// ListObjects - list all the available objects in a bucket
|
||||
func (d donut) ListObjects(bucketName string) ([]string, error) {
|
||||
errParams := map[string]string{"bucket": bucketName}
|
||||
if bucket, ok := d.buckets[bucketName]; ok {
|
||||
nodes, err := bucket.GetNodes()
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
if node, ok := d.nodes[nodes[0]]; ok {
|
||||
bucketID := bucketName + ":0:0"
|
||||
objects, err := node.ListObjects(bucketID)
|
||||
errParams["bucketID"] = bucketID
|
||||
return objects, iodine.New(err, errParams)
|
||||
}
|
||||
}
|
||||
return nil, iodine.New(errors.New("Bucket not found"), errParams)
|
||||
}
|
||||
@@ -1,240 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
. "github.com/minio-io/check"
|
||||
)
|
||||
|
||||
func Test(t *testing.T) { TestingT(t) }
|
||||
|
||||
type MySuite struct{}
|
||||
|
||||
var _ = Suite(&MySuite{})
|
||||
|
||||
func (s *MySuite) TestEmptyBucket(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// check buckets are empty
|
||||
buckets, err := donut.ListBuckets()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(buckets, IsNil)
|
||||
}
|
||||
|
||||
func (s *MySuite) TestBucketWithoutNameFails(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
// fail to create new bucket without a name
|
||||
err = donut.CreateBucket("")
|
||||
c.Assert(err, Not(IsNil))
|
||||
|
||||
err = donut.CreateBucket(" ")
|
||||
c.Assert(err, Not(IsNil))
|
||||
}
|
||||
|
||||
func (s *MySuite) TestCreateBucketAndList(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
// create bucket
|
||||
err = donut.CreateBucket("foo")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// check bucket exists
|
||||
buckets, err := donut.ListBuckets()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(buckets, DeepEquals, []string{"foo"})
|
||||
}
|
||||
|
||||
func (s *MySuite) TestCreateBucketWithSameNameFails(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
err = donut.CreateBucket("foo")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.CreateBucket("foo")
|
||||
c.Assert(err, Not(IsNil))
|
||||
}
|
||||
|
||||
func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
// add a second bucket
|
||||
err = donut.CreateBucket("foo")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.CreateBucket("bar")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
buckets, err := donut.ListBuckets()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(buckets, DeepEquals, []string{"bar", "foo"})
|
||||
|
||||
err = donut.CreateBucket("foobar")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
buckets, err = donut.ListBuckets()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(buckets, DeepEquals, []string{"bar", "foo", "foobar"})
|
||||
}
|
||||
|
||||
func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
writer, err := donut.GetObjectWriter("foo", "obj")
|
||||
c.Assert(err, Not(IsNil))
|
||||
c.Assert(writer, IsNil)
|
||||
}
|
||||
|
||||
func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
writer, err := donut.GetObjectWriter("foo", "")
|
||||
c.Assert(err, Not(IsNil))
|
||||
c.Assert(writer, IsNil)
|
||||
|
||||
writer, err = donut.GetObjectWriter("foo", " ")
|
||||
c.Assert(err, Not(IsNil))
|
||||
c.Assert(writer, IsNil)
|
||||
}
|
||||
|
||||
func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.CreateBucket("foo")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
writer, err := donut.GetObjectWriter("foo", "obj")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
data := "Hello World"
|
||||
length, err := writer.Write([]byte(data))
|
||||
c.Assert(length, Equals, len(data))
|
||||
|
||||
expectedMetadata := map[string]string{
|
||||
"foo": "bar",
|
||||
"created": "one",
|
||||
"hello": "world",
|
||||
}
|
||||
|
||||
err = writer.SetMetadata(expectedMetadata)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = writer.Close()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
actualWriterMetadata, err := writer.GetMetadata()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(actualWriterMetadata, DeepEquals, expectedMetadata)
|
||||
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
reader, err := donut.GetObjectReader("foo", "obj")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
var actualData bytes.Buffer
|
||||
_, err = io.Copy(&actualData, reader)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(actualData.Bytes(), DeepEquals, []byte(data))
|
||||
|
||||
actualMetadata, err := donut.GetObjectMetadata("foo", "obj")
|
||||
c.Assert(err, IsNil)
|
||||
expectedMetadata["sys.md5"] = "b10a8db164e0754105b7a99be72e3fe5"
|
||||
expectedMetadata["sys.size"] = "11"
|
||||
_, err = time.Parse(time.RFC3339Nano, actualMetadata["sys.created"])
|
||||
c.Assert(err, IsNil)
|
||||
expectedMetadata["sys.created"] = actualMetadata["sys.created"]
|
||||
c.Assert(actualMetadata, DeepEquals, expectedMetadata)
|
||||
}
|
||||
|
||||
func (s *MySuite) TestMultipleNewObjects(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Assert(donut.CreateBucket("foo"), IsNil)
|
||||
writer, err := donut.GetObjectWriter("foo", "obj1")
|
||||
c.Assert(err, IsNil)
|
||||
writer.Write([]byte("one"))
|
||||
writer.Close()
|
||||
|
||||
writer, err = donut.GetObjectWriter("foo", "obj2")
|
||||
c.Assert(err, IsNil)
|
||||
writer.Write([]byte("two"))
|
||||
writer.Close()
|
||||
|
||||
// c.Skip("not complete")
|
||||
|
||||
reader, err := donut.GetObjectReader("foo", "obj1")
|
||||
c.Assert(err, IsNil)
|
||||
var readerBuffer1 bytes.Buffer
|
||||
_, err = io.Copy(&readerBuffer1, reader)
|
||||
c.Assert(err, IsNil)
|
||||
// c.Skip("Not Implemented")
|
||||
c.Assert(readerBuffer1.Bytes(), DeepEquals, []byte("one"))
|
||||
|
||||
reader, err = donut.GetObjectReader("foo", "obj2")
|
||||
c.Assert(err, IsNil)
|
||||
var readerBuffer2 bytes.Buffer
|
||||
_, err = io.Copy(&readerBuffer2, reader)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(readerBuffer2.Bytes(), DeepEquals, []byte("two"))
|
||||
|
||||
// test list objects
|
||||
listObjects, err := donut.ListObjects("foo")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"})
|
||||
}
|
||||
|
||||
func (s *MySuite) TestSysPrefixShouldFail(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut(root)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Assert(donut.CreateBucket("foo"), IsNil)
|
||||
writer, err := donut.GetObjectWriter("foo", "obj1")
|
||||
c.Assert(err, IsNil)
|
||||
writer.Write([]byte("one"))
|
||||
metadata := make(map[string]string)
|
||||
metadata["foo"] = "bar"
|
||||
metadata["sys.hello"] = "world"
|
||||
err = writer.SetMetadata(metadata)
|
||||
c.Assert(err, Not(IsNil))
|
||||
writer.Close()
|
||||
}
|
||||
@@ -1,245 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"hash"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
|
||||
encoding "github.com/minio-io/erasure"
|
||||
"github.com/minio-io/iodine"
|
||||
"github.com/minio-io/minio/pkg/utils/split"
|
||||
)
|
||||
|
||||
// getErasureTechnique - convert technique string into Technique type
|
||||
func getErasureTechnique(technique string) (encoding.Technique, error) {
|
||||
switch true {
|
||||
case technique == "Cauchy":
|
||||
return encoding.Cauchy, nil
|
||||
case technique == "Vandermonde":
|
||||
return encoding.Cauchy, nil
|
||||
default:
|
||||
return encoding.None, iodine.New(errors.New("Invalid erasure technique: "+technique), nil)
|
||||
}
|
||||
}
|
||||
|
||||
// erasureReader - returns aligned streaming reads over a PipeWriter
|
||||
func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) {
|
||||
totalChunks, err := strconv.Atoi(donutMetadata["chunkCount"])
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
totalLeft, err := strconv.ParseInt(donutMetadata["size"], 10, 64)
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
blockSize, err := strconv.Atoi(donutMetadata["blockSize"])
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
parsedk, err := strconv.ParseUint(donutMetadata["erasureK"], 10, 8)
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
k := uint8(parsedk)
|
||||
parsedm, err := strconv.ParseUint(donutMetadata["erasureM"], 10, 8)
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
m := uint8(parsedm)
|
||||
expectedMd5sum, err := hex.DecodeString(donutMetadata["md5"])
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
technique, err := getErasureTechnique(donutMetadata["erasureTechnique"])
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
return
|
||||
}
|
||||
hasher := md5.New()
|
||||
params, err := encoding.ValidateParams(k, m, technique)
|
||||
if err != nil {
|
||||
writer.CloseWithError(iodine.New(err, donutMetadata))
|
||||
}
|
||||
encoder := encoding.NewErasure(params)
|
||||
for i := 0; i < totalChunks; i++ {
|
||||
totalLeft, err = decodeChunk(writer, readers, encoder, hasher, k, m, totalLeft, blockSize)
|
||||
if err != nil {
|
||||
errParams := map[string]string{
|
||||
"totalLeft": strconv.FormatInt(totalLeft, 10),
|
||||
}
|
||||
for k, v := range donutMetadata {
|
||||
errParams[k] = v
|
||||
}
|
||||
writer.CloseWithError(iodine.New(err, errParams))
|
||||
}
|
||||
}
|
||||
actualMd5sum := hasher.Sum(nil)
|
||||
if bytes.Compare(expectedMd5sum, actualMd5sum) != 0 {
|
||||
writer.CloseWithError(iodine.New(errors.New("decoded md5sum did not match. expected: "+string(expectedMd5sum)+" actual: "+string(actualMd5sum)), donutMetadata))
|
||||
return
|
||||
}
|
||||
writer.Close()
|
||||
return
|
||||
}
|
||||
|
||||
func decodeChunk(writer *io.PipeWriter, readers []io.ReadCloser, encoder *encoding.Erasure, hasher hash.Hash, k, m uint8, totalLeft int64, blockSize int) (int64, error) {
|
||||
curBlockSize := 0
|
||||
if int64(blockSize) < totalLeft {
|
||||
curBlockSize = blockSize
|
||||
} else {
|
||||
curBlockSize = int(totalLeft) // cast is safe, blockSize in if protects
|
||||
}
|
||||
|
||||
curChunkSize := encoding.GetEncodedBlockLen(curBlockSize, uint8(k))
|
||||
encodedBytes := make([][]byte, 16)
|
||||
for i, reader := range readers {
|
||||
var bytesBuffer bytes.Buffer
|
||||
written, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
|
||||
if err != nil {
|
||||
errParams := map[string]string{}
|
||||
errParams["part"] = strconv.FormatInt(written, 10)
|
||||
errParams["block.written"] = strconv.FormatInt(written, 10)
|
||||
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
||||
return totalLeft, iodine.New(err, errParams)
|
||||
}
|
||||
encodedBytes[i] = bytesBuffer.Bytes()
|
||||
}
|
||||
decodedData, err := encoder.Decode(encodedBytes, curBlockSize)
|
||||
if err != nil {
|
||||
errParams := map[string]string{}
|
||||
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
||||
return totalLeft, iodine.New(err, errParams)
|
||||
}
|
||||
_, err = hasher.Write(decodedData) // not expecting errors from hash, will also catch further down on .Sum mismatch in parent
|
||||
if err != nil {
|
||||
errParams := map[string]string{}
|
||||
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
||||
return totalLeft, iodine.New(err, errParams)
|
||||
}
|
||||
_, err = io.Copy(writer, bytes.NewBuffer(decodedData))
|
||||
if err != nil {
|
||||
errParams := map[string]string{}
|
||||
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
||||
return totalLeft, iodine.New(err, errParams)
|
||||
}
|
||||
totalLeft = totalLeft - int64(blockSize)
|
||||
return totalLeft, nil
|
||||
}
|
||||
|
||||
// erasure writer
|
||||
|
||||
type erasureWriter struct {
|
||||
writers []Writer
|
||||
metadata map[string]string
|
||||
donutMetadata map[string]string // not exposed
|
||||
erasureWriter *io.PipeWriter
|
||||
isClosed <-chan bool
|
||||
}
|
||||
|
||||
// newErasureWriter - get a new writer
|
||||
func newErasureWriter(writers []Writer) ObjectWriter {
|
||||
r, w := io.Pipe()
|
||||
isClosed := make(chan bool)
|
||||
writer := erasureWriter{
|
||||
writers: writers,
|
||||
metadata: make(map[string]string),
|
||||
erasureWriter: w,
|
||||
isClosed: isClosed,
|
||||
}
|
||||
go erasureGoroutine(r, writer, isClosed)
|
||||
return writer
|
||||
}
|
||||
|
||||
func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) {
|
||||
chunks := split.Stream(r, 10*1024*1024)
|
||||
params, _ := encoding.ValidateParams(8, 8, encoding.Cauchy)
|
||||
encoder := encoding.NewErasure(params)
|
||||
chunkCount := 0
|
||||
totalLength := 0
|
||||
summer := md5.New()
|
||||
for chunk := range chunks {
|
||||
if chunk.Err == nil {
|
||||
totalLength = totalLength + len(chunk.Data)
|
||||
encodedBlocks, _ := encoder.Encode(chunk.Data)
|
||||
summer.Write(chunk.Data)
|
||||
for blockIndex, block := range encodedBlocks {
|
||||
io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block))
|
||||
}
|
||||
}
|
||||
chunkCount = chunkCount + 1
|
||||
}
|
||||
dataMd5sum := summer.Sum(nil)
|
||||
metadata := make(map[string]string)
|
||||
metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024)
|
||||
metadata["chunkCount"] = strconv.Itoa(chunkCount)
|
||||
metadata["created"] = time.Now().Format(time.RFC3339Nano)
|
||||
metadata["erasureK"] = "8"
|
||||
metadata["erasureM"] = "8"
|
||||
metadata["erasureTechnique"] = "Cauchy"
|
||||
metadata["md5"] = hex.EncodeToString(dataMd5sum)
|
||||
metadata["size"] = strconv.Itoa(totalLength)
|
||||
for _, nodeWriter := range eWriter.writers {
|
||||
if nodeWriter != nil {
|
||||
nodeWriter.SetMetadata(eWriter.metadata)
|
||||
nodeWriter.SetDonutMetadata(metadata)
|
||||
nodeWriter.Close()
|
||||
}
|
||||
}
|
||||
isClosed <- true
|
||||
}
|
||||
|
||||
func (eWriter erasureWriter) Write(data []byte) (int, error) {
|
||||
io.Copy(eWriter.erasureWriter, bytes.NewBuffer(data))
|
||||
return len(data), nil
|
||||
}
|
||||
|
||||
func (eWriter erasureWriter) Close() error {
|
||||
eWriter.erasureWriter.Close()
|
||||
<-eWriter.isClosed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (eWriter erasureWriter) CloseWithError(err error) error {
|
||||
for _, writer := range eWriter.writers {
|
||||
if writer != nil {
|
||||
writer.CloseWithError(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (eWriter erasureWriter) SetMetadata(metadata map[string]string) error {
|
||||
for k := range metadata {
|
||||
if strings.HasPrefix(k, "sys.") {
|
||||
return errors.New("Invalid key '" + k + "', cannot start with sys.'")
|
||||
}
|
||||
}
|
||||
for k := range eWriter.metadata {
|
||||
delete(eWriter.metadata, k)
|
||||
}
|
||||
for k, v := range metadata {
|
||||
eWriter.metadata[k] = v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (eWriter erasureWriter) GetMetadata() (map[string]string, error) {
|
||||
metadata := make(map[string]string)
|
||||
for k, v := range eWriter.metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
return metadata, nil
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// Collection of Donut specification interfaces
|
||||
|
||||
// Donut interface
|
||||
type Donut interface {
|
||||
CreateBucket(bucket string) error
|
||||
GetObjectReader(bucket, object string) (io.ReadCloser, error)
|
||||
GetObjectWriter(bucket, object string) (ObjectWriter, error)
|
||||
GetObjectMetadata(bucket, object string) (map[string]string, error)
|
||||
ListBuckets() ([]string, error)
|
||||
ListObjects(bucket string) ([]string, error)
|
||||
}
|
||||
|
||||
// Bucket interface
|
||||
type Bucket interface {
|
||||
GetNodes() ([]string, error)
|
||||
AddNode(nodeID, bucketID string) error
|
||||
}
|
||||
|
||||
// Node interface
|
||||
type Node interface {
|
||||
CreateBucket(bucket string) error
|
||||
GetBuckets() ([]string, error)
|
||||
GetDonutMetadata(bucket, object string) (map[string]string, error)
|
||||
GetMetadata(bucket, object string) (map[string]string, error)
|
||||
GetReader(bucket, object string) (io.ReadCloser, error)
|
||||
GetWriter(bucket, object string) (Writer, error)
|
||||
ListObjects(bucket string) ([]string, error)
|
||||
}
|
||||
|
||||
// ObjectWriter interface
|
||||
type ObjectWriter interface {
|
||||
Close() error
|
||||
CloseWithError(error) error
|
||||
GetMetadata() (map[string]string, error)
|
||||
SetMetadata(map[string]string) error
|
||||
Write([]byte) (int, error)
|
||||
}
|
||||
|
||||
// Writer interface
|
||||
type Writer interface {
|
||||
ObjectWriter
|
||||
|
||||
GetDonutMetadata() (map[string]string, error)
|
||||
SetDonutMetadata(map[string]string) error
|
||||
}
|
||||
@@ -1,100 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type localDirectoryNode struct {
|
||||
root string
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) CreateBucket(bucket string) error {
|
||||
objectPath := path.Join(d.root, bucket)
|
||||
return iodine.New(os.MkdirAll(objectPath, 0700), map[string]string{"bucket": bucket})
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) GetBuckets() ([]string, error) {
|
||||
files, err := ioutil.ReadDir(d.root)
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
var results []string
|
||||
for _, file := range files {
|
||||
if file.IsDir() {
|
||||
results = append(results, file.Name())
|
||||
}
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) GetWriter(bucket, object string) (Writer, error) {
|
||||
errParams := map[string]string{"bucket": bucket, "object": object}
|
||||
objectPath := path.Join(d.root, bucket, object)
|
||||
err := os.MkdirAll(objectPath, 0700)
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
writer, err := newDonutObjectWriter(objectPath)
|
||||
return writer, iodine.New(err, errParams)
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) {
|
||||
reader, err := os.Open(path.Join(d.root, bucket, object, "data"))
|
||||
return reader, iodine.New(err, map[string]string{"bucket": bucket, "object": object})
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) {
|
||||
m, err := d.getMetadata(bucket, object, "metadata.json")
|
||||
return m, iodine.New(err, map[string]string{"bucket": bucket, "object": object})
|
||||
}
|
||||
func (d localDirectoryNode) GetDonutMetadata(bucket, object string) (map[string]string, error) {
|
||||
m, err := d.getMetadata(bucket, object, "donutMetadata.json")
|
||||
return m, iodine.New(err, map[string]string{"bucket": bucket, "object": object})
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) {
|
||||
errParams := map[string]string{"bucket": bucket, "object": object, "file": fileName}
|
||||
file, err := os.Open(path.Join(d.root, bucket, object, fileName))
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
metadata := make(map[string]string)
|
||||
decoder := json.NewDecoder(file)
|
||||
if err := decoder.Decode(&metadata); err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
return metadata, nil
|
||||
|
||||
}
|
||||
|
||||
func (d localDirectoryNode) ListObjects(bucketName string) ([]string, error) {
|
||||
errParams := map[string]string{"bucket": bucketName}
|
||||
prefix := path.Join(d.root, bucketName)
|
||||
var objects []string
|
||||
if err := filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return iodine.New(err, errParams)
|
||||
}
|
||||
if !info.IsDir() && strings.HasSuffix(path, "data") {
|
||||
object := strings.TrimPrefix(path, prefix+"/")
|
||||
object = strings.TrimSuffix(object, "/data")
|
||||
objects = append(objects, object)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
sort.Strings(objects)
|
||||
return objects, nil
|
||||
}
|
||||
@@ -1,91 +0,0 @@
|
||||
package donut
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
func newDonutObjectWriter(objectDir string) (Writer, error) {
|
||||
dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
|
||||
if err != nil {
|
||||
return nil, iodine.New(err, map[string]string{"objectDir": objectDir})
|
||||
}
|
||||
return donutObjectWriter{
|
||||
root: objectDir,
|
||||
file: dataFile,
|
||||
metadata: make(map[string]string),
|
||||
donutMetadata: make(map[string]string),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type donutObjectWriter struct {
|
||||
root string
|
||||
file *os.File
|
||||
metadata map[string]string
|
||||
donutMetadata map[string]string
|
||||
err error
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) Write(data []byte) (int, error) {
|
||||
written, err := d.file.Write(data)
|
||||
return written, iodine.New(err, nil)
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) Close() error {
|
||||
if d.err != nil {
|
||||
return iodine.New(d.err, nil)
|
||||
}
|
||||
metadata, _ := json.Marshal(d.metadata)
|
||||
ioutil.WriteFile(path.Join(d.root, "metadata.json"), metadata, 0600)
|
||||
donutMetadata, _ := json.Marshal(d.donutMetadata)
|
||||
ioutil.WriteFile(path.Join(d.root, "donutMetadata.json"), donutMetadata, 0600)
|
||||
|
||||
return iodine.New(d.file.Close(), nil)
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) CloseWithError(err error) error {
|
||||
if d.err != nil {
|
||||
d.err = err
|
||||
}
|
||||
return iodine.New(d.Close(), nil)
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) SetMetadata(metadata map[string]string) error {
|
||||
for k := range d.metadata {
|
||||
delete(d.metadata, k)
|
||||
}
|
||||
for k, v := range metadata {
|
||||
d.metadata[k] = v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) GetMetadata() (map[string]string, error) {
|
||||
metadata := make(map[string]string)
|
||||
for k, v := range d.metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) SetDonutMetadata(metadata map[string]string) error {
|
||||
for k := range d.donutMetadata {
|
||||
delete(d.donutMetadata, k)
|
||||
}
|
||||
for k, v := range metadata {
|
||||
d.donutMetadata[k] = v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d donutObjectWriter) GetDonutMetadata() (map[string]string, error) {
|
||||
donutMetadata := make(map[string]string)
|
||||
for k, v := range d.donutMetadata {
|
||||
donutMetadata[k] = v
|
||||
}
|
||||
return donutMetadata, nil
|
||||
}
|
||||
Reference in New Issue
Block a user