From 2294e53a0be1bcae2500a4a1ebaba015982e3eb7 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 4 Nov 2020 08:25:42 -0800 Subject: [PATCH] Don't retain context in locker (#10515) Use the context for internal timeouts, but disconnect it from outgoing calls so we always receive the results and cancel it remotely. --- cmd/admin-handlers.go | 4 +- cmd/data-crawler.go | 4 +- cmd/disk-cache-backend.go | 26 ++++---- cmd/erasure-multipart.go | 26 ++++---- cmd/erasure-object.go | 26 ++++---- cmd/erasure-server-sets.go | 18 ++--- cmd/erasure-sets.go | 6 +- cmd/erasure.go | 4 +- cmd/fs-v1-multipart.go | 4 +- cmd/fs-v1.go | 34 +++++----- cmd/gateway-main.go | 4 +- cmd/gateway-unsupported.go | 4 +- cmd/handler-api.go | 3 + cmd/iam.go | 4 +- cmd/namespace-lock.go | 36 +++++----- cmd/object-api-interface.go | 2 +- cmd/server-main.go | 4 +- pkg/dsync/drwmutex.go | 130 +++++++++++++++--------------------- 18 files changed, 159 insertions(+), 180 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 0f3a66554..05dab2abc 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1284,8 +1284,8 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request) deadlinedCtx, cancel := context.WithTimeout(ctx, deadline) defer cancel() - nsLock := objectAPI.NewNSLock(ctx, minioMetaBucket, "obd-in-progress") - if err := nsLock.GetLock(newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock + nsLock := objectAPI.NewNSLock(minioMetaBucket, "obd-in-progress") + if err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock errResp(err) return } diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index b31e6d6a8..6d2ee9250 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -69,10 +69,10 @@ func initDataCrawler(ctx context.Context, objAPI ObjectLayer) { // There should only ever be one crawler running per cluster. func runDataCrawler(ctx context.Context, objAPI ObjectLayer) { // Make sure only 1 crawler is running on the cluster. - locker := objAPI.NewNSLock(ctx, minioMetaBucket, "runDataCrawler.lock") + locker := objAPI.NewNSLock(minioMetaBucket, "runDataCrawler.lock") r := rand.New(rand.NewSource(time.Now().UnixNano())) for { - err := locker.GetLock(dataCrawlerLeaderLockTimeout) + err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout) if err != nil { time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay))) continue diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index a94a7cc71..c1fc9da54 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -148,7 +148,7 @@ type diskCache struct { // nsMutex namespace lock nsMutex *nsLockMap // Object functions pointing to the corresponding functions of backend implementation. - NewNSLockFn func(ctx context.Context, cachePath string) RWLocker + NewNSLockFn func(cachePath string) RWLocker } // Inits the disk cache dir if it is not initialized already. @@ -186,8 +186,8 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa go cache.scanCacheWritebackFailures(ctx) } cache.diskSpaceAvailable(0) // update if cache usage is already high. - cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker { - return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "") + cache.NewNSLockFn = func(cachePath string) RWLocker { + return cache.nsMutex.NewNSLock(nil, cachePath, "") } return &cache, nil } @@ -436,8 +436,8 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI // if partial object is cached. func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) { - cLock := c.NewNSLockFn(ctx, cacheObjPath) - if err = cLock.GetRLock(globalOperationTimeout); err != nil { + cLock := c.NewNSLockFn(cacheObjPath) + if err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil { return } @@ -518,8 +518,8 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c // incHitsOnly is true if metadata update is incrementing only the hit counter func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error { cachedPath := getCacheSHADir(c.dir, bucket, object) - cLock := c.NewNSLockFn(ctx, cachedPath) - if err := cLock.GetLock(globalOperationTimeout); err != nil { + cLock := c.NewNSLockFn(cachedPath) + if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil { return err } defer cLock.Unlock() @@ -696,8 +696,8 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read return oi, errDiskFull } cachePath := getCacheSHADir(c.dir, bucket, object) - cLock := c.NewNSLockFn(ctx, cachePath) - if err := cLock.GetLock(globalOperationTimeout); err != nil { + cLock := c.NewNSLockFn(cachePath) + if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer cLock.Unlock() @@ -910,8 +910,8 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of // Get returns ObjectInfo and reader for object from disk cache func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) { cacheObjPath := getCacheSHADir(c.dir, bucket, object) - cLock := c.NewNSLockFn(ctx, cacheObjPath) - if err := cLock.GetRLock(globalOperationTimeout); err != nil { + cLock := c.NewNSLockFn(cacheObjPath) + if err := cLock.GetRLock(ctx, globalOperationTimeout); err != nil { return nil, numHits, err } @@ -974,8 +974,8 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang // Deletes the cached object func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) { - cLock := c.NewNSLockFn(ctx, cacheObjPath) - if err := cLock.GetLock(globalOperationTimeout); err != nil { + cLock := c.NewNSLockFn(cacheObjPath) + if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil { return err } defer cLock.Unlock() diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 25390cb82..9cff2fcc2 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -355,8 +355,8 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec // // Implements S3 compatible Upload Part API. func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { - uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { return PartInfo{}, err } readLocked := true @@ -469,7 +469,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // PutObjectParts would serialize here updating `xl.meta` uploadIDLock.RUnlock() readLocked = false - if err = uploadIDLock.GetLock(globalOperationTimeout); err != nil { + if err = uploadIDLock.GetLock(ctx, globalOperationTimeout); err != nil { return PartInfo{}, err } defer uploadIDLock.Unlock() @@ -550,8 +550,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u UploadID: uploadID, } - uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { return MultipartInfo{}, err } defer uploadIDLock.RUnlock() @@ -598,8 +598,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u // ListPartsInfo structure is marshaled directly into XML and // replied back to the client. func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) { - uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { return ListPartsInfo{}, err } defer uploadIDLock.RUnlock() @@ -691,8 +691,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { // Hold read-locks to verify uploaded parts, also disallows // parallel part uploads as well. - uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer uploadIDLock.RUnlock() @@ -844,8 +844,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } // Hold namespace to complete the transaction - lk := er.NewNSLock(ctx, bucket, object) - if err = lk.GetLock(globalOperationTimeout); err != nil { + lk := er.NewNSLock(bucket, object) + if err = lk.GetLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer lk.Unlock() @@ -886,8 +886,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // would be removed from the system, rollback is not possible on this // operation. func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { - lk := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := lk.GetLock(globalOperationTimeout); err != nil { + lk := er.NewNSLock(bucket, pathJoin(object, uploadID)) + if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { return err } defer lk.Unlock() diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 2e899db39..ba306afab 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -48,8 +48,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d } defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) - lk := er.NewNSLock(ctx, dstBucket, dstObject) - if err := lk.GetLock(globalOperationTimeout); err != nil { + lk := er.NewNSLock(dstBucket, dstObject) + if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer lk.Unlock() @@ -135,15 +135,15 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri // Acquire lock if lockType != noLock { - lock := er.NewNSLock(ctx, bucket, object) + lock := er.NewNSLock(bucket, object) switch lockType { case writeLock: - if err = lock.GetLock(globalOperationTimeout); err != nil { + if err = lock.GetLock(ctx, globalOperationTimeout); err != nil { return nil, err } nsUnlocker = lock.Unlock case readLock: - if err = lock.GetRLock(globalOperationTimeout); err != nil { + if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil { return nil, err } nsUnlocker = lock.RUnlock @@ -196,8 +196,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri // length indicates the total length of the object. func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { // Lock the object before reading. - lk := er.NewNSLock(ctx, bucket, object) - if err := lk.GetRLock(globalOperationTimeout); err != nil { + lk := er.NewNSLock(bucket, object) + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { return err } defer lk.RUnlock() @@ -343,8 +343,8 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { // Lock the object before reading. - lk := er.NewNSLock(ctx, bucket, object) - if err := lk.GetRLock(globalOperationTimeout); err != nil { + lk := er.NewNSLock(bucket, object) + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { return ObjectInfo{}, err } defer lk.RUnlock() @@ -635,8 +635,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object} } - lk := er.NewNSLock(ctx, bucket, object) - if err := lk.GetLock(globalOperationTimeout); err != nil { + lk := er.NewNSLock(bucket, object) + if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { return ObjectInfo{}, err } defer lk.Unlock() @@ -906,8 +906,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string } // Acquire a write lock before deleting the object. - lk := er.NewNSLock(ctx, bucket, object) - if err = lk.GetLock(globalDeleteOperationTimeout); err != nil { + lk := er.NewNSLock(bucket, object) + if err = lk.GetLock(ctx, globalDeleteOperationTimeout); err != nil { return ObjectInfo{}, err } defer lk.Unlock() diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index ff86fc3a4..03acbc434 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -88,8 +88,8 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer return z, nil } -func (z *erasureServerSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { - return z.serverSets[0].NewNSLock(ctx, bucket, objects...) +func (z *erasureServerSets) NewNSLock(bucket string, objects ...string) RWLocker { + return z.serverSets[0].NewNSLock(bucket, objects...) } func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker { @@ -569,8 +569,8 @@ func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, ob } // Acquire a bulk write lock across 'objects' - multiDeleteLock := z.NewNSLock(ctx, bucket, objSets.ToSlice()...) - if err := multiDeleteLock.GetLock(globalOperationTimeout); err != nil { + multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...) + if err := multiDeleteLock.GetLock(ctx, globalOperationTimeout); err != nil { for i := range derrs { derrs[i] = err } @@ -1167,8 +1167,8 @@ func (z *erasureServerSets) ListBuckets(ctx context.Context) (buckets []BucketIn func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { // Acquire lock on format.json - formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile) - if err := formatLock.GetLock(globalOperationTimeout); err != nil { + formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile) + if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil { return madmin.HealResultItem{}, err } defer formatLock.Unlock() @@ -1349,17 +1349,17 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { object = encodeDirObject(object) - lk := z.NewNSLock(ctx, bucket, object) + lk := z.NewNSLock(bucket, object) if bucket == minioMetaBucket { // For .minio.sys bucket heals we should hold write locks. - if err := lk.GetLock(globalOperationTimeout); err != nil { + if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { return madmin.HealResultItem{}, err } defer lk.Unlock() } else { // Lock the object before healing. Use read lock since healing // will only regenerate parts & xl.meta of outdated disks. - if err := lk.GetRLock(globalOperationTimeout); err != nil { + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { return madmin.HealResultItem{}, err } defer lk.RUnlock() diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 4443812af..48c3cd38c 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -417,11 +417,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto } // NewNSLock - initialize a new namespace RWLocker instance. -func (s *erasureSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { +func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker { if len(objects) == 1 { - return s.getHashedSet(objects[0]).NewNSLock(ctx, bucket, objects...) + return s.getHashedSet(objects[0]).NewNSLock(bucket, objects...) } - return s.getHashedSet("").NewNSLock(ctx, bucket, objects...) + return s.getHashedSet("").NewNSLock(bucket, objects...) } // SetDriveCount returns the current drives per set. diff --git a/cmd/erasure.go b/cmd/erasure.go index 7ea53fa7a..9c081a87d 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -68,8 +68,8 @@ type erasureObjects struct { } // NewNSLock - initialize a new namespace RWLocker instance. -func (er erasureObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { - return er.nsMutex.NewNSLock(ctx, er.getLockers, bucket, objects...) +func (er erasureObjects) NewNSLock(bucket string, objects ...string) RWLocker { + return er.nsMutex.NewNSLock(er.getLockers, bucket, objects...) } // SetDriveCount returns the current drives per set. diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index dd56d2253..e1061c961 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -714,8 +714,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } // Hold write lock on the object. - destLock := fs.NewNSLock(ctx, bucket, object) - if err = destLock.GetLock(globalOperationTimeout); err != nil { + destLock := fs.NewNSLock(bucket, object) + if err = destLock.GetLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer destLock.Unlock() diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index f5fb95848..7bcc8413b 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -186,9 +186,9 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { } // NewNSLock - initialize a new namespace RWLocker instance. -func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { +func (fs *FSObjects) NewNSLock(bucket string, objects ...string) RWLocker { // lockers are explicitly 'nil' for FS mode since there are only local lockers - return fs.nsMutex.NewNSLock(ctx, nil, bucket, objects...) + return fs.nsMutex.NewNSLock(nil, bucket, objects...) } // SetDriveCount no-op @@ -602,8 +602,8 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) if !cpSrcDstSame { - objectDWLock := fs.NewNSLock(ctx, dstBucket, dstObject) - if err := objectDWLock.GetLock(globalOperationTimeout); err != nil { + objectDWLock := fs.NewNSLock(dstBucket, dstObject) + if err := objectDWLock.GetLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer objectDWLock.Unlock() @@ -693,15 +693,15 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, if lockType != noLock { // Lock the object before reading. - lock := fs.NewNSLock(ctx, bucket, object) + lock := fs.NewNSLock(bucket, object) switch lockType { case writeLock: - if err = lock.GetLock(globalOperationTimeout); err != nil { + if err = lock.GetLock(ctx, globalOperationTimeout); err != nil { return nil, err } nsUnlocker = lock.Unlock case readLock: - if err = lock.GetRLock(globalOperationTimeout); err != nil { + if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil { return nil, err } nsUnlocker = lock.RUnlock @@ -786,8 +786,8 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse } // Lock the object before reading. - lk := fs.NewNSLock(ctx, bucket, object) - if err := lk.GetRLock(globalOperationTimeout); err != nil { + lk := fs.NewNSLock(bucket, object) + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { logger.LogIf(ctx, err) return err } @@ -1014,8 +1014,8 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) ( // getObjectInfoWithLock - reads object metadata and replies back ObjectInfo. func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) { // Lock the object before reading. - lk := fs.NewNSLock(ctx, bucket, object) - if err := lk.GetRLock(globalOperationTimeout); err != nil { + lk := fs.NewNSLock(bucket, object) + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { return oi, err } defer lk.RUnlock() @@ -1052,8 +1052,8 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o oi, err := fs.getObjectInfoWithLock(ctx, bucket, object) if err == errCorruptedFormat || err == io.EOF { - lk := fs.NewNSLock(ctx, bucket, object) - if err = lk.GetLock(globalOperationTimeout); err != nil { + lk := fs.NewNSLock(bucket, object) + if err = lk.GetLock(ctx, globalOperationTimeout); err != nil { return oi, toObjectErr(err, bucket, object) } @@ -1103,8 +1103,8 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string } // Lock the object. - lk := fs.NewNSLock(ctx, bucket, object) - if err := lk.GetLock(globalOperationTimeout); err != nil { + lk := fs.NewNSLock(bucket, object) + if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { logger.LogIf(ctx, err) return objInfo, err } @@ -1285,8 +1285,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op } // Acquire a write lock before deleting the object. - lk := fs.NewNSLock(ctx, bucket, object) - if err = lk.GetLock(globalOperationTimeout); err != nil { + lk := fs.NewNSLock(bucket, object) + if err = lk.GetLock(ctx, globalOperationTimeout); err != nil { return objInfo, err } defer lk.Unlock() diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index b336242d7..95e5c75bc 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -51,8 +51,8 @@ type GatewayLocker struct { } // NewNSLock - implements gateway level locker -func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { - return l.nsMutex.NewNSLock(ctx, nil, bucket, objects...) +func (l *GatewayLocker) NewNSLock(bucket string, objects ...string) RWLocker { + return l.nsMutex.NewNSLock(nil, bucket, objects...) } // Walk - implements common gateway level Walker, to walk on all objects recursively at a prefix diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 61fc213d3..f0269cdef 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -41,8 +41,8 @@ func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, bf *bloomF } // NewNSLock is a dummy stub for gateway. -func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { - logger.CriticalIf(ctx, errors.New("not implemented")) +func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker { + logger.CriticalIf(context.Background(), errors.New("not implemented")) return nil } diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 513071449..8d09edb9f 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -101,6 +101,9 @@ func (t *apiConfig) getRequestsPool() (chan struct{}, <-chan time.Time) { if t.requestsPool == nil { return nil, nil } + if t.requestsDeadline <= 0 { + return t.requestsPool, nil + } return t.requestsPool, time.NewTimer(t.requestsDeadline).C } diff --git a/cmd/iam.go b/cmd/iam.go index 2d49763db..69656d5fe 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -429,7 +429,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { defer cancel() // Hold the lock for migration only. - txnLk := objAPI.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/iam.lock") + txnLk := objAPI.NewNSLock(minioMetaBucket, minioConfigPrefix+"/iam.lock") // Initializing IAM sub-system needs a retry mechanism for // the following reasons: @@ -449,7 +449,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { for { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err = txnLk.GetLock(iamLockTimeout); err != nil { + if err = txnLk.GetLock(retryCtx, iamLockTimeout); err != nil { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock") time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) continue diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 45e965d9c..ede34793f 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -38,9 +38,9 @@ var globalLockServers = make(map[Endpoint]*localLocker) // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { - GetLock(timeout *dynamicTimeout) (timedOutErr error) + GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) Unlock() - GetRLock(timeout *dynamicTimeout) (timedOutErr error) + GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) RUnlock() } @@ -139,15 +139,14 @@ func (n *nsLockMap) unlock(volume string, path string, readLock bool) { type distLockInstance struct { rwMutex *dsync.DRWMutex opsID string - ctx context.Context } // Lock - block until write lock is taken or timeout has occurred. -func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) { +func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { lockSource := getSource(2) start := UTCNow() - if !di.rwMutex.GetLock(di.ctx, di.opsID, lockSource, dsync.Options{ + if !di.rwMutex.GetLock(ctx, di.opsID, lockSource, dsync.Options{ Timeout: timeout.Timeout(), }) { timeout.LogFailure() @@ -163,11 +162,11 @@ func (di *distLockInstance) Unlock() { } // RLock - block until read lock is taken or timeout has occurred. -func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) { +func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { lockSource := getSource(2) start := UTCNow() - if !di.rwMutex.GetRLock(di.ctx, di.opsID, lockSource, dsync.Options{ + if !di.rwMutex.GetRLock(ctx, di.opsID, lockSource, dsync.Options{ Timeout: timeout.Timeout(), }) { timeout.LogFailure() @@ -184,7 +183,6 @@ func (di *distLockInstance) RUnlock() { // localLockInstance - frontend/top-level interface for namespace locks. type localLockInstance struct { - ctx context.Context ns *nsLockMap volume string paths []string @@ -194,26 +192,26 @@ type localLockInstance struct { // NewNSLock - returns a lock instance for a given volume and // path. The returned lockInstance object encapsulates the nsLockMap, // volume, path and operation ID. -func (n *nsLockMap) NewNSLock(ctx context.Context, lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker { +func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker { opsID := mustGetUUID() if n.isDistErasure { drwmutex := dsync.NewDRWMutex(&dsync.Dsync{ GetLockers: lockers, }, pathsJoinPrefix(volume, paths...)...) - return &distLockInstance{drwmutex, opsID, ctx} + return &distLockInstance{drwmutex, opsID} } sort.Strings(paths) - return &localLockInstance{ctx, n, volume, paths, opsID} + return &localLockInstance{n, volume, paths, opsID} } // Lock - block until write lock is taken or timeout has occurred. -func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) { +func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { lockSource := getSource(2) start := UTCNow() - readLock := false + const readLock = false var success []int for i, path := range li.paths { - if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { + if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { timeout.LogFailure() for _, sint := range success { li.ns.unlock(li.volume, li.paths[sint], readLock) @@ -228,20 +226,20 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error // Unlock - block until write lock is released. func (li *localLockInstance) Unlock() { - readLock := false + const readLock = false for _, path := range li.paths { li.ns.unlock(li.volume, path, readLock) } } // RLock - block until read lock is taken or timeout has occurred. -func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) { +func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { lockSource := getSource(2) start := UTCNow() - readLock := true + const readLock = true var success []int for i, path := range li.paths { - if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { + if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { timeout.LogFailure() for _, sint := range success { li.ns.unlock(li.volume, li.paths[sint], readLock) @@ -256,7 +254,7 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro // RUnlock - block until read lock is released. func (li *localLockInstance) RUnlock() { - readLock := true + const readLock = true for _, path := range li.paths { li.ns.unlock(li.volume, path, readLock) } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 7a727d352..ddb11a737 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -68,7 +68,7 @@ type ObjectLayer interface { SetDriveCount() int // Only implemented by erasure layer // Locking operations on object. - NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker + NewNSLock(bucket string, objects ...string) RWLocker // Storage operations. Shutdown(context.Context) error diff --git a/cmd/server-main.go b/cmd/server-main.go index 7fb6654d0..786ebaa90 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -211,7 +211,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { // at a given time, this big transaction lock ensures this // appropriately. This is also true for rotation of encrypted // content. - txnLk := newObject.NewNSLock(ctx, minioMetaBucket, minioConfigPrefix+"/transaction.lock") + txnLk := newObject.NewNSLock(minioMetaBucket, minioConfigPrefix+"/transaction.lock") // **** WARNING **** // Migrating to encrypted backend should happen before initialization of any @@ -241,7 +241,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err = txnLk.GetLock(lockTimeout); err != nil { + if err = txnLk.GetLock(ctx, lockTimeout); err != nil { logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock") time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index eff41e331..eebff1264 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -141,7 +141,7 @@ const ( // algorithm until either the lock is acquired successfully or more // time has elapsed than the timeout value. func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) { - restClnts, owner := dm.clnt.GetLockers() + restClnts, _ := dm.clnt.GetLockers() r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -149,8 +149,9 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL locks := make([]string, len(restClnts)) log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts) - retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout) + // Add total timeout + ctx, cancel := context.WithTimeout(ctx, opts.Timeout) defer cancel() // Tolerance is not set, defaults to half of the locker clients. @@ -175,19 +176,11 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL for { select { - case <-retryCtx.Done(): - log("lockBlocking canceled %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts) - - // Caller context canceled or we timedout, - // return false anyways for both situations. - - // make sure to unlock any successful locks, since caller has timedout or canceled the request. - releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) - + case <-ctx.Done(): return false default: // Try to acquire the lock. - if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked { + if locked = lock(ctx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked { dm.m.Lock() // If success, copy array to object @@ -201,6 +194,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL } dm.m.Unlock() + log("lockBlocking %s/%s for %#v: granted\n", id, source, dm.Names) return locked } @@ -219,11 +213,12 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is // Create buffered channel of size equal to total number of nodes. ch := make(chan Granted, len(restClnts)) - defer close(ch) - var wg sync.WaitGroup - for index, c := range restClnts { + // Combined timout for the lock attempt. + ctx, cancel := context.WithTimeout(ctx, DRWMutexAcquireTimeout) + defer cancel() + for index, c := range restClnts { wg.Add(1) // broadcast lock request to all nodes go func(index int, isReadLock bool, c NetLocker) { @@ -231,7 +226,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is g := Granted{index: index} if c == nil { - log("dsync: nil locker") + log("dsync: nil locker\n") ch <- g return } @@ -247,93 +242,76 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is var locked bool var err error if isReadLock { - if locked, err = c.RLock(ctx, args); err != nil { + if locked, err = c.RLock(context.Background(), args); err != nil { log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c) } } else { - if locked, err = c.Lock(ctx, args); err != nil { + if locked, err = c.Lock(context.Background(), args); err != nil { log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c) } } - if locked { g.lockUID = args.UID } - ch <- g }(index, isReadLock, c) } - quorumLocked := false + // Wait until we have either + // + // a) received all lock responses + // b) received too many 'non-'locks for quorum to be still possible + // c) timed out + // + i, locksFailed := 0, 0 + done := false - wg.Add(1) - go func(isReadLock bool) { - defer wg.Done() - - // Wait until we have either - // - // a) received all lock responses - // b) received too many 'non-'locks for quorum to be still possible - // c) timedout - // - i, locksFailed := 0, 0 - done := false - timeout := time.After(DRWMutexAcquireTimeout) - - for ; i < len(restClnts); i++ { // Loop until we acquired all locks - - select { - case grant := <-ch: - if grant.isLocked() { - // Mark that this node has acquired the lock - (*locks)[grant.index] = grant.lockUID - } else { - locksFailed++ - if locksFailed > tolerance { - // We know that we are not going to get the lock anymore, - // so exit out and release any locks that did get acquired - done = true - // Increment the number of grants received from the buffered channel. - i++ - releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) - } - } - case <-timeout: - done = true - // timeout happened, maybe one of the nodes is slow, count - // number of locks to check whether we have quorum or not - if !checkQuorumLocked(locks, quorum) { - log("Quorum not met after timeout\n") - releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) - } else { - log("Quorum met after timeout\n") + for ; i < len(restClnts); i++ { // Loop until we acquired all locks + select { + case grant := <-ch: + if grant.isLocked() { + // Mark that this node has acquired the lock + (*locks)[grant.index] = grant.lockUID + } else { + locksFailed++ + if locksFailed > tolerance { + // We know that we are not going to get the lock anymore, + // so exit out and release any locks that did get acquired + done = true } } - - if done { - break - } + case <-ctx.Done(): + done = true + log("Timeout\n") } - // Count locks in order to determine whether we have quorum or not - quorumLocked = checkQuorumLocked(locks, quorum) + if done { + break + } + } - // Wait for the other responses and immediately release the locks - // (do not add them to the locks array because the DRWMutex could - // already has been unlocked again by the original calling thread) - for ; i < len(restClnts); i++ { - grantToBeReleased := <-ch + // Count locks in order to determine whether we have quorum or not + quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance + if !quorumLocked { + log("Quorum not met\n") + releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) + } + + // We may have some unused results in ch, release them async. + go func() { + wg.Wait() + close(ch) + for grantToBeReleased := range ch { if grantToBeReleased.isLocked() { // release lock + log("Releasing abandoned lock\n") sendRelease(ds, restClnts[grantToBeReleased.index], owner, grantToBeReleased.lockUID, isReadLock, lockNames...) } } - }(isReadLock) - - wg.Wait() + }() return quorumLocked }