mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
e7b60c4d65
allow active disk-monitoring to be configurable, and use these add deadlines in various call layers for various syscalls.
359 lines
11 KiB
Go
359 lines
11 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io"
|
|
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/sync/errgroup"
|
|
)
|
|
|
|
// figure out the most commonVersions across disk that satisfies
|
|
// the 'writeQuorum' this function returns '0' if quorum cannot
|
|
// be achieved and disks have too many inconsistent versions.
|
|
func reduceCommonVersions(diskVersions []uint64, writeQuorum int) (commonVersions uint64) {
|
|
diskVersionsCount := make(map[uint64]int)
|
|
for _, versions := range diskVersions {
|
|
diskVersionsCount[versions]++
|
|
}
|
|
|
|
max := 0
|
|
for versions, count := range diskVersionsCount {
|
|
if max < count {
|
|
max = count
|
|
commonVersions = versions
|
|
}
|
|
}
|
|
|
|
if max >= writeQuorum {
|
|
return commonVersions
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
// Returns number of errors that occurred the most (incl. nil) and the
|
|
// corresponding error value. NB When there is more than one error value that
|
|
// occurs maximum number of times, the error value returned depends on how
|
|
// golang's map orders keys. This doesn't affect correctness as long as quorum
|
|
// value is greater than or equal to simple majority, since none of the equally
|
|
// maximal values would occur quorum or more number of times.
|
|
func reduceErrs(errs []error, ignoredErrs []error) (maxCount int, maxErr error) {
|
|
errorCounts := make(map[error]int)
|
|
for _, err := range errs {
|
|
if IsErrIgnored(err, ignoredErrs...) {
|
|
continue
|
|
}
|
|
// Errors due to context cancelation may be wrapped - group them by context.Canceled.
|
|
if errors.Is(err, context.Canceled) {
|
|
errorCounts[context.Canceled]++
|
|
continue
|
|
}
|
|
errorCounts[err]++
|
|
}
|
|
|
|
max := 0
|
|
for err, count := range errorCounts {
|
|
switch {
|
|
case max < count:
|
|
max = count
|
|
maxErr = err
|
|
|
|
// Prefer `nil` over other error values with the same
|
|
// number of occurrences.
|
|
case max == count && err == nil:
|
|
maxErr = err
|
|
}
|
|
}
|
|
return max, maxErr
|
|
}
|
|
|
|
// reduceQuorumErrs behaves like reduceErrs by only for returning
|
|
// values of maximally occurring errors validated against a generic
|
|
// quorum number that can be read or write quorum depending on usage.
|
|
func reduceQuorumErrs(ctx context.Context, errs []error, ignoredErrs []error, quorum int, quorumErr error) error {
|
|
if contextCanceled(ctx) {
|
|
return context.Canceled
|
|
}
|
|
maxCount, maxErr := reduceErrs(errs, ignoredErrs)
|
|
if maxCount >= quorum {
|
|
return maxErr
|
|
}
|
|
return quorumErr
|
|
}
|
|
|
|
// reduceReadQuorumErrs behaves like reduceErrs but only for returning
|
|
// values of maximally occurring errors validated against readQuorum.
|
|
func reduceReadQuorumErrs(ctx context.Context, errs []error, ignoredErrs []error, readQuorum int) (maxErr error) {
|
|
return reduceQuorumErrs(ctx, errs, ignoredErrs, readQuorum, errErasureReadQuorum)
|
|
}
|
|
|
|
// reduceWriteQuorumErrs behaves like reduceErrs but only for returning
|
|
// values of maximally occurring errors validated against writeQuorum.
|
|
func reduceWriteQuorumErrs(ctx context.Context, errs []error, ignoredErrs []error, writeQuorum int) (maxErr error) {
|
|
return reduceQuorumErrs(ctx, errs, ignoredErrs, writeQuorum, errErasureWriteQuorum)
|
|
}
|
|
|
|
// Similar to 'len(slice)' but returns the actual elements count
|
|
// skipping the unallocated elements.
|
|
func diskCount(disks []StorageAPI) int {
|
|
diskCount := 0
|
|
for _, disk := range disks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
diskCount++
|
|
}
|
|
return diskCount
|
|
}
|
|
|
|
// hashOrder - hashes input key to return consistent
|
|
// hashed integer slice. Returned integer order is salted
|
|
// with an input key. This results in consistent order.
|
|
// NOTE: collisions are fine, we are not looking for uniqueness
|
|
// in the slices returned.
|
|
func hashOrder(key string, cardinality int) []int {
|
|
if cardinality <= 0 {
|
|
// Returns an empty int slice for cardinality < 0.
|
|
return nil
|
|
}
|
|
|
|
nums := make([]int, cardinality)
|
|
keyCrc := crc32.Checksum([]byte(key), crc32.IEEETable)
|
|
|
|
start := int(keyCrc % uint32(cardinality))
|
|
for i := 1; i <= cardinality; i++ {
|
|
nums[i-1] = 1 + ((start + i) % cardinality)
|
|
}
|
|
return nums
|
|
}
|
|
|
|
// Reads all `xl.meta` metadata as a FileInfo slice.
|
|
// Returns error slice indicating the failed metadata reads.
|
|
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) {
|
|
metadataArray := make([]FileInfo, len(disks))
|
|
|
|
g := errgroup.WithNErrs(len(disks))
|
|
// Read `xl.meta` in parallel across disks.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() (err error) {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData)
|
|
return err
|
|
}, index)
|
|
}
|
|
|
|
ignoredErrs := []error{
|
|
errFileNotFound,
|
|
errVolumeNotFound,
|
|
errFileVersionNotFound,
|
|
io.ErrUnexpectedEOF, // some times we would read without locks, ignore these errors
|
|
io.EOF, // some times we would read without locks, ignore these errors
|
|
}
|
|
ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
|
|
errs := g.Wait()
|
|
for index, err := range errs {
|
|
if err == nil {
|
|
continue
|
|
}
|
|
if !IsErr(err, ignoredErrs...) {
|
|
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
|
|
disks[index], bucket, object, err),
|
|
disks[index].String())
|
|
}
|
|
}
|
|
|
|
// Return all the metadata.
|
|
return metadataArray, errs
|
|
}
|
|
|
|
// shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo()
|
|
// and CompleteMultipartUpload code path, it is not meant to be used with PutObject,
|
|
// NewMultipartUpload metadata shuffling.
|
|
func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
|
|
shuffledDisks = make([]StorageAPI, len(disks))
|
|
shuffledPartsMetadata = make([]FileInfo, len(disks))
|
|
distribution := fi.Erasure.Distribution
|
|
|
|
var inconsistent int
|
|
for i, meta := range metaArr {
|
|
if disks[i] == nil {
|
|
// Assuming offline drives as inconsistent,
|
|
// to be safe and fallback to original
|
|
// distribution order.
|
|
inconsistent++
|
|
continue
|
|
}
|
|
if !meta.IsValid() {
|
|
inconsistent++
|
|
continue
|
|
}
|
|
if meta.XLV1 != fi.XLV1 {
|
|
inconsistent++
|
|
continue
|
|
}
|
|
// check if erasure distribution order matches the index
|
|
// position if this is not correct we discard the disk
|
|
// and move to collect others
|
|
if distribution[i] != meta.Erasure.Index {
|
|
inconsistent++ // keep track of inconsistent entries
|
|
continue
|
|
}
|
|
shuffledDisks[meta.Erasure.Index-1] = disks[i]
|
|
shuffledPartsMetadata[meta.Erasure.Index-1] = metaArr[i]
|
|
}
|
|
|
|
// Inconsistent meta info is with in the limit of
|
|
// expected quorum, proceed with EcIndex based
|
|
// disk order.
|
|
if inconsistent < fi.Erasure.ParityBlocks {
|
|
return shuffledDisks, shuffledPartsMetadata
|
|
}
|
|
|
|
// fall back to original distribution based order.
|
|
return shuffleDisksAndPartsMetadata(disks, metaArr, fi)
|
|
}
|
|
|
|
// Return shuffled partsMetadata depending on fi.Distribution.
|
|
// additional validation is attempted and invalid metadata is
|
|
// automatically skipped only when fi.ModTime is non-zero
|
|
// indicating that this is called during read-phase
|
|
func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
|
|
shuffledDisks = make([]StorageAPI, len(disks))
|
|
shuffledPartsMetadata = make([]FileInfo, len(partsMetadata))
|
|
distribution := fi.Erasure.Distribution
|
|
|
|
init := fi.ModTime.IsZero()
|
|
// Shuffle slice xl metadata for expected distribution.
|
|
for index := range partsMetadata {
|
|
if disks[index] == nil {
|
|
continue
|
|
}
|
|
if !init && !partsMetadata[index].IsValid() {
|
|
// Check for parts metadata validity for only
|
|
// fi.ModTime is not empty - ModTime is always set,
|
|
// if object was ever written previously.
|
|
continue
|
|
}
|
|
if !init && fi.XLV1 != partsMetadata[index].XLV1 {
|
|
continue
|
|
}
|
|
blockIndex := distribution[index]
|
|
shuffledPartsMetadata[blockIndex-1] = partsMetadata[index]
|
|
shuffledDisks[blockIndex-1] = disks[index]
|
|
}
|
|
return shuffledDisks, shuffledPartsMetadata
|
|
}
|
|
|
|
// Return shuffled partsMetadata depending on distribution.
|
|
func shufflePartsMetadata(partsMetadata []FileInfo, distribution []int) (shuffledPartsMetadata []FileInfo) {
|
|
if distribution == nil {
|
|
return partsMetadata
|
|
}
|
|
shuffledPartsMetadata = make([]FileInfo, len(partsMetadata))
|
|
// Shuffle slice xl metadata for expected distribution.
|
|
for index := range partsMetadata {
|
|
blockIndex := distribution[index]
|
|
shuffledPartsMetadata[blockIndex-1] = partsMetadata[index]
|
|
}
|
|
return shuffledPartsMetadata
|
|
}
|
|
|
|
// shuffleDisks - shuffle input disks slice depending on the
|
|
// erasure distribution. Return shuffled slice of disks with
|
|
// their expected distribution.
|
|
func shuffleDisks(disks []StorageAPI, distribution []int) (shuffledDisks []StorageAPI) {
|
|
if distribution == nil {
|
|
return disks
|
|
}
|
|
shuffledDisks = make([]StorageAPI, len(disks))
|
|
// Shuffle disks for expected distribution.
|
|
for index := range disks {
|
|
blockIndex := distribution[index]
|
|
shuffledDisks[blockIndex-1] = disks[index]
|
|
}
|
|
return shuffledDisks
|
|
}
|
|
|
|
// evalDisks - returns a new slice of disks where nil is set if
|
|
// the corresponding error in errs slice is not nil
|
|
func evalDisks(disks []StorageAPI, errs []error) []StorageAPI {
|
|
if len(errs) != len(disks) {
|
|
logger.LogIf(GlobalContext, errors.New("unexpected drives/errors slice length"))
|
|
return nil
|
|
}
|
|
newDisks := make([]StorageAPI, len(disks))
|
|
for index := range errs {
|
|
if errs[index] == nil {
|
|
newDisks[index] = disks[index]
|
|
} else {
|
|
newDisks[index] = nil
|
|
}
|
|
}
|
|
return newDisks
|
|
}
|
|
|
|
// Errors specifically generated by calculatePartSizeFromIdx function.
|
|
var (
|
|
errPartSizeZero = errors.New("Part size cannot be zero")
|
|
errPartSizeIndex = errors.New("Part index cannot be smaller than 1")
|
|
)
|
|
|
|
// calculatePartSizeFromIdx calculates the part size according to input index.
|
|
// returns error if totalSize is -1, partSize is 0, partIndex is 0.
|
|
func calculatePartSizeFromIdx(ctx context.Context, totalSize int64, partSize int64, partIndex int) (currPartSize int64, err error) {
|
|
if totalSize < -1 {
|
|
logger.LogIf(ctx, errInvalidArgument)
|
|
return 0, errInvalidArgument
|
|
}
|
|
if partSize == 0 {
|
|
logger.LogIf(ctx, errPartSizeZero)
|
|
return 0, errPartSizeZero
|
|
}
|
|
if partIndex < 1 {
|
|
logger.LogIf(ctx, errPartSizeIndex)
|
|
return 0, errPartSizeIndex
|
|
}
|
|
if totalSize == -1 {
|
|
return -1, nil
|
|
}
|
|
if totalSize > 0 {
|
|
// Compute the total count of parts
|
|
partsCount := totalSize/partSize + 1
|
|
// Return the part's size
|
|
switch {
|
|
case int64(partIndex) < partsCount:
|
|
currPartSize = partSize
|
|
case int64(partIndex) == partsCount:
|
|
// Size of last part
|
|
currPartSize = totalSize % partSize
|
|
default:
|
|
currPartSize = 0
|
|
}
|
|
}
|
|
return currPartSize, nil
|
|
}
|