fix: ensure proper usage of DataDir (#12300)

- GetObject() should always use a common dataDir to
  read from when it starts reading, this allows the
  code in erasure decoding to have sane expectations.

- Healing should always heal on the common dataDir, this
  allows the code in dangling object detection to purge
  dangling content.

These both situations can happen under certain types of
retries during PUT when server is restarting etc, some
namespace entries might be left over.
This commit is contained in:
Harshavardhana 2021-05-14 16:50:47 -07:00 committed by GitHub
parent 5b18c57a54
commit d84261aa6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 31 deletions

View File

@ -766,7 +766,7 @@ var errorCodes = errorCodeMap{
}, },
ErrSlowDown: { ErrSlowDown: {
Code: "SlowDown", Code: "SlowDown",
Description: "Please reduce your request", Description: "Resource requested is unreadable, please reduce your request rate",
HTTPStatusCode: http.StatusServiceUnavailable, HTTPStatusCode: http.StatusServiceUnavailable,
}, },
ErrInvalidPrefixMarker: { ErrInvalidPrefixMarker: {

View File

@ -146,6 +146,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset
if len(b.data) == 0 && b.tillOffset != streamOffset { if len(b.data) == 0 && b.tillOffset != streamOffset {
b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset)
if err != nil {
logger.LogIf(GlobalContext,
fmt.Errorf("Error(%w) reading erasure shards at (%s: %s/%s), will attempt to reconstruct if we have quorum",
err, b.disk, b.volume, b.filePath))
}
} else { } else {
b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset) b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset)
} }
@ -153,7 +158,6 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
return 0, err return 0, err
} }
} }
if offset != b.currOffset { if offset != b.currOffset {
// Can never happen unless there are programmer bugs // Can never happen unless there are programmer bugs
return 0, errUnexpected return 0, errUnexpected

View File

@ -32,7 +32,6 @@ type parallelReader struct {
readers []io.ReaderAt readers []io.ReaderAt
orgReaders []io.ReaderAt orgReaders []io.ReaderAt
dataBlocks int dataBlocks int
errs []error
offset int64 offset int64
shardSize int64 shardSize int64
shardFileSize int64 shardFileSize int64
@ -49,7 +48,6 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int
return &parallelReader{ return &parallelReader{
readers: readers, readers: readers,
orgReaders: readers, orgReaders: readers,
errs: make([]error, len(readers)),
dataBlocks: e.dataBlocks, dataBlocks: e.dataBlocks,
offset: (offset / e.blockSize) * e.ShardSize(), offset: (offset / e.blockSize) * e.ShardSize(),
shardSize: e.ShardSize(), shardSize: e.ShardSize(),
@ -173,7 +171,6 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
// This will be communicated upstream. // This will be communicated upstream.
p.orgReaders[bufIdx] = nil p.orgReaders[bufIdx] = nil
p.readers[i] = nil p.readers[i] = nil
p.errs[i] = err
// Since ReadAt returned error, trigger another read. // Since ReadAt returned error, trigger another read.
readTriggerCh <- true readTriggerCh <- true
@ -198,19 +195,8 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
return newBuf, nil return newBuf, nil
} }
if countErrs(p.errs, nil) == len(p.errs) { // If we cannot decode, just return read quorum error.
// We have success from all drives this can mean that
// all local drives succeeded, but all remote drives
// failed to read since p.readers[i] was already nil
// for such remote servers - this condition was missed
// we would return instead `nil, nil` from this
// function - it is safer to simply return Quorum error
// when all errs are nil but erasure coding cannot decode
// the content.
return nil, errErasureReadQuorum return nil, errErasureReadQuorum
}
return nil, reduceReadQuorumErrs(context.Background(), p.errs, objectOpIgnoredErrs, p.dataBlocks)
} }
// Decode reads from readers, reconstructs data if needed and writes the data to the writer. // Decode reads from readers, reconstructs data if needed and writes the data to the writer.

View File

@ -200,7 +200,6 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) { object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
availableDisks := make([]StorageAPI, len(onlineDisks)) availableDisks := make([]StorageAPI, len(onlineDisks))
dataErrs := make([]error, len(onlineDisks)) dataErrs := make([]error, len(onlineDisks))
inconsistent := 0 inconsistent := 0
for i, meta := range partsMetadata { for i, meta := range partsMetadata {
if !meta.IsValid() { if !meta.IsValid() {
@ -270,6 +269,9 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
if dataErrs[i] == nil { if dataErrs[i] == nil {
// All parts verified, mark it as all data available. // All parts verified, mark it as all data available.
availableDisks[i] = onlineDisk availableDisks[i] = onlineDisk
} else {
// upon errors just make that disk's fileinfo invalid
partsMetadata[i] = FileInfo{}
} }
continue continue
} }
@ -291,6 +293,9 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
if dataErrs[i] == nil { if dataErrs[i] == nil {
// All parts verified, mark it as all data available. // All parts verified, mark it as all data available.
availableDisks[i] = onlineDisk availableDisks[i] = onlineDisk
} else {
// upon errors just make that disk's fileinfo invalid
partsMetadata[i] = FileInfo{}
} }
} }

View File

@ -28,7 +28,6 @@ import (
"github.com/minio/madmin-go" "github.com/minio/madmin-go"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/minio/pkg/sync/errgroup"
) )
@ -267,10 +266,27 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// List of disks having latest version of the object er.meta // List of disks having latest version of the object er.meta
// (by modtime). // (by modtime).
latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) _, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
// make sure all parts metadata dataDir is same as returned by listOnlineDisks()
// the reason is its possible that some of the disks might have stale data, for those
// we simply override them with maximally occurring 'dataDir' - this ensures that
// disksWithAllParts() verifies same dataDir across all drives.
for i := range partsMetadata {
partsMetadata[i].DataDir = dataDir
}
// List of disks having all parts as per latest metadata.
// NOTE: do not pass in latestDisks to diskWithAllParts since
// the diskWithAllParts needs to reach the drive to ensure
// validity of the metadata content, we should make sure that
// we pass in disks as is for it to be verified. Once verified
// the disksWithAllParts() returns the actual disks that can be
// used here for reconstruction. This is done to ensure that
// we do not skip drives that have inconsistent metadata to be
// skipped from purging when they are stale.
availableDisks, dataErrs := disksWithAllParts(ctx, storageDisks, partsMetadata, errs, bucket, object, scanMode)
// List of disks having all parts as per latest er.meta.
availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode)
// Loop to find number of disks with valid data, per-drive // Loop to find number of disks with valid data, per-drive
// data state and a list of outdated disks on which data needs // data state and a list of outdated disks on which data needs
// to be healed. // to be healed.
@ -336,6 +352,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// File is fully gone, fileInfo is empty. // File is fully gone, fileInfo is empty.
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
} }
// If less than read quorum number of disks have all the parts // If less than read quorum number of disks have all the parts
// of the data, we can't reconstruct the erasure-coded data. // of the data, we can't reconstruct the erasure-coded data.
if numAvailableDisks < result.DataBlocks { if numAvailableDisks < result.DataBlocks {
@ -405,7 +422,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
result.ParityBlocks = latestMeta.Erasure.ParityBlocks result.ParityBlocks = latestMeta.Erasure.ParityBlocks
// Reorder so that we have data disks first and parity disks next. // Reorder so that we have data disks first and parity disks next.
latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution) latestDisks := shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution) outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution) partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
copyPartsMetadata = shufflePartsMetadata(copyPartsMetadata, latestMeta.Erasure.Distribution) copyPartsMetadata = shufflePartsMetadata(copyPartsMetadata, latestMeta.Erasure.Distribution)
@ -823,7 +840,7 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
break break
} }
if validMeta.Deleted || validMeta.TransitionStatus == lifecycle.TransitionComplete { if validMeta.Deleted || validMeta.IsRemote() {
// notFoundParts is ignored since a // notFoundParts is ignored since a
// - delete marker does not have any parts // - delete marker does not have any parts
// - transition status of complete has no parts // - transition status of complete has no parts

View File

@ -285,6 +285,11 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
} }
var healOnce sync.Once var healOnce sync.Once
// once we have obtained a common FileInfo i.e latest, we should stick
// to single dataDir to read the content to avoid reading from some other
// dataDir that has stale FileInfo{} to ensure that we fail appropriately
// during reads and expect the same dataDir everywhere.
dataDir := fi.DataDir
for ; partIndex <= lastPartIndex; partIndex++ { for ; partIndex <= lastPartIndex; partIndex++ {
if length == totalBytesRead { if length == totalBytesRead {
break break
@ -313,9 +318,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
continue continue
} }
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
data := metaArr[index].Data readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
readers[index] = newBitrotReader(disk, data, bucket, partPath, tillOffset,
checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
// Prefer local disks // Prefer local disks
@ -336,10 +340,12 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
var scan madmin.HealScanMode var scan madmin.HealScanMode
if errors.Is(err, errFileNotFound) { if errors.Is(err, errFileNotFound) {
scan = madmin.HealNormalScan scan = madmin.HealNormalScan
logger.Info("Healing required, attempting to heal missing shards for %s", pathJoin(bucket, object, fi.VersionID))
} else if errors.Is(err, errFileCorrupt) { } else if errors.Is(err, errFileCorrupt) {
scan = madmin.HealDeepScan scan = madmin.HealDeepScan
logger.Info("Healing required, attempting to heal bitrot for %s", pathJoin(bucket, object, fi.VersionID))
} }
if scan != madmin.HealUnknownScan { if scan == madmin.HealNormalScan || scan == madmin.HealDeepScan {
healOnce.Do(func() { healOnce.Do(func() {
if _, healing := er.getOnlineDisksWithHealing(); !healing { if _, healing := er.getOnlineDisksWithHealing(); !healing {
go healObject(bucket, object, fi.VersionID, scan) go healObject(bucket, object, fi.VersionID, scan)

View File

@ -324,8 +324,8 @@ func TestGetObjectNoQuorum(t *testing.T) {
} }
err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts) err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts)
if err != toObjectErr(errFileNotFound, bucket, object) { if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
} }
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks // Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks