mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
Don't retain context in locker (#10515)
Use the context for internal timeouts, but disconnect it from outgoing calls so we always receive the results and cancel it remotely.
This commit is contained in:
parent
f0819cce75
commit
2294e53a0b
@ -1284,8 +1284,8 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
|
||||
deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
|
||||
defer cancel()
|
||||
|
||||
nsLock := objectAPI.NewNSLock(ctx, minioMetaBucket, "obd-in-progress")
|
||||
if err := nsLock.GetLock(newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
|
||||
nsLock := objectAPI.NewNSLock(minioMetaBucket, "obd-in-progress")
|
||||
if err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
|
||||
errResp(err)
|
||||
return
|
||||
}
|
||||
|
@ -69,10 +69,10 @@ func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
// There should only ever be one crawler running per cluster.
|
||||
func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Make sure only 1 crawler is running on the cluster.
|
||||
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "runDataCrawler.lock")
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, "runDataCrawler.lock")
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
for {
|
||||
err := locker.GetLock(dataCrawlerLeaderLockTimeout)
|
||||
err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout)
|
||||
if err != nil {
|
||||
time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
|
||||
continue
|
||||
|
@ -148,7 +148,7 @@ type diskCache struct {
|
||||
// nsMutex namespace lock
|
||||
nsMutex *nsLockMap
|
||||
// Object functions pointing to the corresponding functions of backend implementation.
|
||||
NewNSLockFn func(ctx context.Context, cachePath string) RWLocker
|
||||
NewNSLockFn func(cachePath string) RWLocker
|
||||
}
|
||||
|
||||
// Inits the disk cache dir if it is not initialized already.
|
||||
@ -186,8 +186,8 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
|
||||
go cache.scanCacheWritebackFailures(ctx)
|
||||
}
|
||||
cache.diskSpaceAvailable(0) // update if cache usage is already high.
|
||||
cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
|
||||
return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
|
||||
cache.NewNSLockFn = func(cachePath string) RWLocker {
|
||||
return cache.nsMutex.NewNSLock(nil, cachePath, "")
|
||||
}
|
||||
return &cache, nil
|
||||
}
|
||||
@ -436,8 +436,8 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
|
||||
// if partial object is cached.
|
||||
func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
|
||||
|
||||
cLock := c.NewNSLockFn(ctx, cacheObjPath)
|
||||
if err = cLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
cLock := c.NewNSLockFn(cacheObjPath)
|
||||
if err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -518,8 +518,8 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
|
||||
// incHitsOnly is true if metadata update is incrementing only the hit counter
|
||||
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
|
||||
cachedPath := getCacheSHADir(c.dir, bucket, object)
|
||||
cLock := c.NewNSLockFn(ctx, cachedPath)
|
||||
if err := cLock.GetLock(globalOperationTimeout); err != nil {
|
||||
cLock := c.NewNSLockFn(cachedPath)
|
||||
if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
@ -696,8 +696,8 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
|
||||
return oi, errDiskFull
|
||||
}
|
||||
cachePath := getCacheSHADir(c.dir, bucket, object)
|
||||
cLock := c.NewNSLockFn(ctx, cachePath)
|
||||
if err := cLock.GetLock(globalOperationTimeout); err != nil {
|
||||
cLock := c.NewNSLockFn(cachePath)
|
||||
if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
@ -910,8 +910,8 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
|
||||
// Get returns ObjectInfo and reader for object from disk cache
|
||||
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
|
||||
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
|
||||
cLock := c.NewNSLockFn(ctx, cacheObjPath)
|
||||
if err := cLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
cLock := c.NewNSLockFn(cacheObjPath)
|
||||
if err := cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return nil, numHits, err
|
||||
}
|
||||
|
||||
@ -974,8 +974,8 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
|
||||
|
||||
// Deletes the cached object
|
||||
func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
|
||||
cLock := c.NewNSLockFn(ctx, cacheObjPath)
|
||||
if err := cLock.GetLock(globalOperationTimeout); err != nil {
|
||||
cLock := c.NewNSLockFn(cacheObjPath)
|
||||
if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
|
@ -355,8 +355,8 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
||||
//
|
||||
// Implements S3 compatible Upload Part API.
|
||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
|
||||
if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
readLocked := true
|
||||
@ -469,7 +469,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
// PutObjectParts would serialize here updating `xl.meta`
|
||||
uploadIDLock.RUnlock()
|
||||
readLocked = false
|
||||
if err = uploadIDLock.GetLock(globalOperationTimeout); err != nil {
|
||||
if err = uploadIDLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.Unlock()
|
||||
@ -550,8 +550,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
||||
UploadID: uploadID,
|
||||
}
|
||||
|
||||
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
|
||||
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return MultipartInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock()
|
||||
@ -598,8 +598,8 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
||||
// ListPartsInfo structure is marshaled directly into XML and
|
||||
// replied back to the client.
|
||||
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) {
|
||||
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
|
||||
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return ListPartsInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock()
|
||||
@ -691,8 +691,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
|
||||
// Hold read-locks to verify uploaded parts, also disallows
|
||||
// parallel part uploads as well.
|
||||
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
|
||||
if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock()
|
||||
@ -844,8 +844,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
}
|
||||
|
||||
// Hold namespace to complete the transaction
|
||||
lk := er.NewNSLock(ctx, bucket, object)
|
||||
if err = lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
@ -886,8 +886,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
// would be removed from the system, rollback is not possible on this
|
||||
// operation.
|
||||
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
|
||||
lk := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
|
||||
if err := lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
|
@ -48,8 +48,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
||||
}
|
||||
|
||||
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
|
||||
lk := er.NewNSLock(ctx, dstBucket, dstObject)
|
||||
if err := lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(dstBucket, dstObject)
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
@ -135,15 +135,15 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
|
||||
// Acquire lock
|
||||
if lockType != noLock {
|
||||
lock := er.NewNSLock(ctx, bucket, object)
|
||||
lock := er.NewNSLock(bucket, object)
|
||||
switch lockType {
|
||||
case writeLock:
|
||||
if err = lock.GetLock(globalOperationTimeout); err != nil {
|
||||
if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nsUnlocker = lock.Unlock
|
||||
case readLock:
|
||||
if err = lock.GetRLock(globalOperationTimeout); err != nil {
|
||||
if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nsUnlocker = lock.RUnlock
|
||||
@ -196,8 +196,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
// length indicates the total length of the object.
|
||||
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
|
||||
// Lock the object before reading.
|
||||
lk := er.NewNSLock(ctx, bucket, object)
|
||||
if err := lk.GetRLock(globalOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer lk.RUnlock()
|
||||
@ -343,8 +343,8 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
|
||||
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
||||
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
||||
// Lock the object before reading.
|
||||
lk := er.NewNSLock(ctx, bucket, object)
|
||||
if err := lk.GetRLock(globalOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
defer lk.RUnlock()
|
||||
@ -635,8 +635,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
|
||||
}
|
||||
|
||||
lk := er.NewNSLock(ctx, bucket, object)
|
||||
if err := lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
@ -906,8 +906,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
||||
}
|
||||
|
||||
// Acquire a write lock before deleting the object.
|
||||
lk := er.NewNSLock(ctx, bucket, object)
|
||||
if err = lk.GetLock(globalDeleteOperationTimeout); err != nil {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
if err = lk.GetLock(ctx, globalDeleteOperationTimeout); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
|
@ -88,8 +88,8 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
|
||||
return z, nil
|
||||
}
|
||||
|
||||
func (z *erasureServerSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
||||
return z.serverSets[0].NewNSLock(ctx, bucket, objects...)
|
||||
func (z *erasureServerSets) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
return z.serverSets[0].NewNSLock(bucket, objects...)
|
||||
}
|
||||
|
||||
func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker {
|
||||
@ -569,8 +569,8 @@ func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, ob
|
||||
}
|
||||
|
||||
// Acquire a bulk write lock across 'objects'
|
||||
multiDeleteLock := z.NewNSLock(ctx, bucket, objSets.ToSlice()...)
|
||||
if err := multiDeleteLock.GetLock(globalOperationTimeout); err != nil {
|
||||
multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
|
||||
if err := multiDeleteLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
for i := range derrs {
|
||||
derrs[i] = err
|
||||
}
|
||||
@ -1167,8 +1167,8 @@ func (z *erasureServerSets) ListBuckets(ctx context.Context) (buckets []BucketIn
|
||||
|
||||
func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
|
||||
// Acquire lock on format.json
|
||||
formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
|
||||
if err := formatLock.GetLock(globalOperationTimeout); err != nil {
|
||||
formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
|
||||
if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return madmin.HealResultItem{}, err
|
||||
}
|
||||
defer formatLock.Unlock()
|
||||
@ -1349,17 +1349,17 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri
|
||||
func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||
object = encodeDirObject(object)
|
||||
|
||||
lk := z.NewNSLock(ctx, bucket, object)
|
||||
lk := z.NewNSLock(bucket, object)
|
||||
if bucket == minioMetaBucket {
|
||||
// For .minio.sys bucket heals we should hold write locks.
|
||||
if err := lk.GetLock(globalOperationTimeout); err != nil {
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return madmin.HealResultItem{}, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
} else {
|
||||
// Lock the object before healing. Use read lock since healing
|
||||
// will only regenerate parts & xl.meta of outdated disks.
|
||||
if err := lk.GetRLock(globalOperationTimeout); err != nil {
|
||||
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return madmin.HealResultItem{}, err
|
||||
}
|
||||
defer lk.RUnlock()
|
||||
|
@ -417,11 +417,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
func (s *erasureSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
||||
func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
if len(objects) == 1 {
|
||||
return s.getHashedSet(objects[0]).NewNSLock(ctx, bucket, objects...)
|
||||
return s.getHashedSet(objects[0]).NewNSLock(bucket, objects...)
|
||||
}
|
||||
return s.getHashedSet("").NewNSLock(ctx, bucket, objects...)
|
||||
return s.getHashedSet("").NewNSLock(bucket, objects...)
|
||||
}
|
||||
|
||||
// SetDriveCount returns the current drives per set.
|
||||
|
@ -68,8 +68,8 @@ type erasureObjects struct {
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
func (er erasureObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
||||
return er.nsMutex.NewNSLock(ctx, er.getLockers, bucket, objects...)
|
||||
func (er erasureObjects) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
return er.nsMutex.NewNSLock(er.getLockers, bucket, objects...)
|
||||
}
|
||||
|
||||
// SetDriveCount returns the current drives per set.
|
||||
|
@ -714,8 +714,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
||||
}
|
||||
|
||||
// Hold write lock on the object.
|
||||
destLock := fs.NewNSLock(ctx, bucket, object)
|
||||
if err = destLock.GetLock(globalOperationTimeout); err != nil {
|
||||
destLock := fs.NewNSLock(bucket, object)
|
||||
if err = destLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer destLock.Unlock()
|
||||
|
34
cmd/fs-v1.go
34
cmd/fs-v1.go
@ -186,9 +186,9 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
||||
func (fs *FSObjects) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
// lockers are explicitly 'nil' for FS mode since there are only local lockers
|
||||
return fs.nsMutex.NewNSLock(ctx, nil, bucket, objects...)
|
||||
return fs.nsMutex.NewNSLock(nil, bucket, objects...)
|
||||
}
|
||||
|
||||
// SetDriveCount no-op
|
||||
@ -602,8 +602,8 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
|
||||
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
|
||||
|
||||
if !cpSrcDstSame {
|
||||
objectDWLock := fs.NewNSLock(ctx, dstBucket, dstObject)
|
||||
if err := objectDWLock.GetLock(globalOperationTimeout); err != nil {
|
||||
objectDWLock := fs.NewNSLock(dstBucket, dstObject)
|
||||
if err := objectDWLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer objectDWLock.Unlock()
|
||||
@ -693,15 +693,15 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
|
||||
|
||||
if lockType != noLock {
|
||||
// Lock the object before reading.
|
||||
lock := fs.NewNSLock(ctx, bucket, object)
|
||||
lock := fs.NewNSLock(bucket, object)
|
||||
switch lockType {
|
||||
case writeLock:
|
||||
if err = lock.GetLock(globalOperationTimeout); err != nil {
|
||||
if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nsUnlocker = lock.Unlock
|
||||
case readLock:
|
||||
if err = lock.GetRLock(globalOperationTimeout); err != nil {
|
||||
if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nsUnlocker = lock.RUnlock
|
||||
@ -786,8 +786,8 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse
|
||||
}
|
||||
|
||||
// Lock the object before reading.
|
||||
lk := fs.NewNSLock(ctx, bucket, object)
|
||||
if err := lk.GetRLock(globalOperationTimeout); err != nil {
|
||||
lk := fs.NewNSLock(bucket, object)
|
||||
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
@ -1014,8 +1014,8 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
|
||||
// getObjectInfoWithLock - reads object metadata and replies back ObjectInfo.
|
||||
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
|
||||
// Lock the object before reading.
|
||||
lk := fs.NewNSLock(ctx, bucket, object)
|
||||
if err := lk.GetRLock(globalOperationTimeout); err != nil {
|
||||
lk := fs.NewNSLock(bucket, object)
|
||||
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer lk.RUnlock()
|
||||
@ -1052,8 +1052,8 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
|
||||
|
||||
oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
|
||||
if err == errCorruptedFormat || err == io.EOF {
|
||||
lk := fs.NewNSLock(ctx, bucket, object)
|
||||
if err = lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := fs.NewNSLock(bucket, object)
|
||||
if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return oi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
@ -1103,8 +1103,8 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
|
||||
}
|
||||
|
||||
// Lock the object.
|
||||
lk := fs.NewNSLock(ctx, bucket, object)
|
||||
if err := lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := fs.NewNSLock(bucket, object)
|
||||
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, err
|
||||
}
|
||||
@ -1285,8 +1285,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
|
||||
}
|
||||
|
||||
// Acquire a write lock before deleting the object.
|
||||
lk := fs.NewNSLock(ctx, bucket, object)
|
||||
if err = lk.GetLock(globalOperationTimeout); err != nil {
|
||||
lk := fs.NewNSLock(bucket, object)
|
||||
if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
defer lk.Unlock()
|
||||
|
@ -51,8 +51,8 @@ type GatewayLocker struct {
|
||||
}
|
||||
|
||||
// NewNSLock - implements gateway level locker
|
||||
func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
||||
return l.nsMutex.NewNSLock(ctx, nil, bucket, objects...)
|
||||
func (l *GatewayLocker) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
return l.nsMutex.NewNSLock(nil, bucket, objects...)
|
||||
}
|
||||
|
||||
// Walk - implements common gateway level Walker, to walk on all objects recursively at a prefix
|
||||
|
@ -41,8 +41,8 @@ func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, bf *bloomF
|
||||
}
|
||||
|
||||
// NewNSLock is a dummy stub for gateway.
|
||||
func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
|
||||
logger.CriticalIf(ctx, errors.New("not implemented"))
|
||||
func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
logger.CriticalIf(context.Background(), errors.New("not implemented"))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,9 @@ func (t *apiConfig) getRequestsPool() (chan struct{}, <-chan time.Time) {
|
||||
if t.requestsPool == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if t.requestsDeadline <= 0 {
|
||||
return t.requestsPool, nil
|
||||
}
|
||||
|
||||
return t.requestsPool, time.NewTimer(t.requestsDeadline).C
|
||||
}
|
||||
|
@ -429,7 +429,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
||||
defer cancel()
|
||||
|
||||
// Hold the lock for migration only.
|
||||
txnLk := objAPI.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/iam.lock")
|
||||
txnLk := objAPI.NewNSLock(minioMetaBucket, minioConfigPrefix+"/iam.lock")
|
||||
|
||||
// Initializing IAM sub-system needs a retry mechanism for
|
||||
// the following reasons:
|
||||
@ -449,7 +449,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
||||
for {
|
||||
// let one of the server acquire the lock, if not let them timeout.
|
||||
// which shall be retried again by this loop.
|
||||
if err = txnLk.GetLock(iamLockTimeout); err != nil {
|
||||
if err = txnLk.GetLock(retryCtx, iamLockTimeout); err != nil {
|
||||
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
|
||||
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
||||
continue
|
||||
|
@ -38,9 +38,9 @@ var globalLockServers = make(map[Endpoint]*localLocker)
|
||||
|
||||
// RWLocker - locker interface to introduce GetRLock, RUnlock.
|
||||
type RWLocker interface {
|
||||
GetLock(timeout *dynamicTimeout) (timedOutErr error)
|
||||
GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error)
|
||||
Unlock()
|
||||
GetRLock(timeout *dynamicTimeout) (timedOutErr error)
|
||||
GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error)
|
||||
RUnlock()
|
||||
}
|
||||
|
||||
@ -139,15 +139,14 @@ func (n *nsLockMap) unlock(volume string, path string, readLock bool) {
|
||||
type distLockInstance struct {
|
||||
rwMutex *dsync.DRWMutex
|
||||
opsID string
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// Lock - block until write lock is taken or timeout has occurred.
|
||||
func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
|
||||
func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
|
||||
lockSource := getSource(2)
|
||||
start := UTCNow()
|
||||
|
||||
if !di.rwMutex.GetLock(di.ctx, di.opsID, lockSource, dsync.Options{
|
||||
if !di.rwMutex.GetLock(ctx, di.opsID, lockSource, dsync.Options{
|
||||
Timeout: timeout.Timeout(),
|
||||
}) {
|
||||
timeout.LogFailure()
|
||||
@ -163,11 +162,11 @@ func (di *distLockInstance) Unlock() {
|
||||
}
|
||||
|
||||
// RLock - block until read lock is taken or timeout has occurred.
|
||||
func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
|
||||
func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
|
||||
lockSource := getSource(2)
|
||||
start := UTCNow()
|
||||
|
||||
if !di.rwMutex.GetRLock(di.ctx, di.opsID, lockSource, dsync.Options{
|
||||
if !di.rwMutex.GetRLock(ctx, di.opsID, lockSource, dsync.Options{
|
||||
Timeout: timeout.Timeout(),
|
||||
}) {
|
||||
timeout.LogFailure()
|
||||
@ -184,7 +183,6 @@ func (di *distLockInstance) RUnlock() {
|
||||
|
||||
// localLockInstance - frontend/top-level interface for namespace locks.
|
||||
type localLockInstance struct {
|
||||
ctx context.Context
|
||||
ns *nsLockMap
|
||||
volume string
|
||||
paths []string
|
||||
@ -194,26 +192,26 @@ type localLockInstance struct {
|
||||
// NewNSLock - returns a lock instance for a given volume and
|
||||
// path. The returned lockInstance object encapsulates the nsLockMap,
|
||||
// volume, path and operation ID.
|
||||
func (n *nsLockMap) NewNSLock(ctx context.Context, lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
|
||||
func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
|
||||
opsID := mustGetUUID()
|
||||
if n.isDistErasure {
|
||||
drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
|
||||
GetLockers: lockers,
|
||||
}, pathsJoinPrefix(volume, paths...)...)
|
||||
return &distLockInstance{drwmutex, opsID, ctx}
|
||||
return &distLockInstance{drwmutex, opsID}
|
||||
}
|
||||
sort.Strings(paths)
|
||||
return &localLockInstance{ctx, n, volume, paths, opsID}
|
||||
return &localLockInstance{n, volume, paths, opsID}
|
||||
}
|
||||
|
||||
// Lock - block until write lock is taken or timeout has occurred.
|
||||
func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
|
||||
func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
|
||||
lockSource := getSource(2)
|
||||
start := UTCNow()
|
||||
readLock := false
|
||||
const readLock = false
|
||||
var success []int
|
||||
for i, path := range li.paths {
|
||||
if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
|
||||
if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
|
||||
timeout.LogFailure()
|
||||
for _, sint := range success {
|
||||
li.ns.unlock(li.volume, li.paths[sint], readLock)
|
||||
@ -228,20 +226,20 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error
|
||||
|
||||
// Unlock - block until write lock is released.
|
||||
func (li *localLockInstance) Unlock() {
|
||||
readLock := false
|
||||
const readLock = false
|
||||
for _, path := range li.paths {
|
||||
li.ns.unlock(li.volume, path, readLock)
|
||||
}
|
||||
}
|
||||
|
||||
// RLock - block until read lock is taken or timeout has occurred.
|
||||
func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
|
||||
func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
|
||||
lockSource := getSource(2)
|
||||
start := UTCNow()
|
||||
readLock := true
|
||||
const readLock = true
|
||||
var success []int
|
||||
for i, path := range li.paths {
|
||||
if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
|
||||
if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) {
|
||||
timeout.LogFailure()
|
||||
for _, sint := range success {
|
||||
li.ns.unlock(li.volume, li.paths[sint], readLock)
|
||||
@ -256,7 +254,7 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro
|
||||
|
||||
// RUnlock - block until read lock is released.
|
||||
func (li *localLockInstance) RUnlock() {
|
||||
readLock := true
|
||||
const readLock = true
|
||||
for _, path := range li.paths {
|
||||
li.ns.unlock(li.volume, path, readLock)
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ type ObjectLayer interface {
|
||||
SetDriveCount() int // Only implemented by erasure layer
|
||||
|
||||
// Locking operations on object.
|
||||
NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker
|
||||
NewNSLock(bucket string, objects ...string) RWLocker
|
||||
|
||||
// Storage operations.
|
||||
Shutdown(context.Context) error
|
||||
|
@ -211,7 +211,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
// at a given time, this big transaction lock ensures this
|
||||
// appropriately. This is also true for rotation of encrypted
|
||||
// content.
|
||||
txnLk := newObject.NewNSLock(ctx, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
|
||||
txnLk := newObject.NewNSLock(minioMetaBucket, minioConfigPrefix+"/transaction.lock")
|
||||
|
||||
// **** WARNING ****
|
||||
// Migrating to encrypted backend should happen before initialization of any
|
||||
@ -241,7 +241,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
|
||||
// let one of the server acquire the lock, if not let them timeout.
|
||||
// which shall be retried again by this loop.
|
||||
if err = txnLk.GetLock(lockTimeout); err != nil {
|
||||
if err = txnLk.GetLock(ctx, lockTimeout); err != nil {
|
||||
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
|
||||
|
||||
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
||||
|
@ -141,7 +141,7 @@ const (
|
||||
// algorithm until either the lock is acquired successfully or more
|
||||
// time has elapsed than the timeout value.
|
||||
func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) {
|
||||
restClnts, owner := dm.clnt.GetLockers()
|
||||
restClnts, _ := dm.clnt.GetLockers()
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
@ -149,8 +149,9 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
|
||||
locks := make([]string, len(restClnts))
|
||||
|
||||
log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
|
||||
retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
|
||||
|
||||
// Add total timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
|
||||
defer cancel()
|
||||
|
||||
// Tolerance is not set, defaults to half of the locker clients.
|
||||
@ -175,19 +176,11 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-retryCtx.Done():
|
||||
log("lockBlocking canceled %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
|
||||
|
||||
// Caller context canceled or we timedout,
|
||||
// return false anyways for both situations.
|
||||
|
||||
// make sure to unlock any successful locks, since caller has timedout or canceled the request.
|
||||
releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...)
|
||||
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
default:
|
||||
// Try to acquire the lock.
|
||||
if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked {
|
||||
if locked = lock(ctx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked {
|
||||
dm.m.Lock()
|
||||
|
||||
// If success, copy array to object
|
||||
@ -201,6 +194,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
|
||||
}
|
||||
|
||||
dm.m.Unlock()
|
||||
log("lockBlocking %s/%s for %#v: granted\n", id, source, dm.Names)
|
||||
return locked
|
||||
}
|
||||
|
||||
@ -219,11 +213,12 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
|
||||
|
||||
// Create buffered channel of size equal to total number of nodes.
|
||||
ch := make(chan Granted, len(restClnts))
|
||||
defer close(ch)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for index, c := range restClnts {
|
||||
|
||||
// Combined timout for the lock attempt.
|
||||
ctx, cancel := context.WithTimeout(ctx, DRWMutexAcquireTimeout)
|
||||
defer cancel()
|
||||
for index, c := range restClnts {
|
||||
wg.Add(1)
|
||||
// broadcast lock request to all nodes
|
||||
go func(index int, isReadLock bool, c NetLocker) {
|
||||
@ -231,7 +226,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
|
||||
|
||||
g := Granted{index: index}
|
||||
if c == nil {
|
||||
log("dsync: nil locker")
|
||||
log("dsync: nil locker\n")
|
||||
ch <- g
|
||||
return
|
||||
}
|
||||
@ -247,93 +242,76 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
|
||||
var locked bool
|
||||
var err error
|
||||
if isReadLock {
|
||||
if locked, err = c.RLock(ctx, args); err != nil {
|
||||
if locked, err = c.RLock(context.Background(), args); err != nil {
|
||||
log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c)
|
||||
}
|
||||
} else {
|
||||
if locked, err = c.Lock(ctx, args); err != nil {
|
||||
if locked, err = c.Lock(context.Background(), args); err != nil {
|
||||
log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c)
|
||||
}
|
||||
}
|
||||
|
||||
if locked {
|
||||
g.lockUID = args.UID
|
||||
}
|
||||
|
||||
ch <- g
|
||||
|
||||
}(index, isReadLock, c)
|
||||
}
|
||||
|
||||
quorumLocked := false
|
||||
// Wait until we have either
|
||||
//
|
||||
// a) received all lock responses
|
||||
// b) received too many 'non-'locks for quorum to be still possible
|
||||
// c) timed out
|
||||
//
|
||||
i, locksFailed := 0, 0
|
||||
done := false
|
||||
|
||||
wg.Add(1)
|
||||
go func(isReadLock bool) {
|
||||
defer wg.Done()
|
||||
|
||||
// Wait until we have either
|
||||
//
|
||||
// a) received all lock responses
|
||||
// b) received too many 'non-'locks for quorum to be still possible
|
||||
// c) timedout
|
||||
//
|
||||
i, locksFailed := 0, 0
|
||||
done := false
|
||||
timeout := time.After(DRWMutexAcquireTimeout)
|
||||
|
||||
for ; i < len(restClnts); i++ { // Loop until we acquired all locks
|
||||
|
||||
select {
|
||||
case grant := <-ch:
|
||||
if grant.isLocked() {
|
||||
// Mark that this node has acquired the lock
|
||||
(*locks)[grant.index] = grant.lockUID
|
||||
} else {
|
||||
locksFailed++
|
||||
if locksFailed > tolerance {
|
||||
// We know that we are not going to get the lock anymore,
|
||||
// so exit out and release any locks that did get acquired
|
||||
done = true
|
||||
// Increment the number of grants received from the buffered channel.
|
||||
i++
|
||||
releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
|
||||
}
|
||||
}
|
||||
case <-timeout:
|
||||
done = true
|
||||
// timeout happened, maybe one of the nodes is slow, count
|
||||
// number of locks to check whether we have quorum or not
|
||||
if !checkQuorumLocked(locks, quorum) {
|
||||
log("Quorum not met after timeout\n")
|
||||
releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
|
||||
} else {
|
||||
log("Quorum met after timeout\n")
|
||||
for ; i < len(restClnts); i++ { // Loop until we acquired all locks
|
||||
select {
|
||||
case grant := <-ch:
|
||||
if grant.isLocked() {
|
||||
// Mark that this node has acquired the lock
|
||||
(*locks)[grant.index] = grant.lockUID
|
||||
} else {
|
||||
locksFailed++
|
||||
if locksFailed > tolerance {
|
||||
// We know that we are not going to get the lock anymore,
|
||||
// so exit out and release any locks that did get acquired
|
||||
done = true
|
||||
}
|
||||
}
|
||||
|
||||
if done {
|
||||
break
|
||||
}
|
||||
case <-ctx.Done():
|
||||
done = true
|
||||
log("Timeout\n")
|
||||
}
|
||||
|
||||
// Count locks in order to determine whether we have quorum or not
|
||||
quorumLocked = checkQuorumLocked(locks, quorum)
|
||||
if done {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the other responses and immediately release the locks
|
||||
// (do not add them to the locks array because the DRWMutex could
|
||||
// already has been unlocked again by the original calling thread)
|
||||
for ; i < len(restClnts); i++ {
|
||||
grantToBeReleased := <-ch
|
||||
// Count locks in order to determine whether we have quorum or not
|
||||
quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
|
||||
if !quorumLocked {
|
||||
log("Quorum not met\n")
|
||||
releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
|
||||
}
|
||||
|
||||
// We may have some unused results in ch, release them async.
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
for grantToBeReleased := range ch {
|
||||
if grantToBeReleased.isLocked() {
|
||||
// release lock
|
||||
log("Releasing abandoned lock\n")
|
||||
sendRelease(ds, restClnts[grantToBeReleased.index],
|
||||
owner,
|
||||
grantToBeReleased.lockUID, isReadLock, lockNames...)
|
||||
}
|
||||
}
|
||||
}(isReadLock)
|
||||
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
return quorumLocked
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user