mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
XL: Remove usage of reduceErr and make it isQuorum verification. (#1909)
Fixes #1908
This commit is contained in:
parent
7f38f46e20
commit
8c0942bf0d
@ -144,5 +144,9 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
|
||||
// Wait for all the appends to finish.
|
||||
wg.Wait()
|
||||
|
||||
return reduceError(wErrs, writeQuorum)
|
||||
// Do we have write quorum?.
|
||||
if !isQuorum(wErrs, writeQuorum) {
|
||||
return toObjectErr(errXLWriteQuorum, volume, path)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -443,8 +443,7 @@ func testNonExistantObjectInBucket(c *check.C, create func() ObjectLayer) {
|
||||
err := obj.MakeBucket("bucket")
|
||||
c.Assert(err, check.IsNil)
|
||||
|
||||
var bytesBuffer bytes.Buffer
|
||||
err = obj.GetObject("bucket", "dir1", 0, 10, &bytesBuffer)
|
||||
_, err = obj.GetObjectInfo("bucket", "dir1")
|
||||
c.Assert(err, check.Not(check.IsNil))
|
||||
switch err := err.(type) {
|
||||
case ObjectNotFound:
|
||||
@ -463,8 +462,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectLayer
|
||||
_, err = obj.PutObject("bucket", "dir1/dir3/object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("One or more of the specified parts could not be found. The part might not have been uploaded, or the specified entity tag might not have matched the part's entity tag."), nil)
|
||||
c.Assert(err, check.IsNil)
|
||||
|
||||
var bytesBuffer bytes.Buffer
|
||||
err = obj.GetObject("bucket", "dir1", 0, 10, &bytesBuffer)
|
||||
_, err = obj.GetObjectInfo("bucket", "dir1")
|
||||
switch err := err.(type) {
|
||||
case ObjectNotFound:
|
||||
c.Assert(err.Bucket, check.Equals, "bucket")
|
||||
@ -474,7 +472,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectLayer
|
||||
c.Assert(err, check.Equals, "ObjectNotFound")
|
||||
}
|
||||
|
||||
err = obj.GetObject("bucket", "dir1/", 0, 10, &bytesBuffer)
|
||||
_, err = obj.GetObjectInfo("bucket", "dir1/")
|
||||
switch err := err.(type) {
|
||||
case ObjectNameInvalid:
|
||||
c.Assert(err.Bucket, check.Equals, "bucket")
|
||||
|
17
posix.go
17
posix.go
@ -458,9 +458,6 @@ func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error)
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
if err = checkPathLength(filePath); err != nil {
|
||||
return 0, err
|
||||
@ -670,9 +667,17 @@ func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err er
|
||||
if !(srcIsDir && dstIsDir || !srcIsDir && !dstIsDir) {
|
||||
return errFileAccessDenied
|
||||
}
|
||||
srcFilePath := slashpath.Join(srcVolumeDir, srcPath)
|
||||
if err = checkPathLength(srcFilePath); err != nil {
|
||||
return err
|
||||
}
|
||||
dstFilePath := slashpath.Join(dstVolumeDir, dstPath)
|
||||
if err = checkPathLength(dstFilePath); err != nil {
|
||||
return err
|
||||
}
|
||||
if srcIsDir {
|
||||
// If source is a directory we expect the destination to be non-existent always.
|
||||
_, err = os.Stat(preparePath(slashpath.Join(dstVolumeDir, dstPath)))
|
||||
_, err = os.Stat(preparePath(dstFilePath))
|
||||
if err == nil {
|
||||
return errFileAccessDenied
|
||||
}
|
||||
@ -681,14 +686,14 @@ func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err er
|
||||
}
|
||||
// Destination does not exist, hence proceed with the rename.
|
||||
}
|
||||
if err = mkdirAll(preparePath(slashpath.Dir(slashpath.Join(dstVolumeDir, dstPath))), 0755); err != nil {
|
||||
if err = mkdirAll(preparePath(slashpath.Dir(dstFilePath)), 0755); err != nil {
|
||||
// File path cannot be verified since one of the parents is a file.
|
||||
if strings.Contains(err.Error(), "not a directory") {
|
||||
return errFileAccessDenied
|
||||
}
|
||||
return err
|
||||
}
|
||||
err = os.Rename(preparePath(slashpath.Join(srcVolumeDir, srcPath)), preparePath(slashpath.Join(dstVolumeDir, dstPath)))
|
||||
err = os.Rename(preparePath(srcFilePath), preparePath(dstFilePath))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errFileNotFound
|
||||
|
@ -400,7 +400,7 @@ func (s *MyAPIXLSuite) TestDeleteObject(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(response.StatusCode, Equals, http.StatusNoContent)
|
||||
|
||||
// Delete of non-existant data should return success.
|
||||
// Delete of non-existent data should return success.
|
||||
request, err = s.newRequest("DELETE", testAPIXLServer.URL+"/deletebucketobject/prefix/myobject1", 0, nil)
|
||||
c.Assert(err, IsNil)
|
||||
client = http.Client{}
|
||||
|
@ -33,10 +33,6 @@ func (xl xlObjects) MakeBucket(bucket string) error {
|
||||
nsMutex.Lock(bucket, "")
|
||||
defer nsMutex.Unlock(bucket, "")
|
||||
|
||||
// Err counters.
|
||||
createVolErr := 0 // Count generic create vol errs.
|
||||
volumeExistsErrCnt := 0 // Count all errVolumeExists errs.
|
||||
|
||||
// Initialize sync waitgroup.
|
||||
var wg = &sync.WaitGroup{}
|
||||
|
||||
@ -63,31 +59,49 @@ func (xl xlObjects) MakeBucket(bucket string) error {
|
||||
// Wait for all make vol to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Look for specific errors and count them to be verified later.
|
||||
for _, err := range dErrs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
// if volume already exists, count them.
|
||||
if err == errVolumeExists {
|
||||
volumeExistsErrCnt++
|
||||
continue
|
||||
}
|
||||
|
||||
// Update error counter separately.
|
||||
createVolErr++
|
||||
}
|
||||
|
||||
// Return err if all disks report volume exists.
|
||||
if volumeExistsErrCnt > len(xl.storageDisks)-xl.readQuorum {
|
||||
return toObjectErr(errVolumeExists, bucket)
|
||||
} else if createVolErr > len(xl.storageDisks)-xl.writeQuorum {
|
||||
// Return errXLWriteQuorum if errors were more than allowed write quorum.
|
||||
// Do we have write quorum?.
|
||||
if !isQuorum(dErrs, xl.writeQuorum) {
|
||||
// Purge successfully created buckets if we don't have writeQuorum.
|
||||
xl.undoMakeBucket(bucket)
|
||||
return toObjectErr(errXLWriteQuorum, bucket)
|
||||
}
|
||||
|
||||
// Verify we have any other errors which should undo make bucket.
|
||||
for _, err := range dErrs {
|
||||
// Bucket already exists, return BucketExists error.
|
||||
if err == errVolumeExists {
|
||||
return toObjectErr(errVolumeExists, bucket)
|
||||
}
|
||||
// Undo make bucket for any other errors.
|
||||
if err != nil && err != errDiskNotFound {
|
||||
xl.undoMakeBucket(bucket)
|
||||
return toObjectErr(err, bucket)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// undo make bucket operation upon quorum failure.
|
||||
func (xl xlObjects) undoMakeBucket(bucket string) {
|
||||
// Initialize sync waitgroup.
|
||||
var wg = &sync.WaitGroup{}
|
||||
// Undo previous make bucket entry on all underlying storage disks.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
// Delete a bucket inside a go-routine.
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
_ = disk.DeleteVol(bucket)
|
||||
}(index, disk)
|
||||
}
|
||||
|
||||
// Wait for all make vol to finish.
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// getBucketInfo - returns the BucketInfo from one of the load balanced disks.
|
||||
func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) {
|
||||
for _, disk := range xl.getLoadBalancedQuorumDisks() {
|
||||
|
101
xl-v1-healing.go
101
xl-v1-healing.go
@ -1,3 +1,19 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
@ -7,11 +23,12 @@ import (
|
||||
)
|
||||
|
||||
// Get the highest integer from a given integer slice.
|
||||
func highestInt(intSlice []int64) (highestInteger int64) {
|
||||
highestInteger = int64(1)
|
||||
func highestInt(intSlice []int64, highestInt int64) (highestInteger int64) {
|
||||
highestInteger = highestInt
|
||||
for _, integer := range intSlice {
|
||||
if highestInteger < integer {
|
||||
highestInteger = integer
|
||||
break
|
||||
}
|
||||
}
|
||||
return highestInteger
|
||||
@ -23,6 +40,8 @@ func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int6
|
||||
for index, metadata := range partsMetadata {
|
||||
if errs[index] == nil {
|
||||
versions[index] = metadata.Stat.Version
|
||||
} else if errs[index] == errFileNotFound {
|
||||
versions[index] = 1
|
||||
} else {
|
||||
versions[index] = -1
|
||||
}
|
||||
@ -56,8 +75,6 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
|
||||
errs[index] = err
|
||||
return
|
||||
}
|
||||
// Relinquish buffer.
|
||||
buffer = nil
|
||||
errs[index] = nil
|
||||
}(index, disk)
|
||||
}
|
||||
@ -69,67 +86,6 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
|
||||
return metadataArray, errs
|
||||
}
|
||||
|
||||
// error based on total errors and quorum.
|
||||
func reduceError(errs []error, quorum int) error {
|
||||
fileNotFoundCount := 0
|
||||
longNameCount := 0
|
||||
diskNotFoundCount := 0
|
||||
volumeNotFoundCount := 0
|
||||
diskAccessDeniedCount := 0
|
||||
for _, err := range errs {
|
||||
if err == errFileNotFound {
|
||||
fileNotFoundCount++
|
||||
} else if err == errFileNameTooLong {
|
||||
longNameCount++
|
||||
} else if err == errDiskNotFound {
|
||||
diskNotFoundCount++
|
||||
} else if err == errVolumeAccessDenied {
|
||||
diskAccessDeniedCount++
|
||||
} else if err == errVolumeNotFound {
|
||||
volumeNotFoundCount++
|
||||
}
|
||||
}
|
||||
// If we have errors with 'file not found' greater than
|
||||
// quorum, return as errFileNotFound.
|
||||
// else if we have errors with 'volume not found'
|
||||
// greater than quorum, return as errVolumeNotFound.
|
||||
if fileNotFoundCount > len(errs)-quorum {
|
||||
return errFileNotFound
|
||||
} else if longNameCount > len(errs)-quorum {
|
||||
return errFileNameTooLong
|
||||
} else if volumeNotFoundCount > len(errs)-quorum {
|
||||
return errVolumeNotFound
|
||||
}
|
||||
// If we have errors with disk not found equal to the
|
||||
// number of disks, return as errDiskNotFound.
|
||||
if diskNotFoundCount == len(errs) {
|
||||
return errDiskNotFound
|
||||
} else if diskNotFoundCount > len(errs)-quorum {
|
||||
// If we have errors with 'disk not found'
|
||||
// greater than quorum, return as errFileNotFound.
|
||||
return errFileNotFound
|
||||
}
|
||||
// If we have errors with disk not found equal to the
|
||||
// number of disks, return as errDiskNotFound.
|
||||
if diskAccessDeniedCount == len(errs) {
|
||||
return errVolumeAccessDenied
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Similar to 'len(slice)' but returns the actualelements count
|
||||
// skipping the unallocated elements.
|
||||
func diskCount(disks []StorageAPI) int {
|
||||
diskCount := 0
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
diskCount++
|
||||
}
|
||||
return diskCount
|
||||
}
|
||||
|
||||
func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
|
||||
onlineDiskCount := diskCount(onlineDisks)
|
||||
// If online disks count is lesser than configured disks, most
|
||||
@ -156,21 +112,16 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
|
||||
// - error if any.
|
||||
func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, version int64, err error) {
|
||||
onlineDisks = make([]StorageAPI, len(xl.storageDisks))
|
||||
if err = reduceError(errs, xl.readQuorum); err != nil {
|
||||
if err == errFileNotFound {
|
||||
// For file not found, treat as if disks are available
|
||||
// return all the configured ones.
|
||||
onlineDisks = xl.storageDisks
|
||||
return onlineDisks, 1, nil
|
||||
}
|
||||
return nil, 0, err
|
||||
// Do we have read Quorum?.
|
||||
if !isQuorum(errs, xl.readQuorum) {
|
||||
return nil, 0, errXLReadQuorum
|
||||
}
|
||||
highestVersion := int64(0)
|
||||
|
||||
// List all the file versions from partsMetadata list.
|
||||
versions := listObjectVersions(partsMetadata, errs)
|
||||
|
||||
// Get highest object version.
|
||||
highestVersion = highestInt(versions)
|
||||
highestVersion := highestInt(versions, int64(1))
|
||||
|
||||
// Pick online disks with version set to highestVersion.
|
||||
for index, version := range versions {
|
||||
|
@ -224,76 +224,28 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
|
||||
return xlMeta, nil
|
||||
}
|
||||
|
||||
// renameXLMetadata - renames `xl.json` from source prefix to destination prefix.
|
||||
func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error {
|
||||
// Undo rename xl metadata, renames successfully renamed `xl.json` back to source location.
|
||||
func (xl xlObjects) undoRenameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string, errs []error) {
|
||||
var wg = &sync.WaitGroup{}
|
||||
var mErrs = make([]error, len(xl.storageDisks))
|
||||
|
||||
srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
|
||||
dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
|
||||
// Rename `xl.json` to all disks in parallel.
|
||||
|
||||
// Undo rename `xl.json` on disks where RenameFile succeeded.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
mErrs[index] = errDiskNotFound
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
// Rename `xl.json` in a routine.
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
// Renames `xl.json` from source prefix to destination prefix.
|
||||
rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
|
||||
if rErr != nil {
|
||||
mErrs[index] = rErr
|
||||
if errs[index] != nil {
|
||||
return
|
||||
}
|
||||
// Delete any dangling directories.
|
||||
dErr := disk.DeleteFile(srcBucket, srcPrefix)
|
||||
if dErr != nil {
|
||||
mErrs[index] = dErr
|
||||
return
|
||||
}
|
||||
mErrs[index] = nil
|
||||
_ = disk.RenameFile(dstBucket, dstJSONFile, srcBucket, srcJSONFile)
|
||||
}(index, disk)
|
||||
}
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
// Gather err count.
|
||||
var errCount = 0
|
||||
for _, err := range mErrs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
errCount++
|
||||
}
|
||||
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
if errCount > len(xl.storageDisks)-xl.writeQuorum {
|
||||
// Check we have successful read quorum.
|
||||
if errCount <= len(xl.storageDisks)-xl.readQuorum {
|
||||
return nil // Return success.
|
||||
} // else - failed to acquire read quorum.
|
||||
|
||||
// Undo rename `xl.json` on disks where RenameFile succeeded.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
if mErrs[index] != nil {
|
||||
return
|
||||
}
|
||||
_ = disk.RenameFile(dstBucket, dstJSONFile, srcBucket, srcJSONFile)
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
return errXLWriteQuorum
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteXLMetadata - deletes `xl.json` on a single disk.
|
||||
@ -322,6 +274,27 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs.
|
||||
func (xl xlObjects) deleteAllXLMetadata(bucket, prefix string, errs []error) {
|
||||
var wg = &sync.WaitGroup{}
|
||||
// Delete all the `xl.json` left over.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
if errs[index] != nil {
|
||||
return
|
||||
}
|
||||
_ = deleteXLMetdata(disk, bucket, prefix)
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
|
||||
func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMetaV1) error {
|
||||
var wg = &sync.WaitGroup{}
|
||||
@ -352,38 +325,26 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
var errCount = 0
|
||||
// Return the first error.
|
||||
for _, err := range mErrs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
errCount++
|
||||
}
|
||||
// Count all the errors and validate if we have write quorum.
|
||||
if errCount > len(xl.storageDisks)-xl.writeQuorum {
|
||||
// Validate if we have read quorum, then return success.
|
||||
if errCount > len(xl.storageDisks)-xl.readQuorum {
|
||||
// Do we have write quorum?.
|
||||
if !isQuorum(mErrs, xl.writeQuorum) {
|
||||
// Validate if we have read quorum.
|
||||
if isQuorum(mErrs, xl.readQuorum) {
|
||||
// Return success.
|
||||
return nil
|
||||
}
|
||||
// Delete all the `xl.json` left over.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
if mErrs[index] != nil {
|
||||
return
|
||||
}
|
||||
_ = deleteXLMetdata(disk, bucket, prefix)
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
// Delete all `xl.json` successfully renamed.
|
||||
xl.deleteAllXLMetadata(bucket, prefix, mErrs)
|
||||
return errXLWriteQuorum
|
||||
}
|
||||
|
||||
// For all other errors return.
|
||||
for _, err := range mErrs {
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Success.
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -417,37 +378,25 @@ func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1)
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
var errCount = 0
|
||||
// Return the first error.
|
||||
for _, err := range mErrs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
errCount++
|
||||
}
|
||||
// Count all the errors and validate if we have write quorum.
|
||||
if errCount > len(xl.storageDisks)-xl.writeQuorum {
|
||||
// Validate if we have read quorum, then return success.
|
||||
if errCount > len(xl.storageDisks)-xl.readQuorum {
|
||||
// Do we have write Quorum?.
|
||||
if !isQuorum(mErrs, xl.writeQuorum) {
|
||||
// Do we have readQuorum?.
|
||||
if isQuorum(mErrs, xl.readQuorum) {
|
||||
// Return success.
|
||||
return nil
|
||||
}
|
||||
// Delete all the `xl.json` left over.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
if mErrs[index] != nil {
|
||||
return
|
||||
}
|
||||
_ = deleteXLMetdata(disk, bucket, prefix)
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
// Delete all `xl.json` successfully renamed.
|
||||
xl.deleteAllXLMetadata(bucket, prefix, mErrs)
|
||||
return errXLWriteQuorum
|
||||
}
|
||||
|
||||
// For any other errors delete `xl.json` as well.
|
||||
for _, err := range mErrs {
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Success.
|
||||
return nil
|
||||
}
|
||||
|
@ -444,3 +444,57 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
|
||||
}
|
||||
return fileInfo, nil
|
||||
}
|
||||
|
||||
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix.
|
||||
func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error {
|
||||
var wg = &sync.WaitGroup{}
|
||||
var mErrs = make([]error, len(xl.storageDisks))
|
||||
|
||||
srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
|
||||
dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
|
||||
|
||||
// Rename `xl.json` to all disks in parallel.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
mErrs[index] = errDiskNotFound
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
// Rename `xl.json` in a routine.
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
// Renames `xl.json` from source prefix to destination prefix.
|
||||
rErr := disk.RenameFile(minioMetaBucket, srcJSONFile, minioMetaBucket, dstJSONFile)
|
||||
if rErr != nil {
|
||||
mErrs[index] = rErr
|
||||
return
|
||||
}
|
||||
// Delete any dangling directories.
|
||||
dErr := disk.DeleteFile(minioMetaBucket, srcPrefix)
|
||||
if dErr != nil {
|
||||
mErrs[index] = dErr
|
||||
return
|
||||
}
|
||||
mErrs[index] = nil
|
||||
}(index, disk)
|
||||
}
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?.
|
||||
if !isQuorum(mErrs, xl.writeQuorum) {
|
||||
// Do we have read quorum?.
|
||||
if isQuorum(mErrs, xl.readQuorum) {
|
||||
// Return success on read quorum.
|
||||
return nil
|
||||
}
|
||||
return errXLWriteQuorum
|
||||
}
|
||||
// For all other errors return.
|
||||
for _, err := range mErrs {
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -323,6 +323,11 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Increment version only if we have online disks less than configured storage disks.
|
||||
if diskCount(onlineDisks) < len(xl.storageDisks) {
|
||||
higherVersion++
|
||||
}
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
xlMeta := pickValidXLMeta(partsMetadata)
|
||||
|
||||
@ -374,6 +379,7 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
|
||||
|
||||
// Once part is successfully committed, proceed with updating XL metadata.
|
||||
xlMeta.Stat.Version = higherVersion
|
||||
|
||||
// Add the current part.
|
||||
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
|
||||
|
||||
@ -391,7 +397,7 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
|
||||
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
|
||||
}
|
||||
rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
|
||||
rErr := xl.commitXLMetadata(tempUploadIDPath, uploadIDPath)
|
||||
if rErr != nil {
|
||||
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
@ -553,8 +559,9 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
|
||||
if err = reduceError(errs, xl.readQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||
// Do we have readQuorum?.
|
||||
if !isQuorum(errs, xl.readQuorum) {
|
||||
return "", toObjectErr(errXLReadQuorum, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
// Calculate full object size.
|
||||
@ -621,7 +628,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
|
||||
}
|
||||
rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
|
||||
rErr := xl.commitXLMetadata(tempUploadIDPath, uploadIDPath)
|
||||
if rErr != nil {
|
||||
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
|
||||
}
|
||||
@ -693,8 +700,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
return s3MD5, nil
|
||||
} // No more pending uploads for the object, proceed to delete
|
||||
// object completely from '.minio/multipart'.
|
||||
err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
||||
if err != nil {
|
||||
if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil {
|
||||
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
|
||||
}
|
||||
|
||||
|
100
xl-v1-object.go
100
xl-v1-object.go
@ -55,9 +55,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := xl.readAllXLMetadata(bucket, object)
|
||||
if err := reduceError(errs, xl.readQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
onlineDisks, highestVersion, err := xl.listOnlineDisks(metaArr, errs)
|
||||
@ -163,6 +160,27 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// undoRenameObject - renames back the partially successful rename operations.
|
||||
func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject string, errs []error) {
|
||||
var wg = &sync.WaitGroup{}
|
||||
// Undo rename object on disks where RenameFile succeeded.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
if errs[index] != nil {
|
||||
return
|
||||
}
|
||||
_ = disk.RenameFile(dstBucket, retainSlash(dstObject), srcBucket, retainSlash(srcObject))
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// renameObject - renames all source objects to destination object
|
||||
// across all disks in parallel. Additionally if we have errors and do
|
||||
// not have a readQuorum partially renamed files are renamed back to
|
||||
@ -196,40 +214,25 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri
|
||||
// Wait for all renames to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Gather err count.
|
||||
var errCount = 0
|
||||
for _, err := range errs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
errCount++
|
||||
}
|
||||
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
if errCount > len(xl.storageDisks)-xl.writeQuorum {
|
||||
if !isQuorum(errs, xl.writeQuorum) {
|
||||
// Check we have successful read quorum.
|
||||
if errCount <= len(xl.storageDisks)-xl.readQuorum {
|
||||
if isQuorum(errs, xl.readQuorum) {
|
||||
return nil // Return success.
|
||||
} // else - failed to acquire read quorum.
|
||||
|
||||
// Undo rename object on disks where RenameFile succeeded.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
// Undo rename object in parallel.
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
if errs[index] != nil {
|
||||
return
|
||||
}
|
||||
_ = disk.RenameFile(dstBucket, retainSlash(dstObject), srcBucket, retainSlash(srcObject))
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
// Undo all the partial rename operations.
|
||||
xl.undoRenameObject(srcBucket, srcObject, dstBucket, dstObject, errs)
|
||||
return errXLWriteQuorum
|
||||
}
|
||||
// Return on first error, also undo any partially successful rename operations.
|
||||
for _, err := range errs {
|
||||
if err != nil && err != errDiskNotFound {
|
||||
// Undo all the partial rename operations.
|
||||
xl.undoRenameObject(srcBucket, srcObject, dstBucket, dstObject, errs)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -394,7 +397,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
err := cleanupDir(disk, bucket, object)
|
||||
if err != nil {
|
||||
if err != nil && err != errFileNotFound {
|
||||
dErrs[index] = err
|
||||
}
|
||||
}(index, disk)
|
||||
@ -403,25 +406,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
|
||||
// Wait for all routines to finish.
|
||||
wg.Wait()
|
||||
|
||||
var fileNotFoundCnt, deleteFileErr int
|
||||
// Count for specific errors.
|
||||
for _, err := range dErrs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
// If file not found, count them.
|
||||
if err == errFileNotFound {
|
||||
fileNotFoundCnt++
|
||||
continue
|
||||
}
|
||||
|
||||
// Update error counter separately.
|
||||
deleteFileErr++
|
||||
}
|
||||
// Return err if all disks report file not found.
|
||||
if fileNotFoundCnt == len(xl.storageDisks) {
|
||||
return errFileNotFound
|
||||
} else if deleteFileErr > len(xl.storageDisks)-xl.writeQuorum {
|
||||
if !isQuorum(dErrs, xl.writeQuorum) {
|
||||
// Return errXLWriteQuorum if errors were more than
|
||||
// allowed write quorum.
|
||||
return errXLWriteQuorum
|
||||
@ -444,14 +429,17 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
|
||||
nsMutex.Lock(bucket, object)
|
||||
defer nsMutex.Unlock(bucket, object)
|
||||
|
||||
// Validate object exists.
|
||||
if !xl.isObject(bucket, object) {
|
||||
return ObjectNotFound{bucket, object}
|
||||
} // else proceed to delete the object.
|
||||
|
||||
// Delete the object on all disks.
|
||||
err = xl.deleteObject(bucket, object)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
if err = xl.deleteObject(bucket, object); err == errFileNotFound {
|
||||
// Its valid to return success if given object is not found.
|
||||
err = nil
|
||||
}
|
||||
|
||||
return err
|
||||
// Success.
|
||||
return nil
|
||||
}
|
||||
|
@ -24,6 +24,31 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Validates if we have quorum based on the errors with errDiskNotFound.
|
||||
func isQuorum(errs []error, minQuorumCount int) bool {
|
||||
var diskFoundCount int
|
||||
for _, err := range errs {
|
||||
if err == errDiskNotFound {
|
||||
continue
|
||||
}
|
||||
diskFoundCount++
|
||||
}
|
||||
return diskFoundCount >= minQuorumCount
|
||||
}
|
||||
|
||||
// Similar to 'len(slice)' but returns the actual elements count
|
||||
// skipping the unallocated elements.
|
||||
func diskCount(disks []StorageAPI) int {
|
||||
diskCount := 0
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
diskCount++
|
||||
}
|
||||
return diskCount
|
||||
}
|
||||
|
||||
// randInts - uses Knuth Fisher-Yates shuffle algorithm for generating uniform shuffling.
|
||||
func randInts(count int) []int {
|
||||
rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
|
||||
|
Loading…
x
Reference in New Issue
Block a user