Merge pull request #1039 from harshavardhana/channel-ftw

listObjects: Channel based file tree walk.
This commit is contained in:
Harshavardhana 2016-01-26 12:41:21 -08:00
commit 68a25aa425
18 changed files with 482 additions and 570 deletions

View File

@ -53,7 +53,6 @@ const (
InvalidBucketName InvalidBucketName
InvalidDigest InvalidDigest
InvalidRange InvalidRange
InvalidRequest
InvalidMaxKeys InvalidMaxKeys
InvalidMaxUploads InvalidMaxUploads
InvalidMaxParts InvalidMaxParts

View File

@ -24,12 +24,12 @@ import (
) )
// parse bucket url queries // parse bucket url queries
func getBucketResources(values url.Values) (v fs.BucketResourcesMetadata) { func getBucketResources(values url.Values) (prefix, marker, delimiter string, maxkeys int, encodingType string) {
v.Prefix = values.Get("prefix") prefix = values.Get("prefix")
v.Marker = values.Get("marker") marker = values.Get("marker")
v.Maxkeys, _ = strconv.Atoi(values.Get("max-keys")) delimiter = values.Get("delimiter")
v.Delimiter = values.Get("delimiter") maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
v.EncodingType = values.Get("encoding-type") encodingType = values.Get("encoding-type")
return return
} }

View File

@ -92,7 +92,7 @@ func generateAccessControlPolicyResponse(acl fs.BucketACL) AccessControlPolicyRe
} }
// generates an ListObjects response for the said bucket with other enumerated options. // generates an ListObjects response for the said bucket with other enumerated options.
func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, bucketResources fs.BucketResourcesMetadata) ListObjectsResponse { func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKeys int, resp fs.ListObjectsResult) ListObjectsResponse {
var contents []*Object var contents []*Object
var prefixes []*CommonPrefix var prefixes []*CommonPrefix
var owner = Owner{} var owner = Owner{}
@ -101,7 +101,7 @@ func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, buc
owner.ID = "minio" owner.ID = "minio"
owner.DisplayName = "minio" owner.DisplayName = "minio"
for _, object := range objects { for _, object := range resp.Objects {
var content = &Object{} var content = &Object{}
if object.Object == "" { if object.Object == "" {
continue continue
@ -117,13 +117,15 @@ func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, buc
// TODO - support EncodingType in xml decoding // TODO - support EncodingType in xml decoding
data.Name = bucket data.Name = bucket
data.Contents = contents data.Contents = contents
data.MaxKeys = bucketResources.Maxkeys
data.Prefix = bucketResources.Prefix data.Prefix = prefix
data.Delimiter = bucketResources.Delimiter data.Marker = marker
data.Marker = bucketResources.Marker data.Delimiter = delimiter
data.NextMarker = bucketResources.NextMarker data.MaxKeys = maxKeys
data.IsTruncated = bucketResources.IsTruncated
for _, prefix := range bucketResources.CommonPrefixes { data.NextMarker = resp.NextMarker
data.IsTruncated = resp.IsTruncated
for _, prefix := range resp.Prefixes {
var prefixItem = &CommonPrefix{} var prefixItem = &CommonPrefix{}
prefixItem.Prefix = prefix prefixItem.Prefix = prefix
prefixes = append(prefixes, prefixItem) prefixes = append(prefixes, prefixItem)

View File

@ -52,6 +52,7 @@ func (api CloudStorageAPI) GetBucketLocationHandler(w http.ResponseWriter, req *
default: default:
writeErrorResponse(w, req, InternalError, req.URL.Path) writeErrorResponse(w, req, InternalError, req.URL.Path)
} }
return
} }
// TODO: Location value for LocationResponse is deliberately not used, until // TODO: Location value for LocationResponse is deliberately not used, until
@ -128,19 +129,21 @@ func (api CloudStorageAPI) ListObjectsHandler(w http.ResponseWriter, req *http.R
} }
} }
} }
resources := getBucketResources(req.URL.Query())
if resources.Maxkeys < 0 { // TODO handle encoding type.
prefix, marker, delimiter, maxkeys, _ := getBucketResources(req.URL.Query())
if maxkeys < 0 {
writeErrorResponse(w, req, InvalidMaxKeys, req.URL.Path) writeErrorResponse(w, req, InvalidMaxKeys, req.URL.Path)
return return
} }
if resources.Maxkeys == 0 { if maxkeys == 0 {
resources.Maxkeys = maxObjectList maxkeys = maxObjectList
} }
objects, resources, err := api.Filesystem.ListObjects(bucket, resources) listResp, err := api.Filesystem.ListObjects(bucket, prefix, marker, delimiter, maxkeys)
if err == nil { if err == nil {
// Generate response // generate response
response := generateListObjectsResponse(bucket, objects, resources) response := generateListObjectsResponse(bucket, prefix, marker, delimiter, maxkeys, listResp)
encodedSuccessResponse := encodeSuccessResponse(response) encodedSuccessResponse := encodeSuccessResponse(response)
// Write headers // Write headers
setCommonHeaders(w) setCommonHeaders(w)

View File

@ -34,6 +34,7 @@ var _ = Suite(&MySuite{})
func (s *MySuite) TestFree(c *C) { func (s *MySuite) TestFree(c *C) {
path, err := ioutil.TempDir(os.TempDir(), "minio-") path, err := ioutil.TempDir(os.TempDir(), "minio-")
defer os.RemoveAll(path)
c.Assert(err, IsNil) c.Assert(err, IsNil)
di, err := disk.GetInfo(path) di, err := disk.GetInfo(path)

View File

@ -165,59 +165,50 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) {
func testPaging(c *check.C, create func() Filesystem) { func testPaging(c *check.C, create func() Filesystem) {
fs := create() fs := create()
fs.MakeBucket("bucket", "") fs.MakeBucket("bucket", "")
resources := BucketResourcesMetadata{} result, err := fs.ListObjects("bucket", "", "", "", 0)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, i+1) c.Assert(len(result.Objects), check.Equals, i+1)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
} }
// check after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 5) c.Assert(len(result.Objects), check.Equals, 5)
c.Assert(resources.IsTruncated, check.Equals, true) c.Assert(result.IsTruncated, check.Equals, true)
} }
// check paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
_, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) _, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) _, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 2) c.Assert(len(result.Objects), check.Equals, 2)
} }
// check ordering of pages // check ordering of pages
{ {
resources.Prefix = "" result, err = fs.ListObjects("bucket", "", "", "", 1000)
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
} }
// check delimited results with delimiter and prefix // check delimited results with delimiter and prefix
@ -226,72 +217,49 @@ func testPaging(c *check.C, create func() Filesystem) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) _, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var prefixes []string result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = "this/is/"
resources.Maxkeys = 10
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1) c.Assert(len(result.Objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/") c.Assert(result.Prefixes[0], check.Equals, "this/is/also/")
} }
time.Sleep(time.Second) time.Sleep(time.Second)
// check delimited results with delimiter without prefix // check delimited results with delimiter without prefix
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "", "/", 1000)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/") c.Assert(result.Prefixes[0], check.Equals, "this/")
} }
// check results with Marker // check results with Marker
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Prefix = ""
resources.Marker = "newPrefix"
resources.Delimiter = ""
resources.Maxkeys = 3
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix2")
c.Assert(objects[1].Object, check.Equals, "obj0") c.Assert(result.Objects[1].Object, check.Equals, "obj0")
c.Assert(objects[2].Object, check.Equals, "obj1") c.Assert(result.Objects[2].Object, check.Equals, "obj1")
} }
// check ordering of results with prefix // check ordering of results with prefix
{ {
resources.Prefix = "obj" result, err = fs.ListObjects("bucket", "obj", "", "", 1000)
resources.Delimiter = ""
resources.Marker = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "obj0") c.Assert(result.Objects[0].Object, check.Equals, "obj0")
c.Assert(objects[1].Object, check.Equals, "obj1") c.Assert(result.Objects[1].Object, check.Equals, "obj1")
c.Assert(objects[2].Object, check.Equals, "obj10") c.Assert(result.Objects[2].Object, check.Equals, "obj10")
c.Assert(objects[3].Object, check.Equals, "obj2") c.Assert(result.Objects[3].Object, check.Equals, "obj2")
c.Assert(objects[4].Object, check.Equals, "obj3") c.Assert(result.Objects[4].Object, check.Equals, "obj3")
} }
// check ordering of results with prefix and no paging // check ordering of results with prefix and no paging
{ {
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Marker = ""
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
} }
} }
@ -417,11 +385,10 @@ func testListBucketsOrder(c *check.C, create func() Filesystem) {
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) {
fs := create() fs := create()
resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} result, err := fs.ListObjects("bucket", "", "", "", 1000)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.Not(check.IsNil)) c.Assert(err, check.Not(check.IsNil))
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
} }
func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) {

View File

@ -26,7 +26,6 @@ import (
"encoding/xml" "encoding/xml"
"math/rand" "math/rand"
"strconv" "strconv"
"time"
"gopkg.in/check.v1" "gopkg.in/check.v1"
) )
@ -165,59 +164,50 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) {
func testPaging(c *check.C, create func() Filesystem) { func testPaging(c *check.C, create func() Filesystem) {
fs := create() fs := create()
fs.MakeBucket("bucket", "") fs.MakeBucket("bucket", "")
resources := BucketResourcesMetadata{} result, err := fs.ListObjects("bucket", "", "", "", 0)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, i+1) c.Assert(len(result.Objects), check.Equals, i+1)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
} }
// check after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 5) c.Assert(len(result.Objects), check.Equals, 5)
c.Assert(resources.IsTruncated, check.Equals, true) c.Assert(result.IsTruncated, check.Equals, true)
} }
// check paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
_, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) _, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 2) c.Assert(len(result.Objects), check.Equals, 2)
} }
// check ordering of pages // check ordering of pages
{ {
resources.Prefix = "" result, err = fs.ListObjects("bucket", "", "", "", 1000)
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
} }
// check delimited results with delimiter and prefix // check delimited results with delimiter and prefix
@ -226,72 +216,48 @@ func testPaging(c *check.C, create func() Filesystem) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) _, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var prefixes []string result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = "this/is/"
resources.Maxkeys = 10
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1) c.Assert(len(result.Objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/") c.Assert(result.Prefixes[0], check.Equals, "this/is/also/")
} }
time.Sleep(time.Second)
// check delimited results with delimiter without prefix // check delimited results with delimiter without prefix
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "", "/", 1000)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/") c.Assert(result.Prefixes[0], check.Equals, "this/")
} }
// check results with Marker // check results with Marker
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Prefix = ""
resources.Marker = "newPrefix"
resources.Delimiter = ""
resources.Maxkeys = 3
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix2")
c.Assert(objects[1].Object, check.Equals, "obj0") c.Assert(result.Objects[1].Object, check.Equals, "obj0")
c.Assert(objects[2].Object, check.Equals, "obj1") c.Assert(result.Objects[2].Object, check.Equals, "obj1")
} }
// check ordering of results with prefix // check ordering of results with prefix
{ {
resources.Prefix = "obj" result, err = fs.ListObjects("bucket", "obj", "", "", 1000)
resources.Delimiter = ""
resources.Marker = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "obj0") c.Assert(result.Objects[0].Object, check.Equals, "obj0")
c.Assert(objects[1].Object, check.Equals, "obj1") c.Assert(result.Objects[1].Object, check.Equals, "obj1")
c.Assert(objects[2].Object, check.Equals, "obj10") c.Assert(result.Objects[2].Object, check.Equals, "obj10")
c.Assert(objects[3].Object, check.Equals, "obj2") c.Assert(result.Objects[3].Object, check.Equals, "obj2")
c.Assert(objects[4].Object, check.Equals, "obj3") c.Assert(result.Objects[4].Object, check.Equals, "obj3")
} }
// check ordering of results with prefix and no paging // check ordering of results with prefix and no paging
{ {
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Marker = ""
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
} }
} }
@ -416,11 +382,10 @@ func testListBucketsOrder(c *check.C, create func() Filesystem) {
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) {
fs := create() fs := create()
resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} result, err := fs.ListObjects("bucket", "", "", "", 1000)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.Not(check.IsNil)) c.Assert(err, check.Not(check.IsNil))
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
} }
func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) {

View File

@ -118,16 +118,12 @@ type BucketMultipartResourcesMetadata struct {
CommonPrefixes []string CommonPrefixes []string
} }
// BucketResourcesMetadata - various types of bucket resources // ListObjectsResult - container for list object request results.
type BucketResourcesMetadata struct { type ListObjectsResult struct {
Prefix string
Marker string
NextMarker string
Maxkeys int
EncodingType string
Delimiter string
IsTruncated bool IsTruncated bool
CommonPrefixes []string NextMarker string
Objects []ObjectMetadata
Prefixes []string
} }
// CompletePart - completed part container // CompletePart - completed part container

View File

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2015 Minio, Inc. * Minio Cloud Storage, (C) 2015-2016 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -17,363 +17,272 @@
package fs package fs
import ( import (
"io/ioutil" "errors"
"hash/fnv"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort"
"strings" "strings"
"time"
"github.com/minio/minio-xl/pkg/probe" "github.com/minio/minio-xl/pkg/probe"
) )
// ListObjects - GET bucket (list objects) // listObjectsParams - list objects input parameters.
func (fs Filesystem) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error) { type listObjectsParams struct {
// Bucket name to list the objects for.
Bucket string
// list all objects with this parameter as common prefix.
Prefix string
// list all objects starting with object after marker in
// lexicographical order.
Marker string
// list all objects until the first occurrence of the delimtier
// after the prefix.
Delimiter string
// maximum number of objects returned per listObjects()
// operation.
MaxKeys int
}
// listServiceReq
type listServiceReq struct {
reqParams listObjectsParams
respCh chan ListObjectsResult
}
type listWorkerReq struct {
respCh chan ListObjectsResult
}
// listObjects - list objects lists objects upto maxKeys for a given prefix.
func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) {
quitWalker := make(chan bool)
reqCh := make(chan listWorkerReq)
walkerCh := make(chan ObjectMetadata)
go func() {
defer close(walkerCh)
var walkPath string
bucketPath := filepath.Join(fs.path, bucket)
// Bucket path prefix should always end with a separator.
bucketPathPrefix := bucketPath + string(os.PathSeparator)
prefixPath := bucketPathPrefix + prefix
st, err := os.Stat(prefixPath)
if err != nil && os.IsNotExist(err) {
walkPath = bucketPath
} else {
if st.IsDir() && !strings.HasSuffix(prefix, delimiter) {
walkPath = bucketPath
} else {
walkPath = prefixPath
}
}
Walk(walkPath, func(path string, info os.FileInfo, err error) error {
// We don't need to list the walk path.
if path == walkPath {
return nil
}
// For all incoming directories add a ending separator.
if info.IsDir() {
path = path + string(os.PathSeparator)
}
// Extract object name.
objectName := strings.TrimPrefix(path, bucketPathPrefix)
if strings.HasPrefix(objectName, prefix) {
// For objectName lesser than marker, ignore.
if marker >= objectName {
return nil
}
object := ObjectMetadata{
Object: objectName,
Created: info.ModTime(),
Mode: info.Mode(),
Size: info.Size(),
}
select {
// Send object on walker channel.
case walkerCh <- object:
case <-quitWalker:
// Returning error ends the file tree Walk().
return errors.New("Quit list worker.")
}
// If delimiter is set, we stop if current path is a
// directory.
if delimiter != "" && info.IsDir() {
return ErrSkipDir
}
}
return nil
})
}()
go func() {
for {
select {
// Timeout after 10 seconds if request did not arrive for
// the given list parameters.
case <-time.After(10 * time.Second):
quitWalker <- true // Quit file path walk if running.
// Send back the hash for this request.
fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter)
return
case req, ok := <-reqCh:
if !ok {
// If the request channel is closed, no more
// requests return here.
return
}
resp := ListObjectsResult{}
var count int
for object := range walkerCh {
if count == maxKeys {
resp.IsTruncated = true
break
}
// If object is a directory.
if object.Mode.IsDir() {
if delimiter == "" {
// Skip directories for recursive listing.
continue
}
resp.Prefixes = append(resp.Prefixes, object.Object)
} else {
resp.Objects = append(resp.Objects, object)
}
// Set the next marker for the next request.
resp.NextMarker = object.Object
count++
}
req.respCh <- resp
}
}
}()
return reqCh, nil
}
// fnvSum calculates a hash for concatenation of all input strings.
func fnvSum(elements ...string) uint32 {
fnvHash := fnv.New32a()
for _, element := range elements {
fnvHash.Write([]byte(element))
}
return fnvHash.Sum32()
}
// listObjectsService - list objects service manages various incoming
// list object requests by delegating them to an existing listObjects
// routine or initializes a new listObjects routine.
func (fs *Filesystem) listObjectsService() *probe.Error {
// Initialize list service request channel.
listServiceReqCh := make(chan listServiceReq)
fs.listServiceReqCh = listServiceReqCh
// Initialize timeout request channel to receive request hashes of
// timed-out requests.
timeoutReqCh := make(chan uint32)
fs.timeoutReqCh = timeoutReqCh
// Initialize request hash to list worker map.
reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq)
// Start service in a go routine.
go func() {
for {
select {
case reqHash := <-timeoutReqCh:
// For requests which have timed-out, close the worker
// channels proactively, this may happen for idle
// workers once in 10seconds.
listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash]
if ok {
close(listWorkerReqCh)
}
delete(reqToListWorkerReqCh, reqHash)
case srvReq := <-listServiceReqCh:
// Save the params for readability.
bucket := srvReq.reqParams.Bucket
prefix := srvReq.reqParams.Prefix
marker := srvReq.reqParams.Marker
delimiter := srvReq.reqParams.Delimiter
maxKeys := srvReq.reqParams.MaxKeys
// Generate hash.
reqHash := fnvSum(bucket, prefix, marker, delimiter)
listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash]
if !ok {
var err *probe.Error
listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
srvReq.respCh <- ListObjectsResult{}
return
}
reqToListWorkerReqCh[reqHash] = listWorkerReqCh
}
respCh := make(chan ListObjectsResult)
listWorkerReqCh <- listWorkerReq{respCh}
resp, ok := <-respCh
if !ok {
srvReq.respCh <- ListObjectsResult{}
return
}
delete(reqToListWorkerReqCh, reqHash)
if !resp.IsTruncated {
close(listWorkerReqCh)
} else {
nextMarker := resp.NextMarker
reqHash = fnvSum(bucket, prefix, nextMarker, delimiter)
reqToListWorkerReqCh[reqHash] = listWorkerReqCh
}
srvReq.respCh <- resp
}
}
}()
return nil
}
// ListObjects - lists all objects for a given prefix, returns upto
// maxKeys number of objects per call.
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) {
fs.lock.Lock() fs.lock.Lock()
defer fs.lock.Unlock() defer fs.lock.Unlock()
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return nil, resources, probe.NewError(BucketNameInvalid{Bucket: bucket}) return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if resources.Prefix != "" && IsValidObjectName(resources.Prefix) == false {
return nil, resources, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix})
} }
bucket = fs.denormalizeBucket(bucket) bucket = fs.denormalizeBucket(bucket)
p := bucketDir{}
rootPrefix := filepath.Join(fs.path, bucket) rootPrefix := filepath.Join(fs.path, bucket)
// check bucket exists // check bucket exists
if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { if _, e := os.Stat(rootPrefix); e != nil {
return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) if os.IsNotExist(e) {
return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return ListObjectsResult{}, probe.NewError(e)
} }
p.root = rootPrefix reqParams := listObjectsParams{}
/// automatically treat incoming "/" as "\\" on windows due to its path constraints. reqParams.Bucket = bucket
if runtime.GOOS == "windows" { reqParams.Prefix = filepath.FromSlash(prefix)
if resources.Prefix != "" { reqParams.Marker = filepath.FromSlash(marker)
resources.Prefix = strings.Replace(resources.Prefix, "/", string(os.PathSeparator), -1) reqParams.Delimiter = filepath.FromSlash(delimiter)
} reqParams.MaxKeys = maxKeys
if resources.Delimiter != "" {
resources.Delimiter = strings.Replace(resources.Delimiter, "/", string(os.PathSeparator), -1)
}
if resources.Marker != "" {
resources.Marker = strings.Replace(resources.Marker, "/", string(os.PathSeparator), -1)
}
}
// if delimiter is supplied and not prefix then we are the very top level, list everything and move on. respCh := make(chan ListObjectsResult)
if resources.Delimiter != "" && resources.Prefix == "" { fs.listServiceReqCh <- listServiceReq{reqParams, respCh}
files, err := ioutil.ReadDir(rootPrefix) resp := <-respCh
if err != nil {
if os.IsNotExist(err) {
return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket})
}
return nil, resources, probe.NewError(err)
}
for _, fl := range files {
if strings.HasSuffix(fl.Name(), "$multiparts") {
continue
}
p.files = append(p.files, contentInfo{
Prefix: fl.Name(),
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
}
// If delimiter and prefix is supplied make sure that paging doesn't go deep, treat it as simple directory listing. for i := range resp.Prefixes {
if resources.Delimiter != "" && resources.Prefix != "" { resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i])
if !strings.HasSuffix(resources.Prefix, resources.Delimiter) {
fl, err := os.Stat(filepath.Join(rootPrefix, resources.Prefix))
if err != nil {
if os.IsNotExist(err) {
return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix})
} }
return nil, resources, probe.NewError(err) for i := range resp.Objects {
resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object)
} }
p.files = append(p.files, contentInfo{ if reqParams.Delimiter == "" {
Prefix: resources.Prefix, // This element is set only if you have delimiter set.
Size: fl.Size(), // If response does not include the NextMaker and it is
Mode: os.ModeDir, // truncated, you can use the value of the last Key in the
ModTime: fl.ModTime(), // response as the marker in the subsequent request to get the
FileInfo: fl, // next set of object keys.
}) resp.NextMarker = ""
} else {
var prefixPath string
if runtime.GOOS == "windows" {
prefixPath = rootPrefix + string(os.PathSeparator) + resources.Prefix
} else {
prefixPath = rootPrefix + string(os.PathSeparator) + resources.Prefix
} }
files, err := ioutil.ReadDir(prefixPath) return resp, nil
if err != nil {
switch err := err.(type) {
case *os.PathError:
if err.Op == "open" {
return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix})
}
}
return nil, resources, probe.NewError(err)
}
for _, fl := range files {
if strings.HasSuffix(fl.Name(), "$multiparts") {
continue
}
prefix := fl.Name()
if resources.Prefix != "" {
prefix = filepath.Join(resources.Prefix, fl.Name())
}
p.files = append(p.files, contentInfo{
Prefix: prefix,
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
}
}
if resources.Delimiter == "" {
var files []contentInfo
getAllFiles := func(fp string, fl os.FileInfo, err error) error {
// If any error return back quickly
if err != nil {
return err
}
if strings.HasSuffix(fp, "$multiparts") {
return nil
}
// if file pointer equals to rootPrefix - discard it
if fp == p.root {
return nil
}
if len(files) > resources.Maxkeys {
return ErrSkipFile
}
// Split the root prefix from the incoming file pointer
realFp := ""
if runtime.GOOS == "windows" {
if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 {
realFp = splits[1]
}
} else {
if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 {
realFp = splits[1]
}
}
// If path is a directory and has a prefix verify if the file pointer
// has the prefix if it does not skip the directory.
if fl.Mode().IsDir() {
if resources.Prefix != "" {
// Skip the directory on following situations
// - when prefix is part of file pointer along with the root path
// - when file pointer is part of the prefix along with root path
if !strings.HasPrefix(fp, filepath.Join(p.root, resources.Prefix)) &&
!strings.HasPrefix(filepath.Join(p.root, resources.Prefix), fp) {
return ErrSkipDir
}
}
}
// If path is a directory and has a marker verify if the file split file pointer
// is lesser than the Marker top level directory if yes skip it.
if fl.Mode().IsDir() {
if resources.Marker != "" {
if realFp != "" {
// For windows split with its own os.PathSeparator
if runtime.GOOS == "windows" {
if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] {
return ErrSkipDir
}
} else {
if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] {
return ErrSkipDir
}
}
}
}
}
// If regular file verify
if fl.Mode().IsRegular() {
// If marker is present this will be used to check if filepointer is
// lexically higher than then Marker
if realFp != "" {
if resources.Marker != "" {
if realFp > resources.Marker {
files = append(files, contentInfo{
Prefix: realFp,
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
} else {
files = append(files, contentInfo{
Prefix: realFp,
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
}
}
// If file is a symlink follow it and populate values.
if fl.Mode()&os.ModeSymlink == os.ModeSymlink {
st, err := os.Stat(fp)
if err != nil {
return nil
}
// If marker is present this will be used to check if filepointer is
// lexically higher than then Marker
if realFp != "" {
if resources.Marker != "" {
if realFp > resources.Marker {
files = append(files, contentInfo{
Prefix: realFp,
Size: st.Size(),
Mode: st.Mode(),
ModTime: st.ModTime(),
FileInfo: st,
})
}
} else {
files = append(files, contentInfo{
Prefix: realFp,
Size: st.Size(),
Mode: st.Mode(),
ModTime: st.ModTime(),
FileInfo: st,
})
}
}
}
p.files = files
return nil
}
// If no delimiter is specified, crawl through everything.
err := Walk(rootPrefix, getAllFiles)
if err != nil {
if os.IsNotExist(err) {
return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix})
}
return nil, resources, probe.NewError(err)
}
}
var metadataList []ObjectMetadata
var metadata ObjectMetadata
// Filter objects
for _, content := range p.files {
if len(metadataList) == resources.Maxkeys {
resources.IsTruncated = true
if resources.IsTruncated && resources.Delimiter != "" {
resources.NextMarker = metadataList[len(metadataList)-1].Object
}
break
}
if content.Prefix > resources.Marker {
var err *probe.Error
metadata, resources, err = fs.filterObjects(bucket, content, resources)
if err != nil {
return nil, resources, err.Trace()
}
// If windows replace all the incoming paths to API compatible paths
if runtime.GOOS == "windows" {
metadata.Object = sanitizeWindowsPath(metadata.Object)
}
if metadata.Bucket != "" {
metadataList = append(metadataList, metadata)
}
}
}
// Sanitize common prefixes back into API compatible paths
if runtime.GOOS == "windows" {
resources.CommonPrefixes = sanitizeWindowsPaths(resources.CommonPrefixes...)
}
return metadataList, resources, nil
}
func (fs Filesystem) filterObjects(bucket string, content contentInfo, resources BucketResourcesMetadata) (ObjectMetadata, BucketResourcesMetadata, *probe.Error) {
var err *probe.Error
var metadata ObjectMetadata
name := content.Prefix
switch true {
// Both delimiter and Prefix is present
case resources.Delimiter != "" && resources.Prefix != "":
if strings.HasPrefix(name, resources.Prefix) {
trimmedName := strings.TrimPrefix(name, resources.Prefix)
delimitedName := delimiter(trimmedName, resources.Delimiter)
switch true {
case name == resources.Prefix:
// Use resources.Prefix to filter out delimited file
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
if metadata.Mode.IsDir() {
resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter)
return ObjectMetadata{}, resources, nil
}
case delimitedName == content.FileInfo.Name():
// Use resources.Prefix to filter out delimited files
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
if metadata.Mode.IsDir() {
resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter)
return ObjectMetadata{}, resources, nil
}
case delimitedName != "":
resources.CommonPrefixes = append(resources.CommonPrefixes, resources.Prefix+delimitedName)
}
}
// Delimiter present and Prefix is absent
case resources.Delimiter != "" && resources.Prefix == "":
delimitedName := delimiter(name, resources.Delimiter)
switch true {
case delimitedName == "":
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
if metadata.Mode.IsDir() {
resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter)
return ObjectMetadata{}, resources, nil
}
case delimitedName == content.FileInfo.Name():
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
if metadata.Mode.IsDir() {
resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter)
return ObjectMetadata{}, resources, nil
}
case delimitedName != "":
resources.CommonPrefixes = append(resources.CommonPrefixes, delimitedName)
}
// Delimiter is absent and only Prefix is present
case resources.Delimiter == "" && resources.Prefix != "":
if strings.HasPrefix(name, resources.Prefix) {
// Do not strip prefix object output
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
}
default:
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
}
sortUnique(sort.StringSlice(resources.CommonPrefixes))
return metadata, resources, nil
} }

View File

@ -33,6 +33,8 @@ type Filesystem struct {
lock *sync.Mutex lock *sync.Mutex
multiparts *Multiparts multiparts *Multiparts
buckets *Buckets buckets *Buckets
listServiceReqCh chan<- listServiceReq
timeoutReqCh chan<- uint32
} }
// Buckets holds acl information // Buckets holds acl information
@ -96,14 +98,17 @@ func New(rootPath string) (Filesystem, *probe.Error) {
fs.path = rootPath fs.path = rootPath
fs.multiparts = multiparts fs.multiparts = multiparts
fs.buckets = buckets fs.buckets = buckets
/// Defaults /// Defaults
// maximum buckets to be listed from list buckets. // maximum buckets to be listed from list buckets.
fs.maxBuckets = 100 fs.maxBuckets = 1000
// minium free disk required for i/o operations to succeed. // minium free disk required for i/o operations to succeed.
fs.minFreeDisk = 10 fs.minFreeDisk = 10
// Start list goroutine.
if err = fs.listObjectsService(); err != nil {
return Filesystem{}, err.Trace(rootPath)
}
// Return here. // Return here.
return fs, nil return fs, nil
} }

View File

@ -1,3 +1,19 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ioutils package ioutils
import ( import (
@ -20,10 +36,12 @@ func ReadDirN(dirname string, n int) ([]os.FileInfo, error) {
return nil, err return nil, err
} }
list, err := f.Readdir(n) list, err := f.Readdir(n)
f.Close()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = f.Close(); err != nil {
return nil, err
}
sort.Sort(byName(list)) sort.Sort(byName(list))
return list, nil return list, nil
} }
@ -36,10 +54,12 @@ func ReadDirNamesN(dirname string, n int) ([]string, error) {
return nil, err return nil, err
} }
names, err := f.Readdirnames(n) names, err := f.Readdirnames(n)
f.Close()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = f.Close(); err != nil {
return nil, err
}
sort.Strings(names) sort.Strings(names)
return names, nil return names, nil
} }

View File

@ -0,0 +1,54 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ioutils_test
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/minio/minio/pkg/ioutils"
. "gopkg.in/check.v1"
)
func Test(t *testing.T) { TestingT(t) }
type MySuite struct{}
var _ = Suite(&MySuite{})
func (s *MySuite) TestIoutils(c *C) {
path, err := ioutil.TempDir(os.TempDir(), "minio-")
c.Assert(err, IsNil)
defer os.RemoveAll(path)
var count int
for count < 102 {
count++
err = os.MkdirAll(filepath.Join(path, fmt.Sprintf("minio-%d", count)), 0700)
c.Assert(err, IsNil)
}
dirs, err := ioutils.ReadDirN(path, 100)
c.Assert(err, IsNil)
c.Assert(len(dirs), Equals, 100)
dirNames, err := ioutils.ReadDirNamesN(path, 100)
c.Assert(err, IsNil)
c.Assert(len(dirNames), Equals, 100)
}

View File

@ -40,8 +40,8 @@ type CloudStorageAPI struct {
// WebAPI container for Web API. // WebAPI container for Web API.
type WebAPI struct { type WebAPI struct {
// Do not check for incoming authorization header. // FSPath filesystem path.
Anonymous bool FSPath string
// Once true log all incoming request. // Once true log all incoming request.
AccessLog bool AccessLog bool
// Minio client instance. // Minio client instance.
@ -53,9 +53,6 @@ func getWebAPIHandler(web *WebAPI) http.Handler {
TimeValidityHandler, // Validate time. TimeValidityHandler, // Validate time.
CorsHandler, // CORS added only for testing purposes. CorsHandler, // CORS added only for testing purposes.
} }
if !web.Anonymous {
mwHandlers = append(mwHandlers, AuthHandler)
}
if web.AccessLog { if web.AccessLog {
mwHandlers = append(mwHandlers, AccessLogHandler) mwHandlers = append(mwHandlers, AccessLogHandler)
} }
@ -120,7 +117,7 @@ func getNewWebAPI(conf cloudServerConfig) *WebAPI {
fatalIf(probe.NewError(e), "Unable to initialize minio client", nil) fatalIf(probe.NewError(e), "Unable to initialize minio client", nil)
web := &WebAPI{ web := &WebAPI{
Anonymous: conf.Anonymous, FSPath: conf.Path,
AccessLog: conf.AccessLog, AccessLog: conf.AccessLog,
Client: client, Client: client,
} }

View File

@ -159,24 +159,6 @@ func createConfigPath() *probe.Error {
return nil return nil
} }
// isAuthConfigFileExists is auth config file exists?
func isConfigFileExists() bool {
if _, err := os.Stat(mustGetConfigFile()); err != nil {
if os.IsNotExist(err) {
return false
}
panic(err)
}
return true
}
// mustGetConfigFile always get users config file, if not panic
func mustGetConfigFile() string {
configFile, err := getConfigFile()
fatalIf(err.Trace(), "Unable to get config file.", nil)
return configFile
}
// getConfigFile get users config file // getConfigFile get users config file
func getConfigFile() (string, *probe.Error) { func getConfigFile() (string, *probe.Error) {
configPath, err := getConfigPath() configPath, err := getConfigPath()

View File

@ -126,16 +126,17 @@ var ignoredHeaders = map[string]bool{
} }
func (s *MyAPIFSCacheSuite) newRequest(method, urlStr string, contentLength int64, body io.ReadSeeker) (*http.Request, error) { func (s *MyAPIFSCacheSuite) newRequest(method, urlStr string, contentLength int64, body io.ReadSeeker) (*http.Request, error) {
if method == "" {
method = "POST"
}
t := time.Now().UTC() t := time.Now().UTC()
req, err := http.NewRequest(method, urlStr, nil) req, err := http.NewRequest(method, urlStr, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Set("x-amz-date", t.Format(iso8601Format)) req.Header.Set("x-amz-date", t.Format(iso8601Format))
if method == "" {
method = "POST"
}
// add Content-Length // add Content-Length
req.ContentLength = contentLength req.ContentLength = contentLength

View File

@ -74,12 +74,6 @@ const (
minioUpdateExperimentalURL = "https://dl.minio.io/server/minio/experimental/" minioUpdateExperimentalURL = "https://dl.minio.io/server/minio/experimental/"
) )
// minioUpdates container to hold updates json.
type minioUpdates struct {
BuildDate string
Platforms map[string]string
}
// updateMessage container to hold update messages. // updateMessage container to hold update messages.
type updateMessage struct { type updateMessage struct {
Status string `json:"status"` Status string `json:"status"`

View File

@ -23,6 +23,9 @@ type MakeBucketArgs struct {
BucketName string `json:"bucketName"` BucketName string `json:"bucketName"`
} }
// DiskInfoArgs - disk info args.
type DiskInfoArgs struct{}
// ListBucketsArgs - list bucket args. // ListBucketsArgs - list bucket args.
type ListBucketsArgs struct{} type ListBucketsArgs struct{}

View File

@ -22,6 +22,7 @@ import (
"time" "time"
jwtgo "github.com/dgrijalva/jwt-go" jwtgo "github.com/dgrijalva/jwt-go"
"github.com/minio/minio/pkg/disk"
) )
// isAuthenticated validates if any incoming request to be a valid JWT // isAuthenticated validates if any incoming request to be a valid JWT
@ -40,6 +41,19 @@ func isAuthenticated(req *http.Request) bool {
return tokenRequest.Valid return tokenRequest.Valid
} }
// DiskInfo - get disk statistics.
func (web *WebAPI) DiskInfo(r *http.Request, args *DiskInfoArgs, reply *disk.Info) error {
if !isAuthenticated(r) {
return errUnAuthorizedRequest
}
info, err := disk.GetInfo(web.FSPath)
if err != nil {
return err
}
*reply = info
return nil
}
// MakeBucket - make a bucket. // MakeBucket - make a bucket.
func (web *WebAPI) MakeBucket(r *http.Request, args *MakeBucketArgs, reply *string) error { func (web *WebAPI) MakeBucket(r *http.Request, args *MakeBucketArgs, reply *string) error {
if !isAuthenticated(r) { if !isAuthenticated(r) {