Mirror of https://github.com/minio/minio.git
Synced 2025-01-11 15:03:22 -05:00
XL/erasure: refactor erasureReadFile. Move parallelRead into a separate function. (#2008)
parent 2e1f66c37d
commit 5291db60c6
@@ -125,6 +125,50 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis
 	return nil, 0, errXLReadQuorum
 }
 
+// parallelRead - reads chunks in parallel from the disks specified in []readDisks.
+func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []StorageAPI, enBlocks [][]byte, blockOffset int64, curChunkSize int64, bitRotVerify func(diskIndex int) bool) {
+	// WaitGroup to synchronise the read go-routines.
+	wg := &sync.WaitGroup{}
+
+	// Read disks in parallel.
+	for index := range readDisks {
+		if readDisks[index] == nil {
+			continue
+		}
+		wg.Add(1)
+		// Read the chunk from readDisks[index] in a goroutine.
+		go func(index int) {
+			defer wg.Done()
+
+			// Verify bit rot for the file on this disk.
+			if !bitRotVerify(index) {
+				// So that we don't read from this disk for the next block.
+				orderedDisks[index] = nil
+				return
+			}
+
+			// Chunk writer.
+			chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
+
+			// CopyN - copies until current chunk size.
+			err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize)
+			if err != nil {
+				// So that we don't read from this disk for the next block.
+				orderedDisks[index] = nil
+				return
+			}
+
+			// Copy the read blocks.
+			enBlocks[index] = chunkWriter.Bytes()
+
+			// Successfully read.
+		}(index)
+	}
+
+	// Wait for all the reads to finish.
+	wg.Wait()
+}
+
 // erasureReadFile - read bytes from erasure coded files and writes to given writer.
 // Erasure coded files are read block by block as per given erasureInfo and data chunks
 // are decoded into a data block. Data block is trimmed for given offset and length,
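For readers skimming the diff, the following is a minimal, self-contained sketch of the fan-out/WaitGroup pattern the new parallelRead helper above uses. fetchChunk and the in-memory "disks" are hypothetical stand-ins for StorageAPI and copyN, not minio types; a failed or skipped read simply leaves its slot nil, the way parallelRead nils out orderedDisks[index].

// A minimal sketch of the fan-out/WaitGroup read pattern, under the
// assumptions stated above.
package main

import (
	"fmt"
	"sync"
)

// fetchChunk simulates reading `size` bytes at `offset` from one disk.
// It is a hypothetical stand-in for copyN over a StorageAPI.
func fetchChunk(disk []byte, offset, size int64) ([]byte, error) {
	if int64(len(disk)) < offset+size {
		return nil, fmt.Errorf("short read")
	}
	return disk[offset : offset+size], nil
}

func main() {
	// Disk 1 is offline (nil), mirroring a hole in readDisks.
	disks := [][]byte{[]byte("chunk-0!"), nil, []byte("chunk-2!")}
	enBlocks := make([][]byte, len(disks))

	var wg sync.WaitGroup
	for index := range disks {
		if disks[index] == nil {
			continue // skip disks we are not reading from
		}
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			buf, err := fetchChunk(disks[index], 0, 8)
			if err != nil {
				// Leave enBlocks[index] nil so the caller can fall back to
				// another disk, as parallelRead does by nilling
				// orderedDisks[index].
				return
			}
			enBlocks[index] = buf
		}(index)
	}
	// All reads for this block have finished (or failed) once Wait returns.
	wg.Wait()

	for i, b := range enBlocks {
		fmt.Printf("disk %d: %q\n", i, b)
	}
}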
@@ -205,86 +249,29 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 		// should happen.
 		nextIndex := 0
 
-		// parallelRead() reads DataBlocks number of disks if all data
-		// disks are available or DataBlocks+1 number of disks if one
-		// of the data disks is missing. All the reads happen in parallel.
-		var parallelRead func() error
-		parallelRead = func() error {
-			// If enough blocks are available to do rs.Reconstruct()
-			if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) {
-				return nil
-			}
-			if nextIndex == len(orderedDisks) {
-				// No more disks to read from.
-				return errXLReadQuorum
-			}
-
+		for {
 			// readDisks - disks from which we need to read in parallel.
 			var readDisks []StorageAPI
 			var err error
 			readDisks, nextIndex, err = getReadDisks(orderedDisks, nextIndex, eInfo.DataBlocks)
 			if err != nil {
-				return err
+				return bytesWritten, err
 			}
-
-			// WaitGroup to synchronise the read go-routines.
-			wg := &sync.WaitGroup{}
-
-			// Read disks in parallel.
-			for index := range readDisks {
-				if readDisks[index] == nil {
-					continue
-				}
-				wg.Add(1)
-				// Reads chunk from readDisk[index] in routine.
-				go func(index int) {
-					defer wg.Done()
-
-					// Verify bit rot for the file on this disk.
-					if !bitRotVerify(index) {
-						// So that we don't read from this disk for the next block.
-						orderedDisks[index] = nil
-						return
-					}
-
-					// Chunk writer.
-					chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
-
-					// CopyN - copies until current chunk size.
-					err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize)
-					if err != nil {
-						// So that we don't read from this disk for the next block.
-						orderedDisks[index] = nil
-						return
-					}
-
-					// Copy the read blocks.
-					enBlocks[index] = chunkWriter.Bytes()
-
-					// Reset the buffer.
-					chunkWriter.Reset()
-
-					// Successfully read.
-				}(index)
+			parallelRead(volume, path, readDisks, orderedDisks, enBlocks, blockOffset, curChunkSize, bitRotVerify)
+			if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) {
+				// If enough blocks are available to do rs.Reconstruct()
+				break
 			}
-
-			// Waiting for first routines to finish.
-			wg.Wait()
-
-			// Continue to read the rest of the blocks in parallel.
-			return parallelRead()
-		}
-
-		// Start reading all blocks in parallel.
-		err := parallelRead()
-		if err != nil {
-			return bytesWritten, err
+			if nextIndex == len(orderedDisks) {
+				// No more disks to read from.
+				return bytesWritten, errXLReadQuorum
+			}
 		}
 
 		// If we have all the data blocks no need to decode, continue to write.
 		if !isSuccessDataBlocks(enBlocks, eInfo.DataBlocks) {
 			// Reconstruct the missing data blocks.
-			if err = decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil {
+			if err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil {
 				return bytesWritten, err
 			}
 		}
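This second hunk replaces the recursive closure with an iterative loop: fan out one batch of reads, check whether enough chunks arrived to reconstruct, and otherwise move on to the next disks until none remain. Below is a sketch of that control flow under assumed names: haveEnough, readBatch and readBlock stand in for isSuccessDecodeBlocks, parallelRead and the loop inside erasureReadFile.

// A sketch of the quorum-driven retry loop, under the assumptions stated above.
package main

import (
	"errors"
	"fmt"
)

var errReadQuorum = errors.New("cannot reach read quorum")

// haveEnough reports whether at least dataBlocks chunks were read successfully.
func haveEnough(chunks [][]byte, dataBlocks int) bool {
	n := 0
	for _, c := range chunks {
		if c != nil {
			n++
		}
	}
	return n >= dataBlocks
}

// readBlock keeps fanning out reads until reconstruction is possible
// or every disk has been tried.
func readBlock(disks []string, dataBlocks int, readBatch func(disk string) []byte) ([][]byte, error) {
	chunks := make([][]byte, len(disks))
	nextIndex := 0
	for {
		// Pick the next batch of up to dataBlocks untried disks.
		end := nextIndex + dataBlocks
		if end > len(disks) {
			end = len(disks)
		}
		for i := nextIndex; i < end; i++ {
			chunks[i] = readBatch(disks[i]) // nil marks a failed read
		}
		nextIndex = end

		if haveEnough(chunks, dataBlocks) {
			return chunks, nil // enough chunks to reconstruct the block
		}
		if nextIndex == len(disks) {
			return nil, errReadQuorum // no disks left to fall back to
		}
	}
}

func main() {
	disks := []string{"disk0", "disk1", "disk2", "disk3"}
	flaky := func(disk string) []byte {
		if disk == "disk1" {
			return nil // simulate one failed read
		}
		return []byte("chunk from " + disk)
	}
	chunks, err := readBlock(disks, 2, flaky)
	fmt.Println("err:", err, "chunks:", len(chunks))
}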
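decodeData, whose error handling changes at the end of the second hunk, wraps Reed-Solomon reconstruction. Here is a hedged sketch of that call shape using github.com/klauspost/reedsolomon, the library minio's XL backend builds on: missing chunks are represented as nil shards, and Reconstruct fills them back in. The shard sizes and contents are illustrative only.

// A sketch of Reed-Solomon reconstruction with klauspost/reedsolomon,
// under the assumptions stated above.
package main

import (
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataBlocks, parityBlocks = 4, 2

	enc, err := reedsolomon.New(dataBlocks, parityBlocks)
	if err != nil {
		panic(err)
	}

	// Six equal-sized shards: 4 data followed by 2 parity.
	shards := make([][]byte, dataBlocks+parityBlocks)
	for i := range shards {
		shards[i] = []byte{byte(i), byte(i), byte(i), byte(i)}
	}
	// Compute the parity shards from the data shards.
	if err := enc.Encode(shards); err != nil {
		panic(err)
	}

	// Simulate a chunk lost to bit rot or a dead disk.
	shards[1] = nil
	if err := enc.Reconstruct(shards); err != nil {
		panic(err)
	}
	fmt.Println("recovered shard 1:", shards[1])
}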