mirror of
https://github.com/minio/minio.git
synced 2025-11-07 04:42:56 -05:00
control: Fix controller CLI handling with distributed server object layer.
Object layer initialization is done lazily fix it.
This commit is contained in:
@@ -34,7 +34,7 @@ type HealListReply struct {
|
||||
|
||||
// ListObjects - list all objects that needs healing.
|
||||
func (c *controllerAPIHandlers) ListObjectsHeal(arg *HealListArgs, reply *HealListReply) error {
|
||||
objAPI := c.ObjectAPI
|
||||
objAPI := c.ObjectAPI()
|
||||
if objAPI == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
@@ -61,7 +61,7 @@ type HealObjectReply struct{}
|
||||
|
||||
// HealObject - heal the object.
|
||||
func (c *controllerAPIHandlers) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error {
|
||||
objAPI := c.ObjectAPI
|
||||
objAPI := c.ObjectAPI()
|
||||
if objAPI == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
@@ -38,5 +38,5 @@ func registerControlRPCRouter(mux *router.Router, ctrlHandlers *controllerAPIHan
|
||||
|
||||
// Handler for object healing.
|
||||
type controllerAPIHandlers struct {
|
||||
ObjectAPI ObjectLayer
|
||||
ObjectAPI func() ObjectLayer
|
||||
}
|
||||
|
||||
156
cmd/lock-rpc-server.go
Normal file
156
cmd/lock-rpc-server.go
Normal file
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/rpc"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
router "github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
const lockRPCPath = "/minio/lock"
|
||||
|
||||
type lockServer struct {
|
||||
rpcPath string
|
||||
mutex sync.Mutex
|
||||
// e.g, when a Lock(name) is held, map[string][]bool{"name" : []bool{true}}
|
||||
// when one or more RLock() is held, map[string][]bool{"name" : []bool{false, false}}
|
||||
lockMap map[string][]bool
|
||||
}
|
||||
|
||||
/// Distributed lock handlers
|
||||
|
||||
// LockHandler - rpc handler for lock operation.
|
||||
func (l *lockServer) Lock(name *string, reply *bool) error {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
_, ok := l.lockMap[*name]
|
||||
// No locks held on the given name.
|
||||
if !ok {
|
||||
*reply = true
|
||||
l.lockMap[*name] = []bool{true}
|
||||
return nil
|
||||
}
|
||||
// Either a read or write lock is held on the given name.
|
||||
*reply = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnlockHandler - rpc handler for unlock operation.
|
||||
func (l *lockServer) Unlock(name *string, reply *bool) error {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
_, ok := l.lockMap[*name]
|
||||
// No lock is held on the given name, there must be some issue at the lock client side.
|
||||
if !ok {
|
||||
return fmt.Errorf("Unlock attempted on an un-locked entity: %s", *name)
|
||||
}
|
||||
*reply = true
|
||||
delete(l.lockMap, *name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *lockServer) RLock(name *string, reply *bool) error {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
locksHeld, ok := l.lockMap[*name]
|
||||
// No locks held on the given name.
|
||||
if !ok {
|
||||
// First read-lock to be held on *name.
|
||||
l.lockMap[*name] = []bool{false}
|
||||
*reply = true
|
||||
} else if len(locksHeld) == 1 && locksHeld[0] == true {
|
||||
// A write-lock is held, read lock can't be granted.
|
||||
*reply = false
|
||||
} else {
|
||||
// Add an entry for this read lock.
|
||||
l.lockMap[*name] = append(locksHeld, false)
|
||||
*reply = true
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *lockServer) RUnlock(name *string, reply *bool) error {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
locksHeld, ok := l.lockMap[*name]
|
||||
if !ok {
|
||||
return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", *name)
|
||||
}
|
||||
if len(locksHeld) > 1 {
|
||||
// Remove one of the read locks held.
|
||||
locksHeld = locksHeld[1:]
|
||||
l.lockMap[*name] = locksHeld
|
||||
*reply = true
|
||||
} else {
|
||||
// Delete the map entry since this is the last read lock held
|
||||
// on *name.
|
||||
delete(l.lockMap, *name)
|
||||
*reply = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initialize distributed lock.
|
||||
func initDistributedNSLock(mux *router.Router, serverConfig serverCmdConfig) {
|
||||
lockServers := newLockServers(serverConfig)
|
||||
registerStorageLockers(mux, lockServers)
|
||||
}
|
||||
|
||||
// Create one lock server for every local storage rpc server.
|
||||
func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) {
|
||||
// Initialize posix storage API.
|
||||
exports := serverConfig.disks
|
||||
ignoredExports := serverConfig.ignoredDisks
|
||||
|
||||
// Save ignored disks in a map
|
||||
skipDisks := make(map[string]bool)
|
||||
for _, ignoredExport := range ignoredExports {
|
||||
skipDisks[ignoredExport] = true
|
||||
}
|
||||
for _, export := range exports {
|
||||
if skipDisks[export] {
|
||||
continue
|
||||
}
|
||||
if isLocalStorage(export) {
|
||||
if idx := strings.LastIndex(export, ":"); idx != -1 {
|
||||
export = export[idx+1:]
|
||||
}
|
||||
lockServers = append(lockServers, &lockServer{
|
||||
rpcPath: export,
|
||||
mutex: sync.Mutex{},
|
||||
lockMap: make(map[string][]bool),
|
||||
})
|
||||
}
|
||||
}
|
||||
return lockServers
|
||||
}
|
||||
|
||||
// registerStorageLockers - register locker rpc handlers for net/rpc library clients
|
||||
func registerStorageLockers(mux *router.Router, lockServers []*lockServer) {
|
||||
for _, lockServer := range lockServers {
|
||||
lockRPCServer := rpc.NewServer()
|
||||
lockRPCServer.RegisterName("Dsync", lockServer)
|
||||
lockRouter := mux.PathPrefix(reservedBucket).Subrouter()
|
||||
lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer)
|
||||
}
|
||||
}
|
||||
93
cmd/net-rpc-client.go
Normal file
93
cmd/net-rpc-client.go
Normal file
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// RPCClient is a wrapper type for rpc.Client which provides reconnect on first failure.
|
||||
type RPCClient struct {
|
||||
sync.Mutex
|
||||
rpc *rpc.Client
|
||||
node string
|
||||
rpcPath string
|
||||
}
|
||||
|
||||
// newClient constructs a RPCClient object with node and rpcPath initialized.
|
||||
// It _doesn't_ connect to the remote endpoint. See Call method to see when the
|
||||
// connect happens.
|
||||
func newClient(node, rpcPath string) *RPCClient {
|
||||
return &RPCClient{
|
||||
node: node,
|
||||
rpcPath: rpcPath,
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the underlying socket file descriptor.
|
||||
func (rpcClient *RPCClient) Close() error {
|
||||
rpcClient.Lock()
|
||||
defer rpcClient.Unlock()
|
||||
// If rpc client has not connected yet there is nothing to close.
|
||||
if rpcClient.rpc == nil {
|
||||
return nil
|
||||
}
|
||||
// Reset rpcClient.rpc to allow for subsequent calls to use a new
|
||||
// (socket) connection.
|
||||
clnt := rpcClient.rpc
|
||||
rpcClient.rpc = nil
|
||||
return clnt.Close()
|
||||
}
|
||||
|
||||
// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
|
||||
func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
rpcClient.Lock()
|
||||
defer rpcClient.Unlock()
|
||||
// If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint.
|
||||
if rpcClient.rpc == nil {
|
||||
clnt, err := rpc.DialHTTPPath("tcp", rpcClient.node, rpcClient.rpcPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpcClient.rpc = clnt
|
||||
}
|
||||
|
||||
// If the RPC fails due to a network-related error, then we reset
|
||||
// rpc.Client for a subsequent reconnect.
|
||||
err := rpcClient.rpc.Call(serviceMethod, args, reply)
|
||||
if IsRPCError(err) {
|
||||
rpcClient.rpc = nil
|
||||
}
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
// IsRPCError returns true if the error value is due to a network related
|
||||
// failure, false otherwise.
|
||||
func IsRPCError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
// The following are net/rpc specific errors that indicate that
|
||||
// the connection may have been reset. Reset rpcClient.rpc to nil
|
||||
// to trigger a reconnect in future.
|
||||
if err == rpc.ErrShutdown {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -106,16 +106,23 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
|
||||
ObjectAPI: newObjectLayerFn,
|
||||
}
|
||||
|
||||
// Initialize Controller.
|
||||
ctrlHandlers := &controllerAPIHandlers{
|
||||
ObjectAPI: newObjectLayerFn,
|
||||
}
|
||||
|
||||
// Initialize router.
|
||||
mux := router.NewRouter()
|
||||
|
||||
// Register all routers.
|
||||
registerStorageRPCRouters(mux, storageRPCs)
|
||||
|
||||
// Initialize distributed NS lock.
|
||||
initDistributedNSLock(mux, srvCmdConfig)
|
||||
|
||||
// FIXME: till net/rpc auth is brought in "minio control" can be enabled only though
|
||||
// this env variable.
|
||||
if os.Getenv("MINIO_CONTROL") != "" {
|
||||
if !strings.EqualFold(os.Getenv("MINIO_CONTROL"), "off") {
|
||||
registerControlRPCRouter(mux, ctrlHandlers)
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@@ -88,7 +88,7 @@ func loginRPCClient(rpcClient *RPCClient) (tokenStr string, err error) {
|
||||
}, &reply); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if reply.ServerVersion != minioVersion {
|
||||
if reply.ServerVersion != Version {
|
||||
return "", errors.New("Server version mismatch")
|
||||
}
|
||||
// Reply back server provided token.
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
package cmd
|
||||
|
||||
// RPCLoginArgs - login username and password for RPC.
|
||||
type RPCLoginArgs struct {
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -73,7 +73,7 @@ func (s *storageServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) e
|
||||
return err
|
||||
}
|
||||
reply.Token = token
|
||||
reply.ServerVersion = minioVersion
|
||||
reply.ServerVersion = Version
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
55
cmd/utils_nix_test.go
Normal file
55
cmd/utils_nix_test.go
Normal file
@@ -0,0 +1,55 @@
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Test for splitNetPath
|
||||
func TestSplitNetPath(t *testing.T) {
|
||||
testCases := []struct {
|
||||
networkPath string
|
||||
netAddr string
|
||||
netPath string
|
||||
err error
|
||||
}{
|
||||
{"10.1.10.1:", "", "", &net.AddrError{Err: "missing path in network path", Addr: "10.1.10.1:"}},
|
||||
{"10.1.10.1", "", "10.1.10.1", nil},
|
||||
{"10.1.10.1://", "10.1.10.1", "//", nil},
|
||||
{"10.1.10.1:/disk/1", "10.1.10.1", "/disk/1", nil},
|
||||
{"10.1.10.1:\\path\\test", "10.1.10.1", "\\path\\test", nil},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
receivedAddr, receivedPath, receivedErr := splitNetPath(test.networkPath)
|
||||
if receivedAddr != test.netAddr {
|
||||
t.Errorf("Test case %d: Expected: %s, Received: %s", i+1, test.netAddr, receivedAddr)
|
||||
}
|
||||
if receivedPath != test.netPath {
|
||||
t.Errorf("Test case %d: Expected: %s, Received: %s", i+1, test.netPath, receivedPath)
|
||||
}
|
||||
if test.err != nil {
|
||||
if receivedErr == nil || receivedErr.Error() != test.err.Error() {
|
||||
t.Errorf("Test case %d: Expected: %v, Received: %v", i+1, test.err, receivedErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
55
cmd/utils_windows_test.go
Normal file
55
cmd/utils_windows_test.go
Normal file
@@ -0,0 +1,55 @@
|
||||
// +build windows
|
||||
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Test for splitNetPath
|
||||
func TestSplitNetPath(t *testing.T) {
|
||||
testCases := []struct {
|
||||
networkPath string
|
||||
netAddr string
|
||||
netPath string
|
||||
err error
|
||||
}{
|
||||
{"10.1.10.1:C:\\path\\test", "10.1.10.1", "C:\\path\\test", nil},
|
||||
{"10.1.10.1:C:", "10.1.10.1", "C:", nil},
|
||||
{":C:", "", "", &net.AddrError{Err: "missing address in network path", Addr: ":C:"}},
|
||||
{"C:\\path\\test", "", "C:\\path\\test", nil},
|
||||
{"10.1.10.1::C:\\path\\test", "10.1.10.1", ":C:\\path\\test", nil},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
receivedAddr, receivedPath, receivedErr := splitNetPath(test.networkPath)
|
||||
if receivedAddr != test.netAddr {
|
||||
t.Errorf("Test case %d: Expected: %s, Received: %s", i+1, test.netAddr, receivedAddr)
|
||||
}
|
||||
if receivedPath != test.netPath {
|
||||
t.Errorf("Test case %d: Expected: %s, Received: %s", i+1, test.netPath, receivedPath)
|
||||
}
|
||||
if test.err != nil {
|
||||
if receivedErr == nil || receivedErr.Error() != test.err.Error() {
|
||||
t.Errorf("Test case %d: Expected: %v, Received: %v", i+1, test.err, receivedErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user