From 4daa0d2cee9cf4b0ec7a443416a4c453779eec98 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 10 Dec 2016 16:15:12 -0800 Subject: [PATCH] lock: Moving locking to handler layer. (#3381) This is implemented so that the issues like in the following flow don't affect the behavior of operation. ``` GetObjectInfo() .... --> Time window for mutation (no lock held) .... --> Time window for mutation (no lock held) GetObject() ``` This happens when two simultaneous uploads are made to the same object the object has returned wrong info to the client. Another classic example is "CopyObject" API itself which reads from a source object and copies to destination object. Fixes #3370 Fixes #2912 --- cmd/bucket-handlers.go | 16 ++++ cmd/bucket-notification-handlers.go | 18 +--- cmd/bucket-policy-handlers.go | 53 +---------- cmd/bucket-policy.go | 71 ++++++++++++++ cmd/event-notifier.go | 60 ++++++++++-- cmd/fs-v1-multipart.go | 25 ++--- cmd/fs-v1.go | 16 ---- cmd/lock-instrument_test.go | 138 ++++++++++++++-------------- cmd/lockinfo-handlers.go | 12 +-- cmd/namespace-lock.go | 27 +++--- cmd/namespace-lock_test.go | 48 +++++----- cmd/object-handlers.go | 34 +++++++ cmd/web-handlers.go | 50 +++++----- cmd/xl-v1-bucket.go | 12 --- cmd/xl-v1-healing.go | 6 +- cmd/xl-v1-list-objects-heal.go | 2 +- cmd/xl-v1-multipart.go | 28 ++---- cmd/xl-v1-object.go | 18 ---- 18 files changed, 337 insertions(+), 297 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index c5344227a..afbc3dded 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -334,6 +334,10 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req return } + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() + // Proceed to creating a bucket. err := objectAPI.MakeBucket(bucket) if err != nil { @@ -431,6 +435,10 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h sha256sum := "" + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() + objInfo, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata, sha256sum) if err != nil { errorIf(err, "Unable to create object.") @@ -478,6 +486,10 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re return } + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.RLock() + defer bucketLock.RUnlock() + if _, err := objectAPI.GetBucketInfo(bucket); err != nil { errorIf(err, "Unable to fetch bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -503,6 +515,10 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. vars := mux.Vars(r) bucket := vars["bucket"] + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() + // Attempt to delete bucket. if err := objectAPI.DeleteBucket(bucket); err != nil { errorIf(err, "Unable to delete a bucket.") diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index ba4ed2c71..0a3b9e8d8 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "net/http" - "path" "time" "github.com/gorilla/mux" @@ -174,7 +173,7 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI // Acquire a write lock on bucket before modifying its // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() // Release lock after notifying peers defer bucketLock.Unlock() @@ -381,7 +380,7 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL // Acquire a write lock on bucket before modifying its // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() // Release lock after notifying peers defer bucketLock.Unlock() @@ -423,7 +422,7 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje // Acquire a write lock on bucket before modifying its // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() // Release lock after notifying peers defer bucketLock.Unlock() @@ -441,14 +440,3 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje // peers (including local) S3PeersUpdateBucketListener(bucket, updatedLcfgs) } - -// Removes notification.xml for a given bucket, only used during DeleteBucket. -func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { - // Verify bucket is valid. - if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} - } - - notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - return objAPI.DeleteObject(minioMetaBucket, notificationConfigPath) -} diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 95ef0b900..50589059f 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "fmt" "io" "io/ioutil" @@ -166,65 +165,17 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } - // Parse bucket policy. - var policy = &bucketPolicy{} - err = parseBucketPolicy(bytes.NewReader(policyBytes), policy) - if err != nil { - errorIf(err, "Unable to parse bucket policy.") - writeErrorResponse(w, r, ErrInvalidPolicyDocument, r.URL.Path) - return - } - // Parse check bucket policy. - if s3Error := checkBucketPolicyResources(bucket, policy); s3Error != ErrNone { + // Parse validate and save bucket policy. + if s3Error := parseAndPersistBucketPolicy(bucket, policyBytes, objAPI); s3Error != ErrNone { writeErrorResponse(w, r, s3Error, r.URL.Path) return } - // Save bucket policy. - if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil { - switch err.(type) { - case BucketNameInvalid: - writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) - default: - writeErrorResponse(w, r, ErrInternalError, r.URL.Path) - } - return - } - // Success. writeSuccessNoContent(w) } -// persistAndNotifyBucketPolicyChange - takes a policyChange argument, -// persists it to storage, and notify nodes in the cluster about the -// change. In-memory state is updated in response to the notification. -func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error { - // Acquire a write lock on bucket before modifying its - // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.Lock() - // Release lock after notifying peers - defer bucketLock.Unlock() - - if pCh.IsRemove { - if err := removeBucketPolicy(bucket, objAPI); err != nil { - return err - } - } else { - if pCh.BktPolicy == nil { - return errInvalidArgument - } - if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil { - return err - } - } - - // Notify all peers (including self) to update in-memory state - S3PeersUpdateBucketPolicy(bucket, pCh) - return nil -} - // DeleteBucketPolicyHandler - DELETE Bucket policy // ----------------- // This implementation of the DELETE operation uses the policy diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index 8261caa6b..07d9ca05f 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -144,6 +144,12 @@ func getOldBucketsConfigPath() (string, error) { // if bucket policy is not found. func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader io.Reader, err error) { policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) + + // Acquire a read lock on policy config before reading. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) + objLock.RLock() + defer objLock.RUnlock() + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath) if err != nil { if isErrObjectNotFound(err) || isErrIncompleteBody(err) { @@ -188,6 +194,10 @@ func readBucketPolicy(bucket string, objAPI ObjectLayer) (*bucketPolicy, error) // if no policies are found. func removeBucketPolicy(bucket string, objAPI ObjectLayer) error { policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) + // Acquire a write lock on policy config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) + objLock.Lock() + defer objLock.Unlock() if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil { errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket) err = errorCause(err) @@ -207,9 +217,70 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) err return err } policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) + // Acquire a write lock on policy config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) + objLock.Lock() + defer objLock.Unlock() if _, err := objAPI.PutObject(minioMetaBucket, policyPath, int64(len(buf)), bytes.NewReader(buf), nil, ""); err != nil { errorIf(err, "Unable to set policy for the bucket %s", bucket) return errorCause(err) } return nil } + +func parseAndPersistBucketPolicy(bucket string, policyBytes []byte, objAPI ObjectLayer) APIErrorCode { + // Parse bucket policy. + var policy = &bucketPolicy{} + err := parseBucketPolicy(bytes.NewReader(policyBytes), policy) + if err != nil { + errorIf(err, "Unable to parse bucket policy.") + return ErrInvalidPolicyDocument + } + + // Parse check bucket policy. + if s3Error := checkBucketPolicyResources(bucket, policy); s3Error != ErrNone { + return s3Error + } + + // Acquire a write lock on bucket before modifying its configuration. + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.Lock() + // Release lock after notifying peers + defer bucketLock.Unlock() + + // Save bucket policy. + if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil { + switch err.(type) { + case BucketNameInvalid: + return ErrInvalidBucketName + case BucketNotFound: + return ErrNoSuchBucket + default: + errorIf(err, "Unable to save bucket policy.") + return ErrInternalError + } + } + return ErrNone +} + +// persistAndNotifyBucketPolicyChange - takes a policyChange argument, +// persists it to storage, and notify nodes in the cluster about the +// change. In-memory state is updated in response to the notification. +func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error { + if pCh.IsRemove { + if err := removeBucketPolicy(bucket, objAPI); err != nil { + return err + } + } else { + if pCh.BktPolicy == nil { + return errInvalidArgument + } + if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil { + return err + } + } + + // Notify all peers (including self) to update in-memory state + S3PeersUpdateBucketPolicy(bucket, pCh) + return nil +} diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 5c2794413..b8d697d3f 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -301,8 +301,14 @@ func eventNotify(event eventData) { // structured notification config. func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) { // Construct the notification config path. - notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) + ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) + objLock.RLock() + defer objLock.RUnlock() + + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, ncPath) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no @@ -315,7 +321,7 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon return nil, err } var buffer bytes.Buffer - err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer) + err = objAPI.GetObject(minioMetaBucket, ncPath, 0, objInfo.Size, &buffer) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no @@ -350,8 +356,14 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er } // Construct the notification config path. - listenerConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, listenerConfigPath) + lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) + objLock.RLock() + defer objLock.RUnlock() + + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, lcPath) if err != nil { // 'listener.json' not found return // 'errNoSuchNotifications'. This is default when no @@ -364,7 +376,7 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er return nil, err } var buffer bytes.Buffer - err = objAPI.GetObject(minioMetaBucket, listenerConfigPath, 0, objInfo.Size, &buffer) + err = objAPI.GetObject(minioMetaBucket, lcPath, 0, objInfo.Size, &buffer) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no @@ -399,6 +411,11 @@ func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj Obje // build path ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) + objLock.Lock() + defer objLock.Unlock() + // write object to path sha256Sum := getSHA256Hash(buf) _, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum) @@ -419,6 +436,11 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer // build path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) + objLock.Lock() + defer objLock.Unlock() + // write object to path sha256Sum := getSHA256Hash(buf) _, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum) @@ -428,12 +450,34 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer return err } +// Removes notification.xml for a given bucket, only used during DeleteBucket. +func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { + // Verify bucket is valid. + if !IsValidBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} + } + + ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) + objLock.Lock() + err := objAPI.DeleteObject(minioMetaBucket, ncPath) + objLock.Unlock() + return err +} + // Remove listener configuration from storage layer. Used when a bucket is deleted. func removeListenerConfig(bucket string, objAPI ObjectLayer) error { // make the path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - // remove it - return objAPI.DeleteObject(minioMetaBucket, lcPath) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) + objLock.Lock() + err := objAPI.DeleteObject(minioMetaBucket, lcPath) + objLock.Unlock() + return err } // Loads both notification and listener config. diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 8ae6d554c..4f7e45dc3 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -57,7 +57,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var err error var eof bool if uploadIDMarker != "" { - keyMarkerLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, keyMarker)) keyMarkerLock.RLock() uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) @@ -112,7 +112,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var end bool uploadIDMarker = "" - entryLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, entry)) entryLock.RLock() tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) @@ -192,7 +192,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st // This lock needs to be held for any changes to the directory // contents of ".minio.sys/multipart/object/" - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -248,7 +248,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDPath := path.Join(bucket, object, uploadID) - preUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) preUploadIDLock.RLock() // Just check if the uploadID exists to avoid copy if it doesn't. uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID) @@ -329,7 +329,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } // Hold write lock as we are updating fs.json - postUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) postUploadIDLock.Lock() defer postUploadIDLock.Unlock() @@ -348,7 +348,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s partPath := path.Join(bucket, object, uploadID, partSuffix) // Lock the part so that another part upload with same part-number gets blocked // while the part is getting appended in the background. - partLock := nsMutex.NewNSLock(minioMetaMultipartBucket, partPath) + partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath) partLock.Lock() err = fs.storage.RenameFile(minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath) if err != nil { @@ -439,7 +439,7 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // Hold lock so that there is no competing // abort-multipart-upload or complete-multipart-upload. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -479,7 +479,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // 1) no one aborts this multipart upload // 2) no one does a parallel complete-multipart-upload on this // multipart upload - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -502,14 +502,11 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // This lock is held during rename of the appended tmp file to the actual // location so that any competing GetObject/PutObject/DeleteObject do not race. - objectLock := nsMutex.NewNSLock(bucket, object) appendFallback := true // In case background-append did not append the required parts. if isPartsSame(fsMeta.Parts, parts) { err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta) if err == nil { appendFallback = false - objectLock.Lock() - defer objectLock.Unlock() if err = fs.storage.RenameFile(minioMetaTmpBucket, uploadID, bucket, object); err != nil { return "", toObjectErr(traceError(err), minioMetaTmpBucket, uploadID) } @@ -584,8 +581,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } } - objectLock.Lock() - defer objectLock.Unlock() // Rename the file back to original location, if not delete the temporary object. err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object) if err != nil { @@ -619,7 +614,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // Hold the lock so that two parallel // complete-multipart-uploads do not leave a stale // uploads.json behind. - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -672,7 +667,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error // Hold lock so that there is no competing // complete-multipart-upload or put-object-part. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 4d3d933b6..ead0c5745 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -246,11 +246,6 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, return traceError(InvalidRange{offset, length, fi.Size}) } - // Lock the object before reading. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - var totalLeft = length bufSize := int64(readSizeV1) if length > 0 && bufSize > length { @@ -450,11 +445,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } } - // Lock the object before committing the object. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - // Entire object was written to the temp location, now it's safe to rename it to the actual location. err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object) if err != nil { @@ -484,12 +474,6 @@ func (fs fsObjects) DeleteObject(bucket, object string) error { return err } - // Lock the object before deleting so that an in progress GetObject does not return - // corrupt data or there is no race with a PutObject. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - if bucket != minioMetaBucket { // We don't store fs.json for minio-S3-layer created files like policy.json, // hence we don't try to delete fs.json for such files. diff --git a/cmd/lock-instrument_test.go b/cmd/lock-instrument_test.go index 89c3ee39c..fe33341a1 100644 --- a/cmd/lock-instrument_test.go +++ b/cmd/lock-instrument_test.go @@ -119,26 +119,26 @@ func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoMap map[string]*Syste } } -// Asserts the lock counter from the global nsMutex inmemory lock with the expected one. +// Asserts the lock counter from the global globalNSMutex inmemory lock with the expected one. func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) { - nsMutex.lockMapMutex.Lock() + globalNSMutex.lockMapMutex.Lock() // Verifying the lock stats. - if nsMutex.globalLockCounter != int64(l.expectedGlobalLockCount) { + if globalNSMutex.globalLockCounter != int64(l.expectedGlobalLockCount) { t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), - nsMutex.globalLockCounter) + globalNSMutex.globalLockCounter) } // verify the count for total blocked locks. - if nsMutex.blockedCounter != int64(l.expectedBlockedLockCount) { + if globalNSMutex.blockedCounter != int64(l.expectedBlockedLockCount) { t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), - nsMutex.blockedCounter) + globalNSMutex.blockedCounter) } // verify the count for total running locks. - if nsMutex.runningLockCounter != int64(l.expectedRunningLockCount) { + if globalNSMutex.runningLockCounter != int64(l.expectedRunningLockCount) { t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), - nsMutex.runningLockCounter) + globalNSMutex.runningLockCounter) } - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() // Verifying again with the JSON response of the lock info. // Verifying the lock stats. sysLockState, err := getSystemLockState() @@ -164,35 +164,35 @@ func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) { // Verify the lock counter for entries of given pair. func verifyLockStats(l lockStateCase, t *testing.T, testNum int) { - nsMutex.lockMapMutex.Lock() - defer nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Lock() + defer globalNSMutex.lockMapMutex.Unlock() param := nsParam{l.volume, l.path} // Verify the total locks (blocked+running) for given pair. - if nsMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) { + if globalNSMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) { t.Errorf("Test %d: Expected the total lock count for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, - param.volume, param.path, int64(l.expectedVolPathLockCount), nsMutex.debugLockMap[param].ref) + param.volume, param.path, int64(l.expectedVolPathLockCount), globalNSMutex.debugLockMap[param].ref) } // Verify the total running locks for given pair. - if nsMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) { + if globalNSMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) { t.Errorf("Test %d: Expected the total running locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path, - int64(l.expectedVolPathRunningCount), nsMutex.debugLockMap[param].running) + int64(l.expectedVolPathRunningCount), globalNSMutex.debugLockMap[param].running) } // Verify the total blocked locks for givne pair. - if nsMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) { + if globalNSMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) { t.Errorf("Test %d: Expected the total blocked locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path, - int64(l.expectedVolPathBlockCount), nsMutex.debugLockMap[param].blocked) + int64(l.expectedVolPathBlockCount), globalNSMutex.debugLockMap[param].blocked) } } -// verifyLockState - function which asserts the expected lock info in the system with the actual values in the nsMutex. +// verifyLockState - function which asserts the expected lock info in the system with the actual values in the globalNSMutex. func verifyLockState(l lockStateCase, t *testing.T, testNum int) { param := nsParam{l.volume, l.path} verifyGlobalLockStats(l, t, testNum) - nsMutex.lockMapMutex.Lock() + globalNSMutex.lockMapMutex.Lock() // Verifying the lock statuS fields. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok { // Validating the lock type filed in the debug lock information. if l.readLock { @@ -222,7 +222,7 @@ func verifyLockState(l lockStateCase, t *testing.T, testNum int) { t.Errorf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", testNum, param.volume, param.path) } // verifyLockStats holds its own lock. - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() // verify the lock count. verifyLockStats(l, t, testNum) @@ -319,7 +319,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { param := nsParam{testCases[0].volume, testCases[0].path} // Testing before the initialization done. // Since the data structures for - actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path} @@ -327,14 +327,14 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedErr, actualErr) } - nsMutex = &nsLockMap{ + globalNSMutex = &nsLockMap{ // entries of -> stateInfo of locks, for instrumentation purpose. debugLockMap: make(map[nsParam]*debugLockInfoPerVolumePath), lockMap: make(map[nsParam]*nsLock), } // Entry for pair is set to nil. Should fail with `errLockNotInitialized`. - nsMutex.debugLockMap[param] = nil - actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + globalNSMutex.debugLockMap[param] = nil + actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if errorCause(actualErr) != errLockNotInitialized { @@ -342,14 +342,14 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { } // Setting the lock info the be `nil`. - nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ + globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ lockInfo: nil, // setting the lockinfo to nil. ref: 0, blocked: 0, running: 0, } - actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedOpsErr := LockInfoOpsIDNotFound{testCases[0].volume, testCases[0].path, testCases[0].opsID} @@ -359,7 +359,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { // Next case: ase whether an attempt to change the state of the lock to "Running" done, // but the initial state if already "Running". Such an attempt should fail - nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ + globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ lockInfo: make(map[string]debugLockInfo), ref: 0, blocked: 0, @@ -368,13 +368,13 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { // Setting the status of the lock to be "Running". // The initial state of the lock should set to "Blocked", otherwise its not possible to change the state from "Blocked" -> "Running". - nsMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{ + globalNSMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{ lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", status: "Running", // State set to "Running". Should fail with `LockInfoStateNotBlocked`. since: time.Now().UTC(), } - actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedBlockErr := LockInfoStateNotBlocked{testCases[0].volume, testCases[0].path, testCases[0].opsID} @@ -390,22 +390,22 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { param := nsParam{testCase.volume, testCase.path} // status of the lock to be set to "Blocked", before setting Blocked->Running. if testCase.setBlocked { - nsMutex.lockMapMutex.Lock() - err := nsMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) + globalNSMutex.lockMapMutex.Lock() + err := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) if err != nil { t.Fatalf("Test %d: Initializing the initial state to Blocked failed %s", i+1, err) } - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() } // invoking the method under test. - actualErr = nsMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock) + actualErr = globalNSMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock) if errorCause(actualErr) != testCase.expectedErr { t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr) } // In case of no error proceed with validating the lock state information. if actualErr == nil { // debug entry for given pair should exist. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok { // Validating the lock type filed in the debug lock information. if testCase.readLock { @@ -514,7 +514,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) { param := nsParam{testCases[0].volume, testCases[0].path} // Testing before the initialization done. // Since the data structures for - actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path} @@ -524,13 +524,13 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) { // Iterate over the cases and assert the result. for i, testCase := range testCases { - nsMutex.lockMapMutex.Lock() + globalNSMutex.lockMapMutex.Lock() param := nsParam{testCase.volume, testCase.path} - actualErr := nsMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) + actualErr := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) if actualErr != testCase.expectedErr { t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr) } - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() if actualErr == nil { verifyLockState(testCase, t, i+1) } @@ -559,7 +559,7 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { param := nsParam{testCases[0].volume, testCases[0].path} // Testing before the initialization done. - actualErr := nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr := globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path} if errorCause(actualErr) != expectedErr { @@ -568,17 +568,17 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { // Case - 2. // Lock state is set to Running and then an attempt to delete the info for non-existent opsID done. - nsMutex.lockMapMutex.Lock() - err := nsMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Lock() + err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Blocked failed: %s", err) } - nsMutex.lockMapMutex.Unlock() - err = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Unlock() + err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Running failed: %s", err) } - actualErr = nsMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID") + actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID") expectedOpsIDErr := LockInfoOpsIDNotFound{param.volume, param.path, "non-existent-OpsID"} if errorCause(actualErr) != expectedOpsIDErr { @@ -589,7 +589,7 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { // All metrics should be 0 after deleting the entry. // Verify that the entry the opsID exists. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; !ok { t.Fatalf("Entry for OpsID \"%s\" in %s, %s should have existed. ", testCases[0].opsID, param.volume, param.path) } @@ -597,27 +597,27 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { t.Fatalf("Entry for %s, %s should have existed. ", param.volume, param.path) } - actualErr = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) if actualErr != nil { t.Fatalf("Expected the error to be , but got %s", actualErr) } // Verify that the entry for the opsId doesn't exists. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; ok { t.Fatalf("The entry for opsID \"%s\" should have been deleted", testCases[0].opsID) } } else { t.Fatalf("Entry for %s, %s should have existed. ", param.volume, param.path) } - if nsMutex.runningLockCounter != int64(0) { - t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter) + if globalNSMutex.runningLockCounter != int64(0) { + t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), globalNSMutex.runningLockCounter) } - if nsMutex.blockedCounter != int64(0) { - t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter) + if globalNSMutex.blockedCounter != int64(0) { + t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), globalNSMutex.blockedCounter) } - if nsMutex.globalLockCounter != int64(0) { - t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter) + if globalNSMutex.globalLockCounter != int64(0) { + t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), globalNSMutex.globalLockCounter) } } @@ -643,7 +643,7 @@ func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) { // Case where an attempt to delete the entry for non-existent pair is done. // Set the status of the lock to blocked and then to running. param := nsParam{testCases[0].volume, testCases[0].path} - actualErr := nsMutex.deleteLockInfoEntryForVolumePath(param) + actualErr := globalNSMutex.deleteLockInfoEntryForVolumePath(param) expectedNilErr := LockInfoVolPathMissing{param.volume, param.path} if errorCause(actualErr) != expectedNilErr { t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) @@ -654,39 +654,39 @@ func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) { // All metrics should be 0 after deleting the entry. // Registering the entry first. - nsMutex.lockMapMutex.Lock() - err := nsMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Lock() + err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Blocked failed: %s", err) } - nsMutex.lockMapMutex.Unlock() - err = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Unlock() + err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Running failed: %s", err) } // Verify that the entry the for given exists. - if _, ok := nsMutex.debugLockMap[param]; !ok { + if _, ok := globalNSMutex.debugLockMap[param]; !ok { t.Fatalf("Entry for %s, %s should have existed.", param.volume, param.path) } // first delete the entry for the operation ID. - _ = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) - actualErr = nsMutex.deleteLockInfoEntryForVolumePath(param) + _ = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr = globalNSMutex.deleteLockInfoEntryForVolumePath(param) if actualErr != nil { t.Fatalf("Expected the error to be , but got %s", actualErr) } // Verify that the entry for the opsId doesn't exists. - if _, ok := nsMutex.debugLockMap[param]; ok { + if _, ok := globalNSMutex.debugLockMap[param]; ok { t.Fatalf("Entry for %s, %s should have been deleted. ", param.volume, param.path) } // The lock count values should be 0. - if nsMutex.runningLockCounter != int64(0) { - t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter) + if globalNSMutex.runningLockCounter != int64(0) { + t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), globalNSMutex.runningLockCounter) } - if nsMutex.blockedCounter != int64(0) { - t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter) + if globalNSMutex.blockedCounter != int64(0) { + t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), globalNSMutex.blockedCounter) } - if nsMutex.globalLockCounter != int64(0) { - t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter) + if globalNSMutex.globalLockCounter != int64(0) { + t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), globalNSMutex.globalLockCounter) } } diff --git a/cmd/lockinfo-handlers.go b/cmd/lockinfo-handlers.go index 2e8f5c5f8..2ed1b58a5 100644 --- a/cmd/lockinfo-handlers.go +++ b/cmd/lockinfo-handlers.go @@ -61,16 +61,16 @@ type OpsLockState struct { // Read entire state of the locks in the system and return. func getSystemLockState() (SystemLockState, error) { - nsMutex.lockMapMutex.Lock() - defer nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Lock() + defer globalNSMutex.lockMapMutex.Unlock() lockState := SystemLockState{} - lockState.TotalBlockedLocks = nsMutex.blockedCounter - lockState.TotalLocks = nsMutex.globalLockCounter - lockState.TotalAcquiredLocks = nsMutex.runningLockCounter + lockState.TotalBlockedLocks = globalNSMutex.blockedCounter + lockState.TotalLocks = globalNSMutex.globalLockCounter + lockState.TotalAcquiredLocks = globalNSMutex.runningLockCounter - for param, debugLock := range nsMutex.debugLockMap { + for param, debugLock := range globalNSMutex.debugLockMap { volLockInfo := VolumeLockInfo{} volLockInfo.Bucket = param.volume volLockInfo.Object = param.path diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 37c0b80f3..f549dd4f8 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -26,7 +26,7 @@ import ( ) // Global name space lock. -var nsMutex *nsLockMap +var globalNSMutex *nsLockMap // Initialize distributed locking only in case of distributed setup. // Returns if the setup is distributed or not on success. @@ -57,15 +57,15 @@ func initDsyncNodes(eps []*url.URL) error { } // initNSLock - initialize name space lock map. -func initNSLock(isDist bool) { - nsMutex = &nsLockMap{ - isDist: isDist, - lockMap: make(map[nsParam]*nsLock), +func initNSLock(isDistXL bool) { + globalNSMutex = &nsLockMap{ + isDistXL: isDistXL, + lockMap: make(map[nsParam]*nsLock), } // Initialize nsLockMap with entry for instrumentation information. // Entries of -> stateInfo of locks - nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) + globalNSMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) } // RWLocker - interface that any read-write locking library should implement. @@ -98,7 +98,7 @@ type nsLockMap struct { // Indicates whether the locking service is part // of a distributed setup or not. - isDist bool + isDistXL bool lockMap map[nsParam]*nsLock lockMapMutex sync.Mutex } @@ -113,8 +113,8 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock if !found { nsLk = &nsLock{ RWLocker: func() RWLocker { - if n.isDist { - return dsync.NewDRWMutex(pathutil.Join(volume, path)) + if n.isDistXL { + return dsync.NewDRWMutex(pathJoin(volume, path)) } return &sync.RWMutex{} }(), @@ -126,7 +126,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock // Change the state of the lock to be blocked for the given // pair of and till the lock - // unblocks. The lock for accessing `nsMutex` is held inside + // unblocks. The lock for accessing `globalNSMutex` is held inside // the function itself. if err := n.statusNoneToBlocked(param, lockSource, opsID, readLock); err != nil { errorIf(err, "Failed to set lock state to blocked") @@ -226,7 +226,7 @@ func (n *nsLockMap) ForceUnlock(volume, path string) { defer n.lockMapMutex.Unlock() // Clarification on operation: - // - In case of FS or XL we call ForceUnlock on the local nsMutex + // - In case of FS or XL we call ForceUnlock on the local globalNSMutex // (since there is only a single server) which will cause the 'stuck' // mutex to be removed from the map. Existing operations for this // will continue to be blocked (and timeout). New operations on this @@ -238,9 +238,8 @@ func (n *nsLockMap) ForceUnlock(volume, path string) { // that participated in granting the lock. Any pending dsync locks that // are blocking can now proceed as normal and any new locks will also // participate normally. - - if n.isDist { // For distributed mode, broadcast ForceUnlock message. - dsync.NewDRWMutex(pathutil.Join(volume, path)).ForceUnlock() + if n.isDistXL { // For distributed mode, broadcast ForceUnlock message. + dsync.NewDRWMutex(pathJoin(volume, path)).ForceUnlock() } param := nsParam{volume, path} diff --git a/cmd/namespace-lock_test.go b/cmd/namespace-lock_test.go index b1834acc0..b3b46e640 100644 --- a/cmd/namespace-lock_test.go +++ b/cmd/namespace-lock_test.go @@ -37,22 +37,22 @@ func TestNamespaceLockTest(t *testing.T) { shouldPass bool }{ { - lk: nsMutex.Lock, - unlk: nsMutex.Unlock, + lk: globalNSMutex.Lock, + unlk: globalNSMutex.Unlock, lockedRefCount: 1, unlockedRefCount: 0, shouldPass: true, }, { - rlk: nsMutex.RLock, - runlk: nsMutex.RUnlock, + rlk: globalNSMutex.RLock, + runlk: globalNSMutex.RUnlock, lockedRefCount: 4, unlockedRefCount: 2, shouldPass: true, }, { - rlk: nsMutex.RLock, - runlk: nsMutex.RUnlock, + rlk: globalNSMutex.RLock, + runlk: globalNSMutex.RUnlock, lockedRefCount: 1, unlockedRefCount: 0, shouldPass: true, @@ -64,7 +64,7 @@ func TestNamespaceLockTest(t *testing.T) { // Write lock tests. testCase := testCases[0] testCase.lk("a", "b", "c") // lock once. - nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}] + nsLk, ok := globalNSMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") } @@ -76,7 +76,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref) } - _, ok = nsMutex.lockMap[nsParam{"a", "b"}] + _, ok = globalNSMutex.lockMap[nsParam{"a", "b"}] if ok && !testCase.shouldPass { t.Errorf("Lock map found after unlock.") } @@ -87,7 +87,7 @@ func TestNamespaceLockTest(t *testing.T) { testCase.rlk("a", "b", "c") // lock second time. testCase.rlk("a", "b", "c") // lock third time. testCase.rlk("a", "b", "c") // lock fourth time. - nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}] + nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") } @@ -101,7 +101,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref) } - _, ok = nsMutex.lockMap[nsParam{"a", "b"}] + _, ok = globalNSMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock map not found.") } @@ -110,7 +110,7 @@ func TestNamespaceLockTest(t *testing.T) { testCase = testCases[2] testCase.rlk("a", "c", "d") // lock once. - nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}] + nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "c"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") } @@ -122,7 +122,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref) } - _, ok = nsMutex.lockMap[nsParam{"a", "c"}] + _, ok = globalNSMutex.lockMap[nsParam{"a", "c"}] if ok && !testCase.shouldPass { t.Errorf("Lock map not found.") } @@ -303,7 +303,7 @@ func TestLockStats(t *testing.T) { // hold 10 read locks. for i := 0; i < 10; i++ { - nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i)) + globalNSMutex.RLock("my-bucket", "my-object", strconv.Itoa(i)) } // expected lock info. expectedLockStats := expectedResult[0] @@ -311,7 +311,7 @@ func TestLockStats(t *testing.T) { verifyLockState(expectedLockStats, t, 1) // unlock 5 readlock. for i := 0; i < 5; i++ { - nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i)) + globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i)) } expectedLockStats = expectedResult[1] @@ -323,14 +323,14 @@ func TestLockStats(t *testing.T) { go func() { defer wg.Done() // blocks till all read locks are released. - nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10)) + globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(10)) // Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock. expectedWLockStats := expectedResult[2] // Since the write lock acquired here, the number of blocked locks should reduce by 1 and // count of running locks should increase by 1. verifyLockState(expectedWLockStats, t, 3) // release the write lock. - nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) + globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) // The number of running locks should decrease by 1. // expectedWLockStats = expectedResult[3] // verifyLockState(expectedWLockStats, t, 4) @@ -348,14 +348,14 @@ func TestLockStats(t *testing.T) { go func() { defer wg.Done() // blocks till all read locks are released. - nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(11)) + globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(11)) // Once the above attempt to lock is unblocked/acquired, we release the lock. // Unlock the second write lock only after lock stats for first write lock release is taken. <-syncChan // The number of running locks should decrease by 1. expectedWLockStats := expectedResult[4] verifyLockState(expectedWLockStats, t, 5) - nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11)) + globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11)) }() expectedLockStats = expectedResult[5] @@ -366,7 +366,7 @@ func TestLockStats(t *testing.T) { // unlock 4 out of remaining 5 read locks. for i := 0; i < 4; i++ { - nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5)) + globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5)) } // verify the entry for one remaining read lock and count of blocked write locks. @@ -375,7 +375,7 @@ func TestLockStats(t *testing.T) { verifyLockState(expectedLockStats, t, 7) // Releasing the last read lock. - nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9)) + globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9)) wg.Wait() expectedLockStats = expectedResult[7] // verify the actual lock info with the expected one. @@ -386,16 +386,16 @@ func TestLockStats(t *testing.T) { func TestNamespaceForceUnlockTest(t *testing.T) { // Create lock. - lock := nsMutex.NewNSLock("bucket", "object") + lock := globalNSMutex.NewNSLock("bucket", "object") lock.Lock() // Forcefully unlock lock. - nsMutex.ForceUnlock("bucket", "object") + globalNSMutex.ForceUnlock("bucket", "object") ch := make(chan struct{}, 1) go func() { // Try to claim lock again. - anotherLock := nsMutex.NewNSLock("bucket", "object") + anotherLock := globalNSMutex.NewNSLock("bucket", "object") anotherLock.Lock() // And signal succes. ch <- struct{}{} @@ -412,5 +412,5 @@ func TestNamespaceForceUnlockTest(t *testing.T) { } // Clean up lock. - nsMutex.ForceUnlock("bucket", "object") + globalNSMutex.ForceUnlock("bucket", "object") } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 61a5d6f69..f5b4857d3 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -96,6 +96,11 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } + // Lock the object before reading. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") @@ -195,6 +200,11 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re return } + // Lock the object before reading. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") @@ -269,6 +279,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re return } + // Lock the object before reading. + objectRLock := globalNSMutex.NewNSLock(sourceBucket, sourceObject) + objectRLock.RLock() + defer objectRLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(sourceBucket, sourceObject) if err != nil { errorIf(err, "Unable to fetch object info.") @@ -311,6 +326,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re delete(metadata, "md5Sum") sha256sum := "" + + objectWLock := globalNSMutex.NewNSLock(bucket, object) + objectWLock.Lock() + defer objectWLock.Unlock() + // Create the object. objInfo, err = objectAPI.PutObject(bucket, object, size, pipeReader, metadata, sha256sum) if err != nil { @@ -400,6 +420,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req sha256sum := "" + // Lock the object. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() + var objInfo ObjectInfo switch rAuthType { default: @@ -731,6 +756,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite completeParts = append(completeParts, part) } + // Hold write lock on the object. + destLock := globalNSMutex.NewNSLock(bucket, object) + destLock.Lock() + defer destLock.Unlock() + md5Sum, err = objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) if err != nil { err = errorCause(err) @@ -801,6 +831,10 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() + /// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html /// Ignore delete object errors, since we are suppposed to reply /// only 204. diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 43801303e..4f2343123 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "encoding/json" "errors" "fmt" @@ -148,6 +147,9 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep if !isJWTReqAuthenticated(r) { return toJSONError(errAuthentication) } + bucketLock := globalNSMutex.NewNSLock(args.BucketName, "") + bucketLock.Lock() + defer bucketLock.Unlock() if err := objectAPI.MakeBucket(args.BucketName); err != nil { return toJSONError(err, args.BucketName) } @@ -272,6 +274,11 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, if !isJWTReqAuthenticated(r) { return toJSONError(errAuthentication) } + + objectLock := globalNSMutex.NewNSLock(args.BucketName, args.ObjectName) + objectLock.Lock() + defer objectLock.Unlock() + if err := objectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil { if isErrObjectNotFound(err) { // Ignore object not found error. @@ -478,16 +485,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { // Extract incoming metadata if any. metadata := extractMetadataFromHeader(r.Header) - sha256sum := "" - if _, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata, sha256sum); err != nil { - writeWebErrorResponse(w, err) - return - } + // Lock the object. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() - // Fetch object info for notifications. - objInfo, err := objectAPI.GetObjectInfo(bucket, object) + sha256sum := "" + objInfo, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata, sha256sum) if err != nil { - errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) + writeWebErrorResponse(w, err) return } @@ -534,6 +540,11 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { // Add content disposition. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object))) + // Lock the object before reading. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { writeWebErrorResponse(w, err) @@ -691,16 +702,8 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic return toJSONError(err) } - // Parse bucket policy. - var policy = &bucketPolicy{} - err = parseBucketPolicy(bytes.NewReader(data), policy) - if err != nil { - errorIf(err, "Unable to parse bucket policy.") - return toJSONError(err, args.BucketName) - } - - // Parse check bucket policy. - if s3Error := checkBucketPolicyResources(args.BucketName, policy); s3Error != ErrNone { + // Parse validate and save bucket policy. + if s3Error := parseAndPersistBucketPolicy(args.BucketName, data, objectAPI); s3Error != ErrNone { apiErr := getAPIError(s3Error) var err error if apiErr.Code == "XMinioPolicyNesting" { @@ -710,12 +713,6 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic } return toJSONError(err, args.BucketName) } - - // TODO: update policy statements according to bucket name, - // prefix and policy arguments. - if err := persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{false, policy}, objectAPI); err != nil { - return toJSONError(err, args.BucketName) - } reply.UIVersion = miniobrowser.UIVersion return nil } @@ -808,8 +805,7 @@ func toJSONError(err error, params ...string) (jerr *json2.Error) { case "InvalidBucketName": if len(params) > 0 { jerr = &json2.Error{ - Message: fmt.Sprintf("Bucket Name %s is invalid. Lowercase letters, period and numerals are the only allowed characters.", - params[0]), + Message: fmt.Sprintf("Bucket Name %s is invalid. Lowercase letters, period and numerals are the only allowed characters.", params[0]), } } // Bucket not found custom error message. diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 963672c0b..52e993dcf 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -36,10 +36,6 @@ func (xl xlObjects) MakeBucket(bucket string) error { return traceError(BucketNameInvalid{Bucket: bucket}) } - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.Lock() - defer bucketLock.Unlock() - // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -153,10 +149,6 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { return BucketInfo{}, BucketNameInvalid{Bucket: bucket} } - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.RLock() - defer bucketLock.RUnlock() - bucketInfo, err := xl.getBucketInfo(bucket) if err != nil { return BucketInfo{}, toObjectErr(err, bucket) @@ -227,10 +219,6 @@ func (xl xlObjects) DeleteBucket(bucket string) error { return BucketNameInvalid{Bucket: bucket} } - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.Lock() - defer bucketLock.Unlock() - // Collect if all disks report volume not found. var wg = &sync.WaitGroup{} var dErrs = make([]error, len(xl.storageDisks)) diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 10abbe43a..533d6ff2f 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -73,7 +73,7 @@ func (xl xlObjects) HealBucket(bucket string) error { // Heal bucket - create buckets on disks where it does not exist. func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error { - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() defer bucketLock.Unlock() @@ -126,7 +126,7 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error // heals `policy.json`, `notification.xml` and `listeners.json`. func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int) error { healBucketMetaFn := func(metaPath string) error { - metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath) + metaLock := globalNSMutex.NewNSLock(minioMetaBucket, metaPath) metaLock.RLock() defer metaLock.RUnlock() // Heals the given file at metaPath. @@ -346,7 +346,7 @@ func (xl xlObjects) HealObject(bucket, object string) error { } // Lock the object before healing. - objectLock := nsMutex.NewNSLock(bucket, object) + objectLock := globalNSMutex.NewNSLock(bucket, object) objectLock.RLock() defer objectLock.RUnlock() diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index 8ddf19f85..b38411bb9 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -144,7 +144,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma } // Check if the current object needs healing - objectLock := nsMutex.NewNSLock(bucket, objInfo.Name) + objectLock := globalNSMutex.NewNSLock(bucket, objInfo.Name) objectLock.RLock() partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name) if xlShouldHeal(partsMetadata, errs) { diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 496e2672f..802c34ee4 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -64,7 +64,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // uploadIDMarker first. if uploadIDMarker != "" { // hold lock on keyMarker path - keyMarkerLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, keyMarker)) keyMarkerLock.RLock() for _, disk := range xl.getLoadBalancedDisks() { @@ -134,7 +134,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // For the new object entry we get all its // pending uploadIDs. - entryLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, entry)) entryLock.RLock() var disk StorageAPI @@ -242,7 +242,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st // This lock needs to be held for any changes to the directory // contents of ".minio.sys/multipart/object/" - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -305,7 +305,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDPath := pathJoin(bucket, object, uploadID) // pre-check upload id lock. - preUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) preUploadIDLock.RLock() // Validates if upload ID exists. if !xl.isUploadIDExists(bucket, object, uploadID) { @@ -414,7 +414,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } // post-upload check (write) lock - postUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) postUploadIDLock.Lock() defer postUploadIDLock.Unlock() @@ -557,7 +557,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // Hold lock so that there is no competing // abort-multipart-upload or complete-multipart-upload. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -586,7 +586,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // // 2) no one does a parallel complete-multipart-upload on this // multipart upload - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -704,20 +704,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } - // Hold write lock on the destination before rename. - destLock := nsMutex.NewNSLock(bucket, object) - destLock.Lock() defer func() { if xl.objCacheEnabled { // A new complete multipart upload invalidates any // previously cached object in memory. xl.objCache.Delete(path.Join(bucket, object)) - } - // This lock also protects the cache namespace. - destLock.Unlock() - - if xl.objCacheEnabled { // Prefetch the object from disk by triggering a fake GetObject call // Unlike a regular single PutObject, multipart PutObject is comes in // stages and it is harder to cache. @@ -761,7 +753,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Hold the lock so that two parallel // complete-multipart-uploads do not leave a stale // uploads.json behind. - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -787,7 +779,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e // hold lock so we don't compete with a complete, or abort // multipart request. - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -819,7 +811,7 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error // Hold lock so that there is no competing // complete-multipart-upload or put-object-part. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index a8ed0f7fe..6ee87336a 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -62,11 +62,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return traceError(errUnexpected) } - // Lock the object before reading. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) // Do we have read quorum? @@ -222,10 +217,6 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { return ObjectInfo{}, err } - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - info, err := xl.getObjectInfo(bucket, object) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) @@ -485,11 +476,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } } - // Lock the object. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.Lock() - defer objectLock.Unlock() - // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). if xl.parentDirIsObject(bucket, path.Dir(object)) { @@ -606,10 +592,6 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { return err } - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.Lock() - defer objectLock.Unlock() - // Validate object exists. if !xl.isObject(bucket, object) { return traceError(ObjectNotFound{bucket, object})