Merge pull request #312 from fkautz/pr_out_list_objects_in_buckets

This commit is contained in:
Frederick F. Kautz IV 2015-03-16 20:43:50 -07:00
commit 1766b588fb
2 changed files with 74 additions and 3 deletions

View File

@ -18,9 +18,13 @@ package encoded
import ( import (
"bytes" "bytes"
"crypto/md5"
"encoding/hex"
"errors" "errors"
"io" "io"
"sort"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/minio-io/minio/pkg/donutbox" "github.com/minio-io/minio/pkg/donutbox"
@ -164,7 +168,71 @@ func (diskStorage StorageDriver) GetObjectMetadata(bucket, key string, prefix st
// ListObjects lists objects // ListObjects lists objects
func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.BucketResourcesMetadata) ([]storage.ObjectMetadata, storage.BucketResourcesMetadata, error) { func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.BucketResourcesMetadata) ([]storage.ObjectMetadata, storage.BucketResourcesMetadata, error) {
return nil, storage.BucketResourcesMetadata{}, errors.New("Not Implemented") objects, err := diskStorage.donutBox.ListObjectsInBucket(bucket, resources.Prefix)
if err != nil {
return nil, storage.BucketResourcesMetadata{}, err
}
var results []storage.ObjectMetadata
sort.Strings(objects)
for _, object := range withoutDelimiter(objects, resources.Prefix, resources.Delimiter) {
if len(results) < resources.Maxkeys {
objectMetadata, err := diskStorage.GetObjectMetadata(bucket, object, "")
if err != nil {
return nil, storage.BucketResourcesMetadata{}, err
}
results = append(results, objectMetadata)
} else {
resources.IsTruncated = true
}
}
if resources.Delimiter != "" {
objects = trimPrefixWithDelimiter(objects, resources.Prefix, resources.Delimiter)
objects = beforeDelimiter(objects, resources.Delimiter)
objects = removeDuplicates(objects)
resources.CommonPrefixes = objects
}
return results, resources, nil
}
func withoutDelimiter(inputs []string, prefix, delim string) (results []string) {
if delim == "" {
return inputs
}
for _, input := range inputs {
input = strings.TrimPrefix(input, prefix)
if !strings.Contains(input, delim) {
results = append(results, prefix+input)
}
}
return results
}
func trimPrefixWithDelimiter(inputs []string, prefix, delim string) (results []string) {
for _, input := range inputs {
input = strings.TrimPrefix(input, prefix)
if strings.Contains(input, delim) {
results = append(results, input)
}
}
return results
}
func beforeDelimiter(inputs []string, delim string) (results []string) {
for _, input := range inputs {
results = append(results, strings.Split(input, delim)[0]+delim)
}
return results
}
func removeDuplicates(inputs []string) (results []string) {
keys := make(map[string]string)
for _, input := range inputs {
keys[input] = input
}
for result := range keys {
results = append(results, result)
}
return results
} }
// CreateObject creates a new object // CreateObject creates a new object
@ -182,11 +250,13 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string
} }
totalLength := uint64(0) totalLength := uint64(0)
chunkCount := 0 chunkCount := 0
hasher := md5.New()
for chunk := range splitStream { for chunk := range splitStream {
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
if err != nil { if err != nil {
return err return err
} }
hasher.Write(chunk.Data)
totalLength = totalLength + uint64(len(chunk.Data)) totalLength = totalLength + uint64(len(chunk.Data))
chunkCount = chunkCount + 1 chunkCount = chunkCount + 1
encoder := erasure.NewEncoder(params) encoder := erasure.NewEncoder(params)
@ -213,7 +283,7 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string
ContentType: contentType, ContentType: contentType,
Created: time.Now(), Created: time.Now(),
Md5: "md5", Md5: hex.EncodeToString(hasher.Sum(nil)),
Size: int64(totalLength), Size: int64(totalLength),
} }

View File

@ -85,9 +85,9 @@ func testPaging(c *check.C, create func() Storage) {
storage.CreateBucket("bucket") storage.CreateBucket("bucket")
resources := BucketResourcesMetadata{} resources := BucketResourcesMetadata{}
objects, resources, err := storage.ListObjects("bucket", resources) objects, resources, err := storage.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(resources.IsTruncated, check.Equals, false)
c.Assert(err, check.IsNil)
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
@ -140,6 +140,7 @@ func testPaging(c *check.C, create func() Storage) {
resources.Prefix = "this/is/" resources.Prefix = "this/is/"
resources.Maxkeys = 10 resources.Maxkeys = 10
objects, resources, err = storage.ListObjects("bucket", resources) objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1) c.Assert(len(objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "also/") c.Assert(resources.CommonPrefixes[0], check.Equals, "also/")
} }