Make donut fully integrated back into API handlers

This commit is contained in:
Harshavardhana 2015-07-02 20:31:22 -07:00
parent 12bde7df30
commit 7c37e9d06a
18 changed files with 971 additions and 595 deletions

View File

@ -8,18 +8,6 @@ import (
"github.com/minio/minio/pkg/server/api"
)
func removeDuplicates(slice []string) []string {
newSlice := []string{}
seen := make(map[string]struct{})
for _, val := range slice {
if _, ok := seen[val]; !ok {
newSlice = append(newSlice, val)
seen[val] = struct{}{}
}
}
return newSlice
}
var commands = []cli.Command{
serverCmd,
controlCmd,

View File

@ -16,6 +16,8 @@
package api
import "github.com/minio/minio/pkg/storage/donut"
// Operation container for individual operations read by Ticket Master
type Operation struct {
ProceedCh chan struct{}
@ -23,10 +25,16 @@ type Operation struct {
// Minio container for API and also carries OP (operation) channel
type Minio struct {
OP chan Operation
OP chan Operation
Donut donut.Interface
}
// New instantiate a new minio API
func New() Minio {
return Minio{OP: make(chan Operation)}
// ignore errors for now
d, _ := donut.LoadDonut()
return Minio{
OP: make(chan Operation),
Donut: d,
}
}

View File

@ -17,16 +17,51 @@
package api
import (
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/storage/donut"
"github.com/minio/minio/pkg/utils/log"
)
func (api Minio) isValidOp(w http.ResponseWriter, req *http.Request, acceptsContentType contentType) bool {
vars := mux.Vars(req)
bucket := vars["bucket"]
log.Println(bucket)
bucketMetadata, err := api.Donut.GetBucketMetadata(bucket)
switch iodine.ToError(err).(type) {
case donut.BucketNotFound:
{
writeErrorResponse(w, req, NoSuchBucket, acceptsContentType, req.URL.Path)
return false
}
case donut.BucketNameInvalid:
{
writeErrorResponse(w, req, InvalidBucketName, acceptsContentType, req.URL.Path)
return false
}
case nil:
if _, err := stripAuth(req); err != nil {
if bucketMetadata.ACL.IsPrivate() {
return true
//uncomment this when we have webcli
//writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path)
//return false
}
if bucketMetadata.ACL.IsPublicRead() && req.Method == "PUT" {
return true
//uncomment this when we have webcli
//writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path)
//return false
}
}
default:
{
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
return true
}
@ -38,18 +73,16 @@ func (api Minio) isValidOp(w http.ResponseWriter, req *http.Request, acceptsCont
// This operation returns at most 1,000 multipart uploads in the response.
//
func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
resources := getBucketMultipartResources(req.URL.Query())
if resources.MaxUploads == 0 {
resources.MaxUploads = maxObjectList
@ -57,7 +90,29 @@ func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Re
vars := mux.Vars(req)
bucket := vars["bucket"]
log.Println(bucket)
resources, err := api.Donut.ListMultipartUploads(bucket, resources)
switch iodine.ToError(err).(type) {
case nil: // success
{
// generate response
response := generateListMultipartUploadsResult(bucket, resources)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse))
// write body
w.Write(encodedSuccessResponse)
}
case donut.BucketNotFound:
{
writeErrorResponse(w, req, NoSuchBucket, acceptsContentType, req.URL.Path)
}
default:
{
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
}
// ListObjectsHandler - GET Bucket (List Objects)
@ -67,18 +122,16 @@ func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Re
// criteria to return a subset of the objects in a bucket.
//
func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// verify if bucket allows this operation
if !api.isValidOp(w, req, acceptsContentType) {
return
@ -96,8 +149,25 @@ func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
bucket := vars["bucket"]
log.Println(bucket)
objects, resources, err := api.Donut.ListObjects(bucket, resources)
switch iodine.ToError(err).(type) {
case nil:
// generate response
response := generateListObjectsResponse(bucket, objects, resources)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse))
// write body
w.Write(encodedSuccessResponse)
case donut.ObjectNotFound:
writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path)
case donut.ObjectNameInvalid:
writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// ListBucketsHandler - GET Service
@ -105,6 +175,15 @@ func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) {
// This implementation of the GET operation returns a list of all buckets
// owned by the authenticated sender of the request.
func (api Minio) ListBucketsHandler(w http.ResponseWriter, req *http.Request) {
// Ticket master block
{
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
acceptsContentType := getContentType(req)
// uncomment this when we have webcli
// without access key credentials one cannot list buckets
@ -112,21 +191,36 @@ func (api Minio) ListBucketsHandler(w http.ResponseWriter, req *http.Request) {
// writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path)
// return
// }
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
buckets, err := api.Donut.ListBuckets()
switch iodine.ToError(err).(type) {
case nil:
// generate response
response := generateListBucketsResponse(buckets)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse))
// write response
w.Write(encodedSuccessResponse)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
log.Println(acceptsContentType)
}
// PutBucketHandler - PUT Bucket
// ----------
// This implementation of the PUT operation creates a new bucket for authenticated request
func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) {
// Ticket master block
{
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
acceptsContentType := getContentType(req)
// uncomment this when we have webcli
// without access key credentials one cannot create a bucket
@ -134,15 +228,6 @@ func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) {
// writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path)
// return
// }
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
if isRequestBucketACL(req.URL.Query()) {
api.PutBucketACLHandler(w, req)
@ -157,24 +242,39 @@ func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
bucket := vars["bucket"]
log.Println(bucket)
err := api.Donut.MakeBucket(bucket, getACLTypeString(aclType))
switch iodine.ToError(err).(type) {
case nil:
// Make sure to add Location information here only for bucket
w.Header().Set("Location", "/"+bucket)
writeSuccessResponse(w, acceptsContentType)
case donut.TooManyBuckets:
writeErrorResponse(w, req, TooManyBuckets, acceptsContentType, req.URL.Path)
case donut.BucketNameInvalid:
writeErrorResponse(w, req, InvalidBucketName, acceptsContentType, req.URL.Path)
case donut.BucketExists:
writeErrorResponse(w, req, BucketAlreadyExists, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// PutBucketACLHandler - PUT Bucket ACL
// ----------
// This implementation of the PUT operation modifies the bucketACL for authenticated request
func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// read from 'x-amz-acl'
aclType := getACLType(req)
@ -185,7 +285,19 @@ func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
bucket := vars["bucket"]
log.Println(bucket)
err := api.Donut.SetBucketMetadata(bucket, map[string]string{"acl": getACLTypeString(aclType)})
switch iodine.ToError(err).(type) {
case nil:
writeSuccessResponse(w, acceptsContentType)
case donut.BucketNameInvalid:
writeErrorResponse(w, req, InvalidBucketName, acceptsContentType, req.URL.Path)
case donut.BucketNotFound:
writeErrorResponse(w, req, NoSuchBucket, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// HeadBucketHandler - HEAD Bucket
@ -195,19 +307,33 @@ func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) {
// have permission to access it. Otherwise, the operation might
// return responses such as 404 Not Found and 403 Forbidden.
func (api Minio) HeadBucketHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
vars := mux.Vars(req)
bucket := vars["bucket"]
log.Println(bucket)
_, err := api.Donut.GetBucketMetadata(bucket)
switch iodine.ToError(err).(type) {
case nil:
writeSuccessResponse(w, acceptsContentType)
case donut.BucketNotFound:
error := getErrorCode(NoSuchBucket)
w.WriteHeader(error.HTTPStatusCode)
case donut.BucketNameInvalid:
error := getErrorCode(InvalidBucketName)
w.WriteHeader(error.HTTPStatusCode)
default:
log.Error.Println(iodine.New(err, nil))
error := getErrorCode(InternalError)
w.WriteHeader(error.HTTPStatusCode)
}
}

View File

@ -9,7 +9,7 @@
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieapi.Donut.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@ -25,6 +25,7 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/storage/donut"
"github.com/minio/minio/pkg/utils/log"
)
@ -37,17 +38,16 @@ const (
// This implementation of the GET operation retrieves object. To use GET,
// you must have READ access to the object.
func (api Minio) GetObjectHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// verify if this operation is allowed
if !api.isValidOp(w, req, acceptsContentType) {
@ -58,25 +58,57 @@ func (api Minio) GetObjectHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
bucket = vars["bucket"]
object = vars["object"]
log.Println(bucket, object)
metadata, err := api.Donut.GetObjectMetadata(bucket, object)
switch iodine.ToError(err).(type) {
case nil: // success
{
httpRange, err := getRequestedRange(req, metadata.Size)
if err != nil {
writeErrorResponse(w, req, InvalidRange, acceptsContentType, req.URL.Path)
return
}
switch httpRange.start == 0 && httpRange.length == 0 {
case true:
setObjectHeaders(w, metadata)
if _, err := api.Donut.GetObject(w, bucket, object); err != nil {
// unable to write headers, we've already printed data. Just close the connection.
log.Error.Println(iodine.New(err, nil))
}
case false:
metadata.Size = httpRange.length
setRangeObjectHeaders(w, metadata, httpRange)
w.WriteHeader(http.StatusPartialContent)
if _, err := api.Donut.GetPartialObject(w, bucket, object, httpRange.start, httpRange.length); err != nil {
// unable to write headers, we've already printed data. Just close the connection.
log.Error.Println(iodine.New(err, nil))
}
}
}
case donut.ObjectNotFound:
writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path)
case donut.ObjectNameInvalid:
writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// HeadObjectHandler - HEAD Object
// -----------
// The HEAD operation retrieves metadata from an object without returning the object itself.
func (api Minio) HeadObjectHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// verify if this operation is allowed
if !api.isValidOp(w, req, acceptsContentType) {
@ -87,25 +119,42 @@ func (api Minio) HeadObjectHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
bucket = vars["bucket"]
object = vars["object"]
log.Println(bucket, object)
metadata, err := api.Donut.GetObjectMetadata(bucket, object)
switch iodine.ToError(err).(type) {
case nil:
setObjectHeaders(w, metadata)
w.WriteHeader(http.StatusOK)
case donut.ObjectNotFound:
error := getErrorCode(NoSuchKey)
w.Header().Set("Server", "Minio")
w.WriteHeader(error.HTTPStatusCode)
case donut.ObjectNameInvalid:
error := getErrorCode(NoSuchKey)
w.Header().Set("Server", "Minio")
w.WriteHeader(error.HTTPStatusCode)
default:
log.Error.Println(iodine.New(err, nil))
error := getErrorCode(InternalError)
w.Header().Set("Server", "Minio")
w.WriteHeader(error.HTTPStatusCode)
}
}
// PutObjectHandler - PUT Object
// ----------
// This implementation of the PUT operation adds an object to a bucket.
func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// verify if this operation is allowed
if !api.isValidOp(w, req, acceptsContentType) {
return
@ -122,7 +171,7 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
return
}
/// if Content-Length missing, throw away
/// if Content-Length missing, deny the request
size := req.Header.Get("Content-Length")
if size == "" {
writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path)
@ -148,24 +197,40 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path)
return
}
log.Println(bucket, object, sizeInt64)
metadata, err := api.Donut.CreateObject(bucket, object, md5, sizeInt64, req.Body, nil)
switch iodine.ToError(err).(type) {
case nil:
w.Header().Set("ETag", metadata.MD5Sum)
writeSuccessResponse(w, acceptsContentType)
case donut.ObjectExists:
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
case donut.BadDigest:
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
case donut.EntityTooLarge:
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
case donut.InvalidDigest:
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
/// Multipart API
// NewMultipartUploadHandler - New multipart upload
func (api Minio) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) {
@ -181,22 +246,38 @@ func (api Minio) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Requ
vars := mux.Vars(req)
bucket = vars["bucket"]
object = vars["object"]
log.Println(bucket, object)
uploadID, err := api.Donut.NewMultipartUpload(bucket, object, "")
switch iodine.ToError(err).(type) {
case nil:
{
response := generateInitiateMultipartUploadResult(bucket, object, uploadID)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse))
// write body
w.Write(encodedSuccessResponse)
}
case donut.ObjectExists:
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// PutObjectPartHandler - Upload part
func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) {
@ -232,7 +313,6 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request)
vars := mux.Vars(req)
bucket := vars["bucket"]
object := vars["object"]
log.Println(bucket, object, sizeInt64)
uploadID := req.URL.Query().Get("uploadId")
partIDString := req.URL.Query().Get("partNumber")
@ -241,22 +321,40 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request)
if err != nil {
writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path)
}
log.Println(uploadID, partID)
calculatedMD5, err := api.Donut.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body)
switch iodine.ToError(err).(type) {
case nil:
w.Header().Set("ETag", calculatedMD5)
writeSuccessResponse(w, acceptsContentType)
case donut.InvalidUploadID:
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
case donut.ObjectExists:
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
case donut.BadDigest:
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
case donut.EntityTooLarge:
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
case donut.InvalidDigest:
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// AbortMultipartUploadHandler - Abort multipart upload
func (api Minio) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) {
@ -267,23 +365,33 @@ func (api Minio) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Re
bucket := vars["bucket"]
object := vars["object"]
//objectResourcesMetadata := getObjectResources(req.URL.Query())
log.Println(bucket, object)
objectResourcesMetadata := getObjectResources(req.URL.Query())
err := api.Donut.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID)
switch iodine.ToError(err).(type) {
case nil:
setCommonHeaders(w, getContentTypeString(acceptsContentType), 0)
w.WriteHeader(http.StatusNoContent)
case donut.InvalidUploadID:
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// ListObjectPartsHandler - List object parts
func (api Minio) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) {
@ -298,22 +406,38 @@ func (api Minio) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request
vars := mux.Vars(req)
bucket := vars["bucket"]
object := vars["object"]
log.Println(bucket, object)
objectResourcesMetadata, err := api.Donut.ListObjectParts(bucket, object, objectResourcesMetadata)
switch iodine.ToError(err).(type) {
case nil:
{
response := generateListPartsResult(objectResourcesMetadata)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse))
// write body
w.Write(encodedSuccessResponse)
}
case donut.InvalidUploadID:
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
// CompleteMultipartUploadHandler - Complete multipart upload
func (api Minio) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
// Ticket master block
{
// do you operation
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
}
log.Println(acceptsContentType)
acceptsContentType := getContentType(req)
// handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) {
@ -336,15 +460,31 @@ func (api Minio) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http
vars := mux.Vars(req)
bucket := vars["bucket"]
object := vars["object"]
log.Println(bucket, object)
//objectResourcesMetadata := getObjectResources(req.URL.Query())
objectResourcesMetadata := getObjectResources(req.URL.Query())
partMap := make(map[int]string)
for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag
}
metadata, err := api.Donut.CompleteMultipartUpload(bucket, object, objectResourcesMetadata.UploadID, partMap)
switch iodine.ToError(err).(type) {
case nil:
{
response := generateCompleteMultpartUploadResult(bucket, object, "", metadata.MD5Sum)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedSuccessResponse))
// write body
w.Write(encodedSuccessResponse)
}
case donut.InvalidUploadID:
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
default:
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
/// Delete API

View File

@ -24,9 +24,9 @@ import (
"github.com/minio/minio/pkg/server/rpc"
)
// Get api
func getAPI() api.Minio {
a := api.New()
return a
return api.New()
}
// registerAPI - register all the object API handlers to their respective paths
@ -82,15 +82,15 @@ func registerChain(handlers ...handlerFunc) chain {
return ch
}
// registerOtherMiddleware register all available middleware
func registerOtherMiddleware(mux http.Handler, conf api.Config) http.Handler {
// registerCustomMiddleware register all available custom middleware
func registerCustomMiddleware(mux http.Handler, conf api.Config) http.Handler {
ch := registerChain(
api.ValidContentTypeHandler,
api.TimeValidityHandler,
api.IgnoreResourcesHandler,
api.ValidateAuthHeaderHandler,
api.LoggingHandler,
// Add new middleware here
// Add new your new middleware here
)
mux = ch.final(mux)
@ -109,7 +109,7 @@ func getAPIHandler(conf api.Config) (http.Handler, api.Minio) {
mux := router.NewRouter()
minioAPI := getAPI()
apiHandler := registerAPI(mux, minioAPI)
apiHandler = registerOtherMiddleware(apiHandler, conf)
apiHandler = registerCustomMiddleware(apiHandler, conf)
return apiHandler, minioAPI
}
@ -120,6 +120,6 @@ func getRPCHandler() http.Handler {
s.RegisterService(new(rpc.HelloService), "")
s.RegisterService(new(rpc.VersionService), "")
s.RegisterService(new(rpc.GetSysInfoService), "")
// add more services here
// Add new services here
return registerRPC(router.NewRouter(), s)
}

View File

@ -25,6 +25,7 @@ import (
"github.com/minio/minio/pkg/server/api"
)
// Start API listener
func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
defer close(errCh)
@ -74,6 +75,7 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
}
}
// Start RPC listener
func startRPC(errCh chan error, rpcHandler http.Handler) {
defer close(errCh)
@ -86,10 +88,11 @@ func startRPC(errCh chan error, rpcHandler http.Handler) {
errCh <- httpServer.ListenAndServe()
}
// Start ticket master
func startTM(a api.Minio) {
for {
for op := range a.OP {
close(op.ProceedCh)
op.ProceedCh <- struct{}{}
}
}
}
@ -101,8 +104,7 @@ func StartServices(conf api.Config) error {
apiHandler, minioAPI := getAPIHandler(conf)
go startAPI(apiErrCh, conf, apiHandler)
rpcHandler := getRPCHandler()
go startRPC(rpcErrCh, rpcHandler)
go startRPC(rpcErrCh, getRPCHandler())
go startTM(minioAPI)
select {

View File

@ -57,6 +57,7 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
"donutName": donutName,
"aclType": aclType,
}
if strings.TrimSpace(bucketName) == "" || strings.TrimSpace(donutName) == "" {
return bucket{}, BucketMetadata{}, iodine.New(InvalidArgument{}, errParams)
}
@ -130,7 +131,7 @@ func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) {
}
// ListObjects - list all objects
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjects, error) {
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
b.lock.RLock()
defer b.lock.RUnlock()
if maxkeys <= 0 {
@ -140,7 +141,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List
var objects []string
bucketMetadata, err := b.getBucketMetadata()
if err != nil {
return ListObjects{}, iodine.New(err, nil)
return ListObjectsResults{}, iodine.New(err, nil)
}
for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjects {
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
@ -181,7 +182,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List
commonPrefixes = RemoveDuplicates(commonPrefixes)
sort.Strings(commonPrefixes)
listObjects := ListObjects{}
listObjects := ListObjectsResults{}
listObjects.Objects = make(map[string]ObjectMetadata)
listObjects.CommonPrefixes = commonPrefixes
listObjects.IsTruncated = isTruncated
@ -189,7 +190,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List
for _, objectName := range results {
objMetadata, err := b.readObjectMetadata(normalizeObjectName(objectName))
if err != nil {
return ListObjects{}, iodine.New(err, nil)
return ListObjectsResults{}, iodine.New(err, nil)
}
listObjects.Objects[objectName] = objMetadata
}

View File

@ -19,10 +19,31 @@ package donut
import (
"bufio"
"bytes"
"io"
"sort"
"strings"
)
// ProxyWriter implements io.Writer to trap written bytes
type ProxyWriter struct {
writer io.Writer
writtenBytes []byte
}
func (r *ProxyWriter) Write(p []byte) (n int, err error) {
n, err = r.writer.Write(p)
if err != nil {
return
}
r.writtenBytes = append(r.writtenBytes, p[0:n]...)
return
}
// NewProxyWriter - wrap around a given writer with ProxyWriter
func NewProxyWriter(w io.Writer) *ProxyWriter {
return &ProxyWriter{writer: w, writtenBytes: nil}
}
// Delimiter delims the string at delimiter
func Delimiter(object, delimiter string) string {
readBuffer := bytes.NewBufferString(object)

View File

@ -0,0 +1,87 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"os/user"
"path/filepath"
"time"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/quick"
)
// getDonutConfigPath get donut config file path
func getDonutConfigPath() (string, error) {
u, err := user.Current()
if err != nil {
return "", iodine.New(err, nil)
}
donutConfigPath := filepath.Join(u.HomeDir, ".minio", "donut.json")
return donutConfigPath, nil
}
// SaveConfig save donut config
func SaveConfig(a *Config) error {
donutConfigPath, err := getDonutConfigPath()
if err != nil {
return iodine.New(err, nil)
}
qc, err := quick.New(a)
if err != nil {
return iodine.New(err, nil)
}
if err := qc.Save(donutConfigPath); err != nil {
return iodine.New(err, nil)
}
return nil
}
// LoadConfig load donut config
func LoadConfig() (*Config, error) {
donutConfigPath, err := getDonutConfigPath()
if err != nil {
return nil, iodine.New(err, nil)
}
a := &Config{}
a.Version = "0.0.1"
qc, err := quick.New(a)
if err != nil {
return nil, iodine.New(err, nil)
}
if err := qc.Load(donutConfigPath); err != nil {
return nil, iodine.New(err, nil)
}
return qc.Data().(*Config), nil
}
// LoadDonut load donut from config
func LoadDonut() (Interface, error) {
conf, err := LoadConfig()
if err != nil {
conf = &Config{
Version: "0.0.1",
MaxSize: 512000000,
Expiration: 1 * time.Hour,
}
}
donut, err := New(conf)
if err != nil {
return nil, iodine.New(err, nil)
}
return donut, nil
}

View File

@ -65,8 +65,8 @@ type BucketMetadata struct {
BucketObjects map[string]interface{} `json:"objects"`
}
// ListObjects container for list objects response
type ListObjects struct {
// ListObjectsResults container for list objects response
type ListObjectsResults struct {
Objects map[string]ObjectMetadata `json:"objects"`
CommonPrefixes []string `json:"commonPrefixes"`
IsTruncated bool `json:"isTruncated"`

View File

@ -24,19 +24,10 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/minio/minio/pkg/iodine"
)
// donut struct internal data
type donut struct {
name string
buckets map[string]bucket
nodes map[string]node
lock *sync.RWMutex
}
// config files used inside Donut
const (
// donut system config
@ -51,75 +42,41 @@ const (
bucketMetadataVersion = "1.0.0"
)
// attachDonutNode - wrapper function to instantiate a new node for associatedt donut
// based on the provided configuration
func (dt donut) attachDonutNode(hostname string, disks []string) error {
if err := dt.AttachNode(hostname, disks); err != nil {
return iodine.New(err, nil)
}
return nil
}
// NewDonut - instantiate a new donut
func NewDonut(donutName string, nodeDiskMap map[string][]string) (Donut, error) {
if donutName == "" || len(nodeDiskMap) == 0 {
return nil, iodine.New(InvalidArgument{}, nil)
}
nodes := make(map[string]node)
buckets := make(map[string]bucket)
d := donut{
name: donutName,
nodes: nodes,
buckets: buckets,
lock: new(sync.RWMutex),
}
for k, v := range nodeDiskMap {
if len(v) == 0 {
return nil, iodine.New(InvalidDisksArgument{}, nil)
}
err := d.attachDonutNode(k, v)
if err != nil {
return nil, iodine.New(err, nil)
}
}
return d, nil
}
// MakeBucket - make a new bucket
func (dt donut) MakeBucket(bucket string, acl BucketACL) error {
dt.lock.Lock()
defer dt.lock.Unlock()
// makeBucket - make a new bucket
func (donut API) makeBucket(bucket string, acl BucketACL) error {
donut.lock.Lock()
defer donut.lock.Unlock()
if bucket == "" || strings.TrimSpace(bucket) == "" {
return iodine.New(InvalidArgument{}, nil)
}
return dt.makeDonutBucket(bucket, acl.String())
return donut.makeDonutBucket(bucket, acl.String())
}
// GetBucketMetadata - get bucket metadata
func (dt donut) GetBucketMetadata(bucketName string) (BucketMetadata, error) {
dt.lock.RLock()
defer dt.lock.RUnlock()
if err := dt.listDonutBuckets(); err != nil {
// getBucketMetadata - get bucket metadata
func (donut API) getBucketMetadata(bucketName string) (BucketMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
if err := donut.listDonutBuckets(); err != nil {
return BucketMetadata{}, iodine.New(err, nil)
}
if _, ok := dt.buckets[bucketName]; !ok {
if _, ok := donut.buckets[bucketName]; !ok {
return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucketName}, nil)
}
metadata, err := dt.getDonutBucketMetadata()
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
return BucketMetadata{}, iodine.New(err, nil)
}
return metadata.Buckets[bucketName], nil
}
// SetBucketMetadata - set bucket metadata
func (dt donut) SetBucketMetadata(bucketName string, bucketMetadata map[string]string) error {
dt.lock.Lock()
defer dt.lock.Unlock()
if err := dt.listDonutBuckets(); err != nil {
// setBucketMetadata - set bucket metadata
func (donut API) setBucketMetadata(bucketName string, bucketMetadata map[string]string) error {
donut.lock.Lock()
defer donut.lock.Unlock()
if err := donut.listDonutBuckets(); err != nil {
return iodine.New(err, nil)
}
metadata, err := dt.getDonutBucketMetadata()
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
return iodine.New(err, nil)
}
@ -130,17 +87,17 @@ func (dt donut) SetBucketMetadata(bucketName string, bucketMetadata map[string]s
}
oldBucketMetadata.ACL = BucketACL(acl)
metadata.Buckets[bucketName] = oldBucketMetadata
return dt.setDonutBucketMetadata(metadata)
return donut.setDonutBucketMetadata(metadata)
}
// ListBuckets - return list of buckets
func (dt donut) ListBuckets() (map[string]BucketMetadata, error) {
dt.lock.RLock()
defer dt.lock.RUnlock()
if err := dt.listDonutBuckets(); err != nil {
// listBuckets - return list of buckets
func (donut API) listBuckets() (map[string]BucketMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
if err := donut.listDonutBuckets(); err != nil {
return nil, iodine.New(err, nil)
}
metadata, err := dt.getDonutBucketMetadata()
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
// intentionally left out the error when Donut is empty
// but we need to revisit this area in future - since we need
@ -150,10 +107,10 @@ func (dt donut) ListBuckets() (map[string]BucketMetadata, error) {
return metadata.Buckets, nil
}
// ListObjects - return list of objects
func (dt donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys int) (ListObjects, error) {
dt.lock.RLock()
defer dt.lock.RUnlock()
// listObjects - return list of objects
func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
errParams := map[string]string{
"bucket": bucket,
"prefix": prefix,
@ -161,23 +118,23 @@ func (dt donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys in
"delimiter": delimiter,
"maxkeys": strconv.Itoa(maxkeys),
}
if err := dt.listDonutBuckets(); err != nil {
return ListObjects{}, iodine.New(err, errParams)
if err := donut.listDonutBuckets(); err != nil {
return ListObjectsResults{}, iodine.New(err, errParams)
}
if _, ok := dt.buckets[bucket]; !ok {
return ListObjects{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
if _, ok := donut.buckets[bucket]; !ok {
return ListObjectsResults{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
}
listObjects, err := dt.buckets[bucket].ListObjects(prefix, marker, delimiter, maxkeys)
listObjects, err := donut.buckets[bucket].ListObjects(prefix, marker, delimiter, maxkeys)
if err != nil {
return ListObjects{}, iodine.New(err, errParams)
return ListObjectsResults{}, iodine.New(err, errParams)
}
return listObjects, nil
}
// PutObject - put object
func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) {
dt.lock.Lock()
defer dt.lock.Unlock()
// putObject - put object
func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
errParams := map[string]string{
"bucket": bucket,
"object": object,
@ -188,34 +145,34 @@ func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.Reade
if object == "" || strings.TrimSpace(object) == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, errParams)
}
if err := dt.listDonutBuckets(); err != nil {
if err := donut.listDonutBuckets(); err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := dt.buckets[bucket]; !ok {
if _, ok := donut.buckets[bucket]; !ok {
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
bucketMeta, err := dt.getDonutBucketMetadata()
bucketMeta, err := donut.getDonutBucketMetadata()
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
}
objMetadata, err := dt.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata)
objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata)
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
bucketMeta.Buckets[bucket].BucketObjects[object] = 1
if err := dt.setDonutBucketMetadata(bucketMeta); err != nil {
if err := donut.setDonutBucketMetadata(bucketMeta); err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
return objMetadata, nil
}
// GetObject - get object
func (dt donut) GetObject(bucket, object string) (reader io.ReadCloser, size int64, err error) {
dt.lock.RLock()
defer dt.lock.RUnlock()
// getObject - get object
func (donut API) getObject(bucket, object string) (reader io.ReadCloser, size int64, err error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
errParams := map[string]string{
"bucket": bucket,
"object": object,
@ -226,37 +183,37 @@ func (dt donut) GetObject(bucket, object string) (reader io.ReadCloser, size int
if object == "" || strings.TrimSpace(object) == "" {
return nil, 0, iodine.New(InvalidArgument{}, errParams)
}
if err := dt.listDonutBuckets(); err != nil {
if err := donut.listDonutBuckets(); err != nil {
return nil, 0, iodine.New(err, nil)
}
if _, ok := dt.buckets[bucket]; !ok {
if _, ok := donut.buckets[bucket]; !ok {
return nil, 0, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
}
return dt.buckets[bucket].ReadObject(object)
return donut.buckets[bucket].ReadObject(object)
}
// GetObjectMetadata - get object metadata
func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) {
dt.lock.RLock()
defer dt.lock.RUnlock()
// getObjectMetadata - get object metadata
func (donut API) getObjectMetadata(bucket, object string) (ObjectMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
errParams := map[string]string{
"bucket": bucket,
"object": object,
}
if err := dt.listDonutBuckets(); err != nil {
if err := donut.listDonutBuckets(); err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := dt.buckets[bucket]; !ok {
if _, ok := donut.buckets[bucket]; !ok {
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
}
bucketMeta, err := dt.getDonutBucketMetadata()
bucketMeta, err := donut.getDonutBucketMetadata()
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; !ok {
return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: object}, errParams)
}
objectMetadata, err := dt.buckets[bucket].GetObjectMetadata(object)
objectMetadata, err := donut.buckets[bucket].GetObjectMetadata(object)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
@ -264,16 +221,16 @@ func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
}
// getDiskWriters -
func (dt donut) getBucketMetadataWriters() ([]io.WriteCloser, error) {
func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
var writers []io.WriteCloser
for _, node := range dt.nodes {
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
writers = make([]io.WriteCloser, len(disks))
for order, d := range disks {
bucketMetaDataWriter, err := d.CreateFile(filepath.Join(dt.name, bucketMetadataConfig))
bucketMetaDataWriter, err := d.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
}
@ -283,16 +240,16 @@ func (dt donut) getBucketMetadataWriters() ([]io.WriteCloser, error) {
return writers, nil
}
func (dt donut) getBucketMetadataReaders() ([]io.ReadCloser, error) {
func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) {
var readers []io.ReadCloser
for _, node := range dt.nodes {
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, d := range disks {
bucketMetaDataReader, err := d.OpenFile(filepath.Join(dt.name, bucketMetadataConfig))
bucketMetaDataReader, err := d.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
}
@ -303,8 +260,8 @@ func (dt donut) getBucketMetadataReaders() ([]io.ReadCloser, error) {
}
//
func (dt donut) setDonutBucketMetadata(metadata *AllBuckets) error {
writers, err := dt.getBucketMetadataWriters()
func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error {
writers, err := donut.getBucketMetadataWriters()
if err != nil {
return iodine.New(err, nil)
}
@ -320,9 +277,9 @@ func (dt donut) setDonutBucketMetadata(metadata *AllBuckets) error {
return nil
}
func (dt donut) getDonutBucketMetadata() (*AllBuckets, error) {
func (donut API) getDonutBucketMetadata() (*AllBuckets, error) {
metadata := new(AllBuckets)
readers, err := dt.getBucketMetadataReaders()
readers, err := donut.getBucketMetadataReaders()
if err != nil {
return nil, iodine.New(err, nil)
}
@ -339,40 +296,40 @@ func (dt donut) getDonutBucketMetadata() (*AllBuckets, error) {
return nil, iodine.New(InvalidArgument{}, nil)
}
func (dt donut) makeDonutBucket(bucketName, acl string) error {
if err := dt.listDonutBuckets(); err != nil {
func (donut API) makeDonutBucket(bucketName, acl string) error {
if err := donut.listDonutBuckets(); err != nil {
return iodine.New(err, nil)
}
if _, ok := dt.buckets[bucketName]; ok {
if _, ok := donut.buckets[bucketName]; ok {
return iodine.New(BucketExists{Bucket: bucketName}, nil)
}
bucket, bucketMetadata, err := newBucket(bucketName, acl, dt.name, dt.nodes)
bucket, bucketMetadata, err := newBucket(bucketName, acl, donut.config.DonutName, donut.nodes)
if err != nil {
return iodine.New(err, nil)
}
nodeNumber := 0
dt.buckets[bucketName] = bucket
for _, node := range dt.nodes {
donut.buckets[bucketName] = bucket
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for order, disk := range disks {
bucketSlice := fmt.Sprintf("%s$%d$%d", bucketName, nodeNumber, order)
err := disk.MakeDir(filepath.Join(dt.name, bucketSlice))
err := disk.MakeDir(filepath.Join(donut.config.DonutName, bucketSlice))
if err != nil {
return iodine.New(err, nil)
}
}
nodeNumber = nodeNumber + 1
}
metadata, err := dt.getDonutBucketMetadata()
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
if os.IsNotExist(iodine.ToError(err)) {
metadata := new(AllBuckets)
metadata.Buckets = make(map[string]BucketMetadata)
metadata.Buckets[bucketName] = bucketMetadata
err = dt.setDonutBucketMetadata(metadata)
err = donut.setDonutBucketMetadata(metadata)
if err != nil {
return iodine.New(err, nil)
}
@ -381,21 +338,21 @@ func (dt donut) makeDonutBucket(bucketName, acl string) error {
return iodine.New(err, nil)
}
metadata.Buckets[bucketName] = bucketMetadata
err = dt.setDonutBucketMetadata(metadata)
err = donut.setDonutBucketMetadata(metadata)
if err != nil {
return iodine.New(err, nil)
}
return nil
}
func (dt donut) listDonutBuckets() error {
for _, node := range dt.nodes {
func (donut API) listDonutBuckets() error {
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for _, disk := range disks {
dirs, err := disk.ListDir(dt.name)
dirs, err := disk.ListDir(donut.config.DonutName)
if err != nil {
return iodine.New(err, nil)
}
@ -406,11 +363,11 @@ func (dt donut) listDonutBuckets() error {
}
bucketName := splitDir[0]
// we dont need this once we cache from makeDonutBucket()
bucket, _, err := newBucket(bucketName, "private", dt.name, dt.nodes)
bucket, _, err := newBucket(bucketName, "private", donut.config.DonutName, donut.nodes)
if err != nil {
return iodine.New(err, nil)
}
dt.buckets[bucketName] = bucket
donut.buckets[bucketName] = bucket
}
}
}

View File

@ -55,13 +55,22 @@ func createTestNodeDiskMap(p string) map[string][]string {
return nodes
}
var dd Cache
var dd Interface
func (s *MyDonutSuite) SetUpSuite(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
s.root = root
dd = NewCache(100000, time.Duration(1*time.Hour), "test", createTestNodeDiskMap(root))
conf := new(Config)
conf.DonutName = "test"
conf.NodeDiskMap = createTestNodeDiskMap(root)
conf.Expiration = time.Duration(1 * time.Hour)
conf.MaxSize = 100000
dd, err = New(conf)
c.Assert(err, IsNil)
// testing empty donut
buckets, err := dd.ListBuckets()
c.Assert(err, IsNil)
@ -145,7 +154,7 @@ func (s *MyDonutSuite) TestCreateMultipleBucketsAndList(c *C) {
// test object create without bucket
func (s *MyDonutSuite) TestNewObjectFailsWithoutBucket(c *C) {
_, err := dd.CreateObject("unknown", "obj", "", "", 0, nil)
_, err := dd.CreateObject("unknown", "obj", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
@ -160,7 +169,7 @@ func (s *MyDonutSuite) TestNewObjectMetadata(c *C) {
err := dd.MakeBucket("foo6", "private")
c.Assert(err, IsNil)
objectMetadata, err := dd.CreateObject("foo6", "obj", "application/json", expectedMd5Sum, int64(len(data)), reader)
objectMetadata, err := dd.CreateObject("foo6", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/json"})
c.Assert(err, IsNil)
c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json")
@ -168,7 +177,7 @@ func (s *MyDonutSuite) TestNewObjectMetadata(c *C) {
// test create object fails without name
func (s *MyDonutSuite) TestNewObjectFailsWithEmptyName(c *C) {
_, err := dd.CreateObject("foo", "", "", "", 0, nil)
_, err := dd.CreateObject("foo", "", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
@ -184,7 +193,7 @@ func (s *MyDonutSuite) TestNewObjectCanBeWritten(c *C) {
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
actualMetadata, err := dd.CreateObject("foo", "obj", "application/octet-stream", expectedMd5Sum, int64(len(data)), reader)
actualMetadata, err := dd.CreateObject("foo", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/octet-stream"})
c.Assert(err, IsNil)
c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
@ -206,11 +215,11 @@ func (s *MyDonutSuite) TestMultipleNewObjects(c *C) {
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
_, err := dd.CreateObject("foo5", "obj1", "", "", int64(len("one")), one)
_, err := dd.CreateObject("foo5", "obj1", "", int64(len("one")), one, nil)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
_, err = dd.CreateObject("foo5", "obj2", "", "", int64(len("two")), two)
_, err = dd.CreateObject("foo5", "obj2", "", int64(len("two")), two, nil)
c.Assert(err, IsNil)
var buffer1 bytes.Buffer
@ -259,7 +268,7 @@ func (s *MyDonutSuite) TestMultipleNewObjects(c *C) {
c.Assert(objectsMetadata[1].Object, Equals, "obj2")
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
_, err = dd.CreateObject("foo5", "obj3", "", "", int64(len("three")), three)
_, err = dd.CreateObject("foo5", "obj3", "", int64(len("three")), three, nil)
c.Assert(err, IsNil)
var buffer bytes.Buffer

View File

@ -33,6 +33,7 @@ import (
"time"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/quick"
"github.com/minio/minio/pkg/storage/donut/trove"
)
@ -41,15 +42,24 @@ const (
totalBuckets = 100
)
// Cache - local variables
type Cache struct {
storedBuckets map[string]storedBucket
// Config donut config
type Config struct {
Version string `json:"version"`
MaxSize uint64 `json:"max-size"`
Expiration time.Duration `json:"expiration"`
DonutName string `json:"donut-name"`
NodeDiskMap map[string][]string `json:"node-disk-map"`
}
// API - local variables
type API struct {
config *Config
lock *sync.RWMutex
objects *trove.Cache
multiPartObjects *trove.Cache
maxSize uint64
expiration time.Duration
donut Donut
storedBuckets map[string]storedBucket
nodes map[string]node
buckets map[string]bucket
}
// storedBucket saved bucket
@ -67,79 +77,85 @@ type multiPartSession struct {
initiated time.Time
}
type proxyWriter struct {
writer io.Writer
writtenBytes []byte
}
func (r *proxyWriter) Write(p []byte) (n int, err error) {
n, err = r.writer.Write(p)
if err != nil {
return
// New instantiate a new donut
func New(c *Config) (Interface, error) {
if err := quick.CheckData(c); err != nil {
return nil, iodine.New(err, nil)
}
r.writtenBytes = append(r.writtenBytes, p[0:n]...)
return
}
func newProxyWriter(w io.Writer) *proxyWriter {
return &proxyWriter{writer: w, writtenBytes: nil}
}
// NewCache new cache
func NewCache(maxSize uint64, expiration time.Duration, donutName string, nodeDiskMap map[string][]string) Cache {
c := Cache{}
c.storedBuckets = make(map[string]storedBucket)
c.objects = trove.NewCache(maxSize, expiration)
c.multiPartObjects = trove.NewCache(0, time.Duration(0))
c.objects.OnExpired = c.expiredObject
c.multiPartObjects.OnExpired = c.expiredPart
c.lock = new(sync.RWMutex)
c.maxSize = maxSize
c.expiration = expiration
a := API{config: c}
a.storedBuckets = make(map[string]storedBucket)
a.nodes = make(map[string]node)
a.buckets = make(map[string]bucket)
a.objects = trove.NewCache(a.config.MaxSize, a.config.Expiration)
a.multiPartObjects = trove.NewCache(0, time.Duration(0))
a.objects.OnExpired = a.expiredObject
a.multiPartObjects.OnExpired = a.expiredPart
a.lock = new(sync.RWMutex)
// set up cache expiration
c.objects.ExpireObjects(time.Second * 5)
c.donut, _ = NewDonut(donutName, nodeDiskMap)
return c
a.objects.ExpireObjects(time.Second * 5)
if len(a.config.NodeDiskMap) > 0 {
for k, v := range a.config.NodeDiskMap {
if len(v) == 0 {
return nil, iodine.New(InvalidDisksArgument{}, nil)
}
err := a.AttachNode(k, v)
if err != nil {
return nil, iodine.New(err, nil)
}
}
/// Initialization, populate all buckets into memory
buckets, err := a.listBuckets()
if err != nil {
return nil, iodine.New(err, nil)
}
for k, v := range buckets {
storedBucket := a.storedBuckets[k]
storedBucket.bucketMetadata = v
a.storedBuckets[k] = storedBucket
}
}
return a, nil
}
// GetObject - GET object from cache buffer
func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, error) {
cache.lock.RLock()
func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(object) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(ObjectNameInvalid{Object: object}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return 0, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
objectKey := bucket + "/" + object
data, ok := cache.objects.Get(objectKey)
data, ok := donut.objects.Get(objectKey)
if !ok {
if cache.donut != nil {
reader, size, err := cache.donut.GetObject(bucket, object)
if len(donut.config.NodeDiskMap) > 0 {
reader, size, err := donut.getObject(bucket, object)
if err != nil {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
// new proxy writer to capture data read from disk
pw := newProxyWriter(w)
pw := NewProxyWriter(w)
written, err := io.CopyN(pw, reader, size)
if err != nil {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
/// cache object read from disk
{
cache.lock.Lock()
ok := cache.objects.Set(objectKey, pw.writtenBytes)
cache.lock.Unlock()
donut.lock.Lock()
ok := donut.objects.Set(objectKey, pw.writtenBytes)
donut.lock.Unlock()
pw.writtenBytes = nil
go debug.FreeOSMemory()
if !ok {
@ -148,65 +164,65 @@ func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64,
}
return written, nil
}
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(ObjectNotFound{Object: object}, nil)
}
written, err := io.CopyN(w, bytes.NewBuffer(data), int64(cache.objects.Len(objectKey)))
written, err := io.CopyN(w, bytes.NewBuffer(data), int64(donut.objects.Len(objectKey)))
if err != nil {
return 0, iodine.New(err, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
return written, nil
}
// GetPartialObject - GET object from cache buffer range
func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
errParams := map[string]string{
"bucket": bucket,
"object": object,
"start": strconv.FormatInt(start, 10),
"length": strconv.FormatInt(length, 10),
}
cache.lock.RLock()
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, errParams)
}
if !IsValidObjectName(object) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(ObjectNameInvalid{Object: object}, errParams)
}
if start < 0 {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(InvalidRange{
Start: start,
Length: length,
}, errParams)
}
objectKey := bucket + "/" + object
data, ok := cache.objects.Get(objectKey)
data, ok := donut.objects.Get(objectKey)
if !ok {
if cache.donut != nil {
reader, _, err := cache.donut.GetObject(bucket, object)
if len(donut.config.NodeDiskMap) > 0 {
reader, _, err := donut.getObject(bucket, object)
if err != nil {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
pw := newProxyWriter(w)
pw := NewProxyWriter(w)
written, err := io.CopyN(w, reader, length)
if err != nil {
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
{
cache.lock.Lock()
ok := cache.objects.Set(objectKey, pw.writtenBytes)
cache.lock.Unlock()
donut.lock.Lock()
ok := donut.objects.Set(objectKey, pw.writtenBytes)
donut.lock.Unlock()
pw.writtenBytes = nil
go debug.FreeOSMemory()
if !ok {
@ -215,72 +231,70 @@ func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, l
}
return written, nil
}
cache.lock.RUnlock()
donut.lock.RUnlock()
return 0, iodine.New(ObjectNotFound{Object: object}, nil)
}
written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
if err != nil {
return 0, iodine.New(err, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
return written, nil
}
// GetBucketMetadata -
func (cache Cache) GetBucketMetadata(bucket string) (BucketMetadata, error) {
cache.lock.RLock()
func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
if cache.donut == nil {
cache.lock.RUnlock()
return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
if _, ok := donut.storedBuckets[bucket]; ok == false {
if len(donut.config.NodeDiskMap) > 0 {
bucketMetadata, err := donut.getBucketMetadata(bucket)
if err != nil {
donut.lock.RUnlock()
return BucketMetadata{}, iodine.New(err, nil)
}
storedBucket := donut.storedBuckets[bucket]
donut.lock.RUnlock()
{
donut.lock.Lock()
storedBucket.bucketMetadata = bucketMetadata
donut.storedBuckets[bucket] = storedBucket
donut.lock.Unlock()
}
}
bucketMetadata, err := cache.donut.GetBucketMetadata(bucket)
if err != nil {
cache.lock.RUnlock()
return BucketMetadata{}, iodine.New(err, nil)
}
storedBucket := cache.storedBuckets[bucket]
cache.lock.RUnlock()
cache.lock.Lock()
storedBucket.bucketMetadata = bucketMetadata
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
cache.lock.RUnlock()
return cache.storedBuckets[bucket].bucketMetadata, nil
donut.lock.RUnlock()
return donut.storedBuckets[bucket].bucketMetadata, nil
}
// SetBucketMetadata -
func (cache Cache) SetBucketMetadata(bucket, acl string) error {
cache.lock.RLock()
func (donut API) SetBucketMetadata(bucket string, metadata map[string]string) error {
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
if strings.TrimSpace(acl) == "" {
acl = "private"
}
cache.lock.RUnlock()
cache.lock.Lock()
m := make(map[string]string)
m["acl"] = acl
if cache.donut != nil {
if err := cache.donut.SetBucketMetadata(bucket, m); err != nil {
return iodine.New(err, nil)
donut.lock.RUnlock()
donut.lock.Lock()
{
if len(donut.config.NodeDiskMap) > 0 {
if err := donut.setBucketMetadata(bucket, metadata); err != nil {
return iodine.New(err, nil)
}
}
storedBucket := donut.storedBuckets[bucket]
storedBucket.bucketMetadata.ACL = BucketACL(metadata["acl"])
donut.storedBuckets[bucket] = storedBucket
}
storedBucket := cache.storedBuckets[bucket]
storedBucket.bucketMetadata.ACL = BucketACL(acl)
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
donut.lock.Unlock()
return nil
}
@ -304,44 +318,45 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
}
// CreateObject -
func (cache Cache) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
if size > int64(cache.maxSize) {
func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, data io.Reader, metadata map[string]string) (ObjectMetadata, error) {
if size > int64(donut.config.MaxSize) {
generic := GenericObjectError{Bucket: bucket, Object: key}
return ObjectMetadata{}, iodine.New(EntityTooLarge{
GenericObjectError: generic,
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(cache.maxSize, 10),
MaxSize: strconv.FormatUint(donut.config.MaxSize, 10),
}, nil)
}
objectMetadata, err := cache.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
contentType := metadata["contentType"]
objectMetadata, err := donut.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
// free
debug.FreeOSMemory()
return objectMetadata, iodine.New(err, nil)
}
// createObject - PUT object to cache buffer
func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
cache.lock.RLock()
func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
// get object key
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectExists{Object: key}, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
if contentType == "" {
contentType = "application/octet-stream"
@ -356,15 +371,15 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
if cache.donut != nil {
objMetadata, err := cache.donut.PutObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType})
if len(donut.config.NodeDiskMap) > 0 {
objMetadata, err := donut.putObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType})
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
cache.lock.Lock()
donut.lock.Lock()
storedBucket.objectMetadata[objectKey] = objMetadata
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
donut.storedBuckets[bucket] = storedBucket
donut.lock.Unlock()
return objMetadata, nil
}
// calculate md5
@ -382,9 +397,9 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
break
}
hash.Write(byteBuffer[0:length])
cache.lock.Lock()
ok := cache.objects.Append(objectKey, byteBuffer[0:length])
cache.lock.Unlock()
donut.lock.Lock()
ok := donut.objects.Append(objectKey, byteBuffer[0:length])
donut.lock.Unlock()
if !ok {
return ObjectMetadata{}, iodine.New(InternalError{}, nil)
}
@ -416,40 +431,40 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
Size: int64(totalLength),
}
cache.lock.Lock()
donut.lock.Lock()
storedBucket.objectMetadata[objectKey] = newObject
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
donut.storedBuckets[bucket] = storedBucket
donut.lock.Unlock()
return newObject, nil
}
// MakeBucket - create bucket in cache
func (cache Cache) MakeBucket(bucketName, acl string) error {
cache.lock.RLock()
if len(cache.storedBuckets) == totalBuckets {
cache.lock.RUnlock()
func (donut API) MakeBucket(bucketName, acl string) error {
donut.lock.RLock()
if len(donut.storedBuckets) == totalBuckets {
donut.lock.RUnlock()
return iodine.New(TooManyBuckets{Bucket: bucketName}, nil)
}
if !IsValidBucket(bucketName) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return iodine.New(BucketNameInvalid{Bucket: bucketName}, nil)
}
if !IsValidBucketACL(acl) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return iodine.New(InvalidACL{ACL: acl}, nil)
}
if _, ok := cache.storedBuckets[bucketName]; ok == true {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucketName]; ok == true {
donut.lock.RUnlock()
return iodine.New(BucketExists{Bucket: bucketName}, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
if strings.TrimSpace(acl) == "" {
// default is private
acl = "private"
}
if cache.donut != nil {
if err := cache.donut.MakeBucket(bucketName, BucketACL(acl)); err != nil {
if len(donut.config.NodeDiskMap) > 0 {
if err := donut.makeBucket(bucketName, BucketACL(acl)); err != nil {
return iodine.New(err, nil)
}
}
@ -461,29 +476,29 @@ func (cache Cache) MakeBucket(bucketName, acl string) error {
newBucket.bucketMetadata.Name = bucketName
newBucket.bucketMetadata.Created = time.Now().UTC()
newBucket.bucketMetadata.ACL = BucketACL(acl)
cache.lock.Lock()
cache.storedBuckets[bucketName] = newBucket
cache.lock.Unlock()
donut.lock.Lock()
donut.storedBuckets[bucketName] = newBucket
donut.lock.Unlock()
return nil
}
// ListObjects - list objects from cache
func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
cache.lock.RLock()
defer cache.lock.RUnlock()
func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
if !IsValidBucket(bucket) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidPrefix(resources.Prefix) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(ObjectNameInvalid{Object: resources.Prefix}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
if _, ok := donut.storedBuckets[bucket]; ok == false {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
var results []ObjectMetadata
var keys []string
if cache.donut != nil {
listObjects, err := cache.donut.ListObjects(
if len(donut.config.NodeDiskMap) > 0 {
listObjects, err := donut.listObjects(
bucket,
resources.Prefix,
resources.Marker,
@ -507,7 +522,7 @@ func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata)
}
return results, resources, nil
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
for key := range storedBucket.objectMetadata {
if strings.HasPrefix(key, bucket+"/") {
key = key[len(bucket)+1:]
@ -561,11 +576,11 @@ func (b byBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
// ListBuckets - List buckets from cache
func (cache Cache) ListBuckets() ([]BucketMetadata, error) {
cache.lock.RLock()
defer cache.lock.RUnlock()
func (donut API) ListBuckets() ([]BucketMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
var results []BucketMetadata
for _, bucket := range cache.storedBuckets {
for _, bucket := range donut.storedBuckets {
results = append(results, bucket.bucketMetadata)
}
sort.Sort(byBucketName(results))
@ -573,50 +588,50 @@ func (cache Cache) ListBuckets() ([]BucketMetadata, error) {
}
// GetObjectMetadata - get object metadata from cache
func (cache Cache) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
cache.lock.RLock()
func (donut API) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
donut.lock.RLock()
// check if bucket exists
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
objectKey := bucket + "/" + key
if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true {
cache.lock.RUnlock()
donut.lock.RUnlock()
return objMetadata, nil
}
if cache.donut != nil {
objMetadata, err := cache.donut.GetObjectMetadata(bucket, key)
cache.lock.RUnlock()
if len(donut.config.NodeDiskMap) > 0 {
objMetadata, err := donut.getObjectMetadata(bucket, key)
donut.lock.RUnlock()
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
// update
cache.lock.Lock()
donut.lock.Lock()
storedBucket.objectMetadata[objectKey] = objMetadata
cache.lock.Unlock()
donut.lock.Unlock()
return objMetadata, nil
}
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
}
func (cache Cache) expiredObject(a ...interface{}) {
cacheStats := cache.objects.Stats()
func (donut API) expiredObject(a ...interface{}) {
cacheStats := donut.objects.Stats()
log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d",
cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
key := a[0].(string)
// loop through all buckets
for _, storedBucket := range cache.storedBuckets {
for _, storedBucket := range donut.storedBuckets {
delete(storedBucket.objectMetadata, key)
}
debug.FreeOSMemory()

View File

@ -34,11 +34,20 @@ type MyCacheSuite struct{}
var _ = Suite(&MyCacheSuite{})
var dc Cache
var dc Interface
func (s *MyCacheSuite) SetUpSuite(c *C) {
// no donut this time
dc = NewCache(100000, time.Duration(1*time.Hour), "", nil)
// test only cache
conf := new(Config)
conf.DonutName = ""
conf.NodeDiskMap = nil
conf.Expiration = time.Duration(1 * time.Hour)
conf.MaxSize = 100000
var err error
dc, err = New(conf)
c.Assert(err, IsNil)
// testing empty cache
buckets, err := dc.ListBuckets()
c.Assert(err, IsNil)
@ -118,7 +127,7 @@ func (s *MyCacheSuite) TestCreateMultipleBucketsAndList(c *C) {
// test object create without bucket
func (s *MyCacheSuite) TestNewObjectFailsWithoutBucket(c *C) {
_, err := dc.CreateObject("unknown", "obj", "", "", 0, nil)
_, err := dc.CreateObject("unknown", "obj", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
@ -133,7 +142,7 @@ func (s *MyCacheSuite) TestNewObjectMetadata(c *C) {
err := dc.MakeBucket("foo6", "private")
c.Assert(err, IsNil)
objectMetadata, err := dc.CreateObject("foo6", "obj", "application/json", expectedMd5Sum, int64(len(data)), reader)
objectMetadata, err := dc.CreateObject("foo6", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/json"})
c.Assert(err, IsNil)
c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json")
@ -141,7 +150,7 @@ func (s *MyCacheSuite) TestNewObjectMetadata(c *C) {
// test create object fails without name
func (s *MyCacheSuite) TestNewObjectFailsWithEmptyName(c *C) {
_, err := dc.CreateObject("foo", "", "", "", 0, nil)
_, err := dc.CreateObject("foo", "", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
@ -157,7 +166,7 @@ func (s *MyCacheSuite) TestNewObjectCanBeWritten(c *C) {
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
actualMetadata, err := dc.CreateObject("foo", "obj", "application/octet-stream", expectedMd5Sum, int64(len(data)), reader)
actualMetadata, err := dc.CreateObject("foo", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/octet-stream"})
c.Assert(err, IsNil)
c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
@ -179,11 +188,11 @@ func (s *MyCacheSuite) TestMultipleNewObjects(c *C) {
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
_, err := dc.CreateObject("foo5", "obj1", "", "", int64(len("one")), one)
_, err := dc.CreateObject("foo5", "obj1", "", int64(len("one")), one, nil)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
_, err = dc.CreateObject("foo5", "obj2", "", "", int64(len("two")), two)
_, err = dc.CreateObject("foo5", "obj2", "", int64(len("two")), two, nil)
c.Assert(err, IsNil)
var buffer1 bytes.Buffer
@ -232,7 +241,7 @@ func (s *MyCacheSuite) TestMultipleNewObjects(c *C) {
c.Assert(objectsMetadata[1].Object, Equals, "obj2")
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
_, err = dc.CreateObject("foo5", "obj3", "", "", int64(len("three")), three)
_, err = dc.CreateObject("foo5", "obj3", "", int64(len("three")), three, nil)
c.Assert(err, IsNil)
var buffer bytes.Buffer

View File

@ -20,8 +20,8 @@ import "io"
// Collection of Donut specification interfaces
// Donut is a collection of object storage and management interface
type Donut interface {
// Interface is a collection of object storage and management interface
type Interface interface {
ObjectStorage
Management
}
@ -31,16 +31,29 @@ type ObjectStorage interface {
// Storage service operations
GetBucketMetadata(bucket string) (BucketMetadata, error)
SetBucketMetadata(bucket string, metadata map[string]string) error
ListBuckets() (map[string]BucketMetadata, error)
MakeBucket(bucket string, acl BucketACL) error
ListBuckets() ([]BucketMetadata, error)
MakeBucket(bucket string, ACL string) error
// Bucket operations
ListObjects(bucket, prefix, marker, delim string, maxKeys int) (ListObjects, error)
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
// Object operations
GetObject(bucket, object string) (io.ReadCloser, int64, error)
GetObject(w io.Writer, bucket, object string) (int64, error)
GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error)
GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
PutObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error)
CreateObject(bucket, object, expectedMD5Sum string, size int64, reader io.Reader, metadata map[string]string) (ObjectMetadata, error)
Multipart
}
// Multipart API
type Multipart interface {
NewMultipartUpload(bucket, key, contentType string) (string, error)
AbortMultipartUpload(bucket, key, uploadID string) error
CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error)
ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error)
ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error)
}
// Management is a donut management system interface

View File

@ -25,14 +25,14 @@ import (
)
// Heal - heal a donut and fix bad data blocks
func (dt donut) Heal() error {
func (donut API) Heal() error {
return iodine.New(NotImplemented{Function: "Heal"}, nil)
}
// Info - return info about donut configuration
func (dt donut) Info() (nodeDiskMap map[string][]string, err error) {
func (donut API) Info() (nodeDiskMap map[string][]string, err error) {
nodeDiskMap = make(map[string][]string)
for nodeName, node := range dt.nodes {
for nodeName, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
@ -47,7 +47,7 @@ func (dt donut) Info() (nodeDiskMap map[string][]string, err error) {
}
// AttachNode - attach node
func (dt donut) AttachNode(hostname string, disks []string) error {
func (donut API) AttachNode(hostname string, disks []string) error {
if hostname == "" || len(disks) == 0 {
return iodine.New(InvalidArgument{}, nil)
}
@ -55,13 +55,13 @@ func (dt donut) AttachNode(hostname string, disks []string) error {
if err != nil {
return iodine.New(err, nil)
}
dt.nodes[hostname] = node
donut.nodes[hostname] = node
for i, d := range disks {
newDisk, err := disk.New(d)
if err != nil {
return iodine.New(err, nil)
}
if err := newDisk.MakeDir(dt.name); err != nil {
if err := newDisk.MakeDir(donut.config.DonutName); err != nil {
return iodine.New(err, nil)
}
if err := node.AttachDisk(newDisk, i); err != nil {
@ -72,21 +72,21 @@ func (dt donut) AttachNode(hostname string, disks []string) error {
}
// DetachNode - detach node
func (dt donut) DetachNode(hostname string) error {
delete(dt.nodes, hostname)
func (donut API) DetachNode(hostname string) error {
delete(donut.nodes, hostname)
return nil
}
// SaveConfig - save donut configuration
func (dt donut) SaveConfig() error {
func (donut API) SaveConfig() error {
nodeDiskMap := make(map[string][]string)
for hostname, node := range dt.nodes {
for hostname, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for order, disk := range disks {
donutConfigPath := filepath.Join(dt.name, donutConfig)
donutConfigPath := filepath.Join(donut.config.DonutName, donutConfig)
donutConfigWriter, err := disk.CreateFile(donutConfigPath)
defer donutConfigWriter.Close()
if err != nil {
@ -103,6 +103,6 @@ func (dt donut) SaveConfig() error {
}
// LoadConfig - load configuration
func (dt donut) LoadConfig() error {
func (donut API) LoadConfig() error {
return iodine.New(NotImplemented{Function: "LoadConfig"}, nil)
}

View File

@ -35,55 +35,55 @@ import (
)
// NewMultipartUpload -
func (cache Cache) NewMultipartUpload(bucket, key, contentType string) (string, error) {
cache.lock.RLock()
func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
cache.lock.RUnlock()
donut.lock.RUnlock()
return "", iodine.New(ObjectExists{Object: key}, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
cache.lock.Lock()
donut.lock.Lock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
cache.storedBuckets[bucket].multiPartSession[key] = multiPartSession{
donut.storedBuckets[bucket].multiPartSession[key] = multiPartSession{
uploadID: uploadID,
initiated: time.Now(),
totalParts: 0,
}
cache.lock.Unlock()
donut.lock.Unlock()
return uploadID, nil
}
// AbortMultipartUpload -
func (cache Cache) AbortMultipartUpload(bucket, key, uploadID string) error {
cache.lock.RLock()
storedBucket := cache.storedBuckets[bucket]
func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
donut.lock.RLock()
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
cache.lock.RUnlock()
donut.lock.RUnlock()
return iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
cache.cleanupMultiparts(bucket, key, uploadID)
cache.cleanupMultipartSession(bucket, key, uploadID)
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
return nil
}
@ -92,17 +92,17 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
}
// CreateObjectPart -
func (cache Cache) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
cache.lock.RLock()
storedBucket := cache.storedBuckets[bucket]
donut.lock.RLock()
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
cache.lock.RUnlock()
donut.lock.RUnlock()
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
etag, err := cache.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
@ -112,28 +112,28 @@ func (cache Cache) CreateObjectPart(bucket, key, uploadID string, partID int, co
}
// createObject - PUT object to cache buffer
func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
cache.lock.RLock()
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
cache.lock.RUnlock()
donut.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
// get object key
partKey := bucket + "/" + getMultipartKey(key, uploadID, partID)
if _, ok := storedBucket.partMetadata[partKey]; ok == true {
cache.lock.RUnlock()
donut.lock.RUnlock()
return storedBucket.partMetadata[partKey].ETag, nil
}
cache.lock.RUnlock()
donut.lock.RUnlock()
if contentType == "" {
contentType = "application/octet-stream"
@ -172,9 +172,9 @@ func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, co
md5SumBytes := hash.Sum(nil)
totalLength := int64(len(readBytes))
cache.lock.Lock()
cache.multiPartObjects.Set(partKey, readBytes)
cache.lock.Unlock()
donut.lock.Lock()
donut.multiPartObjects.Set(partKey, readBytes)
donut.lock.Unlock()
// setting up for de-allocation
readBytes = nil
@ -192,32 +192,32 @@ func (cache Cache) createObjectPart(bucket, key, uploadID string, partID int, co
Size: totalLength,
}
cache.lock.Lock()
donut.lock.Lock()
storedBucket.partMetadata[partKey] = newPart
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
donut.storedBuckets[bucket] = storedBucket
donut.lock.Unlock()
return md5Sum, nil
}
func (cache Cache) cleanupMultipartSession(bucket, key, uploadID string) {
cache.lock.Lock()
defer cache.lock.Unlock()
delete(cache.storedBuckets[bucket].multiPartSession, key)
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
donut.lock.Lock()
defer donut.lock.Unlock()
delete(donut.storedBuckets[bucket].multiPartSession, key)
}
func (cache Cache) cleanupMultiparts(bucket, key, uploadID string) {
for i := 1; i <= cache.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
func (donut API) cleanupMultiparts(bucket, key, uploadID string) {
for i := 1; i <= donut.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
cache.multiPartObjects.Delete(objectKey)
donut.multiPartObjects.Delete(objectKey)
}
}
// CompleteMultipartUpload -
func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
if !IsValidBucket(bucket) {
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
@ -225,26 +225,26 @@ func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts m
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
cache.lock.RLock()
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
donut.lock.RLock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
cache.lock.RUnlock()
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
cache.lock.RUnlock()
donut.lock.RUnlock()
cache.lock.Lock()
donut.lock.Lock()
var size int64
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
recvMD5 := parts[i]
object, ok := cache.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
object, ok := donut.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
if ok == false {
cache.lock.Unlock()
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
size += int64(len(object))
@ -264,20 +264,20 @@ func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts m
object = nil
go debug.FreeOSMemory()
}
cache.lock.Unlock()
donut.lock.Unlock()
md5sumSlice := md5.Sum(fullObject.Bytes())
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
objectMetadata, err := cache.CreateObject(bucket, key, "", md5sum, size, &fullObject)
objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil)
if err != nil {
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return ObjectMetadata{}, iodine.New(err, nil)
}
fullObject.Reset()
cache.cleanupMultiparts(bucket, key, uploadID)
cache.cleanupMultipartSession(bucket, key, uploadID)
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
return objectMetadata, nil
}
@ -289,14 +289,14 @@ func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// ListMultipartUploads -
func (cache Cache) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
// TODO handle delimiter
cache.lock.RLock()
defer cache.lock.RUnlock()
if _, ok := cache.storedBuckets[bucket]; ok == false {
donut.lock.RLock()
defer donut.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
var uploads []*UploadMetadata
for key, session := range storedBucket.multiPartSession {
@ -351,14 +351,14 @@ func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// ListObjectParts -
func (cache Cache) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
// Verify upload id
cache.lock.RLock()
defer cache.lock.RUnlock()
if _, ok := cache.storedBuckets[bucket]; ok == false {
donut.lock.RLock()
defer donut.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
storedBucket := donut.storedBuckets[bucket]
if _, ok := storedBucket.multiPartSession[key]; ok == false {
return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
}
@ -395,10 +395,10 @@ func (cache Cache) ListObjectParts(bucket, key string, resources ObjectResources
return objectResourcesMetadata, nil
}
func (cache Cache) expiredPart(a ...interface{}) {
func (donut API) expiredPart(a ...interface{}) {
key := a[0].(string)
// loop through all buckets
for _, storedBucket := range cache.storedBuckets {
for _, storedBucket := range donut.storedBuckets {
delete(storedBucket.partMetadata, key)
}
debug.FreeOSMemory()

View File

@ -26,11 +26,11 @@ import (
)
// Rebalance -
func (d donut) Rebalance() error {
func (donut API) Rebalance() error {
var totalOffSetLength int
var newDisks []disk.Disk
var existingDirs []os.FileInfo
for _, node := range d.nodes {
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
@ -38,7 +38,7 @@ func (d donut) Rebalance() error {
totalOffSetLength = len(disks)
fmt.Println(totalOffSetLength)
for _, disk := range disks {
dirs, err := disk.ListDir(d.name)
dirs, err := disk.ListDir(donut.config.DonutName)
if err != nil {
return iodine.New(err, nil)
}