diff --git a/cmd/admin-rpc-server.go b/cmd/admin-rpc-server.go index 25a03b310..f5b3d4855 100644 --- a/cmd/admin-rpc-server.go +++ b/cmd/admin-rpc-server.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io/ioutil" - "net/rpc" "os" "path/filepath" "time" @@ -236,7 +235,7 @@ func (s *adminCmd) CommitConfig(cArgs *CommitConfigArgs, cReply *CommitConfigRep // stop and restart commands. func registerAdminRPCRouter(mux *router.Router) error { adminRPCHandler := &adminCmd{} - adminRPCServer := rpc.NewServer() + adminRPCServer := newRPCServer() err := adminRPCServer.RegisterName("Admin", adminRPCHandler) if err != nil { return traceError(err) diff --git a/cmd/browser-rpc-router.go b/cmd/browser-rpc-router.go index d88c5779d..fdf88c856 100644 --- a/cmd/browser-rpc-router.go +++ b/cmd/browser-rpc-router.go @@ -17,8 +17,6 @@ package cmd import ( - "net/rpc" - router "github.com/gorilla/mux" ) @@ -39,7 +37,7 @@ type browserPeerAPIHandlers struct { func registerBrowserPeerRPCRouter(mux *router.Router) error { bpHandlers := &browserPeerAPIHandlers{} - bpRPCServer := rpc.NewServer() + bpRPCServer := newRPCServer() err := bpRPCServer.RegisterName("BrowserPeer", bpHandlers) if err != nil { return traceError(err) diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 70c5d77ff..7d14d0eee 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -19,7 +19,6 @@ package cmd import ( "fmt" "math/rand" - "net/rpc" "path" "sync" "time" @@ -99,7 +98,7 @@ func registerDistNSLockRouter(mux *router.Router, endpoints EndpointList) error // registerStorageLockers - register locker rpc handlers for net/rpc library clients func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error { for _, lockServer := range lockServers { - lockRPCServer := rpc.NewServer() + lockRPCServer := newRPCServer() if err := lockRPCServer.RegisterName(lockServiceName, lockServer); err != nil { return traceError(err) } diff --git a/cmd/rpc-server.go b/cmd/rpc-server.go new file mode 100644 index 000000000..3ca020832 --- /dev/null +++ b/cmd/rpc-server.go @@ -0,0 +1,58 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "io" + "net/http" + "net/rpc" + + miniohttp "github.com/minio/minio/pkg/http" +) + +// ServeHTTP implements an http.Handler that answers RPC requests, +// hijacks the underlying connection and clears all deadlines if any. +func (server *rpcServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodConnect { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + conn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + errorIf(err, "rpc hijacking failed for: %s", req.RemoteAddr) + return + } + + // Overrides Read/Write deadlines if any. + bufConn, ok := conn.(*miniohttp.BufConn) + if ok { + bufConn.RemoveTimeout() + conn = bufConn + } + + // Can connect to RPC service using HTTP CONNECT to rpcPath. + io.WriteString(conn, "HTTP/1.0 200 Connected to Go RPC\n\n") + server.ServeConn(conn) +} + +type rpcServer struct{ *rpc.Server } + +// Similar to rpc.NewServer() provides a custom ServeHTTP override. +func newRPCServer() *rpcServer { + return &rpcServer{rpc.NewServer()} +} diff --git a/cmd/rpc-server_test.go b/cmd/rpc-server_test.go new file mode 100644 index 000000000..e58c51bfb --- /dev/null +++ b/cmd/rpc-server_test.go @@ -0,0 +1,76 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/http" + "net/http/httptest" + "testing" + + router "github.com/gorilla/mux" +) + +type ArithArgs struct { + A, B int +} + +type ArithReply struct { + C int +} + +type Arith int + +// Some of Arith's methods have value args, some have pointer args. That's deliberate. + +func (t *Arith) Add(args ArithArgs, reply *ArithReply) error { + reply.C = args.A + args.B + return nil +} + +func TestGoHTTPRPC(t *testing.T) { + newServer := newRPCServer() + newServer.Register(new(Arith)) + + mux := router.NewRouter().SkipClean(true) + mux.Path("/foo").Handler(newServer) + + httpServer := httptest.NewServer(mux) + defer httpServer.Close() + + client := newRPCClient(httpServer.Listener.Addr().String(), "/foo", false) + defer client.Close() + + // Synchronous calls + args := &ArithArgs{7, 8} + reply := new(ArithReply) + if err := client.Call("Arith.Add", args, reply); err != nil { + t.Errorf("Add: expected no error but got string %v", err) + } + + if reply.C != args.A+args.B { + t.Errorf("Add: expected %d got %d", reply.C, args.A+args.B) + } + + resp, err := http.Get(httpServer.URL + "/foo") + if err != nil { + t.Fatal(err) + } + + if resp.StatusCode != http.StatusMethodNotAllowed { + t.Errorf("Expected %d, got %d", http.StatusMethodNotAllowed, resp.StatusCode) + } +} diff --git a/cmd/s3-peer-router.go b/cmd/s3-peer-router.go index 3701bc8ab..aaf243256 100644 --- a/cmd/s3-peer-router.go +++ b/cmd/s3-peer-router.go @@ -17,8 +17,6 @@ package cmd import ( - "net/rpc" - router "github.com/gorilla/mux" ) @@ -39,7 +37,7 @@ func registerS3PeerRPCRouter(mux *router.Router) error { }, } - s3PeerRPCServer := rpc.NewServer() + s3PeerRPCServer := newRPCServer() err := s3PeerRPCServer.RegisterName("S3", s3PeerHandlers) if err != nil { return traceError(err) diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index ee4d968fb..0f18d709f 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -18,7 +18,6 @@ package cmd import ( "io" - "net/rpc" "path" "time" @@ -217,7 +216,7 @@ func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *AuthRPCRe } // Initialize new storage rpc. -func newRPCServer(endpoints EndpointList) (servers []*storageServer, err error) { +func newStorageRPCServer(endpoints EndpointList) (servers []*storageServer, err error) { for _, endpoint := range endpoints { if endpoint.IsLocal { storage, err := newPosix(endpoint.Path) @@ -238,14 +237,14 @@ func newRPCServer(endpoints EndpointList) (servers []*storageServer, err error) // registerStorageRPCRouter - register storage rpc router. func registerStorageRPCRouters(mux *router.Router, endpoints EndpointList) error { // Initialize storage rpc servers for every disk that is hosted on this node. - storageRPCs, err := newRPCServer(endpoints) + storageRPCs, err := newStorageRPCServer(endpoints) if err != nil { return traceError(err) } // Create a unique route for each disk exported from this node. for _, stServer := range storageRPCs { - storageRPCServer := rpc.NewServer() + storageRPCServer := newRPCServer() err = storageRPCServer.RegisterName("Storage", stServer) if err != nil { return traceError(err) diff --git a/pkg/http/bufconn.go b/pkg/http/bufconn.go index fb000c424..cf6679502 100644 --- a/pkg/http/bufconn.go +++ b/pkg/http/bufconn.go @@ -32,6 +32,7 @@ type BufConn struct { updateBytesWrittenFunc func(int) // function to be called to update bytes written. } +// Sets read timeout func (c *BufConn) setReadTimeout() { if c.readTimeout != 0 { c.SetReadDeadline(time.Now().UTC().Add(c.readTimeout)) @@ -44,6 +45,20 @@ func (c *BufConn) setWriteTimeout() { } } +// RemoveTimeout - removes all configured read and write +// timeouts. Used by callers which control net.Conn behavior +// themselves. +func (c *BufConn) RemoveTimeout() { + c.readTimeout = 0 + c.writeTimeout = 0 + // Unset read/write timeouts, since we use **bufio** it is not + // guaranteed that the underlying Peek/Read operation in-fact + // indeed performed a Read() operation on the network. With + // that in mind we need to unset any timeouts currently set to + // avoid any pre-mature timeouts. + c.SetDeadline(time.Time{}) +} + // Peek - returns the next n bytes without advancing the reader. It just wraps bufio.Reader.Peek(). func (c *BufConn) Peek(n int) ([]byte, error) { c.setReadTimeout() @@ -54,7 +69,6 @@ func (c *BufConn) Peek(n int) ([]byte, error) { func (c *BufConn) Read(b []byte) (n int, err error) { c.setReadTimeout() n, err = c.bufReader.Read(b) - if err == nil && c.updateBytesReadFunc != nil { c.updateBytesReadFunc(n) } @@ -66,7 +80,6 @@ func (c *BufConn) Read(b []byte) (n int, err error) { func (c *BufConn) Write(b []byte) (n int, err error) { c.setWriteTimeout() n, err = c.Conn.Write(b) - if err == nil && c.updateBytesWrittenFunc != nil { c.updateBytesWrittenFunc(n) } diff --git a/pkg/http/bufconn_test.go b/pkg/http/bufconn_test.go index 1e55445d2..d3ff749d1 100644 --- a/pkg/http/bufconn_test.go +++ b/pkg/http/bufconn_test.go @@ -79,6 +79,9 @@ func TestBuffConnReadTimeout(t *testing.T) { if terr != nil { t.Fatalf("failed to write to client. %v", terr) } + + // Removes all deadlines if any. + bufconn.RemoveTimeout() }() c, err := net.Dial("tcp", "localhost:"+port)