Mirror of https://github.com/minio/minio.git, synced 2025-04-07 05:10:30 -04:00

Commit 4c9fae90ff (parent 94670a387e)
Optimize healObject by eliminating extra data passes (#4949)
@@ -17,13 +17,16 @@
 package cmd
 
 import (
+	"fmt"
 	"hash"
+	"strings"
 )
 
 // HealFile tries to reconstruct an erasure-coded file spread over all
 // available disks. HealFile will read the valid parts of the file,
 // reconstruct the missing data and write the reconstructed parts back
-// to `staleDisks`.
+// to `staleDisks` at the destination `dstVol/dstPath/`. Parts are
+// verified against the given BitrotAlgorithm and checksums.
 //
 // `staleDisks` is a slice of disks where each non-nil entry has stale
 // or no data, and so will be healed.
@@ -34,19 +37,17 @@ import (
 // In addition, `staleDisks` and `s.disks` must have the same ordering
 // of disks w.r.t. erasure coding of the object.
 //
-// The function will try to read the valid parts from the file under
-// the given volume and path and tries to reconstruct the file under
-// the given healVolume and healPath (on staleDisks). The given
-// algorithm will be used to verify the valid parts and to protect the
-// reconstructed file.
+// Errors when writing to `staleDisks` are not propagated as long as
+// writes succeed for at least one disk. This allows partial healing
+// despite stale disks being faulty.
 //
-// It returns bitrot checksums for the non-nil staleDisks.
-func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
-	blocksize int64, healVolume, healPath string, size int64,
-	algorithm BitrotAlgorithm, checksums [][]byte) (f ErasureFileInfo,
-	err error) {
+// It returns bitrot checksums for the non-nil staleDisks on which
+// healing succeeded.
+func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string, blocksize int64,
+	dstVol, dstPath string, size int64, alg BitrotAlgorithm, checksums [][]byte) (
+	f ErasureFileInfo, err error) {
 
-	if !algorithm.Available() {
+	if !alg.Available() {
 		return f, traceError(errBitrotHashAlgoInvalid)
 	}
 
@@ -57,15 +58,15 @@ func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
 	for i, disk := range s.disks {
 		switch {
 		case staleDisks[i] != nil:
-			hashers[i] = algorithm.New()
+			hashers[i] = alg.New()
 		case disk == nil:
 			// disregard unavailable disk
 			continue
 		default:
-			verifiers[i] = NewBitrotVerifier(algorithm, checksums[i])
-			f.Checksums[i] = checksums[i]
+			verifiers[i] = NewBitrotVerifier(alg, checksums[i])
 		}
 	}
+	writeErrors := make([]error, len(s.disks))
 
 	// Scan part files on disk, block-by-block reconstruct it and
 	// write to stale disks.
@@ -125,27 +126,48 @@
 
 		// write computed shards as chunks on file in each
 		// stale disk
+		writeSucceeded := false
 		for i, disk := range staleDisks {
-			if disk == nil {
+			// skip nil disk or disk that had error on
+			// previous write
+			if disk == nil || writeErrors[i] != nil {
 				continue
 			}
 
-			err = disk.AppendFile(healVolume, healPath, blocks[i])
-			if err != nil {
-				return f, traceError(err)
+			writeErrors[i] = disk.AppendFile(dstVol, dstPath, blocks[i])
+			if writeErrors[i] == nil {
+				hashers[i].Write(blocks[i])
+				writeSucceeded = true
 			}
-			hashers[i].Write(blocks[i])
+		}
+
+		// If all disks had write errors we quit.
+		if !writeSucceeded {
+			// build error from all write errors
+			return f, traceError(joinWriteErrors(writeErrors))
 		}
 	}
 
 	// copy computed file hashes into output variable
 	f.Size = size
-	f.Algorithm = algorithm
+	f.Algorithm = alg
 	for i, disk := range staleDisks {
-		if disk == nil {
+		if disk == nil || writeErrors[i] != nil {
 			continue
 		}
 		f.Checksums[i] = hashers[i].Sum(nil)
 	}
 	return f, nil
 }
+
+func joinWriteErrors(errs []error) error {
+	msgs := []string{}
+	for i, err := range errs {
+		if err == nil {
+			continue
+		}
+		msgs = append(msgs, fmt.Sprintf("disk %d: %v", i+1, err))
+	}
+	return fmt.Errorf("all stale disks had write errors during healing: %s",
+		strings.Join(msgs, ", "))
+}
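The sketch below isolates the error-bookkeeping pattern this hunk introduces: track one error slot per disk, keep going while at least one write succeeds, and fail only once every disk has failed. Everything here is illustrative and self-contained; only joinWriteErrors mirrors the helper added above.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// joinWriteErrors mirrors the helper from the commit: it collapses
// per-disk failures into a single descriptive error.
func joinWriteErrors(errs []error) error {
	msgs := []string{}
	for i, err := range errs {
		if err == nil {
			continue
		}
		msgs = append(msgs, fmt.Sprintf("disk %d: %v", i+1, err))
	}
	return fmt.Errorf("all stale disks had write errors during healing: %s",
		strings.Join(msgs, ", "))
}

func main() {
	// Simulated per-disk append results for one block: every disk failed.
	writeErrors := []error{
		errors.New("disk faulty"),
		errors.New("timeout"),
		errors.New("no space"),
	}

	writeSucceeded := false
	for _, err := range writeErrors {
		if err == nil {
			writeSucceeded = true
		}
	}
	if !writeSucceeded {
		fmt.Println(joinWriteErrors(writeErrors))
		return
	}
	fmt.Println("partial healing succeeded on at least one disk")
}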
@@ -44,7 +44,7 @@ var erasureHealFileTests = []struct {
 	{dataBlocks: 5, disks: 10, offDisks: 3, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 3
 	{dataBlocks: 6, disks: 12, offDisks: 2, badDisks: 3, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: SHA256, shouldFail: false}, // 4
 	{dataBlocks: 7, disks: 14, offDisks: 4, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 5
-	{dataBlocks: 8, disks: 16, offDisks: 6, badDisks: 1, badStaleDisks: 1, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 6
+	{dataBlocks: 8, disks: 16, offDisks: 6, badDisks: 1, badStaleDisks: 1, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 6
 	{dataBlocks: 7, disks: 14, offDisks: 2, badDisks: 3, badStaleDisks: 0, blocksize: int64(oneMiByte / 2), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 7
 	{dataBlocks: 6, disks: 12, offDisks: 1, badDisks: 0, badStaleDisks: 1, blocksize: int64(oneMiByte - 1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 8
 	{dataBlocks: 5, disks: 10, offDisks: 3, badDisks: 0, badStaleDisks: 3, blocksize: int64(oneMiByte / 2), size: oneMiByte, algorithm: SHA256, shouldFail: true}, // 9
@@ -54,7 +54,7 @@ var erasureHealFileTests = []struct {
 	{dataBlocks: 7, disks: 14, offDisks: 3, badDisks: 4, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 13
 	{dataBlocks: 7, disks: 14, offDisks: 6, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 14
 	{dataBlocks: 8, disks: 16, offDisks: 4, badDisks: 5, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 15
-	{dataBlocks: 2, disks: 4, offDisks: 0, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 16
+	{dataBlocks: 2, disks: 4, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 16
 	{dataBlocks: 2, disks: 4, offDisks: 0, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: 0, shouldFail: true}, // 17
 	{dataBlocks: 12, disks: 16, offDisks: 2, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 18
 	{dataBlocks: 6, disks: 8, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 19
@@ -130,7 +130,7 @@ func TestErasureHealFile(t *testing.T) {
 			// Verify that checksums of staleDisks
 			// match expected values
 			for i, disk := range staleDisks {
-				if disk == nil {
+				if disk == nil || info.Checksums[i] == nil {
 					continue
 				}
 				if !reflect.DeepEqual(info.Checksums[i], file.Checksums[i]) {
@@ -17,23 +17,10 @@
 package cmd
 
 import (
-	"crypto/subtle"
 	"path/filepath"
-	"sync"
 	"time"
-
-	"io"
 )
 
-// healBufferPool is a pool of reusable buffers used to verify a stream
-// while healing.
-var healBufferPool = sync.Pool{
-	New: func() interface{} {
-		b := make([]byte, readSizeV1)
-		return &b
-	},
-}
-
 // commonTime returns a maximally occurring time from a list of time.
 func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
 	var maxima int // Counter for remembering max occurrence of elements.
@@ -257,42 +244,49 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObject
 
 // disksWithAllParts - This function needs to be called with
 // []StorageAPI returned by listOnlineDisks. Returns,
+//
 // - disks which have all parts specified in the latest xl.json.
+//
 // - errs updated to have errFileNotFound in place of disks that had
-//   missing parts.
-// - non-nil error if any of the online disks failed during
-//   calculating blake2b checksum.
-func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket, object string) ([]StorageAPI, []error, error) {
-	availableDisks := make([]StorageAPI, len(onlineDisks))
-	buffer := healBufferPool.Get().(*[]byte)
-	defer healBufferPool.Put(buffer)
+//   missing or corrupted parts.
+//
+// - non-nil error if any of the disks failed unexpectedly (i.e. error
+//   other than file not found and not a checksum error).
+func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket,
+	object string) ([]StorageAPI, []error, error) {
 
-	for diskIndex, onlineDisk := range onlineDisks {
+	availableDisks := make([]StorageAPI, len(onlineDisks))
+	buffer := []byte{}
+
+	for i, onlineDisk := range onlineDisks {
 		if onlineDisk == OfflineDisk {
 			continue
 		}
 		// disk has a valid xl.json but may not have all the
 		// parts. This is considered an outdated disk, since
 		// it needs healing too.
-		for _, part := range partsMetadata[diskIndex].Parts {
+		for _, part := range partsMetadata[i].Parts {
 			partPath := filepath.Join(object, part.Name)
-			checkSumInfo := partsMetadata[diskIndex].Erasure.GetChecksumInfo(part.Name)
-			hash := checkSumInfo.Algorithm.New()
-			_, hErr := io.CopyBuffer(hash, StorageReader(onlineDisk, bucket, partPath, 0), *buffer)
-			if hErr == errFileNotFound {
-				errs[diskIndex] = errFileNotFound
-				availableDisks[diskIndex] = OfflineDisk
-				break
-			}
-			if hErr != nil && hErr != errFileNotFound {
+			checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(part.Name)
+			verifier := NewBitrotVerifier(checksumInfo.Algorithm, checksumInfo.Hash)
+
+			// verification happens even if a 0-length
+			// buffer is passed
+			_, hErr := onlineDisk.ReadFile(bucket, partPath, 0, buffer, verifier)
+			if hErr != nil {
+				_, isCorrupted := hErr.(hashMismatchError)
+				if isCorrupted || hErr == errFileNotFound {
+					errs[i] = errFileNotFound
+					availableDisks[i] = OfflineDisk
+					break
+				}
 				return nil, nil, traceError(hErr)
 			}
-			if subtle.ConstantTimeCompare(hash.Sum(nil), checkSumInfo.Hash) != 1 {
-				errs[diskIndex] = errFileNotFound
-				availableDisks[diskIndex] = OfflineDisk
-				break
-			}
-			availableDisks[diskIndex] = onlineDisk
 		}
+
+		if errs[i] == nil {
+			// All parts verified, mark it as all data available.
+			availableDisks[i] = onlineDisk
+		}
 	}
 
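This hunk is the heart of the optimization: instead of streaming every part through a separate blake2b pass (io.CopyBuffer into a hash) and comparing digests afterwards, the checksum is now verified by the same ReadFile call that checks the part exists, so the data is read once. A minimal standalone sketch of the verify-while-reading idea, assuming nothing from MinIO (verifyingReader is a made-up name, with SHA-256 standing in for the bitrot algorithm):

package main

import (
	"bytes"
	"crypto/sha256"
	"errors"
	"fmt"
	"hash"
	"io"
)

// verifyingReader hashes everything it reads and checks the digest
// against an expected checksum when the stream is exhausted, so a
// single pass both serves the data and detects bitrot.
type verifyingReader struct {
	r        io.Reader
	h        hash.Hash
	expected []byte
}

func (vr *verifyingReader) Read(p []byte) (int, error) {
	n, err := vr.r.Read(p)
	vr.h.Write(p[:n]) // hash.Hash.Write never returns an error
	if err == io.EOF && !bytes.Equal(vr.h.Sum(nil), vr.expected) {
		return n, errors.New("bitrot detected: checksum mismatch")
	}
	return n, err
}

func main() {
	data := []byte("erasure coded part data")
	sum := sha256.Sum256(data)

	vr := &verifyingReader{r: bytes.NewReader(data), h: sha256.New(), expected: sum[:]}
	if _, err := io.Copy(io.Discard, vr); err != nil {
		fmt.Println("verification failed:", err)
		return
	}
	fmt.Println("part verified in a single pass")
}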
@@ -333,25 +333,24 @@ func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error
 }
 
 // Heals an object only the corrupted/missing erasure blocks.
-func healObject(storageDisks []StorageAPI, bucket string, object string, quorum int) (int, int, error) {
+func healObject(storageDisks []StorageAPI, bucket, object string, quorum int) (int, int, error) {
 
 	partsMetadata, errs := readAllXLMetadata(storageDisks, bucket, object)
 	// readQuorum suffices for xl.json since we use monotonic
 	// system time to break the tie when a split-brain situation
 	// arises.
-	if reducedErr := reduceReadQuorumErrs(errs, nil, quorum); reducedErr != nil {
-		return 0, 0, toObjectErr(reducedErr, bucket, object)
-	}
-	if !xlShouldHeal(storageDisks, partsMetadata, errs, bucket, object) {
-		// There is nothing to heal.
-		return 0, 0, nil
+	if rErr := reduceReadQuorumErrs(errs, nil, quorum); rErr != nil {
+		return 0, 0, toObjectErr(rErr, bucket, object)
 	}
 
 	// List of disks having latest version of the object.
 	latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
 
-	// List of disks having all parts as per latest xl.json.
-	availableDisks, errs, aErr := disksWithAllParts(latestDisks, partsMetadata, errs, bucket, object)
+	// List of disks having all parts as per latest xl.json - this
+	// does a full pass over the data and verifies all part files
+	// on disk
+	availableDisks, errs, aErr := disksWithAllParts(latestDisks, partsMetadata, errs, bucket,
+		object)
 	if aErr != nil {
 		return 0, 0, toObjectErr(aErr, bucket, object)
 	}
@@ -359,8 +358,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	// Number of disks which don't serve data.
 	numOfflineDisks := 0
 	for index, disk := range storageDisks {
-		switch {
-		case disk == nil, errs[index] == errDiskNotFound:
+		if disk == nil || errs[index] == errDiskNotFound {
 			numOfflineDisks++
 		}
 	}
@@ -368,12 +366,16 @@
 	// Number of disks which have all parts of the given object.
 	numAvailableDisks := 0
 	for _, disk := range availableDisks {
-		switch {
-		case disk != nil:
+		if disk != nil {
 			numAvailableDisks++
 		}
 	}
 
+	if numAvailableDisks == len(storageDisks) {
+		// nothing to heal in this case
+		return 0, 0, nil
+	}
+
 	// If less than read quorum number of disks have all the parts
 	// of the data, we can't reconstruct the erasure-coded data.
 	if numAvailableDisks < quorum {
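The quorum check rests on the standard Reed-Solomon property that any dataBlocks of the dataBlocks+parityBlocks shards suffice to rebuild the rest. A quick demonstration with github.com/klauspost/reedsolomon, the library MinIO's erasure layer builds on; the 6+6 shard layout here is arbitrary:

package main

import (
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// 6 data + 6 parity shards, mirroring a hypothetical 12-disk setup.
	enc, err := reedsolomon.New(6, 6)
	if err != nil {
		panic(err)
	}

	data := make([]byte, 6*1024)
	shards, err := enc.Split(data)
	if err != nil {
		panic(err)
	}
	if err = enc.Encode(shards); err != nil {
		panic(err)
	}

	// Lose up to parityBlocks shards: reconstruction still works
	// because 6 (= dataBlocks, the read quorum) shards survive.
	for i := 0; i < 6; i++ {
		shards[i] = nil
	}
	if err = enc.Reconstruct(shards); err != nil {
		panic(err)
	}
	fmt.Println("reconstructed from a quorum of shards")
}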
@@ -381,8 +383,8 @@
 	}
 
 	// List of disks having outdated version of the object or missing object.
-	outDatedDisks := outDatedDisks(storageDisks, availableDisks, errs, partsMetadata,
-		bucket, object)
+	outDatedDisks := outDatedDisks(storageDisks, availableDisks, errs, partsMetadata, bucket,
+		object)
 
 	// Number of disks that had outdated content of the given
 	// object and are online to be healed.
@@ -401,9 +403,10 @@
 	}
 
 	for index, disk := range outDatedDisks {
-		// Before healing outdated disks, we need to remove xl.json
-		// and part files from "bucket/object/" so that
-		// rename(minioMetaBucket, "tmp/tmpuuid/", "bucket", "object/") succeeds.
+		// Before healing outdated disks, we need to remove
+		// xl.json and part files from "bucket/object/" so
+		// that rename(minioMetaBucket, "tmp/tmpuuid/",
+		// "bucket", "object/") succeeds.
 		if disk == nil {
 			// Not an outdated disk.
 			continue
@@ -417,27 +420,15 @@
 			continue
 		}
 
-		// Outdated object with the same name exists that needs to be deleted.
-		outDatedMeta := partsMetadata[index]
-		// Consult valid metadata picked when there is no
-		// metadata available on this disk.
-		if isErr(errs[index], errFileNotFound) {
-			outDatedMeta = latestMeta
-		}
-
-		// Delete all the parts. Ignore if parts are not found.
-		for _, part := range outDatedMeta.Parts {
-			dErr := disk.DeleteFile(bucket, pathJoin(object, part.Name))
-			if dErr != nil && !isErr(dErr, errFileNotFound) {
-				return 0, 0, toObjectErr(traceError(dErr), bucket, object)
-			}
-		}
-
-		// Delete xl.json file. Ignore if xl.json not found.
-		dErr := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
-		if dErr != nil && !isErr(dErr, errFileNotFound) {
-			return 0, 0, toObjectErr(traceError(dErr), bucket, object)
+		// List and delete the object directory, ignoring
+		// errors.
+		files, err := disk.ListDir(bucket, object)
+		if err == nil {
+			for _, entry := range files {
+				_ = disk.DeleteFile(bucket,
+					pathJoin(object, entry))
+			}
 		}
 	}
 
 	// Reorder so that we have data disks first and parity disks next.
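The delete logic is simpler now: list whatever is under the object directory and remove it, ignoring per-file errors, rather than deleting only the parts a possibly stale xl.json names. The same idea against a plain filesystem, purely for illustration (purgeDir is a made-up helper):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// purgeDir removes every entry directly under dir, ignoring individual
// delete errors the way the healing loop ignores DeleteFile errors.
func purgeDir(dir string) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return // nothing to clean up
	}
	for _, entry := range entries {
		_ = os.Remove(filepath.Join(dir, entry.Name()))
	}
}

func main() {
	dir, err := os.MkdirTemp("", "heal-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Stale object data, including a file no metadata mentions.
	_ = os.WriteFile(filepath.Join(dir, "part.1"), []byte("stale"), 0o644)
	_ = os.WriteFile(filepath.Join(dir, "xl.json"), []byte("{}"), 0o644)

	purgeDir(dir)
	left, _ := os.ReadDir(dir)
	fmt.Println("entries left:", len(left))
}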
@@ -445,16 +436,19 @@
 	outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
 	partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
 
-	// We write at temporary location and then rename to fianal location.
+	// We write at temporary location and then rename to final location.
 	tmpID := mustGetUUID()
 
-	// Checksum of the part files. checkSumInfos[index] will contain checksums
-	// of all the part files in the outDatedDisks[index]
+	// Checksum of the part files. checkSumInfos[index] will
+	// contain checksums of all the part files in the
+	// outDatedDisks[index]
 	checksumInfos := make([][]ChecksumInfo, len(outDatedDisks))
 
-	// Heal each part. erasureHealFile() will write the healed part to
-	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
-	storage, err := NewErasureStorage(latestDisks, latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks)
+	// Heal each part. erasureHealFile() will write the healed
+	// part to .minio/tmp/uuid/ which needs to be renamed later to
+	// the final location.
+	storage, err := NewErasureStorage(latestDisks,
+		latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks)
 	if err != nil {
 		return 0, 0, toObjectErr(err, bucket, object)
 	}
@@ -472,14 +466,33 @@
 			}
 		}
 		// Heal the part file.
-		file, hErr := storage.HealFile(outDatedDisks, bucket, pathJoin(object, partName), erasure.BlockSize, minioMetaTmpBucket, pathJoin(tmpID, partName), partSize, algorithm, checksums)
+		file, hErr := storage.HealFile(outDatedDisks, bucket, pathJoin(object, partName),
+			erasure.BlockSize, minioMetaTmpBucket, pathJoin(tmpID, partName), partSize,
+			algorithm, checksums)
 		if hErr != nil {
 			return 0, 0, toObjectErr(hErr, bucket, object)
 		}
-		for i := range outDatedDisks {
-			if outDatedDisks[i] != OfflineDisk {
-				checksumInfos[i] = append(checksumInfos[i], ChecksumInfo{partName, file.Algorithm, file.Checksums[i]})
+		// outDatedDisks that had write errors should not be
+		// written to for remaining parts, so we nil it out.
+		for i, disk := range outDatedDisks {
+			if disk == nil {
+				continue
 			}
+			// A non-nil stale disk which did not receive
+			// a healed part checksum had a write error.
+			if file.Checksums[i] == nil {
+				outDatedDisks[i] = nil
+				numHealedDisks--
+				continue
+			}
+			// append part checksums
+			checksumInfos[i] = append(checksumInfos[i],
+				ChecksumInfo{partName, file.Algorithm, file.Checksums[i]})
+		}
+
+		// If all disks are having errors, we give up.
+		if numHealedDisks == 0 {
+			return 0, 0, fmt.Errorf("all disks without up-to-date data had write errors")
 		}
 	}
 
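Once a stale disk fails a write for one part, HealFile returns no checksum for it and the loop above drops the disk (nils it out) so later parts skip it; healing aborts only when no candidate disk remains. A toy version of that bookkeeping across parts, with writePart standing in for the real heal-and-append:

package main

import (
	"errors"
	"fmt"
)

// writePart is a stand-in for healing one part onto one disk.
func writePart(disk, part int) error {
	if disk == 2 && part == 1 {
		return errors.New("disk 2 failed") // simulated write error
	}
	return nil
}

func main() {
	disks := []bool{true, true, true} // true = still healable
	healable := len(disks)

	for part := 0; part < 3; part++ {
		for i := range disks {
			if !disks[i] {
				continue // dropped on an earlier part
			}
			if err := writePart(i, part); err != nil {
				disks[i] = false // never write later parts here
				healable--
			}
		}
		if healable == 0 {
			fmt.Println("all disks failed, giving up")
			return
		}
	}
	fmt.Println("healed disks:", healable)
}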
@@ -493,7 +506,8 @@
 	}
 
 	// Generate and write `xl.json` generated from other disks.
-	outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks))
+	outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID,
+		partsMetadata, diskCount(outDatedDisks))
 	if aErr != nil {
 		return 0, 0, toObjectErr(aErr, bucket, object)
 	}
@@ -503,13 +517,10 @@
 		if disk == nil {
 			continue
 		}
-		// Remove any lingering partial data from current namespace.
-		aErr = disk.DeleteFile(bucket, retainSlash(object))
-		if aErr != nil && aErr != errFileNotFound {
-			return 0, 0, toObjectErr(traceError(aErr), bucket, object)
-		}
+
 		// Attempt a rename now from healed data to final location.
-		aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket, retainSlash(object))
+		aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket,
+			retainSlash(object))
 		if aErr != nil {
 			return 0, 0, toObjectErr(traceError(aErr), bucket, object)
 		}