XL/erasure-read: refactor erasure read and add tests (#2232)
commit 18728a0b59 (parent cef26fd6ea)
erasure-readfile.go

@@ -166,6 +166,11 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
         return 0, errUnexpected
     }
 
+    // Can't request more data than what is available.
+    if offset+length > totalLength {
+        return 0, errUnexpected
+    }
+
     // bitRotVerify verifies if the file on a particular disk doesn't have bitrot
     // by verifying the hash of the contents of the file.
     bitRotVerify := func() func(diskIndex int) bool {
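With the new guard, an over-long request fails fast: for example, a 100-byte object asked for offset=90, length=20 trips offset+length > totalLength and returns errUnexpected before any disk reads are issued.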
@@ -188,39 +193,33 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
     // Total bytes written to writer
     bytesWritten := int64(0)
 
-    // chunkSize is roughly BlockSize/DataBlocks.
-    // chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
-    // So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
-    // DataBlocks. The extra space will have 0-padding.
-    chunkSize := getEncodedBlockLen(blockSize, dataBlocks)
+    // chunkSize is the amount of data that needs to be read from each disk at a time.
+    chunkSize := getChunkSize(blockSize, dataBlocks)
 
-    // Get start and end block, also bytes to be skipped based on the input offset.
-    startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, blockSize)
+    startBlock := offset / blockSize
+    endBlock := (offset + length) / blockSize
 
+    // curChunkSize = chunk size for the current block in the for loop below.
+    // curBlockSize = block size for the current block in the for loop below.
+    // curChunkSize and curBlockSize can change for the last block if totalLength%blockSize != 0
+    curChunkSize := chunkSize
+    curBlockSize := blockSize
+
     // For each block, read chunk from each disk. If we are able to read all the data disks then we don't
     // need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
     // of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
-    for block := startBlock; bytesWritten < length; block++ {
+    for block := startBlock; block <= endBlock; block++ {
         // Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
         enBlocks := make([][]byte, len(disks))
 
-        // enBlocks data can have 0-padding hence we need to figure the exact number
-        // of bytes we want to read from enBlocks.
-        blockSize := blockSize
-        // curChunkSize is chunkSize until end block.
-        curChunkSize := chunkSize
-
-        // We have endBlock, verify if we need to have padding.
-        if block == endBlock && (totalLength%blockSize != 0) {
-            // If this is the last block and size of the block is < BlockSize.
-            curChunkSize = getEncodedBlockLen(totalLength%blockSize, dataBlocks)
-
-            // For the last block, the block size can be less than BlockSize.
-            blockSize = totalLength % blockSize
+        if ((offset + bytesWritten) / blockSize) == (totalLength / blockSize) {
+            // This is the last block for which curBlockSize and curChunkSize can change.
+            // For ex. if totalLength is 15M and blockSize is 10MB, curBlockSize for
+            // the last block should be 5MB.
+            curBlockSize = totalLength % blockSize
+            curChunkSize = getChunkSize(curBlockSize, dataBlocks)
         }
 
-        // Block offset.
         // NOTE: That for the offset calculation we have to use chunkSize and
         // not curChunkSize. If we use curChunkSize for offset calculation
         // then it can result in wrong offset for the last block.
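Worked example: plugging the 15M/10MB figures from the new comments into the new arithmetic. This is a standalone sketch, not part of the commit; the dataBlocks value of 4 is an arbitrary assumption, and getChunkSize is copied from the commit's erasure-utils.go (shown further below).

package main

import "fmt"

// getChunkSize, copied from the commit: ceiling of blockSize/dataBlocks.
func getChunkSize(blockSize int64, dataBlocks int) int64 {
	return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
}

func main() {
	const mb = int64(1024 * 1024)
	totalLength := 15 * mb // object size used in the new comments
	blockSize := 10 * mb   // erasure block size
	dataBlocks := 4        // assumed value for illustration

	// A full read of the object: offset 0, length == totalLength.
	offset, length := int64(0), totalLength

	startBlock := offset / blockSize          // 0
	endBlock := (offset + length) / blockSize // 1
	chunkSize := getChunkSize(blockSize, dataBlocks)
	fmt.Println(startBlock, endBlock, chunkSize) // 0 1 2621440

	// The last block holds only 15MB % 10MB = 5MB, so both sizes shrink,
	// mirroring the loop's last-block branch above.
	curBlockSize := totalLength % blockSize
	curChunkSize := getChunkSize(curBlockSize, dataBlocks)
	fmt.Println(curBlockSize, curChunkSize) // 5242880 1310720
}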
@@ -261,30 +260,37 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
            }
        }
 
-        var outSize, outOffset int64
+        // Offset in enBlocks from where data should be read from.
+        enBlocksOffset := int64(0)
 
-        // Total data to be read.
-        outSize = blockSize
+        // Total data to be read from enBlocks.
+        enBlocksLength := curBlockSize
 
-        // If this is start block, skip unwanted bytes.
+        // If this is the start block then enBlocksOffset might not be 0.
        if block == startBlock {
-            outOffset = bytesToSkip
-            outSize -= bytesToSkip
+            enBlocksOffset = offset % blockSize
+            enBlocksLength -= enBlocksOffset
        }
 
-        if length-bytesWritten < outSize {
+        remaining := length - bytesWritten
+        if remaining < enBlocksLength {
            // We should not send more data than what was requested.
-            outSize = length - bytesWritten
+            enBlocksLength = remaining
        }
 
        // Write data blocks.
-        n, err := writeDataBlocks(writer, enBlocks, dataBlocks, outOffset, outSize)
+        n, err := writeDataBlocks(writer, enBlocks, dataBlocks, enBlocksOffset, enBlocksLength)
        if err != nil {
            return bytesWritten, err
        }
 
        // Update total bytes written.
        bytesWritten += n
+
+        if bytesWritten == length {
+            // Done writing all the requested data.
+            break
+        }
    }
 
    // Success.
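Worked example of the renamed bookkeeping (assumed toy numbers): with blockSize=10, a request of offset=13, length=4 starts at block 1 with enBlocksOffset = 13%10 = 3 and enBlocksLength = 10-3 = 7; remaining = 4 then caps enBlocksLength at 4, and after writeDataBlocks reports n=4 the new bytesWritten == length check breaks out of the loop.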
erasure-readfile_test.go

@@ -1,5 +1,5 @@
 /*
- * Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,8 +18,9 @@ package main
 
 import (
     "bytes"
-    "crypto/rand"
+    "math/rand"
     "testing"
+    "time"
 )
 import "reflect"
 
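The import swap from crypto/rand to math/rand matters for the new test added below: math/rand offers both a package-level Read to fill the test slice with pseudo-random bytes and rand.New(rand.NewSource(...)), which the new time import feeds with a UnixNano seed to vary offsets and lengths across runs.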
@@ -308,7 +309,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
 
     disks := setup.disks
 
-    // Prepare a slice of 1MB with random data.
+    // Prepare a slice of 5MB with random data.
     data := make([]byte, 5*1024*1024)
     length := int64(len(data))
     _, err = rand.Read(data)
@@ -330,6 +331,8 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
     }{
         // Full file.
         {0, length},
+        // Read nothing.
+        {length, 0},
         // 2nd block.
         {blockSize, blockSize},
         // Test cases for random offsets and lengths.
@@ -338,7 +341,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
         {blockSize + 1, blockSize - 1},
         {blockSize + 1, blockSize},
         {blockSize + 1, blockSize + 1},
-        {blockSize*2 - 1, blockSize*3 + 1},
+        {blockSize*2 - 1, blockSize + 1},
         {length - 1, 1},
         {length - blockSize, blockSize},
         {length - blockSize - 1, blockSize},
@@ -359,3 +362,66 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
         }
     }
 }
+
+// Test erasureReadFile with random offsets and lengths.
+// This test is t.Skip()ed as it takes a long time to run, hence should be run
+// explicitly after commenting out t.Skip().
+func TestErasureReadFileRandomOffsetLength(t *testing.T) {
+    // Comment the following line to run this test.
+    t.SkipNow()
+    // Initialize environment needed for the test.
+    dataBlocks := 7
+    parityBlocks := 7
+    blockSize := int64(1 * 1024 * 1024)
+    setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
+    if err != nil {
+        t.Error(err)
+        return
+    }
+    defer setup.Remove()
+
+    disks := setup.disks
+
+    // Prepare a slice of 5MB with random data.
+    data := make([]byte, 5*1024*1024)
+    length := int64(len(data))
+    _, err = rand.Read(data)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // 10000 iterations with random offsets and lengths.
+    iterations := 10000
+
+    // Create a test file to read from.
+    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if size != length {
+        t.Errorf("erasureCreateFile returned %d, expected %d", size, length)
+    }
+
+    // To generate random offset/length.
+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+    buf := &bytes.Buffer{}
+
+    // Verify erasureReadFile() for random offsets and lengths.
+    for i := 0; i < iterations; i++ {
+        offset := r.Int63n(length)
+        readLen := r.Int63n(length - offset)
+
+        expected := data[offset : offset+readLen]
+
+        size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums)
+        if err != nil {
+            t.Fatal(err, offset, readLen)
+        }
+        got := buf.Bytes()
+        if !bytes.Equal(expected, got) {
+            t.Fatalf("read data is different from what was expected, offset=%d length=%d", offset, readLen)
+        }
+        buf.Reset()
+    }
+}
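Note that the generator keeps every request in bounds: offset is drawn from [0, length) and readLen from [0, length-offset), so offset+readLen never exceeds the file size and the new offset+length guard in erasureReadFile is never tripped by this test.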
erasure-utils.go

@@ -73,9 +73,9 @@ func getDataBlockLen(enBlocks [][]byte, dataBlocks int) int {
 
 // Writes all the data blocks from encoded blocks until requested
 // outSize length. Provides a way to skip bytes until the offset.
-func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset int64, outSize int64) (int64, error) {
+func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset int64, length int64) (int64, error) {
     // Offset and out size cannot be negative.
-    if outOffset < 0 || outSize < 0 {
+    if offset < 0 || length < 0 {
         return 0, errUnexpected
     }
 
@@ -85,12 +85,12 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset
     }
 
     // Do we have enough data?
-    if int64(getDataBlockLen(enBlocks, dataBlocks)) < outSize {
+    if int64(getDataBlockLen(enBlocks, dataBlocks)) < length {
         return 0, reedsolomon.ErrShortData
     }
 
     // Counter to decrement total left to write.
-    write := outSize
+    write := length
 
     // Counter to increment total written.
     totalWritten := int64(0)
@@ -98,17 +98,17 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset
     // Write all data blocks to dst.
     for _, block := range enBlocks[:dataBlocks] {
         // Skip blocks until we have reached our offset.
-        if outOffset >= int64(len(block)) {
+        if offset >= int64(len(block)) {
             // Decrement offset.
-            outOffset -= int64(len(block))
+            offset -= int64(len(block))
             continue
         } else {
             // Skip until offset.
-            block = block[outOffset:]
+            block = block[offset:]
 
             // Reset the offset for next iteration to read everything
             // from subsequent blocks.
-            outOffset = 0
+            offset = 0
         }
         // We have written all the blocks, write the last remaining block.
         if write < int64(len(block)) {
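The outOffset/outSize to offset/length rename is behavior-preserving; the loop still skips whole blocks until the offset lands inside one, slices into that block, and stops once length bytes are out. A minimal standalone sketch of the same skip-then-copy pattern (hypothetical writeRange helper, not the commit's function, which additionally validates arguments and returns reedsolomon.ErrShortData on insufficient data):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeRange mimics writeDataBlocks' skip logic: skip whole blocks while
// offset >= len(block), slice into the first useful block, then copy until
// length bytes have been written.
func writeRange(dst io.Writer, blocks [][]byte, offset, length int64) (int64, error) {
	written := int64(0)
	for _, block := range blocks {
		if offset >= int64(len(block)) {
			offset -= int64(len(block)) // skip this block entirely
			continue
		}
		block = block[offset:] // skip until offset within this block
		offset = 0             // subsequent blocks are read from the start
		if length-written < int64(len(block)) {
			block = block[:length-written] // last, partial block
		}
		n, err := dst.Write(block)
		written += int64(n)
		if err != nil {
			return written, err
		}
		if written == length {
			break
		}
	}
	return written, nil
}

func main() {
	blocks := [][]byte{[]byte("abcd"), []byte("efgh"), []byte("ijkl")}
	var buf bytes.Buffer
	n, _ := writeRange(&buf, blocks, 5, 4) // read 4 bytes starting at byte 5
	fmt.Println(n, buf.String())           // 4 fghi
}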
@@ -136,20 +136,12 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset
     return totalWritten, nil
 }
 
-// getBlockInfo - find start/end block and bytes to skip for given offset, length and block size.
-func getBlockInfo(offset, length, blockSize int64) (startBlock, endBlock, bytesToSkip int64) {
-    // Calculate start block for given offset and how many bytes to skip to get the offset.
-    startBlock = offset / blockSize
-    bytesToSkip = offset % blockSize
-    endBlock = length / blockSize
-    return
-}
-
-// calculate the blockSize based on input length and total number of
-// data blocks.
-func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) {
-    curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
-    return curEncBlockSize
-}
+// chunkSize is roughly BlockSize/DataBlocks.
+// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
+// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
+// DataBlocks. The extra space will have 0-padding.
+func getChunkSize(blockSize int64, dataBlocks int) int64 {
+    return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
+}
 
 // copyN - copies from disk, volume, path to input writer until length
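Since getChunkSize rounds up, getChunkSize(10, 9) = (10+9-1)/9 = 2, while getChunkSize(10, 10) = 19/10 = 1 and getChunkSize(10, 11) = 20/11 = 1; these are exactly the three cases covered by the new test file below.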
erasure-utils_test.go (new file, +53)

@@ -0,0 +1,53 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import "testing"
+
+// Test for getChunkSize()
+func TestGetChunkSize(t *testing.T) {
+    // Refer to comments on getChunkSize() for details.
+    testCases := []struct {
+        blockSize  int64
+        dataBlocks int
+        chunkSize  int64
+    }{
+        {
+            10,
+            10,
+            1,
+        },
+        {
+            10,
+            11,
+            1,
+        },
+        {
+            10,
+            9,
+            2,
+        },
+    }
+    // Verify getChunkSize() for the test cases.
+    for i, test := range testCases {
+        expected := test.chunkSize
+        got := getChunkSize(test.blockSize, test.dataBlocks)
+        if expected != got {
+            t.Errorf("Test %d : expected=%d got=%d", i+1, expected, got)
+        }
+    }
+}
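To exercise the new test, something like go test -v -run TestGetChunkSize from the package directory should work; TestErasureReadFileRandomOffsetLength additionally needs its t.SkipNow() line commented out, as its doc comment notes.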