mirror of https://github.com/minio/minio.git (synced 2025-11-10 14:09:48 -05:00)
fix: Speed up multi-object delete by taking bulk locks (#8974)
Change distributed locking to allow taking bulk locks
across objects, reducing what is usually 1000 lock calls to 1.
This also handles the situation where multiple clients send
delete requests for the same objects in different orders, e.g.
```
{1,2,3,4,5}
```
and
```
{5,4,3,2,1}
```
such requests now block on one another and no longer fail
each other.
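For illustration only (not part of the commit): sorting the resource names before taking the bulk lock is what gives every request the same canonical acquisition order, which is why requests over the same objects issued in different orders block instead of conflicting. A minimal, self-contained Go sketch:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Two clients ask to delete the same objects in different orders.
	a := []string{"1", "2", "3", "4", "5"}
	b := []string{"5", "4", "3", "2", "1"}

	// Sorting gives both requests the same canonical acquisition order,
	// so one bulk lock simply waits for the other instead of the two
	// requests failing each other.
	sort.Strings(a)
	sort.Strings(b)
	fmt.Println(a, b) // [1 2 3 4 5] [1 2 3 4 5]
}
```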
@@ -356,25 +356,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
return
}

// Allocate incoming content length bytes.
var deleteXMLBytes []byte
const maxBodySize = 2 * 1000 * 1024 // The max. XML contains 1000 object names (each at most 1024 bytes long) + XML overhead
if r.ContentLength > maxBodySize { // Only allocated memory for at most 1000 objects
deleteXMLBytes = make([]byte, maxBodySize)
} else {
deleteXMLBytes = make([]byte, r.ContentLength)
}

// Read incoming body XML bytes.
if _, err := io.ReadFull(r.Body, deleteXMLBytes); err != nil {
logger.LogIf(ctx, err, logger.Application)
writeErrorResponse(ctx, w, toAdminAPIErr(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// The max. XML contains 100000 object names (each at most 1024 bytes long) + XML overhead
const maxBodySize = 2 * 100000 * 1024

// Unmarshal list of keys to be deleted.
deleteObjects := &DeleteObjectsRequest{}
if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
if err := xmlDecoder(r.Body, deleteObjects, maxBodySize); err != nil {
logger.LogIf(ctx, err, logger.Application)
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL, guessIsBrowserReq(r))
return
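The hunk above drops the up-front buffer allocation (previously capped at 2 * 1000 * 1024 bytes) and instead streams the request body through xmlDecoder with a much larger cap. A rough sketch of that idea using only the standard library; decodeLimited is a hypothetical name, and the repo's actual xmlDecoder helper may differ in detail:

```go
package cmd // sketch only

import (
	"encoding/xml"
	"io"
)

// decodeLimited decodes XML from r while never reading more than maxSize
// bytes, so an oversized or malicious body cannot force a huge allocation
// up front (the idea behind xmlDecoder(r.Body, deleteObjects, maxBodySize)).
func decodeLimited(r io.Reader, v interface{}, maxSize int64) error {
	return xml.NewDecoder(io.LimitReader(r, maxSize)).Decode(v)
}
```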
@@ -104,12 +104,12 @@ func startDailyLifecycle() {
}
}

var lifecycleTimeout = newDynamicTimeout(60*time.Second, time.Second)
var lifecycleLockTimeout = newDynamicTimeout(60*time.Second, time.Second)

func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
// Lock to avoid concurrent lifecycle ops from other nodes
sweepLock := objAPI.NewNSLock(ctx, "system", "daily-lifecycle-ops")
if err := sweepLock.GetLock(lifecycleTimeout); err != nil {
if err := sweepLock.GetLock(lifecycleLockTimeout); err != nil {
return err
}
defer sweepLock.Unlock()

@@ -79,10 +79,12 @@ func timeToCrawl(ctx context.Context, objAPI ObjectLayer) time.Duration {
return dataUsageCrawlInterval - waitDuration
}

var dataUsageLockTimeout = lifecycleLockTimeout

func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer, endCh <-chan struct{}) {
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
for {
err := locker.GetLock(newDynamicTimeout(time.Millisecond, time.Millisecond))
err := locker.GetLock(dataUsageLockTimeout)
if err != nil {
time.Sleep(5 * time.Minute)
continue
@@ -188,9 +188,9 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
}

// NewNSLock - initialize a new namespace RWLocker instance.
func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
// lockers are explicitly 'nil' for FS mode since there are only local lockers
return fs.nsMutex.NewNSLock(ctx, nil, bucket, object)
return fs.nsMutex.NewNSLock(ctx, nil, bucket, objects...)
}

// Shutdown - should be called when process shuts down.

@@ -490,7 +490,6 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
// GetObjectNInfo - returns object info and a reader for object
// content.
func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {

if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return nil, err
}

@@ -37,8 +37,8 @@ type GatewayLocker struct {
}

// NewNSLock - implements gateway level locker
func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
return l.nsMutex.NewNSLock(ctx, nil, bucket, object)
func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
return l.nsMutex.NewNSLock(ctx, nil, bucket, objects...)
}

// NewGatewayLayerWithLocker - initialize gateway with locker.

@@ -56,7 +56,7 @@ func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, endCh <-ch
}

// NewNSLock is a dummy stub for gateway.
func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
logger.CriticalIf(ctx, errors.New("not implemented"))
return nil
}

@@ -337,14 +337,6 @@ func (sys *IAMSys) Load() error {

// Perform IAM configuration migration.
func (sys *IAMSys) doIAMConfigMigration(objAPI ObjectLayer) error {
// Take IAM configuration migration lock
lockPath := iamConfigPrefix + "/migration.lock"
objLock := objAPI.NewNSLock(context.Background(), minioMetaBucket, lockPath)
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()

return sys.store.migrateBackendFormat(objAPI)
}
@@ -49,12 +49,42 @@ func (l *localLocker) String() string {
return l.endpoint.String()
}

func (l *localLocker) canTakeUnlock(resources ...string) bool {
var lkCnt int
for _, resource := range resources {
isWriteLockTaken := isWriteLock(l.lockMap[resource])
if isWriteLockTaken {
lkCnt++
}
}
return lkCnt == len(resources)
}

func (l *localLocker) canTakeLock(resources ...string) bool {
var noLkCnt int
for _, resource := range resources {
_, lockTaken := l.lockMap[resource]
if !lockTaken {
noLkCnt++
}
}
return noLkCnt == len(resources)
}

func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
_, isLockTaken := l.lockMap[args.Resource]
if !isLockTaken { // No locks held on the given name, so claim write lock
l.lockMap[args.Resource] = []lockRequesterInfo{

if !l.canTakeLock(args.Resources...) {
// Not all locks can be taken on resources,
// reject it completely.
return false, nil
}

// No locks held on the all resources, so claim write
// lock on all resources at once.
for _, resource := range args.Resources {
l.lockMap[resource] = []lockRequesterInfo{
{
Writer: true,
Source: args.Source,
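A bulk Lock is all-or-nothing: canTakeLock above only reports true when every requested resource is free, so either all names in args.Resources receive a write lock or none do. A small stand-alone illustration of that rule (allFree and the map literal are invented for the example):

```go
// allFree mirrors canTakeLock: a bulk lock is granted only when every
// requested resource is free; otherwise the whole request is rejected.
func allFree(held map[string]bool, resources ...string) bool {
	for _, r := range resources {
		if held[r] {
			return false
		}
	}
	return true
}

// held := map[string]bool{"bucket/obj-3": true}
// allFree(held, "bucket/obj-1", "bucket/obj-2") // true  -> all locks taken
// allFree(held, "bucket/obj-2", "bucket/obj-3") // false -> none taken
```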
@@ -64,24 +94,22 @@ func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
},
}
}
// return reply=true if lock was granted.
return !isLockTaken, nil
return true, nil
}

func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
var lri []lockRequesterInfo
if lri, reply = l.lockMap[args.Resource]; !reply {
// No lock is held on the given name
return reply, fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resource)

if !l.canTakeUnlock(args.Resources...) {
// Unless it is a write lock reject it.
return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources)
}
if reply = isWriteLock(lri); !reply {
// Unless it is a write lock
return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resource, len(lri))
}
if !l.removeEntry(args.Resource, args.UID, &lri) {
return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID)
for _, resource := range args.Resources {
lri := l.lockMap[resource]
if !l.removeEntry(resource, args.UID, &lri) {
return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s on resource %s", args.UID, resource)
}
}
return true, nil

@@ -120,14 +148,15 @@ func (l *localLocker) RLock(args dsync.LockArgs) (reply bool, err error) {
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
}
if lri, ok := l.lockMap[args.Resource]; ok {
resource := args.Resources[0]
if lri, ok := l.lockMap[resource]; ok {
if reply = !isWriteLock(lri); reply {
// Unless there is a write lock
l.lockMap[args.Resource] = append(l.lockMap[args.Resource], lrInfo)
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
}
} else {
// No locks held on the given name, so claim (first) read lock
l.lockMap[args.Resource] = []lockRequesterInfo{lrInfo}
l.lockMap[resource] = []lockRequesterInfo{lrInfo}
reply = true
}
return reply, nil
@@ -137,15 +166,17 @@ func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
var lri []lockRequesterInfo
if lri, reply = l.lockMap[args.Resource]; !reply {

resource := args.Resources[0]
if lri, reply = l.lockMap[resource]; !reply {
// No lock is held on the given name
return reply, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resource)
return reply, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", resource)
}
if reply = !isWriteLock(lri); !reply {
// A write-lock is held, cannot release a read lock
return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resource)
return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
}
if !l.removeEntry(args.Resource, args.UID, &lri) {
if !l.removeEntry(resource, args.UID, &lri) {
return false, fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
}
return reply, nil
@@ -176,11 +207,13 @@ func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) {
defer l.mutex.Unlock()

// Lock found, proceed to verify if belongs to given uid.
if lri, ok := l.lockMap[args.Resource]; ok {
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == args.UID {
return false, nil
for _, resource := range args.Resources {
if lri, ok := l.lockMap[resource]; ok {
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == args.UID {
return false, nil
}
}
}
}

@@ -17,6 +17,7 @@
package cmd

import (
"bytes"
"context"
"crypto/tls"
"errors"
@@ -104,9 +105,12 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply
values := url.Values{}
values.Set(lockRESTUID, args.UID)
values.Set(lockRESTSource, args.Source)
values.Set(lockRESTResource, args.Resource)

respBody, err := client.call(call, values, nil, -1)
var buffer bytes.Buffer
for _, resource := range args.Resources {
buffer.WriteString(resource)
buffer.WriteString("\n")
}
respBody, err := client.call(call, values, &buffer, -1)
defer http.DrainBody(respBody)
switch err {
case nil:

@@ -21,8 +21,8 @@ import (
)

const (
lockRESTVersion = "v2"
lockRESTVersionPrefix = SlashSeparator + "v2"
lockRESTVersion = "v3"
lockRESTVersionPrefix = SlashSeparator + lockRESTVersion
lockRESTPrefix = minioReservedBucketPath + "/lock"
)

@@ -38,8 +38,6 @@ const (
// Source contains the line number, function and file name of the code
// on the client node that requested the lock.
lockRESTSource = "source"
// Resource contains a entity to be locked/unlocked.
lockRESTResource = "resource"
)

var (
@@ -17,11 +17,13 @@
package cmd

import (
"bufio"
"context"
"errors"
"math/rand"
"net/http"
"path"
"sort"
"time"

"github.com/gorilla/mux"

@@ -55,12 +57,25 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
return true
}

func getLockArgs(r *http.Request) dsync.LockArgs {
return dsync.LockArgs{
UID: r.URL.Query().Get(lockRESTUID),
Source: r.URL.Query().Get(lockRESTSource),
Resource: r.URL.Query().Get(lockRESTResource),
func getLockArgs(r *http.Request) (args dsync.LockArgs, err error) {
args = dsync.LockArgs{
UID: r.URL.Query().Get(lockRESTUID),
Source: r.URL.Query().Get(lockRESTSource),
}

var resources []string
bio := bufio.NewScanner(r.Body)
for bio.Scan() {
resources = append(resources, bio.Text())
}

if err := bio.Err(); err != nil {
return args, err
}

sort.Strings(resources)
args.Resources = resources
return args, nil
}

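Together with the client-side change in the lock REST client above, this defines the new v3 wire format: resource names travel newline-separated in the request body rather than as a single `resource` query parameter, and the server sorts them before locking. A runnable sketch of that round trip using only the standard library:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"sort"
)

func main() {
	// Client side (restCall): resource names are joined with newlines
	// and sent as the request body.
	var buf bytes.Buffer
	for _, resource := range []string{"bucket/obj-2", "bucket/obj-1"} {
		buf.WriteString(resource)
		buf.WriteString("\n")
	}

	// Server side (getLockArgs): the body is scanned back line by line
	// and sorted so every node locks in the same canonical order.
	var resources []string
	scanner := bufio.NewScanner(&buf)
	for scanner.Scan() {
		resources = append(resources, scanner.Text())
	}
	sort.Strings(resources)
	fmt.Println(resources) // [bucket/obj-1 bucket/obj-2]
}
```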
// LockHandler - Acquires a lock.
@@ -70,7 +85,13 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
return
}

success, err := l.ll.Lock(getLockArgs(r))
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}

success, err := l.ll.Lock(args)
if err == nil && !success {
err = errLockConflict
}
@@ -87,7 +108,13 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) {
return
}

_, err := l.ll.Unlock(getLockArgs(r))
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}

_, err = l.ll.Unlock(args)
// Ignore the Unlock() "reply" return value because if err == nil, "reply" is always true
// Consequently, if err != nil, reply is always false
if err != nil {
@@ -103,7 +130,13 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) {
return
}

success, err := l.ll.RLock(getLockArgs(r))
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}

success, err := l.ll.RLock(args)
if err == nil && !success {
err = errLockConflict
}
@@ -120,10 +153,15 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
return
}

args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}

// Ignore the RUnlock() "reply" return value because if err == nil, "reply" is always true.
// Consequently, if err != nil, reply is always false
_, err := l.ll.RUnlock(getLockArgs(r))
if err != nil {
if _, err = l.ll.RUnlock(args); err != nil {
l.writeErrorResponse(w, err)
return
}
@@ -136,17 +174,24 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
return
}

lockArgs := getLockArgs(r)
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}

l.ll.mutex.Lock()
defer l.ll.mutex.Unlock()

// Lock found, proceed to verify if belongs to given uid.
if lri, ok := l.ll.lockMap[lockArgs.Resource]; ok {
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == lockArgs.UID {
l.writeErrorResponse(w, errLockNotExpired)
return
for _, resource := range args.Resources {
if lri, ok := l.ll.lockMap[resource]; ok {
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == args.UID {
l.writeErrorResponse(w, errLockNotExpired)
return
}
}
}
}
@@ -216,8 +261,8 @@ func lockMaintenance(ctx context.Context, interval time.Duration, objAPI ObjectL
// Call back to original server verify whether the lock is
// still active (based on name & uid)
expired, err := c.Expired(dsync.LockArgs{
UID: nlrip.lri.UID,
Resource: nlrip.name,
UID: nlrip.lri.UID,
Resources: []string{nlrip.name},
})

if err != nil {
@@ -287,7 +332,7 @@ func startLockMaintenance() {

// registerLockRESTHandlers - register lock rest router.
func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource)
queries := restQueries(lockRESTUID, lockRESTSource)
for _, ep := range endpointZones {
for _, endpoint := range ep.Endpoints {
if !endpoint.IsLocal {
@@ -21,6 +21,7 @@ import (
"errors"
pathutil "path"
"runtime"
"sort"
"strings"
"sync"

@@ -51,16 +52,10 @@ func newNSLock(isDistXL bool) *nsLockMap {
if isDistXL {
return &nsMutex
}
nsMutex.lockMap = make(map[nsParam]*nsLock)
nsMutex.lockMap = make(map[string]*nsLock)
return &nsMutex
}

// nsParam - carries name space resource.
type nsParam struct {
volume string
path string
}

// nsLock - provides primitives for locking critical namespace regions.
type nsLock struct {
*lsync.LRWMutex
@@ -72,23 +67,24 @@ type nsLock struct {
type nsLockMap struct {
// Indicates if namespace is part of a distributed setup.
isDistXL bool
lockMap map[nsParam]*nsLock
lockMap map[string]*nsLock
lockMapMutex sync.RWMutex
}

// Lock the namespace resource.
func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
var nsLk *nsLock

resource := pathJoin(volume, path)

n.lockMapMutex.Lock()
param := nsParam{volume, path}
nsLk, found := n.lockMap[param]
nsLk, found := n.lockMap[resource]
if !found {
n.lockMap[param] = &nsLock{
nsLk = &nsLock{
LRWMutex: lsync.NewLRWMutex(ctx),
ref: 1,
}
nsLk = n.lockMap[param]
n.lockMap[resource] = nsLk
} else {
// Update ref count here to avoid multiple races.
nsLk.ref++
@@ -109,7 +105,7 @@ func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, o
nsLk.ref--
if nsLk.ref == 0 {
// Remove from the map if there are no more references.
delete(n.lockMap, param)
delete(n.lockMap, resource)
}
n.lockMapMutex.Unlock()
}
@@ -117,10 +113,10 @@ func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, o
}

// Unlock the namespace resource.
func (n *nsLockMap) unlock(volume, path string, readLock bool) {
param := nsParam{volume, path}
func (n *nsLockMap) unlock(volume string, path string, readLock bool) {
resource := pathJoin(volume, path)
n.lockMapMutex.RLock()
nsLk, found := n.lockMap[param]
nsLk, found := n.lockMap[resource]
n.lockMapMutex.RUnlock()
if !found {
return
@@ -137,45 +133,16 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) {
nsLk.ref--
if nsLk.ref == 0 {
// Remove from the map if there are no more references.
delete(n.lockMap, param)
delete(n.lockMap, resource)
}
}
n.lockMapMutex.Unlock()
}

// Lock - locks the given resource for writes, using a previously
// allocated name space lock or initializing a new one.
func (n *nsLockMap) Lock(volume, path, opsID string, timeout time.Duration) (locked bool) {
readLock := false // This is a write lock.

lockSource := getSource() // Useful for debugging
return n.lock(context.Background(), volume, path, lockSource, opsID, readLock, timeout)
}

// Unlock - unlocks any previously acquired write locks.
func (n *nsLockMap) Unlock(volume, path, opsID string) {
readLock := false
n.unlock(volume, path, readLock)
}

// RLock - locks any previously acquired read locks.
func (n *nsLockMap) RLock(volume, path, opsID string, timeout time.Duration) (locked bool) {
readLock := true

lockSource := getSource() // Useful for debugging
return n.lock(context.Background(), volume, path, lockSource, opsID, readLock, timeout)
}

// RUnlock - unlocks any previously acquired read locks.
func (n *nsLockMap) RUnlock(volume, path, opsID string) {
readLock := true
n.unlock(volume, path, readLock)
}

// dsync's distributed lock instance.
type distLockInstance struct {
rwMutex *dsync.DRWMutex
volume, path, opsID string
rwMutex *dsync.DRWMutex
opsID string
}

// Lock - block until write lock is taken or timeout has occurred.
@@ -185,7 +152,7 @@ func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error)

if !di.rwMutex.GetLock(di.opsID, lockSource, timeout.Timeout()) {
timeout.LogFailure()
return OperationTimedOut{Path: di.path}
return OperationTimedOut{}
}
timeout.LogSuccess(UTCNow().Sub(start))
return nil
@@ -202,7 +169,7 @@ func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error
start := UTCNow()
if !di.rwMutex.GetRLock(di.opsID, lockSource, timeout.Timeout()) {
timeout.LogFailure()
return OperationTimedOut{Path: di.path}
return OperationTimedOut{}
}
timeout.LogSuccess(UTCNow().Sub(start))
return nil
@@ -215,22 +182,26 @@ func (di *distLockInstance) RUnlock() {

// localLockInstance - frontend/top-level interface for namespace locks.
type localLockInstance struct {
ctx context.Context
ns *nsLockMap
volume, path, opsID string
ctx context.Context
ns *nsLockMap
volume string
paths []string
opsID string
}

// NewNSLock - returns a lock instance for a given volume and
// path. The returned lockInstance object encapsulates the nsLockMap,
// volume, path and operation ID.
func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume, path string) RWLocker {
func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume string, paths ...string) RWLocker {
opsID := mustGetUUID()
if n.isDistXL {
return &distLockInstance{dsync.NewDRWMutex(ctx, pathJoin(volume, path), &dsync.Dsync{
drwmutex := dsync.NewDRWMutex(ctx, &dsync.Dsync{
GetLockersFn: lockersFn,
}), volume, path, opsID}
}, pathsJoinPrefix(volume, paths...)...)
return &distLockInstance{drwmutex, opsID}
}
return &localLockInstance{ctx, n, volume, path, opsID}
sort.Strings(paths)
return &localLockInstance{ctx, n, volume, paths, opsID}
}

// Lock - block until write lock is taken or timeout has occurred.
@@ -238,9 +209,16 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error
lockSource := getSource()
start := UTCNow()
readLock := false
if !li.ns.lock(li.ctx, li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
timeout.LogFailure()
return OperationTimedOut{Path: li.path}
var success []int
for i, path := range li.paths {
if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
timeout.LogFailure()
for _, sint := range success {
li.ns.unlock(li.volume, li.paths[sint], readLock)
}
return OperationTimedOut{}
}
success = append(success, i)
}
timeout.LogSuccess(UTCNow().Sub(start))
return
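Locally, GetLock acquires the sorted paths one by one and releases every lock already taken if a later acquisition times out, so a partially acquired bulk lock never leaks. The same pattern in isolation (tryLockAll, lock and unlock are placeholder names, not repo functions):

```go
// tryLockAll takes every path in order and rolls back the ones already
// acquired if a later acquisition fails, so no partial bulk lock leaks.
func tryLockAll(paths []string, lock func(string) bool, unlock func(string)) bool {
	var taken []string
	for _, p := range paths {
		if !lock(p) {
			// Release everything acquired so far before reporting failure.
			for _, q := range taken {
				unlock(q)
			}
			return false
		}
		taken = append(taken, p)
	}
	return true
}
```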
@@ -249,7 +227,9 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error
// Unlock - block until write lock is released.
func (li *localLockInstance) Unlock() {
readLock := false
li.ns.unlock(li.volume, li.path, readLock)
for _, path := range li.paths {
li.ns.unlock(li.volume, path, readLock)
}
}

// RLock - block until read lock is taken or timeout has occurred.
@@ -257,9 +237,16 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro
lockSource := getSource()
start := UTCNow()
readLock := true
if !li.ns.lock(li.ctx, li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
timeout.LogFailure()
return OperationTimedOut{Path: li.path}
var success []int
for i, path := range li.paths {
if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
timeout.LogFailure()
for _, sint := range success {
li.ns.unlock(li.volume, li.paths[sint], readLock)
}
return OperationTimedOut{}
}
success = append(success, i)
}
timeout.LogSuccess(UTCNow().Sub(start))
return
@@ -268,7 +255,9 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro
// RUnlock - block until read lock is released.
func (li *localLockInstance) RUnlock() {
readLock := true
li.ns.unlock(li.volume, li.path, readLock)
for _, path := range li.paths {
li.ns.unlock(li.volume, path, readLock)
}
}

func getSource() string {
@@ -18,184 +18,20 @@ package cmd

import (
"testing"
"time"
)

// WARNING:
//
// Expected source line number is hard coded, 32, in the
// Expected source line number is hard coded, 31, in the
// following test. Adding new code before this test or changing its
// position will cause the line number to change and the test to FAIL
// Tests getSource().
func TestGetSource(t *testing.T) {
currentSource := func() string { return getSource() }
gotSource := currentSource()
// Hard coded line number, 32, in the "expectedSource" value
expectedSource := "[namespace-lock_test.go:32:TestGetSource()]"
// Hard coded line number, 31, in the "expectedSource" value
expectedSource := "[namespace-lock_test.go:31:TestGetSource()]"
if gotSource != expectedSource {
t.Errorf("expected : %s, got : %s", expectedSource, gotSource)
}
}

// Tests functionality provided by namespace lock.
func TestNamespaceLockTest(t *testing.T) {
isDistXL := false
nsMutex := newNSLock(isDistXL)

// List of test cases.
testCases := []struct {
lk func(s1, s2, s3 string, t time.Duration) bool
unlk func(s1, s2, s3 string)
rlk func(s1, s2, s3 string, t time.Duration) bool
runlk func(s1, s2, s3 string)
lockedRefCount uint
unlockedRefCount uint
shouldPass bool
}{
{
lk: nsMutex.Lock,
unlk: nsMutex.Unlock,
lockedRefCount: 1,
unlockedRefCount: 0,
shouldPass: true,
},
{
rlk: nsMutex.RLock,
runlk: nsMutex.RUnlock,
lockedRefCount: 4,
unlockedRefCount: 2,
shouldPass: true,
},
{
rlk: nsMutex.RLock,
runlk: nsMutex.RUnlock,
lockedRefCount: 1,
unlockedRefCount: 0,
shouldPass: true,
},
}

// Run all test cases.

// Write lock tests.
testCase := testCases[0]
if !testCase.lk("a", "b", "c", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire lock")
}
nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
}
// Validate locked ref count.
if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref)
}
testCase.unlk("a", "b", "c") // unlock once.
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref)
}
_, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if ok && !testCase.shouldPass {
t.Errorf("Lock map found after unlock.")
}

// Read lock tests.
testCase = testCases[1]
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire first read lock")
}
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock second time.
t.Fatalf("Failed to acquire second read lock")
}
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock third time.
t.Fatalf("Failed to acquire third read lock")
}
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock fourth time.
t.Fatalf("Failed to acquire fourth read lock")
}
nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
}
// Validate locked ref count.
if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref)
}

testCase.runlk("a", "b", "c") // unlock once.
testCase.runlk("a", "b", "c") // unlock second time.
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref)
}
_, ok = nsMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock map not found.")
}

// Read lock 0 ref count.
testCase = testCases[2]
if !testCase.rlk("a", "c", "d", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire read lock")
}

nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
}
// Validate locked ref count.
if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.lockedRefCount, nsLk.ref)
}
testCase.runlk("a", "c", "d") // unlock once.
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref)
}
_, ok = nsMutex.lockMap[nsParam{"a", "c"}]
if ok && !testCase.shouldPass {
t.Errorf("Lock map not found.")
}
}

func TestNamespaceLockTimedOut(t *testing.T) {
isDistXL := false
nsMutex := newNSLock(isDistXL)
// Get write lock
if !nsMutex.Lock("my-bucket", "my-object", "abc", 60*time.Second) {
t.Fatalf("Failed to acquire lock")
}

// Second attempt for write lock on same resource should time out
locked := nsMutex.Lock("my-bucket", "my-object", "def", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired lock")
}

// Read lock on same resource should also time out
locked = nsMutex.RLock("my-bucket", "my-object", "def", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired read lock while write lock is active")
}

// Release write lock
nsMutex.Unlock("my-bucket", "my-object", "abc")

// Get read lock
if !nsMutex.RLock("my-bucket", "my-object", "ghi", 60*time.Second) {
t.Fatalf("Failed to acquire read lock")
}

// Write lock on same resource should time out
locked = nsMutex.Lock("my-bucket", "my-object", "klm", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired lock")
}

// 2nd read lock should be just fine
if !nsMutex.RLock("my-bucket", "my-object", "nop", 60*time.Second) {
t.Fatalf("Failed to acquire second read lock")
}

// Release both read locks
nsMutex.RUnlock("my-bucket", "my-object", "ghi")
nsMutex.RUnlock("my-bucket", "my-object", "nop")
}

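The removed tests exercised the old (volume, path) API; a test against the new variadic API might look roughly like the sketch below. This is a hypothetical example built from the signatures shown in this diff (newNSLock, NewNSLock, GetLock, newDynamicTimeout), not code from the commit:

```go
func TestBulkNamespaceLock(t *testing.T) {
	nsMutex := newNSLock(false) // non-distributed mode, local lockers only
	lk := nsMutex.NewNSLock(context.Background(), nil, "my-bucket", "obj-1", "obj-2")
	if err := lk.GetLock(newDynamicTimeout(60*time.Second, time.Second)); err != nil {
		t.Fatalf("Failed to acquire bulk lock: %v", err)
	}
	// Both objects are now covered by a single RWLocker instance.
	lk.Unlock()
}
```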
@@ -348,11 +348,10 @@ func (e ObjectTooSmall) Error() string {

// OperationTimedOut - a timeout occurred.
type OperationTimedOut struct {
Path string
}

func (e OperationTimedOut) Error() string {
return "Operation timed out: " + e.Path
return "Operation timed out"
}

/// Multipart related errors.

@@ -55,7 +55,7 @@ const (
// ObjectLayer implements primitives for object API layer.
type ObjectLayer interface {
// Locking operations on object.
NewNSLock(ctx context.Context, bucket string, object string) RWLocker
NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker

// Storage operations.
Shutdown(context.Context) error

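Widening the last parameter from `object string` to `objects ...string` keeps the ObjectLayer interface source-compatible: existing single-object callers compile unchanged, while DeleteObjects can now pass the whole batch. A tiny stand-alone illustration of that Go property (lockKey is an invented stand-in, not the real method):

```go
package main

import (
	"fmt"
	"strings"
)

// lockKey stands in for NewNSLock: with a variadic final parameter, old
// single-object call sites and new bulk call sites both compile.
func lockKey(bucket string, objects ...string) string {
	return bucket + "/" + strings.Join(objects, ",")
}

func main() {
	fmt.Println(lockKey("photos", "cat.jpg"))            // old style: one object
	fmt.Println(lockKey("photos", "cat.jpg", "dog.jpg")) // new style: whole batch
}
```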
@@ -198,6 +198,16 @@ func retainSlash(s string) string {
return strings.TrimSuffix(s, SlashSeparator) + SlashSeparator
}

// pathsJoinPrefix - like pathJoin retains trailing SlashSeparator
// for all elements, prepends them with 'prefix' respectively.
func pathsJoinPrefix(prefix string, elem ...string) (paths []string) {
paths = make([]string, len(elem))
for i, e := range elem {
paths[i] = pathJoin(prefix, e)
}
return paths
}

// pathJoin - like path.Join() but retains trailing SlashSeparator of the last element
func pathJoin(elem ...string) string {
trailingSlash := ""
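Based on the definition above, pathsJoinPrefix simply maps pathJoin over the elements, so the bulk lock names end up as flat `volume/path` strings; an assumed example of its output:

```go
paths := pathsJoinPrefix("bucket", "dir/obj-1", "dir/obj-2")
// paths == []string{"bucket/dir/obj-1", "bucket/dir/obj-2"}
```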
@@ -335,8 +335,11 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
}

// NewNSLock - initialize a new namespace RWLocker instance.
func (s *xlSets) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
return s.getHashedSet(object).NewNSLock(ctx, bucket, object)
func (s *xlSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
if len(objects) == 1 {
return s.getHashedSet(objects[0]).NewNSLock(ctx, bucket, objects...)
}
return s.getHashedSet("").NewNSLock(ctx, bucket, objects...)
}

// StorageInfo - combines output of StorageInfo across all erasure coded object sets.

@@ -66,8 +66,8 @@ type xlObjects struct {
}

// NewNSLock - initialize a new namespace RWLocker instance.
func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
return xl.nsMutex.NewNSLock(ctx, xl.getLockers, bucket, object)
func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
return xl.nsMutex.NewNSLock(ctx, xl.getLockers, bucket, objects...)
}

// Shutdown function for object storage interface.

@@ -83,8 +83,8 @@ func newXLZones(endpointZones EndpointZones) (ObjectLayer, error) {
return z, nil
}

func (z *xlZones) NewNSLock(ctx context.Context, bucket string, object string) RWLocker {
return z.zones[0].NewNSLock(ctx, bucket, object)
func (z *xlZones) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
return z.zones[0].NewNSLock(ctx, bucket, objects...)
}

type zonesAvailableSpace []zoneAvailableSpace
@@ -445,20 +445,12 @@ func (z *xlZones) DeleteObjects(ctx context.Context, bucket string, objects []st
derrs[i] = checkDelObjArgs(ctx, bucket, objects[i])
}

var objectLocks = make([]RWLocker, len(objects))
for i := range objects {
if derrs[i] != nil {
continue
}

// Acquire a write lock before deleting the object.
objectLocks[i] = z.NewNSLock(ctx, bucket, objects[i])
if derrs[i] = objectLocks[i].GetLock(globalOperationTimeout); derrs[i] != nil {
continue
}

defer objectLocks[i].Unlock()
// Acquire a bulk write lock across 'objects'
multiDeleteLock := z.NewNSLock(ctx, bucket, objects...)
if err := multiDeleteLock.GetLock(globalOperationTimeout); err != nil {
return nil, err
}
defer multiDeleteLock.Unlock()

for _, zone := range z.zones {
errs, err := zone.DeleteObjects(ctx, bucket, objects)
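The hunk above is where the speed-up claimed in the commit message materialises: the per-object loop used to issue one GetLock per key (up to 1000 for a full S3 DeleteObjects request), while the new code issues exactly one bulk GetLock. Restated as a sketch (identifiers follow the diff; this is an illustration, not the exact code):

```go
// Before: one namespace lock per object, i.e. up to 1000 lock round trips.
for i := range objects {
	objectLocks[i] = z.NewNSLock(ctx, bucket, objects[i])
	derrs[i] = objectLocks[i].GetLock(globalOperationTimeout)
}

// After: a single bulk lock covering every object, i.e. one lock call.
multiDeleteLock := z.NewNSLock(ctx, bucket, objects...)
if err := multiDeleteLock.GetLock(globalOperationTimeout); err == nil {
	defer multiDeleteLock.Unlock()
}
```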