Implement delimiter, path prefix

This commit is contained in:
Harshavardhana 2015-02-26 17:23:42 -08:00
parent 01e84c596c
commit 53669a0854
10 changed files with 168 additions and 80 deletions

View File

@ -28,6 +28,7 @@ import (
"testing" "testing"
"time" "time"
mstorage "github.com/minio-io/minio/pkg/storage"
"github.com/minio-io/minio/pkg/storage/inmemory" "github.com/minio-io/minio/pkg/storage/inmemory"
. "gopkg.in/check.v1" . "gopkg.in/check.v1"
) )
@ -243,9 +244,13 @@ func (s *MySuite) TestPutObject(c *C) {
testServer := httptest.NewServer(httpHandler) testServer := httptest.NewServer(httpHandler)
defer testServer.Close() defer testServer.Close()
objects, isTruncated, err := storage.ListObjects("bucket", "", 1000) resources := mstorage.BucketResourcesMetadata{}
resources.Maxkeys = 1000
resources.Prefix = ""
objects, resources, err := storage.ListObjects("bucket", resources)
c.Assert(len(objects), Equals, 0) c.Assert(len(objects), Equals, 0)
c.Assert(isTruncated, Equals, false) c.Assert(resources.IsTruncated, Equals, false)
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
date1 := time.Now() date1 := time.Now()
@ -268,9 +273,12 @@ func (s *MySuite) TestPutObject(c *C) {
date2 := time.Now() date2 := time.Now()
objects, isTruncated, err = storage.ListObjects("bucket", "", 1000) resources.Maxkeys = 1000
resources.Prefix = ""
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), Equals, 1) c.Assert(len(objects), Equals, 1)
c.Assert(isTruncated, Equals, false) c.Assert(resources.IsTruncated, Equals, false)
c.Assert(err, IsNil) c.Assert(err, IsNil)
var writer bytes.Buffer var writer bytes.Buffer

View File

@ -35,17 +35,17 @@ func (server *minioApi) listObjectsHandler(w http.ResponseWriter, req *http.Requ
bucket := vars["bucket"] bucket := vars["bucket"]
resources := getBucketResources(req.URL.Query()) resources := getBucketResources(req.URL.Query())
if resources.policy == true { if resources.Policy == true {
server.getBucketPolicyHandler(w, req) server.getBucketPolicyHandler(w, req)
return return
} }
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
objects, isTruncated, err := server.storage.ListObjects(bucket, resources.prefix, 1000) objects, resources, err := server.storage.ListObjects(bucket, resources)
switch err := err.(type) { switch err := err.(type) {
case nil: // success case nil: // success
{ {
response := generateObjectsListResult(bucket, objects, isTruncated) response := generateObjectsListResult(bucket, objects, resources.IsTruncated)
w.Write(writeObjectHeadersAndResponse(w, response, acceptsContentType)) w.Write(writeObjectHeadersAndResponse(w, response, acceptsContentType))
} }
case mstorage.BucketNotFound: case mstorage.BucketNotFound:
@ -74,7 +74,7 @@ func (server *minioApi) listObjectsHandler(w http.ResponseWriter, req *http.Requ
case mstorage.ObjectNameInvalid: case mstorage.ObjectNameInvalid:
{ {
error := errorCodeError(NoSuchKey) error := errorCodeError(NoSuchKey)
errorResponse := getErrorResponse(error, resources.prefix) errorResponse := getErrorResponse(error, resources.Prefix)
w.WriteHeader(error.HttpStatusCode) w.WriteHeader(error.HttpStatusCode)
w.Write(writeErrorResponse(w, errorResponse, acceptsContentType)) w.Write(writeErrorResponse(w, errorResponse, acceptsContentType))
} }
@ -122,7 +122,7 @@ func (server *minioApi) putBucketHandler(w http.ResponseWriter, req *http.Reques
err := server.storage.StoreBucket(bucket) err := server.storage.StoreBucket(bucket)
resources := getBucketResources(req.URL.Query()) resources := getBucketResources(req.URL.Query())
if resources.policy == true { if resources.Policy == true {
server.putBucketPolicyHandler(w, req) server.putBucketPolicyHandler(w, req)
return return
} }

View File

@ -29,10 +29,15 @@ const (
type ObjectListResponse struct { type ObjectListResponse struct {
XMLName xml.Name `xml:"ListBucketResult" json:"-"` XMLName xml.Name `xml:"ListBucketResult" json:"-"`
Name string Name string
Prefix string
Marker string Marker string
MaxKeys int MaxKeys int
Delimiter string
IsTruncated bool IsTruncated bool
Contents []*Item `xml:,innerxml` Contents []*Item `xml:,innerxml`
CommonPrefixes struct {
Prefix string
} `xml:,innerxml`
} }
// Bucket list response format // Bucket list response format

View File

@ -130,7 +130,7 @@ func (server *minioApi) putObjectHandler(w http.ResponseWriter, req *http.Reques
object = vars["object"] object = vars["object"]
resources := getBucketResources(req.URL.Query()) resources := getBucketResources(req.URL.Query())
if resources.policy == true && object == "" { if resources.Policy == true && object == "" {
server.putBucketPolicyHandler(w, req) server.putBucketPolicyHandler(w, req)
return return
} }

View File

@ -19,32 +19,24 @@ package minioapi
import ( import (
"net/url" "net/url"
"strconv" "strconv"
mstorage "github.com/minio-io/minio/pkg/storage"
) )
// support bucket resources go here
type bucketResources struct {
prefix string
marker string
maxkeys int
policy bool
delimiter string
// uploads bool - TODO implemented with multipart support
}
// parse bucket url queries // parse bucket url queries
func getBucketResources(values url.Values) (v bucketResources) { func getBucketResources(values url.Values) (v mstorage.BucketResourcesMetadata) {
for key, value := range values { for key, value := range values {
switch true { switch true {
case key == "prefix": case key == "prefix":
v.prefix = value[0] v.Prefix = value[0]
case key == "marker": case key == "marker":
v.marker = value[0] v.Marker = value[0]
case key == "maxkeys": case key == "max-keys":
v.maxkeys, _ = strconv.Atoi(value[0]) v.Maxkeys, _ = strconv.Atoi(value[0])
case key == "policy": case key == "policy":
v.policy = true v.Policy = true
case key == "delimiter": case key == "delimiter":
v.delimiter = value[0] v.Delimiter = value[0]
} }
} }
return return

View File

@ -23,6 +23,8 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"path/filepath"
"sort"
"strings" "strings"
"sync" "sync"
@ -291,45 +293,96 @@ func (storage *storage) GetObjectMetadata(bucket string, object string) (mstorag
return metadata, nil return metadata, nil
} }
func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorage.ObjectMetadata, bool, error) { type Path struct {
if mstorage.IsValidBucket(bucket) == false { files map[string]os.FileInfo
return []mstorage.ObjectMetadata{}, false, mstorage.BucketNameInvalid{Bucket: bucket} root string
}
func (p *Path) getAllFiles(path string, fl os.FileInfo, err error) error {
if err != nil {
return err
} }
if mstorage.IsValidObject(prefix) == false { if fl.Mode().IsRegular() {
return []mstorage.ObjectMetadata{}, false, mstorage.ObjectNameInvalid{Bucket: bucket, Object: prefix} if strings.HasSuffix(path, "$metadata") {
return nil
}
_p := strings.Split(path, p.root+"/")
if len(_p) > 1 {
p.files[_p[1]] = fl
}
}
return nil
}
type ByObjectKey []mstorage.ObjectMetadata
func (b ByObjectKey) Len() int { return len(b) }
func (b ByObjectKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByObjectKey) Less(i, j int) bool { return b[i].Key < b[j].Key }
func (storage *storage) ListObjects(bucket string, resources mstorage.BucketResourcesMetadata) ([]mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) {
p := Path{}
p.files = make(map[string]os.FileInfo)
if mstorage.IsValidBucket(bucket) == false {
return []mstorage.ObjectMetadata{}, resources, mstorage.BucketNameInvalid{Bucket: bucket}
}
if mstorage.IsValidObject(resources.Prefix) == false {
return []mstorage.ObjectMetadata{}, resources, mstorage.ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix}
} }
rootPrefix := path.Join(storage.root, bucket) rootPrefix := path.Join(storage.root, bucket)
// check bucket exists // check bucket exists
if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { if _, err := os.Stat(rootPrefix); os.IsNotExist(err) {
return []mstorage.ObjectMetadata{}, false, mstorage.BucketNotFound{Bucket: bucket} return []mstorage.ObjectMetadata{}, resources, mstorage.BucketNotFound{Bucket: bucket}
} }
files, err := ioutil.ReadDir(rootPrefix) p.root = rootPrefix
err := filepath.Walk(rootPrefix, p.getAllFiles)
if err != nil { if err != nil {
return []mstorage.ObjectMetadata{}, false, mstorage.EmbedError("bucket", "", err) return []mstorage.ObjectMetadata{}, resources, mstorage.EmbedError(bucket, "", err)
} }
var metadataList []mstorage.ObjectMetadata var metadataList []mstorage.ObjectMetadata
for _, file := range files { for name, file := range p.files {
if !strings.HasSuffix(file.Name(), "$metadata") { if len(metadataList) >= resources.Maxkeys {
if len(metadataList) >= count { resources.IsTruncated = true
return metadataList, true, nil goto ret
} }
if strings.HasPrefix(file.Name(), prefix) { // TODO handle resources.Marker
if resources.Delimiter != "" {
metadata := mstorage.ObjectMetadata{ metadata := mstorage.ObjectMetadata{
Bucket: bucket, Bucket: bucket,
Key: file.Name(), Maxkeys: resources.Maxkeys,
Prefix: resources.Prefix,
Marker: resources.Marker,
Delimiter: resources.Delimiter,
}
metadataList = append(metadataList, metadata)
}
if resources.Delimiter != "" && strings.HasPrefix(name, resources.Prefix) {
metadata := mstorage.ObjectMetadata{}
metadataList = append(metadataList, metadata)
}
if strings.HasPrefix(name, resources.Prefix) {
metadata := mstorage.ObjectMetadata{
Bucket: bucket,
Maxkeys: resources.Maxkeys,
Prefix: resources.Prefix,
Marker: resources.Marker,
Delimiter: resources.Delimiter,
Key: name,
Created: file.ModTime(), Created: file.ModTime(),
Size: file.Size(), Size: file.Size(),
ETag: bucket + "#" + file.Name(), ETag: bucket + "#" + name,
} }
metadataList = append(metadataList, metadata) metadataList = append(metadataList, metadata)
} }
} }
}
return metadataList, false, nil ret:
sort.Sort(ByObjectKey(metadataList))
return metadataList, resources, nil
} }
func (storage *storage) StoreObject(bucket, key, contentType string, data io.Reader) error { func (storage *storage) StoreObject(bucket, key, contentType string, data io.Reader) error {

View File

@ -128,31 +128,31 @@ func (storage *storage) StoreBucket(bucketName string) error {
return nil return nil
} }
func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorage.ObjectMetadata, bool, error) { func (storage *storage) ListObjects(bucket string, resources mstorage.BucketResourcesMetadata) ([]mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) {
if _, ok := storage.bucketdata[bucket]; ok == false { if _, ok := storage.bucketdata[bucket]; ok == false {
return []mstorage.ObjectMetadata{}, false, mstorage.BucketNotFound{Bucket: bucket} return []mstorage.ObjectMetadata{}, mstorage.BucketResourcesMetadata{IsTruncated: false}, mstorage.BucketNotFound{Bucket: bucket}
} }
// TODO prefix and count handling // TODO prefix and count handling
var results []mstorage.ObjectMetadata var results []mstorage.ObjectMetadata
var keys []string var keys []string
for key := range storage.objectdata { for key := range storage.objectdata {
if strings.HasPrefix(key, bucket+":"+prefix) { if strings.HasPrefix(key, bucket+":"+resources.Prefix) {
keys = append(keys, key) keys = append(keys, key)
} }
} }
sort.Strings(keys) sort.Strings(keys)
for _, key := range keys { for _, key := range keys {
if len(results) == count { if len(results) == resources.Maxkeys {
return results, true, nil return results, mstorage.BucketResourcesMetadata{IsTruncated: true}, nil
} }
object := storage.objectdata[key] object := storage.objectdata[key]
if bucket == object.metadata.Bucket { if bucket == object.metadata.Bucket {
if strings.HasPrefix(key, bucket+":") { if strings.HasPrefix(key, bucket+":"+resources.Prefix) {
results = append(results, object.metadata) results = append(results, object.metadata)
} }
} }
} }
return results, false, nil return results, resources, nil
} }
type ByBucketName []mstorage.BucketMetadata type ByBucketName []mstorage.BucketMetadata

View File

@ -32,7 +32,7 @@ type Storage interface {
// Object Operations // Object Operations
CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error)
GetObjectMetadata(bucket string, object string) (ObjectMetadata, error) GetObjectMetadata(bucket string, object string) (ObjectMetadata, error)
ListObjects(bucket, prefix string, count int) ([]ObjectMetadata, bool, error) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
StoreObject(bucket string, key string, contentType string, data io.Reader) error StoreObject(bucket string, key string, contentType string, data io.Reader) error
} }
@ -44,6 +44,10 @@ type BucketMetadata struct {
type ObjectMetadata struct { type ObjectMetadata struct {
Bucket string Bucket string
Key string Key string
Maxkeys int
Prefix string
Marker string
Delimiter string
ContentType string ContentType string
Created time.Time Created time.Time
@ -51,6 +55,21 @@ type ObjectMetadata struct {
Size int64 Size int64
} }
// Various types of bucket resources
type BucketResourcesMetadata struct {
Prefix string
Marker string
Maxkeys int
Delimiter string
IsTruncated bool
CommonPrefixes []string
Policy bool
// TODO
Logging string
Notification string
}
func IsValidBucket(bucket string) bool { func IsValidBucket(bucket string) bool {
if len(bucket) < 3 || len(bucket) > 63 { if len(bucket) < 3 || len(bucket) > 63 {
return false return false

View File

@ -81,40 +81,46 @@ func testMultipleObjectCreation(c *C, create func() Storage) {
func testPaging(c *C, create func() Storage) { func testPaging(c *C, create func() Storage) {
storage := create() storage := create()
storage.StoreBucket("bucket") storage.StoreBucket("bucket")
storage.ListObjects("bucket", "", 5) resources := BucketResourcesMetadata{}
objects, isTruncated, err := storage.ListObjects("bucket", "", 5) objects, resources, err := storage.ListObjects("bucket", resources)
c.Assert(len(objects), Equals, 0) c.Assert(len(objects), Equals, 0)
c.Assert(isTruncated, Equals, false) c.Assert(resources.IsTruncated, Equals, false)
c.Assert(err, IsNil) c.Assert(err, IsNil)
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
storage.StoreObject("bucket", key, "", bytes.NewBufferString(key)) storage.StoreObject("bucket", key, "", bytes.NewBufferString(key))
objects, isTruncated, err = storage.ListObjects("bucket", "", 5) resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), Equals, i+1) c.Assert(len(objects), Equals, i+1)
c.Assert(isTruncated, Equals, false) c.Assert(resources.IsTruncated, Equals, false)
c.Assert(err, IsNil) c.Assert(err, IsNil)
} }
// check after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
storage.StoreObject("bucket", key, "", bytes.NewBufferString(key)) storage.StoreObject("bucket", key, "", bytes.NewBufferString(key))
objects, isTruncated, err = storage.ListObjects("bucket", "", 5) resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), Equals, 5) c.Assert(len(objects), Equals, 5)
c.Assert(isTruncated, Equals, true) c.Assert(resources.IsTruncated, Equals, true)
c.Assert(err, IsNil) c.Assert(err, IsNil)
} }
// check paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
storage.StoreObject("bucket", "newPrefix", "", bytes.NewBufferString("prefix1")) storage.StoreObject("bucket", "newPrefix", "", bytes.NewBufferString("prefix1"))
storage.StoreObject("bucket", "newPrefix2", "", bytes.NewBufferString("prefix2")) storage.StoreObject("bucket", "newPrefix2", "", bytes.NewBufferString("prefix2"))
objects, isTruncated, err = storage.ListObjects("bucket", "new", 5) resources.Prefix = "new"
resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), Equals, 2) c.Assert(len(objects), Equals, 2)
} }
// check ordering of pages // check ordering of pages
{ {
objects, isTruncated, err = storage.ListObjects("bucket", "", 5) resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(objects[0].Key, Equals, "newPrefix") c.Assert(objects[0].Key, Equals, "newPrefix")
c.Assert(objects[1].Key, Equals, "newPrefix2") c.Assert(objects[1].Key, Equals, "newPrefix2")
c.Assert(objects[2].Key, Equals, "obj0") c.Assert(objects[2].Key, Equals, "obj0")
@ -123,7 +129,9 @@ func testPaging(c *C, create func() Storage) {
} }
// check ordering of results with prefix // check ordering of results with prefix
{ {
objects, isTruncated, err = storage.ListObjects("bucket", "obj", 5) resources.Prefix = "obj"
resources.Maxkeys = 1000
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(objects[0].Key, Equals, "obj0") c.Assert(objects[0].Key, Equals, "obj0")
c.Assert(objects[1].Key, Equals, "obj1") c.Assert(objects[1].Key, Equals, "obj1")
c.Assert(objects[2].Key, Equals, "obj10") c.Assert(objects[2].Key, Equals, "obj10")
@ -132,7 +140,9 @@ func testPaging(c *C, create func() Storage) {
} }
// check ordering of results with prefix and no paging // check ordering of results with prefix and no paging
{ {
objects, isTruncated, err = storage.ListObjects("bucket", "new", 5) resources.Prefix = "new"
resources.Maxkeys = 5
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(objects[0].Key, Equals, "newPrefix") c.Assert(objects[0].Key, Equals, "newPrefix")
c.Assert(objects[1].Key, Equals, "newPrefix2") c.Assert(objects[1].Key, Equals, "newPrefix2")
} }
@ -230,9 +240,10 @@ func testListBucketsOrder(c *C, create func() Storage) {
func testListObjectsTestsForNonExistantBucket(c *C, create func() Storage) { func testListObjectsTestsForNonExistantBucket(c *C, create func() Storage) {
storage := create() storage := create()
objects, isTruncated, err := storage.ListObjects("bucket", "", 1000) resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000}
objects, resources, err := storage.ListObjects("bucket", resources)
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
c.Assert(isTruncated, Equals, false) c.Assert(resources.IsTruncated, Equals, false)
c.Assert(len(objects), Equals, 0) c.Assert(len(objects), Equals, 0)
} }

View File

@ -7,7 +7,7 @@
// Using this part of Minio codebase under the license // Using this part of Minio codebase under the license
// Apache License Version 2.0 with modifications // Apache License Version 2.0 with modifications
// Package sha256 SHA256SSE3, SHA256AVX, SHA256AVX2 // Package sha256 provides SHA256SSE3, SHA256AVX, SHA256AVX2
package sha256 package sha256
import ( import (