instrumentation: instrumentation for locks. (#2584)

- Instrumentation for locks.
- Detailed test coverage.
- Adding RPC control handler to fetch lock instrumentation.
- RPC control handlers suite tests with a test RPC server.
This commit is contained in:
Karthic Rao 2016-09-01 00:09:08 +05:30 committed by Harshavardhana
parent de67bca211
commit 07d232c7b4
20 changed files with 2132 additions and 204 deletions

View File

@ -35,7 +35,7 @@ var healCmd = cli.Command{
USAGE:
minio control {{.Name}}
EAMPLES:
EXAMPLES:
1. Heal an object.
$ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3

139
cmd/control-lock-main.go Normal file
View File

@ -0,0 +1,139 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/json"
"fmt"
"net/url"
"path"
"time"
"github.com/minio/cli"
)
// SystemLockState - Structure to fill the lock state of entire object storage.
// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system.
type SystemLockState struct {
TotalLocks int64 `json:"totalLocks"`
TotalBlockedLocks int64 `json:"totalBlockedLocks"` // count of operations which are blocked waiting for the lock to be released.
TotalAcquiredLocks int64 `json:"totalAcquiredLocks"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress).
LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"`
}
// VolumeLockInfo - Structure to contain the lock state info for volume, path pair.
type VolumeLockInfo struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
LocksOnObject int64 `json:"locksOnObject"` // All locks blocked + running for given <volume,path> pair.
LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress).
TotalBlockedLocks int64 `json:"locksBlockedOnObject"` // count of operations which are blocked waiting for the lock to be released.
LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"` // state information containing state of the locks for all operations on given <volume,path> pair.
}
// OpsLockState - structure to fill in state information of the lock.
// structure to fill in status information for each operation with given operation ID.
type OpsLockState struct {
OperationID string `json:"opsID"` // string containing operation ID.
LockOrigin string `json:"lockOrigin"` // contant which mentions the operation type (Get Obejct, PutObject...)
LockType string `json:"lockType"`
Status string `json:"status"` // status can be running/ready/blocked.
StatusSince string `json:"statusSince"` // time info of the since how long the status holds true, value in seconds.
}
// Read entire state of the locks in the system and return.
func generateSystemLockResponse() (SystemLockState, error) {
nsMutex.lockMapMutex.Lock()
defer nsMutex.lockMapMutex.Unlock()
if nsMutex.debugLockMap == nil {
return SystemLockState{}, LockInfoNil{}
}
lockState := SystemLockState{}
lockState.TotalBlockedLocks = nsMutex.blockedCounter
lockState.TotalLocks = nsMutex.globalLockCounter
lockState.TotalAcquiredLocks = nsMutex.runningLockCounter
for param := range nsMutex.debugLockMap {
volLockInfo := VolumeLockInfo{}
volLockInfo.Bucket = param.volume
volLockInfo.Object = param.path
volLockInfo.TotalBlockedLocks = nsMutex.debugLockMap[param].blocked
volLockInfo.LocksAcquiredOnObject = nsMutex.debugLockMap[param].running
volLockInfo.LocksOnObject = nsMutex.debugLockMap[param].ref
for opsID := range nsMutex.debugLockMap[param].lockInfo {
opsState := OpsLockState{}
opsState.OperationID = opsID
opsState.LockOrigin = nsMutex.debugLockMap[param].lockInfo[opsID].lockOrigin
opsState.LockType = nsMutex.debugLockMap[param].lockInfo[opsID].lockType
opsState.Status = nsMutex.debugLockMap[param].lockInfo[opsID].status
opsState.StatusSince = time.Now().Sub(nsMutex.debugLockMap[param].lockInfo[opsID].since).String()
volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, opsState)
}
lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo)
}
return lockState, nil
}
var lockCmd = cli.Command{
Name: "lock",
Usage: "info about the locks in the node.",
Action: lockControl,
CustomHelpTemplate: `NAME:
minio control {{.Name}} - {{.Usage}}
USAGE:
minio control {{.Name}} http://localhost:9000/
EAMPLES:
1. Get all the info about the blocked/held locks in the node:
$ minio control lock http://localhost:9000/
`,
}
// "minio control lock" entry point.
func lockControl(c *cli.Context) {
if len(c.Args()) != 1 {
cli.ShowCommandHelpAndExit(c, "lock", 1)
}
parsedURL, err := url.Parse(c.Args()[0])
fatalIf(err, "Unable to parse URL.")
authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
address: parsedURL.Host,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
args := &GenericArgs{}
reply := &SystemLockState{}
err = client.Call("Control.LockInfo", args, reply)
// logs the error and returns if err != nil.
fatalIf(err, "RPC Control.LockInfo call failed")
// print the lock info on the console.
b, err := json.MarshalIndent(*reply, "", " ")
fatalIf(err, "Failed to parse the RPC lock info response")
fmt.Print(string(b))
}

View File

@ -24,6 +24,7 @@ var controlCmd = cli.Command{
Usage: "Control and manage minio server.",
Action: mainControl,
Subcommands: []cli.Command{
lockCmd,
healCmd,
shutdownCmd,
},

View File

@ -39,7 +39,7 @@ var shutdownCmd = cli.Command{
USAGE:
minio control {{.Name}} http://localhost:9000/
EAMPLES:
EXAMPLES:
1. Shutdown the server:
$ minio control shutdown http://localhost:9000/

View File

@ -153,3 +153,17 @@ func (c *controllerAPIHandlers) TryInitHandler(args *GenericArgs, reply *Generic
return nil
}
// LockInfo - RPC control handler for `minio control lock`.
// Returns the info of the locks held in the system.
func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockState) error {
// obtain the lock state information.
lockInfo, err := generateSystemLockResponse()
// in case of error, return err to the RPC client.
if err != nil {
return err
}
// the response containing the lock info.
*reply = lockInfo
return nil
}

View File

@ -1,63 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
// "net/rpc"
"testing"
)
// Wrapper for calling heal disk metadata rpc Handler
func TestControllerHandlerHealDiskMetadata(t *testing.T) {
ExecObjectLayerTest(t, testHealDiskMetadataControllerHandler)
}
// testHealDiskMetadataControllerHandler - Test Heal Disk Metadata handler
func testHealDiskMetadataControllerHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
// Register the API end points with XL/FS object layer.
serverAddress, random, err := initTestControllerRPCEndPoint(obj)
if err != nil {
t.Fatal(err)
}
// initialize the server and obtain the credentials and root.
// credentials are necessary to sign the HTTP request.
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Init Test config failed")
}
// remove the root folder after the test ends.
defer removeAll(rootPath)
authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
address: serverAddress,
path: "/controller" + random,
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
args := &GenericArgs{}
reply := &GenericReply{}
err = client.Call("Controller.HealDiskMetadataHandler", args, reply)
if instanceType == "FS" && err == nil {
t.Errorf("Test should fail with FS")
}
if instanceType == "XL" && err != nil {
t.Errorf("Test should succeed with XL %s", err.Error())
}
}

298
cmd/controller_test.go Normal file
View File

@ -0,0 +1,298 @@
/*
* Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"path"
"strconv"
"sync"
"time"
. "gopkg.in/check.v1"
)
// API suite container common to both FS and XL.
type TestRPCControllerSuite struct {
serverType string
testServer TestServer
endPoint string
accessKey string
secretKey string
}
// Init and run test on XL backend.
var _ = Suite(&TestRPCControllerSuite{serverType: "XL"})
// Setting up the test suite.
// Starting the Test server with temporary FS backend.
func (s *TestRPCControllerSuite) SetUpSuite(c *C) {
s.testServer = StartTestRPCServer(c, s.serverType)
s.endPoint = s.testServer.Server.Listener.Addr().String()
s.accessKey = s.testServer.AccessKey
s.secretKey = s.testServer.SecretKey
}
// Called implicitly by "gopkg.in/check.v1" after all tests are run.
func (s *TestRPCControllerSuite) TearDownSuite(c *C) {
s.testServer.Stop()
}
// Tests to validate the correctness of lock instrumentation control RPC end point.
func (s *TestRPCControllerSuite) TestRPCControlLock(c *C) {
// enabling lock instrumentation.
globalDebugLock = true
// initializing the locks.
initNSLock(false)
// set debug lock info to `nil` so that the next tests have to initialize them again.
defer func() {
globalDebugLock = false
nsMutex.debugLockMap = nil
}()
expectedResult := []lockStateCase{
// Test case - 1.
// Case where 10 read locks are held.
// Entry for any of the 10 reads locks has to be found.
// Since they held in a loop, Lock origin for first 10 read locks (opsID 0-9) should be the same.
{
volume: "my-bucket",
path: "my-object",
opsID: "0",
readLock: true,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 10,
expectedRunningLockCount: 10,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 10,
expectedVolPathRunningCount: 10,
expectedVolPathBlockCount: 0,
},
// Test case 2.
// Testing the existance of entry for the last read lock (read lock with opsID "9").
{
volume: "my-bucket",
path: "my-object",
opsID: "9",
readLock: true,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 10,
expectedRunningLockCount: 10,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 10,
expectedVolPathRunningCount: 10,
expectedVolPathBlockCount: 0,
},
// Test case 3.
// Hold a write lock, and it should block since 10 read locks
// on <"my-bucket", "my-object"> are still held.
{
volume: "my-bucket",
path: "my-object",
opsID: "10",
readLock: false,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 11,
expectedRunningLockCount: 10,
expectedBlockedLockCount: 1,
expectedVolPathLockCount: 11,
expectedVolPathRunningCount: 10,
expectedVolPathBlockCount: 1,
},
// Test case 4.
// Expected result when all the read locks are released and the blocked write lock acquires the lock.
{
volume: "my-bucket",
path: "my-object",
opsID: "10",
readLock: false,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 1,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 0,
},
// Test case - 5.
// At the end after locks are released, its verified whether the counters are set to 0.
{
volume: "my-bucket",
path: "my-object",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 0,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 0,
},
}
// used to make sure that the tests don't end till locks held in other go routines are released.
var wg sync.WaitGroup
// Hold 5 read locks. We should find the info about these in the RPC response.
// hold 10 read locks.
// Then call the RPC control end point for obtaining lock instrumentation info.
for i := 0; i < 10; i++ {
nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i))
}
authCfg := &authConfig{
accessKey: s.accessKey,
secretKey: s.secretKey,
address: s.endPoint,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
defer client.Close()
args := &GenericArgs{}
reply := &SystemLockState{}
// Call the lock instrumentation RPC end point.
err := client.Call("Controller.LockInfo", args, reply)
if err != nil {
c.Errorf("Add: expected no error but got string %q", err.Error())
}
// expected lock info.
expectedLockStats := expectedResult[0]
// verify the actual lock info with the expected one.
// verify the existance entry for first read lock (read lock with opsID "0").
verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 1)
expectedLockStats = expectedResult[1]
// verify the actual lock info with the expected one.
// verify the existance entry for last read lock (read lock with opsID "9").
verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 2)
// now hold a write lock in a different go routine and it should block since 10 read locks are
// still held.
wg.Add(1)
go func() {
defer wg.Done()
// blocks till all read locks are released.
nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10))
// Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock.
expectedWLockStats := expectedResult[3]
// Since the write lock acquired here, the number of blocked locks should reduce by 1 and
// count of running locks should increase by 1.
// Call the RPC control handle to fetch the lock instrumentation info.
reply = &SystemLockState{}
// Call the lock instrumentation RPC end point.
err = client.Call("Controller.LockInfo", args, reply)
if err != nil {
c.Errorf("Add: expected no error but got string %q", err.Error())
}
verifyRPCLockInfoResponse(expectedWLockStats, *reply, c, 4)
// release the write lock.
nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10))
}()
// waiting for a second so that the attempt to acquire the write lock in
// the above go routines gets blocked.
time.Sleep(1 * time.Second)
// The write lock should have got blocked by now,
// check whether the entry for one blocked lock exists.
expectedLockStats = expectedResult[2]
// Call the RPC control handle to fetch the lock instrumentation info.
reply = &SystemLockState{}
// Call the lock instrumentation RPC end point.
err = client.Call("Controller.LockInfo", args, reply)
if err != nil {
c.Errorf("Add: expected no error but got string %q", err.Error())
}
verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 3)
// Release all the read locks held.
// the blocked write lock in the above go routines should get unblocked.
for i := 0; i < 10; i++ {
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i))
}
wg.Wait()
// Since all the locks are released. There shouldnt be any entry in the lock info.
// and all the counters should be set to 0.
reply = &SystemLockState{}
// Call the lock instrumentation RPC end point.
err = client.Call("Controller.LockInfo", args, reply)
if err != nil {
c.Errorf("Add: expected no error but got string %q", err.Error())
}
if reply.TotalAcquiredLocks != 0 && reply.TotalLocks != 0 && reply.TotalBlockedLocks != 0 {
c.Fatalf("The counters are not reset properly after all locks are released")
}
if len(reply.LocksInfoPerObject) != 0 {
c.Fatalf("Since all locks are released there shouldn't have been any lock info entry, but found %d", len(reply.LocksInfoPerObject))
}
}
// TestControllerHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`,
// asserts to validate the success.
func (s *TestRPCControllerSuite) TestControllerHandlerHealDiskMetadata(c *C) {
// The suite has already started the test RPC server, just send RPC calls.
authCfg := &authConfig{
accessKey: s.accessKey,
secretKey: s.secretKey,
address: s.endPoint,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
defer client.Close()
args := &GenericArgs{}
reply := &GenericReply{}
err := client.Call("Controller.HealDiskMetadataHandler", args, reply)
if err != nil {
c.Errorf("Heal Meta Disk Handler test failed with <ERROR> %s", err.Error())
}
}

View File

@ -58,9 +58,12 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var err error
var eof bool
if uploadIDMarker != "" {
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
if err != nil {
return ListMultipartsInfo{}, err
}
@ -110,9 +113,14 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var tmpUploads []uploadMetadata
var end bool
uploadIDMarker = ""
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
if err != nil {
return ListMultipartsInfo{}, err
}
@ -225,9 +233,13 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
fsMeta.Meta = meta
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
uploadID = getUUID()
initiated := time.Now().UTC()
@ -290,7 +302,7 @@ func getFSAppendDataPath(uploadID string) string {
}
// Append parts to fsAppendDataFile.
func appendParts(disk StorageAPI, bucket, object, uploadID string) {
func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// fs-append.json path
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
@ -298,16 +310,16 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile)
// Lock the uploadID so that no one modifies fs.json
nsMutex.RLock(minioMetaBucket, uploadIDPath)
nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID)
fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
if err != nil {
return
}
// Lock fs-append.json so that there is no parallel append to the file.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath)
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID)
fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath)
if err != nil {
@ -324,8 +336,9 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
return
}
// Hold write lock on the part so that there is no parallel upload on the part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)))
partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number))
nsMutex.Lock(minioMetaBucket, partPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, partPath, opsID)
// Proceed to append "part"
fsAppendDataPath := getFSAppendDataPath(uploadID)
@ -345,7 +358,6 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
}
}
// Path to the part that needs to be appended.
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name)
offset := int64(0)
totalLeft := part.Size
buf := make([]byte, readSizeV1)
@ -381,7 +393,7 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
// If there are more parts that need to be appended to fsAppendDataFile
_, appendNeeded = partToAppend(fsMeta, fsAppendMeta)
if appendNeeded {
go appendParts(disk, bucket, object, uploadID)
go appendParts(disk, bucket, object, uploadID, opsID)
}
}
@ -404,10 +416,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
nsMutex.RLock(minioMetaBucket, uploadIDPath)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID)
// Just check if the uploadID exists to avoid copy if it doesn't.
uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
if !uploadIDExists {
return "", InvalidUploadID{UploadID: uploadID}
}
@ -466,9 +482,13 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
// Hold write lock as we are updating fs.json
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
// Just check if the uploadID exists to avoid copy if it doesn't.
if !fs.isUploadIDExists(bucket, object, uploadID) {
@ -494,7 +514,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
}
go appendParts(fs.storage, bucket, object, uploadID)
go appendParts(fs.storage, bucket, object, uploadID, opsID)
return newMD5Hex, nil
}
@ -568,9 +588,14 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
@ -601,12 +626,16 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Hold lock so that
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
@ -615,8 +644,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// fs-append.json path
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// Lock fs-append.json so that no parallel appendParts() is being done.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath)
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID)
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
@ -724,10 +753,14 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(err, bucket, object)
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
// Hold the lock so that two parallel complete-multipart-uploads do not
// leave a stale uploads.json behind.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
@ -816,9 +849,13 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}
@ -826,8 +863,8 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// Lock fs-append.json so that no parallel appendParts() is being done.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath)
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID)
err := fs.abortMultipartUpload(bucket, object, uploadID)
return err

View File

@ -21,6 +21,7 @@ import (
"github.com/fatih/color"
"github.com/minio/minio/pkg/objcache"
"os"
)
// Global constants for Minio.
@ -42,6 +43,10 @@ const (
var (
globalQuiet = false // Quiet flag set via command line
globalTrace = false // Trace flag set via environment setting.
globalDebug = false // Debug flag set to print debug info.
globalDebugLock = false // Lock debug info set via environment variable MINIO_DEBUG=lock .
globalDebugMemory = false // Memory debug info set via environment variable MINIO_DEBUG=mem
// Add new global flags here.
// Maximum connections handled per
@ -70,3 +75,15 @@ var (
colorBlue = color.New(color.FgBlue).SprintfFunc()
colorBold = color.New(color.Bold).SprintFunc()
)
// fetch from environment variables and set the global values related to locks.
func setGlobalsDebugFromEnv() {
debugEnv := os.Getenv("MINIO_DEBUG")
switch debugEnv {
case "lock":
globalDebugLock = true
case "mem":
globalDebugMemory = true
}
globalDebug = globalDebugLock || globalDebugMemory
}

283
cmd/lock-instrument.go Normal file
View File

@ -0,0 +1,283 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"time"
)
const (
debugRLockStr = "RLock"
debugWLockStr = "WLock"
)
// struct containing information of status (ready/running/blocked) of an operation with given operation ID.
type debugLockInfo struct {
lockType string // "Rlock" or "WLock".
lockOrigin string // contains the trace of the function which invoked the lock, obtained from runtime.
status string // status can be running/ready/blocked.
since time.Time // time info of the since how long the status holds true.
}
// debugLockInfo - container for storing locking information for unique copy (volume,path) pair.
// ref variable holds the reference count for locks held for.
// `ref` values helps us understand the n locks held for given <volume, path> pair.
// `running` value helps us understand the total successful locks held (not blocked) for given <volume, path> pair and the operation is under execution.
// `blocked` value helps us understand the total number of operations blocked waiting on locks for given <volume,path> pair.
type debugLockInfoPerVolumePath struct {
ref int64 // running + blocked operations.
running int64 // count of successful lock acquire and running operations.
blocked int64 // count of number of operations blocked waiting on lock.
lockInfo (map[string]debugLockInfo) // map of [operationID] debugLockInfo{operation, status, since} .
}
// returns an instance of debugLockInfo.
// need to create this for every unique pair of {volume,path}.
// total locks, number of calls blocked on locks, and number of successful locks held but not unlocked yet.
func newDebugLockInfoPerVolumePath() *debugLockInfoPerVolumePath {
return &debugLockInfoPerVolumePath{
lockInfo: make(map[string]debugLockInfo),
ref: 0,
blocked: 0,
running: 0,
}
}
// LockInfoNil - Returned if the lock info map is not initialized.
type LockInfoNil struct {
}
func (l LockInfoNil) Error() string {
return fmt.Sprintf("Debug Lock Map not initialized:\n1. Enable Lock Debugging using right ENV settings \n2. Make sure initNSLock() is called.")
}
// LockInfoOriginNotFound - While changing the state of the lock info its important that the entry for
// lock at a given origin exists, if not `LockInfoOriginNotFound` is returned.
type LockInfoOriginNotFound struct {
volume string
path string
operationID string
lockOrigin string
}
func (l LockInfoOriginNotFound) Error() string {
return fmt.Sprintf("No lock state stored for the lock origined at \"%s\", for <volume> %s, <path> %s, <operationID> %s.",
l.lockOrigin, l.volume, l.path, l.operationID)
}
// LockInfoVolPathMssing - Error interface. Returned when the info the
type LockInfoVolPathMssing struct {
volume string
path string
}
func (l LockInfoVolPathMssing) Error() string {
return fmt.Sprintf("No entry in debug Lock Map for Volume: %s, path: %s.", l.volume, l.path)
}
// LockInfoOpsIDNotFound - Returned when the lock state info exists, but the entry for
// given operation ID doesn't exist.
type LockInfoOpsIDNotFound struct {
volume string
path string
operationID string
}
func (l LockInfoOpsIDNotFound) Error() string {
return fmt.Sprintf("No entry in lock info for <Operation ID> %s, <volume> %s, <path> %s.", l.operationID, l.volume, l.path)
}
// LockInfoStateNotBlocked - When an attempt to change the state of the lock form `blocked` to `running` is done,
// its necessary that the state before the transsition is "blocked", otherwise LockInfoStateNotBlocked returned.
type LockInfoStateNotBlocked struct {
volume string
path string
operationID string
}
func (l LockInfoStateNotBlocked) Error() string {
return fmt.Sprintf("Lock state should be \"Blocked\" for <volume> %s, <path> %s, <operationID> %s.", l.volume, l.path, l.operationID)
}
// change the state of the lock from Blocked to Running.
func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, operationID string, readLock bool) error {
// This operation is not executed under the scope nsLockMap.mutex.Lock(), lock has to be explicitly held here.
n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock()
if n.debugLockMap == nil {
return LockInfoNil{}
}
// new state info to be set for the lock.
newLockInfo := debugLockInfo{
lockOrigin: lockOrigin,
status: "Running",
since: time.Now().UTC(),
}
// set lock type.
if readLock {
newLockInfo.lockType = debugRLockStr
} else {
newLockInfo.lockType = debugWLockStr
}
// check whether the lock info entry for <volume, path> pair already exists and its not `nil`.
if debugLockMap, ok := n.debugLockMap[param]; ok {
// ``*debugLockInfoPerVolumePath` entry containing lock info for `param <volume, path>` is `nil`.
if debugLockMap == nil {
return LockInfoNil{}
}
} else {
// The lock state info foe given <volume, path> pair should already exist.
// If not return `LockInfoVolPathMssing`.
return LockInfoVolPathMssing{param.volume, param.path}
}
// Lock info the for the given operation ID shouldn't be `nil`.
if n.debugLockMap[param].lockInfo == nil {
return LockInfoOpsIDNotFound{param.volume, param.path, operationID}
}
if lockInfo, ok := n.debugLockMap[param].lockInfo[operationID]; ok {
// The entry for the lock origined at `lockOrigin` should already exist.
// If not return `LockInfoOriginNotFound`.
if lockInfo.lockOrigin != lockOrigin {
return LockInfoOriginNotFound{param.volume, param.path, operationID, lockOrigin}
}
// Status of the lock should already be set to "Blocked".
// If not return `LockInfoStateNotBlocked`.
if lockInfo.status != "Blocked" {
return LockInfoStateNotBlocked{param.volume, param.path, operationID}
}
} else {
// The lock info entry for given `opsID` should already exist for given <volume, path> pair.
// If not return `LockInfoOpsIDNotFound`.
return LockInfoOpsIDNotFound{param.volume, param.path, operationID}
}
// All checks finished.
// changing the status of the operation from blocked to running and updating the time.
n.debugLockMap[param].lockInfo[operationID] = newLockInfo
// After locking unblocks decrease the blocked counter.
n.blockedCounter--
// Increase the running counter.
n.runningLockCounter++
n.debugLockMap[param].blocked--
n.debugLockMap[param].running++
return nil
}
// change the state of the lock from Ready to Blocked.
func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID string, readLock bool) error {
if n.debugLockMap == nil {
return LockInfoNil{}
}
newLockInfo := debugLockInfo{
lockOrigin: lockOrigin,
status: "Blocked",
since: time.Now().UTC(),
}
if readLock {
newLockInfo.lockType = debugRLockStr
} else {
newLockInfo.lockType = debugWLockStr
}
if lockInfo, ok := n.debugLockMap[param]; ok {
if lockInfo == nil {
// *debugLockInfoPerVolumePath entry is nil, initialize here to avoid any case of `nil` pointer access.
n.initLockInfoForVolumePath(param)
}
} else {
// State info entry for the given <volume, pair> doesn't exist, initializing it.
n.initLockInfoForVolumePath(param)
}
// lockInfo is a map[string]debugLockInfo, which holds map[OperationID]{status,time, origin} of the lock.
if n.debugLockMap[param].lockInfo == nil {
n.debugLockMap[param].lockInfo = make(map[string]debugLockInfo)
}
// The status of the operation with the given operation ID is marked blocked till its gets unblocked from the lock.
n.debugLockMap[param].lockInfo[operationID] = newLockInfo
// Increment the Global lock counter.
n.globalLockCounter++
// Increment the counter for number of blocked opertions, decrement it after the locking unblocks.
n.blockedCounter++
// increment the reference of the lock for the given <volume,path> pair.
n.debugLockMap[param].ref++
// increment the blocked counter for the given <volume, path> pair.
n.debugLockMap[param].blocked++
return nil
}
// deleteLockInfoEntry - Deletes the lock state information for given <volume, path> pair. Called when nsLk.ref count is 0.
func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error {
if n.debugLockMap == nil {
return LockInfoNil{}
}
// delete the lock info for the given operation.
if _, found := n.debugLockMap[param]; found {
// Remove from the map if there are no more references for the given (volume,path) pair.
delete(n.debugLockMap, param)
} else {
return LockInfoVolPathMssing{param.volume, param.path}
}
return nil
}
// deleteLockInfoEntry - Deletes the entry for given opsID in the lock state information of given <volume, path> pair.
// called when the nsLk ref count for the given <volume, path> pair is not 0.
func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, operationID string) error {
if n.debugLockMap == nil {
return LockInfoNil{}
}
// delete the lock info for the given operation.
if infoMap, found := n.debugLockMap[param]; found {
// the opertion finished holding the lock on the resource, remove the entry for the given operation with the operation ID.
if _, foundInfo := infoMap.lockInfo[operationID]; foundInfo {
// decrease the global running and lock reference counter.
n.runningLockCounter--
n.globalLockCounter--
// decrease the lock referene counter for the lock info for given <volume,path> pair.
// decrease the running operation number. Its assumed that the operation is over once an attempt to release the lock is made.
infoMap.running--
// decrease the total reference count of locks jeld on <volume,path> pair.
infoMap.ref--
delete(infoMap.lockInfo, operationID)
} else {
// Unlock request with invalid opertion ID not accepted.
return LockInfoOpsIDNotFound{param.volume, param.path, operationID}
}
} else {
return LockInfoVolPathMssing{param.volume, param.path}
}
return nil
}
// return randomly generated string ID if lock debug is enabled,
// else returns empty string
func getOpsID() (opsID string) {
// check if lock debug is enabled.
if globalDebugLock {
// generated random ID.
opsID = string(generateRequestID())
}
return opsID
}

744
cmd/lock-instrument_test.go Normal file
View File

@ -0,0 +1,744 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"testing"
"time"
)
type lockStateCase struct {
volume string
path string
lockOrigin string
opsID string
readLock bool // lock type.
setBlocked bool // initialize the initial state to blocked.
expectedErr error
// expected global lock stats.
expectedLockStatus string // Status of the lock Blocked/Running.
expectedGlobalLockCount int // Total number of locks held across the system, includes blocked + held locks.
expectedBlockedLockCount int // Total blocked lock across the system.
expectedRunningLockCount int // Total succesfully held locks (non-blocking).
// expected lock statu for given <volume, path> pair.
expectedVolPathLockCount int // Total locks held for given <volume,path> pair, includes blocked locks.
expectedVolPathRunningCount int // Total succcesfully held locks for given <volume, path> pair.
expectedVolPathBlockCount int // Total locks blocked on the given <volume, path> pair.
}
// Used for validating the Lock info obtaining from contol RPC end point for obtaining lock related info.
func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoResponse SystemLockState, t TestErrHandler, testNum int) {
// Assert the total number of locks (locked + acquired) in the system.
if rpcLockInfoResponse.TotalLocks != int64(l.expectedGlobalLockCount) {
t.Fatalf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
rpcLockInfoResponse.TotalLocks)
}
// verify the count for total blocked locks.
if rpcLockInfoResponse.TotalBlockedLocks != int64(l.expectedBlockedLockCount) {
t.Fatalf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
rpcLockInfoResponse.TotalBlockedLocks)
}
// verify the count for total running locks.
if rpcLockInfoResponse.TotalAcquiredLocks != int64(l.expectedRunningLockCount) {
t.Fatalf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
rpcLockInfoResponse.TotalAcquiredLocks)
}
for _, locksInfoPerObject := range rpcLockInfoResponse.LocksInfoPerObject {
// See whether the entry for the <bucket, object> exists in the RPC response.
if locksInfoPerObject.Bucket == l.volume && locksInfoPerObject.Object == l.path {
// Assert the total number of locks (blocked + acquired) for the given <buckt, object> pair.
if locksInfoPerObject.LocksOnObject != int64(l.expectedVolPathLockCount) {
t.Errorf("Test %d: Expected the total lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
l.volume, l.path, int64(l.expectedVolPathLockCount), locksInfoPerObject.LocksOnObject)
}
// Assert the total number of acquired locks for the given <buckt, object> pair.
if locksInfoPerObject.LocksAcquiredOnObject != int64(l.expectedVolPathRunningCount) {
t.Errorf("Test %d: Expected the acquired lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
l.volume, l.path, int64(l.expectedVolPathRunningCount), locksInfoPerObject.LocksAcquiredOnObject)
}
// Assert the total number of blocked locks for the given <buckt, object> pair.
if locksInfoPerObject.TotalBlockedLocks != int64(l.expectedVolPathBlockCount) {
t.Errorf("Test %d: Expected the blocked lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum,
l.volume, l.path, int64(l.expectedVolPathBlockCount), locksInfoPerObject.TotalBlockedLocks)
}
// Flag to mark whether there's an entry in the RPC lock info response for given opsID.
var opsIDfound bool
for _, opsLockState := range locksInfoPerObject.LockDetailsOnObject {
// first check whether the entry for the given operation ID exists.
if opsLockState.OperationID == l.opsID {
opsIDfound = true
// asserting the type of lock (RLock/WLock) from the RPC lock info response.
if l.readLock {
if opsLockState.LockType != debugRLockStr {
t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugRLockStr)
}
} else {
if opsLockState.LockType != debugWLockStr {
t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugWLockStr)
}
}
if opsLockState.Status != l.expectedLockStatus {
t.Errorf("Test case %d: Expected the status of the operation to be \"%s\", got \"%s\"", testNum, l.expectedLockStatus, opsLockState.Status)
}
// if opsLockState.LockOrigin != l.lockOrigin {
// t.Fatalf("Test case %d: Expected the origin of the lock to be \"%s\", got \"%s\"", testNum, opsLockState.LockOrigin, l.lockOrigin)
// }
// all check satisfied, return here.
// Any mismatch in the earlier checks would have ended the tests due to `Fatalf`,
// control reaching here implies that all checks are satisfied.
return
}
}
// opsID not found.
// No entry for an operation with given operation ID exists.
if !opsIDfound {
t.Fatalf("Test case %d: Entry for OpsId: \"%s\" not found in <bucket>: \"%s\", <path>: \"%s\" doesn't exist in the RPC response", testNum, l.opsID, l.volume, l.path)
}
}
}
// No entry exists for given <bucket, object> pair in the RPC response.
t.Errorf("Test case %d: Entry for <bucket>: \"%s\", <object>: \"%s\" doesn't exist in the RPC response", testNum, l.volume, l.path)
}
// Asserts the lock counter from the global nsMutex inmemory lock with the expected one.
func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) {
nsMutex.lockMapMutex.Lock()
// Verifying the lock stats.
if nsMutex.globalLockCounter != int64(l.expectedGlobalLockCount) {
t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
nsMutex.globalLockCounter)
}
// verify the count for total blocked locks.
if nsMutex.blockedCounter != int64(l.expectedBlockedLockCount) {
t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
nsMutex.blockedCounter)
}
// verify the count for total running locks.
if nsMutex.runningLockCounter != int64(l.expectedRunningLockCount) {
t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
nsMutex.runningLockCounter)
}
nsMutex.lockMapMutex.Unlock()
// Verifying again with the JSON response of the lock info.
// Verifying the lock stats.
sysLockState, err := generateSystemLockResponse()
if err != nil {
t.Fatalf("Obtaining lock info failed with <ERROR> %s", err)
}
if sysLockState.TotalLocks != int64(l.expectedGlobalLockCount) {
t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
sysLockState.TotalLocks)
}
// verify the count for total blocked locks.
if sysLockState.TotalBlockedLocks != int64(l.expectedBlockedLockCount) {
t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
sysLockState.TotalBlockedLocks)
}
// verify the count for total running locks.
if sysLockState.TotalAcquiredLocks != int64(l.expectedRunningLockCount) {
t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
sysLockState.TotalAcquiredLocks)
}
}
// Verify the lock counter for entries of given <volume, path> pair.
func verifyLockStats(l lockStateCase, t *testing.T, testNum int) {
nsMutex.lockMapMutex.Lock()
defer nsMutex.lockMapMutex.Unlock()
param := nsParam{l.volume, l.path}
// Verify the total locks (blocked+running) for given <vol,path> pair.
if nsMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) {
t.Errorf("Test %d: Expected the total lock count for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum,
param.volume, param.path, int64(l.expectedVolPathLockCount), nsMutex.debugLockMap[param].ref)
}
// Verify the total running locks for given <volume, path> pair.
if nsMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) {
t.Errorf("Test %d: Expected the total running locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path,
int64(l.expectedVolPathRunningCount), nsMutex.debugLockMap[param].running)
}
// Verify the total blocked locks for givne <volume, path> pair.
if nsMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) {
t.Errorf("Test %d: Expected the total blocked locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path,
int64(l.expectedVolPathBlockCount), nsMutex.debugLockMap[param].blocked)
}
}
// verifyLockState - function which asserts the expected lock info in the system with the actual values in the nsMutex.
func verifyLockState(l lockStateCase, t *testing.T, testNum int) {
param := nsParam{l.volume, l.path}
verifyGlobalLockStats(l, t, testNum)
nsMutex.lockMapMutex.Lock()
// Verifying the lock statuS fields.
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok {
// Validating the lock type filed in the debug lock information.
if l.readLock {
if lockInfo.lockType != debugRLockStr {
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugRLockStr)
}
} else {
if lockInfo.lockType != debugWLockStr {
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugWLockStr)
}
}
// // validating the lock origin.
// if l.lockOrigin != lockInfo.lockOrigin {
// t.Fatalf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", testNum, l.lockOrigin, lockInfo.lockOrigin)
// }
// validating the status of the lock.
if lockInfo.status != l.expectedLockStatus {
t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", testNum, l.expectedLockStatus, lockInfo.status)
}
} else {
// Stop the tests if lock debug entry for given <volume, path> pair is not found.
t.Errorf("Test case %d: Expected an debug lock entry for opsID \"%s\"", testNum, l.opsID)
}
} else {
// To change the status the entry for given <volume, path> should exist in the lock info struct.
t.Errorf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", testNum, param.volume, param.path)
}
// verifyLockStats holds its own lock.
nsMutex.lockMapMutex.Unlock()
// verify the lock count.
verifyLockStats(l, t, testNum)
}
// TestNewDebugLockInfoPerVolumePath - Validates the values initialized by newDebugLockInfoPerVolumePath().
func TestNewDebugLockInfoPerVolumePath(t *testing.T) {
lockInfo := newDebugLockInfoPerVolumePath()
if lockInfo.ref != 0 {
t.Errorf("Expected initial reference value of total locks to be 0, got %d", lockInfo.ref)
}
if lockInfo.blocked != 0 {
t.Errorf("Expected initial reference of blocked locks to be 0, got %d", lockInfo.blocked)
}
if lockInfo.running != 0 {
t.Errorf("Expected initial reference value of held locks to be 0, got %d", lockInfo.running)
}
}
// TestNsLockMapStatusBlockedToRunning - Validates the function for changing the lock state from blocked to running.
func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
testCases := []struct {
volume string
path string
lockOrigin string
opsID string
readLock bool // lock type.
setBlocked bool // initialize the initial state to blocked.
expectedErr error
}{
// Test case - 1.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: true,
setBlocked: true,
// expected metrics.
expectedErr: nil,
},
// Test case - 2.
// No entry for <volume, path> pair.
// So an attempt to change the state of the lock from `Blocked`->`Running` should fail.
{
volume: "my-bucket",
path: "my-object-2",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: false,
setBlocked: false,
// expected metrics.
expectedErr: LockInfoVolPathMssing{"my-bucket", "my-object-2"},
},
// Test case - 3.
// Entry for the given operationID doesn't exist in the lock state info.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "ops-Id-not-registered",
readLock: true,
setBlocked: false,
// expected metrics.
expectedErr: LockInfoOpsIDNotFound{"my-bucket", "my-object", "ops-Id-not-registered"},
},
// Test case - 4.
// Test case with non-existent lock origin.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "Bad Origin",
opsID: "abcd1234",
readLock: true,
setBlocked: false,
// expected metrics.
expectedErr: LockInfoOriginNotFound{"my-bucket", "my-object", "abcd1234", "Bad Origin"},
},
// Test case - 5.
// Test case with write lock.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: false,
setBlocked: true,
// expected metrics.
expectedErr: nil,
},
}
param := nsParam{testCases[0].volume, testCases[0].path}
// Testing before the initialization done.
// Since the data structures for
actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin,
testCases[0].opsID, testCases[0].readLock)
expectedNilErr := LockInfoNil{}
if actualErr != expectedNilErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
}
nsMutex = &nsLockMap{
// entries of <volume,path> -> stateInfo of locks, for instrumentation purpose.
debugLockMap: make(map[nsParam]*debugLockInfoPerVolumePath),
lockMap: make(map[nsParam]*nsLock),
}
// Entry for <volume, path> pair is set to nil.
// Should fail with `LockInfoNil{}`.
nsMutex.debugLockMap[param] = nil
actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin,
testCases[0].opsID, testCases[0].readLock)
expectedNilErr = LockInfoNil{}
if actualErr != expectedNilErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
}
// Setting the lock info the be `nil`.
nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
lockInfo: nil, // setting the lockinfo to nil.
ref: 0,
blocked: 0,
running: 0,
}
actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin,
testCases[0].opsID, testCases[0].readLock)
expectedOpsErr := LockInfoOpsIDNotFound{testCases[0].volume, testCases[0].path, testCases[0].opsID}
if actualErr != expectedOpsErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedOpsErr, actualErr)
}
// Next case: ase whether an attempt to change the state of the lock to "Running" done,
// but the initial state if already "Running". Such an attempt should fail
nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
lockInfo: make(map[string]debugLockInfo),
ref: 0,
blocked: 0,
running: 0,
}
// Setting the status of the lock to be "Running".
// The initial state of the lock should set to "Blocked", otherwise its not possible to change the state from "Blocked" -> "Running".
nsMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
status: "Running", // State set to "Running". Should fail with `LockInfoStateNotBlocked`.
since: time.Now().UTC(),
}
actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin,
testCases[0].opsID, testCases[0].readLock)
expectedBlockErr := LockInfoStateNotBlocked{testCases[0].volume, testCases[0].path, testCases[0].opsID}
if actualErr != expectedBlockErr {
t.Fatalf("Errors mismatch: Expected: \"%s\", got: \"%s\"", expectedBlockErr, actualErr)
}
// enabling lock instrumentation.
globalDebugLock = true
// initializing the locks.
initNSLock(false)
// set debug lock info to `nil` so that the next tests have to initialize them again.
defer func() {
globalDebugLock = false
nsMutex.debugLockMap = nil
}()
// Iterate over the cases and assert the result.
for i, testCase := range testCases {
param := nsParam{testCase.volume, testCase.path}
// status of the lock to be set to "Blocked", before setting Blocked->Running.
if testCase.setBlocked {
nsMutex.lockMapMutex.Lock()
err := nsMutex.statusNoneToBlocked(param, testCase.lockOrigin, testCase.opsID, testCase.readLock)
if err != nil {
t.Fatalf("Test %d: Initializing the initial state to Blocked failed <ERROR> %s", i+1, err)
}
nsMutex.lockMapMutex.Unlock()
}
// invoking the method under test.
actualErr = nsMutex.statusBlockedToRunning(param, testCase.lockOrigin, testCase.opsID, testCase.readLock)
if actualErr != testCase.expectedErr {
t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr)
}
// In case of no error proceed with validating the lock state information.
if actualErr == nil {
// debug entry for given <volume, path> pair should exist.
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok {
// Validating the lock type filed in the debug lock information.
if testCase.readLock {
if lockInfo.lockType != debugRLockStr {
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugRLockStr)
}
} else {
if lockInfo.lockType != debugWLockStr {
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugWLockStr)
}
}
// validating the lock origin.
if testCase.lockOrigin != lockInfo.lockOrigin {
t.Errorf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", i+1, testCase.lockOrigin, lockInfo.lockOrigin)
}
// validating the status of the lock.
if lockInfo.status != "Running" {
t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", i+1, "Running", lockInfo.status)
}
} else {
// Stop the tests if lock debug entry for given <volume, path> pair is not found.
t.Fatalf("Test case %d: Expected an debug lock entry for opsID \"%s\"", i+1, testCase.opsID)
}
} else {
// To change the status the entry for given <volume, path> should exist in the lock info struct.
t.Fatalf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", i+1, param.volume, param.path)
}
}
}
}
// TestNsLockMapStatusNoneToBlocked - Validates the function for changing the lock state to blocked
func TestNsLockMapStatusNoneToBlocked(t *testing.T) {
testCases := []lockStateCase{
// Test case - 1.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: true,
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 1,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 1,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 0,
expectedVolPathBlockCount: 1,
},
// Test case - 2.
// No entry for <volume, path> pair.
// So an attempt to change the state of the lock from `Blocked`->`Running` should fail.
{
volume: "my-bucket",
path: "my-object-2",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: false,
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 2,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 2,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 0,
expectedVolPathBlockCount: 1,
},
// Test case - 3.
// Entry for the given operationID doesn't exist in the lock state info.
// The entry should be created and relevant counters should be set.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "ops-Id-not-registered",
readLock: true,
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 3,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 3,
expectedVolPathLockCount: 2,
expectedVolPathRunningCount: 0,
expectedVolPathBlockCount: 2,
},
}
param := nsParam{testCases[0].volume, testCases[0].path}
// Testing before the initialization done.
// Since the data structures for
actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin,
testCases[0].opsID, testCases[0].readLock)
expectedNilErr := LockInfoNil{}
if actualErr != expectedNilErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
}
// enabling lock instrumentation.
globalDebugLock = true
// initializing the locks.
initNSLock(false)
// set debug lock info to `nil` so that the next tests have to initialize them again.
defer func() {
globalDebugLock = false
nsMutex.debugLockMap = nil
}()
// Iterate over the cases and assert the result.
for i, testCase := range testCases {
nsMutex.lockMapMutex.Lock()
param := nsParam{testCase.volume, testCase.path}
actualErr := nsMutex.statusNoneToBlocked(param, testCase.lockOrigin, testCase.opsID, testCase.readLock)
if actualErr != testCase.expectedErr {
t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr)
}
nsMutex.lockMapMutex.Unlock()
if actualErr == nil {
verifyLockState(testCase, t, i+1)
}
}
}
// TestNsLockMapDeleteLockInfoEntryForOps - Validates the removal of entry for given Operational ID from the lock info.
func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) {
testCases := []lockStateCase{
// Test case - 1.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: true,
// expected metrics.
},
}
// case - 1.
// Testing the case where delete lock info is attempted even before the lock is initialized.
param := nsParam{testCases[0].volume, testCases[0].path}
// Testing before the initialization done.
actualErr := nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
expectedNilErr := LockInfoNil{}
if actualErr != expectedNilErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
}
// enabling lock instrumentation.
globalDebugLock = true
// initializing the locks.
initNSLock(false)
// set debug lock info to `nil` so that the next tests have to initialize them again.
defer func() {
globalDebugLock = false
nsMutex.debugLockMap = nil
}()
// case - 2.
// Case where an attempt to delete the entry for non-existent <volume, path> pair is done.
// Set the status of the lock to blocked and then to running.
nonExistParam := nsParam{volume: "non-exist-volume", path: "non-exist-path"}
actualErr = nsMutex.deleteLockInfoEntryForOps(nonExistParam, testCases[0].opsID)
expectedVolPathErr := LockInfoVolPathMssing{nonExistParam.volume, nonExistParam.path}
if actualErr != expectedVolPathErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedVolPathErr, actualErr)
}
// Case - 3.
// Lock state is set to Running and then an attempt to delete the info for non-existant opsID done.
nsMutex.lockMapMutex.Lock()
err := nsMutex.statusNoneToBlocked(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock)
if err != nil {
t.Fatalf("Setting lock status to Blocked failed: <ERROR> %s", err)
}
nsMutex.lockMapMutex.Unlock()
err = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock)
if err != nil {
t.Fatalf("Setting lock status to Running failed: <ERROR> %s", err)
}
actualErr = nsMutex.deleteLockInfoEntryForOps(param, "non-existant-OpsID")
expectedOpsIDErr := LockInfoOpsIDNotFound{param.volume, param.path, "non-existant-OpsID"}
if actualErr != expectedOpsIDErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedOpsIDErr, actualErr)
}
// case - 4.
// Attempt to delete an registered entry is done.
// All metrics should be 0 after deleting the entry.
// Verify that the entry the opsID exists.
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; !ok {
t.Fatalf("Entry for OpsID \"%s\" in <volume> %s, <path> %s should have existed. ", testCases[0].opsID, param.volume, param.path)
}
} else {
t.Fatalf("Entry for <volume> %s, <path> %s should have existed. ", param.volume, param.path)
}
actualErr = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
if actualErr != nil {
t.Fatalf("Expected the error to be <nil>, but got <ERROR> %s", actualErr)
}
// Verify that the entry for the opsId doesn't exists.
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; ok {
t.Fatalf("The entry for opsID \"%s\" should have been deleted", testCases[0].opsID)
}
} else {
t.Fatalf("Entry for <volume> %s, <path> %s should have existed. ", param.volume, param.path)
}
if nsMutex.runningLockCounter != int64(0) {
t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter)
}
if nsMutex.blockedCounter != int64(0) {
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter)
}
if nsMutex.globalLockCounter != int64(0) {
t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter)
}
}
// TestNsLockMapDeleteLockInfoEntryForVolumePath - Tests validate the logic for removal
// of entry for given <volume, path> pair from lock info.
func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) {
testCases := []lockStateCase{
// Test case - 1.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
opsID: "abcd1234",
readLock: true,
// expected metrics.
},
}
// case - 1.
// Testing the case where delete lock info is attempted even before the lock is initialized.
param := nsParam{testCases[0].volume, testCases[0].path}
// Testing before the initialization done.
actualErr := nsMutex.deleteLockInfoEntryForVolumePath(param)
expectedNilErr := LockInfoNil{}
if actualErr != expectedNilErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
}
// enabling lock instrumentation.
globalDebugLock = true
// initializing the locks.
initNSLock(false)
// set debug lock info to `nil` so that the next tests have to initialize them again.
defer func() {
globalDebugLock = false
nsMutex.debugLockMap = nil
}()
// case - 2.
// Case where an attempt to delete the entry for non-existent <volume, path> pair is done.
// Set the status of the lock to blocked and then to running.
nonExistParam := nsParam{volume: "non-exist-volume", path: "non-exist-path"}
actualErr = nsMutex.deleteLockInfoEntryForVolumePath(nonExistParam)
expectedVolPathErr := LockInfoVolPathMssing{nonExistParam.volume, nonExistParam.path}
if actualErr != expectedVolPathErr {
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedVolPathErr, actualErr)
}
// case - 3.
// Attempt to delete an registered entry is done.
// All metrics should be 0 after deleting the entry.
// Registering the entry first.
nsMutex.lockMapMutex.Lock()
err := nsMutex.statusNoneToBlocked(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock)
if err != nil {
t.Fatalf("Setting lock status to Blocked failed: <ERROR> %s", err)
}
nsMutex.lockMapMutex.Unlock()
err = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock)
if err != nil {
t.Fatalf("Setting lock status to Running failed: <ERROR> %s", err)
}
// Verify that the entry the for given <volume, path> exists.
if _, ok := nsMutex.debugLockMap[param]; !ok {
t.Fatalf("Entry for <volume> %s, <path> %s should have existed.", param.volume, param.path)
}
// first delete the entry for the operation ID.
err = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
actualErr = nsMutex.deleteLockInfoEntryForVolumePath(param)
if actualErr != nil {
t.Fatalf("Expected the error to be <nil>, but got <ERROR> %s", actualErr)
}
// Verify that the entry for the opsId doesn't exists.
if _, ok := nsMutex.debugLockMap[param]; ok {
t.Fatalf("Entry for <volume> %s, <path> %s should have been deleted. ", param.volume, param.path)
}
// The lock count values should be 0.
if nsMutex.runningLockCounter != int64(0) {
t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter)
}
if nsMutex.blockedCounter != int64(0) {
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter)
}
if nsMutex.globalLockCounter != int64(0) {
t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter)
}
}

View File

@ -62,6 +62,9 @@ func init() {
// Set global trace flag.
globalTrace = os.Getenv("MINIO_TRACE") == "1"
// Set all the debug flags from ENV if any.
setGlobalsDebugFromEnv()
}
func migrate() {

View File

@ -18,7 +18,9 @@ package cmd
import (
"errors"
"fmt"
pathutil "path"
"runtime"
"strconv"
"strings"
"sync"
@ -58,6 +60,15 @@ func initNSLock(isDist bool) {
isDist: isDist,
lockMap: make(map[nsParam]*nsLock),
}
if globalDebugLock {
// lock Debugging enabed, initialize nsLockMap with entry for debugging information.
// entries of <volume,path> -> stateInfo of locks, for instrumentation purpose.
nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath)
}
}
func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) {
n.debugLockMap[param] = newDebugLockInfoPerVolumePath()
}
// RWLocker - interface that any read-write locking library should implement.
@ -83,13 +94,19 @@ type nsLock struct {
// nsLockMap - namespace lock map, provides primitives to Lock,
// Unlock, RLock and RUnlock.
type nsLockMap struct {
// lock counter used for lock debugging.
globalLockCounter int64 //total locks held.
blockedCounter int64 // total operations blocked waiting for locks.
runningLockCounter int64 // total locks held but not released yet.
debugLockMap map[nsParam]*debugLockInfoPerVolumePath // info for instrumentation on locks.
isDist bool // indicates whether the locking service is part of a distributed setup or not.
lockMap map[nsParam]*nsLock
lockMapMutex sync.Mutex
}
// Lock the namespace resource.
func (n *nsLockMap) lock(volume, path string, readLock bool) {
func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock bool) {
var nsLk *nsLock
n.lockMapMutex.Lock()
@ -112,6 +129,15 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
if readLock && n.isDist {
rwlock = dsync.NewDRWMutex(pathutil.Join(volume, path))
}
if globalDebugLock {
// change the state of the lock to be blocked for the given pair of <volume, path> and <OperationID> till the lock unblocks.
// The lock for accessing `nsMutex` is held inside the function itself.
err := n.statusNoneToBlocked(param, lockOrigin, opsID, readLock)
if err != nil {
errorIf(err, "Failed to set lock state to blocked.")
}
}
// Unlock map before Locking NS which might block.
n.lockMapMutex.Unlock()
@ -133,10 +159,20 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
} else {
rwlock.Lock()
}
// check if lock debugging enabled.
if globalDebugLock {
// Changing the status of the operation from blocked to running.
// change the state of the lock to be running (from blocked) for the given pair of <volume, path> and <OperationID>.
err := n.statusBlockedToRunning(param, lockOrigin, opsID, readLock)
if err != nil {
errorIf(err, "Failed to set the lock state to running.")
}
}
}
// Unlock the namespace resource.
func (n *nsLockMap) unlock(volume, path string, readLock bool) {
func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
// nsLk.Unlock() will not block, hence locking the map for the entire function is fine.
n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock()
@ -163,6 +199,13 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) {
}
if nsLk.ref != 0 {
nsLk.ref--
// locking debug enabled, delete the lock state entry for given operation ID.
if globalDebugLock {
err := n.deleteLockInfoEntryForOps(param, opsID)
if err != nil {
errorIf(err, "Failed to delete lock info entry.")
}
}
}
if nsLk.ref == 0 {
if len(nsLk.readerArray) != 0 && n.isDist {
@ -171,31 +214,61 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) {
// Remove from the map if there are no more references.
delete(n.lockMap, param)
// locking debug enabled, delete the lock state entry for given <volume, path> pair.
if globalDebugLock {
err := n.deleteLockInfoEntryForVolumePath(param)
if err != nil {
errorIf(err, "Failed to delete lock info entry.")
}
}
}
}
}
// Lock - locks the given resource for writes, using a previously
// allocated name space lock or initializing a new one.
func (n *nsLockMap) Lock(volume, path string) {
func (n *nsLockMap) Lock(volume, path, opsID string) {
var lockOrigin string
// lock debugging enabled. The caller information of the lock held has be obtained here before calling any other function.
if globalDebugLock {
// fetching the package, function name and the line number of the caller from the runtime.
// here is an example https://play.golang.org/p/perrmNRI9_ .
pc, fn, line, success := runtime.Caller(1)
if !success {
errorIf(errors.New("Couldn't get caller info."), "Fetching caller info form runtime failed.")
}
lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]", runtime.FuncForPC(pc).Name(), fn, line)
}
readLock := false
n.lock(volume, path, readLock)
n.lock(volume, path, lockOrigin, opsID, readLock)
}
// Unlock - unlocks any previously acquired write locks.
func (n *nsLockMap) Unlock(volume, path string) {
func (n *nsLockMap) Unlock(volume, path, opsID string) {
readLock := false
n.unlock(volume, path, readLock)
n.unlock(volume, path, opsID, readLock)
}
// RLock - locks any previously acquired read locks.
func (n *nsLockMap) RLock(volume, path string) {
func (n *nsLockMap) RLock(volume, path, opsID string) {
var lockOrigin string
readLock := true
n.lock(volume, path, readLock)
// lock debugging enabled. The caller information of the lock held has be obtained here before calling any other function.
if globalDebugLock {
// fetching the package, function name and the line number of the caller from the runtime.
// here is an example https://play.golang.org/p/perrmNRI9_ .
pc, fn, line, success := runtime.Caller(1)
if !success {
errorIf(errors.New("Couldn't get caller info."), "Fetching caller info form runtime failed.")
}
lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]", runtime.FuncForPC(pc).Name(), fn, line)
}
n.lock(volume, path, lockOrigin, opsID, readLock)
}
// RUnlock - unlocks any previously acquired read locks.
func (n *nsLockMap) RUnlock(volume, path string) {
func (n *nsLockMap) RUnlock(volume, path, opsID string) {
readLock := true
n.unlock(volume, path, readLock)
n.unlock(volume, path, opsID, readLock)
}

View File

@ -16,16 +16,21 @@
package cmd
import "testing"
import (
"strconv"
"sync"
"testing"
"time"
)
// Tests functionality provided by namespace lock.
func TestNamespaceLockTest(t *testing.T) {
// List of test cases.
testCases := []struct {
lk func(s1, s2 string)
unlk func(s1, s2 string)
rlk func(s1, s2 string)
runlk func(s1, s2 string)
lk func(s1, s2, s3 string)
unlk func(s1, s2, s3 string)
rlk func(s1, s2, s3 string)
runlk func(s1, s2, s3 string)
lkCount int
lockedRefCount uint
unlockedRefCount uint
@ -58,7 +63,7 @@ func TestNamespaceLockTest(t *testing.T) {
// Write lock tests.
testCase := testCases[0]
testCase.lk("a", "b") // lock once.
testCase.lk("a", "b", "c") // lock once.
nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
@ -67,7 +72,7 @@ func TestNamespaceLockTest(t *testing.T) {
if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref)
}
testCase.unlk("a", "b") // unlock once.
testCase.unlk("a", "b", "c") // unlock once.
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref)
}
@ -78,10 +83,10 @@ func TestNamespaceLockTest(t *testing.T) {
// Read lock tests.
testCase = testCases[1]
testCase.rlk("a", "b") // lock once.
testCase.rlk("a", "b") // lock second time.
testCase.rlk("a", "b") // lock third time.
testCase.rlk("a", "b") // lock fourth time.
testCase.rlk("a", "b", "c") // lock once.
testCase.rlk("a", "b", "c") // lock second time.
testCase.rlk("a", "b", "c") // lock third time.
testCase.rlk("a", "b", "c") // lock fourth time.
nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
@ -90,8 +95,9 @@ func TestNamespaceLockTest(t *testing.T) {
if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref)
}
testCase.runlk("a", "b") // unlock once.
testCase.runlk("a", "b") // unlock second time.
testCase.runlk("a", "b", "c") // unlock once.
testCase.runlk("a", "b", "c") // unlock second time.
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref)
}
@ -102,7 +108,7 @@ func TestNamespaceLockTest(t *testing.T) {
// Read lock 0 ref count.
testCase = testCases[2]
testCase.rlk("a", "c") // lock once.
testCase.rlk("a", "c", "d") // lock once.
nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}]
if !ok && testCase.shouldPass {
@ -112,7 +118,7 @@ func TestNamespaceLockTest(t *testing.T) {
if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.lockedRefCount, nsLk.ref)
}
testCase.runlk("a", "c") // unlock once.
testCase.runlk("a", "c", "d") // unlock once.
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref)
}
@ -121,3 +127,266 @@ func TestNamespaceLockTest(t *testing.T) {
t.Errorf("Lock map not found.")
}
}
func TestLockStats(t *testing.T) {
expectedResult := []lockStateCase{
// Test case - 1.
// Case where 10 read locks are held.
// Entry for any of the 10 reads locks has to be found.
// Since they held in a loop, Lock origin for first 10 read locks (opsID 0-9) should be the same.
{
volume: "my-bucket",
path: "my-object",
opsID: "0",
readLock: true,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 10,
expectedRunningLockCount: 10,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 10,
expectedVolPathRunningCount: 10,
expectedVolPathBlockCount: 0,
},
// Test case - 2.
// Case where the first 5 read locks are released.
// Entry for any of the 6-10th "Running" reads lock has to be found.
{
volume: "my-bucket",
path: "my-object",
opsID: "6",
readLock: true,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 5,
expectedRunningLockCount: 5,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 5,
expectedVolPathRunningCount: 5,
expectedVolPathBlockCount: 0,
},
// Test case - 3.
{
volume: "my-bucket",
path: "my-object",
opsID: "10",
readLock: false,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 2,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 1,
expectedVolPathLockCount: 2,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 1,
},
// Test case - 4.
{
volume: "my-bucket",
path: "my-object",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 1,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 1,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 0,
expectedVolPathBlockCount: 1,
},
// Test case - 5.
{
volume: "my-bucket",
path: "my-object",
opsID: "11",
readLock: false,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 1,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 0,
},
// Test case - 6.
// Case where in the first 5 read locks are released, but 2 write locks are
// blocked waiting for the remaining 5 read locks locks to be released (10 read locks were held intially).
// We check the entry for the first blocked write call here.
{
volume: "my-bucket",
path: "my-object",
opsID: "10",
readLock: false,
// write lock is held at line 318.
// this confirms that we are looking the right write lock.
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
// count of held(running) + blocked locks.
expectedGlobalLockCount: 7,
// count of acquired locks.
expectedRunningLockCount: 5,
// 2 write calls are blocked, waiting for the remaining 5 read locks.
expectedBlockedLockCount: 2,
expectedVolPathLockCount: 7,
expectedVolPathRunningCount: 5,
expectedVolPathBlockCount: 2,
},
// Test case - 7.
// Case where in 9 out of 10 read locks are released.
// Since there's one more pending read lock, the 2 write locks are still blocked.
// Testing the entry for the last read lock.
{volume: "my-bucket",
path: "my-object",
opsID: "9",
readLock: true,
lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
// Total running + blocked locks.
// 2 blocked write lock.
expectedGlobalLockCount: 3,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 2,
expectedVolPathLockCount: 3,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 2,
},
// Test case - 8.
{
volume: "my-bucket",
path: "my-object",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 0,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 0,
},
}
var wg sync.WaitGroup
// enabling lock instrumentation.
globalDebugLock = true
// initializing the locks.
initNSLock(false)
// set debug lock info to `nil` so that the next tests have to initialize them again.
defer func() {
globalDebugLock = false
nsMutex.debugLockMap = nil
}()
// hold 10 read locks.
for i := 0; i < 10; i++ {
nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i))
}
// expected lock info.
expectedLockStats := expectedResult[0]
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 1)
// unlock 5 readlock.
for i := 0; i < 5; i++ {
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i))
}
expectedLockStats = expectedResult[1]
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 2)
syncChan := make(chan struct{}, 1)
wg.Add(1)
go func() {
defer wg.Done()
// blocks till all read locks are released.
nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10))
// Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock.
expectedWLockStats := expectedResult[2]
// Since the write lock acquired here, the number of blocked locks should reduce by 1 and
// count of running locks should increase by 1.
verifyLockState(expectedWLockStats, t, 3)
// release the write lock.
nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10))
// The number of running locks should decrease by 1.
// expectedWLockStats = expectedResult[3]
// verifyLockState(expectedWLockStats, t, 4)
// Take the lock stats after the first write lock is unlocked.
// Only then unlock then second write lock.
syncChan <- struct{}{}
}()
// waiting so that the write locks in the above go routines are held.
// sleeping so that we can predict the order of the write locks held.
time.Sleep(100 * time.Millisecond)
// since there are 5 more readlocks still held on <"my-bucket","my-object">,
// an attempt to hold write locks blocks. So its run in a new go routine.
wg.Add(1)
go func() {
defer wg.Done()
// blocks till all read locks are released.
nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(11))
// Once the above attempt to lock is unblocked/acquired, we release the lock.
// Unlock the second write lock only after lock stats for first write lock release is taken.
<-syncChan
// The number of running locks should decrease by 1.
expectedWLockStats := expectedResult[4]
verifyLockState(expectedWLockStats, t, 5)
nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11))
}()
expectedLockStats = expectedResult[5]
time.Sleep(1 * time.Second)
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 6)
// unlock 4 out of remaining 5 read locks.
for i := 0; i < 4; i++ {
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5))
}
// verify the entry for one remaining read lock and count of blocked write locks.
expectedLockStats = expectedResult[6]
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 7)
// Releasing the last read lock.
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9))
wg.Wait()
expectedLockStats = expectedResult[7]
// verify the actual lock info with the expected one.
verifyGlobalLockStats(expectedLockStats, t, 8)
}

View File

@ -26,10 +26,8 @@ import (
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"net/rpc"
"net/url"
"os"
"regexp"
@ -180,6 +178,58 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
return testServer
}
// Initializes control RPC end points.
// The object Layer will be a temp back used for testing purpose.
func initTestControlRPCEndPoint(objectLayer ObjectLayer) http.Handler {
// Initialize Web.
controllerHandlers := &controllerAPIHandlers{
ObjectAPI: func() ObjectLayer { return objectLayer },
}
// Initialize router.
muxRouter := router.NewRouter()
registerControllerRPCRouter(muxRouter, controllerHandlers)
return muxRouter
}
// StartTestRPCServer - Creates a temp XL/FS backend and initializes control RPC end points,
// then starts a test server with those control RPC end points registered.
func StartTestRPCServer(t TestErrHandler, instanceType string) TestServer {
// create temporary backend for the test server.
nDisks := 16
disks, err := getRandomDisks(nDisks)
if err != nil {
t.Fatal("Failed to create disks for the backend")
}
// create an instance of TestServer.
testRPCServer := TestServer{}
// create temporary backend for the test server.
objLayer, err := makeTestBackend(disks, instanceType)
if err != nil {
t.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
}
root, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("%s", err)
}
// Get credential.
credentials := serverConfig.GetCredential()
testRPCServer.Root = root
testRPCServer.Disks = disks
testRPCServer.AccessKey = credentials.AccessKeyID
testRPCServer.SecretKey = credentials.SecretAccessKey
testRPCServer.Obj = objLayer
// Run TestServer.
testRPCServer.Server = httptest.NewServer(initTestControlRPCEndPoint(objLayer))
return testRPCServer
}
// Configure the server for the test run.
func newTestConfig(bucketLocation string) (rootPath string, err error) {
// Get test root.
@ -990,37 +1040,3 @@ func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler {
registerWebRouter(muxRouter, webHandlers)
return muxRouter
}
// Initialize Controller RPC Handlers for testing
func initTestControllerRPCEndPoint(objLayer ObjectLayer) (string, string, error) {
controllerHandlers := &controllerAPIHandlers{
ObjectAPI: func() ObjectLayer { return objLayer },
}
// Start configuring net/rpc server
server := rpc.NewServer()
server.RegisterName("Controller", controllerHandlers)
listenTCP := func() (net.Listener, string, error) {
l, e := net.Listen("tcp", ":0") // any available address
if e != nil {
return nil, "", errors.New("net.Listen tcp :0, " + e.Error())
}
return l, l.Addr().String(), nil
}
l, serverAddr, err := listenTCP()
if err != nil {
return "", "", nil
}
go server.Accept(l)
// net/rpc only accepts one registered path and doesn't help to unregister it,
// so we are registering a new rpc path each time this function is called
random := strconv.Itoa(rand.Int())
server.HandleHTTP("/controller"+random, "/controller-debug"+random)
testserver := httptest.NewServer(nil)
serverAddr = testserver.Listener.Addr().String()
return serverAddr, random, nil
}

View File

@ -35,8 +35,12 @@ func (xl xlObjects) MakeBucket(bucket string) error {
return toObjectErr(errVolumeExists, bucket)
}
nsMutex.Lock(bucket, "")
defer nsMutex.Unlock(bucket, "")
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
@ -157,8 +161,12 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
// Checks whether bucket exists.
func (xl xlObjects) isBucketExist(bucket string) bool {
nsMutex.RLock(bucket, "")
defer nsMutex.RUnlock(bucket, "")
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(bucket, "", opsID)
defer nsMutex.RUnlock(bucket, "", opsID)
// Check whether bucket exists.
_, err := xl.getBucketInfo(bucket)
@ -178,8 +186,12 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
if !IsValidBucketName(bucket) {
return BucketInfo{}, BucketNameInvalid{Bucket: bucket}
}
nsMutex.RLock(bucket, "")
defer nsMutex.RUnlock(bucket, "")
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(bucket, "", opsID)
defer nsMutex.RUnlock(bucket, "", opsID)
bucketInfo, err := xl.getBucketInfo(bucket)
if err != nil {
return BucketInfo{}, toObjectErr(err, bucket)
@ -254,8 +266,12 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
return BucketNotFound{Bucket: bucket}
}
nsMutex.Lock(bucket, "")
defer nsMutex.Unlock(bucket, "")
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
// Collect if all disks report volume not found.
var wg = &sync.WaitGroup{}

View File

@ -137,8 +137,13 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Check if the current object needs healing
nsMutex.RLock(bucket, objInfo.Name)
nsMutex.RLock(bucket, objInfo.Name, opsID)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name)
if xlShouldHeal(partsMetadata, errs) {
result.Objects = append(result.Objects, ObjectInfo{
@ -148,7 +153,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
IsDir: false,
})
}
nsMutex.RUnlock(bucket, objInfo.Name)
nsMutex.RUnlock(bucket, objInfo.Name, opsID)
}
return result, nil
}

View File

@ -62,7 +62,11 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// List all upload ids for the keyMarker starting from
// uploadIDMarker first.
if uploadIDMarker != "" {
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
for _, disk := range xl.getLoadBalancedDisks() {
if disk == nil {
continue
@ -76,7 +80,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
break
}
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
if err != nil {
return ListMultipartsInfo{}, err
}
@ -127,8 +131,13 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var newUploads []uploadMetadata
var end bool
uploadIDMarker = ""
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// For the new object entry we get all its pending uploadIDs.
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
var disk StorageAPI
for _, disk = range xl.getLoadBalancedDisks() {
if disk == nil {
@ -143,7 +152,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
break
}
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
if err != nil {
if isErrIgnored(err, walkResultIgnoredErrs) {
continue
@ -269,9 +278,13 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
xlMeta.Stat.ModTime = time.Now().UTC()
xlMeta.Meta = meta
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
uploadID = getUUID()
initiated := time.Now().UTC()
@ -339,20 +352,25 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
var partsMetadata []xlMetaV1
var errs []error
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
nsMutex.RLock(minioMetaBucket, uploadIDPath)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID)
// Validates if upload ID exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
return "", InvalidUploadID{UploadID: uploadID}
}
// Read metadata associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket,
uploadIDPath)
if !isDiskQuorum(errs, xl.writeQuorum) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
// List all online disks.
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
@ -421,8 +439,12 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
}
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
// Validate again if upload ID still exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
@ -565,9 +587,14 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
@ -597,11 +624,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
Object: object,
}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Hold lock so that
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this multipart upload
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
@ -712,15 +744,20 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
// Hold write lock on the destination before rename.
nsMutex.Lock(bucket, object)
nsMutex.Lock(bucket, object, opsID)
defer func() {
// A new complete multipart upload invalidates any
// previously cached object in memory.
xl.objCache.Delete(path.Join(bucket, object))
// This lock also protects the cache namespace.
nsMutex.Unlock(bucket, object)
nsMutex.Unlock(bucket, object, opsID)
// Prefetch the object from disk by triggering a fake GetObject call
// Unlike a regular single PutObject, multipart PutObject is comes in
@ -761,10 +798,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Delete the previously successfully renamed object.
xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID))
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
// Hold the lock so that two parallel complete-multipart-uploads do not
// leave a stale uploads.json behind.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
@ -804,8 +845,12 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
return toObjectErr(err, bucket, object)
}
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
uploadsJSON, err := xl.readUploadsJSON(bucket, object)
@ -857,9 +902,13 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}

View File

@ -56,9 +56,14 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
if writer == nil {
return toObjectErr(errUnexpected, bucket, object)
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Lock the object before reading.
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
// Read metadata associated with the object from all disks.
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
@ -226,9 +231,13 @@ func (xl xlObjects) HealObject(bucket, object string) error {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Lock the object before healing.
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
if err := reduceErrs(errs, nil); err != nil {
@ -350,8 +359,13 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if !IsValidObjectName(object) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
info, err := xl.getObjectInfo(bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
@ -609,9 +623,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Lock the object.
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
nsMutex.Lock(bucket, object, opsID)
defer nsMutex.Unlock(bucket, object, opsID)
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
@ -724,8 +742,13 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.Lock(bucket, object, opsID)
defer nsMutex.Unlock(bucket, object, opsID)
// Validate object exists.
if !xl.isObject(bucket, object) {

View File

@ -178,8 +178,12 @@ func (xl xlObjects) Shutdown() error {
// HealDiskMetadata function for object storage interface.
func (xl xlObjects) HealDiskMetadata() error {
nsMutex.Lock(minioMetaBucket, formatConfigFile)
defer nsMutex.Unlock(minioMetaBucket, formatConfigFile)
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
nsMutex.Lock(minioMetaBucket, formatConfigFile, opsID)
defer nsMutex.Unlock(minioMetaBucket, formatConfigFile, opsID)
return repairDiskMetadata(xl.storageDisks)
}