Mirror of https://github.com/minio/minio.git, synced 2024-12-25 06:35:56 -05:00
Commit 073aac3d92

By monitoring PUT/DELETE and heal operations it is possible to track changed paths and keep a bloom filter for this data. This can help prioritize paths to scan. The bloom filter can identify paths that have not changed, and the few collisions will only result in a marginal extra workload. This can be implemented at the bucket + one-prefix level with reasonable performance. The bloom filter is set to have a false-positive rate of 1% at 1M entries; a bloom table of this size is about ~2500 bytes when serialized. To avoid forcing a full scan of all paths that have changed, a number of cycle bloom filters need to be kept, so we can guarantee that dirty paths have been scanned within those cycle runs. Until those cycle bloom filters have been collected, all paths are considered dirty.
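The commit message above sizes the filter directly from those numbers. A minimal sketch of that idea, assuming the willf/bloom package - the import path and the NewWithEstimates/Add/Test helpers are illustration-time assumptions, not code from this commit:

package main

import (
	"fmt"

	"github.com/willf/bloom"
)

func main() {
	// Size the filter for ~1M tracked paths at a ~1% false-positive rate,
	// matching the figures quoted in the commit message.
	changed := bloom.NewWithEstimates(1000000, 0.01)

	// A PUT/DELETE or heal on "mybucket/myprefix" marks that path as dirty.
	changed.Add([]byte("mybucket/myprefix"))

	// Paths that were never added are (almost always) reported as unchanged,
	// so the scanner can deprioritize them; the rare collisions only cause a
	// little extra scanning, never a missed update.
	fmt.Println(changed.Test([]byte("mybucket/myprefix")))     // true
	fmt.Println(changed.Test([]byte("otherbucket/untouched"))) // false, barring a ~1% collision
}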
786 lines · 24 KiB · Go
/*
 * MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/minio/minio/cmd/logger"
	"github.com/minio/minio/pkg/madmin"
	"github.com/minio/minio/pkg/sync/errgroup"
)

func (xl xlObjects) ReloadFormat(ctx context.Context, dryRun bool) error {
	logger.LogIf(ctx, NotImplemented{})
	return NotImplemented{}
}

func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
	logger.LogIf(ctx, NotImplemented{})
	return madmin.HealResultItem{}, NotImplemented{}
}

// Heals a bucket if it doesn't exist on one of the disks, and also heals
// the missing entries for bucket metadata files
// `policy.json, notification.xml, listeners.json`.
func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (
	result madmin.HealResultItem, err error) {
	if !dryRun {
		defer ObjectPathUpdated(bucket)
	}

	storageDisks := xl.getDisks()

	// get write quorum for an object
	writeQuorum := getWriteQuorum(len(storageDisks))

	// Heal bucket.
	return healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun)
}

// Heal bucket - create buckets on disks where it does not exist.
func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, writeQuorum int,
	dryRun bool) (res madmin.HealResultItem, err error) {

	// Initialize sync waitgroup.
	g := errgroup.WithNErrs(len(storageDisks))

	// Disk states slices
	beforeState := make([]string, len(storageDisks))
	afterState := make([]string, len(storageDisks))

	// Make a volume entry on all underlying storage disks.
	for index := range storageDisks {
		index := index
		g.Go(func() error {
			if storageDisks[index] == nil {
				beforeState[index] = madmin.DriveStateOffline
				afterState[index] = madmin.DriveStateOffline
				return errDiskNotFound
			}
			if _, serr := storageDisks[index].StatVol(bucket); serr != nil {
				if serr == errDiskNotFound {
					beforeState[index] = madmin.DriveStateOffline
					afterState[index] = madmin.DriveStateOffline
					return serr
				}
				if serr != errVolumeNotFound {
					beforeState[index] = madmin.DriveStateCorrupt
					afterState[index] = madmin.DriveStateCorrupt
					return serr
				}

				beforeState[index] = madmin.DriveStateMissing
				afterState[index] = madmin.DriveStateMissing

				// mutate only if not a dry-run
				if dryRun {
					return nil
				}

				return serr
			}
			beforeState[index] = madmin.DriveStateOk
			afterState[index] = madmin.DriveStateOk
			return nil
		}, index)
	}

	errs := g.Wait()

	reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1)
	if reducedErr == errVolumeNotFound {
		return res, nil
	}

	// Initialize heal result info
	res = madmin.HealResultItem{
		Type:      madmin.HealItemBucket,
		Bucket:    bucket,
		DiskCount: len(storageDisks),
	}

	for i := range beforeState {
		if storageDisks[i] != nil {
			drive := storageDisks[i].String()
			res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
				UUID:     "",
				Endpoint: drive,
				State:    beforeState[i],
			})
		}
	}

	// Initialize sync waitgroup.
	g = errgroup.WithNErrs(len(storageDisks))

	// Make a volume entry on all underlying storage disks.
	for index := range storageDisks {
		index := index
		g.Go(func() error {
			if beforeState[index] == madmin.DriveStateMissing {
				makeErr := storageDisks[index].MakeVol(bucket)
				if makeErr == nil {
					afterState[index] = madmin.DriveStateOk
				}
				return makeErr
			}
			return errs[index]
		}, index)
	}

	errs = g.Wait()

	reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
	if reducedErr != nil {
		if reducedErr == errXLWriteQuorum {
			// Purge successfully created buckets if we don't have writeQuorum.
			undoMakeBucket(storageDisks, bucket)
		}
		return res, reducedErr
	}

	for i := range afterState {
		if storageDisks[i] != nil {
			drive := storageDisks[i].String()
			res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
				UUID:     "",
				Endpoint: drive,
				State:    afterState[i],
			})
		}
	}

	return res, nil
}

// listAllBuckets lists all buckets from all disks. It also
// returns the occurrence of each bucket on all disks.
func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo,
	bucketsOcc map[string]int, err error) {

	buckets = make(map[string]VolInfo)
	bucketsOcc = make(map[string]int)
	for _, disk := range storageDisks {
		if disk == nil {
			continue
		}
		var volsInfo []VolInfo
		volsInfo, err = disk.ListVols()
		if err != nil {
			if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
				continue
			}
			return nil, nil, err
		}
		for _, volInfo := range volsInfo {
			// StorageAPI can send volume names which are
			// incompatible with buckets - these are
			// skipped, like the meta-bucket.
			if isReservedOrInvalidBucket(volInfo.Name, false) {
				continue
			}
			// Increase counter per bucket name
			bucketsOcc[volInfo.Name]++
			// Save volume info under bucket name
			buckets[volInfo.Name] = volInfo
		}
	}
	return buckets, bucketsOcc, nil
}

// Only heal on disks where we are sure that healing is needed. We can expand
// this list as and when we find more errors that can be safely added to it.
func shouldHealObjectOnDisk(xlErr, dataErr error, meta xlMetaV1, quorumModTime time.Time) bool {
	switch xlErr {
	case errFileNotFound:
		return true
	case errCorruptedFormat:
		return true
	}
	if xlErr == nil {
		// xl.json was read fine, but there may be a problem with the part.N files.
		if IsErr(dataErr, []error{
			errFileNotFound,
			errFileCorrupt,
		}...) {
			return true
		}
		if !quorumModTime.Equal(meta.Stat.ModTime) {
			return true
		}
	}
	return false
}

// Heals an object by re-writing corrupt/missing erasure blocks.
func (xl xlObjects) healObject(ctx context.Context, bucket string, object string,
	partsMetadata []xlMetaV1, errs []error, latestXLMeta xlMetaV1,
	dryRun bool, remove bool, scanMode madmin.HealScanMode) (result madmin.HealResultItem, err error) {

	dataBlocks := latestXLMeta.Erasure.DataBlocks

	storageDisks := xl.getDisks()

	// List of disks having latest version of the object xl.json
	// (by modtime).
	latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)

	// List of disks having all parts as per latest xl.json.
	availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode)

	// Initialize heal result object
	result = madmin.HealResultItem{
		Type:         madmin.HealItemObject,
		Bucket:       bucket,
		Object:       object,
		DiskCount:    len(storageDisks),
		ParityBlocks: latestXLMeta.Erasure.ParityBlocks,
		DataBlocks:   latestXLMeta.Erasure.DataBlocks,

		// Initialize object size to -1, so we can detect if we are
		// unable to reliably find the object size.
		ObjectSize: -1,
	}

	// Loop to find number of disks with valid data, per-drive
	// data state and a list of outdated disks on which data needs
	// to be healed.
	outDatedDisks := make([]StorageAPI, len(storageDisks))
	numAvailableDisks := 0
	disksToHealCount := 0
	for i, v := range availableDisks {
		driveState := ""
		switch {
		case v != nil:
			driveState = madmin.DriveStateOk
			numAvailableDisks++
			// If data is sane on any one disk, we can
			// extract the correct object size.
			result.ObjectSize = partsMetadata[i].Stat.Size
			result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks
			result.DataBlocks = partsMetadata[i].Erasure.DataBlocks
		case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound:
			driveState = madmin.DriveStateOffline
		case errs[i] == errFileNotFound, errs[i] == errVolumeNotFound:
			fallthrough
		case dataErrs[i] == errFileNotFound, dataErrs[i] == errVolumeNotFound:
			driveState = madmin.DriveStateMissing
		default:
			// all remaining cases imply corrupt data/metadata
			driveState = madmin.DriveStateCorrupt
		}

		var drive string
		if storageDisks[i] != nil {
			drive = storageDisks[i].String()
		}
		if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], modTime) {
			outDatedDisks[i] = storageDisks[i]
			disksToHealCount++
			result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
				UUID:     "",
				Endpoint: drive,
				State:    driveState,
			})
			result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
				UUID:     "",
				Endpoint: drive,
				State:    driveState,
			})
			continue
		}
		result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: drive,
			State:    driveState,
		})
		result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: drive,
			State:    driveState,
		})
	}

	// If less than read quorum number of disks have all the parts
	// of the data, we can't reconstruct the erasure-coded data.
	if numAvailableDisks < dataBlocks {
		// Check if xl.json, and corresponding parts are also missing.
		if m, ok := isObjectDangling(partsMetadata, errs, dataErrs); ok {
			writeQuorum := m.Erasure.DataBlocks + 1
			if m.Erasure.DataBlocks == 0 {
				writeQuorum = getWriteQuorum(len(storageDisks))
			}
			if !dryRun && remove {
				err = xl.deleteObject(ctx, bucket, object, writeQuorum, false)
			}
			return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), err
		}
		return result, toObjectErr(errXLReadQuorum, bucket, object)
	}

	if disksToHealCount == 0 {
		// Nothing to heal!
		return result, nil
	}

	// After this point, we only have to repair data on disk - so
	// return if it is a dry-run.
	if dryRun {
		return result, nil
	}

	// Latest xlMetaV1 for reference. If a valid metadata is not
	// present, it is as good as the object not being found.
	latestMeta, pErr := pickValidXLMeta(ctx, partsMetadata, modTime, dataBlocks)
	if pErr != nil {
		return result, toObjectErr(pErr, bucket, object)
	}

	// Clear data files of the object on outdated disks
	for _, disk := range outDatedDisks {
		// Before healing outdated disks, we need to remove
		// xl.json and part files from "bucket/object/" so
		// that rename(minioMetaBucket, "tmp/tmpuuid/",
		// "bucket", "object/") succeeds.
		if disk == nil {
			// Not an outdated disk.
			continue
		}

		// List and delete the object directory.
		files, derr := disk.ListDir(bucket, object, -1, "")
		if derr == nil {
			for _, entry := range files {
				_ = disk.DeleteFile(bucket,
					pathJoin(object, entry))
			}
		}
	}

	// Reorder so that we have data disks first and parity disks next.
	latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
	outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
	partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
	for i := range outDatedDisks {
		if outDatedDisks[i] == nil {
			continue
		}
		partsMetadata[i] = newXLMetaFromXLMeta(latestMeta)
	}

	// We write at a temporary location and then rename to the final location.
	tmpID := mustGetUUID()

	// Heal each part. erasureHealFile() will write the healed
	// part to .minio/tmp/uuid/ which needs to be renamed later to
	// the final location.
	erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks,
		latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
	if err != nil {
		return result, toObjectErr(err, bucket, object)
	}

	erasureInfo := latestMeta.Erasure
	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
		partSize := latestMeta.Parts[partIndex].Size
		partActualSize := latestMeta.Parts[partIndex].ActualSize
		partNumber := latestMeta.Parts[partIndex].Number
		tillOffset := erasure.ShardFileTillOffset(0, partSize, partSize)
		readers := make([]io.ReaderAt, len(latestDisks))
		checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
		for i, disk := range latestDisks {
			if disk == OfflineDisk {
				continue
			}
			checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber)
			partPath := pathJoin(object, fmt.Sprintf("part.%d", partNumber))
			readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
		}
		writers := make([]io.Writer, len(outDatedDisks))
		for i, disk := range outDatedDisks {
			if disk == OfflineDisk {
				continue
			}
			partPath := pathJoin(tmpID, fmt.Sprintf("part.%d", partNumber))
			writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, checksumAlgo, erasure.ShardSize())
		}
		hErr := erasure.Heal(ctx, readers, writers, partSize)
		closeBitrotReaders(readers)
		closeBitrotWriters(writers)
		if hErr != nil {
			return result, toObjectErr(hErr, bucket, object)
		}
		// outDatedDisks that had write errors should not be
		// written to for remaining parts, so we nil it out.
		for i, disk := range outDatedDisks {
			if disk == nil {
				continue
			}
			// A non-nil stale disk which did not receive
			// a healed part checksum had a write error.
			if writers[i] == nil {
				outDatedDisks[i] = nil
				disksToHealCount--
				continue
			}
			partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize)
			partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
				PartNumber: partNumber,
				Algorithm:  checksumAlgo,
				Hash:       bitrotWriterSum(writers[i]),
			})
		}

		// If all disks are having errors, we give up.
		if disksToHealCount == 0 {
			return result, fmt.Errorf("all disks without up-to-date data had write errors")
		}
	}

	// Cleanup in case of xl.json writing failure
	writeQuorum := latestMeta.Erasure.DataBlocks + 1
	defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpID, writeQuorum, false)

	// Generate and write `xl.json` generated from other disks.
	outDatedDisks, aErr := writeUniqueXLMetadata(ctx, outDatedDisks, minioMetaTmpBucket, tmpID,
		partsMetadata, diskCount(outDatedDisks))
	if aErr != nil {
		return result, toObjectErr(aErr, bucket, object)
	}

	// Rename from tmp location to the actual location.
	for _, disk := range outDatedDisks {
		if disk == nil {
			continue
		}

		// Attempt a rename now from healed data to final location.
		aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket,
			retainSlash(object))
		if aErr != nil {
			logger.LogIf(ctx, aErr)
			return result, toObjectErr(aErr, bucket, object)
		}

		for i, v := range result.Before.Drives {
			if v.Endpoint == disk.String() {
				result.After.Drives[i].State = madmin.DriveStateOk
			}
		}
	}

	// Set the size of the object in the heal result
	result.ObjectSize = latestMeta.Stat.Size

	return result, nil
}

// healObjectDir - heals an object directory specifically. This special call
// is needed since we do not have a special backend format for directories.
func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
	storageDisks := xl.getDisks()

	// Initialize heal result object
	hr = madmin.HealResultItem{
		Type:         madmin.HealItemObject,
		Bucket:       bucket,
		Object:       object,
		DiskCount:    len(storageDisks),
		ParityBlocks: getDefaultParityBlocks(len(storageDisks)),
		DataBlocks:   getDefaultDataBlocks(len(storageDisks)),
		ObjectSize:   0,
	}

	hr.Before.Drives = make([]madmin.HealDriveInfo, len(storageDisks))
	hr.After.Drives = make([]madmin.HealDriveInfo, len(storageDisks))

	errs := statAllDirs(ctx, storageDisks, bucket, object)
	danglingObject := isObjectDirDangling(errs)
	if danglingObject {
		if !dryRun && remove {
			xl.deleteObject(ctx, bucket, object, hr.DataBlocks+1, true)
		}
	}

	// Prepare object creation in all disks
	for i, err := range errs {
		var drive string
		if storageDisks[i] != nil {
			drive = storageDisks[i].String()
		}
		switch err {
		case nil:
			hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
			hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
		case errDiskNotFound:
			hr.Before.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
			hr.After.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
		case errVolumeNotFound, errFileNotFound:
			// Bucket or prefix/directory not found
			hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateMissing}
			hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateMissing}
		default:
			hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateCorrupt}
			hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateCorrupt}
		}
	}
	if dryRun || danglingObject {
		return hr, nil
	}
	for i, err := range errs {
		if err == errVolumeNotFound || err == errFileNotFound {
			// Bucket or prefix/directory not found
			merr := storageDisks[i].MakeVol(pathJoin(bucket, object))
			switch merr {
			case nil, errVolumeExists:
				hr.After.Drives[i].State = madmin.DriveStateOk
			case errDiskNotFound:
				hr.After.Drives[i].State = madmin.DriveStateOffline
			default:
				logger.LogIf(ctx, merr)
				hr.After.Drives[i].State = madmin.DriveStateCorrupt
			}
		}
	}
	return hr, nil
}

// Populates default heal result item entries with possible values when we are returning prematurely.
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []error, bucket, object string) madmin.HealResultItem {
	// Initialize heal result object
	result := madmin.HealResultItem{
		Type:      madmin.HealItemObject,
		Bucket:    bucket,
		Object:    object,
		DiskCount: len(storageDisks),

		// Initialize object size to -1, so we can detect if we are
		// unable to reliably find the object size.
		ObjectSize: -1,
	}
	if latestXLMeta.IsValid() {
		result.ObjectSize = latestXLMeta.Stat.Size
	}

	for index, disk := range storageDisks {
		if disk == nil {
			result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
				UUID:  "",
				State: madmin.DriveStateOffline,
			})
			result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
				UUID:  "",
				State: madmin.DriveStateOffline,
			})
			continue
		}
		drive := disk.String()
		driveState := madmin.DriveStateCorrupt
		switch errs[index] {
		case errFileNotFound, errVolumeNotFound:
			driveState = madmin.DriveStateMissing
		}
		result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: drive,
			State:    driveState,
		})
		result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: drive,
			State:    driveState,
		})
	}

	if !latestXLMeta.IsValid() {
		// Default to most common configuration for erasure blocks.
		result.ParityBlocks = getDefaultParityBlocks(len(storageDisks))
		result.DataBlocks = getDefaultDataBlocks(len(storageDisks))
	} else {
		result.ParityBlocks = latestXLMeta.Erasure.ParityBlocks
		result.DataBlocks = latestXLMeta.Erasure.DataBlocks
	}

	return result
}

// Stat all directories.
func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix string) []error {
	g := errgroup.WithNErrs(len(storageDisks))
	for index, disk := range storageDisks {
		if disk == nil {
			continue
		}
		index := index
		g.Go(func() error {
			entries, err := storageDisks[index].ListDir(bucket, prefix, 1, "")
			if err != nil {
				return err
			}
			if len(entries) > 0 {
				return errVolumeNotEmpty
			}
			return nil
		}, index)
	}

	return g.Wait()
}

// ObjectDir is considered dangling/corrupted if and only if
// the number of disks where the directory is present (including
// corrupted or non-empty results) is less than the number of
// disks where it is not found.
func isObjectDirDangling(errs []error) (ok bool) {
	var found int
	var notFound int
	var foundNotEmpty int
	var otherFound int
	for _, readErr := range errs {
		if readErr == nil {
			found++
		} else if readErr == errFileNotFound || readErr == errVolumeNotFound {
			notFound++
		} else if readErr == errVolumeNotEmpty {
			foundNotEmpty++
		} else {
			otherFound++
		}
	}
	return found+foundNotEmpty+otherFound < notFound
}
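
// As a worked, hypothetical example of the rule above (not part of the
// original source): with 8 disks, if stat-ing the directory succeeds on 2
// disks, returns errVolumeNotEmpty on 1, and returns errFileNotFound or
// errVolumeNotFound on the remaining 5, then 2+1+0 < 5 and the directory is
// reported as dangling; with only 4 "not found" results it would not be.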

// Object is considered dangling/corrupted if and only if
// total disks minus a combination of corrupted and missing
// files is less than the number of data blocks.
func isObjectDangling(metaArr []xlMetaV1, errs []error, dataErrs []error) (validMeta xlMetaV1, ok bool) {
	// We can consider an object's data unreliable
	// when xl.json is not found in read quorum disks,
	// or when xl.json is not readable in read quorum disks.
	var notFoundXLJSON, corruptedXLJSON int
	for _, readErr := range errs {
		if readErr == errFileNotFound {
			notFoundXLJSON++
		} else if readErr == errCorruptedFormat {
			corruptedXLJSON++
		}
	}
	var notFoundParts int
	for i := range dataErrs {
		// Only count part errors if the error is not the
		// same as the xl.json error. This is to avoid
		// double counting when both parts and xl.json
		// are not available.
		if errs[i] != dataErrs[i] {
			if dataErrs[i] == errFileNotFound {
				notFoundParts++
			}
		}
	}

	for _, m := range metaArr {
		if !m.IsValid() {
			continue
		}
		validMeta = m
		break
	}

	// We couldn't find any valid meta, so we are indeed corrupted - return true right away.
	if validMeta.Erasure.DataBlocks == 0 {
		return validMeta, true
	}

	// We have valid meta, now verify if we have enough files with parity blocks.
	return validMeta, corruptedXLJSON+notFoundXLJSON+notFoundParts > validMeta.Erasure.ParityBlocks
}
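
// As a worked, hypothetical example of the check above (not part of the
// original source): for a 4 data + 4 parity layout across 8 disks, suppose
// xl.json is missing on 3 disks and, on 2 further disks, xl.json is present
// but a part file is missing. Then notFoundXLJSON=3, corruptedXLJSON=0 and
// notFoundParts=2, so 3+0+2 = 5 > 4 parity blocks and the object is reported
// as dangling; with only 4 such disks it would not be.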

// HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (hr madmin.HealResultItem, err error) {
	// Create context that also contains information about the object and bucket.
	// The top level handler might not have this information.
	reqInfo := logger.GetReqInfo(ctx)
	var newReqInfo *logger.ReqInfo
	if reqInfo != nil {
		newReqInfo = logger.NewReqInfo(reqInfo.RemoteHost, reqInfo.UserAgent, reqInfo.DeploymentID, reqInfo.RequestID, reqInfo.API, bucket, object)
	} else {
		newReqInfo = logger.NewReqInfo("", "", globalDeploymentID, "", "Heal", bucket, object)
	}
	healCtx := logger.SetReqInfo(GlobalContext, newReqInfo)

	// Healing directories is handled separately.
	if HasSuffix(object, SlashSeparator) {
		return xl.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
	}

	storageDisks := xl.getDisks()

	// Read metadata files from all the disks
	partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object)

	// Check if the object is dangling; if yes and the user requested
	// remove, we simply delete it from the namespace.
	if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
		writeQuorum := m.Erasure.DataBlocks + 1
		if m.Erasure.DataBlocks == 0 {
			writeQuorum = getWriteQuorum(len(storageDisks))
		}
		if !opts.DryRun && opts.Remove {
			xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
		}
		err = reduceReadQuorumErrs(ctx, errs, nil, writeQuorum-1)
		return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
	}

	latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs)
	if err != nil {
		return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
	}

	errCount := 0
	for _, err := range errs {
		if err != nil {
			errCount++
		}
	}

	if errCount == len(errs) {
		// Only if we get errors from all the disks we return error. Else we need to
		// continue to return filled madmin.HealResultItem struct which includes info
		// on what disks the file is available etc.
		if err = reduceReadQuorumErrs(ctx, errs, nil, latestXLMeta.Erasure.DataBlocks); err != nil {
			if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
				writeQuorum := m.Erasure.DataBlocks + 1
				if m.Erasure.DataBlocks == 0 {
					writeQuorum = getWriteQuorum(len(storageDisks))
				}
				if !opts.DryRun && opts.Remove {
					xl.deleteObject(ctx, bucket, object, writeQuorum, false)
				}
			}
			return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
		}
	}

	// Heal the object.
	return xl.healObject(healCtx, bucket, object, partsMetadata, errs, latestXLMeta, opts.DryRun, opts.Remove, opts.ScanMode)
}