From 0bb624705637c658e1aef747564f7f6bd8dc727c Mon Sep 17 00:00:00 2001 From: poornas Date: Fri, 12 Jan 2018 20:34:52 -0800 Subject: [PATCH] Move nslocking from s3 layer to object layer (#5382) Fixes #5350 --- cmd/admin-handlers.go | 6 +- cmd/benchmark-utils_test.go | 4 +- cmd/bucket-handlers.go | 27 +---- cmd/bucket-policy.go | 22 +--- cmd/event-notifier.go | 42 +------ cmd/fs-v1-multipart.go | 41 +++++-- cmd/fs-v1.go | 109 ++++++++++++++++++- cmd/gateway-handlers.go | 10 +- cmd/gateway-router.go | 2 +- cmd/gateway-unsupported.go | 19 +++- cmd/gateway/azure/gateway-azure-anonymous.go | 2 +- cmd/gateway/azure/gateway-azure.go | 4 +- cmd/gateway/b2/gateway-b2-anonymous.go | 2 +- cmd/gateway/b2/gateway-b2.go | 4 +- cmd/gateway/gcs/gateway-gcs-anonymous.go | 2 +- cmd/gateway/gcs/gateway-gcs.go | 4 +- cmd/gateway/manta/gateway-manta.go | 4 +- cmd/gateway/oss/gateway-oss-anonymous.go | 4 +- cmd/gateway/oss/gateway-oss.go | 10 +- cmd/gateway/s3/gateway-s3-anonymous.go | 2 +- cmd/gateway/s3/gateway-s3.go | 6 +- cmd/gateway/sia/gateway-sia.go | 2 +- cmd/lockinfo-handlers.go | 45 +------- cmd/lockinfo-handlers_test.go | 15 ++- cmd/namespace-lock.go | 14 ++- cmd/object-api-errors.go | 7 ++ cmd/object-api-getobject_test.go | 6 +- cmd/object-api-interface.go | 11 +- cmd/object-api-multipart-common.go | 2 +- cmd/object-handlers-common.go | 6 - cmd/object-handlers.go | 72 +----------- cmd/object-handlers_test.go | 10 +- cmd/object_api_suite_test.go | 10 +- cmd/web-handlers.go | 32 +----- cmd/web-handlers_test.go | 2 +- cmd/xl-v1-bucket.go | 5 + cmd/xl-v1-healing.go | 18 ++- cmd/xl-v1-list-objects-heal.go | 12 +- cmd/xl-v1-multipart.go | 83 ++++++++++---- cmd/xl-v1-object.go | 80 +++++++++++++- cmd/xl-v1-object_test.go | 2 +- cmd/xl-v1.go | 63 ++++++++++- 42 files changed, 468 insertions(+), 355 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 9ea273dfa..bc97ec790 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -412,11 +412,7 @@ func 
(adminAPI adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *htt errorIf(err, "Failed to marshal lock information into json.") return } - - // Remove lock matching bucket/prefix held longer than duration. - for _, volLock := range volLocks { - globalNSMutex.ForceUnlock(volLock.Bucket, volLock.Object) - } + newObjectLayerFn().ClearLocks(volLocks) // Reply with list of locks cleared, as json. writeSuccessResponseJSON(w, jsonBytes) diff --git a/cmd/benchmark-utils_test.go b/cmd/benchmark-utils_test.go index 116d304dc..e912ada7d 100644 --- a/cmd/benchmark-utils_test.go +++ b/cmd/benchmark-utils_test.go @@ -239,7 +239,7 @@ func runGetObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) { b.ResetTimer() for i := 0; i < b.N; i++ { var buffer = new(bytes.Buffer) - err = obj.GetObject(bucket, "object"+strconv.Itoa(i%10), 0, int64(objSize), buffer) + err = obj.GetObject(bucket, "object"+strconv.Itoa(i%10), 0, int64(objSize), buffer, "") if err != nil { b.Error(err) } @@ -402,7 +402,7 @@ func runGetObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) { b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { - err = obj.GetObject(bucket, "object"+strconv.Itoa(i), 0, int64(objSize), ioutil.Discard) + err = obj.GetObject(bucket, "object"+strconv.Itoa(i), 0, int64(objSize), ioutil.Discard, "") if err != nil { b.Error(err) } diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 700416b2e..78a7b2174 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -301,16 +301,9 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } return } - objectLock := globalNSMutex.NewNSLock(bucket, obj.ObjectName) - if timedOutErr := objectLock.GetLock(globalObjectTimeout); timedOutErr != nil { - dErrs[i] = timedOutErr - } else { - defer objectLock.Unlock() - - dErr := objectAPI.DeleteObject(bucket, obj.ObjectName) - if dErr != nil { - dErrs[i] = dErr - } + dErr := objectAPI.DeleteObject(bucket, 
obj.ObjectName) + if dErr != nil { + dErrs[i] = dErr } }(index, object) } @@ -404,13 +397,6 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req return } - bucketLock := globalNSMutex.NewNSLock(bucket, "") - if bucketLock.GetLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer bucketLock.Unlock() - // Proceed to creating a bucket. err := objectAPI.MakeBucketWithLocation(bucket, "") if err != nil { @@ -559,13 +545,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h return } - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectLock.Unlock() - hashReader, err := hash.NewReader(fileBody, fileSize, "", "") if err != nil { errorIf(err, "Unable to initialize hashReader.") diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index d2496adc3..0d817f11e 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -148,15 +148,8 @@ func initBucketPolicies(objAPI ObjectLayer) error { func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader io.Reader, err error) { policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig) - // Acquire a read lock on policy config before reading. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) - if err = objLock.GetRLock(globalOperationTimeout); err != nil { - return nil, err - } - defer objLock.RUnlock() - var buffer bytes.Buffer - err = objAPI.GetObject(minioMetaBucket, policyPath, 0, -1, &buffer) + err = objAPI.GetObject(minioMetaBucket, policyPath, 0, -1, &buffer, "") if err != nil { if isErrObjectNotFound(err) || isErrIncompleteBody(err) { return nil, BucketPolicyNotFound{Bucket: bucket} @@ -190,12 +183,6 @@ func readBucketPolicy(bucket string, objAPI ObjectLayer) (policy.BucketAccessPol // if no policies are found. 
func removeBucketPolicy(bucket string, objAPI ObjectLayer) error { policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig) - // Acquire a write lock on policy config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) - if err := objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() err := objAPI.DeleteObject(minioMetaBucket, policyPath) if err != nil { err = errors.Cause(err) @@ -215,13 +202,6 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy policy.BucketAcces return err } policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig) - // Acquire a write lock on policy config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath) - if err = objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() - hashReader, err := hash.NewReader(bytes.NewReader(buf), int64(len(buf)), "", getSHA256Hash(buf)) if err != nil { errorIf(err, "Unable to set policy for the bucket %s", bucket) diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 42fb6f25a..3d5ec1ac2 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -365,15 +365,8 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon // Construct the notification config path. ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - // Acquire a write lock on notification config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) - if err := objLock.GetRLock(globalOperationTimeout); err != nil { - return nil, err - } - defer objLock.RUnlock() - var buffer bytes.Buffer - err := objAPI.GetObject(minioMetaBucket, ncPath, 0, -1, &buffer) // Read everything. + err := objAPI.GetObject(minioMetaBucket, ncPath, 0, -1, &buffer, "") // Read everything. if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. 
This is default when no @@ -416,15 +409,8 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er // Construct the notification config path. lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - // Acquire a write lock on notification config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) - if err := objLock.GetRLock(globalOperationTimeout); err != nil { - return nil, err - } - defer objLock.RUnlock() - var buffer bytes.Buffer - err := objAPI.GetObject(minioMetaBucket, lcPath, 0, -1, &buffer) + err := objAPI.GetObject(minioMetaBucket, lcPath, 0, -1, &buffer, "") if err != nil { // 'listener.json' not found return // 'errNoSuchNotifications'. This is default when no @@ -463,12 +449,6 @@ func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj Obje // build path ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - // Acquire a write lock on notification config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) - if err = objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() // write object to path hashReader, err := hash.NewReader(bytes.NewReader(buf), int64(len(buf)), "", getSHA256Hash(buf)) @@ -494,12 +474,6 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer // build path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - // Acquire a write lock on notification config before modifying. 
- objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) - if err = objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() // write object to path hashReader, err := hash.NewReader(bytes.NewReader(buf), int64(len(buf)), "", getSHA256Hash(buf)) @@ -526,12 +500,6 @@ func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - // Acquire a write lock on notification config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) - if err := objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() return objAPI.DeleteObject(minioMetaBucket, ncPath) } @@ -540,12 +508,6 @@ func removeListenerConfig(bucket string, objAPI ObjectLayer) error { // make the path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - // Acquire a write lock on notification config before modifying. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) - if err := objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() return objAPI.DeleteObject(minioMetaBucket, lcPath) } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 73d81e1a2..385015fa8 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -109,7 +109,7 @@ func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarke // Hold the lock so that two parallel complete-multipart-uploads // do not leave a stale uploads.json behind. 
- objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucketName, objectName)) + objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucketName, objectName)) if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { return nil, false, errors.Trace(err) } @@ -408,7 +408,7 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st // Hold the lock so that two parallel complete-multipart-uploads // do not leave a stale uploads.json behind. - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return "", err } @@ -437,17 +437,34 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, // object. Internally incoming data is written to '.minio.sys/tmp' location // and safely renamed to '.minio.sys/multipart' for reach parts. func (fs fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, - startOffset int64, length int64, metadata map[string]string) (pi PartInfo, e error) { + startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, e error) { + + // Hold read locks on source object only if we are + // going to read data from source object. 
+ objectSRLock := fs.nsMutex.NewNSLock(srcBucket, srcObject) + if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil { + return pi, err + } + defer objectSRLock.RUnlock() if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil { return pi, err } + if srcEtag != "" { + etag, err := fs.getObjectETag(srcBucket, srcObject) + if err != nil { + return pi, toObjectErr(err, srcBucket, srcObject) + } + if etag != srcEtag { + return pi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject) + } + } // Initialize pipe. pipeReader, pipeWriter := io.Pipe() go func() { - if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { + if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcEtag); gerr != nil { errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject) pipeWriter.CloseWithError(gerr) return @@ -491,7 +508,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d // Hold the lock so that two parallel complete-multipart-uploads // do not leave a stale uploads.json behind. - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return pi, err } @@ -557,7 +574,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d partPath := pathJoin(bucket, object, uploadID, partSuffix) // Lock the part so that another part upload with same part-number gets blocked // while the part is getting appended in the background. 
- partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath) + partLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, partPath) if err = partLock.GetLock(globalOperationTimeout); err != nil { return pi, err } @@ -689,7 +706,7 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // Hold the lock so that two parallel complete-multipart-uploads // do not leave a stale uploads.json behind. - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { return lpi, errors.Trace(err) } @@ -714,6 +731,12 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return oi, err } + // Hold write lock on the object. + destLock := fs.nsMutex.NewNSLock(bucket, object) + if err := destLock.GetLock(globalObjectTimeout); err != nil { + return oi, err + } + defer destLock.Unlock() // Check if an object is present as one of the parent dir. if fs.parentDirIsObject(bucket, pathutil.Dir(object)) { return oi, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) @@ -734,7 +757,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // Hold the lock so that two parallel complete-multipart-uploads // do not leave a stale uploads.json behind. - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return oi, err } @@ -974,7 +997,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error // Hold the lock so that two parallel complete-multipart-uploads // do not leave a stale uploads.json behind. 
- objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return err diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 9eb93bbe5..a2b469fb4 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -26,6 +26,7 @@ import ( "path/filepath" "sort" "syscall" + "time" "github.com/minio/minio/pkg/errors" "github.com/minio/minio/pkg/hash" @@ -52,6 +53,9 @@ type fsObjects struct { // To manage the appendRoutine go0routines bgAppend *backgroundAppend + + // name space mutex for object layer + nsMutex *nsLockMap } // Initializes meta volume on all the fs path. @@ -138,6 +142,7 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) { bgAppend: &backgroundAppend{ infoMap: make(map[string]bgAppendPartsInfo), }, + nsMutex: newNSLock(false), } // Once the filesystem has initialized hold the read lock for @@ -183,6 +188,18 @@ func (fs fsObjects) StorageInfo() StorageInfo { return storageInfo } +// Locking operations + +// List namespace locks held in object layer +func (fs fsObjects) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) { + return []VolumeLockInfo{}, NotImplemented{} +} + +// Clear namespace locks held in object layer +func (fs fsObjects) ClearLocks([]VolumeLockInfo) error { + return NotImplemented{} +} + /// Bucket operations // getBucketDir - will convert incoming bucket names to @@ -213,6 +230,11 @@ func (fs fsObjects) statBucketDir(bucket string) (os.FileInfo, error) { // MakeBucket - create a new bucket, returns if it // already exists. 
func (fs fsObjects) MakeBucketWithLocation(bucket, location string) error { + bucketLock := fs.nsMutex.NewNSLock(bucket, "") + if err := bucketLock.GetLock(globalObjectTimeout); err != nil { + return err + } + defer bucketLock.Unlock() bucketDir, err := fs.getBucketDir(bucket) if err != nil { return toObjectErr(err, bucket) @@ -313,7 +335,29 @@ func (fs fsObjects) DeleteBucket(bucket string) error { // CopyObject - copy object source object to destination object. // if source object and destination object are same we only // update metadata. -func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (oi ObjectInfo, e error) { +func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) { + cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject + // Hold write lock on destination since in both cases + // - if source and destination are same + // - if source and destination are different + // it is the sole mutating state. + objectDWLock := fs.nsMutex.NewNSLock(dstBucket, dstObject) + if err := objectDWLock.GetLock(globalObjectTimeout); err != nil { + return oi, err + } + defer objectDWLock.Unlock() + // if source and destination are different, we have to hold + // additional read lock as well to protect against writes on + // source. + if !cpSrcDstSame { + // Hold read locks on source object only if we are + // going to read data from source object. 
+ objectSRLock := fs.nsMutex.NewNSLock(srcBucket, srcObject) + if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil { + return oi, err + } + defer objectSRLock.RUnlock() + } if _, err := fs.statBucketDir(srcBucket); err != nil { return oi, toObjectErr(err, srcBucket) } @@ -323,6 +367,15 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string if err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } + if srcEtag != "" { + etag, perr := fs.getObjectETag(srcBucket, srcObject) + if perr != nil { + return oi, toObjectErr(perr, srcBucket, srcObject) + } + if etag != srcEtag { + return oi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject) + } + } // Check if this request is only metadata update. cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) @@ -355,7 +408,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string go func() { var startOffset int64 // Read the whole file. - if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { + if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil { errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject) pipeWriter.CloseWithError(gerr) return @@ -368,7 +421,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string return oi, toObjectErr(err, dstBucket, dstObject) } - objInfo, err := fs.PutObject(dstBucket, dstObject, hashReader, metadata) + objInfo, err := fs.putObject(dstBucket, dstObject, hashReader, metadata) if err != nil { return oi, toObjectErr(err, dstBucket, dstObject) } @@ -385,11 +438,22 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. 
-func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) { +func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) { if err = checkBucketAndObjectNamesFS(bucket, object); err != nil { return err } + // Lock the object before reading. + objectLock := fs.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetRLock(globalObjectTimeout); err != nil { + return err + } + defer objectLock.RUnlock() + return fs.getObject(bucket, object, offset, length, writer, etag) +} + +// getObject - wrapper for GetObject +func (fs fsObjects) getObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) { if _, err = fs.statBucketDir(bucket); err != nil { return toObjectErr(err, bucket) } @@ -419,6 +483,15 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, defer fs.rwPool.Close(fsMetaPath) } + if etag != "" { + objEtag, perr := fs.getObjectETag(bucket, object) + if perr != nil { + return toObjectErr(errors.Trace(perr), bucket, object) + } + if objEtag != etag { + return toObjectErr(errors.Trace(InvalidETag{}), bucket, object) + } + } // Read the object, doesn't exist returns an s3 compatible error. fsObjPath := pathJoin(fs.fsPath, bucket, object) reader, size, err := fsOpenFile(fsObjPath, offset) @@ -516,6 +589,13 @@ func checkBucketAndObjectNamesFS(bucket, object string) error { // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { + // Lock the object before reading. 
+ objectLock := fs.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetRLock(globalObjectTimeout); err != nil { + return oi, err + } + defer objectLock.RUnlock() + if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { return oi, err } @@ -552,11 +632,21 @@ func (fs fsObjects) parentDirIsObject(bucket, parent string) bool { // Additionally writes `fs.json` which carries the necessary metadata // for future object operations. func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) { + // Lock the object. + objectLock := fs.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetLock(globalObjectTimeout); err != nil { + return objInfo, err + } + defer objectLock.Unlock() + return fs.putObject(bucket, object, data, metadata) +} + +// putObject - wrapper for PutObject +func (fs fsObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) { // No metadata is set, allocate a new one. if metadata == nil { metadata = make(map[string]string) } - var err error // Validate if bucket name is valid and exists. @@ -677,6 +767,13 @@ func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, m // DeleteObject - deletes an object from a bucket, this operation is destructive // and there are no rollbacks supported. func (fs fsObjects) DeleteObject(bucket, object string) error { + // Acquire a write lock before deleting the object. 
+ objectLock := fs.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetLock(globalOperationTimeout); err != nil { + return err + } + defer objectLock.Unlock() + if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { return err } @@ -825,7 +922,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey // Convert entry to ObjectInfo entryToObjectInfo := func(entry string) (objInfo ObjectInfo, err error) { // Protect the entry from concurrent deletes, or renames. - objectLock := globalNSMutex.NewNSLock(bucket, entry) + objectLock := fs.nsMutex.NewNSLock(bucket, entry) if err = objectLock.GetRLock(globalListingTimeout); err != nil { return ObjectInfo{}, err } diff --git a/cmd/gateway-handlers.go b/cmd/gateway-handlers.go index 3504119dc..47ff64437 100644 --- a/cmd/gateway-handlers.go +++ b/cmd/gateway-handlers.go @@ -128,7 +128,7 @@ func (api gatewayAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Re setHeadGetRespHeaders(w, r.URL.Query()) httpWriter := ioutil.WriteOnClose(w) // Reads the object at startOffset and writes to mw. - if err = getObject(bucket, object, startOffset, length, httpWriter); err != nil { + if err = getObject(bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil { errorIf(err, "Unable to write to client.") if !httpWriter.HasWritten() { // Error response only if no data has been written to client yet. i.e if @@ -250,14 +250,6 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re } } - // Lock the object. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetLock(globalOperationTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectLock.Unlock() - var ( // Make sure we hex encode md5sum here. 
md5hex = hex.EncodeToString(md5Bytes) diff --git a/cmd/gateway-router.go b/cmd/gateway-router.go index 12a3aa3c1..5a3a7d19e 100644 --- a/cmd/gateway-router.go +++ b/cmd/gateway-router.go @@ -45,7 +45,7 @@ type Gateway interface { type GatewayLayer interface { ObjectLayer - AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) + AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) AnonGetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) AnonPutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (ObjectInfo, error) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 85d14817b..186708f60 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -18,6 +18,7 @@ package cmd import ( "io" + "time" "github.com/minio/minio-go/pkg/policy" "github.com/minio/minio/pkg/errors" @@ -38,7 +39,7 @@ func (a GatewayUnsupported) NewMultipartUpload(bucket string, object string, met } // CopyObjectPart copy part of object to other bucket and object -func (a GatewayUnsupported) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string) (pi PartInfo, err error) { +func (a GatewayUnsupported) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, err error) { return pi, errors.Trace(NotImplemented{}) } @@ -132,7 +133,7 @@ func (a GatewayUnsupported) AnonPutObject(bucket, object string, data *hash.Read } // AnonGetObject downloads object anonymously. 
-func (a GatewayUnsupported) AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) { +func (a GatewayUnsupported) AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { return errors.Trace(NotImplemented{}) } @@ -143,6 +144,18 @@ func (a GatewayUnsupported) AnonGetObjectInfo(bucket, object string) (objInfo Ob // CopyObject copies a blob from source container to destination container. func (a GatewayUnsupported) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, - metadata map[string]string) (objInfo ObjectInfo, err error) { + metadata map[string]string, srcEtag string) (objInfo ObjectInfo, err error) { return objInfo, errors.Trace(NotImplemented{}) } + +// Locking operations + +// ListLocks lists namespace locks held in object layer +func (a GatewayUnsupported) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) { + return []VolumeLockInfo{}, errors.Trace(NotImplemented{}) +} + +// ClearLocks clears namespace locks held in object layer +func (a GatewayUnsupported) ClearLocks([]VolumeLockInfo) error { + return errors.Trace(NotImplemented{}) +} diff --git a/cmd/gateway/azure/gateway-azure-anonymous.go b/cmd/gateway/azure/gateway-azure-anonymous.go index 532a8b7a6..87be5e9f2 100644 --- a/cmd/gateway/azure/gateway-azure-anonymous.go +++ b/cmd/gateway/azure/gateway-azure-anonymous.go @@ -145,7 +145,7 @@ func (a *azureObjects) AnonGetBucketInfo(bucket string) (bucketInfo minio.Bucket // AnonGetObject - SendGET request without authentication. // This is needed when clients send GET requests on objects that can be downloaded without auth. 
-func (a *azureObjects) AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) { +func (a *azureObjects) AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { h := make(http.Header) if length > 0 && startOffset > 0 { h.Add("Range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1)) diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index c1cf17c74..c135a5b46 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -554,7 +554,7 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken, delimite // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. -func (a *azureObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { +func (a *azureObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { // startOffset cannot be negative. if startOffset < 0 { return azureToObjectError(errors.Trace(minio.InvalidRange{}), bucket, object) @@ -621,7 +621,7 @@ func (a *azureObjects) PutObject(bucket, object string, data *hash.Reader, metad // CopyObject - Copies a blob from source container to destination container. // Uses Azure equivalent CopyBlob API. 
-func (a *azureObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string) (objInfo minio.ObjectInfo, err error) { +func (a *azureObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) { srcBlobURL := a.client.GetContainerReference(srcBucket).GetBlobReference(srcObject).GetURL() destBlob := a.client.GetContainerReference(destBucket).GetBlobReference(destObject) azureMeta, props, err := s3MetaToAzureProperties(metadata) diff --git a/cmd/gateway/b2/gateway-b2-anonymous.go b/cmd/gateway/b2/gateway-b2-anonymous.go index 6a3f550e1..7acbaac57 100644 --- a/cmd/gateway/b2/gateway-b2-anonymous.go +++ b/cmd/gateway/b2/gateway-b2-anonymous.go @@ -43,7 +43,7 @@ func mkRange(offset, size int64) string { // AnonGetObject - performs a plain http GET request on a public resource, // fails if the resource is not public. -func (l *b2Objects) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error { +func (l *b2Objects) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { uri := fmt.Sprintf("%s/file/%s/%s", l.b2Client.DownloadURI, bucket, object) req, err := http.NewRequest("GET", uri, nil) if err != nil { diff --git a/cmd/gateway/b2/gateway-b2.go b/cmd/gateway/b2/gateway-b2.go index f381a8dd7..a1f8c2896 100644 --- a/cmd/gateway/b2/gateway-b2.go +++ b/cmd/gateway/b2/gateway-b2.go @@ -379,7 +379,7 @@ func (l *b2Objects) ListObjectsV2(bucket, prefix, continuationToken, delimiter s // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. 
-func (l *b2Objects) GetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error { +func (l *b2Objects) GetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { bkt, err := l.Bucket(bucket) if err != nil { return err @@ -523,7 +523,7 @@ func (l *b2Objects) PutObject(bucket string, object string, data *h2.Reader, met // CopyObject copies a blob from source container to destination container. func (l *b2Objects) CopyObject(srcBucket string, srcObject string, dstBucket string, - dstObject string, metadata map[string]string) (objInfo minio.ObjectInfo, err error) { + dstObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) { return objInfo, errors.Trace(minio.NotImplemented{}) } diff --git a/cmd/gateway/gcs/gateway-gcs-anonymous.go b/cmd/gateway/gcs/gateway-gcs-anonymous.go index c89cdc61f..3a41c00be 100644 --- a/cmd/gateway/gcs/gateway-gcs-anonymous.go +++ b/cmd/gateway/gcs/gateway-gcs-anonymous.go @@ -33,7 +33,7 @@ func toGCSPublicURL(bucket, object string) string { } // AnonGetObject - Get object anonymously -func (l *gcsGateway) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error { +func (l *gcsGateway) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { req, err := http.NewRequest("GET", toGCSPublicURL(bucket, object), nil) if err != nil { return gcsToObjectError(errors.Trace(err), bucket, object) diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 4b6927e3f..45ea876c3 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -725,7 +725,7 @@ func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken, delimiter // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. 
-func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error { +func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string) error { // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil { @@ -813,7 +813,7 @@ func (l *gcsGateway) PutObject(bucket string, key string, data *hash.Reader, met // CopyObject - Copies a blob from source container to destination container. func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, - metadata map[string]string) (minio.ObjectInfo, error) { + metadata map[string]string, srcEtag string) (minio.ObjectInfo, error) { src := l.client.Bucket(srcBucket).Object(srcObject) dst := l.client.Bucket(destBucket).Object(destObject) diff --git a/cmd/gateway/manta/gateway-manta.go b/cmd/gateway/manta/gateway-manta.go index 845e74489..0f291dd99 100644 --- a/cmd/gateway/manta/gateway-manta.go +++ b/cmd/gateway/manta/gateway-manta.go @@ -468,7 +468,7 @@ func (t *tritonObjects) ListObjectsV2(bucket, prefix, continuationToken, delimit // indicates the total length of the object. // // https://apidocs.joyent.com/manta/api.html#GetObject -func (t *tritonObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { +func (t *tritonObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { // Start offset cannot be negative. if startOffset < 0 { return errors.Trace(fmt.Errorf("Unexpected error")) @@ -568,7 +568,7 @@ func (t *tritonObjects) PutObject(bucket, object string, data *hash.Reader, meta // Uses Manta Snaplinks API. 
// // https://apidocs.joyent.com/manta/api.html#PutSnapLink -func (t *tritonObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string) (objInfo minio.ObjectInfo, err error) { +func (t *tritonObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) { ctx := context.Background() if err = t.client.SnapLinks().Put(ctx, &storage.PutSnapLinkInput{ SourcePath: path.Join(mantaRoot, srcBucket, srcObject), diff --git a/cmd/gateway/oss/gateway-oss-anonymous.go b/cmd/gateway/oss/gateway-oss-anonymous.go index 3e8442e4e..24856d5fe 100644 --- a/cmd/gateway/oss/gateway-oss-anonymous.go +++ b/cmd/gateway/oss/gateway-oss-anonymous.go @@ -29,8 +29,8 @@ func (l *ossObjects) AnonPutObject(bucket, object string, data *hash.Reader, met } // AnonGetObject - Get object anonymously -func (l *ossObjects) AnonGetObject(bucket, key string, startOffset, length int64, writer io.Writer) error { - return ossGetObject(l.anonClient, bucket, key, startOffset, length, writer) +func (l *ossObjects) AnonGetObject(bucket, key string, startOffset, length int64, writer io.Writer, etag string) error { + return ossGetObject(l.anonClient, bucket, key, startOffset, length, writer, etag) } // AnonGetObjectInfo - Get object info anonymously diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index a87789b5d..2cefe33cd 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -506,7 +506,7 @@ func (l *ossObjects) ListObjectsV2(bucket, prefix, continuationToken, delimiter // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. 
-func ossGetObject(client *oss.Client, bucket, key string, startOffset, length int64, writer io.Writer) error { +func ossGetObject(client *oss.Client, bucket, key string, startOffset, length int64, writer io.Writer, etag string) error { if length < 0 && length != -1 { return ossToObjectError(errors.Trace(fmt.Errorf("Invalid argument")), bucket, key) } @@ -539,8 +539,8 @@ func ossGetObject(client *oss.Client, bucket, key string, startOffset, length in // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. -func (l *ossObjects) GetObject(bucket, key string, startOffset, length int64, writer io.Writer) error { - return ossGetObject(l.Client, bucket, key, startOffset, length, writer) +func (l *ossObjects) GetObject(bucket, key string, startOffset, length int64, writer io.Writer, etag string) error { + return ossGetObject(l.Client, bucket, key, startOffset, length, writer, etag) } func translatePlainError(err error) error { @@ -618,7 +618,7 @@ func (l *ossObjects) PutObject(bucket, object string, data *hash.Reader, metadat } // CopyObject copies an object from source bucket to a destination bucket. -func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (objInfo minio.ObjectInfo, err error) { +func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) { bkt, err := l.Client.Bucket(srcBucket) if err != nil { return objInfo, ossToObjectError(errors.Trace(err), srcBucket, srcObject) @@ -804,7 +804,7 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par // CopyObjectPart creates a part in a multipart upload by copying // existing object or a part of it. 
func (l *ossObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string, - partID int, startOffset, length int64, metadata map[string]string) (p minio.PartInfo, err error) { + partID int, startOffset, length int64, metadata map[string]string, srcEtag string) (p minio.PartInfo, err error) { bkt, err := l.Client.Bucket(destBucket) if err != nil { diff --git a/cmd/gateway/s3/gateway-s3-anonymous.go b/cmd/gateway/s3/gateway-s3-anonymous.go index 03103cbe2..d14085970 100644 --- a/cmd/gateway/s3/gateway-s3-anonymous.go +++ b/cmd/gateway/s3/gateway-s3-anonymous.go @@ -37,7 +37,7 @@ func (l *s3Objects) AnonPutObject(bucket string, object string, data *hash.Reade } // AnonGetObject - Get object anonymously -func (l *s3Objects) AnonGetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error { +func (l *s3Objects) AnonGetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string) error { opts := miniogo.GetObjectOptions{} if err := opts.SetRange(startOffset, startOffset+length-1); err != nil { return minio.ErrorRespToObjectError(errors.Trace(err), bucket, key) diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 0eb22630e..8bbb6cedc 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -254,7 +254,7 @@ func (l *s3Objects) ListObjectsV2(bucket, prefix, continuationToken, delimiter s // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. 
-func (l *s3Objects) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error { +func (l *s3Objects) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string) error { if length < 0 && length != -1 { return minio.ErrorRespToObjectError(errors.Trace(minio.InvalidRange{}), bucket, key) } @@ -298,7 +298,7 @@ func (l *s3Objects) PutObject(bucket string, object string, data *hash.Reader, m } // CopyObject copies an object from source bucket to a destination bucket. -func (l *s3Objects) CopyObject(srcBucket string, srcObject string, dstBucket string, dstObject string, metadata map[string]string) (objInfo minio.ObjectInfo, err error) { +func (l *s3Objects) CopyObject(srcBucket string, srcObject string, dstBucket string, dstObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) { // Set this header such that following CopyObject() always sets the right metadata on the destination. // metadata input is already a trickled down value from interpreting x-amz-metadata-directive at // handler layer. So what we have right now is supposed to be applied on the destination object anyways. @@ -354,7 +354,7 @@ func (l *s3Objects) PutObjectPart(bucket string, object string, uploadID string, // CopyObjectPart creates a part in a multipart upload by copying // existing object or a part of it. 
func (l *s3Objects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string, - partID int, startOffset, length int64, metadata map[string]string) (p minio.PartInfo, err error) { + partID int, startOffset, length int64, metadata map[string]string, srcEtag string) (p minio.PartInfo, err error) { completePart, err := l.Client.CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID, partID, startOffset, length, metadata) diff --git a/cmd/gateway/sia/gateway-sia.go b/cmd/gateway/sia/gateway-sia.go index 5cce2ef05..591e9ac9c 100644 --- a/cmd/gateway/sia/gateway-sia.go +++ b/cmd/gateway/sia/gateway-sia.go @@ -410,7 +410,7 @@ func (s *siaObjects) ListObjects(bucket string, prefix string, marker string, de return loi, nil } -func (s *siaObjects) GetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error { +func (s *siaObjects) GetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { dstFile := path.Join(s.TempDir, minio.MustGetUUID()) defer os.Remove(dstFile) diff --git a/cmd/lockinfo-handlers.go b/cmd/lockinfo-handlers.go index c87c957da..238c7daf6 100644 --- a/cmd/lockinfo-handlers.go +++ b/cmd/lockinfo-handlers.go @@ -66,47 +66,6 @@ type OpsLockState struct { // listLocksInfo - Fetches locks held on bucket, matching prefix held for longer than duration. func listLocksInfo(bucket, prefix string, duration time.Duration) []VolumeLockInfo { - globalNSMutex.lockMapMutex.Lock() - defer globalNSMutex.lockMapMutex.Unlock() - - // Fetch current time once instead of fetching system time for every lock. - timeNow := UTCNow() - volumeLocks := []VolumeLockInfo{} - - for param, debugLock := range globalNSMutex.debugLockMap { - if param.volume != bucket { - continue - } - // N B empty prefix matches all param.path. 
- if !hasPrefix(param.path, prefix) { - continue - } - - volLockInfo := VolumeLockInfo{ - Bucket: param.volume, - Object: param.path, - LocksOnObject: debugLock.counters.total, - TotalBlockedLocks: debugLock.counters.blocked, - LocksAcquiredOnObject: debugLock.counters.granted, - } - // Filter locks that are held on bucket, prefix. - for opsID, lockInfo := range debugLock.lockInfo { - // filter locks that were held for longer than duration. - elapsed := timeNow.Sub(lockInfo.since) - if elapsed < duration { - continue - } - // Add locks that are held for longer than duration. - volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, - OpsLockState{ - OperationID: opsID, - LockSource: lockInfo.lockSource, - LockType: lockInfo.lType, - Status: lockInfo.status, - Since: lockInfo.since, - }) - volumeLocks = append(volumeLocks, volLockInfo) - } - } - return volumeLocks + locksInfo, _ := newObjectLayerFn().ListLocks(bucket, prefix, duration) + return locksInfo } diff --git a/cmd/lockinfo-handlers_test.go b/cmd/lockinfo-handlers_test.go index 767b43add..d71108f84 100644 --- a/cmd/lockinfo-handlers_test.go +++ b/cmd/lockinfo-handlers_test.go @@ -28,11 +28,22 @@ func TestListLocksInfo(t *testing.T) { // instrumentation information. isDistXL := false initNSLock(isDistXL) + objAPI := newObjectLayerFn() + if objAPI == nil { + t.Errorf("Failed to initialize object layer") + } + var nsMutex *nsLockMap + switch objAPI.(type) { + case *fsObjects: + nsMutex = objAPI.(*fsObjects).nsMutex + case *xlObjects: + nsMutex = objAPI.(*xlObjects).nsMutex + } // Acquire a few locks to populate lock instrumentation. 
// Take 10 read locks on bucket1/prefix1/obj1 for i := 0; i < 10; i++ { - readLk := globalNSMutex.NewNSLock("bucket1", "prefix1/obj1") + readLk := nsMutex.NewNSLock("bucket1", "prefix1/obj1") if readLk.GetRLock(newDynamicTimeout(60*time.Second, time.Second)) != nil { t.Errorf("Failed to get read lock on iteration %d", i) } @@ -40,7 +51,7 @@ func TestListLocksInfo(t *testing.T) { // Take write locks on bucket1/prefix/obj{11..19} for i := 0; i < 10; i++ { - wrLk := globalNSMutex.NewNSLock("bucket1", fmt.Sprintf("prefix1/obj%d", 10+i)) + wrLk := nsMutex.NewNSLock("bucket1", fmt.Sprintf("prefix1/obj%d", 10+i)) if wrLk.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil { t.Errorf("Failed to get write lock on iteration %d", i) } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index ae608777e..4c9a5dcb4 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -92,9 +92,9 @@ func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int) return clnts, myNode } -// initNSLock - initialize name space lock map. -func initNSLock(isDistXL bool) { - globalNSMutex = &nsLockMap{ +// newNSLock - return a new name space lock map. +func newNSLock(isDistXL bool) *nsLockMap { + nsMutex := nsLockMap{ isDistXL: isDistXL, lockMap: make(map[nsParam]*nsLock), counters: &lockStat{}, @@ -102,7 +102,13 @@ func initNSLock(isDistXL bool) { // Initialize nsLockMap with entry for instrumentation information. // Entries of -> stateInfo of locks - globalNSMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) + nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) + return &nsMutex +} + +// initNSLock - initialize name space lock map. +func initNSLock(isDistXL bool) { + globalNSMutex = newNSLock(isDistXL) } // nsParam - carries name space resource. 
diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 17844d4e4..678b35e15 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -349,6 +349,13 @@ func (e PartTooBig) Error() string { return "Part size bigger than the allowed limit" } +// InvalidETag error returned when the etag has changed on disk +type InvalidETag struct{} + +func (e InvalidETag) Error() string { + return "etag of the object has changed" +} + // NotImplemented If a feature is not implemented type NotImplemented struct{} diff --git a/cmd/object-api-getobject_test.go b/cmd/object-api-getobject_test.go index 050cee663..772ce1e15 100644 --- a/cmd/object-api-getobject_test.go +++ b/cmd/object-api-getobject_test.go @@ -152,7 +152,7 @@ func testGetObject(obj ObjectLayer, instanceType string, t TestErrHandler) { } for i, testCase := range testCases { - err = obj.GetObject(testCase.bucketName, testCase.objectName, testCase.startOffset, testCase.length, testCase.writer) + err = obj.GetObject(testCase.bucketName, testCase.objectName, testCase.startOffset, testCase.length, testCase.writer, "") if err != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, err.Error()) } @@ -262,7 +262,7 @@ func testGetObjectPermissionDenied(obj ObjectLayer, instanceType string, disks [ } } - err = obj.GetObject(testCase.bucketName, testCase.objectName, testCase.startOffset, testCase.length, testCase.writer) + err = obj.GetObject(testCase.bucketName, testCase.objectName, testCase.startOffset, testCase.length, testCase.writer, "") if err != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, err.Error()) } @@ -423,7 +423,7 @@ func testGetObjectDiskNotFound(obj ObjectLayer, instanceType string, disks []str } for i, testCase := range testCases { - err = obj.GetObject(testCase.bucketName, testCase.objectName, testCase.startOffset, testCase.length, testCase.writer) + err = 
obj.GetObject(testCase.bucketName, testCase.objectName, testCase.startOffset, testCase.length, testCase.writer, "") if err != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, err.Error()) } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 65ca4628b..d6c7b55f0 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -18,6 +18,7 @@ package cmd import ( "io" + "time" "github.com/minio/minio/pkg/hash" ) @@ -36,16 +37,16 @@ type ObjectLayer interface { ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) // Object operations. - GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) + GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) PutObject(bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) - CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string) (objInfo ObjectInfo, err error) + CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcETag string) (objInfo ObjectInfo, err error) DeleteObject(bucket, object string) error // Multipart operations. 
ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) - CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string) (info PartInfo, err error) + CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string, srcEtag string) (info PartInfo, err error) PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) AbortMultipartUpload(bucket, object, uploadID string) error @@ -58,4 +59,8 @@ type ObjectLayer interface { ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) + + // Locking operations + ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) + ClearLocks([]VolumeLockInfo) error } diff --git a/cmd/object-api-multipart-common.go b/cmd/object-api-multipart-common.go index 9a30e156b..4b706fda3 100644 --- a/cmd/object-api-multipart-common.go +++ b/cmd/object-api-multipart-common.go @@ -169,7 +169,7 @@ func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) } // listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. 
-func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]MultipartInfo, bool, error) { +func (xl xlObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]MultipartInfo, bool, error) { var uploads []MultipartInfo // Read `uploads.json`. uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk) diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index aee970e89..1d0123d79 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -230,12 +230,6 @@ func isETagEqual(left, right string) bool { // is a common function to be called from object handlers and // web handlers. func deleteObject(obj ObjectLayer, bucket, object string, r *http.Request) (err error) { - // Acquire a write lock before deleting the object. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if err = objectLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objectLock.Unlock() // Proceed to delete the object. if err = obj.DeleteObject(bucket, object); err != nil { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 0cbe1aa00..eda1afa21 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -107,14 +107,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } - // Lock the object before reading. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetRLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectLock.RUnlock() - objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { apiErr := toAPIErrorCode(err) @@ -182,7 +174,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req httpWriter := ioutil.WriteOnClose(writer) // Reads the object at startOffset and writes to mw. 
- if err = objectAPI.GetObject(bucket, object, startOffset, length, httpWriter); err != nil { + if err = objectAPI.GetObject(bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil { errorIf(err, "Unable to write to client.") if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet writeErrorResponse(w, toAPIErrorCode(err), r.URL) @@ -234,14 +226,6 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re return } - // Lock the object before reading. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetRLock(globalObjectTimeout) != nil { - writeErrorResponseHeadersOnly(w, ErrOperationTimedOut) - return - } - defer objectLock.RUnlock() - objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil { apiErr := toAPIErrorCode(err) @@ -362,32 +346,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re writeErrorResponse(w, ErrNotImplemented, r.URL) return } - cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject - // Hold write lock on destination since in both cases - // - if source and destination are same - // - if source and destination are different - // it is the sole mutating state. - objectDWLock := globalNSMutex.NewNSLock(dstBucket, dstObject) - if objectDWLock.GetLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectDWLock.Unlock() - - // if source and destination are different, we have to hold - // additional read lock as well to protect against writes on - // source. - if !cpSrcDstSame { - // Hold read locks on source object only if we are - // going to read data from source object. 
- objectSRLock := globalNSMutex.NewNSLock(srcBucket, srcObject) - if objectSRLock.GetRLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectSRLock.RUnlock() - } objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject) if err != nil { @@ -424,7 +383,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // Copy source object to destination, if source and destination // object is same then only metadata is updated. - objInfo, err = objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, newMetadata) + objInfo, err = objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, newMetadata, objInfo.ETag) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -538,14 +497,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } - // Lock the object. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectLock.Unlock() - var ( md5hex = hex.EncodeToString(md5Bytes) sha256hex = "" @@ -749,15 +700,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } - // Hold read locks on source object only if we are - // going to read data from source object. - objectSRLock := globalNSMutex.NewNSLock(srcBucket, srcObject) - if objectSRLock.GetRLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer objectSRLock.RUnlock() - objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) @@ -799,7 +741,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt // Copy source object to destination, if source and destination // object is same then only metadata is updated. 
partInfo, err := objectAPI.CopyObjectPart(srcBucket, srcObject, dstBucket, - dstObject, uploadID, partID, startOffset, length, nil) + dstObject, uploadID, partID, startOffset, length, nil, objInfo.ETag) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -1057,14 +999,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite completeParts = append(completeParts, part) } - // Hold write lock on the object. - destLock := globalNSMutex.NewNSLock(bucket, object) - if destLock.GetLock(globalObjectTimeout) != nil { - writeErrorResponse(w, ErrOperationTimedOut, r.URL) - return - } - defer destLock.Unlock() - objInfo, err := objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) if err != nil { err = errors.Cause(err) diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index 9416cbbde..01993c56b 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -766,7 +766,7 @@ func testAPIPutObjectStreamSigV4Handler(obj ObjectLayer, instanceType, bucketNam t.Fatalf("Test %d: %s: ContentEncoding is set to \"%s\" which is unexpected, expected \"%s\"", i+1, instanceType, objInfo.ContentEncoding, expectedContentEncoding) } buffer := new(bytes.Buffer) - err = obj.GetObject(testCase.bucketName, testCase.objectName, 0, int64(testCase.dataLen), buffer) + err = obj.GetObject(testCase.bucketName, testCase.objectName, 0, int64(testCase.dataLen), buffer, objInfo.ETag) if err != nil { t.Fatalf("Test %d: %s: Failed to fetch the copied object: %s", i+1, instanceType, err) } @@ -943,7 +943,7 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a buffer := new(bytes.Buffer) // Fetch the object to check whether the content is same as the one uploaded via PutObject. 
- err = obj.GetObject(testCase.bucketName, testCase.objectName, 0, int64(len(bytesData)), buffer) + err = obj.GetObject(testCase.bucketName, testCase.objectName, 0, int64(len(bytesData)), buffer, "") if err != nil { t.Fatalf("Test %d: %s: Failed to fetch the copied object: %s", i+1, instanceType, err) } @@ -986,7 +986,7 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a if testCase.expectedRespStatus == http.StatusOK { buffer := new(bytes.Buffer) // Fetch the object to check whether the content is same as the one uploaded via PutObject. - err = obj.GetObject(testCase.bucketName, testCase.objectName, 0, int64(len(bytesData)), buffer) + err = obj.GetObject(testCase.bucketName, testCase.objectName, 0, int64(len(bytesData)), buffer, "") if err != nil { t.Fatalf("Test %d: %s: Failed to fetch the copied object: %s", i+1, instanceType, err) } @@ -1138,7 +1138,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam } var buf bytes.Buffer - if err = obj.GetObject(bucketName, testObject, 0, int64(len(bytesData[0].byteData)), &buf); err != nil { + if err = obj.GetObject(bucketName, testObject, 0, int64(len(bytesData[0].byteData)), &buf, ""); err != nil { t.Fatalf("Test: %s reading completed file failed: %v", instanceType, err) } if !bytes.Equal(buf.Bytes(), bytesData[0].byteData) { @@ -1795,7 +1795,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, if rec.Code == http.StatusOK { // See if the new object is formed. // testing whether the copy was successful. 
- err = obj.GetObject(testCase.bucketName, testCase.newObjectName, 0, int64(len(bytesData[0].byteData)), buffers[0]) + err = obj.GetObject(testCase.bucketName, testCase.newObjectName, 0, int64(len(bytesData[0].byteData)), buffers[0], "") if err != nil { t.Fatalf("Test %d: %s: Failed to fetch the copied object: %s", i+1, instanceType, err) } diff --git a/cmd/object_api_suite_test.go b/cmd/object_api_suite_test.go index dfeb21e95..44a38c692 100644 --- a/cmd/object_api_suite_test.go +++ b/cmd/object_api_suite_test.go @@ -207,7 +207,7 @@ func testMultipleObjectCreation(obj ObjectLayer, instanceType string, t TestErrH for key, value := range objects { var byteBuffer bytes.Buffer - err = obj.GetObject("bucket", key, 0, int64(len(value)), &byteBuffer) + err = obj.GetObject("bucket", key, 0, int64(len(value)), &byteBuffer, "") if err != nil { t.Fatalf("%s: %s", instanceType, err) } @@ -456,7 +456,7 @@ func testObjectOverwriteWorks(obj ObjectLayer, instanceType string, t TestErrHan } var bytesBuffer bytes.Buffer - err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer) + err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer, "") if err != nil { t.Fatalf("%s: %s", instanceType, err) } @@ -523,7 +523,7 @@ func testPutObject(obj ObjectLayer, instanceType string, t TestErrHandler) { if err != nil { t.Fatalf("%s: %s", instanceType, err) } - err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer1) + err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer1, "") if err != nil { t.Fatalf("%s: %s", instanceType, err) } @@ -536,7 +536,7 @@ func testPutObject(obj ObjectLayer, instanceType string, t TestErrHandler) { if err != nil { t.Fatalf("%s: %s", instanceType, err) } - err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer2) + err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer2, "") if err != nil { t.Fatalf("%s: %s", instanceType, err) } @@ -566,7 +566,7 @@ func testPutObjectInSubdir(obj ObjectLayer, instanceType string, t 
TestErrHandle } var bytesBuffer bytes.Buffer - err = obj.GetObject("bucket", "dir1/dir2/object", 0, length, &bytesBuffer) + err = obj.GetObject("bucket", "dir1/dir2/object", 0, length, &bytesBuffer, "") if err != nil { t.Fatalf("%s: %s", instanceType, err) } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 47b792c11..f81fd0382 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -131,12 +131,6 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep return toJSONError(errInvalidBucketName) } - bucketLock := globalNSMutex.NewNSLock(args.BucketName, "") - if err := bucketLock.GetLock(globalObjectTimeout); err != nil { - return toJSONError(errOperationTimedOut) - } - defer bucketLock.Unlock() - if err := objectAPI.MakeBucketWithLocation(args.BucketName, globalServerConfig.GetRegion()); err != nil { return toJSONError(err, args.BucketName) } @@ -160,12 +154,6 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, return toJSONError(errAuthentication) } - bucketLock := globalNSMutex.NewNSLock(args.BucketName, "") - if err := bucketLock.GetLock(globalObjectTimeout); err != nil { - return toJSONError(errOperationTimedOut) - } - defer bucketLock.Unlock() - err := objectAPI.DeleteBucket(args.BucketName) if err != nil { return toJSONError(err, args.BucketName) @@ -564,14 +552,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - // Lock the object. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetLock(globalObjectTimeout) != nil { - writeWebErrorResponse(w, errOperationTimedOut) - return - } - defer objectLock.Unlock() - hashReader, err := hash.NewReader(r.Body, size, "", "") if err != nil { writeWebErrorResponse(w, err) @@ -614,15 +594,7 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { // Add content disposition. 
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object))) - // Lock the object before reading. - objectLock := globalNSMutex.NewNSLock(bucket, object) - if objectLock.GetRLock(globalObjectTimeout) != nil { - writeWebErrorResponse(w, errOperationTimedOut) - return - } - defer objectLock.RUnlock() - - if err := objectAPI.GetObject(bucket, object, 0, -1, w); err != nil { + if err := objectAPI.GetObject(bucket, object, 0, -1, w, ""); err != nil { /// No need to print error, response writer already written to. return } @@ -686,7 +658,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { writeWebErrorResponse(w, errUnexpected) return err } - return objectAPI.GetObject(args.BucketName, objectName, 0, info.Size, writer) + return objectAPI.GetObject(args.BucketName, objectName, 0, info.Size, writer, "") } if !hasSuffix(object, slashSeparator) { diff --git a/cmd/web-handlers_test.go b/cmd/web-handlers_test.go index d2a550210..83f9b8f1a 100644 --- a/cmd/web-handlers_test.go +++ b/cmd/web-handlers_test.go @@ -887,7 +887,7 @@ func testUploadWebHandler(obj ObjectLayer, instanceType string, t TestErrHandler } var byteBuffer bytes.Buffer - err = obj.GetObject(bucketName, objectName, 0, int64(len(content)), &byteBuffer) + err = obj.GetObject(bucketName, objectName, 0, int64(len(content)), &byteBuffer, "") if err != nil { t.Fatalf("Failed, %v", err) } diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index b780de09d..2b0f7d9c4 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -33,6 +33,11 @@ var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound) // MakeBucket - make a bucket. func (xl xlObjects) MakeBucketWithLocation(bucket, location string) error { + bucketLock := xl.nsMutex.NewNSLock(bucket, "") + if err := bucketLock.GetLock(globalObjectTimeout); err != nil { + return err + } + defer bucketLock.Unlock() // Verify if bucket is valid. 
if !IsValidBucketName(bucket) { return errors.Trace(BucketNameInvalid{Bucket: bucket}) diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index a0817eb25..0b0fd8946 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -83,6 +83,11 @@ func (xl xlObjects) HealBucket(bucket string) error { // get write quorum for an object writeQuorum := len(xl.storageDisks)/2 + 1 + bucketLock := xl.nsMutex.NewNSLock(bucket, "") + if err := bucketLock.GetLock(globalHealingTimeout); err != nil { + return err + } + defer bucketLock.Unlock() // Heal bucket. if err := healBucket(xl.storageDisks, bucket, writeQuorum); err != nil { @@ -95,11 +100,6 @@ func (xl xlObjects) HealBucket(bucket string) error { // Heal bucket - create buckets on disks where it does not exist. func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error { - bucketLock := globalNSMutex.NewNSLock(bucket, "") - if err := bucketLock.GetLock(globalHealingTimeout); err != nil { - return err - } - defer bucketLock.Unlock() // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -316,6 +316,12 @@ func quickHeal(xlObj xlObjects, writeQuorum int, readQuorum int) error { for bucketName, occCount := range bucketOcc { // Heal bucket only if healing is needed. if occCount != len(xlObj.storageDisks) { + bucketLock := xlObj.nsMutex.NewNSLock(bucketName, "") + if perr := bucketLock.GetLock(globalHealingTimeout); perr != nil { + return perr + } + defer bucketLock.Unlock() + // Heal bucket and then proceed to heal bucket metadata if any. if err = healBucket(xlObj.storageDisks, bucketName, writeQuorum); err == nil { if err = healBucketMetadata(xlObj, bucketName); err == nil { @@ -542,7 +548,7 @@ func (xl xlObjects) HealObject(bucket, object string) (int, int, error) { } // Lock the object before healing. 
- objectLock := globalNSMutex.NewNSLock(bucket, object) + objectLock := xl.nsMutex.NewNSLock(bucket, object) if err := objectLock.GetRLock(globalHealingTimeout); err != nil { return 0, 0, err } diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index 02e5cdca6..20d058b59 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -143,7 +143,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma } // Check if the current object needs healing - objectLock := globalNSMutex.NewNSLock(bucket, objInfo.Name) + objectLock := xl.nsMutex.NewNSLock(bucket, objInfo.Name) if err := objectLock.GetRLock(globalHealingTimeout); err != nil { return loi, err } @@ -223,12 +223,12 @@ func (xl xlObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker, } // Fetches list of multipart uploadIDs given bucket, keyMarker, uploadIDMarker. -func fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string, +func (xl xlObjects) fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string, maxUploads int, disks []StorageAPI) (uploads []MultipartInfo, end bool, err error) { // Hold a read lock on keyMarker path. - keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + keyMarkerLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, keyMarker)) if err = keyMarkerLock.GetRLock(globalHealingTimeout); err != nil { return uploads, end, err @@ -237,7 +237,7 @@ func fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string, if disk == nil { continue } - uploads, end, err = listMultipartUploadIDs(bucket, keyMarker, + uploads, end, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) if err == nil || !errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) { @@ -268,7 +268,7 @@ func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker, // List all upload ids for the given keyMarker, starting from // uploadIDMarker. 
if uploadIDMarker != "" { - uploads, _, err = fetchMultipartUploadIDs(bucket, keyMarker, + uploads, _, err = xl.fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, xl.getLoadBalancedDisks()) if err != nil { return lmi, err @@ -349,7 +349,7 @@ func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker, var newUploads []MultipartInfo var end bool uploadIDMarker = "" - newUploads, end, err = fetchMultipartUploadIDs(bucket, entry, uploadIDMarker, + newUploads, end, err = xl.fetchMultipartUploadIDs(bucket, entry, uploadIDMarker, uploadsLeft, xl.getLoadBalancedDisks()) if err != nil { return lmi, err diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 6b5204884..321759f99 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -306,7 +306,7 @@ func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploa // uploadIDMarker first. if uploadIDMarker != "" { // hold lock on keyMarker path - keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + keyMarkerLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, keyMarker)) if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil { return lmi, err @@ -315,7 +315,7 @@ func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploa if disk == nil { continue } - uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) + uploads, _, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) if err == nil { break } @@ -374,7 +374,7 @@ func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploa // For the new object entry we get all its // pending uploadIDs. 
- entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + entryLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, entry)) if err = entryLock.GetRLock(globalListingTimeout); err != nil { return lmi, err @@ -384,7 +384,7 @@ func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploa if disk == nil { continue } - newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) + newUploads, end, err = xl.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) if err == nil { break } @@ -465,8 +465,14 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark if disk == nil { continue } - - uploads, _, err := listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads, disk) + // Hold the lock so that two parallel complete-multipart-uploads + // do not leave a stale uploads.json behind. + objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { + return lmi, err + } + defer objectMPartPathLock.RUnlock() + uploads, _, err := xl.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads, disk) if err != nil { return lmi, err } @@ -525,7 +531,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st // This lock needs to be held for any changes to the directory // contents of ".minio.sys/multipart/object/" - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return "", err @@ -582,17 +588,34 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st // data is read from an existing object. // // Implements S3 compatible Upload Part Copy API. 
-func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string) (pi PartInfo, e error) { +func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, e error) { + // Hold read locks on source object only if we are + // going to read data from source object. + objectSRLock := xl.nsMutex.NewNSLock(srcBucket, srcObject) + if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil { + return pi, err + } + defer objectSRLock.RUnlock() + if err := checkNewMultipartArgs(srcBucket, srcObject, xl); err != nil { return pi, err } + if srcEtag != "" { + objInfo, err := xl.getObjectInfo(srcBucket, srcObject) + if err != nil { + return pi, toObjectErr(err, srcBucket, srcObject) + } + if objInfo.ETag != srcEtag { + return pi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject) + } + } // Initialize pipe. pipeReader, pipeWriter := io.Pipe() go func() { - if gerr := xl.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { - errorIf(gerr, "Unable to read object `%s/%s`.", srcBucket, srcObject) + if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil { + errorIf(gerr, "Unable to read the object `%s/%s`.", srcBucket, srcObject) pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject)) return } @@ -631,12 +654,20 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d return pi, toObjectErr(errors.Trace(errInvalidArgument)) } + // Hold the lock so that two parallel complete-multipart-uploads + // do not leave a stale uploads.json behind.
+ objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { + return pi, err + } + defer objectMPartPathLock.Unlock() + var partsMetadata []xlMetaV1 var errs []error uploadIDPath := pathJoin(bucket, object, uploadID) // pre-check upload id lock. - preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + preUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) if err := preUploadIDLock.GetRLock(globalOperationTimeout); err != nil { return pi, err } @@ -708,7 +739,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d } // post-upload check (write) lock - postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) + postUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) if err = postUploadIDLock.GetLock(globalOperationTimeout); err != nil { return pi, err } @@ -859,10 +890,16 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM if err := checkListPartsArgs(bucket, object, xl); err != nil { return lpi, err } - + // Hold the lock so that two parallel complete-multipart-uploads + // do not leave a stale uploads.json behind. + objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { + return lpi, errors.Trace(err) + } + defer objectMPartPathLock.RUnlock() // Hold lock so that there is no competing // abort-multipart-upload or complete-multipart-upload. 
- uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) if err := uploadIDLock.GetLock(globalListingTimeout); err != nil { return lpi, err @@ -886,14 +923,19 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if err := checkCompleteMultipartArgs(bucket, object, xl); err != nil { return oi, err } - + // Hold write lock on the object. + destLock := xl.nsMutex.NewNSLock(bucket, object) + if err := destLock.GetLock(globalObjectTimeout); err != nil { + return oi, err + } + defer destLock.Unlock() // Hold lock so that // // 1) no one aborts this multipart upload // // 2) no one does a parallel complete-multipart-upload on this // multipart upload - uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil { return oi, err @@ -1034,7 +1076,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Prefetch the object from disk by triggering a fake GetObject call // Unlike a regular single PutObject, multipart PutObject is comes in // stages and it is harder to cache. - go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard) + go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard, s3MD5) } }() @@ -1075,7 +1117,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Hold the lock so that two parallel // complete-multipart-uploads do not leave a stale // uploads.json behind. 
- objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return oi, toObjectErr(err, bucket, object) @@ -1155,7 +1197,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e // hold lock so we don't compete with a complete, or abort // multipart request. - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { return toObjectErr(err, bucket, object) @@ -1186,10 +1228,9 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error if err := checkAbortMultipartArgs(bucket, object, xl); err != nil { return err } - // Hold lock so that there is no competing // complete-multipart-upload or put-object-part. - uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, uploadID)) if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil { return err diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 9548a9fff..2bd8dd0f6 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -57,7 +57,40 @@ func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks [ // CopyObject - copy object source object to destination object. // if source object and destination object are same we only // update metadata. 
-func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (oi ObjectInfo, e error) { +func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) { + cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject + // Hold write lock on destination since in both cases + // - if source and destination are same + // - if source and destination are different + // it is the sole mutating state. + objectDWLock := xl.nsMutex.NewNSLock(dstBucket, dstObject) + if err := objectDWLock.GetLock(globalObjectTimeout); err != nil { + return oi, err + } + defer objectDWLock.Unlock() + // if source and destination are different, we have to hold + // additional read lock as well to protect against writes on + // source. + if !cpSrcDstSame { + // Hold read locks on source object only if we are + // going to read data from source object. + objectSRLock := xl.nsMutex.NewNSLock(srcBucket, srcObject) + if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil { + return oi, err + } + defer objectSRLock.RUnlock() + } + + if srcEtag != "" { + objInfo, perr := xl.getObjectInfo(srcBucket, srcObject) + if perr != nil { + return oi, toObjectErr(perr, srcBucket, srcObject) + } + if objInfo.ETag != srcEtag { + return oi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject) + } + } + // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, srcBucket, srcObject) @@ -114,8 +147,8 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string go func() { var startOffset int64 // Read the whole file. 
- if gerr := xl.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { - errorIf(gerr, "Unable to read object `%s/%s`.", srcBucket, srcObject) + if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil { + errorIf(gerr, "Unable to read the object `%s/%s`.", srcBucket, srcObject) pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject)) return } @@ -127,7 +160,7 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string return oi, toObjectErr(errors.Trace(err), dstBucket, dstObject) } - objInfo, err := xl.PutObject(dstBucket, dstObject, hashReader, metadata) + objInfo, err := xl.putObject(dstBucket, dstObject, hashReader, metadata) if err != nil { return oi, toObjectErr(err, dstBucket, dstObject) } @@ -144,7 +177,19 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. -func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { +func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { + // Lock the object before reading. + objectLock := xl.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetRLock(globalObjectTimeout); err != nil { + return err + } + defer objectLock.RUnlock() + return xl.getObject(bucket, object, startOffset, length, writer, etag) +} + +// getObject wrapper for xl GetObject +func (xl xlObjects) getObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { + if err := checkGetObjArgs(bucket, object); err != nil { return err } @@ -311,6 +356,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { + // Lock the object before reading. + objectLock := xl.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetRLock(globalObjectTimeout); err != nil { + return oi, err + } + defer objectLock.RUnlock() + if err := checkGetObjArgs(bucket, object); err != nil { return oi, err } @@ -445,6 +497,17 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject // writes `xl.json` which carries the necessary metadata for future // object operations. func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) { + // Lock the object. + objectLock := xl.nsMutex.NewNSLock(bucket, object) + if err := objectLock.GetLock(globalObjectTimeout); err != nil { + return objInfo, err + } + defer objectLock.Unlock() + return xl.putObject(bucket, object, data, metadata) +} + +// putObject wrapper for xl PutObject +func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) { // This is a special case with size as '0' and object ends with // a slash separator, we treat it like a valid operation and // return success. @@ -715,6 +778,13 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // any error as it is not necessary for the handler to reply back a // response to the client request. func (xl xlObjects) DeleteObject(bucket, object string) (err error) { + // Acquire a write lock before deleting the object. 
+ objectLock := xl.nsMutex.NewNSLock(bucket, object) + if perr := objectLock.GetLock(globalOperationTimeout); perr != nil { + return perr + } + defer objectLock.Unlock() + if err = checkDelObjArgs(bucket, object); err != nil { return err } diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index 06ed3bbf9..28dd943c7 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -206,7 +206,7 @@ func TestGetObjectNoQuorum(t *testing.T) { } } // Fetch object from store. - err = xl.GetObject(bucket, object, 0, int64(len("abcd")), ioutil.Discard) + err = xl.GetObject(bucket, object, 0, int64(len("abcd")), ioutil.Discard, "") err = errors.Cause(err) if err != toObjectErr(errXLReadQuorum, bucket, object) { t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 6d9a15e4c..f9fad86ab 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -21,6 +21,7 @@ import ( "runtime/debug" "sort" "sync" + "time" humanize "github.com/dustin/go-humanize" "github.com/minio/minio/pkg/disk" @@ -59,6 +60,9 @@ type xlObjects struct { // Object cache enabled. objCacheEnabled bool + + // name space mutex for object layer + nsMutex *nsLockMap } // list of all errors that can be ignored in tree walk operation in XL @@ -106,8 +110,8 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { mutex: &sync.Mutex{}, storageDisks: newStorageDisks, listPool: listPool, + nsMutex: newNSLock(globalIsDistXL), } - // Get cache size if _MINIO_CACHE environment variable is set. 
var maxCacheSize uint64 if !globalXLObjCacheDisabled { @@ -171,6 +175,65 @@ func (xl xlObjects) Shutdown() error { return nil } +// Locking operations + +// List namespace locks held in object layer +func (xl xlObjects) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) { + xl.nsMutex.lockMapMutex.Lock() + defer xl.nsMutex.lockMapMutex.Unlock() + // Fetch current time once instead of fetching system time for every lock. + timeNow := UTCNow() + volumeLocks := []VolumeLockInfo{} + + for param, debugLock := range xl.nsMutex.debugLockMap { + if param.volume != bucket { + continue + } + // N B empty prefix matches all param.path. + if !hasPrefix(param.path, prefix) { + continue + } + + volLockInfo := VolumeLockInfo{ + Bucket: param.volume, + Object: param.path, + LocksOnObject: debugLock.counters.total, + TotalBlockedLocks: debugLock.counters.blocked, + LocksAcquiredOnObject: debugLock.counters.granted, + } + // Filter locks that are held on bucket, prefix. + for opsID, lockInfo := range debugLock.lockInfo { + // filter locks that were held for longer than duration. + elapsed := timeNow.Sub(lockInfo.since) + if elapsed < duration { + continue + } + // Add locks that are held for longer than duration. + volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, + OpsLockState{ + OperationID: opsID, + LockSource: lockInfo.lockSource, + LockType: lockInfo.lType, + Status: lockInfo.status, + Since: lockInfo.since, + }) + } + if len(volLockInfo.LockDetailsOnObject) > 0 { + volumeLocks = append(volumeLocks, volLockInfo) + } + } + return volumeLocks, nil +} + +// Clear namespace locks held in object layer +func (xl xlObjects) ClearLocks(volLocks []VolumeLockInfo) error { + // Remove lock matching bucket/prefix held longer than duration. + for _, volLock := range volLocks { + xl.nsMutex.ForceUnlock(volLock.Bucket, volLock.Object) + } + return nil +} + // byDiskTotal is a collection satisfying sort.Interface. type byDiskTotal []disk.Info