mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
Merge pull request #1090 from harshavardhana/multipart
fs: Add granular locking.
This commit is contained in:
commit
198a92c3c4
@ -18,8 +18,8 @@ package fs
|
||||
|
||||
// IsPrivateBucket - is private bucket
|
||||
func (fs Filesystem) IsPrivateBucket(bucket string) bool {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
bucketMetadata, ok := fs.buckets.Metadata[bucket]
|
||||
if !ok {
|
||||
return true
|
||||
@ -29,8 +29,8 @@ func (fs Filesystem) IsPrivateBucket(bucket string) bool {
|
||||
|
||||
// IsPublicBucket - is public bucket
|
||||
func (fs Filesystem) IsPublicBucket(bucket string) bool {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
bucketMetadata, ok := fs.buckets.Metadata[bucket]
|
||||
if !ok {
|
||||
return true
|
||||
@ -40,8 +40,8 @@ func (fs Filesystem) IsPublicBucket(bucket string) bool {
|
||||
|
||||
// IsReadOnlyBucket - is read only bucket
|
||||
func (fs Filesystem) IsReadOnlyBucket(bucket string) bool {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
bucketMetadata, ok := fs.buckets.Metadata[bucket]
|
||||
if !ok {
|
||||
return true
|
||||
|
@ -271,16 +271,17 @@ func (fs *Filesystem) listObjectsService() *probe.Error {
|
||||
// ListObjects - lists all objects for a given prefix, returns upto
|
||||
// maxKeys number of objects per call.
|
||||
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
rootPrefix := filepath.Join(fs.path, bucket)
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if _, e := os.Stat(rootPrefix); e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
|
@ -32,8 +32,8 @@ import (
|
||||
|
||||
// DeleteBucket - delete bucket
|
||||
func (fs Filesystem) DeleteBucket(bucket string) *probe.Error {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
// verify bucket path legal
|
||||
if !IsValidBucketName(bucket) {
|
||||
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
@ -66,6 +66,44 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListBuckets - Get service.
|
||||
func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) {
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
files, err := ioutils.ReadDirN(fs.path, fs.maxBuckets)
|
||||
if err != nil && err != io.EOF {
|
||||
return []BucketMetadata{}, probe.NewError(err)
|
||||
}
|
||||
if err == io.EOF {
|
||||
// This message is printed if there are more than 1000 buckets.
|
||||
fmt.Printf("More buckets found, truncating the bucket list to %d entries only.", fs.maxBuckets)
|
||||
}
|
||||
var metadataList []BucketMetadata
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
// if files found ignore them
|
||||
continue
|
||||
}
|
||||
dirName := strings.ToLower(file.Name())
|
||||
if file.IsDir() {
|
||||
// If directories found with odd names, skip them.
|
||||
if !IsValidBucketName(dirName) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
metadata := BucketMetadata{
|
||||
Name: dirName,
|
||||
Created: file.ModTime(),
|
||||
}
|
||||
metadataList = append(metadataList, metadata)
|
||||
}
|
||||
// Remove duplicated entries.
|
||||
metadataList = removeDuplicateBuckets(metadataList)
|
||||
return metadataList, nil
|
||||
}
|
||||
|
||||
// removeDuplicateBuckets - remove duplicate buckets.
|
||||
func removeDuplicateBuckets(elements []BucketMetadata) (result []BucketMetadata) {
|
||||
// Use map to record duplicates as we find them.
|
||||
duplicates := make(map[string]struct{})
|
||||
@ -78,81 +116,49 @@ func removeDuplicateBuckets(elements []BucketMetadata) (result []BucketMetadata)
|
||||
return result
|
||||
}
|
||||
|
||||
// ListBuckets - Get service
|
||||
func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
|
||||
files, err := ioutils.ReadDirN(fs.path, fs.maxBuckets)
|
||||
if err != nil && err != io.EOF {
|
||||
return []BucketMetadata{}, probe.NewError(err)
|
||||
}
|
||||
if err == io.EOF {
|
||||
fmt.Printf("Truncating the bucket list to %d entries only.", fs.maxBuckets)
|
||||
}
|
||||
var metadataList []BucketMetadata
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
// if files found ignore them
|
||||
continue
|
||||
}
|
||||
dirName := strings.ToLower(file.Name())
|
||||
if file.IsDir() {
|
||||
// if directories found with odd names, skip them too
|
||||
if !IsValidBucketName(dirName) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
metadata := BucketMetadata{
|
||||
Name: dirName,
|
||||
Created: file.ModTime(),
|
||||
}
|
||||
metadataList = append(metadataList, metadata)
|
||||
}
|
||||
metadataList = removeDuplicateBuckets(metadataList)
|
||||
return metadataList, nil
|
||||
}
|
||||
|
||||
// MakeBucket - PUT Bucket
|
||||
// MakeBucket - PUT Bucket.
|
||||
func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
di, err := disk.GetInfo(fs.path)
|
||||
if err != nil {
|
||||
return probe.NewError(err)
|
||||
}
|
||||
|
||||
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||
// Remove 5% from total space for cumulative disk space used for
|
||||
// journalling, inodes etc.
|
||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||
return probe.NewError(RootPathFull{Path: fs.path})
|
||||
}
|
||||
|
||||
// verify bucket path legal
|
||||
// Verify if bucket path legal.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// Verify if bucket acl is legal.
|
||||
if !IsValidBucketACL(acl) {
|
||||
return probe.NewError(InvalidACL{ACL: acl})
|
||||
}
|
||||
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
// get bucket path
|
||||
|
||||
// Get bucket path.
|
||||
bucketDir := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketDir); e == nil {
|
||||
return probe.NewError(BucketExists{Bucket: bucket})
|
||||
}
|
||||
|
||||
// make bucket
|
||||
// Make bucket.
|
||||
if e := os.Mkdir(bucketDir, 0700); e != nil {
|
||||
return probe.NewError(err)
|
||||
}
|
||||
|
||||
bucketMetadata := &BucketMetadata{}
|
||||
fi, e := os.Stat(bucketDir)
|
||||
// check if bucket exists
|
||||
// Check if bucket exists.
|
||||
if e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
return probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
@ -172,12 +178,17 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// denormalizeBucket - will convert incoming bucket names to
|
||||
// corresponding valid bucketnames on the backend in a platform
|
||||
// compatible way for all operating systems.
|
||||
func (fs Filesystem) denormalizeBucket(bucket string) string {
|
||||
buckets, err := ioutils.ReadDirNamesN(fs.path, fs.maxBuckets)
|
||||
if err != nil {
|
||||
buckets, e := ioutils.ReadDirNamesN(fs.path, fs.maxBuckets)
|
||||
if e != nil {
|
||||
return bucket
|
||||
}
|
||||
for _, b := range buckets {
|
||||
// Verify if lowercase version of the bucket is equal to the
|
||||
// incoming bucket, then use the proper name.
|
||||
if strings.ToLower(b) == bucket {
|
||||
return b
|
||||
}
|
||||
@ -185,21 +196,20 @@ func (fs Filesystem) denormalizeBucket(bucket string) string {
|
||||
return bucket
|
||||
}
|
||||
|
||||
// GetBucketMetadata - get bucket metadata
|
||||
// GetBucketMetadata - get bucket metadata.
|
||||
func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
if !IsValidBucketName(bucket) {
|
||||
return BucketMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
|
||||
// get bucket path
|
||||
// Get bucket path.
|
||||
bucketDir := filepath.Join(fs.path, bucket)
|
||||
fi, e := os.Stat(bucketDir)
|
||||
if e != nil {
|
||||
// check if bucket exists
|
||||
// Check if bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
@ -215,13 +225,15 @@ func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Er
|
||||
return *bucketMetadata, nil
|
||||
}
|
||||
|
||||
// SetBucketMetadata - set bucket metadata
|
||||
// SetBucketMetadata - set bucket metadata.
|
||||
func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
// Save the acl.
|
||||
acl := metadata["acl"]
|
||||
if !IsValidBucketACL(acl) {
|
||||
return probe.NewError(InvalidACL{ACL: acl})
|
||||
@ -233,7 +245,7 @@ func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string
|
||||
bucketDir := filepath.Join(fs.path, bucket)
|
||||
fi, e := os.Stat(bucketDir)
|
||||
if e != nil {
|
||||
// check if bucket exists
|
||||
// Check if bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
// isValidUploadID - is upload id.
|
||||
func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
|
||||
s, ok := fs.multiparts.ActiveSession[object]
|
||||
if !ok {
|
||||
@ -55,15 +56,17 @@ func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
|
||||
|
||||
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
|
||||
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
@ -80,7 +83,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
|
||||
resources.IsTruncated = true
|
||||
return resources, nil
|
||||
}
|
||||
// uploadIDMarker is ignored if KeyMarker is empty
|
||||
// UploadIDMarker is ignored if KeyMarker is empty.
|
||||
switch {
|
||||
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
|
||||
if object > resources.KeyMarker {
|
||||
@ -114,6 +117,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
// concatenate parts.
|
||||
func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
|
||||
for _, part := range parts.Part {
|
||||
partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
|
||||
@ -123,15 +127,13 @@ func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath stri
|
||||
}
|
||||
|
||||
recvMD5 := part.ETag
|
||||
// complete multipart request header md5sum per part is hex encoded
|
||||
// trim it and decode if possible.
|
||||
_, e = hex.DecodeString(strings.Trim(recvMD5, "\""))
|
||||
if e != nil {
|
||||
// Complete multipart request header md5sum per part is hex
|
||||
// encoded trim it and decode if possible.
|
||||
if _, e = hex.DecodeString(strings.Trim(recvMD5, "\"")); e != nil {
|
||||
return probe.NewError(InvalidDigest{Md5: recvMD5})
|
||||
}
|
||||
|
||||
_, e = io.Copy(mw, partFile)
|
||||
if e != nil {
|
||||
if _, e = io.Copy(mw, partFile); e != nil {
|
||||
return probe.NewError(e)
|
||||
}
|
||||
}
|
||||
@ -140,20 +142,22 @@ func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath stri
|
||||
|
||||
// NewMultipartUpload - initiate a new multipart session
|
||||
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
di, e := disk.GetInfo(fs.path)
|
||||
if e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||
// Remove 5% from total space for cumulative disk space used for
|
||||
// journalling, inodes etc.
|
||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||
return "", probe.NewError(RootPathFull{Path: fs.path})
|
||||
}
|
||||
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
@ -164,7 +168,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e = os.Stat(bucketPath); e != nil {
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
@ -183,6 +187,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
||||
}
|
||||
}
|
||||
|
||||
// Generate new upload id.
|
||||
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String())
|
||||
uploadIDSum := sha512.Sum512(id)
|
||||
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
|
||||
@ -193,6 +198,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
||||
}
|
||||
defer multiPartfile.Close()
|
||||
|
||||
// Initialize multipart session.
|
||||
mpartSession := &MultipartSession{}
|
||||
mpartSession.TotalParts = 0
|
||||
mpartSession.UploadID = uploadID
|
||||
@ -211,7 +217,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
||||
return uploadID, nil
|
||||
}
|
||||
|
||||
// partNumber is a sortable interface for Part slice
|
||||
// partNumber is a sortable interface for Part slice.
|
||||
type partNumber []*PartMetadata
|
||||
|
||||
func (a partNumber) Len() int { return len(a) }
|
||||
@ -220,33 +226,37 @@ func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumb
|
||||
|
||||
// CreateObjectPart - create a part in a multipart session
|
||||
func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
di, err := disk.GetInfo(fs.path)
|
||||
if err != nil {
|
||||
return "", probe.NewError(err)
|
||||
}
|
||||
|
||||
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||
// Remove 5% from total space for cumulative disk space used for
|
||||
// journalling, inodes etc.
|
||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||
return "", probe.NewError(RootPathFull{Path: fs.path})
|
||||
}
|
||||
|
||||
// Part id cannot be negative.
|
||||
if partID <= 0 {
|
||||
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
|
||||
}
|
||||
// check bucket name valid
|
||||
|
||||
// Check bucket name valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// verify object path legal
|
||||
// Verify object path legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// Verify upload is valid for the incoming object.
|
||||
if !fs.isValidUploadID(object, uploadID) {
|
||||
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
@ -255,7 +265,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
|
||||
var expectedMD5SumBytes []byte
|
||||
expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
||||
if err != nil {
|
||||
// pro-actively close the connection
|
||||
// Pro-actively close the connection
|
||||
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
|
||||
}
|
||||
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||
@ -264,7 +274,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, err = os.Stat(bucketPath); err != nil {
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(err) {
|
||||
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
@ -285,7 +295,8 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
md5sum := hex.EncodeToString(h.Sum(nil))
|
||||
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||
// Verify if the written object is equal to what is expected, only
|
||||
// if it is requested as such.
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5sum) {
|
||||
partFile.CloseAndPurge()
|
||||
@ -340,19 +351,20 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
|
||||
|
||||
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
||||
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
// check bucket name valid
|
||||
// Check bucket name is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// verify object path legal
|
||||
// Verify object path is legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// Verify if valid upload for incoming object.
|
||||
if !fs.isValidUploadID(object, uploadID) {
|
||||
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
@ -360,7 +372,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
@ -446,19 +458,20 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
|
||||
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
|
||||
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
// check bucket name valid
|
||||
// Check bucket name is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// verify object path legal
|
||||
// Verify object path legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// Verify if upload id is valid for incoming object.
|
||||
if !fs.isValidUploadID(object, resources.UploadID) {
|
||||
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
|
||||
}
|
||||
@ -477,7 +490,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
@ -514,15 +527,15 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
|
||||
|
||||
// AbortMultipartUpload - abort an incomplete multipart session
|
||||
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
// check bucket name valid
|
||||
// Check bucket name valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// verify object path legal
|
||||
// Verify object path legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
@ -534,7 +547,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
// check bucket exists
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
|
@ -40,19 +40,18 @@ import (
|
||||
|
||||
// GetObject - GET object
|
||||
func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
// validate bucket
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return 0, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// validate object
|
||||
if !IsValidObjectName(object) {
|
||||
return 0, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// normalize buckets.
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
@ -101,11 +100,12 @@ func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// GetObjectMetadata - HEAD object
|
||||
// GetObjectMetadata - get object metadata.
|
||||
func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.RLock()
|
||||
defer fs.rwLock.RUnlock()
|
||||
|
||||
// Input validation.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
@ -114,6 +114,7 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *
|
||||
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: bucket})
|
||||
}
|
||||
|
||||
// normalize buckets.
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
@ -133,9 +134,12 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
// getMetadata - get object metadata.
|
||||
func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error) {
|
||||
// Do not use filepath.Join() since filepath.Join strips off any object names with '/', use them as is
|
||||
// in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat()
|
||||
// Do not use filepath.Join() since filepath.Join strips off any
|
||||
// object names with '/', use them as is in a static manner so
|
||||
// that we can send a proper 'ObjectNotFound' reply back upon
|
||||
// os.Stat().
|
||||
var objectPath string
|
||||
// For windows use its special os.PathSeparator == "\\"
|
||||
if runtime.GOOS == "windows" {
|
||||
@ -187,23 +191,24 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// CreateObject - PUT object
|
||||
// CreateObject - create an object.
|
||||
func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
di, e := disk.GetInfo(fs.path)
|
||||
if e != nil {
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||
// Remove 5% from total space for cumulative disk space used for
|
||||
// journalling, inodes etc.
|
||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||
return ObjectMetadata{}, probe.NewError(RootPathFull{Path: fs.path})
|
||||
}
|
||||
|
||||
// check bucket name valid
|
||||
// Check bucket name valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
@ -216,24 +221,24 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
|
||||
}
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
}
|
||||
// verify object path legal
|
||||
// Verify object path legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// get object path
|
||||
// Get object path.
|
||||
objectPath := filepath.Join(bucketPath, object)
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
var expectedMD5SumBytes []byte
|
||||
expectedMD5SumBytes, e = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
||||
if e != nil {
|
||||
// pro-actively close the connection
|
||||
// Pro-actively close the connection.
|
||||
return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
|
||||
}
|
||||
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||
}
|
||||
|
||||
// write object
|
||||
// Write object.
|
||||
file, e := atomic.FileCreateWithPrefix(objectPath, "")
|
||||
if e != nil {
|
||||
switch e := e.(type) {
|
||||
@ -266,7 +271,8 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
|
||||
}
|
||||
|
||||
md5Sum := hex.EncodeToString(h.Sum(nil))
|
||||
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||
// Verify if the written object is equal to what is expected, only
|
||||
// if it is requested as such.
|
||||
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||
if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum) {
|
||||
file.CloseAndPurge()
|
||||
@ -306,18 +312,19 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
|
||||
return newObject, nil
|
||||
}
|
||||
|
||||
// deleteObjectPath - delete object path if its empty.
|
||||
func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error {
|
||||
if basePath == deletePath {
|
||||
return nil
|
||||
}
|
||||
fi, e := os.Stat(deletePath)
|
||||
pathSt, e := os.Stat(deletePath)
|
||||
if e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||
}
|
||||
return probe.NewError(e)
|
||||
}
|
||||
if fi.IsDir() {
|
||||
if pathSt.IsDir() {
|
||||
empty, e := ioutils.IsDirEmpty(deletePath)
|
||||
if e != nil {
|
||||
return probe.NewError(e)
|
||||
@ -337,8 +344,8 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error
|
||||
|
||||
// DeleteObject - delete and object
|
||||
func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.rwLock.Lock()
|
||||
defer fs.rwLock.Unlock()
|
||||
|
||||
// check bucket name valid
|
||||
if !IsValidBucketName(bucket) {
|
||||
@ -369,10 +376,10 @@ func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error {
|
||||
objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object
|
||||
}
|
||||
err := deleteObjectPath(bucketPath, objectPath, bucket, object)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err.ToGoError()) {
|
||||
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||
}
|
||||
if err != nil {
|
||||
return err.Trace(bucketPath, objectPath, bucket, object)
|
||||
}
|
||||
return nil
|
||||
|
34
pkg/fs/fs.go
34
pkg/fs/fs.go
@ -31,7 +31,7 @@ type Filesystem struct {
|
||||
path string
|
||||
minFreeDisk int64
|
||||
maxBuckets int
|
||||
lock *sync.Mutex
|
||||
rwLock *sync.RWMutex
|
||||
multiparts *Multiparts
|
||||
buckets *Buckets
|
||||
listServiceReqCh chan<- listServiceReq
|
||||
@ -59,7 +59,7 @@ type Multiparts struct {
|
||||
}
|
||||
|
||||
// New instantiate a new donut
|
||||
func New(rootPath string) (Filesystem, *probe.Error) {
|
||||
func New(rootPath string, minFreeDisk int64, maxBuckets int) (Filesystem, *probe.Error) {
|
||||
setFSBucketsConfigPath(filepath.Join(rootPath, "$buckets.json"))
|
||||
setFSMultipartsConfigPath(filepath.Join(rootPath, "$multiparts-session.json"))
|
||||
|
||||
@ -80,8 +80,11 @@ func New(rootPath string) (Filesystem, *probe.Error) {
|
||||
return Filesystem{}, err.Trace()
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize contentdb.
|
||||
contentdb.Init()
|
||||
if e := contentdb.Init(); e != nil {
|
||||
return Filesystem{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
var buckets *Buckets
|
||||
buckets, err = loadBucketsMetadata()
|
||||
@ -98,16 +101,18 @@ func New(rootPath string) (Filesystem, *probe.Error) {
|
||||
return Filesystem{}, err.Trace()
|
||||
}
|
||||
}
|
||||
fs := Filesystem{lock: new(sync.Mutex)}
|
||||
fs := Filesystem{
|
||||
rwLock: &sync.RWMutex{},
|
||||
}
|
||||
fs.path = rootPath
|
||||
fs.multiparts = multiparts
|
||||
fs.buckets = buckets
|
||||
/// Defaults
|
||||
|
||||
// maximum buckets to be listed from list buckets.
|
||||
fs.maxBuckets = 1000
|
||||
fs.maxBuckets = maxBuckets
|
||||
// minium free disk required for i/o operations to succeed.
|
||||
fs.minFreeDisk = 10
|
||||
fs.minFreeDisk = minFreeDisk
|
||||
|
||||
// Start list goroutine.
|
||||
if err = fs.listObjectsService(); err != nil {
|
||||
@ -116,20 +121,3 @@ func New(rootPath string) (Filesystem, *probe.Error) {
|
||||
// Return here.
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
// SetMinFreeDisk - set min free disk
|
||||
func (fs *Filesystem) SetMinFreeDisk(minFreeDisk int64) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
fs.minFreeDisk = minFreeDisk
|
||||
}
|
||||
|
||||
// SetMaxBuckets - set total number of buckets supported, default is 100.
|
||||
func (fs *Filesystem) SetMaxBuckets(maxBuckets int) {
|
||||
fs.lock.Lock()
|
||||
defer fs.lock.Unlock()
|
||||
if maxBuckets == 0 {
|
||||
maxBuckets = 100
|
||||
}
|
||||
fs.maxBuckets = maxBuckets
|
||||
}
|
||||
|
@ -36,8 +36,7 @@ func (s *MySuite) TestAPISuite(c *C) {
|
||||
path, e := ioutil.TempDir(os.TempDir(), "minio-")
|
||||
c.Check(e, IsNil)
|
||||
storageList = append(storageList, path)
|
||||
store, err := New(path)
|
||||
store.SetMinFreeDisk(0)
|
||||
store, err := New(path, 0, 1000)
|
||||
c.Check(err, IsNil)
|
||||
return store
|
||||
}
|
||||
|
@ -143,10 +143,9 @@ func getNewWebAPI(conf cloudServerConfig) *WebAPI {
|
||||
|
||||
// getNewCloudStorageAPI instantiate a new CloudStorageAPI.
|
||||
func getNewCloudStorageAPI(conf cloudServerConfig) CloudStorageAPI {
|
||||
fs, err := fs.New(conf.Path)
|
||||
fs, err := fs.New(conf.Path, conf.MinFreeDisk, conf.MaxBuckets)
|
||||
fatalIf(err.Trace(), "Initializing filesystem failed.", nil)
|
||||
|
||||
fs.SetMinFreeDisk(conf.MinFreeDisk)
|
||||
return CloudStorageAPI{
|
||||
Filesystem: fs,
|
||||
AccessLog: conf.AccessLog,
|
||||
|
@ -58,6 +58,9 @@ EXAMPLES:
|
||||
4. Start minio server with minimum free disk threshold to 5%
|
||||
$ minio {{.Name}} min-free-disk 5% /home/shared/Pictures
|
||||
|
||||
5. Start minio server with minimum free disk threshold to 15% and support upto 2000 buckets.
|
||||
$ minio {{.Name}} min-free-disk 15% /home/shared/Documents max-buckets 2000
|
||||
|
||||
`,
|
||||
}
|
||||
|
||||
@ -74,6 +77,7 @@ type cloudServerConfig struct {
|
||||
/// FS options
|
||||
Path string // Path to export for cloud storage
|
||||
MinFreeDisk int64 // Minimum free disk space for filesystem
|
||||
MaxBuckets int // Maximum number of buckets suppported by filesystem.
|
||||
|
||||
/// TLS service
|
||||
TLS bool // TLS on when certs are specified
|
||||
@ -284,9 +288,12 @@ func serverMain(c *cli.Context) {
|
||||
}
|
||||
|
||||
var minFreeDisk int64
|
||||
var maxBuckets int
|
||||
minFreeDiskSet := false
|
||||
maxBucketsSet := false
|
||||
// Default
|
||||
minFreeDisk = 10
|
||||
maxBuckets = 1000
|
||||
|
||||
args := c.Args()
|
||||
for len(args) >= 2 {
|
||||
@ -300,6 +307,16 @@ func serverMain(c *cli.Context) {
|
||||
fatalIf(err.Trace(args.First()), "Invalid minium free disk size "+args.First()+" passed.", nil)
|
||||
args = args.Tail()
|
||||
minFreeDiskSet = true
|
||||
case "max-buckets":
|
||||
if maxBucketsSet {
|
||||
fatalIf(probe.NewError(errInvalidArgument), "Maximum buckets should be set only once.", nil)
|
||||
}
|
||||
args = args.Tail()
|
||||
var e error
|
||||
maxBuckets, e = strconv.Atoi(args.First())
|
||||
fatalIf(probe.NewError(e), "Invalid max buckets "+args.First()+" passed.", nil)
|
||||
args = args.Tail()
|
||||
maxBucketsSet = true
|
||||
default:
|
||||
cli.ShowCommandHelpAndExit(c, "server", 1) // last argument is exit code
|
||||
}
|
||||
@ -318,6 +335,7 @@ func serverMain(c *cli.Context) {
|
||||
SecretAccessKey: conf.Credentials.SecretAccessKey,
|
||||
Path: path,
|
||||
MinFreeDisk: minFreeDisk,
|
||||
MaxBuckets: maxBuckets,
|
||||
TLS: tls,
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
|
Loading…
x
Reference in New Issue
Block a user