mirror of
https://github.com/minio/minio.git
synced 2024-12-23 21:55:53 -05:00
lock/instrumentation: Cleanup and print in user friendly form. (#2807)
This commit is contained in:
parent
3ac6790ca2
commit
fa8ea41cd9
@ -29,8 +29,13 @@ type GenericReply struct{}
|
||||
|
||||
// GenericArgs represents any generic RPC arguments.
|
||||
type GenericArgs struct {
|
||||
Token string // Used to authenticate every RPC call.
|
||||
Timestamp time.Time // Used to verify if the RPC call was issued between the same Login() and disconnect event pair.
|
||||
Token string // Used to authenticate every RPC call.
|
||||
// Used to verify if the RPC call was issued between
|
||||
// the same Login() and disconnect event pair.
|
||||
Timestamp time.Time
|
||||
|
||||
// Indicates if args should be sent to remote peers as well.
|
||||
Remote bool
|
||||
}
|
||||
|
||||
// SetToken - sets the token to the supplied value.
|
||||
@ -95,7 +100,6 @@ type AuthRPCClient struct {
|
||||
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
|
||||
isLoggedIn bool // Indicates if the auth client has been logged in and token is valid.
|
||||
token string // JWT based token
|
||||
tstamp time.Time // Timestamp as received on Login RPC.
|
||||
serverVersion string // Server version exchanged by the RPC.
|
||||
}
|
||||
|
||||
@ -141,7 +145,6 @@ func (authClient *AuthRPCClient) Login() error {
|
||||
}
|
||||
// Set token, time stamp as received from a successful login call.
|
||||
authClient.token = reply.Token
|
||||
authClient.tstamp = reply.Timestamp
|
||||
authClient.serverVersion = reply.ServerVersion
|
||||
authClient.isLoggedIn = true
|
||||
return nil
|
||||
@ -158,7 +161,7 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
|
||||
if err = authClient.Login(); err == nil {
|
||||
// Set token and timestamp before the rpc call.
|
||||
args.SetToken(authClient.token)
|
||||
args.SetTimestamp(authClient.tstamp)
|
||||
args.SetTimestamp(time.Now().UTC())
|
||||
|
||||
// Call the underlying rpc.
|
||||
err = authClient.rpc.Call(serviceMethod, args, reply)
|
||||
|
@ -34,7 +34,7 @@ var errServerTimeMismatch = errors.New("Server times are too far apart.")
|
||||
/// Auth operations
|
||||
|
||||
// Login - login handler.
|
||||
func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
|
||||
func (c *controlAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
|
||||
jwt, err := newJWT(defaultInterNodeJWTExpiry)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -47,7 +47,7 @@ func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLogin
|
||||
return err
|
||||
}
|
||||
reply.Token = token
|
||||
reply.Timestamp = c.timestamp
|
||||
reply.Timestamp = time.Now().UTC()
|
||||
reply.ServerVersion = Version
|
||||
return nil
|
||||
}
|
||||
@ -72,7 +72,7 @@ type HealListReply struct {
|
||||
}
|
||||
|
||||
// ListObjects - list all objects that needs healing.
|
||||
func (c *controllerAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *HealListReply) error {
|
||||
func (c *controlAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *HealListReply) error {
|
||||
objAPI := c.ObjectAPI()
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
@ -108,7 +108,7 @@ type HealObjectArgs struct {
|
||||
type HealObjectReply struct{}
|
||||
|
||||
// HealObject - heal the object.
|
||||
func (c *controllerAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error {
|
||||
func (c *controlAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error {
|
||||
objAPI := c.ObjectAPI()
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
@ -120,7 +120,7 @@ func (c *controllerAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *G
|
||||
}
|
||||
|
||||
// HealObject - heal the object.
|
||||
func (c *controllerAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error {
|
||||
func (c *controlAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error {
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
}
|
||||
@ -143,9 +143,6 @@ type ServiceArgs struct {
|
||||
// to perform. Currently supported signals are
|
||||
// stop, restart and status.
|
||||
Signal serviceSignal
|
||||
|
||||
// Make remote calls.
|
||||
Remote bool
|
||||
}
|
||||
|
||||
// ServiceReply - represents service operation success info.
|
||||
@ -153,24 +150,30 @@ type ServiceReply struct {
|
||||
StorageInfo StorageInfo
|
||||
}
|
||||
|
||||
func (c *controllerAPIHandlers) remoteCall(serviceMethod string, args interface {
|
||||
SetToken(token string)
|
||||
SetTimestamp(tstamp time.Time)
|
||||
}, reply interface{}) {
|
||||
// Remote procedure call, calls serviceMethod with given input args.
|
||||
func (c *controlAPIHandlers) remoteServiceCall(args *ServiceArgs, replies []*ServiceReply) error {
|
||||
var wg sync.WaitGroup
|
||||
for index, clnt := range c.RemoteControllers {
|
||||
var errs = make([]error, len(c.RemoteControls))
|
||||
// Send remote call to all neighboring peers to restart minio servers.
|
||||
for index, clnt := range c.RemoteControls {
|
||||
wg.Add(1)
|
||||
go func(index int, client *AuthRPCClient) {
|
||||
defer wg.Done()
|
||||
err := client.Call(serviceMethod, args, reply)
|
||||
errorIf(err, "Unable to initiate %s", serviceMethod)
|
||||
errs[index] = client.Call("Control.ServiceHandler", args, replies[index])
|
||||
errorIf(errs[index], "Unable to initiate control service request to remote node %s", client.Node())
|
||||
}(index, clnt)
|
||||
}
|
||||
wg.Wait()
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Service - handler for sending service signals across many servers.
|
||||
func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *ServiceReply) error {
|
||||
func (c *controlAPIHandlers) ServiceHandler(args *ServiceArgs, reply *ServiceReply) error {
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
}
|
||||
@ -182,21 +185,24 @@ func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *Service
|
||||
reply.StorageInfo = objAPI.StorageInfo()
|
||||
return nil
|
||||
}
|
||||
var replies = make([]*ServiceReply, len(c.RemoteControls))
|
||||
switch args.Signal {
|
||||
case serviceRestart:
|
||||
if args.Remote {
|
||||
// Set remote as false for remote calls.
|
||||
args.Remote = false
|
||||
// Send remote call to all neighboring peers to restart minio servers.
|
||||
c.remoteCall("Controller.ServiceHandler", args, reply)
|
||||
if err := c.remoteServiceCall(args, replies); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
globalServiceSignalCh <- serviceRestart
|
||||
case serviceStop:
|
||||
if args.Remote {
|
||||
// Set remote as false for remote calls.
|
||||
args.Remote = false
|
||||
// Send remote call to all neighboring peers to stop minio servers.
|
||||
c.remoteCall("Controller.ServiceHandler", args, reply)
|
||||
if err := c.remoteServiceCall(args, replies); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
globalServiceSignalCh <- serviceStop
|
||||
}
|
||||
@ -204,14 +210,13 @@ func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *Service
|
||||
}
|
||||
|
||||
// LockInfo - RPC control handler for `minio control lock`. Returns the info of the locks held in the system.
|
||||
func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockState) error {
|
||||
// obtain the lock state information.
|
||||
lockInfo, err := generateSystemLockResponse()
|
||||
// in case of error, return err to the RPC client.
|
||||
if err != nil {
|
||||
return err
|
||||
func (c *controlAPIHandlers) TryInitHandler(args *GenericArgs, reply *GenericReply) error {
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
}
|
||||
// The response containing the lock info.
|
||||
*reply = lockInfo
|
||||
go func() {
|
||||
globalWakeupCh <- struct{}{}
|
||||
}()
|
||||
*reply = GenericReply{}
|
||||
return nil
|
||||
}
|
@ -90,7 +90,7 @@ func healControl(ctx *cli.Context) {
|
||||
secureConn: parsedURL.Scheme == "https",
|
||||
address: parsedURL.Host,
|
||||
path: path.Join(reservedBucket, controlPath),
|
||||
loginMethod: "Controller.LoginHandler",
|
||||
loginMethod: "Control.LoginHandler",
|
||||
}
|
||||
client := newAuthClient(authCfg)
|
||||
|
||||
@ -98,7 +98,7 @@ func healControl(ctx *cli.Context) {
|
||||
fmt.Print("Checking and healing disk metadata..")
|
||||
args := &GenericArgs{}
|
||||
reply := &GenericReply{}
|
||||
err = client.Call("Controller.HealDiskMetadataHandler", args, reply)
|
||||
err = client.Call("Control.HealDiskMetadataHandler", args, reply)
|
||||
fatalIf(err, "Unable to heal disk metadata.")
|
||||
fmt.Println(" ok")
|
||||
|
||||
@ -112,7 +112,7 @@ func healControl(ctx *cli.Context) {
|
||||
fmt.Printf("Healing : /%s/%s\n", bucketName, objectName)
|
||||
args := &HealObjectArgs{Bucket: bucketName, Object: objectName}
|
||||
reply := &HealObjectReply{}
|
||||
err = client.Call("Controller.HealObjectHandler", args, reply)
|
||||
err = client.Call("Control.HealObjectHandler", args, reply)
|
||||
errorIf(err, "Healing object %s failed.", objectName)
|
||||
return
|
||||
}
|
||||
@ -129,7 +129,7 @@ func healControl(ctx *cli.Context) {
|
||||
MaxKeys: 1000,
|
||||
}
|
||||
reply := &HealListReply{}
|
||||
err = client.Call("Controller.ListObjectsHealHandler", args, reply)
|
||||
err = client.Call("Control.ListObjectsHealHandler", args, reply)
|
||||
fatalIf(err, "Unable to list objects for healing.")
|
||||
|
||||
// Heal the objects returned in the ListObjects reply.
|
||||
@ -137,7 +137,7 @@ func healControl(ctx *cli.Context) {
|
||||
fmt.Printf("Healing : /%s/%s\n", bucketName, obj)
|
||||
reply := &GenericReply{}
|
||||
healArgs := &HealObjectArgs{Bucket: bucketName, Object: obj}
|
||||
err = client.Call("Controller.HealObjectHandler", healArgs, reply)
|
||||
err = client.Call("Control.HealObjectHandler", healArgs, reply)
|
||||
errorIf(err, "Healing object %s failed.", obj)
|
||||
}
|
||||
|
||||
|
@ -17,129 +17,134 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/minio/cli"
|
||||
"github.com/minio/mc/pkg/console"
|
||||
)
|
||||
|
||||
// SystemLockState - Structure to fill the lock state of entire object storage.
|
||||
// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system.
|
||||
type SystemLockState struct {
|
||||
TotalLocks int64 `json:"totalLocks"`
|
||||
TotalBlockedLocks int64 `json:"totalBlockedLocks"` // count of operations which are blocked waiting for the lock to be released.
|
||||
TotalAcquiredLocks int64 `json:"totalAcquiredLocks"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress).
|
||||
LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"`
|
||||
}
|
||||
|
||||
// VolumeLockInfo - Structure to contain the lock state info for volume, path pair.
|
||||
type VolumeLockInfo struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Object string `json:"object"`
|
||||
LocksOnObject int64 `json:"locksOnObject"` // All locks blocked + running for given <volume,path> pair.
|
||||
LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress).
|
||||
TotalBlockedLocks int64 `json:"locksBlockedOnObject"` // count of operations which are blocked waiting for the lock to be released.
|
||||
LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"` // state information containing state of the locks for all operations on given <volume,path> pair.
|
||||
}
|
||||
|
||||
// OpsLockState - structure to fill in state information of the lock.
|
||||
// structure to fill in status information for each operation with given operation ID.
|
||||
type OpsLockState struct {
|
||||
OperationID string `json:"opsID"` // string containing operation ID.
|
||||
LockOrigin string `json:"lockOrigin"` // contant which mentions the operation type (Get Obejct, PutObject...)
|
||||
LockType string `json:"lockType"`
|
||||
Status string `json:"status"` // status can be running/ready/blocked.
|
||||
StatusSince string `json:"statusSince"` // time info of the since how long the status holds true, value in seconds.
|
||||
}
|
||||
|
||||
// Read entire state of the locks in the system and return.
|
||||
func generateSystemLockResponse() (SystemLockState, error) {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
defer nsMutex.lockMapMutex.Unlock()
|
||||
|
||||
if nsMutex.debugLockMap == nil {
|
||||
return SystemLockState{}, errLockNotInitialized
|
||||
}
|
||||
|
||||
lockState := SystemLockState{}
|
||||
|
||||
lockState.TotalBlockedLocks = nsMutex.blockedCounter
|
||||
lockState.TotalLocks = nsMutex.globalLockCounter
|
||||
lockState.TotalAcquiredLocks = nsMutex.runningLockCounter
|
||||
|
||||
for param := range nsMutex.debugLockMap {
|
||||
volLockInfo := VolumeLockInfo{}
|
||||
volLockInfo.Bucket = param.volume
|
||||
volLockInfo.Object = param.path
|
||||
volLockInfo.TotalBlockedLocks = nsMutex.debugLockMap[param].blocked
|
||||
volLockInfo.LocksAcquiredOnObject = nsMutex.debugLockMap[param].running
|
||||
volLockInfo.LocksOnObject = nsMutex.debugLockMap[param].ref
|
||||
for opsID := range nsMutex.debugLockMap[param].lockInfo {
|
||||
opsState := OpsLockState{}
|
||||
opsState.OperationID = opsID
|
||||
opsState.LockOrigin = nsMutex.debugLockMap[param].lockInfo[opsID].lockOrigin
|
||||
opsState.LockType = nsMutex.debugLockMap[param].lockInfo[opsID].lockType
|
||||
opsState.Status = nsMutex.debugLockMap[param].lockInfo[opsID].status
|
||||
opsState.StatusSince = time.Now().UTC().Sub(nsMutex.debugLockMap[param].lockInfo[opsID].since).String()
|
||||
|
||||
volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, opsState)
|
||||
}
|
||||
lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo)
|
||||
}
|
||||
|
||||
return lockState, nil
|
||||
var lockFlags = []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "older-than",
|
||||
Usage: "List locks older than given time.",
|
||||
Value: "24h",
|
||||
},
|
||||
cli.BoolFlag{
|
||||
Name: "verbose",
|
||||
Usage: "Lists more information about locks.",
|
||||
},
|
||||
}
|
||||
|
||||
var lockCmd = cli.Command{
|
||||
Name: "lock",
|
||||
Usage: "info about the locks in the node.",
|
||||
Usage: "Prints current lock information.",
|
||||
Action: lockControl,
|
||||
Flags: globalFlags,
|
||||
Flags: append(lockFlags, globalFlags...),
|
||||
CustomHelpTemplate: `NAME:
|
||||
minio control {{.Name}} - {{.Usage}}
|
||||
|
||||
USAGE:
|
||||
minio control {{.Name}} http://localhost:9000/
|
||||
minio control {{.Name}} [list|clear] http://localhost:9000/
|
||||
|
||||
FLAGS:
|
||||
{{range .Flags}}{{.}}
|
||||
{{end}}
|
||||
|
||||
EAMPLES:
|
||||
1. Get all the info about the blocked/held locks in the node:
|
||||
$ minio control lock http://localhost:9000/
|
||||
1. List all currently active locks from all nodes. Defaults to list locks held longer than 24hrs.
|
||||
$ minio control {{.Name}} list http://localhost:9000/
|
||||
|
||||
2. List all currently active locks from all nodes. Request locks from older than 1minute.
|
||||
$ minio control {{.Name}} --older-than=1m list http://localhost:9000/
|
||||
`,
|
||||
}
|
||||
|
||||
// printLockStateVerbose - pretty prints systemLockState, additionally this filters out based on a given duration.
|
||||
func printLockStateVerbose(lkStateRep map[string]SystemLockState, olderThan time.Duration) {
|
||||
console.Println("Duration Server LockType LockAcquired Status LockOrigin Resource")
|
||||
for server, lockState := range lkStateRep {
|
||||
for _, lockInfo := range lockState.LocksInfoPerObject {
|
||||
lockedResource := path.Join(lockInfo.Bucket, lockInfo.Object)
|
||||
for _, lockDetails := range lockInfo.LockDetailsOnObject {
|
||||
if lockDetails.Duration < olderThan {
|
||||
continue
|
||||
}
|
||||
console.Println(lockDetails.Duration, server,
|
||||
lockDetails.LockType, lockDetails.Since,
|
||||
lockDetails.Status, lockDetails.LockOrigin,
|
||||
lockedResource)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// printLockState - pretty prints systemLockState, additionally this filters out based on a given duration.
|
||||
func printLockState(lkStateRep map[string]SystemLockState, olderThan time.Duration) {
|
||||
console.Println("Duration Server LockType Resource")
|
||||
for server, lockState := range lkStateRep {
|
||||
for _, lockInfo := range lockState.LocksInfoPerObject {
|
||||
lockedResource := path.Join(lockInfo.Bucket, lockInfo.Object)
|
||||
for _, lockDetails := range lockInfo.LockDetailsOnObject {
|
||||
if lockDetails.Duration < olderThan {
|
||||
continue
|
||||
}
|
||||
console.Println(lockDetails.Duration, server,
|
||||
lockDetails.LockType, lockedResource)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// "minio control lock" entry point.
|
||||
func lockControl(c *cli.Context) {
|
||||
if len(c.Args()) != 1 {
|
||||
if !c.Args().Present() && len(c.Args()) != 2 {
|
||||
cli.ShowCommandHelpAndExit(c, "lock", 1)
|
||||
}
|
||||
|
||||
parsedURL, err := url.Parse(c.Args()[0])
|
||||
parsedURL, err := url.Parse(c.Args().Get(1))
|
||||
fatalIf(err, "Unable to parse URL.")
|
||||
|
||||
// Parse older than string.
|
||||
olderThanStr := c.String("older-than")
|
||||
olderThan, err := time.ParseDuration(olderThanStr)
|
||||
fatalIf(err, "Unable to parse older-than time duration.")
|
||||
|
||||
// Verbose flag.
|
||||
verbose := c.Bool("verbose")
|
||||
|
||||
authCfg := &authConfig{
|
||||
accessKey: serverConfig.GetCredential().AccessKeyID,
|
||||
secretKey: serverConfig.GetCredential().SecretAccessKey,
|
||||
secureConn: parsedURL.Scheme == "https",
|
||||
address: parsedURL.Host,
|
||||
path: path.Join(reservedBucket, controlPath),
|
||||
loginMethod: "Controller.LoginHandler",
|
||||
loginMethod: "Control.LoginHandler",
|
||||
}
|
||||
client := newAuthClient(authCfg)
|
||||
|
||||
args := &GenericArgs{}
|
||||
reply := &SystemLockState{}
|
||||
err = client.Call("Controller.LockInfo", args, reply)
|
||||
// logs the error and returns if err != nil.
|
||||
fatalIf(err, "RPC Controller.LockInfo call failed")
|
||||
// print the lock info on the console.
|
||||
b, err := json.MarshalIndent(*reply, "", " ")
|
||||
fatalIf(err, "Failed to parse the RPC lock info response")
|
||||
fmt.Print(string(b))
|
||||
args := &GenericArgs{
|
||||
// This is necessary so that the remotes,
|
||||
// don't end up sending requests back and forth.
|
||||
Remote: true,
|
||||
}
|
||||
|
||||
subCommand := c.Args().Get(0)
|
||||
switch subCommand {
|
||||
case "list":
|
||||
lkStateRep := make(map[string]SystemLockState)
|
||||
// Request lock info, fetches from all the nodes in the cluster.
|
||||
err = client.Call("Control.LockInfo", args, &lkStateRep)
|
||||
fatalIf(err, "Unable to fetch system lockInfo.")
|
||||
if !verbose {
|
||||
printLockState(lkStateRep, olderThan)
|
||||
} else {
|
||||
printLockStateVerbose(lkStateRep, olderThan)
|
||||
}
|
||||
case "clear":
|
||||
// TODO. Defaults to clearing all locks.
|
||||
default:
|
||||
fatalIf(errInvalidArgument, "Unsupported lock control operation %s", c.Args().Get(0))
|
||||
}
|
||||
|
||||
}
|
||||
|
46
cmd/control-lock-main_test.go
Normal file
46
cmd/control-lock-main_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Test print systemState.
|
||||
func TestPrintLockState(t *testing.T) {
|
||||
nsMutex.Lock("testbucket", "1.txt", "11-11")
|
||||
sysLockState, err := getSystemLockState()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
nsMutex.Unlock("testbucket", "1.txt", "11-11")
|
||||
sysLockStateMap := map[string]SystemLockState{}
|
||||
sysLockStateMap["bucket"] = sysLockState
|
||||
|
||||
// Print lock state.
|
||||
printLockState(sysLockStateMap, 0)
|
||||
|
||||
// Print lock state verbose.
|
||||
printLockStateVerbose(sysLockStateMap, 0)
|
||||
|
||||
// Does not print any lock state in normal print mode.
|
||||
printLockState(sysLockStateMap, 10*time.Second)
|
||||
|
||||
// Does not print any lock state in debug print mode.
|
||||
printLockStateVerbose(sysLockStateMap, 10*time.Second)
|
||||
}
|
@ -49,23 +49,23 @@ func TestControlHealMain(t *testing.T) {
|
||||
|
||||
// Test to call lockControl() in control-lock-main.go
|
||||
func TestControlLockMain(t *testing.T) {
|
||||
// create cli app for testing
|
||||
// Create cli app for testing
|
||||
app := cli.NewApp()
|
||||
app.Commands = []cli.Command{controlCmd}
|
||||
|
||||
// start test server
|
||||
// Start test server
|
||||
testServer := StartTestServer(t, "XL")
|
||||
|
||||
// schedule cleanup at the end
|
||||
// Schedule cleanup at the end
|
||||
defer testServer.Stop()
|
||||
|
||||
// fetch http server endpoint
|
||||
// Fetch http server endpoint
|
||||
url := testServer.Server.URL
|
||||
|
||||
// create args to call
|
||||
args := []string{"./minio", "control", "lock", url}
|
||||
// Create args to call
|
||||
args := []string{"./minio", "control", "lock", "list", url}
|
||||
|
||||
// run app
|
||||
// Run app
|
||||
err := app.Run(args)
|
||||
if err != nil {
|
||||
t.Errorf("Control-Lock-Main test failed with - %s", err.Error())
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"net/rpc"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
router "github.com/gorilla/mux"
|
||||
"github.com/minio/minio-go/pkg/set"
|
||||
@ -29,31 +28,39 @@ import (
|
||||
|
||||
// Routes paths for "minio control" commands.
|
||||
const (
|
||||
controlPath = "/controller"
|
||||
controlPath = "/control"
|
||||
)
|
||||
|
||||
// Initializes remote controller clients for making remote requests.
|
||||
func initRemoteControllerClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient {
|
||||
// Find local node through the command line arguments.
|
||||
func getLocalAddress(srvCmdConfig serverCmdConfig) string {
|
||||
if !srvCmdConfig.isDistXL {
|
||||
return fmt.Sprintf(":%d", globalMinioPort)
|
||||
}
|
||||
for _, export := range srvCmdConfig.disks {
|
||||
// Validates if remote disk is local.
|
||||
if isLocalStorage(export) {
|
||||
var host string
|
||||
if idx := strings.LastIndex(export, ":"); idx != -1 {
|
||||
host = export[:idx]
|
||||
}
|
||||
return fmt.Sprintf("%s:%d", host, globalMinioPort)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Initializes remote control clients for making remote requests.
|
||||
func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient {
|
||||
if !srvCmdConfig.isDistXL {
|
||||
return nil
|
||||
}
|
||||
var newExports []string
|
||||
// Initialize auth rpc clients.
|
||||
exports := srvCmdConfig.disks
|
||||
ignoredExports := srvCmdConfig.ignoredDisks
|
||||
remoteHosts := set.NewStringSet()
|
||||
|
||||
// Initialize ignored disks in a new set.
|
||||
ignoredSet := set.NewStringSet()
|
||||
if len(ignoredExports) > 0 {
|
||||
ignoredSet = set.CreateStringSet(ignoredExports...)
|
||||
}
|
||||
var authRPCClients []*AuthRPCClient
|
||||
var remoteControlClnts []*AuthRPCClient
|
||||
for _, export := range exports {
|
||||
if ignoredSet.Contains(export) {
|
||||
// Ignore initializing ignored export.
|
||||
continue
|
||||
}
|
||||
// Validates if remote disk is local.
|
||||
if isLocalStorage(export) {
|
||||
continue
|
||||
@ -68,41 +75,40 @@ func initRemoteControllerClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient
|
||||
remoteHosts.Add(fmt.Sprintf("%s:%d", host, globalMinioPort))
|
||||
}
|
||||
for host := range remoteHosts {
|
||||
authRPCClients = append(authRPCClients, newAuthClient(&authConfig{
|
||||
remoteControlClnts = append(remoteControlClnts, newAuthClient(&authConfig{
|
||||
accessKey: serverConfig.GetCredential().AccessKeyID,
|
||||
secretKey: serverConfig.GetCredential().SecretAccessKey,
|
||||
secureConn: isSSL(),
|
||||
address: host,
|
||||
path: path.Join(reservedBucket, controlPath),
|
||||
loginMethod: "Controller.LoginHandler",
|
||||
loginMethod: "Control.LoginHandler",
|
||||
}))
|
||||
}
|
||||
return authRPCClients
|
||||
return remoteControlClnts
|
||||
}
|
||||
|
||||
// Register controller RPC handlers.
|
||||
func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) {
|
||||
// Initialize controller.
|
||||
ctrlHandlers := &controllerAPIHandlers{
|
||||
ObjectAPI: newObjectLayerFn,
|
||||
StorageDisks: srvCmdConfig.storageDisks,
|
||||
timestamp: time.Now().UTC(),
|
||||
// Represents control object which provides handlers for control
|
||||
// operations on server.
|
||||
type controlAPIHandlers struct {
|
||||
ObjectAPI func() ObjectLayer
|
||||
StorageDisks []StorageAPI
|
||||
RemoteControls []*AuthRPCClient
|
||||
LocalNode string
|
||||
}
|
||||
|
||||
// Register control RPC handlers.
|
||||
func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) {
|
||||
// Initialize Control.
|
||||
ctrlHandlers := &controlAPIHandlers{
|
||||
ObjectAPI: newObjectLayerFn,
|
||||
RemoteControls: initRemoteControlClients(srvCmdConfig),
|
||||
LocalNode: getLocalAddress(srvCmdConfig),
|
||||
StorageDisks: srvCmdConfig.storageDisks,
|
||||
}
|
||||
|
||||
// Initializes remote controller clients.
|
||||
ctrlHandlers.RemoteControllers = initRemoteControllerClients(srvCmdConfig)
|
||||
|
||||
ctrlRPCServer := rpc.NewServer()
|
||||
ctrlRPCServer.RegisterName("Controller", ctrlHandlers)
|
||||
ctrlRPCServer.RegisterName("Control", ctrlHandlers)
|
||||
|
||||
ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
|
||||
ctrlRouter.Path(controlPath).Handler(ctrlRPCServer)
|
||||
}
|
||||
|
||||
// Handler for object healing.
|
||||
type controllerAPIHandlers struct {
|
||||
ObjectAPI func() ObjectLayer
|
||||
StorageDisks []StorageAPI
|
||||
RemoteControllers []*AuthRPCClient
|
||||
timestamp time.Time
|
||||
}
|
@ -16,10 +16,73 @@
|
||||
|
||||
package cmd
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Tests fetch local address.
|
||||
func TestLocalAddress(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
return
|
||||
}
|
||||
testCases := []struct {
|
||||
srvCmdConfig serverCmdConfig
|
||||
localAddr string
|
||||
}{
|
||||
// Test 1 - local address is found.
|
||||
{
|
||||
srvCmdConfig: serverCmdConfig{
|
||||
isDistXL: true,
|
||||
disks: []string{
|
||||
"localhost:/mnt/disk1",
|
||||
"1.1.1.2:/mnt/disk2",
|
||||
"1.1.2.1:/mnt/disk3",
|
||||
"1.1.2.2:/mnt/disk4",
|
||||
},
|
||||
},
|
||||
localAddr: "localhost:9000",
|
||||
},
|
||||
// Test 2 - local address is everything.
|
||||
{
|
||||
srvCmdConfig: serverCmdConfig{
|
||||
isDistXL: false,
|
||||
disks: []string{
|
||||
"/mnt/disk1",
|
||||
"/mnt/disk2",
|
||||
"/mnt/disk3",
|
||||
"/mnt/disk4",
|
||||
},
|
||||
},
|
||||
localAddr: ":9000",
|
||||
},
|
||||
// Test 3 - local address is not found.
|
||||
{
|
||||
srvCmdConfig: serverCmdConfig{
|
||||
isDistXL: true,
|
||||
disks: []string{
|
||||
"1.1.1.1:/mnt/disk1",
|
||||
"1.1.1.2:/mnt/disk2",
|
||||
"1.1.2.1:/mnt/disk3",
|
||||
"1.1.2.2:/mnt/disk4",
|
||||
},
|
||||
},
|
||||
localAddr: "",
|
||||
},
|
||||
}
|
||||
|
||||
// Validates fetching local address.
|
||||
for i, testCase := range testCases {
|
||||
localAddr := getLocalAddress(testCase.srvCmdConfig)
|
||||
if localAddr != testCase.localAddr {
|
||||
t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Tests initialization of remote controller clients.
|
||||
func TestInitRemoteControllerClients(t *testing.T) {
|
||||
func TestInitRemoteControlClients(t *testing.T) {
|
||||
rootPath, err := newTestConfig("us-east-1")
|
||||
if err != nil {
|
||||
t.Fatal("Unable to initialize config", err)
|
||||
@ -63,27 +126,11 @@ func TestInitRemoteControllerClients(t *testing.T) {
|
||||
},
|
||||
totalClients: 4,
|
||||
},
|
||||
// Test - 4 2 clients allocated with 4 disks with 1 disk ignored.
|
||||
{
|
||||
srvCmdConfig: serverCmdConfig{
|
||||
isDistXL: true,
|
||||
disks: []string{
|
||||
"10.1.10.1:/mnt/disk1",
|
||||
"10.1.10.2:/mnt/disk2",
|
||||
"10.1.10.3:/mnt/disk3",
|
||||
"10.1.10.4:/mnt/disk4",
|
||||
},
|
||||
ignoredDisks: []string{
|
||||
"10.1.10.1:/mnt/disk1",
|
||||
},
|
||||
},
|
||||
totalClients: 3,
|
||||
},
|
||||
}
|
||||
|
||||
// Evaluate and validate all test cases.
|
||||
for i, testCase := range testCases {
|
||||
rclients := initRemoteControllerClients(testCase.srvCmdConfig)
|
||||
rclients := initRemoteControlClients(testCase.srvCmdConfig)
|
||||
if len(rclients) != testCase.totalClients {
|
||||
t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients))
|
||||
}
|
@ -65,7 +65,7 @@ func serviceControl(c *cli.Context) {
|
||||
case "stop":
|
||||
signal = serviceStop
|
||||
default:
|
||||
fatalIf(errInvalidArgument, "Unsupported signalling requested %s", c.Args().Get(0))
|
||||
fatalIf(errInvalidArgument, "Unrecognized service %s", c.Args().Get(0))
|
||||
}
|
||||
|
||||
parsedURL, err := url.Parse(c.Args().Get(1))
|
||||
@ -77,18 +77,18 @@ func serviceControl(c *cli.Context) {
|
||||
secureConn: parsedURL.Scheme == "https",
|
||||
address: parsedURL.Host,
|
||||
path: path.Join(reservedBucket, controlPath),
|
||||
loginMethod: "Controller.LoginHandler",
|
||||
loginMethod: "Control.LoginHandler",
|
||||
}
|
||||
client := newAuthClient(authCfg)
|
||||
|
||||
args := &ServiceArgs{
|
||||
Signal: signal,
|
||||
// This is necessary so that the remotes,
|
||||
// don't end up sending requests back and forth.
|
||||
Remote: true,
|
||||
}
|
||||
// This is necessary so that the remotes,
|
||||
// don't end up sending requests back and forth.
|
||||
args.Remote = true
|
||||
reply := &ServiceReply{}
|
||||
err = client.Call("Controller.ServiceHandler", args, reply)
|
||||
err = client.Call("Control.ServiceHandler", args, reply)
|
||||
fatalIf(err, "Service command %s failed for %s", c.Args().Get(0), parsedURL.Host)
|
||||
if signal == serviceStatus {
|
||||
console.Println(getStorageInfoMsg(reply.StorageInfo))
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
// API suite container common to both FS and XL.
|
||||
type TestRPCControllerSuite struct {
|
||||
type TestRPCControlSuite struct {
|
||||
serverType string
|
||||
testServer TestServer
|
||||
testAuthConf *authConfig
|
||||
@ -34,27 +34,27 @@ type TestRPCControllerSuite struct {
|
||||
|
||||
// Setting up the test suite.
|
||||
// Starting the Test server with temporary FS backend.
|
||||
func (s *TestRPCControllerSuite) SetUpSuite(c *testing.T) {
|
||||
func (s *TestRPCControlSuite) SetUpSuite(c *testing.T) {
|
||||
s.testServer = StartTestControlRPCServer(c, s.serverType)
|
||||
s.testAuthConf = &authConfig{
|
||||
address: s.testServer.Server.Listener.Addr().String(),
|
||||
accessKey: s.testServer.AccessKey,
|
||||
secretKey: s.testServer.SecretKey,
|
||||
path: path.Join(reservedBucket, controlPath),
|
||||
loginMethod: "Controller.LoginHandler",
|
||||
loginMethod: "Control.LoginHandler",
|
||||
}
|
||||
}
|
||||
|
||||
// No longer used with gocheck, but used in explicit teardown code in
|
||||
// each test function. // Called implicitly by "gopkg.in/check.v1"
|
||||
// after all tests are run.
|
||||
func (s *TestRPCControllerSuite) TearDownSuite(c *testing.T) {
|
||||
func (s *TestRPCControlSuite) TearDownSuite(c *testing.T) {
|
||||
s.testServer.Stop()
|
||||
}
|
||||
|
||||
func TestRPCControlLock(t *testing.T) {
|
||||
//setup code
|
||||
s := &TestRPCControllerSuite{serverType: "XL"}
|
||||
s := &TestRPCControlSuite{serverType: "XL"}
|
||||
s.SetUpSuite(t)
|
||||
|
||||
//run test
|
||||
@ -65,7 +65,7 @@ func TestRPCControlLock(t *testing.T) {
|
||||
}
|
||||
|
||||
// Tests to validate the correctness of lock instrumentation control RPC end point.
|
||||
func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) {
|
||||
func (s *TestRPCControlSuite) testRPCControlLock(c *testing.T) {
|
||||
expectedResult := []lockStateCase{
|
||||
// Test case - 1.
|
||||
// Case where 10 read locks are held.
|
||||
@ -188,9 +188,9 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) {
|
||||
defer client.Close()
|
||||
|
||||
args := &GenericArgs{}
|
||||
reply := &SystemLockState{}
|
||||
reply := make(map[string]*SystemLockState)
|
||||
// Call the lock instrumentation RPC end point.
|
||||
err := client.Call("Controller.LockInfo", args, reply)
|
||||
err := client.Call("Control.LockInfo", args, &reply)
|
||||
if err != nil {
|
||||
c.Errorf("Add: expected no error but got string %q", err.Error())
|
||||
}
|
||||
@ -198,11 +198,11 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) {
|
||||
expectedLockStats := expectedResult[0]
|
||||
// verify the actual lock info with the expected one.
|
||||
// verify the existence entry for first read lock (read lock with opsID "0").
|
||||
verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 1)
|
||||
verifyRPCLockInfoResponse(expectedLockStats, reply, c, 1)
|
||||
expectedLockStats = expectedResult[1]
|
||||
// verify the actual lock info with the expected one.
|
||||
// verify the existence entry for last read lock (read lock with opsID "9").
|
||||
verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 2)
|
||||
verifyRPCLockInfoResponse(expectedLockStats, reply, c, 2)
|
||||
|
||||
// now hold a write lock in a different go routine and it should block since 10 read locks are
|
||||
// still held.
|
||||
@ -217,13 +217,13 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) {
|
||||
// count of running locks should increase by 1.
|
||||
|
||||
// Call the RPC control handle to fetch the lock instrumentation info.
|
||||
reply = &SystemLockState{}
|
||||
reply = make(map[string]*SystemLockState)
|
||||
// Call the lock instrumentation RPC end point.
|
||||
err = client.Call("Controller.LockInfo", args, reply)
|
||||
err = client.Call("Control.LockInfo", args, &reply)
|
||||
if err != nil {
|
||||
c.Errorf("Add: expected no error but got string %q", err.Error())
|
||||
}
|
||||
verifyRPCLockInfoResponse(expectedWLockStats, *reply, c, 4)
|
||||
verifyRPCLockInfoResponse(expectedWLockStats, reply, c, 4)
|
||||
|
||||
// release the write lock.
|
||||
nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10))
|
||||
@ -237,13 +237,13 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) {
|
||||
expectedLockStats = expectedResult[2]
|
||||
|
||||
// Call the RPC control handle to fetch the lock instrumentation info.
|
||||
reply = &SystemLockState{}
|
||||
reply = make(map[string]*SystemLockState)
|
||||
// Call the lock instrumentation RPC end point.
|
||||
err = client.Call("Controller.LockInfo", args, reply)
|
||||
err = client.Call("Control.LockInfo", args, &reply)
|
||||
if err != nil {
|
||||
c.Errorf("Add: expected no error but got string %q", err.Error())
|
||||
}
|
||||
verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 3)
|
||||
verifyRPCLockInfoResponse(expectedLockStats, reply, c, 3)
|
||||
// Release all the read locks held.
|
||||
// the blocked write lock in the above go routines should get unblocked.
|
||||
for i := 0; i < 10; i++ {
|
||||
@ -252,98 +252,99 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) {
|
||||
wg.Wait()
|
||||
// Since all the locks are released. There should not be any entry in the lock info.
|
||||
// and all the counters should be set to 0.
|
||||
reply = &SystemLockState{}
|
||||
reply = make(map[string]*SystemLockState)
|
||||
// Call the lock instrumentation RPC end point.
|
||||
err = client.Call("Controller.LockInfo", args, reply)
|
||||
err = client.Call("Control.LockInfo", args, &reply)
|
||||
if err != nil {
|
||||
c.Errorf("Add: expected no error but got string %q", err.Error())
|
||||
}
|
||||
|
||||
if reply.TotalAcquiredLocks != 0 && reply.TotalLocks != 0 && reply.TotalBlockedLocks != 0 {
|
||||
c.Fatalf("The counters are not reset properly after all locks are released")
|
||||
}
|
||||
if len(reply.LocksInfoPerObject) != 0 {
|
||||
c.Fatalf("Since all locks are released there shouldn't have been any lock info entry, but found %d", len(reply.LocksInfoPerObject))
|
||||
for _, rpcLockInfo := range reply {
|
||||
if rpcLockInfo.TotalAcquiredLocks != 0 && rpcLockInfo.TotalLocks != 0 && rpcLockInfo.TotalBlockedLocks != 0 {
|
||||
c.Fatalf("The counters are not reset properly after all locks are released")
|
||||
}
|
||||
if len(rpcLockInfo.LocksInfoPerObject) != 0 {
|
||||
c.Fatalf("Since all locks are released there shouldn't have been any lock info entry, but found %d", len(rpcLockInfo.LocksInfoPerObject))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestControllerHealDiskMetadataH(t *testing.T) {
|
||||
func TestControlHealDiskMetadataH(t *testing.T) {
|
||||
//setup code
|
||||
s := &TestRPCControllerSuite{serverType: "XL"}
|
||||
s := &TestRPCControlSuite{serverType: "XL"}
|
||||
s.SetUpSuite(t)
|
||||
|
||||
//run test
|
||||
s.testControllerHealDiskMetadataH(t)
|
||||
s.testControlHealDiskMetadataH(t)
|
||||
|
||||
//teardown code
|
||||
s.TearDownSuite(t)
|
||||
}
|
||||
|
||||
// TestControllerHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`,
|
||||
// asserts to validate the success.
|
||||
func (s *TestRPCControllerSuite) testControllerHealDiskMetadataH(c *testing.T) {
|
||||
// TestControlHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`, asserts to validate the success.
|
||||
func (s *TestRPCControlSuite) testControlHealDiskMetadataH(c *testing.T) {
|
||||
// The suite has already started the test RPC server, just send RPC calls.
|
||||
client := newAuthClient(s.testAuthConf)
|
||||
defer client.Close()
|
||||
|
||||
args := &GenericArgs{}
|
||||
reply := &GenericReply{}
|
||||
err := client.Call("Controller.HealDiskMetadataHandler", args, reply)
|
||||
err := client.Call("Control.HealDiskMetadataHandler", args, reply)
|
||||
if err != nil {
|
||||
c.Errorf("Control.HealDiskMetadataH - test failed with <ERROR> %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestControllerHealObjectH(t *testing.T) {
|
||||
func TestControlHealObjectH(t *testing.T) {
|
||||
//setup code
|
||||
s := &TestRPCControllerSuite{serverType: "XL"}
|
||||
s := &TestRPCControlSuite{serverType: "XL"}
|
||||
s.SetUpSuite(t)
|
||||
|
||||
//run test
|
||||
s.testControllerHealObjectH(t)
|
||||
s.testControlHealObjectH(t)
|
||||
|
||||
//teardown code
|
||||
s.TearDownSuite(t)
|
||||
}
|
||||
|
||||
func (s *TestRPCControllerSuite) testControllerHealObjectH(t *testing.T) {
|
||||
func (s *TestRPCControlSuite) testControlHealObjectH(t *testing.T) {
|
||||
client := newAuthClient(s.testAuthConf)
|
||||
defer client.Close()
|
||||
|
||||
err := newObjectLayerFn().MakeBucket("testbucket")
|
||||
if err != nil {
|
||||
t.Fatalf(
|
||||
"Controller.HealObjectH - create bucket failed with <ERROR> %s", err)
|
||||
"Control.HealObjectH - create bucket failed with <ERROR> %s", err)
|
||||
}
|
||||
|
||||
datum := strings.NewReader("a")
|
||||
_, err = newObjectLayerFn().PutObject("testbucket", "testobject", 1, datum, nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Controller.HealObjectH - put object failed with <ERROR> %s", err)
|
||||
t.Fatalf("Control.HealObjectH - put object failed with <ERROR> %s", err)
|
||||
}
|
||||
|
||||
args := &HealObjectArgs{GenericArgs{}, "testbucket", "testobject"}
|
||||
reply := &GenericReply{}
|
||||
err = client.Call("Controller.HealObjectHandler", args, reply)
|
||||
err = client.Call("Control.HealObjectHandler", args, reply)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Controller.HealObjectH - test failed with <ERROR> %s", err)
|
||||
t.Errorf("Control.HealObjectH - test failed with <ERROR> %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestControllerListObjectsHealH(t *testing.T) {
|
||||
func TestControlListObjectsHealH(t *testing.T) {
|
||||
//setup code
|
||||
s := &TestRPCControllerSuite{serverType: "XL"}
|
||||
s := &TestRPCControlSuite{serverType: "XL"}
|
||||
s.SetUpSuite(t)
|
||||
|
||||
//run test
|
||||
s.testControllerListObjectsHealH(t)
|
||||
s.testControlListObjectsHealH(t)
|
||||
|
||||
//teardown code
|
||||
s.TearDownSuite(t)
|
||||
}
|
||||
|
||||
func (s *TestRPCControllerSuite) testControllerListObjectsHealH(t *testing.T) {
|
||||
func (s *TestRPCControlSuite) testControlListObjectsHealH(t *testing.T) {
|
||||
client := newAuthClient(s.testAuthConf)
|
||||
defer client.Close()
|
||||
|
||||
@ -351,13 +352,13 @@ func (s *TestRPCControllerSuite) testControllerListObjectsHealH(t *testing.T) {
|
||||
err := newObjectLayerFn().MakeBucket("testbucket")
|
||||
if err != nil {
|
||||
t.Fatalf(
|
||||
"Controller.ListObjectsHealH - create bucket failed - %s", err)
|
||||
"Control.ListObjectsHealH - create bucket failed - %s", err)
|
||||
}
|
||||
|
||||
r := strings.NewReader("0")
|
||||
_, err = newObjectLayerFn().PutObject("testbucket", "testObj-0", 1, r, nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Controller.ListObjectsHealH - object creation failed - %s", err)
|
||||
t.Fatalf("Control.ListObjectsHealH - object creation failed - %s", err)
|
||||
}
|
||||
|
||||
args := &HealListArgs{
|
||||
@ -365,9 +366,9 @@ func (s *TestRPCControllerSuite) testControllerListObjectsHealH(t *testing.T) {
|
||||
"", "", 100,
|
||||
}
|
||||
reply := &GenericReply{}
|
||||
err = client.Call("Controller.ListObjectsHealHandler", args, reply)
|
||||
err = client.Call("Control.ListObjectsHealHandler", args, reply)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Controller.ListObjectsHealHandler - test failed - %s", err)
|
||||
t.Errorf("Control.ListObjectsHealHandler - test failed - %s", err)
|
||||
}
|
||||
}
|
@ -22,29 +22,45 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type statusType string
|
||||
|
||||
const (
|
||||
debugRLockStr = "RLock"
|
||||
debugWLockStr = "WLock"
|
||||
runningStatus statusType = "Running"
|
||||
readyStatus statusType = "Ready"
|
||||
blockedStatus statusType = "Blocked"
|
||||
)
|
||||
|
||||
// struct containing information of status (ready/running/blocked) of an operation with given operation ID.
|
||||
type lockType string
|
||||
|
||||
const (
|
||||
debugRLockStr lockType = "RLock"
|
||||
debugWLockStr lockType = "WLock"
|
||||
)
|
||||
|
||||
// Struct containing information of status (ready/running/blocked) of an operation with given operation ID.
|
||||
type debugLockInfo struct {
|
||||
lockType string // "Rlock" or "WLock".
|
||||
lockOrigin string // contains the trace of the function which invoked the lock, obtained from runtime.
|
||||
status string // status can be running/ready/blocked.
|
||||
since time.Time // time info of the since how long the status holds true.
|
||||
// "RLock" or "WLock".
|
||||
lType lockType
|
||||
// Contains the trace of the function which invoked the lock, obtained from runtime.
|
||||
lockOrigin string
|
||||
// Status can be running/ready/blocked.
|
||||
status statusType
|
||||
// Time info of the since how long the status holds true.
|
||||
since time.Time
|
||||
}
|
||||
|
||||
// debugLockInfo - container for storing locking information for unique copy (volume,path) pair.
|
||||
// ref variable holds the reference count for locks held for.
|
||||
// debugLockInfo - container for storing locking information for unique copy
|
||||
// (volume,path) pair. ref variable holds the reference count for locks held for.
|
||||
// `ref` values helps us understand the n locks held for given <volume, path> pair.
|
||||
// `running` value helps us understand the total successful locks held (not blocked) for given <volume, path> pair and the operation is under execution.
|
||||
// `blocked` value helps us understand the total number of operations blocked waiting on locks for given <volume,path> pair.
|
||||
// `running` value helps us understand the total successful locks held (not blocked)
|
||||
// for given <volume, path> pair and the operation is under execution. `blocked`
|
||||
// value helps us understand the total number of operations blocked waiting on
|
||||
// locks for given <volume,path> pair.
|
||||
type debugLockInfoPerVolumePath struct {
|
||||
ref int64 // running + blocked operations.
|
||||
running int64 // count of successful lock acquire and running operations.
|
||||
blocked int64 // count of number of operations blocked waiting on lock.
|
||||
lockInfo (map[string]debugLockInfo) // map of [operationID] debugLockInfo{operation, status, since} .
|
||||
ref int64 // running + blocked operations.
|
||||
running int64 // count of successful lock acquire and running operations.
|
||||
blocked int64 // count of number of operations blocked waiting on lock.
|
||||
lockInfo map[string]debugLockInfo // map of [opsID] debugLockInfo{operation, status, since} .
|
||||
}
|
||||
|
||||
// returns an instance of debugLockInfo.
|
||||
@ -62,15 +78,15 @@ func newDebugLockInfoPerVolumePath() *debugLockInfoPerVolumePath {
|
||||
// LockInfoOriginNotFound - While changing the state of the lock info its important that the entry for
|
||||
// lock at a given origin exists, if not `LockInfoOriginNotFound` is returned.
|
||||
type LockInfoOriginNotFound struct {
|
||||
volume string
|
||||
path string
|
||||
operationID string
|
||||
lockOrigin string
|
||||
volume string
|
||||
path string
|
||||
opsID string
|
||||
lockOrigin string
|
||||
}
|
||||
|
||||
func (l LockInfoOriginNotFound) Error() string {
|
||||
return fmt.Sprintf("No lock state stored for the lock origined at \"%s\", for <volume> %s, <path> %s, <operationID> %s.",
|
||||
l.lockOrigin, l.volume, l.path, l.operationID)
|
||||
return fmt.Sprintf("No lock state stored for the lock origined at \"%s\", for <volume> %s, <path> %s, <opsID> %s.",
|
||||
l.lockOrigin, l.volume, l.path, l.opsID)
|
||||
}
|
||||
|
||||
// LockInfoVolPathMssing - Error interface. Returned when the info the
|
||||
@ -86,79 +102,80 @@ func (l LockInfoVolPathMssing) Error() string {
|
||||
// LockInfoOpsIDNotFound - Returned when the lock state info exists, but the entry for
|
||||
// given operation ID doesn't exist.
|
||||
type LockInfoOpsIDNotFound struct {
|
||||
volume string
|
||||
path string
|
||||
operationID string
|
||||
volume string
|
||||
path string
|
||||
opsID string
|
||||
}
|
||||
|
||||
func (l LockInfoOpsIDNotFound) Error() string {
|
||||
return fmt.Sprintf("No entry in lock info for <Operation ID> %s, <volume> %s, <path> %s.", l.operationID, l.volume, l.path)
|
||||
return fmt.Sprintf("No entry in lock info for <Operation ID> %s, <volume> %s, <path> %s.", l.opsID, l.volume, l.path)
|
||||
}
|
||||
|
||||
// LockInfoStateNotBlocked - When an attempt to change the state of the lock form `blocked` to `running` is done,
|
||||
// its necessary that the state before the transsition is "blocked", otherwise LockInfoStateNotBlocked returned.
|
||||
type LockInfoStateNotBlocked struct {
|
||||
volume string
|
||||
path string
|
||||
operationID string
|
||||
volume string
|
||||
path string
|
||||
opsID string
|
||||
}
|
||||
|
||||
func (l LockInfoStateNotBlocked) Error() string {
|
||||
return fmt.Sprintf("Lock state should be \"Blocked\" for <volume> %s, <path> %s, <operationID> %s.", l.volume, l.path, l.operationID)
|
||||
return fmt.Sprintf("Lock state should be \"Blocked\" for <volume> %s, <path> %s, <opsID> %s.", l.volume, l.path, l.opsID)
|
||||
}
|
||||
|
||||
var errLockNotInitialized = errors.New("Debug lockMap not initialized.")
|
||||
|
||||
// change the state of the lock from Blocked to Running.
|
||||
func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, operationID string, readLock bool) error {
|
||||
// Initialize lock info volume path.
|
||||
func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) {
|
||||
n.debugLockMap[param] = newDebugLockInfoPerVolumePath()
|
||||
}
|
||||
|
||||
// Change the state of the lock from Blocked to Running.
|
||||
func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, opsID string, readLock bool) error {
|
||||
// This operation is not executed under the scope nsLockMap.mutex.Lock(), lock has to be explicitly held here.
|
||||
n.lockMapMutex.Lock()
|
||||
defer n.lockMapMutex.Unlock()
|
||||
// new state info to be set for the lock.
|
||||
newLockInfo := debugLockInfo{
|
||||
lockOrigin: lockOrigin,
|
||||
status: "Running",
|
||||
status: runningStatus,
|
||||
since: time.Now().UTC(),
|
||||
}
|
||||
|
||||
// set lock type.
|
||||
// Set lock type.
|
||||
if readLock {
|
||||
newLockInfo.lockType = debugRLockStr
|
||||
newLockInfo.lType = debugRLockStr
|
||||
} else {
|
||||
newLockInfo.lockType = debugWLockStr
|
||||
newLockInfo.lType = debugWLockStr
|
||||
}
|
||||
|
||||
// check whether the lock info entry for <volume, path> pair already exists and its not `nil`.
|
||||
lockInfo, ok := n.debugLockMap[param]
|
||||
// Check whether the lock info entry for <volume, path> pair already exists and its not `nil`.
|
||||
debugLockMap, ok := n.debugLockMap[param]
|
||||
if !ok {
|
||||
// The lock state info for given <volume, path> pair should already exist.
|
||||
// The lock state info foe given <volume, path> pair should already exist.
|
||||
// If not return `LockInfoVolPathMssing`.
|
||||
return LockInfoVolPathMssing{param.volume, param.path}
|
||||
}
|
||||
// Lock info the for the given operation ID shouldn't be `nil`.
|
||||
if lockInfo == nil {
|
||||
// ``debugLockMap`` entry containing lock info for `param <volume, path>` is `nil`.
|
||||
if debugLockMap == nil {
|
||||
return errLockNotInitialized
|
||||
}
|
||||
lockInfoOpID, ok := n.debugLockMap[param].lockInfo[operationID]
|
||||
lockInfo, ok := n.debugLockMap[param].lockInfo[opsID]
|
||||
if !ok {
|
||||
// The lock info entry for given `opsID` should already exist for given <volume, path> pair.
|
||||
// If not return `LockInfoOpsIDNotFound`.
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, operationID}
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, opsID}
|
||||
}
|
||||
// The entry for the lock origined at `lockOrigin` should already exist.
|
||||
// If not return `LockInfoOriginNotFound`.
|
||||
if lockInfoOpID.lockOrigin != lockOrigin {
|
||||
return LockInfoOriginNotFound{param.volume, param.path, operationID, lockOrigin}
|
||||
// The entry for the lock origined at `lockOrigin` should already exist. If not return `LockInfoOriginNotFound`.
|
||||
if lockInfo.lockOrigin != lockOrigin {
|
||||
return LockInfoOriginNotFound{param.volume, param.path, opsID, lockOrigin}
|
||||
}
|
||||
// Status of the lock should already be set to "Blocked".
|
||||
// If not return `LockInfoStateNotBlocked`.
|
||||
if lockInfoOpID.status != "Blocked" {
|
||||
return LockInfoStateNotBlocked{param.volume, param.path, operationID}
|
||||
// Status of the lock should already be set to "Blocked". If not return `LockInfoStateNotBlocked`.
|
||||
if lockInfo.status != blockedStatus {
|
||||
return LockInfoStateNotBlocked{param.volume, param.path, opsID}
|
||||
}
|
||||
|
||||
// All checks finished.
|
||||
// changing the status of the operation from blocked to running and updating the time.
|
||||
n.debugLockMap[param].lockInfo[operationID] = newLockInfo
|
||||
// All checks finished. Changing the status of the operation from blocked to running and updating the time.
|
||||
n.debugLockMap[param].lockInfo[opsID] = newLockInfo
|
||||
|
||||
// After locking unblocks decrease the blocked counter.
|
||||
n.blockedCounter--
|
||||
@ -169,21 +186,17 @@ func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, operationI
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) {
|
||||
n.debugLockMap[param] = newDebugLockInfoPerVolumePath()
|
||||
}
|
||||
|
||||
// change the state of the lock from Ready to Blocked.
|
||||
func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID string, readLock bool) error {
|
||||
// Change the state of the lock from Ready to Blocked.
|
||||
func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, opsID string, readLock bool) error {
|
||||
newLockInfo := debugLockInfo{
|
||||
lockOrigin: lockOrigin,
|
||||
status: "Blocked",
|
||||
status: blockedStatus,
|
||||
since: time.Now().UTC(),
|
||||
}
|
||||
if readLock {
|
||||
newLockInfo.lockType = debugRLockStr
|
||||
newLockInfo.lType = debugRLockStr
|
||||
} else {
|
||||
newLockInfo.lockType = debugWLockStr
|
||||
newLockInfo.lType = debugWLockStr
|
||||
}
|
||||
|
||||
lockInfo, ok := n.debugLockMap[param]
|
||||
@ -192,15 +205,16 @@ func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID s
|
||||
n.initLockInfoForVolumePath(param)
|
||||
}
|
||||
if lockInfo == nil {
|
||||
// *debugLockInfoPerVolumePath entry is nil, initialize here to avoid any case of `nil` pointer access.
|
||||
// *lockInfo is nil, initialize here.
|
||||
n.initLockInfoForVolumePath(param)
|
||||
}
|
||||
|
||||
// lockInfo is a map[string]debugLockInfo, which holds map[OperationID]{status,time, origin} of the lock.
|
||||
if n.debugLockMap[param].lockInfo == nil {
|
||||
n.debugLockMap[param].lockInfo = make(map[string]debugLockInfo)
|
||||
}
|
||||
// The status of the operation with the given operation ID is marked blocked till its gets unblocked from the lock.
|
||||
n.debugLockMap[param].lockInfo[operationID] = newLockInfo
|
||||
n.debugLockMap[param].lockInfo[opsID] = newLockInfo
|
||||
// Increment the Global lock counter.
|
||||
n.globalLockCounter++
|
||||
// Increment the counter for number of blocked opertions, decrement it after the locking unblocks.
|
||||
@ -212,7 +226,8 @@ func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID s
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteLockInfoEntry - Deletes the lock state information for given <volume, path> pair. Called when nsLk.ref count is 0.
|
||||
// deleteLockInfoEntry - Deletes the lock state information for given
|
||||
// <volume, path> pair. Called when nsLk.ref count is 0.
|
||||
func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error {
|
||||
// delete the lock info for the given operation.
|
||||
if _, found := n.debugLockMap[param]; !found {
|
||||
@ -223,32 +238,36 @@ func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteLockInfoEntry - Deletes the entry for given opsID in the lock state information of given <volume, path> pair.
|
||||
// called when the nsLk ref count for the given <volume, path> pair is not 0.
|
||||
func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, operationID string) error {
|
||||
// deleteLockInfoEntry - Deletes the entry for given opsID in the lock state information
|
||||
// of given <volume, path> pair. Called when the nsLk ref count for the given
|
||||
// <volume, path> pair is not 0.
|
||||
func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, opsID string) error {
|
||||
// delete the lock info for the given operation.
|
||||
infoMap, found := n.debugLockMap[param]
|
||||
if !found {
|
||||
return LockInfoVolPathMssing{param.volume, param.path}
|
||||
}
|
||||
// the opertion finished holding the lock on the resource, remove the entry for the given operation with the operation ID.
|
||||
if _, foundInfo := infoMap.lockInfo[operationID]; !foundInfo {
|
||||
// The opertion finished holding the lock on the resource, remove
|
||||
// the entry for the given operation with the operation ID.
|
||||
_, foundInfo := infoMap.lockInfo[opsID]
|
||||
if !foundInfo {
|
||||
// Unlock request with invalid opertion ID not accepted.
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, operationID}
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, opsID}
|
||||
}
|
||||
// decrease the global running and lock reference counter.
|
||||
// Decrease the global running and lock reference counter.
|
||||
n.runningLockCounter--
|
||||
n.globalLockCounter--
|
||||
// decrease the lock referee counter for the lock info for given <volume,path> pair.
|
||||
// decrease the running operation number. Its assumed that the operation is over once an attempt to release the lock is made.
|
||||
// Decrease the lock referee counter for the lock info for given <volume,path> pair.
|
||||
// Decrease the running operation number. Its assumed that the operation is over
|
||||
// once an attempt to release the lock is made.
|
||||
infoMap.running--
|
||||
// decrease the total reference count of locks jeld on <volume,path> pair.
|
||||
// Decrease the total reference count of locks jeld on <volume,path> pair.
|
||||
infoMap.ref--
|
||||
delete(infoMap.lockInfo, operationID)
|
||||
delete(infoMap.lockInfo, opsID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// return randomly generated string ID
|
||||
// Return randomly generated string ID
|
||||
func getOpsID() string {
|
||||
return string(generateRequestID())
|
||||
}
|
||||
|
@ -29,95 +29,94 @@ type lockStateCase struct {
|
||||
readLock bool // lock type.
|
||||
setBlocked bool // initialize the initial state to blocked.
|
||||
expectedErr error
|
||||
// expected global lock stats.
|
||||
expectedLockStatus string // Status of the lock Blocked/Running.
|
||||
// Expected global lock stats.
|
||||
expectedLockStatus statusType // Status of the lock Blocked/Running.
|
||||
|
||||
expectedGlobalLockCount int // Total number of locks held across the system, includes blocked + held locks.
|
||||
expectedBlockedLockCount int // Total blocked lock across the system.
|
||||
expectedRunningLockCount int // Total successfully held locks (non-blocking).
|
||||
// expected lock statu for given <volume, path> pair.
|
||||
// Expected lock status for given <volume, path> pair.
|
||||
expectedVolPathLockCount int // Total locks held for given <volume,path> pair, includes blocked locks.
|
||||
expectedVolPathRunningCount int // Total succcesfully held locks for given <volume, path> pair.
|
||||
expectedVolPathBlockCount int // Total locks blocked on the given <volume, path> pair.
|
||||
}
|
||||
|
||||
// Used for validating the Lock info obtaining from contol RPC end point for obtaining lock related info.
|
||||
func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoResponse SystemLockState, t TestErrHandler, testNum int) {
|
||||
// Assert the total number of locks (locked + acquired) in the system.
|
||||
if rpcLockInfoResponse.TotalLocks != int64(l.expectedGlobalLockCount) {
|
||||
t.Fatalf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
|
||||
rpcLockInfoResponse.TotalLocks)
|
||||
}
|
||||
func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoMap map[string]*SystemLockState, t TestErrHandler, testNum int) {
|
||||
for _, rpcLockInfoResponse := range rpcLockInfoMap {
|
||||
// Assert the total number of locks (locked + acquired) in the system.
|
||||
if rpcLockInfoResponse.TotalLocks != int64(l.expectedGlobalLockCount) {
|
||||
t.Fatalf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
|
||||
rpcLockInfoResponse.TotalLocks)
|
||||
}
|
||||
|
||||
// verify the count for total blocked locks.
|
||||
if rpcLockInfoResponse.TotalBlockedLocks != int64(l.expectedBlockedLockCount) {
|
||||
t.Fatalf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
|
||||
rpcLockInfoResponse.TotalBlockedLocks)
|
||||
}
|
||||
// verify the count for total blocked locks.
|
||||
if rpcLockInfoResponse.TotalBlockedLocks != int64(l.expectedBlockedLockCount) {
|
||||
t.Fatalf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
|
||||
rpcLockInfoResponse.TotalBlockedLocks)
|
||||
}
|
||||
|
||||
// verify the count for total running locks.
|
||||
if rpcLockInfoResponse.TotalAcquiredLocks != int64(l.expectedRunningLockCount) {
|
||||
t.Fatalf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
|
||||
rpcLockInfoResponse.TotalAcquiredLocks)
|
||||
}
|
||||
// verify the count for total running locks.
|
||||
if rpcLockInfoResponse.TotalAcquiredLocks != int64(l.expectedRunningLockCount) {
|
||||
t.Fatalf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
|
||||
rpcLockInfoResponse.TotalAcquiredLocks)
|
||||
}
|
||||
|
||||
for _, locksInfoPerObject := range rpcLockInfoResponse.LocksInfoPerObject {
|
||||
// See whether the entry for the <bucket, object> exists in the RPC response.
|
||||
if locksInfoPerObject.Bucket == l.volume && locksInfoPerObject.Object == l.path {
|
||||
// Assert the total number of locks (blocked + acquired) for the given <buckt, object> pair.
|
||||
if locksInfoPerObject.LocksOnObject != int64(l.expectedVolPathLockCount) {
|
||||
t.Errorf("Test %d: Expected the total lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
|
||||
l.volume, l.path, int64(l.expectedVolPathLockCount), locksInfoPerObject.LocksOnObject)
|
||||
}
|
||||
// Assert the total number of acquired locks for the given <buckt, object> pair.
|
||||
if locksInfoPerObject.LocksAcquiredOnObject != int64(l.expectedVolPathRunningCount) {
|
||||
t.Errorf("Test %d: Expected the acquired lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
|
||||
l.volume, l.path, int64(l.expectedVolPathRunningCount), locksInfoPerObject.LocksAcquiredOnObject)
|
||||
}
|
||||
// Assert the total number of blocked locks for the given <buckt, object> pair.
|
||||
if locksInfoPerObject.TotalBlockedLocks != int64(l.expectedVolPathBlockCount) {
|
||||
t.Errorf("Test %d: Expected the blocked lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
|
||||
l.volume, l.path, int64(l.expectedVolPathBlockCount), locksInfoPerObject.TotalBlockedLocks)
|
||||
}
|
||||
// Flag to mark whether there's an entry in the RPC lock info response for given opsID.
|
||||
var opsIDfound bool
|
||||
for _, opsLockState := range locksInfoPerObject.LockDetailsOnObject {
|
||||
// first check whether the entry for the given operation ID exists.
|
||||
if opsLockState.OperationID == l.opsID {
|
||||
opsIDfound = true
|
||||
// asserting the type of lock (RLock/WLock) from the RPC lock info response.
|
||||
if l.readLock {
|
||||
if opsLockState.LockType != debugRLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugRLockStr)
|
||||
for _, locksInfoPerObject := range rpcLockInfoResponse.LocksInfoPerObject {
|
||||
// See whether the entry for the <bucket, object> exists in the RPC response.
|
||||
if locksInfoPerObject.Bucket == l.volume && locksInfoPerObject.Object == l.path {
|
||||
// Assert the total number of locks (blocked + acquired) for the given <buckt, object> pair.
|
||||
if locksInfoPerObject.LocksOnObject != int64(l.expectedVolPathLockCount) {
|
||||
t.Errorf("Test %d: Expected the total lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
|
||||
l.volume, l.path, int64(l.expectedVolPathLockCount), locksInfoPerObject.LocksOnObject)
|
||||
}
|
||||
// Assert the total number of acquired locks for the given <buckt, object> pair.
|
||||
if locksInfoPerObject.LocksAcquiredOnObject != int64(l.expectedVolPathRunningCount) {
|
||||
t.Errorf("Test %d: Expected the acquired lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
|
||||
l.volume, l.path, int64(l.expectedVolPathRunningCount), locksInfoPerObject.LocksAcquiredOnObject)
|
||||
}
|
||||
// Assert the total number of blocked locks for the given <buckt, object> pair.
|
||||
if locksInfoPerObject.TotalBlockedLocks != int64(l.expectedVolPathBlockCount) {
|
||||
t.Errorf("Test %d: Expected the blocked lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
|
||||
l.volume, l.path, int64(l.expectedVolPathBlockCount), locksInfoPerObject.TotalBlockedLocks)
|
||||
}
|
||||
// Flag to mark whether there's an entry in the RPC lock info response for given opsID.
|
||||
var opsIDfound bool
|
||||
for _, opsLockState := range locksInfoPerObject.LockDetailsOnObject {
|
||||
// first check whether the entry for the given operation ID exists.
|
||||
if opsLockState.OperationID == l.opsID {
|
||||
opsIDfound = true
|
||||
// asserting the type of lock (RLock/WLock) from the RPC lock info response.
|
||||
if l.readLock {
|
||||
if opsLockState.LockType != debugRLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugRLockStr)
|
||||
}
|
||||
} else {
|
||||
if opsLockState.LockType != debugWLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugWLockStr)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if opsLockState.LockType != debugWLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugWLockStr)
|
||||
|
||||
if opsLockState.Status != l.expectedLockStatus {
|
||||
t.Errorf("Test case %d: Expected the status of the operation to be \"%s\", got \"%s\"", testNum, l.expectedLockStatus, opsLockState.Status)
|
||||
}
|
||||
}
|
||||
|
||||
if opsLockState.Status != l.expectedLockStatus {
|
||||
t.Errorf("Test case %d: Expected the status of the operation to be \"%s\", got \"%s\"", testNum, l.expectedLockStatus, opsLockState.Status)
|
||||
// all check satisfied, return here.
|
||||
// Any mismatch in the earlier checks would have ended the tests due to `Fatalf`,
|
||||
// control reaching here implies that all checks are satisfied.
|
||||
return
|
||||
}
|
||||
|
||||
// if opsLockState.LockOrigin != l.lockOrigin {
|
||||
// t.Fatalf("Test case %d: Expected the origin of the lock to be \"%s\", got \"%s\"", testNum, opsLockState.LockOrigin, l.lockOrigin)
|
||||
// }
|
||||
// all check satisfied, return here.
|
||||
// Any mismatch in the earlier checks would have ended the tests due to `Fatalf`,
|
||||
// control reaching here implies that all checks are satisfied.
|
||||
return
|
||||
}
|
||||
// opsID not found.
|
||||
// No entry for an operation with given operation ID exists.
|
||||
if !opsIDfound {
|
||||
t.Fatalf("Test case %d: Entry for OpsId: \"%s\" not found in <bucket>: \"%s\", <path>: \"%s\" doesn't exist in the RPC response", testNum, l.opsID, l.volume, l.path)
|
||||
}
|
||||
}
|
||||
// opsID not found.
|
||||
// No entry for an operation with given operation ID exists.
|
||||
if !opsIDfound {
|
||||
t.Fatalf("Test case %d: Entry for OpsId: \"%s\" not found in <bucket>: \"%s\", <path>: \"%s\" doesn't exist in the RPC response", testNum, l.opsID, l.volume, l.path)
|
||||
}
|
||||
}
|
||||
// No entry exists for given <bucket, object> pair in the RPC response.
|
||||
t.Errorf("Test case %d: Entry for <bucket>: \"%s\", <object>: \"%s\" doesn't exist in the RPC response", testNum, l.volume, l.path)
|
||||
}
|
||||
// No entry exists for given <bucket, object> pair in the RPC response.
|
||||
t.Errorf("Test case %d: Entry for <bucket>: \"%s\", <object>: \"%s\" doesn't exist in the RPC response", testNum, l.volume, l.path)
|
||||
}
|
||||
|
||||
// Asserts the lock counter from the global nsMutex inmemory lock with the expected one.
|
||||
@ -142,7 +141,7 @@ func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) {
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
// Verifying again with the JSON response of the lock info.
|
||||
// Verifying the lock stats.
|
||||
sysLockState, err := generateSystemLockResponse()
|
||||
sysLockState, err := getSystemLockState()
|
||||
if err != nil {
|
||||
t.Fatalf("Obtaining lock info failed with <ERROR> %s", err)
|
||||
|
||||
@ -197,11 +196,11 @@ func verifyLockState(l lockStateCase, t *testing.T, testNum int) {
|
||||
if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok {
|
||||
// Validating the lock type filed in the debug lock information.
|
||||
if l.readLock {
|
||||
if lockInfo.lockType != debugRLockStr {
|
||||
if lockInfo.lType != debugRLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugRLockStr)
|
||||
}
|
||||
} else {
|
||||
if lockInfo.lockType != debugWLockStr {
|
||||
if lockInfo.lType != debugWLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugWLockStr)
|
||||
}
|
||||
}
|
||||
@ -251,8 +250,8 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
path string
|
||||
lockOrigin string
|
||||
opsID string
|
||||
readLock bool // lock type.
|
||||
setBlocked bool // initialize the initial state to blocked.
|
||||
readLock bool // Read lock type.
|
||||
setBlocked bool // Initialize the initial state to blocked.
|
||||
expectedErr error
|
||||
}{
|
||||
// Test case - 1.
|
||||
@ -413,11 +412,11 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok {
|
||||
// Validating the lock type filed in the debug lock information.
|
||||
if testCase.readLock {
|
||||
if lockInfo.lockType != debugRLockStr {
|
||||
if lockInfo.lType != debugRLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugRLockStr)
|
||||
}
|
||||
} else {
|
||||
if lockInfo.lockType != debugWLockStr {
|
||||
if lockInfo.lType != debugWLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugWLockStr)
|
||||
}
|
||||
}
|
||||
@ -427,7 +426,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
t.Errorf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", i+1, testCase.lockOrigin, lockInfo.lockOrigin)
|
||||
}
|
||||
// validating the status of the lock.
|
||||
if lockInfo.status != "Running" {
|
||||
if lockInfo.status != runningStatus {
|
||||
t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", i+1, "Running", lockInfo.status)
|
||||
}
|
||||
} else {
|
||||
@ -457,7 +456,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) {
|
||||
readLock: true,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
expectedLockStatus: "Blocked",
|
||||
expectedLockStatus: blockedStatus,
|
||||
|
||||
expectedGlobalLockCount: 1,
|
||||
expectedRunningLockCount: 0,
|
||||
@ -479,7 +478,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) {
|
||||
readLock: false,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
expectedLockStatus: "Blocked",
|
||||
expectedLockStatus: blockedStatus,
|
||||
|
||||
expectedGlobalLockCount: 2,
|
||||
expectedRunningLockCount: 0,
|
||||
|
@ -59,8 +59,9 @@ func (l *lockServer) removeEntry(name, uid string, lri *[]lockRequesterInfo) boo
|
||||
|
||||
// Validate lock args.
|
||||
func (l *lockServer) validateLockArgs(args *LockArgs) error {
|
||||
if !l.timestamp.Equal(args.Timestamp) {
|
||||
return errInvalidTimestamp
|
||||
curTime := time.Now().UTC()
|
||||
if curTime.Sub(args.Timestamp) > globalMaxSkewTime {
|
||||
return errServerTimeMismatch
|
||||
}
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
|
@ -24,8 +24,7 @@ import (
|
||||
|
||||
// Test function to remove lock entries from map only in case they still exist based on name & uid combination
|
||||
func TestLockRpcServerRemoveEntryIfExists(t *testing.T) {
|
||||
|
||||
testPath, locker, _, _ := createLockTestServer(t)
|
||||
testPath, locker, _ := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
lri := lockRequesterInfo{
|
||||
@ -62,8 +61,7 @@ func TestLockRpcServerRemoveEntryIfExists(t *testing.T) {
|
||||
|
||||
// Test function to remove lock entries from map based on name & uid combination
|
||||
func TestLockRpcServerRemoveEntry(t *testing.T) {
|
||||
|
||||
testPath, locker, _, _ := createLockTestServer(t)
|
||||
testPath, locker, _ := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
lockRequesterInfo1 := lockRequesterInfo{
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
router "github.com/gorilla/mux"
|
||||
"github.com/minio/minio-go/pkg/set"
|
||||
)
|
||||
|
||||
const lockRPCPath = "/minio/lock"
|
||||
@ -73,8 +74,6 @@ type lockServer struct {
|
||||
rpcPath string
|
||||
mutex sync.Mutex
|
||||
lockMap map[string][]lockRequesterInfo
|
||||
// Timestamp set at the time of initialization. Resets naturally on minio server restart.
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// Register distributed NS lock handlers.
|
||||
@ -90,12 +89,14 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) {
|
||||
ignoredExports := serverConfig.ignoredDisks
|
||||
|
||||
// Save ignored disks in a map
|
||||
skipDisks := make(map[string]bool)
|
||||
for _, ignoredExport := range ignoredExports {
|
||||
skipDisks[ignoredExport] = true
|
||||
// Initialize ignored disks in a new set.
|
||||
ignoredSet := set.NewStringSet()
|
||||
if len(ignoredExports) > 0 {
|
||||
ignoredSet = set.CreateStringSet(ignoredExports...)
|
||||
}
|
||||
for _, export := range exports {
|
||||
if skipDisks[export] {
|
||||
if ignoredSet.Contains(export) {
|
||||
// Ignore initializing ignored export.
|
||||
continue
|
||||
}
|
||||
// Not local storage move to the next node.
|
||||
@ -107,10 +108,9 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) {
|
||||
}
|
||||
// Create handler for lock RPCs
|
||||
locker := &lockServer{
|
||||
rpcPath: export,
|
||||
mutex: sync.Mutex{},
|
||||
lockMap: make(map[string][]lockRequesterInfo),
|
||||
timestamp: time.Now().UTC(),
|
||||
rpcPath: export,
|
||||
mutex: sync.Mutex{},
|
||||
lockMap: make(map[string][]lockRequesterInfo),
|
||||
}
|
||||
|
||||
// Start loop for stale lock maintenance
|
||||
@ -153,7 +153,7 @@ func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) erro
|
||||
return err
|
||||
}
|
||||
reply.Token = token
|
||||
reply.Timestamp = l.timestamp
|
||||
reply.Timestamp = time.Now().UTC()
|
||||
reply.ServerVersion = Version
|
||||
return nil
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -24,7 +25,6 @@ import (
|
||||
|
||||
// Helper function to test equality of locks (without taking timing info into account)
|
||||
func testLockEquality(lriLeft, lriRight []lockRequesterInfo) bool {
|
||||
|
||||
if len(lriLeft) != len(lriRight) {
|
||||
return false
|
||||
}
|
||||
@ -41,8 +41,7 @@ func testLockEquality(lriLeft, lriRight []lockRequesterInfo) bool {
|
||||
}
|
||||
|
||||
// Helper function to create a lock server for testing
|
||||
func createLockTestServer(t *testing.T) (string, *lockServer, string, time.Time) {
|
||||
|
||||
func createLockTestServer(t *testing.T) (string, *lockServer, string) {
|
||||
testPath, err := newTestConfig("us-east-1")
|
||||
if err != nil {
|
||||
t.Fatalf("unable initialize config file, %s", err)
|
||||
@ -63,21 +62,20 @@ func createLockTestServer(t *testing.T) (string, *lockServer, string, time.Time)
|
||||
t.Fatalf("unable for JWT to generate token, %s", err)
|
||||
}
|
||||
|
||||
timestamp := time.Now().UTC()
|
||||
locker := &lockServer{
|
||||
rpcPath: "rpc-path",
|
||||
mutex: sync.Mutex{},
|
||||
lockMap: make(map[string][]lockRequesterInfo),
|
||||
timestamp: timestamp,
|
||||
rpcPath: "rpc-path",
|
||||
mutex: sync.Mutex{},
|
||||
lockMap: make(map[string][]lockRequesterInfo),
|
||||
}
|
||||
|
||||
return testPath, locker, token, timestamp
|
||||
return testPath, locker, token
|
||||
}
|
||||
|
||||
// Test Lock functionality
|
||||
func TestLockRpcServerLock(t *testing.T) {
|
||||
|
||||
testPath, locker, token, timestamp := createLockTestServer(t)
|
||||
timestamp := time.Now().UTC()
|
||||
testPath, locker, token := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
la := LockArgs{
|
||||
@ -100,7 +98,7 @@ func TestLockRpcServerLock(t *testing.T) {
|
||||
} else {
|
||||
gotLri, _ := locker.lockMap["name"]
|
||||
expectedLri := []lockRequesterInfo{
|
||||
lockRequesterInfo{
|
||||
{
|
||||
writer: true,
|
||||
node: "node",
|
||||
rpcPath: "rpc-path",
|
||||
@ -135,7 +133,8 @@ func TestLockRpcServerLock(t *testing.T) {
|
||||
// Test Unlock functionality
|
||||
func TestLockRpcServerUnlock(t *testing.T) {
|
||||
|
||||
testPath, locker, token, timestamp := createLockTestServer(t)
|
||||
timestamp := time.Now().UTC()
|
||||
testPath, locker, token := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
la := LockArgs{
|
||||
@ -182,7 +181,8 @@ func TestLockRpcServerUnlock(t *testing.T) {
|
||||
// Test RLock functionality
|
||||
func TestLockRpcServerRLock(t *testing.T) {
|
||||
|
||||
testPath, locker, token, timestamp := createLockTestServer(t)
|
||||
timestamp := time.Now().UTC()
|
||||
testPath, locker, token := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
la := LockArgs{
|
||||
@ -205,7 +205,7 @@ func TestLockRpcServerRLock(t *testing.T) {
|
||||
} else {
|
||||
gotLri, _ := locker.lockMap["name"]
|
||||
expectedLri := []lockRequesterInfo{
|
||||
lockRequesterInfo{
|
||||
{
|
||||
writer: false,
|
||||
node: "node",
|
||||
rpcPath: "rpc-path",
|
||||
@ -240,7 +240,8 @@ func TestLockRpcServerRLock(t *testing.T) {
|
||||
// Test RUnlock functionality
|
||||
func TestLockRpcServerRUnlock(t *testing.T) {
|
||||
|
||||
testPath, locker, token, timestamp := createLockTestServer(t)
|
||||
timestamp := time.Now().UTC()
|
||||
testPath, locker, token := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
la := LockArgs{
|
||||
@ -294,7 +295,7 @@ func TestLockRpcServerRUnlock(t *testing.T) {
|
||||
} else {
|
||||
gotLri, _ := locker.lockMap["name"]
|
||||
expectedLri := []lockRequesterInfo{
|
||||
lockRequesterInfo{
|
||||
{
|
||||
writer: false,
|
||||
node: "node",
|
||||
rpcPath: "rpc-path",
|
||||
@ -327,8 +328,8 @@ func TestLockRpcServerRUnlock(t *testing.T) {
|
||||
|
||||
// Test Expired functionality
|
||||
func TestLockRpcServerExpired(t *testing.T) {
|
||||
|
||||
testPath, locker, token, timestamp := createLockTestServer(t)
|
||||
timestamp := time.Now().UTC()
|
||||
testPath, locker, token := createLockTestServer(t)
|
||||
defer removeAll(testPath)
|
||||
|
||||
la := LockArgs{
|
||||
@ -369,3 +370,52 @@ func TestLockRpcServerExpired(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test initialization of lock servers.
|
||||
func TestLockServers(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
return
|
||||
}
|
||||
testCases := []struct {
|
||||
srvCmdConfig serverCmdConfig
|
||||
totalLockServers int
|
||||
}{
|
||||
// Test - 1 one lock server initialized.
|
||||
{
|
||||
srvCmdConfig: serverCmdConfig{
|
||||
isDistXL: true,
|
||||
disks: []string{
|
||||
"localhost:/mnt/disk1",
|
||||
"1.1.1.2:/mnt/disk2",
|
||||
"1.1.2.1:/mnt/disk3",
|
||||
"1.1.2.2:/mnt/disk4",
|
||||
},
|
||||
},
|
||||
totalLockServers: 1,
|
||||
},
|
||||
// Test - 2 two servers possible, 1 ignored.
|
||||
{
|
||||
srvCmdConfig: serverCmdConfig{
|
||||
isDistXL: true,
|
||||
disks: []string{
|
||||
"localhost:/mnt/disk1",
|
||||
"localhost:/mnt/disk2",
|
||||
"1.1.2.1:/mnt/disk3",
|
||||
"1.1.2.2:/mnt/disk4",
|
||||
},
|
||||
ignoredDisks: []string{
|
||||
"localhost:/mnt/disk2",
|
||||
},
|
||||
},
|
||||
totalLockServers: 1,
|
||||
},
|
||||
}
|
||||
|
||||
// Validates lock server initialization.
|
||||
for i, testCase := range testCases {
|
||||
lockServers := newLockServers(testCase.srvCmdConfig)
|
||||
if len(lockServers) != testCase.totalLockServers {
|
||||
t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(lockServers))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
169
cmd/lockinfo-handlers.go
Normal file
169
cmd/lockinfo-handlers.go
Normal file
@ -0,0 +1,169 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SystemLockState - Structure to fill the lock state of entire object storage.
|
||||
// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system.
|
||||
type SystemLockState struct {
|
||||
TotalLocks int64 `json:"totalLocks"`
|
||||
// Count of operations which are blocked waiting for the lock to
|
||||
// be released.
|
||||
TotalBlockedLocks int64 `json:"totalBlockedLocks"`
|
||||
// Count of operations which has successfully acquired the lock but
|
||||
// hasn't unlocked yet( operation in progress).
|
||||
TotalAcquiredLocks int64 `json:"totalAcquiredLocks"`
|
||||
LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"`
|
||||
}
|
||||
|
||||
// VolumeLockInfo - Structure to contain the lock state info for volume, path pair.
|
||||
type VolumeLockInfo struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Object string `json:"object"`
|
||||
// All locks blocked + running for given <volume,path> pair.
|
||||
LocksOnObject int64 `json:"locksOnObject"`
|
||||
// Count of operations which has successfully acquired the lock
|
||||
// but hasn't unlocked yet( operation in progress).
|
||||
LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"`
|
||||
// Count of operations which are blocked waiting for the lock
|
||||
// to be released.
|
||||
TotalBlockedLocks int64 `json:"locksBlockedOnObject"`
|
||||
// State information containing state of the locks for all operations
|
||||
// on given <volume,path> pair.
|
||||
LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"`
|
||||
}
|
||||
|
||||
// OpsLockState - structure to fill in state information of the lock.
|
||||
// structure to fill in status information for each operation with given operation ID.
|
||||
type OpsLockState struct {
|
||||
OperationID string `json:"opsID"` // String containing operation ID.
|
||||
LockOrigin string `json:"lockOrigin"` // Operation type (GetObject, PutObject...)
|
||||
LockType lockType `json:"lockType"` // Lock type (RLock, WLock)
|
||||
Status statusType `json:"status"` // Status can be Running/Ready/Blocked.
|
||||
Since time.Time `json:"statusSince"` // Time when the lock was initially held.
|
||||
Duration time.Duration `json:"statusDuration"` // Duration since the lock was held.
|
||||
}
|
||||
|
||||
// Read entire state of the locks in the system and return.
|
||||
func getSystemLockState() (SystemLockState, error) {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
defer nsMutex.lockMapMutex.Unlock()
|
||||
|
||||
lockState := SystemLockState{}
|
||||
|
||||
lockState.TotalBlockedLocks = nsMutex.blockedCounter
|
||||
lockState.TotalLocks = nsMutex.globalLockCounter
|
||||
lockState.TotalAcquiredLocks = nsMutex.runningLockCounter
|
||||
|
||||
for param, debugLock := range nsMutex.debugLockMap {
|
||||
volLockInfo := VolumeLockInfo{}
|
||||
volLockInfo.Bucket = param.volume
|
||||
volLockInfo.Object = param.path
|
||||
volLockInfo.LocksOnObject = debugLock.ref
|
||||
volLockInfo.TotalBlockedLocks = debugLock.blocked
|
||||
volLockInfo.LocksAcquiredOnObject = debugLock.running
|
||||
for opsID, lockInfo := range debugLock.lockInfo {
|
||||
volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, OpsLockState{
|
||||
OperationID: opsID,
|
||||
LockOrigin: lockInfo.lockOrigin,
|
||||
LockType: lockInfo.lType,
|
||||
Status: lockInfo.status,
|
||||
Since: lockInfo.since,
|
||||
Duration: time.Now().UTC().Sub(lockInfo.since),
|
||||
})
|
||||
}
|
||||
lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo)
|
||||
}
|
||||
return lockState, nil
|
||||
}
|
||||
|
||||
// Remote procedure call, calls LockInfo handler with given input args.
|
||||
func (c *controlAPIHandlers) remoteLockInfoCall(args *GenericArgs, replies []SystemLockState) error {
|
||||
var wg sync.WaitGroup
|
||||
var errs = make([]error, len(c.RemoteControls))
|
||||
// Send remote call to all neighboring peers to restart minio servers.
|
||||
for index, clnt := range c.RemoteControls {
|
||||
wg.Add(1)
|
||||
go func(index int, client *AuthRPCClient) {
|
||||
defer wg.Done()
|
||||
errs[index] = client.Call("Control.RemoteLockInfo", args, &replies[index])
|
||||
errorIf(errs[index], "Unable to initiate control lockInfo request to remote node %s", client.Node())
|
||||
}(index, clnt)
|
||||
}
|
||||
wg.Wait()
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteLockInfo - RPC control handler for `minio control lock`, used internally by LockInfo to
|
||||
// make calls to neighboring peers.
|
||||
func (c *controlAPIHandlers) RemoteLockInfo(args *GenericArgs, reply *SystemLockState) error {
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
}
|
||||
// Obtain the lock state information of the local system.
|
||||
lockState, err := getSystemLockState()
|
||||
// In case of error, return err to the RPC client.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = lockState
|
||||
return nil
|
||||
}
|
||||
|
||||
// LockInfo - RPC control handler for `minio control lock`. Returns the info of the locks held in the cluster.
|
||||
func (c *controlAPIHandlers) LockInfo(args *GenericArgs, reply *map[string]SystemLockState) error {
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
}
|
||||
var replies = make([]SystemLockState, len(c.RemoteControls))
|
||||
if args.Remote {
|
||||
// Fetch lock states from all the remote peers.
|
||||
args.Remote = false
|
||||
if err := c.remoteLockInfoCall(args, replies); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
rep := make(map[string]SystemLockState)
|
||||
// The response containing the lock info.
|
||||
for index, client := range c.RemoteControls {
|
||||
rep[client.Node()] = replies[index]
|
||||
}
|
||||
// Obtain the lock state information of the local system.
|
||||
lockState, err := getSystemLockState()
|
||||
// In case of error, return err to the RPC client.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Save the local node lock state.
|
||||
rep[c.LocalNode] = lockState
|
||||
|
||||
// Set the reply.
|
||||
*reply = rep
|
||||
|
||||
// Success.
|
||||
return nil
|
||||
}
|
@ -19,6 +19,9 @@ package cmd
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
@ -42,6 +45,20 @@ type logger struct {
|
||||
// Add new loggers here.
|
||||
}
|
||||
|
||||
// Function takes input with the results from runtime.Caller(1). Depending on the boolean.
|
||||
// This function can either returned a shotFile form or a longFile form.
|
||||
func funcFromPC(pc uintptr, file string, line int, shortFile bool) string {
|
||||
var fn, name string
|
||||
if shortFile {
|
||||
fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/github.com/minio/minio/cmd/", "", -1)
|
||||
name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1)
|
||||
} else {
|
||||
fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/", "", -1)
|
||||
name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1)
|
||||
}
|
||||
return fmt.Sprintf("%s [%s:%d]", name, fn, line)
|
||||
}
|
||||
|
||||
// stackInfo returns printable stack trace.
|
||||
func stackInfo() string {
|
||||
// Convert stack-trace bytes to io.Reader.
|
||||
@ -56,7 +73,7 @@ func stackInfo() string {
|
||||
stackBuf.ReadFrom(rawStack)
|
||||
|
||||
// Strip GOPATH of the build system and return.
|
||||
return strings.Replace(stackBuf.String(), GOPATH+"/src/", "", -1)
|
||||
return strings.Replace(stackBuf.String(), filepath.ToSlash(GOPATH)+"/src/", "", -1)
|
||||
}
|
||||
|
||||
// errorIf synonymous with fatalIf but doesn't exit on error != nil
|
||||
|
@ -20,17 +20,36 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
type LoggerSuite struct{}
|
||||
// Tests func obtained from process stack counter.
|
||||
func TestFuncToPC(t *testing.T) {
|
||||
GOPATH = filepath.ToSlash(os.Getenv("GOPATH"))
|
||||
pc, file, line, success := runtime.Caller(0)
|
||||
if !success {
|
||||
file = "???"
|
||||
line = 0
|
||||
}
|
||||
shortFile := true // We are only interested in short file form.
|
||||
cLocation := funcFromPC(pc, file, line, shortFile)
|
||||
if cLocation != "TestFuncToPC [logger_test.go:34]" {
|
||||
t.Fatal("Unexpected caller location found", cLocation)
|
||||
}
|
||||
shortFile = false // We are not interested in short file form.
|
||||
cLocation = funcFromPC(pc, file, line, shortFile)
|
||||
if cLocation != "TestFuncToPC [github.com/minio/minio/cmd/logger_test.go:34]" {
|
||||
t.Fatal("Unexpected caller location found", cLocation)
|
||||
}
|
||||
}
|
||||
|
||||
var _ = Suite(&LoggerSuite{})
|
||||
|
||||
func (s *LoggerSuite) TestLogger(c *C) {
|
||||
// Tests error logger.
|
||||
func TestLogger(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
var fields logrus.Fields
|
||||
log.Out = &buffer
|
||||
@ -38,10 +57,17 @@ func (s *LoggerSuite) TestLogger(c *C) {
|
||||
|
||||
errorIf(errors.New("Fake error"), "Failed with error.")
|
||||
err := json.Unmarshal(buffer.Bytes(), &fields)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(fields["level"], Equals, "error")
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if fields["level"] != "error" {
|
||||
t.Fatalf("Expected error, got %s", fields["level"])
|
||||
}
|
||||
msg, ok := fields["cause"]
|
||||
c.Assert(ok, Equals, true)
|
||||
c.Assert(msg, Equals, "Fake error")
|
||||
if !ok {
|
||||
t.Fatal("Cause field missing")
|
||||
}
|
||||
if msg != "Fake error" {
|
||||
t.Fatal("Cause field has unexpected message", msg)
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
pathutil "path"
|
||||
"runtime"
|
||||
"strconv"
|
||||
@ -66,8 +65,7 @@ func initNSLock(isDist bool) {
|
||||
lockMap: make(map[nsParam]*nsLock),
|
||||
}
|
||||
|
||||
// Initialize nsLockMap with entry for instrumentation
|
||||
// information.
|
||||
// Initialize nsLockMap with entry for instrumentation information.
|
||||
// Entries of <volume,path> -> stateInfo of locks
|
||||
nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath)
|
||||
}
|
||||
@ -94,7 +92,7 @@ type nsLock struct {
|
||||
// nsLockMap - namespace lock map, provides primitives to Lock,
|
||||
// Unlock, RLock and RUnlock.
|
||||
type nsLockMap struct {
|
||||
// lock counter used for lock debugging.
|
||||
// Lock counter used for lock debugging.
|
||||
globalLockCounter int64 // Total locks held.
|
||||
blockedCounter int64 // Total operations blocked waiting for locks.
|
||||
runningLockCounter int64 // Total locks held but not released yet.
|
||||
@ -148,8 +146,7 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock
|
||||
|
||||
// Changing the status of the operation from blocked to
|
||||
// running. change the state of the lock to be running (from
|
||||
// blocked) for the given pair of <volume, path> and
|
||||
// <OperationID>.
|
||||
// blocked) for the given pair of <volume, path> and <OperationID>.
|
||||
if err := n.statusBlockedToRunning(param, lockOrigin, opsID, readLock); err != nil {
|
||||
errorIf(err, "Failed to set the lock state to running.")
|
||||
}
|
||||
@ -199,24 +196,22 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
|
||||
// Lock - locks the given resource for writes, using a previously
|
||||
// allocated name space lock or initializing a new one.
|
||||
func (n *nsLockMap) Lock(volume, path, opsID string) {
|
||||
var lockOrigin string
|
||||
readLock := false // This is a write lock.
|
||||
|
||||
// The caller information of the lock held has been obtained
|
||||
// here before calling any other function.
|
||||
|
||||
// Fetching the package, function name and the line number of
|
||||
// the caller from the runtime. here is an example
|
||||
// https://play.golang.org/p/perrmNRI9_ .
|
||||
pc, fn, line, success := runtime.Caller(1)
|
||||
// the caller from the runtime.
|
||||
pc, file, line, success := runtime.Caller(1)
|
||||
if !success {
|
||||
errorIf(errors.New("Couldn't get caller info."),
|
||||
"Fetching caller info form runtime failed.")
|
||||
file = "???"
|
||||
line = 0
|
||||
}
|
||||
lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]",
|
||||
runtime.FuncForPC(pc).Name(), fn, line)
|
||||
shortFile := true // We are only interested in short file form.
|
||||
lockLocation := funcFromPC(pc, file, line, shortFile)
|
||||
|
||||
readLock := false
|
||||
n.lock(volume, path, lockOrigin, opsID, readLock)
|
||||
n.lock(volume, path, lockLocation, opsID, readLock)
|
||||
}
|
||||
|
||||
// Unlock - unlocks any previously acquired write locks.
|
||||
@ -227,24 +222,22 @@ func (n *nsLockMap) Unlock(volume, path, opsID string) {
|
||||
|
||||
// RLock - locks any previously acquired read locks.
|
||||
func (n *nsLockMap) RLock(volume, path, opsID string) {
|
||||
var lockOrigin string
|
||||
readLock := true
|
||||
|
||||
// The caller information of the lock held has been obtained
|
||||
// here before calling any other function.
|
||||
|
||||
// Fetching the package, function name and the line number of
|
||||
// the caller from the runtime. Here is an example
|
||||
// https://play.golang.org/p/perrmNRI9_ .
|
||||
pc, fn, line, success := runtime.Caller(1)
|
||||
// the caller from the runtime.
|
||||
pc, file, line, success := runtime.Caller(1)
|
||||
if !success {
|
||||
errorIf(errors.New("Couldn't get caller info."),
|
||||
"Fetching caller info form runtime failed.")
|
||||
file = "???"
|
||||
line = 0
|
||||
}
|
||||
lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]",
|
||||
runtime.FuncForPC(pc).Name(), fn, line)
|
||||
shortFile := true // We are only interested in short file form.
|
||||
lockLocation := funcFromPC(pc, file, line, shortFile)
|
||||
|
||||
n.lock(volume, path, lockOrigin, opsID, readLock)
|
||||
n.lock(volume, path, lockLocation, opsID, readLock)
|
||||
}
|
||||
|
||||
// RUnlock - unlocks any previously acquired read locks.
|
||||
|
@ -81,7 +81,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
|
||||
}
|
||||
|
||||
// Register controller rpc router.
|
||||
registerControllerRPCRouter(mux, srvCmdConfig)
|
||||
registerControlRPCRouter(mux, srvCmdConfig)
|
||||
|
||||
// set environmental variable MINIO_BROWSER=off to disable minio web browser.
|
||||
// By default minio web browser is enabled.
|
||||
|
@ -53,7 +53,7 @@ func (s *storageServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) e
|
||||
return err
|
||||
}
|
||||
reply.Token = token
|
||||
reply.Timestamp = s.timestamp
|
||||
reply.Timestamp = time.Now().UTC()
|
||||
reply.ServerVersion = Version
|
||||
return nil
|
||||
}
|
||||
@ -229,7 +229,6 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
|
||||
if len(ignoredExports) > 0 {
|
||||
ignoredSet = set.CreateStringSet(ignoredExports...)
|
||||
}
|
||||
tstamp := time.Now().UTC()
|
||||
for _, export := range exports {
|
||||
if ignoredSet.Contains(export) {
|
||||
// Ignore initializing ignored export.
|
||||
@ -249,9 +248,8 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
|
||||
export = export[idx+1:]
|
||||
}
|
||||
servers = append(servers, &storageServer{
|
||||
storage: storage,
|
||||
path: export,
|
||||
timestamp: tstamp,
|
||||
storage: storage,
|
||||
path: export,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -241,7 +241,7 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int)
|
||||
func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
|
||||
// Initialize router.
|
||||
muxRouter := router.NewRouter()
|
||||
registerControllerRPCRouter(muxRouter, srvCmdConfig)
|
||||
registerControlRPCRouter(muxRouter, srvCmdConfig)
|
||||
return muxRouter
|
||||
}
|
||||
|
||||
|
@ -30,9 +30,6 @@ var errSignatureMismatch = errors.New("Signature does not match")
|
||||
// used when token used for authentication by the MinioBrowser has expired
|
||||
var errInvalidToken = errors.New("Invalid token")
|
||||
|
||||
// used when cached timestamp do not match with what client remembers.
|
||||
var errInvalidTimestamp = errors.New("Timestamps don't match, server may have restarted.")
|
||||
|
||||
// If x-amz-content-sha256 header value mismatches with what we calculate.
|
||||
var errContentSHA256Mismatch = errors.New("Content checksum SHA256 mismatch")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user