XL: Cleanup, comments and all the updated functions. (#1830)

This commit is contained in:
Harshavardhana
2016-06-01 16:43:31 -07:00
parent 9b79760dcf
commit ae311aa53b
17 changed files with 854 additions and 822 deletions

View File

@@ -23,22 +23,20 @@ import (
"strings"
"sync"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
)
// uploadInfo -
// A uploadInfo represents the s3 compatible spec.
type uploadInfo struct {
UploadID string `json:"uploadId"`
Deleted bool `json:"deleted"` // Currently unused.
Initiated time.Time `json:"initiated"`
UploadID string `json:"uploadId"` // UploadID for the active multipart upload.
Deleted bool `json:"deleted"` // Currently unused, for future use.
Initiated time.Time `json:"initiated"` // Indicates when the uploadID was initiated.
}
// uploadsV1 -
// A uploadsV1 represents `uploads.json` metadata header.
type uploadsV1 struct {
Version string `json:"version"`
Format string `json:"format"`
Uploads []uploadInfo `json:"uploadIds"`
Version string `json:"version"` // Version of the current `uploads.json`
Format string `json:"format"` // Format of the current `uploads.json`
Uploads []uploadInfo `json:"uploadIds"` // Captures all the upload ids for a given object.
}
// byInitiatedTime is a collection satisfying sort.Interface.
@@ -70,49 +68,21 @@ func (u uploadsV1) Index(uploadID string) int {
}
// readUploadsJSON - get all the saved uploads JSON.
func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) {
func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) {
uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
var errs = make([]error, len(storageDisks))
var uploads = make([]uploadsV1, len(storageDisks))
var wg = &sync.WaitGroup{}
// Read `uploads.json` from all disks.
for index, disk := range storageDisks {
wg.Add(1)
// Read `uploads.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
// Read all of 'uploads.json'
buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath)
if rErr != nil {
errs[index] = rErr
return
}
rErr = json.Unmarshal(buffer, &uploads[index])
if rErr != nil {
errs[index] = rErr
return
}
buffer = nil
errs[index] = nil
}(index, disk)
// Read all of 'uploads.json'
buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath)
if rErr != nil {
return uploadsV1{}, rErr
}
// Wait for all the routines.
wg.Wait()
// Return for first error.
for _, err = range errs {
if err != nil {
return uploadsV1{}, err
}
rErr = json.Unmarshal(buffer, &uploadIDs)
if rErr != nil {
return uploadsV1{}, rErr
}
// FIXME: Do not know if it should pick the picks the first successful one and returns.
return uploads[0], nil
return uploadIDs, nil
}
// uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error {
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
uniqueID := getUUID()
@@ -178,7 +148,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
var wg = &sync.WaitGroup{}
var uploadsJSON uploadsV1
uploadsJSON, err = readUploadsJSON(bucket, object, storageDisks...)
for _, disk := range storageDisks {
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
break
}
if err != nil {
// For any other errors.
if err != errFileNotFound {
@@ -206,6 +179,7 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
errs[index] = wErr
return
}
// Write `uploads.json` to disk.
n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes)
if wErr != nil {
errs[index] = wErr
@@ -312,184 +286,33 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count
// Returns if the prefix is a multipart upload.
func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
disk := xl.getRandomDisk() // Choose a random disk.
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
return err == nil
for _, disk := range xl.getLoadBalancedQuorumDisks() {
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
if err != nil {
return false
}
break
}
return true
}
// listUploadsInfo - list all uploads info.
func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) {
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
if err != nil {
if err == errFileNotFound {
return []uploadInfo{}, nil
for _, disk := range xl.getLoadBalancedQuorumDisks() {
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
if err != nil {
if err == errFileNotFound {
return []uploadInfo{}, nil
}
return nil, err
}
return nil, err
uploadsInfo = uploadsJSON.Uploads
break
}
uploadsInfo = uploadsJSON.Uploads
return uploadsInfo, nil
}
// listMultipartUploads - lists all multipart uploads.
func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket}
}
if !xl.isBucketExist(bucket) {
return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, UnsupportedDelimiter{
Delimiter: delimiter,
}
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
}
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
}
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return result, err
}
if id.IsZero() {
return result, MalformedUploadID{
UploadID: uploadIDMarker,
}
}
}
recursive := true
if delimiter == slashSeparator {
recursive = false
}
result.IsTruncated = true
result.MaxUploads = maxUploads
result.KeyMarker = keyMarker
result.Prefix = prefix
result.Delimiter = delimiter
// Not using path.Join() as it strips off the trailing '/'.
multipartPrefixPath := pathJoin(mpartMetaPrefix, bucket, prefix)
if prefix == "" {
// Should have a trailing "/" if prefix is ""
// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
multipartPrefixPath += slashSeparator
}
multipartMarkerPath := ""
if keyMarker != "" {
multipartMarkerPath = pathJoin(mpartMetaPrefix, bucket, keyMarker)
}
var uploads []uploadMetadata
var err error
var eof bool
if uploadIDMarker != "" {
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, xl.getRandomDisk())
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
if err != nil {
return ListMultipartsInfo{}, err
}
maxUploads = maxUploads - len(uploads)
}
if maxUploads > 0 {
walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
if walker == nil {
walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload)
}
for maxUploads > 0 {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
continue
}
return ListMultipartsInfo{}, err
}
entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
if strings.HasSuffix(walkResult.entry, slashSeparator) {
uploads = append(uploads, uploadMetadata{
Object: entry,
})
maxUploads--
if maxUploads == 0 {
if walkResult.end {
eof = true
break
}
}
continue
}
var newUploads []uploadMetadata
var end bool
uploadIDMarker = ""
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, xl.getRandomDisk())
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
if err != nil {
if err == errFileNotFound || walkResult.err == errDiskNotFound {
continue
}
return ListMultipartsInfo{}, err
}
uploads = append(uploads, newUploads...)
maxUploads -= len(newUploads)
if walkResult.end && end {
eof = true
break
}
}
}
// Loop through all the received uploads fill in the multiparts result.
for _, upload := range uploads {
var objectName string
var uploadID string
if strings.HasSuffix(upload.Object, slashSeparator) {
// All directory entries are common prefixes.
uploadID = "" // Upload ids are empty for CommonPrefixes.
objectName = upload.Object
result.CommonPrefixes = append(result.CommonPrefixes, objectName)
} else {
uploadID = upload.UploadID
objectName = upload.Object
result.Uploads = append(result.Uploads, upload)
}
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
}
result.IsTruncated = !eof
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}
// isUploadIDExists - verify if a given uploadID exists and is valid.
func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)