diff --git a/fs-objects-multipart.go b/fs-objects-multipart.go index 6593bfbaf..289c4538e 100644 --- a/fs-objects-multipart.go +++ b/fs-objects-multipart.go @@ -58,7 +58,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", InvalidUploadID{UploadID: uploadID} } - tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID) + tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile) fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj) if err != nil { return "", toObjectErr(err, bucket, object) @@ -67,8 +67,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload var md5Sums []string for _, part := range parts { // Construct part suffix. - partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) - multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, partSuffix) + partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) + multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) var fileReader io.ReadCloser fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0) if err != nil { diff --git a/fs-objects.go b/fs-objects.go index 39a646a87..1b553b9b5 100644 --- a/fs-objects.go +++ b/fs-objects.go @@ -32,7 +32,6 @@ type fsObjects struct { listObjectMapMutex *sync.Mutex } -// FIXME: constructor should return a pointer. // newFSObjects - initialize new fs object layer. func newFSObjects(exportPath string) (ObjectLayer, error) { var storage StorageAPI diff --git a/object-common-multipart.go b/object-common-multipart.go index 63e1c8e56..2252778f3 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -31,6 +31,42 @@ import ( "github.com/skyrings/skyring-common/tools/uuid" ) +const ( + incompleteFile = "00000.incomplete" + uploadsJSONFile = "uploads.json" +) + +// createUploadsJSON - create uploads.json placeholder file. +func createUploadsJSON(storage StorageAPI, bucket, object, uploadID string) error { + // Place holder uploads.json + uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) + tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, uploadsJSONFile) + w, err := storage.CreateFile(minioMetaBucket, uploadsPath) + if err != nil { + return err + } + if err = w.Close(); err != nil { + if clErr := safeCloseAndRemove(w); clErr != nil { + return clErr + } + return err + } + _, err = storage.StatFile(minioMetaBucket, uploadsPath) + if err != nil { + if err == errFileNotFound { + err = storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath) + if err == nil { + return nil + } + } + if derr := storage.DeleteFile(minioMetaBucket, tmpUploadsPath); derr != nil { + return derr + } + return err + } + return nil +} + /// Common multipart object layer functions. // newMultipartUploadCommon - initialize a new multipart, is a common @@ -59,8 +95,13 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) return "", err } uploadID := uuid.String() - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + // Create placeholder file 'uploads.json' + err = createUploadsJSON(storage, bucket, object, uploadID) + if err != nil { + return "", err + } + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile) + tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile) if _, err = storage.StatFile(minioMetaBucket, uploadIDPath); err != nil { if err != errFileNotFound { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) @@ -114,7 +155,7 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa return "", InvalidUploadID{UploadID: uploadID} } - partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) + partSuffix := fmt.Sprintf("%s.%.5d", uploadID, partID) partSuffixPath := path.Join(tmpMetaPrefix, bucket, object, partSuffix) fileWriter, err := storage.CreateFile(minioMetaBucket, partSuffixPath) if err != nil { @@ -170,8 +211,8 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa return "", err } - partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex) - partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, partSuffixMD5) + partSuffixMD5 := fmt.Sprintf("%.5d.%s", partID, newMD5Hex) + partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffixMD5) err = storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) if err != nil { if derr := storage.DeleteFile(minioMetaBucket, partSuffixPath); derr != nil { @@ -190,16 +231,14 @@ func cleanupAllTmpEntries(storage StorageAPI) error { // Wrapper to which removes all the uploaded parts after a successful // complete multipart upload. func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error { - multipartDir := path.Join(prefix, bucket, object) + multipartDir := path.Join(prefix, bucket, object, uploadID) entries, err := storage.ListDir(minioMetaBucket, multipartDir) if err != nil { return err } for _, entry := range entries { - if strings.HasPrefix(entry, uploadID) { - if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil { - return err - } + if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil { + return err } } return nil @@ -223,20 +262,39 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID) } +// isIncompleteMultipart - is object incomplete multipart. +func isIncompleteMultipart(storage StorageAPI, objectPath string) (bool, error) { + _, err := storage.StatFile(minioMetaBucket, path.Join(objectPath, uploadsJSONFile)) + if err != nil { + if err == errFileNotFound { + return false, nil + } + return false, err + } + return true, nil +} + // listLeafEntries - lists all entries if a given prefixPath is a leaf // directory, returns error if any - returns empty list if prefixPath // is not a leaf directory. func listLeafEntries(storage StorageAPI, prefixPath string) (entries []string, err error) { + var ok bool + if ok, err = isIncompleteMultipart(storage, prefixPath); err != nil { + return nil, err + } else if !ok { + return nil, nil + } entries, err = storage.ListDir(minioMetaBucket, prefixPath) if err != nil { return nil, err } + var newEntries []string for _, entry := range entries { if strings.HasSuffix(entry, slashSeparator) { - return nil, nil + newEntries = append(newEntries, entry) } } - return entries, nil + return newEntries, nil } // listMetaBucketMultipartFiles - list all files at a given prefix inside minioMetaBucket. @@ -299,49 +357,32 @@ func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPa // We reach here for non-recursive case and a leaf entry. sort.Strings(entries) for _, entry := range entries { - if strings.ContainsRune(entry, '.') { - continue - } var fileInfo FileInfo - fileInfo, err = storage.StatFile(minioMetaBucket, path.Join(fi.Name, entry)) + incompleteUploadFile := path.Join(fi.Name, entry, incompleteFile) + fileInfo, err = storage.StatFile(minioMetaBucket, incompleteUploadFile) if err != nil { return nil, false, err } + fileInfo.Name = path.Join(fi.Name, entry) fileInfos = append(fileInfos, fileInfo) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by fs.storage.ListFiles() - // error : nil - return - } } } else { // We reach here for a non-recursive case non-leaf entry - // OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum] + // OR recursive case with fi.Name. if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries. - // Skip files matching pattern bucket/object/uploadID.partNum.md5sum - // and retain files matching pattern bucket/object/uploadID - specialFile := path.Base(fi.Name) - if strings.Contains(specialFile, ".") { - // Contains partnumber and md5sum info, skip this. + // Validate if 'fi.Name' is incomplete multipart. + if !strings.HasSuffix(fi.Name, incompleteFile) { continue } + fi.Name = path.Dir(fi.Name) } fileInfos = append(fileInfos, fi) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by fs.storage.ListFiles() - // error : nil - return - } + } + newMaxKeys++ + // If we have reached the maxKeys, it means we have listed + // everything that was requested. Return right here. + if newMaxKeys == maxKeys { + return } } @@ -465,35 +506,29 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, return ListPartsInfo{}, (InvalidUploadID{UploadID: uploadID}) } result := ListPartsInfo{} - entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID)) if err != nil { return result, err } sort.Strings(entries) var newEntries []string for _, entry := range entries { - if !strings.Contains(entry, ".") { - continue - } - if !strings.HasPrefix(entry, uploadID) { - continue - } - newEntries = append(newEntries, entry) + newEntries = append(newEntries, path.Base(entry)) } - idx := sort.SearchStrings(newEntries, fmt.Sprintf("%s.%.5d.", uploadID, partNumberMarker+1)) + idx := sort.SearchStrings(newEntries, fmt.Sprintf("%.5d.", partNumberMarker+1)) newEntries = newEntries[idx:] count := maxParts for _, entry := range newEntries { - fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, entry)) + fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID, entry)) splitEntry := strings.Split(entry, ".") - partNum, err := strconv.Atoi(splitEntry[1]) + partNum, err := strconv.Atoi(splitEntry[0]) if err != nil { return ListPartsInfo{}, err } result.Parts = append(result.Parts, partInfo{ PartNumber: partNum, LastModified: fi.ModTime, - ETag: splitEntry[2], + ETag: splitEntry[1], Size: fi.Size, }) count-- @@ -513,7 +548,7 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, // isUploadIDExists - verify if a given uploadID exists and is valid. func isUploadIDExists(storage StorageAPI, bucket, object, uploadID string) (bool, error) { - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile) st, err := storage.StatFile(minioMetaBucket, uploadIDPath) if err != nil { // Upload id does not exist. diff --git a/object-handlers.go b/object-handlers.go index 6834e2104..9e4e92eca 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -758,7 +758,8 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http // Close the writer. writer.Close() }() - partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, hex.EncodeToString(md5Bytes)) + md5SumHex := hex.EncodeToString(md5Bytes) + partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, md5SumHex) } if err != nil { errorIf(err, "PutObjectPart failed.", nil) diff --git a/tree-walk.go b/tree-walk.go index 8219984d7..85f2e4103 100644 --- a/tree-walk.go +++ b/tree-walk.go @@ -131,8 +131,13 @@ func treeWalk(layer ObjectLayer, bucket, prefixDir, entryPrefixMatch, marker str // For XL multipart files strip the trailing "/" and append ".minio.multipart" to the entry so that // entryToFileInfo() can call StatFile for regular files or getMultipartObjectInfo() for multipart files. for i, entry := range entries { - if isXL && strings.HasSuffix(entry, slashSeparator) && isLeafDirectory(disk, bucket, path.Join(prefixDir, entry)) { - entries[i] = strings.TrimSuffix(entry, slashSeparator) + multipartSuffix + if isXL && strings.HasSuffix(entry, slashSeparator) { + if ok, err := isMultipartObject(disk, bucket, path.Join(prefixDir, entry)); err != nil { + send(treeWalkResult{err: err}) + return false + } else if ok { + entries[i] = strings.TrimSuffix(entry, slashSeparator) + multipartSuffix + } } } sort.Sort(byMultipartFiles(entries)) @@ -146,7 +151,9 @@ func treeWalk(layer ObjectLayer, bucket, prefixDir, entryPrefixMatch, marker str // example: // If markerDir="four/" Search() returns the index of "four/" in the sorted // entries list so we skip all the entries till "four/" - idx := sort.Search(len(entries), func(i int) bool { return strings.TrimSuffix(entries[i], multipartSuffix) >= markerDir }) + idx := sort.Search(len(entries), func(i int) bool { + return strings.TrimSuffix(entries[i], multipartSuffix) >= markerDir + }) entries = entries[idx:] *count += len(entries) for i, entry := range entries { diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index 806df8602..ee867f155 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -111,8 +111,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload var md5Sums []string for _, part := range parts { // Construct part suffix. - partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) - multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, partSuffix) + partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) + multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) fi, err := xl.storage.StatFile(minioMetaBucket, multipartPartFile) if err != nil { if err == errFileNotFound { @@ -179,8 +179,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Attempt a rename of the upload id to temporary location, if // successful then delete it. - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile) + tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile) if err = xl.storage.RenameFile(minioMetaBucket, uploadIDPath, minioMetaBucket, tempUploadIDPath); err == nil { if err = xl.storage.DeleteFile(minioMetaBucket, tempUploadIDPath); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) @@ -192,6 +192,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if err != nil { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } + + uploadsJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) + err = xl.storage.DeleteFile(minioMetaBucket, uploadsJSONPath) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadsJSONPath) + } + // Return md5sum. return s3MD5, nil } diff --git a/xl-objects.go b/xl-objects.go index c3ba7ebe6..642f609b2 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -40,11 +40,6 @@ type xlObjects struct { listObjectMapMutex *sync.Mutex } -func isLeafDirectory(disk StorageAPI, volume, leafPath string) bool { - _, err := disk.StatFile(volume, pathJoin(leafPath, multipartMetaFile)) - return err == nil -} - // isValidFormat - validates input arguments with backend 'format.json' func isValidFormat(storage StorageAPI, exportPaths ...string) bool { // Load saved XL format.json and validate. @@ -70,7 +65,6 @@ func isValidFormat(storage StorageAPI, exportPaths ...string) bool { return true } -// FIXME: constructor should return a pointer. // newXLObjects - initialize new xl object layer. func newXLObjects(exportPaths ...string) (ObjectLayer, error) { storage, err := newXL(exportPaths...) @@ -289,9 +283,19 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { return toObjectErr(err, bucket, object) } // Range through all files and delete it. + var wg = &sync.WaitGroup{} + var errChs = make([]chan error, len(info.Parts)) for _, part := range info.Parts { - err = xl.storage.DeleteFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber))) - if err != nil { + wg.Add(1) + go func(part MultipartPartInfo) { + defer wg.Done() + err = xl.storage.DeleteFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber))) + errChs[part.PartNumber-1] <- err + }(part) + } + wg.Wait() + for _, errCh := range errChs { + if err = <-errCh; err != nil { return toObjectErr(err, bucket, object) } } @@ -302,6 +306,7 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { return nil } +// ListObjects - list all objects at prefix, delimited by '/'. func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { return listObjectsCommon(xl, bucket, prefix, marker, delimiter, maxKeys) }