ignore swapped drives instead of throwing errors (#13655)

- add checks so that swapped disks are detected
  and ignored - never used for normal operations.

- implement `unrecognizedDisk`, a stub that is ignored,
  with all operations returning `errDiskNotFound`.

- also add checks so that we do not load unexpected
  disks while connecting automatically.

- additionally humanize the values when printing the errors.

Bonus: fixes handling of non-quorum situations in
getLatestFileInfo(); it previously did not work when 2 drives
were down and would return errors incorrectly.
Commit 4545ecad58 (parent ac74237f01)
Authored by Harshavardhana on 2021-11-15 09:46:55 -08:00, committed by GitHub
10 changed files with 245 additions and 130 deletions
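
The core idea of the change, as a minimal standalone sketch (hypothetical names and a pared-down struct, not the actual MinIO types): when a connecting drive reports a (set, disk) location that differs from the slot it is mounted in, it is treated as swapped and rejected with `errDiskNotFound` instead of being used for normal operations.

```go
package main

import (
	"errors"
	"fmt"
)

var errDiskNotFound = errors.New("disk not found")

// disk is a stand-in for the few fields this sketch needs; the real
// StorageAPI interface is much larger.
type disk struct {
	id       string
	set, idx int // location recorded in the drive's format.json
}

// placeOrIgnore (hypothetical helper) accepts the disk only if its
// recorded location matches the slot it is being connected into; a
// swapped drive is rejected instead of serving data from the wrong slot.
func placeOrIgnore(d disk, wantSet, wantIdx int) (*disk, error) {
	if d.set != wantSet || d.idx != wantIdx {
		return nil, fmt.Errorf("unexpected disk ordering, expected (set=%d, disk=%d), found (set=%d, disk=%d): %w",
			wantSet, wantIdx, d.set, d.idx, errDiskNotFound)
	}
	return &d, nil
}

func main() {
	// A drive formatted for set 0, slot 2 shows up where slot 1 is expected.
	if _, err := placeOrIgnore(disk{id: "a1b2c3", set: 0, idx: 2}, 0, 1); err != nil {
		fmt.Println(err) // the swapped drive is ignored, not used
	}
}
```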

View File

@@ -133,8 +133,6 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
 	// Add/update buckets that are not registered with the DNS
 	bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice()
 	g := errgroup.WithNErrs(len(bucketsToBeUpdatedSlice)).WithConcurrency(50)
-	ctx, cancel := g.WithCancelOnError(GlobalContext)
-	defer cancel()
 
 	for index := range bucketsToBeUpdatedSlice {
 		index := index
@@ -143,9 +141,12 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
 		}, index)
 	}
 
-	if err := g.WaitErr(); err != nil {
-		logger.LogIf(ctx, err)
-		return
+	ctx := GlobalContext
+	for _, err := range g.Wait() {
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return
+		}
 	}
 
 	for _, bucket := range bucketsInConflict.ToSlice() {

View File

@@ -32,8 +32,6 @@
 func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
 	g := errgroup.WithNErrs(len(objects)).WithConcurrency(500)
-	_, cancel := g.WithCancelOnError(ctx)
-	defer cancel()
 	for index := range objects {
 		index := index
 		g.Go(func() error {
@@ -45,7 +43,7 @@ func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
 			return nil
 		}, index)
 	}
-	g.WaitErr()
+	g.Wait()
 }
 
 // Validate all the ListObjects query arguments, returns an APIErrorCode

View File

@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"path"
 	"sort"
 	"strings"
@@ -222,18 +223,18 @@ func initConfig(objAPI ObjectLayer) error {
 	// If etcd is set then migrates /config/config.json
 	// to '<export_path>/.minio.sys/config/config.json'
 	if err := migrateConfigToMinioSys(objAPI); err != nil {
-		return err
+		return fmt.Errorf("migrateConfigToMinioSys: %w", err)
 	}
 
 	// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
 	if err := migrateMinioSysConfig(objAPI); err != nil {
-		return err
+		return fmt.Errorf("migrateMinioSysConfig: %w", err)
 	}
 
 	// Migrates backend '<export_path>/.minio.sys/config/config.json' to
 	// latest config format.
 	if err := migrateMinioSysConfigToKV(objAPI); err != nil {
-		return err
+		return fmt.Errorf("migrateMinioSysConfigToKV: %w", err)
 	}
 
 	return loadConfig(objAPI)

View File

@@ -30,7 +30,6 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dataDir string) {
 	var maxima int // Counter for remembering max occurrence of elements.
 	timeOccurenceMap := make(map[int64]int, len(modTimes))
-	dataDirOccurenceMap := make(map[string]int, len(dataDirs))
 	// Ignore the uuid sentinel and count the rest.
 	for _, time := range modTimes {
 		if time.Equal(timeSentinel) {
@@ -39,34 +38,21 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dataDir string) {
 		timeOccurenceMap[time.UnixNano()]++
 	}
 
-	for _, dataDir := range dataDirs {
-		if dataDir == errorDir {
-			continue
-		}
-		if dataDir == delMarkerDir {
-			dataDirOccurenceMap[delMarkerDir]++
-			continue
-		}
-		dataDirOccurenceMap[dataDir]++
-	}
-
 	// Find the common cardinality from previously collected
 	// occurrences of elements.
 	for nano, count := range timeOccurenceMap {
-		t := time.Unix(0, nano)
+		t := time.Unix(0, nano).UTC()
 		if count > maxima || (count == maxima && t.After(modTime)) {
 			maxima = count
 			modTime = t
 		}
 	}
 
-	// Find the common cardinality from the previously collected
-	// occurrences of elements.
-	var dmaxima int
-	for ddataDir, count := range dataDirOccurenceMap {
-		if count > dmaxima {
-			dmaxima = count
+	for i, ddataDir := range dataDirs {
+		if modTimes[i].Equal(modTime) {
+			// Return the data-dir that matches modTime.
 			dataDir = ddataDir
+			break
 		}
 	}
@@ -136,14 +122,7 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time, dataDir string) {
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
-	dataDirs := make([]string, len(partsMetadata))
-	for idx, fi := range partsMetadata {
-		if errs[idx] != nil {
-			dataDirs[idx] = errorDir
-			continue
-		}
-		dataDirs[idx] = fi.DataDir
-	}
+	dataDirs := getDataDirs(partsMetadata, errs)
 
 	// Reduce list of UUIDs to a single common value.
 	modTime, dataDir = commonTime(modTimes, dataDirs)
@@ -160,23 +139,34 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time, dataDir string) {
 	return onlineDisks, modTime, dataDir
 }
 
+func getDataDirs(partsMetadata []FileInfo, errs []error) []string {
+	dataDirs := make([]string, len(partsMetadata))
+	for idx, fi := range partsMetadata {
+		if errs[idx] != nil {
+			dataDirs[idx] = errorDir
+			continue
+		}
+		if fi.Deleted {
+			dataDirs[idx] = delMarkerDir
+		} else {
+			dataDirs[idx] = fi.DataDir
+		}
+	}
+	return dataDirs
+}
+
 // Returns the latest updated FileInfo files and error in case of failure.
 func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error, quorum int) (FileInfo, error) {
 	// There should be atleast half correct entries, if not return failure
-	if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum); reducedErr != nil {
+	reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
+	if reducedErr != nil {
 		return FileInfo{}, reducedErr
 	}
 
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
-	dataDirs := make([]string, len(partsMetadata))
-	for idx, fi := range partsMetadata {
-		if errs[idx] != nil {
-			continue
-		}
-		dataDirs[idx] = fi.DataDir
-	}
+	dataDirs := getDataDirs(partsMetadata, errs)
 
 	// Count all latest updated FileInfo values
 	var count int
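
For reference, a simplified standalone version of the plurality vote commonTime performs over modification times (a hypothetical helper; sentinel handling and data-dir selection omitted). The `.UTC()` normalization and the tie-break toward the later time mirror the diff above:

```go
package main

import (
	"fmt"
	"time"
)

// mostCommonTime mirrors the voting logic in commonTime: pick the
// modTime seen on the most drives, preferring the later time on ties.
func mostCommonTime(modTimes []time.Time) (modTime time.Time) {
	occ := make(map[int64]int, len(modTimes))
	for _, t := range modTimes {
		occ[t.UnixNano()]++
	}
	var maxima int
	for nano, count := range occ {
		t := time.Unix(0, nano).UTC()
		if count > maxima || (count == maxima && t.After(modTime)) {
			maxima = count
			modTime = t
		}
	}
	return modTime
}

func main() {
	t1 := time.Date(2021, 11, 15, 9, 0, 0, 0, time.UTC)
	t2 := t1.Add(time.Minute)
	// Three drives agree on t2, one lags behind on t1: t2 wins the vote,
	// and the data-dir is then taken from a FileInfo whose modTime is t2.
	fmt.Println(mostCommonTime([]time.Time{t1, t2, t2, t2}))
}
```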

View File

@@ -42,20 +42,18 @@ func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
 	storageDisks := er.getDisks()
 	storageEndpoints := er.getEndpoints()
 
+	// Heal bucket.
+	return er.healBucket(ctx, storageDisks, storageEndpoints, bucket, opts)
+}
+
+// Heal bucket - create buckets on disks where it does not exist.
+func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
 	// get write quorum for an object
 	writeQuorum := len(storageDisks) - er.defaultParityCount
 	if writeQuorum == er.defaultParityCount {
 		writeQuorum++
 	}
 
-	// Heal bucket.
-	return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, opts)
-}
-
-// Heal bucket - create buckets on disks where it does not exist.
-func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
-	opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
 	// Initialize sync waitgroup.
 	g := errgroup.WithNErrs(len(storageDisks))
@@ -72,6 +70,14 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
 				afterState[index] = madmin.DriveStateOffline
 				return errDiskNotFound
 			}
+
+			beforeState[index] = madmin.DriveStateOk
+			afterState[index] = madmin.DriveStateOk
+
+			if bucket == minioReservedBucket {
+				return nil
+			}
+
 			if _, serr := storageDisks[index].StatVol(ctx, bucket); serr != nil {
 				if serr == errDiskNotFound {
 					beforeState[index] = madmin.DriveStateOffline
@@ -94,8 +100,6 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
 				return serr
 			}
 
-			beforeState[index] = madmin.DriveStateOk
-			afterState[index] = madmin.DriveStateOk
 			return nil
 		}, index)
 	}
@@ -107,8 +111,8 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
 		Type:      madmin.HealItemBucket,
 		Bucket:    bucket,
 		DiskCount: len(storageDisks),
-		ParityBlocks: len(storageDisks) / 2,
-		DataBlocks:   len(storageDisks) / 2,
+		ParityBlocks: er.defaultParityCount,
+		DataBlocks:   len(storageDisks) - er.defaultParityCount,
 	}
 
 	for i := range beforeState {
@@ -119,7 +123,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
 		})
 	}
 
-	reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1)
+	reducedErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, res.DataBlocks)
 	if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
 		for i := range beforeState {
 			res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
@@ -153,7 +157,16 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
 	reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
 	if reducedErr != nil {
-		return res, reducedErr
+		// If we have exactly half the drives not available,
+		// we should still allow HealBucket to not return error.
+		// this is necessary for starting the server.
+		readQuorum := res.DataBlocks
+		switch reduceReadQuorumErrs(ctx, errs, nil, readQuorum) {
+		case nil:
+		case errDiskNotFound:
+		default:
+			return res, reducedErr
+		}
 	}
 
 	for i := range afterState {
@@ -168,7 +181,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
 
 // listAllBuckets lists all buckets from all disks. It also
 // returns the occurrence of each buckets in all disks
-func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo) error {
+func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
 	g := errgroup.WithNErrs(len(storageDisks))
 	var mu sync.Mutex
 	for index := range storageDisks {
@@ -198,7 +211,7 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo) error {
 			return nil
 		}, index)
 	}
-	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, len(storageDisks)/2)
+	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum)
}
 
 // Only heal on disks where we are sure that healing is needed. We can expand
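
The relaxed check above reduces the per-drive errors against a read quorum of `res.DataBlocks` instead of a write quorum, and additionally tolerates `errDiskNotFound`, so a bucket with exactly half its drives down can still be healed at server start. A hypothetical simplified illustration of that reduction pattern (not the actual reduceReadQuorumErrs implementation):

```go
package main

import (
	"errors"
	"fmt"
)

var errDiskNotFound = errors.New("disk not found")

// reduceQuorumErrs is a simplified sketch of quorum reduction: return
// the most frequent per-drive error if it reaches quorum (nil counts as
// a value, meaning enough drives succeeded), else fail the quorum.
func reduceQuorumErrs(errs []error, quorum int) error {
	counts := make(map[error]int)
	for _, err := range errs {
		counts[err]++
	}
	var quorumErr error
	maxCount := 0
	for err, n := range counts {
		if n > maxCount {
			maxCount, quorumErr = n, err
		}
	}
	if maxCount >= quorum {
		return quorumErr
	}
	return errors.New("quorum not met")
}

func main() {
	// Four drives with a read quorum of 2 (the data-block count):
	// three respond, one is down, so the reduced error is nil.
	if err := reduceQuorumErrs([]error{nil, nil, nil, errDiskNotFound}, 2); err == nil {
		fmt.Println("read quorum met, healing proceeds")
	}
}
```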

View File

@@ -913,21 +913,15 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
 	var mu sync.Mutex
 	eg := errgroup.WithNErrs(len(objects)).WithConcurrency(10)
-	cctx, cancel := eg.WithCancelOnError(ctx)
-	defer cancel()
 
 	for j, obj := range objects {
 		j := j
 		obj := obj
 		eg.Go(func() error {
-			idx, err := z.getPoolIdxExistingNoLock(cctx, bucket, obj.ObjectName)
-			if isErrObjectNotFound(err) {
+			idx, err := z.getPoolIdxExistingNoLock(ctx, bucket, obj.ObjectName)
+			if err != nil {
 				derrs[j] = err
 				return nil
 			}
-			if err != nil {
-				// unhandled errors return right here.
-				return err
-			}
 			mu.Lock()
 			poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj)
 			origIndexMap[idx] = append(origIndexMap[idx], j)
@@ -936,12 +930,7 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
 		}, j)
 	}
 
-	if err := eg.WaitErr(); err != nil {
-		for i := range derrs {
-			derrs[i] = err
-		}
-		return dobjects, derrs
-	}
+	eg.Wait() // wait to check all the pools.
 
 	// Delete concurrently in all server pools.
 	var wg sync.WaitGroup
@@ -1571,7 +1560,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
 	}
 
 	// Attempt heal on the bucket metadata, ignore any failures
-	_, _ = z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket, bucketMetadataFile), "", opts)
+	defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket, bucketMetadataFile), "", opts)
 
 	for _, pool := range z.serverPools {
 		result, err := pool.HealBucket(ctx, bucket, opts)

View File

@@ -26,6 +26,7 @@ import (
 	"hash/crc32"
 	"math/rand"
 	"net/http"
+	"reflect"
 	"sort"
 	"sync"
 	"time"
@@ -233,11 +234,20 @@ func (s *erasureSets) connectDisks() {
 			s.erasureDisksMu.RUnlock()
 			if err != nil {
 				printEndpointError(endpoint, err, false)
+				disk.Close()
 				return
 			}
 
 			s.erasureDisksMu.Lock()
-			if s.erasureDisks[setIndex][diskIndex] != nil {
+			if currentDisk := s.erasureDisks[setIndex][diskIndex]; currentDisk != nil {
+				if !reflect.DeepEqual(currentDisk.Endpoint(), disk.Endpoint()) {
+					err = fmt.Errorf("Detected unexpected disk ordering refusing to use the disk: expecting %s, found %s, refusing to use the disk",
+						currentDisk.Endpoint(), disk.Endpoint())
+					printEndpointError(endpoint, err, false)
+					disk.Close()
+					s.erasureDisksMu.Unlock()
+					return
+				}
 				s.erasureDisks[setIndex][diskIndex].Close()
 			}
 			if disk.IsLocal() {
@@ -248,6 +258,7 @@
 				disk, err = newStorageAPI(endpoint)
 				if err != nil {
 					printEndpointError(endpoint, err, false)
+					s.erasureDisksMu.Unlock()
 					return
 				}
 				disk.SetDiskID(format.Erasure.This)
@@ -398,7 +409,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI,
 		var lockerEpSet = set.NewStringSet()
 		for j := 0; j < setDriveCount; j++ {
 			endpoint := endpoints[i*setDriveCount+j]
-			// Only add lockers only one per endpoint and per erasure set.
+			// Only add lockers per endpoint.
 			if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
 				lockerEpSet.Add(endpoint.Host)
 				s.erasureLockers[i] = append(s.erasureLockers[i], locker)
@@ -416,7 +427,9 @@
 				continue
 			}
 			if m != i || n != j {
-				return nil, fmt.Errorf("found a disk in an unexpected location, pool: %d, found (set=%d, disk=%d) expected (set=%d, disk=%d): %s(%s)", poolIdx, m, n, i, j, disk, diskID)
+				logger.LogIf(GlobalContext, fmt.Errorf("Detected unexpected disk ordering refusing to use the disk - poolID: %s, found disk mounted at (set=%s, disk=%s) expected mount at (set=%s, disk=%s): %s(%s)", humanize.Ordinal(poolIdx+1), humanize.Ordinal(m+1), humanize.Ordinal(n+1), humanize.Ordinal(i+1), humanize.Ordinal(j+1), disk, diskID))
+				s.erasureDisks[i][j] = &unrecognizedDisk{storage: disk}
+				continue
 			}
 			disk.SetDiskLoc(s.poolIndex, m, n)
 			s.endpointStrings[m*setDriveCount+n] = disk.String()
@@ -855,7 +868,7 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
 	var healBuckets = map[string]VolInfo{}
 	for _, set := range s.sets {
 		// lists all unique buckets across drives.
-		if err := listAllBuckets(ctx, set.getDisks(), healBuckets); err != nil {
+		if err := listAllBuckets(ctx, set.getDisks(), healBuckets, s.defaultParityCount); err != nil {
 			return nil, err
 		}
 	}
@@ -1326,8 +1339,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
 	}
 
 	for _, set := range s.sets {
-		var healResult madmin.HealResultItem
-		healResult, err = set.HealBucket(ctx, bucket, opts)
+		healResult, err := set.HealBucket(ctx, bucket, opts)
 		if err != nil {
 			return result, toObjectErr(err, bucket)
 		}
@@ -1335,12 +1347,6 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
 		result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
 	}
 
-	// Check if we had quorum to write, if not return an appropriate error.
-	_, afterDriveOnline := result.GetOnlineCounts()
-	if afterDriveOnline < ((s.setCount*s.setDriveCount)/2)+1 {
-		return result, toObjectErr(errErasureWriteQuorum, bucket)
-	}
-
 	return result, nil
 }
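
The new log line above formats indexes with humanize.Ordinal from github.com/dustin/go-humanize (already imported by this file), so zero-based pool/set/disk indexes print as human-friendly ordinals:

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// Zero-based indexes become 1-based ordinals in the error text.
	for _, idx := range []int{0, 1, 2, 10} {
		fmt.Println(humanize.Ordinal(idx + 1)) // 1st, 2nd, 3rd, 11th
	}
}
```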

View File

@@ -283,8 +283,6 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string,
 	// List until maxKeys requested.
 	g := errgroup.WithNErrs(maxKeys).WithConcurrency(maxConcurrent)
-	ctx, cancel := g.WithCancelOnError(ctx)
-	defer cancel()
 
 	objInfoFound := make([]*ObjectInfo, maxKeys)
 	var i int
@@ -345,8 +343,10 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string,
 			break
 		}
 	}
-	if err := g.WaitErr(); err != nil {
-		return loi, err
+	for _, err := range g.Wait() {
+		if err != nil {
+			return loi, err
+		}
 	}
 
 	// Copy found objects
 	objInfos := make([]ObjectInfo, 0, i+1)

View File

@@ -335,7 +335,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
 		if err = handleEncryptedConfigBackend(newObject); err == nil {
 			// Upon success migrating the config, initialize all sub-systems
 			// if all sub-systems initialized successfully return right away
-			if err = initAllSubsystems(ctx, newObject); err == nil {
+			if err = initAllSubsystems(lkctx.Context(), newObject); err == nil {
 				txnLk.Unlock(lkctx.Cancel)
 				// All successful return.
 				if globalIsDistErasure {
@@ -384,17 +384,17 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
 		// Limit to no more than 50 concurrent buckets.
 		g := errgroup.WithNErrs(len(buckets)).WithConcurrency(50)
-		ctx, cancel := g.WithCancelOnError(ctx)
-		defer cancel()
 		for index := range buckets {
 			index := index
 			g.Go(func() error {
-				_, berr := newObject.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true})
-				return berr
+				_, herr := newObject.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true})
+				return herr
 			}, index)
 		}
-		if err := g.WaitErr(); err != nil {
-			return fmt.Errorf("Unable to list buckets to heal: %w", err)
+		for _, err := range g.Wait() {
+			if err != nil {
+				return fmt.Errorf("Unable to list buckets to heal: %w", err)
+			}
 		}
 	}

View File

@@ -86,46 +86,163 @@ type StorageAPI interface {
 	SetDiskLoc(poolIdx, setIdx, diskIdx int) // Set location indexes.
 }
 
-// storageReader is an io.Reader view of a disk
-type storageReader struct {
-	storage      StorageAPI
-	volume, path string
-	offset       int64
-}
-
-func (r *storageReader) Read(p []byte) (n int, err error) {
-	nn, err := r.storage.ReadFile(context.TODO(), r.volume, r.path, r.offset, p, nil)
-	r.offset += nn
-	n = int(nn)
-	if err == io.ErrUnexpectedEOF && nn > 0 {
-		err = io.EOF
-	}
-	return
-}
-
-// storageWriter is a io.Writer view of a disk.
-type storageWriter struct {
-	storage      StorageAPI
-	volume, path string
-}
-
-func (w *storageWriter) Write(p []byte) (n int, err error) {
-	err = w.storage.AppendFile(context.TODO(), w.volume, w.path, p)
-	if err == nil {
-		n = len(p)
-	}
-	return
-}
-
-// StorageWriter returns a new io.Writer which appends data to the file
-// at the given disk, volume and path.
-func StorageWriter(storage StorageAPI, volume, path string) io.Writer {
-	return &storageWriter{storage, volume, path}
-}
-
-// StorageReader returns a new io.Reader which reads data to the file
-// at the given disk, volume, path and offset.
-func StorageReader(storage StorageAPI, volume, path string, offset int64) io.Reader {
-	return &storageReader{storage, volume, path, offset}
-}
+type unrecognizedDisk struct {
+	storage StorageAPI
+}
+
+func (p *unrecognizedDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) String() string {
+	return p.storage.String()
+}
+
+func (p *unrecognizedDisk) IsOnline() bool {
+	return false
+}
+
+func (p *unrecognizedDisk) LastConn() time.Time {
+	return p.storage.LastConn()
+}
+
+func (p *unrecognizedDisk) IsLocal() bool {
+	return p.storage.IsLocal()
+}
+
+func (p *unrecognizedDisk) Endpoint() Endpoint {
+	return p.storage.Endpoint()
+}
+
+func (p *unrecognizedDisk) Hostname() string {
+	return p.storage.Hostname()
+}
+
+func (p *unrecognizedDisk) Healing() *healingTracker {
+	return nil
+}
+
+func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
+	return dataUsageCache{}, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
+	return -1, -1, -1
+}
+
+func (p *unrecognizedDisk) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
+}
+
+func (p *unrecognizedDisk) Close() error {
+	return p.storage.Close()
+}
+
+func (p *unrecognizedDisk) GetDiskID() (string, error) {
+	return "", errDiskNotFound
+}
+
+func (p *unrecognizedDisk) SetDiskID(id string) {
+}
+
+func (p *unrecognizedDisk) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
+	return info, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) MakeVol(ctx context.Context, volume string) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ListVols(ctx context.Context) ([]VolInfo, error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
+	return vol, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
+	return 0, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
+	return errDiskNotFound
+}
+
+// DeleteVersions deletes slice of versions, it can be same object
+// or multiple objects.
+func (p *unrecognizedDisk) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) (errs []error) {
+	errs = make([]error, len(versions))
+	for i := range errs {
+		errs[i] = errDiskNotFound
+	}
+	return errs
+}
+
+func (p *unrecognizedDisk) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+	return fi, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
+	return nil, errDiskNotFound
+}