diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 2e07ba2c1..d80aeb998 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -86,7 +86,7 @@ func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) erro return nil } -// LockHandler - rpc handler for lock operation. +// Lock - rpc handler for (single) write lock operation. func (l *lockServer) Lock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -98,30 +98,36 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error { if !ok { *reply = true l.lockMap[args.Name] = []bool{true} - return nil + } else { + // Either a read or write lock is held on the given name. + *reply = false } - // Either a read or write lock is held on the given name. - *reply = false return nil } -// UnlockHandler - rpc handler for unlock operation. +// Unlock - rpc handler for (single) write unlock operation. func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() if err := l.verifyArgs(args); err != nil { return err } - _, ok := l.lockMap[args.Name] + locksHeld, ok := l.lockMap[args.Name] // No lock is held on the given name, there must be some issue at the lock client side. if !ok { + *reply = false return fmt.Errorf("Unlock attempted on an un-locked entity: %s", args.Name) + } else if len(locksHeld) == 1 && locksHeld[0] == true { + *reply = true + delete(l.lockMap, args.Name) + return nil + } else { + *reply = false + return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, len(locksHeld)) } - *reply = true - delete(l.lockMap, args.Name) - return nil } +// RLock - rpc handler for read lock operation. 
func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -142,10 +148,10 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.lockMap[args.Name] = append(locksHeld, false) *reply = true } - return nil } +// RUnlock - rpc handler for read unlock operation. func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -154,9 +160,13 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { } locksHeld, ok := l.lockMap[args.Name] if !ok { + *reply = false return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", args.Name) - } - if len(locksHeld) > 1 { + } else if len(locksHeld) == 1 && locksHeld[0] == true { + // A write-lock is held, cannot release a read lock + *reply = false + return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name) + } else if len(locksHeld) > 1 { // Remove one of the read locks held. locksHeld = locksHeld[1:] l.lockMap[args.Name] = locksHeld diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 3549a21cf..5347fd234 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -75,8 +75,9 @@ type nsParam struct { // nsLock - provides primitives for locking critical namespace regions. type nsLock struct { - RWLocker - ref uint + writer RWLocker + readerArray []RWLocker + ref uint } // nsLockMap - namespace lock map, provides primitives to Lock, @@ -96,7 +97,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { nsLk, found := n.lockMap[param] if !found { nsLk = &nsLock{ - RWLocker: func() RWLocker { + writer: func() RWLocker { if n.isDist { return dsync.NewDRWMutex(pathutil.Join(volume, path)) } @@ -107,14 +108,29 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { n.lockMap[param] = nsLk } nsLk.ref++ // Update ref count here to avoid multiple races. 
+ rwlock := nsLk.writer + if readLock { + rwlock = dsync.NewDRWMutex(pathutil.Join(volume, path)) + } // Unlock map before Locking NS which might block. n.lockMapMutex.Unlock() // Locking here can block. if readLock { - nsLk.RLock() + rwlock.RLock() + + // Only add (for reader case) to array after RLock() succeeds + // (so that we know for sure that element in [0] can be RUnlocked()) + n.lockMapMutex.Lock() + if len(nsLk.readerArray) == 0 { + nsLk.readerArray = []RWLocker{rwlock} + } else { + nsLk.readerArray = append(nsLk.readerArray, rwlock) + } + n.lockMapMutex.Unlock() + } else { - nsLk.Lock() + rwlock.Lock() } } @@ -127,9 +143,15 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) { param := nsParam{volume, path} if nsLk, found := n.lockMap[param]; found { if readLock { - nsLk.RUnlock() + if len(nsLk.readerArray) == 0 { + errorIf(errors.New("Length of reader lock array cannot be 0."), "Invalid reader lock array length detected.") + } + // Release first lock first (FIFO) + nsLk.readerArray[0].RUnlock() + // And discard first element + nsLk.readerArray = nsLk.readerArray[1:] } else { - nsLk.Unlock() + nsLk.writer.Unlock() } if nsLk.ref == 0 { errorIf(errors.New("Namespace reference count cannot be 0."), "Invalid reference count detected.") @@ -138,6 +160,10 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) { nsLk.ref-- } if nsLk.ref == 0 { + if len(nsLk.readerArray) != 0 { + errorIf(errors.New("Length of reader lock array should be 0 upon deleting map entry."), "Invalid reader lock array length detected.") + } + // Remove from the map if there are no more references. delete(n.lockMap, param) } diff --git a/cmd/net-rpc-client.go b/cmd/net-rpc-client.go index 6e53f15cf..e3b02d016 100644 --- a/cmd/net-rpc-client.go +++ b/cmd/net-rpc-client.go @@ -17,16 +17,17 @@ package cmd import ( + "errors" "net/rpc" "sync" ) // RPCClient is a wrapper type for rpc.Client which provides reconnect on first failure. 
type RPCClient struct {
	mu         sync.Mutex  // Guards rpcPrivate.
	rpcPrivate *rpc.Client // Cached connection; nil until dialed and again after a reset.
	node       string      // Address handed to rpc.DialHTTPPath.
	rpcPath    string      // HTTP path handed to rpc.DialHTTPPath.
}

// clearRPCClient resets the cached rpc.Client pointer to nil while holding
// the mutex. It only forgets the pointer; it does not close the connection.
func (rpcClient *RPCClient) clearRPCClient() {
	rpcClient.mu.Lock()
	defer rpcClient.mu.Unlock()
	rpcClient.rpcPrivate = nil
}

// getRPCClient returns the cached rpc.Client pointer, read while holding the
// mutex. The result is nil when no connection is currently cached.
func (rpcClient *RPCClient) getRPCClient() *rpc.Client {
	rpcClient.mu.Lock()
	defer rpcClient.mu.Unlock()
	return rpcClient.rpcPrivate
}

// dialRPCClient returns the cached rpc.Client, dialing node/rpcPath over TCP
// first when none is cached. The mutex is held for the whole operation, so
// concurrent callers are serialized: the first one dials and the others get
// the connection it stored. On failure it returns a nil client together with
// the error and leaves the cache untouched.
func (rpcClient *RPCClient) dialRPCClient() (*rpc.Client, error) {
	rpcClient.mu.Lock()
	defer rpcClient.mu.Unlock()

	// Another caller may have connected while we were waiting for the lock.
	if rpcClient.rpcPrivate != nil {
		return rpcClient.rpcPrivate, nil
	}

	// Named clnt (not rpc) so the net/rpc package identifier is not shadowed.
	clnt, err := rpc.DialHTTPPath("tcp", rpcClient.node, rpcClient.rpcPath)
	if err != nil {
		return nil, err
	}
	if clnt == nil {
		return nil, errors.New("No valid RPC Client created after dial")
	}
	rpcClient.rpcPrivate = clnt
	return clnt, nil
}

// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error { - rpcClient.Lock() - defer rpcClient.Unlock() + + // Make a copy below so that we can safely (continue to) work with the rpc.Client. + // Even in the case the two threads would simultaneously find that the connection is not initialised, + // they would both attempt to dial and only one of them would succeed in doing so. + rpcLocalStack := rpcClient.getRPCClient() + // If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint. - if rpcClient.rpc == nil { - clnt, err := rpc.DialHTTPPath("tcp", rpcClient.node, rpcClient.rpcPath) + if rpcLocalStack == nil { + var err error + rpcLocalStack, err = rpcClient.dialRPCClient() if err != nil { return err } - rpcClient.rpc = clnt } // If the RPC fails due to a network-related error, then we reset // rpc.Client for a subsequent reconnect. - err := rpcClient.rpc.Call(serviceMethod, args, reply) + err := rpcLocalStack.Call(serviceMethod, args, reply) if IsRPCError(err) { - rpcClient.rpc = nil + rpcClient.clearRPCClient() } return err +} +// Close closes the underlying socket file descriptor. +func (rpcClient *RPCClient) Close() error { + + // See comment above for making a copy on local stack + rpcLocalStack := rpcClient.getRPCClient() + + // If rpc client has not connected yet there is nothing to close. + if rpcLocalStack == nil { + return nil + } + + // Reset rpcClient.rpc to allow for subsequent calls to use a new + // (socket) connection. + rpcClient.clearRPCClient() + return rpcLocalStack.Close() } // IsRPCError returns true if the error value is due to a network related