Move nslocking from s3 layer to object layer (#5382)

Fixes #5350
This commit is contained in:
poornas
2018-01-12 20:34:52 -08:00
committed by Nitish Tiwari
parent dd202a1a5f
commit 0bb6247056
42 changed files with 468 additions and 355 deletions

View File

@@ -26,6 +26,7 @@ import (
"path/filepath"
"sort"
"syscall"
"time"
"github.com/minio/minio/pkg/errors"
"github.com/minio/minio/pkg/hash"
@@ -52,6 +53,9 @@ type fsObjects struct {
// To manage the appendRoutine go0routines
bgAppend *backgroundAppend
// name space mutex for object layer
nsMutex *nsLockMap
}
// Initializes meta volume on all the fs path.
@@ -138,6 +142,7 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
bgAppend: &backgroundAppend{
infoMap: make(map[string]bgAppendPartsInfo),
},
nsMutex: newNSLock(false),
}
// Once the filesystem has initialized hold the read lock for
@@ -183,6 +188,18 @@ func (fs fsObjects) StorageInfo() StorageInfo {
return storageInfo
}
// Locking operations
// List namespace locks held in object layer
func (fs fsObjects) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
return []VolumeLockInfo{}, NotImplemented{}
}
// Clear namespace locks held in object layer
func (fs fsObjects) ClearLocks([]VolumeLockInfo) error {
return NotImplemented{}
}
/// Bucket operations
// getBucketDir - will convert incoming bucket names to
@@ -213,6 +230,11 @@ func (fs fsObjects) statBucketDir(bucket string) (os.FileInfo, error) {
// MakeBucket - create a new bucket, returns if it
// already exists.
func (fs fsObjects) MakeBucketWithLocation(bucket, location string) error {
bucketLock := fs.nsMutex.NewNSLock(bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
bucketDir, err := fs.getBucketDir(bucket)
if err != nil {
return toObjectErr(err, bucket)
@@ -313,7 +335,29 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
// CopyObject - copy object source object to destination object.
// if source object and destination object are same we only
// update metadata.
func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (oi ObjectInfo, e error) {
func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) {
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject
// Hold write lock on destination since in both cases
// - if source and destination are same
// - if source and destination are different
// it is the sole mutating state.
objectDWLock := fs.nsMutex.NewNSLock(dstBucket, dstObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectDWLock.Unlock()
// if source and destination are different, we have to hold
// additional read lock as well to protect against writes on
// source.
if !cpSrcDstSame {
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := fs.nsMutex.NewNSLock(srcBucket, srcObject)
if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectSRLock.RUnlock()
}
if _, err := fs.statBucketDir(srcBucket); err != nil {
return oi, toObjectErr(err, srcBucket)
}
@@ -323,6 +367,15 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
if srcEtag != "" {
etag, perr := fs.getObjectETag(srcBucket, srcObject)
if perr != nil {
return oi, toObjectErr(perr, srcBucket, srcObject)
}
if etag != srcEtag {
return oi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject)
}
}
// Check if this request is only metadata update.
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
@@ -355,7 +408,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
go func() {
var startOffset int64 // Read the whole file.
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil {
if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
pipeWriter.CloseWithError(gerr)
return
@@ -368,7 +421,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
return oi, toObjectErr(err, dstBucket, dstObject)
}
objInfo, err := fs.PutObject(dstBucket, dstObject, hashReader, metadata)
objInfo, err := fs.putObject(dstBucket, dstObject, hashReader, metadata)
if err != nil {
return oi, toObjectErr(err, dstBucket, dstObject)
}
@@ -385,11 +438,22 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
if err = checkBucketAndObjectNamesFS(bucket, object); err != nil {
return err
}
// Lock the object before reading.
objectLock := fs.nsMutex.NewNSLock(bucket, object)
if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
return err
}
defer objectLock.RUnlock()
return fs.getObject(bucket, object, offset, length, writer, etag)
}
// getObject - wrapper for GetObject
func (fs fsObjects) getObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
if _, err = fs.statBucketDir(bucket); err != nil {
return toObjectErr(err, bucket)
}
@@ -419,6 +483,15 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
defer fs.rwPool.Close(fsMetaPath)
}
if etag != "" {
objEtag, perr := fs.getObjectETag(bucket, object)
if perr != nil {
return toObjectErr(errors.Trace(perr), bucket, object)
}
if objEtag != etag {
return toObjectErr(errors.Trace(InvalidETag{}), bucket, object)
}
}
// Read the object, doesn't exist returns an s3 compatible error.
fsObjPath := pathJoin(fs.fsPath, bucket, object)
reader, size, err := fsOpenFile(fsObjPath, offset)
@@ -516,6 +589,13 @@ func checkBucketAndObjectNamesFS(bucket, object string) error {
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
// Lock the object before reading.
objectLock := fs.nsMutex.NewNSLock(bucket, object)
if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectLock.RUnlock()
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
return oi, err
}
@@ -552,11 +632,21 @@ func (fs fsObjects) parentDirIsObject(bucket, parent string) bool {
// Additionally writes `fs.json` which carries the necessary metadata
// for future object operations.
func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
// Lock the object.
objectLock := fs.nsMutex.NewNSLock(bucket, object)
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer objectLock.Unlock()
return fs.putObject(bucket, object, data, metadata)
}
// putObject - wrapper for PutObject
func (fs fsObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
// No metadata is set, allocate a new one.
if metadata == nil {
metadata = make(map[string]string)
}
var err error
// Validate if bucket name is valid and exists.
@@ -677,6 +767,13 @@ func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, m
// DeleteObject - deletes an object from a bucket, this operation is destructive
// and there are no rollbacks supported.
func (fs fsObjects) DeleteObject(bucket, object string) error {
// Acquire a write lock before deleting the object.
objectLock := fs.nsMutex.NewNSLock(bucket, object)
if err := objectLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objectLock.Unlock()
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
return err
}
@@ -825,7 +922,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// Convert entry to ObjectInfo
entryToObjectInfo := func(entry string) (objInfo ObjectInfo, err error) {
// Protect the entry from concurrent deletes, or renames.
objectLock := globalNSMutex.NewNSLock(bucket, entry)
objectLock := fs.nsMutex.NewNSLock(bucket, entry)
if err = objectLock.GetRLock(globalListingTimeout); err != nil {
return ObjectInfo{}, err
}