mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Refactor xl.GetObject and erasureReadFile. (#2211)
* XL: Refactor xl.GetObject and erasureReadFile. erasureReadFile() responsible for just erasure coding, it takes ordered disks and checkSum slice. * move getOrderedPartsMetadata and getOrderedDisks to xl-v1-utils.go * Review fixes.
This commit is contained in:
parent
2d38046a5a
commit
8cc163e51a
@ -65,38 +65,6 @@ func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
|
||||
return successDataBlocksCount >= dataBlocks
|
||||
}
|
||||
|
||||
// Return ordered partsMetadata depeinding on distribution.
|
||||
func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
|
||||
orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
|
||||
for index := range partsMetadata {
|
||||
blockIndex := distribution[index]
|
||||
orderedPartsMetadata[blockIndex-1] = partsMetadata[index]
|
||||
}
|
||||
return orderedPartsMetadata
|
||||
}
|
||||
|
||||
// getOrderedDisks - get ordered disks from erasure distribution.
|
||||
// returns ordered slice of disks from their actual distribution.
|
||||
func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []StorageAPI) {
|
||||
orderedDisks = make([]StorageAPI, len(disks))
|
||||
// From disks gets ordered disks.
|
||||
for index := range disks {
|
||||
blockIndex := distribution[index]
|
||||
orderedDisks[blockIndex-1] = disks[index]
|
||||
}
|
||||
return orderedDisks
|
||||
}
|
||||
|
||||
// Return ordered CheckSums depending on the distribution.
|
||||
func getOrderedCheckSums(distribution []int, blockCheckSums []checkSumInfo) (orderedBlockCheckSums []checkSumInfo) {
|
||||
orderedBlockCheckSums = make([]checkSumInfo, len(blockCheckSums))
|
||||
for index := range blockCheckSums {
|
||||
blockIndex := distribution[index]
|
||||
orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
|
||||
}
|
||||
return orderedBlockCheckSums
|
||||
}
|
||||
|
||||
// Return readable disks slice from which we can read parallelly.
|
||||
func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDisks []StorageAPI, nextIndex int, err error) {
|
||||
readDisks = make([]StorageAPI, len(orderedDisks))
|
||||
@ -192,27 +160,16 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
|
||||
// are decoded into a data block. Data block is trimmed for given offset and length,
|
||||
// then written to given writer. This function also supports bit-rot detection by
|
||||
// verifying checksum of individual block's checksum.
|
||||
func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) {
|
||||
func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string) (int64, error) {
|
||||
// Offset and length cannot be negative.
|
||||
if offset < 0 || length < 0 {
|
||||
return 0, errUnexpected
|
||||
}
|
||||
|
||||
// Pick one erasure info.
|
||||
eInfo := pickValidErasureInfo(eInfos)
|
||||
|
||||
// Gather previously calculated block checksums.
|
||||
blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
|
||||
|
||||
// []orderedDisks will have first eInfo.DataBlocks disks as data
|
||||
// disks and rest will be parity.
|
||||
orderedDisks := getOrderedDisks(eInfo.Distribution, disks)
|
||||
orderedBlockCheckSums := getOrderedCheckSums(eInfo.Distribution, blockCheckSums)
|
||||
|
||||
// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
|
||||
// by verifying the hash of the contents of the file.
|
||||
bitRotVerify := func() func(diskIndex int) bool {
|
||||
verified := make([]bool, len(orderedDisks))
|
||||
verified := make([]bool, len(disks))
|
||||
// Return closure so that we have reference to []verified and
|
||||
// not recalculate the hash on it every time the function is
|
||||
// called for the same disk.
|
||||
@ -222,7 +179,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
|
||||
return true
|
||||
}
|
||||
// Is this a valid block?
|
||||
isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex])
|
||||
isValid := isValidBlock(disks[diskIndex], volume, path, checkSums[diskIndex])
|
||||
verified[diskIndex] = isValid
|
||||
return isValid
|
||||
}
|
||||
@ -235,32 +192,32 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
|
||||
// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
|
||||
// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
|
||||
// DataBlocks. The extra space will have 0-padding.
|
||||
chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks)
|
||||
chunkSize := getEncodedBlockLen(blockSize, dataBlocks)
|
||||
|
||||
// Get start and end block, also bytes to be skipped based on the input offset.
|
||||
startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize)
|
||||
startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, blockSize)
|
||||
|
||||
// For each block, read chunk from each disk. If we are able to read all the data disks then we don't
|
||||
// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
|
||||
// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
|
||||
for block := startBlock; bytesWritten < length; block++ {
|
||||
// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
|
||||
enBlocks := make([][]byte, len(orderedDisks))
|
||||
enBlocks := make([][]byte, len(disks))
|
||||
|
||||
// enBlocks data can have 0-padding hence we need to figure the exact number
|
||||
// of bytes we want to read from enBlocks.
|
||||
blockSize := eInfo.BlockSize
|
||||
blockSize := blockSize
|
||||
|
||||
// curChunkSize is chunkSize until end block.
|
||||
curChunkSize := chunkSize
|
||||
|
||||
// We have endBlock, verify if we need to have padding.
|
||||
if block == endBlock && (totalLength%eInfo.BlockSize != 0) {
|
||||
if block == endBlock && (totalLength%blockSize != 0) {
|
||||
// If this is the last block and size of the block is < BlockSize.
|
||||
curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks)
|
||||
curChunkSize = getEncodedBlockLen(totalLength%blockSize, dataBlocks)
|
||||
|
||||
// For the last block, the block size can be less than BlockSize.
|
||||
blockSize = totalLength % eInfo.BlockSize
|
||||
blockSize = totalLength % blockSize
|
||||
}
|
||||
|
||||
// Block offset.
|
||||
@ -277,25 +234,29 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
|
||||
// readDisks - disks from which we need to read in parallel.
|
||||
var readDisks []StorageAPI
|
||||
var err error
|
||||
readDisks, nextIndex, err = getReadDisks(orderedDisks, nextIndex, eInfo.DataBlocks)
|
||||
// get readable disks slice from which we can read parallelly.
|
||||
readDisks, nextIndex, err = getReadDisks(disks, nextIndex, dataBlocks)
|
||||
if err != nil {
|
||||
return bytesWritten, err
|
||||
}
|
||||
parallelRead(volume, path, readDisks, orderedDisks, enBlocks, blockOffset, curChunkSize, bitRotVerify)
|
||||
if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) {
|
||||
// Issue a parallel read across the disks specified in readDisks.
|
||||
parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, bitRotVerify)
|
||||
if isSuccessDecodeBlocks(enBlocks, dataBlocks) {
|
||||
// If enough blocks are available to do rs.Reconstruct()
|
||||
break
|
||||
}
|
||||
if nextIndex == len(orderedDisks) {
|
||||
if nextIndex == len(disks) {
|
||||
// No more disks to read from.
|
||||
return bytesWritten, errXLReadQuorum
|
||||
}
|
||||
// We do not have enough enough data blocks to reconstruct the data
|
||||
// hence continue the for-loop till we have enough data blocks.
|
||||
}
|
||||
|
||||
// If we have all the data blocks no need to decode, continue to write.
|
||||
if !isSuccessDataBlocks(enBlocks, eInfo.DataBlocks) {
|
||||
if !isSuccessDataBlocks(enBlocks, dataBlocks) {
|
||||
// Reconstruct the missing data blocks.
|
||||
if err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil {
|
||||
if err := decodeData(enBlocks, dataBlocks, parityBlocks); err != nil {
|
||||
return bytesWritten, err
|
||||
}
|
||||
}
|
||||
@ -314,7 +275,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
|
||||
}
|
||||
|
||||
// Write data blocks.
|
||||
n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize)
|
||||
n, err := writeDataBlocks(writer, enBlocks, dataBlocks, outOffset, outSize)
|
||||
if err != nil {
|
||||
return bytesWritten, err
|
||||
}
|
||||
@ -337,34 +298,21 @@ func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo {
|
||||
return checkSumInfo{}
|
||||
}
|
||||
|
||||
// xlMetaPartBlockChecksums - get block checksums for a given part.
|
||||
func metaPartBlockChecksums(disks []StorageAPI, eInfos []erasureInfo, partName string) (blockCheckSums []checkSumInfo) {
|
||||
for index := range disks {
|
||||
if eInfos[index].IsValid() {
|
||||
// Save the read checksums for a given part.
|
||||
blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName))
|
||||
} else {
|
||||
blockCheckSums = append(blockCheckSums, checkSumInfo{})
|
||||
}
|
||||
}
|
||||
return blockCheckSums
|
||||
}
|
||||
|
||||
// isValidBlock - calculates the checksum hash for the block and
|
||||
// validates if its correct returns true for valid cases, false otherwise.
|
||||
func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumInfo) (ok bool) {
|
||||
func isValidBlock(disk StorageAPI, volume, path string, checksum string) (ok bool) {
|
||||
// Disk is not available, not a valid block.
|
||||
if disk == nil {
|
||||
return false
|
||||
}
|
||||
// Read everything for a given block and calculate hash.
|
||||
hashWriter := newHash(blockCheckSum.Algorithm)
|
||||
hashWriter := newHash("blake2b")
|
||||
hashBytes, err := hashSum(disk, volume, path, hashWriter)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to calculate checksum %s/%s", volume, path)
|
||||
return false
|
||||
}
|
||||
return hex.EncodeToString(hashBytes) == blockCheckSum.Hash
|
||||
return hex.EncodeToString(hashBytes) == checksum
|
||||
}
|
||||
|
||||
// decodeData - decode encoded blocks.
|
||||
|
@ -86,6 +86,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
||||
break
|
||||
}
|
||||
}
|
||||
onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks)
|
||||
metaArr = getOrderedPartsMetadata(xlMeta.Erasure.Distribution, metaArr)
|
||||
|
||||
// Reply back invalid range if the input offset and length fall out of range.
|
||||
if startOffset > xlMeta.Stat.Size || length > xlMeta.Stat.Size {
|
||||
@ -109,12 +111,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Collect all the previous erasure infos across the disk.
|
||||
var eInfos []erasureInfo
|
||||
for index := range onlineDisks {
|
||||
eInfos = append(eInfos, metaArr[index].Erasure)
|
||||
}
|
||||
|
||||
// Save the writer.
|
||||
mw := writer
|
||||
|
||||
@ -173,8 +169,17 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
||||
readSize = length - totalBytesRead
|
||||
}
|
||||
|
||||
// Get the checksums of the current part.
|
||||
checkSums := make([]string, len(onlineDisks))
|
||||
for index := range onlineDisks {
|
||||
checkSums[index], _, err = metaArr[index].GetCheckSum(partName)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
}
|
||||
|
||||
// Start reading the part name.
|
||||
n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
|
||||
n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
@ -142,3 +142,25 @@ func unionChecksumInfos(cur []checkSumInfo, updated []checkSumInfo, curPartName
|
||||
}
|
||||
return finalChecksums
|
||||
}
|
||||
|
||||
// Return ordered partsMetadata depeinding on distribution.
|
||||
func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
|
||||
orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
|
||||
for index := range partsMetadata {
|
||||
blockIndex := distribution[index]
|
||||
orderedPartsMetadata[blockIndex-1] = partsMetadata[index]
|
||||
}
|
||||
return orderedPartsMetadata
|
||||
}
|
||||
|
||||
// getOrderedDisks - get ordered disks from erasure distribution.
|
||||
// returns ordered slice of disks from their actual distribution.
|
||||
func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []StorageAPI) {
|
||||
orderedDisks = make([]StorageAPI, len(disks))
|
||||
// From disks gets ordered disks.
|
||||
for index := range disks {
|
||||
blockIndex := distribution[index]
|
||||
orderedDisks[blockIndex-1] = disks[index]
|
||||
}
|
||||
return orderedDisks
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user