mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
c0721164be
Update reedsolomon library to enable feature to automatically set number of go-routines based on the input shard size, since shard size is sort of a constant in Minio for objects > 10MiB (default blocksize) klauspost reported around 15-20% improvement in performance numbers on older systems such as AVX and SSE3 ``` name old speed new speed delta Encode10x2x10000-8 5.45GB/s ± 1% 6.22GB/s ± 1% +14.20% (p=0.000 n=9+9) Encode100x20x10000-8 1.44GB/s ± 1% 1.64GB/s ± 1% +13.77% (p=0.000 n=10+10) Encode17x3x1M-8 10.0GB/s ± 5% 12.0GB/s ± 1% +19.88% (p=0.000 n=10+10) Encode10x4x16M-8 7.81GB/s ± 5% 8.56GB/s ± 5% +9.58% (p=0.000 n=10+9) Encode5x2x1M-8 15.3GB/s ± 2% 19.6GB/s ± 2% +28.57% (p=0.000 n=9+10) Encode10x2x1M-8 12.2GB/s ± 5% 15.0GB/s ± 5% +22.45% (p=0.000 n=10+10) Encode10x4x1M-8 7.84GB/s ± 1% 9.03GB/s ± 1% +15.19% (p=0.000 n=9+9) Encode50x20x1M-8 1.73GB/s ± 4% 2.09GB/s ± 4% +20.59% (p=0.000 n=10+9) Encode17x3x16M-8 10.6GB/s ± 1% 11.7GB/s ± 4% +10.12% (p=0.000 n=8+10) ```
554 lines
17 KiB
Go
554 lines
17 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"fmt"
|
|
"path"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/minio/minio/pkg/errors"
|
|
)
|
|
|
|
// healFormatXL - heals missing `format.json` on freshly or corrupted
|
|
// disks (missing format.json but does have erasure coded data in it).
|
|
func healFormatXL(storageDisks []StorageAPI) (err error) {
|
|
// Attempt to load all `format.json`.
|
|
formatConfigs, sErrs := loadAllFormats(storageDisks)
|
|
|
|
// Generic format check.
|
|
// - if (no quorum) return error
|
|
// - if (disks not recognized) // Always error.
|
|
if err = genericFormatCheckXL(formatConfigs, sErrs); err != nil {
|
|
return err
|
|
}
|
|
|
|
numDisks := len(storageDisks)
|
|
_, unformattedDiskCount, diskNotFoundCount,
|
|
corruptedFormatCount, otherErrCount := formatErrsSummary(sErrs)
|
|
|
|
switch {
|
|
case unformattedDiskCount == numDisks:
|
|
// all unformatted.
|
|
if err = initFormatXL(storageDisks); err != nil {
|
|
return err
|
|
}
|
|
|
|
case diskNotFoundCount > 0:
|
|
return fmt.Errorf("cannot proceed with heal as %s",
|
|
errSomeDiskOffline)
|
|
|
|
case otherErrCount > 0:
|
|
return fmt.Errorf("cannot proceed with heal as some disks had unhandled errors")
|
|
|
|
case corruptedFormatCount > 0:
|
|
if err = healFormatXLCorruptedDisks(storageDisks, formatConfigs); err != nil {
|
|
return fmt.Errorf("Unable to repair corrupted format, %s", err)
|
|
}
|
|
|
|
case unformattedDiskCount > 0:
|
|
// All drives online but some report missing format.json.
|
|
if err = healFormatXLFreshDisks(storageDisks, formatConfigs); err != nil {
|
|
// There was an unexpected unrecoverable error
|
|
// during healing.
|
|
return fmt.Errorf("Unable to heal backend %s", err)
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Heals a bucket if it doesn't exist on one of the disks, additionally
|
|
// also heals the missing entries for bucket metadata files
|
|
// `policy.json, notification.xml, listeners.json`.
|
|
func (xl xlObjects) HealBucket(bucket string) error {
|
|
if err := checkBucketExist(bucket, xl); err != nil {
|
|
return err
|
|
}
|
|
|
|
// get write quorum for an object
|
|
writeQuorum := len(xl.storageDisks)/2 + 1
|
|
|
|
// Heal bucket.
|
|
if err := healBucket(xl.storageDisks, bucket, writeQuorum); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Proceed to heal bucket metadata.
|
|
return healBucketMetadata(xl, bucket)
|
|
}
|
|
|
|
// Heal bucket - create buckets on disks where it does not exist.
|
|
func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error {
|
|
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
|
if err := bucketLock.GetLock(globalHealingTimeout); err != nil {
|
|
return err
|
|
}
|
|
defer bucketLock.Unlock()
|
|
|
|
// Initialize sync waitgroup.
|
|
var wg = &sync.WaitGroup{}
|
|
|
|
// Initialize list of errors.
|
|
var dErrs = make([]error, len(storageDisks))
|
|
|
|
// Make a volume entry on all underlying storage disks.
|
|
for index, disk := range storageDisks {
|
|
if disk == nil {
|
|
dErrs[index] = errors.Trace(errDiskNotFound)
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
// Make a volume inside a go-routine.
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
if _, err := disk.StatVol(bucket); err != nil {
|
|
if err != errVolumeNotFound {
|
|
dErrs[index] = errors.Trace(err)
|
|
return
|
|
}
|
|
if err = disk.MakeVol(bucket); err != nil {
|
|
dErrs[index] = errors.Trace(err)
|
|
}
|
|
}
|
|
}(index, disk)
|
|
}
|
|
|
|
// Wait for all make vol to finish.
|
|
wg.Wait()
|
|
|
|
reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum)
|
|
if errors.Cause(reducedErr) == errXLWriteQuorum {
|
|
// Purge successfully created buckets if we don't have writeQuorum.
|
|
undoMakeBucket(storageDisks, bucket)
|
|
}
|
|
return reducedErr
|
|
}
|
|
|
|
// Heals all the metadata associated for a given bucket, this function
|
|
// heals `policy.json`, `notification.xml` and `listeners.json`.
|
|
func healBucketMetadata(xlObj xlObjects, bucket string) error {
|
|
healBucketMetaFn := func(metaPath string) error {
|
|
if _, _, err := xlObj.HealObject(minioMetaBucket, metaPath); err != nil && !isErrObjectNotFound(err) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Heal `policy.json` for missing entries, ignores if `policy.json` is not found.
|
|
policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig)
|
|
if err := healBucketMetaFn(policyPath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Heal `notification.xml` for missing entries, ignores if `notification.xml` is not found.
|
|
nConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
|
if err := healBucketMetaFn(nConfigPath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Heal `listeners.json` for missing entries, ignores if `listeners.json` is not found.
|
|
lConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
|
return healBucketMetaFn(lConfigPath)
|
|
}
|
|
|
|
// listAllBuckets lists all buckets from all disks. It also
|
|
// returns the occurrence of each buckets in all disks
|
|
func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo, bucketsOcc map[string]int, err error) {
|
|
buckets = make(map[string]VolInfo)
|
|
bucketsOcc = make(map[string]int)
|
|
for _, disk := range storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
var volsInfo []VolInfo
|
|
volsInfo, err = disk.ListVols()
|
|
if err == nil {
|
|
for _, volInfo := range volsInfo {
|
|
// StorageAPI can send volume names which are
|
|
// incompatible with buckets, handle it and skip them.
|
|
if !IsValidBucketName(volInfo.Name) {
|
|
continue
|
|
}
|
|
// Skip special volume buckets.
|
|
if isMinioMetaBucketName(volInfo.Name) {
|
|
continue
|
|
}
|
|
// Increase counter per bucket name
|
|
bucketsOcc[volInfo.Name]++
|
|
// Save volume info under bucket name
|
|
buckets[volInfo.Name] = volInfo
|
|
}
|
|
continue
|
|
}
|
|
// Ignore any disks not found.
|
|
if errors.IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
return buckets, bucketsOcc, err
|
|
}
|
|
|
|
// reduceHealStatus - fetches the worst heal status in a provided slice
|
|
func reduceHealStatus(status []healStatus) healStatus {
|
|
worstStatus := healthy
|
|
for _, st := range status {
|
|
if st > worstStatus {
|
|
worstStatus = st
|
|
}
|
|
}
|
|
return worstStatus
|
|
}
|
|
|
|
// bucketHealStatus - returns the heal status of the provided bucket. Internally,
|
|
// this function lists all object heal status of objects inside meta bucket config
|
|
// directory and returns the worst heal status that can be found
|
|
func (xl xlObjects) bucketHealStatus(bucketName string) (healStatus, error) {
|
|
// A list of all the bucket config files
|
|
configFiles := []string{bucketPolicyConfig, bucketNotificationConfig, bucketListenerConfig}
|
|
// The status of buckets config files
|
|
configsHealStatus := make([]healStatus, len(configFiles))
|
|
// The list of errors found during checking heal status of each config file
|
|
configsErrs := make([]error, len(configFiles))
|
|
// The path of meta bucket that contains all config files
|
|
configBucket := path.Join(minioMetaBucket, bucketConfigPrefix, bucketName)
|
|
|
|
// Check of config files heal status in go-routines
|
|
var wg sync.WaitGroup
|
|
// Loop over config files
|
|
for idx, configFile := range configFiles {
|
|
wg.Add(1)
|
|
// Compute heal status of current config file
|
|
go func(bucket, object string, index int) {
|
|
defer wg.Done()
|
|
// Check
|
|
listObjectsHeal, err := xl.listObjectsHeal(bucket, object, "", "", 1)
|
|
// If any error, save and immediately quit
|
|
if err != nil {
|
|
configsErrs[index] = err
|
|
return
|
|
}
|
|
// Check if current bucket contains any not healthy config file and save heal status
|
|
if len(listObjectsHeal.Objects) > 0 {
|
|
configsHealStatus[index] = listObjectsHeal.Objects[0].HealObjectInfo.Status
|
|
}
|
|
}(configBucket, configFile, idx)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Return any found error
|
|
for _, err := range configsErrs {
|
|
if err != nil {
|
|
return healthy, err
|
|
}
|
|
}
|
|
|
|
// Reduce and return heal status
|
|
return reduceHealStatus(configsHealStatus), nil
|
|
}
|
|
|
|
// ListBucketsHeal - Find all buckets that need to be healed
|
|
func (xl xlObjects) ListBucketsHeal() ([]BucketInfo, error) {
|
|
listBuckets := []BucketInfo{}
|
|
// List all buckets that can be found in all disks
|
|
buckets, occ, err := listAllBuckets(xl.storageDisks)
|
|
if err != nil {
|
|
return listBuckets, err
|
|
}
|
|
|
|
// Iterate over all buckets
|
|
for _, currBucket := range buckets {
|
|
// Check the status of bucket metadata
|
|
bucketHealStatus, err := xl.bucketHealStatus(currBucket.Name)
|
|
if err != nil {
|
|
return []BucketInfo{}, err
|
|
}
|
|
// If all metadata are sane, check if the bucket directory is present in all disks
|
|
if bucketHealStatus == healthy && occ[currBucket.Name] != len(xl.storageDisks) {
|
|
// Current bucket is missing in some of the storage disks
|
|
bucketHealStatus = canHeal
|
|
}
|
|
// Add current bucket to the returned result if not healthy
|
|
if bucketHealStatus != healthy {
|
|
listBuckets = append(listBuckets,
|
|
BucketInfo{
|
|
Name: currBucket.Name,
|
|
Created: currBucket.Created,
|
|
HealBucketInfo: &HealBucketInfo{Status: bucketHealStatus},
|
|
})
|
|
}
|
|
|
|
}
|
|
|
|
// Sort found buckets
|
|
sort.Sort(byBucketName(listBuckets))
|
|
return listBuckets, nil
|
|
}
|
|
|
|
// This function is meant for all the healing that needs to be done
|
|
// during startup i.e healing of buckets, bucket metadata (policy.json,
|
|
// notification.xml, listeners.json) etc. Currently this function
|
|
// supports quick healing of buckets, bucket metadata.
|
|
func quickHeal(xlObj xlObjects, writeQuorum int, readQuorum int) error {
|
|
// List all bucket name occurrence from all disks.
|
|
_, bucketOcc, err := listAllBuckets(xlObj.storageDisks)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// All bucket names and bucket metadata that should be healed.
|
|
for bucketName, occCount := range bucketOcc {
|
|
// Heal bucket only if healing is needed.
|
|
if occCount != len(xlObj.storageDisks) {
|
|
// Heal bucket and then proceed to heal bucket metadata if any.
|
|
if err = healBucket(xlObj.storageDisks, bucketName, writeQuorum); err == nil {
|
|
if err = healBucketMetadata(xlObj, bucketName); err == nil {
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Success.
|
|
return nil
|
|
}
|
|
|
|
// Heals an object only the corrupted/missing erasure blocks.
|
|
func healObject(storageDisks []StorageAPI, bucket, object string, quorum int) (int, int, error) {
|
|
|
|
partsMetadata, errs := readAllXLMetadata(storageDisks, bucket, object)
|
|
// readQuorum suffices for xl.json since we use monotonic
|
|
// system time to break the tie when a split-brain situation
|
|
// arises.
|
|
if rErr := reduceReadQuorumErrs(errs, nil, quorum); rErr != nil {
|
|
return 0, 0, toObjectErr(rErr, bucket, object)
|
|
}
|
|
|
|
// List of disks having latest version of the object.
|
|
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// List of disks having all parts as per latest xl.json - this
|
|
// does a full pass over the data and verifies all part files
|
|
// on disk
|
|
availableDisks, errs, aErr := disksWithAllParts(latestDisks, partsMetadata, errs, bucket,
|
|
object)
|
|
if aErr != nil {
|
|
return 0, 0, toObjectErr(aErr, bucket, object)
|
|
}
|
|
|
|
// Number of disks which don't serve data.
|
|
numOfflineDisks := 0
|
|
for index, disk := range storageDisks {
|
|
if disk == nil || errs[index] == errDiskNotFound {
|
|
numOfflineDisks++
|
|
}
|
|
}
|
|
|
|
// Number of disks which have all parts of the given object.
|
|
numAvailableDisks := 0
|
|
for _, disk := range availableDisks {
|
|
if disk != nil {
|
|
numAvailableDisks++
|
|
}
|
|
}
|
|
|
|
if numAvailableDisks == len(storageDisks) {
|
|
// nothing to heal in this case
|
|
return 0, 0, nil
|
|
}
|
|
|
|
// If less than read quorum number of disks have all the parts
|
|
// of the data, we can't reconstruct the erasure-coded data.
|
|
if numAvailableDisks < quorum {
|
|
return 0, 0, toObjectErr(errXLReadQuorum, bucket, object)
|
|
}
|
|
|
|
// List of disks having outdated version of the object or missing object.
|
|
outDatedDisks := outDatedDisks(storageDisks, availableDisks, errs, partsMetadata, bucket,
|
|
object)
|
|
|
|
// Number of disks that had outdated content of the given
|
|
// object and are online to be healed.
|
|
numHealedDisks := 0
|
|
for _, disk := range outDatedDisks {
|
|
if disk != nil {
|
|
numHealedDisks++
|
|
}
|
|
}
|
|
|
|
// Latest xlMetaV1 for reference. If a valid metadata is not
|
|
// present, it is as good as object not found.
|
|
latestMeta, pErr := pickValidXLMeta(partsMetadata, modTime)
|
|
if pErr != nil {
|
|
return 0, 0, toObjectErr(pErr, bucket, object)
|
|
}
|
|
|
|
for index, disk := range outDatedDisks {
|
|
// Before healing outdated disks, we need to remove
|
|
// xl.json and part files from "bucket/object/" so
|
|
// that rename(minioMetaBucket, "tmp/tmpuuid/",
|
|
// "bucket", "object/") succeeds.
|
|
if disk == nil {
|
|
// Not an outdated disk.
|
|
continue
|
|
}
|
|
|
|
// errFileNotFound implies that xl.json is missing. We
|
|
// may have object parts still present in the object
|
|
// directory. This needs to be deleted for object to
|
|
// healed successfully.
|
|
if errs[index] != nil && !errors.IsErr(errs[index], errFileNotFound) {
|
|
continue
|
|
}
|
|
|
|
// List and delete the object directory, ignoring
|
|
// errors.
|
|
files, err := disk.ListDir(bucket, object)
|
|
if err == nil {
|
|
for _, entry := range files {
|
|
_ = disk.DeleteFile(bucket,
|
|
pathJoin(object, entry))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reorder so that we have data disks first and parity disks next.
|
|
latestDisks = shuffleDisks(latestDisks, latestMeta.Erasure.Distribution)
|
|
outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
|
|
partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
|
|
|
|
// We write at temporary location and then rename to final location.
|
|
tmpID := mustGetUUID()
|
|
|
|
// Checksum of the part files. checkSumInfos[index] will
|
|
// contain checksums of all the part files in the
|
|
// outDatedDisks[index]
|
|
checksumInfos := make([][]ChecksumInfo, len(outDatedDisks))
|
|
|
|
// Heal each part. erasureHealFile() will write the healed
|
|
// part to .minio/tmp/uuid/ which needs to be renamed later to
|
|
// the final location.
|
|
storage, err := NewErasureStorage(latestDisks,
|
|
latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
|
|
if err != nil {
|
|
return 0, 0, toObjectErr(err, bucket, object)
|
|
}
|
|
checksums := make([][]byte, len(latestDisks))
|
|
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
|
|
partName := latestMeta.Parts[partIndex].Name
|
|
partSize := latestMeta.Parts[partIndex].Size
|
|
erasure := latestMeta.Erasure
|
|
var algorithm BitrotAlgorithm
|
|
for i, disk := range storage.disks {
|
|
if disk != OfflineDisk {
|
|
info := partsMetadata[i].Erasure.GetChecksumInfo(partName)
|
|
algorithm = info.Algorithm
|
|
checksums[i] = info.Hash
|
|
}
|
|
}
|
|
// Heal the part file.
|
|
file, hErr := storage.HealFile(outDatedDisks, bucket, pathJoin(object, partName),
|
|
erasure.BlockSize, minioMetaTmpBucket, pathJoin(tmpID, partName), partSize,
|
|
algorithm, checksums)
|
|
if hErr != nil {
|
|
return 0, 0, toObjectErr(hErr, bucket, object)
|
|
}
|
|
// outDatedDisks that had write errors should not be
|
|
// written to for remaining parts, so we nil it out.
|
|
for i, disk := range outDatedDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
// A non-nil stale disk which did not receive
|
|
// a healed part checksum had a write error.
|
|
if file.Checksums[i] == nil {
|
|
outDatedDisks[i] = nil
|
|
numHealedDisks--
|
|
continue
|
|
}
|
|
// append part checksums
|
|
checksumInfos[i] = append(checksumInfos[i],
|
|
ChecksumInfo{partName, file.Algorithm, file.Checksums[i]})
|
|
}
|
|
|
|
// If all disks are having errors, we give up.
|
|
if numHealedDisks == 0 {
|
|
return 0, 0, fmt.Errorf("all disks without up-to-date data had write errors")
|
|
}
|
|
}
|
|
|
|
// xl.json should be written to all the healed disks.
|
|
for index, disk := range outDatedDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
partsMetadata[index] = latestMeta
|
|
partsMetadata[index].Erasure.Checksums = checksumInfos[index]
|
|
}
|
|
|
|
// Generate and write `xl.json` generated from other disks.
|
|
outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID,
|
|
partsMetadata, diskCount(outDatedDisks))
|
|
if aErr != nil {
|
|
return 0, 0, toObjectErr(aErr, bucket, object)
|
|
}
|
|
|
|
// Rename from tmp location to the actual location.
|
|
for _, disk := range outDatedDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
|
|
// Attempt a rename now from healed data to final location.
|
|
aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket,
|
|
retainSlash(object))
|
|
if aErr != nil {
|
|
return 0, 0, toObjectErr(errors.Trace(aErr), bucket, object)
|
|
}
|
|
}
|
|
return numOfflineDisks, numHealedDisks, nil
|
|
}
|
|
|
|
// HealObject heals a given object for all its missing entries.
|
|
// FIXME: If an object object was deleted and one disk was down,
|
|
// and later the disk comes back up again, heal on the object
|
|
// should delete it.
|
|
func (xl xlObjects) HealObject(bucket, object string) (int, int, error) {
|
|
// Read metadata files from all the disks
|
|
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
|
|
|
// get read quorum for this object
|
|
readQuorum, _, err := objectQuorumFromMeta(xl, partsMetadata, errs)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
// Lock the object before healing.
|
|
objectLock := globalNSMutex.NewNSLock(bucket, object)
|
|
if err := objectLock.GetRLock(globalHealingTimeout); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer objectLock.RUnlock()
|
|
|
|
// Heal the object.
|
|
return healObject(xl.storageDisks, bucket, object, readQuorum)
|
|
}
|