mirror of https://github.com/minio/minio.git
Deprecate ListLocks and ClearLocks (#6233)
No locks are ever left in memory, we also have a periodic interval of clearing stale locks anyways. The lock instrumentation was not complete and was seldom used. Deprecate this for now and bring it back later if it is really needed. This also in-turn seems to improve performance slightly.
This commit is contained in:
parent
eb391a53c1
commit
556a51120c
|
@ -275,133 +275,6 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
|
|||
writeSuccessResponseJSON(w, jsonBytes)
|
||||
}
|
||||
|
||||
// validateLockQueryParams - Validates query params for list/clear
|
||||
// locks management APIs.
|
||||
func validateLockQueryParams(vars url.Values) (string, string, time.Duration,
|
||||
APIErrorCode) {
|
||||
|
||||
bucket := vars.Get(string(mgmtBucket))
|
||||
prefix := vars.Get(string(mgmtPrefix))
|
||||
olderThanStr := vars.Get(string(mgmtLockOlderThan))
|
||||
|
||||
// N B empty bucket name is invalid
|
||||
if !IsValidBucketName(bucket) {
|
||||
return "", "", time.Duration(0), ErrInvalidBucketName
|
||||
}
|
||||
// empty prefix is valid.
|
||||
if !IsValidObjectPrefix(prefix) {
|
||||
return "", "", time.Duration(0), ErrInvalidObjectName
|
||||
}
|
||||
|
||||
// If older-than parameter was empty then set it to 0s to list
|
||||
// all locks older than now.
|
||||
if olderThanStr == "" {
|
||||
olderThanStr = "0s"
|
||||
}
|
||||
duration, err := time.ParseDuration(olderThanStr)
|
||||
if err != nil {
|
||||
logger.LogIf(context.Background(), err)
|
||||
return "", "", time.Duration(0), ErrInvalidDuration
|
||||
}
|
||||
|
||||
return bucket, prefix, duration, ErrNone
|
||||
}
|
||||
|
||||
// ListLocksHandler - GET /minio/admin/v1/locks?bucket=mybucket&prefix=myprefix&older-than=10s
|
||||
// - bucket is a mandatory query parameter
|
||||
// - prefix and older-than are optional query parameters
|
||||
// ---------
|
||||
// Lists locks held on a given bucket, prefix and duration it was held for.
|
||||
func (a adminAPIHandlers) ListLocksHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
adminAPIErr := checkAdminRequestAuthType(r, "")
|
||||
if adminAPIErr != ErrNone {
|
||||
writeErrorResponseJSON(w, adminAPIErr, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
vars := r.URL.Query()
|
||||
bucket, prefix, duration, adminAPIErr := validateLockQueryParams(vars)
|
||||
if adminAPIErr != ErrNone {
|
||||
writeErrorResponseJSON(w, adminAPIErr, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch lock information of locks matching bucket/prefix that
|
||||
// are available for longer than duration.
|
||||
volLocks, err := listPeerLocksInfo(globalAdminPeers, bucket, prefix,
|
||||
duration)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(w, ErrInternalError, r.URL)
|
||||
logger.LogIf(context.Background(), err)
|
||||
return
|
||||
}
|
||||
|
||||
// Marshal list of locks as json.
|
||||
jsonBytes, err := json.Marshal(volLocks)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(w, ErrInternalError, r.URL)
|
||||
logger.LogIf(context.Background(), err)
|
||||
return
|
||||
}
|
||||
|
||||
// Reply with list of locks held on bucket, matching prefix
|
||||
// held longer than duration supplied, as json.
|
||||
writeSuccessResponseJSON(w, jsonBytes)
|
||||
}
|
||||
|
||||
// ClearLocksHandler - DELETE /minio/admin/v1/locks?bucket=mybucket&prefix=myprefix&duration=duration
|
||||
// - bucket is a mandatory query parameter
|
||||
// - prefix and older-than are optional query parameters
|
||||
// ---------
|
||||
// Clear locks held on a given bucket, prefix and duration it was held for.
|
||||
func (a adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ClearLocks")
|
||||
|
||||
// Get object layer instance.
|
||||
objLayer := newObjectLayerFn()
|
||||
if objLayer == nil {
|
||||
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
adminAPIErr := checkAdminRequestAuthType(r, "")
|
||||
if adminAPIErr != ErrNone {
|
||||
writeErrorResponseJSON(w, adminAPIErr, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
vars := r.URL.Query()
|
||||
bucket, prefix, duration, adminAPIErr := validateLockQueryParams(vars)
|
||||
if adminAPIErr != ErrNone {
|
||||
writeErrorResponseJSON(w, adminAPIErr, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch lock information of locks matching bucket/prefix that
|
||||
// are held for longer than duration.
|
||||
volLocks, err := listPeerLocksInfo(globalAdminPeers, bucket, prefix,
|
||||
duration)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(w, ErrInternalError, r.URL)
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Marshal list of locks as json.
|
||||
jsonBytes, err := json.Marshal(volLocks)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(w, ErrInternalError, r.URL)
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
objLayer.ClearLocks(ctx, volLocks)
|
||||
|
||||
// Reply with list of locks cleared, as json.
|
||||
writeSuccessResponseJSON(w, jsonBytes)
|
||||
}
|
||||
|
||||
// extractHealInitParams - Validates params for heal init API.
|
||||
func extractHealInitParams(r *http.Request) (bucket, objPrefix string,
|
||||
hs madmin.HealOpts, clientToken string, forceStart bool,
|
||||
|
|
|
@ -611,193 +611,6 @@ func TestServiceSetCreds(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// mkLockQueryVal - helper function to build lock query param.
|
||||
func mkLockQueryVal(bucket, prefix, durationStr string) url.Values {
|
||||
qVal := url.Values{}
|
||||
qVal.Set(string(mgmtBucket), bucket)
|
||||
qVal.Set(string(mgmtPrefix), prefix)
|
||||
qVal.Set(string(mgmtLockOlderThan), durationStr)
|
||||
return qVal
|
||||
}
|
||||
|
||||
// Test for locks list management REST API.
|
||||
func TestListLocksHandler(t *testing.T) {
|
||||
adminTestBed, err := prepareAdminXLTestBed()
|
||||
if err != nil {
|
||||
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
|
||||
}
|
||||
defer adminTestBed.TearDown()
|
||||
|
||||
// Initialize admin peers to make admin RPC calls.
|
||||
globalMinioAddr = "127.0.0.1:9000"
|
||||
initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1"))
|
||||
|
||||
testCases := []struct {
|
||||
bucket string
|
||||
prefix string
|
||||
duration string
|
||||
expectedStatus int
|
||||
}{
|
||||
// Test 1 - valid testcase
|
||||
{
|
||||
bucket: "mybucket",
|
||||
prefix: "myobject",
|
||||
duration: "1s",
|
||||
expectedStatus: http.StatusOK,
|
||||
},
|
||||
// Test 2 - invalid duration
|
||||
{
|
||||
bucket: "mybucket",
|
||||
prefix: "myprefix",
|
||||
duration: "invalidDuration",
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
},
|
||||
// Test 3 - invalid bucket name
|
||||
{
|
||||
bucket: `invalid\\Bucket`,
|
||||
prefix: "myprefix",
|
||||
duration: "1h",
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
},
|
||||
// Test 4 - invalid prefix
|
||||
{
|
||||
bucket: "mybucket",
|
||||
prefix: `invalid\\Prefix`,
|
||||
duration: "1h",
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
queryVal := mkLockQueryVal(test.bucket, test.prefix, test.duration)
|
||||
req, err := newTestRequest("GET", "/minio/admin/v1/locks?"+queryVal.Encode(), 0, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d - Failed to construct list locks request - %v", i+1, err)
|
||||
}
|
||||
|
||||
cred := globalServerConfig.GetCredential()
|
||||
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d - Failed to sign list locks request - %v", i+1, err)
|
||||
}
|
||||
rec := httptest.NewRecorder()
|
||||
adminTestBed.router.ServeHTTP(rec, req)
|
||||
if test.expectedStatus != rec.Code {
|
||||
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.expectedStatus, rec.Code)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test for locks clear management REST API.
|
||||
func TestClearLocksHandler(t *testing.T) {
|
||||
adminTestBed, err := prepareAdminXLTestBed()
|
||||
if err != nil {
|
||||
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
|
||||
}
|
||||
defer adminTestBed.TearDown()
|
||||
|
||||
// Initialize admin peers to make admin RPC calls.
|
||||
initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1"))
|
||||
|
||||
testCases := []struct {
|
||||
bucket string
|
||||
prefix string
|
||||
duration string
|
||||
expectedStatus int
|
||||
}{
|
||||
// Test 1 - valid testcase
|
||||
{
|
||||
bucket: "mybucket",
|
||||
prefix: "myobject",
|
||||
duration: "1s",
|
||||
expectedStatus: http.StatusOK,
|
||||
},
|
||||
// Test 2 - invalid duration
|
||||
{
|
||||
bucket: "mybucket",
|
||||
prefix: "myprefix",
|
||||
duration: "invalidDuration",
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
},
|
||||
// Test 3 - invalid bucket name
|
||||
{
|
||||
bucket: `invalid\\Bucket`,
|
||||
prefix: "myprefix",
|
||||
duration: "1h",
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
},
|
||||
// Test 4 - invalid prefix
|
||||
{
|
||||
bucket: "mybucket",
|
||||
prefix: `invalid\\Prefix`,
|
||||
duration: "1h",
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
queryVal := mkLockQueryVal(test.bucket, test.prefix, test.duration)
|
||||
req, err := newTestRequest("DELETE", "/minio/admin/v1/locks?"+queryVal.Encode(), 0, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d - Failed to construct clear locks request - %v", i+1, err)
|
||||
}
|
||||
|
||||
cred := globalServerConfig.GetCredential()
|
||||
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d - Failed to sign clear locks request - %v", i+1, err)
|
||||
}
|
||||
rec := httptest.NewRecorder()
|
||||
adminTestBed.router.ServeHTTP(rec, req)
|
||||
if test.expectedStatus != rec.Code {
|
||||
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.expectedStatus, rec.Code)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test for lock query param validation helper function.
|
||||
func TestValidateLockQueryParams(t *testing.T) {
|
||||
// reset globals.
|
||||
// this is to make sure that the tests are not affected by modified globals.
|
||||
resetTestGlobals()
|
||||
// initialize NSLock.
|
||||
initNSLock(false)
|
||||
// Sample query values for test cases.
|
||||
allValidVal := mkLockQueryVal("bucket", "prefix", "1s")
|
||||
invalidBucketVal := mkLockQueryVal(`invalid\\Bucket`, "prefix", "1s")
|
||||
invalidPrefixVal := mkLockQueryVal("bucket", `invalid\\Prefix`, "1s")
|
||||
invalidOlderThanVal := mkLockQueryVal("bucket", "prefix", "invalidDuration")
|
||||
|
||||
testCases := []struct {
|
||||
qVals url.Values
|
||||
apiErr APIErrorCode
|
||||
}{
|
||||
{
|
||||
qVals: invalidBucketVal,
|
||||
apiErr: ErrInvalidBucketName,
|
||||
},
|
||||
{
|
||||
qVals: invalidPrefixVal,
|
||||
apiErr: ErrInvalidObjectName,
|
||||
},
|
||||
{
|
||||
qVals: invalidOlderThanVal,
|
||||
apiErr: ErrInvalidDuration,
|
||||
},
|
||||
{
|
||||
qVals: allValidVal,
|
||||
apiErr: ErrNone,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
_, _, _, apiErr := validateLockQueryParams(test.qVals)
|
||||
if apiErr != test.apiErr {
|
||||
t.Errorf("Test %d - Expected error %v but received %v", i+1, test.apiErr, apiErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// buildAdminRequest - helper function to build an admin API request.
|
||||
func buildAdminRequest(queryVal url.Values, method, path string,
|
||||
contentLength int64, bodySeeker io.ReadSeeker) (*http.Request, error) {
|
||||
|
|
|
@ -53,13 +53,6 @@ func registerAdminRouter(router *mux.Router) {
|
|||
// Info operations
|
||||
adminV1Router.Methods(http.MethodGet).Path("/info").HandlerFunc(httpTraceAll(adminAPI.ServerInfoHandler))
|
||||
|
||||
/// Lock operations
|
||||
|
||||
// List Locks
|
||||
adminV1Router.Methods(http.MethodGet).Path("/locks").HandlerFunc(httpTraceAll(adminAPI.ListLocksHandler))
|
||||
// Clear locks
|
||||
adminV1Router.Methods(http.MethodDelete).Path("/locks").HandlerFunc(httpTraceAll(adminAPI.ClearLocksHandler))
|
||||
|
||||
/// Heal operations
|
||||
|
||||
// Heal processing endpoint.
|
||||
|
|
|
@ -54,19 +54,6 @@ func (rpcClient *AdminRPCClient) ReInitFormat(dryRun bool) error {
|
|||
return rpcClient.Call(adminServiceName+".ReInitFormat", &args, &reply)
|
||||
}
|
||||
|
||||
// ListLocks - Sends list locks command to remote server via RPC.
|
||||
func (rpcClient *AdminRPCClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
args := ListLocksQuery{
|
||||
Bucket: bucket,
|
||||
Prefix: prefix,
|
||||
Duration: duration,
|
||||
}
|
||||
var reply []VolumeLockInfo
|
||||
|
||||
err := rpcClient.Call(adminServiceName+".ListLocks", &args, &reply)
|
||||
return reply, err
|
||||
}
|
||||
|
||||
// ServerInfo - returns the server info of the server to which the RPC call is made.
|
||||
func (rpcClient *AdminRPCClient) ServerInfo() (sid ServerInfoData, err error) {
|
||||
err = rpcClient.Call(adminServiceName+".ServerInfo", &AuthArgs{}, &sid)
|
||||
|
@ -147,7 +134,6 @@ func NewAdminRPCClient(host *xnet.Host) (*AdminRPCClient, error) {
|
|||
type adminCmdRunner interface {
|
||||
SignalService(s serviceSignal) error
|
||||
ReInitFormat(dryRun bool) error
|
||||
ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
|
||||
ServerInfo() (ServerInfoData, error)
|
||||
GetConfig() ([]byte, error)
|
||||
WriteTmpConfig(tmpFileName string, configBytes []byte) error
|
||||
|
@ -244,56 +230,6 @@ func sendServiceCmd(cps adminPeers, cmd serviceSignal) {
|
|||
errs[0] = invokeServiceCmd(cps[0], cmd)
|
||||
}
|
||||
|
||||
// listPeerLocksInfo - fetch list of locks held on the given bucket,
|
||||
// matching prefix held longer than duration from all peer servers.
|
||||
func listPeerLocksInfo(peers adminPeers, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
// Used to aggregate volume lock information from all nodes.
|
||||
allLocks := make([][]VolumeLockInfo, len(peers))
|
||||
errs := make([]error, len(peers))
|
||||
var wg sync.WaitGroup
|
||||
localPeer := peers[0]
|
||||
remotePeers := peers[1:]
|
||||
for i, remotePeer := range remotePeers {
|
||||
wg.Add(1)
|
||||
go func(idx int, remotePeer adminPeer) {
|
||||
defer wg.Done()
|
||||
// `remotePeers` is right-shifted by one position relative to `peers`
|
||||
allLocks[idx], errs[idx] = remotePeer.cmdRunner.ListLocks(bucket, prefix, duration)
|
||||
}(i+1, remotePeer)
|
||||
}
|
||||
wg.Wait()
|
||||
allLocks[0], errs[0] = localPeer.cmdRunner.ListLocks(bucket, prefix, duration)
|
||||
|
||||
// Summarizing errors received for ListLocks RPC across all
|
||||
// nodes. N B the possible unavailability of quorum in errors
|
||||
// applies only to distributed setup.
|
||||
errCount, err := reduceErrs(errs, []error{})
|
||||
if err != nil {
|
||||
if errCount >= (len(peers)/2 + 1) {
|
||||
return nil, err
|
||||
}
|
||||
return nil, InsufficientReadQuorum{}
|
||||
}
|
||||
|
||||
// Group lock information across nodes by (bucket, object)
|
||||
// pair. For readability only.
|
||||
paramLockMap := make(map[nsParam][]VolumeLockInfo)
|
||||
for _, nodeLocks := range allLocks {
|
||||
for _, lockInfo := range nodeLocks {
|
||||
param := nsParam{
|
||||
volume: lockInfo.Bucket,
|
||||
path: lockInfo.Object,
|
||||
}
|
||||
paramLockMap[param] = append(paramLockMap[param], lockInfo)
|
||||
}
|
||||
}
|
||||
groupedLockInfos := []VolumeLockInfo{}
|
||||
for _, volLocks := range paramLockMap {
|
||||
groupedLockInfos = append(groupedLockInfos, volLocks...)
|
||||
}
|
||||
return groupedLockInfos, nil
|
||||
}
|
||||
|
||||
// uptimeSlice - used to sort uptimes in chronological order.
|
||||
type uptimeSlice []struct {
|
||||
err error
|
||||
|
|
|
@ -19,7 +19,6 @@ package cmd
|
|||
import (
|
||||
"context"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
|
@ -47,20 +46,6 @@ func (receiver *adminRPCReceiver) SignalService(args *SignalServiceArgs, reply *
|
|||
return receiver.local.SignalService(args.Sig)
|
||||
}
|
||||
|
||||
// ListLocksQuery - wraps ListLocks API's query values to send over RPC.
|
||||
type ListLocksQuery struct {
|
||||
AuthArgs
|
||||
Bucket string
|
||||
Prefix string
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// ListLocks - lists locks held by requests handled by this server instance.
|
||||
func (receiver *adminRPCReceiver) ListLocks(args *ListLocksQuery, reply *[]VolumeLockInfo) (err error) {
|
||||
*reply, err = receiver.local.ListLocks(args.Bucket, args.Prefix, args.Duration)
|
||||
return err
|
||||
}
|
||||
|
||||
// ServerInfo - returns the server info when object layer was initialized on this server.
|
||||
func (receiver *adminRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) (err error) {
|
||||
*reply, err = receiver.local.ServerInfo()
|
||||
|
|
|
@ -99,31 +99,6 @@ func testAdminCmdRunnerReInitFormat(t *testing.T, client adminCmdRunner) {
|
|||
}
|
||||
}
|
||||
|
||||
func testAdminCmdRunnerListLocks(t *testing.T, client adminCmdRunner) {
|
||||
tmpGlobalObjectAPI := globalObjectAPI
|
||||
defer func() {
|
||||
globalObjectAPI = tmpGlobalObjectAPI
|
||||
}()
|
||||
|
||||
testCases := []struct {
|
||||
objectAPI ObjectLayer
|
||||
expectErr bool
|
||||
}{
|
||||
{&DummyObjectLayer{}, false},
|
||||
{nil, true},
|
||||
}
|
||||
|
||||
for i, testCase := range testCases {
|
||||
globalObjectAPI = testCase.objectAPI
|
||||
_, err := client.ListLocks("", "", time.Duration(0))
|
||||
expectErr := (err != nil)
|
||||
|
||||
if expectErr != testCase.expectErr {
|
||||
t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) {
|
||||
tmpGlobalBootTime := globalBootTime
|
||||
tmpGlobalObjectAPI := globalObjectAPI
|
||||
|
@ -323,16 +298,6 @@ func TestAdminRPCClientReInitFormat(t *testing.T) {
|
|||
testAdminCmdRunnerReInitFormat(t, rpcClient)
|
||||
}
|
||||
|
||||
func TestAdminRPCClientListLocks(t *testing.T) {
|
||||
httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t)
|
||||
defer httpServer.Close()
|
||||
defer func() {
|
||||
globalServerConfig = prevGlobalServerConfig
|
||||
}()
|
||||
|
||||
testAdminCmdRunnerListLocks(t, rpcClient)
|
||||
}
|
||||
|
||||
func TestAdminRPCClientServerInfo(t *testing.T) {
|
||||
httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t)
|
||||
defer httpServer.Close()
|
||||
|
|
|
@ -19,7 +19,6 @@ package cmd
|
|||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
|
@ -132,14 +131,6 @@ func (api *DummyObjectLayer) ListObjectsHeal(ctx context.Context, bucket, prefix
|
|||
return
|
||||
}
|
||||
|
||||
func (api *DummyObjectLayer) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) (info []VolumeLockInfo, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (api *DummyObjectLayer) ClearLocks(context.Context, []VolumeLockInfo) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (api *DummyObjectLayer) SetBucketPolicy(context.Context, string, *policy.Policy) (err error) {
|
||||
return
|
||||
}
|
||||
|
|
12
cmd/fs-v1.go
12
cmd/fs-v1.go
|
@ -265,18 +265,6 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo {
|
|||
return storageInfo
|
||||
}
|
||||
|
||||
// Locking operations
|
||||
|
||||
// ListLocks - List namespace locks held in object layer
|
||||
func (fs *FSObjects) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
return []VolumeLockInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// ClearLocks - Clear namespace locks held in object layer
|
||||
func (fs *FSObjects) ClearLocks(ctx context.Context, info []VolumeLockInfo) error {
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
/// Bucket operations
|
||||
|
||||
// getBucketDir - will convert incoming bucket names to
|
||||
|
|
|
@ -18,7 +18,6 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
|
@ -137,20 +136,6 @@ func (a GatewayUnsupported) CopyObject(ctx context.Context, srcBucket string, sr
|
|||
return objInfo, NotImplemented{}
|
||||
}
|
||||
|
||||
// Locking operations
|
||||
|
||||
// ListLocks lists namespace locks held in object layer
|
||||
func (a GatewayUnsupported) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return []VolumeLockInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// ClearLocks clears namespace locks held in object layer
|
||||
func (a GatewayUnsupported) ClearLocks(ctx context.Context, info []VolumeLockInfo) error {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
// RefreshBucketPolicy refreshes cache policy with what's on disk.
|
||||
func (a GatewayUnsupported) RefreshBucketPolicy(ctx context.Context, bucket string) error {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
)
|
||||
|
@ -51,16 +50,6 @@ func (lc localAdminClient) ReInitFormat(dryRun bool) error {
|
|||
return objectAPI.ReloadFormat(context.Background(), dryRun)
|
||||
}
|
||||
|
||||
// ListLocks - Fetches lock information from local lock instrumentation.
|
||||
func (lc localAdminClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
objectAPI := newObjectLayerFn()
|
||||
if objectAPI == nil {
|
||||
return nil, errServerNotInitialized
|
||||
}
|
||||
|
||||
return objectAPI.ListLocks(context.Background(), bucket, prefix, duration)
|
||||
}
|
||||
|
||||
// ServerInfo - Returns the server info of this server.
|
||||
func (lc localAdminClient) ServerInfo() (sid ServerInfoData, e error) {
|
||||
if globalBootTime.IsZero() {
|
||||
|
|
|
@ -17,10 +17,7 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestLocalAdminClientSignalService(t *testing.T) {
|
||||
|
@ -31,10 +28,6 @@ func TestLocalAdminClientReInitFormat(t *testing.T) {
|
|||
testAdminCmdRunnerReInitFormat(t, &localAdminClient{})
|
||||
}
|
||||
|
||||
func TestLocalAdminClientListLocks(t *testing.T) {
|
||||
testAdminCmdRunnerListLocks(t, &localAdminClient{})
|
||||
}
|
||||
|
||||
func TestLocalAdminClientServerInfo(t *testing.T) {
|
||||
testAdminCmdRunnerServerInfo(t, &localAdminClient{})
|
||||
}
|
||||
|
@ -50,91 +43,3 @@ func TestLocalAdminClientWriteTmpConfig(t *testing.T) {
|
|||
func TestLocalAdminClientCommitConfig(t *testing.T) {
|
||||
testAdminCmdRunnerCommitConfig(t, &localAdminClient{})
|
||||
}
|
||||
|
||||
func TestListLocksInfo(t *testing.T) {
|
||||
// reset global variables to start afresh.
|
||||
resetTestGlobals()
|
||||
// Initialize minio server config.
|
||||
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(rootPath)
|
||||
|
||||
// Initializing new XL objectLayer.
|
||||
objAPI, _, xlErr := initTestXLObjLayer()
|
||||
if xlErr != nil {
|
||||
t.Fatalf("failed to init object layer")
|
||||
}
|
||||
// Make objLayer available to all internal services via globalObjectAPI.
|
||||
globalObjLayerMutex.Lock()
|
||||
globalObjectAPI = objAPI
|
||||
globalObjLayerMutex.Unlock()
|
||||
// Set globalIsXL to indicate that the setup uses an erasure code backend.
|
||||
// initialize NSLock.
|
||||
isDistXL := false
|
||||
initNSLock(isDistXL)
|
||||
|
||||
var nsMutex *nsLockMap
|
||||
|
||||
nsMutex = objAPI.(*xlSets).sets[0].nsMutex
|
||||
|
||||
// Acquire a few locks to populate lock instrumentation.
|
||||
// Take 10 read locks on bucket1/prefix1/obj1
|
||||
for i := 0; i < 10; i++ {
|
||||
readLk := nsMutex.NewNSLock("bucket1", "prefix1/obj1")
|
||||
if readLk.GetRLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
|
||||
t.Errorf("Failed to get read lock on iteration %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
// Take write locks on bucket1/prefix/obj{11..19}
|
||||
for i := 0; i < 10; i++ {
|
||||
wrLk := nsMutex.NewNSLock("bucket1", fmt.Sprintf("prefix1/obj%d", 10+i))
|
||||
if wrLk.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
|
||||
t.Errorf("Failed to get write lock on iteration %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
client := &localAdminClient{}
|
||||
|
||||
testCases := []struct {
|
||||
bucket string
|
||||
prefix string
|
||||
duration time.Duration
|
||||
numLocks int
|
||||
}{
|
||||
// Test 1 - Matches all the locks acquired above.
|
||||
{
|
||||
bucket: "bucket1",
|
||||
prefix: "prefix1",
|
||||
duration: time.Duration(0 * time.Second),
|
||||
numLocks: 20,
|
||||
},
|
||||
// Test 2 - Bucket doesn't match.
|
||||
{
|
||||
bucket: "bucket",
|
||||
prefix: "prefix1",
|
||||
duration: time.Duration(0 * time.Second),
|
||||
numLocks: 0,
|
||||
},
|
||||
// Test 3 - Prefix doesn't match.
|
||||
{
|
||||
bucket: "bucket1",
|
||||
prefix: "prefix11",
|
||||
duration: time.Duration(0 * time.Second),
|
||||
numLocks: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
actual, err := client.ListLocks(test.bucket, test.prefix, test.duration)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
if len(actual) != test.numLocks {
|
||||
t.Errorf("Test %d - Expected %d locks but observed %d locks",
|
||||
i+1, test.numLocks, len(actual))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,276 +0,0 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
)
|
||||
|
||||
type statusType string
|
||||
|
||||
const (
|
||||
runningStatus statusType = "Running"
|
||||
blockedStatus statusType = "Blocked"
|
||||
)
|
||||
|
||||
type lockType string
|
||||
|
||||
const (
|
||||
debugRLockStr lockType = "RLock"
|
||||
debugWLockStr lockType = "WLock"
|
||||
)
|
||||
|
||||
// debugLockInfo - represents a single lock's information, uniquely identified by opsID.
|
||||
// See debugLockInfoPerVolumePath for more context.
|
||||
type debugLockInfo struct {
|
||||
// "RLock" or "WLock".
|
||||
lType lockType
|
||||
// Contains the backtrace of incl. the function which called (r)(un)lock.
|
||||
lockSource string
|
||||
// Status can be running/blocked.
|
||||
status statusType
|
||||
// Time of last status update.
|
||||
since time.Time
|
||||
}
|
||||
|
||||
// debugLockInfoPerVolumePath - lock state information on all locks held on (volume, path).
|
||||
type debugLockInfoPerVolumePath struct {
|
||||
counters *lockStat // Holds stats of lock held on (volume, path)
|
||||
lockInfo map[string]debugLockInfo // Lock information per operation ID.
|
||||
}
|
||||
|
||||
// LockInfoOriginMismatch - represents error when lock origin don't match.
|
||||
type LockInfoOriginMismatch struct {
|
||||
volume string
|
||||
path string
|
||||
opsID string
|
||||
lockSource string
|
||||
}
|
||||
|
||||
func (l LockInfoOriginMismatch) Error() string {
|
||||
return fmt.Sprintf("No lock state stored for the lock originated at \"%s\", for <volume> %s, <path> %s, <opsID> %s",
|
||||
l.lockSource, l.volume, l.path, l.opsID)
|
||||
}
|
||||
|
||||
// LockInfoVolPathMissing - represents error when lock information is missing for a given (volume, path).
|
||||
type LockInfoVolPathMissing struct {
|
||||
volume string
|
||||
path string
|
||||
}
|
||||
|
||||
func (l LockInfoVolPathMissing) Error() string {
|
||||
return fmt.Sprintf("No entry in debug Lock Map for Volume: %s, path: %s", l.volume, l.path)
|
||||
}
|
||||
|
||||
// LockInfoOpsIDNotFound - represents error when lock info entry for a given operation ID doesn't exist.
|
||||
type LockInfoOpsIDNotFound struct {
|
||||
volume string
|
||||
path string
|
||||
opsID string
|
||||
}
|
||||
|
||||
func (l LockInfoOpsIDNotFound) Error() string {
|
||||
return fmt.Sprintf("No entry in lock info for <Operation ID> %s, <volume> %s, <path> %s", l.opsID, l.volume, l.path)
|
||||
}
|
||||
|
||||
// LockInfoStateNotBlocked - represents error when lock info isn't in blocked state when it should be.
|
||||
type LockInfoStateNotBlocked struct {
|
||||
volume string
|
||||
path string
|
||||
opsID string
|
||||
}
|
||||
|
||||
func (l LockInfoStateNotBlocked) Error() string {
|
||||
return fmt.Sprintf("Lock state should be \"Blocked\" for <volume> %s, <path> %s, <opsID> %s", l.volume, l.path, l.opsID)
|
||||
}
|
||||
|
||||
// Initialize lock info for given (volume, path).
|
||||
func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) {
|
||||
n.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
lockInfo: make(map[string]debugLockInfo),
|
||||
counters: &lockStat{},
|
||||
}
|
||||
}
|
||||
|
||||
// Change the state of the lock from Blocked to Running.
|
||||
func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockSource, opsID string, readLock bool) error {
|
||||
// This function is called outside nsLockMap.mutex.Lock(), so must be held explicitly.
|
||||
ctx := context.Background()
|
||||
n.lockMapMutex.Lock()
|
||||
defer n.lockMapMutex.Unlock()
|
||||
|
||||
// Check whether the lock info entry for <volume, path> pair already exists.
|
||||
_, ok := n.debugLockMap[param]
|
||||
if !ok {
|
||||
logger.LogIf(ctx, LockInfoVolPathMissing{param.volume, param.path})
|
||||
return LockInfoVolPathMissing{param.volume, param.path}
|
||||
}
|
||||
|
||||
// Check whether lock info entry for the given `opsID` exists.
|
||||
lockInfo, ok := n.debugLockMap[param].lockInfo[opsID]
|
||||
if !ok {
|
||||
logger.LogIf(ctx, LockInfoOpsIDNotFound{param.volume, param.path, opsID})
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, opsID}
|
||||
}
|
||||
|
||||
// Check whether lockSource is same.
|
||||
if lockInfo.lockSource != lockSource {
|
||||
logger.LogIf(ctx, LockInfoOriginMismatch{param.volume, param.path, opsID, lockSource})
|
||||
return LockInfoOriginMismatch{param.volume, param.path, opsID, lockSource}
|
||||
}
|
||||
|
||||
// Status of the lock should be set to "Blocked".
|
||||
if lockInfo.status != blockedStatus {
|
||||
logger.LogIf(ctx, LockInfoStateNotBlocked{param.volume, param.path, opsID})
|
||||
return LockInfoStateNotBlocked{param.volume, param.path, opsID}
|
||||
}
|
||||
// Change lock status to running and update the time.
|
||||
n.debugLockMap[param].lockInfo[opsID] = newDebugLockInfo(lockSource, runningStatus, readLock)
|
||||
|
||||
// Update global lock stats.
|
||||
n.counters.lockGranted()
|
||||
// Update (volume, pair) lock stats.
|
||||
n.debugLockMap[param].counters.lockGranted()
|
||||
return nil
|
||||
}
|
||||
|
||||
// newDebugLockInfo - Constructs a debugLockInfo value given lock source, status and type.
|
||||
func newDebugLockInfo(lockSource string, status statusType, readLock bool) debugLockInfo {
|
||||
var lType lockType
|
||||
if readLock {
|
||||
lType = debugRLockStr
|
||||
} else {
|
||||
lType = debugWLockStr
|
||||
}
|
||||
return debugLockInfo{
|
||||
lockSource: lockSource,
|
||||
lType: lType,
|
||||
status: status,
|
||||
since: UTCNow(),
|
||||
}
|
||||
}
|
||||
|
||||
// Change the state of the lock to Blocked.
|
||||
func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockSource, opsID string, readLock bool) error {
|
||||
_, ok := n.debugLockMap[param]
|
||||
if !ok {
|
||||
// Lock info entry for (volume, pair) doesn't exist, initialize it.
|
||||
n.initLockInfoForVolumePath(param)
|
||||
}
|
||||
|
||||
// Mark lock status blocked for given opsID.
|
||||
n.debugLockMap[param].lockInfo[opsID] = newDebugLockInfo(lockSource, blockedStatus, readLock)
|
||||
// Update global lock stats.
|
||||
n.counters.lockWaiting()
|
||||
// Update (volume, path) lock stats.
|
||||
n.debugLockMap[param].counters.lockWaiting()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Change the state of the lock from Blocked to none.
|
||||
func (n *nsLockMap) statusBlockedToNone(param nsParam, lockSource, opsID string, readLock bool) error {
|
||||
_, ok := n.debugLockMap[param]
|
||||
ctx := context.Background()
|
||||
if !ok {
|
||||
logger.LogIf(ctx, LockInfoVolPathMissing{param.volume, param.path})
|
||||
return LockInfoVolPathMissing{param.volume, param.path}
|
||||
}
|
||||
|
||||
// Check whether lock info entry for the given `opsID` exists.
|
||||
lockInfo, ok := n.debugLockMap[param].lockInfo[opsID]
|
||||
if !ok {
|
||||
logger.LogIf(ctx, LockInfoOpsIDNotFound{param.volume, param.path, opsID})
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, opsID}
|
||||
}
|
||||
|
||||
// Check whether lockSource is same.
|
||||
if lockInfo.lockSource != lockSource {
|
||||
logger.LogIf(ctx, LockInfoOriginMismatch{param.volume, param.path, opsID, lockSource})
|
||||
return LockInfoOriginMismatch{param.volume, param.path, opsID, lockSource}
|
||||
}
|
||||
|
||||
// Status of the lock should be set to "Blocked".
|
||||
if lockInfo.status != blockedStatus {
|
||||
logger.LogIf(ctx, LockInfoStateNotBlocked{param.volume, param.path, opsID})
|
||||
return LockInfoStateNotBlocked{param.volume, param.path, opsID}
|
||||
}
|
||||
|
||||
// Update global lock stats.
|
||||
n.counters.lockTimedOut()
|
||||
// Update (volume, path) lock stats.
|
||||
n.debugLockMap[param].counters.lockTimedOut()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteLockInfoEntry - Deletes the lock information for given (volume, path).
|
||||
// Called when nsLk.ref count is 0.
|
||||
func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error {
|
||||
// delete the lock info for the given operation.
|
||||
if _, found := n.debugLockMap[param]; !found {
|
||||
logger.LogIf(context.Background(), LockInfoVolPathMissing{param.volume, param.path})
|
||||
return LockInfoVolPathMissing{param.volume, param.path}
|
||||
}
|
||||
|
||||
// The following stats update is relevant only in case of a
|
||||
// ForceUnlock. In case of the last unlock on a (volume,
|
||||
// path), this would be a no-op.
|
||||
volumePathLocks := n.debugLockMap[param]
|
||||
for _, lockInfo := range volumePathLocks.lockInfo {
|
||||
granted := lockInfo.status == runningStatus
|
||||
// Update global and (volume, path) stats.
|
||||
n.counters.lockRemoved(granted)
|
||||
volumePathLocks.counters.lockRemoved(granted)
|
||||
}
|
||||
delete(n.debugLockMap, param)
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteLockInfoEntry - Deletes lock info entry for given opsID.
|
||||
// Called when the nsLk ref count for the given (volume, path) is
|
||||
// not 0.
|
||||
func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, opsID string) error {
|
||||
ctx := context.Background()
|
||||
// delete the lock info for the given operation.
|
||||
infoMap, found := n.debugLockMap[param]
|
||||
if !found {
|
||||
logger.LogIf(ctx, LockInfoVolPathMissing{param.volume, param.path})
|
||||
return LockInfoVolPathMissing{param.volume, param.path}
|
||||
}
|
||||
// The operation finished holding the lock on the resource, remove
|
||||
// the entry for the given operation with the operation ID.
|
||||
opsIDLock, foundInfo := infoMap.lockInfo[opsID]
|
||||
if !foundInfo {
|
||||
// Unlock request with invalid operation ID not accepted.
|
||||
logger.LogIf(ctx, LockInfoOpsIDNotFound{param.volume, param.path, opsID})
|
||||
return LockInfoOpsIDNotFound{param.volume, param.path, opsID}
|
||||
}
|
||||
// Update global and (volume, path) lock status.
|
||||
granted := opsIDLock.status == runningStatus
|
||||
n.counters.lockRemoved(granted)
|
||||
infoMap.counters.lockRemoved(granted)
|
||||
delete(infoMap.lockInfo, opsID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return randomly generated string ID
|
||||
func getOpsID() string {
|
||||
return mustGetUUID()
|
||||
}
|
|
@ -1,667 +0,0 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
type lockStateCase struct {
|
||||
volume string
|
||||
path string
|
||||
lockSource string
|
||||
opsID string
|
||||
readLock bool // lock type.
|
||||
setBlocked bool // initialize the initial state to blocked.
|
||||
expectedErr error
|
||||
// Expected global lock stats.
|
||||
expectedLockStatus statusType // Status of the lock Blocked/Running.
|
||||
|
||||
expectedGlobalLockCount int // Total number of locks held across the system, includes blocked + held locks.
|
||||
expectedBlockedLockCount int // Total blocked lock across the system.
|
||||
expectedRunningLockCount int // Total successfully held locks (non-blocking).
|
||||
// Expected lock status for given <volume, path> pair.
|
||||
expectedVolPathLockCount int // Total locks held for given <volume,path> pair, includes blocked locks.
|
||||
expectedVolPathRunningCount int // Total succcesfully held locks for given <volume, path> pair.
|
||||
expectedVolPathBlockCount int // Total locks blocked on the given <volume, path> pair.
|
||||
}
|
||||
|
||||
// Read entire state of the locks in the system and return.
|
||||
func getSystemLockState() (SystemLockState, error) {
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
defer globalNSMutex.lockMapMutex.Unlock()
|
||||
|
||||
lockState := SystemLockState{}
|
||||
|
||||
lockState.TotalBlockedLocks = globalNSMutex.counters.blocked
|
||||
lockState.TotalLocks = globalNSMutex.counters.total
|
||||
lockState.TotalAcquiredLocks = globalNSMutex.counters.granted
|
||||
|
||||
for param, debugLock := range globalNSMutex.debugLockMap {
|
||||
volLockInfo := VolumeLockInfo{}
|
||||
volLockInfo.Bucket = param.volume
|
||||
volLockInfo.Object = param.path
|
||||
volLockInfo.LocksOnObject = debugLock.counters.total
|
||||
volLockInfo.TotalBlockedLocks = debugLock.counters.blocked
|
||||
volLockInfo.LocksAcquiredOnObject = debugLock.counters.granted
|
||||
for opsID, lockInfo := range debugLock.lockInfo {
|
||||
volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, OpsLockState{
|
||||
OperationID: opsID,
|
||||
LockSource: lockInfo.lockSource,
|
||||
LockType: lockInfo.lType,
|
||||
Status: lockInfo.status,
|
||||
Since: lockInfo.since,
|
||||
})
|
||||
}
|
||||
lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo)
|
||||
}
|
||||
return lockState, nil
|
||||
}
|
||||
|
||||
// Asserts the lock counter from the global globalNSMutex inmemory lock with the expected one.
|
||||
func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) {
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
|
||||
// Verifying the lock stats.
|
||||
if globalNSMutex.counters.total != int64(l.expectedGlobalLockCount) {
|
||||
t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
|
||||
globalNSMutex.counters.total)
|
||||
}
|
||||
// verify the count for total blocked locks.
|
||||
if globalNSMutex.counters.blocked != int64(l.expectedBlockedLockCount) {
|
||||
t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
|
||||
globalNSMutex.counters.blocked)
|
||||
}
|
||||
// verify the count for total running locks.
|
||||
if globalNSMutex.counters.granted != int64(l.expectedRunningLockCount) {
|
||||
t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
|
||||
globalNSMutex.counters.granted)
|
||||
}
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
// Verifying again with the JSON response of the lock info.
|
||||
// Verifying the lock stats.
|
||||
sysLockState, err := getSystemLockState()
|
||||
if err != nil {
|
||||
t.Fatalf("Obtaining lock info failed with <ERROR> %s", err)
|
||||
|
||||
}
|
||||
if sysLockState.TotalLocks != int64(l.expectedGlobalLockCount) {
|
||||
t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
|
||||
sysLockState.TotalLocks)
|
||||
}
|
||||
// verify the count for total blocked locks.
|
||||
if sysLockState.TotalBlockedLocks != int64(l.expectedBlockedLockCount) {
|
||||
t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
|
||||
sysLockState.TotalBlockedLocks)
|
||||
}
|
||||
// verify the count for total running locks.
|
||||
if sysLockState.TotalAcquiredLocks != int64(l.expectedRunningLockCount) {
|
||||
t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
|
||||
sysLockState.TotalAcquiredLocks)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify the lock counter for entries of given <volume, path> pair.
|
||||
func verifyLockStats(l lockStateCase, t *testing.T, testNum int) {
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
defer globalNSMutex.lockMapMutex.Unlock()
|
||||
param := nsParam{l.volume, l.path}
|
||||
|
||||
// Verify the total locks (blocked+running) for given <vol,path> pair.
|
||||
if globalNSMutex.debugLockMap[param].counters.total != int64(l.expectedVolPathLockCount) {
|
||||
t.Errorf("Test %d: Expected the total lock count for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum,
|
||||
param.volume, param.path, int64(l.expectedVolPathLockCount), globalNSMutex.debugLockMap[param].counters.total)
|
||||
}
|
||||
// Verify the total running locks for given <volume, path> pair.
|
||||
if globalNSMutex.debugLockMap[param].counters.granted != int64(l.expectedVolPathRunningCount) {
|
||||
t.Errorf("Test %d: Expected the total running locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path,
|
||||
int64(l.expectedVolPathRunningCount), globalNSMutex.debugLockMap[param].counters.granted)
|
||||
}
|
||||
// Verify the total blocked locks for givne <volume, path> pair.
|
||||
if globalNSMutex.debugLockMap[param].counters.blocked != int64(l.expectedVolPathBlockCount) {
|
||||
t.Errorf("Test %d: Expected the total blocked locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path,
|
||||
int64(l.expectedVolPathBlockCount), globalNSMutex.debugLockMap[param].counters.blocked)
|
||||
}
|
||||
}
|
||||
|
||||
// verifyLockState - function which asserts the expected lock info in the system with the actual values in the globalNSMutex.
|
||||
func verifyLockState(l lockStateCase, t *testing.T, testNum int) {
|
||||
param := nsParam{l.volume, l.path}
|
||||
|
||||
verifyGlobalLockStats(l, t, testNum)
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
// Verifying the lock statuS fields.
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok {
|
||||
// Validating the lock type filed in the debug lock information.
|
||||
if l.readLock {
|
||||
if lockInfo.lType != debugRLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugRLockStr)
|
||||
}
|
||||
} else {
|
||||
if lockInfo.lType != debugWLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugWLockStr)
|
||||
}
|
||||
}
|
||||
|
||||
// // validating the lock origin.
|
||||
// if l.lockSource != lockInfo.lockSource {
|
||||
// t.Fatalf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", testNum, l.lockSource, lockInfo.lockSource)
|
||||
// }
|
||||
// validating the status of the lock.
|
||||
if lockInfo.status != l.expectedLockStatus {
|
||||
t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", testNum, l.expectedLockStatus, lockInfo.status)
|
||||
}
|
||||
} else {
|
||||
// Stop the tests if lock debug entry for given <volume, path> pair is not found.
|
||||
t.Errorf("Test case %d: Expected an debug lock entry for opsID \"%s\"", testNum, l.opsID)
|
||||
}
|
||||
} else {
|
||||
// To change the status the entry for given <volume, path> should exist in the lock info struct.
|
||||
t.Errorf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", testNum, param.volume, param.path)
|
||||
}
|
||||
// verifyLockStats holds its own lock.
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
|
||||
// verify the lock count.
|
||||
verifyLockStats(l, t, testNum)
|
||||
}
|
||||
|
||||
// TestNewDebugLockInfoPerVolumePath - Validates the values initialized by newDebugLockInfoPerVolumePath().
|
||||
func TestNewDebugLockInfoPerVolumePath(t *testing.T) {
|
||||
lockInfo := &debugLockInfoPerVolumePath{
|
||||
lockInfo: make(map[string]debugLockInfo),
|
||||
counters: &lockStat{},
|
||||
}
|
||||
|
||||
if lockInfo.counters.total != 0 {
|
||||
t.Errorf("Expected initial reference value of total locks to be 0, got %d", lockInfo.counters.total)
|
||||
}
|
||||
if lockInfo.counters.blocked != 0 {
|
||||
t.Errorf("Expected initial reference of blocked locks to be 0, got %d", lockInfo.counters.blocked)
|
||||
}
|
||||
if lockInfo.counters.granted != 0 {
|
||||
t.Errorf("Expected initial reference value of held locks to be 0, got %d", lockInfo.counters.granted)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNsLockMapStatusBlockedToRunning - Validates the function for changing the lock state from blocked to running.
|
||||
func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
volume string
|
||||
path string
|
||||
lockSource string
|
||||
opsID string
|
||||
readLock bool // Read lock type.
|
||||
setBlocked bool // Initialize the initial state to blocked.
|
||||
expectedErr error
|
||||
}{
|
||||
// Test case - 1.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: true,
|
||||
setBlocked: true,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
},
|
||||
// Test case - 2.
|
||||
// No entry for <volume, path> pair.
|
||||
// So an attempt to change the state of the lock from `Blocked`->`Running` should fail.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object-2",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: false,
|
||||
setBlocked: false,
|
||||
// expected metrics.
|
||||
expectedErr: LockInfoVolPathMissing{"my-bucket", "my-object-2"},
|
||||
},
|
||||
// Test case - 3.
|
||||
// Entry for the given operationID doesn't exist in the lock state info.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "ops-Id-not-registered",
|
||||
readLock: true,
|
||||
setBlocked: false,
|
||||
// expected metrics.
|
||||
expectedErr: LockInfoOpsIDNotFound{"my-bucket", "my-object", "ops-Id-not-registered"},
|
||||
},
|
||||
// Test case - 4.
|
||||
// Test case with non-existent lock origin.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "Bad Origin",
|
||||
opsID: "abcd1234",
|
||||
readLock: true,
|
||||
setBlocked: false,
|
||||
// expected metrics.
|
||||
expectedErr: LockInfoOriginMismatch{"my-bucket", "my-object", "abcd1234", "Bad Origin"},
|
||||
},
|
||||
// Test case - 5.
|
||||
// Test case with write lock.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: false,
|
||||
setBlocked: true,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
// Testing before the initialization done.
|
||||
// Since the data structures for
|
||||
actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path}
|
||||
if actualErr != expectedErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedErr, actualErr)
|
||||
}
|
||||
|
||||
globalNSMutex = &nsLockMap{
|
||||
// entries of <volume,path> -> stateInfo of locks, for instrumentation purpose.
|
||||
debugLockMap: make(map[nsParam]*debugLockInfoPerVolumePath),
|
||||
lockMap: make(map[nsParam]*nsLock),
|
||||
}
|
||||
|
||||
// Setting the lock info the be `nil`.
|
||||
globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
lockInfo: nil, // setting the lockinfo to nil.
|
||||
counters: &lockStat{},
|
||||
}
|
||||
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedOpsErr := LockInfoOpsIDNotFound{testCases[0].volume, testCases[0].path, testCases[0].opsID}
|
||||
if actualErr != expectedOpsErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedOpsErr, actualErr)
|
||||
}
|
||||
|
||||
// Next case: ase whether an attempt to change the state of the lock to "Running" done,
|
||||
// but the initial state if already "Running". Such an attempt should fail
|
||||
globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
lockInfo: make(map[string]debugLockInfo),
|
||||
counters: &lockStat{},
|
||||
}
|
||||
|
||||
// Setting the status of the lock to be "Running".
|
||||
// The initial state of the lock should set to "Blocked", otherwise its not possible to change the state from "Blocked" -> "Running".
|
||||
globalNSMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
status: "Running", // State set to "Running". Should fail with `LockInfoStateNotBlocked`.
|
||||
since: UTCNow(),
|
||||
}
|
||||
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedBlockErr := LockInfoStateNotBlocked{testCases[0].volume, testCases[0].path, testCases[0].opsID}
|
||||
if actualErr != expectedBlockErr {
|
||||
t.Fatalf("Errors mismatch: Expected: \"%s\", got: \"%s\"", expectedBlockErr, actualErr)
|
||||
}
|
||||
|
||||
// initializing the locks.
|
||||
initNSLock(false)
|
||||
|
||||
// Iterate over the cases and assert the result.
|
||||
for i, testCase := range testCases {
|
||||
param := nsParam{testCase.volume, testCase.path}
|
||||
// status of the lock to be set to "Blocked", before setting Blocked->Running.
|
||||
if testCase.setBlocked {
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
err := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: Initializing the initial state to Blocked failed <ERROR> %s", i+1, err)
|
||||
}
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
}
|
||||
// invoking the method under test.
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
if actualErr != testCase.expectedErr {
|
||||
t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr)
|
||||
}
|
||||
// In case of no error proceed with validating the lock state information.
|
||||
if actualErr == nil {
|
||||
// debug entry for given <volume, path> pair should exist.
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok {
|
||||
// Validating the lock type filed in the debug lock information.
|
||||
if testCase.readLock {
|
||||
if lockInfo.lType != debugRLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugRLockStr)
|
||||
}
|
||||
} else {
|
||||
if lockInfo.lType != debugWLockStr {
|
||||
t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugWLockStr)
|
||||
}
|
||||
}
|
||||
|
||||
// validating the lock origin.
|
||||
if testCase.lockSource != lockInfo.lockSource {
|
||||
t.Errorf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", i+1, testCase.lockSource, lockInfo.lockSource)
|
||||
}
|
||||
// validating the status of the lock.
|
||||
if lockInfo.status != runningStatus {
|
||||
t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", i+1, "Running", lockInfo.status)
|
||||
}
|
||||
} else {
|
||||
// Stop the tests if lock debug entry for given <volume, path> pair is not found.
|
||||
t.Fatalf("Test case %d: Expected an debug lock entry for opsID \"%s\"", i+1, testCase.opsID)
|
||||
}
|
||||
} else {
|
||||
// To change the status the entry for given <volume, path> should exist in the lock info struct.
|
||||
t.Fatalf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", i+1, param.volume, param.path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestNsLockMapStatusNoneToBlocked - Validates the function for changing the lock state to blocked
|
||||
func TestNsLockMapStatusNoneToBlocked(t *testing.T) {
|
||||
|
||||
testCases := []lockStateCase{
|
||||
// Test case - 1.
|
||||
{
|
||||
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: true,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
expectedLockStatus: blockedStatus,
|
||||
|
||||
expectedGlobalLockCount: 1,
|
||||
expectedRunningLockCount: 0,
|
||||
expectedBlockedLockCount: 1,
|
||||
|
||||
expectedVolPathLockCount: 1,
|
||||
expectedVolPathRunningCount: 0,
|
||||
expectedVolPathBlockCount: 1,
|
||||
},
|
||||
// Test case - 2.
|
||||
// No entry for <volume, path> pair.
|
||||
// So an attempt to change the state of the lock from `Blocked`->`Running` should fail.
|
||||
{
|
||||
|
||||
volume: "my-bucket",
|
||||
path: "my-object-2",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: false,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
expectedLockStatus: blockedStatus,
|
||||
|
||||
expectedGlobalLockCount: 2,
|
||||
expectedRunningLockCount: 0,
|
||||
expectedBlockedLockCount: 2,
|
||||
|
||||
expectedVolPathLockCount: 1,
|
||||
expectedVolPathRunningCount: 0,
|
||||
expectedVolPathBlockCount: 1,
|
||||
},
|
||||
// Test case - 3.
|
||||
// Entry for the given operationID doesn't exist in the lock state info.
|
||||
// The entry should be created and relevant counters should be set.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "ops-Id-not-registered",
|
||||
readLock: true,
|
||||
// expected metrics.
|
||||
expectedErr: nil,
|
||||
expectedLockStatus: "Blocked",
|
||||
|
||||
expectedGlobalLockCount: 3,
|
||||
expectedRunningLockCount: 0,
|
||||
expectedBlockedLockCount: 3,
|
||||
|
||||
expectedVolPathLockCount: 2,
|
||||
expectedVolPathRunningCount: 0,
|
||||
expectedVolPathBlockCount: 2,
|
||||
},
|
||||
}
|
||||
|
||||
// initializing the locks.
|
||||
initNSLock(false)
|
||||
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
// Testing before the initialization done.
|
||||
// Since the data structures for
|
||||
actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path}
|
||||
if actualErr != expectedErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedErr, actualErr)
|
||||
}
|
||||
|
||||
// Iterate over the cases and assert the result.
|
||||
for i, testCase := range testCases {
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
param := nsParam{testCase.volume, testCase.path}
|
||||
actualErr := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
if actualErr != testCase.expectedErr {
|
||||
t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr)
|
||||
}
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
if actualErr == nil {
|
||||
verifyLockState(testCase, t, i+1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestNsLockMapDeleteLockInfoEntryForOps - Validates the removal of entry for given Operational ID from the lock info.
|
||||
func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) {
|
||||
testCases := []lockStateCase{
|
||||
// Test case - 1.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: true,
|
||||
// expected metrics.
|
||||
},
|
||||
}
|
||||
|
||||
// initializing the locks.
|
||||
initNSLock(false)
|
||||
|
||||
// case - 1.
|
||||
// Testing the case where delete lock info is attempted even before the lock is initialized.
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
// Testing before the initialization done.
|
||||
|
||||
actualErr := globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
|
||||
expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path}
|
||||
if actualErr != expectedErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedErr, actualErr)
|
||||
}
|
||||
|
||||
// Case - 2.
|
||||
// Lock state is set to Running and then an attempt to delete the info for non-existent opsID done.
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Blocked failed: <ERROR> %s", err)
|
||||
}
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Running failed: <ERROR> %s", err)
|
||||
}
|
||||
actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID")
|
||||
|
||||
expectedOpsIDErr := LockInfoOpsIDNotFound{param.volume, param.path, "non-existent-OpsID"}
|
||||
if actualErr != expectedOpsIDErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedOpsIDErr, actualErr)
|
||||
}
|
||||
// case - 4.
|
||||
// Attempt to delete an registered entry is done.
|
||||
// All metrics should be 0 after deleting the entry.
|
||||
|
||||
// Verify that the entry the opsID exists.
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; !ok {
|
||||
t.Fatalf("Entry for OpsID \"%s\" in <volume> %s, <path> %s should have existed. ", testCases[0].opsID, param.volume, param.path)
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have existed. ", param.volume, param.path)
|
||||
}
|
||||
|
||||
actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
if actualErr != nil {
|
||||
t.Fatalf("Expected the error to be <nil>, but got <ERROR> %s", actualErr)
|
||||
}
|
||||
|
||||
// Verify that the entry for the opsId doesn't exists.
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; ok {
|
||||
t.Fatalf("The entry for opsID \"%s\" should have been deleted", testCases[0].opsID)
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have existed. ", param.volume, param.path)
|
||||
}
|
||||
if globalNSMutex.counters.granted != 0 {
|
||||
t.Errorf("Expected the count of total running locks to be %v, but got %v", 0, globalNSMutex.counters.granted)
|
||||
}
|
||||
if globalNSMutex.counters.blocked != 0 {
|
||||
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", 0, globalNSMutex.counters.blocked)
|
||||
}
|
||||
if globalNSMutex.counters.total != 0 {
|
||||
t.Errorf("Expected the count of all locks to be %v, but got %v", 0, globalNSMutex.counters.total)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNsLockMapDeleteLockInfoEntryForVolumePath - Tests validate the logic for removal
|
||||
// of entry for given <volume, path> pair from lock info.
|
||||
func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) {
|
||||
testCases := []lockStateCase{
|
||||
// Test case - 1.
|
||||
{
|
||||
volume: "my-bucket",
|
||||
path: "my-object",
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
opsID: "abcd1234",
|
||||
readLock: true,
|
||||
// expected metrics.
|
||||
},
|
||||
}
|
||||
|
||||
// initializing the locks.
|
||||
initNSLock(false)
|
||||
|
||||
// case - 1.
|
||||
// Case where an attempt to delete the entry for non-existent <volume, path> pair is done.
|
||||
// Set the status of the lock to blocked and then to running.
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
actualErr := globalNSMutex.deleteLockInfoEntryForVolumePath(param)
|
||||
expectedNilErr := LockInfoVolPathMissing{param.volume, param.path}
|
||||
if actualErr != expectedNilErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
|
||||
}
|
||||
|
||||
// case - 2.
|
||||
// Attempt to delete an registered entry is done.
|
||||
// All metrics should be 0 after deleting the entry.
|
||||
|
||||
// Registering the entry first.
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Blocked failed: <ERROR> %s", err)
|
||||
}
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Running failed: <ERROR> %s", err)
|
||||
}
|
||||
// Verify that the entry the for given <volume, path> exists.
|
||||
if _, ok := globalNSMutex.debugLockMap[param]; !ok {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have existed.", param.volume, param.path)
|
||||
}
|
||||
// first delete the entry for the operation ID.
|
||||
_ = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
actualErr = globalNSMutex.deleteLockInfoEntryForVolumePath(param)
|
||||
if actualErr != nil {
|
||||
t.Fatalf("Expected the error to be <nil>, but got <ERROR> %s", actualErr)
|
||||
}
|
||||
|
||||
// Verify that the entry for the opsId doesn't exists.
|
||||
if _, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have been deleted. ", param.volume, param.path)
|
||||
}
|
||||
// The lock count values should be 0.
|
||||
if globalNSMutex.counters.granted != 0 {
|
||||
t.Errorf("Expected the count of total running locks to be %v, but got %v", 0, globalNSMutex.counters.granted)
|
||||
}
|
||||
if globalNSMutex.counters.blocked != 0 {
|
||||
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", 0, globalNSMutex.counters.blocked)
|
||||
}
|
||||
if globalNSMutex.counters.total != 0 {
|
||||
t.Errorf("Expected the count of all locks to be %v, but got %v", 0, globalNSMutex.counters.total)
|
||||
}
|
||||
}
|
||||
|
||||
// Test to assert that status change from blocked to none shouldn't remove lock info entry for ops
|
||||
// Ref: Logs from https://github.com/minio/minio/issues/5311
|
||||
func TestStatusBlockedToNone(t *testing.T) {
|
||||
// Initialize namespace lock subsystem
|
||||
initNSLock(false)
|
||||
|
||||
ns := globalNSMutex
|
||||
|
||||
volume, path := "bucket", "object"
|
||||
param := nsParam{volume: volume, path: path}
|
||||
lockSrc := "main.go:1"
|
||||
opsID := "1"
|
||||
|
||||
err := ns.statusNoneToBlocked(param, lockSrc, opsID, false)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to mark lock state to blocked")
|
||||
}
|
||||
|
||||
err = ns.statusBlockedToNone(param, lockSrc, opsID, false)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to mark lock state to none")
|
||||
}
|
||||
|
||||
err = ns.deleteLockInfoEntryForOps(param, opsID)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected deleting of lock entry for %s to pass but got %v", opsID, err)
|
||||
}
|
||||
|
||||
}
|
|
@ -1,67 +0,0 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// SystemLockState - Structure to fill the lock state of entire object storage.
|
||||
// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system.
|
||||
type SystemLockState struct {
|
||||
TotalLocks int64 `json:"totalLocks"`
|
||||
// Count of operations which are blocked waiting for the lock to
|
||||
// be released.
|
||||
TotalBlockedLocks int64 `json:"totalBlockedLocks"`
|
||||
// Count of operations which has successfully acquired the lock but
|
||||
// hasn't unlocked yet (operation in progress).
|
||||
TotalAcquiredLocks int64 `json:"totalAcquiredLocks"`
|
||||
LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"`
|
||||
}
|
||||
|
||||
// VolumeLockInfo - Structure to contain the lock state info for volume, path pair.
|
||||
type VolumeLockInfo struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Object string `json:"object"`
|
||||
|
||||
// All locks blocked + running for given <volume,path> pair.
|
||||
LocksOnObject int64 `json:"-"`
|
||||
// Count of operations which has successfully acquired the lock
|
||||
// but hasn't unlocked yet( operation in progress).
|
||||
LocksAcquiredOnObject int64 `json:"-"`
|
||||
// Count of operations which are blocked waiting for the lock
|
||||
// to be released.
|
||||
TotalBlockedLocks int64 `json:"-"`
|
||||
|
||||
// Count of all read locks
|
||||
TotalReadLocks int64 `json:"readLocks"`
|
||||
// Count of all write locks
|
||||
TotalWriteLocks int64 `json:"writeLocks"`
|
||||
// State information containing state of the locks for all operations
|
||||
// on given <volume,path> pair.
|
||||
LockDetailsOnObject []OpsLockState `json:"lockOwners"`
|
||||
}
|
||||
|
||||
// OpsLockState - structure to fill in state information of the lock.
|
||||
// structure to fill in status information for each operation with given operation ID.
|
||||
type OpsLockState struct {
|
||||
OperationID string `json:"id"` // String containing operation ID.
|
||||
LockSource string `json:"source"` // Operation type (GetObject, PutObject...)
|
||||
LockType lockType `json:"type"` // Lock type (RLock, WLock)
|
||||
Status statusType `json:"status"` // Status can be Running/Ready/Blocked.
|
||||
Since time.Time `json:"since"` // Time when the lock was initially held.
|
||||
}
|
|
@ -104,10 +104,6 @@ func newNSLock(isDistXL bool) *nsLockMap {
|
|||
lockMap: make(map[nsParam]*nsLock),
|
||||
counters: &lockStat{},
|
||||
}
|
||||
|
||||
// Initialize nsLockMap with entry for instrumentation information.
|
||||
// Entries of <volume,path> -> stateInfo of locks
|
||||
nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath)
|
||||
return &nsMutex
|
||||
}
|
||||
|
||||
|
@ -132,8 +128,7 @@ type nsLock struct {
|
|||
// Unlock, RLock and RUnlock.
|
||||
type nsLockMap struct {
|
||||
// Lock counter used for lock debugging.
|
||||
counters *lockStat
|
||||
debugLockMap map[nsParam]*debugLockInfoPerVolumePath // Info for instrumentation on locks.
|
||||
counters *lockStat
|
||||
|
||||
// Indicates if namespace is part of a distributed setup.
|
||||
isDistXL bool
|
||||
|
@ -162,12 +157,6 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
|
|||
}
|
||||
nsLk.ref++ // Update ref count here to avoid multiple races.
|
||||
|
||||
// Change the state of the lock to be blocked for the given
|
||||
// pair of <volume, path> and <OperationID> till the lock
|
||||
// unblocks. The lock for accessing `globalNSMutex` is held inside
|
||||
// the function itself.
|
||||
n.statusNoneToBlocked(param, lockSource, opsID, readLock)
|
||||
|
||||
// Unlock map before Locking NS which might block.
|
||||
n.lockMapMutex.Unlock()
|
||||
|
||||
|
@ -181,28 +170,14 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
|
|||
if !locked { // We failed to get the lock
|
||||
n.lockMapMutex.Lock()
|
||||
defer n.lockMapMutex.Unlock()
|
||||
// Changing the status of the operation from blocked to none
|
||||
n.statusBlockedToNone(param, lockSource, opsID, readLock)
|
||||
|
||||
nsLk.ref-- // Decrement ref count since we failed to get the lock
|
||||
// delete the lock state entry for given operation ID.
|
||||
n.deleteLockInfoEntryForOps(param, opsID)
|
||||
|
||||
if nsLk.ref == 0 {
|
||||
// Remove from the map if there are no more references.
|
||||
delete(n.lockMap, param)
|
||||
|
||||
// delete the lock state entry for given
|
||||
// <volume, path> pair.
|
||||
n.deleteLockInfoEntryForVolumePath(param)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Changing the status of the operation from blocked to
|
||||
// running. change the state of the lock to be running (from
|
||||
// blocked) for the given pair of <volume, path> and <OperationID>.
|
||||
n.statusBlockedToRunning(param, lockSource, opsID, readLock)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -225,17 +200,10 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
|
|||
}
|
||||
if nsLk.ref != 0 {
|
||||
nsLk.ref--
|
||||
|
||||
// delete the lock state entry for given operation ID.
|
||||
n.deleteLockInfoEntryForOps(param, opsID)
|
||||
}
|
||||
if nsLk.ref == 0 {
|
||||
// Remove from the map if there are no more references.
|
||||
delete(n.lockMap, param)
|
||||
|
||||
// delete the lock state entry for given
|
||||
// <volume, path> pair.
|
||||
n.deleteLockInfoEntryForVolumePath(param)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -296,11 +264,6 @@ func (n *nsLockMap) ForceUnlock(volume, path string) {
|
|||
// Remove lock from the map.
|
||||
delete(n.lockMap, param)
|
||||
}
|
||||
|
||||
// delete the lock state entry for given
|
||||
// <volume, path> pair. Ignore error as there
|
||||
// is no way to report it back
|
||||
n.deleteLockInfoEntryForVolumePath(param)
|
||||
}
|
||||
|
||||
// lockInstance - frontend/top-level interface for namespace locks.
|
||||
|
@ -313,7 +276,8 @@ type lockInstance struct {
|
|||
// path. The returned lockInstance object encapsulates the nsLockMap,
|
||||
// volume, path and operation ID.
|
||||
func (n *nsLockMap) NewNSLock(volume, path string) RWLocker {
|
||||
return &lockInstance{n, volume, path, getOpsID()}
|
||||
opsID := mustGetUUID()
|
||||
return &lockInstance{n, volume, path, opsID}
|
||||
}
|
||||
|
||||
// Lock - block until write lock is taken or timeout has occurred.
|
||||
|
|
|
@ -19,7 +19,6 @@ package cmd
|
|||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
|
@ -65,10 +64,6 @@ type ObjectLayer interface {
|
|||
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
|
||||
ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
|
||||
|
||||
// Locking operations
|
||||
ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
|
||||
ClearLocks(context.Context, []VolumeLockInfo) error
|
||||
|
||||
// Policy operations
|
||||
SetBucketPolicy(context.Context, string, *policy.Policy) error
|
||||
GetBucketPolicy(context.Context, string) (*policy.Policy, error)
|
||||
|
|
|
@ -1433,24 +1433,3 @@ func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, de
|
|||
// Return error at the end.
|
||||
return loi, toObjectErr(err, bucket, prefix)
|
||||
}
|
||||
|
||||
// ListLocks from all sets, aggregate them and return.
|
||||
func (s *xlSets) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) (lockInfo []VolumeLockInfo, err error) {
|
||||
for _, set := range s.sets {
|
||||
var setLockInfo []VolumeLockInfo
|
||||
setLockInfo, err = set.ListLocks(ctx, bucket, prefix, duration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockInfo = append(lockInfo, setLockInfo...)
|
||||
}
|
||||
return lockInfo, nil
|
||||
}
|
||||
|
||||
// Clear all requested locks on all sets.
|
||||
func (s *xlSets) ClearLocks(ctx context.Context, lockInfo []VolumeLockInfo) error {
|
||||
for _, set := range s.sets {
|
||||
set.ClearLocks(ctx, lockInfo)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
58
cmd/xl-v1.go
58
cmd/xl-v1.go
|
@ -19,7 +19,6 @@ package cmd
|
|||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bpool"
|
||||
|
@ -56,63 +55,6 @@ func (xl xlObjects) Shutdown(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Locking operations
|
||||
|
||||
// List namespace locks held in object layer
|
||||
func (xl xlObjects) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
xl.nsMutex.lockMapMutex.Lock()
|
||||
defer xl.nsMutex.lockMapMutex.Unlock()
|
||||
// Fetch current time once instead of fetching system time for every lock.
|
||||
timeNow := UTCNow()
|
||||
volumeLocks := []VolumeLockInfo{}
|
||||
|
||||
for param, debugLock := range xl.nsMutex.debugLockMap {
|
||||
if param.volume != bucket {
|
||||
continue
|
||||
}
|
||||
// N B empty prefix matches all param.path.
|
||||
if !hasPrefix(param.path, prefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
volLockInfo := VolumeLockInfo{
|
||||
Bucket: param.volume,
|
||||
Object: param.path,
|
||||
LocksOnObject: debugLock.counters.total,
|
||||
TotalBlockedLocks: debugLock.counters.blocked,
|
||||
LocksAcquiredOnObject: debugLock.counters.granted,
|
||||
}
|
||||
// Filter locks that are held on bucket, prefix.
|
||||
for opsID, lockInfo := range debugLock.lockInfo {
|
||||
// filter locks that were held for longer than duration.
|
||||
elapsed := timeNow.Sub(lockInfo.since)
|
||||
if elapsed < duration {
|
||||
continue
|
||||
}
|
||||
// Add locks that are held for longer than duration.
|
||||
volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject,
|
||||
OpsLockState{
|
||||
OperationID: opsID,
|
||||
LockSource: lockInfo.lockSource,
|
||||
LockType: lockInfo.lType,
|
||||
Status: lockInfo.status,
|
||||
Since: lockInfo.since,
|
||||
})
|
||||
volumeLocks = append(volumeLocks, volLockInfo)
|
||||
}
|
||||
}
|
||||
return volumeLocks, nil
|
||||
}
|
||||
|
||||
// Clear namespace locks held in object layer
|
||||
func (xl xlObjects) ClearLocks(ctx context.Context, volLocks []VolumeLockInfo) error {
|
||||
// Remove lock matching bucket/prefix held longer than duration.
|
||||
for _, volLock := range volLocks {
|
||||
xl.nsMutex.ForceUnlock(volLock.Bucket, volLock.Object)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// byDiskTotal is a collection satisfying sort.Interface.
|
||||
type byDiskTotal []DiskInfo
|
||||
|
||||
|
|
|
@ -69,61 +69,6 @@
|
|||
</Error>
|
||||
```
|
||||
|
||||
|
||||
### 锁管理API
|
||||
* ListLocks
|
||||
- GET /?lock&bucket=mybucket&prefix=myprefix&duration=duration
|
||||
- x-minio-operation: list
|
||||
- Response: On success 200, json encoded response containing all locks held, for longer than duration.
|
||||
- Possible error responses
|
||||
- ErrInvalidBucketName
|
||||
```xml
|
||||
<Error>
|
||||
<Code>InvalidBucketName</Code>
|
||||
<Message>The specified bucket is not valid.</Message>
|
||||
<Key></Key>
|
||||
<BucketName></BucketName>
|
||||
<Resource>/</Resource>
|
||||
<RequestId>3L137</RequestId>
|
||||
<HostId>3L137</HostId>
|
||||
</Error>
|
||||
```
|
||||
- ErrInvalidObjectName
|
||||
```xml
|
||||
<Error>
|
||||
<Code>XMinioInvalidObjectName</Code>
|
||||
<Message>Object name contains unsupported characters. Unsupported characters are `^*|\"</Message>
|
||||
<Key></Key>
|
||||
<BucketName></BucketName>
|
||||
<Resource>/</Resource>
|
||||
<RequestId>3L137</RequestId>
|
||||
<HostId>3L137</HostId>
|
||||
</Error>
|
||||
```
|
||||
|
||||
- ErrInvalidDuration
|
||||
```xml
|
||||
<Error>
|
||||
<Code>InvalidDuration</Code>
|
||||
<Message>Duration provided in the request is invalid.</Message>
|
||||
<Key></Key>
|
||||
<BucketName></BucketName>
|
||||
<Resource>/</Resource>
|
||||
<RequestId>3L137</RequestId>
|
||||
<HostId>3L137</HostId>
|
||||
</Error>
|
||||
```
|
||||
|
||||
|
||||
* ClearLocks
|
||||
- POST /?lock&bucket=mybucket&prefix=myprefix&duration=duration
|
||||
- x-minio-operation: clear
|
||||
- Response: On success 200, json encoded response containing all locks cleared, for longer than duration.
|
||||
- Possible error responses, similar to errors listed in ListLocks.
|
||||
- ErrInvalidBucketName
|
||||
- ErrInvalidObjectName
|
||||
- ErrInvalidDuration
|
||||
|
||||
### 修复
|
||||
|
||||
* ListBucketsHeal
|
||||
|
|
|
@ -36,10 +36,10 @@ func main() {
|
|||
|
||||
```
|
||||
|
||||
| Service operations | Info operations | LockInfo operations | Healing operations | Config operations | Misc |
|
||||
| Service operations | Info operations | Healing operations | Config operations | Misc |
|
||||
|:------------------------------------|:----------------------------|:----------------------------|:--------------------------------------|:--------------------------|:------------------------------------|
|
||||
| [`ServiceStatus`](#ServiceStatus) | [`ServerInfo`](#ServerInfo) | [`ListLocks`](#ListLocks) | [`Heal`](#Heal) | [`GetConfig`](#GetConfig) | [`SetCredentials`](#SetCredentials) |
|
||||
| [`ServiceSendAction`](#ServiceSendAction) | | [`ClearLocks`](#ClearLocks) | | [`SetConfig`](#SetConfig) | |
|
||||
| [`ServiceStatus`](#ServiceStatus) | [`ServerInfo`](#ServerInfo) | [`Heal`](#Heal) | [`GetConfig`](#GetConfig) | [`SetCredentials`](#SetCredentials) |
|
||||
| [`ServiceSendAction`](#ServiceSendAction) | | | [`SetConfig`](#SetConfig) | |
|
||||
|
||||
|
||||
## 1. Constructor
|
||||
|
@ -203,38 +203,6 @@ Fetches information for all cluster nodes, such as server properties, storage in
|
|||
```
|
||||
|
||||
|
||||
## 5. Lock operations
|
||||
|
||||
<a name="ListLocks"></a>
|
||||
### ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
|
||||
If successful returns information on the list of locks held on ``bucket`` matching ``prefix`` for longer than ``duration`` seconds.
|
||||
|
||||
__Example__
|
||||
|
||||
``` go
|
||||
volLocks, err := madmClnt.ListLocks("mybucket", "myprefix", 30 * time.Second)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
log.Println("List of locks: ", volLocks)
|
||||
|
||||
```
|
||||
|
||||
<a name="ClearLocks"></a>
|
||||
### ClearLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
|
||||
If successful returns information on the list of locks cleared on ``bucket`` matching ``prefix`` for longer than ``duration`` seconds.
|
||||
|
||||
__Example__
|
||||
|
||||
``` go
|
||||
volLocks, err := madmClnt.ClearLocks("mybucket", "myprefix", 30 * time.Second)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
log.Println("List of locks cleared: ", volLocks)
|
||||
|
||||
```
|
||||
|
||||
## 6. Heal operations
|
||||
|
||||
<a name="Heal"></a>
|
||||
|
|
|
@ -1,46 +0,0 @@
|
|||
// +build ignore
|
||||
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
|
||||
// dummy values, please replace them with original values.
|
||||
|
||||
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
|
||||
// New returns an Minio Admin client object.
|
||||
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
// List locks held on mybucket/myprefix for longer than 30s.
|
||||
locksHeld, err := madmClnt.ListLocks("mybucket", "myprefix", time.Duration(30*time.Second))
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
log.Println(locksHeld)
|
||||
}
|
|
@ -1,135 +0,0 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package madmin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
type statusType string
|
||||
|
||||
type lockType string
|
||||
|
||||
// OpsLockState - represents lock specific details.
|
||||
type OpsLockState struct {
|
||||
OperationID string `json:"id"` // String containing operation ID.
|
||||
LockSource string `json:"source"` // Operation type (GetObject, PutObject...)
|
||||
LockType lockType `json:"type"` // Lock type (RLock, WLock)
|
||||
Status statusType `json:"status"` // Status can be Running/Ready/Blocked.
|
||||
Since time.Time `json:"since"` // Time when the lock was initially held.
|
||||
}
|
||||
|
||||
// VolumeLockInfo - represents summary and individual lock details of all
|
||||
// locks held on a given bucket, object.
|
||||
type VolumeLockInfo struct {
|
||||
Bucket string `json:"bucket"`
|
||||
Object string `json:"object"`
|
||||
|
||||
// All locks blocked + running for given <volume,path> pair.
|
||||
LocksOnObject int64 `json:"-"`
|
||||
// Count of operations which has successfully acquired the lock
|
||||
// but hasn't unlocked yet( operation in progress).
|
||||
LocksAcquiredOnObject int64 `json:"-"`
|
||||
// Count of operations which are blocked waiting for the lock
|
||||
// to be released.
|
||||
TotalBlockedLocks int64 `json:"-"`
|
||||
|
||||
// Count of all read locks
|
||||
TotalReadLocks int64 `json:"readLocks"`
|
||||
// Count of all write locks
|
||||
TotalWriteLocks int64 `json:"writeLocks"`
|
||||
// State information containing state of the locks for all operations
|
||||
// on given <volume,path> pair.
|
||||
LockDetailsOnObject []OpsLockState `json:"lockOwners"`
|
||||
}
|
||||
|
||||
// getLockInfos - unmarshal []VolumeLockInfo from a reader.
|
||||
func getLockInfos(body io.Reader) ([]VolumeLockInfo, error) {
|
||||
respBytes, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var lockInfos []VolumeLockInfo
|
||||
|
||||
err = json.Unmarshal(respBytes, &lockInfos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return lockInfos, nil
|
||||
}
|
||||
|
||||
// ListLocks - Calls List Locks Management API to fetch locks matching
|
||||
// bucket, prefix and held before the duration supplied.
|
||||
func (adm *AdminClient) ListLocks(bucket, prefix string,
|
||||
duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
|
||||
queryVal := make(url.Values)
|
||||
queryVal.Set("bucket", bucket)
|
||||
queryVal.Set("prefix", prefix)
|
||||
queryVal.Set("older-than", duration.String())
|
||||
|
||||
// Execute GET on /minio/admin/v1/locks to list locks.
|
||||
resp, err := adm.executeMethod("GET", requestData{
|
||||
queryValues: queryVal,
|
||||
relPath: "/v1/locks",
|
||||
})
|
||||
defer closeResponse(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, httpRespToErrorResponse(resp)
|
||||
}
|
||||
|
||||
return getLockInfos(resp.Body)
|
||||
}
|
||||
|
||||
// ClearLocks - Calls Clear Locks Management API to clear locks held
|
||||
// on bucket, matching prefix older than duration supplied.
|
||||
func (adm *AdminClient) ClearLocks(bucket, prefix string,
|
||||
duration time.Duration) ([]VolumeLockInfo, error) {
|
||||
|
||||
queryVal := make(url.Values)
|
||||
queryVal.Set("bucket", bucket)
|
||||
queryVal.Set("prefix", prefix)
|
||||
queryVal.Set("duration", duration.String())
|
||||
|
||||
// Execute POST on /?lock to clear locks.
|
||||
resp, err := adm.executeMethod("DELETE", requestData{
|
||||
queryValues: queryVal,
|
||||
relPath: "/v1/locks",
|
||||
})
|
||||
defer closeResponse(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, httpRespToErrorResponse(resp)
|
||||
}
|
||||
|
||||
return getLockInfos(resp.Body)
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package madmin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Test for getLockInfos helper function.
|
||||
func TestGetLockInfos(t *testing.T) {
|
||||
testCases := []struct {
|
||||
// Used to construct a io.Reader holding xml serialized lock information
|
||||
inputLocks []VolumeLockInfo
|
||||
}{
|
||||
// To build a reader with _no_ lock information.
|
||||
{
|
||||
inputLocks: []VolumeLockInfo{},
|
||||
},
|
||||
// To build a reader with _one_ lock information.
|
||||
{
|
||||
inputLocks: []VolumeLockInfo{{Bucket: "bucket", Object: "object"}},
|
||||
},
|
||||
}
|
||||
for i, test := range testCases {
|
||||
jsonBytes, err := json.Marshal(test.inputLocks)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d - Failed to marshal input lockInfos - %v", i+1, err)
|
||||
}
|
||||
actualLocks, err := getLockInfos(bytes.NewReader(jsonBytes))
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d - Failed to get lock information - %v", i+1, err)
|
||||
}
|
||||
if !reflect.DeepEqual(actualLocks, test.inputLocks) {
|
||||
t.Errorf("Test %d - Expected %v but received %v", i+1, test.inputLocks, actualLocks)
|
||||
}
|
||||
}
|
||||
|
||||
// Invalid json representation of []VolumeLockInfo
|
||||
_, err := getLockInfos(bytes.NewReader([]byte("invalidBytes")))
|
||||
if err == nil {
|
||||
t.Errorf("Test expected to fail, but passed")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue