diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 8068ef2ae..376022c49 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -166,10 +166,10 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI // Acquire a write lock on bucket before modifying its // configuration. - opsID := getOpsID() - nsMutex.Lock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() // Release lock after notifying peers - defer nsMutex.Unlock(bucket, "", opsID) + defer bucketLock.Unlock() // persist config to disk err := persistNotificationConfig(bucket, ncfg, objAPI) @@ -374,10 +374,10 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL // Acquire a write lock on bucket before modifying its // configuration. - opsID := getOpsID() - nsMutex.Lock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() // Release lock after notifying peers - defer nsMutex.Unlock(bucket, "", opsID) + defer bucketLock.Unlock() // update persistent config if dist XL if globalIsDistXL { @@ -416,10 +416,10 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje // Acquire a write lock on bucket before modifying its // configuration. - opsID := getOpsID() - nsMutex.Lock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() // Release lock after notifying peers - defer nsMutex.Unlock(bucket, "", opsID) + defer bucketLock.Unlock() // update persistent config if dist XL if globalIsDistXL { diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 10bb51896..0ada4f934 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -209,10 +209,10 @@ func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI // Acquire a write lock on bucket before modifying its // configuration. - opsID := getOpsID() - nsMutex.Lock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() // Release lock after notifying peers - defer nsMutex.Unlock(bucket, "", opsID) + defer bucketLock.Unlock() if pCh.IsRemove { if err := removeBucketPolicy(bucket, objAPI); err != nil { diff --git a/cmd/control-lock-main_test.go b/cmd/control-lock-main_test.go index 93b1313d4..ad2557023 100644 --- a/cmd/control-lock-main_test.go +++ b/cmd/control-lock-main_test.go @@ -23,12 +23,13 @@ import ( // Test print systemState. func TestPrintLockState(t *testing.T) { - nsMutex.Lock("testbucket", "1.txt", "11-11") + testLock := nsMutex.NewNSLock("testbucket", "1.txt") + testLock.Lock() sysLockState, err := getSystemLockState() if err != nil { t.Fatal(err) } - nsMutex.Unlock("testbucket", "1.txt", "11-11") + testLock.Unlock() sysLockStateMap := map[string]SystemLockState{} sysLockStateMap["bucket"] = sysLockState @@ -66,7 +67,8 @@ func TestLockStateClear(t *testing.T) { nsMutex.ForceUnlock(bucket, object) } - nsMutex.Lock("testbucket", "1.txt", "11-11") + testLock := nsMutex.NewNSLock("testbucket", "1.txt") + testLock.Lock() sysLockState, err := getSystemLockState() if err != nil { @@ -111,7 +113,8 @@ func TestLockStateClear(t *testing.T) { } // Create another lock - nsMutex.RLock("testbucket", "blob.txt", "22-22") + blobLock := nsMutex.NewNSLock("testbucket", "blob.txt") + blobLock.RLock() if sysLockState, err = getSystemLockState(); err != nil { t.Fatal(err) @@ -142,7 +145,8 @@ func TestLockStateClear(t *testing.T) { } // Create yet another lock - nsMutex.RLock("testbucket", "exact.txt", "33-33") + exactLock := nsMutex.NewNSLock("testbucket", "exact.txt") + exactLock.RLock() if sysLockState, err = getSystemLockState(); err != nil { t.Fatal(err) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index a1de8a111..90c6caa4f 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -60,11 +60,11 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var err error var eof bool if uploadIDMarker != "" { - // get a random ID for lock instrumentation. - opsID := getOpsID() - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) + keyMarkerLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, keyMarker)) + keyMarkerLock.RLock() uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) + keyMarkerLock.RUnlock() if err != nil { return ListMultipartsInfo{}, err } @@ -115,12 +115,11 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var end bool uploadIDMarker = "" - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) + entryLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, entry)) + entryLock.RLock() tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) + entryLock.RUnlock() if err != nil { return ListMultipartsInfo{}, err } @@ -233,12 +232,12 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st fsMeta.Meta = meta } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // This lock needs to be held for any changes to the directory contents of ".minio.sys/multipart/object/" - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + // This lock needs to be held for any changes to the directory + // contents of ".minio.sys/multipart/object/" + objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object)) + objectMPartPathLock.Lock() + defer objectMPartPathLock.Unlock() uploadID = getUUID() initiated := time.Now().UTC() @@ -301,7 +300,7 @@ func getFSAppendDataPath(uploadID string) string { } // Append parts to fsAppendDataFile. -func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) { +func appendParts(disk StorageAPI, bucket, object, uploadID string) { cleanupAppendPaths := func() { // In case of any error, cleanup the append data and json files // from the tmp so that we do not have any inconsistent append @@ -316,16 +315,18 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) { fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) // Lock the uploadID so that no one modifies fs.json - nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID) + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) + uploadIDLock.RLock() fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath) - nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) + uploadIDLock.RUnlock() if err != nil { return } // Lock fs-append.json so that there is no parallel append to the file. - nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID) + appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath) + appendPathLock.Lock() + defer appendPathLock.Unlock() fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath) if err != nil { @@ -344,8 +345,9 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) { } // Hold write lock on the part so that there is no parallel upload on the part. partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)) - nsMutex.Lock(minioMetaBucket, partPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, partPath, opsID) + partPathLock := nsMutex.NewNSLock(minioMetaBucket, partPath) + partPathLock.Lock() + defer partPathLock.Unlock() // Proceed to append "part" fsAppendDataPath := getFSAppendDataPath(uploadID) @@ -386,7 +388,7 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) { // If there are more parts that need to be appended to fsAppendDataFile _, appendNeeded = partToAppend(fsMeta, fsAppendMeta) if appendNeeded { - go appendParts(disk, bucket, object, uploadID, opsID) + go appendParts(disk, bucket, object, uploadID) } } @@ -409,13 +411,11 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID) + preUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) + preUploadIDLock.RLock() // Just check if the uploadID exists to avoid copy if it doesn't. uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID) - nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) + preUploadIDLock.RUnlock() if !uploadIDExists { return "", traceError(InvalidUploadID{UploadID: uploadID}) } @@ -490,14 +490,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } } - // get a random ID for lock instrumentation. - // generates random string on setting MINIO_DEBUG=lock, else returns empty string. - // used for instrumentation on locks. - opsID = getOpsID() - // Hold write lock as we are updating fs.json - nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) + postUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) + postUploadIDLock.Lock() + defer postUploadIDLock.Unlock() // Just check if the uploadID exists to avoid copy if it doesn't. if !fs.isUploadIDExists(bucket, object, uploadID) { @@ -520,7 +516,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } - go appendParts(fs.storage, bucket, object, uploadID, opsID) + go appendParts(fs.storage, bucket, object, uploadID) return newMD5Hex, nil } @@ -595,12 +591,12 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + // Hold lock so that there is no competing + // abort-multipart-upload or complete-multipart-upload. + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLock.Lock() + defer uploadIDLock.Unlock() if !fs.isUploadIDExists(bucket, object, uploadID) { return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) @@ -643,15 +639,14 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - // get a random ID for lock instrumentation. - opsID := getOpsID() // Hold lock so that // 1) no one aborts this multipart upload // 2) no one does a parallel complete-multipart-upload on this // multipart upload - nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) + uploadIDLock.Lock() + defer uploadIDLock.Unlock() if !fs.isUploadIDExists(bucket, object, uploadID) { return "", traceError(InvalidUploadID{UploadID: uploadID}) @@ -660,8 +655,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // fs-append.json path fsAppendMetaPath := getFSAppendMetaPath(uploadID) // Lock fs-append.json so that no parallel appendParts() is being done. - nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID) + appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath) + appendPathLock.Lock() + defer appendPathLock.Unlock() // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := getCompleteMultipartMD5(parts...) @@ -788,13 +784,13 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, bucket, object) } - // get a random ID for lock instrumentation. - opsID = getOpsID() - - // Hold the lock so that two parallel complete-multipart-uploads do not - // leave a stale uploads.json behind. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + // Hold the lock so that two parallel + // complete-multipart-uploads do not leave a stale + // uploads.json behind. + objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object)) + objectMPartPathLock.Lock() + defer objectMPartPathLock.Unlock() // remove entry from uploads.json if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { @@ -848,12 +844,12 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // Hold lock so that there is no competing complete-multipart-upload or put-object-part. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + // Hold lock so that there is no competing + // complete-multipart-upload or put-object-part. + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLock.Lock() + defer uploadIDLock.Unlock() if !fs.isUploadIDExists(bucket, object, uploadID) { return traceError(InvalidUploadID{UploadID: uploadID}) @@ -861,8 +857,9 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error fsAppendMetaPath := getFSAppendMetaPath(uploadID) // Lock fs-append.json so that no parallel appendParts() is being done. - nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID) + appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath) + appendPathLock.Lock() + defer appendPathLock.Unlock() err := fs.abortMultipartUpload(bucket, object, uploadID) return err diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index c41abb3b0..fce574d9b 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -226,12 +226,10 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, return traceError(InvalidRange{offset, length, fi.Size}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - // Lock the object before reading. - nsMutex.RLock(bucket, object, opsID) - defer nsMutex.RUnlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() var totalLeft = length bufSize := int64(readSizeV1) @@ -446,12 +444,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } } - // get a random ID for lock instrumentation. - opsID := getOpsID() - // Lock the object before committing the object. - nsMutex.RLock(bucket, object, opsID) - defer nsMutex.RUnlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() // Entire object was written to the temp location, now it's safe to rename it to the actual location. err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) @@ -488,13 +484,12 @@ func (fs fsObjects) DeleteObject(bucket, object string) error { if !IsValidObjectName(object) { return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() // Lock the object before deleting so that an in progress GetObject does not return // corrupt data or there is no race with a PutObject. - nsMutex.RLock(bucket, object, opsID) - defer nsMutex.RUnlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() err := fs.storage.DeleteFile(minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)) if err != nil && err != errFileNotFound { diff --git a/cmd/lock-instrument_test.go b/cmd/lock-instrument_test.go index 31aa32176..89e3a6d5d 100644 --- a/cmd/lock-instrument_test.go +++ b/cmd/lock-instrument_test.go @@ -256,7 +256,6 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { }{ // Test case - 1. { - volume: "my-bucket", path: "my-object", lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", @@ -270,7 +269,6 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { // No entry for pair. // So an attempt to change the state of the lock from `Blocked`->`Running` should fail. { - volume: "my-bucket", path: "my-object-2", lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", @@ -307,7 +305,6 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { // Test case - 5. // Test case with write lock. { - volume: "my-bucket", path: "my-object", lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 4ddc61639..27be90bf2 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -256,3 +256,42 @@ func (n *nsLockMap) ForceUnlock(volume, path string) { } } } + +// lockInstance - frontend/top-level interface for namespace locks. +type lockInstance struct { + n *nsLockMap + volume, path, opsID string +} + +// NewNSLock - returns a lock instance for a given volume and +// path. The returned lockInstance object encapsulates the nsLockMap, +// volume, path and operation ID. +func (n *nsLockMap) NewNSLock(volume, path string) *lockInstance { + return &lockInstance{n, volume, path, getOpsID()} +} + +// Lock - block until write lock is taken. +func (li *lockInstance) Lock() { + lockLocation := callerLocation() + readLock := false + li.n.lock(li.volume, li.path, lockLocation, li.opsID, readLock) +} + +// Unlock - block until write lock is released. +func (li *lockInstance) Unlock() { + readLock := false + li.n.unlock(li.volume, li.path, li.opsID, readLock) +} + +// RLock - block until read lock is taken. +func (li *lockInstance) RLock() { + lockLocation := callerLocation() + readLock := true + li.n.lock(li.volume, li.path, lockLocation, li.opsID, readLock) +} + +// RUnlock - block until read lock is released. +func (li *lockInstance) RUnlock() { + readLock := true + li.n.unlock(li.volume, li.path, li.opsID, readLock) +} diff --git a/cmd/namespace-lock_test.go b/cmd/namespace-lock_test.go index 9b289084c..0679b9843 100644 --- a/cmd/namespace-lock_test.go +++ b/cmd/namespace-lock_test.go @@ -386,7 +386,8 @@ func TestLockStats(t *testing.T) { func TestNamespaceForceUnlockTest(t *testing.T) { // Create lock. - nsMutex.Lock("bucket", "object", "11-11") + lock := nsMutex.NewNSLock("bucket", "object") + lock.Lock() // Forcefully unlock lock. nsMutex.ForceUnlock("bucket", "object") @@ -394,7 +395,8 @@ func TestNamespaceForceUnlockTest(t *testing.T) { go func() { // Try to claim lock again. - nsMutex.Lock("bucket", "object", "22-22") + anotherLock := nsMutex.NewNSLock("bucket", "object") + anotherLock.Lock() // And signal succes. ch <- struct{}{} }() diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 45cb33851..0c52abfa6 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -31,11 +31,9 @@ func (xl xlObjects) MakeBucket(bucket string) error { return traceError(BucketNameInvalid{Bucket: bucket}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.Lock(bucket, "", opsID) - defer nsMutex.Unlock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -174,11 +172,11 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { if !IsValidBucketName(bucket) { return BucketInfo{}, BucketNameInvalid{Bucket: bucket} } - // get a random ID for lock instrumentation. - opsID := getOpsID() - nsMutex.RLock(bucket, "", opsID) - defer nsMutex.RUnlock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.RLock() + defer bucketLock.RUnlock() + bucketInfo, err := xl.getBucketInfo(bucket) if err != nil { return BucketInfo{}, toObjectErr(err, bucket) @@ -249,11 +247,9 @@ func (xl xlObjects) DeleteBucket(bucket string) error { return BucketNameInvalid{Bucket: bucket} } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.Lock(bucket, "", opsID) - defer nsMutex.Unlock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() // Collect if all disks report volume not found. var wg = &sync.WaitGroup{} diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index a3da4d10b..21509b9c5 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -32,11 +32,9 @@ func (xl xlObjects) HealBucket(bucket string) error { // Heal bucket - create buckets on disks where it does not exist. - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.Lock(bucket, "", opsID) - defer nsMutex.Unlock(bucket, "", opsID) + bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -101,12 +99,10 @@ func (xl xlObjects) HealObject(bucket, object string) error { return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - // Lock the object before healing. - nsMutex.RLock(bucket, object, opsID) - defer nsMutex.RUnlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) if err := reduceErrs(errs, nil); err != nil { diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index cb5e3a8b6..df332c8b7 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -143,11 +143,9 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma continue } - // get a random ID for lock instrumentation. - opsID := getOpsID() - // Check if the current object needs healing - nsMutex.RLock(bucket, objInfo.Name, opsID) + objectLock := nsMutex.NewNSLock(bucket, objInfo.Name) + objectLock.RLock() partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name) if xlShouldHeal(partsMetadata, errs) { result.Objects = append(result.Objects, ObjectInfo{ @@ -157,7 +155,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma IsDir: false, }) } - nsMutex.RUnlock(bucket, objInfo.Name, opsID) + objectLock.RUnlock() } return result, nil } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index eb32c4275..e9786ed9a 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -64,10 +64,10 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // List all upload ids for the keyMarker starting from // uploadIDMarker first. if uploadIDMarker != "" { - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) + // hold lock on keyMarker path + keyMarkerLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, keyMarker)) + keyMarkerLock.RLock() for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue @@ -81,7 +81,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } break } - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) + keyMarkerLock.RUnlock() if err != nil { return ListMultipartsInfo{}, err } @@ -133,11 +133,11 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var end bool uploadIDMarker = "" - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // For the new object entry we get all its pending uploadIDs. - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) + // For the new object entry we get all its + // pending uploadIDs. + entryLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, entry)) + entryLock.RLock() var disk StorageAPI for _, disk = range xl.getLoadBalancedDisks() { if disk == nil { @@ -152,7 +152,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } break } - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) + entryLock.RUnlock() if err != nil { if isErrIgnored(err, xlTreeWalkIgnoredErrs) { continue @@ -279,12 +279,12 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st xlMeta.Stat.ModTime = time.Now().UTC() xlMeta.Meta = meta - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // This lock needs to be held for any changes to the directory contents of ".minio.sys/multipart/object/" - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + // This lock needs to be held for any changes to the directory + // contents of ".minio.sys/multipart/object/" + objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object)) + objectMPartPathLock.Lock() + defer objectMPartPathLock.Unlock() uploadID := getUUID() uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -360,23 +360,22 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s var errs []error uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID) - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID) + // pre-check upload id lock. + preUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) + preUploadIDLock.RLock() // Validates if upload ID exists. if !xl.isUploadIDExists(bucket, object, uploadID) { - nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) + preUploadIDLock.RUnlock() return "", traceError(InvalidUploadID{UploadID: uploadID}) } // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { - nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) + preUploadIDLock.RUnlock() return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object) } - nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) + preUploadIDLock.RUnlock() // List all online disks. onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs) @@ -467,13 +466,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } } - // get a random ID for lock instrumentation. - // generates random string on setting MINIO_DEBUG=lock, else returns empty string. - // used for instrumentation on locks. - opsID = getOpsID() - - nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID) - defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) + // post-upload check (write) lock + postUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) + postUploadIDLock.Lock() + defer postUploadIDLock.Unlock() // Validate again if upload ID still exists. if !xl.isUploadIDExists(bucket, object, uploadID) { @@ -617,12 +613,12 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + // Hold lock so that there is no competing + // abort-multipart-upload or complete-multipart-upload. + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLock.Lock() + defer uploadIDLock.Unlock() if !xl.isUploadIDExists(bucket, object, uploadID) { return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) @@ -653,14 +649,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload }) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - // Hold lock so that + // // 1) no one aborts this multipart upload - // 2) no one does a parallel complete-multipart-upload on this multipart upload - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + // + // 2) no one does a parallel complete-multipart-upload on this + // multipart upload + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLock.Lock() + defer uploadIDLock.Unlock() if !xl.isUploadIDExists(bucket, object, uploadID) { return "", traceError(InvalidUploadID{UploadID: uploadID}) @@ -772,24 +770,22 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } - // get a random ID for lock instrumentation. - opsID = getOpsID() - // Hold write lock on the destination before rename. - nsMutex.Lock(bucket, object, opsID) - defer func(curOpsID string) { + destLock := nsMutex.NewNSLock(bucket, object) + destLock.Lock() + defer func() { // A new complete multipart upload invalidates any // previously cached object in memory. xl.objCache.Delete(path.Join(bucket, object)) // This lock also protects the cache namespace. - nsMutex.Unlock(bucket, object, curOpsID) + destLock.Unlock() // Prefetch the object from disk by triggering a fake GetObject call // Unlike a regular single PutObject, multipart PutObject is comes in // stages and it is harder to cache. go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard) - }(opsID) + }() // Rename if an object already exists to temporary location. uniqueID := getUUID() @@ -824,13 +820,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Delete the previously successfully renamed object. xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID)) - // get a random ID for lock instrumentation. - opsID = getOpsID() - - // Hold the lock so that two parallel complete-multipart-uploads do not - // leave a stale uploads.json behind. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + // Hold the lock so that two parallel + // complete-multipart-uploads do not leave a stale + // uploads.json behind. + objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object)) + objectMPartPathLock.Lock() + defer objectMPartPathLock.Unlock() // remove entry from uploads.json with quorum if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { @@ -851,11 +847,12 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e return toObjectErr(err, bucket, object) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + // hold lock so we don't compete with a complete, or abort + // multipart request. + objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object)) + objectMPartPathLock.Lock() + defer objectMPartPathLock.Unlock() // remove entry from uploads.json with quorum if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { @@ -889,12 +886,12 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - // Hold lock so that there is no competing complete-multipart-upload or put-object-part. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + // Hold lock so that there is no competing + // complete-multipart-upload or put-object-part. + uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, + pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLock.Lock() + defer uploadIDLock.Unlock() if !xl.isUploadIDExists(bucket, object, uploadID) { return traceError(InvalidUploadID{UploadID: uploadID}) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 6f0103173..2db36e9ec 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -59,12 +59,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return traceError(errUnexpected) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - // Lock the object before reading. - nsMutex.RLock(bucket, object, opsID) - defer nsMutex.RUnlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) @@ -227,11 +225,10 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} } - // get a random ID for lock instrumentation. - opsID := getOpsID() + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() - nsMutex.RLock(bucket, object, opsID) - defer nsMutex.RUnlock(bucket, object, opsID) info, err := xl.getObjectInfo(bucket, object) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) @@ -508,14 +505,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } } - // get a random ID for lock instrumentation. - // generates random string on setting MINIO_DEBUG=lock, else returns empty string. - // used for instrumentation on locks. - opsID := getOpsID() - // Lock the object. - nsMutex.Lock(bucket, object, opsID) - defer nsMutex.Unlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). @@ -637,11 +630,9 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.Lock(bucket, object, opsID) - defer nsMutex.Unlock(bucket, object, opsID) + objectLock := nsMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() // Validate object exists. if !xl.isObject(bucket, object) {