mirror of
https://github.com/minio/minio.git
synced 2025-11-09 13:39:46 -05:00
This change provides new implementations of the XL backend operations: - create file - read file - heal file Further this change adds table based tests for all three operations. This affects also the bitrot algorithm integration. Algorithms are now integrated in an idiomatic way (like crypto.Hash). Fixes #4696 Fixes #4649 Fixes #4359
This commit is contained in:
committed by
Dee Koder
parent
617f2394fb
commit
85fcee1919
@@ -18,321 +18,150 @@ package cmd
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/klauspost/reedsolomon"
|
||||
"github.com/minio/minio/pkg/bpool"
|
||||
)
|
||||
|
||||
// isSuccessDecodeBlocks - do we have all the blocks to be
|
||||
// successfully decoded?. Input encoded blocks ordered matrix.
|
||||
func isSuccessDecodeBlocks(enBlocks [][]byte, dataBlocks int) bool {
|
||||
// Count number of data and parity blocks that were read.
|
||||
var successDataBlocksCount = 0
|
||||
var successParityBlocksCount = 0
|
||||
for index := range enBlocks {
|
||||
if enBlocks[index] == nil {
|
||||
continue
|
||||
}
|
||||
// block index lesser than data blocks, update data block count.
|
||||
if index < dataBlocks {
|
||||
successDataBlocksCount++
|
||||
continue
|
||||
} // else { // update parity block count.
|
||||
successParityBlocksCount++
|
||||
}
|
||||
// Returns true if we have atleast dataBlocks parity.
|
||||
return successDataBlocksCount == dataBlocks || successDataBlocksCount+successParityBlocksCount >= dataBlocks
|
||||
}
|
||||
|
||||
// isSuccessDataBlocks - do we have all the data blocks?
|
||||
// Input encoded blocks ordered matrix.
|
||||
func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
|
||||
// Count number of data blocks that were read.
|
||||
var successDataBlocksCount = 0
|
||||
for index := range enBlocks[:dataBlocks] {
|
||||
if enBlocks[index] == nil {
|
||||
continue
|
||||
}
|
||||
// block index lesser than data blocks, update data block count.
|
||||
if index < dataBlocks {
|
||||
successDataBlocksCount++
|
||||
}
|
||||
}
|
||||
// Returns true if we have atleast the dataBlocks.
|
||||
return successDataBlocksCount >= dataBlocks
|
||||
}
|
||||
|
||||
// Return readable disks slice from which we can read parallelly.
|
||||
func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDisks []StorageAPI, nextIndex int, err error) {
|
||||
readDisks = make([]StorageAPI, len(orderedDisks))
|
||||
dataDisks := 0
|
||||
parityDisks := 0
|
||||
// Count already read data and parity chunks.
|
||||
for i := 0; i < index; i++ {
|
||||
if orderedDisks[i] == nil {
|
||||
continue
|
||||
}
|
||||
if i < dataBlocks {
|
||||
dataDisks++
|
||||
} else {
|
||||
parityDisks++
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity checks - we should never have this situation.
|
||||
if dataDisks == dataBlocks {
|
||||
return nil, 0, traceError(errUnexpected)
|
||||
}
|
||||
if dataDisks+parityDisks >= dataBlocks {
|
||||
return nil, 0, traceError(errUnexpected)
|
||||
}
|
||||
|
||||
// Find the disks from which next set of parallel reads should happen.
|
||||
for i := index; i < len(orderedDisks); i++ {
|
||||
if orderedDisks[i] == nil {
|
||||
continue
|
||||
}
|
||||
if i < dataBlocks {
|
||||
dataDisks++
|
||||
} else {
|
||||
parityDisks++
|
||||
}
|
||||
readDisks[i] = orderedDisks[i]
|
||||
if dataDisks == dataBlocks {
|
||||
return readDisks, i + 1, nil
|
||||
} else if dataDisks+parityDisks == dataBlocks {
|
||||
return readDisks, i + 1, nil
|
||||
}
|
||||
}
|
||||
return nil, 0, traceError(errXLReadQuorum)
|
||||
}
|
||||
|
||||
// parallelRead - reads chunks in parallel from the disks specified in []readDisks.
|
||||
func parallelRead(volume, path string, readDisks, orderedDisks []StorageAPI, enBlocks [][]byte,
|
||||
blockOffset, curChunkSize int64, brVerifiers []bitRotVerifier, pool *bpool.BytePool) {
|
||||
|
||||
// WaitGroup to synchronise the read go-routines.
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
// Read disks in parallel.
|
||||
for index := range readDisks {
|
||||
if readDisks[index] == nil {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
// Reads chunk from readDisk[index] in routine.
|
||||
go func(index int) {
|
||||
defer wg.Done()
|
||||
|
||||
// evaluate if we need to perform bit-rot checking
|
||||
needBitRotVerification := true
|
||||
if brVerifiers[index].isVerified {
|
||||
needBitRotVerification = false
|
||||
// if file has bit-rot, do not reuse disk
|
||||
if brVerifiers[index].hasBitRot {
|
||||
orderedDisks[index] = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
buf, err := pool.Get()
|
||||
if err != nil {
|
||||
errorIf(err, "unable to get buffer from byte pool")
|
||||
orderedDisks[index] = nil
|
||||
return
|
||||
}
|
||||
buf = buf[:curChunkSize]
|
||||
|
||||
if needBitRotVerification {
|
||||
_, err = readDisks[index].ReadFileWithVerify(
|
||||
volume, path, blockOffset, buf,
|
||||
brVerifiers[index].algo,
|
||||
brVerifiers[index].checkSum)
|
||||
} else {
|
||||
_, err = readDisks[index].ReadFile(volume, path,
|
||||
blockOffset, buf)
|
||||
}
|
||||
|
||||
// if bit-rot verification was done, store the
|
||||
// result of verification so we can skip
|
||||
// re-doing it next time
|
||||
if needBitRotVerification {
|
||||
brVerifiers[index].isVerified = true
|
||||
_, ok := err.(hashMismatchError)
|
||||
brVerifiers[index].hasBitRot = ok
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
orderedDisks[index] = nil
|
||||
return
|
||||
}
|
||||
enBlocks[index] = buf
|
||||
}(index)
|
||||
}
|
||||
|
||||
// Waiting for first routines to finish.
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// erasureReadFile - read bytes from erasure coded files and writes to
|
||||
// given writer. Erasure coded files are read block by block as per
|
||||
// given erasureInfo and data chunks are decoded into a data
|
||||
// block. Data block is trimmed for given offset and length, then
|
||||
// written to given writer. This function also supports bit-rot
|
||||
// detection by verifying checksum of individual block's checksum.
|
||||
func erasureReadFile(writer io.Writer, disks []StorageAPI, volume, path string,
|
||||
offset, length, totalLength, blockSize int64, dataBlocks, parityBlocks int,
|
||||
checkSums []string, algo HashAlgo, pool *bpool.BytePool) (int64, error) {
|
||||
|
||||
// Offset and length cannot be negative.
|
||||
// ReadFile reads as much data as requested from the file under the given volume and path and writes the data to the provided writer.
|
||||
// The algorithm and the keys/checksums are used to verify the integrity of the given file. ReadFile will read data from the given offset
|
||||
// up to the given length. If parts of the file are corrupted ReadFile tries to reconstruct the data.
|
||||
func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset, length int64, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm, blocksize int64, pool *bpool.BytePool) (f ErasureFileInfo, err error) {
|
||||
if offset < 0 || length < 0 {
|
||||
return 0, traceError(errUnexpected)
|
||||
return f, traceError(errUnexpected)
|
||||
}
|
||||
|
||||
// Can't request more data than what is available.
|
||||
if offset+length > totalLength {
|
||||
return 0, traceError(errUnexpected)
|
||||
return f, traceError(errUnexpected)
|
||||
}
|
||||
if !algorithm.Available() {
|
||||
return f, traceError(errBitrotHashAlgoInvalid)
|
||||
}
|
||||
|
||||
// chunkSize is the amount of data that needs to be read from
|
||||
// each disk at a time.
|
||||
chunkSize := getChunkSize(blockSize, dataBlocks)
|
||||
|
||||
brVerifiers := make([]bitRotVerifier, len(disks))
|
||||
for i := range brVerifiers {
|
||||
brVerifiers[i].algo = algo
|
||||
brVerifiers[i].checkSum = checkSums[i]
|
||||
f.Checksums = make([][]byte, len(s.disks))
|
||||
verifiers := make([]*BitrotVerifier, len(s.disks))
|
||||
for i, disk := range s.disks {
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
verifiers[i] = NewBitrotVerifier(algorithm, checksums[i])
|
||||
}
|
||||
errChans := make([]chan error, len(s.disks))
|
||||
for i := range errChans {
|
||||
errChans[i] = make(chan error, 1)
|
||||
}
|
||||
lastBlock := totalLength / blocksize
|
||||
startOffset := offset % blocksize
|
||||
chunksize := getChunkSize(blocksize, s.dataBlocks)
|
||||
|
||||
// Total bytes written to writer
|
||||
var bytesWritten int64
|
||||
|
||||
startBlock := offset / blockSize
|
||||
endBlock := (offset + length) / blockSize
|
||||
|
||||
// curChunkSize = chunk size for the current block in the for loop below.
|
||||
// curBlockSize = block size for the current block in the for loop below.
|
||||
// curChunkSize and curBlockSize can change for the last block if totalLength%blockSize != 0
|
||||
curChunkSize := chunkSize
|
||||
curBlockSize := blockSize
|
||||
|
||||
// For each block, read chunk from each disk. If we are able to read all the data disks then we don't
|
||||
// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
|
||||
// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
|
||||
for block := startBlock; block <= endBlock; block++ {
|
||||
// Mark all buffers as unused at the start of the loop so that the buffers
|
||||
// can be reused.
|
||||
blocks := make([][]byte, len(s.disks))
|
||||
for off := offset / blocksize; length > 0; off++ {
|
||||
blockOffset := off * chunksize
|
||||
pool.Reset()
|
||||
|
||||
// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
|
||||
enBlocks := make([][]byte, len(disks))
|
||||
|
||||
if ((offset + bytesWritten) / blockSize) == (totalLength / blockSize) {
|
||||
// This is the last block for which curBlockSize and curChunkSize can change.
|
||||
// For ex. if totalLength is 15M and blockSize is 10MB, curBlockSize for
|
||||
// the last block should be 5MB.
|
||||
curBlockSize = totalLength % blockSize
|
||||
curChunkSize = getChunkSize(curBlockSize, dataBlocks)
|
||||
if currentBlock := (offset + f.Size) / blocksize; currentBlock == lastBlock {
|
||||
blocksize = totalLength % blocksize
|
||||
chunksize = getChunkSize(blocksize, s.dataBlocks)
|
||||
}
|
||||
|
||||
// NOTE: That for the offset calculation we have to use chunkSize and
|
||||
// not curChunkSize. If we use curChunkSize for offset calculation
|
||||
// then it can result in wrong offset for the last block.
|
||||
blockOffset := block * chunkSize
|
||||
|
||||
// nextIndex - index from which next set of parallel reads
|
||||
// should happen.
|
||||
nextIndex := 0
|
||||
|
||||
for {
|
||||
// readDisks - disks from which we need to read in parallel.
|
||||
var readDisks []StorageAPI
|
||||
var err error
|
||||
// get readable disks slice from which we can read parallelly.
|
||||
readDisks, nextIndex, err = getReadDisks(disks, nextIndex, dataBlocks)
|
||||
if err != nil {
|
||||
return bytesWritten, err
|
||||
}
|
||||
// Issue a parallel read across the disks specified in readDisks.
|
||||
parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, brVerifiers, pool)
|
||||
if isSuccessDecodeBlocks(enBlocks, dataBlocks) {
|
||||
// If enough blocks are available to do rs.Reconstruct()
|
||||
break
|
||||
}
|
||||
if nextIndex == len(disks) {
|
||||
// No more disks to read from.
|
||||
return bytesWritten, traceError(errXLReadQuorum)
|
||||
}
|
||||
// We do not have enough enough data blocks to reconstruct the data
|
||||
// hence continue the for-loop till we have enough data blocks.
|
||||
}
|
||||
|
||||
// If we have all the data blocks no need to decode, continue to write.
|
||||
if !isSuccessDataBlocks(enBlocks, dataBlocks) {
|
||||
// Reconstruct the missing data blocks.
|
||||
if err := decodeMissingData(enBlocks, dataBlocks, parityBlocks); err != nil {
|
||||
return bytesWritten, err
|
||||
}
|
||||
}
|
||||
|
||||
// Offset in enBlocks from where data should be read from.
|
||||
var enBlocksOffset int64
|
||||
|
||||
// Total data to be read from enBlocks.
|
||||
enBlocksLength := curBlockSize
|
||||
|
||||
// If this is the start block then enBlocksOffset might not be 0.
|
||||
if block == startBlock {
|
||||
enBlocksOffset = offset % blockSize
|
||||
enBlocksLength -= enBlocksOffset
|
||||
}
|
||||
|
||||
remaining := length - bytesWritten
|
||||
if remaining < enBlocksLength {
|
||||
// We should not send more data than what was requested.
|
||||
enBlocksLength = remaining
|
||||
}
|
||||
|
||||
// Write data blocks.
|
||||
n, err := writeDataBlocks(writer, enBlocks, dataBlocks, enBlocksOffset, enBlocksLength)
|
||||
err = s.readConcurrent(volume, path, blockOffset, chunksize, blocks, verifiers, errChans, pool)
|
||||
if err != nil {
|
||||
return bytesWritten, err
|
||||
return f, traceError(errXLReadQuorum)
|
||||
}
|
||||
|
||||
// Update total bytes written.
|
||||
bytesWritten += n
|
||||
writeLength := blocksize - startOffset
|
||||
if length < writeLength {
|
||||
writeLength = length
|
||||
}
|
||||
n, err := writeDataBlocks(writer, blocks, s.dataBlocks, startOffset, writeLength)
|
||||
if err != nil {
|
||||
return f, err
|
||||
}
|
||||
startOffset = 0
|
||||
f.Size += int64(n)
|
||||
length -= int64(n)
|
||||
}
|
||||
|
||||
if bytesWritten == length {
|
||||
// Done writing all the requested data.
|
||||
break
|
||||
f.Algorithm = algorithm
|
||||
for i, disk := range s.disks {
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
f.Checksums[i] = verifiers[i].Sum(nil)
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func erasureCountMissingBlocks(blocks [][]byte, limit int) int {
|
||||
missing := 0
|
||||
for i := range blocks[:limit] {
|
||||
if blocks[i] == nil {
|
||||
missing++
|
||||
}
|
||||
}
|
||||
|
||||
// Success.
|
||||
return bytesWritten, nil
|
||||
return missing
|
||||
}
|
||||
|
||||
// decodeMissingData - decode any missing data blocks.
|
||||
func decodeMissingData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
|
||||
// Initialize reedsolomon.
|
||||
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
|
||||
if err != nil {
|
||||
return traceError(err)
|
||||
// readConcurrent reads all requested data concurrently from the disks into blocks. It returns an error if
|
||||
// too many disks failed while reading.
|
||||
func (s *ErasureStorage) readConcurrent(volume, path string, offset int64, length int64, blocks [][]byte, verifiers []*BitrotVerifier, errChans []chan error, pool *bpool.BytePool) (err error) {
|
||||
errs := make([]error, len(s.disks))
|
||||
for i := range blocks {
|
||||
blocks[i], err = pool.Get()
|
||||
if err != nil {
|
||||
return traceErrorf("failed to get new buffer from pool: %v", err)
|
||||
}
|
||||
blocks[i] = blocks[i][:length]
|
||||
}
|
||||
|
||||
// Reconstruct any missing data blocks.
|
||||
return rs.ReconstructData(enBlocks)
|
||||
}
|
||||
|
||||
// decodeDataAndParity - decode all encoded data and parity blocks.
|
||||
func decodeDataAndParity(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
|
||||
// Initialize reedsolomon.
|
||||
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
|
||||
if err != nil {
|
||||
return traceError(err)
|
||||
erasureReadBlocksConcurrent(s.disks[:s.dataBlocks], volume, path, offset, blocks[:s.dataBlocks], verifiers[:s.dataBlocks], errs[:s.dataBlocks], errChans[:s.dataBlocks])
|
||||
missingDataBlocks := erasureCountMissingBlocks(blocks, s.dataBlocks)
|
||||
mustReconstruct := missingDataBlocks > 0
|
||||
if mustReconstruct {
|
||||
requiredReads := s.dataBlocks + missingDataBlocks
|
||||
if requiredReads > s.dataBlocks+s.parityBlocks {
|
||||
return errXLReadQuorum
|
||||
}
|
||||
erasureReadBlocksConcurrent(s.disks[s.dataBlocks:requiredReads], volume, path, offset, blocks[s.dataBlocks:requiredReads], verifiers[s.dataBlocks:requiredReads], errs[s.dataBlocks:requiredReads], errChans[s.dataBlocks:requiredReads])
|
||||
if erasureCountMissingBlocks(blocks, requiredReads) > 0 {
|
||||
erasureReadBlocksConcurrent(s.disks[requiredReads:], volume, path, offset, blocks[requiredReads:], verifiers[requiredReads:], errs[requiredReads:], errChans[requiredReads:])
|
||||
}
|
||||
}
|
||||
|
||||
// Reconstruct encoded blocks.
|
||||
return rs.Reconstruct(enBlocks)
|
||||
if err = reduceReadQuorumErrs(errs, []error{}, s.dataBlocks); err != nil {
|
||||
return err
|
||||
}
|
||||
if mustReconstruct {
|
||||
if err = s.ErasureDecodeDataBlocks(blocks); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// erasureReadBlocksConcurrent reads all data from each disk to each data block in parallel.
|
||||
// Therefore disks, blocks, verifiers errors and locks must have the same length.
|
||||
func erasureReadBlocksConcurrent(disks []StorageAPI, volume, path string, offset int64, blocks [][]byte, verifiers []*BitrotVerifier, errors []error, errChans []chan error) {
|
||||
for i := range errChans {
|
||||
go erasureReadFromFile(disks[i], volume, path, offset, blocks[i], verifiers[i], errChans[i])
|
||||
}
|
||||
for i := range errChans {
|
||||
errors[i] = <-errChans[i] // blocks until the go routine 'i' is done - no data race
|
||||
if errors[i] != nil {
|
||||
disks[i] = OfflineDisk
|
||||
blocks[i] = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// erasureReadFromFile reads data from the disk to buffer in parallel.
|
||||
// It sends the returned error through the error channel.
|
||||
func erasureReadFromFile(disk StorageAPI, volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier, errChan chan<- error) {
|
||||
if disk == OfflineDisk {
|
||||
errChan <- traceError(errDiskNotFound)
|
||||
return
|
||||
}
|
||||
var err error
|
||||
if !verifier.IsVerified() {
|
||||
_, err = disk.ReadFileWithVerify(volume, path, offset, buffer, verifier)
|
||||
} else {
|
||||
_, err = disk.ReadFile(volume, path, offset, buffer)
|
||||
}
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user