xl/fs: Multipart re-org introduce "uploads.json" (#1505)

Fixes #1457
This commit is contained in:
Harshavardhana 2016-05-07 02:08:03 -07:00 committed by Anand Babu (AB) Periasamy
parent 434423de89
commit 751fa972f5
7 changed files with 129 additions and 75 deletions

View File

@ -58,7 +58,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", InvalidUploadID{UploadID: uploadID}
}
tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID)
tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile)
fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj)
if err != nil {
return "", toObjectErr(err, bucket, object)
@ -67,8 +67,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
var md5Sums []string
for _, part := range parts {
// Construct part suffix.
partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, partSuffix)
partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
var fileReader io.ReadCloser
fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0)
if err != nil {

View File

@ -32,7 +32,6 @@ type fsObjects struct {
listObjectMapMutex *sync.Mutex
}
// FIXME: constructor should return a pointer.
// newFSObjects - initialize new fs object layer.
func newFSObjects(exportPath string) (ObjectLayer, error) {
var storage StorageAPI

View File

@ -31,6 +31,42 @@ import (
"github.com/skyrings/skyring-common/tools/uuid"
)
const (
incompleteFile = "00000.incomplete"
uploadsJSONFile = "uploads.json"
)
// createUploadsJSON - create uploads.json placeholder file.
func createUploadsJSON(storage StorageAPI, bucket, object, uploadID string) error {
// Place holder uploads.json
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, uploadsJSONFile)
w, err := storage.CreateFile(minioMetaBucket, uploadsPath)
if err != nil {
return err
}
if err = w.Close(); err != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
return clErr
}
return err
}
_, err = storage.StatFile(minioMetaBucket, uploadsPath)
if err != nil {
if err == errFileNotFound {
err = storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
if err == nil {
return nil
}
}
if derr := storage.DeleteFile(minioMetaBucket, tmpUploadsPath); derr != nil {
return derr
}
return err
}
return nil
}
/// Common multipart object layer functions.
// newMultipartUploadCommon - initialize a new multipart, is a common
@ -59,8 +95,13 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string)
return "", err
}
uploadID := uuid.String()
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
// Create placeholder file 'uploads.json'
err = createUploadsJSON(storage, bucket, object, uploadID)
if err != nil {
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile)
if _, err = storage.StatFile(minioMetaBucket, uploadIDPath); err != nil {
if err != errFileNotFound {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
@ -114,7 +155,7 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
return "", InvalidUploadID{UploadID: uploadID}
}
partSuffix := fmt.Sprintf("%s.%d", uploadID, partID)
partSuffix := fmt.Sprintf("%s.%.5d", uploadID, partID)
partSuffixPath := path.Join(tmpMetaPrefix, bucket, object, partSuffix)
fileWriter, err := storage.CreateFile(minioMetaBucket, partSuffixPath)
if err != nil {
@ -170,8 +211,8 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
return "", err
}
partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex)
partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, partSuffixMD5)
partSuffixMD5 := fmt.Sprintf("%.5d.%s", partID, newMD5Hex)
partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffixMD5)
err = storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path)
if err != nil {
if derr := storage.DeleteFile(minioMetaBucket, partSuffixPath); derr != nil {
@ -190,16 +231,14 @@ func cleanupAllTmpEntries(storage StorageAPI) error {
// Wrapper to which removes all the uploaded parts after a successful
// complete multipart upload.
func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error {
multipartDir := path.Join(prefix, bucket, object)
multipartDir := path.Join(prefix, bucket, object, uploadID)
entries, err := storage.ListDir(minioMetaBucket, multipartDir)
if err != nil {
return err
}
for _, entry := range entries {
if strings.HasPrefix(entry, uploadID) {
if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil {
return err
}
if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil {
return err
}
}
return nil
@ -223,20 +262,39 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str
return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID)
}
// isIncompleteMultipart - is object incomplete multipart.
func isIncompleteMultipart(storage StorageAPI, objectPath string) (bool, error) {
_, err := storage.StatFile(minioMetaBucket, path.Join(objectPath, uploadsJSONFile))
if err != nil {
if err == errFileNotFound {
return false, nil
}
return false, err
}
return true, nil
}
// listLeafEntries - lists all entries if a given prefixPath is a leaf
// directory, returns error if any - returns empty list if prefixPath
// is not a leaf directory.
func listLeafEntries(storage StorageAPI, prefixPath string) (entries []string, err error) {
var ok bool
if ok, err = isIncompleteMultipart(storage, prefixPath); err != nil {
return nil, err
} else if !ok {
return nil, nil
}
entries, err = storage.ListDir(minioMetaBucket, prefixPath)
if err != nil {
return nil, err
}
var newEntries []string
for _, entry := range entries {
if strings.HasSuffix(entry, slashSeparator) {
return nil, nil
newEntries = append(newEntries, entry)
}
}
return entries, nil
return newEntries, nil
}
// listMetaBucketMultipartFiles - list all files at a given prefix inside minioMetaBucket.
@ -299,49 +357,32 @@ func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPa
// We reach here for non-recursive case and a leaf entry.
sort.Strings(entries)
for _, entry := range entries {
if strings.ContainsRune(entry, '.') {
continue
}
var fileInfo FileInfo
fileInfo, err = storage.StatFile(minioMetaBucket, path.Join(fi.Name, entry))
incompleteUploadFile := path.Join(fi.Name, entry, incompleteFile)
fileInfo, err = storage.StatFile(minioMetaBucket, incompleteUploadFile)
if err != nil {
return nil, false, err
}
fileInfo.Name = path.Join(fi.Name, entry)
fileInfos = append(fileInfos, fileInfo)
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
// Return values:
// allFileInfos : "maxKeys" number of entries.
// eof : eof returned by fs.storage.ListFiles()
// error : nil
return
}
}
} else {
// We reach here for a non-recursive case non-leaf entry
// OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum]
// OR recursive case with fi.Name.
if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries.
// Skip files matching pattern bucket/object/uploadID.partNum.md5sum
// and retain files matching pattern bucket/object/uploadID
specialFile := path.Base(fi.Name)
if strings.Contains(specialFile, ".") {
// Contains partnumber and md5sum info, skip this.
// Validate if 'fi.Name' is incomplete multipart.
if !strings.HasSuffix(fi.Name, incompleteFile) {
continue
}
fi.Name = path.Dir(fi.Name)
}
fileInfos = append(fileInfos, fi)
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
// Return values:
// allFileInfos : "maxKeys" number of entries.
// eof : eof returned by fs.storage.ListFiles()
// error : nil
return
}
}
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
return
}
}
@ -465,35 +506,29 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string,
return ListPartsInfo{}, (InvalidUploadID{UploadID: uploadID})
}
result := ListPartsInfo{}
entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
if err != nil {
return result, err
}
sort.Strings(entries)
var newEntries []string
for _, entry := range entries {
if !strings.Contains(entry, ".") {
continue
}
if !strings.HasPrefix(entry, uploadID) {
continue
}
newEntries = append(newEntries, entry)
newEntries = append(newEntries, path.Base(entry))
}
idx := sort.SearchStrings(newEntries, fmt.Sprintf("%s.%.5d.", uploadID, partNumberMarker+1))
idx := sort.SearchStrings(newEntries, fmt.Sprintf("%.5d.", partNumberMarker+1))
newEntries = newEntries[idx:]
count := maxParts
for _, entry := range newEntries {
fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, entry))
fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID, entry))
splitEntry := strings.Split(entry, ".")
partNum, err := strconv.Atoi(splitEntry[1])
partNum, err := strconv.Atoi(splitEntry[0])
if err != nil {
return ListPartsInfo{}, err
}
result.Parts = append(result.Parts, partInfo{
PartNumber: partNum,
LastModified: fi.ModTime,
ETag: splitEntry[2],
ETag: splitEntry[1],
Size: fi.Size,
})
count--
@ -513,7 +548,7 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string,
// isUploadIDExists - verify if a given uploadID exists and is valid.
func isUploadIDExists(storage StorageAPI, bucket, object, uploadID string) (bool, error) {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
st, err := storage.StatFile(minioMetaBucket, uploadIDPath)
if err != nil {
// Upload id does not exist.

View File

@ -758,7 +758,8 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
// Close the writer.
writer.Close()
}()
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, hex.EncodeToString(md5Bytes))
md5SumHex := hex.EncodeToString(md5Bytes)
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, md5SumHex)
}
if err != nil {
errorIf(err, "PutObjectPart failed.", nil)

View File

@ -131,8 +131,13 @@ func treeWalk(layer ObjectLayer, bucket, prefixDir, entryPrefixMatch, marker str
// For XL multipart files strip the trailing "/" and append ".minio.multipart" to the entry so that
// entryToFileInfo() can call StatFile for regular files or getMultipartObjectInfo() for multipart files.
for i, entry := range entries {
if isXL && strings.HasSuffix(entry, slashSeparator) && isLeafDirectory(disk, bucket, path.Join(prefixDir, entry)) {
entries[i] = strings.TrimSuffix(entry, slashSeparator) + multipartSuffix
if isXL && strings.HasSuffix(entry, slashSeparator) {
if ok, err := isMultipartObject(disk, bucket, path.Join(prefixDir, entry)); err != nil {
send(treeWalkResult{err: err})
return false
} else if ok {
entries[i] = strings.TrimSuffix(entry, slashSeparator) + multipartSuffix
}
}
}
sort.Sort(byMultipartFiles(entries))
@ -146,7 +151,9 @@ func treeWalk(layer ObjectLayer, bucket, prefixDir, entryPrefixMatch, marker str
// example:
// If markerDir="four/" Search() returns the index of "four/" in the sorted
// entries list so we skip all the entries till "four/"
idx := sort.Search(len(entries), func(i int) bool { return strings.TrimSuffix(entries[i], multipartSuffix) >= markerDir })
idx := sort.Search(len(entries), func(i int) bool {
return strings.TrimSuffix(entries[i], multipartSuffix) >= markerDir
})
entries = entries[idx:]
*count += len(entries)
for i, entry := range entries {

View File

@ -111,8 +111,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
var md5Sums []string
for _, part := range parts {
// Construct part suffix.
partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, partSuffix)
partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
fi, err := xl.storage.StatFile(minioMetaBucket, multipartPartFile)
if err != nil {
if err == errFileNotFound {
@ -179,8 +179,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Attempt a rename of the upload id to temporary location, if
// successful then delete it.
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile)
if err = xl.storage.RenameFile(minioMetaBucket, uploadIDPath, minioMetaBucket, tempUploadIDPath); err == nil {
if err = xl.storage.DeleteFile(minioMetaBucket, tempUploadIDPath); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
@ -192,6 +192,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
uploadsJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
err = xl.storage.DeleteFile(minioMetaBucket, uploadsJSONPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadsJSONPath)
}
// Return md5sum.
return s3MD5, nil
}

View File

@ -40,11 +40,6 @@ type xlObjects struct {
listObjectMapMutex *sync.Mutex
}
func isLeafDirectory(disk StorageAPI, volume, leafPath string) bool {
_, err := disk.StatFile(volume, pathJoin(leafPath, multipartMetaFile))
return err == nil
}
// isValidFormat - validates input arguments with backend 'format.json'
func isValidFormat(storage StorageAPI, exportPaths ...string) bool {
// Load saved XL format.json and validate.
@ -70,7 +65,6 @@ func isValidFormat(storage StorageAPI, exportPaths ...string) bool {
return true
}
// FIXME: constructor should return a pointer.
// newXLObjects - initialize new xl object layer.
func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
storage, err := newXL(exportPaths...)
@ -289,9 +283,19 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
return toObjectErr(err, bucket, object)
}
// Range through all files and delete it.
var wg = &sync.WaitGroup{}
var errChs = make([]chan error, len(info.Parts))
for _, part := range info.Parts {
err = xl.storage.DeleteFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)))
if err != nil {
wg.Add(1)
go func(part MultipartPartInfo) {
defer wg.Done()
err = xl.storage.DeleteFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)))
errChs[part.PartNumber-1] <- err
}(part)
}
wg.Wait()
for _, errCh := range errChs {
if err = <-errCh; err != nil {
return toObjectErr(err, bucket, object)
}
}
@ -302,6 +306,7 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
return nil
}
// ListObjects - list all objects at prefix, delimited by '/'.
func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return listObjectsCommon(xl, bucket, prefix, marker, delimiter, maxKeys)
}