Implementing min-free-disk

This commit is contained in:
Harshavardhana
2015-10-17 19:17:33 -07:00
parent 18a6d7ea5d
commit 5b2fa33bdb
20 changed files with 710 additions and 630 deletions

View File

@@ -39,9 +39,10 @@ import (
"github.com/minio/minio-xl/pkg/crypto/sha256"
"github.com/minio/minio-xl/pkg/crypto/sha512"
"github.com/minio/minio-xl/pkg/probe"
"github.com/minio/minio/pkg/disk"
)
func (fs API) isValidUploadID(object, uploadID string) bool {
func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
s, ok := fs.multiparts.ActiveSession[object]
if !ok {
return false
@@ -53,7 +54,7 @@ func (fs API) isValidUploadID(object, uploadID string) bool {
}
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
func (fs API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
if !IsValidBucket(bucket) {
@@ -113,7 +114,7 @@ func (fs API) ListMultipartUploads(bucket string, resources BucketMultipartResou
return resources, nil
}
func (fs API) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
for _, part := range parts.Part {
recvMD5 := part.ETag
partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", part.PartNumber), os.O_RDONLY, 0600)
@@ -143,9 +144,19 @@ func (fs API) concatParts(parts *CompleteMultipartUpload, objectPath string, mw
}
// NewMultipartUpload - initiate a new multipart session
func (fs API) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
stfs, err := disk.Stat(fs.path)
if err != nil {
return "", probe.NewError(err)
}
if int64((float64(stfs.Free)/float64(stfs.Total))*100) <= fs.minFreeDisk {
return "", probe.NewError(RootPathFull{Path: fs.path})
}
if !IsValidBucket(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
@@ -154,7 +165,7 @@ func (fs API) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
}
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
_, err = os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
@@ -208,10 +219,19 @@ func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// CreateObjectPart - create a part in a multipart session
func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
stfs, err := disk.Stat(fs.path)
if err != nil {
return "", probe.NewError(err)
}
if int64((float64(stfs.Free)/float64(stfs.Total))*100) <= fs.minFreeDisk {
return "", probe.NewError(RootPathFull{Path: fs.path})
}
if partID <= 0 {
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
}
@@ -230,7 +250,8 @@ func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string,
}
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
var expectedMD5SumBytes []byte
expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
@@ -239,7 +260,7 @@ func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string,
}
bucketPath := filepath.Join(fs.path, bucket)
if _, err := os.Stat(bucketPath); err != nil {
if _, err = os.Stat(bucketPath); err != nil {
// check bucket exists
if os.IsNotExist(err) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
@@ -321,7 +342,7 @@ func (fs API) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string,
}
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (fs API) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
@@ -424,7 +445,7 @@ func (fs API) CompleteMultipartUpload(bucket, object, uploadID string, data io.R
}
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
func (fs API) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
fs.lock.Lock()
defer fs.lock.Unlock()
@@ -493,7 +514,7 @@ func (fs API) ListObjectParts(bucket, object string, resources ObjectResourcesMe
}
// AbortMultipartUpload - abort an incomplete multipart session
func (fs API) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
fs.lock.Lock()
defer fs.lock.Unlock()