From 30b0b4debab30dc756da13222e49e1a0953f1c2b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 12 Apr 2016 12:45:15 -0700 Subject: [PATCH] storage/server/client: Enable storage server, enable client storage. --- api-router.go | 8 +- bucket-handlers.go | 18 +-- bucket-policy-handlers.go | 6 +- fs.go | 7 +- generic-handlers.go | 4 +- network-fs.go | 239 ++++++++++++++++++++++++++++++++++++++ object-api-interface.go | 33 ------ object-handlers.go | 24 ++-- object_api_suite_test.go | 182 ++++++++++++++--------------- routers.go | 31 ++++- server-main.go | 38 +++--- server_test.go | 10 +- storage-network.go | 178 ---------------------------- storage-rpc-datatypes.go | 23 ---- storage-rpc-server.go | 126 ++++++++------------ web-handlers.go | 24 ++-- web-router.go | 8 +- 17 files changed, 476 insertions(+), 483 deletions(-) create mode 100644 network-fs.go delete mode 100644 object-api-interface.go delete mode 100644 storage-network.go diff --git a/api-router.go b/api-router.go index 50986f687..b263a20c3 100644 --- a/api-router.go +++ b/api-router.go @@ -18,13 +18,13 @@ package main import router "github.com/gorilla/mux" -// objectStorageAPI container for S3 compatible API. -type objectStorageAPI struct { - ObjectAPI ObjectAPI +// objectAPIHandler implements and provides http handlers for S3 API. +type objectAPIHandlers struct { + ObjectAPI *objectAPI } // registerAPIRouter - registers S3 compatible APIs. -func registerAPIRouter(mux *router.Router, api objectStorageAPI) { +func registerAPIRouter(mux *router.Router, api objectAPIHandlers) { // API Router apiRouter := mux.NewRoute().PathPrefix("/").Subrouter() diff --git a/bucket-handlers.go b/bucket-handlers.go index f79d986d1..1ce48bd83 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -74,7 +74,7 @@ func enforceBucketPolicy(action string, bucket string, reqURL *url.URL) (s3Error // GetBucketLocationHandler - GET Bucket location. // ------------------------- // This operation returns bucket location. -func (api objectStorageAPI) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -152,7 +152,7 @@ func (api objectStorageAPI) GetBucketLocationHandler(w http.ResponseWriter, r *h // completed or aborted. This operation returns at most 1,000 multipart // uploads in the response. // -func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -223,7 +223,7 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r // of the objects in a bucket. You can use the request parameters as selection // criteria to return a subset of the objects in a bucket. // -func (api objectStorageAPI) ListObjectsHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) ListObjectsHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -301,7 +301,7 @@ func (api objectStorageAPI) ListObjectsHandler(w http.ResponseWriter, r *http.Re // ----------- // This implementation of the GET operation returns a list of all buckets // owned by the authenticated sender of the request. -func (api objectStorageAPI) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { // List buckets does not support bucket policies. switch getRequestAuthType(r) { default: @@ -352,7 +352,7 @@ func (api objectStorageAPI) ListBucketsHandler(w http.ResponseWriter, r *http.Re } // DeleteMultipleObjectsHandler - deletes multiple objects. -func (api objectStorageAPI) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -463,7 +463,7 @@ func (api objectStorageAPI) DeleteMultipleObjectsHandler(w http.ResponseWriter, // PutBucketHandler - PUT Bucket // ---------- // This implementation of the PUT operation creates a new bucket for authenticated request -func (api objectStorageAPI) PutBucketHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -528,7 +528,7 @@ func extractHTTPFormValues(reader *multipart.Reader) (io.Reader, map[string]stri // ---------- // This implementation of the POST operation handles object creation with a specified // signature policy in multipart/form-data -func (api objectStorageAPI) PostPolicyBucketHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *http.Request) { // Here the parameter is the size of the form data that should // be loaded in memory, the remaining being put in temporary files. reader, e := r.MultipartReader() @@ -589,7 +589,7 @@ func (api objectStorageAPI) PostPolicyBucketHandler(w http.ResponseWriter, r *ht // The operation returns a 200 OK if the bucket exists and you // have permission to access it. Otherwise, the operation might // return responses such as 404 Not Found and 403 Forbidden. -func (api objectStorageAPI) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -628,7 +628,7 @@ func (api objectStorageAPI) HeadBucketHandler(w http.ResponseWriter, r *http.Req } // DeleteBucketHandler - Delete bucket -func (api objectStorageAPI) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] diff --git a/bucket-policy-handlers.go b/bucket-policy-handlers.go index eab8949a7..35f674d81 100644 --- a/bucket-policy-handlers.go +++ b/bucket-policy-handlers.go @@ -127,7 +127,7 @@ func bucketPolicyConditionMatch(conditions map[string]string, statement policySt // ----------------- // This implementation of the PUT operation uses the policy // subresource to add to or replace a policy on a bucket -func (api objectStorageAPI) PutBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -201,7 +201,7 @@ func (api objectStorageAPI) PutBucketPolicyHandler(w http.ResponseWriter, r *htt // ----------------- // This implementation of the DELETE operation uses the policy // subresource to add to remove a policy on a bucket. -func (api objectStorageAPI) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -238,7 +238,7 @@ func (api objectStorageAPI) DeleteBucketPolicyHandler(w http.ResponseWriter, r * // ----------------- // This operation uses the policy // subresource to return the policy of a specified bucket. -func (api objectStorageAPI) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] diff --git a/fs.go b/fs.go index 68a29c3ea..7a19b78f5 100644 --- a/fs.go +++ b/fs.go @@ -66,7 +66,7 @@ func isDirEmpty(dirname string) (status bool, err error) { // isDirExist - returns whether given directory exists or not. func isDirExist(dirname string) (bool, error) { - fi, e := os.Lstat(dirname) + fi, e := os.Stat(dirname) if e != nil { if os.IsNotExist(e) { return false, nil @@ -322,6 +322,8 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun if err == nil { // Prefix does not exist, not an error just respond empty list response. return nil, true, nil + } else if strings.Contains(err.Error(), "not a directory") { + return nil, true, nil } // Rest errors should be treated as failure. return nil, true, err @@ -465,6 +467,9 @@ func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) { if os.IsNotExist(err) { return FileInfo{}, errFileNotFound } + if strings.Contains(err.Error(), "not a directory") { + return FileInfo{}, errIsNotRegular + } return FileInfo{}, err } if st.Mode().IsDir() { diff --git a/generic-handlers.go b/generic-handlers.go index cc2a03bd3..daa071d34 100644 --- a/generic-handlers.go +++ b/generic-handlers.go @@ -58,10 +58,10 @@ func (h redirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Re-direction handled specifically for browsers. if strings.Contains(r.Header.Get("User-Agent"), "Mozilla") { // '/' is redirected to 'locationPrefix/' - // '/rpc' is redirected to 'locationPrefix/rpc' + // '/webrpc' is redirected to 'locationPrefix/webrpc' // '/login' is redirected to 'locationPrefix/login' switch r.URL.Path { - case "/", "/rpc", "/login", "/favicon.ico": + case "/", "/webrpc", "/login", "/favicon.ico": location := h.locationPrefix + r.URL.Path // Redirect to new location. http.Redirect(w, r, location, http.StatusTemporaryRedirect) diff --git a/network-fs.go b/network-fs.go new file mode 100644 index 000000000..a160c4fd4 --- /dev/null +++ b/network-fs.go @@ -0,0 +1,239 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "errors" + "fmt" + "io" + "net/http" + "net/rpc" + "net/url" + urlpath "path" + "strconv" + "strings" + "time" +) + +type networkFS struct { + netAddr string + netPath string + rpcClient *rpc.Client + httpClient *http.Client +} + +const ( + connected = "200 Connected to Go RPC" + dialTimeoutSecs = 30 // 30 seconds. +) + +// splits network path into its components Address and Path. +func splitNetPath(networkPath string) (netAddr, netPath string) { + index := strings.LastIndex(networkPath, ":") + netAddr = networkPath[:index] + netPath = networkPath[index+1:] + return netAddr, netPath +} + +// Initialize new network file system. +func newNetworkFS(networkPath string) (StorageAPI, error) { + // Input validation. + if networkPath == "" && strings.LastIndex(networkPath, ":") != -1 { + return nil, errInvalidArgument + } + + // TODO validate netAddr and netPath. + netAddr, netPath := splitNetPath(networkPath) + + // Dial minio rpc storage http path. + rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, "/minio/rpc/storage") + if err != nil { + return nil, err + } + + // Initialize http client. + httpClient := &http.Client{ + // Setting a sensible time out of 6minutes to wait for + // response headers. Request is pro-actively cancelled + // after 6minutes if no response was received from server. + Timeout: 6 * time.Minute, + Transport: http.DefaultTransport, + } + + // Initialize network storage. + ndisk := &networkFS{ + netAddr: netAddr, + netPath: netPath, + rpcClient: rpcClient, + httpClient: httpClient, + } + + // Returns successfully here. + return ndisk, nil +} + +// MakeVol - make a volume. +func (n networkFS) MakeVol(volume string) error { + reply := GenericReply{} + if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil { + if err.Error() == errVolumeExists.Error() { + return errVolumeExists + } + return err + } + return nil +} + +// ListVols - List all volumes. +func (n networkFS) ListVols() (vols []VolInfo, err error) { + ListVols := ListVolsReply{} + err = n.rpcClient.Call("Storage.ListVolsHandler", "", &ListVols) + if err != nil { + return nil, err + } + return ListVols.Vols, nil +} + +// StatVol - get current Stat volume info. +func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { + if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { + if err.Error() == errVolumeNotFound.Error() { + return VolInfo{}, errVolumeNotFound + } + return VolInfo{}, err + } + return volInfo, nil +} + +// DeleteVol - Delete a volume. +func (n networkFS) DeleteVol(volume string) error { + reply := GenericReply{} + if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil { + if err.Error() == errVolumeNotFound.Error() { + return errVolumeNotFound + } + return err + } + return nil +} + +// File operations. + +// CreateFile - create file. +func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { + writeURL := new(url.URL) + writeURL.Scheme = "http" // TODO fix this. + writeURL.Host = n.netAddr + writeURL.Path = fmt.Sprintf("/minio/rpc/storage/upload/%s", urlpath.Join(volume, path)) + + contentType := "application/octet-stream" + readCloser, writeCloser := io.Pipe() + go func() { + resp, err := n.httpClient.Post(writeURL.String(), contentType, readCloser) + if err != nil { + readCloser.CloseWithError(err) + return + } + if resp != nil { + if resp.StatusCode != http.StatusNotFound { + readCloser.CloseWithError(errFileNotFound) + return + } + readCloser.CloseWithError(errors.New("Invalid response.")) + } + }() + return writeCloser, nil +} + +// StatFile - get latest Stat information for a file at path. +func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) { + if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{ + Vol: volume, + Path: path, + }, &fileInfo); err != nil { + if err.Error() == errVolumeNotFound.Error() { + return FileInfo{}, errVolumeNotFound + } else if err.Error() == errFileNotFound.Error() { + return FileInfo{}, errFileNotFound + } else if err.Error() == errIsNotRegular.Error() { + return FileInfo{}, errFileNotFound + } + return FileInfo{}, err + } + return fileInfo, nil +} + +// ReadFile - reads a file. +func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { + readURL := new(url.URL) + readURL.Scheme = "http" // TODO fix this. + readURL.Host = n.netAddr + readURL.Path = fmt.Sprintf("/minio/rpc/storage/download/%s", urlpath.Join(volume, path)) + readQuery := make(url.Values) + readQuery.Set("offset", strconv.FormatInt(offset, 10)) + readURL.RawQuery = readQuery.Encode() + resp, err := n.httpClient.Get(readURL.String()) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return nil, errFileNotFound + } + return nil, errors.New("Invalid response") + } + return resp.Body, nil +} + +// ListFiles - List all files in a volume. +func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) { + listFilesReply := ListFilesReply{} + if err = n.rpcClient.Call("Storage.ListFilesHandler", ListFilesArgs{ + Vol: volume, + Prefix: prefix, + Marker: marker, + Recursive: recursive, + Count: count, + }, &listFilesReply); err != nil { + if err.Error() == errVolumeNotFound.Error() { + return nil, true, errVolumeNotFound + } + return nil, true, err + } + // List of files. + files = listFilesReply.Files + // EOF. + eof = listFilesReply.EOF + return files, eof, nil +} + +// DeleteFile - Delete a file at path. +func (n networkFS) DeleteFile(volume, path string) (err error) { + reply := GenericReply{} + if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{ + Vol: volume, + Path: path, + }, &reply); err != nil { + if err.Error() == errVolumeNotFound.Error() { + return errVolumeNotFound + } else if err.Error() == errFileNotFound.Error() { + return errFileNotFound + } + return err + } + return nil +} diff --git a/object-api-interface.go b/object-api-interface.go deleted file mode 100644 index 9c4b11681..000000000 --- a/object-api-interface.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - "io" - - "github.com/minio/minio/pkg/probe" -) - -// ObjectAPI interface. -type ObjectAPI interface { - // Bucket resource API. - DeleteBucket(bucket string) *probe.Error - ListBuckets() ([]BucketInfo, *probe.Error) - MakeBucket(bucket string) *probe.Error - GetBucketInfo(bucket string) (BucketInfo, *probe.Error) - - // Bucket query API. - ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) - ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) - - // Object resource API. - GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) - GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) - PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) - DeleteObject(bucket, object string) *probe.Error - - // Object query API. - NewMultipartUpload(bucket, object string) (string, *probe.Error) - PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) - ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) - CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) - AbortMultipartUpload(bucket, object, uploadID string) *probe.Error -} diff --git a/object-handlers.go b/object-handlers.go index 19c3d02d6..ef501da2d 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -74,7 +74,7 @@ func errAllowableObjectNotFound(bucket string, r *http.Request) APIErrorCode { // ---------- // This implementation of the GET operation retrieves object. To use GET, // you must have READ access to the object. -func (api objectStorageAPI) GetObjectHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request) { var object, bucket string vars := mux.Vars(r) bucket = vars["bucket"] @@ -268,7 +268,7 @@ func checkETag(w http.ResponseWriter, r *http.Request) bool { // HeadObjectHandler - HEAD Object // ----------- // The HEAD operation retrieves metadata from an object without returning the object itself. -func (api objectStorageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { var object, bucket string vars := mux.Vars(r) bucket = vars["bucket"] @@ -332,7 +332,7 @@ func (api objectStorageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Req // ---------- // This implementation of the PUT operation adds an object to a bucket // while reading the object from another source. -func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] @@ -583,7 +583,7 @@ func checkCopySourceETag(w http.ResponseWriter, r *http.Request) bool { // PutObjectHandler - PUT Object // ---------- // This implementation of the PUT operation adds an object to a bucket. -func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) { // If the matching failed, it means that the X-Amz-Copy-Source was // wrong, fail right here. if _, ok := r.Header["X-Amz-Copy-Source"]; ok { @@ -699,10 +699,10 @@ func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Requ writeSuccessResponse(w, nil) } -/// Multipart objectStorageAPI +/// Multipart objectAPIHandlers // NewMultipartUploadHandler - New multipart upload -func (api objectStorageAPI) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { var object, bucket string vars := mux.Vars(r) bucket = vars["bucket"] @@ -755,7 +755,7 @@ func (api objectStorageAPI) NewMultipartUploadHandler(w http.ResponseWriter, r * } // PutObjectPartHandler - Upload part -func (api objectStorageAPI) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] @@ -868,7 +868,7 @@ func (api objectStorageAPI) PutObjectPartHandler(w http.ResponseWriter, r *http. } // AbortMultipartUploadHandler - Abort multipart upload -func (api objectStorageAPI) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] @@ -915,7 +915,7 @@ func (api objectStorageAPI) AbortMultipartUploadHandler(w http.ResponseWriter, r } // ListObjectPartsHandler - List object parts -func (api objectStorageAPI) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] @@ -979,7 +979,7 @@ func (api objectStorageAPI) ListObjectPartsHandler(w http.ResponseWriter, r *htt } // CompleteMultipartUploadHandler - Complete multipart upload -func (api objectStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] @@ -1066,10 +1066,10 @@ func (api objectStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter writeSuccessResponse(w, encodedSuccessResponse) } -/// Delete objectStorageAPI +/// Delete objectAPIHandlers // DeleteObjectHandler - delete an object -func (api objectStorageAPI) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { +func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] diff --git a/object_api_suite_test.go b/object_api_suite_test.go index 804a64db7..635aae54c 100644 --- a/object_api_suite_test.go +++ b/object_api_suite_test.go @@ -28,7 +28,7 @@ import ( ) // APITestSuite - collection of API tests -func APITestSuite(c *check.C, create func() ObjectAPI) { +func APITestSuite(c *check.C, create func() *objectAPI) { testMakeBucket(c, create) testMultipleObjectCreation(c, create) testPaging(c, create) @@ -46,17 +46,17 @@ func APITestSuite(c *check.C, create func() ObjectAPI) { testMultipartObjectAbort(c, create) } -func testMakeBucket(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testMakeBucket(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) } -func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testMultipartObjectCreation(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - uploadID, err := fs.NewMultipartUpload("bucket", "key") + uploadID, err := obj.NewMultipartUpload("bucket", "key") c.Assert(err, check.IsNil) completedParts := completeMultipartUpload{} @@ -72,21 +72,21 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) { expectedMD5Sumhex := hex.EncodeToString(hasher.Sum(nil)) var calculatedMD5sum string - calculatedMD5sum, err = fs.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex) + calculatedMD5sum, err = obj.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex) c.Assert(err, check.IsNil) c.Assert(calculatedMD5sum, check.Equals, expectedMD5Sumhex) completedParts.Parts = append(completedParts.Parts, completePart{PartNumber: i, ETag: calculatedMD5sum}) } - objInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts) + objInfo, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts) c.Assert(err, check.IsNil) c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10") } -func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testMultipartObjectAbort(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - uploadID, err := fs.NewMultipartUpload("bucket", "key") + uploadID, err := obj.NewMultipartUpload("bucket", "key") c.Assert(err, check.IsNil) parts := make(map[int]string) @@ -104,19 +104,19 @@ func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) { metadata["md5"] = expectedMD5Sumhex var calculatedMD5sum string - calculatedMD5sum, err = fs.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex) + calculatedMD5sum, err = obj.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex) c.Assert(err, check.IsNil) c.Assert(calculatedMD5sum, check.Equals, expectedMD5Sumhex) parts[i] = expectedMD5Sumhex } - err = fs.AbortMultipartUpload("bucket", "key", uploadID) + err = obj.AbortMultipartUpload("bucket", "key", uploadID) c.Assert(err, check.IsNil) } -func testMultipleObjectCreation(c *check.C, create func() ObjectAPI) { +func testMultipleObjectCreation(c *check.C, create func() *objectAPI) { objects := make(map[string][]byte) - fs := create() - err := fs.MakeBucket("bucket") + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) for i := 0; i < 10; i++ { randomPerm := rand.Perm(10) @@ -133,40 +133,40 @@ func testMultipleObjectCreation(c *check.C, create func() ObjectAPI) { objects[key] = []byte(randomString) metadata := make(map[string]string) metadata["md5Sum"] = expectedMD5Sumhex - objInfo, err := fs.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata) + objInfo, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata) c.Assert(err, check.IsNil) c.Assert(objInfo.MD5Sum, check.Equals, expectedMD5Sumhex) } for key, value := range objects { var byteBuffer bytes.Buffer - r, err := fs.GetObject("bucket", key, 0) + r, err := obj.GetObject("bucket", key, 0) c.Assert(err, check.IsNil) _, e := io.Copy(&byteBuffer, r) c.Assert(e, check.IsNil) c.Assert(byteBuffer.Bytes(), check.DeepEquals, value) c.Assert(r.Close(), check.IsNil) - objInfo, err := fs.GetObjectInfo("bucket", key) + objInfo, err := obj.GetObjectInfo("bucket", key) c.Assert(err, check.IsNil) c.Assert(objInfo.Size, check.Equals, int64(len(value))) r.Close() } } -func testPaging(c *check.C, create func() ObjectAPI) { - fs := create() - fs.MakeBucket("bucket") - result, err := fs.ListObjects("bucket", "", "", "", 0) +func testPaging(c *check.C, create func() *objectAPI) { + obj := create() + obj.MakeBucket("bucket") + result, err := obj.ListObjects("bucket", "", "", "", 0) c.Assert(err, check.IsNil) c.Assert(len(result.Objects), check.Equals, 0) c.Assert(result.IsTruncated, check.Equals, false) // check before paging occurs for i := 0; i < 5; i++ { key := "obj" + strconv.Itoa(i) - _, err = fs.PutObject("bucket", key, int64(len(key)), bytes.NewBufferString(key), nil) + _, err = obj.PutObject("bucket", key, int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - result, err = fs.ListObjects("bucket", "", "", "", 5) + result, err = obj.ListObjects("bucket", "", "", "", 5) c.Assert(err, check.IsNil) c.Assert(len(result.Objects), check.Equals, i+1) c.Assert(result.IsTruncated, check.Equals, false) @@ -174,27 +174,27 @@ func testPaging(c *check.C, create func() ObjectAPI) { // check after paging occurs pages work for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) - _, err = fs.PutObject("bucket", key, int64(len(key)), bytes.NewBufferString(key), nil) + _, err = obj.PutObject("bucket", key, int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - result, err = fs.ListObjects("bucket", "obj", "", "", 5) + result, err = obj.ListObjects("bucket", "obj", "", "", 5) c.Assert(err, check.IsNil) c.Assert(len(result.Objects), check.Equals, 5) c.Assert(result.IsTruncated, check.Equals, true) } // check paging with prefix at end returns less objects { - _, err = fs.PutObject("bucket", "newPrefix", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) + _, err = obj.PutObject("bucket", "newPrefix", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) c.Assert(err, check.IsNil) - _, err = fs.PutObject("bucket", "newPrefix2", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) + _, err = obj.PutObject("bucket", "newPrefix2", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) c.Assert(err, check.IsNil) - result, err = fs.ListObjects("bucket", "new", "", "", 5) + result, err = obj.ListObjects("bucket", "new", "", "", 5) c.Assert(err, check.IsNil) c.Assert(len(result.Objects), check.Equals, 2) } // check ordering of pages { - result, err = fs.ListObjects("bucket", "", "", "", 1000) + result, err = obj.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.IsNil) c.Assert(result.Objects[0].Name, check.Equals, "newPrefix") c.Assert(result.Objects[1].Name, check.Equals, "newPrefix2") @@ -205,11 +205,11 @@ func testPaging(c *check.C, create func() ObjectAPI) { // check delimited results with delimiter and prefix { - _, err = fs.PutObject("bucket", "this/is/delimited", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) + _, err = obj.PutObject("bucket", "this/is/delimited", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) c.Assert(err, check.IsNil) - _, err = fs.PutObject("bucket", "this/is/also/a/delimited/file", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) + _, err = obj.PutObject("bucket", "this/is/also/a/delimited/file", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) c.Assert(err, check.IsNil) - result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10) + result, err = obj.ListObjects("bucket", "this/is/", "", "/", 10) c.Assert(err, check.IsNil) c.Assert(len(result.Objects), check.Equals, 1) c.Assert(result.Prefixes[0], check.Equals, "this/is/also/") @@ -217,7 +217,7 @@ func testPaging(c *check.C, create func() ObjectAPI) { // check delimited results with delimiter without prefix { - result, err = fs.ListObjects("bucket", "", "", "/", 1000) + result, err = obj.ListObjects("bucket", "", "", "/", 1000) c.Assert(err, check.IsNil) c.Assert(result.Objects[0].Name, check.Equals, "newPrefix") c.Assert(result.Objects[1].Name, check.Equals, "newPrefix2") @@ -229,7 +229,7 @@ func testPaging(c *check.C, create func() ObjectAPI) { // check results with Marker { - result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3) + result, err = obj.ListObjects("bucket", "", "newPrefix", "", 3) c.Assert(err, check.IsNil) c.Assert(result.Objects[0].Name, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Name, check.Equals, "obj0") @@ -237,7 +237,7 @@ func testPaging(c *check.C, create func() ObjectAPI) { } // check ordering of results with prefix { - result, err = fs.ListObjects("bucket", "obj", "", "", 1000) + result, err = obj.ListObjects("bucket", "obj", "", "", 1000) c.Assert(err, check.IsNil) c.Assert(result.Objects[0].Name, check.Equals, "obj0") c.Assert(result.Objects[1].Name, check.Equals, "obj1") @@ -247,27 +247,27 @@ func testPaging(c *check.C, create func() ObjectAPI) { } // check ordering of results with prefix and no paging { - result, err = fs.ListObjects("bucket", "new", "", "", 5) + result, err = obj.ListObjects("bucket", "new", "", "", 5) c.Assert(err, check.IsNil) c.Assert(result.Objects[0].Name, check.Equals, "newPrefix") c.Assert(result.Objects[1].Name, check.Equals, "newPrefix2") } } -func testObjectOverwriteWorks(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testObjectOverwriteWorks(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - _, err = fs.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil) + _, err = obj.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.IsNil) // c.Assert(md5Sum1hex, check.Equals, objInfo.MD5Sum) - _, err = fs.PutObject("bucket", "object", int64(len("three")), bytes.NewBufferString("three"), nil) + _, err = obj.PutObject("bucket", "object", int64(len("three")), bytes.NewBufferString("three"), nil) c.Assert(err, check.IsNil) var bytesBuffer bytes.Buffer - r, err := fs.GetObject("bucket", "object", 0) + r, err := obj.GetObject("bucket", "object", 0) c.Assert(err, check.IsNil) _, e := io.Copy(&bytesBuffer, r) c.Assert(e, check.IsNil) @@ -275,30 +275,30 @@ func testObjectOverwriteWorks(c *check.C, create func() ObjectAPI) { c.Assert(r.Close(), check.IsNil) } -func testNonExistantBucketOperations(c *check.C, create func() ObjectAPI) { - fs := create() - _, err := fs.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil) +func testNonExistantBucketOperations(c *check.C, create func() *objectAPI) { + obj := create() + _, err := obj.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.Not(check.IsNil)) } -func testBucketRecreateFails(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("string") +func testBucketRecreateFails(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("string") c.Assert(err, check.IsNil) - err = fs.MakeBucket("string") + err = obj.MakeBucket("string") c.Assert(err, check.Not(check.IsNil)) } -func testPutObjectInSubdir(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testPutObjectInSubdir(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - _, err = fs.PutObject("bucket", "dir1/dir2/object", int64(len("hello world")), bytes.NewBufferString("hello world"), nil) + _, err = obj.PutObject("bucket", "dir1/dir2/object", int64(len("hello world")), bytes.NewBufferString("hello world"), nil) c.Assert(err, check.IsNil) var bytesBuffer bytes.Buffer - r, err := fs.GetObject("bucket", "dir1/dir2/object", 0) + r, err := obj.GetObject("bucket", "dir1/dir2/object", 0) c.Assert(err, check.IsNil) n, e := io.Copy(&bytesBuffer, r) c.Assert(e, check.IsNil) @@ -307,49 +307,49 @@ func testPutObjectInSubdir(c *check.C, create func() ObjectAPI) { c.Assert(r.Close(), check.IsNil) } -func testListBuckets(c *check.C, create func() ObjectAPI) { - fs := create() +func testListBuckets(c *check.C, create func() *objectAPI) { + obj := create() // test empty list - buckets, err := fs.ListBuckets() + buckets, err := obj.ListBuckets() c.Assert(err, check.IsNil) c.Assert(len(buckets), check.Equals, 0) // add one and test exists - err = fs.MakeBucket("bucket1") + err = obj.MakeBucket("bucket1") c.Assert(err, check.IsNil) - buckets, err = fs.ListBuckets() + buckets, err = obj.ListBuckets() c.Assert(len(buckets), check.Equals, 1) c.Assert(err, check.IsNil) // add two and test exists - err = fs.MakeBucket("bucket2") + err = obj.MakeBucket("bucket2") c.Assert(err, check.IsNil) - buckets, err = fs.ListBuckets() + buckets, err = obj.ListBuckets() c.Assert(len(buckets), check.Equals, 2) c.Assert(err, check.IsNil) // add three and test exists + prefix - err = fs.MakeBucket("bucket22") + err = obj.MakeBucket("bucket22") - buckets, err = fs.ListBuckets() + buckets, err = obj.ListBuckets() c.Assert(len(buckets), check.Equals, 3) c.Assert(err, check.IsNil) } -func testListBucketsOrder(c *check.C, create func() ObjectAPI) { +func testListBucketsOrder(c *check.C, create func() *objectAPI) { // if implementation contains a map, order of map keys will vary. // this ensures they return in the same order each time for i := 0; i < 10; i++ { - fs := create() + obj := create() // add one and test exists - err := fs.MakeBucket("bucket1") + err := obj.MakeBucket("bucket1") c.Assert(err, check.IsNil) - err = fs.MakeBucket("bucket2") + err = obj.MakeBucket("bucket2") c.Assert(err, check.IsNil) - buckets, err := fs.ListBuckets() + buckets, err := obj.ListBuckets() c.Assert(err, check.IsNil) c.Assert(len(buckets), check.Equals, 2) c.Assert(buckets[0].Name, check.Equals, "bucket1") @@ -357,20 +357,20 @@ func testListBucketsOrder(c *check.C, create func() ObjectAPI) { } } -func testListObjectsTestsForNonExistantBucket(c *check.C, create func() ObjectAPI) { - fs := create() - result, err := fs.ListObjects("bucket", "", "", "", 1000) +func testListObjectsTestsForNonExistantBucket(c *check.C, create func() *objectAPI) { + obj := create() + result, err := obj.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.Not(check.IsNil)) c.Assert(result.IsTruncated, check.Equals, false) c.Assert(len(result.Objects), check.Equals, 0) } -func testNonExistantObjectInBucket(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testNonExistantObjectInBucket(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - _, err = fs.GetObject("bucket", "dir1", 0) + _, err = obj.GetObject("bucket", "dir1", 0) c.Assert(err, check.Not(check.IsNil)) switch err := err.ToGoError().(type) { case ObjectNotFound: @@ -380,15 +380,15 @@ func testNonExistantObjectInBucket(c *check.C, create func() ObjectAPI) { } } -func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - _, err = fs.PutObject("bucket", "dir1/dir2/object", int64(len("hello world")), bytes.NewBufferString("hello world"), nil) + _, err = obj.PutObject("bucket", "dir1/dir2/object", int64(len("hello world")), bytes.NewBufferString("hello world"), nil) c.Assert(err, check.IsNil) - _, err = fs.GetObject("bucket", "dir1", 0) + _, err = obj.GetObject("bucket", "dir1", 0) switch err := err.ToGoError().(type) { case ObjectNotFound: c.Assert(err.Bucket, check.Equals, "bucket") @@ -398,7 +398,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectAPI) c.Assert(err, check.Equals, "ObjectNotFound") } - _, err = fs.GetObject("bucket", "dir1/", 0) + _, err = obj.GetObject("bucket", "dir1/", 0) switch err := err.ToGoError().(type) { case ObjectNotFound: c.Assert(err.Bucket, check.Equals, "bucket") @@ -409,15 +409,15 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectAPI) } } -func testDefaultContentType(c *check.C, create func() ObjectAPI) { - fs := create() - err := fs.MakeBucket("bucket") +func testDefaultContentType(c *check.C, create func() *objectAPI) { + obj := create() + err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) // Test empty - _, err = fs.PutObject("bucket", "one", int64(len("one")), bytes.NewBufferString("one"), nil) + _, err = obj.PutObject("bucket", "one", int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.IsNil) - objInfo, err := fs.GetObjectInfo("bucket", "one") + objInfo, err := obj.GetObjectInfo("bucket", "one") c.Assert(err, check.IsNil) c.Assert(objInfo.ContentType, check.Equals, "application/octet-stream") } diff --git a/routers.go b/routers.go index cc363424e..be0716153 100644 --- a/routers.go +++ b/routers.go @@ -18,19 +18,39 @@ package main import ( "net/http" + "os" router "github.com/gorilla/mux" + "github.com/minio/minio/pkg/probe" ) // configureServer handler returns final handler for the http server. -func configureServerHandler(objectAPI ObjectAPI) http.Handler { +func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { + var storageHandlers StorageAPI + if len(srvCmdConfig.exportPaths) == 1 { + // Verify if export path is a local file system path. + st, e := os.Stat(srvCmdConfig.exportPaths[0]) + if e == nil && st.Mode().IsDir() { + // Initialize storage API. + storageHandlers, e = newFS(srvCmdConfig.exportPaths[0]) + fatalIf(probe.NewError(e), "Initializing fs failed.", nil) + } else { + // Initialize storage API. + storageHandlers, e = newNetworkFS(srvCmdConfig.exportPaths[0]) + fatalIf(probe.NewError(e), "Initializing network fs failed.", nil) + } + } // else if - XL part. + + // Initialize object layer. + objectAPI := newObjectLayer(storageHandlers) + // Initialize API. - api := objectStorageAPI{ + apiHandlers := objectAPIHandlers{ ObjectAPI: objectAPI, } // Initialize Web. - web := &webAPI{ + webHandlers := &webAPIHandlers{ ObjectAPI: objectAPI, } @@ -38,8 +58,9 @@ func configureServerHandler(objectAPI ObjectAPI) http.Handler { mux := router.NewRouter() // Register all routers. - registerWebRouter(mux, web) - registerAPIRouter(mux, api) + registerStorageRPCRouter(mux, storageHandlers) + registerWebRouter(mux, webHandlers) + registerAPIRouter(mux, apiHandlers) // Add new routers here. // List of some generic handlers which are applied for all diff --git a/server-main.go b/server-main.go index 865f15ecf..4dba2a0f8 100644 --- a/server-main.go +++ b/server-main.go @@ -69,12 +69,17 @@ EXAMPLES: `, } +type serverCmdConfig struct { + serverAddr string + exportPaths []string +} + // configureServer configure a new server instance -func configureServer(serverAddr string, objectAPI ObjectAPI) *http.Server { +func configureServer(srvCmdConfig serverCmdConfig) *http.Server { // Minio server config apiServer := &http.Server{ - Addr: serverAddr, - Handler: configureServerHandler(objectAPI), + Addr: srvCmdConfig.serverAddr, + Handler: configureServerHandler(srvCmdConfig), MaxHeaderBytes: 1 << 20, } @@ -148,7 +153,7 @@ func initServerConfig(c *cli.Context) { // Check server arguments. func checkServerSyntax(c *cli.Context) { - if c.Args().First() == "help" { + if !c.Args().Present() && c.Args().First() == "help" { cli.ShowCommandHelpAndExit(c, "server", 1) } if len(c.Args()) > 2 { @@ -255,26 +260,17 @@ func serverMain(c *cli.Context) { } } - // Check configured ports. + // Check if requested port is available. checkPortAvailability(getPort(net.JoinHostPort(host, port))) - var objectAPI ObjectAPI - var err *probe.Error - - // Set backend FS type. - fsPath := strings.TrimSpace(c.Args().Get(0)) - if fsPath != "" { - // Last argument is always a file system path, verify if it exists and is accessible. - _, e := os.Stat(fsPath) - fatalIf(probe.NewError(e), "Unable to validate the path", nil) - // Initialize filesystem storage layer. - storage, e := newFS(fsPath) - fatalIf(probe.NewError(e), "Initializing filesystem failed.", nil) - objectAPI = newObjectLayer(storage) - } + // Save all command line args as export paths. + exportPaths := c.Args() // Configure server. - apiServer := configureServer(serverAddress, objectAPI) + apiServer := configureServer(serverCmdConfig{ + serverAddr: serverAddress, + exportPaths: exportPaths, + }) // Credential. cred := serverConfig.GetCredential() @@ -305,6 +301,6 @@ func serverMain(c *cli.Context) { } // Start server. - err = minhttp.ListenAndServe(apiServer) + err := minhttp.ListenAndServe(apiServer) errorIf(err.Trace(), "Failed to start the minio server.", nil) } diff --git a/server_test.go b/server_test.go index 88a0c3322..1def4cbd5 100644 --- a/server_test.go +++ b/server_test.go @@ -96,12 +96,10 @@ func (s *MyAPISuite) SetUpSuite(c *C) { // Save config. c.Assert(serverConfig.Save(), IsNil) - fs, err := newFS(fsroot) - c.Assert(err, IsNil) - - obj := newObjectLayer(fs) - - apiServer := configureServer(addr, obj) + apiServer := configureServer(serverCmdConfig{ + serverAddr: addr, + exportPaths: []string{fsroot}, + }) testAPIFSCacheServer = httptest.NewServer(apiServer.Handler) } diff --git a/storage-network.go b/storage-network.go deleted file mode 100644 index 25a393973..000000000 --- a/storage-network.go +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "errors" - "io" - "net" - "net/http" - "net/rpc" - "time" -) - -type networkStorage struct { - address string - connection *rpc.Client - httpClient *http.Client -} - -const ( - connected = "200 Connected to Go RPC" - dialTimeoutSecs = 30 // 30 seconds. -) - -// Initialize new network storage. -func newNetworkStorage(address string) (StorageAPI, error) { - // Dial to the address with timeout of 30secs, this includes DNS resolution. - conn, err := net.DialTimeout("tcp", address, dialTimeoutSecs*time.Second) - if err != nil { - return nil, err - } - - // Initialize rpc client with dialed connection. - rpcClient := rpc.NewClient(conn) - - // Initialize http client. - httpClient := &http.Client{ - // Setting a sensible time out of 2minutes to wait for - // response headers. Request is pro-actively cancelled - // after 2minutes if no response was received from server. - Timeout: 2 * time.Minute, - Transport: http.DefaultTransport, - } - - // Initialize network storage. - ndisk := &networkStorage{ - address: address, - connection: rpcClient, - httpClient: httpClient, - } - - // Returns successfully here. - return ndisk, nil -} - -// MakeVol - make a volume. -func (n networkStorage) MakeVol(volume string) error { - reply := GenericReply{} - return n.connection.Call("Storage.MakeVolHandler", volume, &reply) -} - -// ListVols - List all volumes. -func (n networkStorage) ListVols() (vols []VolInfo, err error) { - ListVols := ListVolsReply{} - err = n.connection.Call("Storage.ListVolsHandler", "", &ListVols) - if err != nil { - return nil, err - } - return ListVols.Vols, nil -} - -// StatVol - get current Stat volume info. -func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { - if err = n.connection.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { - return VolInfo{}, err - } - return volInfo, nil -} - -// DeleteVol - Delete a volume. -func (n networkStorage) DeleteVol(volume string) error { - reply := GenericReply{} - return n.connection.Call("Storage.DeleteVolHandler", volume, &reply) -} - -// File operations. - -// CreateFile - create file. -func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { - createFileReply := CreateFileReply{} - if err = n.connection.Call("Storage.CreateFileHandler", CreateFileArgs{ - Vol: volume, - Path: path, - }, &createFileReply); err != nil { - return nil, err - } - contentType := "application/octet-stream" - readCloser, writeCloser := io.Pipe() - defer readCloser.Close() - go n.httpClient.Post(createFileReply.URL, contentType, readCloser) - return writeCloser, nil -} - -// StatFile - get latest Stat information for a file at path. -func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { - if err = n.connection.Call("Storage.StatFileHandler", StatFileArgs{ - Vol: volume, - Path: path, - }, &fileInfo); err != nil { - return FileInfo{}, err - } - return fileInfo, nil -} - -// ReadFile - reads a file. -func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { - readFileReply := ReadFileReply{} - if err = n.connection.Call("Storage.ReadFileHandler", ReadFileArgs{ - Vol: volume, - Path: path, - Offset: offset, - }, &readFileReply); err != nil { - return nil, err - } - resp, err := n.httpClient.Get(readFileReply.URL) - if err != nil { - return nil, err - } - if resp.StatusCode != http.StatusOK { - return nil, errors.New("Invalid response") - } - return resp.Body, nil -} - -// ListFiles - List all files in a volume. -func (n networkStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) { - listFilesReply := ListFilesReply{} - if err = n.connection.Call("Storage.ListFilesHandler", ListFilesArgs{ - Vol: volume, - Prefix: prefix, - Marker: marker, - Recursive: recursive, - Count: count, - }, &listFilesReply); err != nil { - return nil, true, err - } - // List of files. - files = listFilesReply.Files - // EOF. - eof = listFilesReply.EOF - return files, eof, nil -} - -// DeleteFile - Delete a file at path. -func (n networkStorage) DeleteFile(volume, path string) (err error) { - reply := GenericReply{} - if err = n.connection.Call("Storage.DeleteFileHandler", DeleteFileArgs{ - Vol: volume, - Path: path, - }, &reply); err != nil { - return err - } - return nil -} diff --git a/storage-rpc-datatypes.go b/storage-rpc-datatypes.go index ee1f1bf7a..e0fc197e5 100644 --- a/storage-rpc-datatypes.go +++ b/storage-rpc-datatypes.go @@ -42,29 +42,6 @@ type ListFilesReply struct { EOF bool } -// ReadFileArgs read file args. -type ReadFileArgs struct { - Vol string - Path string - Offset int64 -} - -// ReadFileReply read file reply. -type ReadFileReply struct { - URL string -} - -// CreateFileArgs create file args. -type CreateFileArgs struct { - Vol string - Path string -} - -// CreateFileReply create file reply. -type CreateFileReply struct { - URL string -} - // StatFileArgs stat file args. type StatFileArgs struct { Vol string diff --git a/storage-rpc-server.go b/storage-rpc-server.go index e81df6b32..760c8b0d9 100644 --- a/storage-rpc-server.go +++ b/storage-rpc-server.go @@ -1,17 +1,13 @@ package main import ( - "fmt" "io" "net/http" "net/rpc" - "net/url" "os" - "path" "strconv" router "github.com/gorilla/mux" - "github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/safe" ) @@ -67,32 +63,6 @@ func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesRep return nil } -// ReadFileHandler - read file handler is a wrapper to provide -// destination URL for reading files. -func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *ReadFileReply) error { - endpoint := "http://localhost:9000/minio/rpc/storage" // TODO fix this. - newURL, err := url.Parse(fmt.Sprintf("%s/%s", endpoint, path.Join(arg.Vol, arg.Path))) - if err != nil { - return err - } - q := newURL.Query() - q.Set("offset", fmt.Sprintf("%d", arg.Offset)) - newURL.RawQuery = q.Encode() - reply.URL = newURL.String() - return nil -} - -// CreateFileHandler - create file handler is rpc wrapper to create file. -func (s *storageServer) CreateFileHandler(arg *CreateFileArgs, reply *CreateFileReply) error { - endpoint := "http://localhost:9000/minio/rpc/storage" // TODO fix this. - newURL, err := url.Parse(fmt.Sprintf("%s/%s", endpoint, path.Join(arg.Vol, arg.Path))) - if err != nil { - return err - } - reply.URL = newURL.String() - return nil -} - // StatFileHandler - stat file handler is rpc wrapper to stat file. func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) error { fileInfo, err := s.storage.StatFile(arg.Vol, arg.Path) @@ -108,58 +78,56 @@ func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericRep return s.storage.DeleteFile(arg.Vol, arg.Path) } -// StreamUpload - stream upload handler. -func (s *storageServer) StreamUploadHandler(w http.ResponseWriter, r *http.Request) { - vars := router.Vars(r) - volume := vars["volume"] - path := vars["path"] - writeCloser, err := s.storage.CreateFile(volume, path) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - reader := r.Body - if _, err = io.Copy(writeCloser, reader); err != nil { - writeCloser.(*safe.File).CloseAndRemove() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - writeCloser.Close() -} - -// StreamDownloadHandler - stream download handler. -func (s *storageServer) StreamDownloadHandler(w http.ResponseWriter, r *http.Request) { - vars := router.Vars(r) - volume := vars["volume"] - path := vars["path"] - offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - readCloser, err := s.storage.ReadFile(volume, path, offset) - if err != nil { - httpErr := http.StatusBadRequest - if os.IsNotExist(err) { - httpErr = http.StatusNotFound - } - http.Error(w, err.Error(), httpErr) - return - } - io.Copy(w, readCloser) -} - -func registerStorageServer(mux *router.Router, diskPath string) { - // Minio storage routes. - fs, e := newFS(diskPath) - fatalIf(probe.NewError(e), "Unable to initialize storage disk.", nil) - storageRPCServer := rpc.NewServer() +// registerStorageRPCRouter - register storage rpc router. +func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) { stServer := &storageServer{ - storage: fs, + storage: storageAPI, } + storageRPCServer := rpc.NewServer() storageRPCServer.RegisterName("Storage", stServer) storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() + // Add minio storage routes. storageRouter.Path("/rpc/storage").Handler(storageRPCServer) - storageRouter.Methods("POST").Path("/rpc/storage/upload/{volume}/{path:.+}").HandlerFunc(stServer.StreamUploadHandler) - storageRouter.Methods("GET").Path("/rpc/storage/download/{volume}/{path:.+}").Queries("offset", "").HandlerFunc(stServer.StreamDownloadHandler) + // StreamUpload - stream upload handler. + storageRouter.Methods("POST").Path("/rpc/storage/upload/{volume}/{path:.+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + vars := router.Vars(r) + volume := vars["volume"] + path := vars["path"] + writeCloser, err := stServer.storage.CreateFile(volume, path) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + reader := r.Body + if _, err = io.Copy(writeCloser, reader); err != nil { + writeCloser.(*safe.File).CloseAndRemove() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeCloser.Close() + reader.Close() + }) + // StreamDownloadHandler - stream download handler. + storageRouter.Methods("GET").Path("/rpc/storage/download/{volume}/{path:.+}").Queries("offset", "{offset:.*}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + vars := router.Vars(r) + volume := vars["volume"] + path := vars["path"] + offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + readCloser, err := stServer.storage.ReadFile(volume, path, offset) + if err != nil { + httpErr := http.StatusBadRequest + if os.IsNotExist(err) { + httpErr = http.StatusNotFound + } + http.Error(w, err.Error(), httpErr) + return + } + io.Copy(w, readCloser) + w.(http.Flusher).Flush() + readCloser.Close() + }) } diff --git a/web-handlers.go b/web-handlers.go index c4acfc484..d8d87ae6d 100644 --- a/web-handlers.go +++ b/web-handlers.go @@ -71,7 +71,7 @@ type ServerInfoRep struct { } // ServerInfo - get server info. -func (web *webAPI) ServerInfo(r *http.Request, args *WebGenericArgs, reply *ServerInfoRep) error { +func (web *webAPIHandlers) ServerInfo(r *http.Request, args *WebGenericArgs, reply *ServerInfoRep) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -106,7 +106,7 @@ type DiskInfoRep struct { } // DiskInfo - get disk statistics. -func (web *webAPI) DiskInfo(r *http.Request, args *WebGenericArgs, reply *DiskInfoRep) error { +func (web *webAPIHandlers) DiskInfo(r *http.Request, args *WebGenericArgs, reply *DiskInfoRep) error { // FIXME: bring in StatFS in StorageAPI interface and uncomment the below lines. // if !isJWTReqAuthenticated(r) { // return &json2.Error{Message: "Unauthorized request"} @@ -126,7 +126,7 @@ type MakeBucketArgs struct { } // MakeBucket - make a bucket. -func (web *webAPI) MakeBucket(r *http.Request, args *MakeBucketArgs, reply *WebGenericRep) error { +func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, reply *WebGenericRep) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -153,7 +153,7 @@ type WebBucketInfo struct { } // ListBuckets - list buckets api. -func (web *webAPI) ListBuckets(r *http.Request, args *WebGenericArgs, reply *ListBucketsRep) error { +func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, reply *ListBucketsRep) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -199,7 +199,7 @@ type WebObjectInfo struct { } // ListObjects - list objects api. -func (web *webAPI) ListObjects(r *http.Request, args *ListObjectsArgs, reply *ListObjectsRep) error { +func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, reply *ListObjectsRep) error { marker := "" if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} @@ -238,7 +238,7 @@ type RemoveObjectArgs struct { } // RemoveObject - removes an object. -func (web *webAPI) RemoveObject(r *http.Request, args *RemoveObjectArgs, reply *WebGenericRep) error { +func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, reply *WebGenericRep) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -263,7 +263,7 @@ type LoginRep struct { } // Login - user login handler. -func (web *webAPI) Login(r *http.Request, args *LoginArgs, reply *LoginRep) error { +func (web *webAPIHandlers) Login(r *http.Request, args *LoginArgs, reply *LoginRep) error { jwt := initJWT() if jwt.Authenticate(args.Username, args.Password) { token, err := jwt.GenerateToken(args.Username) @@ -284,7 +284,7 @@ type GenerateAuthReply struct { UIVersion string `json:"uiVersion"` } -func (web webAPI) GenerateAuth(r *http.Request, args *WebGenericArgs, reply *GenerateAuthReply) error { +func (web webAPIHandlers) GenerateAuth(r *http.Request, args *WebGenericArgs, reply *GenerateAuthReply) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -308,7 +308,7 @@ type SetAuthReply struct { } // SetAuth - Set accessKey and secretKey credentials. -func (web *webAPI) SetAuth(r *http.Request, args *SetAuthArgs, reply *SetAuthReply) error { +func (web *webAPIHandlers) SetAuth(r *http.Request, args *SetAuthArgs, reply *SetAuthReply) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -345,7 +345,7 @@ type GetAuthReply struct { } // GetAuth - return accessKey and secretKey credentials. -func (web *webAPI) GetAuth(r *http.Request, args *WebGenericArgs, reply *GetAuthReply) error { +func (web *webAPIHandlers) GetAuth(r *http.Request, args *WebGenericArgs, reply *GetAuthReply) error { if !isJWTReqAuthenticated(r) { return &json2.Error{Message: "Unauthorized request"} } @@ -357,7 +357,7 @@ func (web *webAPI) GetAuth(r *http.Request, args *WebGenericArgs, reply *GetAuth } // Upload - file upload handler. -func (web *webAPI) Upload(w http.ResponseWriter, r *http.Request) { +func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { if !isJWTReqAuthenticated(r) { writeWebErrorResponse(w, errInvalidToken) return @@ -371,7 +371,7 @@ func (web *webAPI) Upload(w http.ResponseWriter, r *http.Request) { } // Download - file download handler. -func (web *webAPI) Download(w http.ResponseWriter, r *http.Request) { +func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] diff --git a/web-router.go b/web-router.go index 33674a8ad..698b6bd82 100644 --- a/web-router.go +++ b/web-router.go @@ -29,8 +29,8 @@ import ( ) // webAPI container for Web API. -type webAPI struct { - ObjectAPI ObjectAPI +type webAPIHandlers struct { + ObjectAPI *objectAPI } // indexHandler - Handler to serve index.html @@ -58,7 +58,7 @@ func assetFS() *assetfs.AssetFS { const specialAssets = "loader.css|logo.svg|firefox.png|safari.png|chrome.png|favicon.ico" // registerWebRouter - registers web router for serving minio browser. -func registerWebRouter(mux *router.Router, web *webAPI) { +func registerWebRouter(mux *router.Router, web *webAPIHandlers) { // Initialize a new json2 codec. codec := json2.NewCodec() @@ -74,7 +74,7 @@ func registerWebRouter(mux *router.Router, web *webAPI) { // RPC handler at URI - /minio/webrpc webBrowserRouter.Methods("POST").Path("/webrpc").Handler(webRPC) webBrowserRouter.Methods("PUT").Path("/upload/{bucket}/{object:.+}").HandlerFunc(web.Upload) - webBrowserRouter.Methods("GET").Path("/download/{bucket}/{object:.+}").Queries("token", "").HandlerFunc(web.Download) + webBrowserRouter.Methods("GET").Path("/download/{bucket}/{object:.+}").Queries("token", "{token:.*}").HandlerFunc(web.Download) // Add compression for assets. compressedAssets := handlers.CompressHandler(http.StripPrefix(reservedBucket, http.FileServer(assetFS())))