From 12bde7df302bb5c428487cc0c300a066392797ea Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 2 Jul 2015 15:40:16 -0700 Subject: [PATCH] Add simple Ticket Master which pro-actively sends messages on proceedChannel Handlers are going to wait on proceedChannel, this the initial step towards providing priority for different set of API operations --- pkg/server/api/api.go | 32 +++++++++ pkg/server/api/bucket-handlers.go | 75 ++++++++++++++++++--- pkg/server/api/object-handlers.go | 108 +++++++++++++++++++++++++++--- pkg/server/router.go | 44 ++++++------ pkg/server/server.go | 23 ++++--- 5 files changed, 235 insertions(+), 47 deletions(-) create mode 100644 pkg/server/api/api.go diff --git a/pkg/server/api/api.go b/pkg/server/api/api.go new file mode 100644 index 000000000..b84a3563e --- /dev/null +++ b/pkg/server/api/api.go @@ -0,0 +1,32 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package api + +// Operation container for individual operations read by Ticket Master +type Operation struct { + ProceedCh chan struct{} +} + +// Minio container for API and also carries OP (operation) channel +type Minio struct { + OP chan Operation +} + +// New instantiate a new minio API +func New() Minio { + return Minio{OP: make(chan Operation)} +} diff --git a/pkg/server/api/bucket-handlers.go b/pkg/server/api/bucket-handlers.go index bae237e0b..ef6f08bda 100644 --- a/pkg/server/api/bucket-handlers.go +++ b/pkg/server/api/bucket-handlers.go @@ -23,10 +23,7 @@ import ( "github.com/gorilla/mux" ) -// MinioAPI - -type MinioAPI struct{} - -func (api MinioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acceptsContentType contentType) bool { +func (api Minio) isValidOp(w http.ResponseWriter, req *http.Request, acceptsContentType contentType) bool { vars := mux.Vars(req) bucket := vars["bucket"] log.Println(bucket) @@ -40,8 +37,17 @@ func (api MinioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acceptsC // using the Initiate Multipart Upload request, but has not yet been completed or aborted. // This operation returns at most 1,000 multipart uploads in the response. // -func (api MinioAPI) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } log.Println(acceptsContentType) resources := getBucketMultipartResources(req.URL.Query()) @@ -60,8 +66,19 @@ func (api MinioAPI) ListMultipartUploadsHandler(w http.ResponseWriter, req *http // of the objects in a bucket. You can use the request parameters as selection // criteria to return a subset of the objects in a bucket. // -func (api MinioAPI) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // verify if bucket allows this operation if !api.isValidOp(w, req, acceptsContentType) { return @@ -87,7 +104,7 @@ func (api MinioAPI) ListObjectsHandler(w http.ResponseWriter, req *http.Request) // ----------- // This implementation of the GET operation returns a list of all buckets // owned by the authenticated sender of the request. -func (api MinioAPI) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) // uncomment this when we have webcli // without access key credentials one cannot list buckets @@ -95,13 +112,21 @@ func (api MinioAPI) ListBucketsHandler(w http.ResponseWriter, req *http.Request) // writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) // return // } + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } log.Println(acceptsContentType) } // PutBucketHandler - PUT Bucket // ---------- // This implementation of the PUT operation creates a new bucket for authenticated request -func (api MinioAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) // uncomment this when we have webcli // without access key credentials one cannot create a bucket @@ -109,6 +134,16 @@ func (api MinioAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) { // writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) // return // } + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + if isRequestBucketACL(req.URL.Query()) { api.PutBucketACLHandler(w, req) return @@ -128,8 +163,19 @@ func (api MinioAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) { // PutBucketACLHandler - PUT Bucket ACL // ---------- // This implementation of the PUT operation modifies the bucketACL for authenticated request -func (api MinioAPI) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // read from 'x-amz-acl' aclType := getACLType(req) if aclType == unsupportedACLType { @@ -148,8 +194,17 @@ func (api MinioAPI) PutBucketACLHandler(w http.ResponseWriter, req *http.Request // The operation returns a 200 OK if the bucket exists and you // have permission to access it. Otherwise, the operation might // return responses such as 404 Not Found and 403 Forbidden. -func (api MinioAPI) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } log.Println(acceptsContentType) vars := mux.Vars(req) diff --git a/pkg/server/api/object-handlers.go b/pkg/server/api/object-handlers.go index 0ec281500..b1c9dc807 100644 --- a/pkg/server/api/object-handlers.go +++ b/pkg/server/api/object-handlers.go @@ -36,8 +36,19 @@ const ( // ---------- // This implementation of the GET operation retrieves object. To use GET, // you must have READ access to the object. -func (api MinioAPI) GetObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) GetObjectHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // verify if this operation is allowed if !api.isValidOp(w, req, acceptsContentType) { return @@ -54,8 +65,19 @@ func (api MinioAPI) GetObjectHandler(w http.ResponseWriter, req *http.Request) { // HeadObjectHandler - HEAD Object // ----------- // The HEAD operation retrieves metadata from an object without returning the object itself. -func (api MinioAPI) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // verify if this operation is allowed if !api.isValidOp(w, req, acceptsContentType) { return @@ -71,8 +93,19 @@ func (api MinioAPI) HeadObjectHandler(w http.ResponseWriter, req *http.Request) // PutObjectHandler - PUT Object // ---------- // This implementation of the PUT operation adds an object to a bucket. -func (api MinioAPI) PutObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // verify if this operation is allowed if !api.isValidOp(w, req, acceptsContentType) { return @@ -121,8 +154,19 @@ func (api MinioAPI) PutObjectHandler(w http.ResponseWriter, req *http.Request) { /// Multipart API // NewMultipartUploadHandler - New multipart upload -func (api MinioAPI) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { return @@ -141,8 +185,19 @@ func (api MinioAPI) NewMultipartUploadHandler(w http.ResponseWriter, req *http.R } // PutObjectPartHandler - Upload part -func (api MinioAPI) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { return @@ -190,8 +245,19 @@ func (api MinioAPI) PutObjectPartHandler(w http.ResponseWriter, req *http.Reques } // AbortMultipartUploadHandler - Abort multipart upload -func (api MinioAPI) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { return @@ -206,8 +272,19 @@ func (api MinioAPI) AbortMultipartUploadHandler(w http.ResponseWriter, req *http } // ListObjectPartsHandler - List object parts -func (api MinioAPI) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { return @@ -225,8 +302,19 @@ func (api MinioAPI) ListObjectPartsHandler(w http.ResponseWriter, req *http.Requ } // CompleteMultipartUploadHandler - Complete multipart upload -func (api MinioAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) + + op := Operation{} + op.ProceedCh = make(chan struct{}) + api.OP <- op + // block until Ticket master gives us a go + <-op.ProceedCh + { + // do you operation + } + log.Println(acceptsContentType) + // handle ACL's here at bucket level if !api.isValidOp(w, req, acceptsContentType) { return @@ -262,13 +350,13 @@ func (api MinioAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, req *h /// Delete API // DeleteBucketHandler - Delete bucket -func (api MinioAPI) DeleteBucketHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) DeleteBucketHandler(w http.ResponseWriter, req *http.Request) { error := getErrorCode(NotImplemented) w.WriteHeader(error.HTTPStatusCode) } // DeleteObjectHandler - Delete object -func (api MinioAPI) DeleteObjectHandler(w http.ResponseWriter, req *http.Request) { +func (api Minio) DeleteObjectHandler(w http.ResponseWriter, req *http.Request) { error := getErrorCode(NotImplemented) w.WriteHeader(error.HTTPStatusCode) } diff --git a/pkg/server/router.go b/pkg/server/router.go index 0ed4e1070..c3ba765e1 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -24,28 +24,31 @@ import ( "github.com/minio/minio/pkg/server/rpc" ) -// registerAPI - register all the object API handlers to their respective paths -func registerAPI(mux *router.Router) http.Handler { - api := api.MinioAPI{} +func getAPI() api.Minio { + a := api.New() + return a +} - mux.HandleFunc("/", api.ListBucketsHandler).Methods("GET") - mux.HandleFunc("/{bucket}", api.ListObjectsHandler).Methods("GET") - mux.HandleFunc("/{bucket}", api.PutBucketHandler).Methods("PUT") - mux.HandleFunc("/{bucket}", api.HeadBucketHandler).Methods("HEAD") - mux.HandleFunc("/{bucket}/{object:.*}", api.HeadObjectHandler).Methods("HEAD") - mux.HandleFunc("/{bucket}/{object:.*}", api.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT") - mux.HandleFunc("/{bucket}/{object:.*}", api.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET") - mux.HandleFunc("/{bucket}/{object:.*}", api.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST") - mux.HandleFunc("/{bucket}/{object:.*}", api.NewMultipartUploadHandler).Methods("POST") - mux.HandleFunc("/{bucket}/{object:.*}", api.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("DELETE") - mux.HandleFunc("/{bucket}/{object:.*}", api.GetObjectHandler).Methods("GET") - mux.HandleFunc("/{bucket}/{object:.*}", api.PutObjectHandler).Methods("PUT") +// registerAPI - register all the object API handlers to their respective paths +func registerAPI(mux *router.Router, a api.Minio) http.Handler { + mux.HandleFunc("/", a.ListBucketsHandler).Methods("GET") + mux.HandleFunc("/{bucket}", a.ListObjectsHandler).Methods("GET") + mux.HandleFunc("/{bucket}", a.PutBucketHandler).Methods("PUT") + mux.HandleFunc("/{bucket}", a.HeadBucketHandler).Methods("HEAD") + mux.HandleFunc("/{bucket}/{object:.*}", a.HeadObjectHandler).Methods("HEAD") + mux.HandleFunc("/{bucket}/{object:.*}", a.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT") + mux.HandleFunc("/{bucket}/{object:.*}", a.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET") + mux.HandleFunc("/{bucket}/{object:.*}", a.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST") + mux.HandleFunc("/{bucket}/{object:.*}", a.NewMultipartUploadHandler).Methods("POST") + mux.HandleFunc("/{bucket}/{object:.*}", a.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("DELETE") + mux.HandleFunc("/{bucket}/{object:.*}", a.GetObjectHandler).Methods("GET") + mux.HandleFunc("/{bucket}/{object:.*}", a.PutObjectHandler).Methods("PUT") // not implemented yet - mux.HandleFunc("/{bucket}", api.DeleteBucketHandler).Methods("DELETE") + mux.HandleFunc("/{bucket}", a.DeleteBucketHandler).Methods("DELETE") // unsupported API - mux.HandleFunc("/{bucket}/{object:.*}", api.DeleteObjectHandler).Methods("DELETE") + mux.HandleFunc("/{bucket}/{object:.*}", a.DeleteObjectHandler).Methods("DELETE") return mux } @@ -102,9 +105,12 @@ func registerRPC(mux *router.Router, s *rpc.Server) http.Handler { } // getAPIHandler api handler -func getAPIHandler(conf api.Config) http.Handler { +func getAPIHandler(conf api.Config) (http.Handler, api.Minio) { mux := router.NewRouter() - return registerOtherMiddleware(registerAPI(mux), conf) + minioAPI := getAPI() + apiHandler := registerAPI(mux, minioAPI) + apiHandler = registerOtherMiddleware(apiHandler, conf) + return apiHandler, minioAPI } // getRPCHandler rpc handler diff --git a/pkg/server/server.go b/pkg/server/server.go index 3ddc12623..a383732c2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -25,13 +25,13 @@ import ( "github.com/minio/minio/pkg/server/api" ) -func startAPI(errCh chan error, conf api.Config) { +func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { defer close(errCh) // Minio server config httpServer := &http.Server{ Addr: conf.Address, - Handler: getAPIHandler(conf), + Handler: apiHandler, MaxHeaderBytes: 1 << 20, } @@ -74,20 +74,24 @@ func startAPI(errCh chan error, conf api.Config) { } } -func startRPC(errCh chan error) { +func startRPC(errCh chan error, rpcHandler http.Handler) { defer close(errCh) // Minio server config httpServer := &http.Server{ Addr: "127.0.0.1:9001", // TODO make this configurable - Handler: getRPCHandler(), + Handler: rpcHandler, MaxHeaderBytes: 1 << 20, } errCh <- httpServer.ListenAndServe() } -func startTM(errCh chan error) { - defer close(errCh) +func startTM(a api.Minio) { + for { + for op := range a.OP { + close(op.ProceedCh) + } + } } // StartServices starts basic services for a server @@ -95,8 +99,11 @@ func StartServices(conf api.Config) error { apiErrCh := make(chan error) rpcErrCh := make(chan error) - go startAPI(apiErrCh, conf) - go startRPC(rpcErrCh) + apiHandler, minioAPI := getAPIHandler(conf) + go startAPI(apiErrCh, conf, apiHandler) + rpcHandler := getRPCHandler() + go startRPC(rpcErrCh, rpcHandler) + go startTM(minioAPI) select { case err := <-apiErrCh: