api: Various fixes.

- limit list buckets to limit only 100 buckets, all uppercase buckets
  are now lowercase and work transparently with all calls.
- Change disk.Stat to disk.GetInfo and return back disk.Info{} struct.
- Introduce new ioutils package which implements ReadDirN(path, n),
  ReadDirNamesN(path, n)
This commit is contained in:
Harshavardhana
2016-01-24 23:03:38 -08:00
committed by Harshavardhana
parent 432a073e6b
commit 497f13d733
11 changed files with 247 additions and 132 deletions

View File

@@ -59,14 +59,14 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
if !IsValidBucketName(bucket) {
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return BucketMultipartResourcesMetadata{}, probe.NewError(InternalError{})
if _, e := os.Stat(bucketPath); e != nil {
// check bucket exists
if os.IsNotExist(e) {
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return BucketMultipartResourcesMetadata{}, probe.NewError(e)
}
var uploads []*UploadMetadata
for object, session := range fs.multiparts.ActiveSession {
@@ -142,13 +142,13 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
fs.lock.Lock()
defer fs.lock.Unlock()
stfs, err := disk.Stat(fs.path)
if err != nil {
return "", probe.NewError(err)
di, e := disk.GetInfo(fs.path)
if e != nil {
return "", probe.NewError(e)
}
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
availableDiskSpace := (float64(stfs.Free) / (float64(stfs.Total) - (0.05 * float64(stfs.Total)))) * 100
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
if int64(availableDiskSpace) <= fs.minFreeDisk {
return "", probe.NewError(RootPathFull{Path: fs.path})
}
@@ -160,31 +160,35 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
return "", probe.NewError(ObjectNameInvalid{Object: object})
}
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
_, err = os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return "", probe.NewError(InternalError{})
if _, e := os.Stat(bucketPath); e != nil {
// check bucket exists
if os.IsNotExist(e) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
return "", probe.NewError(e)
}
objectPath := filepath.Join(bucketPath, object)
objectDir := filepath.Dir(objectPath)
if _, err = os.Stat(objectDir); os.IsNotExist(err) {
err = os.MkdirAll(objectDir, 0700)
if err != nil {
return "", probe.NewError(err)
if _, e := os.Stat(objectDir); e != nil {
if os.IsNotExist(e) {
e = os.MkdirAll(objectDir, 0700)
if e != nil {
return "", probe.NewError(e)
}
}
return "", probe.NewError(e)
}
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return "", probe.NewError(err)
multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600)
if e != nil {
return "", probe.NewError(e)
}
defer multiPartfile.Close()
@@ -197,9 +201,8 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
fs.multiparts.ActiveSession[object] = mpartSession
encoder := json.NewEncoder(multiPartfile)
err = encoder.Encode(mpartSession)
if err != nil {
return "", probe.NewError(err)
if e = encoder.Encode(mpartSession); e != nil {
return "", probe.NewError(e)
}
if err := saveMultipartsSession(fs.multiparts); err != nil {
return "", err.Trace()
@@ -219,13 +222,13 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
fs.lock.Lock()
defer fs.lock.Unlock()
stfs, err := disk.Stat(fs.path)
di, err := disk.GetInfo(fs.path)
if err != nil {
return "", probe.NewError(err)
}
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
availableDiskSpace := (float64(stfs.Free) / (float64(stfs.Total) - (0.05 * float64(stfs.Total)))) * 100
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
if int64(availableDiskSpace) <= fs.minFreeDisk {
return "", probe.NewError(RootPathFull{Path: fs.path})
}
@@ -257,15 +260,14 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
if _, err = os.Stat(bucketPath); err != nil {
// check bucket exists
if os.IsNotExist(err) {
return "", probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return "", probe.NewError(InternalError{})
}
return "", probe.NewError(err)
}
objectPath := filepath.Join(bucketPath, object)
@@ -357,6 +359,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
if _, err := os.Stat(bucketPath); err != nil {
// check bucket exists
@@ -470,14 +473,14 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
startPartNumber = objectResourcesMetadata.PartNumberMarker
}
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return ObjectResourcesMetadata{}, probe.NewError(InternalError{})
if _, e := os.Stat(bucketPath); e != nil {
// check bucket exists
if os.IsNotExist(e) {
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return ObjectResourcesMetadata{}, probe.NewError(e)
}
objectPath := filepath.Join(bucketPath, object)
@@ -528,27 +531,26 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
return probe.NewError(InvalidUploadID{UploadID: uploadID})
}
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
_, err := os.Stat(bucketPath)
// check bucket exists
if os.IsNotExist(err) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
if err != nil {
return probe.NewError(InternalError{})
if _, e := os.Stat(bucketPath); e != nil {
// check bucket exists
if os.IsNotExist(e) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
return probe.NewError(e)
}
objectPath := filepath.Join(bucketPath, object)
for _, part := range fs.multiparts.ActiveSession[object].Parts {
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
if err != nil {
return probe.NewError(err)
e := os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
if e != nil {
return probe.NewError(e)
}
}
delete(fs.multiparts.ActiveSession, object)
err = os.RemoveAll(objectPath + "$multiparts")
if err != nil {
return probe.NewError(err)
if e := os.RemoveAll(objectPath + "$multiparts"); e != nil {
return probe.NewError(e)
}
return nil
}