re-implement pickValidFileInfo dataDir, move to quorum calculation (#13681)

Choosing the dataDir loosely based on a maxima is incorrect and does not
work in all situations, such as with disks in the following states:

- During the xl.json to xl.meta migration there may be partial
  xl.json's left over if some disks were not yet connected when
  the migration ran; since the xl.json and xl.meta mtimes are
  the same, the dataDir maxima does not work properly, leading
  to quorum issues (a short sketch after this list makes this concrete).

- It is also possible that XLV1 is true on some of the available
  disks; make sure to pick the FileInfo based on a common quorum
  and skip unexpected disks that still carry the older data format.
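
To make the first failure mode concrete, here is a minimal, self-contained Go sketch (simplified stand-in types, not the actual MinIO code) of why a per-mtime dataDir map breaks when a stale legacy entry shares the same mtime as the healthy disks:

package main

import (
    "fmt"
    "time"
)

// Simplified version of the old commonTime(modTimes, dataDirs): for each
// mtime, the dataDir map keeps whichever disk happened to be seen last.
func oldCommonTime(modTimes []time.Time, dataDirs []string) (time.Time, string) {
    occ := make(map[int64]int)
    dirs := make(map[int64]string)
    for i, t := range modTimes {
        dirs[t.UnixNano()] = dataDirs[i]
        occ[t.UnixNano()]++
    }
    var maxima int
    var modTime time.Time
    var dataDir string
    for nano, count := range occ {
        t := time.Unix(0, nano).UTC()
        if count > maxima || (count == maxima && t.After(modTime)) {
            maxima, modTime, dataDir = count, t, dirs[nano]
        }
    }
    return modTime, dataDir
}

func main() {
    mtime := time.Unix(0, 1000).UTC()
    // Three healthy disks agree on "uuid-good"; the fourth still carries a
    // stale legacy entry with the *same* mtime (xl.json -> xl.meta leftover).
    modTimes := []time.Time{mtime, mtime, mtime, mtime}
    dataDirs := []string{"uuid-good", "uuid-good", "uuid-good", "legacy-leftover"}

    _, dataDir := oldCommonTime(modTimes, dataDirs)
    fmt.Println(dataDir) // "legacy-leftover" wins even though only one disk has it
}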

This PR also adds a test that upgrades from an older release to a newer
one and verifies that the data is still readable and matches its checksum.

NOTE: this is just initial work; we can build on top of it to add further tests.
Harshavardhana 2021-11-21 10:41:30 -08:00 committed by GitHub
parent 36b5426f6e
commit c791de0e1e
20 changed files with 378 additions and 155 deletions

View File

@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
go-version: [1.17.x]
os: [ubuntu-latest]
steps:
- uses: actions/checkout@v2

View File

@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
go-version: [1.17.x]
os: [ubuntu-latest, windows-latest]
steps:
- uses: actions/checkout@v2

31 .github/workflows/upgrade-ci-cd.yaml vendored Normal file
View File

@ -0,0 +1,31 @@
name: Upgrade old version tests
on:
pull_request:
branches:
- master
# This ensures that previous jobs for the PR are canceled when the PR is
# updated.
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
build:
name: Go ${{ matrix.go-version }} on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
go-version: [1.17.x]
os: [ubuntu-latest]
steps:
- uses: actions/checkout@v1
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Start upgrade tests
run: |
make test-upgrade

View File

@ -42,6 +42,10 @@ test: verifiers build ## builds minio, runs linters, tests
@echo "Running unit tests"
@GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./... 1>/dev/null
test-upgrade: build
@echo "Running minio upgrade tests"
@(env bash $(PWD)/buildscripts/minio-upgrade.sh)
test-race: verifiers build ## builds minio, runs linters, tests (race)
@echo "Running unit tests under -race"
@(env bash $(PWD)/buildscripts/race.sh)

View File

@ -0,0 +1,77 @@
#!/bin/bash
trap 'cleanup $LINENO' ERR
# shellcheck disable=SC2120
cleanup() {
MINIO_VERSION=dev docker-compose \
-f "buildscripts/upgrade-tests/compose.yml" \
rm -s -f
docker volume prune -f
}
__init__() {
sudo apt install curl -y
export GOPATH=/tmp/gopath
export PATH=${PATH}:${GOPATH}/bin
go install github.com/minio/mc@latest
TAG=minio/minio:dev make docker
MINIO_VERSION=RELEASE.2019-12-19T22-52-26Z docker-compose \
-f "buildscripts/upgrade-tests/compose.yml" \
up -d --build
until (mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin); do
echo "...waiting..." && sleep 5;
done
mc mb minio/minio-test/
mc cp ./minio minio/minio-test/to-read/
mc cp /etc/hosts minio/minio-test/to-read/hosts
mc policy set download minio/minio-test
mc cat minio/minio-test/to-read/minio | sha256sum
mc cat ./minio | sha256sum
curl -s http://127.0.0.1:9000/minio-test/to-read/hosts | sha256sum
MINIO_VERSION=dev docker-compose -f "buildscripts/upgrade-tests/compose.yml" stop
}
verify_checksum_after_heal() {
sum1=$(curl -s "$2" | sha256sum);
mc admin heal --json -r "$1" >/dev/null; # test after healing
sum1_heal=$(curl -s "$2" | sha256sum);
if [ "${sum1_heal}" != "${sum1}" ]; then
echo "mismatch expected ${sum1_heal}, got ${sum1}"
exit 1;
fi
}
verify_checksum_mc() {
expected=$(mc cat "$1" | sha256sum)
got=$(mc cat "$2" | sha256sum)
if [ "${expected}" != "${got}" ]; then
echo "mismatch expected ${expected}, got ${got}"
exit 1;
fi
}
main() {
MINIO_VERSION=dev docker-compose -f "buildscripts/upgrade-tests/compose.yml" up -d --build
until (mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin); do
echo "...waiting..." && sleep 5
done
verify_checksum_after_heal minio/minio-test http://127.0.0.1:9000/minio-test/to-read/hosts
verify_checksum_mc ./minio minio/minio-test/to-read/minio
verify_checksum_mc /etc/hosts minio/minio-test/to-read/hosts
cleanup
}
( __init__ "$@" && main "$@" )

View File

@ -0,0 +1,81 @@
version: '3.7'
# Settings and configurations that are common for all containers
x-minio-common: &minio-common
image: minio/minio:${MINIO_VERSION}
command: server http://minio{1...4}/data{1...3}
env_file:
- ./minio.env
expose:
- "9000"
- "9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
minio1:
<<: *minio-common
hostname: minio1
volumes:
- data1-1:/data1
- data1-2:/data2
- data1-3:/data3
minio2:
<<: *minio-common
hostname: minio2
volumes:
- data2-1:/data1
- data2-2:/data2
- data2-3:/data3
minio3:
<<: *minio-common
hostname: minio3
volumes:
- data3-1:/data1
- data3-2:/data2
- data3-3:/data3
minio4:
<<: *minio-common
hostname: minio4
volumes:
- data4-1:/data1
- data4-2:/data2
- data4-3:/data3
nginx:
image: nginx:1.19.2-alpine
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
ports:
- "9000:9000"
- "9001:9001"
depends_on:
- minio1
- minio2
- minio3
- minio4
## By default this config uses default local driver,
## For custom volumes replace with volume driver configuration.
volumes:
data1-1:
data1-2:
data1-3:
data2-1:
data2-2:
data2-3:
data3-1:
data3-2:
data3-3:
data4-1:
data4-2:
data4-3:

View File

@ -0,0 +1,3 @@
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BROWSER=off

View File

@ -0,0 +1,68 @@
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
#tcp_nopush on;
keepalive_timeout 65;
#gzip on;
# include /etc/nginx/conf.d/*.conf;
upstream minio {
server minio1:9000;
server minio2:9000;
server minio3:9000;
server minio4:9000;
}
# main minio
server {
listen 9000;
listen [::]:9000;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_connect_timeout 300;
# Default is HTTP/1, keepalive is only enabled in HTTP/1.1
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding off;
proxy_pass http://minio;
}
}
}

View File

@ -26,33 +26,30 @@ import (
)
// commonTime returns a maximally occurring time from a list of times.
func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dataDir string) {
var maxima int // Counter for remembering max occurrence of elements.
func commonTime(modTimes []time.Time) (modTime time.Time) {
timeOccurenceMap := make(map[int64]int, len(modTimes))
dataDirMap := make(map[int64]string, len(modTimes))
// Ignore the uuid sentinel and count the rest.
for i, t := range modTimes {
for _, t := range modTimes {
if t.Equal(timeSentinel) {
continue
}
dataDirMap[t.UnixNano()] = dataDirs[i]
timeOccurenceMap[t.UnixNano()]++
}
var maxima int // Counter for remembering max occurrence of elements.
// Find the common cardinality from previously collected
// occurrences of elements.
for nano, count := range timeOccurenceMap {
t := time.Unix(0, nano).UTC()
if count > maxima || (count == maxima && t.After(modTime)) {
maxima = count
dataDir = dataDirMap[nano]
modTime = t
}
}
// Return the collected common uuid.
return modTime, dataDir
// Return the collected common modTime.
return modTime
}
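
For reference, a self-contained demonstration of the new commonTime behavior, re-declaring the function above so it can run standalone; timeSentinel here is the assumed beginning-of-unix-time sentinel mentioned in the comment below:

package main

import (
    "fmt"
    "time"
)

// Beginning of unix time is treated as the sentinel, as in the file above.
var timeSentinel = time.Unix(0, 0).UTC()

func commonTime(modTimes []time.Time) (modTime time.Time) {
    timeOccurenceMap := make(map[int64]int, len(modTimes))
    for _, t := range modTimes {
        if t.Equal(timeSentinel) {
            continue
        }
        timeOccurenceMap[t.UnixNano()]++
    }
    var maxima int
    for nano, count := range timeOccurenceMap {
        t := time.Unix(0, nano).UTC()
        if count > maxima || (count == maxima && t.After(modTime)) {
            maxima = count
            modTime = t
        }
    }
    return modTime
}

func main() {
    times := []time.Time{
        time.Unix(0, 3).UTC(), time.Unix(0, 3).UTC(),
        time.Unix(0, 1).UTC(), time.Unix(0, 1).UTC(),
        timeSentinel, // ignored
    }
    // Ties on the occurrence count are broken toward the later time.
    fmt.Println(commonTime(times)) // 1970-01-01 00:00:00.000000003 +0000 UTC
}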
// Beginning of unix time is treated as sentinel value here.
@ -82,10 +79,14 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time
return modTimes
}
const (
errorDir = "error-dir"
delMarkerDir = ""
)
func filterOnlineDisksInplace(fi FileInfo, partsMetadata []FileInfo, onlineDisks []StorageAPI) {
for i, meta := range partsMetadata {
if fi.XLV1 == meta.XLV1 {
continue
}
onlineDisks[i] = nil
}
}
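
A small, runnable illustration (with stand-in types, not the real FileInfo/StorageAPI) of what filterOnlineDisksInplace does: any disk whose metadata format disagrees with the picked FileInfo's XLV1 flag is dropped from the online set:

package main

import "fmt"

type fileInfo struct{ XLV1 bool }

type storageAPI struct{ name string }

func filterOnlineDisksInplace(fi fileInfo, partsMetadata []fileInfo, onlineDisks []*storageAPI) {
    for i, meta := range partsMetadata {
        if fi.XLV1 == meta.XLV1 {
            continue
        }
        onlineDisks[i] = nil
    }
}

func main() {
    // the quorum FileInfo is in the new xl.meta format (XLV1 == false)
    fi := fileInfo{XLV1: false}
    partsMetadata := []fileInfo{{false}, {true}, {false}, {false}}
    onlineDisks := []*storageAPI{{"d1"}, {"d2"}, {"d3"}, {"d4"}}

    filterOnlineDisksInplace(fi, partsMetadata, onlineDisks)
    for i, d := range onlineDisks {
        fmt.Println(i, d) // index 1 prints <nil>: its metadata is still legacy xl.json
    }
}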
// Notes:
// There are 5 possible states a disk could be in,
@ -111,43 +112,25 @@ const (
// - a slice of disks where disk having 'older' xl.meta (or nothing)
// are set to nil.
// - latest (in time) of the maximally occurring modTime(s).
func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time, dataDir string) {
func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
onlineDisks = make([]StorageAPI, len(disks))
// List all the file commit ids from parts metadata.
modTimes := listObjectModtimes(partsMetadata, errs)
dataDirs := getDataDirs(partsMetadata, errs)
// Reduce list of UUIDs to a single common value.
modTime, dataDir = commonTime(modTimes, dataDirs)
modTime = commonTime(modTimes)
// Create a new online disks slice, which have common uuid.
for index, t := range modTimes {
if partsMetadata[index].IsValid() && t.Equal(modTime) && partsMetadata[index].DataDir == dataDir {
if partsMetadata[index].IsValid() && t.Equal(modTime) {
onlineDisks[index] = disks[index]
} else {
onlineDisks[index] = nil
}
}
return onlineDisks, modTime, dataDir
}
func getDataDirs(partsMetadata []FileInfo, errs []error) []string {
dataDirs := make([]string, len(partsMetadata))
for idx, fi := range partsMetadata {
if errs[idx] != nil {
dataDirs[idx] = errorDir
continue
}
if fi.Deleted {
dataDirs[idx] = delMarkerDir
} else {
dataDirs[idx] = fi.DataDir
}
}
return dataDirs
return onlineDisks, modTime
}
// Returns the latest updated FileInfo files and error in case of failure.
@ -161,18 +144,16 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
// List all the file commit ids from parts metadata.
modTimes := listObjectModtimes(partsMetadata, errs)
dataDirs := getDataDirs(partsMetadata, errs)
// Count all latest updated FileInfo values
var count int
var latestFileInfo FileInfo
// Reduce list of UUIDs to a single common value - i.e. the last updated Time
modTime, dataDir := commonTime(modTimes, dataDirs)
modTime := commonTime(modTimes)
// Iterate through all the modTimes and count the FileInfo(s) with latest time.
for index, t := range modTimes {
if partsMetadata[index].IsValid() && t.Equal(modTime) && dataDir == partsMetadata[index].DataDir {
if partsMetadata[index].IsValid() && t.Equal(modTime) {
latestFileInfo = partsMetadata[index]
count++
}
@ -195,8 +176,8 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []FileInfo,
errs []error, bucket, object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
// List of disks having latest version of the object er.meta (by modtime)
_, modTime, dataDir := listOnlineDisks(onlineDisks, partsMetadata, errs)
// List of disks having latest version of the object xl.meta (by modtime)
_, modTime := listOnlineDisks(onlineDisks, partsMetadata, errs)
availableDisks := make([]StorageAPI, len(onlineDisks))
dataErrs := make([]error, len(onlineDisks))
@ -239,7 +220,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
}
meta := partsMetadata[i]
if !meta.ModTime.Equal(modTime) || meta.DataDir != dataDir {
if !meta.ModTime.Equal(modTime) {
dataErrs[i] = errFileCorrupt
partsMetadata[i] = FileInfo{}
continue

View File

@ -34,10 +34,8 @@ import (
func TestCommonTime(t *testing.T) {
// List of test cases for common modTime.
testCases := []struct {
times []time.Time
dataDirs []string
time time.Time
dataDir string
times []time.Time
time time.Time
}{
{
// 1. Tests common times when slice has varying time elements.
@ -49,17 +47,8 @@ func TestCommonTime(t *testing.T) {
time.Unix(0, 2).UTC(),
time.Unix(0, 3).UTC(),
time.Unix(0, 1).UTC(),
}, []string{
errorDir,
delMarkerDir,
"cd3b36c0-49e6-11ec-8087-73a2b2fd4016",
"cd3b36c0-49e6-11ec-8087-73a2b2fd4016",
"cd3b36c0-49e6-11ec-8087-73a2b2fd4016",
"cd3b36c0-49e6-11ec-8087-73a2b2fd4016",
"cd3b36c0-49e6-11ec-8087-73a2b2fd4016",
},
time.Unix(0, 3).UTC(),
"cd3b36c0-49e6-11ec-8087-73a2b2fd4016",
},
{
// 2. Tests common time obtained when all elements are equal.
@ -71,17 +60,8 @@ func TestCommonTime(t *testing.T) {
time.Unix(0, 3).UTC(),
time.Unix(0, 3).UTC(),
time.Unix(0, 3).UTC(),
}, []string{
errorDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
},
time.Unix(0, 3).UTC(),
delMarkerDir,
},
{
// 3. Tests common time obtained when elements have a mixture
@ -97,17 +77,8 @@ func TestCommonTime(t *testing.T) {
timeSentinel,
timeSentinel,
timeSentinel,
}, []string{
errorDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
delMarkerDir,
},
time.Unix(0, 3).UTC(),
delMarkerDir,
},
}
@ -115,13 +86,9 @@ func TestCommonTime(t *testing.T) {
// common modtime. Tests fail if modtime does not match.
for i, testCase := range testCases {
// Obtain a common mod time from modTimes slice.
ctime, dataDir := commonTime(testCase.times, testCase.dataDirs)
ctime := commonTime(testCase.times)
if !testCase.time.Equal(ctime) {
t.Errorf("Test case %d, expect to pass but failed. Wanted modTime: %s, got modTime: %s\n", i+1, testCase.time, ctime)
continue
}
if dataDir != testCase.dataDir {
t.Errorf("Test case %d, expect to pass but failed. Wanted dataDir: %s, got dataDir: %s\n", i+1, testCase.dataDir, dataDir)
}
}
}
@ -278,15 +245,11 @@ func TestListOnlineDisks(t *testing.T) {
}
onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
if !modTime.Equal(test.expectedTime) {
t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
test.expectedTime, modTime)
}
if fi.DataDir != dataDir {
t.Fatalf("Expected dataDir to be equal to %v but was found to be %v",
fi.DataDir, dataDir)
}
availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
test.errs = newErrs
@ -395,13 +358,14 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
for i, test := range testCases {
test := test
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
_, err := obj.PutObject(ctx, bucket, object,
mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
_, err = getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}
@ -462,17 +426,12 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}
onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
if !modTime.Equal(test.expectedTime) {
t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
test.expectedTime, modTime)
}
if fi.DataDir != dataDir {
t.Fatalf("Expected dataDir to be equal to %v but was found to be %v",
fi.DataDir, dataDir)
}
availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
test.errs = newErrs
@ -527,7 +486,7 @@ func TestDisksWithAllParts(t *testing.T) {
t.Fatalf("Failed to read xl meta data %v", err)
}
erasureDisks, _, _ = listOnlineDisks(erasureDisks, partsMetadata, errs)
erasureDisks, _ = listOnlineDisks(erasureDisks, partsMetadata, errs)
filteredDisks, errs := disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)

View File

@ -294,15 +294,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// List of disks having latest version of the object er.meta
// (by modtime).
_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
// make sure all parts metadata dataDir is same as returned by listOnlineDisks()
// the reason is its possible that some of the disks might have stale data, for those
// we simply override them with maximally occurring 'dataDir' - this ensures that
// disksWithAllParts() verifies same dataDir across all drives.
for i := range partsMetadata {
partsMetadata[i].DataDir = dataDir
}
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// List of disks having all parts as per latest metadata.
// NOTE: do not pass in latestDisks to diskWithAllParts since
@ -318,7 +310,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Latest FileInfo for reference. If a valid metadata is not
// present, it is as good as object not found.
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, result.DataBlocks)
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
if err != nil {
return result, toObjectErr(err, bucket, object, versionID)
}
@ -551,6 +543,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
if disk == OfflineDisk {
continue
}
// record the index of the updated disks
partsMetadata[i].Erasure.Index = i + 1

View File

@ -179,6 +179,10 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo,
inconsistent++
continue
}
if meta.XLV1 != fi.XLV1 {
inconsistent++
continue
}
// check if erasure distribution order matches the index
// position if this is not correct we discard the disk
// and move to collect others
@ -228,6 +232,9 @@ func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo,
// if object was ever written previously.
continue
}
if !init && fi.XLV1 != partsMetadata[index].XLV1 {
continue
}
blockIndex := distribution[index]
shuffledPartsMetadata[blockIndex-1] = partsMetadata[index]
shuffledDisks[blockIndex-1] = disks[index]

View File

@ -282,7 +282,7 @@ func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIn
return 0, 0, InvalidRange{}
}
func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (FileInfo, error) {
func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (FileInfo, error) {
// with less quorum return error.
if quorum < 2 {
return FileInfo{}, errErasureReadQuorum
@ -290,7 +290,9 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
metaHashes := make([]string, len(metaArr))
h := sha256.New()
for i, meta := range metaArr {
if meta.IsValid() && meta.ModTime.Equal(modTime) && meta.DataDir == dataDir {
if meta.IsValid() && meta.ModTime.Equal(modTime) {
fmt.Fprintf(h, "%v", meta.XLV1)
fmt.Fprintf(h, "%v", meta.GetDataDir())
for _, part := range meta.Parts {
fmt.Fprintf(h, "part.%d", part.Number)
}
@ -351,8 +353,8 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
// pickValidFileInfo - picks one valid FileInfo content and returns from a
// slice of FileInfo.
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (FileInfo, error) {
return findFileInfoInQuorum(ctx, metaArr, modTime, dataDir, quorum)
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (FileInfo, error) {
return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
}
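
A simplified, self-contained sketch of the quorum selection above (stand-in types, and only a subset of the fields the real function hashes): metadata entries matching the common modTime are hashed over the fields that must agree, and the most common signature must reach the requested quorum:

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "errors"
    "fmt"
    "time"
)

type fileInfo struct {
    ModTime time.Time
    XLV1    bool
    DataDir string
    Parts   []int
}

var errErasureReadQuorum = errors.New("read quorum not met")

func findFileInfoInQuorum(metaArr []fileInfo, modTime time.Time, quorum int) (fileInfo, error) {
    counts := make(map[string]int)
    byHash := make(map[string]fileInfo)
    for _, meta := range metaArr {
        if !meta.ModTime.Equal(modTime) {
            continue
        }
        h := sha256.New()
        fmt.Fprintf(h, "%v%s", meta.XLV1, meta.DataDir)
        for _, p := range meta.Parts {
            fmt.Fprintf(h, "part.%d", p)
        }
        key := hex.EncodeToString(h.Sum(nil))
        counts[key]++
        byHash[key] = meta
    }
    for key, n := range counts {
        if n >= quorum {
            return byHash[key], nil
        }
    }
    return fileInfo{}, errErasureReadQuorum
}

func main() {
    now := time.Now().UTC()
    metas := []fileInfo{
        {ModTime: now, DataDir: "uuid-a", Parts: []int{1}},
        {ModTime: now, DataDir: "uuid-a", Parts: []int{1}},
        {ModTime: now, DataDir: "uuid-b", Parts: []int{1}}, // disagreeing disk
    }
    fi, err := findFileInfoInQuorum(metas, now, 2)
    fmt.Println(fi.DataDir, err) // uuid-a <nil>
}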
// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.

View File

@ -178,28 +178,24 @@ func TestFindFileInfoInQuorum(t *testing.T) {
tests := []struct {
fis []FileInfo
modTime time.Time
dataDir string
expectedErr error
expectedQuorum int
}{
{
fis: getNFInfo(16, 16, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"),
modTime: time.Unix(1603863445, 0),
dataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21",
expectedErr: nil,
expectedQuorum: 8,
},
{
fis: getNFInfo(16, 7, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"),
modTime: time.Unix(1603863445, 0),
dataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21",
expectedErr: errErasureReadQuorum,
expectedQuorum: 8,
},
{
fis: getNFInfo(16, 16, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"),
modTime: time.Unix(1603863445, 0),
dataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21",
expectedErr: errErasureReadQuorum,
expectedQuorum: 0,
},
@ -208,7 +204,7 @@ func TestFindFileInfoInQuorum(t *testing.T) {
for _, test := range tests {
test := test
t.Run("", func(t *testing.T) {
_, err := findFileInfoInQuorum(context.Background(), test.fis, test.modTime, test.dataDir, test.expectedQuorum)
_, err := findFileInfoInQuorum(context.Background(), test.fis, test.modTime, test.expectedQuorum)
if err != test.expectedErr {
t.Errorf("Expected %s, got %s", test.expectedErr, err)
}

View File

@ -67,10 +67,10 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
}
// List all online disks.
_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
_, modTime := listOnlineDisks(disks, metaArr, errs)
// Pick latest valid metadata.
_, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
return err
}
@ -460,10 +460,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
// List all online disks.
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum)
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return pi, err
}
@ -568,10 +568,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
// Get current highest version based on re-read partsMetadata.
onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, dataDir, writeQuorum)
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return pi, err
}
@ -656,10 +656,10 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
}
_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, readQuorum)
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
if err != nil {
return result, err
}
@ -706,10 +706,10 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
}
_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return result, err
}
@ -801,10 +801,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
return oi, toObjectErr(reducedErr, bucket, object)
}
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return oi, err
}

View File

@ -88,10 +88,10 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
}
// List all online disks.
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, metaArr, errs)
onlineDisks, modTime := listOnlineDisks(storageDisks, metaArr, errs)
// Pick latest valid metadata.
fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
@ -102,6 +102,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
versionID := srcInfo.VersionID
if srcInfo.versionOnly {
versionID = dstOpts.VersionID
@ -404,14 +406,16 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
// List all online disks.
onlineDisks, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
// Pick latest valid metadata.
fi, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
fi, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
if err != nil {
return fi, nil, nil, err
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
// if one of the disks is offline, return right here no need
// to attempt a heal on the object.
if countErrs(errs, errDiskNotFound) > 0 {
@ -424,7 +428,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
missingBlocks++
continue
}
if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) && metaArr[i].DataDir == fi.DataDir {
if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) {
continue
}
missingBlocks++
@ -1455,17 +1459,20 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
}
// List all online disks.
_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
// Pick latest valid metadata.
fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
if fi.Deleted {
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
// if version-id is not specified retention is supposed to be set on the latest object.
if opts.VersionID == "" {
opts.VersionID = fi.VersionID
@ -1483,7 +1490,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
fi.ModTime = opts.MTime
fi.VersionID = opts.VersionID
if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil {
if err = er.updateObjectMeta(ctx, bucket, object, fi, onlineDisks); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -1512,10 +1519,10 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
}
// List all online disks.
_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
// Pick latest valid metadata.
fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -1526,12 +1533,14 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
fi.Metadata[xhttp.AmzObjectTagging] = tags
for k, v := range opts.UserDefined {
fi.Metadata[k] = v
}
if err = er.updateObjectMeta(ctx, bucket, object, fi); err != nil {
if err = er.updateObjectMeta(ctx, bucket, object, fi, onlineDisks); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -1539,30 +1548,28 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
}
// updateObjectMeta will update the metadata of a file.
func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo) error {
func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks []StorageAPI) error {
if len(fi.Metadata) == 0 {
return nil
}
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
g := errgroup.WithNErrs(len(onlineDisks))
// Start writing `xl.meta` to all disks in parallel.
for index := range disks {
for index := range onlineDisks {
index := index
g.Go(func() error {
if disks[index] == nil {
if onlineDisks[index] == nil {
return errDiskNotFound
}
return disks[index].UpdateMetadata(ctx, bucket, object, fi)
return onlineDisks[index].UpdateMetadata(ctx, bucket, object, fi)
}, index)
}
// Wait for all the routines.
mErrs := g.Wait()
return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, getWriteQuorum(len(disks)))
return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, getWriteQuorum(len(onlineDisks)))
}
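
A simplified sketch of the new contract (not MinIO's errgroup-based code): metadata is written only to the disks that survived listOnlineDisks/filterOnlineDisksInplace, and the result is still checked against a write quorum:

package main

import "fmt"

type disk interface{ UpdateMetadata(meta string) error }

type okDisk struct{ name string }

func (d okDisk) UpdateMetadata(meta string) error { return nil }

func updateMeta(onlineDisks []disk, meta string, writeQuorum int) error {
    var success int
    for _, d := range onlineDisks {
        if d == nil { // offline/filtered disks stay nil and are simply skipped here
            continue
        }
        if err := d.UpdateMetadata(meta); err == nil {
            success++
        }
    }
    if success < writeQuorum {
        return fmt.Errorf("write quorum not met: %d < %d", success, writeQuorum)
    }
    return nil
}

func main() {
    disks := []disk{okDisk{"d1"}, nil, okDisk{"d3"}, okDisk{"d4"}}
    fmt.Println(updateMeta(disks, "xl.meta", 3)) // <nil> — 3 successful writes meet quorum
}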
// DeleteObjectTags - delete object tags from an existing object

View File

@ -700,7 +700,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
for k, v := range meta {
fi.Metadata[k] = v
}
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi)
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi, er.getDisks())
if err == nil {
break
}

View File

@ -136,7 +136,7 @@ func renameAll(srcFilePath, dstFilePath string) (err error) {
switch {
case isSysErrNotDir(err) && !osIsNotExist(err):
// Windows can have both isSysErrNotDir(err) and osIsNotExist(err) returning
// true if the source file path contains an inexistant directory. In that case,
// true if the source file path contains a non-existent directory. In that case,
// we want to return errFileNotFound instead, which will honored in subsequent
// switch cases
return errFileAccessDenied

View File

@ -186,6 +186,21 @@ type FileInfo struct {
Idx int `msg:"i"`
}
// GetDataDir returns an expected dataDir given FileInfo
// - deleteMarker returns "delete-marker"
// - returns "legacy" if FileInfo is XLV1 and DataDir is
// empty, returns DataDir otherwise
// - returns "dataDir"
func (fi FileInfo) GetDataDir() string {
if fi.Deleted {
return "delete-marker"
}
if fi.XLV1 && fi.DataDir == "" {
return "legacy"
}
return fi.DataDir
}
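
A tiny usage example for the new helper, with FileInfo trimmed down to the fields GetDataDir actually reads:

package main

import "fmt"

// trimmed-down FileInfo carrying only the fields GetDataDir consults
type FileInfo struct {
    Deleted bool
    XLV1    bool
    DataDir string
}

func (fi FileInfo) GetDataDir() string {
    if fi.Deleted {
        return "delete-marker"
    }
    if fi.XLV1 && fi.DataDir == "" {
        return "legacy"
    }
    return fi.DataDir
}

func main() {
    fmt.Println(FileInfo{Deleted: true}.GetDataDir())                                 // delete-marker
    fmt.Println(FileInfo{XLV1: true}.GetDataDir())                                    // legacy
    fmt.Println(FileInfo{DataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21"}.GetDataDir()) // 36a21454-a2ca-11eb-bbaa-93a81c686f21
}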
// InlineData returns true if object contents are inlined alongside its metadata.
func (fi FileInfo) InlineData() bool {
_, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]

View File

@ -105,8 +105,6 @@ type xlStorage struct {
formatLastCheck time.Time
diskInfoCache timedValue
ctx context.Context
sync.RWMutex
// mutex to prevent concurrent read operations overloading walks.
@ -255,7 +253,6 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
diskPath: path,
endpoint: ep,
globalSync: env.Get(config.EnvFSOSync, config.EnableOff) == config.EnableOn,
ctx: GlobalContext,
rootDisk: rootDisk,
poolIndex: -1,
setIndex: -1,
@ -1952,6 +1949,9 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
// RenameData - rename source path to destination path atomically, metadata and data directory.
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (err error) {
defer func() {
if err != nil {
logger.LogIf(ctx, err)
}
if err == nil {
if s.globalSync {
globalSync()
@ -2045,7 +2045,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
if len(dstBuf) > 0 {
if isXL2V1Format(dstBuf) {
if err = xlMeta.Load(dstBuf); err != nil {
logger.LogIf(s.ctx, err)
logger.LogIf(ctx, err)
// Data appears corrupt. Drop data.
xlMeta = xlMetaV2{}
}
@ -2054,11 +2054,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
xlMetaLegacy := &xlMetaV1Object{}
var json = jsoniter.ConfigCompatibleWithStandardLibrary
if err := json.Unmarshal(dstBuf, xlMetaLegacy); err != nil {
logger.LogIf(s.ctx, err)
logger.LogIf(ctx, err)
// Data appears corrupt. Drop data.
} else {
if err = xlMeta.AddLegacy(xlMetaLegacy); err != nil {
logger.LogIf(s.ctx, err)
logger.LogIf(ctx, err)
}
legacyPreserved = true
}
@ -2195,7 +2195,6 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
s.deleteFile(dstVolumeDir, dstDataPath, false)
if err != errFileNotFound {
logger.LogIf(ctx, err)
}
@ -2373,8 +2372,8 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
errVolumeNotFound,
errFileCorrupt,
}...) {
logger.GetReqInfo(s.ctx).AppendTags("disk", s.String())
logger.LogIf(s.ctx, err)
logger.GetReqInfo(ctx).AppendTags("disk", s.String())
logger.LogIf(ctx, err)
}
return err
}