mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
1875a47495
Previously, more than one goroutine calls RPCClient.dial(), each goroutine gets a new rpc.Client but only one such client is stored into RPCClient object. This leads to leaky connection at the server side. This is fixed by taking lock at top of dial() and release on return.
303 lines
9.4 KiB
Go
303 lines
9.4 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"net/rpc"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
router "github.com/gorilla/mux"
|
|
)
|
|
|
|
const lockRPCPath = "/minio/lock"
|
|
const lockMaintenanceLoop = 1 * time.Minute
|
|
const lockCheckValidityInterval = 2 * time.Minute
|
|
|
|
// LockArgs besides lock name, holds Token and Timestamp for session
|
|
// authentication and validation server restart.
|
|
type LockArgs struct {
|
|
Name string
|
|
Token string
|
|
Timestamp time.Time
|
|
Node string
|
|
RPCPath string
|
|
UID string
|
|
}
|
|
|
|
// SetToken - sets the token to the supplied value.
|
|
func (l *LockArgs) SetToken(token string) {
|
|
l.Token = token
|
|
}
|
|
|
|
// SetTimestamp - sets the timestamp to the supplied value.
|
|
func (l *LockArgs) SetTimestamp(tstamp time.Time) {
|
|
l.Timestamp = tstamp
|
|
}
|
|
|
|
// lockRequesterInfo stores various info from the client for each lock that is requested
|
|
type lockRequesterInfo struct {
|
|
writer bool // Bool whether write or read lock
|
|
node string // Network address of client claiming lock
|
|
rpcPath string // RPC path of client claiming lock
|
|
uid string // Uid to uniquely identify request of client
|
|
timestamp time.Time // Timestamp set at the time of initialization
|
|
timeLastCheck time.Time // Timestamp for last check of validity of lock
|
|
}
|
|
|
|
// isWriteLock returns whether the lock is a write or read lock
|
|
func isWriteLock(lri []lockRequesterInfo) bool {
|
|
return len(lri) == 1 && lri[0].writer
|
|
}
|
|
|
|
// lockServer is type for RPC handlers
|
|
type lockServer struct {
|
|
loginServer
|
|
rpcPath string
|
|
mutex sync.Mutex
|
|
lockMap map[string][]lockRequesterInfo
|
|
}
|
|
|
|
// Register distributed NS lock handlers.
|
|
func registerDistNSLockRouter(mux *router.Router, serverConfig serverCmdConfig) error {
|
|
lockServers := newLockServers(serverConfig)
|
|
return registerStorageLockers(mux, lockServers)
|
|
}
|
|
|
|
// Create one lock server for every local storage rpc server.
|
|
func newLockServers(srvConfig serverCmdConfig) (lockServers []*lockServer) {
|
|
for _, ep := range srvConfig.endpoints {
|
|
// Not local storage move to the next node.
|
|
if !isLocalStorage(ep) {
|
|
continue
|
|
}
|
|
|
|
// Create handler for lock RPCs
|
|
locker := &lockServer{
|
|
rpcPath: getPath(ep),
|
|
mutex: sync.Mutex{},
|
|
lockMap: make(map[string][]lockRequesterInfo),
|
|
}
|
|
|
|
// Start loop for stale lock maintenance
|
|
go func() {
|
|
// Start with random sleep time, so as to avoid "synchronous checks" between servers
|
|
time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceLoop)))
|
|
for {
|
|
time.Sleep(lockMaintenanceLoop)
|
|
locker.lockMaintenance(lockCheckValidityInterval)
|
|
}
|
|
}()
|
|
lockServers = append(lockServers, locker)
|
|
}
|
|
return lockServers
|
|
}
|
|
|
|
// registerStorageLockers - register locker rpc handlers for net/rpc library clients
|
|
func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error {
|
|
for _, lockServer := range lockServers {
|
|
lockRPCServer := rpc.NewServer()
|
|
err := lockRPCServer.RegisterName("Dsync", lockServer)
|
|
if err != nil {
|
|
return traceError(err)
|
|
}
|
|
lockRouter := mux.PathPrefix(reservedBucket).Subrouter()
|
|
lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/// Distributed lock handlers
|
|
|
|
// Lock - rpc handler for (single) write lock operation.
|
|
func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
|
|
l.mutex.Lock()
|
|
defer l.mutex.Unlock()
|
|
if err := l.validateLockArgs(args); err != nil {
|
|
return err
|
|
}
|
|
_, *reply = l.lockMap[args.Name]
|
|
if !*reply { // No locks held on the given name, so claim write lock
|
|
l.lockMap[args.Name] = []lockRequesterInfo{
|
|
{
|
|
writer: true,
|
|
node: args.Node,
|
|
rpcPath: args.RPCPath,
|
|
uid: args.UID,
|
|
timestamp: time.Now().UTC(),
|
|
timeLastCheck: time.Now().UTC(),
|
|
},
|
|
}
|
|
}
|
|
*reply = !*reply // Negate *reply to return true when lock is granted or false otherwise
|
|
return nil
|
|
}
|
|
|
|
// Unlock - rpc handler for (single) write unlock operation.
|
|
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
|
|
l.mutex.Lock()
|
|
defer l.mutex.Unlock()
|
|
if err := l.validateLockArgs(args); err != nil {
|
|
return err
|
|
}
|
|
var lri []lockRequesterInfo
|
|
if lri, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
|
|
return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Name)
|
|
}
|
|
if *reply = isWriteLock(lri); !*reply { // Unless it is a write lock
|
|
return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, len(lri))
|
|
}
|
|
if !l.removeEntry(args.Name, args.UID, &lri) {
|
|
return fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RLock - rpc handler for read lock operation.
|
|
func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
|
|
l.mutex.Lock()
|
|
defer l.mutex.Unlock()
|
|
if err := l.validateLockArgs(args); err != nil {
|
|
return err
|
|
}
|
|
lrInfo := lockRequesterInfo{
|
|
writer: false,
|
|
node: args.Node,
|
|
rpcPath: args.RPCPath,
|
|
uid: args.UID,
|
|
timestamp: time.Now().UTC(),
|
|
timeLastCheck: time.Now().UTC(),
|
|
}
|
|
if lri, ok := l.lockMap[args.Name]; ok {
|
|
if *reply = !isWriteLock(lri); *reply { // Unless there is a write lock
|
|
l.lockMap[args.Name] = append(l.lockMap[args.Name], lrInfo)
|
|
}
|
|
} else { // No locks held on the given name, so claim (first) read lock
|
|
l.lockMap[args.Name] = []lockRequesterInfo{lrInfo}
|
|
*reply = true
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RUnlock - rpc handler for read unlock operation.
|
|
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
|
|
l.mutex.Lock()
|
|
defer l.mutex.Unlock()
|
|
if err := l.validateLockArgs(args); err != nil {
|
|
return err
|
|
}
|
|
var lri []lockRequesterInfo
|
|
if lri, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
|
|
return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Name)
|
|
}
|
|
if *reply = !isWriteLock(lri); !*reply { // A write-lock is held, cannot release a read lock
|
|
return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name)
|
|
}
|
|
if !l.removeEntry(args.Name, args.UID, &lri) {
|
|
return fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ForceUnlock - rpc handler for force unlock operation.
|
|
func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error {
|
|
l.mutex.Lock()
|
|
defer l.mutex.Unlock()
|
|
if err := l.validateLockArgs(args); err != nil {
|
|
return err
|
|
}
|
|
if len(args.UID) != 0 {
|
|
return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
|
|
}
|
|
if _, ok := l.lockMap[args.Name]; ok { // Only clear lock when set
|
|
delete(l.lockMap, args.Name) // Remove the lock (irrespective of write or read lock)
|
|
}
|
|
*reply = true
|
|
return nil
|
|
}
|
|
|
|
// Expired - rpc handler for expired lock status.
|
|
func (l *lockServer) Expired(args *LockArgs, reply *bool) error {
|
|
l.mutex.Lock()
|
|
defer l.mutex.Unlock()
|
|
if err := l.validateLockArgs(args); err != nil {
|
|
return err
|
|
}
|
|
// Lock found, proceed to verify if belongs to given uid.
|
|
if lri, ok := l.lockMap[args.Name]; ok {
|
|
// Check whether uid is still active
|
|
for _, entry := range lri {
|
|
if entry.uid == args.UID {
|
|
*reply = false // When uid found, lock is still active so return not expired.
|
|
return nil // When uid found *reply is set to true.
|
|
}
|
|
}
|
|
}
|
|
// When we get here lock is no longer active due to either args.Name
|
|
// being absent from map or uid not found for given args.Name
|
|
*reply = true
|
|
return nil
|
|
}
|
|
|
|
// nameLockRequesterInfoPair is a helper type for lock maintenance
|
|
type nameLockRequesterInfoPair struct {
|
|
name string
|
|
lri lockRequesterInfo
|
|
}
|
|
|
|
// lockMaintenance loops over locks that have been active for some time and checks back
|
|
// with the original server whether it is still alive or not
|
|
//
|
|
// Following logic inside ignores the errors generated for Dsync.Active operation.
|
|
// - server at client down
|
|
// - some network error (and server is up normally)
|
|
//
|
|
// We will ignore the error, and we will retry later to get a resolve on this lock
|
|
func (l *lockServer) lockMaintenance(interval time.Duration) {
|
|
l.mutex.Lock()
|
|
// Get list of long lived locks to check for staleness.
|
|
nlripLongLived := getLongLivedLocks(l.lockMap, interval)
|
|
l.mutex.Unlock()
|
|
|
|
// Validate if long lived locks are indeed clean.
|
|
for _, nlrip := range nlripLongLived {
|
|
// Initialize client based on the long live locks.
|
|
c := newRPCClient(nlrip.lri.node, nlrip.lri.rpcPath, isSSL())
|
|
|
|
var expired bool
|
|
|
|
// Call back to original server verify whether the lock is still active (based on name & uid)
|
|
c.Call("Dsync.Expired", &LockArgs{
|
|
Name: nlrip.name,
|
|
UID: nlrip.lri.uid,
|
|
}, &expired)
|
|
c.Close() // Close the connection regardless of the call response.
|
|
|
|
// For successful response, verify if lock is indeed active or stale.
|
|
if expired {
|
|
// The lock is no longer active at server that originated the lock
|
|
// So remove the lock from the map.
|
|
l.mutex.Lock()
|
|
l.removeEntryIfExists(nlrip) // Purge the stale entry if it exists.
|
|
l.mutex.Unlock()
|
|
}
|
|
}
|
|
}
|