mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
lock: Moving locking to handler layer. (#3381)
This is implemented so that the issues like in the following flow don't affect the behavior of operation. ``` GetObjectInfo() .... --> Time window for mutation (no lock held) .... --> Time window for mutation (no lock held) GetObject() ``` This happens when two simultaneous uploads are made to the same object the object has returned wrong info to the client. Another classic example is "CopyObject" API itself which reads from a source object and copies to destination object. Fixes #3370 Fixes #2912
This commit is contained in:
parent
cd0f350c02
commit
4daa0d2cee
@ -334,6 +334,10 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
// Proceed to creating a bucket.
|
||||
err := objectAPI.MakeBucket(bucket)
|
||||
if err != nil {
|
||||
@ -431,6 +435,10 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
||||
|
||||
sha256sum := ""
|
||||
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
objInfo, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata, sha256sum)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to create object.")
|
||||
@ -478,6 +486,10 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.RLock()
|
||||
defer bucketLock.RUnlock()
|
||||
|
||||
if _, err := objectAPI.GetBucketInfo(bucket); err != nil {
|
||||
errorIf(err, "Unable to fetch bucket info.")
|
||||
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
|
||||
@ -503,6 +515,10 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
// Attempt to delete bucket.
|
||||
if err := objectAPI.DeleteBucket(bucket); err != nil {
|
||||
errorIf(err, "Unable to delete a bucket.")
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@ -174,7 +173,7 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI
|
||||
|
||||
// Acquire a write lock on bucket before modifying its
|
||||
// configuration.
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
// Release lock after notifying peers
|
||||
defer bucketLock.Unlock()
|
||||
@ -381,7 +380,7 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL
|
||||
|
||||
// Acquire a write lock on bucket before modifying its
|
||||
// configuration.
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
// Release lock after notifying peers
|
||||
defer bucketLock.Unlock()
|
||||
@ -423,7 +422,7 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje
|
||||
|
||||
// Acquire a write lock on bucket before modifying its
|
||||
// configuration.
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
// Release lock after notifying peers
|
||||
defer bucketLock.Unlock()
|
||||
@ -441,14 +440,3 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje
|
||||
// peers (including local)
|
||||
S3PeersUpdateBucketListener(bucket, updatedLcfgs)
|
||||
}
|
||||
|
||||
// Removes notification.xml for a given bucket, only used during DeleteBucket.
|
||||
func removeNotificationConfig(bucket string, objAPI ObjectLayer) error {
|
||||
// Verify bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return BucketNameInvalid{Bucket: bucket}
|
||||
}
|
||||
|
||||
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
return objAPI.DeleteObject(minioMetaBucket, notificationConfigPath)
|
||||
}
|
||||
|
@ -17,7 +17,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@ -166,65 +165,17 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
|
||||
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
|
||||
return
|
||||
}
|
||||
// Parse bucket policy.
|
||||
var policy = &bucketPolicy{}
|
||||
err = parseBucketPolicy(bytes.NewReader(policyBytes), policy)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to parse bucket policy.")
|
||||
writeErrorResponse(w, r, ErrInvalidPolicyDocument, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse check bucket policy.
|
||||
if s3Error := checkBucketPolicyResources(bucket, policy); s3Error != ErrNone {
|
||||
// Parse validate and save bucket policy.
|
||||
if s3Error := parseAndPersistBucketPolicy(bucket, policyBytes, objAPI); s3Error != ErrNone {
|
||||
writeErrorResponse(w, r, s3Error, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
// Save bucket policy.
|
||||
if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil {
|
||||
switch err.(type) {
|
||||
case BucketNameInvalid:
|
||||
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
|
||||
default:
|
||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Success.
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
|
||||
// persistAndNotifyBucketPolicyChange - takes a policyChange argument,
|
||||
// persists it to storage, and notify nodes in the cluster about the
|
||||
// change. In-memory state is updated in response to the notification.
|
||||
func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error {
|
||||
// Acquire a write lock on bucket before modifying its
|
||||
// configuration.
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
// Release lock after notifying peers
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
if pCh.IsRemove {
|
||||
if err := removeBucketPolicy(bucket, objAPI); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if pCh.BktPolicy == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Notify all peers (including self) to update in-memory state
|
||||
S3PeersUpdateBucketPolicy(bucket, pCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBucketPolicyHandler - DELETE Bucket policy
|
||||
// -----------------
|
||||
// This implementation of the DELETE operation uses the policy
|
||||
|
@ -144,6 +144,12 @@ func getOldBucketsConfigPath() (string, error) {
|
||||
// if bucket policy is not found.
|
||||
func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader io.Reader, err error) {
|
||||
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
||||
|
||||
// Acquire a read lock on policy config before reading.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath)
|
||||
objLock.RLock()
|
||||
defer objLock.RUnlock()
|
||||
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath)
|
||||
if err != nil {
|
||||
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
|
||||
@ -188,6 +194,10 @@ func readBucketPolicy(bucket string, objAPI ObjectLayer) (*bucketPolicy, error)
|
||||
// if no policies are found.
|
||||
func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
|
||||
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
||||
// Acquire a write lock on policy config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath)
|
||||
objLock.Lock()
|
||||
defer objLock.Unlock()
|
||||
if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil {
|
||||
errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket)
|
||||
err = errorCause(err)
|
||||
@ -207,9 +217,70 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) err
|
||||
return err
|
||||
}
|
||||
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
||||
// Acquire a write lock on policy config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath)
|
||||
objLock.Lock()
|
||||
defer objLock.Unlock()
|
||||
if _, err := objAPI.PutObject(minioMetaBucket, policyPath, int64(len(buf)), bytes.NewReader(buf), nil, ""); err != nil {
|
||||
errorIf(err, "Unable to set policy for the bucket %s", bucket)
|
||||
return errorCause(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseAndPersistBucketPolicy(bucket string, policyBytes []byte, objAPI ObjectLayer) APIErrorCode {
|
||||
// Parse bucket policy.
|
||||
var policy = &bucketPolicy{}
|
||||
err := parseBucketPolicy(bytes.NewReader(policyBytes), policy)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to parse bucket policy.")
|
||||
return ErrInvalidPolicyDocument
|
||||
}
|
||||
|
||||
// Parse check bucket policy.
|
||||
if s3Error := checkBucketPolicyResources(bucket, policy); s3Error != ErrNone {
|
||||
return s3Error
|
||||
}
|
||||
|
||||
// Acquire a write lock on bucket before modifying its configuration.
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
// Release lock after notifying peers
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
// Save bucket policy.
|
||||
if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil {
|
||||
switch err.(type) {
|
||||
case BucketNameInvalid:
|
||||
return ErrInvalidBucketName
|
||||
case BucketNotFound:
|
||||
return ErrNoSuchBucket
|
||||
default:
|
||||
errorIf(err, "Unable to save bucket policy.")
|
||||
return ErrInternalError
|
||||
}
|
||||
}
|
||||
return ErrNone
|
||||
}
|
||||
|
||||
// persistAndNotifyBucketPolicyChange - takes a policyChange argument,
|
||||
// persists it to storage, and notify nodes in the cluster about the
|
||||
// change. In-memory state is updated in response to the notification.
|
||||
func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error {
|
||||
if pCh.IsRemove {
|
||||
if err := removeBucketPolicy(bucket, objAPI); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if pCh.BktPolicy == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Notify all peers (including self) to update in-memory state
|
||||
S3PeersUpdateBucketPolicy(bucket, pCh)
|
||||
return nil
|
||||
}
|
||||
|
@ -301,8 +301,14 @@ func eventNotify(event eventData) {
|
||||
// structured notification config.
|
||||
func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) {
|
||||
// Construct the notification config path.
|
||||
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
|
||||
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
|
||||
// Acquire a write lock on notification config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
|
||||
objLock.RLock()
|
||||
defer objLock.RUnlock()
|
||||
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, ncPath)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return
|
||||
// 'errNoSuchNotifications'. This is default when no
|
||||
@ -315,7 +321,7 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
|
||||
return nil, err
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
|
||||
err = objAPI.GetObject(minioMetaBucket, ncPath, 0, objInfo.Size, &buffer)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return
|
||||
// 'errNoSuchNotifications'. This is default when no
|
||||
@ -350,8 +356,14 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er
|
||||
}
|
||||
|
||||
// Construct the notification config path.
|
||||
listenerConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, listenerConfigPath)
|
||||
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||
|
||||
// Acquire a write lock on notification config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
|
||||
objLock.RLock()
|
||||
defer objLock.RUnlock()
|
||||
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, lcPath)
|
||||
if err != nil {
|
||||
// 'listener.json' not found return
|
||||
// 'errNoSuchNotifications'. This is default when no
|
||||
@ -364,7 +376,7 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er
|
||||
return nil, err
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
err = objAPI.GetObject(minioMetaBucket, listenerConfigPath, 0, objInfo.Size, &buffer)
|
||||
err = objAPI.GetObject(minioMetaBucket, lcPath, 0, objInfo.Size, &buffer)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return
|
||||
// 'errNoSuchNotifications'. This is default when no
|
||||
@ -399,6 +411,11 @@ func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj Obje
|
||||
|
||||
// build path
|
||||
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
// Acquire a write lock on notification config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
|
||||
objLock.Lock()
|
||||
defer objLock.Unlock()
|
||||
|
||||
// write object to path
|
||||
sha256Sum := getSHA256Hash(buf)
|
||||
_, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum)
|
||||
@ -419,6 +436,11 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer
|
||||
|
||||
// build path
|
||||
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||
// Acquire a write lock on notification config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
|
||||
objLock.Lock()
|
||||
defer objLock.Unlock()
|
||||
|
||||
// write object to path
|
||||
sha256Sum := getSHA256Hash(buf)
|
||||
_, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum)
|
||||
@ -428,12 +450,34 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer
|
||||
return err
|
||||
}
|
||||
|
||||
// Removes notification.xml for a given bucket, only used during DeleteBucket.
|
||||
func removeNotificationConfig(bucket string, objAPI ObjectLayer) error {
|
||||
// Verify bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return BucketNameInvalid{Bucket: bucket}
|
||||
}
|
||||
|
||||
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
|
||||
// Acquire a write lock on notification config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
|
||||
objLock.Lock()
|
||||
err := objAPI.DeleteObject(minioMetaBucket, ncPath)
|
||||
objLock.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove listener configuration from storage layer. Used when a bucket is deleted.
|
||||
func removeListenerConfig(bucket string, objAPI ObjectLayer) error {
|
||||
// make the path
|
||||
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||
// remove it
|
||||
return objAPI.DeleteObject(minioMetaBucket, lcPath)
|
||||
|
||||
// Acquire a write lock on notification config before modifying.
|
||||
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
|
||||
objLock.Lock()
|
||||
err := objAPI.DeleteObject(minioMetaBucket, lcPath)
|
||||
objLock.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// Loads both notification and listener config.
|
||||
|
@ -57,7 +57,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
||||
var err error
|
||||
var eof bool
|
||||
if uploadIDMarker != "" {
|
||||
keyMarkerLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, keyMarker))
|
||||
keyMarkerLock.RLock()
|
||||
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage)
|
||||
@ -112,7 +112,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
||||
var end bool
|
||||
uploadIDMarker = ""
|
||||
|
||||
entryLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, entry))
|
||||
entryLock.RLock()
|
||||
tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage)
|
||||
@ -192,7 +192,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
|
||||
|
||||
// This lock needs to be held for any changes to the directory
|
||||
// contents of ".minio.sys/multipart/object/"
|
||||
objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object))
|
||||
objectMPartPathLock.Lock()
|
||||
defer objectMPartPathLock.Unlock()
|
||||
@ -248,7 +248,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
|
||||
preUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
preUploadIDLock.RLock()
|
||||
// Just check if the uploadID exists to avoid copy if it doesn't.
|
||||
uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
|
||||
@ -329,7 +329,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
}
|
||||
|
||||
// Hold write lock as we are updating fs.json
|
||||
postUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
postUploadIDLock.Lock()
|
||||
defer postUploadIDLock.Unlock()
|
||||
|
||||
@ -348,7 +348,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
partPath := path.Join(bucket, object, uploadID, partSuffix)
|
||||
// Lock the part so that another part upload with same part-number gets blocked
|
||||
// while the part is getting appended in the background.
|
||||
partLock := nsMutex.NewNSLock(minioMetaMultipartBucket, partPath)
|
||||
partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath)
|
||||
partLock.Lock()
|
||||
err = fs.storage.RenameFile(minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath)
|
||||
if err != nil {
|
||||
@ -439,7 +439,7 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
|
||||
|
||||
// Hold lock so that there is no competing
|
||||
// abort-multipart-upload or complete-multipart-upload.
|
||||
uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object, uploadID))
|
||||
uploadIDLock.Lock()
|
||||
defer uploadIDLock.Unlock()
|
||||
@ -479,7 +479,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// 1) no one aborts this multipart upload
|
||||
// 2) no one does a parallel complete-multipart-upload on this
|
||||
// multipart upload
|
||||
uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
uploadIDLock.Lock()
|
||||
defer uploadIDLock.Unlock()
|
||||
|
||||
@ -502,14 +502,11 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// This lock is held during rename of the appended tmp file to the actual
|
||||
// location so that any competing GetObject/PutObject/DeleteObject do not race.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
appendFallback := true // In case background-append did not append the required parts.
|
||||
if isPartsSame(fsMeta.Parts, parts) {
|
||||
err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta)
|
||||
if err == nil {
|
||||
appendFallback = false
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
if err = fs.storage.RenameFile(minioMetaTmpBucket, uploadID, bucket, object); err != nil {
|
||||
return "", toObjectErr(traceError(err), minioMetaTmpBucket, uploadID)
|
||||
}
|
||||
@ -584,8 +581,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
}
|
||||
}
|
||||
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
// Rename the file back to original location, if not delete the temporary object.
|
||||
err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object)
|
||||
if err != nil {
|
||||
@ -619,7 +614,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// Hold the lock so that two parallel
|
||||
// complete-multipart-uploads do not leave a stale
|
||||
// uploads.json behind.
|
||||
objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object))
|
||||
objectMPartPathLock.Lock()
|
||||
defer objectMPartPathLock.Unlock()
|
||||
@ -672,7 +667,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
|
||||
|
||||
// Hold lock so that there is no competing
|
||||
// complete-multipart-upload or put-object-part.
|
||||
uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object, uploadID))
|
||||
uploadIDLock.Lock()
|
||||
defer uploadIDLock.Unlock()
|
||||
|
16
cmd/fs-v1.go
16
cmd/fs-v1.go
@ -246,11 +246,6 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
|
||||
return traceError(InvalidRange{offset, length, fi.Size})
|
||||
}
|
||||
|
||||
// Lock the object before reading.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
var totalLeft = length
|
||||
bufSize := int64(readSizeV1)
|
||||
if length > 0 && bufSize > length {
|
||||
@ -450,11 +445,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
}
|
||||
}
|
||||
|
||||
// Lock the object before committing the object.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
|
||||
err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object)
|
||||
if err != nil {
|
||||
@ -484,12 +474,6 @@ func (fs fsObjects) DeleteObject(bucket, object string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Lock the object before deleting so that an in progress GetObject does not return
|
||||
// corrupt data or there is no race with a PutObject.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
if bucket != minioMetaBucket {
|
||||
// We don't store fs.json for minio-S3-layer created files like policy.json,
|
||||
// hence we don't try to delete fs.json for such files.
|
||||
|
@ -119,26 +119,26 @@ func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoMap map[string]*Syste
|
||||
}
|
||||
}
|
||||
|
||||
// Asserts the lock counter from the global nsMutex inmemory lock with the expected one.
|
||||
// Asserts the lock counter from the global globalNSMutex inmemory lock with the expected one.
|
||||
func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
|
||||
// Verifying the lock stats.
|
||||
if nsMutex.globalLockCounter != int64(l.expectedGlobalLockCount) {
|
||||
if globalNSMutex.globalLockCounter != int64(l.expectedGlobalLockCount) {
|
||||
t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount),
|
||||
nsMutex.globalLockCounter)
|
||||
globalNSMutex.globalLockCounter)
|
||||
}
|
||||
// verify the count for total blocked locks.
|
||||
if nsMutex.blockedCounter != int64(l.expectedBlockedLockCount) {
|
||||
if globalNSMutex.blockedCounter != int64(l.expectedBlockedLockCount) {
|
||||
t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount),
|
||||
nsMutex.blockedCounter)
|
||||
globalNSMutex.blockedCounter)
|
||||
}
|
||||
// verify the count for total running locks.
|
||||
if nsMutex.runningLockCounter != int64(l.expectedRunningLockCount) {
|
||||
if globalNSMutex.runningLockCounter != int64(l.expectedRunningLockCount) {
|
||||
t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount),
|
||||
nsMutex.runningLockCounter)
|
||||
globalNSMutex.runningLockCounter)
|
||||
}
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
// Verifying again with the JSON response of the lock info.
|
||||
// Verifying the lock stats.
|
||||
sysLockState, err := getSystemLockState()
|
||||
@ -164,35 +164,35 @@ func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) {
|
||||
|
||||
// Verify the lock counter for entries of given <volume, path> pair.
|
||||
func verifyLockStats(l lockStateCase, t *testing.T, testNum int) {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
defer nsMutex.lockMapMutex.Unlock()
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
defer globalNSMutex.lockMapMutex.Unlock()
|
||||
param := nsParam{l.volume, l.path}
|
||||
|
||||
// Verify the total locks (blocked+running) for given <vol,path> pair.
|
||||
if nsMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) {
|
||||
if globalNSMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) {
|
||||
t.Errorf("Test %d: Expected the total lock count for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum,
|
||||
param.volume, param.path, int64(l.expectedVolPathLockCount), nsMutex.debugLockMap[param].ref)
|
||||
param.volume, param.path, int64(l.expectedVolPathLockCount), globalNSMutex.debugLockMap[param].ref)
|
||||
}
|
||||
// Verify the total running locks for given <volume, path> pair.
|
||||
if nsMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) {
|
||||
if globalNSMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) {
|
||||
t.Errorf("Test %d: Expected the total running locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path,
|
||||
int64(l.expectedVolPathRunningCount), nsMutex.debugLockMap[param].running)
|
||||
int64(l.expectedVolPathRunningCount), globalNSMutex.debugLockMap[param].running)
|
||||
}
|
||||
// Verify the total blocked locks for givne <volume, path> pair.
|
||||
if nsMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) {
|
||||
if globalNSMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) {
|
||||
t.Errorf("Test %d: Expected the total blocked locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path,
|
||||
int64(l.expectedVolPathBlockCount), nsMutex.debugLockMap[param].blocked)
|
||||
int64(l.expectedVolPathBlockCount), globalNSMutex.debugLockMap[param].blocked)
|
||||
}
|
||||
}
|
||||
|
||||
// verifyLockState - function which asserts the expected lock info in the system with the actual values in the nsMutex.
|
||||
// verifyLockState - function which asserts the expected lock info in the system with the actual values in the globalNSMutex.
|
||||
func verifyLockState(l lockStateCase, t *testing.T, testNum int) {
|
||||
param := nsParam{l.volume, l.path}
|
||||
|
||||
verifyGlobalLockStats(l, t, testNum)
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
// Verifying the lock statuS fields.
|
||||
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok {
|
||||
// Validating the lock type filed in the debug lock information.
|
||||
if l.readLock {
|
||||
@ -222,7 +222,7 @@ func verifyLockState(l lockStateCase, t *testing.T, testNum int) {
|
||||
t.Errorf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", testNum, param.volume, param.path)
|
||||
}
|
||||
// verifyLockStats holds its own lock.
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
|
||||
// verify the lock count.
|
||||
verifyLockStats(l, t, testNum)
|
||||
@ -319,7 +319,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
// Testing before the initialization done.
|
||||
// Since the data structures for
|
||||
actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path}
|
||||
@ -327,14 +327,14 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedErr, actualErr)
|
||||
}
|
||||
|
||||
nsMutex = &nsLockMap{
|
||||
globalNSMutex = &nsLockMap{
|
||||
// entries of <volume,path> -> stateInfo of locks, for instrumentation purpose.
|
||||
debugLockMap: make(map[nsParam]*debugLockInfoPerVolumePath),
|
||||
lockMap: make(map[nsParam]*nsLock),
|
||||
}
|
||||
// Entry for <volume, path> pair is set to nil. Should fail with `errLockNotInitialized`.
|
||||
nsMutex.debugLockMap[param] = nil
|
||||
actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
globalNSMutex.debugLockMap[param] = nil
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
if errorCause(actualErr) != errLockNotInitialized {
|
||||
@ -342,14 +342,14 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
}
|
||||
|
||||
// Setting the lock info the be `nil`.
|
||||
nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
lockInfo: nil, // setting the lockinfo to nil.
|
||||
ref: 0,
|
||||
blocked: 0,
|
||||
running: 0,
|
||||
}
|
||||
|
||||
actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedOpsErr := LockInfoOpsIDNotFound{testCases[0].volume, testCases[0].path, testCases[0].opsID}
|
||||
@ -359,7 +359,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
|
||||
// Next case: ase whether an attempt to change the state of the lock to "Running" done,
|
||||
// but the initial state if already "Running". Such an attempt should fail
|
||||
nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
globalNSMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{
|
||||
lockInfo: make(map[string]debugLockInfo),
|
||||
ref: 0,
|
||||
blocked: 0,
|
||||
@ -368,13 +368,13 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
|
||||
// Setting the status of the lock to be "Running".
|
||||
// The initial state of the lock should set to "Blocked", otherwise its not possible to change the state from "Blocked" -> "Running".
|
||||
nsMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{
|
||||
globalNSMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{
|
||||
lockSource: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
|
||||
status: "Running", // State set to "Running". Should fail with `LockInfoStateNotBlocked`.
|
||||
since: time.Now().UTC(),
|
||||
}
|
||||
|
||||
actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedBlockErr := LockInfoStateNotBlocked{testCases[0].volume, testCases[0].path, testCases[0].opsID}
|
||||
@ -390,22 +390,22 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
|
||||
param := nsParam{testCase.volume, testCase.path}
|
||||
// status of the lock to be set to "Blocked", before setting Blocked->Running.
|
||||
if testCase.setBlocked {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
err := nsMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
err := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: Initializing the initial state to Blocked failed <ERROR> %s", i+1, err)
|
||||
}
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
}
|
||||
// invoking the method under test.
|
||||
actualErr = nsMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
actualErr = globalNSMutex.statusBlockedToRunning(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
if errorCause(actualErr) != testCase.expectedErr {
|
||||
t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr)
|
||||
}
|
||||
// In case of no error proceed with validating the lock state information.
|
||||
if actualErr == nil {
|
||||
// debug entry for given <volume, path> pair should exist.
|
||||
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok {
|
||||
// Validating the lock type filed in the debug lock information.
|
||||
if testCase.readLock {
|
||||
@ -514,7 +514,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) {
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
// Testing before the initialization done.
|
||||
// Since the data structures for
|
||||
actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
actualErr := globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource,
|
||||
testCases[0].opsID, testCases[0].readLock)
|
||||
|
||||
expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path}
|
||||
@ -524,13 +524,13 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) {
|
||||
|
||||
// Iterate over the cases and assert the result.
|
||||
for i, testCase := range testCases {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
param := nsParam{testCase.volume, testCase.path}
|
||||
actualErr := nsMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
actualErr := globalNSMutex.statusNoneToBlocked(param, testCase.lockSource, testCase.opsID, testCase.readLock)
|
||||
if actualErr != testCase.expectedErr {
|
||||
t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr)
|
||||
}
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
if actualErr == nil {
|
||||
verifyLockState(testCase, t, i+1)
|
||||
}
|
||||
@ -559,7 +559,7 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) {
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
// Testing before the initialization done.
|
||||
|
||||
actualErr := nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
actualErr := globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
|
||||
expectedErr := LockInfoVolPathMissing{testCases[0].volume, testCases[0].path}
|
||||
if errorCause(actualErr) != expectedErr {
|
||||
@ -568,17 +568,17 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) {
|
||||
|
||||
// Case - 2.
|
||||
// Lock state is set to Running and then an attempt to delete the info for non-existent opsID done.
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
err := nsMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Blocked failed: <ERROR> %s", err)
|
||||
}
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
err = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Running failed: <ERROR> %s", err)
|
||||
}
|
||||
actualErr = nsMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID")
|
||||
actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, "non-existent-OpsID")
|
||||
|
||||
expectedOpsIDErr := LockInfoOpsIDNotFound{param.volume, param.path, "non-existent-OpsID"}
|
||||
if errorCause(actualErr) != expectedOpsIDErr {
|
||||
@ -589,7 +589,7 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) {
|
||||
// All metrics should be 0 after deleting the entry.
|
||||
|
||||
// Verify that the entry the opsID exists.
|
||||
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; !ok {
|
||||
t.Fatalf("Entry for OpsID \"%s\" in <volume> %s, <path> %s should have existed. ", testCases[0].opsID, param.volume, param.path)
|
||||
}
|
||||
@ -597,27 +597,27 @@ func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have existed. ", param.volume, param.path)
|
||||
}
|
||||
|
||||
actualErr = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
actualErr = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
if actualErr != nil {
|
||||
t.Fatalf("Expected the error to be <nil>, but got <ERROR> %s", actualErr)
|
||||
}
|
||||
|
||||
// Verify that the entry for the opsId doesn't exists.
|
||||
if debugLockMap, ok := nsMutex.debugLockMap[param]; ok {
|
||||
if debugLockMap, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; ok {
|
||||
t.Fatalf("The entry for opsID \"%s\" should have been deleted", testCases[0].opsID)
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have existed. ", param.volume, param.path)
|
||||
}
|
||||
if nsMutex.runningLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter)
|
||||
if globalNSMutex.runningLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), globalNSMutex.runningLockCounter)
|
||||
}
|
||||
if nsMutex.blockedCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter)
|
||||
if globalNSMutex.blockedCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), globalNSMutex.blockedCounter)
|
||||
}
|
||||
if nsMutex.globalLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter)
|
||||
if globalNSMutex.globalLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), globalNSMutex.globalLockCounter)
|
||||
}
|
||||
}
|
||||
|
||||
@ -643,7 +643,7 @@ func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) {
|
||||
// Case where an attempt to delete the entry for non-existent <volume, path> pair is done.
|
||||
// Set the status of the lock to blocked and then to running.
|
||||
param := nsParam{testCases[0].volume, testCases[0].path}
|
||||
actualErr := nsMutex.deleteLockInfoEntryForVolumePath(param)
|
||||
actualErr := globalNSMutex.deleteLockInfoEntryForVolumePath(param)
|
||||
expectedNilErr := LockInfoVolPathMissing{param.volume, param.path}
|
||||
if errorCause(actualErr) != expectedNilErr {
|
||||
t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr)
|
||||
@ -654,39 +654,39 @@ func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) {
|
||||
// All metrics should be 0 after deleting the entry.
|
||||
|
||||
// Registering the entry first.
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
err := nsMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
err := globalNSMutex.statusNoneToBlocked(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Blocked failed: <ERROR> %s", err)
|
||||
}
|
||||
nsMutex.lockMapMutex.Unlock()
|
||||
err = nsMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
globalNSMutex.lockMapMutex.Unlock()
|
||||
err = globalNSMutex.statusBlockedToRunning(param, testCases[0].lockSource, testCases[0].opsID, testCases[0].readLock)
|
||||
if err != nil {
|
||||
t.Fatalf("Setting lock status to Running failed: <ERROR> %s", err)
|
||||
}
|
||||
// Verify that the entry the for given <volume, path> exists.
|
||||
if _, ok := nsMutex.debugLockMap[param]; !ok {
|
||||
if _, ok := globalNSMutex.debugLockMap[param]; !ok {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have existed.", param.volume, param.path)
|
||||
}
|
||||
// first delete the entry for the operation ID.
|
||||
_ = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
actualErr = nsMutex.deleteLockInfoEntryForVolumePath(param)
|
||||
_ = globalNSMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID)
|
||||
actualErr = globalNSMutex.deleteLockInfoEntryForVolumePath(param)
|
||||
if actualErr != nil {
|
||||
t.Fatalf("Expected the error to be <nil>, but got <ERROR> %s", actualErr)
|
||||
}
|
||||
|
||||
// Verify that the entry for the opsId doesn't exists.
|
||||
if _, ok := nsMutex.debugLockMap[param]; ok {
|
||||
if _, ok := globalNSMutex.debugLockMap[param]; ok {
|
||||
t.Fatalf("Entry for <volume> %s, <path> %s should have been deleted. ", param.volume, param.path)
|
||||
}
|
||||
// The lock count values should be 0.
|
||||
if nsMutex.runningLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter)
|
||||
if globalNSMutex.runningLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), globalNSMutex.runningLockCounter)
|
||||
}
|
||||
if nsMutex.blockedCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter)
|
||||
if globalNSMutex.blockedCounter != int64(0) {
|
||||
t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), globalNSMutex.blockedCounter)
|
||||
}
|
||||
if nsMutex.globalLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter)
|
||||
if globalNSMutex.globalLockCounter != int64(0) {
|
||||
t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), globalNSMutex.globalLockCounter)
|
||||
}
|
||||
}
|
||||
|
@ -61,16 +61,16 @@ type OpsLockState struct {
|
||||
|
||||
// Read entire state of the locks in the system and return.
|
||||
func getSystemLockState() (SystemLockState, error) {
|
||||
nsMutex.lockMapMutex.Lock()
|
||||
defer nsMutex.lockMapMutex.Unlock()
|
||||
globalNSMutex.lockMapMutex.Lock()
|
||||
defer globalNSMutex.lockMapMutex.Unlock()
|
||||
|
||||
lockState := SystemLockState{}
|
||||
|
||||
lockState.TotalBlockedLocks = nsMutex.blockedCounter
|
||||
lockState.TotalLocks = nsMutex.globalLockCounter
|
||||
lockState.TotalAcquiredLocks = nsMutex.runningLockCounter
|
||||
lockState.TotalBlockedLocks = globalNSMutex.blockedCounter
|
||||
lockState.TotalLocks = globalNSMutex.globalLockCounter
|
||||
lockState.TotalAcquiredLocks = globalNSMutex.runningLockCounter
|
||||
|
||||
for param, debugLock := range nsMutex.debugLockMap {
|
||||
for param, debugLock := range globalNSMutex.debugLockMap {
|
||||
volLockInfo := VolumeLockInfo{}
|
||||
volLockInfo.Bucket = param.volume
|
||||
volLockInfo.Object = param.path
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
// Global name space lock.
|
||||
var nsMutex *nsLockMap
|
||||
var globalNSMutex *nsLockMap
|
||||
|
||||
// Initialize distributed locking only in case of distributed setup.
|
||||
// Returns if the setup is distributed or not on success.
|
||||
@ -57,15 +57,15 @@ func initDsyncNodes(eps []*url.URL) error {
|
||||
}
|
||||
|
||||
// initNSLock - initialize name space lock map.
|
||||
func initNSLock(isDist bool) {
|
||||
nsMutex = &nsLockMap{
|
||||
isDist: isDist,
|
||||
lockMap: make(map[nsParam]*nsLock),
|
||||
func initNSLock(isDistXL bool) {
|
||||
globalNSMutex = &nsLockMap{
|
||||
isDistXL: isDistXL,
|
||||
lockMap: make(map[nsParam]*nsLock),
|
||||
}
|
||||
|
||||
// Initialize nsLockMap with entry for instrumentation information.
|
||||
// Entries of <volume,path> -> stateInfo of locks
|
||||
nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath)
|
||||
globalNSMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath)
|
||||
}
|
||||
|
||||
// RWLocker - interface that any read-write locking library should implement.
|
||||
@ -98,7 +98,7 @@ type nsLockMap struct {
|
||||
|
||||
// Indicates whether the locking service is part
|
||||
// of a distributed setup or not.
|
||||
isDist bool
|
||||
isDistXL bool
|
||||
lockMap map[nsParam]*nsLock
|
||||
lockMapMutex sync.Mutex
|
||||
}
|
||||
@ -113,8 +113,8 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
|
||||
if !found {
|
||||
nsLk = &nsLock{
|
||||
RWLocker: func() RWLocker {
|
||||
if n.isDist {
|
||||
return dsync.NewDRWMutex(pathutil.Join(volume, path))
|
||||
if n.isDistXL {
|
||||
return dsync.NewDRWMutex(pathJoin(volume, path))
|
||||
}
|
||||
return &sync.RWMutex{}
|
||||
}(),
|
||||
@ -126,7 +126,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
|
||||
|
||||
// Change the state of the lock to be blocked for the given
|
||||
// pair of <volume, path> and <OperationID> till the lock
|
||||
// unblocks. The lock for accessing `nsMutex` is held inside
|
||||
// unblocks. The lock for accessing `globalNSMutex` is held inside
|
||||
// the function itself.
|
||||
if err := n.statusNoneToBlocked(param, lockSource, opsID, readLock); err != nil {
|
||||
errorIf(err, "Failed to set lock state to blocked")
|
||||
@ -226,7 +226,7 @@ func (n *nsLockMap) ForceUnlock(volume, path string) {
|
||||
defer n.lockMapMutex.Unlock()
|
||||
|
||||
// Clarification on operation:
|
||||
// - In case of FS or XL we call ForceUnlock on the local nsMutex
|
||||
// - In case of FS or XL we call ForceUnlock on the local globalNSMutex
|
||||
// (since there is only a single server) which will cause the 'stuck'
|
||||
// mutex to be removed from the map. Existing operations for this
|
||||
// will continue to be blocked (and timeout). New operations on this
|
||||
@ -238,9 +238,8 @@ func (n *nsLockMap) ForceUnlock(volume, path string) {
|
||||
// that participated in granting the lock. Any pending dsync locks that
|
||||
// are blocking can now proceed as normal and any new locks will also
|
||||
// participate normally.
|
||||
|
||||
if n.isDist { // For distributed mode, broadcast ForceUnlock message.
|
||||
dsync.NewDRWMutex(pathutil.Join(volume, path)).ForceUnlock()
|
||||
if n.isDistXL { // For distributed mode, broadcast ForceUnlock message.
|
||||
dsync.NewDRWMutex(pathJoin(volume, path)).ForceUnlock()
|
||||
}
|
||||
|
||||
param := nsParam{volume, path}
|
||||
|
@ -37,22 +37,22 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
shouldPass bool
|
||||
}{
|
||||
{
|
||||
lk: nsMutex.Lock,
|
||||
unlk: nsMutex.Unlock,
|
||||
lk: globalNSMutex.Lock,
|
||||
unlk: globalNSMutex.Unlock,
|
||||
lockedRefCount: 1,
|
||||
unlockedRefCount: 0,
|
||||
shouldPass: true,
|
||||
},
|
||||
{
|
||||
rlk: nsMutex.RLock,
|
||||
runlk: nsMutex.RUnlock,
|
||||
rlk: globalNSMutex.RLock,
|
||||
runlk: globalNSMutex.RUnlock,
|
||||
lockedRefCount: 4,
|
||||
unlockedRefCount: 2,
|
||||
shouldPass: true,
|
||||
},
|
||||
{
|
||||
rlk: nsMutex.RLock,
|
||||
runlk: nsMutex.RUnlock,
|
||||
rlk: globalNSMutex.RLock,
|
||||
runlk: globalNSMutex.RUnlock,
|
||||
lockedRefCount: 1,
|
||||
unlockedRefCount: 0,
|
||||
shouldPass: true,
|
||||
@ -64,7 +64,7 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
// Write lock tests.
|
||||
testCase := testCases[0]
|
||||
testCase.lk("a", "b", "c") // lock once.
|
||||
nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}]
|
||||
nsLk, ok := globalNSMutex.lockMap[nsParam{"a", "b"}]
|
||||
if !ok && testCase.shouldPass {
|
||||
t.Errorf("Lock in map missing.")
|
||||
}
|
||||
@ -76,7 +76,7 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
|
||||
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref)
|
||||
}
|
||||
_, ok = nsMutex.lockMap[nsParam{"a", "b"}]
|
||||
_, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
|
||||
if ok && !testCase.shouldPass {
|
||||
t.Errorf("Lock map found after unlock.")
|
||||
}
|
||||
@ -87,7 +87,7 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
testCase.rlk("a", "b", "c") // lock second time.
|
||||
testCase.rlk("a", "b", "c") // lock third time.
|
||||
testCase.rlk("a", "b", "c") // lock fourth time.
|
||||
nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}]
|
||||
nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
|
||||
if !ok && testCase.shouldPass {
|
||||
t.Errorf("Lock in map missing.")
|
||||
}
|
||||
@ -101,7 +101,7 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
|
||||
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref)
|
||||
}
|
||||
_, ok = nsMutex.lockMap[nsParam{"a", "b"}]
|
||||
_, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
|
||||
if !ok && testCase.shouldPass {
|
||||
t.Errorf("Lock map not found.")
|
||||
}
|
||||
@ -110,7 +110,7 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
testCase = testCases[2]
|
||||
testCase.rlk("a", "c", "d") // lock once.
|
||||
|
||||
nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}]
|
||||
nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
|
||||
if !ok && testCase.shouldPass {
|
||||
t.Errorf("Lock in map missing.")
|
||||
}
|
||||
@ -122,7 +122,7 @@ func TestNamespaceLockTest(t *testing.T) {
|
||||
if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
|
||||
t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref)
|
||||
}
|
||||
_, ok = nsMutex.lockMap[nsParam{"a", "c"}]
|
||||
_, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
|
||||
if ok && !testCase.shouldPass {
|
||||
t.Errorf("Lock map not found.")
|
||||
}
|
||||
@ -303,7 +303,7 @@ func TestLockStats(t *testing.T) {
|
||||
|
||||
// hold 10 read locks.
|
||||
for i := 0; i < 10; i++ {
|
||||
nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i))
|
||||
globalNSMutex.RLock("my-bucket", "my-object", strconv.Itoa(i))
|
||||
}
|
||||
// expected lock info.
|
||||
expectedLockStats := expectedResult[0]
|
||||
@ -311,7 +311,7 @@ func TestLockStats(t *testing.T) {
|
||||
verifyLockState(expectedLockStats, t, 1)
|
||||
// unlock 5 readlock.
|
||||
for i := 0; i < 5; i++ {
|
||||
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i))
|
||||
globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i))
|
||||
}
|
||||
|
||||
expectedLockStats = expectedResult[1]
|
||||
@ -323,14 +323,14 @@ func TestLockStats(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// blocks till all read locks are released.
|
||||
nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10))
|
||||
globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(10))
|
||||
// Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock.
|
||||
expectedWLockStats := expectedResult[2]
|
||||
// Since the write lock acquired here, the number of blocked locks should reduce by 1 and
|
||||
// count of running locks should increase by 1.
|
||||
verifyLockState(expectedWLockStats, t, 3)
|
||||
// release the write lock.
|
||||
nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10))
|
||||
globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10))
|
||||
// The number of running locks should decrease by 1.
|
||||
// expectedWLockStats = expectedResult[3]
|
||||
// verifyLockState(expectedWLockStats, t, 4)
|
||||
@ -348,14 +348,14 @@ func TestLockStats(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// blocks till all read locks are released.
|
||||
nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(11))
|
||||
globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(11))
|
||||
// Once the above attempt to lock is unblocked/acquired, we release the lock.
|
||||
// Unlock the second write lock only after lock stats for first write lock release is taken.
|
||||
<-syncChan
|
||||
// The number of running locks should decrease by 1.
|
||||
expectedWLockStats := expectedResult[4]
|
||||
verifyLockState(expectedWLockStats, t, 5)
|
||||
nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11))
|
||||
globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11))
|
||||
}()
|
||||
|
||||
expectedLockStats = expectedResult[5]
|
||||
@ -366,7 +366,7 @@ func TestLockStats(t *testing.T) {
|
||||
|
||||
// unlock 4 out of remaining 5 read locks.
|
||||
for i := 0; i < 4; i++ {
|
||||
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5))
|
||||
globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5))
|
||||
}
|
||||
|
||||
// verify the entry for one remaining read lock and count of blocked write locks.
|
||||
@ -375,7 +375,7 @@ func TestLockStats(t *testing.T) {
|
||||
verifyLockState(expectedLockStats, t, 7)
|
||||
|
||||
// Releasing the last read lock.
|
||||
nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9))
|
||||
globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9))
|
||||
wg.Wait()
|
||||
expectedLockStats = expectedResult[7]
|
||||
// verify the actual lock info with the expected one.
|
||||
@ -386,16 +386,16 @@ func TestLockStats(t *testing.T) {
|
||||
func TestNamespaceForceUnlockTest(t *testing.T) {
|
||||
|
||||
// Create lock.
|
||||
lock := nsMutex.NewNSLock("bucket", "object")
|
||||
lock := globalNSMutex.NewNSLock("bucket", "object")
|
||||
lock.Lock()
|
||||
// Forcefully unlock lock.
|
||||
nsMutex.ForceUnlock("bucket", "object")
|
||||
globalNSMutex.ForceUnlock("bucket", "object")
|
||||
|
||||
ch := make(chan struct{}, 1)
|
||||
|
||||
go func() {
|
||||
// Try to claim lock again.
|
||||
anotherLock := nsMutex.NewNSLock("bucket", "object")
|
||||
anotherLock := globalNSMutex.NewNSLock("bucket", "object")
|
||||
anotherLock.Lock()
|
||||
// And signal succes.
|
||||
ch <- struct{}{}
|
||||
@ -412,5 +412,5 @@ func TestNamespaceForceUnlockTest(t *testing.T) {
|
||||
}
|
||||
|
||||
// Clean up lock.
|
||||
nsMutex.ForceUnlock("bucket", "object")
|
||||
globalNSMutex.ForceUnlock("bucket", "object")
|
||||
}
|
||||
|
@ -96,6 +96,11 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
// Lock the object before reading.
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to fetch object info.")
|
||||
@ -195,6 +200,11 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
// Lock the object before reading.
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to fetch object info.")
|
||||
@ -269,6 +279,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
// Lock the object before reading.
|
||||
objectRLock := globalNSMutex.NewNSLock(sourceBucket, sourceObject)
|
||||
objectRLock.RLock()
|
||||
defer objectRLock.RUnlock()
|
||||
|
||||
objInfo, err := objectAPI.GetObjectInfo(sourceBucket, sourceObject)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to fetch object info.")
|
||||
@ -311,6 +326,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
delete(metadata, "md5Sum")
|
||||
|
||||
sha256sum := ""
|
||||
|
||||
objectWLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectWLock.Lock()
|
||||
defer objectWLock.Unlock()
|
||||
|
||||
// Create the object.
|
||||
objInfo, err = objectAPI.PutObject(bucket, object, size, pipeReader, metadata, sha256sum)
|
||||
if err != nil {
|
||||
@ -400,6 +420,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
|
||||
sha256sum := ""
|
||||
|
||||
// Lock the object.
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
var objInfo ObjectInfo
|
||||
switch rAuthType {
|
||||
default:
|
||||
@ -731,6 +756,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
completeParts = append(completeParts, part)
|
||||
}
|
||||
|
||||
// Hold write lock on the object.
|
||||
destLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
destLock.Lock()
|
||||
defer destLock.Unlock()
|
||||
|
||||
md5Sum, err = objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
||||
if err != nil {
|
||||
err = errorCause(err)
|
||||
@ -801,6 +831,10 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
|
||||
return
|
||||
}
|
||||
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
/// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
|
||||
/// Ignore delete object errors, since we are suppposed to reply
|
||||
/// only 204.
|
||||
|
@ -17,7 +17,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -148,6 +147,9 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
|
||||
if !isJWTReqAuthenticated(r) {
|
||||
return toJSONError(errAuthentication)
|
||||
}
|
||||
bucketLock := globalNSMutex.NewNSLock(args.BucketName, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
if err := objectAPI.MakeBucket(args.BucketName); err != nil {
|
||||
return toJSONError(err, args.BucketName)
|
||||
}
|
||||
@ -272,6 +274,11 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
|
||||
if !isJWTReqAuthenticated(r) {
|
||||
return toJSONError(errAuthentication)
|
||||
}
|
||||
|
||||
objectLock := globalNSMutex.NewNSLock(args.BucketName, args.ObjectName)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
if err := objectAPI.DeleteObject(args.BucketName, args.ObjectName); err != nil {
|
||||
if isErrObjectNotFound(err) {
|
||||
// Ignore object not found error.
|
||||
@ -478,16 +485,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
// Extract incoming metadata if any.
|
||||
metadata := extractMetadataFromHeader(r.Header)
|
||||
|
||||
sha256sum := ""
|
||||
if _, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata, sha256sum); err != nil {
|
||||
writeWebErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
// Lock the object.
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
// Fetch object info for notifications.
|
||||
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
|
||||
sha256sum := ""
|
||||
objInfo, err := objectAPI.PutObject(bucket, object, -1, r.Body, metadata, sha256sum)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
|
||||
writeWebErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -534,6 +540,11 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
|
||||
// Add content disposition.
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object)))
|
||||
|
||||
// Lock the object before reading.
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
|
||||
if err != nil {
|
||||
writeWebErrorResponse(w, err)
|
||||
@ -691,16 +702,8 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
|
||||
return toJSONError(err)
|
||||
}
|
||||
|
||||
// Parse bucket policy.
|
||||
var policy = &bucketPolicy{}
|
||||
err = parseBucketPolicy(bytes.NewReader(data), policy)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to parse bucket policy.")
|
||||
return toJSONError(err, args.BucketName)
|
||||
}
|
||||
|
||||
// Parse check bucket policy.
|
||||
if s3Error := checkBucketPolicyResources(args.BucketName, policy); s3Error != ErrNone {
|
||||
// Parse validate and save bucket policy.
|
||||
if s3Error := parseAndPersistBucketPolicy(args.BucketName, data, objectAPI); s3Error != ErrNone {
|
||||
apiErr := getAPIError(s3Error)
|
||||
var err error
|
||||
if apiErr.Code == "XMinioPolicyNesting" {
|
||||
@ -710,12 +713,6 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
|
||||
}
|
||||
return toJSONError(err, args.BucketName)
|
||||
}
|
||||
|
||||
// TODO: update policy statements according to bucket name,
|
||||
// prefix and policy arguments.
|
||||
if err := persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{false, policy}, objectAPI); err != nil {
|
||||
return toJSONError(err, args.BucketName)
|
||||
}
|
||||
reply.UIVersion = miniobrowser.UIVersion
|
||||
return nil
|
||||
}
|
||||
@ -808,8 +805,7 @@ func toJSONError(err error, params ...string) (jerr *json2.Error) {
|
||||
case "InvalidBucketName":
|
||||
if len(params) > 0 {
|
||||
jerr = &json2.Error{
|
||||
Message: fmt.Sprintf("Bucket Name %s is invalid. Lowercase letters, period and numerals are the only allowed characters.",
|
||||
params[0]),
|
||||
Message: fmt.Sprintf("Bucket Name %s is invalid. Lowercase letters, period and numerals are the only allowed characters.", params[0]),
|
||||
}
|
||||
}
|
||||
// Bucket not found custom error message.
|
||||
|
@ -36,10 +36,6 @@ func (xl xlObjects) MakeBucket(bucket string) error {
|
||||
return traceError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
// Initialize sync waitgroup.
|
||||
var wg = &sync.WaitGroup{}
|
||||
|
||||
@ -153,10 +149,6 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
|
||||
return BucketInfo{}, BucketNameInvalid{Bucket: bucket}
|
||||
}
|
||||
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock.RLock()
|
||||
defer bucketLock.RUnlock()
|
||||
|
||||
bucketInfo, err := xl.getBucketInfo(bucket)
|
||||
if err != nil {
|
||||
return BucketInfo{}, toObjectErr(err, bucket)
|
||||
@ -227,10 +219,6 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
|
||||
return BucketNameInvalid{Bucket: bucket}
|
||||
}
|
||||
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
// Collect if all disks report volume not found.
|
||||
var wg = &sync.WaitGroup{}
|
||||
var dErrs = make([]error, len(xl.storageDisks))
|
||||
|
@ -73,7 +73,7 @@ func (xl xlObjects) HealBucket(bucket string) error {
|
||||
|
||||
// Heal bucket - create buckets on disks where it does not exist.
|
||||
func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error {
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
@ -126,7 +126,7 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error
|
||||
// heals `policy.json`, `notification.xml` and `listeners.json`.
|
||||
func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int) error {
|
||||
healBucketMetaFn := func(metaPath string) error {
|
||||
metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath)
|
||||
metaLock := globalNSMutex.NewNSLock(minioMetaBucket, metaPath)
|
||||
metaLock.RLock()
|
||||
defer metaLock.RUnlock()
|
||||
// Heals the given file at metaPath.
|
||||
@ -346,7 +346,7 @@ func (xl xlObjects) HealObject(bucket, object string) error {
|
||||
}
|
||||
|
||||
// Lock the object before healing.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
|
@ -144,7 +144,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
|
||||
}
|
||||
|
||||
// Check if the current object needs healing
|
||||
objectLock := nsMutex.NewNSLock(bucket, objInfo.Name)
|
||||
objectLock := globalNSMutex.NewNSLock(bucket, objInfo.Name)
|
||||
objectLock.RLock()
|
||||
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name)
|
||||
if xlShouldHeal(partsMetadata, errs) {
|
||||
|
@ -64,7 +64,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
||||
// uploadIDMarker first.
|
||||
if uploadIDMarker != "" {
|
||||
// hold lock on keyMarker path
|
||||
keyMarkerLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, keyMarker))
|
||||
keyMarkerLock.RLock()
|
||||
for _, disk := range xl.getLoadBalancedDisks() {
|
||||
@ -134,7 +134,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
||||
|
||||
// For the new object entry we get all its
|
||||
// pending uploadIDs.
|
||||
entryLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, entry))
|
||||
entryLock.RLock()
|
||||
var disk StorageAPI
|
||||
@ -242,7 +242,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
|
||||
|
||||
// This lock needs to be held for any changes to the directory
|
||||
// contents of ".minio.sys/multipart/object/"
|
||||
objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object))
|
||||
objectMPartPathLock.Lock()
|
||||
defer objectMPartPathLock.Unlock()
|
||||
@ -305,7 +305,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
uploadIDPath := pathJoin(bucket, object, uploadID)
|
||||
|
||||
// pre-check upload id lock.
|
||||
preUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
preUploadIDLock.RLock()
|
||||
// Validates if upload ID exists.
|
||||
if !xl.isUploadIDExists(bucket, object, uploadID) {
|
||||
@ -414,7 +414,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
}
|
||||
|
||||
// post-upload check (write) lock
|
||||
postUploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
|
||||
postUploadIDLock.Lock()
|
||||
defer postUploadIDLock.Unlock()
|
||||
|
||||
@ -557,7 +557,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
|
||||
|
||||
// Hold lock so that there is no competing
|
||||
// abort-multipart-upload or complete-multipart-upload.
|
||||
uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object, uploadID))
|
||||
uploadIDLock.Lock()
|
||||
defer uploadIDLock.Unlock()
|
||||
@ -586,7 +586,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
//
|
||||
// 2) no one does a parallel complete-multipart-upload on this
|
||||
// multipart upload
|
||||
uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object, uploadID))
|
||||
uploadIDLock.Lock()
|
||||
defer uploadIDLock.Unlock()
|
||||
@ -704,20 +704,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
// Hold write lock on the destination before rename.
|
||||
destLock := nsMutex.NewNSLock(bucket, object)
|
||||
destLock.Lock()
|
||||
defer func() {
|
||||
if xl.objCacheEnabled {
|
||||
// A new complete multipart upload invalidates any
|
||||
// previously cached object in memory.
|
||||
xl.objCache.Delete(path.Join(bucket, object))
|
||||
}
|
||||
|
||||
// This lock also protects the cache namespace.
|
||||
destLock.Unlock()
|
||||
|
||||
if xl.objCacheEnabled {
|
||||
// Prefetch the object from disk by triggering a fake GetObject call
|
||||
// Unlike a regular single PutObject, multipart PutObject is comes in
|
||||
// stages and it is harder to cache.
|
||||
@ -761,7 +753,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// Hold the lock so that two parallel
|
||||
// complete-multipart-uploads do not leave a stale
|
||||
// uploads.json behind.
|
||||
objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object))
|
||||
objectMPartPathLock.Lock()
|
||||
defer objectMPartPathLock.Unlock()
|
||||
@ -787,7 +779,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
|
||||
|
||||
// hold lock so we don't compete with a complete, or abort
|
||||
// multipart request.
|
||||
objectMPartPathLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object))
|
||||
objectMPartPathLock.Lock()
|
||||
defer objectMPartPathLock.Unlock()
|
||||
@ -819,7 +811,7 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
|
||||
|
||||
// Hold lock so that there is no competing
|
||||
// complete-multipart-upload or put-object-part.
|
||||
uploadIDLock := nsMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
|
||||
pathJoin(bucket, object, uploadID))
|
||||
uploadIDLock.Lock()
|
||||
defer uploadIDLock.Unlock()
|
||||
|
@ -62,11 +62,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
||||
return traceError(errUnexpected)
|
||||
}
|
||||
|
||||
// Lock the object before reading.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
||||
// Do we have read quorum?
|
||||
@ -222,10 +217,6 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
info, err := xl.getObjectInfo(bucket, object)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
@ -485,11 +476,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
}
|
||||
}
|
||||
|
||||
// Lock the object.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
// Check if an object is present as one of the parent dir.
|
||||
// -- FIXME. (needs a new kind of lock).
|
||||
if xl.parentDirIsObject(bucket, path.Dir(object)) {
|
||||
@ -606,10 +592,6 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.Lock()
|
||||
defer objectLock.Unlock()
|
||||
|
||||
// Validate object exists.
|
||||
if !xl.isObject(bucket, object) {
|
||||
return traceError(ObjectNotFound{bucket, object})
|
||||
|
Loading…
Reference in New Issue
Block a user