mirror of
https://github.com/minio/minio.git
synced 2025-01-13 16:03:21 -05:00
Merge pull request #678 from harshavardhana/pr_out_handle_racy_map_updates_in_listobjects_on_a_bucket
Avoid racy maps, read from disk on success return quickly. Many more optimizations
This commit is contained in:
commit
39f26acbc9
@ -23,7 +23,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -44,7 +43,6 @@ type bucket struct {
|
|||||||
time time.Time
|
time time.Time
|
||||||
donutName string
|
donutName string
|
||||||
nodes map[string]node
|
nodes map[string]node
|
||||||
objects map[string]object
|
|
||||||
lock *sync.RWMutex
|
lock *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,13 +64,15 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
|
|||||||
b.time = t
|
b.time = t
|
||||||
b.donutName = donutName
|
b.donutName = donutName
|
||||||
b.nodes = nodes
|
b.nodes = nodes
|
||||||
b.objects = make(map[string]object)
|
|
||||||
b.lock = new(sync.RWMutex)
|
b.lock = new(sync.RWMutex)
|
||||||
|
|
||||||
metadata := BucketMetadata{}
|
metadata := BucketMetadata{}
|
||||||
|
metadata.Version = bucketMetadataVersion
|
||||||
metadata.Name = bucketName
|
metadata.Name = bucketName
|
||||||
metadata.ACL = aclType
|
metadata.ACL = aclType
|
||||||
metadata.Created = t
|
metadata.Created = t
|
||||||
|
metadata.Metadata = make(map[string]string)
|
||||||
|
metadata.BucketObjectsMetadata = make(map[string]map[string]string)
|
||||||
|
|
||||||
return b, metadata, nil
|
return b, metadata, nil
|
||||||
}
|
}
|
||||||
@ -81,95 +81,115 @@ func (b bucket) getBucketName() string {
|
|||||||
return b.name
|
return b.name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b bucket) getObjectName(fileName, diskPath, bucketPath string) (string, error) {
|
func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) {
|
||||||
newObject, err := newObject(fileName, filepath.Join(diskPath, bucketPath))
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
|
metadataReaders, err := b.getDiskReaders(normalizeObjectName(objectName), objectMetadataConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", iodine.New(err, nil)
|
return ObjectMetadata{}, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
newObjectMetadata, err := newObject.GetObjectMetadata()
|
for _, metadataReader := range metadataReaders {
|
||||||
if err != nil {
|
defer metadataReader.Close()
|
||||||
return "", iodine.New(err, nil)
|
|
||||||
}
|
}
|
||||||
if newObjectMetadata.Object == "" {
|
objMetadata := ObjectMetadata{}
|
||||||
return "", iodine.New(ObjectCorrupted{Object: newObject.name}, nil)
|
for _, metadataReader := range metadataReaders {
|
||||||
|
jdec := json.NewDecoder(metadataReader)
|
||||||
|
if err := jdec.Decode(&objMetadata); err != nil {
|
||||||
|
return ObjectMetadata{}, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
return objMetadata, nil
|
||||||
}
|
}
|
||||||
b.objects[newObjectMetadata.Object] = newObject
|
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
|
||||||
return newObjectMetadata.Object, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) {
|
func (b bucket) getBucketMetadataReaders() ([]io.ReadCloser, error) {
|
||||||
return b.objects[objectName].GetObjectMetadata()
|
var readers []io.ReadCloser
|
||||||
|
for _, node := range b.nodes {
|
||||||
|
disks, err := node.ListDisks()
|
||||||
|
if err != nil {
|
||||||
|
return nil, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
readers = make([]io.ReadCloser, len(disks))
|
||||||
|
for order, disk := range disks {
|
||||||
|
bucketMetaDataReader, err := disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
|
||||||
|
if err != nil {
|
||||||
|
return nil, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
readers[order] = bucketMetaDataReader
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return readers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b bucket) getBucketMetadata() (*AllBuckets, error) {
|
||||||
|
metadata := new(AllBuckets)
|
||||||
|
readers, err := b.getBucketMetadataReaders()
|
||||||
|
if err != nil {
|
||||||
|
return nil, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
for _, reader := range readers {
|
||||||
|
defer reader.Close()
|
||||||
|
}
|
||||||
|
for _, reader := range readers {
|
||||||
|
jenc := json.NewDecoder(reader)
|
||||||
|
if err := jenc.Decode(metadata); err != nil {
|
||||||
|
return nil, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
return metadata, nil
|
||||||
|
}
|
||||||
|
return nil, iodine.New(InvalidArgument{}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjects - list all objects
|
// ListObjects - list all objects
|
||||||
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) ([]string, []string, bool, error) {
|
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) ([]string, []string, bool, error) {
|
||||||
b.lock.RLock()
|
b.lock.RLock()
|
||||||
defer b.lock.RUnlock()
|
defer b.lock.RUnlock()
|
||||||
|
|
||||||
if maxkeys <= 0 {
|
if maxkeys <= 0 {
|
||||||
maxkeys = 1000
|
maxkeys = 1000
|
||||||
}
|
}
|
||||||
var isTruncated bool
|
var isTruncated bool
|
||||||
nodeSlice := 0
|
|
||||||
var objects []string
|
var objects []string
|
||||||
for _, node := range b.nodes {
|
bucketMetadata, err := b.getBucketMetadata()
|
||||||
disks, err := node.ListDisks()
|
if err != nil {
|
||||||
if err != nil {
|
return nil, nil, false, iodine.New(err, nil)
|
||||||
return nil, nil, false, iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
for order, disk := range disks {
|
|
||||||
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
|
|
||||||
bucketPath := filepath.Join(b.donutName, bucketSlice)
|
|
||||||
files, err := disk.ListDir(bucketPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, false, iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
for _, file := range files {
|
|
||||||
objectName, err := b.getObjectName(file.Name(), disk.GetPath(), bucketPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, false, iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
|
|
||||||
if objectName > marker {
|
|
||||||
objects = appendUniq(objects, objectName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nodeSlice = nodeSlice + 1
|
|
||||||
}
|
}
|
||||||
{
|
for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjectsMetadata {
|
||||||
if strings.TrimSpace(prefix) != "" {
|
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
|
||||||
objects = removePrefix(objects, prefix)
|
if objectName > marker {
|
||||||
|
objects = appendUniq(objects, objectName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
var prefixes []string
|
}
|
||||||
var filteredObjects []string
|
if strings.TrimSpace(prefix) != "" {
|
||||||
if strings.TrimSpace(delimiter) != "" {
|
objects = removePrefix(objects, prefix)
|
||||||
filteredObjects = filterDelimited(objects, delimiter)
|
}
|
||||||
prefixes = filterNotDelimited(objects, delimiter)
|
var prefixes []string
|
||||||
prefixes = extractDelimited(prefixes, delimiter)
|
var filteredObjects []string
|
||||||
prefixes = uniqueObjects(prefixes)
|
if strings.TrimSpace(delimiter) != "" {
|
||||||
} else {
|
filteredObjects = filterDelimited(objects, delimiter)
|
||||||
filteredObjects = objects
|
prefixes = filterNotDelimited(objects, delimiter)
|
||||||
}
|
prefixes = extractDelimited(prefixes, delimiter)
|
||||||
var results []string
|
prefixes = uniqueObjects(prefixes)
|
||||||
var commonPrefixes []string
|
} else {
|
||||||
|
filteredObjects = objects
|
||||||
|
}
|
||||||
|
var results []string
|
||||||
|
var commonPrefixes []string
|
||||||
|
|
||||||
sort.Strings(filteredObjects)
|
sort.Strings(filteredObjects)
|
||||||
for _, objectName := range filteredObjects {
|
for _, objectName := range filteredObjects {
|
||||||
if len(results) >= maxkeys {
|
if len(results) >= maxkeys {
|
||||||
isTruncated = true
|
isTruncated = true
|
||||||
break
|
break
|
||||||
}
|
|
||||||
results = appendUniq(results, prefix+objectName)
|
|
||||||
}
|
}
|
||||||
for _, commonPrefix := range prefixes {
|
results = appendUniq(results, prefix+objectName)
|
||||||
commonPrefixes = appendUniq(commonPrefixes, prefix+commonPrefix)
|
|
||||||
}
|
|
||||||
sort.Strings(results)
|
|
||||||
sort.Strings(commonPrefixes)
|
|
||||||
return results, commonPrefixes, isTruncated, nil
|
|
||||||
}
|
}
|
||||||
|
for _, commonPrefix := range prefixes {
|
||||||
|
commonPrefixes = appendUniq(commonPrefixes, prefix+commonPrefix)
|
||||||
|
}
|
||||||
|
sort.Strings(results)
|
||||||
|
sort.Strings(commonPrefixes)
|
||||||
|
return results, commonPrefixes, isTruncated, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadObject - open an object to read
|
// ReadObject - open an object to read
|
||||||
@ -178,58 +198,58 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64,
|
|||||||
defer b.lock.RUnlock()
|
defer b.lock.RUnlock()
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
// get list of objects
|
// get list of objects
|
||||||
_, _, _, err = b.ListObjects(objectName, "", "", 1)
|
bucketMetadata, err := b.getBucketMetadata()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, iodine.New(err, nil)
|
return nil, 0, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
// check if object exists
|
// check if object exists
|
||||||
object, ok := b.objects[objectName]
|
if _, ok := bucketMetadata.Buckets[b.getBucketName()].BucketObjectsMetadata[objectName]; !ok {
|
||||||
if !ok {
|
|
||||||
return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil)
|
return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil)
|
||||||
}
|
}
|
||||||
// verify if sysObjectMetadata is readable, before we server the request
|
objMetadata := ObjectMetadata{}
|
||||||
sysObjMetadata, err := object.GetSystemObjectMetadata()
|
metadataReaders, err := b.getDiskReaders(normalizeObjectName(objectName), objectMetadataConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, iodine.New(err, nil)
|
return nil, 0, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
|
for _, metadataReader := range metadataReaders {
|
||||||
|
defer metadataReader.Close()
|
||||||
|
}
|
||||||
|
for _, metadataReader := range metadataReaders {
|
||||||
|
jdec := json.NewDecoder(metadataReader)
|
||||||
|
if err := jdec.Decode(&objMetadata); err != nil {
|
||||||
|
return nil, 0, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
// read and reply back to GetObject() request in a go-routine
|
// read and reply back to GetObject() request in a go-routine
|
||||||
go b.readEncodedData(b.normalizeObjectName(objectName), writer, sysObjMetadata)
|
go b.readEncodedData(normalizeObjectName(objectName), writer, objMetadata)
|
||||||
return reader, sysObjMetadata.Size, nil
|
return reader, objMetadata.Size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteObject - write a new object into bucket
|
// WriteObject - write a new object into bucket
|
||||||
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) (string, error) {
|
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string) (string, error) {
|
||||||
b.lock.Lock()
|
b.lock.Lock()
|
||||||
defer b.lock.Unlock()
|
defer b.lock.Unlock()
|
||||||
if objectName == "" || objectData == nil {
|
if objectName == "" || objectData == nil {
|
||||||
return "", iodine.New(InvalidArgument{}, nil)
|
return "", iodine.New(InvalidArgument{}, nil)
|
||||||
}
|
}
|
||||||
writers, err := b.getDiskWriters(b.normalizeObjectName(objectName), "data")
|
writers, err := b.getDiskWriters(normalizeObjectName(objectName), "data")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
sumMD5 := md5.New()
|
sumMD5 := md5.New()
|
||||||
sum512 := sha512.New()
|
sum512 := sha512.New()
|
||||||
|
|
||||||
objMetadata := new(ObjectMetadata)
|
objMetadata := new(ObjectMetadata)
|
||||||
sysObjMetadata := new(SystemObjectMetadata)
|
|
||||||
objMetadata.Version = objectMetadataVersion
|
objMetadata.Version = objectMetadataVersion
|
||||||
sysObjMetadata.Version = systemObjectMetadataVersion
|
objMetadata.Created = time.Now().UTC()
|
||||||
size := metadata["contentLength"]
|
|
||||||
sizeInt, err := strconv.ParseInt(size, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return "", iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// if total writers are only '1' do not compute erasure
|
// if total writers are only '1' do not compute erasure
|
||||||
switch len(writers) == 1 {
|
switch len(writers) == 1 {
|
||||||
case true:
|
case true:
|
||||||
mw := io.MultiWriter(writers[0], sumMD5, sum512)
|
mw := io.MultiWriter(writers[0], sumMD5, sum512)
|
||||||
totalLength, err := io.CopyN(mw, objectData, sizeInt)
|
totalLength, err := io.Copy(mw, objectData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
sysObjMetadata.Size = totalLength
|
|
||||||
objMetadata.Size = totalLength
|
objMetadata.Size = totalLength
|
||||||
case false:
|
case false:
|
||||||
// calculate data and parity dictated by total number of writers
|
// calculate data and parity dictated by total number of writers
|
||||||
@ -243,30 +263,20 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
|
|||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
/// donutMetadata section
|
/// donutMetadata section
|
||||||
sysObjMetadata.BlockSize = 10 * 1024 * 1024
|
objMetadata.BlockSize = 10 * 1024 * 1024
|
||||||
sysObjMetadata.ChunkCount = chunkCount
|
objMetadata.ChunkCount = chunkCount
|
||||||
sysObjMetadata.DataDisks = k
|
objMetadata.DataDisks = k
|
||||||
sysObjMetadata.ParityDisks = m
|
objMetadata.ParityDisks = m
|
||||||
sysObjMetadata.ErasureTechnique = "Cauchy"
|
objMetadata.ErasureTechnique = "Cauchy"
|
||||||
sysObjMetadata.Size = int64(totalLength)
|
|
||||||
// keep size inside ObjectMetadata as well for Object API requests
|
|
||||||
objMetadata.Size = int64(totalLength)
|
objMetadata.Size = int64(totalLength)
|
||||||
}
|
}
|
||||||
objMetadata.Bucket = b.getBucketName()
|
objMetadata.Bucket = b.getBucketName()
|
||||||
objMetadata.Object = objectName
|
objMetadata.Object = objectName
|
||||||
objMetadata.Metadata = metadata
|
|
||||||
dataMD5sum := sumMD5.Sum(nil)
|
dataMD5sum := sumMD5.Sum(nil)
|
||||||
dataSHA512sum := sum512.Sum(nil)
|
dataSHA512sum := sum512.Sum(nil)
|
||||||
objMetadata.Created = time.Now().UTC()
|
|
||||||
|
|
||||||
// keeping md5sum for the object in two different places
|
objMetadata.MD5Sum = hex.EncodeToString(dataMD5sum)
|
||||||
// one for object storage and another is for internal use
|
objMetadata.SHA512Sum = hex.EncodeToString(dataSHA512sum)
|
||||||
hexMD5Sum := hex.EncodeToString(dataMD5sum)
|
|
||||||
hex512Sum := hex.EncodeToString(dataSHA512sum)
|
|
||||||
objMetadata.MD5Sum = hexMD5Sum
|
|
||||||
objMetadata.SHA512Sum = hex512Sum
|
|
||||||
sysObjMetadata.MD5Sum = hexMD5Sum
|
|
||||||
sysObjMetadata.SHA512Sum = hex512Sum
|
|
||||||
|
|
||||||
// Verify if the written object is equal to what is expected, only if it is requested as such
|
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||||
@ -274,12 +284,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
|
|||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// write donut specific metadata
|
|
||||||
if err := b.writeSystemObjectMetadata(b.normalizeObjectName(objectName), sysObjMetadata); err != nil {
|
|
||||||
return "", iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
// write object specific metadata
|
// write object specific metadata
|
||||||
if err := b.writeObjectMetadata(b.normalizeObjectName(objectName), objMetadata); err != nil {
|
if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil {
|
||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
// close all writers, when control flow reaches here
|
// close all writers, when control flow reaches here
|
||||||
@ -329,27 +335,6 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata *ObjectMetada
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeSystemObjectMetadata - write donut related object metadata
|
|
||||||
func (b bucket) writeSystemObjectMetadata(objectName string, sysObjMetadata *SystemObjectMetadata) error {
|
|
||||||
if sysObjMetadata == nil {
|
|
||||||
return iodine.New(InvalidArgument{}, nil)
|
|
||||||
}
|
|
||||||
sysObjMetadataWriters, err := b.getDiskWriters(objectName, sysObjectMetadataConfig)
|
|
||||||
if err != nil {
|
|
||||||
return iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
for _, sysObjMetadataWriter := range sysObjMetadataWriters {
|
|
||||||
defer sysObjMetadataWriter.Close()
|
|
||||||
}
|
|
||||||
for _, sysObjMetadataWriter := range sysObjMetadataWriters {
|
|
||||||
jenc := json.NewEncoder(sysObjMetadataWriter)
|
|
||||||
if err := jenc.Encode(sysObjMetadata); err != nil {
|
|
||||||
return iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO - This a temporary normalization of objectNames, need to find a better way
|
// TODO - This a temporary normalization of objectNames, need to find a better way
|
||||||
//
|
//
|
||||||
// normalizedObjectName - all objectNames with "/" get normalized to a simple objectName
|
// normalizedObjectName - all objectNames with "/" get normalized to a simple objectName
|
||||||
@ -358,7 +343,7 @@ func (b bucket) writeSystemObjectMetadata(objectName string, sysObjMetadata *Sys
|
|||||||
// user provided value - "this/is/my/deep/directory/structure"
|
// user provided value - "this/is/my/deep/directory/structure"
|
||||||
// donut normalized value - "this-is-my-deep-directory-structure"
|
// donut normalized value - "this-is-my-deep-directory-structure"
|
||||||
//
|
//
|
||||||
func (b bucket) normalizeObjectName(objectName string) string {
|
func normalizeObjectName(objectName string) string {
|
||||||
// replace every '/' with '-'
|
// replace every '/' with '-'
|
||||||
return strings.Replace(objectName, "/", "-", -1)
|
return strings.Replace(objectName, "/", "-", -1)
|
||||||
}
|
}
|
||||||
@ -407,12 +392,7 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat
|
|||||||
}
|
}
|
||||||
|
|
||||||
// readEncodedData -
|
// readEncodedData -
|
||||||
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, sysObjMetadata SystemObjectMetadata) {
|
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) {
|
||||||
expectedMd5sum, err := hex.DecodeString(sysObjMetadata.MD5Sum)
|
|
||||||
if err != nil {
|
|
||||||
writer.CloseWithError(iodine.New(err, nil))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
readers, err := b.getDiskReaders(objectName, "data")
|
readers, err := b.getDiskReaders(objectName, "data")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writer.CloseWithError(iodine.New(err, nil))
|
writer.CloseWithError(iodine.New(err, nil))
|
||||||
@ -421,22 +401,27 @@ func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, sysObj
|
|||||||
for _, reader := range readers {
|
for _, reader := range readers {
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
}
|
}
|
||||||
|
expectedMd5sum, err := hex.DecodeString(objMetadata.MD5Sum)
|
||||||
|
if err != nil {
|
||||||
|
writer.CloseWithError(iodine.New(err, nil))
|
||||||
|
return
|
||||||
|
}
|
||||||
hasher := md5.New()
|
hasher := md5.New()
|
||||||
mwriter := io.MultiWriter(writer, hasher)
|
mwriter := io.MultiWriter(writer, hasher)
|
||||||
switch len(readers) == 1 {
|
switch len(readers) == 1 {
|
||||||
case false:
|
case false:
|
||||||
if sysObjMetadata.ErasureTechnique == "" {
|
if objMetadata.ErasureTechnique == "" {
|
||||||
writer.CloseWithError(iodine.New(MissingErasureTechnique{}, nil))
|
writer.CloseWithError(iodine.New(MissingErasureTechnique{}, nil))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
encoder, err := newEncoder(sysObjMetadata.DataDisks, sysObjMetadata.ParityDisks, sysObjMetadata.ErasureTechnique)
|
encoder, err := newEncoder(objMetadata.DataDisks, objMetadata.ParityDisks, objMetadata.ErasureTechnique)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writer.CloseWithError(iodine.New(err, nil))
|
writer.CloseWithError(iodine.New(err, nil))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
totalLeft := sysObjMetadata.Size
|
totalLeft := objMetadata.Size
|
||||||
for i := 0; i < sysObjMetadata.ChunkCount; i++ {
|
for i := 0; i < objMetadata.ChunkCount; i++ {
|
||||||
decodedData, err := b.decodeEncodedData(totalLeft, int64(sysObjMetadata.BlockSize), readers, encoder, writer)
|
decodedData, err := b.decodeEncodedData(totalLeft, int64(objMetadata.BlockSize), readers, encoder, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writer.CloseWithError(iodine.New(err, nil))
|
writer.CloseWithError(iodine.New(err, nil))
|
||||||
return
|
return
|
||||||
@ -446,7 +431,7 @@ func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, sysObj
|
|||||||
writer.CloseWithError(iodine.New(err, nil))
|
writer.CloseWithError(iodine.New(err, nil))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
totalLeft = totalLeft - int64(sysObjMetadata.BlockSize)
|
totalLeft = totalLeft - int64(objMetadata.BlockSize)
|
||||||
}
|
}
|
||||||
case true:
|
case true:
|
||||||
_, err := io.Copy(writer, readers[0])
|
_, err := io.Copy(writer, readers[0])
|
||||||
|
@ -18,39 +18,23 @@ package donut
|
|||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
// ObjectMetadata object specific metadata per object
|
// ObjectMetadata container for object on donut system
|
||||||
type ObjectMetadata struct {
|
type ObjectMetadata struct {
|
||||||
// version
|
// version
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
|
|
||||||
// object metadata
|
// object metadata
|
||||||
Size int64 `json:"size"`
|
|
||||||
Created time.Time `json:"created"`
|
Created time.Time `json:"created"`
|
||||||
Bucket string `json:"bucket"`
|
Bucket string `json:"bucket"`
|
||||||
Object string `json:"object"`
|
Object string `json:"object"`
|
||||||
|
Size int64 `json:"size"`
|
||||||
// checksums
|
|
||||||
MD5Sum string `json:"md5sum"`
|
|
||||||
SHA512Sum string `json:"sha512sum"`
|
|
||||||
|
|
||||||
// additional metadata
|
|
||||||
Metadata map[string]string `json:"metadata"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// SystemObjectMetadata container for donut system specific metadata per object
|
|
||||||
type SystemObjectMetadata struct {
|
|
||||||
// version
|
|
||||||
Version string `json:"version"`
|
|
||||||
|
|
||||||
// erasure
|
// erasure
|
||||||
DataDisks uint8 `json:"sys.erasureK"`
|
DataDisks uint8 `json:"sys.erasureK"`
|
||||||
ParityDisks uint8 `json:"sys.erasureM"`
|
ParityDisks uint8 `json:"sys.erasureM"`
|
||||||
ErasureTechnique string `json:"sys.erasureTechnique"`
|
ErasureTechnique string `json:"sys.erasureTechnique"`
|
||||||
|
BlockSize int `json:"sys.blockSize"`
|
||||||
// object metadata
|
ChunkCount int `json:"sys.chunkCount"`
|
||||||
Size int64 `json:"sys.size"`
|
|
||||||
BlockSize int `json:"sys.blockSize"`
|
|
||||||
ChunkCount int `json:"sys.chunkCount"`
|
|
||||||
|
|
||||||
// checksums
|
// checksums
|
||||||
MD5Sum string `json:"sys.md5sum"`
|
MD5Sum string `json:"sys.md5sum"`
|
||||||
@ -64,14 +48,16 @@ type Metadata struct {
|
|||||||
|
|
||||||
// AllBuckets container for all buckets
|
// AllBuckets container for all buckets
|
||||||
type AllBuckets struct {
|
type AllBuckets struct {
|
||||||
Buckets map[string]BucketMetadata
|
Version string `json:"version"`
|
||||||
|
Buckets map[string]BucketMetadata `json:"buckets"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// BucketMetadata container for bucket level metadata
|
// BucketMetadata container for bucket level metadata
|
||||||
type BucketMetadata struct {
|
type BucketMetadata struct {
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
ACL string `json:"acl"`
|
ACL string `json:"acl"`
|
||||||
Created time.Time `json:"created"`
|
Created time.Time `json:"created"`
|
||||||
Metadata map[string]string `json:"metadata"`
|
Metadata map[string]string `json:"metadata"`
|
||||||
|
BucketObjectsMetadata map[string]map[string]string `json:"objectsMetadata"`
|
||||||
}
|
}
|
||||||
|
@ -39,8 +39,6 @@ type donut struct {
|
|||||||
|
|
||||||
// config files used inside Donut
|
// config files used inside Donut
|
||||||
const (
|
const (
|
||||||
// donut system object metadata
|
|
||||||
sysObjectMetadataConfig = "sysObjectMetadata.json"
|
|
||||||
// donut system config
|
// donut system config
|
||||||
donutConfig = "donutConfig.json"
|
donutConfig = "donutConfig.json"
|
||||||
|
|
||||||
@ -49,8 +47,8 @@ const (
|
|||||||
objectMetadataConfig = "objectMetadata.json"
|
objectMetadataConfig = "objectMetadata.json"
|
||||||
|
|
||||||
// versions
|
// versions
|
||||||
objectMetadataVersion = "1.0.0"
|
objectMetadataVersion = "1.0.0"
|
||||||
systemObjectMetadataVersion = "1.0.0"
|
bucketMetadataVersion = "1.0.0"
|
||||||
)
|
)
|
||||||
|
|
||||||
// attachDonutNode - wrapper function to instantiate a new node for associatedt donut
|
// attachDonutNode - wrapper function to instantiate a new node for associatedt donut
|
||||||
@ -196,19 +194,21 @@ func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadC
|
|||||||
if _, ok := dt.buckets[bucket]; !ok {
|
if _, ok := dt.buckets[bucket]; !ok {
|
||||||
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
|
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
|
||||||
}
|
}
|
||||||
objectList, _, _, err := dt.buckets[bucket].ListObjects("", "", "", 1000)
|
bucketMeta, err := dt.getDonutBucketMetadata()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
for _, objectName := range objectList {
|
if _, ok := bucketMeta.Buckets[bucket].BucketObjectsMetadata[object]; ok {
|
||||||
if objectName == object {
|
return "", iodine.New(ObjectExists{Object: object}, errParams)
|
||||||
return "", iodine.New(ObjectExists{Object: object}, nil)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
md5sum, err := dt.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata)
|
md5sum, err := dt.buckets[bucket].WriteObject(object, reader, expectedMD5Sum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", iodine.New(err, errParams)
|
return "", iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
|
bucketMeta.Buckets[bucket].BucketObjectsMetadata[object] = metadata
|
||||||
|
if err := dt.setDonutBucketMetadata(bucketMeta); err != nil {
|
||||||
|
return "", iodine.New(err, errParams)
|
||||||
|
}
|
||||||
return md5sum, nil
|
return md5sum, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,7 +236,7 @@ func (dt donut) GetObject(bucket, object string) (reader io.ReadCloser, size int
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectMetadata - get object metadata
|
// GetObjectMetadata - get object metadata
|
||||||
func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) {
|
func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, map[string]string, error) {
|
||||||
dt.lock.RLock()
|
dt.lock.RLock()
|
||||||
defer dt.lock.RUnlock()
|
defer dt.lock.RUnlock()
|
||||||
errParams := map[string]string{
|
errParams := map[string]string{
|
||||||
@ -244,26 +244,23 @@ func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
|
|||||||
"object": object,
|
"object": object,
|
||||||
}
|
}
|
||||||
if err := dt.listDonutBuckets(); err != nil {
|
if err := dt.listDonutBuckets(); err != nil {
|
||||||
return ObjectMetadata{}, iodine.New(err, errParams)
|
return ObjectMetadata{}, nil, iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
if _, ok := dt.buckets[bucket]; !ok {
|
if _, ok := dt.buckets[bucket]; !ok {
|
||||||
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
|
return ObjectMetadata{}, nil, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
|
||||||
}
|
}
|
||||||
//
|
bucketMeta, err := dt.getDonutBucketMetadata()
|
||||||
// there is a potential issue here, if the object comes after the truncated list
|
|
||||||
// below GetObjectMetadata would fail as ObjectNotFound{}
|
|
||||||
//
|
|
||||||
// will fix it when we bring in persistent json into Donut - TODO
|
|
||||||
objectList, _, _, err := dt.buckets[bucket].ListObjects("", "", "", 1000)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectMetadata{}, iodine.New(err, errParams)
|
return ObjectMetadata{}, nil, iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
for _, objectName := range objectList {
|
if _, ok := bucketMeta.Buckets[bucket].BucketObjectsMetadata[object]; !ok {
|
||||||
if objectName == object {
|
return ObjectMetadata{}, nil, iodine.New(ObjectNotFound{Object: object}, errParams)
|
||||||
return dt.buckets[bucket].GetObjectMetadata(object)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: object}, errParams)
|
objectMetadata, err := dt.buckets[bucket].GetObjectMetadata(object)
|
||||||
|
if err != nil {
|
||||||
|
return ObjectMetadata{}, nil, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
return objectMetadata, bucketMeta.Buckets[bucket].BucketObjectsMetadata[object], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getDiskWriters -
|
// getDiskWriters -
|
||||||
@ -337,8 +334,9 @@ func (dt donut) getDonutBucketMetadata() (*AllBuckets, error) {
|
|||||||
if err := jenc.Decode(metadata); err != nil {
|
if err := jenc.Decode(metadata); err != nil {
|
||||||
return nil, iodine.New(err, nil)
|
return nil, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
|
return metadata, nil
|
||||||
}
|
}
|
||||||
return metadata, nil
|
return nil, iodine.New(InvalidArgument{}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dt donut) makeDonutBucket(bucketName, acl string) error {
|
func (dt donut) makeDonutBucket(bucketName, acl string) error {
|
||||||
|
@ -198,12 +198,12 @@ func (s *MySuite) TestNewObjectMetadata(c *C) {
|
|||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
c.Assert(calculatedMd5Sum, Equals, expectedMd5Sum)
|
c.Assert(calculatedMd5Sum, Equals, expectedMd5Sum)
|
||||||
|
|
||||||
objectMetadata, err := donut.GetObjectMetadata("foo", "obj")
|
_, additionalMetadata, err := donut.GetObjectMetadata("foo", "obj")
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
c.Assert(objectMetadata.Metadata["contentType"], Equals, metadata["contentType"])
|
c.Assert(additionalMetadata["contentType"], Equals, metadata["contentType"])
|
||||||
c.Assert(objectMetadata.Metadata["foo"], Equals, metadata["foo"])
|
c.Assert(additionalMetadata["foo"], Equals, metadata["foo"])
|
||||||
c.Assert(objectMetadata.Metadata["hello"], Equals, metadata["hello"])
|
c.Assert(additionalMetadata["hello"], Equals, metadata["hello"])
|
||||||
}
|
}
|
||||||
|
|
||||||
// test create object fails without name
|
// test create object fails without name
|
||||||
@ -252,7 +252,7 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
|
|||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
c.Assert(actualData.Bytes(), DeepEquals, []byte(data))
|
c.Assert(actualData.Bytes(), DeepEquals, []byte(data))
|
||||||
|
|
||||||
actualMetadata, err := donut.GetObjectMetadata("foo", "obj")
|
actualMetadata, _, err := donut.GetObjectMetadata("foo", "obj")
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
c.Assert(expectedMd5Sum, Equals, actualMetadata.MD5Sum)
|
c.Assert(expectedMd5Sum, Equals, actualMetadata.MD5Sum)
|
||||||
c.Assert(int64(len(data)), Equals, actualMetadata.Size)
|
c.Assert(int64(len(data)), Equals, actualMetadata.Size)
|
||||||
|
@ -39,7 +39,7 @@ type ObjectStorage interface {
|
|||||||
|
|
||||||
// Object operations
|
// Object operations
|
||||||
GetObject(bucket, object string) (io.ReadCloser, int64, error)
|
GetObject(bucket, object string) (io.ReadCloser, int64, error)
|
||||||
GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
|
GetObjectMetadata(bucket, object string) (ObjectMetadata, map[string]string, error)
|
||||||
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error)
|
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,66 +0,0 @@
|
|||||||
/*
|
|
||||||
* Minimalist Object Storage, (C) 2015 Minio, Inc.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package donut
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"io/ioutil"
|
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"github.com/minio/minio/pkg/iodine"
|
|
||||||
)
|
|
||||||
|
|
||||||
// object internal struct
|
|
||||||
type object struct {
|
|
||||||
name string
|
|
||||||
objectPath string
|
|
||||||
}
|
|
||||||
|
|
||||||
// newObject - instantiate a new object
|
|
||||||
func newObject(objectName, p string) (object, error) {
|
|
||||||
if objectName == "" {
|
|
||||||
return object{}, iodine.New(InvalidArgument{}, nil)
|
|
||||||
}
|
|
||||||
o := object{}
|
|
||||||
o.name = objectName
|
|
||||||
o.objectPath = filepath.Join(p, objectName)
|
|
||||||
return o, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o object) GetObjectMetadata() (ObjectMetadata, error) {
|
|
||||||
objMetadata := ObjectMetadata{}
|
|
||||||
objMetadataBytes, err := ioutil.ReadFile(filepath.Join(o.objectPath, objectMetadataConfig))
|
|
||||||
if err != nil {
|
|
||||||
return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: o.name}, nil)
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(objMetadataBytes, &objMetadata); err != nil {
|
|
||||||
return ObjectMetadata{}, iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
return objMetadata, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o object) GetSystemObjectMetadata() (SystemObjectMetadata, error) {
|
|
||||||
sysObjMetadata := SystemObjectMetadata{}
|
|
||||||
sysObjMetadataBytes, err := ioutil.ReadFile(filepath.Join(o.objectPath, sysObjectMetadataConfig))
|
|
||||||
if err != nil {
|
|
||||||
return SystemObjectMetadata{}, iodine.New(ObjectNotFound{Object: o.name}, nil)
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(sysObjMetadataBytes, &sysObjMetadata); err != nil {
|
|
||||||
return SystemObjectMetadata{}, iodine.New(err, nil)
|
|
||||||
}
|
|
||||||
return sysObjMetadata, nil
|
|
||||||
}
|
|
@ -312,7 +312,7 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
|
|||||||
if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" {
|
if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" {
|
||||||
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams)
|
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams)
|
||||||
}
|
}
|
||||||
metadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
|
metadata, additionalMetadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{
|
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{
|
||||||
Bucket: bucketName,
|
Bucket: bucketName,
|
||||||
@ -323,7 +323,7 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
|
|||||||
Bucket: bucketName,
|
Bucket: bucketName,
|
||||||
Key: objectName,
|
Key: objectName,
|
||||||
|
|
||||||
ContentType: metadata.Metadata["contentType"],
|
ContentType: additionalMetadata["contentType"],
|
||||||
Created: metadata.Created,
|
Created: metadata.Created,
|
||||||
Md5: metadata.MD5Sum,
|
Md5: metadata.MD5Sum,
|
||||||
Size: metadata.Size,
|
Size: metadata.Size,
|
||||||
@ -365,7 +365,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
|
|||||||
}
|
}
|
||||||
var results []drivers.ObjectMetadata
|
var results []drivers.ObjectMetadata
|
||||||
for _, objectName := range actualObjects {
|
for _, objectName := range actualObjects {
|
||||||
objectMetadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
|
objectMetadata, _, err := d.donut.GetObjectMetadata(bucketName, objectName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams)
|
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user