mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
quorum calculation getLatestFileInfo should be itself (#13717)
FileInfo quorum shouldn't be passed down, instead inferred after obtaining a maximally occurring FileInfo. This PR also changes other functions that rely on wrong quorum calculation. Update tests as well to handle the proper requirement. All these changes are needed when migrating from older deployments where we used to set N/2 quorum for reads to EC:4 parity in newer releases.
This commit is contained in:
parent
c791de0e1e
commit
28f95f1fbe
@ -10,6 +10,38 @@ cleanup() {
|
|||||||
docker volume prune -f
|
docker volume prune -f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
verify_checksum_after_heal() {
|
||||||
|
local sum1
|
||||||
|
sum1=$(curl -s "$2" | sha256sum);
|
||||||
|
mc admin heal --json -r "$1" >/dev/null; # test after healing
|
||||||
|
local sum1_heal
|
||||||
|
sum1_heal=$(curl -s "$2" | sha256sum);
|
||||||
|
|
||||||
|
if [ "${sum1_heal}" != "${sum1}" ]; then
|
||||||
|
echo "mismatch expected ${sum1_heal}, got ${sum1}"
|
||||||
|
exit 1;
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
verify_checksum_mc() {
|
||||||
|
local expected
|
||||||
|
expected=$(mc cat "$1" | sha256sum)
|
||||||
|
local got
|
||||||
|
got=$(mc cat "$2" | sha256sum)
|
||||||
|
|
||||||
|
if [ "${expected}" != "${got}" ]; then
|
||||||
|
echo "mismatch - expected ${expected}, got ${got}"
|
||||||
|
exit 1;
|
||||||
|
fi
|
||||||
|
echo "matches - ${expected}, got ${got}"
|
||||||
|
}
|
||||||
|
|
||||||
|
add_alias() {
|
||||||
|
until (mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin); do
|
||||||
|
echo "...waiting... for 5secs" && sleep 5
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
__init__() {
|
__init__() {
|
||||||
sudo apt install curl -y
|
sudo apt install curl -y
|
||||||
export GOPATH=/tmp/gopath
|
export GOPATH=/tmp/gopath
|
||||||
@ -22,48 +54,25 @@ __init__() {
|
|||||||
MINIO_VERSION=RELEASE.2019-12-19T22-52-26Z docker-compose \
|
MINIO_VERSION=RELEASE.2019-12-19T22-52-26Z docker-compose \
|
||||||
-f "buildscripts/upgrade-tests/compose.yml" \
|
-f "buildscripts/upgrade-tests/compose.yml" \
|
||||||
up -d --build
|
up -d --build
|
||||||
until (mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin); do
|
|
||||||
echo "...waiting..." && sleep 5;
|
add_alias
|
||||||
done
|
|
||||||
|
|
||||||
mc mb minio/minio-test/
|
mc mb minio/minio-test/
|
||||||
mc cp ./minio minio/minio-test/to-read/
|
mc cp ./minio minio/minio-test/to-read/
|
||||||
mc cp /etc/hosts minio/minio-test/to-read/hosts
|
mc cp /etc/hosts minio/minio-test/to-read/hosts
|
||||||
mc policy set download minio/minio-test
|
mc policy set download minio/minio-test
|
||||||
mc cat minio/minio-test/to-read/minio | sha256sum
|
|
||||||
mc cat ./minio | sha256sum
|
verify_checksum_mc ./minio minio/minio-test/to-read/minio
|
||||||
|
|
||||||
curl -s http://127.0.0.1:9000/minio-test/to-read/hosts | sha256sum
|
curl -s http://127.0.0.1:9000/minio-test/to-read/hosts | sha256sum
|
||||||
|
|
||||||
MINIO_VERSION=dev docker-compose -f "buildscripts/upgrade-tests/compose.yml" stop
|
MINIO_VERSION=dev docker-compose -f "buildscripts/upgrade-tests/compose.yml" stop
|
||||||
}
|
}
|
||||||
|
|
||||||
verify_checksum_after_heal() {
|
|
||||||
sum1=$(curl -s "$2" | sha256sum);
|
|
||||||
mc admin heal --json -r "$1" >/dev/null; # test after healing
|
|
||||||
sum1_heal=$(curl -s "$2" | sha256sum);
|
|
||||||
|
|
||||||
if [ "${sum1_heal}" != "${sum1}" ]; then
|
|
||||||
echo "mismatch expected ${sum1_heal}, got ${sum1}"
|
|
||||||
exit 1;
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
verify_checksum_mc() {
|
|
||||||
expected=$(mc cat "$1" | sha256sum)
|
|
||||||
got=$(mc cat "$2" | sha256sum)
|
|
||||||
|
|
||||||
if [ "${expected}" != "${got}" ]; then
|
|
||||||
echo "mismatch expected ${expected}, got ${got}"
|
|
||||||
exit 1;
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
main() {
|
main() {
|
||||||
MINIO_VERSION=dev docker-compose -f "buildscripts/upgrade-tests/compose.yml" up -d --build
|
MINIO_VERSION=dev docker-compose -f "buildscripts/upgrade-tests/compose.yml" up -d --build
|
||||||
|
|
||||||
until (mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin); do
|
add_alias
|
||||||
echo "...waiting..." && sleep 5
|
|
||||||
done
|
|
||||||
|
|
||||||
verify_checksum_after_heal minio/minio-test http://127.0.0.1:9000/minio-test/to-read/hosts
|
verify_checksum_after_heal minio/minio-test http://127.0.0.1:9000/minio-test/to-read/hosts
|
||||||
|
|
||||||
|
@ -134,9 +134,9 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Returns the latest updated FileInfo files and error in case of failure.
|
// Returns the latest updated FileInfo files and error in case of failure.
|
||||||
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error, quorum int) (FileInfo, error) {
|
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error) (FileInfo, error) {
|
||||||
// There should be atleast half correct entries, if not return failure
|
// There should be atleast half correct entries, if not return failure
|
||||||
reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
|
reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, len(partsMetadata)/2)
|
||||||
if reducedErr != nil {
|
if reducedErr != nil {
|
||||||
return FileInfo{}, reducedErr
|
return FileInfo{}, reducedErr
|
||||||
}
|
}
|
||||||
@ -151,6 +151,10 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
|
|||||||
// Reduce list of UUIDs to a single common value - i.e. the last updated Time
|
// Reduce list of UUIDs to a single common value - i.e. the last updated Time
|
||||||
modTime := commonTime(modTimes)
|
modTime := commonTime(modTimes)
|
||||||
|
|
||||||
|
if modTime.IsZero() || modTime.Equal(timeSentinel) {
|
||||||
|
return FileInfo{}, errErasureReadQuorum
|
||||||
|
}
|
||||||
|
|
||||||
// Interate through all the modTimes and count the FileInfo(s) with latest time.
|
// Interate through all the modTimes and count the FileInfo(s) with latest time.
|
||||||
for index, t := range modTimes {
|
for index, t := range modTimes {
|
||||||
if partsMetadata[index].IsValid() && t.Equal(modTime) {
|
if partsMetadata[index].IsValid() && t.Equal(modTime) {
|
||||||
@ -159,7 +163,11 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if count < quorum {
|
if !latestFileInfo.IsValid() {
|
||||||
|
return FileInfo{}, errErasureReadQuorum
|
||||||
|
}
|
||||||
|
|
||||||
|
if count < latestFileInfo.Erasure.DataBlocks {
|
||||||
return FileInfo{}, errErasureReadQuorum
|
return FileInfo{}, errErasureReadQuorum
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -174,10 +182,8 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
|
|||||||
// - slice of errors about the state of data files on disk - can have
|
// - slice of errors about the state of data files on disk - can have
|
||||||
// a not-found error or a hash-mismatch error.
|
// a not-found error or a hash-mismatch error.
|
||||||
func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []FileInfo,
|
func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []FileInfo,
|
||||||
errs []error, bucket, object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
|
errs []error, latestMeta FileInfo,
|
||||||
|
bucket, object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
|
||||||
// List of disks having latest version of the object xl.meta (by modtime)
|
|
||||||
_, modTime := listOnlineDisks(onlineDisks, partsMetadata, errs)
|
|
||||||
|
|
||||||
availableDisks := make([]StorageAPI, len(onlineDisks))
|
availableDisks := make([]StorageAPI, len(onlineDisks))
|
||||||
dataErrs := make([]error, len(onlineDisks))
|
dataErrs := make([]error, len(onlineDisks))
|
||||||
@ -214,13 +220,13 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
|||||||
dataErrs[i] = errs[i]
|
dataErrs[i] = errs[i]
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if onlineDisk == nil {
|
if onlineDisk == OfflineDisk {
|
||||||
dataErrs[i] = errDiskNotFound
|
dataErrs[i] = errDiskNotFound
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := partsMetadata[i]
|
meta := partsMetadata[i]
|
||||||
if !meta.ModTime.Equal(modTime) {
|
if !meta.ModTime.Equal(latestMeta.ModTime) || meta.DataDir != latestMeta.DataDir {
|
||||||
dataErrs[i] = errFileCorrupt
|
dataErrs[i] = errFileCorrupt
|
||||||
partsMetadata[i] = FileInfo{}
|
partsMetadata[i] = FileInfo{}
|
||||||
continue
|
continue
|
||||||
@ -268,17 +274,18 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
meta.DataDir = latestMeta.DataDir
|
||||||
switch scanMode {
|
switch scanMode {
|
||||||
case madmin.HealDeepScan:
|
case madmin.HealDeepScan:
|
||||||
// disk has a valid xl.meta but may not have all the
|
// disk has a valid xl.meta but may not have all the
|
||||||
// parts. This is considered an outdated disk, since
|
// parts. This is considered an outdated disk, since
|
||||||
// it needs healing too.
|
// it needs healing too.
|
||||||
if !partsMetadata[i].Deleted && !partsMetadata[i].IsRemote() {
|
if !meta.Deleted && !meta.IsRemote() {
|
||||||
dataErrs[i] = onlineDisk.VerifyFile(ctx, bucket, object, partsMetadata[i])
|
dataErrs[i] = onlineDisk.VerifyFile(ctx, bucket, object, meta)
|
||||||
}
|
}
|
||||||
case madmin.HealNormalScan:
|
case madmin.HealNormalScan:
|
||||||
if !partsMetadata[i].Deleted && !partsMetadata[i].IsRemote() {
|
if !meta.Deleted && !meta.IsRemote() {
|
||||||
dataErrs[i] = onlineDisk.CheckParts(ctx, bucket, object, partsMetadata[i])
|
dataErrs[i] = onlineDisk.CheckParts(ctx, bucket, object, meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -193,7 +194,7 @@ func TestListOnlineDisks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||||
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
|
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
||||||
}
|
}
|
||||||
@ -250,7 +251,7 @@ func TestListOnlineDisks(t *testing.T) {
|
|||||||
t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
|
t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
|
||||||
test.expectedTime, modTime)
|
test.expectedTime, modTime)
|
||||||
}
|
}
|
||||||
availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
|
availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, fi, bucket, object, madmin.HealDeepScan)
|
||||||
test.errs = newErrs
|
test.errs = newErrs
|
||||||
|
|
||||||
if test._tamperBackend != noTamper {
|
if test._tamperBackend != noTamper {
|
||||||
@ -365,7 +366,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
|
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
|
||||||
_, err = getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
|
_, err = getLatestFileInfo(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
||||||
}
|
}
|
||||||
@ -421,9 +422,9 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
|
partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
|
||||||
_, err = getLatestFileInfo(ctx, partsMetadata, errs, len(disks)/2)
|
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if !errors.Is(err, errErasureReadQuorum) {
|
||||||
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
t.Fatalf("Failed to getLatestFileInfo, expected %v, got %v", errErasureReadQuorum, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
|
onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
|
||||||
@ -432,7 +433,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
|
|||||||
test.expectedTime, modTime)
|
test.expectedTime, modTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
|
availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, fi, bucket, object, madmin.HealDeepScan)
|
||||||
test.errs = newErrs
|
test.errs = newErrs
|
||||||
|
|
||||||
if test._tamperBackend != noTamper {
|
if test._tamperBackend != noTamper {
|
||||||
@ -486,9 +487,14 @@ func TestDisksWithAllParts(t *testing.T) {
|
|||||||
t.Fatalf("Failed to read xl meta data %v", err)
|
t.Fatalf("Failed to read xl meta data %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get quorum consistent fileInfo %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
erasureDisks, _ = listOnlineDisks(erasureDisks, partsMetadata, errs)
|
erasureDisks, _ = listOnlineDisks(erasureDisks, partsMetadata, errs)
|
||||||
|
|
||||||
filteredDisks, errs := disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
|
filteredDisks, errs := disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
|
||||||
|
|
||||||
if len(filteredDisks) != len(erasureDisks) {
|
if len(filteredDisks) != len(erasureDisks) {
|
||||||
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
||||||
@ -509,7 +515,7 @@ func TestDisksWithAllParts(t *testing.T) {
|
|||||||
partsMetadata[0].ModTime = partsMetadata[0].ModTime.Add(-1 * time.Hour)
|
partsMetadata[0].ModTime = partsMetadata[0].ModTime.Add(-1 * time.Hour)
|
||||||
|
|
||||||
errs = make([]error, len(erasureDisks))
|
errs = make([]error, len(erasureDisks))
|
||||||
filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
|
filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
|
||||||
|
|
||||||
if len(filteredDisks) != len(erasureDisks) {
|
if len(filteredDisks) != len(erasureDisks) {
|
||||||
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
||||||
@ -529,7 +535,7 @@ func TestDisksWithAllParts(t *testing.T) {
|
|||||||
partsMetadata[1].DataDir = "foo-random"
|
partsMetadata[1].DataDir = "foo-random"
|
||||||
|
|
||||||
errs = make([]error, len(erasureDisks))
|
errs = make([]error, len(erasureDisks))
|
||||||
filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
|
filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
|
||||||
|
|
||||||
if len(filteredDisks) != len(erasureDisks) {
|
if len(filteredDisks) != len(erasureDisks) {
|
||||||
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
||||||
@ -565,7 +571,7 @@ func TestDisksWithAllParts(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
errs = make([]error, len(erasureDisks))
|
errs = make([]error, len(erasureDisks))
|
||||||
filteredDisks, errs = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
|
filteredDisks, errs = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
|
||||||
|
|
||||||
if len(filteredDisks) != len(erasureDisks) {
|
if len(filteredDisks) != len(erasureDisks) {
|
||||||
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
|
||||||
|
@ -288,13 +288,20 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||||||
// Re-read when we have lock...
|
// Re-read when we have lock...
|
||||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
|
||||||
|
|
||||||
if _, err = getLatestFileInfo(ctx, partsMetadata, errs, er.defaultParityCount); err != nil {
|
if _, err = getLatestFileInfo(ctx, partsMetadata, errs); err != nil {
|
||||||
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
|
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List of disks having latest version of the object er.meta
|
// List of disks having latest version of the object er.meta
|
||||||
// (by modtime).
|
// (by modtime).
|
||||||
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||||
|
|
||||||
|
// Latest FileInfo for reference. If a valid metadata is not
|
||||||
|
// present, it is as good as object not found.
|
||||||
|
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
|
||||||
|
if err != nil {
|
||||||
|
return result, toObjectErr(err, bucket, object, versionID)
|
||||||
|
}
|
||||||
|
|
||||||
// List of disks having all parts as per latest metadata.
|
// List of disks having all parts as per latest metadata.
|
||||||
// NOTE: do not pass in latestDisks to diskWithAllParts since
|
// NOTE: do not pass in latestDisks to diskWithAllParts since
|
||||||
@ -305,15 +312,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||||||
// used here for reconstruction. This is done to ensure that
|
// used here for reconstruction. This is done to ensure that
|
||||||
// we do not skip drives that have inconsistent metadata to be
|
// we do not skip drives that have inconsistent metadata to be
|
||||||
// skipped from purging when they are stale.
|
// skipped from purging when they are stale.
|
||||||
availableDisks, dataErrs := disksWithAllParts(ctx, storageDisks, partsMetadata,
|
availableDisks, dataErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata,
|
||||||
errs, bucket, object, scanMode)
|
errs, latestMeta, bucket, object, scanMode)
|
||||||
|
|
||||||
// Latest FileInfo for reference. If a valid metadata is not
|
|
||||||
// present, it is as good as object not found.
|
|
||||||
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
|
|
||||||
if err != nil {
|
|
||||||
return result, toObjectErr(err, bucket, object, versionID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Loop to find number of disks with valid data, per-drive
|
// Loop to find number of disks with valid data, per-drive
|
||||||
// data state and a list of outdated disks on which data needs
|
// data state and a list of outdated disks on which data needs
|
||||||
|
@ -416,7 +416,7 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||||
fi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
|
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||||
}
|
}
|
||||||
@ -441,7 +441,7 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||||
nfi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
|
nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||||
}
|
}
|
||||||
@ -467,7 +467,7 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||||
nfi, err = getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
|
nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||||
}
|
}
|
||||||
|
@ -390,15 +390,11 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
|
|||||||
// writeQuorum is the min required disks to write data.
|
// writeQuorum is the min required disks to write data.
|
||||||
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
|
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
|
||||||
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
|
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
|
||||||
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs, defaultParityCount)
|
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !latestFileInfo.IsValid() {
|
|
||||||
return 0, 0, errErasureReadQuorum
|
|
||||||
}
|
|
||||||
|
|
||||||
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
|
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
|
||||||
if parityBlocks <= 0 {
|
if parityBlocks <= 0 {
|
||||||
parityBlocks = defaultParityCount
|
parityBlocks = defaultParityCount
|
||||||
|
Loading…
Reference in New Issue
Block a user