mirror of
https://github.com/minio/minio.git
synced 2025-01-11 06:53:22 -05:00
lock: Always cancel the returned Get(R)Lock context (#12162)
* lock: Always cancel the returned Get(R)Lock context There is a leak with cancel created inside the locking mechanism. The cancel purpose was to cancel operations such erasure get/put that are holding non-refreshable locks. This PR will ensure the created context.Cancel is passed to the unlock API so it will cleanup and avoid leaks. * locks: Avoid returning nil cancel in local lockers Since there is no Refresh mechanism in the local locking mechanism, we do not generate a new context or cancel. Currently, a nil cancel function is returned but this can cause a crash. Return a dummy function instead.
This commit is contained in:
parent
fbdfa11f76
commit
9e797532dc
@ -1412,17 +1412,16 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
|
deadlinedCtx, deadlineCancel := context.WithTimeout(ctx, deadline)
|
||||||
defer cancel()
|
defer deadlineCancel()
|
||||||
|
|
||||||
var err error
|
|
||||||
nsLock := objectAPI.NewNSLock(minioMetaBucket, "health-check-in-progress")
|
nsLock := objectAPI.NewNSLock(minioMetaBucket, "health-check-in-progress")
|
||||||
ctx, err = nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
|
ctx, cancel, err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline))
|
||||||
if err != nil { // returns a locked lock
|
if err != nil { // returns a locked lock
|
||||||
errResp(err)
|
errResp(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer nsLock.Unlock()
|
defer nsLock.Unlock(cancel)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(healthInfoCh)
|
defer close(healthInfoCh)
|
||||||
|
@ -92,16 +92,18 @@ func (s *safeDuration) Get() time.Duration {
|
|||||||
// The function will block until the context is canceled.
|
// The function will block until the context is canceled.
|
||||||
// There should only ever be one scanner running per cluster.
|
// There should only ever be one scanner running per cluster.
|
||||||
func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
||||||
var err error
|
|
||||||
// Make sure only 1 scanner is running on the cluster.
|
// Make sure only 1 scanner is running on the cluster.
|
||||||
locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
|
locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
for {
|
for {
|
||||||
ctx, err = locker.GetLock(ctx, dataScannerLeaderLockTimeout)
|
var err error
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel, err = locker.GetLock(ctx, dataScannerLeaderLockTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
time.Sleep(time.Duration(r.Float64() * float64(scannerCycle.Get())))
|
time.Sleep(time.Duration(r.Float64() * float64(scannerCycle.Get())))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
defer cancel()
|
||||||
break
|
break
|
||||||
// No unlock for "leader" lock.
|
// No unlock for "leader" lock.
|
||||||
}
|
}
|
||||||
|
@ -438,13 +438,13 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
|
|||||||
// statCachedMeta returns metadata from cache - including ranges cached, partial to indicate
|
// statCachedMeta returns metadata from cache - including ranges cached, partial to indicate
|
||||||
// if partial object is cached.
|
// if partial object is cached.
|
||||||
func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
|
func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
|
||||||
|
|
||||||
cLock := c.NewNSLockFn(cacheObjPath)
|
cLock := c.NewNSLockFn(cacheObjPath)
|
||||||
if ctx, err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
ctx, cancel, err := cLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer cLock.RUnlock()
|
defer cLock.RUnlock(cancel)
|
||||||
return c.statCache(ctx, cacheObjPath)
|
return c.statCache(ctx, cacheObjPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -518,14 +518,13 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
|
|||||||
// saves object metadata to disk cache
|
// saves object metadata to disk cache
|
||||||
// incHitsOnly is true if metadata update is incrementing only the hit counter
|
// incHitsOnly is true if metadata update is incrementing only the hit counter
|
||||||
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
|
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
|
||||||
var err error
|
|
||||||
cachedPath := getCacheSHADir(c.dir, bucket, object)
|
cachedPath := getCacheSHADir(c.dir, bucket, object)
|
||||||
cLock := c.NewNSLockFn(cachedPath)
|
cLock := c.NewNSLockFn(cachedPath)
|
||||||
ctx, err = cLock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer cLock.Unlock()
|
defer cLock.Unlock(cancel)
|
||||||
return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
|
return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -699,11 +698,11 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
|
|||||||
}
|
}
|
||||||
cachePath := getCacheSHADir(c.dir, bucket, object)
|
cachePath := getCacheSHADir(c.dir, bucket, object)
|
||||||
cLock := c.NewNSLockFn(cachePath)
|
cLock := c.NewNSLockFn(cachePath)
|
||||||
ctx, err = cLock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer cLock.Unlock()
|
defer cLock.Unlock(cancel)
|
||||||
|
|
||||||
meta, _, numHits, err := c.statCache(ctx, cachePath)
|
meta, _, numHits, err := c.statCache(ctx, cachePath)
|
||||||
// Case where object not yet cached
|
// Case where object not yet cached
|
||||||
@ -914,12 +913,12 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
|
|||||||
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
|
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
|
||||||
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
|
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
|
||||||
cLock := c.NewNSLockFn(cacheObjPath)
|
cLock := c.NewNSLockFn(cacheObjPath)
|
||||||
ctx, err = cLock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := cLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, numHits, err
|
return nil, numHits, err
|
||||||
}
|
}
|
||||||
|
defer cLock.RUnlock(cancel)
|
||||||
|
|
||||||
defer cLock.RUnlock()
|
|
||||||
var objInfo ObjectInfo
|
var objInfo ObjectInfo
|
||||||
var rngInfo RangeInfo
|
var rngInfo RangeInfo
|
||||||
if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil {
|
if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil {
|
||||||
@ -979,11 +978,11 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
|
|||||||
// Deletes the cached object
|
// Deletes the cached object
|
||||||
func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
|
func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
|
||||||
cLock := c.NewNSLockFn(cacheObjPath)
|
cLock := c.NewNSLockFn(cacheObjPath)
|
||||||
_, err = cLock.GetLock(ctx, globalOperationTimeout)
|
_, cancel, err := cLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer cLock.Unlock()
|
defer cLock.Unlock(cancel)
|
||||||
return removeAll(cacheObjPath)
|
return removeAll(cacheObjPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,11 +253,12 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !opts.NoLock {
|
if !opts.NoLock {
|
||||||
|
var cancel context.CancelFunc
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
if ctx, err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
if ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-read when we have lock...
|
// Re-read when we have lock...
|
||||||
|
@ -378,21 +378,21 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
|||||||
// Implements S3 compatible Upload Part API.
|
// Implements S3 compatible Upload Part API.
|
||||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
rctx, rcancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
readLocked := true
|
readLocked := true
|
||||||
defer func() {
|
defer func() {
|
||||||
if readLocked {
|
if readLocked {
|
||||||
uploadIDLock.RUnlock()
|
uploadIDLock.RUnlock(rcancel)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
data := r.Reader
|
data := r.Reader
|
||||||
// Validate input data size and it can never be less than zero.
|
// Validate input data size and it can never be less than zero.
|
||||||
if data.Size() < -1 {
|
if data.Size() < -1 {
|
||||||
logger.LogIf(ctx, errInvalidArgument, logger.Application)
|
logger.LogIf(rctx, errInvalidArgument, logger.Application)
|
||||||
return pi, toObjectErr(errInvalidArgument)
|
return pi, toObjectErr(errInvalidArgument)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,23 +401,23 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||||
|
|
||||||
// Validates if upload ID exists.
|
// Validates if upload ID exists.
|
||||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object, uploadID)
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
|
|
||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
|
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
||||||
uploadIDPath, "", false)
|
uploadIDPath, "", false)
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
if reducedErr == errErasureWriteQuorum {
|
if reducedErr == errErasureWriteQuorum {
|
||||||
return pi, toObjectErr(reducedErr, bucket, object)
|
return pi, toObjectErr(reducedErr, bucket, object)
|
||||||
}
|
}
|
||||||
@ -426,7 +426,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||||
|
|
||||||
// Pick one from the first valid metadata.
|
// Pick one from the first valid metadata.
|
||||||
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
|
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, err
|
return pi, err
|
||||||
}
|
}
|
||||||
@ -448,7 +448,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
erasure, err := NewErasure(rctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
@ -485,7 +485,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
n, err := erasure.Encode(rctx, data, writers, buffer, writeQuorum)
|
||||||
closeBitrotWriters(writers)
|
closeBitrotWriters(writers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
@ -505,29 +505,29 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
|
|
||||||
// Unlock here before acquiring write locks all concurrent
|
// Unlock here before acquiring write locks all concurrent
|
||||||
// PutObjectParts would serialize here updating `xl.meta`
|
// PutObjectParts would serialize here updating `xl.meta`
|
||||||
uploadIDLock.RUnlock()
|
uploadIDLock.RUnlock(rcancel)
|
||||||
readLocked = false
|
readLocked = false
|
||||||
ctx, err = uploadIDLock.GetLock(ctx, globalOperationTimeout)
|
wctx, wcancel, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
defer uploadIDLock.Unlock()
|
defer uploadIDLock.Unlock(wcancel)
|
||||||
|
|
||||||
// Validates if upload ID exists.
|
// Validates if upload ID exists.
|
||||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object, uploadID)
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rename temporary part file to its final location.
|
// Rename temporary part file to its final location.
|
||||||
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
||||||
onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read metadata again because it might be updated with parallel upload of another part.
|
// Read metadata again because it might be updated with parallel upload of another part.
|
||||||
partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||||
reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
if reducedErr == errErasureWriteQuorum {
|
if reducedErr == errErasureWriteQuorum {
|
||||||
return pi, toObjectErr(reducedErr, bucket, object)
|
return pi, toObjectErr(reducedErr, bucket, object)
|
||||||
}
|
}
|
||||||
@ -536,7 +536,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
|
onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
|
||||||
|
|
||||||
// Pick one from the first valid metadata.
|
// Pick one from the first valid metadata.
|
||||||
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
|
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, err
|
return pi, err
|
||||||
}
|
}
|
||||||
@ -564,7 +564,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Writes update `xl.meta` format for each disk.
|
// Writes update `xl.meta` format for each disk.
|
||||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
||||||
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -591,13 +591,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
|||||||
UploadID: uploadID,
|
UploadID: uploadID,
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return MultipartInfo{}, err
|
return MultipartInfo{}, err
|
||||||
}
|
}
|
||||||
defer uploadIDLock.RUnlock()
|
defer uploadIDLock.RUnlock(cancel)
|
||||||
|
|
||||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||||
return result, toObjectErr(err, bucket, object, uploadID)
|
return result, toObjectErr(err, bucket, object, uploadID)
|
||||||
@ -642,11 +641,11 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
|||||||
// replied back to the client.
|
// replied back to the client.
|
||||||
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ListPartsInfo{}, err
|
return ListPartsInfo{}, err
|
||||||
}
|
}
|
||||||
defer uploadIDLock.RUnlock()
|
defer uploadIDLock.RUnlock(cancel)
|
||||||
|
|
||||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||||
return result, toObjectErr(err, bucket, object, uploadID)
|
return result, toObjectErr(err, bucket, object, uploadID)
|
||||||
@ -736,11 +735,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
// Hold read-locks to verify uploaded parts, also disallows
|
// Hold read-locks to verify uploaded parts, also disallows
|
||||||
// parallel part uploads as well.
|
// parallel part uploads as well.
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer uploadIDLock.RUnlock()
|
defer uploadIDLock.RUnlock(cancel)
|
||||||
|
|
||||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||||
return oi, toObjectErr(err, bucket, object, uploadID)
|
return oi, toObjectErr(err, bucket, object, uploadID)
|
||||||
@ -892,11 +891,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
|
|
||||||
// Hold namespace to complete the transaction
|
// Hold namespace to complete the transaction
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
// Rename the multipart object to final location.
|
// Rename the multipart object to final location.
|
||||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
||||||
@ -935,11 +934,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
// operation.
|
// operation.
|
||||||
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
||||||
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
// Validates if upload ID exists.
|
// Validates if upload ID exists.
|
||||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||||
|
@ -65,12 +65,13 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
|||||||
|
|
||||||
defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))
|
defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))
|
||||||
if !dstOpts.NoLock {
|
if !dstOpts.NoLock {
|
||||||
|
var cancel context.CancelFunc
|
||||||
lk := er.NewNSLock(dstBucket, dstObject)
|
lk := er.NewNSLock(dstBucket, dstObject)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
}
|
}
|
||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
@ -145,20 +146,21 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
|||||||
|
|
||||||
// Acquire lock
|
// Acquire lock
|
||||||
if lockType != noLock {
|
if lockType != noLock {
|
||||||
|
var cancel context.CancelFunc
|
||||||
lock := er.NewNSLock(bucket, object)
|
lock := er.NewNSLock(bucket, object)
|
||||||
switch lockType {
|
switch lockType {
|
||||||
case writeLock:
|
case writeLock:
|
||||||
ctx, err = lock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nsUnlocker = lock.Unlock
|
nsUnlocker = func() { lock.Unlock(cancel) }
|
||||||
case readLock:
|
case readLock:
|
||||||
ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nsUnlocker = lock.RUnlock
|
nsUnlocker = func() { lock.RUnlock(cancel) }
|
||||||
}
|
}
|
||||||
unlockOnDefer = true
|
unlockOnDefer = true
|
||||||
}
|
}
|
||||||
@ -213,11 +215,11 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
|||||||
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
|
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer lk.RUnlock()
|
defer lk.RUnlock(cancel)
|
||||||
|
|
||||||
// Start offset cannot be negative.
|
// Start offset cannot be negative.
|
||||||
if startOffset < 0 {
|
if startOffset < 0 {
|
||||||
@ -376,13 +378,14 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s
|
|||||||
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
||||||
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
||||||
if !opts.NoLock {
|
if !opts.NoLock {
|
||||||
|
var cancel context.CancelFunc
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lk.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
defer lk.RUnlock()
|
defer lk.RUnlock(cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
return er.getObjectInfo(ctx, bucket, object, opts)
|
return er.getObjectInfo(ctx, bucket, object, opts)
|
||||||
@ -732,12 +735,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||||||
|
|
||||||
if !opts.NoLock {
|
if !opts.NoLock {
|
||||||
var err error
|
var err error
|
||||||
|
var cancel context.CancelFunc
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, w := range writers {
|
for i, w := range writers {
|
||||||
@ -1043,11 +1047,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||||||
}
|
}
|
||||||
// Acquire a write lock before deleting the object.
|
// Acquire a write lock before deleting the object.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalDeleteOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
writeQuorum := len(storageDisks)/2 + 1
|
writeQuorum := len(storageDisks)/2 + 1
|
||||||
@ -1154,14 +1158,13 @@ func (er erasureObjects) addPartial(bucket, object, versionID string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||||
var err error
|
|
||||||
// Lock the object before updating tags.
|
// Lock the object before updating tags.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
disks := er.getDisks()
|
disks := er.getDisks()
|
||||||
|
|
||||||
@ -1205,14 +1208,13 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
|
|||||||
|
|
||||||
// PutObjectTags - replace or add tags to an existing object
|
// PutObjectTags - replace or add tags to an existing object
|
||||||
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
||||||
var err error
|
|
||||||
// Lock the object before updating tags.
|
// Lock the object before updating tags.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
disks := er.getDisks()
|
disks := er.getDisks()
|
||||||
|
|
||||||
@ -1302,11 +1304,11 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
|
|||||||
}
|
}
|
||||||
// Acquire write lock before starting to transition the object.
|
// Acquire write lock before starting to transition the object.
|
||||||
lk := er.NewNSLock(bucket, object)
|
lk := er.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalDeleteOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
|
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -613,19 +613,20 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
|
|||||||
// Acquire lock
|
// Acquire lock
|
||||||
if lockType != noLock {
|
if lockType != noLock {
|
||||||
lock := z.NewNSLock(bucket, object)
|
lock := z.NewNSLock(bucket, object)
|
||||||
|
var cancel context.CancelFunc
|
||||||
switch lockType {
|
switch lockType {
|
||||||
case writeLock:
|
case writeLock:
|
||||||
ctx, err = lock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nsUnlocker = lock.Unlock
|
nsUnlocker = func() { lock.Unlock(cancel) }
|
||||||
case readLock:
|
case readLock:
|
||||||
ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nsUnlocker = lock.RUnlock
|
nsUnlocker = func() { lock.RUnlock(cancel) }
|
||||||
}
|
}
|
||||||
unlockOnDefer = true
|
unlockOnDefer = true
|
||||||
}
|
}
|
||||||
@ -684,11 +685,11 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
|
|||||||
|
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
lk := z.NewNSLock(bucket, object)
|
lk := z.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
defer lk.RUnlock()
|
defer lk.RUnlock(cancel)
|
||||||
|
|
||||||
errs := make([]error, len(z.serverPools))
|
errs := make([]error, len(z.serverPools))
|
||||||
objInfos := make([]ObjectInfo, len(z.serverPools))
|
objInfos := make([]ObjectInfo, len(z.serverPools))
|
||||||
@ -800,17 +801,16 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
// Acquire a bulk write lock across 'objects'
|
// Acquire a bulk write lock across 'objects'
|
||||||
multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
|
multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
|
||||||
ctx, err = multiDeleteLock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for i := range derrs {
|
for i := range derrs {
|
||||||
derrs[i] = err
|
derrs[i] = err
|
||||||
}
|
}
|
||||||
return dobjects, derrs
|
return dobjects, derrs
|
||||||
}
|
}
|
||||||
defer multiDeleteLock.Unlock()
|
defer multiDeleteLock.Unlock(cancel)
|
||||||
|
|
||||||
if z.SinglePool() {
|
if z.SinglePool() {
|
||||||
return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
|
return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
|
||||||
@ -1371,14 +1371,13 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context) (buckets []BucketI
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
|
func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
|
||||||
var err error
|
|
||||||
// Acquire lock on format.json
|
// Acquire lock on format.json
|
||||||
formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
|
formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
|
||||||
ctx, err = formatLock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := formatLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return madmin.HealResultItem{}, err
|
return madmin.HealResultItem{}, err
|
||||||
}
|
}
|
||||||
defer formatLock.Unlock()
|
defer formatLock.Unlock(cancel)
|
||||||
|
|
||||||
var r = madmin.HealResultItem{
|
var r = madmin.HealResultItem{
|
||||||
Type: madmin.HealItemMetadata,
|
Type: madmin.HealItemMetadata,
|
||||||
|
@ -710,11 +710,11 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
|||||||
|
|
||||||
// Hold write lock on the object.
|
// Hold write lock on the object.
|
||||||
destLock := fs.NewNSLock(bucket, object)
|
destLock := fs.NewNSLock(bucket, object)
|
||||||
ctx, err = destLock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := destLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer destLock.Unlock()
|
defer destLock.Unlock(cancel)
|
||||||
|
|
||||||
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
|
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
|
||||||
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
|
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
|
||||||
|
31
cmd/fs-v1.go
31
cmd/fs-v1.go
@ -609,12 +609,13 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
|
|||||||
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
|
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
|
||||||
|
|
||||||
if !cpSrcDstSame {
|
if !cpSrcDstSame {
|
||||||
|
var cancel context.CancelFunc
|
||||||
objectDWLock := fs.NewNSLock(dstBucket, dstObject)
|
objectDWLock := fs.NewNSLock(dstBucket, dstObject)
|
||||||
ctx, err = objectDWLock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = objectDWLock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer objectDWLock.Unlock()
|
defer objectDWLock.Unlock(cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
@ -702,19 +703,20 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
|
|||||||
if lockType != noLock {
|
if lockType != noLock {
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
lock := fs.NewNSLock(bucket, object)
|
lock := fs.NewNSLock(bucket, object)
|
||||||
|
var cancel context.CancelFunc
|
||||||
switch lockType {
|
switch lockType {
|
||||||
case writeLock:
|
case writeLock:
|
||||||
ctx, err = lock.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lock.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nsUnlocker = lock.Unlock
|
nsUnlocker = func() { lock.Unlock(cancel) }
|
||||||
case readLock:
|
case readLock:
|
||||||
ctx, err = lock.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err = lock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nsUnlocker = lock.RUnlock
|
nsUnlocker = func() { lock.RUnlock(cancel) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -983,11 +985,11 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
|
|||||||
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
|
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
lk := fs.NewNSLock(bucket, object)
|
lk := fs.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
defer lk.RUnlock()
|
defer lk.RUnlock(cancel)
|
||||||
|
|
||||||
if err := checkGetObjArgs(ctx, bucket, object); err != nil {
|
if err := checkGetObjArgs(ctx, bucket, object); err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
@ -1021,15 +1023,16 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
|
|||||||
|
|
||||||
oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
|
oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
|
||||||
if err == errCorruptedFormat || err == io.EOF {
|
if err == errCorruptedFormat || err == io.EOF {
|
||||||
|
var cancel context.CancelFunc
|
||||||
lk := fs.NewNSLock(bucket, object)
|
lk := fs.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
_, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, toObjectErr(err, bucket, object)
|
return oi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
|
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
|
||||||
err = fs.createFsJSON(object, fsMetaPath)
|
err = fs.createFsJSON(object, fsMetaPath)
|
||||||
lk.Unlock()
|
lk.Unlock(cancel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, toObjectErr(err, bucket, object)
|
return oi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
@ -1074,12 +1077,12 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
|
|||||||
|
|
||||||
// Lock the object.
|
// Lock the object.
|
||||||
lk := fs.NewNSLock(bucket, object)
|
lk := fs.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
defer ObjectPathUpdated(path.Join(bucket, object))
|
defer ObjectPathUpdated(path.Join(bucket, object))
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
@ -1250,11 +1253,11 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
|
|||||||
|
|
||||||
// Acquire a write lock before deleting the object.
|
// Acquire a write lock before deleting the object.
|
||||||
lk := fs.NewNSLock(bucket, object)
|
lk := fs.NewNSLock(bucket, object)
|
||||||
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
|
ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
defer lk.Unlock()
|
defer lk.Unlock(cancel)
|
||||||
|
|
||||||
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
|
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
|
@ -597,7 +597,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
for {
|
for {
|
||||||
// let one of the server acquire the lock, if not let them timeout.
|
// let one of the server acquire the lock, if not let them timeout.
|
||||||
// which shall be retried again by this loop.
|
// which shall be retried again by this loop.
|
||||||
if _, err := txnLk.GetLock(retryCtx, iamLockTimeout); err != nil {
|
_, cancel, err := txnLk.GetLock(retryCtx, iamLockTimeout)
|
||||||
|
if err != nil {
|
||||||
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
|
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
|
||||||
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
||||||
continue
|
continue
|
||||||
@ -608,7 +609,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
// Migrating to encrypted backend on etcd should happen before initialization of
|
// Migrating to encrypted backend on etcd should happen before initialization of
|
||||||
// IAM sub-system, make sure that we do not move the above codeblock elsewhere.
|
// IAM sub-system, make sure that we do not move the above codeblock elsewhere.
|
||||||
if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil {
|
if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil {
|
||||||
txnLk.Unlock()
|
txnLk.Unlock(cancel)
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err))
|
||||||
logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available"))
|
logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available"))
|
||||||
return
|
return
|
||||||
@ -622,7 +623,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
|
|
||||||
// Migrate IAM configuration, if necessary.
|
// Migrate IAM configuration, if necessary.
|
||||||
if err := sys.doIAMConfigMigration(ctx); err != nil {
|
if err := sys.doIAMConfigMigration(ctx); err != nil {
|
||||||
txnLk.Unlock()
|
txnLk.Unlock(cancel)
|
||||||
if configRetriableErrors(err) {
|
if configRetriableErrors(err) {
|
||||||
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
|
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
|
||||||
continue
|
continue
|
||||||
@ -633,7 +634,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Successfully migrated, proceed to load the users.
|
// Successfully migrated, proceed to load the users.
|
||||||
txnLk.Unlock()
|
txnLk.Unlock(cancel)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,10 +39,10 @@ var globalLockServer *localLocker
|
|||||||
|
|
||||||
// RWLocker - locker interface to introduce GetRLock, RUnlock.
|
// RWLocker - locker interface to introduce GetRLock, RUnlock.
|
||||||
type RWLocker interface {
|
type RWLocker interface {
|
||||||
GetLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, timedOutErr error)
|
GetLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, cancel context.CancelFunc, timedOutErr error)
|
||||||
Unlock()
|
Unlock(cancel context.CancelFunc)
|
||||||
GetRLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, timedOutErr error)
|
GetRLock(ctx context.Context, timeout *dynamicTimeout) (newCtx context.Context, cancel context.CancelFunc, timedOutErr error)
|
||||||
RUnlock()
|
RUnlock(cancel context.CancelFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newNSLock - return a new name space lock map.
|
// newNSLock - return a new name space lock map.
|
||||||
@ -143,7 +143,7 @@ type distLockInstance struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Lock - block until write lock is taken or timeout has occurred.
|
// Lock - block until write lock is taken or timeout has occurred.
|
||||||
func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, error) {
|
func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, context.CancelFunc, error) {
|
||||||
lockSource := getSource(2)
|
lockSource := getSource(2)
|
||||||
start := UTCNow()
|
start := UTCNow()
|
||||||
|
|
||||||
@ -152,19 +152,22 @@ func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout
|
|||||||
Timeout: timeout.Timeout(),
|
Timeout: timeout.Timeout(),
|
||||||
}) {
|
}) {
|
||||||
timeout.LogFailure()
|
timeout.LogFailure()
|
||||||
return ctx, OperationTimedOut{}
|
return ctx, nil, OperationTimedOut{}
|
||||||
}
|
}
|
||||||
timeout.LogSuccess(UTCNow().Sub(start))
|
timeout.LogSuccess(UTCNow().Sub(start))
|
||||||
return newCtx, nil
|
return newCtx, cancel, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock - block until write lock is released.
|
// Unlock - block until write lock is released.
|
||||||
func (di *distLockInstance) Unlock() {
|
func (di *distLockInstance) Unlock(cancel context.CancelFunc) {
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
di.rwMutex.Unlock()
|
di.rwMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RLock - block until read lock is taken or timeout has occurred.
|
// RLock - block until read lock is taken or timeout has occurred.
|
||||||
func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, error) {
|
func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (context.Context, context.CancelFunc, error) {
|
||||||
lockSource := getSource(2)
|
lockSource := getSource(2)
|
||||||
start := UTCNow()
|
start := UTCNow()
|
||||||
|
|
||||||
@ -173,14 +176,17 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou
|
|||||||
Timeout: timeout.Timeout(),
|
Timeout: timeout.Timeout(),
|
||||||
}) {
|
}) {
|
||||||
timeout.LogFailure()
|
timeout.LogFailure()
|
||||||
return ctx, OperationTimedOut{}
|
return ctx, nil, OperationTimedOut{}
|
||||||
}
|
}
|
||||||
timeout.LogSuccess(UTCNow().Sub(start))
|
timeout.LogSuccess(UTCNow().Sub(start))
|
||||||
return newCtx, nil
|
return newCtx, cancel, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RUnlock - block until read lock is released.
|
// RUnlock - block until read lock is released.
|
||||||
func (di *distLockInstance) RUnlock() {
|
func (di *distLockInstance) RUnlock(cancel context.CancelFunc) {
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
di.rwMutex.RUnlock()
|
di.rwMutex.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,7 +214,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Lock - block until write lock is taken or timeout has occurred.
|
// Lock - block until write lock is taken or timeout has occurred.
|
||||||
func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, timedOutErr error) {
|
func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, _ context.CancelFunc, timedOutErr error) {
|
||||||
lockSource := getSource(2)
|
lockSource := getSource(2)
|
||||||
start := UTCNow()
|
start := UTCNow()
|
||||||
const readLock = false
|
const readLock = false
|
||||||
@ -221,16 +227,19 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou
|
|||||||
li.ns.unlock(li.volume, li.paths[si], readLock)
|
li.ns.unlock(li.volume, li.paths[si], readLock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, OperationTimedOut{}
|
return nil, nil, OperationTimedOut{}
|
||||||
}
|
}
|
||||||
success[i] = 1
|
success[i] = 1
|
||||||
}
|
}
|
||||||
timeout.LogSuccess(UTCNow().Sub(start))
|
timeout.LogSuccess(UTCNow().Sub(start))
|
||||||
return ctx, nil
|
return ctx, func() {}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock - block until write lock is released.
|
// Unlock - block until write lock is released.
|
||||||
func (li *localLockInstance) Unlock() {
|
func (li *localLockInstance) Unlock(cancel context.CancelFunc) {
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
const readLock = false
|
const readLock = false
|
||||||
for _, path := range li.paths {
|
for _, path := range li.paths {
|
||||||
li.ns.unlock(li.volume, path, readLock)
|
li.ns.unlock(li.volume, path, readLock)
|
||||||
@ -238,7 +247,7 @@ func (li *localLockInstance) Unlock() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RLock - block until read lock is taken or timeout has occurred.
|
// RLock - block until read lock is taken or timeout has occurred.
|
||||||
func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, timedOutErr error) {
|
func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ context.Context, _ context.CancelFunc, timedOutErr error) {
|
||||||
lockSource := getSource(2)
|
lockSource := getSource(2)
|
||||||
start := UTCNow()
|
start := UTCNow()
|
||||||
const readLock = true
|
const readLock = true
|
||||||
@ -251,16 +260,19 @@ func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeo
|
|||||||
li.ns.unlock(li.volume, li.paths[si], readLock)
|
li.ns.unlock(li.volume, li.paths[si], readLock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, OperationTimedOut{}
|
return nil, nil, OperationTimedOut{}
|
||||||
}
|
}
|
||||||
success[i] = 1
|
success[i] = 1
|
||||||
}
|
}
|
||||||
timeout.LogSuccess(UTCNow().Sub(start))
|
timeout.LogSuccess(UTCNow().Sub(start))
|
||||||
return ctx, nil
|
return ctx, func() {}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RUnlock - block until read lock is released.
|
// RUnlock - block until read lock is released.
|
||||||
func (li *localLockInstance) RUnlock() {
|
func (li *localLockInstance) RUnlock(cancel context.CancelFunc) {
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
const readLock = true
|
const readLock = true
|
||||||
for _, path := range li.paths {
|
for _, path := range li.paths {
|
||||||
li.ns.unlock(li.volume, path, readLock)
|
li.ns.unlock(li.volume, path, readLock)
|
||||||
|
@ -289,7 +289,6 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
|||||||
|
|
||||||
lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
|
lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
|
||||||
|
|
||||||
var err error
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -300,7 +299,8 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
|||||||
|
|
||||||
// let one of the server acquire the lock, if not let them timeout.
|
// let one of the server acquire the lock, if not let them timeout.
|
||||||
// which shall be retried again by this loop.
|
// which shall be retried again by this loop.
|
||||||
if _, err = txnLk.GetLock(ctx, lockTimeout); err != nil {
|
_, cancel, err := txnLk.GetLock(ctx, lockTimeout)
|
||||||
|
if err != nil {
|
||||||
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
|
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
|
||||||
|
|
||||||
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
||||||
@ -319,7 +319,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
|||||||
// Upon success migrating the config, initialize all sub-systems
|
// Upon success migrating the config, initialize all sub-systems
|
||||||
// if all sub-systems initialized successfully return right away
|
// if all sub-systems initialized successfully return right away
|
||||||
if err = initAllSubsystems(ctx, newObject); err == nil {
|
if err = initAllSubsystems(ctx, newObject); err == nil {
|
||||||
txnLk.Unlock()
|
txnLk.Unlock(cancel)
|
||||||
// All successful return.
|
// All successful return.
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
||||||
@ -329,7 +329,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
|
txnLk.Unlock(cancel) // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
|
||||||
|
|
||||||
if configRetriableErrors(err) {
|
if configRetriableErrors(err) {
|
||||||
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
|
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user