Support MinIO to be deployed on more than 32 nodes (#8492)

This PR implements locking from a global entity into
a more localized set level entity, allowing for locks
to be held only on the resources which are writing
to a collection of disks rather than a global level.

In this process this PR also removes the top-level
limit of 32 nodes to an unlimited number of nodes. This
is a precursor change before bring in bucket expansion.
This commit is contained in:
Harshavardhana 2019-11-13 12:17:45 -08:00 committed by kannappanr
parent 069b8ee8ff
commit e9b2bf00ad
58 changed files with 2312 additions and 838 deletions

View File

@ -46,7 +46,7 @@ matrix:
go: 1.13.x
script:
- go build --ldflags="$(go run buildscripts/gen-ldflags.go)" -o %GOPATH%\bin\minio.exe
- go test -v --timeout 20m ./...
- for d in $(go list ./... | grep -v browser); do go test -v --timeout 20m "$d"; done
before_script:
# Add an IPv6 config - see the corresponding Travis issue

28
CREDITS
View File

@ -10910,6 +10910,34 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
================================================================
github.com/klauspost/readahead
https://github.com/klauspost/readahead
----------------------------------------------------------------
The MIT License (MIT)
Copyright (c) 2015 Klaus Post
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
================================================================
github.com/klauspost/reedsolomon

View File

@ -476,7 +476,13 @@ func (a adminAPIHandlers) PerfInfoHandler(w http.ResponseWriter, r *http.Request
}
func newLockEntry(l lockRequesterInfo, resource, server string) *madmin.LockEntry {
entry := &madmin.LockEntry{Timestamp: l.Timestamp, Resource: resource, ServerList: []string{server}, Owner: l.Node, Source: l.Source, ID: l.UID}
entry := &madmin.LockEntry{
Timestamp: l.Timestamp,
Resource: resource,
ServerList: []string{server},
Source: l.Source,
ID: l.UID,
}
if l.Writer {
entry.Type = "Write"
} else {
@ -491,12 +497,14 @@ func topLockEntries(peerLocks []*PeerLocks) madmin.LockEntries {
if peerLock == nil {
continue
}
for k, v := range peerLock.Locks {
for _, lockReqInfo := range v {
if val, ok := entryMap[lockReqInfo.UID]; ok {
val.ServerList = append(val.ServerList, peerLock.Addr)
} else {
entryMap[lockReqInfo.UID] = newLockEntry(lockReqInfo, k, peerLock.Addr)
for _, locks := range peerLock.Locks {
for k, v := range locks {
for _, lockReqInfo := range v {
if val, ok := entryMap[lockReqInfo.UID]; ok {
val.ServerList = append(val.ServerList, peerLock.Addr)
} else {
entryMap[lockReqInfo.UID] = newLockEntry(lockReqInfo, k, peerLock.Addr)
}
}
}
}
@ -531,10 +539,13 @@ func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request
peerLocks := globalNotificationSys.GetLocks(ctx)
// Once we have received all the locks currently used from peers
// add the local peer locks list as well.
localLocks := globalLockServer.ll.DupLockMap()
var getRespLocks GetLocksResp
for _, llocker := range globalLockServers {
getRespLocks = append(getRespLocks, llocker.DupLockMap())
}
peerLocks = append(peerLocks, &PeerLocks{
Addr: getHostName(r),
Locks: localLocks,
Locks: getRespLocks,
})
topLocks := topLockEntries(peerLocks)

View File

@ -27,7 +27,6 @@ import (
"net/url"
"sync"
"testing"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/pkg/auth"
@ -68,10 +67,6 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) {
// code backend.
globalIsXL = true
// initialize NSLock.
isDistXL := false
initNSLock(isDistXL)
// Init global heal state
if globalIsXL {
globalAllHealState = initHealState()
@ -345,46 +340,6 @@ func TestToAdminAPIErrCode(t *testing.T) {
}
}
func TestTopLockEntries(t *testing.T) {
t1 := UTCNow()
t2 := UTCNow().Add(10 * time.Second)
peerLocks := []*PeerLocks{
{
Addr: "1",
Locks: map[string][]lockRequesterInfo{
"1": {
{false, "node2", "ep2", "2", t2, t2, ""},
{true, "node1", "ep1", "1", t1, t1, ""},
},
"2": {
{false, "node2", "ep2", "2", t2, t2, ""},
{true, "node1", "ep1", "1", t1, t1, ""},
},
},
},
{
Addr: "2",
Locks: map[string][]lockRequesterInfo{
"1": {
{false, "node2", "ep2", "2", t2, t2, ""},
{true, "node1", "ep1", "1", t1, t1, ""},
},
"2": {
{false, "node2", "ep2", "2", t2, t2, ""},
{true, "node1", "ep1", "1", t1, t1, ""},
},
},
},
}
les := topLockEntries(peerLocks)
if len(les) != 2 {
t.Fatalf("Did not get 2 results")
}
if les[0].Timestamp.After(les[1].Timestamp) {
t.Fatalf("Got wrong sorted value")
}
}
func TestExtractHealInitParams(t *testing.T) {
mkParams := func(clientToken string, forceStart, forceStop bool) url.Values {
v := url.Values{}

View File

@ -52,7 +52,7 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
// at a given time, this big transaction lock ensures this
// appropriately. This is also true for rotation of encrypted
// content.
objLock := globalNSMutex.NewNSLock(context.Background(), minioMetaBucket, transactionConfigPrefix)
objLock := objAPI.NewNSLock(context.Background(), minioMetaBucket, transactionConfigPrefix)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}

View File

@ -294,7 +294,7 @@ func initConfig(objAPI ObjectLayer) error {
// all the config as necessary, this is to ensure that
// redundant locks are not held for each migration - this allows
// for a more predictable behavior while debugging.
objLock := globalNSMutex.NewNSLock(context.Background(), minioMetaBucket, transactionConfigPrefix)
objLock := objAPI.NewNSLock(context.Background(), minioMetaBucket, transactionConfigPrefix)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}

View File

@ -108,7 +108,7 @@ var lifecycleTimeout = newDynamicTimeout(60*time.Second, time.Second)
func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
// Lock to avoid concurrent lifecycle ops from other nodes
sweepLock := globalNSMutex.NewNSLock(ctx, "system", "daily-lifecycle-ops")
sweepLock := objAPI.NewNSLock(ctx, "system", "daily-lifecycle-ops")
if err := sweepLock.GetLock(lifecycleTimeout); err != nil {
return err
}

View File

@ -1,3 +1,19 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
@ -49,8 +65,6 @@ type cacheObjects struct {
cache []*diskCache
// file path patterns to exclude from cache
exclude []string
// to manage cache namespace locks
nsMutex *nsLockMap
// if true migration is in progress from v1 to v2
migrating bool
@ -58,6 +72,7 @@ type cacheObjects struct {
migMutex sync.Mutex
// Object functions pointing to the corresponding functions of backend implementation.
NewNSLockFn func(ctx context.Context, bucket, object string) RWLocker
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObjectFn func(ctx context.Context, bucket, object string) error
@ -66,7 +81,7 @@ type cacheObjects struct {
}
func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
cLock := c.NewNSLockFn(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return err
}
@ -75,7 +90,7 @@ func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, ob
}
func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
cLock := c.NewNSLockFn(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return err
}
@ -84,7 +99,7 @@ func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, objec
}
func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
cLock := c.NewNSLockFn(ctx, bucket, object)
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
return nil, err
}
@ -94,7 +109,7 @@ func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, objec
}
func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, object string) (oi ObjectInfo, err error) {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
cLock := c.NewNSLockFn(ctx, bucket, object)
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
@ -549,9 +564,11 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
c := &cacheObjects{
cache: cache,
exclude: config.Exclude,
nsMutex: newNSLock(false),
migrating: migrateSw,
migMutex: sync.Mutex{},
NewNSLockFn: func(ctx context.Context, bucket, object string) RWLocker {
return globalObjectAPI.NewNSLock(ctx, bucket, object)
},
GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
},

View File

@ -30,11 +30,6 @@ import (
// This file implements and supports ellipses pattern for
// `minio server` command line arguments.
// Maximum number of unique args supported on the command line.
const (
serverCommandLineArgsMax = 32
)
// Endpoint set represents parsed ellipses values, also provides
// methods to get the sets of endpoints.
type endpointSet struct {

View File

@ -577,12 +577,6 @@ func CreateEndpoints(serverAddr string, args ...[]string) (string, EndpointList,
uniqueArgs.Add(endpoint.Host)
}
// Error out if we have more than serverCommandLineArgsMax unique servers.
if len(uniqueArgs.ToSlice()) > serverCommandLineArgsMax {
err := fmt.Errorf("Unsupported number of endpoints (%s), total number of servers cannot be more than %d", endpoints, serverCommandLineArgsMax)
return serverAddr, endpoints, setupType, err
}
// Error out if we have less than 2 unique servers.
if len(uniqueArgs.ToSlice()) < 2 && setupType == DistXLSetupType {
err := fmt.Errorf("Unsupported number of endpoints (%s), minimum number of servers cannot be less than 2 in distributed setup", endpoints)

View File

@ -471,15 +471,7 @@ func formatXLGetDeploymentID(refFormat *formatXLV3, formats []*formatXLV3) (stri
}
// formatXLFixDeploymentID - Add deployment id if it is not present.
func formatXLFixDeploymentID(ctx context.Context, endpoints EndpointList, storageDisks []StorageAPI, refFormat *formatXLV3) (err error) {
// Acquire lock on format.json
mutex := newNSLock(globalIsDistXL)
formatLock := mutex.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
if err = formatLock.GetLock(globalHealingTimeout); err != nil {
return err
}
defer formatLock.Unlock()
func formatXLFixDeploymentID(endpoints EndpointList, storageDisks []StorageAPI, refFormat *formatXLV3) (err error) {
// Attempt to load all `format.json` from all disks.
var sErrs []error
formats, sErrs := loadFormatXLAll(storageDisks)
@ -518,12 +510,12 @@ func formatXLFixDeploymentID(ctx context.Context, endpoints EndpointList, storag
}
// Deployment ID needs to be set on all the disks.
// Save `format.json` across all disks.
return saveFormatXLAll(ctx, storageDisks, formats)
return saveFormatXLAll(context.Background(), storageDisks, formats)
}
// Update only the valid local disks which have not been updated before.
func formatXLFixLocalDeploymentID(ctx context.Context, endpoints EndpointList, storageDisks []StorageAPI, refFormat *formatXLV3) error {
func formatXLFixLocalDeploymentID(endpoints EndpointList, storageDisks []StorageAPI, refFormat *formatXLV3) error {
// If this server was down when the deploymentID was updated
// then we make sure that we update the local disks with the deploymentID.
for index, storageDisk := range storageDisks {
@ -542,7 +534,7 @@ func formatXLFixLocalDeploymentID(ctx context.Context, endpoints EndpointList, s
}
format.ID = refFormat.ID
if err := saveFormatXL(storageDisk, format); err != nil {
logger.LogIf(ctx, err)
logger.LogIf(context.Background(), err)
return fmt.Errorf("Unable to save format.json, %s", err)
}
}

View File

@ -646,7 +646,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
// Hold write lock on the object.
destLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
destLock := fs.NewNSLock(ctx, bucket, object)
if err = destLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}

View File

@ -165,6 +165,12 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
return fs, nil
}
// NewNSLock - initialize a new namespace RWLocker instance.
func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
// lockers are explicitly 'nil' for FS mode since there are only local lockers
return fs.nsMutex.NewNSLock(ctx, nil, bucket, object)
}
// Shutdown - should be called when process shuts down.
func (fs *FSObjects) Shutdown(ctx context.Context) error {
fs.fsFormatRlk.Close()
@ -290,7 +296,7 @@ func (fs *FSObjects) statBucketDir(ctx context.Context, bucket string) (os.FileI
// MakeBucketWithLocation - create a new bucket, returns if it
// already exists.
func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
bucketLock := fs.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
return err
}
@ -313,7 +319,7 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, locatio
// GetBucketInfo - fetch bucket metadata info.
func (fs *FSObjects) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) {
bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
bucketLock := fs.NewNSLock(ctx, bucket, "")
if e := bucketLock.GetRLock(globalObjectTimeout); e != nil {
return bi, e
}
@ -376,7 +382,7 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
// DeleteBucket - delete a bucket and all the metadata associated
// with the bucket including pending multipart, object metadata.
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
bucketLock := fs.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return err
@ -412,7 +418,7 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if !cpSrcDstSame {
objectDWLock := fs.nsMutex.NewNSLock(ctx, dstBucket, dstObject)
objectDWLock := fs.NewNSLock(ctx, dstBucket, dstObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}
@ -484,7 +490,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
if lockType != noLock {
// Lock the object before reading.
lock := fs.nsMutex.NewNSLock(ctx, bucket, object)
lock := fs.NewNSLock(ctx, bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalObjectTimeout); err != nil {
@ -571,7 +577,7 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse
}
// Lock the object before reading.
objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := fs.NewNSLock(ctx, bucket, object)
if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return err
@ -739,7 +745,7 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
// getObjectInfoWithLock - reads object metadata and replies back ObjectInfo.
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
// Lock the object before reading.
objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := fs.NewNSLock(ctx, bucket, object)
if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
@ -764,7 +770,7 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s
func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) {
oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
if err == errCorruptedFormat || err == io.EOF {
objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := fs.NewNSLock(ctx, bucket, object)
if err = objectLock.GetLock(globalObjectTimeout); err != nil {
return oi, toObjectErr(err, bucket, object)
}
@ -810,7 +816,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
return ObjectInfo{}, err
}
// Lock the object.
objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := fs.NewNSLock(ctx, bucket, object)
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return objInfo, err
@ -965,7 +971,7 @@ func (fs *FSObjects) DeleteObjects(ctx context.Context, bucket string, objects [
// and there are no rollbacks supported.
func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) error {
// Acquire a write lock before deleting the object.
objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := fs.NewNSLock(ctx, bucket, object)
if err := objectLock.GetLock(globalOperationTimeout); err != nil {
return err
}

View File

@ -142,8 +142,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
// Set system resources to maximum.
logger.LogIf(context.Background(), setMaxResources())
initNSLock(false) // Enable local namespace lock.
// Set when gateway is enabled
globalIsGateway = true

View File

@ -18,6 +18,7 @@ package cmd
import (
"context"
"errors"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lifecycle"
@ -28,6 +29,12 @@ import (
// GatewayUnsupported list of unsupported call stubs for gateway.
type GatewayUnsupported struct{}
// NewNSLock is a dummy stub for gateway.
func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
logger.CriticalIf(ctx, errors.New("not implemented"))
return nil
}
// ListMultipartUploads lists all multipart uploads.
func (a GatewayUnsupported) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi ListMultipartsInfo, err error) {
return lmi, NotImplemented{}

View File

@ -83,7 +83,8 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error {
// Hold a lock for healing the erasure set
zeroDuration := time.Millisecond
zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)
erasureSetHealLock := globalNSMutex.NewNSLock(ctx, "system", fmt.Sprintf("erasure-set-heal-%d", setIndex))
erasureSetHealLock := xlObj.nsMutex.NewNSLock(ctx, xlObj.getLockers(),
"system", fmt.Sprintf("erasure-set-heal-%d", setIndex))
if err := erasureSetHealLock.GetLock(zeroDynamicTimeout); err != nil {
return err
}
@ -126,7 +127,7 @@ func execLeaderTasks(sets *xlSets) {
ctx := context.Background()
// Hold a lock so only one server performs auto-healing
leaderLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, "leader")
leaderLock := sets.NewNSLock(ctx, minioMetaBucket, "leader")
for {
err := leaderLock.GetLock(leaderLockTimeout)
if err == nil {

View File

@ -338,7 +338,7 @@ func (sys *IAMSys) Load() error {
func (sys *IAMSys) doIAMConfigMigration(objAPI ObjectLayer) error {
// Take IAM configuration migration lock
lockPath := iamConfigPrefix + "/migration.lock"
objLock := globalNSMutex.NewNSLock(context.Background(), minioMetaBucket, lockPath)
objLock := objAPI.NewNSLock(context.Background(), minioMetaBucket, lockPath)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}

View File

@ -17,22 +17,21 @@
package cmd
import (
"errors"
"fmt"
"sync"
"time"
"github.com/minio/dsync/v2"
"github.com/minio/minio/pkg/dsync"
)
// lockRequesterInfo stores various info from the client for each lock that is requested.
type lockRequesterInfo struct {
Writer bool // Bool whether write or read lock.
Node string // Network address of client claiming lock.
ServiceEndpoint string // RPC path of client claiming lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastCheck time.Time // Timestamp for last check of validity of lock.
Source string // Contains line, function and filename reqesting the lock.
Writer bool // Bool whether write or read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastCheck time.Time // Timestamp for last check of validity of lock.
Source string // Contains line, function and filename reqesting the lock.
}
// isWriteLock returns whether the lock is a write or read lock.
@ -40,20 +39,41 @@ func isWriteLock(lri []lockRequesterInfo) bool {
return len(lri) == 1 && lri[0].Writer
}
type errorLocker struct{}
func (d *errorLocker) String() string {
return ""
}
func (d *errorLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to lock")
}
func (d *errorLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to unlock")
}
func (d *errorLocker) RLock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to rlock")
}
func (d *errorLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to runlock")
}
func (d *errorLocker) Close() error {
return nil
}
// localLocker implements Dsync.NetLocker
type localLocker struct {
mutex sync.Mutex
serviceEndpoint string
serverAddr string
lockMap map[string][]lockRequesterInfo
mutex sync.Mutex
endpoint Endpoint
lockMap map[string][]lockRequesterInfo
}
func (l *localLocker) ServerAddr() string {
return l.serverAddr
}
func (l *localLocker) ServiceEndpoint() string {
return l.serviceEndpoint
func (l *localLocker) String() string {
return l.endpoint.String()
}
func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
@ -63,13 +83,11 @@ func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
if !isLockTaken { // No locks held on the given name, so claim write lock
l.lockMap[args.Resource] = []lockRequesterInfo{
{
Writer: true,
Node: args.ServerAddr,
ServiceEndpoint: args.ServiceEndpoint,
Source: args.Source,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Writer: true,
Source: args.Source,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
},
}
}
@ -96,17 +114,38 @@ func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
}
// removeEntry based on the uid of the lock message, removes a single entry from the
// lockRequesterInfo array or the whole array from the map (in case of a write lock
// or last read lock)
func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool {
// Find correct entry to remove based on uid.
for index, entry := range *lri {
if entry.UID == uid {
if len(*lri) == 1 {
// Remove the write lock.
delete(l.lockMap, name)
} else {
// Remove the appropriate read lock.
*lri = append((*lri)[:index], (*lri)[index+1:]...)
l.lockMap[name] = *lri
}
return true
}
}
// None found return false, perhaps entry removed in previous run.
return false
}
func (l *localLocker) RLock(args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
lrInfo := lockRequesterInfo{
Writer: false,
Node: args.ServerAddr,
ServiceEndpoint: args.ServiceEndpoint,
Source: args.Source,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Writer: false,
Source: args.Source,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
}
if lri, ok := l.lockMap[args.Resource]; ok {
if reply = !isWriteLock(lri); reply {
@ -139,18 +178,6 @@ func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
return reply, nil
}
func (l *localLocker) ForceUnlock(args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {
return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
}
// Only clear lock when it is taken
// Remove the lock (irrespective of write or read lock)
delete(l.lockMap, args.Resource)
return true, nil
}
func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
l.mutex.Lock()
defer l.mutex.Unlock()
@ -161,3 +188,14 @@ func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
}
return lockCopy
}
func (l *localLocker) Close() error {
return nil
}
func newLocker(endpoint Endpoint) *localLocker {
return &localLocker{
endpoint: endpoint,
lockMap: make(map[string][]lockRequesterInfo),
}
}

View File

@ -26,18 +26,16 @@ import (
"net/url"
"github.com/minio/dsync/v2"
"github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/dsync"
)
// lockRESTClient is authenticable lock REST client
type lockRESTClient struct {
host *xnet.Host
restClient *rest.Client
serverURL *url.URL
endpoint Endpoint
connected int32
}
@ -49,20 +47,13 @@ func toLockError(err error) error {
switch err.Error() {
case errLockConflict.Error():
return errLockConflict
case errLockNotExpired.Error():
return errLockNotExpired
}
return err
}
// ServerAddr - dsync.NetLocker interface compatible method.
func (client *lockRESTClient) ServerAddr() string {
return client.serverURL.Host
}
// ServiceEndpoint - dsync.NetLocker interface compatible method.
func (client *lockRESTClient) ServiceEndpoint() string {
return client.serverURL.Path
// String stringer *dsync.NetLocker* interface compatible method.
func (client *lockRESTClient) String() string {
return client.endpoint.String()
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
@ -83,22 +74,17 @@ func (client *lockRESTClient) call(method string, values url.Values, body io.Rea
}
if isNetworkError(err) {
time.AfterFunc(defaultRetryUnit*5, func() {
// After 5 seconds, take this lock client
// online for a retry.
time.AfterFunc(defaultRetryUnit*3, func() {
// After 3 seconds, take this lock client online for a retry.
atomic.StoreInt32(&client.connected, 1)
})
atomic.StoreInt32(&client.connected, 0)
}
return nil, toLockError(err)
}
// Stringer provides a canonicalized representation of node.
func (client *lockRESTClient) String() string {
return client.host.String()
}
// IsOnline - returns whether REST client failed to connect or not.
func (client *lockRESTClient) IsOnline() bool {
return atomic.LoadInt32(&client.connected) == 1
@ -117,15 +103,13 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply
values.Set(lockRESTUID, args.UID)
values.Set(lockRESTSource, args.Source)
values.Set(lockRESTResource, args.Resource)
values.Set(lockRESTServerAddr, args.ServerAddr)
values.Set(lockRESTServerEndpoint, args.ServiceEndpoint)
respBody, err := client.call(call, values, nil, -1)
defer http.DrainBody(respBody)
switch err {
case nil:
return true, nil
case errLockConflict, errLockNotExpired:
case errLockConflict:
return false, nil
default:
return false, err
@ -152,45 +136,42 @@ func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error
return client.restCall(lockRESTMethodUnlock, args)
}
// ForceUnlock calls force unlock RPC.
func (client *lockRESTClient) ForceUnlock(args dsync.LockArgs) (reply bool, err error) {
return client.restCall(lockRESTMethodForceUnlock, args)
func closeLockers(lockers []dsync.NetLocker) {
for _, locker := range lockers {
locker.Close()
}
}
// Expired calls expired RPC.
func (client *lockRESTClient) Expired(args dsync.LockArgs) (reply bool, err error) {
return client.restCall(lockRESTMethodExpired, args)
func newLockAPI(endpoint Endpoint) dsync.NetLocker {
if endpoint.IsLocal {
return globalLockServers[endpoint]
}
return newlockRESTClient(endpoint)
}
// Returns a lock rest client.
func newlockRESTClient(peer *xnet.Host) *lockRESTClient {
scheme := "http"
if globalIsSSL {
scheme = "https"
}
func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
serverURL := &url.URL{
Scheme: scheme,
Host: peer.String(),
Path: lockRESTPath,
Scheme: endpoint.Scheme,
Host: endpoint.Host,
Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion),
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: peer.Name,
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
NextProtos: []string{"http/1.1"}, // Force http1.1
}
}
restClient, err := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken)
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
logger.LogIf(context.Background(), err)
return &lockRESTClient{serverURL: serverURL, host: peer, restClient: restClient, connected: 0}
return &lockRESTClient{endpoint: endpoint, restClient: restClient, connected: 0}
}
return &lockRESTClient{serverURL: serverURL, host: peer, restClient: restClient, connected: 1}
return &lockRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}
}

View File

@ -19,17 +19,16 @@ package cmd
import (
"testing"
"github.com/minio/dsync/v2"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/dsync"
)
// Tests lock rpc client.
func TestLockRESTlient(t *testing.T) {
host, err := xnet.ParseHost("localhost:9000")
endpoint, err := NewEndpoint("http://localhost:9000")
if err != nil {
t.Fatalf("unexpected error %v", err)
}
lkClient := newlockRESTClient(host)
lkClient := newlockRESTClient(endpoint)
if lkClient.connected == 0 {
t.Fatalf("unexpected error. connection failed")
}
@ -54,14 +53,4 @@ func TestLockRESTlient(t *testing.T) {
if err == nil {
t.Fatal("Expected for Unlock to fail")
}
_, err = lkClient.ForceUnlock(dsync.LockArgs{})
if err == nil {
t.Fatal("Expected for ForceUnlock to fail")
}
_, err = lkClient.Expired(dsync.LockArgs{})
if err == nil {
t.Fatal("Expected for Expired to fail")
}
}

View File

@ -18,23 +18,19 @@ package cmd
import (
"errors"
"time"
)
const (
lockRESTVersion = "v2"
lockRESTVersionPrefix = SlashSeparator + "v2"
lockRESTPrefix = minioReservedBucketPath + "/lock"
lockRESTPath = lockRESTPrefix + lockRESTVersionPrefix
)
const (
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
lockRESTMethodForceUnlock = "/forceunlock"
lockRESTMethodExpired = "/expired"
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
// Unique ID of lock/unlock request.
lockRESTUID = "uid"
@ -43,67 +39,6 @@ const (
lockRESTSource = "source"
// Resource contains a entity to be locked/unlocked.
lockRESTResource = "resource"
// ServerAddr contains the address of the server who requested lock/unlock of the above resource.
lockRESTServerAddr = "serverAddr"
// ServiceEndpoint contains the network path of above server to do lock/unlock.
lockRESTServerEndpoint = "serverEndpoint"
)
// nameLockRequesterInfoPair is a helper type for lock maintenance
type nameLockRequesterInfoPair struct {
name string
lri lockRequesterInfo
}
var errLockConflict = errors.New("lock conflict")
var errLockNotExpired = errors.New("lock not expired")
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
if lri, ok := l.lockMap[nlrip.name]; ok {
// Even if the entry exists, it may not be the same entry which was
// considered as expired, so we simply an attempt to remove it if its
// not possible there is nothing we need to do.
l.removeEntry(nlrip.name, nlrip.lri.UID, &lri)
}
}
// removeEntry based on the uid of the lock message, removes a single entry from the
// lockRequesterInfo array or the whole array from the map (in case of a write lock
// or last read lock)
func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool {
// Find correct entry to remove based on uid.
for index, entry := range *lri {
if entry.UID == uid {
if len(*lri) == 1 {
// Remove the write lock.
delete(l.lockMap, name)
} else {
// Remove the appropriate read lock.
*lri = append((*lri)[:index], (*lri)[index+1:]...)
l.lockMap[name] = *lri
}
return true
}
}
// None found return false, perhaps entry removed in previous run.
return false
}
// getLongLivedLocks returns locks that are older than a certain time and
// have not been 'checked' for validity too soon enough
func getLongLivedLocks(m map[string][]lockRequesterInfo, interval time.Duration) []nameLockRequesterInfoPair {
rslt := []nameLockRequesterInfoPair{}
for name, lriArray := range m {
for idx := range lriArray {
// Check whether enough time has gone by since last check
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]})
lriArray[idx].TimeLastCheck = UTCNow()
}
}
}
return rslt
}

View File

@ -21,7 +21,6 @@ import (
"reflect"
"sync"
"testing"
"time"
)
// Helper function to create a lock server for testing
@ -36,9 +35,8 @@ func createLockTestServer(t *testing.T) (string, *lockRESTServer, string) {
locker := &lockRESTServer{
ll: &localLocker{
mutex: sync.Mutex{},
serviceEndpoint: "rpc-path",
lockMap: make(map[string][]lockRequesterInfo),
mutex: sync.Mutex{},
lockMap: make(map[string][]lockRequesterInfo),
},
}
creds := globalActiveCred
@ -49,63 +47,22 @@ func createLockTestServer(t *testing.T) (string, *lockRESTServer, string) {
return fsDir, locker, token
}
// Test function to remove lock entries from map only in case they still exist based on name & uid combination
func TestLockRpcServerRemoveEntryIfExists(t *testing.T) {
testPath, locker, _ := createLockTestServer(t)
defer os.RemoveAll(testPath)
lri := lockRequesterInfo{
Writer: false,
Node: "host",
ServiceEndpoint: "rpc-path",
UID: "0123-4567",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
}
nlrip := nameLockRequesterInfoPair{name: "name", lri: lri}
// first test by simulating item has already been deleted
locker.ll.removeEntryIfExists(nlrip)
{
gotLri := locker.ll.lockMap["name"]
expectedLri := []lockRequesterInfo(nil)
if !reflect.DeepEqual(expectedLri, gotLri) {
t.Errorf("Expected %#v, got %#v", expectedLri, gotLri)
}
}
// then test normal deletion
locker.ll.lockMap["name"] = []lockRequesterInfo{lri} // add item
locker.ll.removeEntryIfExists(nlrip)
{
gotLri := locker.ll.lockMap["name"]
expectedLri := []lockRequesterInfo(nil)
if !reflect.DeepEqual(expectedLri, gotLri) {
t.Errorf("Expected %#v, got %#v", expectedLri, gotLri)
}
}
}
// Test function to remove lock entries from map based on name & uid combination
func TestLockRpcServerRemoveEntry(t *testing.T) {
testPath, locker, _ := createLockTestServer(t)
defer os.RemoveAll(testPath)
lockRequesterInfo1 := lockRequesterInfo{
Writer: true,
Node: "host",
ServiceEndpoint: "rpc-path",
UID: "0123-4567",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Writer: true,
UID: "0123-4567",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
}
lockRequesterInfo2 := lockRequesterInfo{
Writer: true,
Node: "host",
ServiceEndpoint: "rpc-path",
UID: "89ab-cdef",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Writer: true,
UID: "89ab-cdef",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
}
locker.ll.lockMap["name"] = []lockRequesterInfo{
@ -140,64 +97,3 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
}
}
}
// Tests function returning long lived locks.
func TestLockRpcServerGetLongLivedLocks(t *testing.T) {
ut := UTCNow()
// Collection of test cases for verifying returning valid long lived locks.
testCases := []struct {
lockMap map[string][]lockRequesterInfo
lockInterval time.Duration
expectedNSLR []nameLockRequesterInfoPair
}{
// Testcase - 1 validates long lived locks, returns empty list.
{
lockMap: map[string][]lockRequesterInfo{
"test": {{
Writer: true,
Node: "10.1.10.21",
ServiceEndpoint: "/lock/mnt/disk1",
UID: "10000112",
Timestamp: ut,
TimeLastCheck: ut,
}},
},
lockInterval: 1 * time.Minute,
expectedNSLR: []nameLockRequesterInfoPair{},
},
// Testcase - 2 validates long lived locks, returns at least one list.
{
lockMap: map[string][]lockRequesterInfo{
"test": {{
Writer: true,
Node: "10.1.10.21",
ServiceEndpoint: "/lock/mnt/disk1",
UID: "10000112",
Timestamp: ut,
TimeLastCheck: ut.Add(-2 * time.Minute),
}},
},
lockInterval: 1 * time.Minute,
expectedNSLR: []nameLockRequesterInfoPair{
{
name: "test",
lri: lockRequesterInfo{
Writer: true,
Node: "10.1.10.21",
ServiceEndpoint: "/lock/mnt/disk1",
UID: "10000112",
Timestamp: ut,
TimeLastCheck: ut.Add(-2 * time.Minute),
},
},
},
},
}
// Validates all test cases here.
for i, testCase := range testCases {
nsLR := getLongLivedLocks(testCase.lockMap, testCase.lockInterval)
if !reflect.DeepEqual(testCase.expectedNSLR, nsLR) {
t.Errorf("Test %d: Expected %#v, got %#v", i+1, testCase.expectedNSLR, nsLR)
}
}
}

View File

@ -17,16 +17,13 @@
package cmd
import (
"context"
"errors"
"math/rand"
"net/http"
"path"
"time"
"github.com/gorilla/mux"
"github.com/minio/dsync/v2"
"github.com/minio/minio/cmd/logger"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/dsync"
)
const (
@ -58,11 +55,9 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
func getLockArgs(r *http.Request) dsync.LockArgs {
return dsync.LockArgs{
UID: r.URL.Query().Get(lockRESTUID),
Source: r.URL.Query().Get(lockRESTSource),
Resource: r.URL.Query().Get(lockRESTResource),
ServerAddr: r.URL.Query().Get(lockRESTServerAddr),
ServiceEndpoint: r.URL.Query().Get(lockRESTServerEndpoint),
UID: r.URL.Query().Get(lockRESTUID),
Source: r.URL.Query().Get(lockRESTSource),
Resource: r.URL.Query().Get(lockRESTResource),
}
}
@ -132,130 +127,28 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
}
}
// ForceUnlockHandler - force releases the acquired lock.
func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
return
}
// Ignore the ForceUnlock() "reply" return value because if err == nil, "reply" is always true
// Consequently, if err != nil, reply is always false
if _, err := l.ll.ForceUnlock(getLockArgs(r)); err != nil {
l.writeErrorResponse(w, err)
return
}
}
// ExpiredHandler - query expired lock status.
func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
return
}
lockArgs := getLockArgs(r)
l.ll.mutex.Lock()
defer l.ll.mutex.Unlock()
// Lock found, proceed to verify if belongs to given uid.
if lri, ok := l.ll.lockMap[lockArgs.Resource]; ok {
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == lockArgs.UID {
l.writeErrorResponse(w, errLockNotExpired)
return
}
}
}
}
// lockMaintenance loops over locks that have been active for some time and checks back
// with the original server whether it is still alive or not
//
// Following logic inside ignores the errors generated for Dsync.Active operation.
// - server at client down
// - some network error (and server is up normally)
//
// We will ignore the error, and we will retry later to get a resolve on this lock
func (l *lockRESTServer) lockMaintenance(interval time.Duration) {
l.ll.mutex.Lock()
// Get list of long lived locks to check for staleness.
nlripLongLived := getLongLivedLocks(l.ll.lockMap, interval)
l.ll.mutex.Unlock()
// Validate if long lived locks are indeed clean.
for _, nlrip := range nlripLongLived {
// Initialize client based on the long live locks.
host, err := xnet.ParseHost(nlrip.lri.Node)
if err != nil {
logger.LogIf(context.Background(), err)
continue
}
c := newlockRESTClient(host)
if !c.IsOnline() {
continue
}
// Call back to original server verify whether the lock is still active (based on name & uid)
expired, err := c.Expired(dsync.LockArgs{
UID: nlrip.lri.UID,
Resource: nlrip.name,
})
if err != nil {
continue
}
// For successful response, verify if lock was indeed active or stale.
if expired {
// The lock is no longer active at server that originated
// the lock, attempt to remove the lock.
l.ll.mutex.Lock()
l.ll.removeEntryIfExists(nlrip) // Purge the stale entry if it exists.
l.ll.mutex.Unlock()
}
// Close the connection regardless of the call response.
c.Close()
}
}
// Start lock maintenance from all lock servers.
func startLockMaintenance(lkSrv *lockRESTServer) {
// Initialize a new ticker with a minute between each ticks.
ticker := time.NewTicker(lockMaintenanceInterval)
// Stop the timer upon service closure and cleanup the go-routine.
defer ticker.Stop()
// Start with random sleep time, so as to avoid "synchronous checks" between servers
time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval)))
for {
// Verifies every minute for locks held more than 2 minutes.
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
lkSrv.lockMaintenance(lockValidityCheckInterval)
}
}
}
// registerLockRESTHandlers - register lock rest router.
func registerLockRESTHandlers(router *mux.Router) {
subrouter := router.PathPrefix(lockRESTPrefix).Subrouter()
queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource, lockRESTServerAddr, lockRESTServerEndpoint)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(globalLockServer.LockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(globalLockServer.RLockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.UnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.RUnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.ForceUnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(globalLockServer.ExpiredHandler)).Queries(queries...)
func registerLockRESTHandlers(router *mux.Router, endpoints EndpointList) {
queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource)
for _, endpoint := range endpoints {
if !endpoint.IsLocal {
continue
}
lockServer := &lockRESTServer{
ll: newLocker(endpoint),
}
subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter()
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)).Queries(queries...)
globalLockServers[endpoint] = lockServer.ll
}
// If none of the routes match add default error handler routes
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
// Start lock maintenance from all lock servers.
go startLockMaintenance(globalLockServer)
}

View File

@ -27,21 +27,13 @@ import (
"fmt"
"time"
"github.com/minio/dsync/v2"
"github.com/minio/lsync"
"github.com/minio/minio-go/v6/pkg/set"
"github.com/minio/minio/cmd/logger"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/dsync"
)
// Global name space lock.
var globalNSMutex *nsLockMap
// Global lock server one per server.
var globalLockServer *lockRESTServer
// Instance of dsync for distributed clients.
var globalDsync *dsync.Dsync
// local lock servers
var globalLockServers = make(map[Endpoint]*localLocker)
// RWLocker - locker interface to introduce GetRLock, RUnlock.
type RWLocker interface {
@ -51,45 +43,6 @@ type RWLocker interface {
RUnlock()
}
// Initialize distributed locking only in case of distributed setup.
// Returns lock clients and the node index for the current server.
func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int, err error) {
myNode = -1
seenHosts := set.NewStringSet()
for _, endpoint := range endpoints {
if seenHosts.Contains(endpoint.Host) {
continue
}
seenHosts.Add(endpoint.Host)
var locker dsync.NetLocker
if endpoint.IsLocal {
myNode = len(clnts)
globalLockServer = &lockRESTServer{
ll: &localLocker{
serverAddr: endpoint.Host,
serviceEndpoint: lockRESTPrefix,
lockMap: make(map[string][]lockRequesterInfo),
},
}
locker = globalLockServer.ll
} else {
var host *xnet.Host
host, err = xnet.ParseHost(endpoint.Host)
locker = newlockRESTClient(host)
}
clnts = append(clnts, locker)
}
if myNode == -1 {
return clnts, myNode, errors.New("no endpoint pointing to the local machine is found")
}
return clnts, myNode, err
}
// newNSLock - return a new name space lock map.
func newNSLock(isDistXL bool) *nsLockMap {
nsMutex := nsLockMap{
@ -102,11 +55,6 @@ func newNSLock(isDistXL bool) *nsLockMap {
return &nsMutex
}
// initNSLock - initialize name space lock map.
func initNSLock(isDistXL bool) {
globalNSMutex = newNSLock(isDistXL)
}
// nsParam - carries name space resource.
type nsParam struct {
volume string
@ -224,32 +172,6 @@ func (n *nsLockMap) RUnlock(volume, path, opsID string) {
n.unlock(volume, path, readLock)
}
// ForceUnlock - forcefully unlock a lock based on name.
func (n *nsLockMap) ForceUnlock(volume, path string) {
n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock()
// Clarification on operation:
// - In case of FS or XL we call ForceUnlock on the local globalNSMutex
// (since there is only a single server) which will cause the 'stuck'
// mutex to be removed from the map. Existing operations for this
// will continue to be blocked (and timeout). New operations on this
// resource will use a new mutex and proceed normally.
//
// - In case of Distributed setup (using dsync), there is no need to call
// ForceUnlock on the server where the lock was acquired and is presumably
// 'stuck'. Instead dsync.ForceUnlock() will release the underlying locks
// that participated in granting the lock. Any pending dsync locks that
// are blocking can now proceed as normal and any new locks will also
// participate normally.
if n.isDistXL { // For distributed mode, broadcast ForceUnlock message.
dsync.NewDRWMutex(context.Background(), pathJoin(volume, path), globalDsync).ForceUnlock()
}
// Remove lock from the map.
delete(n.lockMap, nsParam{volume, path})
}
// dsync's distributed lock instance.
type distLockInstance struct {
rwMutex *dsync.DRWMutex
@ -301,10 +223,14 @@ type localLockInstance struct {
// NewNSLock - returns a lock instance for a given volume and
// path. The returned lockInstance object encapsulates the nsLockMap,
// volume, path and operation ID.
func (n *nsLockMap) NewNSLock(ctx context.Context, volume, path string) RWLocker {
func (n *nsLockMap) NewNSLock(ctx context.Context, lockers []dsync.NetLocker, volume, path string) RWLocker {
opsID := mustGetUUID()
if n.isDistXL {
return &distLockInstance{dsync.NewDRWMutex(ctx, pathJoin(volume, path), globalDsync), volume, path, opsID}
sync, err := dsync.New(lockers)
if err != nil {
logger.CriticalIf(ctx, err)
}
return &distLockInstance{dsync.NewDRWMutex(ctx, pathJoin(volume, path), sync), volume, path, opsID}
}
return &localLockInstance{ctx, n, volume, path, opsID}
}

View File

@ -17,7 +17,6 @@
package cmd
import (
"context"
"testing"
"time"
)
@ -32,7 +31,7 @@ func TestGetSource(t *testing.T) {
currentSource := func() string { return getSource() }
gotSource := currentSource()
// Hard coded line number, 32, in the "expectedSource" value
expectedSource := "[namespace-lock_test.go:33:TestGetSource()]"
expectedSource := "[namespace-lock_test.go:32:TestGetSource()]"
if gotSource != expectedSource {
t.Errorf("expected : %s, got : %s", expectedSource, gotSource)
}
@ -41,7 +40,8 @@ func TestGetSource(t *testing.T) {
// Tests functionality provided by namespace lock.
func TestNamespaceLockTest(t *testing.T) {
isDistXL := false
initNSLock(isDistXL)
nsMutex := newNSLock(isDistXL)
// List of test cases.
testCases := []struct {
lk func(s1, s2, s3 string, t time.Duration) bool
@ -53,22 +53,22 @@ func TestNamespaceLockTest(t *testing.T) {
shouldPass bool
}{
{
lk: globalNSMutex.Lock,
unlk: globalNSMutex.Unlock,
lk: nsMutex.Lock,
unlk: nsMutex.Unlock,
lockedRefCount: 1,
unlockedRefCount: 0,
shouldPass: true,
},
{
rlk: globalNSMutex.RLock,
runlk: globalNSMutex.RUnlock,
rlk: nsMutex.RLock,
runlk: nsMutex.RUnlock,
lockedRefCount: 4,
unlockedRefCount: 2,
shouldPass: true,
},
{
rlk: globalNSMutex.RLock,
runlk: globalNSMutex.RUnlock,
rlk: nsMutex.RLock,
runlk: nsMutex.RUnlock,
lockedRefCount: 1,
unlockedRefCount: 0,
shouldPass: true,
@ -82,7 +82,7 @@ func TestNamespaceLockTest(t *testing.T) {
if !testCase.lk("a", "b", "c", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire lock")
}
nsLk, ok := globalNSMutex.lockMap[nsParam{"a", "b"}]
nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
}
@ -94,7 +94,7 @@ func TestNamespaceLockTest(t *testing.T) {
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref)
}
_, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
_, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if ok && !testCase.shouldPass {
t.Errorf("Lock map found after unlock.")
}
@ -113,7 +113,7 @@ func TestNamespaceLockTest(t *testing.T) {
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock fourth time.
t.Fatalf("Failed to acquire fourth read lock")
}
nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
}
@ -127,7 +127,7 @@ func TestNamespaceLockTest(t *testing.T) {
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref)
}
_, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
_, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock map not found.")
}
@ -138,7 +138,7 @@ func TestNamespaceLockTest(t *testing.T) {
t.Fatalf("Failed to acquire read lock")
}
nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
}
@ -150,7 +150,7 @@ func TestNamespaceLockTest(t *testing.T) {
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref)
}
_, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
_, ok = nsMutex.lockMap[nsParam{"a", "c"}]
if ok && !testCase.shouldPass {
t.Errorf("Lock map not found.")
}
@ -158,83 +158,44 @@ func TestNamespaceLockTest(t *testing.T) {
func TestNamespaceLockTimedOut(t *testing.T) {
isDistXL := false
initNSLock(isDistXL)
nsMutex := newNSLock(isDistXL)
// Get write lock
if !globalNSMutex.Lock("my-bucket", "my-object", "abc", 60*time.Second) {
if !nsMutex.Lock("my-bucket", "my-object", "abc", 60*time.Second) {
t.Fatalf("Failed to acquire lock")
}
// Second attempt for write lock on same resource should time out
locked := globalNSMutex.Lock("my-bucket", "my-object", "def", 1*time.Second)
locked := nsMutex.Lock("my-bucket", "my-object", "def", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired lock")
}
// Read lock on same resource should also time out
locked = globalNSMutex.RLock("my-bucket", "my-object", "def", 1*time.Second)
locked = nsMutex.RLock("my-bucket", "my-object", "def", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired read lock while write lock is active")
}
// Release write lock
globalNSMutex.Unlock("my-bucket", "my-object", "abc")
nsMutex.Unlock("my-bucket", "my-object", "abc")
// Get read lock
if !globalNSMutex.RLock("my-bucket", "my-object", "ghi", 60*time.Second) {
if !nsMutex.RLock("my-bucket", "my-object", "ghi", 60*time.Second) {
t.Fatalf("Failed to acquire read lock")
}
// Write lock on same resource should time out
locked = globalNSMutex.Lock("my-bucket", "my-object", "klm", 1*time.Second)
locked = nsMutex.Lock("my-bucket", "my-object", "klm", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired lock")
}
// 2nd read lock should be just fine
if !globalNSMutex.RLock("my-bucket", "my-object", "nop", 60*time.Second) {
if !nsMutex.RLock("my-bucket", "my-object", "nop", 60*time.Second) {
t.Fatalf("Failed to acquire second read lock")
}
// Release both read locks
globalNSMutex.RUnlock("my-bucket", "my-object", "ghi")
globalNSMutex.RUnlock("my-bucket", "my-object", "nop")
}
// Tests functionality to forcefully unlock locks.
func TestNamespaceForceUnlockTest(t *testing.T) {
isDistXL := false
initNSLock(isDistXL)
// Create lock.
lock := globalNSMutex.NewNSLock(context.Background(), "bucket", "object")
if lock.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
t.Fatalf("Failed to get lock")
}
// Forcefully unlock lock.
globalNSMutex.ForceUnlock("bucket", "object")
ch := make(chan struct{}, 1)
go func() {
// Try to claim lock again.
anotherLock := globalNSMutex.NewNSLock(context.Background(), "bucket", "object")
if anotherLock.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
t.Errorf("Failed to get lock")
return
}
// And signal success.
ch <- struct{}{}
}()
select {
case <-ch:
// Signaled so all is fine.
break
case <-time.After(100 * time.Millisecond):
// In case we hit the time out, the lock has not been cleared.
t.Errorf("Lock not cleared.")
}
// Clean up lock.
globalNSMutex.ForceUnlock("bucket", "object")
nsMutex.RUnlock("my-bucket", "my-object", "ghi")
nsMutex.RUnlock("my-bucket", "my-object", "nop")
}

View File

@ -466,7 +466,6 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
// GetLocks - makes GetLocks RPC call on all peers.
func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
locksResp := make([]*PeerLocks, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
@ -703,7 +702,7 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
// and configFile, take a transaction lock to avoid data race between readConfig()
// and saveConfig().
objLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
objLock := objAPI.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
return err
}
@ -1344,7 +1343,7 @@ func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name
// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
// and configFile, take a transaction lock to avoid data race between readConfig()
// and saveConfig().
objLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
objLock := objAPI.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
@ -1395,7 +1394,7 @@ func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.Target
// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
// and configFile, take a transaction lock to avoid data race between readConfig()
// and saveConfig().
objLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
objLock := objAPI.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}

View File

@ -106,7 +106,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
return &posixDiskIDCheck{storage: storage}, nil
}
return newStorageRESTClient(endpoint)
return newStorageRESTClient(endpoint), nil
}
// Cleanup a directory recursively.

View File

@ -48,6 +48,9 @@ const (
// ObjectLayer implements primitives for object API layer.
type ObjectLayer interface {
// Locking operations on object.
NewNSLock(ctx context.Context, bucket string, object string) RWLocker
// Storage operations.
Shutdown(context.Context) error
StorageInfo(context.Context) StorageInfo

View File

@ -100,7 +100,7 @@ func (client *peerRESTClient) Close() error {
}
// GetLocksResp stores various info from the client for each lock that is requested.
type GetLocksResp map[string][]lockRequesterInfo
type GetLocksResp []map[string][]lockRequesterInfo
// NetReadPerfInfo - fetch network read performance information for a remote node.
func (client *peerRESTClient) NetReadPerfInfo(size int64) (info ServerNetReadPerfInfo, err error) {
@ -759,10 +759,10 @@ func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
}
}
restClient, err := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken)
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
return &peerRESTClient{host: peer, restClient: restClient, connected: 0}, err
return nil, err
}
return &peerRESTClient{host: peer, restClient: restClient, connected: 1}, nil

View File

@ -148,8 +148,12 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request)
}
ctx := newContext(r, w, "GetLocks")
locks := globalLockServer.ll.DupLockMap()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(locks))
var llockers []map[string][]lockRequesterInfo
for _, llocker := range globalLockServers {
llockers = append(llockers, llocker.DupLockMap())
}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers))
w.(http.Flusher).Flush()

View File

@ -268,14 +268,18 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints EndpointLi
}
if format.ID == "" {
if err = formatXLFixDeploymentID(context.Background(), endpoints, storageDisks, format); err != nil {
// Not a first disk, wait until first disk fixes deploymentID
if !firstDisk {
return nil, errNotFirstDisk
}
if err = formatXLFixDeploymentID(endpoints, storageDisks, format); err != nil {
return nil, err
}
}
globalDeploymentID = format.ID
if err = formatXLFixLocalDeploymentID(context.Background(), endpoints, storageDisks, format); err != nil {
if err = formatXLFixLocalDeploymentID(endpoints, storageDisks, format); err != nil {
return nil, err
}
return format, nil

View File

@ -18,11 +18,9 @@ package rest
import (
"context"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
@ -102,32 +100,11 @@ func (c *Client) Close() {
}
}
func newCustomDialContext(timeout time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: timeout,
KeepAlive: timeout,
DualStack: true,
}
return dialer.DialContext(ctx, network, addr)
}
}
// NewClient - returns new REST client.
func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) (*Client, error) {
func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthToken func() string) (*Client, error) {
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig.
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(timeout),
MaxIdleConnsPerHost: 256,
IdleConnTimeout: 60 * time.Second,
TLSHandshakeTimeout: 30 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
DisableCompression: true,
}
tr := newCustomTransport()
return &Client{
httpClient: &http.Client{Transport: tr},
httpIdleConnsCloser: tr.CloseIdleConnections,

View File

@ -31,7 +31,7 @@ func registerDistXLRouters(router *mux.Router, endpoints EndpointList) {
registerPeerRESTHandlers(router)
// Register distributed namespace lock.
registerLockRESTHandlers(router)
registerLockRESTHandlers(router, endpoints)
}

View File

@ -27,7 +27,6 @@ import (
"syscall"
"github.com/minio/cli"
"github.com/minio/dsync/v2"
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
@ -144,12 +143,6 @@ func serverHandleCmdArgs(ctx *cli.Context) {
var setupType SetupType
var err error
if len(ctx.Args()) > serverCommandLineArgsMax {
uErr := config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("Invalid total number of endpoints (%d) passed, supported upto 32 unique arguments",
len(ctx.Args())))
logger.FatalIf(uErr, "Unable to validate passed endpoints")
}
endpoints := strings.Fields(env.Get(config.EnvEndpoints, ""))
if len(endpoints) > 0 {
globalMinioAddr, globalEndpoints, setupType, globalXLSetCount, globalXLSetDriveCount, err = createServerEndpoints(globalCLIContext.Addr, endpoints...)
@ -329,21 +322,6 @@ func serverMain(ctx *cli.Context) {
// Set system resources to maximum.
logger.LogIf(context.Background(), setMaxResources())
// Set nodes for dsync for distributed setup.
if globalIsDistXL {
clnts, myNode, err := newDsyncNodes(globalEndpoints)
if err != nil {
logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
}
globalDsync, err = dsync.New(clnts, myNode)
if err != nil {
logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
}
}
// Initialize name space lock.
initNSLock(globalIsDistXL)
if globalIsXL {
// Init global heal state
globalAllHealState = initHealState()

View File

@ -19,6 +19,7 @@ package cmd
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/gob"
"encoding/hex"
@ -30,6 +31,7 @@ import (
"sync/atomic"
"github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
xnet "github.com/minio/minio/pkg/net"
)
@ -446,19 +448,9 @@ func (client *storageRESTClient) Close() error {
}
// Returns a storage rest client.
func newStorageRESTClient(endpoint Endpoint) (*storageRESTClient, error) {
host, err := xnet.ParseHost(endpoint.Host)
if err != nil {
return nil, err
}
scheme := "http"
if globalIsSSL {
scheme = "https"
}
func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
serverURL := &url.URL{
Scheme: scheme,
Scheme: endpoint.Scheme,
Host: endpoint.Host,
Path: path.Join(storageRESTPrefix, endpoint.Path, storageRESTVersion),
}
@ -466,16 +458,17 @@ func newStorageRESTClient(endpoint Endpoint) (*storageRESTClient, error) {
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: host.Name,
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
NextProtos: []string{"http/1.1"}, // Force http1.1
}
}
restClient, err := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken)
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
return nil, err
logger.LogIf(context.Background(), err)
return &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: 0}
}
client := &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}
return client, nil
return &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}
}

View File

@ -514,11 +514,7 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
}
registerStorageRESTHandlers(router, EndpointList{endpoint})
restClient, err := newStorageRESTClient(endpoint)
if err != nil {
t.Fatalf("newStorageRESTClient failed for %v, with error %s", endpoint, err)
}
restClient := newStorageRESTClient(endpoint)
prevGlobalServerConfig := globalServerConfig
globalServerConfig = newServerConfig()

View File

@ -60,6 +60,7 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/policy"
)
@ -69,9 +70,6 @@ func init() {
// Set as non-distributed.
globalIsDistXL = false
// Initialize name space lock.
initNSLock(globalIsDistXL)
// Disable printing console messages during tests.
color.Output = ioutil.Discard
@ -451,13 +449,6 @@ func resetGlobalConfig() {
globalServerConfigMu.Unlock()
}
// reset global NSLock.
func resetGlobalNSLock() {
if globalNSMutex != nil {
globalNSMutex = nil
}
}
func resetGlobalEndpoints() {
globalEndpoints = EndpointList{}
}
@ -497,8 +488,6 @@ func resetTestGlobals() {
resetGlobalConfigPath()
// Reset Global server config.
resetGlobalConfig()
// Reset global NSLock.
resetGlobalNSLock()
// Reset global endpoints.
resetGlobalEndpoints()
// Reset global isXL flag.
@ -1629,6 +1618,9 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro
xl.getDisks = func() []StorageAPI {
return xl.storageDisks
}
xl.getLockers = func() []dsync.NetLocker {
return nil
}
globalConfigSys = NewConfigSys()
@ -1918,9 +1910,6 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
// this is to make sure that the tests are not affected by modified value.
resetTestGlobals()
// initialize NSLock.
initNSLock(false)
objLayer, fsDir, err := prepareFS()
if err != nil {
t.Fatalf("Initialization of object layer failed for single node setup: %s", err)

View File

@ -332,25 +332,49 @@ func ToS3ETag(etag string) string {
return etag
}
type dialContext func(ctx context.Context, network, address string) (net.Conn, error)
func newCustomDialContext(dialTimeout, dialKeepAlive time.Duration) dialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeout,
KeepAlive: dialKeepAlive,
DualStack: true,
}
return dialer.DialContext(ctx, network, addr)
}
}
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout, dialKeepAlive time.Duration) func() *http.Transport {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(dialTimeout, dialKeepAlive),
MaxIdleConnsPerHost: 256,
IdleConnTimeout: 60 * time.Second,
TLSHandshakeTimeout: 30 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
return func() *http.Transport {
return tr
}
}
// NewCustomHTTPTransport returns a new http configuration
// used while communicating with the cloud backends.
// This sets the value for MaxIdleConnsPerHost from 2 (go default)
// to 100.
// to 256.
func NewCustomHTTPTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: defaultDialTimeout,
KeepAlive: defaultDialKeepAlive,
}).DialContext,
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{RootCAs: globalRootCAs},
DisableCompression: true,
}
return newCustomHTTPTransport(&tls.Config{
RootCAs: globalRootCAs,
}, defaultDialTimeout, defaultDialKeepAlive)()
}
// Load the json (typically from disk file).

View File

@ -29,6 +29,7 @@ import (
"github.com/minio/minio/cmd/config/storageclass"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/lifecycle"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/policy"
@ -38,6 +39,9 @@ import (
// setsStorageAPI is encapsulated type for Close()
type setsStorageAPI [][]StorageAPI
// setsDsyncLockers is encapsulated type for Close()
type setsDsyncLockers [][]dsync.NetLocker
func (s setsStorageAPI) Close() error {
for i := 0; i < len(s); i++ {
for j, disk := range s[i] {
@ -51,6 +55,18 @@ func (s setsStorageAPI) Close() error {
return nil
}
func (s setsDsyncLockers) Close() error {
for i := 0; i < len(s); i++ {
for _, locker := range s[i] {
if locker == nil {
continue
}
locker.Close()
}
}
return nil
}
// xlSets implements ObjectLayer combining a static list of erasure coded
// object sets. NOTE: There is no dynamic scaling allowed or intended in
// current design.
@ -66,6 +82,12 @@ type xlSets struct {
// Re-ordered list of disks per set.
xlDisks setsStorageAPI
// Distributed locker clients.
xlLockers setsDsyncLockers
// Lockers map holds dsync lockers for each endpoint
lockersMap map[Endpoint]dsync.NetLocker
// List of endpoints provided on the command line.
endpoints EndpointList
@ -101,7 +123,11 @@ func (s *xlSets) isConnected(endpoint Endpoint) bool {
if s.xlDisks[i][j].String() != endpointStr {
continue
}
return s.xlDisks[i][j].IsOnline()
if s.xlDisks[i][j].IsOnline() {
return true
}
s.xlLockers[i][j].Close()
return false
}
}
return false
@ -147,10 +173,9 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This)
}
// connectDisksWithQuorum is same as connectDisks but waits
// for quorum number of formatted disks to be online in
// any given sets.
func (s *xlSets) connectDisksWithQuorum() {
// connectDisksAndLockersWithQuorum is same as connectDisksAndLockers but waits
// for quorum number of formatted disks to be online in any given sets.
func (s *xlSets) connectDisksAndLockersWithQuorum() {
var onlineDisks int
for onlineDisks < len(s.endpoints)/2 {
for _, endpoint := range s.endpoints {
@ -171,6 +196,7 @@ func (s *xlSets) connectDisksWithQuorum() {
}
disk.SetDiskID(format.XL.This)
s.xlDisks[i][j] = disk
s.xlLockers[i][j] = s.lockersMap[endpoint]
onlineDisks++
}
// Sleep for a while - so that we don't go into
@ -179,9 +205,9 @@ func (s *xlSets) connectDisksWithQuorum() {
}
}
// connectDisks - attempt to connect all the endpoints, loads format
// connectDisksAndLockers - attempt to connect all the endpoints, loads format
// and re-arranges the disks in proper position.
func (s *xlSets) connectDisks() {
func (s *xlSets) connectDisksAndLockers() {
for _, endpoint := range s.endpoints {
if s.isConnected(endpoint) {
continue
@ -201,6 +227,7 @@ func (s *xlSets) connectDisks() {
disk.SetDiskID(format.XL.This)
s.xlDisksMu.Lock()
s.xlDisks[i][j] = disk
s.xlLockers[i][j] = s.lockersMap[endpoint]
s.xlDisksMu.Unlock()
}
}
@ -220,11 +247,27 @@ func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) {
case <-s.disksConnectDoneCh:
return
case <-ticker.C:
s.connectDisks()
s.connectDisksAndLockers()
}
}
}
func (s *xlSets) GetLockers(setIndex int) func() []dsync.NetLocker {
return func() []dsync.NetLocker {
s.xlDisksMu.Lock()
defer s.xlDisksMu.Unlock()
lockers := make([]dsync.NetLocker, s.drivesPerSet)
copy(lockers, s.xlLockers[setIndex])
for i, lk := range lockers {
// Add error lockers for unavailable locker.
if lk == nil {
lockers[i] = &errorLocker{}
}
}
return lockers
}
}
// GetDisks returns a closure for a given set, which provides list of disks per set.
func (s *xlSets) GetDisks(setIndex int) func() []StorageAPI {
return func() []StorageAPI {
@ -241,10 +284,17 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs
// Initialize new set of erasure coded sets.
func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesPerSet int) (ObjectLayer, error) {
lockersMap := make(map[Endpoint]dsync.NetLocker)
for _, endpoint := range endpoints {
lockersMap[endpoint] = newLockAPI(endpoint)
}
// Initialize the XL sets instance.
s := &xlSets{
sets: make([]*xlObjects, setCount),
xlDisks: make([][]StorageAPI, setCount),
xlLockers: make([][]dsync.NetLocker, setCount),
lockersMap: lockersMap,
endpoints: endpoints,
setCount: setCount,
drivesPerSet: drivesPerSet,
@ -262,18 +312,21 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
for i := 0; i < len(format.XL.Sets); i++ {
s.xlDisks[i] = make([]StorageAPI, drivesPerSet)
s.xlLockers[i] = make([]dsync.NetLocker, drivesPerSet)
// Initialize xl objects for a given set.
s.sets[i] = &xlObjects{
getDisks: s.GetDisks(i),
nsMutex: mutex,
bp: bp,
getDisks: s.GetDisks(i),
getLockers: s.GetLockers(i),
nsMutex: mutex,
bp: bp,
}
go s.sets[i].cleanupStaleMultipartUploads(context.Background(), GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
go s.sets[i].cleanupStaleMultipartUploads(context.Background(),
GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
}
// Connect disks right away, but wait until we have `format.json` quorum.
s.connectDisksWithQuorum()
s.connectDisksAndLockersWithQuorum()
// Start the disk monitoring and connect routine.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -281,6 +334,11 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
return s, nil
}
// NewNSLock - initialize a new namespace RWLocker instance.
func (s *xlSets) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
return s.getHashedSet(object).NewNSLock(ctx, bucket, object)
}
// StorageInfo - combines output of StorageInfo across all erasure coded object sets.
func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
var storageInfo StorageInfo
@ -406,6 +464,13 @@ func (s *xlSets) Shutdown(ctx context.Context) error {
// even if one of the sets fail to create buckets, we proceed to undo a
// successful operation.
func (s *xlSets) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
set := s.getHashedSet(bucket)
bucketLock := set.nsMutex.NewNSLock(ctx, set.getLockers(), bucket, "")
if err := bucketLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
g := errgroup.WithNErrs(len(s.sets))
// Create buckets in parallel across all sets.
@ -484,6 +549,13 @@ func (s *xlSets) getHashedSet(input string) (set *xlObjects) {
// GetBucketInfo - returns bucket info from one of the erasure coded set.
func (s *xlSets) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
set := s.getHashedSet(bucket)
bucketLock := set.nsMutex.NewNSLock(ctx, set.getLockers(), bucket, "")
if err = bucketLock.GetRLock(globalOperationTimeout); err != nil {
return bucketInfo, err
}
defer bucketLock.RUnlock()
return s.getHashedSet(bucket).GetBucketInfo(ctx, bucket)
}
@ -563,6 +635,13 @@ func (s *xlSets) IsCompressionSupported() bool {
// even if one of the sets fail to delete buckets, we proceed to
// undo a successful operation.
func (s *xlSets) DeleteBucket(ctx context.Context, bucket string) error {
set := s.getHashedSet(bucket)
bucketLock := set.nsMutex.NewNSLock(ctx, set.getLockers(), bucket, "")
if err := bucketLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
g := errgroup.WithNErrs(len(s.sets))
// Delete buckets in parallel across all sets.
@ -709,7 +788,7 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
}
if !cpSrcDstSame {
objectDWLock := destSet.nsMutex.NewNSLock(ctx, destBucket, destObject)
objectDWLock := destSet.nsMutex.NewNSLock(ctx, destSet.getLockers(), destBucket, destObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
@ -1239,7 +1318,8 @@ func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV3, sErrs []
// healing in a distributed setup.
func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
// Acquire lock on format.json
formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
set := s.getHashedSet(formatConfigFile)
formatLock := set.nsMutex.NewNSLock(ctx, set.getLockers(), minioMetaBucket, formatConfigFile)
if err = formatLock.GetRLock(globalHealingTimeout); err != nil {
return err
}
@ -1285,9 +1365,10 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
// Replace the new format.
s.format = refFormat
// Close all existing disks and reconnect all the disks.
// Close all existing disks, lockers and reconnect all the disks/lockers.
s.xlDisks.Close()
s.connectDisks()
s.xlLockers.Close()
s.connectDisksAndLockers()
// Restart monitoring loop to monitor reformatted disks again.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -1356,7 +1437,8 @@ func markRootDisksAsDown(storageDisks []StorageAPI) {
// coded data in it.
func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
// Acquire lock on format.json
formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
set := s.getHashedSet(formatConfigFile)
formatLock := set.nsMutex.NewNSLock(ctx, set.getLockers(), minioMetaBucket, formatConfigFile)
if err = formatLock.GetLock(globalHealingTimeout); err != nil {
return madmin.HealResultItem{}, err
}
@ -1492,9 +1574,10 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
// Replace with new reference format.
s.format = refFormat
// Disconnect/relinquish all existing disks and reconnect the disks.
// Disconnect/relinquish all existing disks, lockers and reconnect the disks, lockers.
s.xlDisks.Close()
s.connectDisks()
s.xlLockers.Close()
s.connectDisksAndLockers()
// Restart our monitoring loop to start monitoring newly formatted disks.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -1505,8 +1588,9 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (result madmin.HealResultItem, err error) {
bucketLock := globalNSMutex.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalHealingTimeout); err != nil {
set := s.getHashedSet(bucket)
bucketLock := set.nsMutex.NewNSLock(ctx, set.getLockers(), bucket, "")
if err = bucketLock.GetLock(globalOperationTimeout); err != nil {
return result, err
}
defer bucketLock.Unlock()

View File

@ -140,11 +140,6 @@ func (xl xlObjects) getBucketInfo(ctx context.Context, bucketName string) (bucke
// GetBucketInfo - returns BucketInfo for a bucket.
func (xl xlObjects) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) {
bucketLock := xl.nsMutex.NewNSLock(ctx, bucket, "")
if e := bucketLock.GetRLock(globalObjectTimeout); e != nil {
return bi, e
}
defer bucketLock.RUnlock()
bucketInfo, err := xl.getBucketInfo(ctx, bucket)
if err != nil {
return bi, toObjectErr(err, bucket)
@ -226,12 +221,6 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs
// DeleteBucket - deletes a bucket.
func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
bucketLock := xl.nsMutex.NewNSLock(ctx, bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
// Collect if all disks report volume not found.
storageDisks := xl.getDisks()

View File

@ -681,7 +681,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.json of outdated disks.
objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if lerr := objectLock.GetRLock(globalHealingTimeout); lerr != nil {
return madmin.HealResultItem{}, lerr
}

View File

@ -264,7 +264,7 @@ func (xl xlObjects) NewMultipartUpload(ctx context.Context, bucket, object strin
func (xl xlObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := xl.nsMutex.NewNSLock(ctx, srcBucket, srcObject)
objectSRLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), srcBucket, srcObject)
if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
return pi, err
}
@ -306,7 +306,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
uploadIDLockPath := xl.getUploadIDLockPath(bucket, object, uploadID)
// pre-check upload id lock.
preUploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
preUploadIDLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), minioMetaMultipartBucket, uploadIDLockPath)
if err := preUploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return pi, err
}
@ -404,7 +404,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
}
// post-upload check (write) lock
postUploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
postUploadIDLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), minioMetaMultipartBucket, uploadIDLockPath)
if err = postUploadIDLock.GetLock(globalOperationTimeout); err != nil {
return pi, err
}
@ -499,7 +499,8 @@ func (xl xlObjects) ListObjectParts(ctx context.Context, bucket, object, uploadI
}
// Hold lock so that there is no competing
// abort-multipart-upload or complete-multipart-upload.
uploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket,
uploadIDLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(),
minioMetaMultipartBucket,
xl.getUploadIDLockPath(bucket, object, uploadID))
if err := uploadIDLock.GetLock(globalListingTimeout); err != nil {
return result, err
@ -603,7 +604,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
return oi, err
}
// Hold write lock on the object.
destLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
destLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if err := destLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}
@ -618,7 +619,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
//
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
uploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
uploadIDLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), minioMetaMultipartBucket, uploadIDLockPath)
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return oi, err
}
@ -824,7 +825,7 @@ func (xl xlObjects) AbortMultipartUpload(ctx context.Context, bucket, object, up
uploadIDLockPath := xl.getUploadIDLockPath(bucket, object, uploadID)
// Hold lock so that there is no competing
// complete-multipart-upload or put-object-part.
uploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
uploadIDLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), minioMetaMultipartBucket, uploadIDLockPath)
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return err
}

View File

@ -127,7 +127,7 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r
// Acquire lock
if lockType != noLock {
lock := xl.nsMutex.NewNSLock(ctx, bucket, object)
lock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalObjectTimeout); err != nil {
@ -190,7 +190,7 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r
// length indicates the total length of the object.
func (xl xlObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
// Lock the object before reading.
objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
return err
}
@ -369,7 +369,7 @@ func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string)
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) {
// Lock the object before reading.
objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
@ -498,7 +498,7 @@ func (xl xlObjects) PutObject(ctx context.Context, bucket string, object string,
}
// Lock the object.
objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
@ -851,7 +851,7 @@ func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []
continue
}
// Acquire a write lock before deleting the object.
objectLocks[i] = xl.nsMutex.NewNSLock(ctx, bucket, object)
objectLocks[i] = xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if errs[i] = objectLocks[i].GetLock(globalOperationTimeout); errs[i] != nil {
continue
}
@ -954,7 +954,7 @@ func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []
// response to the client request.
func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
// Acquire a write lock before deleting the object.
objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
objectLock := xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
if perr := objectLock.GetLock(globalOperationTimeout); perr != nil {
return perr
}

View File

@ -23,6 +23,7 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/sync/errgroup"
@ -39,12 +40,15 @@ var OfflineDisk StorageAPI // zero value is nil
// xlObjects - Implements XL object layer.
type xlObjects struct {
// name space mutex for object layer.
nsMutex *nsLockMap
// getDisks returns list of storageAPIs.
getDisks func() []StorageAPI
// getLockers returns list of remote and local lockers.
getLockers func() []dsync.NetLocker
// Locker mutex map.
nsMutex *nsLockMap
// Byte pools used for temporary i/o buffers.
bp *bpool.BytePoolCap
@ -55,10 +59,16 @@ type xlObjects struct {
listPool *TreeWalkPool
}
// NewNSLock - initialize a new namespace RWLocker instance.
func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
return xl.nsMutex.NewNSLock(ctx, xl.getLockers(), bucket, object)
}
// Shutdown function for object storage interface.
func (xl xlObjects) Shutdown(ctx context.Context) error {
// Add any object layer shutdown activities here.
closeStorageDisks(xl.getDisks())
closeLockers(xl.getLockers())
return nil
}

View File

@ -4,8 +4,8 @@
|Item|Specification|
|:---|:---|
|Maximum number of servers per cluster| 32|
|Maximum number of Federated clusters | Unlimited|
|Maximum number of servers per cluster| Unlimited|
|Maximum number of federated clusters | Unlimited|
|Minimum number of servers| 02|
|Maximum number of drives per server| Unlimited|
|Read quorum| N/2|
@ -39,7 +39,6 @@ We found the following APIs to be redundant or less useful outside of AWS S3. If
- BucketACL (Use [bucket policies](https://docs.min.io/docs/minio-client-complete-guide#policy) instead)
- BucketCORS (CORS enabled by default on all buckets for all HTTP verbs)
- BucketLifecycle (Not required for MinIO erasure coded backend)
- BucketReplication (Use [`mc mirror`](https://docs.min.io/docs/minio-client-complete-guide#mirror) instead)
- BucketVersions, BucketVersioning (Use [`s3git`](https://github.com/s3git/s3git))
- BucketWebsite (Use [`caddy`](https://github.com/mholt/caddy) or [`nginx`](https://www.nginx.com/resources/wiki/))

View File

@ -5,8 +5,8 @@
|项目|参数|
|:---|:---|
|最大驱动器数量|16|
|最小驱动器数量|4|
|最大驱动器数量|Unlimited|
|最小驱动器数量|Unlimited|
|读仲裁|N / 2|
|写仲裁|N / 2+1 |

1
go.mod
View File

@ -38,7 +38,6 @@ require (
github.com/lib/pq v1.0.0
github.com/miekg/dns v1.1.8
github.com/minio/cli v1.22.0
github.com/minio/dsync/v2 v2.0.0
github.com/minio/gokrb5/v7 v7.2.5
github.com/minio/hdfs/v3 v3.0.1
github.com/minio/highwayhash v1.0.0

3
pkg/dsync/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
dsync.test
coverage.txt
*.out

440
pkg/dsync/drwmutex.go Normal file
View File

@ -0,0 +1,440 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"context"
"fmt"
golog "log"
"math/rand"
"os"
"path"
"runtime"
"sync"
"time"
)
// Indicator if logging is enabled.
var dsyncLog bool
func init() {
// Check for MINIO_DSYNC_TRACE env variable, if set logging will be enabled for failed REST operations.
dsyncLog = os.Getenv("MINIO_DSYNC_TRACE") == "1"
rand.Seed(time.Now().UnixNano())
}
func log(msg ...interface{}) {
if dsyncLog {
golog.Println(msg...)
}
}
// DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
const DRWMutexAcquireTimeout = 1 * time.Second // 1 second.
const drwMutexInfinite = time.Duration(1<<63 - 1)
// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
Name string
writeLocks []string // Array of nodes that granted a write lock
readersLocks [][]string // Array of array of nodes that granted reader locks
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
clnt *Dsync
ctx context.Context
}
// Granted - represents a structure of a granted lock.
type Granted struct {
index int
lockUID string // Locked if set with UID string, unlocked if empty
}
func (g *Granted) isLocked() bool {
return isLocked(g.lockUID)
}
func isLocked(uid string) bool {
return len(uid) > 0
}
// NewDRWMutex - initializes a new dsync RW mutex.
func NewDRWMutex(ctx context.Context, name string, clnt *Dsync) *DRWMutex {
return &DRWMutex{
Name: name,
writeLocks: make([]string, clnt.dNodeCount),
clnt: clnt,
ctx: ctx,
}
}
// Lock holds a write lock on dm.
//
// If the lock is already in use, the calling go routine
// blocks until the mutex is available.
func (dm *DRWMutex) Lock(id, source string) {
isReadLock := false
dm.lockBlocking(drwMutexInfinite, id, source, isReadLock)
}
// GetLock tries to get a write lock on dm before the timeout elapses.
//
// If the lock is already in use, the calling go routine
// blocks until either the mutex becomes available and return success or
// more time has passed than the timeout value and return false.
func (dm *DRWMutex) GetLock(id, source string, timeout time.Duration) (locked bool) {
isReadLock := false
return dm.lockBlocking(timeout, id, source, isReadLock)
}
// RLock holds a read lock on dm.
//
// If one or more read locks are already in use, it will grant another lock.
// Otherwise the calling go routine blocks until the mutex is available.
func (dm *DRWMutex) RLock(id, source string) {
isReadLock := true
dm.lockBlocking(drwMutexInfinite, id, source, isReadLock)
}
// GetRLock tries to get a read lock on dm before the timeout elapses.
//
// If one or more read locks are already in use, it will grant another lock.
// Otherwise the calling go routine blocks until either the mutex becomes
// available and return success or more time has passed than the timeout
// value and return false.
func (dm *DRWMutex) GetRLock(id, source string, timeout time.Duration) (locked bool) {
isReadLock := true
return dm.lockBlocking(timeout, id, source, isReadLock)
}
// lockBlocking will try to acquire either a read or a write lock
//
// The function will loop using a built-in timing randomized back-off
// algorithm until either the lock is acquired successfully or more
// time has elapsed than the timeout value.
func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isReadLock bool) (locked bool) {
doneCh, start := make(chan struct{}), time.Now().UTC()
defer close(doneCh)
// Use incremental back-off algorithm for repeated attempts to acquire the lock
for range newRetryTimerSimple(doneCh) {
select {
case <-dm.ctx.Done():
return
default:
}
// Create temp array on stack.
locks := make([]string, dm.clnt.dNodeCount)
// Try to acquire the lock.
success := lock(dm.clnt, &locks, dm.Name, id, source, isReadLock)
if success {
dm.m.Lock()
// If success, copy array to object
if isReadLock {
// Append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, dm.clnt.dNodeCount))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
} else {
copy(dm.writeLocks, locks[:])
}
dm.m.Unlock()
return true
}
if time.Now().UTC().Sub(start) >= timeout { // Are we past the timeout?
break
}
// Failed to acquire the lock on this attempt, incrementally wait
// for a longer back-off time and try again afterwards.
}
return false
}
// lock tries to acquire the distributed lock, returning true or false.
func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bool) bool {
// Create buffered channel of size equal to total number of nodes.
ch := make(chan Granted, ds.dNodeCount)
defer close(ch)
var wg sync.WaitGroup
for index, c := range ds.restClnts {
wg.Add(1)
// broadcast lock request to all nodes
go func(index int, isReadLock bool, c NetLocker) {
defer wg.Done()
args := LockArgs{
UID: id,
Resource: lockName,
Source: source,
}
var locked bool
var err error
if isReadLock {
if locked, err = c.RLock(args); err != nil {
log("Unable to call RLock", err)
}
} else {
if locked, err = c.Lock(args); err != nil {
log("Unable to call Lock", err)
}
}
g := Granted{index: index}
if locked {
g.lockUID = args.UID
}
ch <- g
}(index, isReadLock, c)
}
quorum := false
wg.Add(1)
go func(isReadLock bool) {
// Wait until we have either
//
// a) received all lock responses
// b) received too many 'non-'locks for quorum to be still possible
// c) time out
//
i, locksFailed := 0, 0
done := false
timeout := time.After(DRWMutexAcquireTimeout)
for ; i < ds.dNodeCount; i++ { // Loop until we acquired all locks
select {
case grant := <-ch:
if grant.isLocked() {
// Mark that this node has acquired the lock
(*locks)[grant.index] = grant.lockUID
} else {
locksFailed++
if !isReadLock && locksFailed > ds.dNodeCount-ds.dquorum ||
isReadLock && locksFailed > ds.dNodeCount-ds.dquorumReads {
// We know that we are not going to get the lock anymore,
// so exit out and release any locks that did get acquired
done = true
// Increment the number of grants received from the buffered channel.
i++
releaseAll(ds, locks, lockName, isReadLock)
}
}
case <-timeout:
done = true
// timeout happened, maybe one of the nodes is slow, count
// number of locks to check whether we have quorum or not
if !quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads) {
releaseAll(ds, locks, lockName, isReadLock)
}
}
if done {
break
}
}
// Count locks in order to determine whether we have quorum or not
quorum = quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads)
// Signal that we have the quorum
wg.Done()
// Wait for the other responses and immediately release the locks
// (do not add them to the locks array because the DRWMutex could
// already has been unlocked again by the original calling thread)
for ; i < ds.dNodeCount; i++ {
grantToBeReleased := <-ch
if grantToBeReleased.isLocked() {
// release lock
sendRelease(ds, ds.restClnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock)
}
}
}(isReadLock)
wg.Wait()
return quorum
}
// quorumMet determines whether we have acquired the required quorum of underlying locks or not
func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool {
count := 0
for _, uid := range *locks {
if isLocked(uid) {
count++
}
}
var metQuorum bool
if isReadLock {
metQuorum = count >= quorumReads
} else {
metQuorum = count >= quorum
}
return metQuorum
}
// releaseAll releases all locks that are marked as locked
func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool) {
for lock := 0; lock < ds.dNodeCount; lock++ {
if isLocked((*locks)[lock]) {
sendRelease(ds, ds.restClnts[lock], lockName, (*locks)[lock], isReadLock)
(*locks)[lock] = ""
}
}
}
// Unlock unlocks the write lock.
//
// It is a run-time error if dm is not locked on entry to Unlock.
func (dm *DRWMutex) Unlock() {
// create temp array on stack
locks := make([]string, dm.clnt.dNodeCount)
{
dm.m.Lock()
defer dm.m.Unlock()
// Check if minimally a single bool is set in the writeLocks array
lockFound := false
for _, uid := range dm.writeLocks {
if isLocked(uid) {
lockFound = true
break
}
}
if !lockFound {
panic("Trying to Unlock() while no Lock() is active")
}
// Copy write locks to stack array
copy(locks, dm.writeLocks[:])
// Clear write locks array
dm.writeLocks = make([]string, dm.clnt.dNodeCount)
}
isReadLock := false
unlock(dm.clnt, locks, dm.Name, isReadLock)
}
// RUnlock releases a read lock held on dm.
//
// It is a run-time error if dm is not locked on entry to RUnlock.
func (dm *DRWMutex) RUnlock() {
// create temp array on stack
locks := make([]string, dm.clnt.dNodeCount)
{
dm.m.Lock()
defer dm.m.Unlock()
if len(dm.readersLocks) == 0 {
panic("Trying to RUnlock() while no RLock() is active")
}
// Copy out first element to release it first (FIFO)
copy(locks, dm.readersLocks[0][:])
// Drop first element from array
dm.readersLocks = dm.readersLocks[1:]
}
isReadLock := true
unlock(dm.clnt, locks, dm.Name, isReadLock)
}
func unlock(ds *Dsync, locks []string, name string, isReadLock bool) {
// We don't need to synchronously wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get quorum)
for index, c := range ds.restClnts {
if isLocked(locks[index]) {
// broadcast lock release to all nodes that granted the lock
sendRelease(ds, c, name, locks[index], isReadLock)
}
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(ds *Dsync, c NetLocker, name, uid string, isReadLock bool) {
args := LockArgs{
UID: uid,
Resource: name,
}
if isReadLock {
if _, err := c.RUnlock(args); err != nil {
log("Unable to call RUnlock", err)
}
} else {
if _, err := c.Unlock(args); err != nil {
log("Unable to call Unlock", err)
}
}
}
// DRLocker returns a sync.Locker interface that implements
// the Lock and Unlock methods by calling drw.RLock and drw.RUnlock.
func (dm *DRWMutex) DRLocker() sync.Locker {
return (*drlocker)(dm)
}
type drlocker DRWMutex
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
func getSource() string {
var funcName string
pc, filename, lineNum, ok := runtime.Caller(2)
if ok {
filename = path.Base(filename)
funcName = runtime.FuncForPC(pc).Name()
} else {
filename = "<unknown>"
lineNum = 0
}
return fmt.Sprintf("[%s:%d:%s()]", filename, lineNum, funcName)
}
func (dr *drlocker) Lock() { (*DRWMutex)(dr).RLock(randString(16), getSource()) }
func (dr *drlocker) Unlock() { (*DRWMutex)(dr).RUnlock() }

383
pkg/dsync/drwmutex_test.go Normal file
View File

@ -0,0 +1,383 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// GOMAXPROCS=10 go test
package dsync_test
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
. "github.com/minio/minio/pkg/dsync"
)
const (
id = "1234-5678"
source = "main.go"
)
func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
drwm := NewDRWMutex(context.Background(), "simplelock", ds)
if !drwm.GetRLock(id, source, time.Second) {
panic("Failed to acquire read lock")
}
// fmt.Println("1st read lock acquired, waiting...")
if !drwm.GetRLock(id, source, time.Second) {
panic("Failed to acquire read lock")
}
// fmt.Println("2nd read lock acquired, waiting...")
go func() {
time.Sleep(2 * time.Second)
drwm.RUnlock()
// fmt.Println("1st read lock released, waiting...")
}()
go func() {
time.Sleep(3 * time.Second)
drwm.RUnlock()
// fmt.Println("2nd read lock released, waiting...")
}()
// fmt.Println("Trying to acquire write lock, waiting...")
locked = drwm.GetLock(id, source, duration)
if locked {
// fmt.Println("Write lock acquired, waiting...")
time.Sleep(time.Second)
drwm.Unlock()
}
// fmt.Println("Write lock failed due to timeout")
return
}
func TestSimpleWriteLockAcquired(t *testing.T) {
locked := testSimpleWriteLock(t, 5*time.Second)
expected := true
if locked != expected {
t.Errorf("TestSimpleWriteLockAcquired(): \nexpected %#v\ngot %#v", expected, locked)
}
}
func TestSimpleWriteLockTimedOut(t *testing.T) {
locked := testSimpleWriteLock(t, time.Second)
expected := false
if locked != expected {
t.Errorf("TestSimpleWriteLockTimedOut(): \nexpected %#v\ngot %#v", expected, locked)
}
}
func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
drwm := NewDRWMutex(context.Background(), "duallock", ds)
// fmt.Println("Getting initial write lock")
if !drwm.GetLock(id, source, time.Second) {
panic("Failed to acquire initial write lock")
}
go func() {
time.Sleep(2 * time.Second)
drwm.Unlock()
// fmt.Println("Initial write lock released, waiting...")
}()
// fmt.Println("Trying to acquire 2nd write lock, waiting...")
locked = drwm.GetLock(id, source, duration)
if locked {
// fmt.Println("2nd write lock acquired, waiting...")
time.Sleep(time.Second)
drwm.Unlock()
}
// fmt.Println("2nd write lock failed due to timeout")
return
}
func TestDualWriteLockAcquired(t *testing.T) {
locked := testDualWriteLock(t, 5*time.Second)
expected := true
if locked != expected {
t.Errorf("TestDualWriteLockAcquired(): \nexpected %#v\ngot %#v", expected, locked)
}
}
func TestDualWriteLockTimedOut(t *testing.T) {
locked := testDualWriteLock(t, time.Second)
expected := false
if locked != expected {
t.Errorf("TestDualWriteLockTimedOut(): \nexpected %#v\ngot %#v", expected, locked)
}
}
// Test cases below are copied 1 to 1 from sync/rwmutex_test.go (adapted to use DRWMutex)
// Borrowed from rwmutex_test.go
func parallelReader(m *DRWMutex, clocked, cunlock, cdone chan bool) {
if m.GetRLock(id, source, time.Second) {
clocked <- true
<-cunlock
m.RUnlock()
cdone <- true
}
}
// Borrowed from rwmutex_test.go
func doTestParallelReaders(numReaders, gomaxprocs int) {
runtime.GOMAXPROCS(gomaxprocs)
m := NewDRWMutex(context.Background(), "test-parallel", ds)
clocked := make(chan bool)
cunlock := make(chan bool)
cdone := make(chan bool)
for i := 0; i < numReaders; i++ {
go parallelReader(m, clocked, cunlock, cdone)
}
// Wait for all parallel RLock()s to succeed.
for i := 0; i < numReaders; i++ {
<-clocked
}
for i := 0; i < numReaders; i++ {
cunlock <- true
}
// Wait for the goroutines to finish.
for i := 0; i < numReaders; i++ {
<-cdone
}
}
// Borrowed from rwmutex_test.go
func TestParallelReaders(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
doTestParallelReaders(1, 4)
doTestParallelReaders(3, 4)
doTestParallelReaders(4, 2)
}
// Borrowed from rwmutex_test.go
func reader(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) {
for i := 0; i < numIterations; i++ {
if rwm.GetRLock(id, source, time.Second) {
n := atomic.AddInt32(activity, 1)
if n < 1 || n >= 10000 {
panic(fmt.Sprintf("wlock(%d)\n", n))
}
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -1)
rwm.RUnlock()
}
}
cdone <- true
}
// Borrowed from rwmutex_test.go
func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) {
for i := 0; i < numIterations; i++ {
if rwm.GetLock(id, source, time.Second) {
n := atomic.AddInt32(activity, 10000)
if n != 10000 {
panic(fmt.Sprintf("wlock(%d)\n", n))
}
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -10000)
rwm.Unlock()
}
}
cdone <- true
}
// Borrowed from rwmutex_test.go
func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
runtime.GOMAXPROCS(gomaxprocs)
// Number of active readers + 10000 * number of active writers.
var activity int32
rwm := NewDRWMutex(context.Background(), "test", ds)
cdone := make(chan bool)
go writer(rwm, numIterations, &activity, cdone)
var i int
for i = 0; i < numReaders/2; i++ {
go reader(rwm, numIterations, &activity, cdone)
}
go writer(rwm, numIterations, &activity, cdone)
for ; i < numReaders; i++ {
go reader(rwm, numIterations, &activity, cdone)
}
// Wait for the 2 writers and all readers to finish.
for i := 0; i < 2+numReaders; i++ {
<-cdone
}
}
// Borrowed from rwmutex_test.go
func TestRWMutex(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
n := 1000
if testing.Short() {
n = 5
}
HammerRWMutex(1, 1, n)
HammerRWMutex(1, 3, n)
HammerRWMutex(1, 10, n)
HammerRWMutex(4, 1, n)
HammerRWMutex(4, 3, n)
HammerRWMutex(4, 10, n)
HammerRWMutex(10, 1, n)
HammerRWMutex(10, 3, n)
HammerRWMutex(10, 10, n)
HammerRWMutex(10, 5, n)
}
// Borrowed from rwmutex_test.go
func TestDRLocker(t *testing.T) {
wl := NewDRWMutex(context.Background(), "test", ds)
var rl sync.Locker
wlocked := make(chan bool, 1)
rlocked := make(chan bool, 1)
rl = wl.DRLocker()
n := 10
go func() {
for i := 0; i < n; i++ {
rl.Lock()
rl.Lock()
rlocked <- true
wl.Lock(id, source)
wlocked <- true
}
}()
for i := 0; i < n; i++ {
<-rlocked
rl.Unlock()
select {
case <-wlocked:
t.Fatal("RLocker() didn't read-lock it")
default:
}
rl.Unlock()
<-wlocked
select {
case <-rlocked:
t.Fatal("RLocker() didn't respect the write lock")
default:
}
wl.Unlock()
}
}
// Borrowed from rwmutex_test.go
func TestUnlockPanic(t *testing.T) {
defer func() {
if recover() == nil {
t.Fatalf("unlock of unlocked RWMutex did not panic")
}
}()
mu := NewDRWMutex(context.Background(), "test", ds)
mu.Unlock()
}
// Borrowed from rwmutex_test.go
func TestUnlockPanic2(t *testing.T) {
defer func() {
if recover() == nil {
t.Fatalf("unlock of unlocked RWMutex did not panic")
}
}()
mu := NewDRWMutex(context.Background(), "test-unlock-panic-2", ds)
mu.RLock(id, source)
mu.Unlock()
}
// Borrowed from rwmutex_test.go
func TestRUnlockPanic(t *testing.T) {
defer func() {
if recover() == nil {
t.Fatalf("read unlock of unlocked RWMutex did not panic")
}
}()
mu := NewDRWMutex(context.Background(), "test", ds)
mu.RUnlock()
}
// Borrowed from rwmutex_test.go
func TestRUnlockPanic2(t *testing.T) {
defer func() {
if recover() == nil {
t.Fatalf("read unlock of unlocked RWMutex did not panic")
}
}()
mu := NewDRWMutex(context.Background(), "test-runlock-panic-2", ds)
mu.Lock(id, source)
mu.RUnlock()
}
// Borrowed from rwmutex_test.go
func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
rwm := NewDRWMutex(context.Background(), "test", ds)
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
foo++
if foo%writeRatio == 0 {
rwm.Lock(id, source)
rwm.Unlock()
} else {
rwm.RLock(id, source)
for i := 0; i != localWork; i += 1 {
foo *= 2
foo /= 2
}
rwm.RUnlock()
}
}
_ = foo
})
}
// Borrowed from rwmutex_test.go
func BenchmarkRWMutexWrite100(b *testing.B) {
benchmarkRWMutex(b, 0, 100)
}
// Borrowed from rwmutex_test.go
func BenchmarkRWMutexWrite10(b *testing.B) {
benchmarkRWMutex(b, 0, 10)
}
// Borrowed from rwmutex_test.go
func BenchmarkRWMutexWorkWrite100(b *testing.B) {
benchmarkRWMutex(b, 100, 100)
}
// Borrowed from rwmutex_test.go
func BenchmarkRWMutexWorkWrite10(b *testing.B) {
benchmarkRWMutex(b, 100, 10)
}

View File

@ -0,0 +1,103 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync_test
import (
"fmt"
"sync"
. "github.com/minio/minio/pkg/dsync"
)
const WriteLock = -1
type lockServer struct {
mutex sync.Mutex
// Map of locks, with negative value indicating (exclusive) write lock
// and positive values indicating number of read locks
lockMap map[string]int64
}
func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if _, *reply = l.lockMap[args.Resource]; !*reply {
l.lockMap[args.Resource] = WriteLock // No locks held on the given name, so claim write lock
}
*reply = !*reply // Negate *reply to return true when lock is granted or false otherwise
return nil
}
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Resource]; !*reply { // No lock is held on the given name
return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resource)
}
if *reply = locksHeld == WriteLock; !*reply { // Unless it is a write lock
return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resource, locksHeld)
}
delete(l.lockMap, args.Resource) // Remove the write lock
return nil
}
const ReadLock = 1
func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Resource]; !*reply {
l.lockMap[args.Resource] = ReadLock // No locks held on the given name, so claim (first) read lock
*reply = true
} else {
if *reply = locksHeld != WriteLock; *reply { // Unless there is a write lock
l.lockMap[args.Resource] = locksHeld + ReadLock // Grant another read lock
}
}
return nil
}
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Resource]; !*reply { // No lock is held on the given name
return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resource)
}
if *reply = locksHeld != WriteLock; !*reply { // A write-lock is held, cannot release a read lock
return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resource)
}
if locksHeld > ReadLock {
l.lockMap[args.Resource] = locksHeld - ReadLock // Remove one of the read locks held
} else {
delete(l.lockMap, args.Resource) // Remove the (last) read lock
}
return nil
}
func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {
return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
}
delete(l.lockMap, args.Resource) // Remove the lock (irrespective of write or read lock)
*reply = true
return nil
}

60
pkg/dsync/dsync.go Normal file
View File

@ -0,0 +1,60 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"errors"
"math"
)
// Dsync represents dsync client object which is initialized with
// authenticated clients, used to initiate lock REST calls.
type Dsync struct {
// Number of nodes participating in the distributed locking.
dNodeCount int
// List of rest client objects, one per lock server.
restClnts []NetLocker
// Simple majority based quorum, set to dNodeCount/2+1
dquorum int
// Simple quorum for read operations, set to dNodeCount/2
dquorumReads int
}
// New - initializes a new dsync object with input restClnts.
func New(restClnts []NetLocker) (*Dsync, error) {
if len(restClnts) < 2 {
return nil, errors.New("Dsync is not designed for less than 2 nodes")
} else if len(restClnts) > 32 {
return nil, errors.New("Dsync is not designed for more than 32 nodes")
}
ds := &Dsync{}
ds.dNodeCount = len(restClnts)
// With odd number of nodes, write and read quorum is basically the same
ds.dquorum = int(ds.dNodeCount/2) + 1
ds.dquorumReads = int(math.Ceil(float64(ds.dNodeCount) / 2.0))
// Initialize node name and rest path for each NetLocker object.
ds.restClnts = make([]NetLocker, ds.dNodeCount)
copy(ds.restClnts, restClnts)
return ds, nil
}

View File

@ -0,0 +1,58 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// GOMAXPROCS=10 go test
package dsync
import "testing"
// Tests dsync.New
func TestNew(t *testing.T) {
nclnts := make([]NetLocker, 33)
if _, err := New(nclnts); err == nil {
t.Fatal("Should have failed")
}
nclnts = make([]NetLocker, 1)
if _, err := New(nclnts); err == nil {
t.Fatal("Should have failed")
}
nclnts = make([]NetLocker, 2)
nds, err := New(nclnts)
if err != nil {
t.Fatal("Should pass", err)
}
if nds.dquorumReads != 1 {
t.Fatalf("Unexpected read quorum values expected 1, got %d", nds.dquorumReads)
}
if nds.dquorum != 2 {
t.Fatalf("Unexpected quorum values expected 2, got %d", nds.dquorum)
}
nclnts = make([]NetLocker, 3)
nds, err = New(nclnts)
if err != nil {
t.Fatal("Should pass", err)
}
if nds.dquorumReads != nds.dquorum {
t.Fatalf("Unexpected quorum values for odd nodes we expect read %d and write %d quorum to be same", nds.dquorumReads, nds.dquorum)
}
}

361
pkg/dsync/dsync_test.go Normal file
View File

@ -0,0 +1,361 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// GOMAXPROCS=10 go test
package dsync_test
import (
"context"
"fmt"
"log"
"math/rand"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
"sync"
"testing"
"time"
. "github.com/minio/minio/pkg/dsync"
)
var ds *Dsync
var rpcPaths []string // list of rpc paths where lock server is serving.
func startRPCServers(nodes []string) {
for i := range nodes {
server := rpc.NewServer()
server.RegisterName("Dsync", &lockServer{
mutex: sync.Mutex{},
lockMap: make(map[string]int64),
})
// For some reason the registration paths need to be different (even for different server objs)
server.HandleHTTP(rpcPaths[i], fmt.Sprintf("%s-debug", rpcPaths[i]))
l, e := net.Listen("tcp", ":"+strconv.Itoa(i+12345))
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
// Let servers start
time.Sleep(10 * time.Millisecond)
}
// TestMain initializes the testing framework
func TestMain(m *testing.M) {
const rpcPath = "/dsync"
rand.Seed(time.Now().UTC().UnixNano())
nodes := make([]string, 4) // list of node IP addrs or hostname with ports.
for i := range nodes {
nodes[i] = fmt.Sprintf("127.0.0.1:%d", i+12345)
}
for i := range nodes {
rpcPaths = append(rpcPaths, rpcPath+"-"+strconv.Itoa(i))
}
// Initialize net/rpc clients for dsync.
var clnts []NetLocker
for i := 0; i < len(nodes); i++ {
clnts = append(clnts, newClient(nodes[i], rpcPaths[i]))
}
var err error
ds, err = New(clnts)
if err != nil {
log.Fatalf("set nodes failed with %v", err)
}
startRPCServers(nodes)
os.Exit(m.Run())
}
func TestSimpleLock(t *testing.T) {
dm := NewDRWMutex(context.Background(), "test", ds)
dm.Lock(id, source)
// fmt.Println("Lock acquired, waiting...")
time.Sleep(2500 * time.Millisecond)
dm.Unlock()
}
func TestSimpleLockUnlockMultipleTimes(t *testing.T) {
dm := NewDRWMutex(context.Background(), "test", ds)
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
dm.Lock(id, source)
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
dm.Unlock()
}
// Test two locks for same resource, one succeeds, one fails (after timeout)
func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
dm1st := NewDRWMutex(context.Background(), "aap", ds)
dm2nd := NewDRWMutex(context.Background(), "aap", ds)
dm1st.Lock(id, source)
// Release lock after 10 seconds
go func() {
time.Sleep(10 * time.Second)
// fmt.Println("Unlocking dm1")
dm1st.Unlock()
}()
dm2nd.Lock(id, source)
// fmt.Printf("2nd lock obtained after 1st lock is released\n")
time.Sleep(2500 * time.Millisecond)
dm2nd.Unlock()
}
// Test three locks for same resource, one succeeds, one fails (after timeout)
func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
dm1st := NewDRWMutex(context.Background(), "aap", ds)
dm2nd := NewDRWMutex(context.Background(), "aap", ds)
dm3rd := NewDRWMutex(context.Background(), "aap", ds)
dm1st.Lock(id, source)
// Release lock after 10 seconds
go func() {
time.Sleep(10 * time.Second)
// fmt.Println("Unlocking dm1")
dm1st.Unlock()
}()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
dm2nd.Lock(id, source)
// Release lock after 10 seconds
go func() {
time.Sleep(2500 * time.Millisecond)
// fmt.Println("Unlocking dm2")
dm2nd.Unlock()
}()
dm3rd.Lock(id, source)
// fmt.Printf("3rd lock obtained after 1st & 2nd locks are released\n")
time.Sleep(2500 * time.Millisecond)
dm3rd.Unlock()
}()
go func() {
defer wg.Done()
dm3rd.Lock(id, source)
// Release lock after 10 seconds
go func() {
time.Sleep(2500 * time.Millisecond)
// fmt.Println("Unlocking dm3")
dm3rd.Unlock()
}()
dm2nd.Lock(id, source)
// fmt.Printf("2nd lock obtained after 1st & 3rd locks are released\n")
time.Sleep(2500 * time.Millisecond)
dm2nd.Unlock()
}()
wg.Wait()
}
// Test two locks for different resources, both succeed
func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
dm1 := NewDRWMutex(context.Background(), "aap", ds)
dm2 := NewDRWMutex(context.Background(), "noot", ds)
dm1.Lock(id, source)
dm2.Lock(id, source)
// fmt.Println("Both locks acquired, waiting...")
time.Sleep(2500 * time.Millisecond)
dm1.Unlock()
dm2.Unlock()
time.Sleep(10 * time.Millisecond)
}
// Borrowed from mutex_test.go
func HammerMutex(m *DRWMutex, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
m.Lock(id, source)
m.Unlock()
}
cdone <- true
}
// Borrowed from mutex_test.go
func TestMutex(t *testing.T) {
c := make(chan bool)
m := NewDRWMutex(context.Background(), "test", ds)
for i := 0; i < 10; i++ {
go HammerMutex(m, 1000, c)
}
for i := 0; i < 10; i++ {
<-c
}
}
func BenchmarkMutexUncontended(b *testing.B) {
type PaddedMutex struct {
DRWMutex
pad [128]uint8
}
b.RunParallel(func(pb *testing.PB) {
var mu PaddedMutex
for pb.Next() {
mu.Lock(id, source)
mu.Unlock()
}
})
}
func benchmarkMutex(b *testing.B, slack, work bool) {
mu := NewDRWMutex(context.Background(), "", ds)
if slack {
b.SetParallelism(10)
}
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
mu.Lock(id, source)
mu.Unlock()
if work {
for i := 0; i < 100; i++ {
foo *= 2
foo /= 2
}
}
}
_ = foo
})
}
func BenchmarkMutex(b *testing.B) {
benchmarkMutex(b, false, false)
}
func BenchmarkMutexSlack(b *testing.B) {
benchmarkMutex(b, true, false)
}
func BenchmarkMutexWork(b *testing.B) {
benchmarkMutex(b, false, true)
}
func BenchmarkMutexWorkSlack(b *testing.B) {
benchmarkMutex(b, true, true)
}
func BenchmarkMutexNoSpin(b *testing.B) {
// This benchmark models a situation where spinning in the mutex should be
// non-profitable and allows to confirm that spinning does not do harm.
// To achieve this we create excess of goroutines most of which do local work.
// These goroutines yield during local work, so that switching from
// a blocked goroutine to other goroutines is profitable.
// As a matter of fact, this benchmark still triggers some spinning in the mutex.
m := NewDRWMutex(context.Background(), "", ds)
var acc0, acc1 uint64
b.SetParallelism(4)
b.RunParallel(func(pb *testing.PB) {
c := make(chan bool)
var data [4 << 10]uint64
for i := 0; pb.Next(); i++ {
if i%4 == 0 {
m.Lock(id, source)
acc0 -= 100
acc1 += 100
m.Unlock()
} else {
for i := 0; i < len(data); i += 4 {
data[i]++
}
// Elaborate way to say runtime.Gosched
// that does not put the goroutine onto global runq.
go func() {
c <- true
}()
<-c
}
}
})
}
func BenchmarkMutexSpin(b *testing.B) {
// This benchmark models a situation where spinning in the mutex should be
// profitable. To achieve this we create a goroutine per-proc.
// These goroutines access considerable amount of local data so that
// unnecessary rescheduling is penalized by cache misses.
m := NewDRWMutex(context.Background(), "", ds)
var acc0, acc1 uint64
b.RunParallel(func(pb *testing.PB) {
var data [16 << 10]uint64
for i := 0; pb.Next(); i++ {
m.Lock(id, source)
acc0 -= 100
acc1 += 100
m.Unlock()
for i := 0; i < len(data); i += 4 {
data[i]++
}
}
})
}

142
pkg/dsync/retry.go Normal file
View File

@ -0,0 +1,142 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"math/rand"
"sync"
"time"
)
// lockedRandSource provides protected rand source, implements rand.Source interface.
type lockedRandSource struct {
lk sync.Mutex
src rand.Source
}
// Int63 returns a non-negative pseudo-random 63-bit integer as an
// int64.
func (r *lockedRandSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return
}
// Seed uses the provided seed value to initialize the generator to a
// deterministic state.
func (r *lockedRandSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
// MaxJitter will randomize over the full exponential backoff time
const MaxJitter = 1.0
// NoJitter disables the use of jitter for randomizing the
// exponential backoff time
const NoJitter = 0.0
// Global random source for fetching random values.
var globalRandomSource = rand.New(&lockedRandSource{
src: rand.NewSource(time.Now().UTC().UnixNano()),
})
// newRetryTimerJitter creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function is a fully
// configurable version, meant for only advanced use cases. For the most part
// one should use newRetryTimerSimple and newRetryTimer.
func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh <-chan struct{}) <-chan int {
attemptCh := make(chan int)
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
exponentialBackoffWait := func(attempt int) time.Duration {
// 1<<uint(attempt) below could overflow, so limit the value of attempt
maxAttempt := 30
if attempt > maxAttempt {
attempt = maxAttempt
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := unit * time.Duration(1<<uint(attempt))
if sleep > cap {
sleep = cap
}
if jitter != NoJitter {
sleep -= time.Duration(globalRandomSource.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
defer close(attemptCh)
nextBackoff := 0
// Channel used to signal after the expiry of backoff wait seconds.
var timer *time.Timer
for {
select { // Attempts starts.
case attemptCh <- nextBackoff:
nextBackoff++
case <-doneCh:
// Stop the routine.
return
}
timer = time.NewTimer(exponentialBackoffWait(nextBackoff))
// wait till next backoff time or till doneCh gets a message.
select {
case <-timer.C:
case <-doneCh:
// stop the timer and return.
timer.Stop()
return
}
}
}()
// Start reading..
return attemptCh
}
// Default retry constants.
const (
defaultRetryUnit = time.Second // 1 second.
defaultRetryCap = 1 * time.Second // 1 second.
)
// newRetryTimer creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function provides
// resulting retry values to be of maximum jitter.
func newRetryTimer(unit time.Duration, cap time.Duration, doneCh <-chan struct{}) <-chan int {
return newRetryTimerWithJitter(unit, cap, MaxJitter, doneCh)
}
// newRetryTimerSimple creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function is a
// simpler version with all default values.
func newRetryTimerSimple(doneCh <-chan struct{}) <-chan int {
return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh)
}

82
pkg/dsync/retry_test.go Normal file
View File

@ -0,0 +1,82 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"testing"
"time"
)
// Tests for retry timer.
func TestRetryTimerSimple(t *testing.T) {
doneCh := make(chan struct{})
attemptCh := newRetryTimerSimple(doneCh)
i := <-attemptCh
if i != 0 {
close(doneCh)
t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
}
i = <-attemptCh
if i <= 0 {
close(doneCh)
t.Fatalf("Invalid attempt counter returned should be greater than 0, found %d instead", i)
}
close(doneCh)
_, ok := <-attemptCh
if ok {
t.Fatal("Attempt counter should be closed")
}
}
// Test retry time with no jitter.
func TestRetryTimerWithNoJitter(t *testing.T) {
doneCh := make(chan struct{})
// No jitter
attemptCh := newRetryTimerWithJitter(time.Millisecond, 5*time.Millisecond, NoJitter, doneCh)
i := <-attemptCh
if i != 0 {
close(doneCh)
t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
}
// Loop through the maximum possible attempt.
for i = range attemptCh {
if i == 30 {
close(doneCh)
}
}
_, ok := <-attemptCh
if ok {
t.Fatal("Attempt counter should be closed")
}
}
// Test retry time with Jitter greater than MaxJitter.
func TestRetryTimerWithJitter(t *testing.T) {
doneCh := make(chan struct{})
// Jitter will be set back to 1.0
attemptCh := newRetryTimerWithJitter(time.Second, 30*time.Second, 2.0, doneCh)
i := <-attemptCh
if i != 0 {
close(doneCh)
t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
}
close(doneCh)
_, ok := <-attemptCh
if ok {
t.Fatal("Attempt counter should be closed")
}
}

View File

@ -0,0 +1,111 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync_test
import (
"net/rpc"
"sync"
. "github.com/minio/minio/pkg/dsync"
)
// ReconnectRPCClient is a wrapper type for rpc.Client which provides reconnect on first failure.
type ReconnectRPCClient struct {
mutex sync.Mutex
rpc *rpc.Client
addr string
endpoint string
}
// newClient constructs a ReconnectRPCClient object with addr and endpoint initialized.
// It _doesn't_ connect to the remote endpoint. See Call method to see when the
// connect happens.
func newClient(addr, endpoint string) NetLocker {
return &ReconnectRPCClient{
addr: addr,
endpoint: endpoint,
}
}
// Close closes the underlying socket file descriptor.
func (rpcClient *ReconnectRPCClient) Close() error {
rpcClient.mutex.Lock()
defer rpcClient.mutex.Unlock()
// If rpc client has not connected yet there is nothing to close.
if rpcClient.rpc == nil {
return nil
}
// Reset rpcClient.rpc to allow for subsequent calls to use a new
// (socket) connection.
clnt := rpcClient.rpc
rpcClient.rpc = nil
return clnt.Close()
}
// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
func (rpcClient *ReconnectRPCClient) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
rpcClient.mutex.Lock()
defer rpcClient.mutex.Unlock()
dialCall := func() error {
// If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint.
if rpcClient.rpc == nil {
clnt, derr := rpc.DialHTTPPath("tcp", rpcClient.addr, rpcClient.endpoint)
if derr != nil {
return derr
}
rpcClient.rpc = clnt
}
// If the RPC fails due to a network-related error, then we reset
// rpc.Client for a subsequent reconnect.
return rpcClient.rpc.Call(serviceMethod, args, reply)
}
if err = dialCall(); err == rpc.ErrShutdown {
rpcClient.rpc.Close()
rpcClient.rpc = nil
err = dialCall()
}
return err
}
func (rpcClient *ReconnectRPCClient) RLock(args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.RLock", &args, &status)
return status, err
}
func (rpcClient *ReconnectRPCClient) Lock(args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.Lock", &args, &status)
return status, err
}
func (rpcClient *ReconnectRPCClient) RUnlock(args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.RUnlock", &args, &status)
return status, err
}
func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.Unlock", &args, &status)
return status, err
}
func (rpcClient *ReconnectRPCClient) ForceUnlock(args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.ForceUnlock", &args, &status)
return status, err
}
func (rpcClient *ReconnectRPCClient) String() string {
return "http://" + rpcClient.addr + "/" + rpcClient.endpoint
}

View File

@ -0,0 +1,59 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
// LockArgs is minimal required values for any dsync compatible lock operation.
type LockArgs struct {
// Unique ID of lock/unlock request.
UID string
// Resource contains a entity to be locked/unlocked.
Resource string
// Source contains the line number, function and file name of the code
// on the client node that requested the lock.
Source string
}
// NetLocker is dsync compatible locker interface.
type NetLocker interface {
// Do read lock for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of lock request operation.
RLock(args LockArgs) (bool, error)
// Do write lock for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of lock request operation.
Lock(args LockArgs) (bool, error)
// Do read unlock for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of unlock request operation.
RUnlock(args LockArgs) (bool, error)
// Do write unlock for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of unlock request operation.
Unlock(args LockArgs) (bool, error)
// Returns underlying endpoint of this lock client instance.
String() string
// Close closes any underlying connection to the service endpoint
Close() error
}