From 64f6020854e80f3e43ac78c1325dbe9e3d9f0299 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 29 Apr 2021 20:55:21 -0700
Subject: [PATCH] fix: cleanup locking, cancel context upon lock timeout
 (#12183)

Upon errors acquiring the lock, the context would still leak,
since cancel would never be called.

Since the lock is never acquired, proactively cancel the context
before returning.
---
 Makefile                   |  2 +-
 cmd/admin-handlers.go      |  4 +--
 cmd/data-scanner.go        | 10 +++---
 cmd/disk-cache-backend.go  | 25 ++++++++-------
 cmd/erasure-healing.go     |  7 +++--
 cmd/erasure-multipart.go   | 64 +++++++++++++++++++++-----------------
 cmd/erasure-object.go      | 55 +++++++++++++++++---------------
 cmd/erasure-server-pool.go | 26 +++++++++-------
 cmd/fs-v1-multipart.go     |  5 +--
 cmd/fs-v1.go               | 38 ++++++++++++----------
 cmd/iam.go                 | 12 +++----
 cmd/namespace-lock.go      | 48 +++++++++++++++++++---------
 cmd/server-main.go         |  7 +++--
 13 files changed, 175 insertions(+), 128 deletions(-)

diff --git a/Makefile b/Makefile
index 0cc17df73..feadc3251 100644
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,7 @@ test-race: verifiers build
 # Verify minio binary
 verify:
     @echo "Verifying build with race"
-    @GO111MODULE=on CGO_ENABLED=1 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
+    @GO111MODULE=on CGO_ENABLED=1 go build -race -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
     @(env bash $(PWD)/buildscripts/verify-build.sh)

 # Verify healing of disks with minio binary
diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index 22c02d815..53447e6ba 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -1416,12 +1416,12 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
     defer deadlineCancel()

     nsLock := objectAPI.NewNSLock(minioMetaBucket, "health-check-in-progress")
-    ctx, cancel, err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
+    lkctx, err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
     if err != nil { // returns a locked lock
         errResp(err)
         return
     }
-    defer nsLock.Unlock(cancel)
+    defer nsLock.Unlock(lkctx.Cancel)

     go func() {
         defer close(healthInfoCh)
diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index 746c4a2d8..ef7596905 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -91,19 +91,19 @@ func (s *safeDuration) Get() time.Duration {
 // runDataScanner will start a data scanner.
 // The function will block until the context is canceled.
 // There should only ever be one scanner running per cluster.
-func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
+func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
     // Make sure only 1 scanner is running on the cluster.
     locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
+    var ctx context.Context
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
     for {
-        var err error
-        var cancel context.CancelFunc
-        ctx, cancel, err = locker.GetLock(ctx, dataScannerLeaderLockTimeout)
+        lkctx, err := locker.GetLock(pctx, dataScannerLeaderLockTimeout)
         if err != nil {
             time.Sleep(time.Duration(r.Float64() * float64(scannerCycle.Get())))
             continue
         }
-        defer cancel()
+        ctx = lkctx.Context()
+        defer lkctx.Cancel()
         break
         // No unlock for "leader" lock.
     }
diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index 809b234a5..9923f3198 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -439,12 +439,12 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
 // if partial object is cached.
 func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
     cLock := c.NewNSLockFn(cacheObjPath)
-    ctx, cancel, err := cLock.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return
     }
-
-    defer cLock.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer cLock.RUnlock(lkctx.Cancel)
     return c.statCache(ctx, cacheObjPath)
 }

@@ -520,11 +520,12 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
 func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
     cachedPath := getCacheSHADir(c.dir, bucket, object)
     cLock := c.NewNSLockFn(cachedPath)
-    ctx, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return err
     }
-    defer cLock.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer cLock.Unlock(lkctx.Cancel)
     return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
 }

@@ -698,11 +699,12 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
     }
     cachePath := getCacheSHADir(c.dir, bucket, object)
     cLock := c.NewNSLockFn(cachePath)
-    ctx, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return oi, err
     }
-    defer cLock.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer cLock.Unlock(lkctx.Cancel)

     meta, _, numHits, err := c.statCache(ctx, cachePath)
     // Case where object not yet cached
@@ -913,11 +915,12 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
 func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
     cacheObjPath := getCacheSHADir(c.dir, bucket, object)
     cLock := c.NewNSLockFn(cacheObjPath)
-    ctx, cancel, err := cLock.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return nil, numHits, err
     }
-    defer cLock.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer cLock.RUnlock(lkctx.Cancel)

     var objInfo ObjectInfo
     var rngInfo RangeInfo
@@ -978,11 +981,11 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 // Deletes the cached object
 func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
     cLock := c.NewNSLockFn(cacheObjPath)
-    _, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return err
     }
-    defer cLock.Unlock(cancel)
+    defer cLock.Unlock(lkctx.Cancel)
     return removeAll(cacheObjPath)
 }

diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 9f7807410..bfd33013a 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -253,12 +253,13 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
     }

     if !opts.NoLock {
-        var cancel context.CancelFunc
         lk := er.NewNSLock(bucket, object)
-        if ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
+        lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
+        if err != nil {
             return result, err
         }
-        defer lk.Unlock(cancel)
+        ctx = lkctx.Context()
+        defer lk.Unlock(lkctx.Cancel)
     }

     // Re-read when we have lock...
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 3b853641d..48c0af785 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -378,14 +378,15 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
 // Implements S3 compatible Upload Part API.
 func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
     uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-    rctx, rcancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+    rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return PartInfo{}, err
     }
+    rctx := rlkctx.Context()
     readLocked := true
     defer func() {
         if readLocked {
-            uploadIDLock.RUnlock(rcancel)
+            uploadIDLock.RUnlock(rlkctx.Cancel)
         }
     }()

@@ -505,13 +506,14 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo

     // Unlock here before acquiring write locks all concurrent
     // PutObjectParts would serialize here updating `xl.meta`
-    uploadIDLock.RUnlock(rcancel)
+    uploadIDLock.RUnlock(rlkctx.Cancel)
     readLocked = false
-    wctx, wcancel, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
+    wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return PartInfo{}, err
     }
-    defer uploadIDLock.Unlock(wcancel)
+    wctx := wlkctx.Context()
+    defer uploadIDLock.Unlock(wlkctx.Cancel)

     // Validates if upload ID exists.
     if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
@@ -592,11 +594,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
     }

     uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-    ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return MultipartInfo{}, err
     }
-    defer uploadIDLock.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer uploadIDLock.RUnlock(lkctx.Cancel)

     if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
         return result, toObjectErr(err, bucket, object, uploadID)
@@ -641,11 +644,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 // replied back to the client.
 func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
     uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-    ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return ListPartsInfo{}, err
     }
-    defer uploadIDLock.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer uploadIDLock.RUnlock(lkctx.Cancel)

     if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
         return result, toObjectErr(err, bucket, object, uploadID)
@@ -735,19 +739,20 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str

     // Hold read-locks to verify uploaded parts, also disallows
     // parallel part uploads as well.
     uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-    ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+    rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return oi, err
     }
-    defer uploadIDLock.RUnlock(cancel)
+    rctx := rlkctx.Context()
+    defer uploadIDLock.RUnlock(rlkctx.Cancel)

-    if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
+    if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
         return oi, toObjectErr(err, bucket, object, uploadID)
     }

     // Check if an object is present as one of the parent dir.
     // -- FIXME. (needs a new kind of lock).
-    if opts.ParentIsObject != nil && opts.ParentIsObject(ctx, bucket, path.Dir(object)) {
+    if opts.ParentIsObject != nil && opts.ParentIsObject(rctx, bucket, path.Dir(object)) {
         return oi, toObjectErr(errFileParentIsFile, bucket, object)
     }
@@ -761,15 +766,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     storageDisks := er.getDisks()

     // Read metadata associated with the object from all disks.
-    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
+    partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)

     // get Quorum for this object
-    _, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
+    _, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
     if err != nil {
         return oi, toObjectErr(err, bucket, object)
     }

-    reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
+    reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
     if reducedErr == errErasureWriteQuorum {
         return oi, toObjectErr(reducedErr, bucket, object)
     }
@@ -777,7 +782,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)

     // Pick one from the first valid metadata.
-    fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
+    fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
     if err != nil {
         return oi, err
     }
@@ -871,8 +876,18 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
         }
     }

+    // Hold namespace to complete the transaction
+    lk := er.NewNSLock(bucket, object)
+    lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
+    if err != nil {
+        return oi, err
+    }
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)
+
     // Write final `xl.meta` at uploadID location
-    if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
+    onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum)
+    if err != nil {
         return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
     }
@@ -889,14 +904,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
         }
     }

-    // Hold namespace to complete the transaction
-    lk := er.NewNSLock(bucket, object)
-    ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
-    if err != nil {
-        return oi, err
-    }
-    defer lk.Unlock(cancel)
-
     // Rename the multipart object to final location.
     if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, bucket, object, writeQuorum); err != nil {
@@ -934,11 +941,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 // operation.
 func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
     lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
-    ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)

     // Validates if upload ID exists.
     if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index c8653c767..a480bbcf1 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -65,13 +65,13 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
     defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))

     if !dstOpts.NoLock {
-        var cancel context.CancelFunc
         lk := er.NewNSLock(dstBucket, dstObject)
-        ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
+        lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
         if err != nil {
             return oi, err
         }
-        defer lk.Unlock(cancel)
+        ctx = lkctx.Context()
+        defer lk.Unlock(lkctx.Cancel)
     }
     // Read metadata associated with the object from all disks.
     storageDisks := er.getDisks()
@@ -146,21 +146,22 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri

     // Acquire lock
     if lockType != noLock {
-        var cancel context.CancelFunc
         lock := er.NewNSLock(bucket, object)
         switch lockType {
         case writeLock:
-            ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
+            lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
             if err != nil {
                 return nil, err
             }
-            nsUnlocker = func() { lock.Unlock(cancel) }
+            ctx = lkctx.Context()
+            nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
         case readLock:
-            ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
+            lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
             if err != nil {
                 return nil, err
             }
-            nsUnlocker = func() { lock.RUnlock(cancel) }
+            ctx = lkctx.Context()
+            nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
         }
         unlockOnDefer = true
     }
@@ -215,11 +216,12 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
     // Lock the object before reading.
     lk := er.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return err
     }
-    defer lk.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.RUnlock(lkctx.Cancel)

     // Start offset cannot be negative.
     if startOffset < 0 {
@@ -378,14 +380,14 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
     if !opts.NoLock {
-        var cancel context.CancelFunc
         // Lock the object before reading.
         lk := er.NewNSLock(bucket, object)
-        ctx, cancel, err = lk.GetRLock(ctx, globalOperationTimeout)
+        lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
         if err != nil {
             return ObjectInfo{}, err
         }
-        defer lk.RUnlock(cancel)
+        ctx = lkctx.Context()
+        defer lk.RUnlock(lkctx.Cancel)
     }

     return er.getObjectInfo(ctx, bucket, object, opts)
@@ -740,14 +742,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
     }

     if !opts.NoLock {
-        var err error
-        var cancel context.CancelFunc
         lk := er.NewNSLock(bucket, object)
-        ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
+        lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
         if err != nil {
             return ObjectInfo{}, err
         }
-        defer lk.Unlock(cancel)
+        ctx = lkctx.Context()
+        defer lk.Unlock(lkctx.Cancel)
     }

     for i, w := range writers {
@@ -1053,11 +1054,12 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
     }

     // Acquire a write lock before deleting the object.
     lk := er.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
     if err != nil {
         return ObjectInfo{}, err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)

     storageDisks := er.getDisks()
     writeQuorum := len(storageDisks)/2 + 1
@@ -1166,11 +1168,12 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) {
 func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
     // Lock the object before updating tags.
     lk := er.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return ObjectInfo{}, err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)

     disks := er.getDisks()
@@ -1216,11 +1219,12 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
 func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
     // Lock the object before updating tags.
     lk := er.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return ObjectInfo{}, err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)

     disks := er.getDisks()
@@ -1310,11 +1314,12 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
     }

     // Acquire write lock before starting to transition the object.
     lk := er.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
     if err != nil {
         return err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)

     fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
     if err != nil {
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 896409bdf..a608ca993 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -613,20 +613,21 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
     // Acquire lock
     if lockType != noLock {
         lock := z.NewNSLock(bucket, object)
-        var cancel context.CancelFunc
         switch lockType {
         case writeLock:
-            ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
+            lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
             if err != nil {
                 return nil, err
             }
-            nsUnlocker = func() { lock.Unlock(cancel) }
+            ctx = lkctx.Context()
+            nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
         case readLock:
-            ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
+            lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
             if err != nil {
                 return nil, err
             }
-            nsUnlocker = func() { lock.RUnlock(cancel) }
+            ctx = lkctx.Context()
+            nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
         }
         unlockOnDefer = true
     }
@@ -685,11 +686,12 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s

     // Lock the object before reading.
     lk := z.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return ObjectInfo{}, err
     }
-    defer lk.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.RUnlock(lkctx.Cancel)

     errs := make([]error, len(z.serverPools))
     objInfos := make([]ObjectInfo, len(z.serverPools))
@@ -803,14 +805,15 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o

     // Acquire a bulk write lock across 'objects'
     multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
-    ctx, cancel, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         for i := range derrs {
             derrs[i] = err
         }
         return dobjects, derrs
     }
-    defer multiDeleteLock.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer multiDeleteLock.Unlock(lkctx.Cancel)

     if z.SinglePool() {
         return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
@@ -1373,11 +1376,12 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context) (buckets []BucketI
 func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
     // Acquire lock on format.json
     formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
-    ctx, cancel, err := formatLock.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := formatLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return madmin.HealResultItem{}, err
     }
-    defer formatLock.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer formatLock.Unlock(lkctx.Cancel)

     var r = madmin.HealResultItem{
         Type: madmin.HealItemMetadata,
diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index f648035dd..37af70141 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -710,11 +710,12 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,

     // Hold write lock on the object.
     destLock := fs.NewNSLock(bucket, object)
-    ctx, cancel, err := destLock.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := destLock.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return oi, err
     }
-    defer destLock.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer destLock.Unlock(lkctx.Cancel)

     bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
     fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index 6df7613d8..60e632ecf 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -609,13 +609,13 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
     defer ObjectPathUpdated(path.Join(dstBucket, dstObject))

     if !cpSrcDstSame {
-        var cancel context.CancelFunc
         objectDWLock := fs.NewNSLock(dstBucket, dstObject)
-        ctx, cancel, err = objectDWLock.GetLock(ctx, globalOperationTimeout)
+        lkctx, err := objectDWLock.GetLock(ctx, globalOperationTimeout)
         if err != nil {
             return oi, err
         }
-        defer objectDWLock.Unlock(cancel)
+        ctx = lkctx.Context()
+        defer objectDWLock.Unlock(lkctx.Cancel)
     }

     atomic.AddInt64(&fs.activeIOCount, 1)
@@ -703,20 +703,21 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
     if lockType != noLock {
         // Lock the object before reading.
         lock := fs.NewNSLock(bucket, object)
-        var cancel context.CancelFunc
         switch lockType {
         case writeLock:
-            ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
+            lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
            if err != nil {
                 return nil, err
             }
-            nsUnlocker = func() { lock.Unlock(cancel) }
+            ctx = lkctx.Context()
+            nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
         case readLock:
-            ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
+            lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
             if err != nil {
                 return nil, err
             }
-            nsUnlocker = func() { lock.RUnlock(cancel) }
+            ctx = lkctx.Context()
+            nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
         }
     }
@@ -985,11 +986,12 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
 func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
     // Lock the object before reading.
     lk := fs.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
     if err != nil {
         return oi, err
     }
-    defer lk.RUnlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.RUnlock(lkctx.Cancel)

     if err := checkGetObjArgs(ctx, bucket, object); err != nil {
         return oi, err
@@ -1023,21 +1025,21 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o

     oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
     if err == errCorruptedFormat || err == io.EOF {
-        var cancel context.CancelFunc
         lk := fs.NewNSLock(bucket, object)
-        _, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
+        lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
         if err != nil {
             return oi, toObjectErr(err, bucket, object)
         }

         fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
         err = fs.createFsJSON(object, fsMetaPath)
-        lk.Unlock(cancel)
+        lk.Unlock(lkctx.Cancel)
         if err != nil {
             return oi, toObjectErr(err, bucket, object)
         }

         oi, err = fs.getObjectInfoWithLock(ctx, bucket, object)
+        return oi, toObjectErr(err, bucket, object)
     }
     return oi, toObjectErr(err, bucket, object)
 }
@@ -1077,12 +1079,13 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string

     // Lock the object.
     lk := fs.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         logger.LogIf(ctx, err)
         return objInfo, err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)
     defer ObjectPathUpdated(path.Join(bucket, object))

     atomic.AddInt64(&fs.activeIOCount, 1)
@@ -1253,11 +1256,12 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op

     // Acquire a write lock before deleting the object.
     lk := fs.NewNSLock(bucket, object)
-    ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
+    lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
     if err != nil {
         return objInfo, err
     }
-    defer lk.Unlock(cancel)
+    ctx = lkctx.Context()
+    defer lk.Unlock(lkctx.Cancel)

     if err = checkDelObjArgs(ctx, bucket, object); err != nil {
         return objInfo, err
diff --git a/cmd/iam.go b/cmd/iam.go
index 817009782..3557ff93e 100644
--- a/cmd/iam.go
+++ b/cmd/iam.go
@@ -597,7 +597,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
     for {
         // let one of the server acquire the lock, if not let them timeout.
         // which shall be retried again by this loop.
-        _, cancel, err := txnLk.GetLock(retryCtx, iamLockTimeout)
+        lkctx, err := txnLk.GetLock(retryCtx, iamLockTimeout)
         if err != nil {
             logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
             time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
             continue
@@ -608,8 +608,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
             // **** WARNING ****
             // Migrating to encrypted backend on etcd should happen before initialization of
             // IAM sub-system, make sure that we do not move the above codeblock elsewhere.
-            if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil {
-                txnLk.Unlock(cancel)
+            if err := migrateIAMConfigsEtcdToEncrypted(lkctx.Context(), globalEtcdClient); err != nil {
+                txnLk.Unlock(lkctx.Cancel)
                 logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err))
                 logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available"))
                 return
@@ -622,8 +622,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
         }

         // Migrate IAM configuration, if necessary.
-        if err := sys.doIAMConfigMigration(ctx); err != nil {
-            txnLk.Unlock(cancel)
+        if err := sys.doIAMConfigMigration(lkctx.Context()); err != nil {
+            txnLk.Unlock(lkctx.Cancel)
             if configRetriableErrors(err) {
                 logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
                 continue
@@ -634,7 +634,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
         }

         // Successfully migrated, proceed to load the users.
-        txnLk.Unlock(cancel)
+        txnLk.Unlock(lkctx.Cancel)
         break
     }
diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index 37c4e2b7a..f400841a7 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -39,12 +39,30 @@ var globalLockServer *localLocker

 // RWLocker - locker interface to introduce GetRLock, RUnlock.
 type RWLocker interface {
-    GetLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, cancel context.CancelFunc, timedOutErr error)
+    GetLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error)
     Unlock(cancel context.CancelFunc)
-    GetRLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, cancel context.CancelFunc, timedOutErr error)
+    GetRLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error)
     RUnlock(cancel context.CancelFunc)
 }

+// LockContext lock context holds the lock backed context and canceler for the context.
+type LockContext struct {
+    ctx    context.Context
+    cancel context.CancelFunc
+}
+
+// Context returns lock context
+func (l LockContext) Context() context.Context {
+    return l.ctx
+}
+
+// Cancel function calls cancel() function
+func (l LockContext) Cancel() {
+    if l.cancel != nil {
+        l.cancel()
+    }
+}
+
 // newNSLock - return a new name space lock map.
 func newNSLock(isDistErasure bool) *nsLockMap {
     nsMutex := nsLockMap{
@@ -143,7 +161,7 @@ type distLockInstance struct {
 }

 // Lock - block until write lock is taken or timeout has occurred.
-func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, context.CancelFunc, error) {
+func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (LockContext, error) {
     lockSource := getSource(2)
     start := UTCNow()
@@ -152,10 +170,11 @@ func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout
         Timeout: timeout.Timeout(),
     }) {
         timeout.LogFailure()
-        return ctx, nil, OperationTimedOut{}
+        cancel()
+        return LockContext{ctx: ctx, cancel: func() {}}, OperationTimedOut{}
     }
     timeout.LogSuccess(UTCNow().Sub(start))
-    return newCtx, cancel, nil
+    return LockContext{ctx: newCtx, cancel: cancel}, nil
 }

 // Unlock - block until write lock is released.
@@ -167,7 +186,7 @@ func (di *distLockInstance) Unlock(cancel context.CancelFunc) {
 }

 // RLock - block until read lock is taken or timeout has occurred.
-func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, context.CancelFunc, error) {
+func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (LockContext, error) {
     lockSource := getSource(2)
     start := UTCNow()
@@ -176,10 +195,11 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou
         Timeout: timeout.Timeout(),
     }) {
         timeout.LogFailure()
-        return ctx, nil, OperationTimedOut{}
+        cancel()
+        return LockContext{ctx: ctx, cancel: func() {}}, OperationTimedOut{}
     }
     timeout.LogSuccess(UTCNow().Sub(start))
-    return newCtx, cancel, nil
+    return LockContext{ctx: newCtx, cancel: cancel}, nil
 }

 // RUnlock - block until read lock is released.
@@ -214,7 +234,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
 }

 // Lock - block until write lock is taken or timeout has occurred.
-func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, _ context.CancelFunc, timedOutErr error) {
+func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ LockContext, timedOutErr error) {
     lockSource := getSource(2)
     start := UTCNow()
     const readLock = false
@@ -227,12 +247,12 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou
                 li.ns.unlock(li.volume, li.paths[si], readLock)
             }
         }
-            return nil, nil, OperationTimedOut{}
+            return LockContext{}, OperationTimedOut{}
         }
         success[i] = 1
     }
     timeout.LogSuccess(UTCNow().Sub(start))
-    return ctx, func() {}, nil
+    return LockContext{ctx: ctx, cancel: func() {}}, nil
 }

 // Unlock - block until write lock is released.
@@ -247,7 +267,7 @@ func (li *localLockInstance) Unlock(cancel context.CancelFunc) {
 }

 // RLock - block until read lock is taken or timeout has occurred.
-func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, _ context.CancelFunc, timedOutErr error) {
+func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ LockContext, timedOutErr error) {
     lockSource := getSource(2)
     start := UTCNow()
     const readLock = true
@@ -260,12 +280,12 @@ func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeo
                 li.ns.unlock(li.volume, li.paths[si], readLock)
             }
         }
-            return nil, nil, OperationTimedOut{}
+            return LockContext{}, OperationTimedOut{}
         }
         success[i] = 1
     }
     timeout.LogSuccess(UTCNow().Sub(start))
-    return ctx, func() {}, nil
+    return LockContext{ctx: ctx, cancel: func() {}}, nil
 }

 // RUnlock - block until read lock is released.
diff --git a/cmd/server-main.go b/cmd/server-main.go
index 43c7c6d28..e87929c4f 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -299,7 +299,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {

         // let one of the server acquire the lock, if not let them timeout.
         // which shall be retried again by this loop.
-        _, cancel, err := txnLk.GetLock(ctx, lockTimeout)
+        lkctx, err := txnLk.GetLock(ctx, lockTimeout)
         if err != nil {
             logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")

@@ -319,7 +319,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
         // Upon success migrating the config, initialize all sub-systems
         // if all sub-systems initialized successfully return right away
         if err = initAllSubsystems(ctx, newObject); err == nil {
-            txnLk.Unlock(cancel)
+            txnLk.Unlock(lkctx.Cancel)
             // All successful return.
             if globalIsDistErasure {
                 // These messages only meant primarily for distributed setup, so only log during distributed setup.
@@ -329,7 +329,8 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
             }
         }

-        txnLk.Unlock(cancel) // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
+        // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
+        txnLk.Unlock(lkctx.Cancel)

         if configRetriableErrors(err) {
             logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
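Below is a minimal, self-contained Go sketch of the locking pattern this patch converges on: GetLock returns a LockContext that bundles the lock-scoped context with its canceler, the derived context is canceled proactively when acquisition times out, and the caller releases with Unlock(lkctx.Cancel). The types simpleLocker and errLockTimeout are illustrative stand-ins, not MinIO's actual namespace locker or its OperationTimedOut error.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// LockContext pairs the context that is valid while the lock is held
// with the cancel function that releases that context.
type LockContext struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// Context returns the lock-scoped context.
func (l LockContext) Context() context.Context { return l.ctx }

// Cancel cancels the lock-scoped context, if any.
func (l LockContext) Cancel() {
	if l.cancel != nil {
		l.cancel()
	}
}

var errLockTimeout = errors.New("timed out waiting for lock")

// simpleLocker is an illustrative in-process locker; MinIO's RWLocker is
// backed by its namespace and dsync lockers instead.
type simpleLocker struct {
	mu sync.Mutex
}

// GetLock attempts to take the lock before the timeout expires. On failure
// the derived context is canceled before returning, so nothing leaks; on
// success the caller releases it via Unlock(lkctx.Cancel).
func (l *simpleLocker) GetLock(ctx context.Context, timeout time.Duration) (LockContext, error) {
	newCtx, cancel := context.WithCancel(ctx)
	acquired := make(chan struct{})
	go func() {
		l.mu.Lock()
		close(acquired)
	}()
	select {
	case <-acquired:
		return LockContext{ctx: newCtx, cancel: cancel}, nil
	case <-time.After(timeout):
		cancel() // lock was never acquired: cancel proactively instead of leaking
		go func() {
			<-acquired    // if the lock lands late,
			l.mu.Unlock() // release it immediately
		}()
		return LockContext{ctx: ctx, cancel: func() {}}, errLockTimeout
	}
}

// Unlock cancels the lock-scoped context and releases the lock.
func (l *simpleLocker) Unlock(cancel context.CancelFunc) {
	if cancel != nil {
		cancel()
	}
	l.mu.Unlock()
}

func main() {
	var locker simpleLocker

	lkctx, err := locker.GetLock(context.Background(), time.Second)
	if err != nil {
		fmt.Println("lock not acquired:", err)
		return
	}
	defer locker.Unlock(lkctx.Cancel)

	// Use the lock-scoped context for the protected work.
	ctx := lkctx.Context()
	fmt.Println("holding lock, context error:", ctx.Err())
}

Returning the context and its canceler as a single value makes it much harder for a caller to keep the context while forgetting the cancel, which is exactly the leak described in the commit message.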