From 7c37e9d06abe598230a4ad67cc4152b5b022027e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 2 Jul 2015 20:31:22 -0700 Subject: [PATCH] Make donut fully integrated back into API handlers --- commands.go | 12 - pkg/server/api/api.go | 12 +- pkg/server/api/bucket-handlers.go | 246 +++++++++--- pkg/server/api/object-handlers.go | 310 +++++++++++---- pkg/server/router.go | 14 +- pkg/server/server.go | 8 +- pkg/storage/donut/bucket.go | 9 +- pkg/storage/donut/common.go | 21 + pkg/storage/donut/config.go | 87 ++++ pkg/storage/donut/definitions.go | 4 +- pkg/storage/donut/{donut.go => donut-v1.go} | 211 ++++------ .../donut/{donut_test.go => donut-v1_test.go} | 27 +- pkg/storage/donut/{cache.go => donut-v2.go} | 373 +++++++++--------- .../donut/{cache_test.go => donut-v2_test.go} | 29 +- pkg/storage/donut/interfaces.go | 27 +- pkg/storage/donut/management.go | 24 +- .../{cache-multipart.go => multipart.go} | 146 +++---- pkg/storage/donut/rebalance.go | 6 +- 18 files changed, 971 insertions(+), 595 deletions(-) create mode 100644 pkg/storage/donut/config.go rename pkg/storage/donut/{donut.go => donut-v1.go} (57%) rename pkg/storage/donut/{donut_test.go => donut-v1_test.go} (89%) rename pkg/storage/donut/{cache.go => donut-v2.go} (63%) rename pkg/storage/donut/{cache_test.go => donut-v2_test.go} (88%) rename pkg/storage/donut/{cache-multipart.go => multipart.go} (75%) diff --git a/commands.go b/commands.go index a613b766d..1aedd3b8d 100644 --- a/commands.go +++ b/commands.go @@ -8,18 +8,6 @@ import ( "github.com/minio/minio/pkg/server/api" ) -func removeDuplicates(slice []string) []string { - newSlice := []string{} - seen := make(map[string]struct{}) - for _, val := range slice { - if _, ok := seen[val]; !ok { - newSlice = append(newSlice, val) - seen[val] = struct{}{} - } - } - return newSlice -} - var commands = []cli.Command{ serverCmd, controlCmd, diff --git a/pkg/server/api/api.go b/pkg/server/api/api.go index b84a3563e..8306e7c0b 100644 --- a/pkg/server/api/api.go +++ b/pkg/server/api/api.go @@ -16,6 +16,8 @@ package api +import "github.com/minio/minio/pkg/storage/donut" + // Operation container for individual operations read by Ticket Master type Operation struct { ProceedCh chan struct{} @@ -23,10 +25,16 @@ type Operation struct { // Minio container for API and also carries OP (operation) channel type Minio struct { - OP chan Operation + OP chan Operation + Donut donut.Interface } // New instantiate a new minio API func New() Minio { - return Minio{OP: make(chan Operation)} + // ignore errors for now + d, _ := donut.LoadDonut() + return Minio{ + OP: make(chan Operation), + Donut: d, + } } diff --git a/pkg/server/api/bucket-handlers.go b/pkg/server/api/bucket-handlers.go index ef6f08bda..cbdc6b550 100644 --- a/pkg/server/api/bucket-handlers.go +++ b/pkg/server/api/bucket-handlers.go @@ -17,16 +17,51 @@ package api import ( - "log" "net/http" "github.com/gorilla/mux" + "github.com/minio/minio/pkg/iodine" + "github.com/minio/minio/pkg/storage/donut" + "github.com/minio/minio/pkg/utils/log" ) func (api Minio) isValidOp(w http.ResponseWriter, req *http.Request, acceptsContentType contentType) bool { vars := mux.Vars(req) bucket := vars["bucket"] - log.Println(bucket) + + bucketMetadata, err := api.Donut.GetBucketMetadata(bucket) + switch iodine.ToError(err).(type) { + case donut.BucketNotFound: + { + writeErrorResponse(w, req, NoSuchBucket, acceptsContentType, req.URL.Path) + return false + } + case donut.BucketNameInvalid: + { + writeErrorResponse(w, req, InvalidBucketName, acceptsContentType, req.URL.Path) + return false + } + case nil: + if _, err := stripAuth(req); err != nil { + if bucketMetadata.ACL.IsPrivate() { + return true + //uncomment this when we have webcli + //writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) + //return false + } + if bucketMetadata.ACL.IsPublicRead() && req.Method == "PUT" { + return true + //uncomment this when we have webcli + //writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) + //return false + } + } + default: + { + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } + } return true } @@ -38,18 +73,16 @@ func (api Minio) isValidOp(w http.ResponseWriter, req *http.Request, acceptsCont // This operation returns at most 1,000 multipart uploads in the response. // func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + acceptsContentType := getContentType(req) resources := getBucketMultipartResources(req.URL.Query()) if resources.MaxUploads == 0 { resources.MaxUploads = maxObjectList @@ -57,7 +90,29 @@ func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Re vars := mux.Vars(req) bucket := vars["bucket"] - log.Println(bucket) + + resources, err := api.Donut.ListMultipartUploads(bucket, resources) + switch iodine.ToError(err).(type) { + case nil: // success + { + // generate response + response := generateListMultipartUploadsResult(bucket, resources) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse)) + // write body + w.Write(encodedSuccessResponse) + } + case donut.BucketNotFound: + { + writeErrorResponse(w, req, NoSuchBucket, acceptsContentType, req.URL.Path) + } + default: + { + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } + } } // ListObjectsHandler - GET Bucket (List Objects) @@ -67,18 +122,16 @@ func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Re // criteria to return a subset of the objects in a bucket. // func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + acceptsContentType := getContentType(req) // verify if bucket allows this operation if !api.isValidOp(w, req, acceptsContentType) { return @@ -96,8 +149,25 @@ func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] - log.Println(bucket) + objects, resources, err := api.Donut.ListObjects(bucket, resources) + switch iodine.ToError(err).(type) { + case nil: + // generate response + response := generateListObjectsResponse(bucket, objects, resources) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse)) + // write body + w.Write(encodedSuccessResponse) + case donut.ObjectNotFound: + writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path) + case donut.ObjectNameInvalid: + writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // ListBucketsHandler - GET Service @@ -105,6 +175,15 @@ func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { // This implementation of the GET operation returns a list of all buckets // owned by the authenticated sender of the request. func (api Minio) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { + // Ticket master block + { + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + } + acceptsContentType := getContentType(req) // uncomment this when we have webcli // without access key credentials one cannot list buckets @@ -112,21 +191,36 @@ func (api Minio) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { // writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) // return // } - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh - { - // do you operation + + buckets, err := api.Donut.ListBuckets() + switch iodine.ToError(err).(type) { + case nil: + // generate response + response := generateListBucketsResponse(buckets) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse)) + // write response + w.Write(encodedSuccessResponse) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) } - log.Println(acceptsContentType) } // PutBucketHandler - PUT Bucket // ---------- // This implementation of the PUT operation creates a new bucket for authenticated request func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) { + // Ticket master block + { + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + } + acceptsContentType := getContentType(req) // uncomment this when we have webcli // without access key credentials one cannot create a bucket @@ -134,15 +228,6 @@ func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) { // writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) // return // } - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh - { - // do you operation - } - log.Println(acceptsContentType) if isRequestBucketACL(req.URL.Query()) { api.PutBucketACLHandler(w, req) @@ -157,24 +242,39 @@ func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] - log.Println(bucket) + + err := api.Donut.MakeBucket(bucket, getACLTypeString(aclType)) + switch iodine.ToError(err).(type) { + case nil: + // Make sure to add Location information here only for bucket + w.Header().Set("Location", "/"+bucket) + writeSuccessResponse(w, acceptsContentType) + case donut.TooManyBuckets: + writeErrorResponse(w, req, TooManyBuckets, acceptsContentType, req.URL.Path) + case donut.BucketNameInvalid: + writeErrorResponse(w, req, InvalidBucketName, acceptsContentType, req.URL.Path) + case donut.BucketExists: + writeErrorResponse(w, req, BucketAlreadyExists, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // PutBucketACLHandler - PUT Bucket ACL // ---------- // This implementation of the PUT operation modifies the bucketACL for authenticated request func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // read from 'x-amz-acl' aclType := getACLType(req) @@ -185,7 +285,19 @@ func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket := vars["bucket"] - log.Println(bucket) + + err := api.Donut.SetBucketMetadata(bucket, map[string]string{"acl": getACLTypeString(aclType)}) + switch iodine.ToError(err).(type) { + case nil: + writeSuccessResponse(w, acceptsContentType) + case donut.BucketNameInvalid: + writeErrorResponse(w, req, InvalidBucketName, acceptsContentType, req.URL.Path) + case donut.BucketNotFound: + writeErrorResponse(w, req, NoSuchBucket, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // HeadBucketHandler - HEAD Bucket @@ -195,19 +307,33 @@ func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { // have permission to access it. Otherwise, the operation might // return responses such as 404 Not Found and 403 Forbidden. func (api Minio) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) vars := mux.Vars(req) bucket := vars["bucket"] - log.Println(bucket) + + _, err := api.Donut.GetBucketMetadata(bucket) + switch iodine.ToError(err).(type) { + case nil: + writeSuccessResponse(w, acceptsContentType) + case donut.BucketNotFound: + error := getErrorCode(NoSuchBucket) + w.WriteHeader(error.HTTPStatusCode) + case donut.BucketNameInvalid: + error := getErrorCode(InvalidBucketName) + w.WriteHeader(error.HTTPStatusCode) + default: + log.Error.Println(iodine.New(err, nil)) + error := getErrorCode(InternalError) + w.WriteHeader(error.HTTPStatusCode) + } } diff --git a/pkg/server/api/object-handlers.go b/pkg/server/api/object-handlers.go index b1c9dc807..e09dfe64c 100644 --- a/pkg/server/api/object-handlers.go +++ b/pkg/server/api/object-handlers.go @@ -9,7 +9,7 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieapi.Donut. * See the License for the specific language governing permissions and * limitations under the License. */ @@ -25,6 +25,7 @@ import ( "github.com/gorilla/mux" "github.com/minio/minio/pkg/iodine" + "github.com/minio/minio/pkg/storage/donut" "github.com/minio/minio/pkg/utils/log" ) @@ -37,17 +38,16 @@ const ( // This implementation of the GET operation retrieves object. To use GET, // you must have READ access to the object. func (api Minio) GetObjectHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // verify if this operation is allowed if !api.isValidOp(w, req, acceptsContentType) { @@ -58,25 +58,57 @@ func (api Minio) GetObjectHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket = vars["bucket"] object = vars["object"] - log.Println(bucket, object) + metadata, err := api.Donut.GetObjectMetadata(bucket, object) + switch iodine.ToError(err).(type) { + case nil: // success + { + httpRange, err := getRequestedRange(req, metadata.Size) + if err != nil { + writeErrorResponse(w, req, InvalidRange, acceptsContentType, req.URL.Path) + return + } + switch httpRange.start == 0 && httpRange.length == 0 { + case true: + setObjectHeaders(w, metadata) + if _, err := api.Donut.GetObject(w, bucket, object); err != nil { + // unable to write headers, we've already printed data. Just close the connection. + log.Error.Println(iodine.New(err, nil)) + } + case false: + metadata.Size = httpRange.length + setRangeObjectHeaders(w, metadata, httpRange) + w.WriteHeader(http.StatusPartialContent) + if _, err := api.Donut.GetPartialObject(w, bucket, object, httpRange.start, httpRange.length); err != nil { + // unable to write headers, we've already printed data. Just close the connection. + log.Error.Println(iodine.New(err, nil)) + } + } + } + case donut.ObjectNotFound: + writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path) + case donut.ObjectNameInvalid: + writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // HeadObjectHandler - HEAD Object // ----------- // The HEAD operation retrieves metadata from an object without returning the object itself. func (api Minio) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // verify if this operation is allowed if !api.isValidOp(w, req, acceptsContentType) { @@ -87,25 +119,42 @@ func (api Minio) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) bucket = vars["bucket"] object = vars["object"] - log.Println(bucket, object) + + metadata, err := api.Donut.GetObjectMetadata(bucket, object) + switch iodine.ToError(err).(type) { + case nil: + setObjectHeaders(w, metadata) + w.WriteHeader(http.StatusOK) + case donut.ObjectNotFound: + error := getErrorCode(NoSuchKey) + w.Header().Set("Server", "Minio") + w.WriteHeader(error.HTTPStatusCode) + case donut.ObjectNameInvalid: + error := getErrorCode(NoSuchKey) + w.Header().Set("Server", "Minio") + w.WriteHeader(error.HTTPStatusCode) + default: + log.Error.Println(iodine.New(err, nil)) + error := getErrorCode(InternalError) + w.Header().Set("Server", "Minio") + w.WriteHeader(error.HTTPStatusCode) + } } // PutObjectHandler - PUT Object // ---------- // This implementation of the PUT operation adds an object to a bucket. func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + acceptsContentType := getContentType(req) // verify if this operation is allowed if !api.isValidOp(w, req, acceptsContentType) { return @@ -122,7 +171,7 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) { writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) return } - /// if Content-Length missing, throw away + /// if Content-Length missing, deny the request size := req.Header.Get("Content-Length") if size == "" { writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path) @@ -148,24 +197,40 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) { writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path) return } - log.Println(bucket, object, sizeInt64) + + metadata, err := api.Donut.CreateObject(bucket, object, md5, sizeInt64, req.Body, nil) + switch iodine.ToError(err).(type) { + case nil: + w.Header().Set("ETag", metadata.MD5Sum) + writeSuccessResponse(w, acceptsContentType) + case donut.ObjectExists: + writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) + case donut.BadDigest: + writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) + case donut.EntityTooLarge: + writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) + case donut.InvalidDigest: + writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } /// Multipart API // NewMultipartUploadHandler - New multipart upload func (api Minio) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { @@ -181,22 +246,38 @@ func (api Minio) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Requ vars := mux.Vars(req) bucket = vars["bucket"] object = vars["object"] - log.Println(bucket, object) + + uploadID, err := api.Donut.NewMultipartUpload(bucket, object, "") + switch iodine.ToError(err).(type) { + case nil: + { + response := generateInitiateMultipartUploadResult(bucket, object, uploadID) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse)) + // write body + w.Write(encodedSuccessResponse) + } + case donut.ObjectExists: + writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // PutObjectPartHandler - Upload part func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { @@ -232,7 +313,6 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] - log.Println(bucket, object, sizeInt64) uploadID := req.URL.Query().Get("uploadId") partIDString := req.URL.Query().Get("partNumber") @@ -241,22 +321,40 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) if err != nil { writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path) } - log.Println(uploadID, partID) + + calculatedMD5, err := api.Donut.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body) + switch iodine.ToError(err).(type) { + case nil: + w.Header().Set("ETag", calculatedMD5) + writeSuccessResponse(w, acceptsContentType) + case donut.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + case donut.ObjectExists: + writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) + case donut.BadDigest: + writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) + case donut.EntityTooLarge: + writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) + case donut.InvalidDigest: + writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // AbortMultipartUploadHandler - Abort multipart upload func (api Minio) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { @@ -267,23 +365,33 @@ func (api Minio) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Re bucket := vars["bucket"] object := vars["object"] - //objectResourcesMetadata := getObjectResources(req.URL.Query()) - log.Println(bucket, object) + objectResourcesMetadata := getObjectResources(req.URL.Query()) + + err := api.Donut.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID) + switch iodine.ToError(err).(type) { + case nil: + setCommonHeaders(w, getContentTypeString(acceptsContentType), 0) + w.WriteHeader(http.StatusNoContent) + case donut.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // ListObjectPartsHandler - List object parts func (api Minio) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { @@ -298,22 +406,38 @@ func (api Minio) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] - log.Println(bucket, object) + + objectResourcesMetadata, err := api.Donut.ListObjectParts(bucket, object, objectResourcesMetadata) + switch iodine.ToError(err).(type) { + case nil: + { + response := generateListPartsResult(objectResourcesMetadata) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse)) + // write body + w.Write(encodedSuccessResponse) + } + case donut.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } // CompleteMultipartUploadHandler - Complete multipart upload func (api Minio) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - - op := Operation{} - op.ProceedCh = make(chan struct{}) - api.OP <- op - // block until Ticket master gives us a go - <-op.ProceedCh + // Ticket master block { - // do you operation + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh } - log.Println(acceptsContentType) + + acceptsContentType := getContentType(req) // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { @@ -336,15 +460,31 @@ func (api Minio) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] - log.Println(bucket, object) - //objectResourcesMetadata := getObjectResources(req.URL.Query()) + objectResourcesMetadata := getObjectResources(req.URL.Query()) partMap := make(map[int]string) for _, part := range parts.Part { partMap[part.PartNumber] = part.ETag } + metadata, err := api.Donut.CompleteMultipartUpload(bucket, object, objectResourcesMetadata.UploadID, partMap) + switch iodine.ToError(err).(type) { + case nil: + { + response := generateCompleteMultpartUploadResult(bucket, object, "", metadata.MD5Sum) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse)) + // write body + w.Write(encodedSuccessResponse) + } + case donut.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + default: + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } /// Delete API diff --git a/pkg/server/router.go b/pkg/server/router.go index c3ba765e1..a789843d5 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -24,9 +24,9 @@ import ( "github.com/minio/minio/pkg/server/rpc" ) +// Get api func getAPI() api.Minio { - a := api.New() - return a + return api.New() } // registerAPI - register all the object API handlers to their respective paths @@ -82,15 +82,15 @@ func registerChain(handlers ...handlerFunc) chain { return ch } -// registerOtherMiddleware register all available middleware -func registerOtherMiddleware(mux http.Handler, conf api.Config) http.Handler { +// registerCustomMiddleware register all available custom middleware +func registerCustomMiddleware(mux http.Handler, conf api.Config) http.Handler { ch := registerChain( api.ValidContentTypeHandler, api.TimeValidityHandler, api.IgnoreResourcesHandler, api.ValidateAuthHeaderHandler, api.LoggingHandler, - // Add new middleware here + // Add new your new middleware here ) mux = ch.final(mux) @@ -109,7 +109,7 @@ func getAPIHandler(conf api.Config) (http.Handler, api.Minio) { mux := router.NewRouter() minioAPI := getAPI() apiHandler := registerAPI(mux, minioAPI) - apiHandler = registerOtherMiddleware(apiHandler, conf) + apiHandler = registerCustomMiddleware(apiHandler, conf) return apiHandler, minioAPI } @@ -120,6 +120,6 @@ func getRPCHandler() http.Handler { s.RegisterService(new(rpc.HelloService), "") s.RegisterService(new(rpc.VersionService), "") s.RegisterService(new(rpc.GetSysInfoService), "") - // add more services here + // Add new services here return registerRPC(router.NewRouter(), s) } diff --git a/pkg/server/server.go b/pkg/server/server.go index a383732c2..999c36efe 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -25,6 +25,7 @@ import ( "github.com/minio/minio/pkg/server/api" ) +// Start API listener func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { defer close(errCh) @@ -74,6 +75,7 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { } } +// Start RPC listener func startRPC(errCh chan error, rpcHandler http.Handler) { defer close(errCh) @@ -86,10 +88,11 @@ func startRPC(errCh chan error, rpcHandler http.Handler) { errCh <- httpServer.ListenAndServe() } +// Start ticket master func startTM(a api.Minio) { for { for op := range a.OP { - close(op.ProceedCh) + op.ProceedCh <- struct{}{} } } } @@ -101,8 +104,7 @@ func StartServices(conf api.Config) error { apiHandler, minioAPI := getAPIHandler(conf) go startAPI(apiErrCh, conf, apiHandler) - rpcHandler := getRPCHandler() - go startRPC(rpcErrCh, rpcHandler) + go startRPC(rpcErrCh, getRPCHandler()) go startTM(minioAPI) select { diff --git a/pkg/storage/donut/bucket.go b/pkg/storage/donut/bucket.go index 6e4012902..ccc89ada8 100644 --- a/pkg/storage/donut/bucket.go +++ b/pkg/storage/donut/bucket.go @@ -57,6 +57,7 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu "donutName": donutName, "aclType": aclType, } + if strings.TrimSpace(bucketName) == "" || strings.TrimSpace(donutName) == "" { return bucket{}, BucketMetadata{}, iodine.New(InvalidArgument{}, errParams) } @@ -130,7 +131,7 @@ func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) { } // ListObjects - list all objects -func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjects, error) { +func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) { b.lock.RLock() defer b.lock.RUnlock() if maxkeys <= 0 { @@ -140,7 +141,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List var objects []string bucketMetadata, err := b.getBucketMetadata() if err != nil { - return ListObjects{}, iodine.New(err, nil) + return ListObjectsResults{}, iodine.New(err, nil) } for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjects { if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) { @@ -181,7 +182,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List commonPrefixes = RemoveDuplicates(commonPrefixes) sort.Strings(commonPrefixes) - listObjects := ListObjects{} + listObjects := ListObjectsResults{} listObjects.Objects = make(map[string]ObjectMetadata) listObjects.CommonPrefixes = commonPrefixes listObjects.IsTruncated = isTruncated @@ -189,7 +190,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List for _, objectName := range results { objMetadata, err := b.readObjectMetadata(normalizeObjectName(objectName)) if err != nil { - return ListObjects{}, iodine.New(err, nil) + return ListObjectsResults{}, iodine.New(err, nil) } listObjects.Objects[objectName] = objMetadata } diff --git a/pkg/storage/donut/common.go b/pkg/storage/donut/common.go index e55fc3739..b91fd3b1b 100644 --- a/pkg/storage/donut/common.go +++ b/pkg/storage/donut/common.go @@ -19,10 +19,31 @@ package donut import ( "bufio" "bytes" + "io" "sort" "strings" ) +// ProxyWriter implements io.Writer to trap written bytes +type ProxyWriter struct { + writer io.Writer + writtenBytes []byte +} + +func (r *ProxyWriter) Write(p []byte) (n int, err error) { + n, err = r.writer.Write(p) + if err != nil { + return + } + r.writtenBytes = append(r.writtenBytes, p[0:n]...) + return +} + +// NewProxyWriter - wrap around a given writer with ProxyWriter +func NewProxyWriter(w io.Writer) *ProxyWriter { + return &ProxyWriter{writer: w, writtenBytes: nil} +} + // Delimiter delims the string at delimiter func Delimiter(object, delimiter string) string { readBuffer := bytes.NewBufferString(object) diff --git a/pkg/storage/donut/config.go b/pkg/storage/donut/config.go new file mode 100644 index 000000000..4934f5714 --- /dev/null +++ b/pkg/storage/donut/config.go @@ -0,0 +1,87 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package donut + +import ( + "os/user" + "path/filepath" + "time" + + "github.com/minio/minio/pkg/iodine" + "github.com/minio/minio/pkg/quick" +) + +// getDonutConfigPath get donut config file path +func getDonutConfigPath() (string, error) { + u, err := user.Current() + if err != nil { + return "", iodine.New(err, nil) + } + donutConfigPath := filepath.Join(u.HomeDir, ".minio", "donut.json") + return donutConfigPath, nil +} + +// SaveConfig save donut config +func SaveConfig(a *Config) error { + donutConfigPath, err := getDonutConfigPath() + if err != nil { + return iodine.New(err, nil) + } + qc, err := quick.New(a) + if err != nil { + return iodine.New(err, nil) + } + if err := qc.Save(donutConfigPath); err != nil { + return iodine.New(err, nil) + } + return nil +} + +// LoadConfig load donut config +func LoadConfig() (*Config, error) { + donutConfigPath, err := getDonutConfigPath() + if err != nil { + return nil, iodine.New(err, nil) + } + a := &Config{} + a.Version = "0.0.1" + qc, err := quick.New(a) + if err != nil { + return nil, iodine.New(err, nil) + } + if err := qc.Load(donutConfigPath); err != nil { + return nil, iodine.New(err, nil) + } + return qc.Data().(*Config), nil +} + +// LoadDonut load donut from config +func LoadDonut() (Interface, error) { + conf, err := LoadConfig() + if err != nil { + conf = &Config{ + Version: "0.0.1", + MaxSize: 512000000, + Expiration: 1 * time.Hour, + } + } + donut, err := New(conf) + if err != nil { + return nil, iodine.New(err, nil) + } + return donut, nil +} diff --git a/pkg/storage/donut/definitions.go b/pkg/storage/donut/definitions.go index 333676fb1..b06a59148 100644 --- a/pkg/storage/donut/definitions.go +++ b/pkg/storage/donut/definitions.go @@ -65,8 +65,8 @@ type BucketMetadata struct { BucketObjects map[string]interface{} `json:"objects"` } -// ListObjects container for list objects response -type ListObjects struct { +// ListObjectsResults container for list objects response +type ListObjectsResults struct { Objects map[string]ObjectMetadata `json:"objects"` CommonPrefixes []string `json:"commonPrefixes"` IsTruncated bool `json:"isTruncated"` diff --git a/pkg/storage/donut/donut.go b/pkg/storage/donut/donut-v1.go similarity index 57% rename from pkg/storage/donut/donut.go rename to pkg/storage/donut/donut-v1.go index a34a0a5f0..54067c33a 100644 --- a/pkg/storage/donut/donut.go +++ b/pkg/storage/donut/donut-v1.go @@ -24,19 +24,10 @@ import ( "path/filepath" "strconv" "strings" - "sync" "github.com/minio/minio/pkg/iodine" ) -// donut struct internal data -type donut struct { - name string - buckets map[string]bucket - nodes map[string]node - lock *sync.RWMutex -} - // config files used inside Donut const ( // donut system config @@ -51,75 +42,41 @@ const ( bucketMetadataVersion = "1.0.0" ) -// attachDonutNode - wrapper function to instantiate a new node for associatedt donut -// based on the provided configuration -func (dt donut) attachDonutNode(hostname string, disks []string) error { - if err := dt.AttachNode(hostname, disks); err != nil { - return iodine.New(err, nil) - } - return nil -} - -// NewDonut - instantiate a new donut -func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error) { - if donutName == "" || len(nodeDiskMap) == 0 { - return nil, iodine.New(InvalidArgument{}, nil) - } - nodes := make(map[string]node) - buckets := make(map[string]bucket) - d := donut{ - name: donutName, - nodes: nodes, - buckets: buckets, - lock: new(sync.RWMutex), - } - for k, v := range nodeDiskMap { - if len(v) == 0 { - return nil, iodine.New(InvalidDisksArgument{}, nil) - } - err := d.attachDonutNode(k, v) - if err != nil { - return nil, iodine.New(err, nil) - } - } - return d, nil -} - -// MakeBucket - make a new bucket -func (dt donut) MakeBucket(bucket string, acl BucketACL) error { - dt.lock.Lock() - defer dt.lock.Unlock() +// makeBucket - make a new bucket +func (donut API) makeBucket(bucket string, acl BucketACL) error { + donut.lock.Lock() + defer donut.lock.Unlock() if bucket == "" || strings.TrimSpace(bucket) == "" { return iodine.New(InvalidArgument{}, nil) } - return dt.makeDonutBucket(bucket, acl.String()) + return donut.makeDonutBucket(bucket, acl.String()) } -// GetBucketMetadata - get bucket metadata -func (dt donut) GetBucketMetadata(bucketName string) (BucketMetadata, error) { - dt.lock.RLock() - defer dt.lock.RUnlock() - if err := dt.listDonutBuckets(); err != nil { +// getBucketMetadata - get bucket metadata +func (donut API) getBucketMetadata(bucketName string) (BucketMetadata, error) { + donut.lock.RLock() + defer donut.lock.RUnlock() + if err := donut.listDonutBuckets(); err != nil { return BucketMetadata{}, iodine.New(err, nil) } - if _, ok := dt.buckets[bucketName]; !ok { + if _, ok := donut.buckets[bucketName]; !ok { return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucketName}, nil) } - metadata, err := dt.getDonutBucketMetadata() + metadata, err := donut.getDonutBucketMetadata() if err != nil { return BucketMetadata{}, iodine.New(err, nil) } return metadata.Buckets[bucketName], nil } -// SetBucketMetadata - set bucket metadata -func (dt donut) SetBucketMetadata(bucketName string, bucketMetadata map[string]string) error { - dt.lock.Lock() - defer dt.lock.Unlock() - if err := dt.listDonutBuckets(); err != nil { +// setBucketMetadata - set bucket metadata +func (donut API) setBucketMetadata(bucketName string, bucketMetadata map[string]string) error { + donut.lock.Lock() + defer donut.lock.Unlock() + if err := donut.listDonutBuckets(); err != nil { return iodine.New(err, nil) } - metadata, err := dt.getDonutBucketMetadata() + metadata, err := donut.getDonutBucketMetadata() if err != nil { return iodine.New(err, nil) } @@ -130,17 +87,17 @@ func (dt donut) SetBucketMetadata(bucketName string, bucketMetadata map[string]s } oldBucketMetadata.ACL = BucketACL(acl) metadata.Buckets[bucketName] = oldBucketMetadata - return dt.setDonutBucketMetadata(metadata) + return donut.setDonutBucketMetadata(metadata) } -// ListBuckets - return list of buckets -func (dt donut) ListBuckets() (map[string]BucketMetadata, error) { - dt.lock.RLock() - defer dt.lock.RUnlock() - if err := dt.listDonutBuckets(); err != nil { +// listBuckets - return list of buckets +func (donut API) listBuckets() (map[string]BucketMetadata, error) { + donut.lock.RLock() + defer donut.lock.RUnlock() + if err := donut.listDonutBuckets(); err != nil { return nil, iodine.New(err, nil) } - metadata, err := dt.getDonutBucketMetadata() + metadata, err := donut.getDonutBucketMetadata() if err != nil { // intentionally left out the error when Donut is empty // but we need to revisit this area in future - since we need @@ -150,10 +107,10 @@ func (dt donut) ListBuckets() (map[string]BucketMetadata, error) { return metadata.Buckets, nil } -// ListObjects - return list of objects -func (dt donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys int) (ListObjects, error) { - dt.lock.RLock() - defer dt.lock.RUnlock() +// listObjects - return list of objects +func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) { + donut.lock.RLock() + defer donut.lock.RUnlock() errParams := map[string]string{ "bucket": bucket, "prefix": prefix, @@ -161,23 +118,23 @@ func (dt donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys in "delimiter": delimiter, "maxkeys": strconv.Itoa(maxkeys), } - if err := dt.listDonutBuckets(); err != nil { - return ListObjects{}, iodine.New(err, errParams) + if err := donut.listDonutBuckets(); err != nil { + return ListObjectsResults{}, iodine.New(err, errParams) } - if _, ok := dt.buckets[bucket]; !ok { - return ListObjects{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams) + if _, ok := donut.buckets[bucket]; !ok { + return ListObjectsResults{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams) } - listObjects, err := dt.buckets[bucket].ListObjects(prefix, marker, delimiter, maxkeys) + listObjects, err := donut.buckets[bucket].ListObjects(prefix, marker, delimiter, maxkeys) if err != nil { - return ListObjects{}, iodine.New(err, errParams) + return ListObjectsResults{}, iodine.New(err, errParams) } return listObjects, nil } -// PutObject - put object -func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) { - dt.lock.Lock() - defer dt.lock.Unlock() +// putObject - put object +func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) { + donut.lock.Lock() + defer donut.lock.Unlock() errParams := map[string]string{ "bucket": bucket, "object": object, @@ -188,34 +145,34 @@ func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.Reade if object == "" || strings.TrimSpace(object) == "" { return ObjectMetadata{}, iodine.New(InvalidArgument{}, errParams) } - if err := dt.listDonutBuckets(); err != nil { + if err := donut.listDonutBuckets(); err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } - if _, ok := dt.buckets[bucket]; !ok { + if _, ok := donut.buckets[bucket]; !ok { return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - bucketMeta, err := dt.getDonutBucketMetadata() + bucketMeta, err := donut.getDonutBucketMetadata() if err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok { return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams) } - objMetadata, err := dt.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata) + objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata) if err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } bucketMeta.Buckets[bucket].BucketObjects[object] = 1 - if err := dt.setDonutBucketMetadata(bucketMeta); err != nil { + if err := donut.setDonutBucketMetadata(bucketMeta); err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } return objMetadata, nil } -// GetObject - get object -func (dt donut) GetObject(bucket, object string) (reader io.ReadCloser, size int64, err error) { - dt.lock.RLock() - defer dt.lock.RUnlock() +// getObject - get object +func (donut API) getObject(bucket, object string) (reader io.ReadCloser, size int64, err error) { + donut.lock.RLock() + defer donut.lock.RUnlock() errParams := map[string]string{ "bucket": bucket, "object": object, @@ -226,37 +183,37 @@ func (dt donut) GetObject(bucket, object string) (reader io.ReadCloser, size int if object == "" || strings.TrimSpace(object) == "" { return nil, 0, iodine.New(InvalidArgument{}, errParams) } - if err := dt.listDonutBuckets(); err != nil { + if err := donut.listDonutBuckets(); err != nil { return nil, 0, iodine.New(err, nil) } - if _, ok := dt.buckets[bucket]; !ok { + if _, ok := donut.buckets[bucket]; !ok { return nil, 0, iodine.New(BucketNotFound{Bucket: bucket}, errParams) } - return dt.buckets[bucket].ReadObject(object) + return donut.buckets[bucket].ReadObject(object) } -// GetObjectMetadata - get object metadata -func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) { - dt.lock.RLock() - defer dt.lock.RUnlock() +// getObjectMetadata - get object metadata +func (donut API) getObjectMetadata(bucket, object string) (ObjectMetadata, error) { + donut.lock.RLock() + defer donut.lock.RUnlock() errParams := map[string]string{ "bucket": bucket, "object": object, } - if err := dt.listDonutBuckets(); err != nil { + if err := donut.listDonutBuckets(); err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } - if _, ok := dt.buckets[bucket]; !ok { + if _, ok := donut.buckets[bucket]; !ok { return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams) } - bucketMeta, err := dt.getDonutBucketMetadata() + bucketMeta, err := donut.getDonutBucketMetadata() if err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; !ok { return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: object}, errParams) } - objectMetadata, err := dt.buckets[bucket].GetObjectMetadata(object) + objectMetadata, err := donut.buckets[bucket].GetObjectMetadata(object) if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -264,16 +221,16 @@ func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) } // getDiskWriters - -func (dt donut) getBucketMetadataWriters() ([]io.WriteCloser, error) { +func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) { var writers []io.WriteCloser - for _, node := range dt.nodes { + for _, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return nil, iodine.New(err, nil) } writers = make([]io.WriteCloser, len(disks)) for order, d := range disks { - bucketMetaDataWriter, err := d.CreateFile(filepath.Join(dt.name, bucketMetadataConfig)) + bucketMetaDataWriter, err := d.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig)) if err != nil { return nil, iodine.New(err, nil) } @@ -283,16 +240,16 @@ func (dt donut) getBucketMetadataWriters() ([]io.WriteCloser, error) { return writers, nil } -func (dt donut) getBucketMetadataReaders() ([]io.ReadCloser, error) { +func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) { var readers []io.ReadCloser - for _, node := range dt.nodes { + for _, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return nil, iodine.New(err, nil) } readers = make([]io.ReadCloser, len(disks)) for order, d := range disks { - bucketMetaDataReader, err := d.OpenFile(filepath.Join(dt.name, bucketMetadataConfig)) + bucketMetaDataReader, err := d.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig)) if err != nil { return nil, iodine.New(err, nil) } @@ -303,8 +260,8 @@ func (dt donut) getBucketMetadataReaders() ([]io.ReadCloser, error) { } // -func (dt donut) setDonutBucketMetadata(metadata *AllBuckets) error { - writers, err := dt.getBucketMetadataWriters() +func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error { + writers, err := donut.getBucketMetadataWriters() if err != nil { return iodine.New(err, nil) } @@ -320,9 +277,9 @@ func (dt donut) setDonutBucketMetadata(metadata *AllBuckets) error { return nil } -func (dt donut) getDonutBucketMetadata() (*AllBuckets, error) { +func (donut API) getDonutBucketMetadata() (*AllBuckets, error) { metadata := new(AllBuckets) - readers, err := dt.getBucketMetadataReaders() + readers, err := donut.getBucketMetadataReaders() if err != nil { return nil, iodine.New(err, nil) } @@ -339,40 +296,40 @@ func (dt donut) getDonutBucketMetadata() (*AllBuckets, error) { return nil, iodine.New(InvalidArgument{}, nil) } -func (dt donut) makeDonutBucket(bucketName, acl string) error { - if err := dt.listDonutBuckets(); err != nil { +func (donut API) makeDonutBucket(bucketName, acl string) error { + if err := donut.listDonutBuckets(); err != nil { return iodine.New(err, nil) } - if _, ok := dt.buckets[bucketName]; ok { + if _, ok := donut.buckets[bucketName]; ok { return iodine.New(BucketExists{Bucket: bucketName}, nil) } - bucket, bucketMetadata, err := newBucket(bucketName, acl, dt.name, dt.nodes) + bucket, bucketMetadata, err := newBucket(bucketName, acl, donut.config.DonutName, donut.nodes) if err != nil { return iodine.New(err, nil) } nodeNumber := 0 - dt.buckets[bucketName] = bucket - for _, node := range dt.nodes { + donut.buckets[bucketName] = bucket + for _, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return iodine.New(err, nil) } for order, disk := range disks { bucketSlice := fmt.Sprintf("%s$%d$%d", bucketName, nodeNumber, order) - err := disk.MakeDir(filepath.Join(dt.name, bucketSlice)) + err := disk.MakeDir(filepath.Join(donut.config.DonutName, bucketSlice)) if err != nil { return iodine.New(err, nil) } } nodeNumber = nodeNumber + 1 } - metadata, err := dt.getDonutBucketMetadata() + metadata, err := donut.getDonutBucketMetadata() if err != nil { if os.IsNotExist(iodine.ToError(err)) { metadata := new(AllBuckets) metadata.Buckets = make(map[string]BucketMetadata) metadata.Buckets[bucketName] = bucketMetadata - err = dt.setDonutBucketMetadata(metadata) + err = donut.setDonutBucketMetadata(metadata) if err != nil { return iodine.New(err, nil) } @@ -381,21 +338,21 @@ func (dt donut) makeDonutBucket(bucketName, acl string) error { return iodine.New(err, nil) } metadata.Buckets[bucketName] = bucketMetadata - err = dt.setDonutBucketMetadata(metadata) + err = donut.setDonutBucketMetadata(metadata) if err != nil { return iodine.New(err, nil) } return nil } -func (dt donut) listDonutBuckets() error { - for _, node := range dt.nodes { +func (donut API) listDonutBuckets() error { + for _, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return iodine.New(err, nil) } for _, disk := range disks { - dirs, err := disk.ListDir(dt.name) + dirs, err := disk.ListDir(donut.config.DonutName) if err != nil { return iodine.New(err, nil) } @@ -406,11 +363,11 @@ func (dt donut) listDonutBuckets() error { } bucketName := splitDir[0] // we dont need this once we cache from makeDonutBucket() - bucket, _, err := newBucket(bucketName, "private", dt.name, dt.nodes) + bucket, _, err := newBucket(bucketName, "private", donut.config.DonutName, donut.nodes) if err != nil { return iodine.New(err, nil) } - dt.buckets[bucketName] = bucket + donut.buckets[bucketName] = bucket } } } diff --git a/pkg/storage/donut/donut_test.go b/pkg/storage/donut/donut-v1_test.go similarity index 89% rename from pkg/storage/donut/donut_test.go rename to pkg/storage/donut/donut-v1_test.go index e4f56fec4..10e5f3ef8 100644 --- a/pkg/storage/donut/donut_test.go +++ b/pkg/storage/donut/donut-v1_test.go @@ -55,13 +55,22 @@ func createTestNodeDiskMap(p string) map[string][]string { return nodes } -var dd Cache +var dd Interface func (s *MyDonutSuite) SetUpSuite(c *C) { root, err := ioutil.TempDir(os.TempDir(), "donut-") c.Assert(err, IsNil) s.root = root - dd = NewCache(100000, time.Duration(1*time.Hour), "test", createTestNodeDiskMap(root)) + + conf := new(Config) + conf.DonutName = "test" + conf.NodeDiskMap = createTestNodeDiskMap(root) + conf.Expiration = time.Duration(1 * time.Hour) + conf.MaxSize = 100000 + + dd, err = New(conf) + c.Assert(err, IsNil) + // testing empty donut buckets, err := dd.ListBuckets() c.Assert(err, IsNil) @@ -145,7 +154,7 @@ func (s *MyDonutSuite) TestCreateMultipleBucketsAndList(c *C) { // test object create without bucket func (s *MyDonutSuite) TestNewObjectFailsWithoutBucket(c *C) { - _, err := dd.CreateObject("unknown", "obj", "", "", 0, nil) + _, err := dd.CreateObject("unknown", "obj", "", 0, nil, nil) c.Assert(err, Not(IsNil)) } @@ -160,7 +169,7 @@ func (s *MyDonutSuite) TestNewObjectMetadata(c *C) { err := dd.MakeBucket("foo6", "private") c.Assert(err, IsNil) - objectMetadata, err := dd.CreateObject("foo6", "obj", "application/json", expectedMd5Sum, int64(len(data)), reader) + objectMetadata, err := dd.CreateObject("foo6", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/json"}) c.Assert(err, IsNil) c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil))) c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json") @@ -168,7 +177,7 @@ func (s *MyDonutSuite) TestNewObjectMetadata(c *C) { // test create object fails without name func (s *MyDonutSuite) TestNewObjectFailsWithEmptyName(c *C) { - _, err := dd.CreateObject("foo", "", "", "", 0, nil) + _, err := dd.CreateObject("foo", "", "", 0, nil, nil) c.Assert(err, Not(IsNil)) } @@ -184,7 +193,7 @@ func (s *MyDonutSuite) TestNewObjectCanBeWritten(c *C) { expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) reader := ioutil.NopCloser(bytes.NewReader([]byte(data))) - actualMetadata, err := dd.CreateObject("foo", "obj", "application/octet-stream", expectedMd5Sum, int64(len(data)), reader) + actualMetadata, err := dd.CreateObject("foo", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/octet-stream"}) c.Assert(err, IsNil) c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil))) @@ -206,11 +215,11 @@ func (s *MyDonutSuite) TestMultipleNewObjects(c *C) { one := ioutil.NopCloser(bytes.NewReader([]byte("one"))) - _, err := dd.CreateObject("foo5", "obj1", "", "", int64(len("one")), one) + _, err := dd.CreateObject("foo5", "obj1", "", int64(len("one")), one, nil) c.Assert(err, IsNil) two := ioutil.NopCloser(bytes.NewReader([]byte("two"))) - _, err = dd.CreateObject("foo5", "obj2", "", "", int64(len("two")), two) + _, err = dd.CreateObject("foo5", "obj2", "", int64(len("two")), two, nil) c.Assert(err, IsNil) var buffer1 bytes.Buffer @@ -259,7 +268,7 @@ func (s *MyDonutSuite) TestMultipleNewObjects(c *C) { c.Assert(objectsMetadata[1].Object, Equals, "obj2") three := ioutil.NopCloser(bytes.NewReader([]byte("three"))) - _, err = dd.CreateObject("foo5", "obj3", "", "", int64(len("three")), three) + _, err = dd.CreateObject("foo5", "obj3", "", int64(len("three")), three, nil) c.Assert(err, IsNil) var buffer bytes.Buffer diff --git a/pkg/storage/donut/cache.go b/pkg/storage/donut/donut-v2.go similarity index 63% rename from pkg/storage/donut/cache.go rename to pkg/storage/donut/donut-v2.go index 59b49a5fa..8cb9de52b 100644 --- a/pkg/storage/donut/cache.go +++ b/pkg/storage/donut/donut-v2.go @@ -33,6 +33,7 @@ import ( "time" "github.com/minio/minio/pkg/iodine" + "github.com/minio/minio/pkg/quick" "github.com/minio/minio/pkg/storage/donut/trove" ) @@ -41,15 +42,24 @@ const ( totalBuckets = 100 ) -// Cache - local variables -type Cache struct { - storedBuckets map[string]storedBucket +// Config donut config +type Config struct { + Version string `json:"version"` + MaxSize uint64 `json:"max-size"` + Expiration time.Duration `json:"expiration"` + DonutName string `json:"donut-name"` + NodeDiskMap map[string][]string `json:"node-disk-map"` +} + +// API - local variables +type API struct { + config *Config lock *sync.RWMutex objects *trove.Cache multiPartObjects *trove.Cache - maxSize uint64 - expiration time.Duration - donut Donut + storedBuckets map[string]storedBucket + nodes map[string]node + buckets map[string]bucket } // storedBucket saved bucket @@ -67,79 +77,85 @@ type multiPartSession struct { initiated time.Time } -type proxyWriter struct { - writer io.Writer - writtenBytes []byte -} - -func (r *proxyWriter) Write(p []byte) (n int, err error) { - n, err = r.writer.Write(p) - if err != nil { - return +// New instantiate a new donut +func New(c *Config) (Interface, error) { + if err := quick.CheckData(c); err != nil { + return nil, iodine.New(err, nil) } - r.writtenBytes = append(r.writtenBytes, p[0:n]...) - return -} - -func newProxyWriter(w io.Writer) *proxyWriter { - return &proxyWriter{writer: w, writtenBytes: nil} -} - -// NewCache new cache -func NewCache(maxSize uint64, expiration time.Duration, donutName string, nodeDiskMap map[string][]string) Cache { - c := Cache{} - c.storedBuckets = make(map[string]storedBucket) - c.objects = trove.NewCache(maxSize, expiration) - c.multiPartObjects = trove.NewCache(0, time.Duration(0)) - c.objects.OnExpired = c.expiredObject - c.multiPartObjects.OnExpired = c.expiredPart - c.lock = new(sync.RWMutex) - c.maxSize = maxSize - c.expiration = expiration + a := API{config: c} + a.storedBuckets = make(map[string]storedBucket) + a.nodes = make(map[string]node) + a.buckets = make(map[string]bucket) + a.objects = trove.NewCache(a.config.MaxSize, a.config.Expiration) + a.multiPartObjects = trove.NewCache(0, time.Duration(0)) + a.objects.OnExpired = a.expiredObject + a.multiPartObjects.OnExpired = a.expiredPart + a.lock = new(sync.RWMutex) // set up cache expiration - c.objects.ExpireObjects(time.Second * 5) - c.donut, _ = NewDonut(donutName, nodeDiskMap) - return c + a.objects.ExpireObjects(time.Second * 5) + + if len(a.config.NodeDiskMap) > 0 { + for k, v := range a.config.NodeDiskMap { + if len(v) == 0 { + return nil, iodine.New(InvalidDisksArgument{}, nil) + } + err := a.AttachNode(k, v) + if err != nil { + return nil, iodine.New(err, nil) + } + } + /// Initialization, populate all buckets into memory + buckets, err := a.listBuckets() + if err != nil { + return nil, iodine.New(err, nil) + } + for k, v := range buckets { + storedBucket := a.storedBuckets[k] + storedBucket.bucketMetadata = v + a.storedBuckets[k] = storedBucket + } + } + return a, nil } // GetObject - GET object from cache buffer -func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, error) { - cache.lock.RLock() +func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) { + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidObjectName(object) { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(ObjectNameInvalid{Object: object}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return 0, iodine.New(BucketNotFound{Bucket: bucket}, nil) } objectKey := bucket + "/" + object - data, ok := cache.objects.Get(objectKey) + data, ok := donut.objects.Get(objectKey) if !ok { - if cache.donut != nil { - reader, size, err := cache.donut.GetObject(bucket, object) + if len(donut.config.NodeDiskMap) > 0 { + reader, size, err := donut.getObject(bucket, object) if err != nil { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(err, nil) } // new proxy writer to capture data read from disk - pw := newProxyWriter(w) + pw := NewProxyWriter(w) written, err := io.CopyN(pw, reader, size) if err != nil { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(err, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() /// cache object read from disk { - cache.lock.Lock() - ok := cache.objects.Set(objectKey, pw.writtenBytes) - cache.lock.Unlock() + donut.lock.Lock() + ok := donut.objects.Set(objectKey, pw.writtenBytes) + donut.lock.Unlock() pw.writtenBytes = nil go debug.FreeOSMemory() if !ok { @@ -148,65 +164,65 @@ func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, } return written, nil } - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(ObjectNotFound{Object: object}, nil) } - written, err := io.CopyN(w, bytes.NewBuffer(data), int64(cache.objects.Len(objectKey))) + written, err := io.CopyN(w, bytes.NewBuffer(data), int64(donut.objects.Len(objectKey))) if err != nil { return 0, iodine.New(err, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() return written, nil } // GetPartialObject - GET object from cache buffer range -func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { +func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { errParams := map[string]string{ "bucket": bucket, "object": object, "start": strconv.FormatInt(start, 10), "length": strconv.FormatInt(length, 10), } - cache.lock.RLock() + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, errParams) } if !IsValidObjectName(object) { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(ObjectNameInvalid{Object: object}, errParams) } if start < 0 { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(InvalidRange{ Start: start, Length: length, }, errParams) } objectKey := bucket + "/" + object - data, ok := cache.objects.Get(objectKey) + data, ok := donut.objects.Get(objectKey) if !ok { - if cache.donut != nil { - reader, _, err := cache.donut.GetObject(bucket, object) + if len(donut.config.NodeDiskMap) > 0 { + reader, _, err := donut.getObject(bucket, object) if err != nil { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(err, nil) } if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(err, nil) } - pw := newProxyWriter(w) + pw := NewProxyWriter(w) written, err := io.CopyN(w, reader, length) if err != nil { - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(err, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() { - cache.lock.Lock() - ok := cache.objects.Set(objectKey, pw.writtenBytes) - cache.lock.Unlock() + donut.lock.Lock() + ok := donut.objects.Set(objectKey, pw.writtenBytes) + donut.lock.Unlock() pw.writtenBytes = nil go debug.FreeOSMemory() if !ok { @@ -215,72 +231,70 @@ func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, l } return written, nil } - cache.lock.RUnlock() + donut.lock.RUnlock() return 0, iodine.New(ObjectNotFound{Object: object}, nil) } written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) if err != nil { return 0, iodine.New(err, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() return written, nil } // GetBucketMetadata - -func (cache Cache) GetBucketMetadata(bucket string) (BucketMetadata, error) { - cache.lock.RLock() +func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) { + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - if cache.donut == nil { - cache.lock.RUnlock() - return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) + if _, ok := donut.storedBuckets[bucket]; ok == false { + if len(donut.config.NodeDiskMap) > 0 { + bucketMetadata, err := donut.getBucketMetadata(bucket) + if err != nil { + donut.lock.RUnlock() + return BucketMetadata{}, iodine.New(err, nil) + } + storedBucket := donut.storedBuckets[bucket] + donut.lock.RUnlock() + { + donut.lock.Lock() + storedBucket.bucketMetadata = bucketMetadata + donut.storedBuckets[bucket] = storedBucket + donut.lock.Unlock() + } } - bucketMetadata, err := cache.donut.GetBucketMetadata(bucket) - if err != nil { - cache.lock.RUnlock() - return BucketMetadata{}, iodine.New(err, nil) - } - storedBucket := cache.storedBuckets[bucket] - cache.lock.RUnlock() - cache.lock.Lock() - storedBucket.bucketMetadata = bucketMetadata - cache.storedBuckets[bucket] = storedBucket - cache.lock.Unlock() + return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - cache.lock.RUnlock() - return cache.storedBuckets[bucket].bucketMetadata, nil + donut.lock.RUnlock() + return donut.storedBuckets[bucket].bucketMetadata, nil } // SetBucketMetadata - -func (cache Cache) SetBucketMetadata(bucket, acl string) error { - cache.lock.RLock() +func (donut API) SetBucketMetadata(bucket string, metadata map[string]string) error { + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return iodine.New(BucketNotFound{Bucket: bucket}, nil) } - if strings.TrimSpace(acl) == "" { - acl = "private" - } - cache.lock.RUnlock() - cache.lock.Lock() - m := make(map[string]string) - m["acl"] = acl - if cache.donut != nil { - if err := cache.donut.SetBucketMetadata(bucket, m); err != nil { - return iodine.New(err, nil) + donut.lock.RUnlock() + donut.lock.Lock() + { + if len(donut.config.NodeDiskMap) > 0 { + if err := donut.setBucketMetadata(bucket, metadata); err != nil { + return iodine.New(err, nil) + } } + storedBucket := donut.storedBuckets[bucket] + storedBucket.bucketMetadata.ACL = BucketACL(metadata["acl"]) + donut.storedBuckets[bucket] = storedBucket } - storedBucket := cache.storedBuckets[bucket] - storedBucket.bucketMetadata.ACL = BucketACL(acl) - cache.storedBuckets[bucket] = storedBucket - cache.lock.Unlock() + donut.lock.Unlock() return nil } @@ -304,44 +318,45 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { } // CreateObject - -func (cache Cache) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) { - if size > int64(cache.maxSize) { +func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, data io.Reader, metadata map[string]string) (ObjectMetadata, error) { + if size > int64(donut.config.MaxSize) { generic := GenericObjectError{Bucket: bucket, Object: key} return ObjectMetadata{}, iodine.New(EntityTooLarge{ GenericObjectError: generic, Size: strconv.FormatInt(size, 10), - MaxSize: strconv.FormatUint(cache.maxSize, 10), + MaxSize: strconv.FormatUint(donut.config.MaxSize, 10), }, nil) } - objectMetadata, err := cache.createObject(bucket, key, contentType, expectedMD5Sum, size, data) + contentType := metadata["contentType"] + objectMetadata, err := donut.createObject(bucket, key, contentType, expectedMD5Sum, size, data) // free debug.FreeOSMemory() return objectMetadata, iodine.New(err, nil) } // createObject - PUT object to cache buffer -func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) { - cache.lock.RLock() +func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) { + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidObjectName(key) { - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] // get object key objectKey := bucket + "/" + key if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(ObjectExists{Object: key}, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() if contentType == "" { contentType = "application/octet-stream" @@ -356,15 +371,15 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - if cache.donut != nil { - objMetadata, err := cache.donut.PutObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType}) + if len(donut.config.NodeDiskMap) > 0 { + objMetadata, err := donut.putObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType}) if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } - cache.lock.Lock() + donut.lock.Lock() storedBucket.objectMetadata[objectKey] = objMetadata - cache.storedBuckets[bucket] = storedBucket - cache.lock.Unlock() + donut.storedBuckets[bucket] = storedBucket + donut.lock.Unlock() return objMetadata, nil } // calculate md5 @@ -382,9 +397,9 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, break } hash.Write(byteBuffer[0:length]) - cache.lock.Lock() - ok := cache.objects.Append(objectKey, byteBuffer[0:length]) - cache.lock.Unlock() + donut.lock.Lock() + ok := donut.objects.Append(objectKey, byteBuffer[0:length]) + donut.lock.Unlock() if !ok { return ObjectMetadata{}, iodine.New(InternalError{}, nil) } @@ -416,40 +431,40 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, Size: int64(totalLength), } - cache.lock.Lock() + donut.lock.Lock() storedBucket.objectMetadata[objectKey] = newObject - cache.storedBuckets[bucket] = storedBucket - cache.lock.Unlock() + donut.storedBuckets[bucket] = storedBucket + donut.lock.Unlock() return newObject, nil } // MakeBucket - create bucket in cache -func (cache Cache) MakeBucket(bucketName, acl string) error { - cache.lock.RLock() - if len(cache.storedBuckets) == totalBuckets { - cache.lock.RUnlock() +func (donut API) MakeBucket(bucketName, acl string) error { + donut.lock.RLock() + if len(donut.storedBuckets) == totalBuckets { + donut.lock.RUnlock() return iodine.New(TooManyBuckets{Bucket: bucketName}, nil) } if !IsValidBucket(bucketName) { - cache.lock.RUnlock() + donut.lock.RUnlock() return iodine.New(BucketNameInvalid{Bucket: bucketName}, nil) } if !IsValidBucketACL(acl) { - cache.lock.RUnlock() + donut.lock.RUnlock() return iodine.New(InvalidACL{ACL: acl}, nil) } - if _, ok := cache.storedBuckets[bucketName]; ok == true { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucketName]; ok == true { + donut.lock.RUnlock() return iodine.New(BucketExists{Bucket: bucketName}, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() if strings.TrimSpace(acl) == "" { // default is private acl = "private" } - if cache.donut != nil { - if err := cache.donut.MakeBucket(bucketName, BucketACL(acl)); err != nil { + if len(donut.config.NodeDiskMap) > 0 { + if err := donut.makeBucket(bucketName, BucketACL(acl)); err != nil { return iodine.New(err, nil) } } @@ -461,29 +476,29 @@ func (cache Cache) MakeBucket(bucketName, acl string) error { newBucket.bucketMetadata.Name = bucketName newBucket.bucketMetadata.Created = time.Now().UTC() newBucket.bucketMetadata.ACL = BucketACL(acl) - cache.lock.Lock() - cache.storedBuckets[bucketName] = newBucket - cache.lock.Unlock() + donut.lock.Lock() + donut.storedBuckets[bucketName] = newBucket + donut.lock.Unlock() return nil } // ListObjects - list objects from cache -func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) { - cache.lock.RLock() - defer cache.lock.RUnlock() +func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) { + donut.lock.RLock() + defer donut.lock.RUnlock() if !IsValidBucket(bucket) { return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidPrefix(resources.Prefix) { return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(ObjectNameInvalid{Object: resources.Prefix}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { + if _, ok := donut.storedBuckets[bucket]; ok == false { return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } var results []ObjectMetadata var keys []string - if cache.donut != nil { - listObjects, err := cache.donut.ListObjects( + if len(donut.config.NodeDiskMap) > 0 { + listObjects, err := donut.listObjects( bucket, resources.Prefix, resources.Marker, @@ -507,7 +522,7 @@ func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata) } return results, resources, nil } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] for key := range storedBucket.objectMetadata { if strings.HasPrefix(key, bucket+"/") { key = key[len(bucket)+1:] @@ -561,11 +576,11 @@ func (b byBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name } // ListBuckets - List buckets from cache -func (cache Cache) ListBuckets() ([]BucketMetadata, error) { - cache.lock.RLock() - defer cache.lock.RUnlock() +func (donut API) ListBuckets() ([]BucketMetadata, error) { + donut.lock.RLock() + defer donut.lock.RUnlock() var results []BucketMetadata - for _, bucket := range cache.storedBuckets { + for _, bucket := range donut.storedBuckets { results = append(results, bucket.bucketMetadata) } sort.Sort(byBucketName(results)) @@ -573,50 +588,50 @@ func (cache Cache) ListBuckets() ([]BucketMetadata, error) { } // GetObjectMetadata - get object metadata from cache -func (cache Cache) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) { - cache.lock.RLock() +func (donut API) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) { + donut.lock.RLock() // check if bucket exists if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidObjectName(key) { - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] objectKey := bucket + "/" + key if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true { - cache.lock.RUnlock() + donut.lock.RUnlock() return objMetadata, nil } - if cache.donut != nil { - objMetadata, err := cache.donut.GetObjectMetadata(bucket, key) - cache.lock.RUnlock() + if len(donut.config.NodeDiskMap) > 0 { + objMetadata, err := donut.getObjectMetadata(bucket, key) + donut.lock.RUnlock() if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } // update - cache.lock.Lock() + donut.lock.Lock() storedBucket.objectMetadata[objectKey] = objMetadata - cache.lock.Unlock() + donut.lock.Unlock() return objMetadata, nil } - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil) } -func (cache Cache) expiredObject(a ...interface{}) { - cacheStats := cache.objects.Stats() +func (donut API) expiredObject(a ...interface{}) { + cacheStats := donut.objects.Stats() log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d", cacheStats.Bytes, cacheStats.Items, cacheStats.Expired) key := a[0].(string) // loop through all buckets - for _, storedBucket := range cache.storedBuckets { + for _, storedBucket := range donut.storedBuckets { delete(storedBucket.objectMetadata, key) } debug.FreeOSMemory() diff --git a/pkg/storage/donut/cache_test.go b/pkg/storage/donut/donut-v2_test.go similarity index 88% rename from pkg/storage/donut/cache_test.go rename to pkg/storage/donut/donut-v2_test.go index 644df5f00..556c0ddc0 100644 --- a/pkg/storage/donut/cache_test.go +++ b/pkg/storage/donut/donut-v2_test.go @@ -34,11 +34,20 @@ type MyCacheSuite struct{} var _ = Suite(&MyCacheSuite{}) -var dc Cache +var dc Interface func (s *MyCacheSuite) SetUpSuite(c *C) { - // no donut this time - dc = NewCache(100000, time.Duration(1*time.Hour), "", nil) + // test only cache + conf := new(Config) + conf.DonutName = "" + conf.NodeDiskMap = nil + conf.Expiration = time.Duration(1 * time.Hour) + conf.MaxSize = 100000 + + var err error + dc, err = New(conf) + c.Assert(err, IsNil) + // testing empty cache buckets, err := dc.ListBuckets() c.Assert(err, IsNil) @@ -118,7 +127,7 @@ func (s *MyCacheSuite) TestCreateMultipleBucketsAndList(c *C) { // test object create without bucket func (s *MyCacheSuite) TestNewObjectFailsWithoutBucket(c *C) { - _, err := dc.CreateObject("unknown", "obj", "", "", 0, nil) + _, err := dc.CreateObject("unknown", "obj", "", 0, nil, nil) c.Assert(err, Not(IsNil)) } @@ -133,7 +142,7 @@ func (s *MyCacheSuite) TestNewObjectMetadata(c *C) { err := dc.MakeBucket("foo6", "private") c.Assert(err, IsNil) - objectMetadata, err := dc.CreateObject("foo6", "obj", "application/json", expectedMd5Sum, int64(len(data)), reader) + objectMetadata, err := dc.CreateObject("foo6", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/json"}) c.Assert(err, IsNil) c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil))) c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json") @@ -141,7 +150,7 @@ func (s *MyCacheSuite) TestNewObjectMetadata(c *C) { // test create object fails without name func (s *MyCacheSuite) TestNewObjectFailsWithEmptyName(c *C) { - _, err := dc.CreateObject("foo", "", "", "", 0, nil) + _, err := dc.CreateObject("foo", "", "", 0, nil, nil) c.Assert(err, Not(IsNil)) } @@ -157,7 +166,7 @@ func (s *MyCacheSuite) TestNewObjectCanBeWritten(c *C) { expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) reader := ioutil.NopCloser(bytes.NewReader([]byte(data))) - actualMetadata, err := dc.CreateObject("foo", "obj", "application/octet-stream", expectedMd5Sum, int64(len(data)), reader) + actualMetadata, err := dc.CreateObject("foo", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/octet-stream"}) c.Assert(err, IsNil) c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil))) @@ -179,11 +188,11 @@ func (s *MyCacheSuite) TestMultipleNewObjects(c *C) { one := ioutil.NopCloser(bytes.NewReader([]byte("one"))) - _, err := dc.CreateObject("foo5", "obj1", "", "", int64(len("one")), one) + _, err := dc.CreateObject("foo5", "obj1", "", int64(len("one")), one, nil) c.Assert(err, IsNil) two := ioutil.NopCloser(bytes.NewReader([]byte("two"))) - _, err = dc.CreateObject("foo5", "obj2", "", "", int64(len("two")), two) + _, err = dc.CreateObject("foo5", "obj2", "", int64(len("two")), two, nil) c.Assert(err, IsNil) var buffer1 bytes.Buffer @@ -232,7 +241,7 @@ func (s *MyCacheSuite) TestMultipleNewObjects(c *C) { c.Assert(objectsMetadata[1].Object, Equals, "obj2") three := ioutil.NopCloser(bytes.NewReader([]byte("three"))) - _, err = dc.CreateObject("foo5", "obj3", "", "", int64(len("three")), three) + _, err = dc.CreateObject("foo5", "obj3", "", int64(len("three")), three, nil) c.Assert(err, IsNil) var buffer bytes.Buffer diff --git a/pkg/storage/donut/interfaces.go b/pkg/storage/donut/interfaces.go index b1ea80318..d675c5b8c 100644 --- a/pkg/storage/donut/interfaces.go +++ b/pkg/storage/donut/interfaces.go @@ -20,8 +20,8 @@ import "io" // Collection of Donut specification interfaces -// Donut is a collection of object storage and management interface -type Donut interface { +// Interface is a collection of object storage and management interface +type Interface interface { ObjectStorage Management } @@ -31,16 +31,29 @@ type ObjectStorage interface { // Storage service operations GetBucketMetadata(bucket string) (BucketMetadata, error) SetBucketMetadata(bucket string, metadata map[string]string) error - ListBuckets() (map[string]BucketMetadata, error) - MakeBucket(bucket string, acl BucketACL) error + ListBuckets() ([]BucketMetadata, error) + MakeBucket(bucket string, ACL string) error // Bucket operations - ListObjects(bucket, prefix, marker, delim string, maxKeys int) (ListObjects, error) + ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) // Object operations - GetObject(bucket, object string) (io.ReadCloser, int64, error) + GetObject(w io.Writer, bucket, object string) (int64, error) + GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) - PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) + CreateObject(bucket, object, expectedMD5Sum string, size int64, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) + + Multipart +} + +// Multipart API +type Multipart interface { + NewMultipartUpload(bucket, key, contentType string) (string, error) + AbortMultipartUpload(bucket, key, uploadID string) error + CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) + CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) + ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) + ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) } // Management is a donut management system interface diff --git a/pkg/storage/donut/management.go b/pkg/storage/donut/management.go index 17f89e4ef..c897524b2 100644 --- a/pkg/storage/donut/management.go +++ b/pkg/storage/donut/management.go @@ -25,14 +25,14 @@ import ( ) // Heal - heal a donut and fix bad data blocks -func (dt donut) Heal() error { +func (donut API) Heal() error { return iodine.New(NotImplemented{Function: "Heal"}, nil) } // Info - return info about donut configuration -func (dt donut) Info() (nodeDiskMap map[string][]string, err error) { +func (donut API) Info() (nodeDiskMap map[string][]string, err error) { nodeDiskMap = make(map[string][]string) - for nodeName, node := range dt.nodes { + for nodeName, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return nil, iodine.New(err, nil) @@ -47,7 +47,7 @@ func (dt donut) Info() (nodeDiskMap map[string][]string, err error) { } // AttachNode - attach node -func (dt donut) AttachNode(hostname string, disks []string) error { +func (donut API) AttachNode(hostname string, disks []string) error { if hostname == "" || len(disks) == 0 { return iodine.New(InvalidArgument{}, nil) } @@ -55,13 +55,13 @@ func (dt donut) AttachNode(hostname string, disks []string) error { if err != nil { return iodine.New(err, nil) } - dt.nodes[hostname] = node + donut.nodes[hostname] = node for i, d := range disks { newDisk, err := disk.New(d) if err != nil { return iodine.New(err, nil) } - if err := newDisk.MakeDir(dt.name); err != nil { + if err := newDisk.MakeDir(donut.config.DonutName); err != nil { return iodine.New(err, nil) } if err := node.AttachDisk(newDisk, i); err != nil { @@ -72,21 +72,21 @@ func (dt donut) AttachNode(hostname string, disks []string) error { } // DetachNode - detach node -func (dt donut) DetachNode(hostname string) error { - delete(dt.nodes, hostname) +func (donut API) DetachNode(hostname string) error { + delete(donut.nodes, hostname) return nil } // SaveConfig - save donut configuration -func (dt donut) SaveConfig() error { +func (donut API) SaveConfig() error { nodeDiskMap := make(map[string][]string) - for hostname, node := range dt.nodes { + for hostname, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return iodine.New(err, nil) } for order, disk := range disks { - donutConfigPath := filepath.Join(dt.name, donutConfig) + donutConfigPath := filepath.Join(donut.config.DonutName, donutConfig) donutConfigWriter, err := disk.CreateFile(donutConfigPath) defer donutConfigWriter.Close() if err != nil { @@ -103,6 +103,6 @@ func (dt donut) SaveConfig() error { } // LoadConfig - load configuration -func (dt donut) LoadConfig() error { +func (donut API) LoadConfig() error { return iodine.New(NotImplemented{Function: "LoadConfig"}, nil) } diff --git a/pkg/storage/donut/cache-multipart.go b/pkg/storage/donut/multipart.go similarity index 75% rename from pkg/storage/donut/cache-multipart.go rename to pkg/storage/donut/multipart.go index e89b57ee6..1c76451c1 100644 --- a/pkg/storage/donut/cache-multipart.go +++ b/pkg/storage/donut/multipart.go @@ -35,55 +35,55 @@ import ( ) // NewMultipartUpload - -func (cache Cache) NewMultipartUpload(bucket, key, contentType string) (string, error) { - cache.lock.RLock() +func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) { + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidObjectName(key) { - cache.lock.RUnlock() + donut.lock.RUnlock() return "", iodine.New(ObjectNameInvalid{Object: key}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return "", iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] objectKey := bucket + "/" + key if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { - cache.lock.RUnlock() + donut.lock.RUnlock() return "", iodine.New(ObjectExists{Object: key}, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() - cache.lock.Lock() + donut.lock.Lock() id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] - cache.storedBuckets[bucket].multiPartSession[key] = multiPartSession{ + donut.storedBuckets[bucket].multiPartSession[key] = multiPartSession{ uploadID: uploadID, initiated: time.Now(), totalParts: 0, } - cache.lock.Unlock() + donut.lock.Unlock() return uploadID, nil } // AbortMultipartUpload - -func (cache Cache) AbortMultipartUpload(bucket, key, uploadID string) error { - cache.lock.RLock() - storedBucket := cache.storedBuckets[bucket] +func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error { + donut.lock.RLock() + storedBucket := donut.storedBuckets[bucket] if storedBucket.multiPartSession[key].uploadID != uploadID { - cache.lock.RUnlock() + donut.lock.RUnlock() return iodine.New(InvalidUploadID{UploadID: uploadID}, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() - cache.cleanupMultiparts(bucket, key, uploadID) - cache.cleanupMultipartSession(bucket, key, uploadID) + donut.cleanupMultiparts(bucket, key, uploadID) + donut.cleanupMultipartSession(bucket, key, uploadID) return nil } @@ -92,17 +92,17 @@ func getMultipartKey(key string, uploadID string, partNumber int) string { } // CreateObjectPart - -func (cache Cache) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { +func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { // Verify upload id - cache.lock.RLock() - storedBucket := cache.storedBuckets[bucket] + donut.lock.RLock() + storedBucket := donut.storedBuckets[bucket] if storedBucket.multiPartSession[key].uploadID != uploadID { - cache.lock.RUnlock() + donut.lock.RUnlock() return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() - etag, err := cache.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) + etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) if err != nil { return "", iodine.New(err, nil) } @@ -112,28 +112,28 @@ func (cache Cache) CreateObjectPart(bucket, key, uploadID string, partID int, co } // createObject - PUT object to cache buffer -func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - cache.lock.RLock() +func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + donut.lock.RLock() if !IsValidBucket(bucket) { - cache.lock.RUnlock() + donut.lock.RUnlock() return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidObjectName(key) { - cache.lock.RUnlock() + donut.lock.RUnlock() return "", iodine.New(ObjectNameInvalid{Object: key}, nil) } - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return "", iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] // get object key partKey := bucket + "/" + getMultipartKey(key, uploadID, partID) if _, ok := storedBucket.partMetadata[partKey]; ok == true { - cache.lock.RUnlock() + donut.lock.RUnlock() return storedBucket.partMetadata[partKey].ETag, nil } - cache.lock.RUnlock() + donut.lock.RUnlock() if contentType == "" { contentType = "application/octet-stream" @@ -172,9 +172,9 @@ func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, co md5SumBytes := hash.Sum(nil) totalLength := int64(len(readBytes)) - cache.lock.Lock() - cache.multiPartObjects.Set(partKey, readBytes) - cache.lock.Unlock() + donut.lock.Lock() + donut.multiPartObjects.Set(partKey, readBytes) + donut.lock.Unlock() // setting up for de-allocation readBytes = nil @@ -192,32 +192,32 @@ func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, co Size: totalLength, } - cache.lock.Lock() + donut.lock.Lock() storedBucket.partMetadata[partKey] = newPart multiPartSession := storedBucket.multiPartSession[key] multiPartSession.totalParts++ storedBucket.multiPartSession[key] = multiPartSession - cache.storedBuckets[bucket] = storedBucket - cache.lock.Unlock() + donut.storedBuckets[bucket] = storedBucket + donut.lock.Unlock() return md5Sum, nil } -func (cache Cache) cleanupMultipartSession(bucket, key, uploadID string) { - cache.lock.Lock() - defer cache.lock.Unlock() - delete(cache.storedBuckets[bucket].multiPartSession, key) +func (donut API) cleanupMultipartSession(bucket, key, uploadID string) { + donut.lock.Lock() + defer donut.lock.Unlock() + delete(donut.storedBuckets[bucket].multiPartSession, key) } -func (cache Cache) cleanupMultiparts(bucket, key, uploadID string) { - for i := 1; i <= cache.storedBuckets[bucket].multiPartSession[key].totalParts; i++ { +func (donut API) cleanupMultiparts(bucket, key, uploadID string) { + for i := 1; i <= donut.storedBuckets[bucket].multiPartSession[key].totalParts; i++ { objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) - cache.multiPartObjects.Delete(objectKey) + donut.multiPartObjects.Delete(objectKey) } } // CompleteMultipartUpload - -func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) { +func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) { if !IsValidBucket(bucket) { return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } @@ -225,26 +225,26 @@ func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts m return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil) } // Verify upload id - cache.lock.RLock() - if _, ok := cache.storedBuckets[bucket]; ok == false { - cache.lock.RUnlock() + donut.lock.RLock() + if _, ok := donut.storedBuckets[bucket]; ok == false { + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] if storedBucket.multiPartSession[key].uploadID != uploadID { - cache.lock.RUnlock() + donut.lock.RUnlock() return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil) } - cache.lock.RUnlock() + donut.lock.RUnlock() - cache.lock.Lock() + donut.lock.Lock() var size int64 var fullObject bytes.Buffer for i := 1; i <= len(parts); i++ { recvMD5 := parts[i] - object, ok := cache.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)) + object, ok := donut.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)) if ok == false { - cache.lock.Unlock() + donut.lock.Unlock() return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) } size += int64(len(object)) @@ -264,20 +264,20 @@ func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts m object = nil go debug.FreeOSMemory() } - cache.lock.Unlock() + donut.lock.Unlock() md5sumSlice := md5.Sum(fullObject.Bytes()) // this is needed for final verification inside CreateObject, do not convert this to hex md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) - objectMetadata, err := cache.CreateObject(bucket, key, "", md5sum, size, &fullObject) + objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil) if err != nil { // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() // which would in-turn cleanup properly in accordance with S3 Spec return ObjectMetadata{}, iodine.New(err, nil) } fullObject.Reset() - cache.cleanupMultiparts(bucket, key, uploadID) - cache.cleanupMultipartSession(bucket, key, uploadID) + donut.cleanupMultiparts(bucket, key, uploadID) + donut.cleanupMultipartSession(bucket, key, uploadID) return objectMetadata, nil } @@ -289,14 +289,14 @@ func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } // ListMultipartUploads - -func (cache Cache) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) { +func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) { // TODO handle delimiter - cache.lock.RLock() - defer cache.lock.RUnlock() - if _, ok := cache.storedBuckets[bucket]; ok == false { + donut.lock.RLock() + defer donut.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] var uploads []*UploadMetadata for key, session := range storedBucket.multiPartSession { @@ -351,14 +351,14 @@ func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } // ListObjectParts - -func (cache Cache) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) { +func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) { // Verify upload id - cache.lock.RLock() - defer cache.lock.RUnlock() - if _, ok := cache.storedBuckets[bucket]; ok == false { + donut.lock.RLock() + defer donut.lock.RUnlock() + if _, ok := donut.storedBuckets[bucket]; ok == false { return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) } - storedBucket := cache.storedBuckets[bucket] + storedBucket := donut.storedBuckets[bucket] if _, ok := storedBucket.multiPartSession[key]; ok == false { return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil) } @@ -395,10 +395,10 @@ func (cache Cache) ListObjectParts(bucket, key string, resources ObjectResources return objectResourcesMetadata, nil } -func (cache Cache) expiredPart(a ...interface{}) { +func (donut API) expiredPart(a ...interface{}) { key := a[0].(string) // loop through all buckets - for _, storedBucket := range cache.storedBuckets { + for _, storedBucket := range donut.storedBuckets { delete(storedBucket.partMetadata, key) } debug.FreeOSMemory() diff --git a/pkg/storage/donut/rebalance.go b/pkg/storage/donut/rebalance.go index 3d47f2a32..7db4cfedd 100644 --- a/pkg/storage/donut/rebalance.go +++ b/pkg/storage/donut/rebalance.go @@ -26,11 +26,11 @@ import ( ) // Rebalance - -func (d donut) Rebalance() error { +func (donut API) Rebalance() error { var totalOffSetLength int var newDisks []disk.Disk var existingDirs []os.FileInfo - for _, node := range d.nodes { + for _, node := range donut.nodes { disks, err := node.ListDisks() if err != nil { return iodine.New(err, nil) @@ -38,7 +38,7 @@ func (d donut) Rebalance() error { totalOffSetLength = len(disks) fmt.Println(totalOffSetLength) for _, disk := range disks { - dirs, err := disk.ListDir(d.name) + dirs, err := disk.ListDir(donut.config.DonutName) if err != nil { return iodine.New(err, nil) }