locking: Add Refresh for better locking cleanup (#11535)

Co-authored-by: Anis Elleuch <anis@min.io>
Co-authored-by: Harshavardhana <harsha@minio.io>
Authored by Anis Elleuch on 2021-03-04 03:36:43 +01:00, committed by GitHub
parent 464fa08f2e
commit 7be7109471
23 changed files with 597 additions and 313 deletions

View File

@@ -1348,8 +1348,10 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
defer cancel()
var err error
nsLock := objectAPI.NewNSLock(minioMetaBucket, "health-check-in-progress")
if err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
ctx, err = nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
if err != nil { // returns a locked lock
errResp(err)
return
}
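
This hunk shows the calling convention that the rest of the commit repeats: GetLock and GetRLock now return a context alongside the error, and the handler is expected to carry on with that returned context rather than its original one. A minimal sketch of the intended usage, assuming the usual deferred Unlock that is not visible in this hunk:

    // Sketch only: acquire the lock, switch to the returned context, and
    // release the lock when done. The returned ctx is canceled by the lock
    // layer if the lock can no longer be refreshed.
    ctx, err = nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
    if err != nil {
        errResp(err)
        return
    }
    defer nsLock.Unlock()
    // ... all further work in this handler should use the returned ctx ...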

View File

@@ -70,11 +70,12 @@ func initDataScanner(ctx context.Context, objAPI ObjectLayer) {
// The function will block until the context is canceled.
// There should only ever be one scanner running per cluster.
func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
var err error
// Make sure only 1 scanner is running on the cluster.
locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
err := locker.GetLock(ctx, dataScannerLeaderLockTimeout)
ctx, err = locker.GetLock(ctx, dataScannerLeaderLockTimeout)
if err != nil {
time.Sleep(time.Duration(r.Float64() * float64(dataScannerStartDelay)))
continue

View File

@@ -437,7 +437,7 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
cLock := c.NewNSLockFn(cacheObjPath)
if err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
if ctx, err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
return
}
@@ -515,9 +515,11 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
// saves object metadata to disk cache
// incHitsOnly is true if metadata update is incrementing only the hit counter
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
var err error
cachedPath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cachedPath)
if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = cLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
defer cLock.Unlock()
@@ -694,7 +696,8 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
}
cachePath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cachePath)
if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = cLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer cLock.Unlock()
@@ -908,7 +911,8 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cacheObjPath)
if err := cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = cLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, numHits, err
}
@@ -972,7 +976,8 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
// Deletes the cached object
func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
cLock := c.NewNSLockFn(cacheObjPath)
if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil {
_, err = cLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
defer cLock.Unlock()
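
Note that delete() deliberately discards the returned context (`_, err =`): once the lock is held, the cached object is removed immediately and there is no long-running work left that would need to observe a lost lock.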

View File

@@ -378,7 +378,8 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
// Implements S3 compatible Upload Part API.
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
readLocked := true
@@ -500,7 +501,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// PutObjectParts would serialize here updating `xl.meta`
uploadIDLock.RUnlock()
readLocked = false
if err = uploadIDLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = uploadIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
defer uploadIDLock.Unlock()
@@ -581,8 +583,10 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
UploadID: uploadID,
}
var err error
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return MultipartInfo{}, err
}
defer uploadIDLock.RUnlock()
@@ -628,9 +632,10 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is marshaled directly into XML and
// replied back to the client.
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) {
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ListPartsInfo{}, err
}
defer uploadIDLock.RUnlock()
@@ -723,7 +728,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Hold read-locks to verify uploaded parts; this also disallows
// parallel part uploads.
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer uploadIDLock.RUnlock()
@@ -878,7 +884,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Hold namespace to complete the transaction
lk := er.NewNSLock(bucket, object)
if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer lk.Unlock()
@@ -918,9 +925,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// All parts are purged from all disks and reference to the uploadID
// would be removed from the system, rollback is not possible on this
// operation.
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
defer lk.Unlock()

View File

@@ -45,7 +45,7 @@ var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform
// CopyObject - copy source object to destination object.
// if source object and destination object are same we only
// update metadata.
func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
// This call shouldn't be used for anything other than metadata updates or adding self referential versions.
if !srcInfo.metadataOnly {
return oi, NotImplemented{}
@@ -54,7 +54,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))
lk := er.NewNSLock(dstBucket, dstObject)
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer lk.Unlock()
@@ -147,12 +148,14 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
lock := er.NewNSLock(bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
@@ -208,10 +211,11 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
// Lock the object before reading.
lk := er.NewNSLock(bucket, object)
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
defer lk.RUnlock()
@@ -375,7 +379,8 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
if !opts.NoLock {
// Lock the object before reading.
lk := er.NewNSLock(bucket, object)
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
defer lk.RUnlock()
@@ -726,8 +731,10 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
}
if !opts.NoLock {
var err error
lk := er.NewNSLock(bucket, object)
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
@@ -1036,7 +1043,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
// Acquire a write lock before deleting the object.
lk := er.NewNSLock(bucket, object)
if err = lk.GetLock(ctx, globalDeleteOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalDeleteOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
@@ -1145,9 +1153,11 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) {
// PutObjectTags - replace or add tags to an existing object
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
var err error
// Lock the object before updating tags.
lk := er.NewNSLock(bucket, object)
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()

View File

@@ -570,12 +570,14 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
lock := z.NewNSLock(bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
@@ -637,7 +639,8 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
// Lock the object before reading.
lk := z.NewNSLock(bucket, object)
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
defer lk.RUnlock()
@@ -749,9 +752,11 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
}
}
var err error
// Acquire a bulk write lock across 'objects'
multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
if err := multiDeleteLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = multiDeleteLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
for i := range derrs {
derrs[i] = err
}
@@ -1312,9 +1317,11 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context) (buckets []BucketI
}
func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
var err error
// Acquire lock on format.json
formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = formatLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return madmin.HealResultItem{}, err
}
defer formatLock.Unlock()
@@ -1512,17 +1519,20 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
object = encodeDirObject(object)
var err error
lk := z.NewNSLock(bucket, object)
if bucket == minioMetaBucket {
// For .minio.sys bucket heals we should hold write locks.
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return madmin.HealResultItem{}, err
}
defer lk.Unlock()
} else {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.meta of outdated disks.
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return madmin.HealResultItem{}, err
}
defer lk.RUnlock()

View File

@@ -709,7 +709,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// Hold write lock on the object.
destLock := fs.NewNSLock(bucket, object)
if err = destLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = destLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer destLock.Unlock()

View File

@@ -595,7 +595,7 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelet
// CopyObject - copy source object to destination object.
// if source object and destination object are same we only
// update metadata.
func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
if srcOpts.VersionID != "" && srcOpts.VersionID != nullVersionID {
return oi, VersionNotFound{
Bucket: srcBucket,
@@ -609,7 +609,8 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
if !cpSrcDstSame {
objectDWLock := fs.NewNSLock(dstBucket, dstObject)
if err := objectDWLock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = objectDWLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer objectDWLock.Unlock()
@@ -702,12 +703,14 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
lock := fs.NewNSLock(bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
@@ -976,10 +979,11 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
}
// getObjectInfoWithLock - reads object metadata and replies back ObjectInfo.
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
// Lock the object before reading.
lk := fs.NewNSLock(bucket, object)
if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
defer lk.RUnlock()
@@ -1017,7 +1021,8 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
if err == errCorruptedFormat || err == io.EOF {
lk := fs.NewNSLock(bucket, object)
if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
@@ -1057,7 +1062,7 @@ func (fs *FSObjects) parentDirIsObject(ctx context.Context, bucket, parent strin
// until EOF, writes data directly to configured filesystem path.
// Additionally writes `fs.json` which carries the necessary metadata
// for future object operations.
func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) {
func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if opts.Versioned {
return objInfo, NotImplemented{}
}
@@ -1068,7 +1073,8 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
// Lock the object.
lk := fs.NewNSLock(bucket, object)
if err := lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, err
}
@@ -1243,7 +1249,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
// Acquire a write lock before deleting the object.
lk := fs.NewNSLock(bucket, object)
if err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return objInfo, err
}
defer lk.Unlock()

View File

@@ -594,7 +594,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
for {
// let one of the servers acquire the lock; if not, let it time out,
// and the attempt will be retried by this loop.
if err := txnLk.GetLock(retryCtx, iamLockTimeout); err != nil {
if _, err := txnLk.GetLock(retryCtx, iamLockTimeout); err != nil {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue

View File

@@ -27,13 +27,13 @@ import (
// lockRequesterInfo stores various info from the client for each lock that is requested.
type lockRequesterInfo struct {
Name string // name of the resource lock was requested for
Writer bool // Bool whether write or read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastCheck time.Time // Timestamp for last check of validity of lock.
Source string // Contains line, function and filename requesting the lock.
Group bool // indicates if it was a group lock.
Name string // name of the resource lock was requested for
Writer bool // Bool whether write or read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastRefresh time.Time // Timestamp for last lock refresh.
Source string // Contains line, function and filename requesting the lock.
Group bool // indicates if it was a group lock.
// Owner represents the UUID of the owner who originally requested the lock
// useful in expiry.
Owner string
@@ -93,15 +93,15 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
for _, resource := range args.Resources {
l.lockMap[resource] = []lockRequesterInfo{
{
Name: resource,
Writer: true,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Group: len(args.Resources) > 1,
Quorum: args.Quorum,
Name: resource,
Writer: true,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Group: len(args.Resources) > 1,
Quorum: args.Quorum,
},
}
}
@@ -154,14 +154,14 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
defer l.mutex.Unlock()
resource := args.Resources[0]
lrInfo := lockRequesterInfo{
Name: resource,
Writer: false,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Quorum: args.Quorum,
Name: resource,
Writer: false,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Quorum: args.Quorum,
}
if lri, ok := l.lockMap[resource]; ok {
if reply = !isWriteLock(lri); reply {
@@ -236,7 +236,7 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep
}
}
func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
@@ -244,45 +244,39 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0] // expiry check is always per resource.
resource := args.Resources[0] // refresh check is always per resource.
// Lock found, proceed to verify if belongs to given uid.
lri, ok := l.lockMap[resource]
if !ok {
// lock doesn't exist yet, no reason to
// expire something that doesn't exist - it may be
// racing with other active lock requests.
// lock doesn't exist yet, return false
return false, nil
}
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == args.UID && entry.Owner == args.Owner {
ep := globalRemoteEndpoints[args.Owner]
if !ep.IsLocal {
// check if the owner is online
return isServerResolvable(ep, 3*time.Second) != nil, nil
}
return false, nil
for i := range lri {
if lri[i].UID == args.UID && lri[i].Owner == args.Owner {
lri[i].TimeLastRefresh = UTCNow()
return true, nil
}
}
return true, nil
return false, nil
}
}
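
Refresh inverts the old liveness model: instead of each lock server asking the owner whether a lock has expired (the removed Expired path), the owner is now responsible for periodically refreshing its locks so that TimeLastRefresh stays current. The refresh loop itself lives in the dsync client rather than in this file; the following is only a hypothetical sketch of such a loop (the helper name, the 10-second interval, and the cancel-on-first-miss behavior are assumptions for illustration; a real implementation would also track quorum):

    // Hypothetical sketch: keep a held lock alive by refreshing it on every
    // locker until the owning operation finishes or the lock is lost.
    func keepLockAlive(ctx context.Context, lockers []dsync.NetLocker, args dsync.LockArgs, cancel context.CancelFunc) {
        t := time.NewTicker(10 * time.Second) // assumed interval, well below lockValidityDuration
        defer t.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-t.C:
                for _, c := range lockers {
                    if refreshed, err := c.Refresh(ctx, args); err == nil && !refreshed {
                        // The locker no longer knows this lock; give up and cancel
                        // the context that GetLock handed back to the caller.
                        cancel()
                        return
                    }
                }
            }
        }
    }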
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
// Caller must hold 'l.mutex' lock.
func (l *localLocker) removeEntryIfExists(lrip lockRequesterInfo) {
func (l *localLocker) expireOldLocks(interval time.Duration) {
l.mutex.Lock()
defer l.mutex.Unlock()
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
if lri, ok := l.lockMap[lrip.Name]; ok {
// Even if the entry exists, it may not be the same entry that was
// considered expired, so we simply attempt to remove it; if that is
// not possible there is nothing we need to do.
l.removeEntry(lrip.Name, dsync.LockArgs{Owner: lrip.Owner, UID: lrip.UID}, &lri)
for _, lris := range l.lockMap {
for _, lri := range lris {
if time.Since(lri.TimeLastRefresh) > interval {
l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris)
}
}
}
}

View File

@@ -43,8 +43,8 @@ func toLockError(err error) error {
switch err.Error() {
case errLockConflict.Error():
return errLockConflict
case errLockNotExpired.Error():
return errLockNotExpired
case errLockNotFound.Error():
return errLockNotFound
}
return err
}
@@ -103,7 +103,7 @@ func (client *lockRESTClient) restCall(ctx context.Context, call string, args ds
switch err {
case nil:
return true, nil
case errLockConflict, errLockNotExpired:
case errLockConflict, errLockNotFound:
return false, nil
default:
return false, err
@@ -125,16 +125,16 @@ func (client *lockRESTClient) RUnlock(args dsync.LockArgs) (reply bool, err erro
return client.restCall(context.Background(), lockRESTMethodRUnlock, args)
}
// Refresh calls refresh REST API.
func (client *lockRESTClient) Refresh(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
return client.restCall(ctx, lockRESTMethodRefresh, args)
}
// Unlock calls write unlock RPC.
func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error) {
return client.restCall(context.Background(), lockRESTMethodUnlock, args)
}
// Expired calls expired handler to check if lock args have expired.
func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
return client.restCall(ctx, lockRESTMethodExpired, args)
}
// ForceUnlock calls force unlock handler to forcibly unlock an active lock.
func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
return client.restCall(ctx, lockRESTMethodForceUnlock, args)

View File

@@ -21,18 +21,18 @@ import (
)
const (
lockRESTVersion = "v5" // Add Quorum query param
lockRESTVersion = "v6" // Add Refresh API
lockRESTVersionPrefix = SlashSeparator + lockRESTVersion
lockRESTPrefix = minioReservedBucketPath + "/lock"
)
const (
lockRESTMethodHealth = "/health"
lockRESTMethodRefresh = "/refresh"
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
lockRESTMethodExpired = "/expired"
lockRESTMethodForceUnlock = "/force-unlock"
// lockRESTOwner represents owner UUID
@@ -52,6 +52,6 @@ const (
var (
errLockConflict = errors.New("lock conflict")
errLockNotExpired = errors.New("lock not expired")
errLockNotInitialized = errors.New("lock not initialized")
errLockNotFound = errors.New("lock not found")
)

View File

@@ -55,18 +55,18 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
defer os.RemoveAll(testPath)
lockRequesterInfo1 := lockRequesterInfo{
Owner: "owner",
Writer: true,
UID: "0123-4567",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Owner: "owner",
Writer: true,
UID: "0123-4567",
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
}
lockRequesterInfo2 := lockRequesterInfo{
Owner: "owner",
Writer: true,
UID: "89ab-cdef",
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Owner: "owner",
Writer: true,
UID: "89ab-cdef",
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
}
locker.ll.lockMap["name"] = []lockRequesterInfo{

View File

@@ -20,11 +20,9 @@ import (
"bufio"
"context"
"errors"
"math/rand"
"net/http"
"sort"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
@@ -35,8 +33,8 @@ const (
// Lock maintenance interval.
lockMaintenanceInterval = 1 * time.Minute
// Lock validity check interval.
lockValidityCheckInterval = 30 * time.Second
// Lock validity duration
lockValidityDuration = 20 * time.Second
)
// To abstract a node over network.
@@ -96,6 +94,31 @@ func (l *lockRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
l.IsValid(w, r)
}
// RefreshHandler - refresh the current lock
func (l *lockRESTServer) RefreshHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}
refreshed, err := l.ll.Refresh(r.Context(), args)
if err != nil {
l.writeErrorResponse(w, err)
return
}
if !refreshed {
l.writeErrorResponse(w, errLockNotFound)
return
}
}
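
The round trip fits together as follows: when the entry is gone the handler replies with errLockNotFound, the REST client earlier in this commit maps that error to (false, nil), and the dsync layer can then treat the lock as lost, presumably canceling the context it returned from GetLock.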
// LockHandler - Acquires a lock.
func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
@@ -204,148 +227,17 @@ func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Reque
}
}
// ExpiredHandler - query expired lock status.
func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}
expired, err := l.ll.Expired(r.Context(), args)
if err != nil {
l.writeErrorResponse(w, err)
return
}
if !expired {
l.writeErrorResponse(w, errLockNotExpired)
return
}
}
// getLongLivedLocks returns locks that are older than a certain time and
// have not been 'checked' for validity recently.
func getLongLivedLocks(interval time.Duration) []lockRequesterInfo {
lrips := []lockRequesterInfo{}
globalLockServer.mutex.Lock()
for _, lriArray := range globalLockServer.lockMap {
for idx := range lriArray {
// Check whether enough time has gone by since last check
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
lrips = append(lrips, lriArray[idx])
lriArray[idx].TimeLastCheck = UTCNow()
}
}
}
globalLockServer.mutex.Unlock()
return lrips
}
// lockMaintenance loops over locks that have been active for some time and checks back
// with the original server whether it is still alive or not
//
// Following logic inside ignores the errors generated for Dsync.Active operation.
// - server at the client is down
// - some network error (and server is up normally)
//
// We will ignore the error and retry later to resolve this lock
func lockMaintenance(ctx context.Context, interval time.Duration) {
objAPI := newObjectLayerFn()
if objAPI == nil {
return
}
z, ok := objAPI.(*erasureServerPools)
if !ok {
return
}
type nlock struct {
locks int
writer bool
}
updateNlocks := func(nlripsMap map[string]nlock, name string, writer bool) {
nlk, ok := nlripsMap[name]
if ok {
nlk.locks++
nlripsMap[name] = nlk
} else {
nlripsMap[name] = nlock{
locks: 1,
writer: writer,
}
}
}
// Validate if long lived locks are indeed clean.
// Get list of long lived locks to check for staleness.
lrips := getLongLivedLocks(interval)
lripsMap := make(map[string]nlock, len(lrips))
var mutex sync.Mutex // mutex for lripsMap updates
for _, lrip := range lrips {
// fetch the lockers that participated in handing
// out locks for `lrip.Name`
var lockers []dsync.NetLocker
if lrip.Group {
lockers, _ = z.serverPools[0].getHashedSet("").getLockers()
} else {
_, objName := path2BucketObject(lrip.Name)
lockers, _ = z.serverPools[0].getHashedSet(objName).getLockers()
}
var wg sync.WaitGroup
wg.Add(len(lockers))
for _, c := range lockers {
go func(lrip lockRequesterInfo, c dsync.NetLocker) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// Call back to all participating servers, verify
// if each of those servers think lock is still
// active, if not expire it.
expired, err := c.Expired(ctx, dsync.LockArgs{
Owner: lrip.Owner,
UID: lrip.UID,
Resources: []string{lrip.Name},
})
cancel()
if err != nil {
mutex.Lock()
updateNlocks(lripsMap, lrip.Name, lrip.Writer)
mutex.Unlock()
return
}
if !expired {
mutex.Lock()
updateNlocks(lripsMap, lrip.Name, lrip.Writer)
mutex.Unlock()
}
}(lrip, c)
}
wg.Wait()
// If fewer than quorum servers still consider the lock active, it has expired.
if lripsMap[lrip.Name].locks < lrip.Quorum {
// Purge the stale entry if it exists.
globalLockServer.removeEntryIfExists(lrip)
}
}
}
// Start lock maintenance from all lock servers.
func startLockMaintenance(ctx context.Context) {
// lockMaintenance loops over all locks and discards locks
// that have not been refreshed for some time.
func lockMaintenance(ctx context.Context) {
// Wait until the object API is ready
// no need to start the lock maintenance
// if ObjectAPI is not initialized.
var objAPI ObjectLayer
for {
objAPI := newObjectLayerFn()
objAPI = newObjectLayerFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
@@ -353,17 +245,15 @@ func startLockMaintenance(ctx context.Context) {
break
}
if _, ok := objAPI.(*erasureServerPools); !ok {
return
}
// Initialize a new ticker with 1 minute between each ticks.
lkTimer := time.NewTimer(lockMaintenanceInterval)
// Stop the timer upon returning.
defer lkTimer.Stop()
r := rand.New(rand.NewSource(UTCNow().UnixNano()))
// Start with random sleep time, so as to avoid
// "synchronous checks" between servers
time.Sleep(time.Duration(r.Float64() * float64(lockMaintenanceInterval)))
for {
// Verifies every minute for locks held more than 2 minutes.
select {
@@ -371,9 +261,9 @@ func startLockMaintenance(ctx context.Context) {
return
case <-lkTimer.C:
// Reset the timer for next cycle.
lkTimer.Reset(time.Duration(r.Float64() * float64(lockMaintenanceInterval)))
lkTimer.Reset(lockMaintenanceInterval)
lockMaintenance(ctx, lockValidityCheckInterval)
globalLockServer.expireOldLocks(lockValidityDuration)
}
}
}
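
With the new constants, an entry whose holder stops refreshing becomes eligible for removal after lockValidityDuration (20 seconds) and is actually dropped on the next lockMaintenanceInterval tick (every minute), so a dead client's locks linger for roughly 20 to 80 seconds instead of requiring a round of Expired RPCs to every locker for each long-lived lock.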
@@ -386,14 +276,14 @@ func registerLockRESTHandlers(router *mux.Router) {
subrouter := router.PathPrefix(lockRESTPrefix).Subrouter()
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceAll(lockServer.ForceUnlockHandler))
globalLockServer = lockServer.ll
go startLockMaintenance(GlobalContext)
go lockMaintenance(GlobalContext)
}

View File

@@ -38,9 +38,9 @@ var globalLockServer *localLocker
// RWLocker - locker interface to introduce GetRLock, RUnlock.
type RWLocker interface {
GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error)
GetLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, timedOutErr error)
Unlock()
GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error)
GetRLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, timedOutErr error)
RUnlock()
}
@@ -142,18 +142,19 @@ type distLockInstance struct {
}
// Lock - block until write lock is taken or timeout has occurred.
func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, error) {
lockSource := getSource(2)
start := UTCNow()
if !di.rwMutex.GetLock(ctx, di.opsID, lockSource, dsync.Options{
newCtx, cancel := context.WithCancel(ctx)
if !di.rwMutex.GetLock(newCtx, cancel, di.opsID, lockSource, dsync.Options{
Timeout: timeout.Timeout(),
}) {
timeout.LogFailure()
return OperationTimedOut{}
return ctx, OperationTimedOut{}
}
timeout.LogSuccess(UTCNow().Sub(start))
return nil
return newCtx, nil
}
// Unlock - block until write lock is released.
@@ -162,18 +163,19 @@ func (di *distLockInstance) Unlock() {
}
// RLock - block until read lock is taken or timeout has occurred.
func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, error) {
lockSource := getSource(2)
start := UTCNow()
if !di.rwMutex.GetRLock(ctx, di.opsID, lockSource, dsync.Options{
newCtx, cancel := context.WithCancel(ctx)
if !di.rwMutex.GetRLock(ctx, cancel, di.opsID, lockSource, dsync.Options{
Timeout: timeout.Timeout(),
}) {
timeout.LogFailure()
return OperationTimedOut{}
return ctx, OperationTimedOut{}
}
timeout.LogSuccess(UTCNow().Sub(start))
return nil
return newCtx, nil
}
// RUnlock - block until read lock is released.
@@ -205,7 +207,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
}
// Lock - block until write lock is taken or timeout has occurred.
func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, timedOutErr error) {
lockSource := getSource(2)
start := UTCNow()
const readLock = false
@@ -216,12 +218,12 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou
for _, sint := range success {
li.ns.unlock(li.volume, li.paths[sint], readLock)
}
return OperationTimedOut{}
return nil, OperationTimedOut{}
}
success = append(success, i)
}
timeout.LogSuccess(UTCNow().Sub(start))
return
return ctx, nil
}
// Unlock - block until write lock is released.
@@ -233,7 +235,7 @@ func (li *localLockInstance) Unlock() {
}
// RLock - block until read lock is taken or timeout has occurred.
func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) {
func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, timedOutErr error) {
lockSource := getSource(2)
start := UTCNow()
const readLock = true
@@ -244,12 +246,12 @@ func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeo
for _, sint := range success {
li.ns.unlock(li.volume, li.paths[sint], readLock)
}
return OperationTimedOut{}
return nil, OperationTimedOut{}
}
success = append(success, i)
}
timeout.LogSuccess(UTCNow().Sub(start))
return
return ctx, nil
}
// RUnlock - block until read lock is released.
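
The reason GetLock and GetRLock now return a context: distLockInstance derives newCtx with a cancel function and hands that cancel to dsync, so the lock layer can cancel the returned context if it later fails to keep the lock refreshed. A minimal sketch of how a caller might structure long-running work against the returned context (a hypothetical helper, not part of this change):

    // Hypothetical sketch: do work under the context returned by GetLock;
    // if the lock is lost, that context is canceled and the work stops early.
    func doLockedWork(ctx context.Context, lk RWLocker, timeout *dynamicTimeout) error {
        lkctx, err := lk.GetLock(ctx, timeout)
        if err != nil {
            return err
        }
        defer lk.Unlock()
        for i := 0; i < 10; i++ { // ten arbitrary units of work
            select {
            case <-lkctx.Done():
                // Lock could not be refreshed; abandon the remaining work.
                return lkctx.Err()
            default:
            }
            // ... perform one unit of work using lkctx ...
        }
        return nil
    }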

View File

@@ -276,7 +276,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
// let one of the servers acquire the lock; if not, let it time out,
// and the attempt will be retried by this loop.
if err = txnLk.GetLock(ctx, lockTimeout); err != nil {
if _, err = txnLk.GetLock(ctx, lockTimeout); err != nil {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))

View File

@@ -56,6 +56,12 @@ func (p *xlStorageDiskIDCheck) Healing() bool {
}
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
select {
case <-ctx.Done():
return dataUsageCache{}, ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return dataUsageCache{}, err
}
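
The same non-blocking select is added at the top of every storage call in this wrapper (the remaining hunks in this file all follow the pattern shown here), so once a lock-derived context is canceled the in-flight erasure operations fail fast with ctx.Err() instead of continuing to touch the disks.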
@@ -93,6 +99,12 @@ func (p *xlStorageDiskIDCheck) checkDiskStale() error {
}
func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
select {
case <-ctx.Done():
return DiskInfo{}, ctx.Err()
default:
}
info, err = p.storage.DiskInfo(ctx)
if err != nil {
return info, err
@@ -108,6 +120,12 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err
}
func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -115,6 +133,12 @@ func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...strin
}
func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -122,6 +146,12 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err
}
func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return nil, err
}
@@ -129,6 +159,12 @@ func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error)
}
func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
select {
case <-ctx.Done():
return VolInfo{}, ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return vol, err
}
@@ -136,6 +172,12 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol
}
func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -143,6 +185,12 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
}
func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return nil, err
}
@@ -150,6 +198,12 @@ func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath
}
func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return nil, err
}
@@ -158,6 +212,12 @@ func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath stri
}
func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return 0, err
}
@@ -166,6 +226,12 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
}
func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -174,6 +240,12 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa
}
func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return err
}
@@ -182,6 +254,12 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri
}
func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return nil, err
}
@@ -190,6 +268,12 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
}
func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return err
}
@@ -198,6 +282,12 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
}
func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return err
}
@@ -206,6 +296,12 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
}
func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -214,6 +310,12 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
}
func (p *xlStorageDiskIDCheck) CheckFile(ctx context.Context, volume string, path string) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -222,6 +324,12 @@ func (p *xlStorageDiskIDCheck) CheckFile(ctx context.Context, volume string, pat
}
func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -241,6 +349,12 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
}
func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := p.checkDiskStale(); err != nil {
return err
}
@@ -249,6 +363,12 @@ func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path stri
}
func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -257,6 +377,12 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path
}
func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -265,6 +391,12 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
}
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return err
}
@@ -273,6 +405,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
}
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
select {
case <-ctx.Done():
return fi, ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return fi, err
}
@@ -281,6 +419,12 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve
}
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return nil, err
}