Migrate this project to minio micro services code

This commit is contained in:
Harshavardhana
2015-10-16 11:26:01 -07:00
parent 8c4119cbeb
commit 762b798767
349 changed files with 3704 additions and 76049 deletions

@@ -0,0 +1,507 @@
// +build linux darwin freebsd openbsd netbsd dragonfly
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/xml"
"math/rand"
"strconv"
"time"
"gopkg.in/check.v1"
)
// APITestSuite - collection of API tests
func APITestSuite(c *check.C, create func() CloudStorage) {
testMakeBucket(c, create)
testMultipleObjectCreation(c, create)
testPaging(c, create)
testObjectOverwriteWorks(c, create)
testNonExistantBucketOperations(c, create)
testBucketMetadata(c, create)
testBucketRecreateFails(c, create)
testPutObjectInSubdir(c, create)
testListBuckets(c, create)
testListBucketsOrder(c, create)
testListObjectsTestsForNonExistantBucket(c, create)
testNonExistantObjectInBucket(c, create)
testGetDirectoryReturnsObjectNotFound(c, create)
testDefaultContentType(c, create)
testMultipartObjectCreation(c, create)
testMultipartObjectAbort(c, create)
}
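// Example wiring (illustrative sketch, not part of this commit): a concrete
// backend would register this suite with gocheck from its own test file; the
// New constructor and temp-dir setup below are assumptions, not this package's API.
//
//   func (s *MySuite) TestAPISuite(c *check.C) {
//       create := func() CloudStorage {
//           path, _ := ioutil.TempDir(os.TempDir(), "minio-")
//           store, _ := New(path) // hypothetical fs backend constructor
//           return store
//       }
//       APITestSuite(c, create)
//   }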
func testMakeBucket(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
}
func testMultipartObjectCreation(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
uploadID, err := fs.NewMultipartUpload("bucket", "key")
c.Assert(err, check.IsNil)
completedParts := CompleteMultipartUpload{}
completedParts.Part = make([]CompletePart, 0)
finalHasher := md5.New()
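// finalHasher accumulates the md5 of all parts in order; CompleteMultipartUpload is expected to return this same sum for the assembled object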
for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
for _, num := range randomPerm {
randomString = randomString + strconv.Itoa(num)
}
hasher := md5.New()
finalHasher.Write([]byte(randomString))
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
var calculatedmd5sum string
calculatedmd5sum, err = fs.CreateObjectPart("bucket", "key", uploadID, expectedmd5Sum, i, int64(len(randomString)),
bytes.NewBufferString(randomString), nil)
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum})
}
finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil))
completedPartsBytes, e := xml.Marshal(completedParts)
c.Assert(e, check.IsNil)
objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Equals, finalExpectedmd5SumHex)
}
func testMultipartObjectAbort(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
uploadID, err := fs.NewMultipartUpload("bucket", "key")
c.Assert(err, check.IsNil)
parts := make(map[int]string)
for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
for _, num := range randomPerm {
randomString = randomString + strconv.Itoa(num)
}
hasher := md5.New()
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
var calculatedmd5sum string
calculatedmd5sum, err = fs.CreateObjectPart("bucket", "key", uploadID, expectedmd5Sum, i, int64(len(randomString)),
bytes.NewBufferString(randomString), nil)
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
parts[i] = calculatedmd5sum
}
err = fs.AbortMultipartUpload("bucket", "key", uploadID)
c.Assert(err, check.IsNil)
}
func testMultipleObjectCreation(c *check.C, create func() CloudStorage) {
objects := make(map[string][]byte)
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
for i := 0; i < 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
for _, num := range randomPerm {
randomString = randomString + strconv.Itoa(num)
}
hasher := md5.New()
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
key := "obj" + strconv.Itoa(i)
objects[key] = []byte(randomString)
objectMetadata, err := fs.CreateObject("bucket", key, expectedmd5Sum, int64(len(randomString)), bytes.NewBufferString(randomString), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Equals, expectedmd5Sumhex)
}
for key, value := range objects {
var byteBuffer bytes.Buffer
_, err := fs.GetObject(&byteBuffer, "bucket", key, 0, 0)
c.Assert(err, check.IsNil)
c.Assert(byteBuffer.Bytes(), check.DeepEquals, value)
metadata, err := fs.GetObjectMetadata("bucket", key)
c.Assert(err, check.IsNil)
c.Assert(metadata.Size, check.Equals, int64(len(value)))
}
}
func testPaging(c *check.C, create func() CloudStorage) {
fs := create()
fs.MakeBucket("bucket", "")
resources := BucketResourcesMetadata{}
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false)
// check before paging occurs
for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil)
resources.Maxkeys = 5
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, i+1)
c.Assert(resources.IsTruncated, check.Equals, false)
}
// check that pages work correctly once results paginate
for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil)
resources.Maxkeys = 5
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 5)
c.Assert(resources.IsTruncated, check.Equals, true)
}
// check that paging with a prefix at the end returns fewer objects
{
_, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil)
fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil)
resources.Prefix = "new"
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 2)
}
// check ordering of pages
{
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10")
}
// check delimited results with delimiter and prefix
{
_, err = fs.CreateObject("bucket", "this/is/delimited", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil)
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Delimiter = "/"
resources.Prefix = "this/is/"
resources.Maxkeys = 10
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/")
}
time.Sleep(time.Second)
// check delimited results with delimiter without prefix
{
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Delimiter = "/"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/")
}
// check results with Marker
{
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Prefix = ""
resources.Marker = "newPrefix"
resources.Delimiter = ""
resources.Maxkeys = 3
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix2")
c.Assert(objects[1].Object, check.Equals, "obj0")
c.Assert(objects[2].Object, check.Equals, "obj1")
}
// check ordering of results with prefix
{
resources.Prefix = "obj"
resources.Delimiter = ""
resources.Marker = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "obj0")
c.Assert(objects[1].Object, check.Equals, "obj1")
c.Assert(objects[2].Object, check.Equals, "obj10")
c.Assert(objects[3].Object, check.Equals, "obj2")
c.Assert(objects[4].Object, check.Equals, "obj3")
}
// check ordering of results with prefix and no paging
{
resources.Prefix = "new"
resources.Marker = ""
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2")
}
}
func testObjectOverwriteWorks(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
hasher1 := md5.New()
hasher1.Write([]byte("one"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher1.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil))
objectMetadata, err := fs.CreateObject("bucket", "object", md5Sum1, int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.IsNil)
c.Assert(md5Sum1hex, check.Equals, objectMetadata.Md5)
hasher2 := md5.New()
hasher2.Write([]byte("three"))
md5Sum2 := base64.StdEncoding.EncodeToString(hasher2.Sum(nil))
_, err = fs.CreateObject("bucket", "object", md5Sum2, int64(len("three")), bytes.NewBufferString("three"), nil)
c.Assert(err, check.IsNil)
var bytesBuffer bytes.Buffer
length, err := fs.GetObject(&bytesBuffer, "bucket", "object", 0, 0)
c.Assert(err, check.IsNil)
c.Assert(length, check.Equals, int64(len("three")))
c.Assert(bytesBuffer.String(), check.Equals, "three")
}
func testNonExistantBucketOperations(c *check.C, create func() CloudStorage) {
fs := create()
_, err := fs.CreateObject("bucket", "object", "", int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.Not(check.IsNil))
}
func testBucketMetadata(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("string", "")
c.Assert(err, check.IsNil)
metadata, err := fs.GetBucketMetadata("string")
c.Assert(err, check.IsNil)
c.Assert(metadata.ACL, check.Equals, BucketACL("private"))
}
func testBucketRecreateFails(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("string", "")
c.Assert(err, check.IsNil)
err = fs.MakeBucket("string", "")
c.Assert(err, check.Not(check.IsNil))
}
func testPutObjectInSubdir(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
hasher := md5.New()
hasher.Write([]byte("hello world"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher.Sum(nil))
objectMetadata, err := fs.CreateObject("bucket", "dir1/dir2/object", md5Sum1, int64(len("hello world")), bytes.NewBufferString("hello world"), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Equals, md5Sum1hex)
var bytesBuffer bytes.Buffer
length, err := fs.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object", 0, 0)
c.Assert(err, check.IsNil)
c.Assert(len(bytesBuffer.Bytes()), check.Equals, len("hello world"))
c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, length)
}
func testListBuckets(c *check.C, create func() CloudStorage) {
fs := create()
// test empty list
buckets, err := fs.ListBuckets()
c.Assert(err, check.IsNil)
c.Assert(len(buckets), check.Equals, 0)
// add one and test exists
err = fs.MakeBucket("bucket1", "")
c.Assert(err, check.IsNil)
buckets, err = fs.ListBuckets()
c.Assert(len(buckets), check.Equals, 1)
c.Assert(err, check.IsNil)
// add two and test exists
err = fs.MakeBucket("bucket2", "")
c.Assert(err, check.IsNil)
buckets, err = fs.ListBuckets()
c.Assert(len(buckets), check.Equals, 2)
c.Assert(err, check.IsNil)
// add three and test exists + prefix
err = fs.MakeBucket("bucket22", "")
buckets, err = fs.ListBuckets()
c.Assert(len(buckets), check.Equals, 3)
c.Assert(err, check.IsNil)
}
func testListBucketsOrder(c *check.C, create func() CloudStorage) {
// if implementation contains a map, order of map keys will vary.
// this ensures they return in the same order each time
for i := 0; i < 10; i++ {
fs := create()
// add one and test exists
err := fs.MakeBucket("bucket1", "")
c.Assert(err, check.IsNil)
err = fs.MakeBucket("bucket2", "")
c.Assert(err, check.IsNil)
buckets, err := fs.ListBuckets()
c.Assert(err, check.IsNil)
c.Assert(len(buckets), check.Equals, 2)
c.Assert(buckets[0].Name, check.Equals, "bucket1")
c.Assert(buckets[1].Name, check.Equals, "bucket2")
}
}
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() CloudStorage) {
fs := create()
resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000}
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.Not(check.IsNil))
c.Assert(resources.IsTruncated, check.Equals, false)
c.Assert(len(objects), check.Equals, 0)
}
func testNonExistantObjectInBucket(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer
length, err := fs.GetObject(&byteBuffer, "bucket", "dir1", 0, 0)
c.Assert(length, check.Equals, int64(0))
c.Assert(err, check.Not(check.IsNil))
c.Assert(len(byteBuffer.Bytes()), check.Equals, 0)
switch err := err.ToGoError().(type) {
case ObjectNotFound:
c.Assert(err, check.ErrorMatches, "Object not found: bucket#dir1")
default:
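// force a failure with a line number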
c.Assert(err, check.Equals, "fails")
}
}
func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "dir1/dir2/object", "", int64(len("hello world")), bytes.NewBufferString("hello world"), nil)
c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer
length, err := fs.GetObject(&byteBuffer, "bucket", "dir1", 0, 0)
c.Assert(length, check.Equals, int64(0))
switch err := err.ToGoError().(type) {
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
c.Assert(err.Object, check.Equals, "dir1")
default:
// force a failure with a line number
c.Assert(err, check.Equals, "ObjectNotFound")
}
c.Assert(len(byteBuffer.Bytes()), check.Equals, 0)
var byteBuffer2 bytes.Buffer
length, err = fs.GetObject(&byteBuffer, "bucket", "dir1/", 0, 0)
c.Assert(length, check.Equals, int64(0))
switch err := err.ToGoError().(type) {
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
c.Assert(err.Object, check.Equals, "dir1/")
default:
// force a failure with a line number
c.Assert(err, check.Equals, "ObjectNotFound")
}
c.Assert(len(byteBuffer2.Bytes()), check.Equals, 0)
}
func testDefaultContentType(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
// test empty
_, err = fs.CreateObject("bucket", "one", "", int64(len("one")), bytes.NewBufferString("one"), nil)
metadata, err := fs.GetObjectMetadata("bucket", "one")
c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/octet-stream")
}
func testContentMd5Set(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
// test invalid md5: the sum below is missing its trailing '=' padding, so base64 decoding fails
badmd5Sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA"
objectMetadata, err := fs.CreateObject("bucket", "one", badmd5Sum, int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.Not(check.IsNil))
c.Assert(objectMetadata.Md5, check.Not(check.Equals), badmd5Sum)
// test valid md5: a properly padded base64 sum is accepted
goodmd5sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA="
objectMetadata, err = fs.CreateObject("bucket", "two", goodmd5sum, int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Not(check.Equals), "")
}

@@ -0,0 +1,511 @@
// +build windows
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/xml"
"math/rand"
"strconv"
"time"
"gopkg.in/check.v1"
)
// APITestSuite - collection of API tests
func APITestSuite(c *check.C, create func() CloudStorage) {
testMakeBucket(c, create)
testMultipleObjectCreation(c, create)
testPaging(c, create)
testObjectOverwriteWorks(c, create)
testNonExistantBucketOperations(c, create)
testBucketMetadata(c, create)
testBucketRecreateFails(c, create)
testPutObjectInSubdir(c, create)
testListBuckets(c, create)
testListBucketsOrder(c, create)
testListObjectsTestsForNonExistantBucket(c, create)
testNonExistantObjectInBucket(c, create)
testGetDirectoryReturnsObjectNotFound(c, create)
testDefaultContentType(c, create)
testMultipartObjectCreation(c, create)
testMultipartObjectAbort(c, create)
}
func testMakeBucket(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
}
func testMultipartObjectCreation(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
uploadID, err := fs.NewMultipartUpload("bucket", "key")
c.Assert(err, check.IsNil)
completedParts := CompleteMultipartUpload{}
completedParts.Part = make([]CompletePart, 0)
finalHasher := md5.New()
for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
for _, num := range randomPerm {
randomString = randomString + strconv.Itoa(num)
}
hasher := md5.New()
finalHasher.Write([]byte(randomString))
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
var calculatedmd5sum string
calculatedmd5sum, err = fs.CreateObjectPart("bucket", "key", uploadID, expectedmd5Sum, i, int64(len(randomString)),
bytes.NewBufferString(randomString), nil)
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum})
}
finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil))
completedPartsBytes, e := xml.Marshal(completedParts)
c.Assert(e, check.IsNil)
objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Equals, finalExpectedmd5SumHex)
}
func testMultipartObjectAbort(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
uploadID, err := fs.NewMultipartUpload("bucket", "key")
c.Assert(err, check.IsNil)
parts := make(map[int]string)
for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
for _, num := range randomPerm {
randomString = randomString + strconv.Itoa(num)
}
hasher := md5.New()
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
var calculatedmd5sum string
calculatedmd5sum, err = fs.CreateObjectPart("bucket", "key", uploadID, expectedmd5Sum, i, int64(len(randomString)),
bytes.NewBufferString(randomString), nil)
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
parts[i] = calculatedmd5sum
}
err = fs.AbortMultipartUpload("bucket", "key", uploadID)
c.Assert(err, check.IsNil)
}
func testMultipleObjectCreation(c *check.C, create func() CloudStorage) {
objects := make(map[string][]byte)
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
for i := 0; i < 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
for _, num := range randomPerm {
randomString = randomString + strconv.Itoa(num)
}
hasher := md5.New()
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
key := "obj" + strconv.Itoa(i)
objects[key] = []byte(randomString)
objectMetadata, err := fs.CreateObject("bucket", key, expectedmd5Sum, int64(len(randomString)), bytes.NewBufferString(randomString), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Equals, expectedmd5Sumhex)
}
for key, value := range objects {
var byteBuffer bytes.Buffer
_, err := fs.GetObject(&byteBuffer, "bucket", key, 0, 0)
c.Assert(err, check.IsNil)
c.Assert(byteBuffer.Bytes(), check.DeepEquals, value)
metadata, err := fs.GetObjectMetadata("bucket", key)
c.Assert(err, check.IsNil)
c.Assert(metadata.Size, check.Equals, int64(len(value)))
}
}
func testPaging(c *check.C, create func() CloudStorage) {
fs := create()
fs.MakeBucket("bucket", "")
resources := BucketResourcesMetadata{}
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false)
// check before paging occurs
for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil)
resources.Maxkeys = 5
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, i+1)
c.Assert(resources.IsTruncated, check.Equals, false)
}
// check that pages work correctly once results paginate
for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil)
resources.Maxkeys = 5
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 5)
c.Assert(resources.IsTruncated, check.Equals, true)
}
// check that paging with a prefix at the end returns fewer objects
{
_, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil)
resources.Prefix = "new"
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 2)
}
// check ordering of pages
{
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10")
}
// check delimited results with delimiter and prefix
{
_, err = fs.CreateObject("bucket", "this/is/delimited", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil)
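// note: this Windows variant uses the backslash path separator as the listing delimiter below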
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Delimiter = "\\"
resources.Prefix = "this\\is\\"
resources.Maxkeys = 10
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "this\\is\\also\\")
}
time.Sleep(time.Second)
// check delimited results with delimiter without prefix
{
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Delimiter = "\\"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this\\")
}
// check results with Marker
{
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Prefix = ""
resources.Marker = "newPrefix"
resources.Delimiter = ""
resources.Maxkeys = 3
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix2")
c.Assert(objects[1].Object, check.Equals, "obj0")
c.Assert(objects[2].Object, check.Equals, "obj1")
}
// check ordering of results with prefix
{
resources.Prefix = "obj"
resources.Delimiter = ""
resources.Marker = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "obj0")
c.Assert(objects[1].Object, check.Equals, "obj1")
c.Assert(objects[2].Object, check.Equals, "obj10")
c.Assert(objects[3].Object, check.Equals, "obj2")
c.Assert(objects[4].Object, check.Equals, "obj3")
}
// check ordering of results with prefix and no paging
{
resources.Prefix = "new"
resources.Marker = ""
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2")
}
}
func testObjectOverwriteWorks(c *check.C, create func() CloudStorage) {
fs := create()
fs.MakeBucket("bucket", "")
hasher1 := md5.New()
hasher1.Write([]byte("one"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher1.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil))
objectMetadata, err := fs.CreateObject("bucket", "object", md5Sum1, int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.IsNil)
c.Assert(md5Sum1hex, check.Equals, objectMetadata.Md5)
hasher2 := md5.New()
hasher2.Write([]byte("three"))
md5Sum2 := base64.StdEncoding.EncodeToString(hasher2.Sum(nil))
_, err = fs.CreateObject("bucket", "object", md5Sum2, int64(len("three")), bytes.NewBufferString("three"), nil)
c.Assert(err, check.IsNil)
var bytesBuffer bytes.Buffer
length, err := fs.GetObject(&bytesBuffer, "bucket", "object", 0, 0)
c.Assert(err, check.IsNil)
c.Assert(length, check.Equals, int64(len("three")))
c.Assert(bytesBuffer.String(), check.Equals, "three")
}
func testNonExistantBucketOperations(c *check.C, create func() CloudStorage) {
fs := create()
_, err := fs.CreateObject("bucket", "object", "", int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.Not(check.IsNil))
}
func testBucketMetadata(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("string", "private")
c.Assert(err, check.IsNil)
metadata, err := fs.GetBucketMetadata("string")
c.Assert(err, check.IsNil)
// On Windows every directory is always in public-read-write mode -- TODO: handle this properly
c.Assert(metadata.ACL, check.Equals, BucketACL("public-read-write"))
}
func testBucketRecreateFails(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("string", "private")
c.Assert(err, check.IsNil)
err = fs.MakeBucket("string", "private")
c.Assert(err, check.Not(check.IsNil))
}
func testPutObjectInSubdir(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "private")
c.Assert(err, check.IsNil)
hasher := md5.New()
hasher.Write([]byte("hello world"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher.Sum(nil))
objectMetadata, err := fs.CreateObject("bucket", "dir1/dir2/object", md5Sum1, int64(len("hello world")), bytes.NewBufferString("hello world"), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Equals, md5Sum1hex)
var bytesBuffer bytes.Buffer
length, err := fs.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object", 0, 0)
c.Assert(err, check.IsNil)
c.Assert(len(bytesBuffer.Bytes()), check.Equals, len("hello world"))
c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, length)
}
func testListBuckets(c *check.C, create func() CloudStorage) {
fs := create()
// test empty list
buckets, err := fs.ListBuckets()
c.Assert(err, check.IsNil)
c.Assert(len(buckets), check.Equals, 0)
// add one and test exists
err = fs.MakeBucket("bucket1", "")
c.Assert(err, check.IsNil)
buckets, err = fs.ListBuckets()
c.Assert(len(buckets), check.Equals, 1)
c.Assert(err, check.IsNil)
// add two and test exists
err = fs.MakeBucket("bucket2", "")
c.Assert(err, check.IsNil)
buckets, err = fs.ListBuckets()
c.Assert(len(buckets), check.Equals, 2)
c.Assert(err, check.IsNil)
// add three and test exists + prefix
err = fs.MakeBucket("bucket22", "")
buckets, err = fs.ListBuckets()
c.Assert(len(buckets), check.Equals, 3)
c.Assert(err, check.IsNil)
}
func testListBucketsOrder(c *check.C, create func() CloudStorage) {
// if implementation contains a map, order of map keys will vary.
// this ensures they return in the same order each time
for i := 0; i < 10; i++ {
fs := create()
// add one and test exists
err := fs.MakeBucket("bucket1", "")
c.Assert(err, check.IsNil)
err = fs.MakeBucket("bucket2", "")
c.Assert(err, check.IsNil)
buckets, err := fs.ListBuckets()
c.Assert(err, check.IsNil)
c.Assert(len(buckets), check.Equals, 2)
c.Assert(buckets[0].Name, check.Equals, "bucket1")
c.Assert(buckets[1].Name, check.Equals, "bucket2")
}
}
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() CloudStorage) {
fs := create()
resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000}
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.Not(check.IsNil))
c.Assert(resources.IsTruncated, check.Equals, false)
c.Assert(len(objects), check.Equals, 0)
}
func testNonExistantObjectInBucket(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer
length, err := fs.GetObject(&byteBuffer, "bucket", "dir1", 0, 0)
c.Assert(length, check.Equals, int64(0))
c.Assert(err, check.Not(check.IsNil))
c.Assert(len(byteBuffer.Bytes()), check.Equals, 0)
switch err := err.ToGoError().(type) {
case ObjectNotFound:
{
c.Assert(err, check.ErrorMatches, "Object not found: bucket#dir1")
}
default:
{
c.Assert(err, check.Equals, "fails")
}
}
}
func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "dir1/dir2/object", "", int64(len("hello world")), bytes.NewBufferString("hello world"), nil)
c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer
length, err := fs.GetObject(&byteBuffer, "bucket", "dir1", 0, 0)
c.Assert(length, check.Equals, int64(0))
switch err := err.ToGoError().(type) {
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
c.Assert(err.Object, check.Equals, "dir1")
default:
// force a failure with a line number
c.Assert(err, check.Equals, "ObjectNotFound")
}
c.Assert(len(byteBuffer.Bytes()), check.Equals, 0)
var byteBuffer2 bytes.Buffer
length, err = fs.GetObject(&byteBuffer, "bucket", "dir1/", 0, 0)
c.Assert(length, check.Equals, int64(0))
switch err := err.ToGoError().(type) {
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
c.Assert(err.Object, check.Equals, "dir1/")
default:
// force a failure with a line number
c.Assert(err, check.Equals, "ObjectNotFound")
}
c.Assert(len(byteBuffer2.Bytes()), check.Equals, 0)
}
func testDefaultContentType(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
// test empty
_, err = fs.CreateObject("bucket", "one", "", int64(len("one")), bytes.NewBufferString("one"), nil)
metadata, err := fs.GetObjectMetadata("bucket", "one")
c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/octet-stream")
}
func testContentMd5Set(c *check.C, create func() CloudStorage) {
fs := create()
err := fs.MakeBucket("bucket", "")
c.Assert(err, check.IsNil)
// test invalid md5: the sum below is missing its trailing '=' padding, so base64 decoding fails
badmd5Sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA"
objectMetadata, err := fs.CreateObject("bucket", "one", badmd5Sum, int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.Not(check.IsNil))
c.Assert(objectMetadata.Md5, check.Not(check.Equals), badmd5Sum)
// test valid md5: a properly padded base64 sum is accepted
goodmd5sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA="
objectMetadata, err = fs.CreateObject("bucket", "two", goodmd5sum, int64(len("one")), bytes.NewBufferString("one"), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.Md5, check.Not(check.Equals), "")
}

pkg/fs/config.go Normal file

@@ -0,0 +1,85 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"os/user"
"path/filepath"
"github.com/minio/minio-xl/pkg/probe"
"github.com/minio/minio-xl/pkg/quick"
)
func getFSMultipartConfigPath() (string, *probe.Error) {
if customMultipartsConfigPath != "" {
return customMultipartsConfigPath, nil
}
u, err := user.Current()
if err != nil {
return "", probe.NewError(err)
}
fsMultipartsConfigPath := filepath.Join(u.HomeDir, ".minio", "multiparts.json")
return fsMultipartsConfigPath, nil
}
// internal variable only accessed via get/set methods
var customConfigPath, customMultipartsConfigPath string
// SetFSConfigPath - set custom fs config path
func SetFSConfigPath(configPath string) {
customConfigPath = configPath
}
// SetFSMultipartsConfigPath - set custom multiparts session config path
func SetFSMultipartsConfigPath(configPath string) {
customMultipartsConfigPath = configPath
}
// SaveMultipartsSession - save the multiparts session to multiparts.json
func SaveMultipartsSession(multiparts *Multiparts) *probe.Error {
fsMultipartsConfigPath, err := getFSMultipartConfigPath()
if err != nil {
return err.Trace()
}
qc, err := quick.New(multiparts)
if err != nil {
return err.Trace()
}
if err := qc.Save(fsMultipartsConfigPath); err != nil {
return err.Trace()
}
return nil
}
// loadMultipartsSession - load the multipart session file from disk
func loadMultipartsSession() (*Multiparts, *probe.Error) {
fsMultipartsConfigPath, err := getFSMultipartConfigPath()
if err != nil {
return nil, err.Trace()
}
multiparts := &Multiparts{}
multiparts.Version = "1"
multiparts.ActiveSession = make(map[string]*MultipartSession)
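// these defaults seed the structure; qc.Load replaces them with the contents of multiparts.json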
qc, err := quick.New(multiparts)
if err != nil {
return nil, err.Trace()
}
if err := qc.Load(fsMultipartsConfigPath); err != nil {
return nil, err.Trace()
}
return qc.Data().(*Multiparts), nil
}

pkg/fs/definitions.go Normal file

@@ -0,0 +1,198 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"os"
"regexp"
"strings"
"time"
"unicode/utf8"
)
// BucketACL - bucket level access control
type BucketACL string
// different types of ACLs currently supported for buckets
const (
BucketPrivate = BucketACL("private")
BucketPublicRead = BucketACL("public-read")
BucketPublicReadWrite = BucketACL("public-read-write")
)
func (b BucketACL) String() string {
return string(b)
}
// IsPrivate - is acl Private
func (b BucketACL) IsPrivate() bool {
return b == BucketACL("private")
}
// IsPublicRead - is acl PublicRead
func (b BucketACL) IsPublicRead() bool {
return b == BucketACL("public-read")
}
// IsPublicReadWrite - is acl PublicReadWrite
func (b BucketACL) IsPublicReadWrite() bool {
return b == BucketACL("public-read-write")
}
// BucketMetadata - name and create date
type BucketMetadata struct {
Name string
Created time.Time
ACL BucketACL
}
// ObjectMetadata - object key and its relevant metadata
type ObjectMetadata struct {
Bucket string
Object string
ContentType string
Created time.Time
Mode os.FileMode
Md5 string
Size int64
}
// PartMetadata - various types of individual part resources
type PartMetadata struct {
PartNumber int
LastModified time.Time
ETag string
Size int64
}
// ObjectResourcesMetadata - various types of object resources
type ObjectResourcesMetadata struct {
Bucket string
Object string
UploadID string
StorageClass string
PartNumberMarker int
NextPartNumberMarker int
MaxParts int
IsTruncated bool
Part []*PartMetadata
EncodingType string
}
// UploadMetadata container capturing metadata on an in-progress multipart upload in a given bucket
type UploadMetadata struct {
Object string
UploadID string
StorageClass string
Initiated time.Time
}
// BucketMultipartResourcesMetadata - various types of bucket resources for in-progress multipart uploads
type BucketMultipartResourcesMetadata struct {
KeyMarker string
UploadIDMarker string
NextKeyMarker string
NextUploadIDMarker string
EncodingType string
MaxUploads int
IsTruncated bool
Upload []*UploadMetadata
Prefix string
Delimiter string
CommonPrefixes []string
}
// BucketResourcesMetadata - various types of bucket resources
type BucketResourcesMetadata struct {
Prefix string
Marker string
NextMarker string
Maxkeys int
EncodingType string
Delimiter string
IsTruncated bool
CommonPrefixes []string
}
// CompletePart - completed part container
type CompletePart struct {
PartNumber int
ETag string
}
// completedParts is a sortable interface for Part slice
type completedParts []CompletePart
func (a completedParts) Len() int { return len(a) }
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// CompleteMultipartUpload container for completing multipart upload
type CompleteMultipartUpload struct {
Part []CompletePart
}
// IsValidBucketACL - is provided acl string supported
func IsValidBucketACL(acl string) bool {
switch acl {
case "private", "public-read", "public-read-write":
return true
case "":
// by default it's "private"
return true
default:
return false
}
}
// IsValidBucket - verify bucket name in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html
func IsValidBucket(bucket string) bool {
if len(bucket) < 3 || len(bucket) > 63 {
return false
}
if bucket[0] == '.' || bucket[len(bucket)-1] == '.' {
return false
}
if match, _ := regexp.MatchString("\\.\\.", bucket); match {
return false
}
// We don't support buckets with '.' in them
match, _ := regexp.MatchString("^[a-zA-Z][a-zA-Z0-9\\-]+[a-zA-Z0-9]$", bucket)
return match
}
// IsValidObjectName - verify object name in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
func IsValidObjectName(object string) bool {
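// an empty or all-whitespace object name is treated as valid here, making the len(object) == 0 check below unreachable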
if strings.TrimSpace(object) == "" {
return true
}
if len(object) > 1024 || len(object) == 0 {
return false
}
if !utf8.ValidString(object) {
return false
}
return true
}

pkg/fs/errors.go Normal file

@@ -0,0 +1,361 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import "fmt"
// MissingDateHeader date header missing
type MissingDateHeader struct{}
func (e MissingDateHeader) Error() string {
return "Missing date header"
}
// MissingExpiresQuery expires query string missing
type MissingExpiresQuery struct{}
func (e MissingExpiresQuery) Error() string {
return "Missing expires query string"
}
// ExpiredPresignedRequest request already expired
type ExpiredPresignedRequest struct{}
func (e ExpiredPresignedRequest) Error() string {
return "Presigned request already expired"
}
// SignatureDoesNotMatch invalid signature
type SignatureDoesNotMatch struct {
SignatureSent string
SignatureCalculated string
}
func (e SignatureDoesNotMatch) Error() string {
return "The request signature we calculated does not match the signature you provided"
}
// InvalidArgument invalid argument
type InvalidArgument struct{}
func (e InvalidArgument) Error() string {
return "Invalid argument"
}
// UnsupportedFilesystem unsupported filesystem type
type UnsupportedFilesystem struct {
Type string
}
func (e UnsupportedFilesystem) Error() string {
return "Unsupported filesystem: " + e.Type
}
// BucketNotFound bucket does not exist
type BucketNotFound struct {
Bucket string
}
func (e BucketNotFound) Error() string {
return "Bucket not found: " + e.Bucket
}
// BucketNotEmpty bucket is not empty
type BucketNotEmpty struct {
Bucket string
}
func (e BucketNotEmpty) Error() string {
return "Bucket not empty: " + e.Bucket
}
// ObjectNotFound object does not exist
type ObjectNotFound struct {
Bucket string
Object string
}
func (e ObjectNotFound) Error() string {
return "Object not found: " + e.Bucket + "#" + e.Object
}
// ObjectCorrupted object found to be corrupted
type ObjectCorrupted struct {
Object string
}
func (e ObjectCorrupted) Error() string {
return "Object found corrupted: " + e.Object
}
// BucketExists bucket exists
type BucketExists struct {
Bucket string
}
func (e BucketExists) Error() string {
return "Bucket exists: " + e.Bucket
}
// CorruptedBackend backend found to be corrupted
type CorruptedBackend struct {
Backend string
}
func (e CorruptedBackend) Error() string {
return "Corrupted backend: " + e.Backend
}
// NotImplemented function not implemented
type NotImplemented struct {
Function string
}
func (e NotImplemented) Error() string {
return "Not implemented: " + e.Function
}
// InvalidDisksArgument invalid number of disks per node
type InvalidDisksArgument struct{}
func (e InvalidDisksArgument) Error() string {
return "Invalid number of disks per node"
}
// BadDigest bad md5sum
type BadDigest struct {
Md5 string
Bucket string
Object string
}
func (e BadDigest) Error() string {
return "Bad digest"
}
// ParityOverflow parity over flow
type ParityOverflow struct{}
func (e ParityOverflow) Error() string {
return "Parity overflow"
}
// ChecksumMismatch checksum mismatch
type ChecksumMismatch struct{}
func (e ChecksumMismatch) Error() string {
return "Checksum mismatch"
}
// MissingPOSTPolicy missing post policy
type MissingPOSTPolicy struct{}
func (e MissingPOSTPolicy) Error() string {
return "Missing POST policy in multipart form"
}
// InternalError - generic internal error
type InternalError struct {
}
// BackendError - generic disk backend error
type BackendError struct {
Path string
}
// BackendCorrupted - path has corrupted data
type BackendCorrupted BackendError
// APINotImplemented - generic API not implemented error
type APINotImplemented struct {
API string
}
// GenericBucketError - generic bucket error
type GenericBucketError struct {
Bucket string
}
// GenericObjectError - generic object error
type GenericObjectError struct {
Bucket string
Object string
}
// ImplementationError - generic implementation error
type ImplementationError struct {
Bucket string
Object string
Err error
}
// DigestError - Generic Md5 error
type DigestError struct {
Bucket string
Key string
Md5 string
}
/// ACL related errors
// InvalidACL - acl invalid
type InvalidACL struct {
ACL string
}
func (e InvalidACL) Error() string {
return "Requested ACL is " + e.ACL + " invalid"
}
/// Bucket related errors
// BucketNameInvalid - bucketname provided is invalid
type BucketNameInvalid GenericBucketError
/// Object related errors
// EntityTooLarge - object size exceeds maximum limit
type EntityTooLarge struct {
GenericObjectError
Size string
MaxSize string
}
// ObjectNameInvalid - object name provided is invalid
type ObjectNameInvalid GenericObjectError
// InvalidDigest - md5 in request header invalid
type InvalidDigest DigestError
// Error - return the error as a formatted string
func (e ImplementationError) Error() string {
msg := ""
if e.Bucket != "" {
msg = msg + "Bucket: " + e.Bucket + " "
}
if e.Object != "" {
msg = msg + "Object: " + e.Object + " "
}
msg = msg + "Error: " + e.Err.Error()
return msg
}
// EmbedError - wrapper function for error object
func EmbedError(bucket, object string, err error) ImplementationError {
return ImplementationError{
Bucket: bucket,
Object: object,
Err: err,
}
}
// Error - return the error as a formatted string
func (e InternalError) Error() string {
return "Internal error occurred"
}
// Error - return the error as a formatted string
func (e APINotImplemented) Error() string {
return "API not implemented: " + e.API
}
// Error - return the error as a formatted string
func (e BucketNameInvalid) Error() string {
return "Bucket name invalid: " + e.Bucket
}
// Error - return the error as a formatted string
func (e ObjectNameInvalid) Error() string {
return "Object name invalid: " + e.Bucket + "#" + e.Object
}
// Error - return the error as a formatted string
func (e EntityTooLarge) Error() string {
return e.Bucket + "#" + e.Object + " with size " + e.Size + " has reached the maximum allowed size limit " + e.MaxSize
}
// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header
type IncompleteBody GenericObjectError
// Error - return the error as a formatted string
func (e IncompleteBody) Error() string {
return e.Bucket + "#" + e.Object + " has incomplete body"
}
// Error - return the error as a formatted string
func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path
}
// Error - return the error as a formatted string
func (e InvalidDigest) Error() string {
return "Md5 provided " + e.Md5 + " is invalid"
}
// OperationNotPermitted - operation not permitted
type OperationNotPermitted struct {
Op string
Reason string
}
func (e OperationNotPermitted) Error() string {
return "Operation " + e.Op + " not permitted for reason: " + e.Reason
}
// InvalidRange - invalid range
type InvalidRange struct {
Start int64
Length int64
}
func (e InvalidRange) Error() string {
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
}
/// Multipart related errors
// InvalidUploadID invalid upload id
type InvalidUploadID struct {
UploadID string
}
func (e InvalidUploadID) Error() string {
return "Invalid upload id " + e.UploadID
}
// InvalidPart One or more of the specified parts could not be found
type InvalidPart struct{}
func (e InvalidPart) Error() string {
return "One or more of the specified parts could not be found"
}
// InvalidPartOrder parts are not ordered as requested
type InvalidPartOrder struct {
UploadID string
}
func (e InvalidPartOrder) Error() string {
return "Invalid part order sent for " + e.UploadID
}
// MalformedXML invalid xml format
type MalformedXML struct{}
func (e MalformedXML) Error() string {
return "Malformed XML"
}

pkg/fs/fs-common.go Normal file

@@ -0,0 +1,75 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"bufio"
"bytes"
"os"
"sort"
"strings"
"time"
)
// Metadata - carries metadata about object
type Metadata struct {
Md5sum []byte
ContentType string
}
// sortUnique sort a slice in lexical order, removing duplicate elements
func sortUnique(objects []string) []string {
objectMap := make(map[string]string)
for _, v := range objects {
objectMap[v] = v
}
var results []string
for k := range objectMap {
results = append(results, k)
}
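// map iteration order is random; sort the results for deterministic output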
sort.Strings(results)
return results
}
type contentInfo struct {
os.FileInfo
Prefix string
Size int64
Mode os.FileMode
ModTime time.Time
}
type bucketDir struct {
files []contentInfo
root string
}
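// delimiter - return the prefix of object up to and including the first occurrence of the given (single-byte) delimiter; if the delimiter is absent, the whole string is returned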
func delimiter(object, delimiter string) string {
readBuffer := bytes.NewBufferString(object)
reader := bufio.NewReader(readBuffer)
stringReader := strings.NewReader(delimiter)
delimited, _ := stringReader.ReadByte()
delimitedStr, _ := reader.ReadString(delimited)
return delimitedStr
}
// byUploadMetadataKey is a sortable interface for UploadMetadata slice
type byUploadMetadataKey []*UploadMetadata
func (b byUploadMetadataKey) Len() int { return len(b) }
func (b byUploadMetadataKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byUploadMetadataKey) Less(i, j int) bool { return b[i].Object < b[j].Object }

pkg/fs/fs-filter.go Normal file

@@ -0,0 +1,99 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"strings"
"github.com/minio/minio-xl/pkg/probe"
)
func (fs API) filterObjects(bucket string, content contentInfo, resources BucketResourcesMetadata) (ObjectMetadata, BucketResourcesMetadata, *probe.Error) {
var err *probe.Error
var metadata ObjectMetadata
name := content.Prefix
switch {
// Both delimiter and Prefix is present
case resources.Delimiter != "" && resources.Prefix != "":
if strings.HasPrefix(name, resources.Prefix) {
trimmedName := strings.TrimPrefix(name, resources.Prefix)
delimitedName := delimiter(trimmedName, resources.Delimiter)
switch {
case name == resources.Prefix:
// Use resources.Prefix to filter out delimited file
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
case delimitedName == content.FileInfo.Name():
// Use resources.Prefix to filter out delimited files
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
if metadata.Mode.IsDir() {
resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter)
resources.CommonPrefixes = sortUnique(resources.CommonPrefixes)
return ObjectMetadata{}, resources, nil
}
case delimitedName != "":
resources.CommonPrefixes = append(resources.CommonPrefixes, resources.Prefix+delimitedName)
resources.CommonPrefixes = sortUnique(resources.CommonPrefixes)
}
}
// Delimiter present and Prefix is absent
case resources.Delimiter != "" && resources.Prefix == "":
delimitedName := delimiter(name, resources.Delimiter)
switch {
case delimitedName == "":
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
case delimitedName == content.FileInfo.Name():
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
if metadata.Mode.IsDir() {
resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter)
resources.CommonPrefixes = sortUnique(resources.CommonPrefixes)
return ObjectMetadata{}, resources, nil
}
case delimitedName != "":
resources.CommonPrefixes = append(resources.CommonPrefixes, delimitedName)
resources.CommonPrefixes = sortUnique(resources.CommonPrefixes)
}
// Delimiter is absent and only Prefix is present
case resources.Delimiter == "" && resources.Prefix != "":
if strings.HasPrefix(name, resources.Prefix) {
// Do not strip prefix object output
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
}
default:
metadata, err = getMetadata(fs.path, bucket, name)
if err != nil {
return ObjectMetadata{}, resources, err.Trace()
}
}
return metadata, resources, nil
}

pkg/fs/fs-multipart.go Normal file

@@ -0,0 +1,537 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/minio/minio-xl/pkg/atomic"
"github.com/minio/minio-xl/pkg/crypto/sha256"
"github.com/minio/minio-xl/pkg/crypto/sha512"
"github.com/minio/minio-xl/pkg/probe"
)
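// isValidUploadID - verify that uploadID matches the active multipart session for the given object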
func (fs API) isValidUploadID(object, uploadID string) bool {
s, ok := fs.multiparts.ActiveSession[object]
if !ok {
return false
}
if uploadID == s.UploadID {
return true
}
return false
}
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
func (fs API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
if !IsValidBucket(bucket) {
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return BucketMultipartResourcesMetadata{}, probe.NewError(InternalError{})
}
var uploads []*UploadMetadata
for object, session := range fs.multiparts.ActiveSession {
if strings.HasPrefix(object, resources.Prefix) {
if len(uploads) > resources.MaxUploads {
sort.Sort(byUploadMetadataKey(uploads))
resources.Upload = uploads
resources.NextKeyMarker = object
resources.NextUploadIDMarker = session.UploadID
resources.IsTruncated = true
return resources, nil
}
// uploadIDMarker is ignored if KeyMarker is empty
switch {
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
if object > resources.KeyMarker {
upload := new(UploadMetadata)
upload.Object = object
upload.UploadID = session.UploadID
upload.Initiated = session.Initiated
uploads = append(uploads, upload)
}
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
if session.UploadID > resources.UploadIDMarker {
if object >= resources.KeyMarker {
upload := new(UploadMetadata)
upload.Object = object
upload.UploadID = session.UploadID
upload.Initiated = session.Initiated
uploads = append(uploads, upload)
}
}
default:
upload := new(UploadMetadata)
upload.Object = object
upload.UploadID = session.UploadID
upload.Initiated = session.Initiated
uploads = append(uploads, upload)
}
}
}
sort.Sort(byUploadMetadataKey(uploads))
resources.Upload = uploads
return resources, nil
}
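// concatParts - concatenate uploaded parts in the order listed in the complete multipart request,
// verifying each part against its client-supplied ETag (hex encoded MD5) before writing it to mw.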
func (fs API) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
for _, part := range parts.Part {
recvMD5 := part.ETag
partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", part.PartNumber), os.O_RDONLY, 0600)
if err != nil {
return probe.NewError(err)
}
defer partFile.Close()
obj, err := ioutil.ReadAll(partFile)
if err != nil {
return probe.NewError(err)
}
calcMD5Bytes := md5.Sum(obj)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
return probe.NewError(InvalidDigest{Md5: recvMD5})
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return probe.NewError(BadDigest{Md5: recvMD5})
}
_, err = io.Copy(mw, bytes.NewBuffer(obj))
if err != nil {
return probe.NewError(err)
}
}
return nil
}
// NewMultipartUpload - initiate a new multipart session
func (fs API) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
if !IsValidBucket(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{Object: object})
}
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return "", probe.NewError(InternalError{})
}
objectPath := filepath.Join(bucketPath, object)
objectDir := filepath.Dir(objectPath)
if _, err = os.Stat(objectDir); os.IsNotExist(err) {
err = os.MkdirAll(objectDir, 0700)
if err != nil {
return "", probe.NewError(err)
}
}
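// Derive the upload ID by hashing a pseudo-random value with bucket, object and the
// current time, truncated to 47 URL-safe base64 characters.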
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return "", probe.NewError(err)
}
defer multiPartfile.Close()
mpartSession := new(MultipartSession)
mpartSession.TotalParts = 0
mpartSession.UploadID = uploadID
mpartSession.Initiated = time.Now().UTC()
var parts []*PartMetadata
mpartSession.Parts = parts
fs.multiparts.ActiveSession[object] = mpartSession
encoder := json.NewEncoder(multiPartfile)
err = encoder.Encode(mpartSession)
if err != nil {
return "", probe.NewError(err)
}
if err := SaveMultipartsSession(fs.multiparts); err != nil {
return "", err.Trace()
}
return uploadID, nil
}
// partNumber is a sortable interface for Part slice
type partNumber []*PartMetadata
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// CreateObjectPart - create a part in a multipart session
func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
if partID <= 0 {
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
}
// check bucket name valid
if !IsValidBucket(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// verify object path legal
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !fs.isValidUploadID(object, uploadID) {
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
}
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
bucketPath := filepath.Join(fs.path, bucket)
if _, err := os.Stat(bucketPath); err != nil {
// check bucket exists
if os.IsNotExist(err) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
return "", probe.NewError(InternalError{})
}
objectPath := filepath.Join(bucketPath, object)
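// Parts are staged next to the final object as "<object>$<partNumber>" until the upload completes.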
partPath := objectPath + fmt.Sprintf("$%d", partID)
partFile, err := atomic.FileCreate(partPath)
if err != nil {
return "", probe.NewError(err)
}
h := md5.New()
sh := sha256.New()
mw := io.MultiWriter(partFile, h, sh)
_, err = io.CopyN(mw, data, size)
if err != nil {
partFile.CloseAndPurge()
return "", probe.NewError(err)
}
md5sum := hex.EncodeToString(h.Sum(nil))
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5sum); err != nil {
partFile.CloseAndPurge()
return "", probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object})
}
}
if signature != nil {
ok, perr := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
if perr != nil {
partFile.CloseAndPurge()
return "", perr.Trace()
}
if !ok {
partFile.CloseAndPurge()
return "", probe.NewError(SignatureDoesNotMatch{})
}
}
partFile.File.Sync()
partFile.Close()
fi, err := os.Stat(partPath)
if err != nil {
return "", probe.NewError(err)
}
partMetadata := PartMetadata{}
partMetadata.ETag = md5sum
partMetadata.PartNumber = partID
partMetadata.Size = fi.Size()
partMetadata.LastModified = fi.ModTime()
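// Record the new part in the on-disk "$multiparts" session file so the session can be resumed later.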
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR|os.O_APPEND, 0600)
if err != nil {
return "", probe.NewError(err)
}
defer multiPartfile.Close()
var deserializedMultipartSession MultipartSession
decoder := json.NewDecoder(multiPartfile)
err = decoder.Decode(&deserializedMultipartSession)
if err != nil {
return "", probe.NewError(err)
}
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata)
deserializedMultipartSession.TotalParts++
fs.multiparts.ActiveSession[object] = &deserializedMultipartSession
sort.Sort(partNumber(deserializedMultipartSession.Parts))
encoder := json.NewEncoder(multiPartfile)
err = encoder.Encode(&deserializedMultipartSession)
if err != nil {
return "", probe.NewError(err)
}
return partMetadata.ETag, nil
}
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (fs API) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
// check bucket name valid
if !IsValidBucket(bucket) {
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// verify object path legal
if !IsValidObjectName(object) {
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !fs.isValidUploadID(object, uploadID) {
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
bucketPath := filepath.Join(fs.path, bucket)
if _, err := os.Stat(bucketPath); err != nil {
// check bucket exists
if os.IsNotExist(err) {
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return ObjectMetadata{}, probe.NewError(InternalError{})
}
objectPath := filepath.Join(bucketPath, object)
file, err := atomic.FileCreate(objectPath)
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
}
h := md5.New()
mw := io.MultiWriter(file, h)
partBytes, err := ioutil.ReadAll(data)
if err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
}
if signature != nil {
sh := sha256.New()
sh.Write(partBytes)
ok, perr := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
if perr != nil {
file.CloseAndPurge()
return ObjectMetadata{}, perr.Trace()
}
if !ok {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{})
}
}
parts := &CompleteMultipartUpload{}
if err := xml.Unmarshal(partBytes, parts); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(MalformedXML{})
}
if !sort.IsSorted(completedParts(parts.Part)) {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
}
if err := fs.concatParts(parts, objectPath, mw); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace()
}
delete(fs.multiparts.ActiveSession, object)
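// The object is fully assembled; remove the staged part files and the session metadata.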
for _, part := range parts.Part {
err = os.Remove(objectPath + fmt.Sprintf("$%d", part.PartNumber))
if err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
}
}
if err := os.Remove(objectPath + "$multiparts"); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
}
if err := SaveMultipartsSession(fs.multiparts); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace()
}
file.File.Sync()
file.Close()
st, err := os.Stat(objectPath)
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
}
newObject := ObjectMetadata{
Bucket: bucket,
Object: object,
Created: st.ModTime(),
Size: st.Size(),
ContentType: "application/octet-stream",
Md5: hex.EncodeToString(h.Sum(nil)),
}
return newObject, nil
}
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
func (fs API) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
// check bucket name valid
if !IsValidBucket(bucket) {
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// verify object path legal
if !IsValidObjectName(object) {
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !fs.isValidUploadID(object, resources.UploadID) {
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
}
objectResourcesMetadata := resources
objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Object = object
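// A PartNumberMarker of 0 means the listing starts from the first part.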
var startPartNumber int
switch {
case objectResourcesMetadata.PartNumberMarker == 0:
startPartNumber = 1
default:
startPartNumber = objectResourcesMetadata.PartNumberMarker
}
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return ObjectResourcesMetadata{}, probe.NewError(InternalError{})
}
objectPath := filepath.Join(bucketPath, object)
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDONLY, 0600)
if err != nil {
return ObjectResourcesMetadata{}, probe.NewError(err)
}
defer multiPartfile.Close()
var deserializedMultipartSession MultipartSession
decoder := json.NewDecoder(multiPartfile)
err = decoder.Decode(&deserializedMultipartSession)
if err != nil {
return ObjectResourcesMetadata{}, probe.NewError(err)
}
var parts []*PartMetadata
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts))
objectResourcesMetadata.IsTruncated = true
objectResourcesMetadata.Part = parts
objectResourcesMetadata.NextPartNumberMarker = i
return objectResourcesMetadata, nil
}
parts = append(parts, deserializedMultipartSession.Parts[i-1])
}
sort.Sort(partNumber(parts))
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}
// AbortMultipartUpload - abort an incomplete multipart session
func (fs API) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
fs.lock.Lock()
defer fs.lock.Unlock()
// check bucket name valid
if !IsValidBucket(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// verify object path legal
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !fs.isValidUploadID(object, uploadID) {
return probe.NewError(InvalidUploadID{UploadID: uploadID})
}
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return probe.NewError(InternalError{})
}
objectPath := filepath.Join(bucketPath, object)
for _, part := range fs.multiparts.ActiveSession[object].Parts {
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber))
if err != nil {
return probe.NewError(err)
}
}
delete(fs.multiparts.ActiveSession, object)
err = os.RemoveAll(objectPath + "$multiparts")
if err != nil {
return probe.NewError(err)
}
return nil
}

pkg/fs/fs-object.go Normal file

@@ -0,0 +1,294 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"bytes"
"io"
"os"
"path/filepath"
"strings"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"errors"
"runtime"
"github.com/minio/minio-xl/pkg/atomic"
"github.com/minio/minio-xl/pkg/crypto/sha256"
"github.com/minio/minio-xl/pkg/probe"
)
/// Object Operations
// GetObject - GET object
func (fs API) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
// validate bucket
if !IsValidBucket(bucket) {
return 0, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// validate object
if !IsValidObjectName(object) {
return 0, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
objectPath := filepath.Join(fs.path, bucket, object)
filestat, err := os.Stat(objectPath)
switch err := err.(type) {
case nil:
if filestat.IsDir() {
return 0, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
default:
if os.IsNotExist(err) {
return 0, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return 0, probe.NewError(err)
}
file, err := os.Open(objectPath)
if err != nil {
return 0, probe.NewError(err)
}
defer file.Close()
_, err = file.Seek(start, os.SEEK_SET)
if err != nil {
return 0, probe.NewError(err)
}
var count int64
if length > 0 {
count, err = io.CopyN(w, file, length)
if err != nil {
return count, probe.NewError(err)
}
} else {
count, err = io.Copy(w, file)
if err != nil {
return count, probe.NewError(err)
}
}
return count, nil
}
// GetObjectMetadata - HEAD object
func (fs API) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
if !IsValidBucket(bucket) {
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
metadata, err := getMetadata(fs.path, bucket, object)
if err != nil {
return ObjectMetadata{}, err.Trace(bucket, object)
}
if metadata.Mode.IsDir() {
return ObjectMetadata{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return metadata, nil
}
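// getMetadata - build ObjectMetadata from an os.Stat() of the object path under rootPath/bucket.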
func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error) {
// Do not use filepath.Join() since filepath.Join strips off any object names with '/', use them as is
// in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat()
var objectPath string
if runtime.GOOS == "windows" {
objectPath = rootPath + "\\" + bucket + "\\" + object
} else {
objectPath = rootPath + "/" + bucket + "/" + object
}
stat, err := os.Stat(objectPath)
if err != nil {
if os.IsNotExist(err) {
return ObjectMetadata{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return ObjectMetadata{}, probe.NewError(err)
}
contentType := "application/octet-stream"
metadata := ObjectMetadata{
Bucket: bucket,
Object: object,
Created: stat.ModTime(),
Size: stat.Size(),
ContentType: contentType,
Mode: stat.Mode(),
}
return metadata, nil
}
// isMD5SumEqual - returns an error if the md5sums mismatch, nil on success
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) *probe.Error {
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return probe.NewError(err)
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return probe.NewError(err)
}
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return probe.NewError(BadDigest{Md5: expectedMD5Sum})
}
return nil
}
return probe.NewError(errors.New("invalid argument"))
}
// CreateObject - PUT object
func (fs API) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
// check bucket name valid
if !IsValidBucket(bucket) {
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// check bucket exists
if _, err := os.Stat(filepath.Join(fs.path, bucket)); os.IsNotExist(err) {
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
// verify object path legal
if !IsValidObjectName(object) {
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get object path
objectPath := filepath.Join(fs.path, bucket, object)
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
// write object
file, err := atomic.FileCreate(objectPath)
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
}
h := md5.New()
sh := sha256.New()
mw := io.MultiWriter(file, h, sh)
if size > 0 {
_, err = io.CopyN(mw, data, size)
if err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
}
} else {
_, err = io.Copy(mw, data)
if err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
}
}
md5Sum := hex.EncodeToString(h.Sum(nil))
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object})
}
}
sha256Sum := hex.EncodeToString(sh.Sum(nil))
if signature != nil {
ok, perr := signature.DoesSignatureMatch(sha256Sum)
if perr != nil {
file.CloseAndPurge()
return ObjectMetadata{}, perr.Trace()
}
if !ok {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{})
}
}
file.File.Sync()
file.Close()
st, err := os.Stat(objectPath)
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
}
newObject := ObjectMetadata{
Bucket: bucket,
Object: object,
Created: st.ModTime(),
Size: st.Size(),
ContentType: "application/octet-stream",
Md5: md5Sum,
}
return newObject, nil
}
// DeleteObject - delete an object
func (fs API) DeleteObject(bucket, object string) *probe.Error {
fs.lock.Lock()
defer fs.lock.Unlock()
// check bucket name valid
if !IsValidBucket(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// check bucket exists
if _, err := os.Stat(filepath.Join(fs.path, bucket)); os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
// verify object path legal
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Do not use filepath.Join() since filepath.Join strips off any object names with '/', use them as is
// in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat()
var objectPath string
if runtime.GOOS == "windows" {
objectPath = fs.path + "\\" + bucket + "\\" + object
} else {
objectPath = fs.path + "/" + bucket + "/" + object
}
_, err := os.Stat(objectPath)
if err != nil {
if os.IsNotExist(err) {
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return probe.NewError(err)
}
if err := os.Remove(objectPath); err != nil {
return probe.NewError(err)
}
return nil
}

pkg/fs/fs.go Normal file

@@ -0,0 +1,435 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"os"
"runtime"
"strings"
"sync"
"time"
"io/ioutil"
"path/filepath"
"github.com/minio/minio-xl/pkg/probe"
)
// API - local variables
type API struct {
path string
lock *sync.Mutex
multiparts *Multiparts
}
// MultipartSession holds active session information
type MultipartSession struct {
TotalParts int
UploadID string
Initiated time.Time
Parts []*PartMetadata
}
// Multiparts collection of many parts
type Multiparts struct {
Version string `json:"version"`
ActiveSession map[string]*MultipartSession `json:"activeSessions"`
}
// New - instantiate a new fs CloudStorage
func New(path string) (CloudStorage, *probe.Error) {
var err *probe.Error
// load multiparts session from disk
var multiparts *Multiparts
multiparts, err = loadMultipartsSession()
if err != nil {
if os.IsNotExist(err.ToGoError()) {
multiparts = &Multiparts{
Version: "1",
ActiveSession: make(map[string]*MultipartSession),
}
if err := SaveMultipartsSession(multiparts); err != nil {
return nil, err.Trace()
}
} else {
return nil, err.Trace()
}
}
a := API{
path: path,
lock: new(sync.Mutex),
}
a.multiparts = multiparts
return a, nil
}
/// Bucket Operations
// DeleteBucket - delete bucket
func (fs API) DeleteBucket(bucket string) *probe.Error {
fs.lock.Lock()
defer fs.lock.Unlock()
// verify bucket path legal
if !IsValidBucket(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
bucketDir := filepath.Join(fs.path, bucket)
// check bucket exists
if _, err := os.Stat(bucketDir); os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
files, err := ioutil.ReadDir(bucketDir)
if err != nil {
return probe.NewError(err)
}
if len(files) > 0 {
return probe.NewError(BucketNotEmpty{Bucket: bucket})
}
if err := os.Remove(bucketDir); err != nil {
return probe.NewError(err)
}
return nil
}
// ListBuckets - Get service
func (fs API) ListBuckets() ([]BucketMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
files, err := ioutil.ReadDir(fs.path)
if err != nil {
return []BucketMetadata{}, probe.NewError(err)
}
var metadataList []BucketMetadata
for _, file := range files {
if !file.IsDir() {
// if regular files are found, ignore them
continue
}
// if directories are found with odd names, skip them too
if !IsValidBucket(file.Name()) {
continue
}
metadata := BucketMetadata{
Name: file.Name(),
Created: file.ModTime(),
}
metadataList = append(metadataList, metadata)
}
return metadataList, nil
}
// MakeBucket - PUT Bucket
func (fs API) MakeBucket(bucket, acl string) *probe.Error {
fs.lock.Lock()
defer fs.lock.Unlock()
// verify bucket path legal
if !IsValidBucket(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// get bucket path
bucketDir := filepath.Join(fs.path, bucket)
// check if bucket exists
if _, err := os.Stat(bucketDir); err == nil {
return probe.NewError(BucketExists{
Bucket: bucket,
})
}
// make bucket
err := os.Mkdir(bucketDir, aclToPerm(acl))
if err != nil {
return probe.NewError(err)
}
return nil
}
// GetBucketMetadata - get bucket metadata
func (fs API) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
if !IsValidBucket(bucket) {
return BucketMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// get bucket path
bucketDir := filepath.Join(fs.path, bucket)
bucketMetadata := BucketMetadata{}
fi, err := os.Stat(bucketDir)
// check if bucket exists
if os.IsNotExist(err) {
return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return BucketMetadata{}, probe.NewError(err)
}
bucketMetadata.Name = fi.Name()
bucketMetadata.Created = fi.ModTime()
bucketMetadata.ACL = permToACL(fi.Mode())
return bucketMetadata, nil
}
// permToACL - convert perm to meaningful ACL
func permToACL(mode os.FileMode) BucketACL {
switch mode.Perm() {
case os.FileMode(0700):
return BucketACL("private")
case os.FileMode(0500):
return BucketACL("public-read")
case os.FileMode(0777):
return BucketACL("public-read-write")
default:
return BucketACL("private")
}
}
// aclToPerm - convert acl to filesystem mode
func aclToPerm(acl string) os.FileMode {
switch acl {
case "private":
return os.FileMode(0700)
case "public-read":
return os.FileMode(0500)
case "public-read-write":
return os.FileMode(0777)
default:
return os.FileMode(0700)
}
}
// SetBucketMetadata - set bucket metadata (currently only the ACL)
func (fs API) SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error {
fs.lock.Lock()
defer fs.lock.Unlock()
if !IsValidBucket(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
acl := metadata["acl"]
if !IsValidBucketACL(acl) {
return probe.NewError(InvalidACL{ACL: acl})
}
// get bucket path
bucketDir := filepath.Join(fs.path, bucket)
err := os.Chmod(bucketDir, aclToPerm(acl))
if err != nil {
return probe.NewError(err)
}
return nil
}
// ListObjects - GET bucket (list objects)
func (fs API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error) {
if !IsValidBucket(bucket) {
return nil, resources, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if resources.Prefix != "" && IsValidObjectName(resources.Prefix) == false {
return nil, resources, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix})
}
p := bucketDir{}
rootPrefix := filepath.Join(fs.path, bucket)
// check bucket exists
if _, err := os.Stat(rootPrefix); os.IsNotExist(err) {
return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket})
}
p.root = rootPrefix
/// automatically treat "/" delimiter as "\\" delimiter on windows due to its path constraints.
if resources.Delimiter == "/" {
if runtime.GOOS == "windows" {
resources.Delimiter = "\\"
}
}
// If delimiter is supplied make sure that paging doesn't go deep, treat it as simple directory listing.
if resources.Delimiter != "" {
files, err := ioutil.ReadDir(filepath.Join(rootPrefix, resources.Prefix))
if err != nil {
if os.IsNotExist(err) {
return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix})
}
return nil, resources, probe.NewError(err)
}
for _, fl := range files {
prefix := fl.Name()
if resources.Prefix != "" {
prefix = filepath.Join(resources.Prefix, fl.Name())
}
p.files = append(p.files, contentInfo{
Prefix: prefix,
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
} else {
var files []contentInfo
getAllFiles := func(fp string, fl os.FileInfo, err error) error {
// If any error return back quickly
if err != nil {
return err
}
if strings.HasSuffix(fp, "$multiparts") {
return nil
}
// if file pointer equals to rootPrefix - discard it
if fp == p.root {
return nil
}
if len(files) > resources.Maxkeys {
return ErrSkipFile
}
// Split the root prefix from the incoming file pointer
realFp := ""
if runtime.GOOS == "windows" {
if splits := strings.Split(fp, p.root+"\\"); len(splits) > 1 {
realFp = splits[1]
}
} else {
if splits := strings.Split(fp, p.root+"/"); len(splits) > 1 {
realFp = splits[1]
}
}
// If path is a directory and has a prefix verify if the file pointer
// has the prefix if it does not skip the directory.
if fl.Mode().IsDir() {
if resources.Prefix != "" {
if !strings.HasPrefix(fp, filepath.Join(p.root, resources.Prefix)) {
return ErrSkipDir
}
}
}
// If path is a directory and has a marker verify if the file split file pointer
// is lesser than the Marker top level directory if yes skip it.
if fl.Mode().IsDir() {
if resources.Marker != "" {
if realFp != "" {
if runtime.GOOS == "windows" {
if realFp < strings.Split(resources.Marker, "\\")[0] {
return ErrSkipDir
}
} else {
if realFp < strings.Split(resources.Marker, "/")[0] {
return ErrSkipDir
}
}
}
}
}
// If regular file verify
if fl.Mode().IsRegular() {
// If marker is present this will be used to check if filepointer is
// lexically higher than the Marker
if realFp != "" {
if resources.Marker != "" {
if realFp > resources.Marker {
files = append(files, contentInfo{
Prefix: realFp,
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
} else {
files = append(files, contentInfo{
Prefix: realFp,
Size: fl.Size(),
Mode: fl.Mode(),
ModTime: fl.ModTime(),
FileInfo: fl,
})
}
}
}
// If file is a symlink follow it and populate values.
if fl.Mode()&os.ModeSymlink == os.ModeSymlink {
st, err := os.Stat(fp)
if err != nil {
return nil
}
// If marker is present this will be used to check if filepointer is
// lexically higher than the Marker
if realFp != "" {
if resources.Marker != "" {
if realFp > resources.Marker {
files = append(files, contentInfo{
Prefix: realFp,
Size: st.Size(),
Mode: st.Mode(),
ModTime: st.ModTime(),
FileInfo: st,
})
}
} else {
files = append(files, contentInfo{
Prefix: realFp,
Size: st.Size(),
Mode: st.Mode(),
ModTime: st.ModTime(),
FileInfo: st,
})
}
}
}
p.files = files
return nil
}
// If no delimiter is specified, crawl through everything.
err := Walk(rootPrefix, getAllFiles)
if err != nil {
if os.IsNotExist(err) {
return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix})
}
return nil, resources, probe.NewError(err)
}
}
var metadataList []ObjectMetadata
var metadata ObjectMetadata
// Filter objects
for _, content := range p.files {
if len(metadataList) == resources.Maxkeys {
resources.IsTruncated = true
if resources.IsTruncated && resources.Delimiter != "" {
resources.NextMarker = metadataList[len(metadataList)-1].Object
}
break
}
if content.Prefix > resources.Marker {
var err *probe.Error
metadata, resources, err = fs.filterObjects(bucket, content, resources)
if err != nil {
return nil, resources, err.Trace()
}
if metadata.Bucket != "" {
metadataList = append(metadataList, metadata)
}
}
}
return metadataList, resources, nil
}

pkg/fs/fs_test.go Normal file

@@ -0,0 +1,56 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
. "gopkg.in/check.v1"
)
func Test(t *testing.T) { TestingT(t) }
type MySuite struct{}
var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) {
var storageList []string
create := func() CloudStorage {
configPath, err := ioutil.TempDir(os.TempDir(), "minio-")
c.Check(err, IsNil)
path, err := ioutil.TempDir(os.TempDir(), "minio-")
c.Check(err, IsNil)
SetFSMultipartsConfigPath(filepath.Join(configPath, "multiparts.json"))
storageList = append(storageList, path)
store, perr := New(path)
c.Check(perr, IsNil)
return store
}
APITestSuite(c, create)
defer removeRoots(c, storageList)
}
func removeRoots(c *C, roots []string) {
for _, root := range roots {
err := os.RemoveAll(root)
c.Check(err, IsNil)
}
}

pkg/fs/interfaces.go Normal file

@@ -0,0 +1,55 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"io"
"github.com/minio/minio-xl/pkg/probe"
)
// CloudStorage is a fs cloud storage interface
type CloudStorage interface {
// Storage service operations
GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error)
SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error
ListBuckets() ([]BucketMetadata, *probe.Error)
MakeBucket(bucket, acl string) *probe.Error
DeleteBucket(bucket string) *probe.Error
// Bucket operations
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error)
// Object operations
GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error)
GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error)
// bucket, object, expectedMD5Sum, size, reader, signature
CreateObject(bucket, object, md5sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error)
DeleteObject(bucket, object string) *probe.Error
Multipart
}
// Multipart API
type Multipart interface {
NewMultipartUpload(bucket, object string) (string, *probe.Error)
AbortMultipartUpload(bucket, object, uploadID string) *probe.Error
CreateObjectPart(bucket, object, uploadID, md5sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error)
CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error)
ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error)
ListObjectParts(bucket, object string, objectResources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error)
}

pkg/fs/postpolicyform.go Normal file

@@ -0,0 +1,157 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/minio/minio-xl/pkg/probe"
)
// toString - Safely convert interface to string without causing panic.
func toString(val interface{}) string {
switch v := val.(type) {
case string:
return v
}
return ""
}
// toInteger - Safely convert interface to integer without causing panic.
func toInteger(val interface{}) int {
switch v := val.(type) {
case int:
return v
}
return 0
}
// isString - Safely check if val is of type string without causing panic.
func isString(val interface{}) bool {
switch val.(type) {
case string:
return true
}
return false
}
// PostPolicyForm provides strict static type conversion and validation for Amazon S3's POST policy JSON string.
type PostPolicyForm struct {
Expiration time.Time // Expiration date and time of the POST policy.
Conditions struct { // Conditional policy structure.
Policies map[string]struct {
Operator string
Value string
}
ContentLengthRange struct {
Min int
Max int
}
}
}
// ParsePostPolicyForm - Parse the JSON policy string into a typed PostPolicyForm structure.
func ParsePostPolicyForm(policy string) (PostPolicyForm, *probe.Error) {
// Convert the policy into interfaces and
// perform strict type conversion using reflection.
var rawPolicy struct {
Expiration string `json:"expiration"`
Conditions []interface{} `json:"conditions"`
}
e := json.Unmarshal([]byte(policy), &rawPolicy)
if e != nil {
return PostPolicyForm{}, probe.NewError(e)
}
parsedPolicy := PostPolicyForm{}
// Parse expiry time.
parsedPolicy.Expiration, e = time.Parse(time.RFC3339Nano, rawPolicy.Expiration)
if e != nil {
return PostPolicyForm{}, probe.NewError(e)
}
parsedPolicy.Conditions.Policies = make(map[string]struct {
Operator string
Value string
})
// Parse conditions.
for _, val := range rawPolicy.Conditions {
switch condt := val.(type) {
case map[string]interface{}: // Handle key:value map types.
for k, v := range condt {
if !isString(v) { // Pre-check value type.
// All values must be of type string.
return parsedPolicy, probe.NewError(fmt.Errorf("Unknown type ‘%s’ of conditional field value ‘%s’ found in POST policy form.",
reflect.TypeOf(v).String(), v))
}
// {"acl": "public-read" } is an alternate way to indicate - [ "eq", "$acl", "public-read" ]
// In this case we will just collapse this into "eq" for all use cases.
parsedPolicy.Conditions.Policies["$"+k] = struct {
Operator string
Value string
}{
Operator: "eq",
Value: toString(v),
}
}
case []interface{}: // Handle array types.
if len(condt) != 3 { // Return error if we have insufficient elements.
return parsedPolicy, probe.NewError(fmt.Errorf("Malformed conditional fields ‘%s’ of type ‘%s’ found in POST policy form.",
condt, reflect.TypeOf(condt).String()))
}
switch toString(condt[0]) {
case "eq", "starts-with":
for _, v := range condt { // Pre-check all values for type.
if !isString(v) {
// All values must be of type string.
return parsedPolicy, probe.NewError(fmt.Errorf("Unknown type ‘%s’ of conditional field value ‘%s’ found in POST policy form.",
reflect.TypeOf(condt).String(), condt))
}
}
operator, matchType, value := toString(condt[0]), toString(condt[1]), toString(condt[2])
parsedPolicy.Conditions.Policies[matchType] = struct {
Operator string
Value string
}{
Operator: operator,
Value: value,
}
case "content-length-range":
parsedPolicy.Conditions.ContentLengthRange = struct {
Min int
Max int
}{
Min: toInteger(condt[1]),
Max: toInteger(condt[2]),
}
default:
// Condition should be valid.
return parsedPolicy, probe.NewError(fmt.Errorf("Unknown type ‘%s’ of conditional field value ‘%s’ found in POST policy form.",
reflect.TypeOf(condt).String(), condt))
}
default:
return parsedPolicy, probe.NewError(fmt.Errorf("Unknown field ‘%s’ of type ‘%s’ found in POST policy form.",
condt, reflect.TypeOf(condt).String()))
}
}
return parsedPolicy, nil
}

pkg/fs/signature.go Normal file

@@ -0,0 +1,327 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"bytes"
"crypto/hmac"
"encoding/hex"
"net/http"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/minio/minio-xl/pkg/crypto/sha256"
"github.com/minio/minio-xl/pkg/probe"
)
// Signature - local variables
type Signature struct {
AccessKeyID string
SecretAccessKey string
Presigned bool
PresignedPolicy string
SignedHeaders []string
Signature string
Request *http.Request
}
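// AWS Signature Version 4 algorithm prefix and time formats.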
const (
authHeaderPrefix = "AWS4-HMAC-SHA256"
iso8601Format = "20060102T150405Z"
yyyymmdd = "20060102"
)
// sumHMAC calculates the HMAC of data using the given key
func sumHMAC(key []byte, data []byte) []byte {
hash := hmac.New(sha256.New, key)
hash.Write(data)
return hash.Sum(nil)
}
// getURLEncodedName encodes a string from its UTF-8 byte representation to hex escape sequences
//
// This is necessary since regular url.Parse() and url.Encode() do not support UTF-8;
// non-English characters cannot be parsed due to the way url.Encode() is written
//
// This function, on the other hand, is a direct replacement for the url.Encode() technique and supports
// pretty much every UTF-8 character.
func getURLEncodedName(name string) string {
// if object matches reserved string, no need to encode them
reservedNames := regexp.MustCompile("^[a-zA-Z0-9-_.~/]+$")
if reservedNames.MatchString(name) {
return name
}
var encodedName string
for _, s := range name {
if 'A' <= s && s <= 'Z' || 'a' <= s && s <= 'z' || '0' <= s && s <= '9' { // §2.3 Unreserved characters (mark)
encodedName = encodedName + string(s)
continue
}
switch s {
case '-', '_', '.', '~', '/': // §2.3 Unreserved characters (mark)
encodedName = encodedName + string(s)
continue
default:
n := utf8.RuneLen(s)
if n < 0 {
return name
}
u := make([]byte, n)
utf8.EncodeRune(u, s)
for _, r := range u {
hex := hex.EncodeToString([]byte{r})
encodedName = encodedName + "%" + strings.ToUpper(hex)
}
}
}
return encodedName
}
// getCanonicalHeaders generates a list of request headers with their values
func (r Signature) getCanonicalHeaders(signedHeaders map[string][]string) string {
var headers []string
vals := make(map[string][]string)
for k, vv := range signedHeaders {
headers = append(headers, strings.ToLower(k))
vals[strings.ToLower(k)] = vv
}
headers = append(headers, "host")
sort.Strings(headers)
var buf bytes.Buffer
for _, k := range headers {
buf.WriteString(k)
buf.WriteByte(':')
switch {
case k == "host":
buf.WriteString(r.Request.Host)
fallthrough
default:
for idx, v := range vals[k] {
if idx > 0 {
buf.WriteByte(',')
}
buf.WriteString(v)
}
buf.WriteByte('\n')
}
}
return buf.String()
}
// getSignedHeaders generates an alphabetically sorted, semicolon-separated list of lowercase request header names
func (r Signature) getSignedHeaders(signedHeaders map[string][]string) string {
var headers []string
for k := range signedHeaders {
headers = append(headers, strings.ToLower(k))
}
headers = append(headers, "host")
sort.Strings(headers)
return strings.Join(headers, ";")
}
// extractSignedHeaders extract signed headers from Authorization header
func (r Signature) extractSignedHeaders() map[string][]string {
extractedSignedHeadersMap := make(map[string][]string)
for _, header := range r.SignedHeaders {
val, ok := r.Request.Header[http.CanonicalHeaderKey(header)]
if !ok {
// if not found continue, we will fail later
continue
}
extractedSignedHeadersMap[header] = val
}
return extractedSignedHeadersMap
}
// getCanonicalRequest generates a canonical request of style
//
// canonicalRequest =
// <HTTPMethod>\n
// <CanonicalURI>\n
// <CanonicalQueryString>\n
// <CanonicalHeaders>\n
// <SignedHeaders>\n
// <HashedPayload>
//
func (r *Signature) getCanonicalRequest() string {
payload := r.Request.Header.Get(http.CanonicalHeaderKey("x-amz-content-sha256"))
r.Request.URL.RawQuery = strings.Replace(r.Request.URL.Query().Encode(), "+", "%20", -1)
encodedPath := getURLEncodedName(r.Request.URL.Path)
// convert any space strings back to "+"
encodedPath = strings.Replace(encodedPath, "+", "%20", -1)
canonicalRequest := strings.Join([]string{
r.Request.Method,
encodedPath,
r.Request.URL.RawQuery,
r.getCanonicalHeaders(r.extractSignedHeaders()),
r.getSignedHeaders(r.extractSignedHeaders()),
payload,
}, "\n")
return canonicalRequest
}
// getPresignedCanonicalRequest generates a presigned canonical request of style
//
// canonicalRequest =
// <HTTPMethod>\n
// <CanonicalURI>\n
// <CanonicalQueryString>\n
// <CanonicalHeaders>\n
// <SignedHeaders>\n
// <HashedPayload>
//
func (r Signature) getPresignedCanonicalRequest(presignedQuery string) string {
rawQuery := strings.Replace(presignedQuery, "+", "%20", -1)
encodedPath := getURLEncodedName(r.Request.URL.Path)
// convert any '+' in the encoded path back to "%20"
encodedPath = strings.Replace(encodedPath, "+", "%20", -1)
canonicalRequest := strings.Join([]string{
r.Request.Method,
encodedPath,
rawQuery,
r.getCanonicalHeaders(r.extractSignedHeaders()),
r.getSignedHeaders(r.extractSignedHeaders()),
"UNSIGNED-PAYLOAD",
}, "\n")
return canonicalRequest
}
// getScope generates the scope string from a specific date, an AWS region, and a service
func (r Signature) getScope(t time.Time) string {
scope := strings.Join([]string{
t.Format(yyyymmdd),
"milkyway",
"s3",
"aws4_request",
}, "/")
return scope
}
// getStringToSign composes the string to sign from the canonical request and the request time
func (r Signature) getStringToSign(canonicalRequest string, t time.Time) string {
stringToSign := authHeaderPrefix + "\n" + t.Format(iso8601Format) + "\n"
stringToSign = stringToSign + r.getScope(t) + "\n"
stringToSign = stringToSign + hex.EncodeToString(sha256.Sum256([]byte(canonicalRequest)))
return stringToSign
}
// getSigningKey derives the HMAC signing key used to calculate the final signature
func (r Signature) getSigningKey(t time.Time) []byte {
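// AWS Signature V4 key derivation: chain HMACs over the date, region, service and
// the terminating "aws4_request" string.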
secret := r.SecretAccessKey
date := sumHMAC([]byte("AWS4"+secret), []byte(t.Format(yyyymmdd)))
region := sumHMAC(date, []byte("milkyway"))
service := sumHMAC(region, []byte("s3"))
signingKey := sumHMAC(service, []byte("aws4_request"))
return signingKey
}
// getSignature returns the final signature in hexadecimal form
func (r Signature) getSignature(signingKey []byte, stringToSign string) string {
return hex.EncodeToString(sumHMAC(signingKey, []byte(stringToSign)))
}
// DoesPolicySignatureMatch - Verify query headers with post policy
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html
// returns true if matches, false otherwise. if error is not nil then it is always false
func (r *Signature) DoesPolicySignatureMatch(date string) (bool, *probe.Error) {
t, err := time.Parse(iso8601Format, date)
if err != nil {
return false, probe.NewError(err)
}
signingKey := r.getSigningKey(t)
stringToSign := string(r.PresignedPolicy)
newSignature := r.getSignature(signingKey, stringToSign)
if newSignature != r.Signature {
return false, nil
}
return true, nil
}
// DoesPresignedSignatureMatch - Verify query headers with presigned signature
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
// returns true if matches, false otherwise. if error is not nil then it is always false
func (r *Signature) DoesPresignedSignatureMatch() (bool, *probe.Error) {
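// Reconstruct the signed query string, recompute the signature over it and compare
// against the X-Amz-Signature sent on the request.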
query := make(url.Values)
query.Set("X-Amz-Algorithm", authHeaderPrefix)
var date string
if date = r.Request.URL.Query().Get("X-Amz-Date"); date == "" {
return false, probe.NewError(MissingDateHeader{})
}
t, err := time.Parse(iso8601Format, date)
if err != nil {
return false, probe.NewError(err)
}
if _, ok := r.Request.URL.Query()["X-Amz-Expires"]; !ok {
return false, probe.NewError(MissingExpiresQuery{})
}
expireSeconds, err := strconv.Atoi(r.Request.URL.Query().Get("X-Amz-Expires"))
if err != nil {
return false, probe.NewError(err)
}
if time.Now().UTC().Sub(t) > time.Duration(expireSeconds)*time.Second {
return false, probe.NewError(ExpiredPresignedRequest{})
}
query.Set("X-Amz-Date", t.Format(iso8601Format))
query.Set("X-Amz-Expires", strconv.Itoa(expireSeconds))
query.Set("X-Amz-SignedHeaders", r.getSignedHeaders(r.extractSignedHeaders()))
query.Set("X-Amz-Credential", r.AccessKeyID+"/"+r.getScope(t))
encodedQuery := query.Encode()
newSignature := r.getSignature(r.getSigningKey(t), r.getStringToSign(r.getPresignedCanonicalRequest(encodedQuery), t))
encodedQuery += "&X-Amz-Signature=" + newSignature
if encodedQuery != r.Request.URL.RawQuery {
return false, nil
}
return true, nil
}
// DoesSignatureMatch - Verify authorization header with calculated header in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html
// returns true if matches, false otherwise. if error is not nil then it is always false
func (r *Signature) DoesSignatureMatch(hashedPayload string) (bool, *probe.Error) {
// set newly calculated payload
r.Request.Header.Set("X-Amz-Content-Sha256", hashedPayload)
// Add date if not present throw error
var date string
if date = r.Request.Header.Get(http.CanonicalHeaderKey("x-amz-date")); date == "" {
if date = r.Request.Header.Get("Date"); date == "" {
return false, probe.NewError(MissingDateHeader{})
}
}
t, err := time.Parse(iso8601Format, date)
if err != nil {
return false, probe.NewError(err)
}
canonicalRequest := r.getCanonicalRequest()
stringToSign := r.getStringToSign(canonicalRequest, t)
signingKey := r.getSigningKey(t)
newSignature := r.getSignature(signingKey, stringToSign)
if newSignature != r.Signature {
return false, nil
}
return true, nil
}

pkg/fs/walk.go Normal file

@@ -0,0 +1,108 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"errors"
"os"
"path/filepath"
"sort"
)
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root.
func Walk(root string, walkFn WalkFunc) error {
info, err := os.Lstat(root)
if err != nil {
return walkFn(root, nil, err)
}
return walk(root, info, walkFn)
}
// readDirNames reads the directory named by dirname and returns
// a sorted list of directory entries.
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
names, err := f.Readdirnames(-1)
f.Close()
if err != nil {
return nil, err
}
sort.Strings(names)
return names, nil
}
// WalkFunc is the type of the function called for each file or directory
// visited by Walk. The path argument contains the argument to Walk as a
// prefix; that is, if Walk is called with "dir", which is a directory
// containing the file "a", the walk function will be called with argument
// "dir/a". The info argument is the os.FileInfo for the named path.
type WalkFunc func(path string, info os.FileInfo, err error) error
// ErrSkipDir is used as a return value from WalkFuncs to indicate that
// the directory named in the call is to be skipped. It is not returned
// as an error by any function.
var ErrSkipDir = errors.New("skip this directory")
// ErrSkipFile is used as a return value from WalkFuncs to indicate that
// the file named in the call is to be skipped. It is not returned
// as an error by any function.
var ErrSkipFile = errors.New("skip this file")
// walk recursively descends path, calling walkFn.
func walk(path string, info os.FileInfo, walkFn WalkFunc) error {
err := walkFn(path, info, nil)
if err != nil {
if info.Mode().IsDir() && err == ErrSkipDir {
return nil
}
if info.Mode().IsRegular() && err == ErrSkipFile {
return nil
}
return err
}
if !info.IsDir() {
return nil
}
names, err := readDirNames(path)
if err != nil {
return walkFn(path, info, err)
}
for _, name := range names {
filename := filepath.Join(path, name)
fileInfo, err := os.Lstat(filename)
if err != nil {
if err := walkFn(filename, fileInfo, err); err != nil && err != ErrSkipDir && err != ErrSkipFile {
return err
}
} else {
err = walk(filename, fileInfo, walkFn)
if err != nil {
if err == ErrSkipDir || err == ErrSkipFile {
return nil
}
return err
}
}
}
return nil
}