diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index c5344227a..afbc3dded 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -334,6 +334,10 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req return } + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() + // Proceed to creating a bucket. err := objectAPI.MakeBucket(bucket) if err != nil { @@ -431,6 +435,10 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h sha256sum := "" + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() + objInfo, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata, sha256sum) if err != nil { errorIf(err, "Unable to create object.") @@ -478,6 +486,10 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re return } + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.RLock() + defer bucketLock.RUnlock() + if _, err := objectAPI.GetBucketInfo(bucket); err != nil { errorIf(err, "Unable to fetch bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -503,6 +515,10 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. vars := mux.Vars(r) bucket := vars["bucket"] + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.Lock() + defer bucketLock.Unlock() + // Attempt to delete bucket. if err := objectAPI.DeleteBucket(bucket); err != nil { errorIf(err, "Unable to delete a bucket.") diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index ba4ed2c71..0a3b9e8d8 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "net/http" - "path" "time" "github.com/gorilla/mux" @@ -174,7 +173,7 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI // Acquire a write lock on bucket before modifying its // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() // Release lock after notifying peers defer bucketLock.Unlock() @@ -381,7 +380,7 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL // Acquire a write lock on bucket before modifying its // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() // Release lock after notifying peers defer bucketLock.Unlock() @@ -423,7 +422,7 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje // Acquire a write lock on bucket before modifying its // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() // Release lock after notifying peers defer bucketLock.Unlock() @@ -441,14 +440,3 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje // peers (including local) S3PeersUpdateBucketListener(bucket, updatedLcfgs) } - -// Removes notification.xml for a given bucket, only used during DeleteBucket. -func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { - // Verify bucket is valid. - if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} - } - - notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - return objAPI.DeleteObject(minioMetaBucket, notificationConfigPath) -} diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 95ef0b900..50589059f 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "fmt" "io" "io/ioutil" @@ -166,65 +165,17 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } - // Parse bucket policy. - var policy = &bucketPolicy{} - err = parseBucketPolicy(bytes.NewReader(policyBytes), policy) - if err != nil { - errorIf(err, "Unable to parse bucket policy.") - writeErrorResponse(w, r, ErrInvalidPolicyDocument, r.URL.Path) - return - } - // Parse check bucket policy. - if s3Error := checkBucketPolicyResources(bucket, policy); s3Error != ErrNone { + // Parse validate and save bucket policy. + if s3Error := parseAndPersistBucketPolicy(bucket, policyBytes, objAPI); s3Error != ErrNone { writeErrorResponse(w, r, s3Error, r.URL.Path) return } - // Save bucket policy. - if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil { - switch err.(type) { - case BucketNameInvalid: - writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) - default: - writeErrorResponse(w, r, ErrInternalError, r.URL.Path) - } - return - } - // Success. writeSuccessNoContent(w) } -// persistAndNotifyBucketPolicyChange - takes a policyChange argument, -// persists it to storage, and notify nodes in the cluster about the -// change. In-memory state is updated in response to the notification. -func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error { - // Acquire a write lock on bucket before modifying its - // configuration. - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.Lock() - // Release lock after notifying peers - defer bucketLock.Unlock() - - if pCh.IsRemove { - if err := removeBucketPolicy(bucket, objAPI); err != nil { - return err - } - } else { - if pCh.BktPolicy == nil { - return errInvalidArgument - } - if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil { - return err - } - } - - // Notify all peers (including self) to update in-memory state - S3PeersUpdateBucketPolicy(bucket, pCh) - return nil -} - // DeleteBucketPolicyHandler - DELETE Bucket policy // ----------------- // This implementation of the DELETE operation uses the policy diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index 8261caa6b..07d9ca05f 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -144,6 +144,12 @@ func getOldBucketsConfigPath() (string, error) { // if bucket policy is not found. func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader io.Reader, err error) { policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) + + // Acquire a read lock on policy config before reading. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) + objLock.RLock() + defer objLock.RUnlock() + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath) if err != nil { if isErrObjectNotFound(err) || isErrIncompleteBody(err) { @@ -188,6 +194,10 @@ func readBucketPolicy(bucket string, objAPI ObjectLayer) (*bucketPolicy, error) // if no policies are found. func removeBucketPolicy(bucket string, objAPI ObjectLayer) error { policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) + // Acquire a write lock on policy config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) + objLock.Lock() + defer objLock.Unlock() if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil { errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket) err = errorCause(err) @@ -207,9 +217,70 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) err return err } policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) + // Acquire a write lock on policy config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) + objLock.Lock() + defer objLock.Unlock() if _, err := objAPI.PutObject(minioMetaBucket, policyPath, int64(len(buf)), bytes.NewReader(buf), nil, ""); err != nil { errorIf(err, "Unable to set policy for the bucket %s", bucket) return errorCause(err) } return nil } + +func parseAndPersistBucketPolicy(bucket string, policyBytes []byte, objAPI ObjectLayer) APIErrorCode { + // Parse bucket policy. + var policy = &bucketPolicy{} + err := parseBucketPolicy(bytes.NewReader(policyBytes), policy) + if err != nil { + errorIf(err, "Unable to parse bucket policy.") + return ErrInvalidPolicyDocument + } + + // Parse check bucket policy. + if s3Error := checkBucketPolicyResources(bucket, policy); s3Error != ErrNone { + return s3Error + } + + // Acquire a write lock on bucket before modifying its configuration. + bucketLock := globalNSMutex.NewNSLock(bucket, "") + bucketLock.Lock() + // Release lock after notifying peers + defer bucketLock.Unlock() + + // Save bucket policy. + if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil { + switch err.(type) { + case BucketNameInvalid: + return ErrInvalidBucketName + case BucketNotFound: + return ErrNoSuchBucket + default: + errorIf(err, "Unable to save bucket policy.") + return ErrInternalError + } + } + return ErrNone +} + +// persistAndNotifyBucketPolicyChange - takes a policyChange argument, +// persists it to storage, and notify nodes in the cluster about the +// change. In-memory state is updated in response to the notification. +func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error { + if pCh.IsRemove { + if err := removeBucketPolicy(bucket, objAPI); err != nil { + return err + } + } else { + if pCh.BktPolicy == nil { + return errInvalidArgument + } + if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil { + return err + } + } + + // Notify all peers (including self) to update in-memory state + S3PeersUpdateBucketPolicy(bucket, pCh) + return nil +} diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 5c2794413..b8d697d3f 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -301,8 +301,14 @@ func eventNotify(event eventData) { // structured notification config. func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) { // Construct the notification config path. - notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) + ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) + objLock.RLock() + defer objLock.RUnlock() + + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, ncPath) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no @@ -315,7 +321,7 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon return nil, err } var buffer bytes.Buffer - err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer) + err = objAPI.GetObject(minioMetaBucket, ncPath, 0, objInfo.Size, &buffer) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no @@ -350,8 +356,14 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er } // Construct the notification config path. - listenerConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, listenerConfigPath) + lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) + objLock.RLock() + defer objLock.RUnlock() + + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, lcPath) if err != nil { // 'listener.json' not found return // 'errNoSuchNotifications'. This is default when no @@ -364,7 +376,7 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er return nil, err } var buffer bytes.Buffer - err = objAPI.GetObject(minioMetaBucket, listenerConfigPath, 0, objInfo.Size, &buffer) + err = objAPI.GetObject(minioMetaBucket, lcPath, 0, objInfo.Size, &buffer) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no @@ -399,6 +411,11 @@ func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj Obje // build path ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) + objLock.Lock() + defer objLock.Unlock() + // write object to path sha256Sum := getSHA256Hash(buf) _, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum) @@ -419,6 +436,11 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer // build path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) + objLock.Lock() + defer objLock.Unlock() + // write object to path sha256Sum := getSHA256Hash(buf) _, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum) @@ -428,12 +450,34 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer return err } +// Removes notification.xml for a given bucket, only used during DeleteBucket. +func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { + // Verify bucket is valid. + if !IsValidBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} + } + + ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) + objLock.Lock() + err := objAPI.DeleteObject(minioMetaBucket, ncPath) + objLock.Unlock() + return err +} + // Remove listener configuration from storage layer. Used when a bucket is deleted. func removeListenerConfig(bucket string, objAPI ObjectLayer) error { // make the path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - // remove it - return objAPI.DeleteObject(minioMetaBucket, lcPath) + + // Acquire a write lock on notification config before modifying. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) + objLock.Lock() + err := objAPI.DeleteObject(minioMetaBucket, lcPath) + objLock.Unlock() + return err } // Loads both notification and listener config. diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 8ae6d554c..4f7e45dc3 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -57,7 +57,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var err error var eof bool if uploadIDMarker != "" { - keyMarkerLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, keyMarker)) keyMarkerLock.RLock() uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) @@ -112,7 +112,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var end bool uploadIDMarker = "" - entryLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, entry)) entryLock.RLock() tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) @@ -192,7 +192,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st // This lock needs to be held for any changes to the directory // contents of ".minio.sys/multipart/object/" - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -248,7 +248,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDPath := path.Join(bucket, object, uploadID) - preUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) preUploadIDLock.RLock() // Just check if the uploadID exists to avoid copy if it doesn't. uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID) @@ -329,7 +329,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } // Hold write lock as we are updating fs.json - postUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) postUploadIDLock.Lock() defer postUploadIDLock.Unlock() @@ -348,7 +348,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s partPath := path.Join(bucket, object, uploadID, partSuffix) // Lock the part so that another part upload with same part-number gets blocked // while the part is getting appended in the background. - partLock := nsMutex.NewNSLock(minioMetaMultipartBucket, partPath) + partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath) partLock.Lock() err = fs.storage.RenameFile(minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath) if err != nil { @@ -439,7 +439,7 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // Hold lock so that there is no competing // abort-multipart-upload or complete-multipart-upload. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -479,7 +479,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // 1) no one aborts this multipart upload // 2) no one does a parallel complete-multipart-upload on this // multipart upload - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -502,14 +502,11 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // This lock is held during rename of the appended tmp file to the actual // location so that any competing GetObject/PutObject/DeleteObject do not race. - objectLock := nsMutex.NewNSLock(bucket, object) appendFallback := true // In case background-append did not append the required parts. if isPartsSame(fsMeta.Parts, parts) { err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta) if err == nil { appendFallback = false - objectLock.Lock() - defer objectLock.Unlock() if err = fs.storage.RenameFile(minioMetaTmpBucket, uploadID, bucket, object); err != nil { return "", toObjectErr(traceError(err), minioMetaTmpBucket, uploadID) } @@ -584,8 +581,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } } - objectLock.Lock() - defer objectLock.Unlock() // Rename the file back to original location, if not delete the temporary object. err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object) if err != nil { @@ -619,7 +614,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // Hold the lock so that two parallel // complete-multipart-uploads do not leave a stale // uploads.json behind. - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -672,7 +667,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error // Hold lock so that there is no competing // complete-multipart-upload or put-object-part. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 4d3d933b6..ead0c5745 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -246,11 +246,6 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, return traceError(InvalidRange{offset, length, fi.Size}) } - // Lock the object before reading. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - var totalLeft = length bufSize := int64(readSizeV1) if length > 0 && bufSize > length { @@ -450,11 +445,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } } - // Lock the object before committing the object. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - // Entire object was written to the temp location, now it's safe to rename it to the actual location. err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object) if err != nil { @@ -484,12 +474,6 @@ func (fs fsObjects) DeleteObject(bucket, object string) error { return err } - // Lock the object before deleting so that an in progress GetObject does not return - // corrupt data or there is no race with a PutObject. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - if bucket != minioMetaBucket { // We don't store fs.json for minio-S3-layer created files like policy.json, // hence we don't try to delete fs.json for such files. diff --git a/cmd/lock-instrument_test.go b/cmd/lock-instrument_test.go index 89c3ee39c..fe33341a1 100644 --- a/cmd/lock-instrument_test.go +++ b/cmd/lock-instrument_test.go @@ -119,26 +119,26 @@ func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoMap map[string]*Syste } } -// Asserts the lock counter from the global nsMutex inmemory lock with the expected one. +// Asserts the lock counter from the global globalNSMutex inmemory lock with the expected one. func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) { - nsMutex.lockMapMutex.Lock() + globalNSMutex.lockMapMutex.Lock() // Verifying the lock stats. - if nsMutex.globalLockCounter != int64(l.expectedGlobalLockCount) { + if globalNSMutex.globalLockCounter != int64(l.expectedGlobalLockCount) { t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), - nsMutex.globalLockCounter) + globalNSMutex.globalLockCounter) } // verify the count for total blocked locks. - if nsMutex.blockedCounter != int64(l.expectedBlockedLockCount) { + if globalNSMutex.blockedCounter != int64(l.expectedBlockedLockCount) { t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), - nsMutex.blockedCounter) + globalNSMutex.blockedCounter) } // verify the count for total running locks. - if nsMutex.runningLockCounter != int64(l.expectedRunningLockCount) { + if globalNSMutex.runningLockCounter != int64(l.expectedRunningLockCount) { t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), - nsMutex.runningLockCounter) + globalNSMutex.runningLockCounter) } - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() // Verifying again with the JSON response of the lock info. // Verifying the lock stats. sysLockState, err := getSystemLockState() @@ -164,35 +164,35 @@ func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) { // Verify the lock counter for entries of given pair. func verifyLockStats(l lockStateCase, t *testing.T, testNum int) { - nsMutex.lockMapMutex.Lock() - defer nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Lock() + defer globalNSMutex.lockMapMutex.Unlock() param := nsParam{l.volume, l.path} // Verify the total locks (blocked+running) for given pair. - if nsMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) { + if globalNSMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) { t.Errorf("Test %d: Expected the total lock count for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, - param.volume, param.path, int64(l.expectedVolPathLockCount), nsMutex.debugLockMap[param].ref) + param.volume, param.path, int64(l.expectedVolPathLockCount), globalNSMutex.debugLockMap[param].ref) } // Verify the total running locks for given pair. - if nsMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) { + if globalNSMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) { t.Errorf("Test %d: Expected the total running locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path, - int64(l.expectedVolPathRunningCount), nsMutex.debugLockMap[param].running) + int64(l.expectedVolPathRunningCount), globalNSMutex.debugLockMap[param].running) } // Verify the total blocked locks for givne pair. - if nsMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) { + if globalNSMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) { t.Errorf("Test %d: Expected the total blocked locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path, - int64(l.expectedVolPathBlockCount), nsMutex.debugLockMap[param].blocked) + int64(l.expectedVolPathBlockCount), globalNSMutex.debugLockMap[param].blocked) } } -// verifyLockState - function which asserts the expected lock info in the system with the actual values in the nsMutex. +// verifyLockState - function which asserts the expected lock info in the system with the actual values in the globalNSMutex. func verifyLockState(l lockStateCase, t *testing.T, testNum int) { param := nsParam{l.volume, l.path} verifyGlobalLockStats(l, t, testNum) - nsMutex.lockMapMutex.Lock() + globalNSMutex.lockMapMutex.Lock() // Verifying the lock statuS fields. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok { // Validating the lock type filed in the debug lock information. if l.readLock { @@ -222,7 +222,7 @@ func verifyLockState(l lockStateCase, t *testing.T, testNum int) { t.Errorf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", testNum, param.volume, param.path) } // verifyLockStats holds its own lock. - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() // verify the lock count. verifyLockStats(l, t, testNum) @@ -319,7 +319,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { param := nsParam{testCases[0].volume, testCases[0].path} // Testing before the initialization done. // Since the data structures for - actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path} @@ -327,14 +327,14 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedErr, actualErr) } - nsMutex = &nsLockMap{ + globalNSMutex = &nsLockMap{ // entries of -> stateInfo of locks, for instrumentation purpose. debugLockMap: make(map[nsParam]*debugLockInfoPerVolumePath), lockMap: make(map[nsParam]*nsLock), } // Entry for pair is set to nil. Should fail with `errLockNotInitialized`. - nsMutex.debugLockMap[param] = nil - actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + globalNSMutex.debugLockMap[param] = nil + actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if errorCause(actualErr) != errLockNotInitialized { @@ -342,14 +342,14 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { } // Setting the lock info the be `nil`. - nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ + globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ lockInfo: nil, // setting the lockinfo to nil. ref: 0, blocked: 0, running: 0, } - actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedOpsErr := LockInfoOpsIDNotFound{testCases[0].volume, testCases[0].path, testCases[0].opsID} @@ -359,7 +359,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { // Next case: ase whether an attempt to change the state of the lock to "Running" done, // but the initial state if already "Running". Such an attempt should fail - nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ + globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ lockInfo: make(map[string]debugLockInfo), ref: 0, blocked: 0, @@ -368,13 +368,13 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { // Setting the status of the lock to be "Running". // The initial state of the lock should set to "Blocked", otherwise its not possible to change the state from "Blocked" -> "Running". - nsMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{ + globalNSMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{ lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", status: "Running", // State set to "Running". Should fail with `LockInfoStateNotBlocked`. since: time.Now().UTC(), } - actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedBlockErr := LockInfoStateNotBlocked{testCases[0].volume, testCases[0].path, testCases[0].opsID} @@ -390,22 +390,22 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { param := nsParam{testCase.volume, testCase.path} // status of the lock to be set to "Blocked", before setting Blocked->Running. if testCase.setBlocked { - nsMutex.lockMapMutex.Lock() - err := nsMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) + globalNSMutex.lockMapMutex.Lock() + err := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) if err != nil { t.Fatalf("Test %d: Initializing the initial state to Blocked failed %s", i+1, err) } - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() } // invoking the method under test. - actualErr = nsMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock) + actualErr = globalNSMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock) if errorCause(actualErr) != testCase.expectedErr { t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr) } // In case of no error proceed with validating the lock state information. if actualErr == nil { // debug entry for given pair should exist. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok { // Validating the lock type filed in the debug lock information. if testCase.readLock { @@ -514,7 +514,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) { param := nsParam{testCases[0].volume, testCases[0].path} // Testing before the initialization done. // Since the data structures for - actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, + actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path} @@ -524,13 +524,13 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) { // Iterate over the cases and assert the result. for i, testCase := range testCases { - nsMutex.lockMapMutex.Lock() + globalNSMutex.lockMapMutex.Lock() param := nsParam{testCase.volume, testCase.path} - actualErr := nsMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) + actualErr := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock) if actualErr != testCase.expectedErr { t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr) } - nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Unlock() if actualErr == nil { verifyLockState(testCase, t, i+1) } @@ -559,7 +559,7 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { param := nsParam{testCases[0].volume, testCases[0].path} // Testing before the initialization done. - actualErr := nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr := globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path} if errorCause(actualErr) != expectedErr { @@ -568,17 +568,17 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { // Case - 2. // Lock state is set to Running and then an attempt to delete the info for non-existent opsID done. - nsMutex.lockMapMutex.Lock() - err := nsMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Lock() + err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Blocked failed: %s", err) } - nsMutex.lockMapMutex.Unlock() - err = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Unlock() + err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Running failed: %s", err) } - actualErr = nsMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID") + actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID") expectedOpsIDErr := LockInfoOpsIDNotFound{param.volume, param.path, "non-existent-OpsID"} if errorCause(actualErr) != expectedOpsIDErr { @@ -589,7 +589,7 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { // All metrics should be 0 after deleting the entry. // Verify that the entry the opsID exists. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; !ok { t.Fatalf("Entry for OpsID \"%s\" in %s, %s should have existed. ", testCases[0].opsID, param.volume, param.path) } @@ -597,27 +597,27 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { t.Fatalf("Entry for %s, %s should have existed. ", param.volume, param.path) } - actualErr = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) if actualErr != nil { t.Fatalf("Expected the error to be , but got %s", actualErr) } // Verify that the entry for the opsId doesn't exists. - if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok { if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; ok { t.Fatalf("The entry for opsID \"%s\" should have been deleted", testCases[0].opsID) } } else { t.Fatalf("Entry for %s, %s should have existed. ", param.volume, param.path) } - if nsMutex.runningLockCounter != int64(0) { - t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter) + if globalNSMutex.runningLockCounter != int64(0) { + t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), globalNSMutex.runningLockCounter) } - if nsMutex.blockedCounter != int64(0) { - t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter) + if globalNSMutex.blockedCounter != int64(0) { + t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), globalNSMutex.blockedCounter) } - if nsMutex.globalLockCounter != int64(0) { - t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter) + if globalNSMutex.globalLockCounter != int64(0) { + t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), globalNSMutex.globalLockCounter) } } @@ -643,7 +643,7 @@ func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) { // Case where an attempt to delete the entry for non-existent pair is done. // Set the status of the lock to blocked and then to running. param := nsParam{testCases[0].volume, testCases[0].path} - actualErr := nsMutex.deleteLockInfoEntryForVolumePath(param) + actualErr := globalNSMutex.deleteLockInfoEntryForVolumePath(param) expectedNilErr := LockInfoVolPathMissing{param.volume, param.path} if errorCause(actualErr) != expectedNilErr { t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) @@ -654,39 +654,39 @@ func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) { // All metrics should be 0 after deleting the entry. // Registering the entry first. - nsMutex.lockMapMutex.Lock() - err := nsMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Lock() + err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Blocked failed: %s", err) } - nsMutex.lockMapMutex.Unlock() - err = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) + globalNSMutex.lockMapMutex.Unlock() + err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock) if err != nil { t.Fatalf("Setting lock status to Running failed: %s", err) } // Verify that the entry the for given exists. - if _, ok := nsMutex.debugLockMap[param]; !ok { + if _, ok := globalNSMutex.debugLockMap[param]; !ok { t.Fatalf("Entry for %s, %s should have existed.", param.volume, param.path) } // first delete the entry for the operation ID. - _ = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) - actualErr = nsMutex.deleteLockInfoEntryForVolumePath(param) + _ = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr = globalNSMutex.deleteLockInfoEntryForVolumePath(param) if actualErr != nil { t.Fatalf("Expected the error to be , but got %s", actualErr) } // Verify that the entry for the opsId doesn't exists. - if _, ok := nsMutex.debugLockMap[param]; ok { + if _, ok := globalNSMutex.debugLockMap[param]; ok { t.Fatalf("Entry for %s, %s should have been deleted. ", param.volume, param.path) } // The lock count values should be 0. - if nsMutex.runningLockCounter != int64(0) { - t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter) + if globalNSMutex.runningLockCounter != int64(0) { + t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), globalNSMutex.runningLockCounter) } - if nsMutex.blockedCounter != int64(0) { - t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter) + if globalNSMutex.blockedCounter != int64(0) { + t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), globalNSMutex.blockedCounter) } - if nsMutex.globalLockCounter != int64(0) { - t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter) + if globalNSMutex.globalLockCounter != int64(0) { + t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), globalNSMutex.globalLockCounter) } } diff --git a/cmd/lockinfo-handlers.go b/cmd/lockinfo-handlers.go index 2e8f5c5f8..2ed1b58a5 100644 --- a/cmd/lockinfo-handlers.go +++ b/cmd/lockinfo-handlers.go @@ -61,16 +61,16 @@ type OpsLockState struct { // Read entire state of the locks in the system and return. func getSystemLockState() (SystemLockState, error) { - nsMutex.lockMapMutex.Lock() - defer nsMutex.lockMapMutex.Unlock() + globalNSMutex.lockMapMutex.Lock() + defer globalNSMutex.lockMapMutex.Unlock() lockState := SystemLockState{} - lockState.TotalBlockedLocks = nsMutex.blockedCounter - lockState.TotalLocks = nsMutex.globalLockCounter - lockState.TotalAcquiredLocks = nsMutex.runningLockCounter + lockState.TotalBlockedLocks = globalNSMutex.blockedCounter + lockState.TotalLocks = globalNSMutex.globalLockCounter + lockState.TotalAcquiredLocks = globalNSMutex.runningLockCounter - for param, debugLock := range nsMutex.debugLockMap { + for param, debugLock := range globalNSMutex.debugLockMap { volLockInfo := VolumeLockInfo{} volLockInfo.Bucket = param.volume volLockInfo.Object = param.path diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 37c0b80f3..f549dd4f8 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -26,7 +26,7 @@ import ( ) // Global name space lock. -var nsMutex *nsLockMap +var globalNSMutex *nsLockMap // Initialize distributed locking only in case of distributed setup. // Returns if the setup is distributed or not on success. @@ -57,15 +57,15 @@ func initDsyncNodes(eps []*url.URL) error { } // initNSLock - initialize name space lock map. -func initNSLock(isDist bool) { - nsMutex = &nsLockMap{ - isDist: isDist, - lockMap: make(map[nsParam]*nsLock), +func initNSLock(isDistXL bool) { + globalNSMutex = &nsLockMap{ + isDistXL: isDistXL, + lockMap: make(map[nsParam]*nsLock), } // Initialize nsLockMap with entry for instrumentation information. // Entries of -> stateInfo of locks - nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) + globalNSMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) } // RWLocker - interface that any read-write locking library should implement. @@ -98,7 +98,7 @@ type nsLockMap struct { // Indicates whether the locking service is part // of a distributed setup or not. - isDist bool + isDistXL bool lockMap map[nsParam]*nsLock lockMapMutex sync.Mutex } @@ -113,8 +113,8 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock if !found { nsLk = &nsLock{ RWLocker: func() RWLocker { - if n.isDist { - return dsync.NewDRWMutex(pathutil.Join(volume, path)) + if n.isDistXL { + return dsync.NewDRWMutex(pathJoin(volume, path)) } return &sync.RWMutex{} }(), @@ -126,7 +126,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock // Change the state of the lock to be blocked for the given // pair of and till the lock - // unblocks. The lock for accessing `nsMutex` is held inside + // unblocks. The lock for accessing `globalNSMutex` is held inside // the function itself. if err := n.statusNoneToBlocked(param, lockSource, opsID, readLock); err != nil { errorIf(err, "Failed to set lock state to blocked") @@ -226,7 +226,7 @@ func (n *nsLockMap) ForceUnlock(volume, path string) { defer n.lockMapMutex.Unlock() // Clarification on operation: - // - In case of FS or XL we call ForceUnlock on the local nsMutex + // - In case of FS or XL we call ForceUnlock on the local globalNSMutex // (since there is only a single server) which will cause the 'stuck' // mutex to be removed from the map. Existing operations for this // will continue to be blocked (and timeout). New operations on this @@ -238,9 +238,8 @@ func (n *nsLockMap) ForceUnlock(volume, path string) { // that participated in granting the lock. Any pending dsync locks that // are blocking can now proceed as normal and any new locks will also // participate normally. - - if n.isDist { // For distributed mode, broadcast ForceUnlock message. - dsync.NewDRWMutex(pathutil.Join(volume, path)).ForceUnlock() + if n.isDistXL { // For distributed mode, broadcast ForceUnlock message. + dsync.NewDRWMutex(pathJoin(volume, path)).ForceUnlock() } param := nsParam{volume, path} diff --git a/cmd/namespace-lock_test.go b/cmd/namespace-lock_test.go index b1834acc0..b3b46e640 100644 --- a/cmd/namespace-lock_test.go +++ b/cmd/namespace-lock_test.go @@ -37,22 +37,22 @@ func TestNamespaceLockTest(t *testing.T) { shouldPass bool }{ { - lk: nsMutex.Lock, - unlk: nsMutex.Unlock, + lk: globalNSMutex.Lock, + unlk: globalNSMutex.Unlock, lockedRefCount: 1, unlockedRefCount: 0, shouldPass: true, }, { - rlk: nsMutex.RLock, - runlk: nsMutex.RUnlock, + rlk: globalNSMutex.RLock, + runlk: globalNSMutex.RUnlock, lockedRefCount: 4, unlockedRefCount: 2, shouldPass: true, }, { - rlk: nsMutex.RLock, - runlk: nsMutex.RUnlock, + rlk: globalNSMutex.RLock, + runlk: globalNSMutex.RUnlock, lockedRefCount: 1, unlockedRefCount: 0, shouldPass: true, @@ -64,7 +64,7 @@ func TestNamespaceLockTest(t *testing.T) { // Write lock tests. testCase := testCases[0] testCase.lk("a", "b", "c") // lock once. - nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}] + nsLk, ok := globalNSMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") } @@ -76,7 +76,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref) } - _, ok = nsMutex.lockMap[nsParam{"a", "b"}] + _, ok = globalNSMutex.lockMap[nsParam{"a", "b"}] if ok && !testCase.shouldPass { t.Errorf("Lock map found after unlock.") } @@ -87,7 +87,7 @@ func TestNamespaceLockTest(t *testing.T) { testCase.rlk("a", "b", "c") // lock second time. testCase.rlk("a", "b", "c") // lock third time. testCase.rlk("a", "b", "c") // lock fourth time. - nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}] + nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") } @@ -101,7 +101,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref) } - _, ok = nsMutex.lockMap[nsParam{"a", "b"}] + _, ok = globalNSMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock map not found.") } @@ -110,7 +110,7 @@ func TestNamespaceLockTest(t *testing.T) { testCase = testCases[2] testCase.rlk("a", "c", "d") // lock once. - nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}] + nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "c"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") } @@ -122,7 +122,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref) } - _, ok = nsMutex.lockMap[nsParam{"a", "c"}] + _, ok = globalNSMutex.lockMap[nsParam{"a", "c"}] if ok && !testCase.shouldPass { t.Errorf("Lock map not found.") } @@ -303,7 +303,7 @@ func TestLockStats(t *testing.T) { // hold 10 read locks. for i := 0; i < 10; i++ { - nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i)) + globalNSMutex.RLock("my-bucket", "my-object", strconv.Itoa(i)) } // expected lock info. expectedLockStats := expectedResult[0] @@ -311,7 +311,7 @@ func TestLockStats(t *testing.T) { verifyLockState(expectedLockStats, t, 1) // unlock 5 readlock. for i := 0; i < 5; i++ { - nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i)) + globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i)) } expectedLockStats = expectedResult[1] @@ -323,14 +323,14 @@ func TestLockStats(t *testing.T) { go func() { defer wg.Done() // blocks till all read locks are released. - nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10)) + globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(10)) // Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock. expectedWLockStats := expectedResult[2] // Since the write lock acquired here, the number of blocked locks should reduce by 1 and // count of running locks should increase by 1. verifyLockState(expectedWLockStats, t, 3) // release the write lock. - nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) + globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) // The number of running locks should decrease by 1. // expectedWLockStats = expectedResult[3] // verifyLockState(expectedWLockStats, t, 4) @@ -348,14 +348,14 @@ func TestLockStats(t *testing.T) { go func() { defer wg.Done() // blocks till all read locks are released. - nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(11)) + globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(11)) // Once the above attempt to lock is unblocked/acquired, we release the lock. // Unlock the second write lock only after lock stats for first write lock release is taken. <-syncChan // The number of running locks should decrease by 1. expectedWLockStats := expectedResult[4] verifyLockState(expectedWLockStats, t, 5) - nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11)) + globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11)) }() expectedLockStats = expectedResult[5] @@ -366,7 +366,7 @@ func TestLockStats(t *testing.T) { // unlock 4 out of remaining 5 read locks. for i := 0; i < 4; i++ { - nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5)) + globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5)) } // verify the entry for one remaining read lock and count of blocked write locks. @@ -375,7 +375,7 @@ func TestLockStats(t *testing.T) { verifyLockState(expectedLockStats, t, 7) // Releasing the last read lock. - nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9)) + globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9)) wg.Wait() expectedLockStats = expectedResult[7] // verify the actual lock info with the expected one. @@ -386,16 +386,16 @@ func TestLockStats(t *testing.T) { func TestNamespaceForceUnlockTest(t *testing.T) { // Create lock. - lock := nsMutex.NewNSLock("bucket", "object") + lock := globalNSMutex.NewNSLock("bucket", "object") lock.Lock() // Forcefully unlock lock. - nsMutex.ForceUnlock("bucket", "object") + globalNSMutex.ForceUnlock("bucket", "object") ch := make(chan struct{}, 1) go func() { // Try to claim lock again. - anotherLock := nsMutex.NewNSLock("bucket", "object") + anotherLock := globalNSMutex.NewNSLock("bucket", "object") anotherLock.Lock() // And signal succes. ch <- struct{}{} @@ -412,5 +412,5 @@ func TestNamespaceForceUnlockTest(t *testing.T) { } // Clean up lock. - nsMutex.ForceUnlock("bucket", "object") + globalNSMutex.ForceUnlock("bucket", "object") } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 61a5d6f69..f5b4857d3 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -96,6 +96,11 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } + // Lock the object before reading. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") @@ -195,6 +200,11 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re return } + // Lock the object before reading. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { errorIf(err, "Unable to fetch object info.") @@ -269,6 +279,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re return } + // Lock the object before reading. + objectRLock := globalNSMutex.NewNSLock(sourceBucket, sourceObject) + objectRLock.RLock() + defer objectRLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(sourceBucket, sourceObject) if err != nil { errorIf(err, "Unable to fetch object info.") @@ -311,6 +326,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re delete(metadata, "md5Sum") sha256sum := "" + + objectWLock := globalNSMutex.NewNSLock(bucket, object) + objectWLock.Lock() + defer objectWLock.Unlock() + // Create the object. objInfo, err = objectAPI.PutObject(bucket, object, size, pipeReader, metadata, sha256sum) if err != nil { @@ -400,6 +420,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req sha256sum := "" + // Lock the object. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() + var objInfo ObjectInfo switch rAuthType { default: @@ -731,6 +756,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite completeParts = append(completeParts, part) } + // Hold write lock on the object. + destLock := globalNSMutex.NewNSLock(bucket, object) + destLock.Lock() + defer destLock.Unlock() + md5Sum, err = objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) if err != nil { err = errorCause(err) @@ -801,6 +831,10 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() + /// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html /// Ignore delete object errors, since we are suppposed to reply /// only 204. diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 43801303e..4f2343123 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "encoding/json" "errors" "fmt" @@ -148,6 +147,9 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep if !isJWTReqAuthenticated(r) { return toJSONError(errAuthentication) } + bucketLock := globalNSMutex.NewNSLock(args.BucketName, "") + bucketLock.Lock() + defer bucketLock.Unlock() if err := objectAPI.MakeBucket(args.BucketName); err != nil { return toJSONError(err, args.BucketName) } @@ -272,6 +274,11 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, if !isJWTReqAuthenticated(r) { return toJSONError(errAuthentication) } + + objectLock := globalNSMutex.NewNSLock(args.BucketName, args.ObjectName) + objectLock.Lock() + defer objectLock.Unlock() + if err := objectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil { if isErrObjectNotFound(err) { // Ignore object not found error. @@ -478,16 +485,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { // Extract incoming metadata if any. metadata := extractMetadataFromHeader(r.Header) - sha256sum := "" - if _, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata, sha256sum); err != nil { - writeWebErrorResponse(w, err) - return - } + // Lock the object. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.Lock() + defer objectLock.Unlock() - // Fetch object info for notifications. - objInfo, err := objectAPI.GetObjectInfo(bucket, object) + sha256sum := "" + objInfo, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata, sha256sum) if err != nil { - errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) + writeWebErrorResponse(w, err) return } @@ -534,6 +540,11 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { // Add content disposition. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object))) + // Lock the object before reading. + objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock.RLock() + defer objectLock.RUnlock() + objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { writeWebErrorResponse(w, err) @@ -691,16 +702,8 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic return toJSONError(err) } - // Parse bucket policy. - var policy = &bucketPolicy{} - err = parseBucketPolicy(bytes.NewReader(data), policy) - if err != nil { - errorIf(err, "Unable to parse bucket policy.") - return toJSONError(err, args.BucketName) - } - - // Parse check bucket policy. - if s3Error := checkBucketPolicyResources(args.BucketName, policy); s3Error != ErrNone { + // Parse validate and save bucket policy. + if s3Error := parseAndPersistBucketPolicy(args.BucketName, data, objectAPI); s3Error != ErrNone { apiErr := getAPIError(s3Error) var err error if apiErr.Code == "XMinioPolicyNesting" { @@ -710,12 +713,6 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic } return toJSONError(err, args.BucketName) } - - // TODO: update policy statements according to bucket name, - // prefix and policy arguments. - if err := persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{false, policy}, objectAPI); err != nil { - return toJSONError(err, args.BucketName) - } reply.UIVersion = miniobrowser.UIVersion return nil } @@ -808,8 +805,7 @@ func toJSONError(err error, params ...string) (jerr *json2.Error) { case "InvalidBucketName": if len(params) > 0 { jerr = &json2.Error{ - Message: fmt.Sprintf("Bucket Name %s is invalid. Lowercase letters, period and numerals are the only allowed characters.", - params[0]), + Message: fmt.Sprintf("Bucket Name %s is invalid. Lowercase letters, period and numerals are the only allowed characters.", params[0]), } } // Bucket not found custom error message. diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 963672c0b..52e993dcf 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -36,10 +36,6 @@ func (xl xlObjects) MakeBucket(bucket string) error { return traceError(BucketNameInvalid{Bucket: bucket}) } - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.Lock() - defer bucketLock.Unlock() - // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -153,10 +149,6 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { return BucketInfo{}, BucketNameInvalid{Bucket: bucket} } - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.RLock() - defer bucketLock.RUnlock() - bucketInfo, err := xl.getBucketInfo(bucket) if err != nil { return BucketInfo{}, toObjectErr(err, bucket) @@ -227,10 +219,6 @@ func (xl xlObjects) DeleteBucket(bucket string) error { return BucketNameInvalid{Bucket: bucket} } - bucketLock := nsMutex.NewNSLock(bucket, "") - bucketLock.Lock() - defer bucketLock.Unlock() - // Collect if all disks report volume not found. var wg = &sync.WaitGroup{} var dErrs = make([]error, len(xl.storageDisks)) diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 10abbe43a..533d6ff2f 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -73,7 +73,7 @@ func (xl xlObjects) HealBucket(bucket string) error { // Heal bucket - create buckets on disks where it does not exist. func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error { - bucketLock := nsMutex.NewNSLock(bucket, "") + bucketLock := globalNSMutex.NewNSLock(bucket, "") bucketLock.Lock() defer bucketLock.Unlock() @@ -126,7 +126,7 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error // heals `policy.json`, `notification.xml` and `listeners.json`. func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int) error { healBucketMetaFn := func(metaPath string) error { - metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath) + metaLock := globalNSMutex.NewNSLock(minioMetaBucket, metaPath) metaLock.RLock() defer metaLock.RUnlock() // Heals the given file at metaPath. @@ -346,7 +346,7 @@ func (xl xlObjects) HealObject(bucket, object string) error { } // Lock the object before healing. - objectLock := nsMutex.NewNSLock(bucket, object) + objectLock := globalNSMutex.NewNSLock(bucket, object) objectLock.RLock() defer objectLock.RUnlock() diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index 8ddf19f85..b38411bb9 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -144,7 +144,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma } // Check if the current object needs healing - objectLock := nsMutex.NewNSLock(bucket, objInfo.Name) + objectLock := globalNSMutex.NewNSLock(bucket, objInfo.Name) objectLock.RLock() partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name) if xlShouldHeal(partsMetadata, errs) { diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 496e2672f..802c34ee4 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -64,7 +64,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // uploadIDMarker first. if uploadIDMarker != "" { // hold lock on keyMarker path - keyMarkerLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, keyMarker)) keyMarkerLock.RLock() for _, disk := range xl.getLoadBalancedDisks() { @@ -134,7 +134,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // For the new object entry we get all its // pending uploadIDs. - entryLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, entry)) entryLock.RLock() var disk StorageAPI @@ -242,7 +242,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st // This lock needs to be held for any changes to the directory // contents of ".minio.sys/multipart/object/" - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -305,7 +305,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDPath := pathJoin(bucket, object, uploadID) // pre-check upload id lock. - preUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) preUploadIDLock.RLock() // Validates if upload ID exists. if !xl.isUploadIDExists(bucket, object, uploadID) { @@ -414,7 +414,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } // post-upload check (write) lock - postUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) postUploadIDLock.Lock() defer postUploadIDLock.Unlock() @@ -557,7 +557,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // Hold lock so that there is no competing // abort-multipart-upload or complete-multipart-upload. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -586,7 +586,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // // 2) no one does a parallel complete-multipart-upload on this // multipart upload - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() @@ -704,20 +704,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } - // Hold write lock on the destination before rename. - destLock := nsMutex.NewNSLock(bucket, object) - destLock.Lock() defer func() { if xl.objCacheEnabled { // A new complete multipart upload invalidates any // previously cached object in memory. xl.objCache.Delete(path.Join(bucket, object)) - } - // This lock also protects the cache namespace. - destLock.Unlock() - - if xl.objCacheEnabled { // Prefetch the object from disk by triggering a fake GetObject call // Unlike a regular single PutObject, multipart PutObject is comes in // stages and it is harder to cache. @@ -761,7 +753,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Hold the lock so that two parallel // complete-multipart-uploads do not leave a stale // uploads.json behind. - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -787,7 +779,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e // hold lock so we don't compete with a complete, or abort // multipart request. - objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) objectMPartPathLock.Lock() defer objectMPartPathLock.Unlock() @@ -819,7 +811,7 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error // Hold lock so that there is no competing // complete-multipart-upload or put-object-part. - uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) uploadIDLock.Lock() defer uploadIDLock.Unlock() diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index a8ed0f7fe..6ee87336a 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -62,11 +62,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return traceError(errUnexpected) } - // Lock the object before reading. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) // Do we have read quorum? @@ -222,10 +217,6 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { return ObjectInfo{}, err } - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.RLock() - defer objectLock.RUnlock() - info, err := xl.getObjectInfo(bucket, object) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) @@ -485,11 +476,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } } - // Lock the object. - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.Lock() - defer objectLock.Unlock() - // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). if xl.parentDirIsObject(bucket, path.Dir(object)) { @@ -606,10 +592,6 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { return err } - objectLock := nsMutex.NewNSLock(bucket, object) - objectLock.Lock() - defer objectLock.Unlock() - // Validate object exists. if !xl.isObject(bucket, object) { return traceError(ObjectNotFound{bucket, object})