Mirror of https://github.com/minio/minio.git, synced 2024-12-25 06:35:56 -05:00
730ac5381c
Simplify parallelReader.Read(); this also fixes the previous implementation, which returned before all the parallel reading goroutines had terminated and therefore caused race conditions.
194 lines · 4.9 KiB · Go
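The fix described in the commit message comes down to draining every worker goroutine before Read() returns its shared buffers. A minimal sketch of that pattern, with hypothetical names and separate from the MinIO code below:

package main

import "sync"

// gatherParallel runs one goroutine per source and returns only after every
// goroutine has finished writing its own slot of results. Returning before
// wg.Wait() would hand the caller a slice that other goroutines are still
// mutating, which is the kind of race the commit above removes.
func gatherParallel(sources []func() []byte) [][]byte {
	results := make([][]byte, len(sources))
	var wg sync.WaitGroup
	for i, read := range sources {
		wg.Add(1)
		go func(i int, read func() []byte) {
			defer wg.Done()
			results[i] = read() // each goroutine writes only index i
		}(i, read)
	}
	wg.Wait() // drain all workers before the results escape this function
	return results
}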
/*
 * Minio Cloud Storage, (C) 2016 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"io"
	"sync"

	"github.com/minio/minio/cmd/logger"
)

// Reads in parallel from readers.
type parallelReader struct {
	readers       []io.ReaderAt // one reader per erasure shard; nil marks an unavailable disk
	dataBlocks    int           // minimum number of shards required to decode
	offset        int64         // current read offset within each shard file
	shardSize     int64         // shard size for one erasure-coded block
	shardFileSize int64         // total size of each shard file
	buf           [][]byte      // per-reader buffers, reused across Read() calls
}

// newParallelReader returns parallelReader.
func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int64) *parallelReader {
	return &parallelReader{
		readers,
		e.dataBlocks,
		(offset / e.blockSize) * e.ShardSize(),
		e.ShardSize(),
		e.ShardFileSize(totalLength),
		make([][]byte, len(readers)),
	}
}

// Returns if buf can be erasure decoded.
func (p *parallelReader) canDecode(buf [][]byte) bool {
	bufCount := 0
	for _, b := range buf {
		if b != nil {
			bufCount++
		}
	}
	return bufCount >= p.dataBlocks
}

// Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
func (p *parallelReader) Read() ([][]byte, error) {
	newBuf := make([][]byte, len(p.readers))
	var newBufLK sync.RWMutex

	if p.offset+p.shardSize > p.shardFileSize {
		p.shardSize = p.shardFileSize - p.offset
	}

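	// The trigger channel is buffered with len(p.readers) slots so that the
	// reader goroutines launched below can always send their trigger without
	// blocking, even after the receiving loop has exited.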
	readTriggerCh := make(chan bool, len(p.readers))
	for i := 0; i < p.dataBlocks; i++ {
		// Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
		readTriggerCh <- true
	}

	readerIndex := 0
	var wg sync.WaitGroup
	// if readTrigger is true, it implies next disk.ReadAt() should be tried
	// if readTrigger is false, it implies previous disk.ReadAt() was successful and there is no need
	// to try reading the next disk.
	for readTrigger := range readTriggerCh {
		newBufLK.RLock()
		canDecode := p.canDecode(newBuf)
		newBufLK.RUnlock()
		if canDecode {
			break
		}
		if readerIndex == len(p.readers) {
			break
		}
		if !readTrigger {
			continue
		}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			disk := p.readers[i]
			if disk == nil {
				// Since disk is nil, trigger another read.
				readTriggerCh <- true
				return
			}
			if p.buf[i] == nil {
				// Reading first time on this disk, hence the buffer needs to be allocated.
				// Subsequent reads will re-use this buffer.
				p.buf[i] = make([]byte, p.shardSize)
			}
			// For the last shard, the shardsize might be less than previous shard sizes.
			// Hence the following statement ensures that the buffer size is reset to the right size.
			p.buf[i] = p.buf[i][:p.shardSize]
			_, err := disk.ReadAt(p.buf[i], p.offset)
			if err != nil {
				p.readers[i] = nil
				// Since ReadAt returned error, trigger another read.
				readTriggerCh <- true
				return
			}
			newBufLK.Lock()
			newBuf[i] = p.buf[i]
			newBufLK.Unlock()
			// Since ReadAt returned success, there is no need to trigger another read.
			readTriggerCh <- false
		}(readerIndex)
		readerIndex++
	}
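	// Wait for every launched goroutine to finish before touching newBuf again.
	// Returning earlier, while goroutines could still be writing into newBuf and
	// p.buf, caused the race condition this commit fixes.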
	wg.Wait()

	if p.canDecode(newBuf) {
		p.offset += p.shardSize
		return newBuf, nil
	}

	return nil, errXLReadQuorum
}

// Decode reads from readers, reconstructs data if needed and writes the data to the writer.
func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) error {
	if offset < 0 || length < 0 {
		logger.LogIf(ctx, errInvalidArgument)
		return errInvalidArgument
	}
	if offset+length > totalLength {
		logger.LogIf(ctx, errInvalidArgument)
		return errInvalidArgument
	}
	if length == 0 {
		return nil
	}

	reader := newParallelReader(readers, e, offset, totalLength)

	startBlock := offset / e.blockSize
	endBlock := (offset + length) / e.blockSize

	var bytesWritten int64
	for block := startBlock; block <= endBlock; block++ {
		var blockOffset, blockLength int64
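		// Example with hypothetical numbers: blockSize = 1 MiB, offset = 1.5 MiB,
		// length = 1 MiB gives startBlock = 1 and endBlock = 2; block 1 yields
		// blockOffset = 0.5 MiB and blockLength = 0.5 MiB, block 2 yields
		// blockOffset = 0 and blockLength = 0.5 MiB.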
		switch {
		case startBlock == endBlock:
			blockOffset = offset % e.blockSize
			blockLength = length
		case block == startBlock:
			blockOffset = offset % e.blockSize
			blockLength = e.blockSize - blockOffset
		case block == endBlock:
			blockOffset = 0
			blockLength = (offset + length) % e.blockSize
		default:
			blockOffset = 0
			blockLength = e.blockSize
		}
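		// blockLength is 0 only when offset+length ends exactly on a block
		// boundary, in which case endBlock is one past the last needed block.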
		if blockLength == 0 {
			break
		}
		bufs, err := reader.Read()
		if err != nil {
			return err
		}
		if err = e.DecodeDataBlocks(bufs); err != nil {
			logger.LogIf(ctx, err)
			return err
		}
		n, err := writeDataBlocks(ctx, writer, bufs, e.dataBlocks, blockOffset, blockLength)
		if err != nil {
			return err
		}
		bytesWritten += n
	}
	if bytesWritten != length {
		logger.LogIf(ctx, errLessData)
		return errLessData
	}
	return nil
}