mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Remove uploadIDChange structure. (#3309)
addUploadID() and removeUploadID() are wrappers to updateUploadJSON() which is called with respective arguments.
This commit is contained in:
parent
339c9019b9
commit
71b357e4f2
@ -16,7 +16,10 @@
|
|||||||
|
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import "path"
|
import (
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
// Returns if the prefix is a multipart upload.
|
// Returns if the prefix is a multipart upload.
|
||||||
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
|
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
|
||||||
@ -52,11 +55,10 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeUploadJSON - create `uploads.json` or update it with new uploadID.
|
// updateUploadJSON - add or remove upload ID info in all `uploads.json`.
|
||||||
func (fs fsObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error {
|
func (fs fsObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, isRemove bool) error {
|
||||||
uploadsPath := path.Join(bucket, object, uploadsJSONFile)
|
uploadsPath := path.Join(bucket, object, uploadsJSONFile)
|
||||||
uniqueID := getUUID()
|
tmpUploadsPath := getUUID()
|
||||||
tmpUploadsPath := uniqueID
|
|
||||||
|
|
||||||
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
|
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
|
||||||
if errorCause(err) == errFileNotFound {
|
if errorCause(err) == errFileNotFound {
|
||||||
@ -69,12 +71,12 @@ func (fs fsObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// update the uploadsJSON struct
|
// update the uploadsJSON struct
|
||||||
if !uCh.isRemove {
|
if !isRemove {
|
||||||
// Add the uploadID
|
// Add the uploadID
|
||||||
uploadsJSON.AddUploadID(uCh.uploadID, uCh.initiated)
|
uploadsJSON.AddUploadID(uploadID, initiated)
|
||||||
} else {
|
} else {
|
||||||
// Remove the upload ID
|
// Remove the upload ID
|
||||||
uploadsJSON.RemoveUploadID(uCh.uploadID)
|
uploadsJSON.RemoveUploadID(uploadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the file or delete it?
|
// update the file or delete it?
|
||||||
@ -88,3 +90,13 @@ func (fs fsObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addUploadID - add upload ID and its initiated time to 'uploads.json'.
|
||||||
|
func (fs fsObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time) error {
|
||||||
|
return fs.updateUploadJSON(bucket, object, uploadID, initiated, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeUploadID - remove upload ID in 'uploads.json'.
|
||||||
|
func (fs fsObjects) removeUploadID(bucket, object string, uploadID string) error {
|
||||||
|
return fs.updateUploadJSON(bucket, object, uploadID, time.Time{}, true)
|
||||||
|
}
|
||||||
|
@ -122,7 +122,7 @@ func TestFSWriteUploadJSON(t *testing.T) {
|
|||||||
t.Fatal("Unexpected err: ", err)
|
t.Fatal("Unexpected err: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := fs.updateUploadJSON(bucketName, objectName, uploadIDChange{uploadID, time.Now().UTC(), false}); err != nil {
|
if err := fs.addUploadID(bucketName, objectName, uploadID, time.Now().UTC()); err != nil {
|
||||||
t.Fatal("Unexpected err: ", err)
|
t.Fatal("Unexpected err: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,8 +131,7 @@ func TestFSWriteUploadJSON(t *testing.T) {
|
|||||||
for i := 1; i <= 3; i++ {
|
for i := 1; i <= 3; i++ {
|
||||||
naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
|
naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
|
||||||
fs.storage = naughty
|
fs.storage = naughty
|
||||||
if err := fs.updateUploadJSON(bucketName, objectName,
|
if err := fs.addUploadID(bucketName, objectName, uploadID, time.Now().UTC()); errorCause(err) != errFaultyDisk {
|
||||||
uploadIDChange{uploadID, time.Now().UTC(), false}); errorCause(err) != errFaultyDisk {
|
|
||||||
t.Fatal("Unexpected err: ", err)
|
t.Fatal("Unexpected err: ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -238,8 +238,8 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
|
|||||||
|
|
||||||
uploadID = getUUID()
|
uploadID = getUUID()
|
||||||
initiated := time.Now().UTC()
|
initiated := time.Now().UTC()
|
||||||
// Create 'uploads.json'
|
// Add upload ID to uploads.json
|
||||||
if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID, initiated, false}); err != nil {
|
if err = fs.addUploadID(bucket, object, uploadID, initiated); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||||
@ -685,7 +685,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
|||||||
defer objectMPartPathLock.Unlock()
|
defer objectMPartPathLock.Unlock()
|
||||||
|
|
||||||
// remove entry from uploads.json
|
// remove entry from uploads.json
|
||||||
if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
if err := fs.removeUploadID(bucket, object, uploadID); err != nil {
|
||||||
return "", toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
|
return "", toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -703,8 +703,9 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fs.bgAppend.remove(uploadID)
|
fs.bgAppend.remove(uploadID)
|
||||||
// remove entry from uploads.json with quorum
|
|
||||||
if err := fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
// remove upload ID in uploads.json
|
||||||
|
if err := fs.removeUploadID(bucket, object, uploadID); err != nil {
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,17 +93,6 @@ func newUploadsV1(format string) uploadsV1 {
|
|||||||
return uploadIDs
|
return uploadIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadIDChange - represents a change to uploads.json - either add
|
|
||||||
// or remove an upload id.
|
|
||||||
type uploadIDChange struct {
|
|
||||||
// the id being added or removed.
|
|
||||||
uploadID string
|
|
||||||
// time of upload start. only used in uploadid add operations.
|
|
||||||
initiated time.Time
|
|
||||||
// if true, removes uploadID and ignores initiated time.
|
|
||||||
isRemove bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) error {
|
func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) error {
|
||||||
// Serialize to prepare to write to disk.
|
// Serialize to prepare to write to disk.
|
||||||
uplBytes, wErr := json.Marshal(&u)
|
uplBytes, wErr := json.Marshal(&u)
|
||||||
|
@ -19,14 +19,13 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// writeUploadJSON - create `uploads.json` or update it with change
|
// updateUploadJSON - add or remove upload ID info in all `uploads.json`.
|
||||||
// described in uCh.
|
func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, isRemove bool) error {
|
||||||
func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error {
|
|
||||||
uploadsPath := path.Join(bucket, object, uploadsJSONFile)
|
uploadsPath := path.Join(bucket, object, uploadsJSONFile)
|
||||||
uniqueID := getUUID()
|
tmpUploadsPath := getUUID()
|
||||||
tmpUploadsPath := uniqueID
|
|
||||||
|
|
||||||
// slice to store errors from disks
|
// slice to store errors from disks
|
||||||
errs := make([]error, len(xl.storageDisks))
|
errs := make([]error, len(xl.storageDisks))
|
||||||
@ -58,12 +57,12 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !uCh.isRemove {
|
if !isRemove {
|
||||||
// Add the uploadID
|
// Add the uploadID
|
||||||
uploadsJSON.AddUploadID(uCh.uploadID, uCh.initiated)
|
uploadsJSON.AddUploadID(uploadID, initiated)
|
||||||
} else {
|
} else {
|
||||||
// Remove the upload ID
|
// Remove the upload ID
|
||||||
uploadsJSON.RemoveUploadID(uCh.uploadID)
|
uploadsJSON.RemoveUploadID(uploadID)
|
||||||
if len(uploadsJSON.Uploads) == 0 {
|
if len(uploadsJSON.Uploads) == 0 {
|
||||||
isDelete[index] = true
|
isDelete[index] = true
|
||||||
}
|
}
|
||||||
@ -146,6 +145,16 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addUploadID - add upload ID and its initiated time to 'uploads.json'.
|
||||||
|
func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time) error {
|
||||||
|
return xl.updateUploadJSON(bucket, object, uploadID, initiated, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeUploadID - remove upload ID in 'uploads.json'.
|
||||||
|
func (xl xlObjects) removeUploadID(bucket, object string, uploadID string) error {
|
||||||
|
return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, true)
|
||||||
|
}
|
||||||
|
|
||||||
// Returns if the prefix is a multipart upload.
|
// Returns if the prefix is a multipart upload.
|
||||||
func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
|
func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
|
||||||
for _, disk := range xl.getLoadBalancedDisks() {
|
for _, disk := range xl.getLoadBalancedDisks() {
|
||||||
|
@ -44,17 +44,19 @@ func TestUpdateUploadJSON(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
uCh uploadIDChange
|
uploadID string
|
||||||
|
initiated time.Time
|
||||||
|
isRemove bool
|
||||||
errVal error
|
errVal error
|
||||||
}{
|
}{
|
||||||
{uploadIDChange{"111abc", time.Now().UTC(), false}, nil},
|
{"111abc", time.Now().UTC(), false, nil},
|
||||||
{uploadIDChange{"222abc", time.Now().UTC(), false}, nil},
|
{"222abc", time.Now().UTC(), false, nil},
|
||||||
{uploadIDChange{uploadID: "111abc", isRemove: true}, nil},
|
{"111abc", time.Time{}, true, nil},
|
||||||
}
|
}
|
||||||
|
|
||||||
xl := obj.(*xlObjects)
|
xl := obj.(*xlObjects)
|
||||||
for i, test := range testCases {
|
for i, test := range testCases {
|
||||||
testErrVal := xl.updateUploadJSON(bucket, object, test.uCh)
|
testErrVal := xl.updateUploadJSON(bucket, object, test.uploadID, test.initiated, test.isRemove)
|
||||||
if testErrVal != test.errVal {
|
if testErrVal != test.errVal {
|
||||||
t.Errorf("Test %d: Expected error value %v, but got %v",
|
t.Errorf("Test %d: Expected error value %v, but got %v",
|
||||||
i+1, test.errVal, testErrVal)
|
i+1, test.errVal, testErrVal)
|
||||||
@ -66,7 +68,7 @@ func TestUpdateUploadJSON(t *testing.T) {
|
|||||||
xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*posix), nil, errFaultyDisk)
|
xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*posix), nil, errFaultyDisk)
|
||||||
}
|
}
|
||||||
|
|
||||||
testErrVal := xl.updateUploadJSON(bucket, object, uploadIDChange{"222abc", time.Now().UTC(), false})
|
testErrVal := xl.updateUploadJSON(bucket, object, "222abc", time.Now().UTC(), false)
|
||||||
if testErrVal == nil || testErrVal.Error() != errXLWriteQuorum.Error() {
|
if testErrVal == nil || testErrVal.Error() != errXLWriteQuorum.Error() {
|
||||||
t.Errorf("Expected write quorum error, but got: %v", testErrVal)
|
t.Errorf("Expected write quorum error, but got: %v", testErrVal)
|
||||||
}
|
}
|
||||||
|
@ -306,7 +306,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
|
|||||||
|
|
||||||
initiated := time.Now().UTC()
|
initiated := time.Now().UTC()
|
||||||
// Create or update 'uploads.json'
|
// Create or update 'uploads.json'
|
||||||
if err := xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID, initiated, false}); err != nil {
|
if err := xl.addUploadID(bucket, object, uploadID, initiated); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
// Return success.
|
// Return success.
|
||||||
@ -839,7 +839,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
|||||||
defer objectMPartPathLock.Unlock()
|
defer objectMPartPathLock.Unlock()
|
||||||
|
|
||||||
// remove entry from uploads.json with quorum
|
// remove entry from uploads.json with quorum
|
||||||
if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
if err = xl.removeUploadID(bucket, object, uploadID); err != nil {
|
||||||
return "", toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
|
return "", toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -865,7 +865,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
|
|||||||
defer objectMPartPathLock.Unlock()
|
defer objectMPartPathLock.Unlock()
|
||||||
|
|
||||||
// remove entry from uploads.json with quorum
|
// remove entry from uploads.json with quorum
|
||||||
if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
|
if err = xl.removeUploadID(bucket, object, uploadID); err != nil {
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user