mirror of
https://github.com/minio/minio.git
synced 2025-04-16 00:49:09 -04:00
fix: erasure index based reading based on actual ParityBlocks (#11792)
in some setups with ordering issues in drive configuration, we should rely on expected parityBlocks instead of `len(disks)/2`
This commit is contained in:
parent
e5a1a2a974
commit
6160188bf3
@ -161,7 +161,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||||||
// For the last shard, the shardsize might be less than previous shard sizes.
|
// For the last shard, the shardsize might be less than previous shard sizes.
|
||||||
// Hence the following statement ensures that the buffer size is reset to the right size.
|
// Hence the following statement ensures that the buffer size is reset to the right size.
|
||||||
p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
|
p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
|
||||||
_, err := rr.ReadAt(p.buf[bufIdx], p.offset)
|
n, err := rr.ReadAt(p.buf[bufIdx], p.offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errFileNotFound) {
|
if errors.Is(err, errFileNotFound) {
|
||||||
atomic.StoreInt32(&missingPartsHeal, 1)
|
atomic.StoreInt32(&missingPartsHeal, 1)
|
||||||
@ -179,7 +179,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
newBufLK.Lock()
|
newBufLK.Lock()
|
||||||
newBuf[bufIdx] = p.buf[bufIdx]
|
newBuf[bufIdx] = p.buf[bufIdx][:n]
|
||||||
newBufLK.Unlock()
|
newBufLK.Unlock()
|
||||||
// Since ReadAt returned success, there is no need to trigger another read.
|
// Since ReadAt returned success, there is no need to trigger another read.
|
||||||
readTriggerCh <- false
|
readTriggerCh <- false
|
||||||
|
@ -145,10 +145,11 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
|
|||||||
return metadataArray, g.Wait()
|
return metadataArray, g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, distribution []int) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
|
func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
|
||||||
shuffledDisks = make([]StorageAPI, len(disks))
|
shuffledDisks = make([]StorageAPI, len(disks))
|
||||||
shuffledPartsMetadata = make([]FileInfo, len(disks))
|
shuffledPartsMetadata = make([]FileInfo, len(disks))
|
||||||
var inconsistent int
|
var inconsistent int
|
||||||
|
distribution := fi.Erasure.Distribution
|
||||||
for i, meta := range metaArr {
|
for i, meta := range metaArr {
|
||||||
if disks[i] == nil {
|
if disks[i] == nil {
|
||||||
// Assuming offline drives as inconsistent,
|
// Assuming offline drives as inconsistent,
|
||||||
@ -171,7 +172,7 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo,
|
|||||||
// Inconsistent meta info is with in the limit of
|
// Inconsistent meta info is with in the limit of
|
||||||
// expected quorum, proceed with EcIndex based
|
// expected quorum, proceed with EcIndex based
|
||||||
// disk order.
|
// disk order.
|
||||||
if inconsistent < len(disks)/2 {
|
if inconsistent < fi.Erasure.ParityBlocks {
|
||||||
return shuffledDisks, shuffledPartsMetadata
|
return shuffledDisks, shuffledPartsMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -783,7 +783,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
|
|
||||||
// Order online disks in accordance with distribution order.
|
// Order online disks in accordance with distribution order.
|
||||||
// Order parts metadata in accordance with distribution order.
|
// Order parts metadata in accordance with distribution order.
|
||||||
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi.Erasure.Distribution)
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)
|
||||||
|
|
||||||
// Save current erasure metadata for validation.
|
// Save current erasure metadata for validation.
|
||||||
var currentFI = fi
|
var currentFI = fi
|
||||||
|
@ -85,7 +85,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
|||||||
return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
|
return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
|
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
|
||||||
|
|
||||||
versionID := srcInfo.VersionID
|
versionID := srcInfo.VersionID
|
||||||
if srcInfo.versionOnly {
|
if srcInfo.versionOnly {
|
||||||
@ -238,7 +238,7 @@ func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, s
|
|||||||
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
|
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
|
||||||
// Reorder online disks based on erasure distribution order.
|
// Reorder online disks based on erasure distribution order.
|
||||||
// Reorder parts metadata based on erasure distribution order.
|
// Reorder parts metadata based on erasure distribution order.
|
||||||
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
|
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
|
||||||
|
|
||||||
// For negative length read everything.
|
// For negative length read everything.
|
||||||
if length < 0 {
|
if length < 0 {
|
||||||
@ -1190,7 +1190,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
|||||||
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
|
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
|
||||||
for i, metaFi := range metaArr {
|
for i, metaFi := range metaArr {
|
||||||
if metaFi.IsValid() {
|
if metaFi.IsValid() {
|
||||||
// clean fi.Meta of tag key, before updating the new tags
|
// clean fi.Meta of tag key, before updating the new tags
|
||||||
|
@ -19,6 +19,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/klauspost/reedsolomon"
|
"github.com/klauspost/reedsolomon"
|
||||||
@ -46,13 +47,13 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
|||||||
|
|
||||||
// Do we have enough blocks?
|
// Do we have enough blocks?
|
||||||
if len(enBlocks) < dataBlocks {
|
if len(enBlocks) < dataBlocks {
|
||||||
logger.LogIf(ctx, reedsolomon.ErrTooFewShards)
|
logger.LogIf(ctx, fmt.Errorf("diskBlocks(%d)/dataBlocks(%d) - %w", len(enBlocks), dataBlocks, reedsolomon.ErrTooFewShards))
|
||||||
return 0, reedsolomon.ErrTooFewShards
|
return 0, reedsolomon.ErrTooFewShards
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do we have enough data?
|
// Do we have enough data?
|
||||||
if int64(getDataBlockLen(enBlocks, dataBlocks)) < length {
|
if int64(getDataBlockLen(enBlocks, dataBlocks)) < length {
|
||||||
logger.LogIf(ctx, reedsolomon.ErrShortData)
|
logger.LogIf(ctx, fmt.Errorf("getDataBlockLen(enBlocks, dataBlocks)(%d)/length(%d) - %w", getDataBlockLen(enBlocks, dataBlocks), length, reedsolomon.ErrShortData))
|
||||||
return 0, reedsolomon.ErrShortData
|
return 0, reedsolomon.ErrShortData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user