ListObjects now considers multipart objects; also move tests to upstream check.v1

Harshavardhana
2015-07-18 11:20:09 -07:00
parent e397fa48c4
commit 43c908d5b9
44 changed files with 189 additions and 172 deletions

View File

@@ -19,7 +19,6 @@ package donut
 import (
     "bytes"
     "fmt"
-    "hash"
     "io"
     "path/filepath"
     "sort"
@@ -154,6 +153,13 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List
     if err != nil {
         return ListObjectsResults{}, iodine.New(err, nil)
     }
+    for objectName := range bucketMetadata.Buckets[b.getBucketName()].Multiparts {
+        if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
+            if objectName > marker {
+                objects = append(objects, objectName)
+            }
+        }
+    }
     for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjects {
         if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
             if objectName > marker {
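Note: this hunk is the behavioral half of the commit title — names of in-progress multipart sessions are now merged into the same candidate list as completed objects before delimiter filtering. A minimal stand-alone sketch of the merge-and-filter pattern (types and names here are illustrative, not donut's actual ones):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mergeListing mirrors the pattern above: candidates from two maps
// (multipart sessions and completed objects) are filtered by prefix
// and marker into a single slice before delimiter handling.
func mergeListing(multiparts, objects map[string]struct{}, prefix, marker string) []string {
	var names []string
	for _, m := range []map[string]struct{}{multiparts, objects} {
		for name := range m {
			if strings.HasPrefix(name, strings.TrimSpace(prefix)) && name > marker {
				names = append(names, name)
			}
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	mp := map[string]struct{}{"photos/a.png": {}}
	obj := map[string]struct{}{"photos/b.png": {}, "docs/c.txt": {}}
	fmt.Println(mergeListing(mp, obj, "photos/", "")) // [photos/a.png photos/b.png]
}
```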
@@ -171,7 +177,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List
         filteredObjects = HasNoDelimiter(objects, delimiter)
         prefixes = HasDelimiter(objects, delimiter)
         prefixes = SplitDelimiter(prefixes, delimiter)
-        prefixes = SortU(prefixes)
+        prefixes = SortUnique(prefixes)
     }
     var results []string
     var commonPrefixes []string
@@ -264,8 +270,9 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
         CleanupWritersOnError(writers)
         return ObjectMetadata{}, iodine.New(err, nil)
     }
+    mwriter := io.MultiWriter(sumMD5, sum256, sum512)
     // write encoded data with k, m and writers
-    chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum256, sum512)
+    chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, mwriter)
     if err != nil {
         CleanupWritersOnError(writers)
         return ObjectMetadata{}, iodine.New(err, nil)
@@ -364,11 +371,11 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadat
 
 // readObjectMetadata - read object metadata
 func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
-    objMetadata := ObjectMetadata{}
     if objectName == "" {
         return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
     }
     var err error
+    objMetadata := ObjectMetadata{}
     objMetadataReaders, err := b.getObjectReaders(objectName, objectMetadataConfig)
     if err != nil {
         return ObjectMetadata{}, iodine.New(err, nil)
@@ -415,7 +422,7 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error)
 }
 
 // writeObjectData -
-func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum256, sum512 hash.Hash) (int, int, error) {
+func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, writer io.Writer) (int, int, error) {
     encoder, err := newEncoder(k, m, "Cauchy")
     if err != nil {
         return 0, 0, iodine.New(err, nil)
@@ -432,9 +439,7 @@ func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData
             return 0, 0, iodine.New(err, nil)
         }
-        sumMD5.Write(chunk.Data)
-        sum256.Write(chunk.Data)
-        sum512.Write(chunk.Data)
+        writer.Write(chunk.Data)
         for blockIndex, block := range encodedBlocks {
             errCh := make(chan error, 1)
             go func(writer io.Writer, reader io.Reader) {
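Note: the three per-chunk hash writes are collapsed into a single io.MultiWriter, which is also why the "hash" import and the three hash.Hash parameters disappear from writeObjectData. A minimal sketch of the fan-out, assuming the same md5/sha256/sha512 trio:

```go
package main

import (
	"crypto/md5"
	"crypto/sha256"
	"crypto/sha512"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

func main() {
	sumMD5, sum256, sum512 := md5.New(), sha256.New(), sha512.New()
	// One io.MultiWriter replaces three separate Write calls per chunk:
	// every Write fans out to all three hashes.
	mwriter := io.MultiWriter(sumMD5, sum256, sum512)
	if _, err := io.Copy(mwriter, strings.NewReader("chunk data")); err != nil {
		panic(err)
	}
	fmt.Println(hex.EncodeToString(sumMD5.Sum(nil)))
	fmt.Println(hex.EncodeToString(sum256.Sum(nil)))
	fmt.Println(hex.EncodeToString(sum512.Sum(nil)))
}
```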

View File

@@ -19,7 +19,7 @@ package data
 import (
     "testing"
 
-    . "github.com/minio/check"
+    . "gopkg.in/check.v1"
 )
 
 func Test(t *testing.T) { TestingT(t) }
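Note: this and the remaining test files swap the forked github.com/minio/check import for upstream gopkg.in/check.v1; the API is compatible, so only the import path changes. For reference, a minimal check.v1 suite wired into go test looks like this (the suite and assertion are illustrative, not from this repository):

```go
package data_test

import (
	"testing"

	. "gopkg.in/check.v1"
)

// Hook the check.v1 runner into "go test".
func Test(t *testing.T) { TestingT(t) }

type MySuite struct{}

// Register the suite so TestingT discovers it.
var _ = Suite(&MySuite{})

func (s *MySuite) TestExample(c *C) {
	c.Assert(1+1, Equals, 2)
}
```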

View File

@@ -19,7 +19,7 @@ package metadata
 import (
     "testing"
 
-    . "github.com/minio/check"
+    . "gopkg.in/check.v1"
 )
 
 func Test(t *testing.T) { TestingT(t) }

View File

@@ -168,8 +168,8 @@ func SplitDelimiter(objects []string, delim string) []string {
     return results
 }
 
-// SortU sort a slice in lexical order, removing duplicate elements
-func SortU(objects []string) []string {
+// SortUnique sort a slice in lexical order, removing duplicate elements
+func SortUnique(objects []string) []string {
     objectMap := make(map[string]string)
     for _, v := range objects {
         objectMap[v] = v
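Note: only the head of the renamed function is shown. Assuming the tail (not in this hunk) collects the map keys and sorts them, a stand-alone equivalent of SortUnique is:

```go
package main

import (
	"fmt"
	"sort"
)

// sortUnique deduplicates via a map, then sorts the survivors lexically.
func sortUnique(objects []string) []string {
	seen := make(map[string]struct{})
	for _, v := range objects {
		seen[v] = struct{}{}
	}
	results := make([]string, 0, len(seen))
	for k := range seen {
		results = append(results, k)
	}
	sort.Strings(results)
	return results
}

func main() {
	fmt.Println(sortUnique([]string{"b", "a", "b", "c", "a"})) // [a b c]
}
```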

View File

@@ -22,7 +22,7 @@ import (
"path/filepath"
"testing"
. "github.com/minio/check"
. "gopkg.in/check.v1"
)
func TestDisk(t *testing.T) { TestingT(t) }

View File

@@ -191,7 +191,7 @@ func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string,
     if err != nil {
         return PartMetadata{}, iodine.New(err, errParams)
     }
-    if _, ok := bucketMeta.Buckets[bucket].Multiparts[object+uploadID]; !ok {
+    if _, ok := bucketMeta.Buckets[bucket].Multiparts[object]; !ok {
         return PartMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
     }
     if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
@@ -208,9 +208,9 @@ func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string,
         ETag: objmetadata.MD5Sum,
         Size: objmetadata.Size,
     }
-    multipartSession := bucketMeta.Buckets[bucket].Multiparts[object+uploadID]
+    multipartSession := bucketMeta.Buckets[bucket].Multiparts[object]
     multipartSession.Parts[strconv.Itoa(partID)] = partMetadata
-    bucketMeta.Buckets[bucket].Multiparts[object+uploadID] = multipartSession
+    bucketMeta.Buckets[bucket].Multiparts[object] = multipartSession
     if err := donut.setDonutBucketMetadata(bucketMeta); err != nil {
         return PartMetadata{}, iodine.New(err, errParams)
     }
@@ -297,7 +297,7 @@ func (donut API) newMultipartUpload(bucket, object, contentType string) (string,
         Parts:      make(map[string]PartMetadata),
         TotalParts: 0,
     }
-    multiparts[object+uploadID] = multipartSession
+    multiparts[object] = multipartSession
     bucketMetadata.Multiparts = multiparts
     allbuckets.Buckets[bucket] = bucketMetadata
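Note: sessions are now keyed by object name alone, with the upload ID carried inside the session. The consequence is one active upload per object: starting a new upload for the same object replaces the previous session. Illustrative shapes (field set inferred from this diff, not donut's exact types):

```go
package donutsketch

type partMetadata struct{ ETag string }

// multiPartSession approximates the session value stored per object.
type multiPartSession struct {
	UploadID   string // validated against the caller-supplied ID on every operation
	Parts      map[string]partMetadata
	TotalParts int
}

// Keyed by object name, no longer by object+uploadID.
var multiparts = make(map[string]multiPartSession)
```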
@@ -326,12 +326,16 @@ func (donut API) listObjectParts(bucket, object string, resources ObjectResource
     if _, ok := donut.buckets[bucket]; !ok {
         return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
     }
-    bucketMetadata, err := donut.getDonutBucketMetadata()
+    allBuckets, err := donut.getDonutBucketMetadata()
     if err != nil {
         return ObjectResourcesMetadata{}, iodine.New(err, errParams)
     }
-    if _, ok := bucketMetadata.Buckets[bucket].Multiparts[object+resources.UploadID]; !ok {
-        return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil)
+    bucketMetadata := allBuckets.Buckets[bucket]
+    if _, ok := bucketMetadata.Multiparts[object]; !ok {
+        return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, errParams)
     }
+    if bucketMetadata.Multiparts[object].UploadID != resources.UploadID {
+        return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, errParams)
+    }
     objectResourcesMetadata := resources
     objectResourcesMetadata.Bucket = bucket
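Note: with the new keying, upload-ID validation becomes a two-step check that this commit repeats in listObjectParts, completeMultipartUpload and abortMultipartUpload: the object must have a session, and the session's stored UploadID must match the caller's. A hypothetical helper capturing it, using the multiPartSession shape sketched above:

```go
// validUploadID is a hypothetical refactor of the repeated two-step check;
// the session lookup and the ID comparison must both succeed.
func validUploadID(multiparts map[string]multiPartSession, object, uploadID string) bool {
	session, ok := multiparts[object]
	return ok && session.UploadID == uploadID
}
```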
@@ -344,7 +348,7 @@ func (donut API) listObjectParts(bucket, object string, resources ObjectResource
     default:
         startPartNumber = objectResourcesMetadata.PartNumberMarker
     }
-    for i := startPartNumber; i <= bucketMetadata.Buckets[bucket].Multiparts[object+resources.UploadID].TotalParts; i++ {
+    for i := startPartNumber; i <= bucketMetadata.Multiparts[object].TotalParts; i++ {
         if len(parts) > objectResourcesMetadata.MaxParts {
             sort.Sort(partNumber(parts))
             objectResourcesMetadata.IsTruncated = true
@@ -352,7 +356,7 @@ func (donut API) listObjectParts(bucket, object string, resources ObjectResource
             objectResourcesMetadata.NextPartNumberMarker = i
             return objectResourcesMetadata, nil
         }
-        part, ok := bucketMetadata.Buckets[bucket].Multiparts[object+resources.UploadID].Parts[strconv.Itoa(i)]
+        part, ok := bucketMetadata.Multiparts[object].Parts[strconv.Itoa(i)]
         if !ok {
             return ObjectResourcesMetadata{}, iodine.New(InvalidPart{}, nil)
         }
@@ -382,11 +386,15 @@ func (donut API) completeMultipartUpload(bucket, object, uploadID string, data i
     if _, ok := donut.buckets[bucket]; !ok {
         return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
     }
-    bucketMetadata, err := donut.getDonutBucketMetadata()
+    allBuckets, err := donut.getDonutBucketMetadata()
     if err != nil {
         return ObjectMetadata{}, iodine.New(err, errParams)
     }
-    if _, ok := bucketMetadata.Buckets[bucket].Multiparts[object+uploadID]; !ok {
+    bucketMetadata := allBuckets.Buckets[bucket]
+    if _, ok := bucketMetadata.Multiparts[object]; !ok {
         return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, errParams)
     }
+    if bucketMetadata.Multiparts[object].UploadID != uploadID {
+        return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, errParams)
+    }
     partBytes, err := ioutil.ReadAll(data)
@@ -409,10 +417,15 @@ func (donut API) completeMultipartUpload(bucket, object, uploadID string, data i
     if !sort.IsSorted(completedParts(parts.Part)) {
         return ObjectMetadata{}, iodine.New(InvalidPartOrder{}, errParams)
     }
+    for _, part := range parts.Part {
+        if part.ETag != bucketMetadata.Multiparts[object].Parts[strconv.Itoa(part.PartNumber)].ETag {
+            return ObjectMetadata{}, iodine.New(InvalidPart{}, errParams)
+        }
+    }
     var finalETagBytes []byte
     var finalSize int64
-    totalParts := strconv.Itoa(bucketMetadata.Buckets[bucket].Multiparts[object+uploadID].TotalParts)
-    for _, part := range bucketMetadata.Buckets[bucket].Multiparts[object+uploadID].Parts {
+    totalParts := strconv.Itoa(bucketMetadata.Multiparts[object].TotalParts)
+    for _, part := range bucketMetadata.Multiparts[object].Parts {
         partETagBytes, err := hex.DecodeString(part.ETag)
         if err != nil {
             return ObjectMetadata{}, iodine.New(err, errParams)
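Note: the loop above hex-decodes each part's MD5 ETag and concatenates the raw bytes, while the newly added check just before it rejects a client manifest whose ETags disagree with the stored parts. The S3 convention then MD5s the concatenation and suffixes the part count; whether donut appends the "-N" suffix is outside this hunk, but the decode-and-concatenate step matches this sketch:

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// multipartETag computes an S3-style multipart ETag: decode each part's
// hex MD5, concatenate the raw bytes, MD5 the result, append "-<parts>".
func multipartETag(partETags []string) (string, error) {
	var all []byte
	for _, e := range partETags {
		b, err := hex.DecodeString(e)
		if err != nil {
			return "", err
		}
		all = append(all, b...)
	}
	sum := md5.Sum(all)
	return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(partETags)), nil
}

func main() {
	etag, _ := multipartETag([]string{
		"5d41402abc4b2a76b9719d911017c592", // md5("hello")
		"7d793037a0760186574b0282f2f435e7", // md5("world")
	})
	fmt.Println(etag)
}
```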
@@ -426,7 +439,7 @@ func (donut API) completeMultipartUpload(bucket, object, uploadID string, data i
     objMetadata.Object = object
     objMetadata.Bucket = bucket
     objMetadata.Size = finalSize
-    objMetadata.Created = bucketMetadata.Buckets[bucket].Multiparts[object+uploadID].Parts[totalParts].LastModified
+    objMetadata.Created = bucketMetadata.Multiparts[object].Parts[totalParts].LastModified
     return objMetadata, nil
 }
@@ -446,7 +459,6 @@ func (donut API) listMultipartUploads(bucket string, resources BucketMultipartRe
         return BucketMultipartResourcesMetadata{}, iodine.New(err, errParams)
     }
     bucketMetadata := allbuckets.Buckets[bucket]
-
     var uploads []*UploadMetadata
     for key, session := range bucketMetadata.Multiparts {
         if strings.HasPrefix(key, resources.Prefix) {
@@ -510,7 +522,13 @@ func (donut API) abortMultipartUpload(bucket, object, uploadID string) error {
         return iodine.New(err, errParams)
     }
     bucketMetadata := allbuckets.Buckets[bucket]
-    delete(bucketMetadata.Multiparts, object+uploadID)
+    if _, ok := bucketMetadata.Multiparts[object]; !ok {
+        return iodine.New(InvalidUploadID{UploadID: uploadID}, errParams)
+    }
+    if bucketMetadata.Multiparts[object].UploadID != uploadID {
+        return iodine.New(InvalidUploadID{UploadID: uploadID}, errParams)
+    }
+    delete(bucketMetadata.Multiparts, object)
     allbuckets.Buckets[bucket] = bucketMetadata
     if err := donut.setDonutBucketMetadata(allbuckets); err != nil {

View File

@@ -27,7 +27,7 @@ import (
"strconv"
"testing"
. "github.com/minio/check"
. "gopkg.in/check.v1"
)
func TestDonut(t *testing.T) { TestingT(t) }

View File

@@ -579,7 +579,7 @@ func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata, s
         filteredKeys = HasNoDelimiter(keys, resources.Delimiter)
         prefixes = HasDelimiter(keys, resources.Delimiter)
         prefixes = SplitDelimiter(prefixes, resources.Delimiter)
-        prefixes = SortU(prefixes)
+        prefixes = SortUnique(prefixes)
     }
     for _, commonPrefix := range prefixes {
         resources.CommonPrefixes = append(resources.CommonPrefixes, resources.Prefix+commonPrefix)

View File

@@ -26,7 +26,7 @@ import (
"path/filepath"
"testing"
. "github.com/minio/check"
. "gopkg.in/check.v1"
)
func TestCache(t *testing.T) { TestingT(t) }

View File

@@ -24,8 +24,8 @@ import (
"strconv"
"testing"
. "github.com/minio/check"
"github.com/minio/minio/pkg/donut/split"
. "gopkg.in/check.v1"
)
type MySuite struct{}