mirror of https://github.com/minio/minio.git
Update minio-io/donut, minio-io/objectdriver
This commit is contained in:
parent
34d06101cc
commit
ec39155089
|
@ -24,7 +24,7 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/minio-io/donut",
|
||||
"Rev": "9d0c663a857103c780414e15d7e426c6fca9984e"
|
||||
"Rev": "2b8c72e28768f47511730adae5e780550fdc0236"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/minio-io/erasure",
|
||||
|
@ -36,7 +36,7 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/minio-io/objectdriver",
|
||||
"Rev": "3efb266b6a644b161d8a4d9a1f55bea95a2e391b"
|
||||
"Rev": "64e55f18297dd962102afc7e81ea63488fc4706a"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/stretchr/objx",
|
||||
|
|
|
@ -26,6 +26,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
)
|
||||
|
@ -39,8 +41,12 @@ type bucket struct {
|
|||
|
||||
// NewBucket - instantiate a new bucket
|
||||
func NewBucket(bucketName, donutName string, nodes map[string]Node) (Bucket, error) {
|
||||
if bucketName == "" {
|
||||
return nil, errors.New("invalid argument")
|
||||
errParams := map[string]string{
|
||||
"bucketName": bucketName,
|
||||
"donutName": donutName,
|
||||
}
|
||||
if strings.TrimSpace(bucketName) == "" || strings.TrimSpace(donutName) == "" {
|
||||
return nil, iodine.New(errors.New("invalid argument"), errParams)
|
||||
}
|
||||
b := bucket{}
|
||||
b.name = bucketName
|
||||
|
@ -55,27 +61,27 @@ func (b bucket) ListObjects() (map[string]Object, error) {
|
|||
for _, node := range b.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
for _, disk := range disks {
|
||||
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, disk.GetOrder())
|
||||
bucketPath := path.Join(b.donutName, bucketSlice)
|
||||
objects, err := disk.ListDir(bucketPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
for _, object := range objects {
|
||||
newObject, err := NewObject(object.Name(), path.Join(disk.GetPath(), bucketPath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
newObjectMetadata, err := newObject.GetObjectMetadata()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
objectName, ok := newObjectMetadata["object"]
|
||||
if !ok {
|
||||
return nil, errors.New("object corrupted")
|
||||
return nil, iodine.New(errors.New("object corrupted"), nil)
|
||||
}
|
||||
b.objects[objectName] = newObject
|
||||
}
|
||||
|
@ -90,82 +96,101 @@ func (b bucket) GetObject(objectName string) (reader io.ReadCloser, size int64,
|
|||
// get list of objects
|
||||
objects, err := b.ListObjects()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, 0, iodine.New(err, nil)
|
||||
}
|
||||
// check if object exists
|
||||
object, ok := objects[objectName]
|
||||
if !ok {
|
||||
return nil, 0, os.ErrNotExist
|
||||
return nil, 0, iodine.New(os.ErrNotExist, nil)
|
||||
}
|
||||
objectMetadata, err := object.GetObjectMetadata()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, 0, iodine.New(err, nil)
|
||||
}
|
||||
if objectName == "" || writer == nil || len(objectMetadata) == 0 {
|
||||
return nil, 0, errors.New("invalid argument")
|
||||
return nil, 0, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
size, err = strconv.ParseInt(objectMetadata["size"], 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, 0, iodine.New(err, nil)
|
||||
}
|
||||
go b.readEncodedData(b.normalizeObjectName(objectName), writer, objectMetadata)
|
||||
donutObjectMetadata, err := object.GetDonutObjectMetadata()
|
||||
if err != nil {
|
||||
return nil, 0, iodine.New(err, nil)
|
||||
}
|
||||
go b.readEncodedData(b.normalizeObjectName(objectName), writer, donutObjectMetadata)
|
||||
return reader, size, nil
|
||||
}
|
||||
|
||||
func (b bucket) PutObject(objectName string, objectData io.Reader, metadata map[string]string) error {
|
||||
func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) error {
|
||||
if objectName == "" || objectData == nil {
|
||||
return errors.New("invalid argument")
|
||||
}
|
||||
contentType, ok := metadata["contentType"]
|
||||
if !ok || strings.TrimSpace(contentType) == "" {
|
||||
contentType = "application/octet-stream"
|
||||
return iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
writers, err := b.getDiskWriters(b.normalizeObjectName(objectName), "data")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, writer := range writers {
|
||||
defer writer.Close()
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
summer := md5.New()
|
||||
objectMetadata := make(map[string]string)
|
||||
donutObjectMetadata := make(map[string]string)
|
||||
objectMetadata["version"] = "1.0"
|
||||
donutObjectMetadata["version"] = "1.0"
|
||||
switch len(writers) == 1 {
|
||||
case true:
|
||||
mw := io.MultiWriter(writers[0], summer)
|
||||
totalLength, err := io.Copy(mw, objectData)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
donutObjectMetadata["sys.size"] = strconv.FormatInt(totalLength, 10)
|
||||
objectMetadata["size"] = strconv.FormatInt(totalLength, 10)
|
||||
case false:
|
||||
k, m, err := b.getDataAndParity(len(writers))
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
chunkCount, totalLength, err := b.writeEncodedData(k, m, writers, objectData, summer)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
objectMetadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024)
|
||||
objectMetadata["chunkCount"] = strconv.Itoa(chunkCount)
|
||||
objectMetadata["erasureK"] = strconv.FormatUint(uint64(k), 10)
|
||||
objectMetadata["erasureM"] = strconv.FormatUint(uint64(m), 10)
|
||||
objectMetadata["erasureTechnique"] = "Cauchy"
|
||||
donutObjectMetadata["sys.blockSize"] = strconv.Itoa(10 * 1024 * 1024)
|
||||
donutObjectMetadata["sys.chunkCount"] = strconv.Itoa(chunkCount)
|
||||
donutObjectMetadata["sys.erasureK"] = strconv.FormatUint(uint64(k), 10)
|
||||
donutObjectMetadata["sys.erasureM"] = strconv.FormatUint(uint64(m), 10)
|
||||
donutObjectMetadata["sys.erasureTechnique"] = "Cauchy"
|
||||
donutObjectMetadata["sys.size"] = strconv.Itoa(totalLength)
|
||||
objectMetadata["size"] = strconv.Itoa(totalLength)
|
||||
}
|
||||
dataMd5sum := summer.Sum(nil)
|
||||
objectMetadata["created"] = time.Now().Format(time.RFC3339Nano)
|
||||
objectMetadata["md5"] = hex.EncodeToString(dataMd5sum)
|
||||
if _, ok := metadata["expectedMd5Sum"]; ok {
|
||||
if err := b.isMD5SumEqual(metadata["expectedMd5sum"], objectMetadata["md5"]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
objectMetadata["bucket"] = b.name
|
||||
objectMetadata["object"] = objectName
|
||||
objectMetadata["contentType"] = strings.TrimSpace(contentType)
|
||||
if err := b.writeDonutObjectMetadata(b.normalizeObjectName(objectName), objectMetadata); err != nil {
|
||||
return err
|
||||
for k, v := range metadata {
|
||||
objectMetadata[k] = v
|
||||
}
|
||||
dataMd5sum := summer.Sum(nil)
|
||||
objectMetadata["created"] = time.Now().Format(time.RFC3339Nano)
|
||||
|
||||
// keeping md5sum for the object in two different places
|
||||
// one for object storage and another is for internal use
|
||||
objectMetadata["md5"] = hex.EncodeToString(dataMd5sum)
|
||||
donutObjectMetadata["sys.md5"] = hex.EncodeToString(dataMd5sum)
|
||||
|
||||
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
if err := b.isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), objectMetadata["md5"]); err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
// write donut specific metadata
|
||||
if err := b.writeDonutObjectMetadata(b.normalizeObjectName(objectName), donutObjectMetadata); err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
// write object specific metadata
|
||||
if err := b.writeObjectMetadata(b.normalizeObjectName(objectName), objectMetadata); err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
// close all writers, when control flow reaches here
|
||||
for _, writer := range writers {
|
||||
writer.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package donut
|
|||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
@ -36,7 +35,7 @@ import (
|
|||
|
||||
func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
|
||||
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(expectedMD5Sum)
|
||||
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
|
||||
if err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
|
@ -52,13 +51,13 @@ func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
|
|||
return iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
|
||||
func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[string]string) error {
|
||||
func (b bucket) writeObjectMetadata(objectName string, objectMetadata map[string]string) error {
|
||||
if len(objectMetadata) == 0 {
|
||||
return errors.New("invalid argument")
|
||||
return iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
objectMetadataWriters, err := b.getDiskWriters(objectName, objectMetadataConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for _, objectMetadataWriter := range objectMetadataWriters {
|
||||
defer objectMetadataWriter.Close()
|
||||
|
@ -66,7 +65,27 @@ func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[s
|
|||
for _, objectMetadataWriter := range objectMetadataWriters {
|
||||
jenc := json.NewEncoder(objectMetadataWriter)
|
||||
if err := jenc.Encode(objectMetadata); err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[string]string) error {
|
||||
if len(objectMetadata) == 0 {
|
||||
return iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
objectMetadataWriters, err := b.getDiskWriters(objectName, donutObjectMetadataConfig)
|
||||
if err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for _, objectMetadataWriter := range objectMetadataWriters {
|
||||
defer objectMetadataWriter.Close()
|
||||
}
|
||||
for _, objectMetadataWriter := range objectMetadataWriters {
|
||||
jenc := json.NewEncoder(objectMetadataWriter)
|
||||
if err := jenc.Encode(objectMetadata); err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -80,12 +99,12 @@ func (b bucket) normalizeObjectName(objectName string) string {
|
|||
|
||||
func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) {
|
||||
if totalWriters <= 1 {
|
||||
return 0, 0, errors.New("invalid argument")
|
||||
return 0, 0, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
quotient := totalWriters / 2 // not using float or abs to let integer round off to lower value
|
||||
// quotient cannot be bigger than (255 / 2) = 127
|
||||
if quotient > 127 {
|
||||
return 0, 0, errors.New("parity over flow")
|
||||
return 0, 0, iodine.New(errors.New("parity over flow"), nil)
|
||||
}
|
||||
remainder := totalWriters % 2 // will be 1 for odd and 0 for even numbers
|
||||
k = uint8(quotient + remainder)
|
||||
|
@ -97,7 +116,7 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat
|
|||
chunks := split.Stream(objectData, 10*1024*1024)
|
||||
encoder, err := NewEncoder(k, m, "Cauchy")
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
return 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
chunkCount := 0
|
||||
totalLength := 0
|
||||
|
@ -109,7 +128,7 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat
|
|||
for blockIndex, block := range encodedBlocks {
|
||||
_, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block))
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
return 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -118,45 +137,46 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat
|
|||
return chunkCount, totalLength, nil
|
||||
}
|
||||
|
||||
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, objectMetadata map[string]string) {
|
||||
expectedMd5sum, err := hex.DecodeString(objectMetadata["md5"])
|
||||
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, donutObjectMetadata map[string]string) {
|
||||
expectedMd5sum, err := hex.DecodeString(donutObjectMetadata["sys.md5"])
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
readers, err := b.getDiskReaders(objectName, "data")
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
hasher := md5.New()
|
||||
mwriter := io.MultiWriter(writer, hasher)
|
||||
switch len(readers) == 1 {
|
||||
case false:
|
||||
totalChunks, totalLeft, blockSize, k, m, err := b.metadata2Values(objectMetadata)
|
||||
totalChunks, totalLeft, blockSize, k, m, err := b.metadata2Values(donutObjectMetadata)
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
technique, ok := objectMetadata["erasureTechnique"]
|
||||
technique, ok := donutObjectMetadata["sys.erasureTechnique"]
|
||||
if !ok {
|
||||
writer.CloseWithError(errors.New("missing erasure Technique"))
|
||||
err := errors.New("missing erasure Technique")
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
encoder, err := NewEncoder(uint8(k), uint8(m), technique)
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
for i := 0; i < totalChunks; i++ {
|
||||
decodedData, err := b.decodeData(totalLeft, blockSize, readers, encoder, writer)
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
_, err = io.Copy(mwriter, bytes.NewBuffer(decodedData))
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
totalLeft = totalLeft - int64(blockSize)
|
||||
|
@ -164,13 +184,14 @@ func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, object
|
|||
case true:
|
||||
_, err := io.Copy(writer, readers[0])
|
||||
if err != nil {
|
||||
writer.CloseWithError(err)
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
}
|
||||
// check if decodedData md5sum matches
|
||||
if !bytes.Equal(expectedMd5sum, hasher.Sum(nil)) {
|
||||
writer.CloseWithError(errors.New("checksum mismatch"))
|
||||
err := errors.New("checksum mismatch")
|
||||
writer.CloseWithError(iodine.New(err, nil))
|
||||
return
|
||||
}
|
||||
writer.Close()
|
||||
|
@ -186,31 +207,46 @@ func (b bucket) decodeData(totalLeft, blockSize int64, readers []io.ReadCloser,
|
|||
}
|
||||
curChunkSize, err := encoder.GetEncodedBlockLen(int(curBlockSize))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
encodedBytes := make([][]byte, len(readers))
|
||||
for i, reader := range readers {
|
||||
var bytesBuffer bytes.Buffer
|
||||
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
encodedBytes[i] = bytesBuffer.Bytes()
|
||||
}
|
||||
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
return decodedData, nil
|
||||
}
|
||||
|
||||
func (b bucket) metadata2Values(objectMetadata map[string]string) (totalChunks int, totalLeft, blockSize int64, k, m uint64, err error) {
|
||||
totalChunks, err = strconv.Atoi(objectMetadata["chunkCount"])
|
||||
totalLeft, err = strconv.ParseInt(objectMetadata["size"], 10, 64)
|
||||
blockSize, err = strconv.ParseInt(objectMetadata["blockSize"], 10, 64)
|
||||
k, err = strconv.ParseUint(objectMetadata["erasureK"], 10, 8)
|
||||
m, err = strconv.ParseUint(objectMetadata["erasureM"], 10, 8)
|
||||
return
|
||||
func (b bucket) metadata2Values(donutObjectMetadata map[string]string) (totalChunks int, totalLeft, blockSize int64, k, m uint64, err error) {
|
||||
totalChunks, err = strconv.Atoi(donutObjectMetadata["sys.chunkCount"])
|
||||
if err != nil {
|
||||
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
totalLeft, err = strconv.ParseInt(donutObjectMetadata["sys.size"], 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
blockSize, err = strconv.ParseInt(donutObjectMetadata["sys.blockSize"], 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
k, err = strconv.ParseUint(donutObjectMetadata["sys.erasureK"], 10, 8)
|
||||
if err != nil {
|
||||
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
m, err = strconv.ParseUint(donutObjectMetadata["sys.erasureM"], 10, 8)
|
||||
if err != nil {
|
||||
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
||||
}
|
||||
return totalChunks, totalLeft, blockSize, k, m, nil
|
||||
}
|
||||
|
||||
func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
|
||||
|
@ -219,7 +255,7 @@ func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser,
|
|||
for _, node := range b.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
readers = make([]io.ReadCloser, len(disks))
|
||||
for _, disk := range disks {
|
||||
|
@ -227,7 +263,7 @@ func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser,
|
|||
objectPath := path.Join(b.donutName, bucketSlice, objectName, objectMeta)
|
||||
objectSlice, err := disk.OpenFile(objectPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
readers[disk.GetOrder()] = objectSlice
|
||||
}
|
||||
|
@ -242,7 +278,7 @@ func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser,
|
|||
for _, node := range b.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
writers = make([]io.WriteCloser, len(disks))
|
||||
for _, disk := range disks {
|
||||
|
@ -250,7 +286,7 @@ func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser,
|
|||
objectPath := path.Join(b.donutName, bucketSlice, objectName, objectMeta)
|
||||
objectSlice, err := disk.MakeFile(objectPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
writers[disk.GetOrder()] = objectSlice
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import (
|
|||
"syscall"
|
||||
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type disk struct {
|
||||
|
@ -34,19 +36,19 @@ type disk struct {
|
|||
// NewDisk - instantiate new disk
|
||||
func NewDisk(diskPath string, diskOrder int) (Disk, error) {
|
||||
if diskPath == "" || diskOrder < 0 {
|
||||
return nil, errors.New("invalid argument")
|
||||
return nil, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
s := syscall.Statfs_t{}
|
||||
err := syscall.Statfs(diskPath, &s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
st, err := os.Stat(diskPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
if !st.IsDir() {
|
||||
return nil, syscall.ENOTDIR
|
||||
return nil, iodine.New(syscall.ENOTDIR, nil)
|
||||
}
|
||||
d := disk{
|
||||
root: diskPath,
|
||||
|
@ -58,7 +60,7 @@ func NewDisk(diskPath string, diskOrder int) (Disk, error) {
|
|||
d.filesystem["MountPoint"] = d.root
|
||||
return d, nil
|
||||
}
|
||||
return nil, errors.New("unsupported filesystem")
|
||||
return nil, iodine.New(errors.New("unsupported filesystem"), nil)
|
||||
}
|
||||
|
||||
func (d disk) GetPath() string {
|
||||
|
@ -87,7 +89,7 @@ func (d disk) MakeDir(dirname string) error {
|
|||
func (d disk) ListDir(dirname string) ([]os.FileInfo, error) {
|
||||
contents, err := ioutil.ReadDir(path.Join(d.root, dirname))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
var directories []os.FileInfo
|
||||
for _, content := range contents {
|
||||
|
@ -102,7 +104,7 @@ func (d disk) ListDir(dirname string) ([]os.FileInfo, error) {
|
|||
func (d disk) ListFiles(dirname string) ([]os.FileInfo, error) {
|
||||
contents, err := ioutil.ReadDir(path.Join(d.root, dirname))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
var files []os.FileInfo
|
||||
for _, content := range contents {
|
||||
|
@ -116,35 +118,27 @@ func (d disk) ListFiles(dirname string) ([]os.FileInfo, error) {
|
|||
|
||||
func (d disk) MakeFile(filename string) (*os.File, error) {
|
||||
if filename == "" {
|
||||
return nil, errors.New("Invalid argument")
|
||||
return nil, iodine.New(errors.New("Invalid argument"), nil)
|
||||
}
|
||||
filePath := path.Join(d.root, filename)
|
||||
// Create directories if they don't exist
|
||||
if err := os.MkdirAll(path.Dir(filePath), 0700); err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
dataFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
return dataFile, nil
|
||||
}
|
||||
|
||||
func (d disk) OpenFile(filename string) (*os.File, error) {
|
||||
if filename == "" {
|
||||
return nil, errors.New("Invalid argument")
|
||||
return nil, iodine.New(errors.New("Invalid argument"), nil)
|
||||
}
|
||||
dataFile, err := os.Open(path.Join(d.root, filename))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
return dataFile, nil
|
||||
}
|
||||
|
||||
func (d disk) SaveConfig() error {
|
||||
return errors.New("Not Implemented")
|
||||
}
|
||||
|
||||
func (d disk) LoadConfig() error {
|
||||
return errors.New("Not Implemented")
|
||||
}
|
||||
|
|
|
@ -16,7 +16,11 @@
|
|||
|
||||
package donut
|
||||
|
||||
import "errors"
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type donut struct {
|
||||
name string
|
||||
|
@ -24,28 +28,34 @@ type donut struct {
|
|||
nodes map[string]Node
|
||||
}
|
||||
|
||||
const (
|
||||
donutObjectMetadataConfig = "donutObjectMetadata.json"
|
||||
objectMetadataConfig = "objectMetadata.json"
|
||||
donutConfig = "donutMetadata.json"
|
||||
)
|
||||
|
||||
// attachDonutNode - wrapper function to instantiate a new node for associated donut
|
||||
// based on the configuration
|
||||
func (d donut) attachDonutNode(hostname string, disks []string) error {
|
||||
node, err := NewNode(hostname)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for i, disk := range disks {
|
||||
// Order is necessary for maps, keep order number separately
|
||||
newDisk, err := NewDisk(disk, i)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
if err := newDisk.MakeDir(d.name); err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
if err := node.AttachDisk(newDisk); err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
if err := d.AttachNode(node); err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -53,7 +63,7 @@ func (d donut) attachDonutNode(hostname string, disks []string) error {
|
|||
// NewDonut - instantiate a new donut
|
||||
func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error) {
|
||||
if donutName == "" || len(nodeDiskMap) == 0 {
|
||||
return nil, errors.New("invalid argument")
|
||||
return nil, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
nodes := make(map[string]Node)
|
||||
buckets := make(map[string]Bucket)
|
||||
|
@ -64,11 +74,11 @@ func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error)
|
|||
}
|
||||
for k, v := range nodeDiskMap {
|
||||
if len(v) == 0 {
|
||||
return nil, errors.New("invalid number of disks per node")
|
||||
return nil, iodine.New(errors.New("invalid number of disks per node"), nil)
|
||||
}
|
||||
err := d.attachDonutNode(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
return d, nil
|
||||
|
|
|
@ -2,6 +2,8 @@ package donut
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -122,10 +124,42 @@ func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) {
|
|||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
err = donut.PutObject("foo", "obj", nil, nil)
|
||||
err = donut.PutObject("foo", "obj", "", nil, nil)
|
||||
c.Assert(err, Not(IsNil))
|
||||
}
|
||||
|
||||
func (s *MySuite) TestNewObjectMetadata(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(root)
|
||||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
metadata := make(map[string]string)
|
||||
metadata["contentType"] = "application/json"
|
||||
metadata["foo"] = "value1"
|
||||
metadata["hello"] = "world"
|
||||
|
||||
data := "Hello World"
|
||||
hasher := md5.New()
|
||||
hasher.Write([]byte(data))
|
||||
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
|
||||
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
|
||||
|
||||
err = donut.MakeBucket("foo")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
objectMetadata, err := donut.GetObjectMetadata("foo", "obj")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Assert(objectMetadata["contentType"], Equals, metadata["contentType"])
|
||||
c.Assert(objectMetadata["foo"], Equals, metadata["foo"])
|
||||
c.Assert(objectMetadata["hello"], Equals, metadata["hello"])
|
||||
}
|
||||
|
||||
func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
|
||||
root, err := ioutil.TempDir(os.TempDir(), "donut-")
|
||||
c.Assert(err, IsNil)
|
||||
|
@ -133,10 +167,10 @@ func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
|
|||
donut, err := NewDonut("test", createTestNodeDiskMap(root))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = donut.PutObject("foo", "", nil, nil)
|
||||
err = donut.PutObject("foo", "", "", nil, nil)
|
||||
c.Assert(err, Not(IsNil))
|
||||
|
||||
err = donut.PutObject("foo", " ", nil, nil)
|
||||
err = donut.PutObject("foo", " ", "", nil, nil)
|
||||
c.Assert(err, Not(IsNil))
|
||||
}
|
||||
|
||||
|
@ -152,11 +186,14 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
|
|||
|
||||
metadata := make(map[string]string)
|
||||
metadata["contentType"] = "application/octet-stream"
|
||||
|
||||
data := "Hello World"
|
||||
|
||||
hasher := md5.New()
|
||||
hasher.Write([]byte(data))
|
||||
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
|
||||
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
|
||||
|
||||
err = donut.PutObject("foo", "obj", reader, metadata)
|
||||
err = donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
reader, size, err := donut.GetObject("foo", "obj")
|
||||
|
@ -170,8 +207,9 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
|
|||
|
||||
actualMetadata, err := donut.GetObjectMetadata("foo", "obj")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert("b10a8db164e0754105b7a99be72e3fe5", Equals, actualMetadata["md5"])
|
||||
c.Assert(expectedMd5Sum, Equals, actualMetadata["md5"])
|
||||
c.Assert("11", Equals, actualMetadata["size"])
|
||||
c.Assert("1.0", Equals, actualMetadata["version"])
|
||||
_, err = time.Parse(time.RFC3339Nano, actualMetadata["created"])
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
@ -186,11 +224,11 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
|
|||
c.Assert(donut.MakeBucket("foo"), IsNil)
|
||||
|
||||
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
|
||||
err = donut.PutObject("foo", "obj1", one, nil)
|
||||
err = donut.PutObject("foo", "obj1", "", one, nil)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
|
||||
err = donut.PutObject("foo", "obj2", two, nil)
|
||||
err = donut.PutObject("foo", "obj2", "", two, nil)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
obj1, size, err := donut.GetObject("foo", "obj1")
|
||||
|
@ -223,7 +261,7 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
|
|||
c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"})
|
||||
|
||||
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
|
||||
err = donut.PutObject("foo", "obj3", three, nil)
|
||||
err = donut.PutObject("foo", "obj3", "", three, nil)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
obj3, size, err := donut.GetObject("foo", "obj3")
|
||||
|
|
|
@ -18,8 +18,10 @@ package donut
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
encoding "github.com/minio-io/erasure"
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type encoder struct {
|
||||
|
@ -36,20 +38,25 @@ func getErasureTechnique(technique string) (encoding.Technique, error) {
|
|||
case technique == "Vandermonde":
|
||||
return encoding.Cauchy, nil
|
||||
default:
|
||||
return encoding.None, errors.New("Invalid erasure technique")
|
||||
return encoding.None, iodine.New(errors.New("Invalid erasure technique"), nil)
|
||||
}
|
||||
}
|
||||
|
||||
// NewEncoder - instantiate a new encoder
|
||||
func NewEncoder(k, m uint8, technique string) (Encoder, error) {
|
||||
errParams := map[string]string{
|
||||
"k": strconv.FormatUint(uint64(k), 10),
|
||||
"m": strconv.FormatUint(uint64(m), 10),
|
||||
"technique": technique,
|
||||
}
|
||||
e := encoder{}
|
||||
t, err := getErasureTechnique(technique)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
params, err := encoding.ValidateParams(k, m, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, errParams)
|
||||
}
|
||||
e.encoder = encoding.NewErasure(params)
|
||||
e.k = k
|
||||
|
@ -59,19 +66,19 @@ func NewEncoder(k, m uint8, technique string) (Encoder, error) {
|
|||
}
|
||||
|
||||
func (e encoder) GetEncodedBlockLen(dataLength int) (int, error) {
|
||||
if dataLength == 0 {
|
||||
return 0, errors.New("invalid argument")
|
||||
if dataLength <= 0 {
|
||||
return 0, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
return encoding.GetEncodedBlockLen(dataLength, e.k), nil
|
||||
}
|
||||
|
||||
func (e encoder) Encode(data []byte) (encodedData [][]byte, err error) {
|
||||
if data == nil {
|
||||
return nil, errors.New("invalid argument")
|
||||
return nil, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
encodedData, err = e.encoder.Encode(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
return encodedData, nil
|
||||
}
|
||||
|
@ -79,7 +86,7 @@ func (e encoder) Encode(data []byte) (encodedData [][]byte, err error) {
|
|||
func (e encoder) Decode(encodedData [][]byte, dataLength int) (data []byte, err error) {
|
||||
decodedData, err := e.encoder.Decode(encodedData, dataLength)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
return decodedData, nil
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ type ObjectStorage interface {
|
|||
// Object Operations
|
||||
GetObject(bucket, object string) (io.ReadCloser, int64, error)
|
||||
GetObjectMetadata(bucket, object string) (map[string]string, error)
|
||||
PutObject(bucket, object string, reader io.ReadCloser, metadata map[string]string) error
|
||||
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) error
|
||||
}
|
||||
|
||||
// Management is a donut management system interface
|
||||
|
@ -71,12 +71,13 @@ type Bucket interface {
|
|||
ListObjects() (map[string]Object, error)
|
||||
|
||||
GetObject(object string) (io.ReadCloser, int64, error)
|
||||
PutObject(object string, contents io.Reader, metadata map[string]string) error
|
||||
PutObject(object string, contents io.Reader, expectedMD5Sum string, metadata map[string]string) error
|
||||
}
|
||||
|
||||
// Object interface
|
||||
type Object interface {
|
||||
GetObjectMetadata() (map[string]string, error)
|
||||
GetDonutObjectMetadata() (map[string]string, error)
|
||||
}
|
||||
|
||||
// Node interface
|
||||
|
@ -104,8 +105,3 @@ type Disk interface {
|
|||
GetOrder() int
|
||||
GetFSInfo() map[string]string
|
||||
}
|
||||
|
||||
const (
|
||||
objectMetadataConfig = "objectMetadata.json"
|
||||
donutConfig = "donutMetadata.json"
|
||||
)
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"path"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
func (d donut) Heal() error {
|
||||
|
@ -15,7 +17,7 @@ func (d donut) Info() (nodeDiskMap map[string][]string, err error) {
|
|||
for nodeName, node := range d.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, iodine.New(err, nil)
|
||||
}
|
||||
diskList := make([]string, len(disks))
|
||||
for diskName, disk := range disks {
|
||||
|
@ -28,7 +30,7 @@ func (d donut) Info() (nodeDiskMap map[string][]string, err error) {
|
|||
|
||||
func (d donut) AttachNode(node Node) error {
|
||||
if node == nil {
|
||||
return errors.New("invalid argument")
|
||||
return iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
d.nodes[node.GetNodeName()] = node
|
||||
return nil
|
||||
|
@ -43,19 +45,19 @@ func (d donut) SaveConfig() error {
|
|||
for hostname, node := range d.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for _, disk := range disks {
|
||||
donutConfigPath := path.Join(d.name, donutConfig)
|
||||
donutConfigWriter, err := disk.MakeFile(donutConfigPath)
|
||||
defer donutConfigWriter.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
nodeDiskMap[hostname][disk.GetOrder()] = disk.GetPath()
|
||||
jenc := json.NewEncoder(donutConfigWriter)
|
||||
if err := jenc.Encode(nodeDiskMap); err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@ package donut
|
|||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
type node struct {
|
||||
|
@ -28,7 +30,7 @@ type node struct {
|
|||
// NewNode - instantiates a new node
|
||||
func NewNode(hostname string) (Node, error) {
|
||||
if hostname == "" {
|
||||
return nil, errors.New("invalid argument")
|
||||
return nil, iodine.New(errors.New("invalid argument"), nil)
|
||||
}
|
||||
disks := make(map[string]Disk)
|
||||
n := node{
|
||||
|
@ -48,7 +50,7 @@ func (n node) ListDisks() (map[string]Disk, error) {
|
|||
|
||||
func (n node) AttachDisk(disk Disk) error {
|
||||
if disk == nil {
|
||||
return errors.New("Invalid argument")
|
||||
return iodine.New(errors.New("Invalid argument"), nil)
|
||||
}
|
||||
n.disks[disk.GetPath()] = disk
|
||||
return nil
|
||||
|
|
|
@ -28,6 +28,7 @@ type object struct {
|
|||
name string
|
||||
objectPath string
|
||||
objectMetadata map[string]string
|
||||
donutObjectMetadata map[string]string
|
||||
}
|
||||
|
||||
// NewObject - instantiate a new object
|
||||
|
@ -51,5 +52,18 @@ func (o object) GetObjectMetadata() (map[string]string, error) {
|
|||
return nil, err
|
||||
}
|
||||
o.objectMetadata = objectMetadata
|
||||
return objectMetadata, nil
|
||||
return o.objectMetadata, nil
|
||||
}
|
||||
|
||||
func (o object) GetDonutObjectMetadata() (map[string]string, error) {
|
||||
donutObjectMetadata := make(map[string]string)
|
||||
donutObjectMetadataBytes, err := ioutil.ReadFile(path.Join(o.objectPath, donutObjectMetadataConfig))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := json.Unmarshal(donutObjectMetadataBytes, &donutObjectMetadata); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o.donutObjectMetadata = donutObjectMetadata
|
||||
return o.donutObjectMetadata, nil
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ func (d donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys int
|
|||
return results, commonPrefixes, isTruncated, nil
|
||||
}
|
||||
|
||||
func (d donut) PutObject(bucket, object string, reader io.ReadCloser, metadata map[string]string) error {
|
||||
func (d donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) error {
|
||||
errParams := map[string]string{
|
||||
"bucket": bucket,
|
||||
"object": object,
|
||||
|
@ -127,7 +127,7 @@ func (d donut) PutObject(bucket, object string, reader io.ReadCloser, metadata m
|
|||
if _, ok := d.buckets[bucket]; !ok {
|
||||
return iodine.New(errors.New("bucket does not exist"), nil)
|
||||
}
|
||||
err = d.buckets[bucket].PutObject(object, reader, metadata)
|
||||
err = d.buckets[bucket].PutObject(object, reader, expectedMD5Sum, metadata)
|
||||
if err != nil {
|
||||
return iodine.New(err, errParams)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
func appendUniq(slice []string, i string) []string {
|
||||
|
@ -80,27 +82,27 @@ func uniqueObjects(objects []string) []string {
|
|||
func (d donut) makeBucket(bucketName string) error {
|
||||
err := d.getAllBuckets()
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
if _, ok := d.buckets[bucketName]; ok {
|
||||
return errors.New("bucket exists")
|
||||
return iodine.New(errors.New("bucket exists"), nil)
|
||||
}
|
||||
bucket, err := NewBucket(bucketName, d.name, d.nodes)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
nodeNumber := 0
|
||||
d.buckets[bucketName] = bucket
|
||||
for _, node := range d.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for _, disk := range disks {
|
||||
bucketSlice := fmt.Sprintf("%s$%d$%d", bucketName, nodeNumber, disk.GetOrder())
|
||||
err := disk.MakeDir(path.Join(d.name, bucketSlice))
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
}
|
||||
nodeNumber = nodeNumber + 1
|
||||
|
@ -112,23 +114,23 @@ func (d donut) getAllBuckets() error {
|
|||
for _, node := range d.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for _, disk := range disks {
|
||||
dirs, err := disk.ListDir(d.name)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
splitDir := strings.Split(dir.Name(), "$")
|
||||
if len(splitDir) < 3 {
|
||||
return errors.New("corrupted backend")
|
||||
return iodine.New(errors.New("corrupted backend"), nil)
|
||||
}
|
||||
bucketName := splitDir[0]
|
||||
// we dont need this NewBucket once we cache these
|
||||
bucket, err := NewBucket(bucketName, d.name, d.nodes)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
d.buckets[bucketName] = bucket
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/minio-io/iodine"
|
||||
)
|
||||
|
||||
func (d donut) Rebalance() error {
|
||||
|
@ -13,14 +15,14 @@ func (d donut) Rebalance() error {
|
|||
for _, node := range d.nodes {
|
||||
disks, err := node.ListDisks()
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
totalOffSetLength = len(disks)
|
||||
fmt.Println(totalOffSetLength)
|
||||
for _, disk := range disks {
|
||||
dirs, err := disk.ListDir(d.name)
|
||||
if err != nil {
|
||||
return err
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
if len(dirs) == 0 {
|
||||
newDisks = append(newDisks, disk)
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/minio-io/donut",
|
||||
"Rev": "9d0c663a857103c780414e15d7e426c6fca9984e"
|
||||
"Rev": "4b20a97ea6d1fd4744ae06774e4c0092a67dafb5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/minio-io/erasure",
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package donut
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
|
@ -297,7 +299,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
|
|||
}
|
||||
|
||||
// CreateObject creates a new object
|
||||
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMd5sum string, reader io.Reader) error {
|
||||
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) error {
|
||||
errParams := map[string]string{
|
||||
"bucketName": bucketName,
|
||||
"objectName": objectName,
|
||||
|
@ -313,11 +315,17 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
|
|||
contentType = "application/octet-stream"
|
||||
}
|
||||
metadata := make(map[string]string)
|
||||
metadata["contentType"] = contentType
|
||||
if strings.TrimSpace(expectedMd5sum) != "" {
|
||||
metadata["expectedMd5sum"] = expectedMd5sum
|
||||
metadata["contentType"] = strings.TrimSpace(contentType)
|
||||
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
||||
if err != nil {
|
||||
return iodine.New(err, nil)
|
||||
}
|
||||
err := d.donut.PutObject(bucketName, objectName, ioutil.NopCloser(reader), metadata)
|
||||
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||
}
|
||||
|
||||
err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, ioutil.NopCloser(reader), metadata)
|
||||
if err != nil {
|
||||
return iodine.New(err, errParams)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue