From dd6ecf1193fef81914238bd269a88f0c501175f1 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Sun, 30 Oct 2016 09:27:29 -0700 Subject: [PATCH] Read/write quorum algo with uploads.json in xl (Fixes #3123) (#3124) - Reads and writes of uploads.json in XL now uses quorum for newMultipart, completeMultipart and abortMultipart operations. - Each disk's `uploads.json` file is read and updated independently for adding or removing an upload id from the file. Quorum is used to decide if the high-level operation actually succeeded. - Refactor FS code to simplify the flow, and fix a bug while reading uploads.json. --- cmd/fs-v1-multipart-common.go | 74 ++++------ cmd/fs-v1-multipart-common_test.go | 34 +---- cmd/fs-v1-multipart.go | 53 ++------ cmd/object-multipart-common.go | 45 +++++++ cmd/xl-v1-multipart-common.go | 209 ++++++++++++----------------- cmd/xl-v1-multipart.go | 49 +------ 6 files changed, 172 insertions(+), 292 deletions(-) diff --git a/cmd/fs-v1-multipart-common.go b/cmd/fs-v1-multipart-common.go index 3baf39242..156fa1050 100644 --- a/cmd/fs-v1-multipart-common.go +++ b/cmd/fs-v1-multipart-common.go @@ -16,11 +16,7 @@ package cmd -import ( - "encoding/json" - "path" - "time" -) +import "path" // Returns if the prefix is a multipart upload. func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { @@ -57,56 +53,38 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool { } // writeUploadJSON - create `uploads.json` or update it with new uploadID. -func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) { +func (fs fsObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID() tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) - var uploadsJSON uploadsV1 - uploadsJSON, err = readUploadsJSON(bucket, object, fs.storage) + + uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage) + if errorCause(err) == errFileNotFound { + // If file is not found, we assume a default (empty) + // upload info. + uploadsJSON, err = newUploadsV1("fs"), nil + } if err != nil { - // uploads.json might not exist hence ignore errFileNotFound. - if errorCause(err) != errFileNotFound { - return err - } - // Set uploads format to `fs`. - uploadsJSON = newUploadsV1("fs") + return err } - // Add a new upload id. - uploadsJSON.AddUploadID(uploadID, initiated) - // Update `uploads.json` on all disks. - uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON) - if wErr != nil { - return traceError(wErr) + // update the uploadsJSON struct + if !uCh.isRemove { + // Add the uploadID + uploadsJSON.AddUploadID(uCh.uploadID, uCh.initiated) + } else { + // Remove the upload ID + uploadsJSON.RemoveUploadID(uCh.uploadID) } - // Write `uploads.json` to disk. - if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil { - return traceError(wErr) - } - wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath) - if wErr != nil { - if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil { - return traceError(dErr) - } - return traceError(wErr) - } - return nil -} -// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. -func (fs fsObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) { - uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) - uniqueID := getUUID() - tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) - uploadsBytes, wErr := json.Marshal(uploadsJSON) - if wErr != nil { - return traceError(wErr) + // update the file or delete it? + if len(uploadsJSON.Uploads) > 0 { + err = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, fs.storage) + } else { + // no uploads, so we delete the file. + if err = fs.storage.DeleteFile(minioMetaBucket, uploadsPath); err != nil { + return toObjectErr(traceError(err), minioMetaBucket, uploadsPath) + } } - if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil { - return traceError(wErr) - } - if wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil { - return traceError(wErr) - } - return nil + return err } diff --git a/cmd/fs-v1-multipart-common_test.go b/cmd/fs-v1-multipart-common_test.go index a15b638c3..53392ea6b 100644 --- a/cmd/fs-v1-multipart-common_test.go +++ b/cmd/fs-v1-multipart-common_test.go @@ -122,7 +122,7 @@ func TestFSWriteUploadJSON(t *testing.T) { t.Fatal("Unexpected err: ", err) } - if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now().UTC()); err != nil { + if err := fs.updateUploadJSON(bucketName, objectName, uploadIDChange{uploadID, time.Now().UTC(), false}); err != nil { t.Fatal("Unexpected err: ", err) } @@ -131,36 +131,8 @@ func TestFSWriteUploadJSON(t *testing.T) { for i := 1; i <= 3; i++ { naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) fs.storage = naughty - if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now().UTC()); errorCause(err) != errFaultyDisk { - t.Fatal("Unexpected err: ", err) - } - } -} - -// TestFSUpdateUploadsJSON - tests for updateUploadsJSON for FS -func TestFSUpdateUploadsJSON(t *testing.T) { - // Prepare for tests - disk := filepath.Join(os.TempDir(), "minio-"+nextSuffix()) - defer removeAll(disk) - - obj := initFSObjects(disk, t) - fs := obj.(fsObjects) - - bucketName := "bucket" - objectName := "object" - - obj.MakeBucket(bucketName) - - if err := fs.updateUploadsJSON(bucketName, objectName, uploadsV1{}); err != nil { - t.Fatal("Unexpected err: ", err) - } - - // isUploadIdExists with a faulty disk should return false - fsStorage := fs.storage.(*posix) - for i := 1; i <= 2; i++ { - naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) - fs.storage = naughty - if err := fs.updateUploadsJSON(bucketName, objectName, uploadsV1{}); errorCause(err) != errFaultyDisk { + if err := fs.updateUploadJSON(bucketName, objectName, + uploadIDChange{uploadID, time.Now().UTC(), false}); errorCause(err) != errFaultyDisk { t.Fatal("Unexpected err: ", err) } } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 56041e3dd..b872d88de 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -243,7 +243,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st uploadID = getUUID() initiated := time.Now().UTC() // Create 'uploads.json' - if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil { + if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID, initiated, false}); err != nil { return "", err } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -791,28 +791,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - // Validate if there are other incomplete upload-id's present for - // the object, if yes do not attempt to delete 'uploads.json'. - uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, object) - } - // If we have successfully read `uploads.json`, then we proceed to - // purge or update `uploads.json`. - uploadIDIdx := uploadsJSON.Index(uploadID) - if uploadIDIdx != -1 { - uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) - } - if len(uploadsJSON.Uploads) > 0 { - if err = fs.updateUploadsJSON(bucket, object, uploadsJSON); err != nil { - return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) - } - // Return success. - return s3MD5, nil - } - - if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil { - return "", toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + // remove entry from uploads.json + if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { + return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } // Return md5sum. @@ -829,28 +810,12 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error return err } - // Validate if there are other incomplete upload-id's present for - // the object, if yes do not attempt to delete 'uploads.json'. - uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage) - if err == nil { - uploadIDIdx := uploadsJSON.Index(uploadID) - if uploadIDIdx != -1 { - uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) - } - // There are pending uploads for the same object, preserve - // them update 'uploads.json' in-place. - if len(uploadsJSON.Uploads) > 0 { - err = fs.updateUploadsJSON(bucket, object, uploadsJSON) - if err != nil { - return toObjectErr(err, bucket, object) - } - return nil - } - } // No more pending uploads for the object, we purge the entire - // entry at '.minio.sys/multipart/bucket/object'. - if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil { - return toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + // remove entry from uploads.json with quorum + if err := fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { + return toObjectErr(err, bucket, object) } + + // success return nil } diff --git a/cmd/object-multipart-common.go b/cmd/object-multipart-common.go index 689a21dae..59fef1a28 100644 --- a/cmd/object-multipart-common.go +++ b/cmd/object-multipart-common.go @@ -56,6 +56,17 @@ func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) { sort.Sort(byInitiatedTime(u.Uploads)) } +// RemoveUploadID - removes upload id from uploads metadata. +func (u *uploadsV1) RemoveUploadID(uploadID string) { + // If the uploadID is absent, we do nothing. + for i, uInfo := range u.Uploads { + if uInfo.UploadID == uploadID { + u.Uploads = append(u.Uploads[:i], u.Uploads[i+1:]...) + break + } + } +} + // Index - returns the index of matching the upload id. func (u uploadsV1) Index(uploadID string) int { for i, u := range u.Uploads { @@ -92,6 +103,40 @@ func newUploadsV1(format string) uploadsV1 { return uploadIDs } +// uploadIDChange - represents a change to uploads.json - either add +// or remove an upload id. +type uploadIDChange struct { + // the id being added or removed. + uploadID string + // time of upload start. only used in uploadid add operations. + initiated time.Time + // if true, removes uploadID and ignores initiated time. + isRemove bool +} + +func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) error { + // Serialize to prepare to write to disk. + uplBytes, wErr := json.Marshal(&u) + if wErr != nil { + return traceError(wErr) + } + + // Write `uploads.json` to disk. First to tmp location and + // then rename. + if wErr = disk.AppendFile(minioMetaBucket, tmpPath, uplBytes); wErr != nil { + return traceError(wErr) + } + wErr = disk.RenameFile(minioMetaBucket, tmpPath, minioMetaBucket, uploadsPath) + if wErr != nil { + if dErr := disk.DeleteFile(minioMetaBucket, tmpPath); dErr != nil { + // we return the most recent error. + return traceError(dErr) + } + return traceError(wErr) + } + return nil +} + // Wrapper which removes all the uploaded parts. func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error { var errs = make([]error, len(storageDisks)) diff --git a/cmd/xl-v1-multipart-common.go b/cmd/xl-v1-multipart-common.go index b637b4a25..ee73768d0 100644 --- a/cmd/xl-v1-multipart-common.go +++ b/cmd/xl-v1-multipart-common.go @@ -17,172 +17,129 @@ package cmd import ( - "encoding/json" "path" "sync" - "time" ) -// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. -func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) { - uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) - uniqueID := getUUID() - tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) - var errs = make([]error, len(xl.storageDisks)) - var wg = &sync.WaitGroup{} - - // Update `uploads.json` for all the disks. - for index, disk := range xl.storageDisks { - if disk == nil { - errs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Update `uploads.json` in routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - uploadsBytes, wErr := json.Marshal(uploadsJSON) - if wErr != nil { - errs[index] = traceError(wErr) - return - } - if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil { - errs[index] = traceError(wErr) - return - } - if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil { - errs[index] = traceError(wErr) - return - } - }(index, disk) - } - - // Wait for all the routines to finish updating `uploads.json` - wg.Wait() - - // Count all the errors and validate if we have write quorum. - if !isDiskQuorum(errs, xl.writeQuorum) { - // Do we have readQuorum?. - if isDiskQuorum(errs, xl.readQuorum) { - return nil - } - // Rename `uploads.json` left over back to tmp location. - for index, disk := range xl.storageDisks { - if disk == nil { - continue - } - // Undo rename `uploads.json` in parallel. - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - if errs[index] != nil { - return - } - _ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath) - }(index, disk) - } - wg.Wait() - return traceError(errXLWriteQuorum) - } - return nil -} - -// Reads uploads.json from any of the load balanced disks. -func (xl xlObjects) readUploadsJSON(bucket, object string) (uploadsJSON uploadsV1, err error) { - for _, disk := range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - uploadsJSON, err = readUploadsJSON(bucket, object, disk) - if err == nil { - return uploadsJSON, nil - } - if isErrIgnored(err, objMetadataOpIgnoredErrs) { - continue - } - break - } - return uploadsV1{}, err -} - -// writeUploadJSON - create `uploads.json` or update it with new uploadID. -func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) { +// writeUploadJSON - create `uploads.json` or update it with change +// described in uCh. +func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) error { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID() tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) - var errs = make([]error, len(xl.storageDisks)) - var wg = &sync.WaitGroup{} + // slice to store errors from disks + errs := make([]error, len(xl.storageDisks)) + // slice to store if it is a delete operation on a disk + isDelete := make([]bool, len(xl.storageDisks)) - // Reads `uploads.json` and returns error. - uploadsJSON, err := xl.readUploadsJSON(bucket, object) - if err != nil { - if errorCause(err) != errFileNotFound { - return err - } - // Set uploads format to `xl` otherwise. - uploadsJSON = newUploadsV1("xl") - } - // Add a new upload id. - uploadsJSON.AddUploadID(uploadID, initiated) - - // Update `uploads.json` on all disks. + wg := sync.WaitGroup{} for index, disk := range xl.storageDisks { if disk == nil { errs[index] = traceError(errDiskNotFound) continue } + // Update `uploads.json` in a go routine. wg.Add(1) - // Update `uploads.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() - uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON) - if wErr != nil { - errs[index] = traceError(wErr) + + // read and parse uploads.json on this disk + uploadsJSON, err := readUploadsJSON(bucket, object, disk) + if errorCause(err) == errFileNotFound { + // If file is not found, we assume an + // default (empty) upload info. + uploadsJSON, err = newUploadsV1("xl"), nil + } + // If we have a read error, we store error and + // exit. + if err != nil { + errs[index] = traceError(err) return } - // Write `uploads.json` to disk. - if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil { - errs[index] = traceError(wErr) - return - } - wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath) - if wErr != nil { - if dErr := disk.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil { - errs[index] = traceError(dErr) - return + + if !uCh.isRemove { + // Add the uploadID + uploadsJSON.AddUploadID(uCh.uploadID, uCh.initiated) + } else { + // Remove the upload ID + uploadsJSON.RemoveUploadID(uCh.uploadID) + if len(uploadsJSON.Uploads) == 0 { + isDelete[index] = true } - errs[index] = traceError(wErr) - return } - errs[index] = nil + + // For delete, rename to tmp, for the + // possibility of recovery in case of quorum + // failure. + if !isDelete[index] { + errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk) + } else { + wErr := disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath) + if wErr != nil { + errs[index] = traceError(wErr) + } + + } }(index, disk) } - // Wait here for all the writes to finish. + // Wait for all the writes to finish. wg.Wait() - // Do we have write quorum?. + // Do we have write quorum? if !isDiskQuorum(errs, xl.writeQuorum) { - // Rename `uploads.json` left over back to tmp location. + // No quorum. Perform cleanup on the minority of disks + // on which the operation succeeded. + + // There are two cases: + // + // 1. uploads.json file was updated -> we delete the + // file that we successfully overwrote on the + // minority of disks, so that the failed quorum + // operation is not partially visible. + // + // 2. uploads.json was deleted -> in this case since + // the delete failed, we restore from tmp. for index, disk := range xl.storageDisks { - if disk == nil { + if disk == nil || errs[index] != nil { continue } - // Undo rename `uploads.json` in parallel. wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() - if errs[index] != nil { - return + if !isDelete[index] { + _ = disk.DeleteFile( + minioMetaBucket, + uploadsPath, + ) + } else { + _ = disk.RenameFile( + minioMetaBucket, tmpUploadsPath, + minioMetaBucket, uploadsPath, + ) } - _ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath) }(index, disk) } wg.Wait() return traceError(errXLWriteQuorum) } + // we do have quorum, so in case of delete upload.json file + // operation, we purge from tmp. + for index, disk := range xl.storageDisks { + if disk == nil || !isDelete[index] { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + // isDelete[index] = true at this point. + _ = disk.DeleteFile(minioMetaBucket, tmpUploadsPath) + }(index, disk) + } + wg.Wait() + // Ignored errors list. ignoredErrs := []error{ errDiskNotFound, diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 7bfe04234..eb32c4275 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -306,7 +306,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st initiated := time.Now().UTC() // Create or update 'uploads.json' - if err := xl.writeUploadJSON(bucket, object, uploadID, initiated); err != nil { + if err := xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID, initiated, false}); err != nil { return "", err } // Return success. @@ -832,27 +832,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - // Validate if there are other incomplete upload-id's present for - // the object, if yes do not attempt to delete 'uploads.json'. - uploadsJSON, err := xl.readUploadsJSON(bucket, object) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, object) - } - // If we have successfully read `uploads.json`, then we proceed to - // purge or update `uploads.json`. - uploadIDIdx := uploadsJSON.Index(uploadID) - if uploadIDIdx != -1 { - uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) - } - if len(uploadsJSON.Uploads) > 0 { - if err = xl.updateUploadsJSON(bucket, object, uploadsJSON); err != nil { - return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) - } - // Return success. - return s3MD5, nil - } // No more pending uploads for the object, proceed to delete - // object completely from '.minio.sys/multipart'. - if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil { + // remove entry from uploads.json with quorum + if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } @@ -875,29 +856,11 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) - // Validate if there are other incomplete upload-id's present for - // the object, if yes do not attempt to delete 'uploads.json'. - uploadsJSON, err := xl.readUploadsJSON(bucket, object) - if err != nil { + + // remove entry from uploads.json with quorum + if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil { return toObjectErr(err, bucket, object) } - uploadIDIdx := uploadsJSON.Index(uploadID) - if uploadIDIdx != -1 { - uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) - } - if len(uploadsJSON.Uploads) > 0 { - // There are pending uploads for the same object, preserve - // them update 'uploads.json' in-place. - err = xl.updateUploadsJSON(bucket, object, uploadsJSON) - if err != nil { - return toObjectErr(err, bucket, object) - } - return nil - } // No more pending uploads for the object, we purge the entire - // entry at '.minio.sys/multipart/bucket/object'. - if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil { - return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) - } // Successfully purged. return nil