From 81bee93b8ddc0c409ac4f88a7e212b6c6bddf250 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Thu, 4 Oct 2018 17:44:06 -0700 Subject: [PATCH] Move remote disk StorageAPI abstraction from RPC to REST (#6464) --- cmd/object-api-common.go | 2 +- cmd/rest/client.go | 108 ++++++ cmd/routers.go | 2 +- cmd/storage-rest-client.go | 343 +++++++++++++++++ cmd/storage-rest-common.go | 52 +++ cmd/storage-rest-server.go | 353 ++++++++++++++++++ ...orage-rpc_test.go => storage-rest_test.go} | 112 +++--- cmd/storage-rpc-client.go | 338 ----------------- cmd/storage-rpc-server.go | 233 ------------ cmd/utils.go | 10 + cmd/utils_test.go | 22 ++ cmd/xl-sets.go | 3 + 12 files changed, 945 insertions(+), 633 deletions(-) create mode 100644 cmd/rest/client.go create mode 100644 cmd/storage-rest-client.go create mode 100644 cmd/storage-rest-common.go create mode 100644 cmd/storage-rest-server.go rename cmd/{storage-rpc_test.go => storage-rest_test.go} (82%) delete mode 100644 cmd/storage-rpc-client.go delete mode 100644 cmd/storage-rpc-server.go diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 142fd9739..81265bbda 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -100,7 +100,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) { return newPosix(endpoint.Path) } - return newStorageRPC(endpoint), nil + return newStorageRESTClient(endpoint), nil } // Cleanup a directory recursively. diff --git a/cmd/rest/client.go b/cmd/rest/client.go new file mode 100644 index 000000000..35933aa95 --- /dev/null +++ b/cmd/rest/client.go @@ -0,0 +1,108 @@ +/* + * Minio Cloud Storage, (C) 2018 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "crypto/tls" + "errors" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + xhttp "github.com/minio/minio/cmd/http" +) + +// DefaultRESTTimeout - default RPC timeout is one minute. +const DefaultRESTTimeout = 1 * time.Minute + +// Client - http based RPC client. +type Client struct { + httpClient *http.Client + url *url.URL + newAuthToken func() string +} + +// Call - make a REST call. +func (c *Client) Call(method string, values url.Values, body io.Reader) (reply io.ReadCloser, err error) { + req, err := http.NewRequest(http.MethodPost, c.url.String()+"/"+method+"?"+values.Encode(), body) + if err != nil { + return nil, err + } + + req.Header.Set("Authorization", "Bearer "+c.newAuthToken()) + req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + // Limit the ReadAll(), just in case, because of a bug, the server responds with large data. + r := io.LimitReader(resp.Body, 1024) + b, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + return nil, errors.New(string(b)) + } + return resp.Body, nil +} + +func newCustomDialContext(timeout time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + dialer := &net.Dialer{ + Timeout: timeout, + KeepAlive: timeout, + DualStack: true, + } + + conn, err := dialer.DialContext(ctx, network, addr) + if err != nil { + return nil, err + } + + return xhttp.NewTimeoutConn(conn, timeout, timeout), nil + } +} + +// NewClient - returns new RPC client. +func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) *Client { + return &Client{ + httpClient: &http.Client{ + // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper + // except custom DialContext and TLSClientConfig. + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: newCustomDialContext(timeout), + MaxIdleConnsPerHost: 4096, + MaxIdleConns: 4096, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + TLSClientConfig: tlsConfig, + DisableCompression: true, + }, + }, + url: url, + newAuthToken: newAuthToken, + } +} diff --git a/cmd/routers.go b/cmd/routers.go index 306dad881..381706c7e 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -36,7 +36,7 @@ func newCacheObjectsFn() CacheObjectLayer { // Composed function registering routers for only distributed XL setup. func registerDistXLRouters(router *mux.Router, endpoints EndpointList) { // Register storage rpc router only if its a distributed setup. - registerStorageRPCRouters(router, endpoints) + registerStorageRESTHandlers(router, endpoints) // Register distributed namespace lock. registerDistNSLockRouter(router) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go new file mode 100644 index 000000000..1063a9d34 --- /dev/null +++ b/cmd/storage-rest-client.go @@ -0,0 +1,343 @@ +/* + * Minio Cloud Storage, (C) 2018 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "crypto/tls" + "io" + "io/ioutil" + "net" + "net/url" + "path" + "strconv" + + "encoding/gob" + "encoding/hex" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/rest" + xnet "github.com/minio/minio/pkg/net" +) + +func isNetworkDisconnectError(err error) bool { + if err == nil { + return false + } + + if uerr, isURLError := err.(*url.Error); isURLError { + if uerr.Timeout() { + return true + } + + err = uerr.Err + } + + _, isNetOpError := err.(*net.OpError) + return isNetOpError +} + +// Converts rpc.ServerError to underlying error. This function is +// written so that the storageAPI errors are consistent across network +// disks as well. +func toStorageErr(err error) error { + if err == nil { + return nil + } + + if isNetworkDisconnectError(err) { + return errDiskNotFound + } + + switch err.Error() { + case io.EOF.Error(): + return io.EOF + case io.ErrUnexpectedEOF.Error(): + return io.ErrUnexpectedEOF + case errUnexpected.Error(): + return errUnexpected + case errDiskFull.Error(): + return errDiskFull + case errVolumeNotFound.Error(): + return errVolumeNotFound + case errVolumeExists.Error(): + return errVolumeExists + case errFileNotFound.Error(): + return errFileNotFound + case errFileNameTooLong.Error(): + return errFileNameTooLong + case errFileAccessDenied.Error(): + return errFileAccessDenied + case errIsNotRegular.Error(): + return errIsNotRegular + case errVolumeNotEmpty.Error(): + return errVolumeNotEmpty + case errVolumeAccessDenied.Error(): + return errVolumeAccessDenied + case errCorruptedFormat.Error(): + return errCorruptedFormat + case errUnformattedDisk.Error(): + return errUnformattedDisk + case errInvalidAccessKeyID.Error(): + return errInvalidAccessKeyID + case errAuthentication.Error(): + return errAuthentication + case errRPCAPIVersionUnsupported.Error(): + return errRPCAPIVersionUnsupported + case errServerTimeMismatch.Error(): + return errServerTimeMismatch + } + return err +} + +// Abstracts a remote disk. +type storageRESTClient struct { + endpoint Endpoint + restClient *rest.Client + connected bool + lastError error +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected +// permanently. The only way to restore the storage connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *storageRESTClient) call(method string, values url.Values, body io.Reader) (respBody io.ReadCloser, err error) { + if !client.connected { + return nil, errDiskNotFound + } + respBody, err = client.restClient.Call(method, values, body) + if err == nil { + return respBody, nil + } + client.lastError = err + if isNetworkDisconnectError(err) { + client.connected = false + } + + return nil, toStorageErr(err) +} + +// Stringer provides a canonicalized representation of network device. +func (client *storageRESTClient) String() string { + return client.endpoint.String() +} + +// IsOnline - returns whether RPC client failed to connect or not. +func (client *storageRESTClient) IsOnline() bool { + return client.connected +} + +// LastError - returns the network error if any. +func (client *storageRESTClient) LastError() error { + return client.lastError +} + +// DiskInfo - fetch disk information for a remote disk. +func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) { + respBody, err := client.call(storageRESTMethodDiskInfo, nil, nil) + if err != nil { + return + } + defer CloseResponse(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// MakeVol - create a volume on a remote disk. +func (client *storageRESTClient) MakeVol(volume string) (err error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + respBody, err := client.call(storageRESTMethodMakeVol, values, nil) + defer CloseResponse(respBody) + return err +} + +// ListVols - List all volumes on a remote disk. +func (client *storageRESTClient) ListVols() (volinfo []VolInfo, err error) { + respBody, err := client.call(storageRESTMethodListVols, nil, nil) + if err != nil { + return + } + defer CloseResponse(respBody) + err = gob.NewDecoder(respBody).Decode(&volinfo) + return volinfo, err +} + +// StatVol - get volume info over the network. +func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + respBody, err := client.call(storageRESTMethodStatVol, values, nil) + if err != nil { + return + } + defer CloseResponse(respBody) + err = gob.NewDecoder(respBody).Decode(&volInfo) + return volInfo, err +} + +// DeleteVol - Deletes a volume over the network. +func (client *storageRESTClient) DeleteVol(volume string) (err error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + respBody, err := client.call(storageRESTMethodDeleteVol, values, nil) + defer CloseResponse(respBody) + return err +} + +// PrepareFile - to fallocate() disk space for a file. +func (client *storageRESTClient) PrepareFile(volume, path string, length int64) error { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + values.Set(storageRESTLength, strconv.Itoa(int(length))) + respBody, err := client.call(storageRESTMethodPrepareFile, values, nil) + defer CloseResponse(respBody) + return err +} + +// AppendFile - append to a file. +func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte) error { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + reader := bytes.NewBuffer(buffer) + respBody, err := client.call(storageRESTMethodAppendFile, values, reader) + defer CloseResponse(respBody) + return err +} + +// StatFile - stat a file. +func (client *storageRESTClient) StatFile(volume, path string) (info FileInfo, err error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + respBody, err := client.call(storageRESTMethodStatFile, values, nil) + if err != nil { + return info, err + } + defer CloseResponse(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// ReadAll - reads all contents of a file. +func (client *storageRESTClient) ReadAll(volume, path string) ([]byte, error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + respBody, err := client.call(storageRESTMethodReadAll, values, nil) + if err != nil { + return nil, err + } + defer CloseResponse(respBody) + return ioutil.ReadAll(respBody) +} + +// ReadFile - reads section of a file. +func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (int64, error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + values.Set(storageRESTOffset, strconv.Itoa(int(offset))) + values.Set(storageRESTLength, strconv.Itoa(len(buffer))) + if verifier != nil { + values.Set(storageRESTBitrotAlgo, verifier.algorithm.String()) + values.Set(storageRESTBitrotHash, hex.EncodeToString(verifier.sum)) + } else { + values.Set(storageRESTBitrotAlgo, "") + values.Set(storageRESTBitrotHash, "") + } + respBody, err := client.call(storageRESTMethodReadFile, values, nil) + if err != nil { + return 0, err + } + defer CloseResponse(respBody) + n, err := io.ReadFull(respBody, buffer) + return int64(n), err +} + +// ListDir - lists a directory. +func (client *storageRESTClient) ListDir(volume, dirPath string, count int) (entries []string, err error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTDirPath, dirPath) + values.Set(storageRESTCount, strconv.Itoa(count)) + respBody, err := client.call(storageRESTMethodListDir, values, nil) + if err != nil { + return nil, err + } + defer CloseResponse(respBody) + err = gob.NewDecoder(respBody).Decode(&entries) + return entries, err +} + +// DeleteFile - deletes a file. +func (client *storageRESTClient) DeleteFile(volume, path string) error { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + respBody, err := client.call(storageRESTMethodDeleteFile, values, nil) + defer CloseResponse(respBody) + return err +} + +// RenameFile - renames a file. +func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { + values := make(url.Values) + values.Set(storageRESTSrcVolume, srcVolume) + values.Set(storageRESTSrcPath, srcPath) + values.Set(storageRESTDstVolume, dstVolume) + values.Set(storageRESTDstPath, dstPath) + respBody, err := client.call(storageRESTMethodRenameFile, values, nil) + defer CloseResponse(respBody) + return err +} + +// Close - marks the client as closed. +func (client *storageRESTClient) Close() error { + client.connected = false + return nil +} + +// Returns a storage rest client. +func newStorageRESTClient(endpoint Endpoint) *storageRESTClient { + host, err := xnet.ParseHost(endpoint.Host) + logger.FatalIf(err, "Unable to parse storage Host") + + scheme := "http" + if globalIsSSL { + scheme = "https" + } + + serverURL := &url.URL{ + Scheme: scheme, + Host: endpoint.Host, + Path: path.Join(storageRESTPath, endpoint.Path), + } + + var tlsConfig *tls.Config + if globalIsSSL { + tlsConfig = &tls.Config{ + ServerName: host.Name, + RootCAs: globalRootCAs, + } + } + + restClient := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken) + return &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: true} +} diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go new file mode 100644 index 000000000..6298db7e2 --- /dev/null +++ b/cmd/storage-rest-common.go @@ -0,0 +1,52 @@ +/* + * Minio Cloud Storage, (C) 2018 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +const storageRESTVersion = "v1" +const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/" + +const ( + storageRESTMethodDiskInfo = "diskinfo" + storageRESTMethodMakeVol = "makevol" + storageRESTMethodStatVol = "statvol" + storageRESTMethodDeleteVol = "deletevol" + storageRESTMethodListVols = "listvols" + + storageRESTMethodPrepareFile = "preparefile" + storageRESTMethodAppendFile = "appendfile" + storageRESTMethodStatFile = "statfile" + storageRESTMethodReadAll = "readall" + storageRESTMethodReadFile = "readfile" + storageRESTMethodListDir = "listdir" + storageRESTMethodDeleteFile = "deletefile" + storageRESTMethodRenameFile = "renamefile" +) + +const ( + storageRESTVolume = "volume" + storageRESTDirPath = "dir-path" + storageRESTFilePath = "file-path" + storageRESTSrcVolume = "source-volume" + storageRESTSrcPath = "source-path" + storageRESTDstVolume = "destination-volume" + storageRESTDstPath = "destination-path" + storageRESTOffset = "offset" + storageRESTLength = "length" + storageRESTCount = "count" + storageRESTBitrotAlgo = "bitrot-algo" + storageRESTBitrotHash = "bitrot-hash" +) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go new file mode 100644 index 000000000..bf6ca1723 --- /dev/null +++ b/cmd/storage-rest-server.go @@ -0,0 +1,353 @@ +/* + * Minio Cloud Storage, (C) 2018 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "io" + "path" + "strconv" + + "net/http" + + "encoding/gob" + "encoding/hex" + + "time" + + "github.com/gorilla/mux" + "github.com/minio/minio/cmd/logger" +) + +// To abstract a disk over network. +type storageRESTServer struct { + storage *posix +} + +func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte(err.Error())) +} + +// IsValid - To authenticate and verify the time difference. +func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { + requestTimeStr := r.Header.Get("X-Minio-Time") + requestTime, err := time.Parse(time.RFC3339, requestTimeStr) + if err != nil { + s.writeErrorResponse(w, err) + return false + } + utcNow := UTCNow() + delta := requestTime.Sub(utcNow) + if delta < 0 { + delta = delta * -1 + } + if delta > DefaultSkewTime { + s.writeErrorResponse(w, fmt.Errorf("client time %v is too apart with server time %v", requestTime, utcNow)) + return false + } + return true +} + +// DiskInfoHandler - returns disk info. +func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + info, err := s.storage.DiskInfo() + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer w.(http.Flusher).Flush() + gob.NewEncoder(w).Encode(info) +} + +// MakeVolHandler - make a volume. +func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + err := s.storage.MakeVol(volume) + if err != nil { + s.writeErrorResponse(w, err) + } +} + +// ListVolsHandler - list volumes. +func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + infos, err := s.storage.ListVols() + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer w.(http.Flusher).Flush() + gob.NewEncoder(w).Encode(&infos) +} + +// StatVolHandler - stat a volume. +func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + info, err := s.storage.StatVol(volume) + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer w.(http.Flusher).Flush() + gob.NewEncoder(w).Encode(info) +} + +// DeleteVolumeHandler - delete a volume. +func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + err := s.storage.DeleteVol(volume) + if err != nil { + s.writeErrorResponse(w, err) + } +} + +// PrepareFileHandler - fallocate() space for a file. +func (s *storageRESTServer) PrepareFileHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + fileSizeStr := vars[storageRESTLength] + fileSize, err := strconv.Atoi(fileSizeStr) + if err != nil { + s.writeErrorResponse(w, err) + return + } + err = s.storage.PrepareFile(volume, filePath, int64(fileSize)) + if err != nil { + s.writeErrorResponse(w, err) + } +} + +// AppendFileHandler - append to a file. +func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + buf := make([]byte, r.ContentLength) + _, err := io.ReadFull(r.Body, buf) + if err != nil { + s.writeErrorResponse(w, err) + return + } + err = s.storage.AppendFile(volume, filePath, buf) + if err != nil { + s.writeErrorResponse(w, err) + } +} + +// StatFileHandler - stat a file. +func (s *storageRESTServer) StatFileHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + + info, err := s.storage.StatFile(volume, filePath) + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer w.(http.Flusher).Flush() + gob.NewEncoder(w).Encode(info) +} + +// ReadAllHandler - read all the contents of a file. +func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + + buf, err := s.storage.ReadAll(volume, filePath) + if err != nil { + s.writeErrorResponse(w, err) + return + } + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + w.Write(buf) +} + +// ReadFileHandler - read section of a file. +func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + offset, err := strconv.Atoi(vars[storageRESTOffset]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + length, err := strconv.Atoi(vars[storageRESTLength]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + if offset < 0 || length < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + var verifier *BitrotVerifier + if vars[storageRESTBitrotAlgo] != "" { + hashStr := vars[storageRESTBitrotHash] + var hash []byte + hash, err = hex.DecodeString(hashStr) + if err != nil { + s.writeErrorResponse(w, err) + return + } + verifier = NewBitrotVerifier(BitrotAlgorithmFromString(vars[storageRESTBitrotAlgo]), hash) + } + buf := make([]byte, length) + _, err = s.storage.ReadFile(volume, filePath, int64(offset), buf, verifier) + if err != nil { + s.writeErrorResponse(w, err) + return + } + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + w.Write(buf) +} + +// ListDirHandler - list a directory. +func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + dirPath := vars[storageRESTDirPath] + count, err := strconv.Atoi(vars[storageRESTCount]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + entries, err := s.storage.ListDir(volume, dirPath, count) + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer w.(http.Flusher).Flush() + gob.NewEncoder(w).Encode(&entries) +} + +// DeleteFileHandler - delete a file. +func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + + err := s.storage.DeleteFile(volume, filePath) + if err != nil { + s.writeErrorResponse(w, err) + } +} + +// RenameFileHandler - rename a file. +func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + srcVolume := vars[storageRESTSrcVolume] + srcFilePath := vars[storageRESTSrcPath] + dstVolume := vars[storageRESTDstVolume] + dstFilePath := vars[storageRESTDstPath] + err := s.storage.RenameFile(srcVolume, srcFilePath, dstVolume, dstFilePath) + if err != nil { + s.writeErrorResponse(w, err) + } +} + +// registerStorageRPCRouter - register storage rpc router. +func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) { + for _, endpoint := range endpoints { + if !endpoint.IsLocal { + continue + } + storage, err := newPosix(endpoint.Path) + if err != nil { + logger.Fatal(uiErrUnableToWriteInBackend(err), "Unable to initialize posix backend") + } + + server := &storageRESTServer{storage} + + subrouter := router.PathPrefix(path.Join(storageRESTPath, endpoint.Path)).Subrouter() + + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler)) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler)).Queries(restQueries(storageRESTVolume)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodStatVol).HandlerFunc(httpTraceHdrs(server.StatVolHandler)).Queries(restQueries(storageRESTVolume)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler)).Queries(restQueries(storageRESTVolume)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListVols).HandlerFunc(httpTraceHdrs(server.ListVolsHandler)) + + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodPrepareFile).HandlerFunc(httpTraceHdrs(server.PrepareFileHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTLength)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodAppendFile).HandlerFunc(httpTraceHdrs(server.AppendFileHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodStatFile).HandlerFunc(httpTraceHdrs(server.StatFileHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadAll).HandlerFunc(httpTraceHdrs(server.ReadAllHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadFile).HandlerFunc(httpTraceHdrs(server.ReadFileHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength, storageRESTBitrotAlgo, storageRESTBitrotHash)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)). + Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)). + Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...) + + } +} diff --git a/cmd/storage-rpc_test.go b/cmd/storage-rest_test.go similarity index 82% rename from cmd/storage-rpc_test.go rename to cmd/storage-rest_test.go index 634595bed..ecc68a658 100644 --- a/cmd/storage-rpc_test.go +++ b/cmd/storage-rest_test.go @@ -18,18 +18,18 @@ package cmd import ( "io/ioutil" - "net/http" "net/http/httptest" "os" "reflect" "testing" + "github.com/gorilla/mux" xnet "github.com/minio/minio/pkg/net" ) /////////////////////////////////////////////////////////////////////////////// // -// Storage RPC server, storageRPCReceiver and StorageRPCClient are +// Storage REST server, storageRESTReceiver and StorageRESTClient are // inter-dependent, below test functions are sufficient to test all of them. // /////////////////////////////////////////////////////////////////////////////// @@ -520,182 +520,174 @@ func testStorageAPIRenameFile(t *testing.T, storage StorageAPI) { } } -func newStorageRPCHTTPServerClient(t *testing.T) (*httptest.Server, *StorageRPCClient, *serverConfig, string) { - endpointPath, err := ioutil.TempDir("", ".TestStorageRPC.") +func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRESTClient, *serverConfig, string) { + endpointPath, err := ioutil.TempDir("", ".TestStorageREST.") if err != nil { t.Fatalf("unexpected error %v", err) } - rpcServer, err := NewStorageRPCServer(endpointPath) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - rpcServer.ServeHTTP(w, r) - })) + router := mux.NewRouter() + httpServer := httptest.NewServer(router) url, err := xnet.ParseURL(httpServer.URL) if err != nil { t.Fatalf("unexpected error %v", err) } + url.Path = endpointPath - host, err := xnet.ParseHost(url.Host) + endpoint, err := NewEndpoint(url.String()) if err != nil { - t.Fatalf("unexpected error %v", err) + t.Fatalf("NewEndpoint failed %v", endpoint) } + registerStorageRESTHandlers(router, EndpointList{endpoint}) + restClient := newStorageRESTClient(endpoint) + prevGlobalServerConfig := globalServerConfig globalServerConfig = newServerConfig() - rpcClient, err := NewStorageRPCClient(host, endpointPath) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - rpcClient.connected = true - - return httpServer, rpcClient, prevGlobalServerConfig, endpointPath + return httpServer, restClient, prevGlobalServerConfig, endpointPath } -func TestStorageRPCClientDiskInfo(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientDiskInfo(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIDiskInfo(t, rpcClient) + testStorageAPIDiskInfo(t, restClient) } -func TestStorageRPCClientMakeVol(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientMakeVol(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIMakeVol(t, rpcClient) + testStorageAPIMakeVol(t, restClient) } -func TestStorageRPCClientListVols(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientListVols(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIListVols(t, rpcClient) + testStorageAPIListVols(t, restClient) } -func TestStorageRPCClientStatVol(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientStatVol(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIStatVol(t, rpcClient) + testStorageAPIStatVol(t, restClient) } -func TestStorageRPCClientDeleteVol(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientDeleteVol(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIDeleteVol(t, rpcClient) + testStorageAPIDeleteVol(t, restClient) } -func TestStorageRPCClientStatFile(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientStatFile(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIStatFile(t, rpcClient) + testStorageAPIStatFile(t, restClient) } -func TestStorageRPCClientListDir(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientListDir(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIListDir(t, rpcClient) + testStorageAPIListDir(t, restClient) } -func TestStorageRPCClientReadAll(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientReadAll(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIReadAll(t, rpcClient) + testStorageAPIReadAll(t, restClient) } -func TestStorageRPCClientReadFile(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientReadFile(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIReadFile(t, rpcClient) + testStorageAPIReadFile(t, restClient) } -func TestStorageRPCClientPrepareFile(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientPrepareFile(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIPrepareFile(t, rpcClient) + testStorageAPIPrepareFile(t, restClient) } -func TestStorageRPCClientAppendFile(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientAppendFile(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIAppendFile(t, rpcClient) + testStorageAPIAppendFile(t, restClient) } -func TestStorageRPCClientDeleteFile(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientDeleteFile(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIDeleteFile(t, rpcClient) + testStorageAPIDeleteFile(t, restClient) } -func TestStorageRPCClientRenameFile(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig, endpointPath := newStorageRPCHTTPServerClient(t) +func TestStorageRESTClientRenameFile(t *testing.T) { + httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() defer func() { globalServerConfig = prevGlobalServerConfig }() defer os.RemoveAll(endpointPath) - testStorageAPIRenameFile(t, rpcClient) + testStorageAPIRenameFile(t, restClient) } diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go deleted file mode 100644 index bada5f8b7..000000000 --- a/cmd/storage-rpc-client.go +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "bytes" - "crypto/tls" - "io" - "net" - "net/url" - "path" - "strings" - - "github.com/minio/minio/cmd/logger" - xnet "github.com/minio/minio/pkg/net" -) - -func isNetworkDisconnectError(err error) bool { - if err == nil { - return false - } - - if uerr, isURLError := err.(*url.Error); isURLError { - if uerr.Timeout() { - return true - } - - err = uerr.Err - } - - _, isNetOpError := err.(*net.OpError) - return isNetOpError -} - -// Converts rpc.ServerError to underlying error. This function is -// written so that the storageAPI errors are consistent across network -// disks as well. -func toStorageErr(err error) error { - if err == nil { - return nil - } - - if isNetworkDisconnectError(err) { - return errDiskNotFound - } - - switch err.Error() { - case io.EOF.Error(): - return io.EOF - case io.ErrUnexpectedEOF.Error(): - return io.ErrUnexpectedEOF - case errUnexpected.Error(): - return errUnexpected - case errDiskFull.Error(): - return errDiskFull - case errVolumeNotFound.Error(): - return errVolumeNotFound - case errVolumeExists.Error(): - return errVolumeExists - case errFileNotFound.Error(): - return errFileNotFound - case errFileNameTooLong.Error(): - return errFileNameTooLong - case errFileAccessDenied.Error(): - return errFileAccessDenied - case errIsNotRegular.Error(): - return errIsNotRegular - case errVolumeNotEmpty.Error(): - return errVolumeNotEmpty - case errVolumeAccessDenied.Error(): - return errVolumeAccessDenied - case errCorruptedFormat.Error(): - return errCorruptedFormat - case errUnformattedDisk.Error(): - return errUnformattedDisk - case errInvalidAccessKeyID.Error(): - return errInvalidAccessKeyID - case errAuthentication.Error(): - return errAuthentication - case errRPCAPIVersionUnsupported.Error(): - return errRPCAPIVersionUnsupported - case errServerTimeMismatch.Error(): - return errServerTimeMismatch - } - return err -} - -// StorageRPCClient - storage RPC client. -type StorageRPCClient struct { - *RPCClient - connected bool - // Plain error of the last RPC call - lastRPCError error -} - -// Stringer provides a canonicalized representation of network device. -func (client *StorageRPCClient) String() string { - url := client.ServiceURL() - // Remove the storage RPC path prefix, internal paths are meaningless. why? - url.Path = strings.TrimPrefix(url.Path, storageServicePath) - return url.String() -} - -// LastError - returns the last RPC call result, nil or error if any -func (client *StorageRPCClient) LastError() error { - return client.lastRPCError -} - -// Close - closes underneath RPC client. -func (client *StorageRPCClient) Close() error { - client.connected = false - return toStorageErr(client.RPCClient.Close()) -} - -// IsOnline - returns whether RPC client failed to connect or not. -func (client *StorageRPCClient) IsOnline() bool { - return client.connected -} - -func (client *StorageRPCClient) connect() { - err := client.Call(storageServiceName+".Connect", &AuthArgs{}, &VoidReply{}) - client.lastRPCError = err - client.connected = err == nil -} - -func (client *StorageRPCClient) call(handler string, args interface { - SetAuthArgs(args AuthArgs) -}, reply interface{}) error { - - if !client.connected { - return errDiskNotFound - } - - err := client.Call(handler, args, reply) - client.lastRPCError = err - if err == nil { - return nil - } - - if isNetworkDisconnectError(err) { - client.connected = false - } - - return toStorageErr(err) -} - -// DiskInfo - fetch disk information for a remote disk. -func (client *StorageRPCClient) DiskInfo() (info DiskInfo, err error) { - err = client.call(storageServiceName+".DiskInfo", &AuthArgs{}, &info) - return info, err -} - -// MakeVol - create a volume on a remote disk. -func (client *StorageRPCClient) MakeVol(volume string) (err error) { - return client.call(storageServiceName+".MakeVol", &VolArgs{Vol: volume}, &VoidReply{}) -} - -// ListVols - List all volumes on a remote disk. -func (client *StorageRPCClient) ListVols() ([]VolInfo, error) { - var reply []VolInfo - err := client.call(storageServiceName+".ListVols", &AuthArgs{}, &reply) - return reply, err -} - -// StatVol - get volume info over the network. -func (client *StorageRPCClient) StatVol(volume string) (volInfo VolInfo, err error) { - err = client.call(storageServiceName+".StatVol", &VolArgs{Vol: volume}, &volInfo) - return volInfo, err -} - -// DeleteVol - Deletes a volume over the network. -func (client *StorageRPCClient) DeleteVol(volume string) (err error) { - return client.call(storageServiceName+".DeleteVol", &VolArgs{Vol: volume}, &VoidReply{}) -} - -// File operations. - -// PrepareFile - calls PrepareFile RPC. -func (client *StorageRPCClient) PrepareFile(volume, path string, length int64) (err error) { - args := PrepareFileArgs{ - Vol: volume, - Path: path, - Size: length, - } - reply := VoidReply{} - - return client.call(storageServiceName+".PrepareFile", &args, &reply) -} - -// AppendFile - append file writes buffer to a remote network path. -func (client *StorageRPCClient) AppendFile(volume, path string, buffer []byte) (err error) { - args := AppendFileArgs{ - Vol: volume, - Path: path, - Buffer: buffer, - } - reply := VoidReply{} - - return client.call(storageServiceName+".AppendFile", &args, &reply) -} - -// StatFile - get latest Stat information for a file at path. -func (client *StorageRPCClient) StatFile(volume, path string) (fileInfo FileInfo, err error) { - err = client.call(storageServiceName+".StatFile", &StatFileArgs{Vol: volume, Path: path}, &fileInfo) - return fileInfo, err -} - -// ReadAll - reads entire contents of the file at path until EOF, returns the -// contents in a byte slice. Returns buf == nil if err != nil. -// This API is meant to be used on files which have small memory footprint, do -// not use this on large files as it would cause server to crash. -func (client *StorageRPCClient) ReadAll(volume, path string) (buf []byte, err error) { - err = client.call(storageServiceName+".ReadAll", &ReadAllArgs{Vol: volume, Path: path}, &buf) - return buf, err -} - -// ReadFile - reads a file at remote path and fills the buffer. -func (client *StorageRPCClient) ReadFile(volume string, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (m int64, err error) { - // Recover from any panic and return error. - defer func() { - if r := recover(); r != nil { - err = bytes.ErrTooLarge - } - }() - - args := ReadFileArgs{ - Vol: volume, - Path: path, - Offset: offset, - Length: int64(len(buffer)), - Verified: verifier == nil, // Marked accordingly if verifier is set or not. - } - if verifier != nil { - args.Algo = verifier.algorithm - args.ExpectedHash = verifier.sum - } - var reply []byte - - err = client.call(storageServiceName+".ReadFile", &args, &reply) - - // Copy reply to buffer. - copy(buffer, reply) - - // Return length of result, err if any. - return int64(len(reply)), err -} - -// ListDir - list all entries at prefix. -func (client *StorageRPCClient) ListDir(volume, path string, count int) (entries []string, err error) { - err = client.call(storageServiceName+".ListDir", &ListDirArgs{Vol: volume, Path: path, Count: count}, &entries) - return entries, err -} - -// DeleteFile - Delete a file at path. -func (client *StorageRPCClient) DeleteFile(volume, path string) (err error) { - args := DeleteFileArgs{ - Vol: volume, - Path: path, - } - reply := VoidReply{} - - return client.call(storageServiceName+".DeleteFile", &args, &reply) -} - -// RenameFile - rename a remote file from source to destination. -func (client *StorageRPCClient) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { - args := RenameFileArgs{ - SrcVol: srcVolume, - SrcPath: srcPath, - DstVol: dstVolume, - DstPath: dstPath, - } - reply := VoidReply{} - - return client.call(storageServiceName+".RenameFile", &args, &reply) -} - -// NewStorageRPCClient - returns new storage RPC client. -func NewStorageRPCClient(host *xnet.Host, endpointPath string) (*StorageRPCClient, error) { - scheme := "http" - if globalIsSSL { - scheme = "https" - } - - serviceURL := &xnet.URL{ - Scheme: scheme, - Host: host.String(), - Path: path.Join(storageServicePath, endpointPath), - } - - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: host.Name, - RootCAs: globalRootCAs, - } - } - - rpcClient, err := NewRPCClient( - RPCClientArgs{ - NewAuthTokenFunc: newAuthToken, - RPCVersion: globalRPCAPIVersion, - ServiceName: storageServiceName, - ServiceURL: serviceURL, - TLSConfig: tlsConfig, - }, - ) - if err != nil { - return nil, err - } - - return &StorageRPCClient{RPCClient: rpcClient}, nil -} - -// Initialize new storage rpc client. -func newStorageRPC(endpoint Endpoint) *StorageRPCClient { - host, err := xnet.ParseHost(endpoint.Host) - logger.FatalIf(err, "Unable to parse storage RPC Host") - rpcClient, err := NewStorageRPCClient(host, endpoint.Path) - logger.FatalIf(err, "Unable to initialize storage RPC client") - // Attempt first try connection and save error if any. - rpcClient.connect() - return rpcClient -} diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go deleted file mode 100644 index e3cf9b6b4..000000000 --- a/cmd/storage-rpc-server.go +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "io" - "path" - - "github.com/gorilla/mux" - "github.com/minio/minio/cmd/logger" - xrpc "github.com/minio/minio/cmd/rpc" -) - -const storageServiceName = "Storage" -const storageServiceSubPath = "/storage" - -var storageServicePath = path.Join(minioReservedBucketPath, storageServiceSubPath) - -// storageRPCReceiver - Storage RPC receiver for storage RPC server -type storageRPCReceiver struct { - local *posix -} - -// VolArgs - generic volume args. -type VolArgs struct { - AuthArgs - Vol string -} - -/// Storage operations handlers. - -// Connect - authenticates remote connection. -func (receiver *storageRPCReceiver) Connect(args *AuthArgs, reply *VoidReply) (err error) { - return args.Authenticate() -} - -// DiskInfo - disk info handler is rpc wrapper for DiskInfo operation. -func (receiver *storageRPCReceiver) DiskInfo(args *AuthArgs, reply *DiskInfo) (err error) { - *reply, err = receiver.local.DiskInfo() - return err -} - -/// Volume operations handlers. - -// MakeVol - make vol handler is rpc wrapper for MakeVol operation. -func (receiver *storageRPCReceiver) MakeVol(args *VolArgs, reply *VoidReply) error { - return receiver.local.MakeVol(args.Vol) -} - -// ListVols - list vols handler is rpc wrapper for ListVols operation. -func (receiver *storageRPCReceiver) ListVols(args *AuthArgs, reply *[]VolInfo) (err error) { - *reply, err = receiver.local.ListVols() - return err -} - -// StatVol - stat vol handler is a rpc wrapper for StatVol operation. -func (receiver *storageRPCReceiver) StatVol(args *VolArgs, reply *VolInfo) (err error) { - *reply, err = receiver.local.StatVol(args.Vol) - return err -} - -// DeleteVol - delete vol handler is a rpc wrapper for -// DeleteVol operation. -func (receiver *storageRPCReceiver) DeleteVol(args *VolArgs, reply *VoidReply) error { - return receiver.local.DeleteVol(args.Vol) -} - -/// File operations - -// StatFileArgs represents stat file RPC arguments. -type StatFileArgs struct { - AuthArgs - Vol string - Path string -} - -// StatFile - stat file handler is rpc wrapper to stat file. -func (receiver *storageRPCReceiver) StatFile(args *StatFileArgs, reply *FileInfo) (err error) { - *reply, err = receiver.local.StatFile(args.Vol, args.Path) - return err -} - -// ListDirArgs represents list contents RPC arguments. -type ListDirArgs struct { - AuthArgs - Vol string - Path string - Count int -} - -// ListDir - list directory handler is rpc wrapper to list dir. -func (receiver *storageRPCReceiver) ListDir(args *ListDirArgs, reply *[]string) (err error) { - *reply, err = receiver.local.ListDir(args.Vol, args.Path, args.Count) - return err -} - -// ReadAllArgs represents read all RPC arguments. -type ReadAllArgs struct { - AuthArgs - Vol string - Path string -} - -// ReadAll - read all handler is rpc wrapper to read all storage API. -func (receiver *storageRPCReceiver) ReadAll(args *ReadAllArgs, reply *[]byte) (err error) { - *reply, err = receiver.local.ReadAll(args.Vol, args.Path) - return err -} - -// ReadFileArgs represents read file RPC arguments. -type ReadFileArgs struct { - AuthArgs - Vol string - Path string - Offset int64 - Length int64 - Algo BitrotAlgorithm - ExpectedHash []byte - Verified bool -} - -// ReadFile - read file handler is rpc wrapper to read file. -func (receiver *storageRPCReceiver) ReadFile(args *ReadFileArgs, reply *[]byte) error { - var verifier *BitrotVerifier - if !args.Verified { - verifier = NewBitrotVerifier(args.Algo, args.ExpectedHash) - } - - buf := make([]byte, args.Length) - n, err := receiver.local.ReadFile(args.Vol, args.Path, args.Offset, buf, verifier) - // Ignore io.ErrEnexpectedEOF for short reads i.e. less content available than requested. - if err == io.ErrUnexpectedEOF { - err = nil - } - - *reply = buf[0:n] - return err -} - -// PrepareFileArgs represents append file RPC arguments. -type PrepareFileArgs struct { - AuthArgs - Vol string - Path string - Size int64 -} - -// PrepareFile - prepare file handler is rpc wrapper to prepare file. -func (receiver *storageRPCReceiver) PrepareFile(args *PrepareFileArgs, reply *VoidReply) error { - return receiver.local.PrepareFile(args.Vol, args.Path, args.Size) -} - -// AppendFileArgs represents append file RPC arguments. -type AppendFileArgs struct { - AuthArgs - Vol string - Path string - Buffer []byte -} - -// AppendFile - append file handler is rpc wrapper to append file. -func (receiver *storageRPCReceiver) AppendFile(args *AppendFileArgs, reply *VoidReply) error { - return receiver.local.AppendFile(args.Vol, args.Path, args.Buffer) -} - -// DeleteFileArgs represents delete file RPC arguments. -type DeleteFileArgs struct { - AuthArgs - Vol string - Path string -} - -// DeleteFile - delete file handler is rpc wrapper to delete file. -func (receiver *storageRPCReceiver) DeleteFile(args *DeleteFileArgs, reply *VoidReply) error { - return receiver.local.DeleteFile(args.Vol, args.Path) -} - -// RenameFileArgs represents rename file RPC arguments. -type RenameFileArgs struct { - AuthArgs - SrcVol string - SrcPath string - DstVol string - DstPath string -} - -// RenameFile - rename file handler is rpc wrapper to rename file. -func (receiver *storageRPCReceiver) RenameFile(args *RenameFileArgs, reply *VoidReply) error { - return receiver.local.RenameFile(args.SrcVol, args.SrcPath, args.DstVol, args.DstPath) -} - -// NewStorageRPCServer - returns new storage RPC server. -func NewStorageRPCServer(endpointPath string) (*xrpc.Server, error) { - storage, err := newPosix(endpointPath) - if err != nil { - return nil, err - } - - rpcServer := xrpc.NewServer() - if err = rpcServer.RegisterName(storageServiceName, &storageRPCReceiver{storage}); err != nil { - return nil, err - } - - return rpcServer, nil -} - -// registerStorageRPCRouter - register storage rpc router. -func registerStorageRPCRouters(router *mux.Router, endpoints EndpointList) { - for _, endpoint := range endpoints { - if endpoint.IsLocal { - rpcServer, err := NewStorageRPCServer(endpoint.Path) - if err != nil { - logger.Fatal(uiErrUnableToWriteInBackend(err), "Unable to configure one of server's RPC services") - } - subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter() - subrouter.Path(path.Join(storageServiceSubPath, endpoint.Path)).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP)) - } - } -} diff --git a/cmd/utils.go b/cmd/utils.go index 726b7113a..33f38bb53 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -458,3 +458,13 @@ func CloseResponse(respBody io.ReadCloser) { respBody.Close() } } + +// Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example) +// If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"] +func restQueries(keys ...string) []string { + var accumulator []string + for _, key := range keys { + accumulator = append(accumulator, key, "{"+key+":.*}") + } + return accumulator +} diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 5c889cdd4..60258cc97 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -470,3 +470,25 @@ func TestIsErrIgnored(t *testing.T) { } } } + +// Test queries() +func TestQueries(t *testing.T) { + var testCases = []struct { + keys []string + keyvalues []string + }{ + { + []string{"aaaa", "bbbb"}, + []string{"aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"}, + }, + } + + for i, test := range testCases { + keyvalues := restQueries(test.keys...) + for j := range keyvalues { + if keyvalues[j] != test.keyvalues[j] { + t.Fatalf("test %d: keyvalues[%d] does not match", i+1, j) + } + } + } +} diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index da1deff88..ceb0eaf8f 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -194,6 +194,9 @@ func (s *xlSets) connectDisksWithQuorum() { s.xlDisks[i][j] = disk onlineDisks++ } + // Sleep for a while - so that we don't go into + // 100% CPU when half the disks are online. + time.Sleep(500 * time.Millisecond) } }