From 5b2fa33bdbc5078736c634cd969439618dfb30ae Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 17 Oct 2015 19:17:33 -0700 Subject: [PATCH] Implementing min-free-disk --- api-errors.go | 16 +- bucket-handlers.go | 20 +- flags.go | 10 +- main.go | 27 +- object-handlers.go | 29 ++- pkg/fs/acl.go | 22 +- pkg/fs/api_suite_nix_test.go | 36 +-- pkg/fs/api_suite_windows_test.go | 36 +-- pkg/fs/errors.go | 9 + pkg/fs/fs-bucket.go | 434 +++++++++++++++++++++++++++++++ pkg/fs/fs-filter.go | 2 +- pkg/fs/fs-multipart.go | 43 ++- pkg/fs/fs-object.go | 23 +- pkg/fs/fs.go | 421 ++---------------------------- pkg/fs/fs_test.go | 6 +- pkg/fs/interfaces.go | 66 ----- routers.go | 32 +-- server-main.go | 65 +++-- server_fs_test.go | 9 +- typed-errors.go | 34 +-- 20 files changed, 710 insertions(+), 630 deletions(-) create mode 100644 pkg/fs/fs-bucket.go delete mode 100644 pkg/fs/interfaces.go diff --git a/api-errors.go b/api-errors.go index 7c9e5bb09..75ef7709c 100644 --- a/api-errors.go +++ b/api-errors.go @@ -71,11 +71,7 @@ const ( AuthorizationHeaderMalformed MalformedPOSTRequest BucketNotEmpty -) - -// Error codes, non exhaustive list - standard HTTP errors -const ( - NotAcceptable = iota + 31 + RootPathFull ) // APIError code to Error structure map @@ -205,11 +201,6 @@ var errorCodeResponse = map[int]APIError{ Description: "The specified method is not allowed against this resource.", HTTPStatusCode: http.StatusMethodNotAllowed, }, - NotAcceptable: { - Code: "NotAcceptable", - Description: "The requested resource is only capable of generating content not acceptable according to the Accept headers sent in the request.", - HTTPStatusCode: http.StatusNotAcceptable, - }, InvalidPart: { Code: "InvalidPart", Description: "One or more of the specified parts could not be found.", @@ -235,6 +226,11 @@ var errorCodeResponse = map[int]APIError{ Description: "The bucket you tried to delete is not empty.", HTTPStatusCode: http.StatusConflict, }, + RootPathFull: { + Code: "RootPathFull", + Description: "Root path has reached its minimum free disk threshold. Please clear few objects to proceed.", + HTTPStatusCode: http.StatusInternalServerError, + }, } // errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown diff --git a/bucket-handlers.go b/bucket-handlers.go index c70b7ffef..29e17b6eb 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -35,7 +35,7 @@ import ( // completed or aborted. This operation returns at most 1,000 multipart // uploads in the response. // -func (api API) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] @@ -83,7 +83,7 @@ func (api API) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Requ // of the objects in a bucket. You can use the request parameters as selection // criteria to return a subset of the objects in a bucket. // -func (api API) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] @@ -134,7 +134,7 @@ func (api API) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { // ----------- // This implementation of the GET operation returns a list of all buckets // owned by the authenticated sender of the request. -func (api API) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { if !api.Anonymous { if isRequestRequiresACLCheck(req) { writeErrorResponse(w, req, AccessDenied, req.URL.Path) @@ -159,7 +159,7 @@ func (api API) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { // PutBucketHandler - PUT Bucket // ---------- // This implementation of the PUT operation creates a new bucket for authenticated request -func (api API) PutBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] @@ -239,7 +239,7 @@ func (api API) PutBucketHandler(w http.ResponseWriter, req *http.Request) { // ---------- // This implementation of the POST operation handles object creation with a specified // signature policy in multipart/form-data -func (api API) PostPolicyBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) PostPolicyBucketHandler(w http.ResponseWriter, req *http.Request) { // if body of request is non-nil then check for validity of Content-Length if req.Body != nil { /// if Content-Length missing, deny the request @@ -294,6 +294,8 @@ func (api API) PostPolicyBucketHandler(w http.ResponseWriter, req *http.Request) if perr != nil { errorIf(perr.Trace(), "CreateObject failed.", nil) switch perr.ToGoError().(type) { + case fs.RootPathFull: + writeErrorResponse(w, req, RootPathFull, req.URL.Path) case fs.BucketNotFound: writeErrorResponse(w, req, NoSuchBucket, req.URL.Path) case fs.BucketNameInvalid: @@ -320,7 +322,7 @@ func (api API) PostPolicyBucketHandler(w http.ResponseWriter, req *http.Request) // PutBucketACLHandler - PUT Bucket ACL // ---------- // This implementation of the PUT operation modifies the bucketACL for authenticated request -func (api API) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] @@ -359,7 +361,7 @@ func (api API) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { // of a bucket. One must have permission to access the bucket to // know its ``acl``. This operation willl return response of 404 // if bucket not found and 403 for invalid credentials. -func (api API) GetBucketACLHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) GetBucketACLHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] @@ -398,7 +400,7 @@ func (api API) GetBucketACLHandler(w http.ResponseWriter, req *http.Request) { // The operation returns a 200 OK if the bucket exists and you // have permission to access it. Otherwise, the operation might // return responses such as 404 Not Found and 403 Forbidden. -func (api API) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] @@ -428,7 +430,7 @@ func (api API) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { } // DeleteBucketHandler - Delete bucket -func (api API) DeleteBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) DeleteBucketHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] diff --git a/flags.go b/flags.go index 80de184fd..7ccdb6f1e 100644 --- a/flags.go +++ b/flags.go @@ -28,11 +28,17 @@ var ( Usage: "ADDRESS:PORT for cloud storage access.", } + minFreeDiskFlag = cli.StringFlag{ + Name: "min-free-disk", + Value: "10%", + Usage: "Minimum free disk space required for cloud storage.", + } + ratelimitFlag = cli.IntFlag{ Name: "ratelimit", Hide: true, - Value: 16, - Usage: "Limit for total concurrent requests: [DEFAULT: 16].", + Value: 0, + Usage: "Limit for total concurrent requests: [DEFAULT: 0].", } anonymousFlag = cli.BoolFlag{ diff --git a/main.go b/main.go index e45874e37..4d6ac09ac 100644 --- a/main.go +++ b/main.go @@ -27,15 +27,23 @@ import ( "github.com/minio/cli" ) -// fsConfig - fs http server config -type fsConfig struct { - Address string - Path string - Anonymous bool - TLS bool - CertFile string - KeyFile string - RateLimit int +// serverConfig - http server config +type serverConfig struct { + /// HTTP server options + Address string // Address:Port listening + Anonymous bool // No signature turn off + + /// FS options + Path string // Path to export for cloud storage + MinFreeDisk int64 // Minimum free disk space for filesystem + + // TLS service + TLS bool // TLS on when certs are specified + CertFile string // Domain certificate + KeyFile string // Domain key + + /// Advanced HTTP server options + RateLimit int // Ratelimited server of incoming connections } func init() { @@ -95,6 +103,7 @@ func registerApp() *cli.App { // register all flags registerFlag(addressFlag) + registerFlag(minFreeDiskFlag) registerFlag(ratelimitFlag) registerFlag(anonymousFlag) registerFlag(certFlag) diff --git a/object-handlers.go b/object-handlers.go index 92dd1d29d..6ac8fde0f 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -33,7 +33,7 @@ const ( // ---------- // This implementation of the GET operation retrieves object. To use GET, // you must have READ access to the object. -func (api API) GetObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) GetObjectHandler(w http.ResponseWriter, req *http.Request) { var object, bucket string vars := mux.Vars(req) bucket = vars["bucket"] @@ -81,7 +81,7 @@ func (api API) GetObjectHandler(w http.ResponseWriter, req *http.Request) { // HeadObjectHandler - HEAD Object // ----------- // The HEAD operation retrieves metadata from an object without returning the object itself. -func (api API) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { var object, bucket string vars := mux.Vars(req) bucket = vars["bucket"] @@ -119,7 +119,7 @@ func (api API) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { // PutObjectHandler - PUT Object // ---------- // This implementation of the PUT operation adds an object to a bucket. -func (api API) PutObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) PutObjectHandler(w http.ResponseWriter, req *http.Request) { var object, bucket string vars := mux.Vars(req) bucket = vars["bucket"] @@ -179,6 +179,8 @@ func (api API) PutObjectHandler(w http.ResponseWriter, req *http.Request) { if err != nil { errorIf(err.Trace(), "CreateObject failed.", nil) switch err.ToGoError().(type) { + case fs.RootPathFull: + writeErrorResponse(w, req, RootPathFull, req.URL.Path) case fs.BucketNotFound: writeErrorResponse(w, req, NoSuchBucket, req.URL.Path) case fs.BucketNameInvalid: @@ -204,10 +206,10 @@ func (api API) PutObjectHandler(w http.ResponseWriter, req *http.Request) { writeSuccessResponse(w) } -/// Multipart API +/// Multipart CloudStorageAPI // NewMultipartUploadHandler - New multipart upload -func (api API) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { var object, bucket string vars := mux.Vars(req) bucket = vars["bucket"] @@ -225,6 +227,8 @@ func (api API) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Reques if err != nil { errorIf(err.Trace(), "NewMultipartUpload failed.", nil) switch err.ToGoError().(type) { + case fs.RootPathFull: + writeErrorResponse(w, req, RootPathFull, req.URL.Path) case fs.BucketNameInvalid: writeErrorResponse(w, req, InvalidBucketName, req.URL.Path) case fs.BucketNotFound: @@ -248,7 +252,7 @@ func (api API) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Reques } // PutObjectPartHandler - Upload part -func (api API) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] @@ -321,6 +325,8 @@ func (api API) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { if err != nil { errorIf(err.Trace(), "CreateObjectPart failed.", nil) switch err.ToGoError().(type) { + case fs.RootPathFull: + writeErrorResponse(w, req, RootPathFull, req.URL.Path) case fs.InvalidUploadID: writeErrorResponse(w, req, NoSuchUpload, req.URL.Path) case fs.BadDigest: @@ -343,7 +349,7 @@ func (api API) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { } // AbortMultipartUploadHandler - Abort multipart upload -func (api API) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] @@ -356,7 +362,6 @@ func (api API) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Requ } objectResourcesMetadata := getObjectResources(req.URL.Query()) - err := api.Filesystem.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID) if err != nil { errorIf(err.Trace(), "AbortMutlipartUpload failed.", nil) @@ -380,7 +385,7 @@ func (api API) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Requ } // ListObjectPartsHandler - List object parts -func (api API) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] @@ -433,7 +438,7 @@ func (api API) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) } // CompleteMultipartUploadHandler - Complete multipart upload -func (api API) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] @@ -497,10 +502,10 @@ func (api API) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.R w.Write(encodedSuccessResponse) } -/// Delete API +/// Delete CloudStorageAPI // DeleteObjectHandler - Delete object -func (api API) DeleteObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api CloudStorageAPI) DeleteObjectHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] diff --git a/pkg/fs/acl.go b/pkg/fs/acl.go index 552782c03..51954ef8d 100644 --- a/pkg/fs/acl.go +++ b/pkg/fs/acl.go @@ -1,3 +1,19 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package fs import ( @@ -6,7 +22,7 @@ import ( ) // IsPrivateBucket - is private bucket -func (fs API) IsPrivateBucket(bucket string) bool { +func (fs Filesystem) IsPrivateBucket(bucket string) bool { fs.lock.Lock() defer fs.lock.Unlock() // get bucket path @@ -19,7 +35,7 @@ func (fs API) IsPrivateBucket(bucket string) bool { } // IsPublicBucket - is public bucket -func (fs API) IsPublicBucket(bucket string) bool { +func (fs Filesystem) IsPublicBucket(bucket string) bool { fs.lock.Lock() defer fs.lock.Unlock() // get bucket path @@ -32,7 +48,7 @@ func (fs API) IsPublicBucket(bucket string) bool { } // IsReadOnlyBucket - is read only bucket -func (fs API) IsReadOnlyBucket(bucket string) bool { +func (fs Filesystem) IsReadOnlyBucket(bucket string) bool { fs.lock.Lock() defer fs.lock.Unlock() // get bucket path diff --git a/pkg/fs/api_suite_nix_test.go b/pkg/fs/api_suite_nix_test.go index b19867330..c3b50281c 100644 --- a/pkg/fs/api_suite_nix_test.go +++ b/pkg/fs/api_suite_nix_test.go @@ -32,7 +32,7 @@ import ( ) // APITestSuite - collection of API tests -func APITestSuite(c *check.C, create func() CloudStorage) { +func APITestSuite(c *check.C, create func() Filesystem) { testMakeBucket(c, create) testMultipleObjectCreation(c, create) testPaging(c, create) @@ -51,13 +51,13 @@ func APITestSuite(c *check.C, create func() CloudStorage) { testMultipartObjectAbort(c, create) } -func testMakeBucket(c *check.C, create func() CloudStorage) { +func testMakeBucket(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) } -func testMultipartObjectCreation(c *check.C, create func() CloudStorage) { +func testMultipartObjectCreation(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -95,7 +95,7 @@ func testMultipartObjectCreation(c *check.C, create func() CloudStorage) { c.Assert(objectMetadata.Md5, check.Equals, finalExpectedmd5SumHex) } -func testMultipartObjectAbort(c *check.C, create func() CloudStorage) { +func testMultipartObjectAbort(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -126,7 +126,7 @@ func testMultipartObjectAbort(c *check.C, create func() CloudStorage) { c.Assert(err, check.IsNil) } -func testMultipleObjectCreation(c *check.C, create func() CloudStorage) { +func testMultipleObjectCreation(c *check.C, create func() Filesystem) { objects := make(map[string][]byte) fs := create() err := fs.MakeBucket("bucket", "") @@ -162,7 +162,7 @@ func testMultipleObjectCreation(c *check.C, create func() CloudStorage) { } } -func testPaging(c *check.C, create func() CloudStorage) { +func testPaging(c *check.C, create func() Filesystem) { fs := create() fs.MakeBucket("bucket", "") resources := BucketResourcesMetadata{} @@ -295,7 +295,7 @@ func testPaging(c *check.C, create func() CloudStorage) { } } -func testObjectOverwriteWorks(c *check.C, create func() CloudStorage) { +func testObjectOverwriteWorks(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -321,13 +321,13 @@ func testObjectOverwriteWorks(c *check.C, create func() CloudStorage) { c.Assert(string(bytesBuffer.Bytes()), check.Equals, "three") } -func testNonExistantBucketOperations(c *check.C, create func() CloudStorage) { +func testNonExistantBucketOperations(c *check.C, create func() Filesystem) { fs := create() _, err := fs.CreateObject("bucket", "object", "", int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.Not(check.IsNil)) } -func testBucketMetadata(c *check.C, create func() CloudStorage) { +func testBucketMetadata(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("string", "") c.Assert(err, check.IsNil) @@ -337,7 +337,7 @@ func testBucketMetadata(c *check.C, create func() CloudStorage) { c.Assert(metadata.ACL, check.Equals, BucketACL("private")) } -func testBucketRecreateFails(c *check.C, create func() CloudStorage) { +func testBucketRecreateFails(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("string", "") c.Assert(err, check.IsNil) @@ -345,7 +345,7 @@ func testBucketRecreateFails(c *check.C, create func() CloudStorage) { c.Assert(err, check.Not(check.IsNil)) } -func testPutObjectInSubdir(c *check.C, create func() CloudStorage) { +func testPutObjectInSubdir(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -365,7 +365,7 @@ func testPutObjectInSubdir(c *check.C, create func() CloudStorage) { c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, length) } -func testListBuckets(c *check.C, create func() CloudStorage) { +func testListBuckets(c *check.C, create func() Filesystem) { fs := create() // test empty list @@ -397,7 +397,7 @@ func testListBuckets(c *check.C, create func() CloudStorage) { c.Assert(err, check.IsNil) } -func testListBucketsOrder(c *check.C, create func() CloudStorage) { +func testListBucketsOrder(c *check.C, create func() Filesystem) { // if implementation contains a map, order of map keys will vary. // this ensures they return in the same order each time for i := 0; i < 10; i++ { @@ -415,7 +415,7 @@ func testListBucketsOrder(c *check.C, create func() CloudStorage) { } } -func testListObjectsTestsForNonExistantBucket(c *check.C, create func() CloudStorage) { +func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { fs := create() resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} objects, resources, err := fs.ListObjects("bucket", resources) @@ -424,7 +424,7 @@ func testListObjectsTestsForNonExistantBucket(c *check.C, create func() CloudSto c.Assert(len(objects), check.Equals, 0) } -func testNonExistantObjectInBucket(c *check.C, create func() CloudStorage) { +func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -442,7 +442,7 @@ func testNonExistantObjectInBucket(c *check.C, create func() CloudStorage) { } } -func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() CloudStorage) { +func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -477,7 +477,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() CloudStorag c.Assert(len(byteBuffer2.Bytes()), check.Equals, 0) } -func testDefaultContentType(c *check.C, create func() CloudStorage) { +func testDefaultContentType(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -489,7 +489,7 @@ func testDefaultContentType(c *check.C, create func() CloudStorage) { c.Assert(metadata.ContentType, check.Equals, "application/octet-stream") } -func testContentMd5Set(c *check.C, create func() CloudStorage) { +func testContentMd5Set(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) diff --git a/pkg/fs/api_suite_windows_test.go b/pkg/fs/api_suite_windows_test.go index 8e71bd831..940a4287f 100644 --- a/pkg/fs/api_suite_windows_test.go +++ b/pkg/fs/api_suite_windows_test.go @@ -32,7 +32,7 @@ import ( ) // APITestSuite - collection of API tests -func APITestSuite(c *check.C, create func() CloudStorage) { +func APITestSuite(c *check.C, create func() Filesystem) { testMakeBucket(c, create) testMultipleObjectCreation(c, create) testPaging(c, create) @@ -51,13 +51,13 @@ func APITestSuite(c *check.C, create func() CloudStorage) { testMultipartObjectAbort(c, create) } -func testMakeBucket(c *check.C, create func() CloudStorage) { +func testMakeBucket(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) } -func testMultipartObjectCreation(c *check.C, create func() CloudStorage) { +func testMultipartObjectCreation(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -95,7 +95,7 @@ func testMultipartObjectCreation(c *check.C, create func() CloudStorage) { c.Assert(objectMetadata.Md5, check.Equals, finalExpectedmd5SumHex) } -func testMultipartObjectAbort(c *check.C, create func() CloudStorage) { +func testMultipartObjectAbort(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -126,7 +126,7 @@ func testMultipartObjectAbort(c *check.C, create func() CloudStorage) { c.Assert(err, check.IsNil) } -func testMultipleObjectCreation(c *check.C, create func() CloudStorage) { +func testMultipleObjectCreation(c *check.C, create func() Filesystem) { objects := make(map[string][]byte) fs := create() err := fs.MakeBucket("bucket", "") @@ -162,7 +162,7 @@ func testMultipleObjectCreation(c *check.C, create func() CloudStorage) { } } -func testPaging(c *check.C, create func() CloudStorage) { +func testPaging(c *check.C, create func() Filesystem) { fs := create() fs.MakeBucket("bucket", "") resources := BucketResourcesMetadata{} @@ -295,7 +295,7 @@ func testPaging(c *check.C, create func() CloudStorage) { } } -func testObjectOverwriteWorks(c *check.C, create func() CloudStorage) { +func testObjectOverwriteWorks(c *check.C, create func() Filesystem) { fs := create() fs.MakeBucket("bucket", "") @@ -320,13 +320,13 @@ func testObjectOverwriteWorks(c *check.C, create func() CloudStorage) { c.Assert(string(bytesBuffer.Bytes()), check.Equals, "three") } -func testNonExistantBucketOperations(c *check.C, create func() CloudStorage) { +func testNonExistantBucketOperations(c *check.C, create func() Filesystem) { fs := create() _, err := fs.CreateObject("bucket", "object", "", int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.Not(check.IsNil)) } -func testBucketMetadata(c *check.C, create func() CloudStorage) { +func testBucketMetadata(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("string", "private") c.Assert(err, check.IsNil) @@ -337,7 +337,7 @@ func testBucketMetadata(c *check.C, create func() CloudStorage) { c.Assert(metadata.ACL, check.Equals, BucketACL("public-read-write")) } -func testBucketRecreateFails(c *check.C, create func() CloudStorage) { +func testBucketRecreateFails(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("string", "private") c.Assert(err, check.IsNil) @@ -345,7 +345,7 @@ func testBucketRecreateFails(c *check.C, create func() CloudStorage) { c.Assert(err, check.Not(check.IsNil)) } -func testPutObjectInSubdir(c *check.C, create func() CloudStorage) { +func testPutObjectInSubdir(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "private") c.Assert(err, check.IsNil) @@ -365,7 +365,7 @@ func testPutObjectInSubdir(c *check.C, create func() CloudStorage) { c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, length) } -func testListBuckets(c *check.C, create func() CloudStorage) { +func testListBuckets(c *check.C, create func() Filesystem) { fs := create() // test empty list @@ -397,7 +397,7 @@ func testListBuckets(c *check.C, create func() CloudStorage) { c.Assert(err, check.IsNil) } -func testListBucketsOrder(c *check.C, create func() CloudStorage) { +func testListBucketsOrder(c *check.C, create func() Filesystem) { // if implementation contains a map, order of map keys will vary. // this ensures they return in the same order each time for i := 0; i < 10; i++ { @@ -415,7 +415,7 @@ func testListBucketsOrder(c *check.C, create func() CloudStorage) { } } -func testListObjectsTestsForNonExistantBucket(c *check.C, create func() CloudStorage) { +func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { fs := create() resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} objects, resources, err := fs.ListObjects("bucket", resources) @@ -424,7 +424,7 @@ func testListObjectsTestsForNonExistantBucket(c *check.C, create func() CloudSto c.Assert(len(objects), check.Equals, 0) } -func testNonExistantObjectInBucket(c *check.C, create func() CloudStorage) { +func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -446,7 +446,7 @@ func testNonExistantObjectInBucket(c *check.C, create func() CloudStorage) { } } -func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() CloudStorage) { +func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -481,7 +481,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() CloudStorag c.Assert(len(byteBuffer2.Bytes()), check.Equals, 0) } -func testDefaultContentType(c *check.C, create func() CloudStorage) { +func testDefaultContentType(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) @@ -493,7 +493,7 @@ func testDefaultContentType(c *check.C, create func() CloudStorage) { c.Assert(metadata.ContentType, check.Equals, "application/octet-stream") } -func testContentMd5Set(c *check.C, create func() CloudStorage) { +func testContentMd5Set(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) diff --git a/pkg/fs/errors.go b/pkg/fs/errors.go index d4ef4a1ff..c87d4730c 100644 --- a/pkg/fs/errors.go +++ b/pkg/fs/errors.go @@ -65,6 +65,15 @@ func (e UnsupportedFilesystem) Error() string { return "Unsupported filesystem: " + e.Type } +// RootPathFull root path out of space +type RootPathFull struct { + Path string +} + +func (e RootPathFull) Error() string { + return "Root path " + e.Path + " reached its minimum free disk threshold." +} + // BucketNotFound bucket does not exist type BucketNotFound struct { Bucket string diff --git a/pkg/fs/fs-bucket.go b/pkg/fs/fs-bucket.go new file mode 100644 index 000000000..c3106785b --- /dev/null +++ b/pkg/fs/fs-bucket.go @@ -0,0 +1,434 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs + +import ( + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strings" + + "github.com/minio/minio-xl/pkg/probe" + "github.com/minio/minio/pkg/disk" +) + +/// Bucket Operations + +// DeleteBucket - delete bucket +func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { + fs.lock.Lock() + defer fs.lock.Unlock() + // verify bucket path legal + if !IsValidBucket(bucket) { + return probe.NewError(BucketNameInvalid{Bucket: bucket}) + } + bucketDir := filepath.Join(fs.path, bucket) + // check bucket exists + if _, err := os.Stat(bucketDir); os.IsNotExist(err) { + return probe.NewError(BucketNotFound{Bucket: bucket}) + } + if err := RemoveAllDirs(bucketDir); err != nil { + if err == ErrDirNotEmpty || strings.Contains(err.Error(), "directory not empty") { + return probe.NewError(BucketNotEmpty{Bucket: bucket}) + } + return probe.NewError(err) + } + if err := os.Remove(bucketDir); err != nil { + if strings.Contains(err.Error(), "directory not empty") { + return probe.NewError(BucketNotEmpty{Bucket: bucket}) + } + return probe.NewError(err) + } + return nil +} + +// ListBuckets - Get service +func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) { + fs.lock.Lock() + defer fs.lock.Unlock() + + files, err := ioutil.ReadDir(fs.path) + if err != nil { + return []BucketMetadata{}, probe.NewError(err) + } + + var metadataList []BucketMetadata + for _, file := range files { + if !file.IsDir() { + // if files found ignore them + continue + } + if file.IsDir() { + // if directories found with odd names, skip them too + if !IsValidBucket(file.Name()) { + continue + } + } + metadata := BucketMetadata{ + Name: file.Name(), + Created: file.ModTime(), + } + metadataList = append(metadataList, metadata) + } + return metadataList, nil +} + +// MakeBucket - PUT Bucket +func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { + fs.lock.Lock() + defer fs.lock.Unlock() + + stfs, err := disk.Stat(fs.path) + if err != nil { + return probe.NewError(err) + } + + if int64((float64(stfs.Free)/float64(stfs.Total))*100) <= fs.minFreeDisk { + return probe.NewError(RootPathFull{Path: fs.path}) + } + + // verify bucket path legal + if !IsValidBucket(bucket) { + return probe.NewError(BucketNameInvalid{Bucket: bucket}) + } + + // get bucket path + bucketDir := filepath.Join(fs.path, bucket) + + // check if bucket exists + if _, err := os.Stat(bucketDir); err == nil { + return probe.NewError(BucketExists{ + Bucket: bucket, + }) + } + + // make bucket + err = os.Mkdir(bucketDir, aclToPerm(acl)) + if err != nil { + return probe.NewError(err) + } + return nil +} + +// GetBucketMetadata - +func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) { + fs.lock.Lock() + defer fs.lock.Unlock() + if !IsValidBucket(bucket) { + return BucketMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) + } + // get bucket path + bucketDir := filepath.Join(fs.path, bucket) + bucketMetadata := BucketMetadata{} + fi, err := os.Stat(bucketDir) + // check if bucket exists + if os.IsNotExist(err) { + return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) + } + if err != nil { + return BucketMetadata{}, probe.NewError(err) + } + + bucketMetadata.Name = fi.Name() + bucketMetadata.Created = fi.ModTime() + bucketMetadata.ACL = permToACL(fi.Mode()) + return bucketMetadata, nil +} + +// permToACL - convert perm to meaningful ACL +func permToACL(mode os.FileMode) BucketACL { + switch mode.Perm() { + case os.FileMode(0700): + return BucketACL("private") + case os.FileMode(0500): + return BucketACL("public-read") + case os.FileMode(0777): + return BucketACL("public-read-write") + default: + return BucketACL("private") + } +} + +// aclToPerm - convert acl to filesystem mode +func aclToPerm(acl string) os.FileMode { + switch acl { + case "private": + return os.FileMode(0700) + case "public-read": + return os.FileMode(0500) + case "public-read-write": + return os.FileMode(0777) + default: + return os.FileMode(0700) + } +} + +// SetBucketMetadata - +func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error { + fs.lock.Lock() + defer fs.lock.Unlock() + if !IsValidBucket(bucket) { + return probe.NewError(BucketNameInvalid{Bucket: bucket}) + } + acl := metadata["acl"] + if !IsValidBucketACL(acl) { + return probe.NewError(InvalidACL{ACL: acl}) + } + // get bucket path + bucketDir := filepath.Join(fs.path, bucket) + err := os.Chmod(bucketDir, aclToPerm(acl)) + if err != nil { + return probe.NewError(err) + } + return nil +} + +// ListObjects - GET bucket (list objects) +func (fs Filesystem) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error) { + fs.lock.Lock() + defer fs.lock.Unlock() + if !IsValidBucket(bucket) { + return nil, resources, probe.NewError(BucketNameInvalid{Bucket: bucket}) + } + if resources.Prefix != "" && IsValidObjectName(resources.Prefix) == false { + return nil, resources, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix}) + } + + p := bucketDir{} + rootPrefix := filepath.Join(fs.path, bucket) + // check bucket exists + if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { + return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) + } + + p.root = rootPrefix + /// automatically treat "/" delimiter as "\\" delimiter on windows due to its path constraints. + if resources.Delimiter == "/" { + if runtime.GOOS == "windows" { + resources.Delimiter = string(os.PathSeparator) + } + } + + // if delimiter is supplied and not prefix then we are the very top level, list everything and move on. + if resources.Delimiter != "" && resources.Prefix == "" { + files, err := ioutil.ReadDir(rootPrefix) + if err != nil { + if os.IsNotExist(err) { + return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) + } + return nil, resources, probe.NewError(err) + } + for _, fl := range files { + p.files = append(p.files, contentInfo{ + Prefix: fl.Name(), + Size: fl.Size(), + Mode: fl.Mode(), + ModTime: fl.ModTime(), + FileInfo: fl, + }) + } + } + + // If delimiter and prefix is supplied make sure that paging doesn't go deep, treat it as simple directory listing. + if resources.Delimiter != "" && resources.Prefix != "" { + if !strings.HasSuffix(resources.Prefix, resources.Delimiter) { + fl, err := os.Stat(filepath.Join(rootPrefix, resources.Prefix)) + if err != nil { + if os.IsNotExist(err) { + return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) + } + return nil, resources, probe.NewError(err) + } + p.files = append(p.files, contentInfo{ + Prefix: resources.Prefix, + Size: fl.Size(), + Mode: os.ModeDir, + ModTime: fl.ModTime(), + FileInfo: fl, + }) + } else { + files, err := ioutil.ReadDir(filepath.Join(rootPrefix, resources.Prefix)) + if err != nil { + if os.IsNotExist(err) { + return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) + } + return nil, resources, probe.NewError(err) + } + for _, fl := range files { + prefix := fl.Name() + if resources.Prefix != "" { + prefix = filepath.Join(resources.Prefix, fl.Name()) + } + p.files = append(p.files, contentInfo{ + Prefix: prefix, + Size: fl.Size(), + Mode: fl.Mode(), + ModTime: fl.ModTime(), + FileInfo: fl, + }) + } + } + } + if resources.Delimiter == "" { + var files []contentInfo + getAllFiles := func(fp string, fl os.FileInfo, err error) error { + // If any error return back quickly + if err != nil { + return err + } + if strings.HasSuffix(fp, "$multiparts") { + return nil + } + // if file pointer equals to rootPrefix - discard it + if fp == p.root { + return nil + } + if len(files) > resources.Maxkeys { + return ErrSkipFile + } + // Split the root prefix from the incoming file pointer + realFp := "" + if runtime.GOOS == "windows" { + if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 { + realFp = splits[1] + } + } else { + if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 { + realFp = splits[1] + } + } + // If path is a directory and has a prefix verify if the file pointer + // has the prefix if it does not skip the directory. + if fl.Mode().IsDir() { + if resources.Prefix != "" { + if !strings.HasPrefix(fp, filepath.Join(p.root, resources.Prefix)) { + return ErrSkipDir + } + } + } + // If path is a directory and has a marker verify if the file split file pointer + // is lesser than the Marker top level directory if yes skip it. + if fl.Mode().IsDir() { + if resources.Marker != "" { + if realFp != "" { + if runtime.GOOS == "windows" { + if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] { + return ErrSkipDir + } + } else { + if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] { + return ErrSkipDir + } + } + } + } + } + // If regular file verify + if fl.Mode().IsRegular() { + // If marker is present this will be used to check if filepointer is + // lexically higher than then Marker + if realFp != "" { + if resources.Marker != "" { + if realFp > resources.Marker { + files = append(files, contentInfo{ + Prefix: realFp, + Size: fl.Size(), + Mode: fl.Mode(), + ModTime: fl.ModTime(), + FileInfo: fl, + }) + } + } else { + files = append(files, contentInfo{ + Prefix: realFp, + Size: fl.Size(), + Mode: fl.Mode(), + ModTime: fl.ModTime(), + FileInfo: fl, + }) + } + } + } + // If file is a symlink follow it and populate values. + if fl.Mode()&os.ModeSymlink == os.ModeSymlink { + st, err := os.Stat(fp) + if err != nil { + return nil + } + // If marker is present this will be used to check if filepointer is + // lexically higher than then Marker + if realFp != "" { + if resources.Marker != "" { + if realFp > resources.Marker { + files = append(files, contentInfo{ + Prefix: realFp, + Size: st.Size(), + Mode: st.Mode(), + ModTime: st.ModTime(), + FileInfo: st, + }) + } + } else { + files = append(files, contentInfo{ + Prefix: realFp, + Size: st.Size(), + Mode: st.Mode(), + ModTime: st.ModTime(), + FileInfo: st, + }) + } + } + } + p.files = files + return nil + } + // If no delimiter is specified, crawl through everything. + err := Walk(rootPrefix, getAllFiles) + if err != nil { + if os.IsNotExist(err) { + return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) + } + return nil, resources, probe.NewError(err) + } + } + + var metadataList []ObjectMetadata + var metadata ObjectMetadata + + // Filter objects + for _, content := range p.files { + if len(metadataList) == resources.Maxkeys { + resources.IsTruncated = true + if resources.IsTruncated && resources.Delimiter != "" { + resources.NextMarker = metadataList[len(metadataList)-1].Object + } + break + } + if content.Prefix > resources.Marker { + var err *probe.Error + metadata, resources, err = fs.filterObjects(bucket, content, resources) + if err != nil { + return nil, resources, err.Trace() + } + if metadata.Bucket != "" { + metadataList = append(metadataList, metadata) + } + } + } + return metadataList, resources, nil +} diff --git a/pkg/fs/fs-filter.go b/pkg/fs/fs-filter.go index e2cbf7f03..1844e5ac4 100644 --- a/pkg/fs/fs-filter.go +++ b/pkg/fs/fs-filter.go @@ -22,7 +22,7 @@ import ( "github.com/minio/minio-xl/pkg/probe" ) -func (fs API) filterObjects(bucket string, content contentInfo, resources BucketResourcesMetadata) (ObjectMetadata, BucketResourcesMetadata, *probe.Error) { +func (fs Filesystem) filterObjects(bucket string, content contentInfo, resources BucketResourcesMetadata) (ObjectMetadata, BucketResourcesMetadata, *probe.Error) { var err *probe.Error var metadata ObjectMetadata diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 383ed7bca..d520e4930 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -39,9 +39,10 @@ import ( "github.com/minio/minio-xl/pkg/crypto/sha256" "github.com/minio/minio-xl/pkg/crypto/sha512" "github.com/minio/minio-xl/pkg/probe" + "github.com/minio/minio/pkg/disk" ) -func (fs API) isValidUploadID(object, uploadID string) bool { +func (fs Filesystem) isValidUploadID(object, uploadID string) bool { s, ok := fs.multiparts.ActiveSession[object] if !ok { return false @@ -53,7 +54,7 @@ func (fs API) isValidUploadID(object, uploadID string) bool { } // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata -func (fs API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) { +func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() if !IsValidBucket(bucket) { @@ -113,7 +114,7 @@ func (fs API) ListMultipartUploads(bucket string, resources BucketMultipartResou return resources, nil } -func (fs API) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error { +func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error { for _, part := range parts.Part { recvMD5 := part.ETag partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", part.PartNumber), os.O_RDONLY, 0600) @@ -143,9 +144,19 @@ func (fs API) concatParts(parts *CompleteMultipartUpload, objectPath string, mw } // NewMultipartUpload - initiate a new multipart session -func (fs API) NewMultipartUpload(bucket, object string) (string, *probe.Error) { +func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() + + stfs, err := disk.Stat(fs.path) + if err != nil { + return "", probe.NewError(err) + } + + if int64((float64(stfs.Free)/float64(stfs.Total))*100) <= fs.minFreeDisk { + return "", probe.NewError(RootPathFull{Path: fs.path}) + } + if !IsValidBucket(bucket) { return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) } @@ -154,7 +165,7 @@ func (fs API) NewMultipartUpload(bucket, object string) (string, *probe.Error) { } bucketPath := filepath.Join(fs.path, bucket) - _, err := os.Stat(bucketPath) + _, err = os.Stat(bucketPath) // check bucket exists if os.IsNotExist(err) { return "", probe.NewError(BucketNotFound{Bucket: bucket}) @@ -208,10 +219,19 @@ func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } // CreateObjectPart - create a part in a multipart session -func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) { +func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() + stfs, err := disk.Stat(fs.path) + if err != nil { + return "", probe.NewError(err) + } + + if int64((float64(stfs.Free)/float64(stfs.Total))*100) <= fs.minFreeDisk { + return "", probe.NewError(RootPathFull{Path: fs.path}) + } + if partID <= 0 { return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero")) } @@ -230,7 +250,8 @@ func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, } if strings.TrimSpace(expectedMD5Sum) != "" { - expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) + var expectedMD5SumBytes []byte + expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if err != nil { // pro-actively close the connection return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum}) @@ -239,7 +260,7 @@ func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, } bucketPath := filepath.Join(fs.path, bucket) - if _, err := os.Stat(bucketPath); err != nil { + if _, err = os.Stat(bucketPath); err != nil { // check bucket exists if os.IsNotExist(err) { return "", probe.NewError(BucketNotFound{Bucket: bucket}) @@ -321,7 +342,7 @@ func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, } // CompleteMultipartUpload - complete a multipart upload and persist the data -func (fs API) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { +func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -424,7 +445,7 @@ func (fs API) CompleteMultipartUpload(bucket, object, uploadID string, data io.R } // ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata -func (fs API) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) { +func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -493,7 +514,7 @@ func (fs API) ListObjectParts(bucket, object string, resources ObjectResourcesMe } // AbortMultipartUpload - abort an incomplete multipart session -func (fs API) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { +func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { fs.lock.Lock() defer fs.lock.Unlock() diff --git a/pkg/fs/fs-object.go b/pkg/fs/fs-object.go index 2c8523c6e..8cf5e33d5 100644 --- a/pkg/fs/fs-object.go +++ b/pkg/fs/fs-object.go @@ -32,12 +32,13 @@ import ( "github.com/minio/minio-xl/pkg/atomic" "github.com/minio/minio-xl/pkg/crypto/sha256" "github.com/minio/minio-xl/pkg/probe" + "github.com/minio/minio/pkg/disk" ) /// Object Operations // GetObject - GET object -func (fs API) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) { +func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -91,7 +92,7 @@ func (fs API) GetObject(w io.Writer, bucket, object string, start, length int64) } // GetObjectMetadata - HEAD object -func (fs API) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) { +func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -161,16 +162,25 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) *probe.Error { } // CreateObject - PUT object -func (fs API) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { +func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() + stfs, err := disk.Stat(fs.path) + if err != nil { + return ObjectMetadata{}, probe.NewError(err) + } + + if int64((float64(stfs.Free)/float64(stfs.Total))*100) <= fs.minFreeDisk { + return ObjectMetadata{}, probe.NewError(RootPathFull{Path: fs.path}) + } + // check bucket name valid if !IsValidBucket(bucket) { return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } // check bucket exists - if _, err := os.Stat(filepath.Join(fs.path, bucket)); os.IsNotExist(err) { + if _, err = os.Stat(filepath.Join(fs.path, bucket)); os.IsNotExist(err) { return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) } // verify object path legal @@ -181,7 +191,8 @@ func (fs API) CreateObject(bucket, object, expectedMD5Sum string, size int64, da // get object path objectPath := filepath.Join(fs.path, bucket, object) if strings.TrimSpace(expectedMD5Sum) != "" { - expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) + var expectedMD5SumBytes []byte + expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if err != nil { // pro-actively close the connection return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum}) @@ -252,7 +263,7 @@ func (fs API) CreateObject(bucket, object, expectedMD5Sum string, size int64, da } // DeleteObject - delete and object -func (fs API) DeleteObject(bucket, object string) *probe.Error { +func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { fs.lock.Lock() defer fs.lock.Unlock() diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 238b5a60a..93dfd66e1 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -18,22 +18,18 @@ package fs import ( "os" - "runtime" - "strings" "sync" "time" - "io/ioutil" - "path/filepath" - "github.com/minio/minio-xl/pkg/probe" ) -// API - local variables -type API struct { - path string - lock *sync.Mutex - multiparts *Multiparts +// Filesystem - local variables +type Filesystem struct { + path string + minFreeDisk int64 + lock *sync.Mutex + multiparts *Multiparts } // MultipartSession holds active session information @@ -51,7 +47,7 @@ type Multiparts struct { } // New instantiate a new donut -func New(path string) (CloudStorage, *probe.Error) { +func New() (Filesystem, *probe.Error) { var err *probe.Error // load multiparts session from disk var multiparts *Multiparts @@ -63,412 +59,27 @@ func New(path string) (CloudStorage, *probe.Error) { ActiveSession: make(map[string]*MultipartSession), } if err := SaveMultipartsSession(multiparts); err != nil { - return nil, err.Trace() + return Filesystem{}, err.Trace() } } else { - return nil, err.Trace() + return Filesystem{}, err.Trace() } } - a := API{ - path: path, - lock: new(sync.Mutex), - } + a := Filesystem{lock: new(sync.Mutex)} a.multiparts = multiparts return a, nil } -/// Bucket Operations - -// DeleteBucket - delete bucket -func (fs API) DeleteBucket(bucket string) *probe.Error { +// SetRootPath - set root path +func (fs *Filesystem) SetRootPath(path string) { fs.lock.Lock() defer fs.lock.Unlock() - - // verify bucket path legal - if !IsValidBucket(bucket) { - return probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - bucketDir := filepath.Join(fs.path, bucket) - // check bucket exists - if _, err := os.Stat(bucketDir); os.IsNotExist(err) { - return probe.NewError(BucketNotFound{Bucket: bucket}) - } - if err := RemoveAllDirs(bucketDir); err != nil { - if err == ErrDirNotEmpty || strings.Contains(err.Error(), "directory not empty") { - return probe.NewError(BucketNotEmpty{Bucket: bucket}) - } - return probe.NewError(err) - } - if err := os.Remove(bucketDir); err != nil { - if strings.Contains(err.Error(), "directory not empty") { - return probe.NewError(BucketNotEmpty{Bucket: bucket}) - } - return probe.NewError(err) - } - return nil + fs.path = path } -// ListBuckets - Get service -func (fs API) ListBuckets() ([]BucketMetadata, *probe.Error) { +// SetMinFreeDisk - set min free disk +func (fs *Filesystem) SetMinFreeDisk(minFreeDisk int64) { fs.lock.Lock() defer fs.lock.Unlock() - - files, err := ioutil.ReadDir(fs.path) - if err != nil { - return []BucketMetadata{}, probe.NewError(err) - } - - var metadataList []BucketMetadata - for _, file := range files { - if !file.IsDir() { - // if files found ignore them - continue - } - if file.IsDir() { - // if directories found with odd names, skip them too - if !IsValidBucket(file.Name()) { - continue - } - } - metadata := BucketMetadata{ - Name: file.Name(), - Created: file.ModTime(), - } - metadataList = append(metadataList, metadata) - } - return metadataList, nil -} - -// MakeBucket - PUT Bucket -func (fs API) MakeBucket(bucket, acl string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() - - // verify bucket path legal - if !IsValidBucket(bucket) { - return probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - - // get bucket path - bucketDir := filepath.Join(fs.path, bucket) - - // check if bucket exists - if _, err := os.Stat(bucketDir); err == nil { - return probe.NewError(BucketExists{ - Bucket: bucket, - }) - } - - // make bucket - err := os.Mkdir(bucketDir, aclToPerm(acl)) - if err != nil { - return probe.NewError(err) - } - return nil -} - -// GetBucketMetadata - -func (fs API) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() - if !IsValidBucket(bucket) { - return BucketMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - // get bucket path - bucketDir := filepath.Join(fs.path, bucket) - bucketMetadata := BucketMetadata{} - fi, err := os.Stat(bucketDir) - // check if bucket exists - if os.IsNotExist(err) { - return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } - if err != nil { - return BucketMetadata{}, probe.NewError(err) - } - - bucketMetadata.Name = fi.Name() - bucketMetadata.Created = fi.ModTime() - bucketMetadata.ACL = permToACL(fi.Mode()) - return bucketMetadata, nil -} - -// permToACL - convert perm to meaningful ACL -func permToACL(mode os.FileMode) BucketACL { - switch mode.Perm() { - case os.FileMode(0700): - return BucketACL("private") - case os.FileMode(0500): - return BucketACL("public-read") - case os.FileMode(0777): - return BucketACL("public-read-write") - default: - return BucketACL("private") - } -} - -// aclToPerm - convert acl to filesystem mode -func aclToPerm(acl string) os.FileMode { - switch acl { - case "private": - return os.FileMode(0700) - case "public-read": - return os.FileMode(0500) - case "public-read-write": - return os.FileMode(0777) - default: - return os.FileMode(0700) - } -} - -// SetBucketMetadata - -func (fs API) SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() - if !IsValidBucket(bucket) { - return probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - acl := metadata["acl"] - if !IsValidBucketACL(acl) { - return probe.NewError(InvalidACL{ACL: acl}) - } - // get bucket path - bucketDir := filepath.Join(fs.path, bucket) - err := os.Chmod(bucketDir, aclToPerm(acl)) - if err != nil { - return probe.NewError(err) - } - return nil -} - -// ListObjects - GET bucket (list objects) -func (fs API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error) { - if !IsValidBucket(bucket) { - return nil, resources, probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - if resources.Prefix != "" && IsValidObjectName(resources.Prefix) == false { - return nil, resources, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix}) - } - - p := bucketDir{} - rootPrefix := filepath.Join(fs.path, bucket) - // check bucket exists - if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { - return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) - } - - p.root = rootPrefix - /// automatically treat "/" delimiter as "\\" delimiter on windows due to its path constraints. - if resources.Delimiter == "/" { - if runtime.GOOS == "windows" { - resources.Delimiter = string(os.PathSeparator) - } - } - - // if delimiter is supplied and not prefix then we are the very top level, list everything and move on. - if resources.Delimiter != "" && resources.Prefix == "" { - files, err := ioutil.ReadDir(rootPrefix) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return nil, resources, probe.NewError(err) - } - for _, fl := range files { - p.files = append(p.files, contentInfo{ - Prefix: fl.Name(), - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } - } - - // If delimiter and prefix is supplied make sure that paging doesn't go deep, treat it as simple directory listing. - if resources.Delimiter != "" && resources.Prefix != "" { - if !strings.HasSuffix(resources.Prefix, resources.Delimiter) { - fl, err := os.Stat(filepath.Join(rootPrefix, resources.Prefix)) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) - } - return nil, resources, probe.NewError(err) - } - p.files = append(p.files, contentInfo{ - Prefix: resources.Prefix, - Size: fl.Size(), - Mode: os.ModeDir, - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } else { - files, err := ioutil.ReadDir(filepath.Join(rootPrefix, resources.Prefix)) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) - } - return nil, resources, probe.NewError(err) - } - for _, fl := range files { - prefix := fl.Name() - if resources.Prefix != "" { - prefix = filepath.Join(resources.Prefix, fl.Name()) - } - p.files = append(p.files, contentInfo{ - Prefix: prefix, - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } - } - } - if resources.Delimiter == "" { - var files []contentInfo - getAllFiles := func(fp string, fl os.FileInfo, err error) error { - // If any error return back quickly - if err != nil { - return err - } - if strings.HasSuffix(fp, "$multiparts") { - return nil - } - // if file pointer equals to rootPrefix - discard it - if fp == p.root { - return nil - } - if len(files) > resources.Maxkeys { - return ErrSkipFile - } - // Split the root prefix from the incoming file pointer - realFp := "" - if runtime.GOOS == "windows" { - if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 { - realFp = splits[1] - } - } else { - if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 { - realFp = splits[1] - } - } - // If path is a directory and has a prefix verify if the file pointer - // has the prefix if it does not skip the directory. - if fl.Mode().IsDir() { - if resources.Prefix != "" { - if !strings.HasPrefix(fp, filepath.Join(p.root, resources.Prefix)) { - return ErrSkipDir - } - } - } - // If path is a directory and has a marker verify if the file split file pointer - // is lesser than the Marker top level directory if yes skip it. - if fl.Mode().IsDir() { - if resources.Marker != "" { - if realFp != "" { - if runtime.GOOS == "windows" { - if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] { - return ErrSkipDir - } - } else { - if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] { - return ErrSkipDir - } - } - } - } - } - // If regular file verify - if fl.Mode().IsRegular() { - // If marker is present this will be used to check if filepointer is - // lexically higher than then Marker - if realFp != "" { - if resources.Marker != "" { - if realFp > resources.Marker { - files = append(files, contentInfo{ - Prefix: realFp, - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } - } else { - files = append(files, contentInfo{ - Prefix: realFp, - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } - } - } - // If file is a symlink follow it and populate values. - if fl.Mode()&os.ModeSymlink == os.ModeSymlink { - st, err := os.Stat(fp) - if err != nil { - return nil - } - // If marker is present this will be used to check if filepointer is - // lexically higher than then Marker - if realFp != "" { - if resources.Marker != "" { - if realFp > resources.Marker { - files = append(files, contentInfo{ - Prefix: realFp, - Size: st.Size(), - Mode: st.Mode(), - ModTime: st.ModTime(), - FileInfo: st, - }) - } - } else { - files = append(files, contentInfo{ - Prefix: realFp, - Size: st.Size(), - Mode: st.Mode(), - ModTime: st.ModTime(), - FileInfo: st, - }) - } - } - } - p.files = files - return nil - } - // If no delimiter is specified, crawl through everything. - err := Walk(rootPrefix, getAllFiles) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) - } - return nil, resources, probe.NewError(err) - } - } - - var metadataList []ObjectMetadata - var metadata ObjectMetadata - - // Filter objects - for _, content := range p.files { - if len(metadataList) == resources.Maxkeys { - resources.IsTruncated = true - if resources.IsTruncated && resources.Delimiter != "" { - resources.NextMarker = metadataList[len(metadataList)-1].Object - } - break - } - if content.Prefix > resources.Marker { - var err *probe.Error - metadata, resources, err = fs.filterObjects(bucket, content, resources) - if err != nil { - return nil, resources, err.Trace() - } - if metadata.Bucket != "" { - metadataList = append(metadataList, metadata) - } - } - } - return metadataList, resources, nil + fs.minFreeDisk = minFreeDisk } diff --git a/pkg/fs/fs_test.go b/pkg/fs/fs_test.go index bfdc83d3d..086fbbb26 100644 --- a/pkg/fs/fs_test.go +++ b/pkg/fs/fs_test.go @@ -33,14 +33,16 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { var storageList []string - create := func() CloudStorage { + create := func() Filesystem { configPath, err := ioutil.TempDir(os.TempDir(), "minio-") c.Check(err, IsNil) path, err := ioutil.TempDir(os.TempDir(), "minio-") c.Check(err, IsNil) SetFSMultipartsConfigPath(filepath.Join(configPath, "multiparts.json")) storageList = append(storageList, path) - store, perr := New(path) + store, perr := New() + store.SetRootPath(path) + store.SetMinFreeDisk(0) c.Check(perr, IsNil) return store } diff --git a/pkg/fs/interfaces.go b/pkg/fs/interfaces.go deleted file mode 100644 index 3968dcf5e..000000000 --- a/pkg/fs/interfaces.go +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2015 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package fs - -import ( - "io" - - "github.com/minio/minio-xl/pkg/probe" -) - -// CloudStorage is a fs cloud storage interface -type CloudStorage interface { - // Storage service operations - GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) - SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error - ListBuckets() ([]BucketMetadata, *probe.Error) - MakeBucket(bucket, acl string) *probe.Error - DeleteBucket(bucket string) *probe.Error - - // Bucket operations - ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error) - - // Object operations - GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) - GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) - // bucket, object, expectedMD5Sum, size, reader, metadata, signature - CreateObject(bucket, object, md5sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) - DeleteObject(bucket, object string) *probe.Error - - // Multipart API - Multipart - - // ACL API - ACL -} - -// Multipart API -type Multipart interface { - NewMultipartUpload(bucket, object string) (string, *probe.Error) - AbortMultipartUpload(bucket, object, uploadID string) *probe.Error - CreateObjectPart(bucket, object, uploadID, md5sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) - CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) - ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) - ListObjectParts(bucket, object string, objectResources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) -} - -// ACL API -type ACL interface { - IsPublicBucket(bucket string) bool - IsPrivateBucket(bucket string) bool - IsReadOnlyBucket(bucket string) bool -} diff --git a/routers.go b/routers.go index a92069516..e1987b292 100644 --- a/routers.go +++ b/routers.go @@ -23,8 +23,8 @@ import ( "github.com/minio/minio/pkg/fs" ) -// registerAPI - register all the object API handlers to their respective paths -func registerAPI(mux *router.Router, a API) { +// registerCloudStorageAPI - register all the handlers to their respective paths +func registerCloudStorageAPI(mux *router.Router, a CloudStorageAPI) { mux.HandleFunc("/", a.ListBucketsHandler).Methods("GET") mux.HandleFunc("/{bucket}", a.GetBucketACLHandler).Queries("acl", "").Methods("GET") mux.HandleFunc("/{bucket}", a.ListMultipartUploadsHandler).Queries("uploads", "").Methods("GET") @@ -46,35 +46,35 @@ func registerAPI(mux *router.Router, a API) { mux.HandleFunc("/{bucket}/{object:.*}", a.DeleteObjectHandler).Methods("DELETE") } -// API container for API and also carries OP (operation) channel -type API struct { - Filesystem fs.CloudStorage +// CloudStorageAPI container for API and also carries OP (operation) channel +type CloudStorageAPI struct { + Filesystem fs.Filesystem Anonymous bool // do not checking for incoming signatures, allow all requests } -// getNewAPI instantiate a new minio API -func getNewAPI(path string, anonymous bool) API { - // ignore errors for now - fs, err := fs.New(path) +// getNewCloudStorageAPI instantiate a new CloudStorageAPI +func getNewCloudStorageAPI(conf serverConfig) CloudStorageAPI { + fs, err := fs.New() fatalIf(err.Trace(), "Instantiating filesystem failed.", nil) - return API{ + fs.SetRootPath(conf.Path) + fs.SetMinFreeDisk(conf.MinFreeDisk) + return CloudStorageAPI{ Filesystem: fs, - Anonymous: anonymous, + Anonymous: conf.Anonymous, } } -func getAPIHandler(anonymous bool, api API) http.Handler { +func getCloudStorageAPIHandler(api CloudStorageAPI) http.Handler { var mwHandlers = []MiddlewareHandler{ TimeValidityHandler, IgnoreResourcesHandler, CorsHandler, } - if !anonymous { + if !api.Anonymous { mwHandlers = append(mwHandlers, SignatureHandler) } mux := router.NewRouter() - registerAPI(mux, api) - apiHandler := registerCustomMiddleware(mux, mwHandlers...) - return apiHandler + registerCloudStorageAPI(mux, api) + return registerCustomMiddleware(mux, mwHandlers...) } diff --git a/server-main.go b/server-main.go index cef302298..e197df341 100644 --- a/server-main.go +++ b/server-main.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "runtime" + "strconv" "strings" "github.com/fatih/color" @@ -50,16 +51,19 @@ EXAMPLES: $ minio {{.Name}} C:\MyShare 3. Start minio server bound to a specific IP:PORT, when you have multiple network interfaces. - $ minio --address 192.168.1.101:9000 /home/shared + $ minio --address 192.168.1.101:9000 {{.Name}} /home/shared + + 4. Start minio server with minimum free disk threshold to 5% + $ minio --min-free-disk 5% {{.Name}} /home/shared/Pictures `, } // configureAPIServer configure a new server instance -func configureAPIServer(conf fsConfig, apiHandler http.Handler) (*http.Server, *probe.Error) { +func configureAPIServer(conf serverConfig) (*http.Server, *probe.Error) { // Minio server config apiServer := &http.Server{ Addr: conf.Address, - Handler: apiHandler, + Handler: getCloudStorageAPIHandler(getNewCloudStorageAPI(conf)), MaxHeaderBytes: 1 << 20, } @@ -109,34 +113,59 @@ func configureAPIServer(conf fsConfig, apiHandler http.Handler) (*http.Server, * } // startServer starts an s3 compatible cloud storage server -func startServer(conf fsConfig) *probe.Error { - minioAPI := getNewAPI(conf.Path, conf.Anonymous) - apiHandler := getAPIHandler(conf.Anonymous, minioAPI) - apiServer, err := configureAPIServer(conf, apiHandler) +func startServer(conf serverConfig) *probe.Error { + apiServer, err := configureAPIServer(conf) if err != nil { return err.Trace() } - if err := minhttp.ListenAndServe(apiServer); err != nil { + rateLimit := conf.RateLimit + if err := minhttp.ListenAndServeLimited(rateLimit, apiServer); err != nil { return err.Trace() } return nil } -func getServerConfig(c *cli.Context) fsConfig { +// parse input string with percent to int64 +func parsePercentToInt(s string, bitSize int) (int64, *probe.Error) { + i := strings.Index(s, "%") + if i < 0 { + // no percentage string found try to parse the whole string anyways + p, err := strconv.ParseInt(s, 10, bitSize) + if err != nil { + return 0, probe.NewError(err) + } + return p, nil + } + p, err := strconv.ParseInt(s[:i], 10, bitSize) + if err != nil { + return 0, probe.NewError(err) + } + return p, nil +} + +func getServerConfig(c *cli.Context) serverConfig { + path := strings.TrimSpace(c.Args().First()) + if path == "" { + fatalIf(probe.NewError(errInvalidArgument), "Path argument cannot be empty.", nil) + } certFile := c.GlobalString("cert") keyFile := c.GlobalString("key") if (certFile != "" && keyFile == "") || (certFile == "" && keyFile != "") { - Fatalln("Both certificate and key are required to enable https.") + fatalIf(probe.NewError(errInvalidArgument), "Both certificate and key are required to enable https.", nil) } + minFreeDisk, err := parsePercentToInt(c.GlobalString("min-free-disk"), 64) + fatalIf(err.Trace(c.GlobalString("min-free-disk")), "Unable to parse minimum free disk parameter.", nil) + tls := (certFile != "" && keyFile != "") - return fsConfig{ - Address: c.GlobalString("address"), - Path: strings.TrimSpace(c.Args().First()), - Anonymous: c.GlobalBool("anonymous"), - TLS: tls, - CertFile: certFile, - KeyFile: keyFile, - RateLimit: c.GlobalInt("ratelimit"), + return serverConfig{ + Address: c.GlobalString("address"), + Anonymous: c.GlobalBool("anonymous"), + Path: path, + MinFreeDisk: minFreeDisk, + TLS: tls, + CertFile: certFile, + KeyFile: keyFile, + RateLimit: c.GlobalInt("ratelimit"), } } diff --git a/server_fs_test.go b/server_fs_test.go index e7fc36d0a..2b9a1eb47 100644 --- a/server_fs_test.go +++ b/server_fs_test.go @@ -82,8 +82,13 @@ func (s *MyAPIFSCacheSuite) SetUpSuite(c *C) { perr = saveAuthConfig(authConf) c.Assert(perr, IsNil) - minioAPI := getNewAPI(fsroot, false) - httpHandler := getAPIHandler(false, minioAPI) + server := serverConfig{ + Path: fsroot, + MinFreeDisk: 0, + Anonymous: false, + } + cloudStorageAPI := getNewCloudStorageAPI(server) + httpHandler := getCloudStorageAPIHandler(cloudStorageAPI) testAPIFSCacheServer = httptest.NewServer(httpHandler) } diff --git a/typed-errors.go b/typed-errors.go index 86ef26b8c..555148766 100644 --- a/typed-errors.go +++ b/typed-errors.go @@ -18,15 +18,13 @@ package main import "errors" -// errInvalidArgument means that input argument is invalid -var errInvalidArgument = errors.New("Invalid Argument") +// errInvalidArgument means that input argument is invalid. +var errInvalidArgument = errors.New("Invalid arguments specified") -// errMissingAuthHeader means that Authorization header -// has missing value or it is empty. +// errMissingAuthHeader means that Authorization header has missing value or it is empty. var errMissingAuthHeaderValue = errors.New("Missing auth header value") -// errInvalidAuthHeaderValue means that Authorization -// header is available but is malformed and not in +// errInvalidAuthHeaderValue means that Authorization header is available but is malformed and not in // accordance with signature v4. var errInvalidAuthHeaderValue = errors.New("Invalid auth header value") @@ -34,39 +32,31 @@ var errInvalidAuthHeaderValue = errors.New("Invalid auth header value") // has a wrong prefix only supported value should be "AWS4-HMAC-SHA256". var errInvalidAuthHeaderPrefix = errors.New("Invalid auth header prefix") -// errMissingFieldsAuthHeader means that Authorization -// header is available but has some missing fields. +// errMissingFieldsAuthHeader means that Authorization header is available but has some missing fields. var errMissingFieldsAuthHeader = errors.New("Missing fields in auth header") -// errMissingFieldsCredentialTag means that Authorization -// header credentials tag has some missing fields. +// errMissingFieldsCredentialTag means that Authorization header credentials tag has some missing fields. var errMissingFieldsCredentialTag = errors.New("Missing fields in crendential tag") -// errMissingFieldsSignedHeadersTag means that Authorization -// header signed headers tag has some missing fields. +// errMissingFieldsSignedHeadersTag means that Authorization header signed headers tag has some missing fields. var errMissingFieldsSignedHeadersTag = errors.New("Missing fields in signed headers tag") -// errMissingFieldsSignatureTag means that Authorization -// header signature tag has missing fields. +// errMissingFieldsSignatureTag means that Authorization header signature tag has missing fields. var errMissingFieldsSignatureTag = errors.New("Missing fields in signature tag") -// errCredentialTagMalformed means that Authorization header -// credential tag is malformed. +// errCredentialTagMalformed means that Authorization header credential tag is malformed. var errCredentialTagMalformed = errors.New("Invalid credential tag malformed") -// errInvalidRegion means that the region element from credential tag -// in Authorization header is invalid. +// errInvalidRegion means that the region element from credential tag in Authorization header is invalid. var errInvalidRegion = errors.New("Invalid region") -// errAccessKeyIDInvalid means that the accessKeyID element from -// credential tag in Authorization header is invalid. +// errAccessKeyIDInvalid means that the accessKeyID element from credential tag in Authorization header is invalid. var errAccessKeyIDInvalid = errors.New("AccessKeyID invalid") // errUnsupportedAlgorithm means that the provided X-Amz-Algorithm is unsupported. var errUnsupportedAlgorithm = errors.New("Unsupported Algorithm") -// errPolicyAlreadyExpired means that the client request carries an post policy -// header which is already expired. +// errPolicyAlreadyExpired means that the client request carries an post policy header which is already expired. var errPolicyAlreadyExpired = errors.New("Policy already expired") // errPolicyMissingFields means that form values and policy header have some fields missing.