Mirror of https://github.com/minio/minio.git
Don't retain context in locker (#10515)
Use the context for internal timeouts, but disconnect it from outgoing calls so that we always receive the results and can cancel the lock remotely.
Commit: 2439d4fb3c
Parent: 6bd9057bb1
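The pattern below recurs throughout the diff: lock instances are now constructed without a context (NewNSLock(bucket, ...)), and the caller's context is passed to each GetLock/GetRLock call instead, so a locker never retains a request context beyond a single acquisition. A minimal sketch of the before/after calling shape, using simplified stand-in types rather than MinIO's actual ones:

package main

import (
    "context"
    "errors"
    "time"
)

// oldLocker shows the previous shape: the request context is captured at
// construction time and silently reused for every later acquisition.
type oldLocker struct {
    ctx context.Context // retained request context (what the commit removes)
}

func (l *oldLocker) GetLock(timeout time.Duration) error {
    return acquire(l.ctx, timeout)
}

// newLocker shows the new shape: no stored context, the caller supplies one
// per acquisition, mirroring the RWLocker change in this commit.
type newLocker struct{}

func (l *newLocker) GetLock(ctx context.Context, timeout time.Duration) error {
    return acquire(ctx, timeout)
}

// acquire stands in for the real lock call; it only honours the per-call deadline.
func acquire(ctx context.Context, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    select {
    case <-time.After(10 * time.Millisecond): // pretend the lock was granted
        return nil
    case <-ctx.Done():
        return errors.New("lock acquisition timed out")
    }
}

func main() {
    _ = (&oldLocker{ctx: context.Background()}).GetLock(time.Second)
    _ = (&newLocker{}).GetLock(context.Background(), time.Second)
}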
@@ -1284,8 +1284,8 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
     deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
     defer cancel()
 
-    nsLock := objectAPI.NewNSLock(ctx, minioMetaBucket, "obd-in-progress")
-    if err := nsLock.GetLock(newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
+    nsLock := objectAPI.NewNSLock(minioMetaBucket, "obd-in-progress")
+    if err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
         errResp(err)
         return
     }
@@ -69,10 +69,10 @@ func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 // There should only ever be one crawler running per cluster.
 func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
     // Make sure only 1 crawler is running on the cluster.
-    locker := objAPI.NewNSLock(ctx, minioMetaBucket, "runDataCrawler.lock")
+    locker := objAPI.NewNSLock(minioMetaBucket, "runDataCrawler.lock")
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
     for {
-        err := locker.GetLock(dataCrawlerLeaderLockTimeout)
+        err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout)
         if err != nil {
             time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
             continue
@@ -142,7 +142,7 @@ type diskCache struct {
     // nsMutex namespace lock
     nsMutex *nsLockMap
     // Object functions pointing to the corresponding functions of backend implementation.
-    NewNSLockFn func(ctx context.Context, cachePath string) RWLocker
+    NewNSLockFn func(cachePath string) RWLocker
 }
 
 // Inits the disk cache dir if it is not initialized already.
@@ -175,8 +175,8 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
     }
     go cache.purgeWait(ctx)
     cache.diskSpaceAvailable(0) // update if cache usage is already high.
-    cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
-        return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
+    cache.NewNSLockFn = func(cachePath string) RWLocker {
+        return cache.nsMutex.NewNSLock(nil, cachePath, "")
     }
     return &cache, nil
 }
@@ -419,8 +419,8 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
 // if partial object is cached.
 func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
 
-    cLock := c.NewNSLockFn(ctx, cacheObjPath)
-    if err = cLock.GetRLock(globalOperationTimeout); err != nil {
+    cLock := c.NewNSLockFn(cacheObjPath)
+    if err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
         return
     }
 
@@ -501,8 +501,8 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
 // incHitsOnly is true if metadata update is incrementing only the hit counter
 func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
     cachedPath := getCacheSHADir(c.dir, bucket, object)
-    cLock := c.NewNSLockFn(ctx, cachedPath)
-    if err := cLock.GetLock(globalOperationTimeout); err != nil {
+    cLock := c.NewNSLockFn(cachedPath)
+    if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
         return err
     }
     defer cLock.Unlock()
@@ -666,8 +666,8 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
         return errDiskFull
     }
     cachePath := getCacheSHADir(c.dir, bucket, object)
-    cLock := c.NewNSLockFn(ctx, cachePath)
-    if err := cLock.GetLock(globalOperationTimeout); err != nil {
+    cLock := c.NewNSLockFn(cachePath)
+    if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
         return err
     }
     defer cLock.Unlock()
@@ -866,8 +866,8 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
 // Get returns ObjectInfo and reader for object from disk cache
 func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
     cacheObjPath := getCacheSHADir(c.dir, bucket, object)
-    cLock := c.NewNSLockFn(ctx, cacheObjPath)
-    if err := cLock.GetRLock(globalOperationTimeout); err != nil {
+    cLock := c.NewNSLockFn(cacheObjPath)
+    if err := cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
         return nil, numHits, err
     }
 
@@ -930,8 +930,8 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 
 // Deletes the cached object
 func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
-    cLock := c.NewNSLockFn(ctx, cacheObjPath)
-    if err := cLock.GetLock(globalOperationTimeout); err != nil {
+    cLock := c.NewNSLockFn(cacheObjPath)
+    if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
         return err
     }
     defer cLock.Unlock()
@@ -355,8 +355,8 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
 //
 // Implements S3 compatible Upload Part API.
 func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
-    uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
-    if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
+    uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
+    if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
         return PartInfo{}, err
     }
     readLocked := true
@@ -469,7 +469,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
     // PutObjectParts would serialize here updating `xl.meta`
     uploadIDLock.RUnlock()
     readLocked = false
-    if err = uploadIDLock.GetLock(globalOperationTimeout); err != nil {
+    if err = uploadIDLock.GetLock(ctx, globalOperationTimeout); err != nil {
         return PartInfo{}, err
     }
     defer uploadIDLock.Unlock()
@@ -550,8 +550,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
         UploadID: uploadID,
     }
 
-    uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
-    if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
+    uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
+    if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
         return MultipartInfo{}, err
     }
     defer uploadIDLock.RUnlock()
@@ -598,8 +598,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 // ListPartsInfo structure is marshaled directly into XML and
 // replied back to the client.
 func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) {
-    uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
-    if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
+    uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
+    if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
         return ListPartsInfo{}, err
     }
     defer uploadIDLock.RUnlock()
@@ -691,8 +691,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
     // Hold read-locks to verify uploaded parts, also disallows
     // parallel part uploads as well.
-    uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
-    if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
+    uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
+    if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
         return oi, err
     }
     defer uploadIDLock.RUnlock()
@@ -844,8 +844,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     }
 
     // Hold namespace to complete the transaction
-    lk := er.NewNSLock(ctx, bucket, object)
-    if err = lk.GetLock(globalOperationTimeout); err != nil {
+    lk := er.NewNSLock(bucket, object)
+    if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
         return oi, err
     }
     defer lk.Unlock()
@@ -886,8 +886,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 // would be removed from the system, rollback is not possible on this
 // operation.
 func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
-    lk := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
-    if err := lk.GetLock(globalOperationTimeout); err != nil {
+    lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
+    if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
         return err
     }
     defer lk.Unlock()
@@ -48,8 +48,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
     }
 
     defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
-    lk := er.NewNSLock(ctx, dstBucket, dstObject)
-    if err := lk.GetLock(globalOperationTimeout); err != nil {
+    lk := er.NewNSLock(dstBucket, dstObject)
+    if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
         return oi, err
     }
     defer lk.Unlock()
@@ -135,15 +135,15 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 
     // Acquire lock
     if lockType != noLock {
-        lock := er.NewNSLock(ctx, bucket, object)
+        lock := er.NewNSLock(bucket, object)
         switch lockType {
         case writeLock:
-            if err = lock.GetLock(globalOperationTimeout); err != nil {
+            if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
                 return nil, err
             }
             nsUnlocker = lock.Unlock
         case readLock:
-            if err = lock.GetRLock(globalOperationTimeout); err != nil {
+            if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
                 return nil, err
             }
             nsUnlocker = lock.RUnlock
@@ -196,8 +196,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 // length indicates the total length of the object.
 func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
     // Lock the object before reading.
-    lk := er.NewNSLock(ctx, bucket, object)
-    if err := lk.GetRLock(globalOperationTimeout); err != nil {
+    lk := er.NewNSLock(bucket, object)
+    if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
         return err
     }
     defer lk.RUnlock()
@@ -344,8 +344,8 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
     // Lock the object before reading.
-    lk := er.NewNSLock(ctx, bucket, object)
-    if err := lk.GetRLock(globalOperationTimeout); err != nil {
+    lk := er.NewNSLock(bucket, object)
+    if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
         return ObjectInfo{}, err
     }
     defer lk.RUnlock()
@@ -632,8 +632,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
         return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
     }
 
-    lk := er.NewNSLock(ctx, bucket, object)
-    if err := lk.GetLock(globalOperationTimeout); err != nil {
+    lk := er.NewNSLock(bucket, object)
+    if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
         return ObjectInfo{}, err
     }
     defer lk.Unlock()
@@ -902,8 +902,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
     }
 
     // Acquire a write lock before deleting the object.
-    lk := er.NewNSLock(ctx, bucket, object)
-    if err = lk.GetLock(globalDeleteOperationTimeout); err != nil {
+    lk := er.NewNSLock(bucket, object)
+    if err = lk.GetLock(ctx, globalDeleteOperationTimeout); err != nil {
         return ObjectInfo{}, err
     }
     defer lk.Unlock()
@@ -89,8 +89,8 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
     return z, nil
 }
 
-func (z *erasureServerSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
-    return z.serverSets[0].NewNSLock(ctx, bucket, objects...)
+func (z *erasureServerSets) NewNSLock(bucket string, objects ...string) RWLocker {
+    return z.serverSets[0].NewNSLock(bucket, objects...)
 }
 
 func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker {
@@ -570,8 +570,8 @@ func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, ob
     }
 
     // Acquire a bulk write lock across 'objects'
-    multiDeleteLock := z.NewNSLock(ctx, bucket, objSets.ToSlice()...)
-    if err := multiDeleteLock.GetLock(globalOperationTimeout); err != nil {
+    multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
+    if err := multiDeleteLock.GetLock(ctx, globalOperationTimeout); err != nil {
         for i := range derrs {
             derrs[i] = err
         }
@@ -1682,8 +1682,8 @@ func (z *erasureServerSets) ListBuckets(ctx context.Context) (buckets []BucketIn
 
 func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
     // Acquire lock on format.json
-    formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
-    if err := formatLock.GetLock(globalOperationTimeout); err != nil {
+    formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
+    if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil {
         return madmin.HealResultItem{}, err
     }
     defer formatLock.Unlock()
@@ -1887,17 +1887,17 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri
 func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
     object = encodeDirObject(object)
 
-    lk := z.NewNSLock(ctx, bucket, object)
+    lk := z.NewNSLock(bucket, object)
     if bucket == minioMetaBucket {
         // For .minio.sys bucket heals we should hold write locks.
-        if err := lk.GetLock(globalOperationTimeout); err != nil {
+        if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
             return madmin.HealResultItem{}, err
         }
         defer lk.Unlock()
     } else {
         // Lock the object before healing. Use read lock since healing
         // will only regenerate parts & xl.meta of outdated disks.
-        if err := lk.GetRLock(globalOperationTimeout); err != nil {
+        if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
             return madmin.HealResultItem{}, err
         }
         defer lk.RUnlock()
@@ -431,11 +431,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 }
 
 // NewNSLock - initialize a new namespace RWLocker instance.
-func (s *erasureSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
+func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker {
     if len(objects) == 1 {
-        return s.getHashedSet(objects[0]).NewNSLock(ctx, bucket, objects...)
+        return s.getHashedSet(objects[0]).NewNSLock(bucket, objects...)
     }
-    return s.getHashedSet("").NewNSLock(ctx, bucket, objects...)
+    return s.getHashedSet("").NewNSLock(bucket, objects...)
 }
 
 // SetDriveCount returns the current drives per set.
@@ -68,8 +68,8 @@ type erasureObjects struct {
 }
 
 // NewNSLock - initialize a new namespace RWLocker instance.
-func (er erasureObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
-    return er.nsMutex.NewNSLock(ctx, er.getLockers, bucket, objects...)
+func (er erasureObjects) NewNSLock(bucket string, objects ...string) RWLocker {
+    return er.nsMutex.NewNSLock(er.getLockers, bucket, objects...)
 }
 
 // SetDriveCount returns the current drives per set.
@@ -714,8 +714,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
     }
 
     // Hold write lock on the object.
-    destLock := fs.NewNSLock(ctx, bucket, object)
-    if err = destLock.GetLock(globalOperationTimeout); err != nil {
+    destLock := fs.NewNSLock(bucket, object)
+    if err = destLock.GetLock(ctx, globalOperationTimeout); err != nil {
         return oi, err
     }
     defer destLock.Unlock()
cmd/fs-v1.go (34 lines changed)
@@ -186,9 +186,9 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
 }
 
 // NewNSLock - initialize a new namespace RWLocker instance.
-func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
+func (fs *FSObjects) NewNSLock(bucket string, objects ...string) RWLocker {
     // lockers are explicitly 'nil' for FS mode since there are only local lockers
-    return fs.nsMutex.NewNSLock(ctx, nil, bucket, objects...)
+    return fs.nsMutex.NewNSLock(nil, bucket, objects...)
 }
 
 // SetDriveCount no-op
@@ -601,8 +601,8 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
     defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
 
     if !cpSrcDstSame {
-        objectDWLock := fs.NewNSLock(ctx, dstBucket, dstObject)
-        if err := objectDWLock.GetLock(globalOperationTimeout); err != nil {
+        objectDWLock := fs.NewNSLock(dstBucket, dstObject)
+        if err := objectDWLock.GetLock(ctx, globalOperationTimeout); err != nil {
             return oi, err
         }
         defer objectDWLock.Unlock()
@@ -692,15 +692,15 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
 
     if lockType != noLock {
         // Lock the object before reading.
-        lock := fs.NewNSLock(ctx, bucket, object)
+        lock := fs.NewNSLock(bucket, object)
         switch lockType {
         case writeLock:
-            if err = lock.GetLock(globalOperationTimeout); err != nil {
+            if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
                 return nil, err
             }
             nsUnlocker = lock.Unlock
        case readLock:
-            if err = lock.GetRLock(globalOperationTimeout); err != nil {
+            if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
                 return nil, err
             }
             nsUnlocker = lock.RUnlock
@@ -785,8 +785,8 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse
     }
 
     // Lock the object before reading.
-    lk := fs.NewNSLock(ctx, bucket, object)
-    if err := lk.GetRLock(globalOperationTimeout); err != nil {
+    lk := fs.NewNSLock(bucket, object)
+    if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
         logger.LogIf(ctx, err)
         return err
     }
@@ -1013,8 +1013,8 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
 // getObjectInfoWithLock - reads object metadata and replies back ObjectInfo.
 func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
     // Lock the object before reading.
-    lk := fs.NewNSLock(ctx, bucket, object)
-    if err := lk.GetRLock(globalOperationTimeout); err != nil {
+    lk := fs.NewNSLock(bucket, object)
+    if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
         return oi, err
     }
     defer lk.RUnlock()
@@ -1051,8 +1051,8 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
 
     oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
     if err == errCorruptedFormat || err == io.EOF {
-        lk := fs.NewNSLock(ctx, bucket, object)
-        if err = lk.GetLock(globalOperationTimeout); err != nil {
+        lk := fs.NewNSLock(bucket, object)
+        if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
             return oi, toObjectErr(err, bucket, object)
         }
 
@@ -1102,8 +1102,8 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
     }
 
     // Lock the object.
-    lk := fs.NewNSLock(ctx, bucket, object)
-    if err := lk.GetLock(globalOperationTimeout); err != nil {
+    lk := fs.NewNSLock(bucket, object)
+    if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
         logger.LogIf(ctx, err)
         return objInfo, err
     }
@@ -1284,8 +1284,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
     }
 
     // Acquire a write lock before deleting the object.
-    lk := fs.NewNSLock(ctx, bucket, object)
-    if err = lk.GetLock(globalOperationTimeout); err != nil {
+    lk := fs.NewNSLock(bucket, object)
+    if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
         return objInfo, err
     }
     defer lk.Unlock()
@@ -51,8 +51,8 @@ type GatewayLocker struct {
 }
 
 // NewNSLock - implements gateway level locker
-func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
-    return l.nsMutex.NewNSLock(ctx, nil, bucket, objects...)
+func (l *GatewayLocker) NewNSLock(bucket string, objects ...string) RWLocker {
+    return l.nsMutex.NewNSLock(nil, bucket, objects...)
 }
 
 // Walk - implements common gateway level Walker, to walk on all objects recursively at a prefix
@@ -41,8 +41,8 @@ func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, bf *bloomF
 }
 
 // NewNSLock is a dummy stub for gateway.
-func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
-    logger.CriticalIf(ctx, errors.New("not implemented"))
+func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker {
+    logger.CriticalIf(context.Background(), errors.New("not implemented"))
     return nil
 }
 
@@ -92,6 +92,9 @@ func (t *apiConfig) getRequestsPool() (chan struct{}, <-chan time.Time) {
     if t.requestsPool == nil {
         return nil, nil
     }
+    if t.requestsDeadline <= 0 {
+        return t.requestsPool, nil
+    }
 
     return t.requestsPool, time.NewTimer(t.requestsDeadline).C
 }
@@ -429,7 +429,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
     defer cancel()
 
     // Hold the lock for migration only.
-    txnLk := objAPI.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/iam.lock")
+    txnLk := objAPI.NewNSLock(minioMetaBucket, minioConfigPrefix+"/iam.lock")
 
     // Initializing IAM sub-system needs a retry mechanism for
     // the following reasons:
@@ -446,7 +446,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
     for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) {
         // let one of the server acquire the lock, if not let them timeout.
         // which shall be retried again by this loop.
-        if err := txnLk.GetLock(iamLockTimeout); err != nil {
+        if err := txnLk.GetLock(retryCtx, iamLockTimeout); err != nil {
             logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
             continue
         }
@@ -38,9 +38,9 @@ var globalLockServers = make(map[Endpoint]*localLocker)
 
 // RWLocker - locker interface to introduce GetRLock, RUnlock.
 type RWLocker interface {
-    GetLock(timeout *dynamicTimeout) (timedOutErr error)
+    GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error)
     Unlock()
-    GetRLock(timeout *dynamicTimeout) (timedOutErr error)
+    GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error)
     RUnlock()
 }
 
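This interface change is the core of the commit: the context now travels with each GetLock/GetRLock call instead of living inside the locker. A rough usage sketch of the new shape, with a deliberately simplified in-memory locker standing in for the real implementations (illustrative only, not the MinIO code):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// rwLocker mirrors the shape of the new RWLocker interface from the diff:
// the caller's context bounds each acquisition attempt.
type rwLocker interface {
    GetLock(ctx context.Context, timeout time.Duration) error
    Unlock()
}

// memLocker is a purely local stand-in used only for this sketch.
type memLocker struct{ mu sync.Mutex }

func (m *memLocker) GetLock(ctx context.Context, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    locked := make(chan struct{})
    go func() { m.mu.Lock(); close(locked) }()
    select {
    case <-locked:
        return nil
    case <-ctx.Done():
        // If the lock is granted after we gave up, release it so it is not
        // leaked, mirroring how the commit releases abandoned grants.
        go func() { <-locked; m.mu.Unlock() }()
        return ctx.Err()
    }
}

func (m *memLocker) Unlock() { m.mu.Unlock() }

// withLock runs fn while holding lk, bounded by the caller's ctx.
func withLock(ctx context.Context, lk rwLocker, fn func() error) error {
    if err := lk.GetLock(ctx, 5*time.Second); err != nil {
        return err
    }
    defer lk.Unlock()
    return fn()
}

func main() {
    err := withLock(context.Background(), &memLocker{}, func() error {
        fmt.Println("holding the lock")
        return nil
    })
    fmt.Println("err:", err)
}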
@@ -139,15 +139,14 @@ func (n *nsLockMap) unlock(volume string, path string, readLock bool) {
 type distLockInstance struct {
     rwMutex *dsync.DRWMutex
     opsID string
-    ctx context.Context
 }
 
 // Lock - block until write lock is taken or timeout has occurred.
-func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
+func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
     lockSource := getSource(2)
     start := UTCNow()
 
-    if !di.rwMutex.GetLock(di.ctx, di.opsID, lockSource, dsync.Options{
+    if !di.rwMutex.GetLock(ctx, di.opsID, lockSource, dsync.Options{
         Timeout: timeout.Timeout(),
     }) {
         timeout.LogFailure()
@@ -163,11 +162,11 @@ func (di *distLockInstance) Unlock() {
 }
 
 // RLock - block until read lock is taken or timeout has occurred.
-func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
+func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
     lockSource := getSource(2)
     start := UTCNow()
 
-    if !di.rwMutex.GetRLock(di.ctx, di.opsID, lockSource, dsync.Options{
+    if !di.rwMutex.GetRLock(ctx, di.opsID, lockSource, dsync.Options{
         Timeout: timeout.Timeout(),
     }) {
         timeout.LogFailure()
@@ -184,7 +183,6 @@ func (di *distLockInstance) RUnlock() {
 
 // localLockInstance - frontend/top-level interface for namespace locks.
 type localLockInstance struct {
-    ctx context.Context
     ns *nsLockMap
     volume string
     paths []string
@@ -194,26 +192,26 @@ type localLockInstance struct {
 // NewNSLock - returns a lock instance for a given volume and
 // path. The returned lockInstance object encapsulates the nsLockMap,
 // volume, path and operation ID.
-func (n *nsLockMap) NewNSLock(ctx context.Context, lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
+func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
     opsID := mustGetUUID()
     if n.isDistErasure {
         drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
             GetLockers: lockers,
         }, pathsJoinPrefix(volume, paths...)...)
-        return &distLockInstance{drwmutex, opsID, ctx}
+        return &distLockInstance{drwmutex, opsID}
     }
     sort.Strings(paths)
-    return &localLockInstance{ctx, n, volume, paths, opsID}
+    return &localLockInstance{n, volume, paths, opsID}
 }
 
 // Lock - block until write lock is taken or timeout has occurred.
-func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
+func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
     lockSource := getSource(2)
     start := UTCNow()
-    readLock := false
+    const readLock = false
     var success []int
     for i, path := range li.paths {
-        if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
+        if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
             timeout.LogFailure()
             for _, sint := range success {
                 li.ns.unlock(li.volume, li.paths[sint], readLock)
@@ -228,20 +226,20 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error
 
 // Unlock - block until write lock is released.
 func (li *localLockInstance) Unlock() {
-    readLock := false
+    const readLock = false
     for _, path := range li.paths {
         li.ns.unlock(li.volume, path, readLock)
     }
 }
 
 // RLock - block until read lock is taken or timeout has occurred.
-func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
+func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
     lockSource := getSource(2)
     start := UTCNow()
-    readLock := true
+    const readLock = true
     var success []int
     for i, path := range li.paths {
-        if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
+        if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
             timeout.LogFailure()
             for _, sint := range success {
                 li.ns.unlock(li.volume, li.paths[sint], readLock)
@@ -256,7 +254,7 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro
 
 // RUnlock - block until read lock is released.
 func (li *localLockInstance) RUnlock() {
-    readLock := true
+    const readLock = true
     for _, path := range li.paths {
         li.ns.unlock(li.volume, path, readLock)
     }
@@ -68,7 +68,7 @@ type ObjectLayer interface {
     SetDriveCount() int // Only implemented by erasure layer
 
     // Locking operations on object.
-    NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker
+    NewNSLock(bucket string, objects ...string) RWLocker
 
     // Storage operations.
     Shutdown(context.Context) error
@@ -210,7 +210,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
     // at a given time, this big transaction lock ensures this
     // appropriately. This is also true for rotation of encrypted
     // content.
-    txnLk := newObject.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
+    txnLk := newObject.NewNSLock(minioMetaBucket, minioConfigPrefix+"/transaction.lock")
 
     // allocate dynamic timeout once before the loop
     configLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
@@ -232,7 +232,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
     for range retry.NewTimerWithJitter(retryCtx, 500*time.Millisecond, time.Second, retry.MaxJitter) {
         // let one of the server acquire the lock, if not let them timeout.
         // which shall be retried again by this loop.
-        if err = txnLk.GetLock(configLockTimeout); err != nil {
+        if err = txnLk.GetLock(retryCtx, configLockTimeout); err != nil {
             logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
             continue
         }
@@ -141,7 +141,7 @@ const (
 // algorithm until either the lock is acquired successfully or more
 // time has elapsed than the timeout value.
 func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) {
-    restClnts, owner := dm.clnt.GetLockers()
+    restClnts, _ := dm.clnt.GetLockers()
 
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
@@ -149,8 +149,9 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
     locks := make([]string, len(restClnts))
 
     log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
-    retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
 
+    // Add total timeout
+    ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
     defer cancel()
 
     // Tolerance is not set, defaults to half of the locker clients.
@@ -175,19 +176,11 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
 
     for {
         select {
-        case <-retryCtx.Done():
-            log("lockBlocking canceled %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
-
-            // Caller context canceled or we timedout,
-            // return false anyways for both situations.
-
-            // make sure to unlock any successful locks, since caller has timedout or canceled the request.
-            releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...)
-
+        case <-ctx.Done():
             return false
         default:
             // Try to acquire the lock.
-            if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked {
+            if locked = lock(ctx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked {
                 dm.m.Lock()
 
                 // If success, copy array to object
@@ -201,6 +194,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
                 }
 
                 dm.m.Unlock()
+                log("lockBlocking %s/%s for %#v: granted\n", id, source, dm.Names)
                 return locked
             }
 
@@ -219,11 +213,12 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 
     // Create buffered channel of size equal to total number of nodes.
     ch := make(chan Granted, len(restClnts))
-    defer close(ch)
-
     var wg sync.WaitGroup
-    for index, c := range restClnts {
 
+    // Combined timout for the lock attempt.
+    ctx, cancel := context.WithTimeout(ctx, DRWMutexAcquireTimeout)
+    defer cancel()
+    for index, c := range restClnts {
         wg.Add(1)
         // broadcast lock request to all nodes
         go func(index int, isReadLock bool, c NetLocker) {
@@ -231,7 +226,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 
             g := Granted{index: index}
             if c == nil {
-                log("dsync: nil locker")
+                log("dsync: nil locker\n")
                 ch <- g
                 return
             }
@@ -247,42 +242,32 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
             var locked bool
             var err error
             if isReadLock {
-                if locked, err = c.RLock(ctx, args); err != nil {
+                if locked, err = c.RLock(context.Background(), args); err != nil {
                     log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c)
                 }
             } else {
-                if locked, err = c.Lock(ctx, args); err != nil {
+                if locked, err = c.Lock(context.Background(), args); err != nil {
                     log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c)
                 }
             }
 
             if locked {
                 g.lockUID = args.UID
             }
 
             ch <- g
 
         }(index, isReadLock, c)
     }
-
-    quorumLocked := false
-
-    wg.Add(1)
-    go func(isReadLock bool) {
-        defer wg.Done()
-
     // Wait until we have either
     //
     // a) received all lock responses
     // b) received too many 'non-'locks for quorum to be still possible
-    // c) timedout
+    // c) timed out
     //
     i, locksFailed := 0, 0
     done := false
-    timeout := time.After(DRWMutexAcquireTimeout)
-
     for ; i < len(restClnts); i++ { // Loop until we acquired all locks
-
         select {
         case grant := <-ch:
             if grant.isLocked() {
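The switch to context.Background() on the per-node Lock and RLock calls above is the "disconnect it from outgoing calls" half of the commit message: even when the caller's combined timeout fires, every outstanding request is allowed to finish, so its grant still arrives on the buffered channel and can be released explicitly instead of leaking on a remote locker. A compressed sketch of that pattern with hypothetical helper names (not the dsync API):

package main

import (
    "context"
    "fmt"
    "time"
)

// tryAcquire sends one lock request per node. The caller's ctx only bounds how
// long we wait for answers; the requests themselves are detached from it, so a
// late grant is still observed and can be released.
func tryAcquire(ctx context.Context, nodes []string) (granted []string, leftover <-chan string) {
    ch := make(chan string, len(nodes)) // buffered: senders never block
    for _, node := range nodes {
        go func(node string) {
            // Detached from ctx on purpose: the request always completes.
            if lockOnNode(context.Background(), node) {
                ch <- node
            } else {
                ch <- ""
            }
        }(node)
    }
    for range nodes {
        select {
        case n := <-ch:
            if n != "" {
                granted = append(granted, n)
            }
        case <-ctx.Done():
            // Stop waiting, but hand the channel back so late grants can be released.
            return granted, ch
        }
    }
    return granted, ch
}

// lockOnNode stands in for the per-node lock request.
func lockOnNode(ctx context.Context, node string) bool {
    time.Sleep(10 * time.Millisecond)
    return true
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    granted, _ := tryAcquire(ctx, []string{"node1", "node2", "node3"})
    fmt.Println("granted on:", granted)
}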
@@ -294,21 +279,11 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
                     // We know that we are not going to get the lock anymore,
                     // so exit out and release any locks that did get acquired
                     done = true
                     // Increment the number of grants received from the buffered channel.
                     i++
                     releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
                 }
             }
-        case <-timeout:
+        case <-ctx.Done():
             done = true
-            // timeout happened, maybe one of the nodes is slow, count
-            // number of locks to check whether we have quorum or not
-            if !checkQuorumLocked(locks, quorum) {
-                log("Quorum not met after timeout\n")
-                releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
-            } else {
-                log("Quorum met after timeout\n")
-            }
-            log("Timeout\n")
         }
 
         if done {
@@ -317,23 +292,26 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
     }
 
     // Count locks in order to determine whether we have quorum or not
-    quorumLocked = checkQuorumLocked(locks, quorum)
+    quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
     if !quorumLocked {
         log("Quorum not met\n")
         releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
     }
 
-    // Wait for the other responses and immediately release the locks
-    // (do not add them to the locks array because the DRWMutex could
-    // already has been unlocked again by the original calling thread)
-    for ; i < len(restClnts); i++ {
-        grantToBeReleased := <-ch
+    // We may have some unused results in ch, release them async.
+    go func() {
+        wg.Wait()
+        close(ch)
+        for grantToBeReleased := range ch {
             if grantToBeReleased.isLocked() {
                 // release lock
                 log("Releasing abandoned lock\n")
                 sendRelease(ds, restClnts[grantToBeReleased.index],
                     owner,
                     grantToBeReleased.lockUID, isReadLock, lockNames...)
             }
         }
-    }(isReadLock)
-
-    wg.Wait()
+    }()
 
     return quorumLocked
 }