mirror of
https://github.com/minio/minio.git
synced 2025-11-20 18:06:10 -05:00
fs: Add granular locking.
This commit is contained in:
@@ -40,19 +40,18 @@ import (
|
||||
|
||||
// GetObject - GET object
|
||||
func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
// validate bucket
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return 0, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// validate object
|
||||
if !IsValidObjectName(object) {
|
||||
return 0, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// normalize buckets.
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
@@ -101,11 +100,12 @@ func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// GetObjectMetadata - HEAD object
|
||||
// GetObjectMetadata - get object metadata.
|
||||
func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
@@ -114,6 +114,7 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *
|
||||
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: bucket})
|
||||
}
|
||||
|
||||
// normalize buckets.
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
@@ -133,9 +134,12 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
// getMetadata - get object metadata.
|
||||
func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error) {
|
||||
// Do not use filepath.Join() since filepath.Join strips off any object names with '/', use them as is
|
||||
// in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat()
|
||||
// Do not use filepath.Join() since filepath.Join strips off any
|
||||
// object names with '/', use them as is in a static manner so
|
||||
// that we can send a proper 'ObjectNotFound' reply back upon
|
||||
// os.Stat().
|
||||
var objectPath string
|
||||
// For windows use its special os.PathSeparator == "\\"
|
||||
if runtime.GOOS == "windows" {
|
||||
@@ -187,23 +191,24 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// CreateObject - PUT object
|
||||
// CreateObject - create an object.
|
||||
func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
di, e := disk.GetInfo(fs.path)
|
||||
if e != nil {
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||
// Remove 5% from total space for cumulative disk space used for
|
||||
// journalling, inodes etc.
|
||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||
return ObjectMetadata{}, probe.NewError(RootPathFull{Path: fs.path})
|
||||
}
|
||||
|
||||
// check bucket name valid
|
||||
// Check bucket name valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
@@ -216,24 +221,24 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
|
||||
}
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
}
|
||||
// verify object path legal
|
||||
// Verify object path legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// get object path
|
||||
// Get object path.
|
||||
objectPath := filepath.Join(bucketPath, object)
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
var expectedMD5SumBytes []byte
|
||||
expectedMD5SumBytes, e = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
||||
if e != nil {
|
||||
// pro-actively close the connection
|
||||
// Pro-actively close the connection.
|
||||
return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
|
||||
}
|
||||
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||
}
|
||||
|
||||
// write object
|
||||
// Write object.
|
||||
file, e := atomic.FileCreateWithPrefix(objectPath, "")
|
||||
if e != nil {
|
||||
switch e := e.(type) {
|
||||
@@ -266,7 +271,8 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
|
||||
}
|
||||
|
||||
md5Sum := hex.EncodeToString(h.Sum(nil))
|
||||
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||
// Verify if the written object is equal to what is expected, only
|
||||
// if it is requested as such.
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum) {
|
||||
file.CloseAndPurge()
|
||||
@@ -306,18 +312,19 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
|
||||
return newObject, nil
|
||||
}
|
||||
|
||||
// deleteObjectPath - delete object path if its empty.
|
||||
func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error {
|
||||
if basePath == deletePath {
|
||||
return nil
|
||||
}
|
||||
fi, e := os.Stat(deletePath)
|
||||
pathSt, e := os.Stat(deletePath)
|
||||
if e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||
}
|
||||
return probe.NewError(e)
|
||||
}
|
||||
if fi.IsDir() {
|
||||
if pathSt.IsDir() {
|
||||
empty, e := ioutils.IsDirEmpty(deletePath)
|
||||
if e != nil {
|
||||
return probe.NewError(e)
|
||||
@@ -337,8 +344,8 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error
|
||||
|
||||
// DeleteObject - delete and object
|
||||
func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
// check bucket name valid
|
||||
if !IsValidBucketName(bucket) {
|
||||
@@ -369,10 +376,10 @@ func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error {
|
||||
objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object
|
||||
}
|
||||
err := deleteObjectPath(bucketPath, objectPath, bucket, object)
|
||||
if os.IsNotExist(err.ToGoError()) {
|
||||
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||
}
|
||||
if err != nil {
|
||||
if os.IsNotExist(err.ToGoError()) {
|
||||
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||
}
|
||||
return err.Trace(bucketPath, objectPath, bucket, object)
|
||||
}
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user