diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index b7ffbf2b3..22c02d815 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -1412,17 +1412,16 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
 		}
 	}
-	deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
-	defer cancel()
+	deadlinedCtx, deadlineCancel := context.WithTimeout(ctx, deadline)
+	defer deadlineCancel()
-	var err error
 	nsLock := objectAPI.NewNSLock(minioMetaBucket, "health-check-in-progress")
-	ctx, err = nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
+	ctx, cancel, err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
 	if err != nil { // returns a locked lock
 		errResp(err)
 		return
 	}
-	defer nsLock.Unlock()
+	defer nsLock.Unlock(cancel)
 	go func() {
 		defer close(healthInfoCh)
diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index b80330010..8a4865f04 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -92,16 +92,18 @@ func (s *safeDuration) Get() time.Duration {
 // The function will block until the context is canceled.
 // There should only ever be one scanner running per cluster.
 func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
-	var err error
 	// Make sure only 1 scanner is running on the cluster.
 	locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	for {
-		ctx, err = locker.GetLock(ctx, dataScannerLeaderLockTimeout)
+		var err error
+		var cancel context.CancelFunc
+		ctx, cancel, err = locker.GetLock(ctx, dataScannerLeaderLockTimeout)
 		if err != nil {
 			time.Sleep(time.Duration(r.Float64() * float64(scannerCycle.Get())))
 			continue
 		}
+		defer cancel()
 		break
 		// No unlock for "leader" lock.
 	}
diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index 129063e9a..809b234a5 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -438,13 +438,13 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
 // statCachedMeta returns metadata from cache - including ranges cached, partial to indicate
 // if partial object is cached.
 func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
 	cLock := c.NewNSLockFn(cacheObjPath)
-	if ctx, err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
+	ctx, cancel, err := cLock.GetRLock(ctx, globalOperationTimeout)
+	if err != nil {
 		return
 	}
-	defer cLock.RUnlock()
+	defer cLock.RUnlock(cancel)
 	return c.statCache(ctx, cacheObjPath)
 }
@@ -518,14 +518,13 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
 // saves object metadata to disk cache
 // incHitsOnly is true if metadata update is incrementing only the hit counter
 func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
-	var err error
 	cachedPath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(cachedPath)
-	ctx, err = cLock.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return err
 	}
-	defer cLock.Unlock()
+	defer cLock.Unlock(cancel)
 	return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
 }
@@ -699,11 +698,11 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 	}
 	cachePath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(cachePath)
-	ctx, err = cLock.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return oi, err
 	}
-	defer cLock.Unlock()
+	defer cLock.Unlock(cancel)
 	meta, _, numHits, err := c.statCache(ctx, cachePath)
 	// Case where object not yet cached
@@ -914,12 +913,12 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
 func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
 	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(cacheObjPath)
-	ctx, err = cLock.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := cLock.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return nil, numHits, err
 	}
+	defer cLock.RUnlock(cancel)
 
-	defer cLock.RUnlock()
 	var objInfo ObjectInfo
 	var rngInfo RangeInfo
 	if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil {
@@ -979,11 +978,11 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 // Deletes the cached object
 func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
 	cLock := c.NewNSLockFn(cacheObjPath)
-	_, err = cLock.GetLock(ctx, globalOperationTimeout)
+	_, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return err
 	}
-	defer cLock.Unlock()
+	defer cLock.Unlock(cancel)
 	return removeAll(cacheObjPath)
 }
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 228174a4a..9f7807410 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -253,11 +253,12 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	}
 	if !opts.NoLock {
+		var cancel context.CancelFunc
 		lk := er.NewNSLock(bucket, object)
-		if ctx, err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
+		if ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
			return result, err
 		}
-		defer lk.Unlock()
+		defer lk.Unlock(cancel)
 	}
 	// Re-read when we have lock...
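
Aside: the change above is the pattern repeated throughout this patch. GetLock/GetRLock now hand back a context.CancelFunc together with the locked context, and the matching Unlock/RUnlock receives that cancel func so the per-lock context is always canceled when the lock is released. Below is a minimal, self-contained sketch of the caller-side shape; the rwLocker interface and noopLocker type are illustrative stand-ins for this write-up, not the actual MinIO types in cmd/namespace-lock.go.

package main

import (
	"context"
	"fmt"
	"time"
)

// rwLocker mirrors the shape of the patched RWLocker interface: acquiring a
// lock yields a derived context plus the cancel func that must be handed back
// on release. (Illustrative stand-in only.)
type rwLocker interface {
	GetLock(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc, error)
	Unlock(cancel context.CancelFunc)
}

// noopLocker always succeeds; it exists only so the sketch compiles and runs.
type noopLocker struct{}

func (noopLocker) GetLock(ctx context.Context, _ time.Duration) (context.Context, context.CancelFunc, error) {
	// A real implementation would enforce the timeout here; the sketch never fails.
	newCtx, cancel := context.WithCancel(ctx)
	return newCtx, cancel, nil
}

func (noopLocker) Unlock(cancel context.CancelFunc) {
	if cancel != nil {
		cancel() // release the derived context before dropping the lock
	}
}

func doWork(ctx context.Context, lk rwLocker) error {
	lctx, cancel, err := lk.GetLock(ctx, 5*time.Second)
	if err != nil {
		return err
	}
	defer lk.Unlock(cancel) // the cancel func travels with the unlock, so nothing leaks

	// ... operate under the lock using lctx ...
	fmt.Println("holding lock, context error:", lctx.Err())
	return nil
}

func main() {
	_ = doWork(context.Background(), noopLocker{})
}
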
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index eebaaa7fe..3b853641d 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -378,21 +378,21 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
 // Implements S3 compatible Upload Part API.
 func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
 	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-	ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+	rctx, rcancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return PartInfo{}, err
 	}
 	readLocked := true
 	defer func() {
 		if readLocked {
-			uploadIDLock.RUnlock()
+			uploadIDLock.RUnlock(rcancel)
 		}
 	}()
 	data := r.Reader
 	// Validate input data size and it can never be less than zero.
 	if data.Size() < -1 {
-		logger.LogIf(ctx, errInvalidArgument, logger.Application)
+		logger.LogIf(rctx, errInvalidArgument, logger.Application)
 		return pi, toObjectErr(errInvalidArgument)
 	}
@@ -401,23 +401,23 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
 	// Validates if upload ID exists.
-	if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
+	if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
 		return pi, toObjectErr(err, bucket, object, uploadID)
 	}
 	storageDisks := er.getDisks()
 	// Read metadata associated with the object from all disks.
-	partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
+	partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
 		uploadIDPath, "", false)
 	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
+	_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
 	if err != nil {
 		return pi, toObjectErr(err, bucket, object)
 	}
-	reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
+	reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
 	if reducedErr == errErasureWriteQuorum {
 		return pi, toObjectErr(reducedErr, bucket, object)
 	}
@@ -426,7 +426,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
 	// Pick one from the first valid metadata.
-	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
+	fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
 	if err != nil {
 		return pi, err
 	}
@@ -448,7 +448,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 		}
 	}()
-	erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
+	erasure, err := NewErasure(rctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
 	if err != nil {
 		return pi, toObjectErr(err, bucket, object)
 	}
@@ -485,7 +485,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 			erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
 	}
-	n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
+	n, err := erasure.Encode(rctx, data, writers, buffer, writeQuorum)
 	closeBitrotWriters(writers)
 	if err != nil {
 		return pi, toObjectErr(err, bucket, object)
 	}
@@ -505,29 +505,29 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	// Unlock here before acquiring write locks all concurrent
 	// PutObjectParts would serialize here updating `xl.meta`
-	uploadIDLock.RUnlock()
+	uploadIDLock.RUnlock(rcancel)
 	readLocked = false
-	ctx, err = uploadIDLock.GetLock(ctx, globalOperationTimeout)
+	wctx, wcancel, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return PartInfo{}, err
 	}
-	defer uploadIDLock.Unlock()
+	defer uploadIDLock.Unlock(wcancel)
 	// Validates if upload ID exists.
-	if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
+	if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
 		return pi, toObjectErr(err, bucket, object, uploadID)
 	}
 	// Rename temporary part file to its final location.
 	partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
-	onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
+	onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
 	if err != nil {
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}
 	// Read metadata again because it might be updated with parallel upload of another part.
-	partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
-	reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
+	partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
+	reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
 	if reducedErr == errErasureWriteQuorum {
 		return pi, toObjectErr(reducedErr, bucket, object)
 	}
@@ -536,7 +536,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
 	// Pick one from the first valid metadata.
-	fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
+	fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, dataDir, writeQuorum)
 	if err != nil {
 		return pi, err
 	}
@@ -564,7 +564,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	}
 	// Writes update `xl.meta` format for each disk.
-	if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
+	if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
 		return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
@@ -591,13 +591,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 		UploadID: uploadID,
 	}
-	var err error
 	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-	ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return MultipartInfo{}, err
 	}
-	defer uploadIDLock.RUnlock()
+	defer uploadIDLock.RUnlock(cancel)
 	if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 		return result, toObjectErr(err, bucket, object, uploadID)
@@ -642,11 +641,11 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 // replied back to the client.
 func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
 	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-	ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return ListPartsInfo{}, err
 	}
-	defer uploadIDLock.RUnlock()
+	defer uploadIDLock.RUnlock(cancel)
 	if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 		return result, toObjectErr(err, bucket, object, uploadID)
@@ -736,11 +735,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	// Hold read-locks to verify uploaded parts, also disallows
 	// parallel part uploads as well.
 	uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-	ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return oi, err
 	}
-	defer uploadIDLock.RUnlock()
+	defer uploadIDLock.RUnlock(cancel)
 	if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 		return oi, toObjectErr(err, bucket, object, uploadID)
@@ -892,11 +891,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	// Hold namespace to complete the transaction
 	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return oi, err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	// Rename the multipart object to final location.
 	if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
@@ -935,11 +934,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 // operation.
 func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
 	lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	// Validates if upload ID exists.
 	if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index f90e54f9b..f95ed3398 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -65,12 +65,13 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
 	defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))
 	if !dstOpts.NoLock {
+		var cancel context.CancelFunc
 		lk := er.NewNSLock(dstBucket, dstObject)
-		ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+		ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return oi, err
 		}
-		defer lk.Unlock()
+		defer lk.Unlock(cancel)
 	}
 	// Read metadata associated with the object from all disks.
 	storageDisks := er.getDisks()
@@ -145,20 +146,21 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 	// Acquire lock
 	if lockType != noLock {
+		var cancel context.CancelFunc
 		lock := er.NewNSLock(bucket, object)
 		switch lockType {
 		case writeLock:
-			ctx, err = lock.GetLock(ctx, globalOperationTimeout)
+			ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
 			if err != nil {
 				return nil, err
 			}
-			nsUnlocker = lock.Unlock
+			nsUnlocker = func() { lock.Unlock(cancel) }
 		case readLock:
-			ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
+			ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
 			if err != nil {
 				return nil, err
 			}
-			nsUnlocker = lock.RUnlock
+			nsUnlocker = func() { lock.RUnlock(cancel) }
 		}
 		unlockOnDefer = true
 	}
@@ -213,11 +215,11 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
 	// Lock the object before reading.
 	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return err
 	}
-	defer lk.RUnlock()
+	defer lk.RUnlock(cancel)
 	// Start offset cannot be negative.
 	if startOffset < 0 {
@@ -376,13 +378,14 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
 	if !opts.NoLock {
+		var cancel context.CancelFunc
 		// Lock the object before reading.
 		lk := er.NewNSLock(bucket, object)
-		ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
+		ctx, cancel, err = lk.GetRLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
 		}
-		defer lk.RUnlock()
+		defer lk.RUnlock(cancel)
 	}
 	return er.getObjectInfo(ctx, bucket, object, opts)
@@ -732,12 +735,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	if !opts.NoLock {
 		var err error
+		var cancel context.CancelFunc
 		lk := er.NewNSLock(bucket, object)
-		ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+		ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
 		}
-		defer lk.Unlock()
+		defer lk.Unlock(cancel)
 	}
 	for i, w := range writers {
@@ -1043,11 +1047,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 	}
 	// Acquire a write lock before deleting the object.
 	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalDeleteOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
 	if err != nil {
 		return ObjectInfo{}, err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	storageDisks := er.getDisks()
 	writeQuorum := len(storageDisks)/2 + 1
@@ -1154,14 +1158,13 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) {
 }
 func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
-	var err error
 	// Lock the object before updating tags.
 	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return ObjectInfo{}, err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	disks := er.getDisks()
@@ -1205,14 +1208,13 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
 // PutObjectTags - replace or add tags to an existing object
 func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
-	var err error
 	// Lock the object before updating tags.
 	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return ObjectInfo{}, err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	disks := er.getDisks()
@@ -1302,11 +1304,11 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
 	}
 	// Acquire write lock before starting to transition the object.
 	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalDeleteOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
 	if err != nil {
 		return err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
 	if err != nil {
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index e8808359d..896409bdf 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -613,19 +613,20 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
 	// Acquire lock
 	if lockType != noLock {
 		lock := z.NewNSLock(bucket, object)
+		var cancel context.CancelFunc
 		switch lockType {
 		case writeLock:
-			ctx, err = lock.GetLock(ctx, globalOperationTimeout)
+			ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
 			if err != nil {
 				return nil, err
 			}
-			nsUnlocker = lock.Unlock
+			nsUnlocker = func() { lock.Unlock(cancel) }
 		case readLock:
-			ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
+			ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
 			if err != nil {
 				return nil, err
 			}
-			nsUnlocker = lock.RUnlock
+			nsUnlocker = func() { lock.RUnlock(cancel) }
 		}
 		unlockOnDefer = true
 	}
@@ -684,11 +685,11 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
 	// Lock the object before reading.
 	lk := z.NewNSLock(bucket, object)
-	ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return ObjectInfo{}, err
 	}
-	defer lk.RUnlock()
+	defer lk.RUnlock(cancel)
 	errs := make([]error, len(z.serverPools))
 	objInfos := make([]ObjectInfo, len(z.serverPools))
@@ -800,17 +801,16 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
 		}
 	}
-	var err error
 	// Acquire a bulk write lock across 'objects'
 	multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
-	ctx, err = multiDeleteLock.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		for i := range derrs {
 			derrs[i] = err
 		}
 		return dobjects, derrs
 	}
-	defer multiDeleteLock.Unlock()
+	defer multiDeleteLock.Unlock(cancel)
 	if z.SinglePool() {
 		return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
@@ -1371,14 +1371,13 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context) (buckets []BucketI
 }
 func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
-	var err error
 	// Acquire lock on format.json
 	formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
-	ctx, err = formatLock.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := formatLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return madmin.HealResultItem{}, err
 	}
-	defer formatLock.Unlock()
+	defer formatLock.Unlock(cancel)
 	var r = madmin.HealResultItem{
 		Type: madmin.HealItemMetadata,
diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 00762dd5a..f648035dd 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -710,11 +710,11 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	// Hold write lock on the object.
 	destLock := fs.NewNSLock(bucket, object)
-	ctx, err = destLock.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := destLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return oi, err
 	}
-	defer destLock.Unlock()
+	defer destLock.Unlock(cancel)
 	bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
 	fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index c3a98ed5e..6df7613d8 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -609,12 +609,13 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
 	defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
 	if !cpSrcDstSame {
+		var cancel context.CancelFunc
 		objectDWLock := fs.NewNSLock(dstBucket, dstObject)
-		ctx, err = objectDWLock.GetLock(ctx, globalOperationTimeout)
+		ctx, cancel, err = objectDWLock.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return oi, err
 		}
-		defer objectDWLock.Unlock()
+		defer objectDWLock.Unlock(cancel)
 	}
 	atomic.AddInt64(&fs.activeIOCount, 1)
@@ -702,19 +703,20 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
 	if lockType != noLock {
 		// Lock the object before reading.
 		lock := fs.NewNSLock(bucket, object)
+		var cancel context.CancelFunc
 		switch lockType {
 		case writeLock:
-			ctx, err = lock.GetLock(ctx, globalOperationTimeout)
+			ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
 			if err != nil {
 				return nil, err
 			}
-			nsUnlocker = lock.Unlock
+			nsUnlocker = func() { lock.Unlock(cancel) }
 		case readLock:
-			ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
+			ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
 			if err != nil {
 				return nil, err
 			}
-			nsUnlocker = lock.RUnlock
+			nsUnlocker = func() { lock.RUnlock(cancel) }
 		}
 	}
@@ -983,11 +985,11 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
 func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
 	// Lock the object before reading.
 	lk := fs.NewNSLock(bucket, object)
-	ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return oi, err
 	}
-	defer lk.RUnlock()
+	defer lk.RUnlock(cancel)
 	if err := checkGetObjArgs(ctx, bucket, object); err != nil {
 		return oi, err
@@ -1021,15 +1023,16 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
 	oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
 	if err == errCorruptedFormat || err == io.EOF {
+		var cancel context.CancelFunc
 		lk := fs.NewNSLock(bucket, object)
-		ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+		_, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return oi, toObjectErr(err, bucket, object)
 		}
 		fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
 		err = fs.createFsJSON(object, fsMetaPath)
-		lk.Unlock()
+		lk.Unlock(cancel)
 		if err != nil {
 			return oi, toObjectErr(err, bucket, object)
 		}
@@ -1074,12 +1077,12 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
 	// Lock the object.
 	lk := fs.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		logger.LogIf(ctx, err)
 		return objInfo, err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	defer ObjectPathUpdated(path.Join(bucket, object))
 	atomic.AddInt64(&fs.activeIOCount, 1)
@@ -1250,11 +1253,11 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
 	// Acquire a write lock before deleting the object.
 	lk := fs.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+	ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
 		return objInfo, err
 	}
-	defer lk.Unlock()
+	defer lk.Unlock(cancel)
 	if err = checkDelObjArgs(ctx, bucket, object); err != nil {
 		return objInfo, err
diff --git a/cmd/iam.go b/cmd/iam.go
index 47aeadc67..9e8ef575b 100644
--- a/cmd/iam.go
+++ b/cmd/iam.go
@@ -597,7 +597,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
 	for {
 		// let one of the server acquire the lock, if not let them timeout.
 		// which shall be retried again by this loop.
-		if _, err := txnLk.GetLock(retryCtx, iamLockTimeout); err != nil {
+		_, cancel, err := txnLk.GetLock(retryCtx, iamLockTimeout)
+		if err != nil {
 			logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
 			time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
 			continue
 		}
@@ -608,7 +609,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
 		// Migrating to encrypted backend on etcd should happen before initialization of
 		// IAM sub-system, make sure that we do not move the above codeblock elsewhere.
 		if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil {
-			txnLk.Unlock()
+			txnLk.Unlock(cancel)
 			logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err))
 			logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available"))
 			return
@@ -622,7 +623,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
 		// Migrate IAM configuration, if necessary.
 		if err := sys.doIAMConfigMigration(ctx); err != nil {
-			txnLk.Unlock()
+			txnLk.Unlock(cancel)
 			if configRetriableErrors(err) {
 				logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
 				continue
@@ -633,7 +634,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
 		}
 		// Successfully migrated, proceed to load the users.
-		txnLk.Unlock()
+		txnLk.Unlock(cancel)
 		break
 	}
diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index 58aaff398..37c4e2b7a 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -39,10 +39,10 @@ var globalLockServer *localLocker
 // RWLocker - locker interface to introduce GetRLock, RUnlock.
 type RWLocker interface {
-	GetLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, timedOutErr error)
-	Unlock()
-	GetRLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, timedOutErr error)
-	RUnlock()
+	GetLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, cancel context.CancelFunc, timedOutErr error)
+	Unlock(cancel context.CancelFunc)
+	GetRLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, cancel context.CancelFunc, timedOutErr error)
+	RUnlock(cancel context.CancelFunc)
 }
 // newNSLock - return a new name space lock map.
@@ -143,7 +143,7 @@ type distLockInstance struct {
 }
 // Lock - block until write lock is taken or timeout has occurred.
-func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, error) {
+func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, context.CancelFunc, error) {
 	lockSource := getSource(2)
 	start := UTCNow()
@@ -152,19 +152,22 @@ func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout
 		Timeout: timeout.Timeout(),
 	}) {
 		timeout.LogFailure()
-		return ctx, OperationTimedOut{}
+		return ctx, nil, OperationTimedOut{}
 	}
 	timeout.LogSuccess(UTCNow().Sub(start))
-	return newCtx, nil
+	return newCtx, cancel, nil
 }
 // Unlock - block until write lock is released.
-func (di *distLockInstance) Unlock() {
+func (di *distLockInstance) Unlock(cancel context.CancelFunc) {
+	if cancel != nil {
+		cancel()
+	}
 	di.rwMutex.Unlock()
 }
 // RLock - block until read lock is taken or timeout has occurred.
-func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, error) {
+func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, context.CancelFunc, error) {
 	lockSource := getSource(2)
 	start := UTCNow()
@@ -173,14 +176,17 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou
 		Timeout: timeout.Timeout(),
 	}) {
 		timeout.LogFailure()
-		return ctx, OperationTimedOut{}
+		return ctx, nil, OperationTimedOut{}
 	}
 	timeout.LogSuccess(UTCNow().Sub(start))
-	return newCtx, nil
+	return newCtx, cancel, nil
 }
 // RUnlock - block until read lock is released.
-func (di *distLockInstance) RUnlock() {
+func (di *distLockInstance) RUnlock(cancel context.CancelFunc) {
+	if cancel != nil {
+		cancel()
+	}
 	di.rwMutex.RUnlock()
 }
@@ -208,7 +214,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
 }
 // Lock - block until write lock is taken or timeout has occurred.
-func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, timedOutErr error) {
+func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, _ context.CancelFunc, timedOutErr error) {
 	lockSource := getSource(2)
 	start := UTCNow()
 	const readLock = false
@@ -221,16 +227,19 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou
 					li.ns.unlock(li.volume, li.paths[si], readLock)
 				}
 			}
-			return nil, OperationTimedOut{}
+			return nil, nil, OperationTimedOut{}
 		}
 		success[i] = 1
 	}
 	timeout.LogSuccess(UTCNow().Sub(start))
-	return ctx, nil
+	return ctx, func() {}, nil
 }
 // Unlock - block until write lock is released.
-func (li *localLockInstance) Unlock() {
+func (li *localLockInstance) Unlock(cancel context.CancelFunc) {
+	if cancel != nil {
+		cancel()
+	}
 	const readLock = false
 	for _, path := range li.paths {
 		li.ns.unlock(li.volume, path, readLock)
@@ -238,7 +247,7 @@ func (li *localLockInstance) Unlock() {
 }
 // RLock - block until read lock is taken or timeout has occurred.
-func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, timedOutErr error) {
+func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, _ context.CancelFunc, timedOutErr error) {
 	lockSource := getSource(2)
 	start := UTCNow()
 	const readLock = true
@@ -251,16 +260,19 @@ func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeo
 					li.ns.unlock(li.volume, li.paths[si], readLock)
 				}
 			}
-			return nil, OperationTimedOut{}
+			return nil, nil, OperationTimedOut{}
 		}
 		success[i] = 1
 	}
 	timeout.LogSuccess(UTCNow().Sub(start))
-	return ctx, nil
+	return ctx, func() {}, nil
 }
 // RUnlock - block until read lock is released.
-func (li *localLockInstance) RUnlock() {
+func (li *localLockInstance) RUnlock(cancel context.CancelFunc) {
+	if cancel != nil {
+		cancel()
+	}
 	const readLock = true
 	for _, path := range li.paths {
 		li.ns.unlock(li.volume, path, readLock)
diff --git a/cmd/server-main.go b/cmd/server-main.go
index a0826de7c..43c7c6d28 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -289,7 +289,6 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
 	lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
-	var err error
 	for {
 		select {
 		case <-ctx.Done():
@@ -300,7 +299,8 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
 		// let one of the server acquire the lock, if not let them timeout.
 		// which shall be retried again by this loop.
-		if _, err = txnLk.GetLock(ctx, lockTimeout); err != nil {
+		_, cancel, err := txnLk.GetLock(ctx, lockTimeout)
+		if err != nil {
 			logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
 			time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
@@ -319,7 +319,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
 		// Upon success migrating the config, initialize all sub-systems
 		// if all sub-systems initialized successfully return right away
 		if err = initAllSubsystems(ctx, newObject); err == nil {
-			txnLk.Unlock()
+			txnLk.Unlock(cancel)
 			// All successful return.
 			if globalIsDistErasure {
 				// These messages only meant primarily for distributed setup, so only log during distributed setup.
@@ -329,7 +329,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
 			}
 		}
-		txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
+		txnLk.Unlock(cancel) // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
 		if configRetriableErrors(err) {
 			logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
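
The GetObjectNInfo call sites above also show the other recurring shape in this patch: when the unlock is stashed in a variable rather than deferred immediately, the cancel func is captured in a closure so the same cancel obtained at acquisition time is invoked on release. Below is a compilable sketch of that pattern under stated assumptions; nsLock and the method signatures are illustrative stand-ins, not the real MinIO types.

package main

import (
	"context"
	"fmt"
)

// nsLock is a stand-in for a namespace lock: GetRLock returns the cancel func
// that the matching RUnlock must eventually invoke.
type nsLock struct{}

func (nsLock) GetRLock(ctx context.Context) (context.Context, context.CancelFunc, error) {
	lctx, cancel := context.WithCancel(ctx)
	return lctx, cancel, nil
}

func (nsLock) RUnlock(cancel context.CancelFunc) {
	if cancel != nil {
		cancel() // cancel the per-lock context before releasing the read lock
	}
	fmt.Println("read lock released")
}

func main() {
	nsUnlocker := func() {} // no-op until a lock is actually taken

	lk := nsLock{}
	lctx, cancel, err := lk.GetRLock(context.Background())
	if err == nil {
		// Capture cancel in the closure, mirroring
		// nsUnlocker = func() { lock.RUnlock(cancel) } in the patch.
		nsUnlocker = func() { lk.RUnlock(cancel) }
	}

	_ = lctx // ... read the object under the lock ...
	nsUnlocker()
}
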