add necessary protection err, fileInfo slice reads and writes (#18854)

protection was in place. However, it covered only some
areas, so we re-arranged the code to ensure we could hold
locks properly.

Along with this, remove the DataShardFix code altogether,
in deployments with many drive replacements, this can affect
and lead to quorum loss.
This commit is contained in:
Harshavardhana 2024-01-24 01:08:23 -08:00 committed by GitHub
parent 152023e837
commit 708cebe7f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 57 additions and 339 deletions

View File

@ -116,7 +116,6 @@ verify-healing: ## verify healing and replacing disks with minio binary
@echo "Verify healing build with race" @echo "Verify healing build with race"
@GORACE=history_size=7 CGO_ENABLED=1 go build -race -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null @GORACE=history_size=7 CGO_ENABLED=1 go build -race -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
@(env bash $(PWD)/buildscripts/verify-healing.sh) @(env bash $(PWD)/buildscripts/verify-healing.sh)
@(env bash $(PWD)/buildscripts/unaligned-healing.sh)
@(env bash $(PWD)/buildscripts/heal-inconsistent-versions.sh) @(env bash $(PWD)/buildscripts/heal-inconsistent-versions.sh)
verify-healing-with-root-disks: ## verify healing root disks verify-healing-with-root-disks: ## verify healing root disks

View File

@ -1,177 +0,0 @@
#!/bin/bash -e
#
set -E
set -o pipefail
set -x
if [ ! -x "$PWD/minio" ]; then
echo "minio executable binary not found in current directory"
exit 1
fi
WORK_DIR="$PWD/.verify-$RANDOM"
MINIO_CONFIG_DIR="$WORK_DIR/.minio"
MINIO_OLD=("$PWD/minio.RELEASE.2021-11-24T23-19-33Z" --config-dir "$MINIO_CONFIG_DIR" server)
MINIO=("$PWD/minio" --config-dir "$MINIO_CONFIG_DIR" server)
function download_old_release() {
if [ ! -f minio.RELEASE.2021-11-24T23-19-33Z ]; then
curl --silent -O https://dl.minio.io/server/minio/release/linux-amd64/archive/minio.RELEASE.2021-11-24T23-19-33Z
chmod a+x minio.RELEASE.2021-11-24T23-19-33Z
fi
}
function start_minio_16drive() {
start_port=$1
export MINIO_ROOT_USER=minio
export MINIO_ROOT_PASSWORD=minio123
export MC_HOST_minio="http://minio:minio123@127.0.0.1:${start_port}/"
unset MINIO_KMS_AUTO_ENCRYPTION # do not auto-encrypt objects
export _MINIO_SHARD_DISKTIME_DELTA="5s" # do not change this as its needed for tests
export MINIO_CI_CD=1
MC_BUILD_DIR="mc-$RANDOM"
if ! git clone --quiet https://github.com/minio/mc "$MC_BUILD_DIR"; then
echo "failed to download https://github.com/minio/mc"
purge "${MC_BUILD_DIR}"
exit 1
fi
(cd "${MC_BUILD_DIR}" && go build -o "$WORK_DIR/mc")
# remove mc source.
purge "${MC_BUILD_DIR}"
"${MINIO_OLD[@]}" --address ":$start_port" "${WORK_DIR}/xl{1...16}" >"${WORK_DIR}/server1.log" 2>&1 &
pid=$!
disown $pid
sleep 30
if ! ps -p ${pid} 1>&2 >/dev/null; then
echo "server1 log:"
cat "${WORK_DIR}/server1.log"
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
shred --iterations=1 --size=5241856 - 1>"${WORK_DIR}/unaligned" 2>/dev/null
"${WORK_DIR}/mc" mb minio/healing-shard-bucket --quiet
"${WORK_DIR}/mc" cp \
"${WORK_DIR}/unaligned" \
minio/healing-shard-bucket/unaligned \
--disable-multipart --quiet
## "unaligned" object name gets consistently distributed
## to disks in following distribution order
##
## NOTE: if you change the name make sure to change the
## distribution order present here
##
## [15, 16, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
## make sure to remove the "last" data shard
rm -rf "${WORK_DIR}/xl14/healing-shard-bucket/unaligned"
sleep 10
## Heal the shard
"${WORK_DIR}/mc" admin heal --quiet --recursive minio/healing-shard-bucket
## then remove any other data shard let's pick first disk
## - 1st data shard.
rm -rf "${WORK_DIR}/xl3/healing-shard-bucket/unaligned"
sleep 10
go install github.com/minio/minio/docs/debugging/s3-check-md5@latest
if ! s3-check-md5 \
-debug \
-access-key minio \
-secret-key minio123 \
-endpoint http://127.0.0.1:${start_port}/ 2>&1 | grep CORRUPTED; then
echo "server1 log:"
cat "${WORK_DIR}/server1.log"
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
pkill minio
sleep 3
"${MINIO[@]}" --address ":$start_port" "${WORK_DIR}/xl{1...16}" >"${WORK_DIR}/server1.log" 2>&1 &
pid=$!
disown $pid
sleep 30
if ! ps -p ${pid} 1>&2 >/dev/null; then
echo "server1 log:"
cat "${WORK_DIR}/server1.log"
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
if ! s3-check-md5 \
-debug \
-access-key minio \
-secret-key minio123 \
-endpoint http://127.0.0.1:${start_port}/ 2>&1 | grep INTACT; then
echo "server1 log:"
cat "${WORK_DIR}/server1.log"
echo "FAILED"
mkdir -p inspects
(
cd inspects
"${WORK_DIR}/mc" support inspect minio/healing-shard-bucket/unaligned/**
)
"${WORK_DIR}/mc" mb play/inspects
"${WORK_DIR}/mc" mirror inspects play/inspects
purge "$WORK_DIR"
exit 1
fi
"${WORK_DIR}/mc" admin heal --quiet --recursive minio/healing-shard-bucket
if ! s3-check-md5 \
-debug \
-access-key minio \
-secret-key minio123 \
-endpoint http://127.0.0.1:${start_port}/ 2>&1 | grep INTACT; then
echo "server1 log:"
cat "${WORK_DIR}/server1.log"
echo "FAILED"
mkdir -p inspects
(
cd inspects
"${WORK_DIR}/mc" support inspect minio/healing-shard-bucket/unaligned/**
)
"${WORK_DIR}/mc" mb play/inspects
"${WORK_DIR}/mc" mirror inspects play/inspects
purge "$WORK_DIR"
exit 1
fi
pkill minio
sleep 3
}
function main() {
download_old_release
start_port=$(shuf -i 10000-65000 -n 1)
start_minio_16drive ${start_port}
}
function purge() {
rm -rf "$1"
}
(main "$@")
rv=$?
purge "$WORK_DIR"
exit "$rv"

View File

@ -69,8 +69,6 @@ import (
// serverDebugLog will enable debug printing // serverDebugLog will enable debug printing
var serverDebugLog = env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn var serverDebugLog = env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn
var shardDiskTimeDelta time.Duration
func init() { func init() {
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
if mousetrap.StartedByExplorer() { if mousetrap.StartedByExplorer() {
@ -107,12 +105,6 @@ func init() {
gob.Register(madmin.XFSErrorConfigs{}) gob.Register(madmin.XFSErrorConfigs{})
gob.Register(map[string]interface{}{}) gob.Register(map[string]interface{}{})
var err error
shardDiskTimeDelta, err = time.ParseDuration(env.Get("_MINIO_SHARD_DISKTIME_DELTA", "1m"))
if err != nil {
shardDiskTimeDelta = 1 * time.Minute
}
// All minio-go and madmin-go API operations shall be performed only once, // All minio-go and madmin-go API operations shall be performed only once,
// another way to look at this is we are turning off retries. // another way to look at this is we are turning off retries.
minio.MaxRetry = 1 minio.MaxRetry = 1

View File

@ -191,19 +191,6 @@ func filterOnlineDisksInplace(fi FileInfo, partsMetadata []FileInfo, onlineDisks
} }
} }
// Extracts list of disk mtimes from FileInfo slice and returns, skips
// slice elements that have errors.
func listObjectDiskMtimes(partsMetadata []FileInfo) (diskMTimes []time.Time) {
diskMTimes = bootModtimes(len(partsMetadata))
for index, metadata := range partsMetadata {
if metadata.IsValid() {
// Once the file is found, save the disk mtime saved on disk.
diskMTimes[index] = metadata.DiskMTime
}
}
return diskMTimes
}
// Notes: // Notes:
// There are 5 possible states a disk could be in, // There are 5 possible states a disk could be in,
// 1. __online__ - has the latest copy of xl.meta - returned by listOnlineDisks // 1. __online__ - has the latest copy of xl.meta - returned by listOnlineDisks
@ -277,13 +264,6 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
errs []error, latestMeta FileInfo, bucket, object string, errs []error, latestMeta FileInfo, bucket, object string,
scanMode madmin.HealScanMode) ([]StorageAPI, []error, time.Time, scanMode madmin.HealScanMode) ([]StorageAPI, []error, time.Time,
) { ) {
var diskMTime time.Time
var shardFix bool
if !latestMeta.DataShardFixed() {
diskMTime = pickValidDiskTimeWithQuorum(partsMetadata,
latestMeta.Erasure.DataBlocks)
}
availableDisks := make([]StorageAPI, len(onlineDisks)) availableDisks := make([]StorageAPI, len(onlineDisks))
dataErrs := make([]error, len(onlineDisks)) dataErrs := make([]error, len(onlineDisks))
@ -351,23 +331,6 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
} }
} }
if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
if !partsMetadata[i].AcceptableDelta(diskMTime, shardDiskTimeDelta) {
// not with in acceptable delta, skip.
// If disk mTime mismatches it is considered outdated
// https://github.com/minio/minio/pull/13803
//
// This check only is active if we could find maximally
// occurring disk mtimes that are somewhat same across
// the quorum. Allowing to skip those shards which we
// might think are wrong.
shardFix = true
partsMetadata[i] = FileInfo{}
dataErrs[i] = errFileCorrupt
continue
}
}
// Always check data, if we got it. // Always check data, if we got it.
if (len(meta.Data) > 0 || meta.Size == 0) && len(meta.Parts) > 0 { if (len(meta.Data) > 0 || meta.Size == 0) && len(meta.Parts) > 0 {
checksumInfo := meta.Erasure.GetChecksumInfo(meta.Parts[0].Number) checksumInfo := meta.Erasure.GetChecksumInfo(meta.Parts[0].Number)
@ -410,10 +373,5 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
} }
} }
if shardFix {
// Only when shard is fixed return an appropriate disk mtime value.
return availableDisks, dataErrs, diskMTime
} // else return timeSentinel for disk mtime
return availableDisks, dataErrs, timeSentinel return availableDisks, dataErrs, timeSentinel
} }

View File

@ -33,8 +33,6 @@ import (
"github.com/minio/pkg/v2/sync/errgroup" "github.com/minio/pkg/v2/sync/errgroup"
) )
const reservedMetadataPrefixLowerDataShardFix = ReservedMetadataPrefixLower + "data-shard-fix"
//go:generate stringer -type=healingMetric -trimprefix=healingMetric $GOFILE //go:generate stringer -type=healingMetric -trimprefix=healingMetric $GOFILE
type healingMetric uint8 type healingMetric uint8
@ -45,29 +43,6 @@ const (
healingMetricCheckAbandonedParts healingMetricCheckAbandonedParts
) )
// AcceptableDelta returns 'true' if the fi.DiskMTime is under
// acceptable delta of "delta" duration with maxTime.
//
// This code is primarily used for heuristic detection of
// incorrect shards, as per https://github.com/minio/minio/pull/13803
//
// This check only is active if we could find maximally
// occurring disk mtimes that are somewhat same across
// the quorum. Allowing to skip those shards which we
// might think are wrong.
func (fi FileInfo) AcceptableDelta(maxTime time.Time, delta time.Duration) bool {
diff := maxTime.Sub(fi.DiskMTime)
if diff < 0 {
diff = -diff
}
return diff < delta
}
// DataShardFixed - data shard fixed?
func (fi FileInfo) DataShardFixed() bool {
return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
}
func (er erasureObjects) listAndHeal(bucket, prefix string, scanMode madmin.HealScanMode, healEntry func(string, metaCacheEntry, madmin.HealScanMode) error) error { func (er erasureObjects) listAndHeal(bucket, prefix string, scanMode madmin.HealScanMode, healEntry func(string, metaCacheEntry, madmin.HealScanMode) error) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -311,7 +286,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
// used here for reconstruction. This is done to ensure that // used here for reconstruction. This is done to ensure that
// we do not skip drives that have inconsistent metadata to be // we do not skip drives that have inconsistent metadata to be
// skipped from purging when they are stale. // skipped from purging when they are stale.
availableDisks, dataErrs, diskMTime := disksWithAllParts(ctx, onlineDisks, partsMetadata, availableDisks, dataErrs, _ := disksWithAllParts(ctx, onlineDisks, partsMetadata,
errs, latestMeta, bucket, object, scanMode) errs, latestMeta, bucket, object, scanMode)
var erasure Erasure var erasure Erasure
@ -627,20 +602,6 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
} }
} }
if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
// Update metadata to indicate special fix.
_, err = er.PutObjectMetadata(ctx, bucket, object, ObjectOptions{
NoLock: true,
UserDefined: map[string]string{
reservedMetadataPrefixLowerDataShardFix: "true",
// another reserved metadata to capture original disk-mtime
// captured for this version of the object, to be used
// possibly in future to heal other versions if possible.
ReservedMetadataPrefixLower + "disk-mtime": diskMTime.String(),
},
})
}
return result, nil return result, nil
} }
@ -919,21 +880,25 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
// or when xl.meta is not readable in read quorum disks. // or when xl.meta is not readable in read quorum disks.
danglingErrsCount := func(cerrs []error) (int, int, int) { danglingErrsCount := func(cerrs []error) (int, int, int) {
var ( var (
notFoundCount int notFoundCount int
corruptedCount int corruptedCount int
diskNotFoundCount int driveNotFoundCount int
) )
for _, readErr := range cerrs { for _, readErr := range cerrs {
if readErr == nil {
continue
}
switch { switch {
case errors.Is(readErr, errFileNotFound) || errors.Is(readErr, errFileVersionNotFound): case errors.Is(readErr, errFileNotFound) || errors.Is(readErr, errFileVersionNotFound):
notFoundCount++ notFoundCount++
case errors.Is(readErr, errFileCorrupt): case errors.Is(readErr, errFileCorrupt):
corruptedCount++ corruptedCount++
case errors.Is(readErr, errDiskNotFound): default:
diskNotFoundCount++ // All other errors are non-actionable
driveNotFoundCount++
} }
} }
return notFoundCount, corruptedCount, diskNotFoundCount return notFoundCount, corruptedCount, driveNotFoundCount
} }
ndataErrs := make([]error, len(dataErrs)) ndataErrs := make([]error, len(dataErrs))

View File

@ -374,17 +374,6 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
return FileInfo{}, errErasureReadQuorum return FileInfo{}, errErasureReadQuorum
} }
func pickValidDiskTimeWithQuorum(metaArr []FileInfo, quorum int) time.Time {
diskMTimes := listObjectDiskMtimes(metaArr)
diskMTime, diskMaxima := commonTimeAndOccurence(diskMTimes, shardDiskTimeDelta)
if diskMaxima >= quorum {
return diskMTime
}
return timeSentinel
}
// pickValidFileInfo - picks one valid FileInfo content and returns from a // pickValidFileInfo - picks one valid FileInfo content and returns from a
// slice of FileInfo. // slice of FileInfo.
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, etag string, quorum int) (FileInfo, error) { func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, etag string, quorum int) (FileInfo, error) {

View File

@ -227,30 +227,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
if !fi.DataShardFixed() {
diskMTime := pickValidDiskTimeWithQuorum(metaArr, fi.Erasure.DataBlocks)
if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
for index := range onlineDisks {
if onlineDisks[index] == OfflineDisk {
continue
}
if !metaArr[index].IsValid() {
continue
}
if !metaArr[index].AcceptableDelta(diskMTime, shardDiskTimeDelta) {
// If disk mTime mismatches it is considered outdated
// https://github.com/minio/minio/pull/13803
//
// This check only is active if we could find maximally
// occurring disk mtimes that are somewhat same across
// the quorum. Allowing to skip those shards which we
// might think are wrong.
onlineDisks[index] = OfflineDisk
}
}
}
}
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if objInfo.DeleteMarker { if objInfo.DeleteMarker {
if opts.VersionID == "" { if opts.VersionID == "" {
@ -511,7 +487,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
// count the number of offline disks // count the number of offline disks
offline := 0 offline := 0
for i := 0; i < max(len(errs), len(dataErrs)); i++ { for i := 0; i < max(len(errs), len(dataErrs)); i++ {
if i < len(errs) && errs[i] == errDiskNotFound || i < len(dataErrs) && dataErrs[i] == errDiskNotFound { if i < len(errs) && errors.Is(errs[i], errDiskNotFound) || i < len(dataErrs) && errors.Is(dataErrs[i], errDiskNotFound) {
offline++ offline++
} }
} }
@ -768,8 +744,6 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
} }
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (FileInfo, []FileInfo, []StorageAPI, error) { func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (FileInfo, []FileInfo, []StorageAPI, error) {
var mu sync.Mutex
rawArr := make([]RawFileInfo, er.setDriveCount) rawArr := make([]RawFileInfo, er.setDriveCount)
metaArr := make([]FileInfo, er.setDriveCount) metaArr := make([]FileInfo, er.setDriveCount)
errs := make([]error, er.setDriveCount) errs := make([]error, er.setDriveCount)
@ -780,14 +754,16 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
done := make(chan bool, er.setDriveCount) done := make(chan bool, er.setDriveCount)
disks := er.getDisks() disks := er.getDisks()
mrfCheck := make(chan FileInfo)
defer close(mrfCheck)
ropts := ReadOptions{ ropts := ReadOptions{
ReadData: readData, ReadData: readData,
Healing: false, Healing: false,
} }
mrfCheck := make(chan FileInfo)
defer close(mrfCheck)
var rw sync.Mutex
// Ask for all disks first; // Ask for all disks first;
go func() { go func() {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -799,31 +775,36 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
done <- false done <- false
continue continue
} }
if !disk.IsOnline() {
done <- false
continue
}
wg.Add(1) wg.Add(1)
go func(i int, disk StorageAPI) { go func(i int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
var fi FileInfo
var err error var (
fi FileInfo
rfi RawFileInfo
err error
)
if opts.VersionID != "" { if opts.VersionID != "" {
// Read a specific version ID // Read a specific version ID
fi, err = readFileInfo(ctx, disk, bucket, object, opts.VersionID, ropts) fi, err = readFileInfo(ctx, disk, bucket, object, opts.VersionID, ropts)
mu.Lock()
metaArr[i], errs[i] = fi, err
mu.Unlock()
} else { } else {
// Read the latest version // Read the latest version
var ri RawFileInfo rfi, err = readRawFileInfo(ctx, disk, bucket, object, readData)
ri, err = readRawFileInfo(ctx, disk, bucket, object, readData)
mu.Lock()
rawArr[i], errs[i] = ri, err
mu.Unlock()
if err == nil { if err == nil {
fi, err = fileInfoFromRaw(ri, bucket, object, readData, opts.InclFreeVersions, true) fi, err = fileInfoFromRaw(rfi, bucket, object, readData, opts.InclFreeVersions, true)
mu.Lock()
metaArr[i], errs[i] = fi, err
mu.Unlock()
} }
} }
rw.Lock()
rawArr[i] = rfi
metaArr[i], errs[i] = fi, err
rw.Unlock()
done <- err == nil done <- err == nil
}(i, disk) }(i, disk)
} }
@ -835,20 +816,24 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
if !ok { if !ok {
return return
} }
if fi.Deleted { if fi.Deleted {
return return
} }
// if one of the disk is offline, return right here no need // if one of the disk is offline, return right here no need
// to attempt a heal on the object. // to attempt a heal on the object.
if countErrs(errs, errDiskNotFound) > 0 { if countErrs(errs, errDiskNotFound) > 0 {
return return
} }
var missingBlocks int var missingBlocks int
for i := range errs { for i := range errs {
if errors.Is(errs[i], errFileNotFound) { if errors.Is(errs[i], errFileNotFound) {
missingBlocks++ missingBlocks++
} }
} }
// if missing metadata can be reconstructed, attempt to reconstruct. // if missing metadata can be reconstructed, attempt to reconstruct.
// additionally do not heal delete markers inline, let them be // additionally do not heal delete markers inline, let them be
// healed upon regular heal process. // healed upon regular heal process.
@ -879,13 +864,12 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
minDisks = er.setDriveCount - er.defaultParityCount minDisks = er.setDriveCount - er.defaultParityCount
} }
calcQuorum := func() (FileInfo, []FileInfo, []StorageAPI, time.Time, string, error) { calcQuorum := func(metaArr []FileInfo, errs []error) (FileInfo, []FileInfo, []StorageAPI, time.Time, string, error) {
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil { if err != nil {
return FileInfo{}, nil, nil, time.Time{}, "", err return FileInfo{}, nil, nil, time.Time{}, "", err
} }
err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) if err := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); err != nil {
if err != nil {
return FileInfo{}, nil, nil, time.Time{}, "", err return FileInfo{}, nil, nil, time.Time{}, "", err
} }
onlineDisks, modTime, etag := listOnlineDisks(disks, metaArr, errs, readQuorum) onlineDisks, modTime, etag := listOnlineDisks(disks, metaArr, errs, readQuorum)
@ -895,7 +879,11 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
} }
onlineMeta := make([]FileInfo, len(metaArr)) onlineMeta := make([]FileInfo, len(metaArr))
copy(onlineMeta, metaArr) for i, disk := range onlineDisks {
if disk != nil {
onlineMeta[i] = metaArr[i]
}
}
return fi, onlineMeta, onlineDisks, modTime, etag, nil return fi, onlineMeta, onlineDisks, modTime, etag, nil
} }
@ -922,20 +910,24 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
continue continue
} }
} }
rw.Lock()
if opts.VersionID == "" && totalResp == er.setDriveCount { if opts.VersionID == "" && totalResp == er.setDriveCount {
// Disks cannot agree about the latest version, pass this to a more advanced code fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(pickLatestQuorumFilesInfo(ctx,
metaArr, errs = pickLatestQuorumFilesInfo(ctx, rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true) rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true))
} else {
fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(metaArr, errs)
} }
mu.Lock() rw.Unlock()
fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum()
mu.Unlock()
if err == nil && fi.InlineData() { if err == nil && fi.InlineData() {
break break
} }
} }
if err != nil { if err != nil {
if shouldCheckForDangling(err, errs, bucket) { // We can only look for dangling if we received all the responses, if we did
// not we simply ignore it, since we can't tell for sure if its dangling object.
if totalResp == er.setDriveCount && shouldCheckForDangling(err, errs, bucket) {
_, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts) _, derr := er.deleteIfDangling(context.Background(), bucket, object, metaArr, errs, nil, opts)
if derr != nil { if derr != nil {
err = derr err = derr