mirror of
https://github.com/minio/minio.git
synced 2025-05-23 02:21:51 -04:00
- Reads and writes of uploads.json in XL now uses quorum for newMultipart, completeMultipart and abortMultipart operations. - Each disk's `uploads.json` file is read and updated independently for adding or removing an upload id from the file. Quorum is used to decide if the high-level operation actually succeeded. - Refactor FS code to simplify the flow, and fix a bug while reading uploads.json.
This commit is contained in:
parent
90e1803798
commit
dd6ecf1193
@ -16,11 +16,7 @@
|
|||||||
|
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import "path"
|
||||||
"encoding/json"
|
|
||||||
"path"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Returns if the prefix is a multipart upload.
|
// Returns if the prefix is a multipart upload.
|
||||||
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
|
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
|
||||||
@ -57,56 +53,38 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// writeUploadJSON - create `uploads.json` or update it with new uploadID.
|
// writeUploadJSON - create `uploads.json` or update it with new uploadID.
|
||||||
func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) {
|
func (fs fsObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error {
|
||||||
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
||||||
uniqueID := getUUID()
|
uniqueID := getUUID()
|
||||||
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
||||||
var uploadsJSON uploadsV1
|
|
||||||
uploadsJSON, err = readUploadsJSON(bucket, object, fs.storage)
|
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
|
||||||
|
if errorCause(err) == errFileNotFound {
|
||||||
|
// If file is not found, we assume a default (empty)
|
||||||
|
// upload info.
|
||||||
|
uploadsJSON, err = newUploadsV1("fs"), nil
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// uploads.json might not exist hence ignore errFileNotFound.
|
|
||||||
if errorCause(err) != errFileNotFound {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Set uploads format to `fs`.
|
|
||||||
uploadsJSON = newUploadsV1("fs")
|
|
||||||
}
|
|
||||||
// Add a new upload id.
|
|
||||||
uploadsJSON.AddUploadID(uploadID, initiated)
|
|
||||||
|
|
||||||
// Update `uploads.json` on all disks.
|
// update the uploadsJSON struct
|
||||||
uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
|
if !uCh.isRemove {
|
||||||
if wErr != nil {
|
// Add the uploadID
|
||||||
return traceError(wErr)
|
uploadsJSON.AddUploadID(uCh.uploadID, uCh.initiated)
|
||||||
}
|
} else {
|
||||||
// Write `uploads.json` to disk.
|
// Remove the upload ID
|
||||||
if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
|
uploadsJSON.RemoveUploadID(uCh.uploadID)
|
||||||
return traceError(wErr)
|
|
||||||
}
|
|
||||||
wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
|
|
||||||
if wErr != nil {
|
|
||||||
if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {
|
|
||||||
return traceError(dErr)
|
|
||||||
}
|
|
||||||
return traceError(wErr)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
|
// update the file or delete it?
|
||||||
func (fs fsObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) {
|
if len(uploadsJSON.Uploads) > 0 {
|
||||||
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
err = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, fs.storage)
|
||||||
uniqueID := getUUID()
|
} else {
|
||||||
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
// no uploads, so we delete the file.
|
||||||
uploadsBytes, wErr := json.Marshal(uploadsJSON)
|
if err = fs.storage.DeleteFile(minioMetaBucket, uploadsPath); err != nil {
|
||||||
if wErr != nil {
|
return toObjectErr(traceError(err), minioMetaBucket, uploadsPath)
|
||||||
return traceError(wErr)
|
|
||||||
}
|
}
|
||||||
if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil {
|
|
||||||
return traceError(wErr)
|
|
||||||
}
|
}
|
||||||
if wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
|
return err
|
||||||
return traceError(wErr)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ func TestFSWriteUploadJSON(t *testing.T) {
|
|||||||
t.Fatal("Unexpected err: ", err)
|
t.Fatal("Unexpected err: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now().UTC()); err != nil {
|
if err := fs.updateUploadJSON(bucketName, objectName, uploadIDChange{uploadID, time.Now().UTC(), false}); err != nil {
|
||||||
t.Fatal("Unexpected err: ", err)
|
t.Fatal("Unexpected err: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,36 +131,8 @@ func TestFSWriteUploadJSON(t *testing.T) {
|
|||||||
for i := 1; i <= 3; i++ {
|
for i := 1; i <= 3; i++ {
|
||||||
naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
|
naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
|
||||||
fs.storage = naughty
|
fs.storage = naughty
|
||||||
if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now().UTC()); errorCause(err) != errFaultyDisk {
|
if err := fs.updateUploadJSON(bucketName, objectName,
|
||||||
t.Fatal("Unexpected err: ", err)
|
uploadIDChange{uploadID, time.Now().UTC(), false}); errorCause(err) != errFaultyDisk {
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestFSUpdateUploadsJSON - tests for updateUploadsJSON for FS
|
|
||||||
func TestFSUpdateUploadsJSON(t *testing.T) {
|
|
||||||
// Prepare for tests
|
|
||||||
disk := filepath.Join(os.TempDir(), "minio-"+nextSuffix())
|
|
||||||
defer removeAll(disk)
|
|
||||||
|
|
||||||
obj := initFSObjects(disk, t)
|
|
||||||
fs := obj.(fsObjects)
|
|
||||||
|
|
||||||
bucketName := "bucket"
|
|
||||||
objectName := "object"
|
|
||||||
|
|
||||||
obj.MakeBucket(bucketName)
|
|
||||||
|
|
||||||
if err := fs.updateUploadsJSON(bucketName, objectName, uploadsV1{}); err != nil {
|
|
||||||
t.Fatal("Unexpected err: ", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// isUploadIdExists with a faulty disk should return false
|
|
||||||
fsStorage := fs.storage.(*posix)
|
|
||||||
for i := 1; i <= 2; i++ {
|
|
||||||
naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
|
|
||||||
fs.storage = naughty
|
|
||||||
if err := fs.updateUploadsJSON(bucketName, objectName, uploadsV1{}); errorCause(err) != errFaultyDisk {
|
|
||||||
t.Fatal("Unexpected err: ", err)
|
t.Fatal("Unexpected err: ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
|
|||||||
uploadID = getUUID()
|
uploadID = getUUID()
|
||||||
initiated := time.Now().UTC()
|
initiated := time.Now().UTC()
|
||||||
// Create 'uploads.json'
|
// Create 'uploads.json'
|
||||||
if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil {
|
if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID, initiated, false}); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
||||||
@ -791,29 +791,10 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
|||||||
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
||||||
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
||||||
|
|
||||||
// Validate if there are other incomplete upload-id's present for
|
// remove entry from uploads.json
|
||||||
// the object, if yes do not attempt to delete 'uploads.json'.
|
if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
||||||
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
|
|
||||||
if err != nil {
|
|
||||||
return "", toObjectErr(err, minioMetaBucket, object)
|
|
||||||
}
|
|
||||||
// If we have successfully read `uploads.json`, then we proceed to
|
|
||||||
// purge or update `uploads.json`.
|
|
||||||
uploadIDIdx := uploadsJSON.Index(uploadID)
|
|
||||||
if uploadIDIdx != -1 {
|
|
||||||
uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
|
|
||||||
}
|
|
||||||
if len(uploadsJSON.Uploads) > 0 {
|
|
||||||
if err = fs.updateUploadsJSON(bucket, object, uploadsJSON); err != nil {
|
|
||||||
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
||||||
}
|
}
|
||||||
// Return success.
|
|
||||||
return s3MD5, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil {
|
|
||||||
return "", toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return md5sum.
|
// Return md5sum.
|
||||||
return s3MD5, nil
|
return s3MD5, nil
|
||||||
@ -829,28 +810,12 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate if there are other incomplete upload-id's present for
|
// remove entry from uploads.json with quorum
|
||||||
// the object, if yes do not attempt to delete 'uploads.json'.
|
if err := fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
||||||
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
|
|
||||||
if err == nil {
|
|
||||||
uploadIDIdx := uploadsJSON.Index(uploadID)
|
|
||||||
if uploadIDIdx != -1 {
|
|
||||||
uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
|
|
||||||
}
|
|
||||||
// There are pending uploads for the same object, preserve
|
|
||||||
// them update 'uploads.json' in-place.
|
|
||||||
if len(uploadsJSON.Uploads) > 0 {
|
|
||||||
err = fs.updateUploadsJSON(bucket, object, uploadsJSON)
|
|
||||||
if err != nil {
|
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
// success
|
||||||
} // No more pending uploads for the object, we purge the entire
|
|
||||||
// entry at '.minio.sys/multipart/bucket/object'.
|
|
||||||
if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil {
|
|
||||||
return toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,6 +56,17 @@ func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) {
|
|||||||
sort.Sort(byInitiatedTime(u.Uploads))
|
sort.Sort(byInitiatedTime(u.Uploads))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveUploadID - removes upload id from uploads metadata.
|
||||||
|
func (u *uploadsV1) RemoveUploadID(uploadID string) {
|
||||||
|
// If the uploadID is absent, we do nothing.
|
||||||
|
for i, uInfo := range u.Uploads {
|
||||||
|
if uInfo.UploadID == uploadID {
|
||||||
|
u.Uploads = append(u.Uploads[:i], u.Uploads[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Index - returns the index of matching the upload id.
|
// Index - returns the index of matching the upload id.
|
||||||
func (u uploadsV1) Index(uploadID string) int {
|
func (u uploadsV1) Index(uploadID string) int {
|
||||||
for i, u := range u.Uploads {
|
for i, u := range u.Uploads {
|
||||||
@ -92,6 +103,40 @@ func newUploadsV1(format string) uploadsV1 {
|
|||||||
return uploadIDs
|
return uploadIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// uploadIDChange - represents a change to uploads.json - either add
|
||||||
|
// or remove an upload id.
|
||||||
|
type uploadIDChange struct {
|
||||||
|
// the id being added or removed.
|
||||||
|
uploadID string
|
||||||
|
// time of upload start. only used in uploadid add operations.
|
||||||
|
initiated time.Time
|
||||||
|
// if true, removes uploadID and ignores initiated time.
|
||||||
|
isRemove bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) error {
|
||||||
|
// Serialize to prepare to write to disk.
|
||||||
|
uplBytes, wErr := json.Marshal(&u)
|
||||||
|
if wErr != nil {
|
||||||
|
return traceError(wErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write `uploads.json` to disk. First to tmp location and
|
||||||
|
// then rename.
|
||||||
|
if wErr = disk.AppendFile(minioMetaBucket, tmpPath, uplBytes); wErr != nil {
|
||||||
|
return traceError(wErr)
|
||||||
|
}
|
||||||
|
wErr = disk.RenameFile(minioMetaBucket, tmpPath, minioMetaBucket, uploadsPath)
|
||||||
|
if wErr != nil {
|
||||||
|
if dErr := disk.DeleteFile(minioMetaBucket, tmpPath); dErr != nil {
|
||||||
|
// we return the most recent error.
|
||||||
|
return traceError(dErr)
|
||||||
|
}
|
||||||
|
return traceError(wErr)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Wrapper which removes all the uploaded parts.
|
// Wrapper which removes all the uploaded parts.
|
||||||
func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error {
|
func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error {
|
||||||
var errs = make([]error, len(storageDisks))
|
var errs = make([]error, len(storageDisks))
|
||||||
|
@ -17,172 +17,129 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
|
// writeUploadJSON - create `uploads.json` or update it with change
|
||||||
func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) {
|
// described in uCh.
|
||||||
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error {
|
||||||
uniqueID := getUUID()
|
|
||||||
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
|
||||||
var errs = make([]error, len(xl.storageDisks))
|
|
||||||
var wg = &sync.WaitGroup{}
|
|
||||||
|
|
||||||
// Update `uploads.json` for all the disks.
|
|
||||||
for index, disk := range xl.storageDisks {
|
|
||||||
if disk == nil {
|
|
||||||
errs[index] = errDiskNotFound
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
wg.Add(1)
|
|
||||||
// Update `uploads.json` in routine.
|
|
||||||
go func(index int, disk StorageAPI) {
|
|
||||||
defer wg.Done()
|
|
||||||
uploadsBytes, wErr := json.Marshal(uploadsJSON)
|
|
||||||
if wErr != nil {
|
|
||||||
errs[index] = traceError(wErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil {
|
|
||||||
errs[index] = traceError(wErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
|
|
||||||
errs[index] = traceError(wErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}(index, disk)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all the routines to finish updating `uploads.json`
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// Count all the errors and validate if we have write quorum.
|
|
||||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
|
||||||
// Do we have readQuorum?.
|
|
||||||
if isDiskQuorum(errs, xl.readQuorum) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// Rename `uploads.json` left over back to tmp location.
|
|
||||||
for index, disk := range xl.storageDisks {
|
|
||||||
if disk == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Undo rename `uploads.json` in parallel.
|
|
||||||
wg.Add(1)
|
|
||||||
go func(index int, disk StorageAPI) {
|
|
||||||
defer wg.Done()
|
|
||||||
if errs[index] != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
|
|
||||||
}(index, disk)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
return traceError(errXLWriteQuorum)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reads uploads.json from any of the load balanced disks.
|
|
||||||
func (xl xlObjects) readUploadsJSON(bucket, object string) (uploadsJSON uploadsV1, err error) {
|
|
||||||
for _, disk := range xl.getLoadBalancedDisks() {
|
|
||||||
if disk == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
|
|
||||||
if err == nil {
|
|
||||||
return uploadsJSON, nil
|
|
||||||
}
|
|
||||||
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return uploadsV1{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeUploadJSON - create `uploads.json` or update it with new uploadID.
|
|
||||||
func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) {
|
|
||||||
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
|
||||||
uniqueID := getUUID()
|
uniqueID := getUUID()
|
||||||
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
|
||||||
|
|
||||||
var errs = make([]error, len(xl.storageDisks))
|
// slice to store errors from disks
|
||||||
var wg = &sync.WaitGroup{}
|
errs := make([]error, len(xl.storageDisks))
|
||||||
|
// slice to store if it is a delete operation on a disk
|
||||||
|
isDelete := make([]bool, len(xl.storageDisks))
|
||||||
|
|
||||||
// Reads `uploads.json` and returns error.
|
wg := sync.WaitGroup{}
|
||||||
uploadsJSON, err := xl.readUploadsJSON(bucket, object)
|
|
||||||
if err != nil {
|
|
||||||
if errorCause(err) != errFileNotFound {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Set uploads format to `xl` otherwise.
|
|
||||||
uploadsJSON = newUploadsV1("xl")
|
|
||||||
}
|
|
||||||
// Add a new upload id.
|
|
||||||
uploadsJSON.AddUploadID(uploadID, initiated)
|
|
||||||
|
|
||||||
// Update `uploads.json` on all disks.
|
|
||||||
for index, disk := range xl.storageDisks {
|
for index, disk := range xl.storageDisks {
|
||||||
if disk == nil {
|
if disk == nil {
|
||||||
errs[index] = traceError(errDiskNotFound)
|
errs[index] = traceError(errDiskNotFound)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Update `uploads.json` in a go routine.
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
// Update `uploads.json` in a routine.
|
|
||||||
go func(index int, disk StorageAPI) {
|
go func(index int, disk StorageAPI) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
|
|
||||||
|
// read and parse uploads.json on this disk
|
||||||
|
uploadsJSON, err := readUploadsJSON(bucket, object, disk)
|
||||||
|
if errorCause(err) == errFileNotFound {
|
||||||
|
// If file is not found, we assume an
|
||||||
|
// default (empty) upload info.
|
||||||
|
uploadsJSON, err = newUploadsV1("xl"), nil
|
||||||
|
}
|
||||||
|
// If we have a read error, we store error and
|
||||||
|
// exit.
|
||||||
|
if err != nil {
|
||||||
|
errs[index] = traceError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !uCh.isRemove {
|
||||||
|
// Add the uploadID
|
||||||
|
uploadsJSON.AddUploadID(uCh.uploadID, uCh.initiated)
|
||||||
|
} else {
|
||||||
|
// Remove the upload ID
|
||||||
|
uploadsJSON.RemoveUploadID(uCh.uploadID)
|
||||||
|
if len(uploadsJSON.Uploads) == 0 {
|
||||||
|
isDelete[index] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// For delete, rename to tmp, for the
|
||||||
|
// possibility of recovery in case of quorum
|
||||||
|
// failure.
|
||||||
|
if !isDelete[index] {
|
||||||
|
errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk)
|
||||||
|
} else {
|
||||||
|
wErr := disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
|
||||||
if wErr != nil {
|
if wErr != nil {
|
||||||
errs[index] = traceError(wErr)
|
errs[index] = traceError(wErr)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
// Write `uploads.json` to disk.
|
|
||||||
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
|
|
||||||
errs[index] = traceError(wErr)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
|
|
||||||
if wErr != nil {
|
|
||||||
if dErr := disk.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {
|
|
||||||
errs[index] = traceError(dErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
errs[index] = traceError(wErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
errs[index] = nil
|
|
||||||
}(index, disk)
|
}(index, disk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait here for all the writes to finish.
|
// Wait for all the writes to finish.
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// Do we have write quorum?.
|
// Do we have write quorum?
|
||||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
if !isDiskQuorum(errs, xl.writeQuorum) {
|
||||||
// Rename `uploads.json` left over back to tmp location.
|
// No quorum. Perform cleanup on the minority of disks
|
||||||
|
// on which the operation succeeded.
|
||||||
|
|
||||||
|
// There are two cases:
|
||||||
|
//
|
||||||
|
// 1. uploads.json file was updated -> we delete the
|
||||||
|
// file that we successfully overwrote on the
|
||||||
|
// minority of disks, so that the failed quorum
|
||||||
|
// operation is not partially visible.
|
||||||
|
//
|
||||||
|
// 2. uploads.json was deleted -> in this case since
|
||||||
|
// the delete failed, we restore from tmp.
|
||||||
for index, disk := range xl.storageDisks {
|
for index, disk := range xl.storageDisks {
|
||||||
if disk == nil {
|
if disk == nil || errs[index] != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Undo rename `uploads.json` in parallel.
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(index int, disk StorageAPI) {
|
go func(index int, disk StorageAPI) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if errs[index] != nil {
|
if !isDelete[index] {
|
||||||
return
|
_ = disk.DeleteFile(
|
||||||
|
minioMetaBucket,
|
||||||
|
uploadsPath,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
_ = disk.RenameFile(
|
||||||
|
minioMetaBucket, tmpUploadsPath,
|
||||||
|
minioMetaBucket, uploadsPath,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
_ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath)
|
|
||||||
}(index, disk)
|
}(index, disk)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return traceError(errXLWriteQuorum)
|
return traceError(errXLWriteQuorum)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we do have quorum, so in case of delete upload.json file
|
||||||
|
// operation, we purge from tmp.
|
||||||
|
for index, disk := range xl.storageDisks {
|
||||||
|
if disk == nil || !isDelete[index] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(index int, disk StorageAPI) {
|
||||||
|
defer wg.Done()
|
||||||
|
// isDelete[index] = true at this point.
|
||||||
|
_ = disk.DeleteFile(minioMetaBucket, tmpUploadsPath)
|
||||||
|
}(index, disk)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
// Ignored errors list.
|
// Ignored errors list.
|
||||||
ignoredErrs := []error{
|
ignoredErrs := []error{
|
||||||
errDiskNotFound,
|
errDiskNotFound,
|
||||||
|
@ -306,7 +306,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
|
|||||||
|
|
||||||
initiated := time.Now().UTC()
|
initiated := time.Now().UTC()
|
||||||
// Create or update 'uploads.json'
|
// Create or update 'uploads.json'
|
||||||
if err := xl.writeUploadJSON(bucket, object, uploadID, initiated); err != nil {
|
if err := xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID, initiated, false}); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
// Return success.
|
// Return success.
|
||||||
@ -832,27 +832,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
|||||||
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
||||||
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
||||||
|
|
||||||
// Validate if there are other incomplete upload-id's present for
|
// remove entry from uploads.json with quorum
|
||||||
// the object, if yes do not attempt to delete 'uploads.json'.
|
if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
||||||
uploadsJSON, err := xl.readUploadsJSON(bucket, object)
|
|
||||||
if err != nil {
|
|
||||||
return "", toObjectErr(err, minioMetaBucket, object)
|
|
||||||
}
|
|
||||||
// If we have successfully read `uploads.json`, then we proceed to
|
|
||||||
// purge or update `uploads.json`.
|
|
||||||
uploadIDIdx := uploadsJSON.Index(uploadID)
|
|
||||||
if uploadIDIdx != -1 {
|
|
||||||
uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
|
|
||||||
}
|
|
||||||
if len(uploadsJSON.Uploads) > 0 {
|
|
||||||
if err = xl.updateUploadsJSON(bucket, object, uploadsJSON); err != nil {
|
|
||||||
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
|
||||||
}
|
|
||||||
// Return success.
|
|
||||||
return s3MD5, nil
|
|
||||||
} // No more pending uploads for the object, proceed to delete
|
|
||||||
// object completely from '.minio.sys/multipart'.
|
|
||||||
if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil {
|
|
||||||
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -875,29 +856,11 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
|
|||||||
|
|
||||||
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
||||||
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
|
||||||
// Validate if there are other incomplete upload-id's present for
|
|
||||||
// the object, if yes do not attempt to delete 'uploads.json'.
|
// remove entry from uploads.json with quorum
|
||||||
uploadsJSON, err := xl.readUploadsJSON(bucket, object)
|
if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
uploadIDIdx := uploadsJSON.Index(uploadID)
|
|
||||||
if uploadIDIdx != -1 {
|
|
||||||
uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
|
|
||||||
}
|
|
||||||
if len(uploadsJSON.Uploads) > 0 {
|
|
||||||
// There are pending uploads for the same object, preserve
|
|
||||||
// them update 'uploads.json' in-place.
|
|
||||||
err = xl.updateUploadsJSON(bucket, object, uploadsJSON)
|
|
||||||
if err != nil {
|
|
||||||
return toObjectErr(err, bucket, object)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
} // No more pending uploads for the object, we purge the entire
|
|
||||||
// entry at '.minio.sys/multipart/bucket/object'.
|
|
||||||
if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil {
|
|
||||||
return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Successfully purged.
|
// Successfully purged.
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user