add '.healing.bin' for tracking currently healing disk (#10573)

add a hint on the disk to allow for tracking fresh disk
being healed, to allow for restartable heals, and also
use this as a way to track and remove disks.

There are more pending changes where we should move
all the disk formatting logic to backend drives, this
PR doesn't deal with this refactor instead makes it
easier to track healing in the future.
This commit is contained in:
Harshavardhana 2020-09-28 19:39:32 -07:00 committed by GitHub
parent 849fcf0127
commit 66174692a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 521 additions and 172 deletions

View File

@ -295,6 +295,20 @@ func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Requ
// ignores any errors here.
storageInfo, _ := objectAPI.StorageInfo(ctx, false)
// Collect any disk healing.
healing, _ := getAggregatedBackgroundHealState(ctx)
healDisks := make(map[string]struct{}, len(healing.HealDisks))
for _, disk := range healing.HealDisks {
healDisks[disk] = struct{}{}
}
// find all disks which belong to each respective endpoints
for i, disk := range storageInfo.Disks {
if _, ok := healDisks[disk.Endpoint]; ok {
storageInfo.Disks[i].Healing = true
}
}
// Marshal API response
jsonBytes, err := json.Marshal(storageInfo)
if err != nil {

View File

@ -85,7 +85,7 @@ type healSequenceStatus struct {
// structure to hold state of all heal sequences in server memory
type allHealState struct {
sync.Mutex
sync.RWMutex
// map of heal path to heal sequence
healSeqMap map[string]*healSequence
@ -105,21 +105,21 @@ func newHealState() *allHealState {
}
func (ahs *allHealState) healDriveCount() int {
ahs.Lock()
defer ahs.Unlock()
ahs.RLock()
defer ahs.RUnlock()
return len(ahs.healLocalDisks)
}
func (ahs *allHealState) getHealLocalDisks() Endpoints {
ahs.Lock()
defer ahs.Unlock()
ahs.RLock()
defer ahs.RUnlock()
var healLocalDisks Endpoints
var endpoints Endpoints
for ep := range ahs.healLocalDisks {
healLocalDisks = append(healLocalDisks, ep)
endpoints = append(endpoints, ep)
}
return healLocalDisks
return endpoints
}
func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {

View File

@ -26,7 +26,17 @@ import (
"github.com/minio/minio/cmd/logger"
)
const defaultMonitorNewDiskInterval = time.Second * 10
const (
defaultMonitorNewDiskInterval = time.Second * 10
healingTrackerFilename = ".healing.bin"
)
//go:generate msgp -file $GOFILE -unexported
type healingTracker struct {
ID string
// future add more tracking capabilities
}
func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
z, ok := objAPI.(*erasureZones)
@ -47,9 +57,7 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
time.Sleep(time.Second)
}
for _, ep := range getLocalDisksToHeal() {
globalBackgroundHealState.pushHealLocalDisks(ep)
}
globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...)
if drivesToHeal := globalBackgroundHealState.healDriveCount(); drivesToHeal > 0 {
logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...",
@ -76,9 +84,11 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
}
// Try to connect to the current endpoint
// and reformat if the current disk is not formatted
_, _, err := connectEndpoint(endpoint)
disk, _, err := connectEndpoint(endpoint)
if errors.Is(err, errUnformattedDisk) {
disksToHeal = append(disksToHeal, endpoint)
} else if err == nil && disk != nil && disk.Healing() {
disksToHeal = append(disksToHeal, disk.Endpoint())
}
}
}
@ -106,7 +116,8 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
case <-time.After(defaultMonitorNewDiskInterval):
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
var erasureSetInZoneEndpointToHeal []map[int]Endpoints
var erasureSetInZoneDisksToHeal []map[int][]StorageAPI
healDisks := globalBackgroundHealState.getHealLocalDisks()
if len(healDisks) > 0 {
// Reformat disks
@ -118,22 +129,21 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
len(healDisks)))
erasureSetInZoneEndpointToHeal = make([]map[int]Endpoints, len(z.zones))
erasureSetInZoneDisksToHeal = make([]map[int][]StorageAPI, len(z.zones))
for i := range z.zones {
erasureSetInZoneEndpointToHeal[i] = map[int]Endpoints{}
erasureSetInZoneDisksToHeal[i] = map[int][]StorageAPI{}
}
}
// heal only if new disks found.
for _, endpoint := range healDisks {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
disk, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err, true)
continue
}
zoneIdx := globalEndpoints.GetLocalZoneIdx(endpoint)
zoneIdx := globalEndpoints.GetLocalZoneIdx(disk.Endpoint())
if zoneIdx < 0 {
continue
}
@ -145,32 +155,31 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
continue
}
erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = append(erasureSetInZoneEndpointToHeal[zoneIdx][setIndex], endpoint)
erasureSetInZoneDisksToHeal[zoneIdx][setIndex] = append(erasureSetInZoneDisksToHeal[zoneIdx][setIndex], disk)
}
for i, setMap := range erasureSetInZoneEndpointToHeal {
for setIndex, endpoints := range setMap {
for _, ep := range endpoints {
logger.Info("Healing disk '%s' on %s zone", ep, humanize.Ordinal(i+1))
buckets, _ := z.ListBucketsHeal(ctx)
for i, setMap := range erasureSetInZoneDisksToHeal {
for setIndex, disks := range setMap {
for _, disk := range disks {
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
buckets, err := z.ListBucketsHeal(ctx)
if err != nil {
lbDisks := z.zones[i].sets[setIndex].getLoadBalancedDisks()
if err := healErasureSet(ctx, setIndex, buckets, lbDisks, z.zones[i].setDriveCount); err != nil {
logger.LogIf(ctx, err)
continue
}
if len(buckets) > 0 {
disks := z.zones[i].sets[setIndex].getLoadBalancedDisks()
if err := healErasureSet(ctx, setIndex, buckets, disks, z.zones[i].setDriveCount); err != nil {
logger.LogIf(ctx, err)
continue
}
logger.Info("Healing disk '%s' on %s zone complete", disk, humanize.Ordinal(i+1))
if err := disk.DeleteFile(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix),
healingTrackerFilename); err != nil {
logger.LogIf(ctx, err)
continue
}
logger.Info("Healing disk '%s' on %s zone complete", ep, humanize.Ordinal(i+1))
// Only upon success pop the healed disk.
globalBackgroundHealState.popHealLocalDisks(ep)
globalBackgroundHealState.popHealLocalDisks(disk.Endpoint())
}
}
}

View File

@ -0,0 +1,110 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ID":
z.ID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "ID"
err = en.Append(0x81, 0xa2, 0x49, 0x44)
if err != nil {
return
}
err = en.WriteString(z.ID)
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "ID"
o = append(o, 0x81, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ID":
z.ID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z healingTracker) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.ID)
return
}

View File

@ -0,0 +1,123 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalhealingTracker(t *testing.T) {
v := healingTracker{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsghealingTracker(b *testing.B) {
v := healingTracker{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsghealingTracker(b *testing.B) {
v := healingTracker{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalhealingTracker(b *testing.B) {
v := healingTracker{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodehealingTracker(t *testing.T) {
v := healingTracker{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodehealingTracker Msgsize() is inaccurate")
}
vn := healingTracker{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodehealingTracker(b *testing.B) {
v := healingTracker{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodehealingTracker(b *testing.B) {
v := healingTracker{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -85,31 +85,40 @@ func undoDeleteBucket(storageDisks []StorageAPI, bucket string) {
// getBucketInfo - returns the BucketInfo from one of the load balanced disks.
func (er erasureObjects) getBucketInfo(ctx context.Context, bucketName string) (bucketInfo BucketInfo, err error) {
var bucketErrs []error
for _, disk := range er.getLoadBalancedDisks() {
if disk == nil {
bucketErrs = append(bucketErrs, errDiskNotFound)
continue
}
volInfo, serr := disk.StatVol(ctx, bucketName)
if serr == nil {
return BucketInfo(volInfo), nil
}
err = serr
// For any reason disk went offline continue and pick the next one.
if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
bucketErrs = append(bucketErrs, err)
continue
}
// Any error which cannot be ignored, we return quickly.
return BucketInfo{}, err
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
var bucketsInfo = make([]BucketInfo, len(storageDisks))
// Undo previous make bucket entry on all underlying storage disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] == nil {
return errDiskNotFound
}
volInfo, err := storageDisks[index].StatVol(ctx, bucketName)
if err != nil {
return err
}
bucketsInfo[index] = BucketInfo(volInfo)
return nil
}, index)
}
errs := g.Wait()
for i, err := range errs {
if err == nil {
return bucketsInfo[i], nil
}
}
// If all our errors were ignored, then we try to
// reduce to one error based on read quorum.
// `nil` is deliberately passed for ignoredErrs
// because these errors were already ignored.
readQuorum := getReadQuorum(len(er.getDisks()))
return BucketInfo{}, reduceReadQuorumErrs(ctx, bucketErrs, nil, readQuorum)
readQuorum := getReadQuorum(len(storageDisks))
return BucketInfo{}, reduceReadQuorumErrs(ctx, errs, nil, readQuorum)
}
// GetBucketInfo - returns BucketInfo for a bucket.

View File

@ -28,7 +28,9 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
// Based on the random shuffling return back randomized disks.
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
if disks[i-1] != nil && disks[i-1].IsLocal() {
newDisks = append(newDisks, disks[i-1])
if !disks[i-1].Healing() && disks[i-1].IsOnline() {
newDisks = append(newDisks, disks[i-1])
}
}
}
return newDisks
@ -40,9 +42,6 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAPI) {
disks := er.getLoadBalancedDisks()
for _, disk := range disks {
if disk == nil {
continue
}
newDisks = append(newDisks, disk)
ndisks--
if ndisks == 0 {
@ -53,11 +52,16 @@ func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAP
}
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
// ensures to skip disks if they are not healing and online.
func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) {
disks := er.getDisks()
// Based on the random shuffling return back randomized disks.
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
newDisks = append(newDisks, disks[i-1])
// Do not consume disks which are being healed.
if disks[i-1] != nil && !disks[i-1].Healing() && disks[i-1].IsOnline() {
newDisks = append(newDisks, disks[i-1])
}
}
return newDisks
}

View File

@ -148,10 +148,8 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
result.Delimiter = delimiter
var uploadIDs []string
for _, disk := range er.getLoadBalancedDisks() {
if disk == nil {
continue
}
var disk StorageAPI
for _, disk = range er.getLoadBalancedDisks() {
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
if err != nil {
if err == errDiskNotFound {
@ -176,30 +174,20 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
populatedUploadIds := set.NewStringSet()
retry:
for _, disk := range er.getLoadBalancedDisks() {
if disk == nil {
for _, uploadID := range uploadIDs {
if populatedUploadIds.Contains(uploadID) {
continue
}
for _, uploadID := range uploadIDs {
if populatedUploadIds.Contains(uploadID) {
continue
}
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
if err != nil {
if err == errDiskNotFound || err == errFileNotFound {
goto retry
}
return result, toObjectErr(err, bucket, object)
}
populatedUploadIds.Add(uploadID)
uploads = append(uploads, MultipartInfo{
Object: object,
UploadID: uploadID,
Initiated: fi.ModTime,
})
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
if err != nil {
return result, toObjectErr(err, bucket, object)
}
break
populatedUploadIds.Add(uploadID)
uploads = append(uploads, MultipartInfo{
Object: object,
UploadID: uploadID,
Initiated: fi.ModTime,
})
}
sort.Slice(uploads, func(i int, j int) bool {

View File

@ -231,6 +231,11 @@ func (s *erasureSets) connectDisks() {
return
}
disk.SetDiskID(format.Erasure.This)
if endpoint.IsLocal && disk.Healing() {
globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
logger.Info(fmt.Sprintf("Found the drive %s which needs healing, attempting to heal...", disk))
}
s.erasureDisksMu.Lock()
if s.erasureDisks[setIndex][diskIndex] != nil {
s.erasureDisks[setIndex][diskIndex].Close()
@ -316,7 +321,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
endpointStrings: endpointStrings,
setCount: setCount,
setDriveCount: setDriveCount,
listTolerancePerSet: setDriveCount / 2,
listTolerancePerSet: 3, // Expect 3 good entries across disks.
format: format,
disksConnectEvent: make(chan diskConnectInfo),
distributionAlgo: format.Erasure.DistributionAlgo,
@ -1385,7 +1390,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
}
// Save formats `format.json` across all disks.
if err = saveFormatErasureAll(ctx, storageDisks, tmpNewFormats); err != nil {
if err = saveFormatErasureAllWithErrs(ctx, storageDisks, sErrs, tmpNewFormats); err != nil {
return madmin.HealResultItem{}, err
}

View File

@ -661,7 +661,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.setDriveCount, false))
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false))
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
}
@ -780,7 +780,7 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.setDriveCount, true)
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet, true)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
@ -872,7 +872,7 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.setDriveCount, false)
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet, false)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
@ -1274,7 +1274,7 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.setDriveCount)
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)

View File

@ -167,6 +167,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
AvailableSpace: info.Free,
UUID: info.ID,
RootDisk: info.RootDisk,
Healing: info.Healing,
State: diskErrToDriveState(err),
}
if info.Total > 0 {
@ -256,46 +257,20 @@ func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageIn
// Updates are sent on a regular basis and the caller *must* consume them.
func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
if len(buckets) == 0 {
logger.Info(color.Green("data-crawl:") + " No buckets found, skipping crawl")
return nil
}
// Collect any disk healing.
healing, err := getAggregatedBackgroundHealState(ctx)
if err != nil {
return err
}
healDisks := make(map[string]struct{}, len(healing.HealDisks))
for _, disk := range healing.HealDisks {
healDisks[disk] = struct{}{}
}
// Collect disks we can use.
var disks []StorageAPI
for _, d := range er.getLoadBalancedDisks() {
if d == nil || !d.IsOnline() {
continue
}
di, err := d.DiskInfo(ctx)
if err != nil {
logger.LogIf(ctx, err)
continue
}
if _, ok := healDisks[di.Endpoint]; ok {
logger.Info(color.Green("data-crawl:")+" Disk %q is Healing, skipping disk.", di.Endpoint)
continue
}
disks = append(disks, d)
}
disks := er.getLoadBalancedDisks()
if len(disks) == 0 {
logger.Info(color.Green("data-crawl:") + " No disks found, skipping crawl")
logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl")
return nil
}
// Load bucket totals
oldCache := dataUsageCache{}
err = oldCache.load(ctx, er, dataUsageCacheName)
if err != nil {
if err := oldCache.load(ctx, er, dataUsageCacheName); err != nil {
return err
}
@ -403,6 +378,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
// Calc usage
before := cache.Info.LastUpdate
var err error
cache, err = disk.CrawlAndGetDataUsage(ctx, cache)
cache.Info.BloomFilter = nil
if err != nil {

View File

@ -21,6 +21,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"reflect"
@ -335,7 +336,7 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
return formats, g.Wait()
}
func saveFormatErasure(disk StorageAPI, format *formatErasureV3) error {
func saveFormatErasure(disk StorageAPI, format *formatErasureV3, heal bool) error {
if disk == nil || format == nil {
return errDiskNotFound
}
@ -368,6 +369,18 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3) error {
}
disk.SetDiskID(diskID)
if heal {
htracker := healingTracker{
ID: diskID,
}
htrackerBytes, err := htracker.MarshalMsg(nil)
if err != nil {
return err
}
return disk.WriteAll(context.TODO(), minioMetaBucket,
pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename),
bytes.NewReader(htrackerBytes))
}
return nil
}
@ -551,7 +564,8 @@ func formatErasureFixLocalDeploymentID(endpoints Endpoints, storageDisks []Stora
return nil
}
format.ID = refFormat.ID
if err := saveFormatErasure(storageDisks[index], format); err != nil {
// Heal the drive if we fixed its deployment ID.
if err := saveFormatErasure(storageDisks[index], format, true); err != nil {
logger.LogIf(GlobalContext, err)
return fmt.Errorf("Unable to save format.json, %w", err)
}
@ -686,6 +700,27 @@ func initErasureMetaVolumesInLocalDisks(storageDisks []StorageAPI, formats []*fo
return nil
}
// saveFormatErasureAllWithErrs - populates `format.json` on disks in its order.
// also adds `.healing.bin` on the disks which are being actively healed.
func saveFormatErasureAllWithErrs(ctx context.Context, storageDisks []StorageAPI, fErrs []error, formats []*formatErasureV3) error {
g := errgroup.WithNErrs(len(storageDisks))
// Write `format.json` to all disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if formats[index] == nil {
return errDiskNotFound
}
return saveFormatErasure(storageDisks[index], formats[index], errors.Is(fErrs[index], errUnformattedDisk))
}, index)
}
writeQuorum := getWriteQuorum(len(storageDisks))
// Wait for the routines to finish.
return reduceWriteQuorumErrs(ctx, g.Wait(), nil, writeQuorum)
}
// saveFormatErasureAll - populates `format.json` on disks in its order.
func saveFormatErasureAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatErasureV3) error {
g := errgroup.WithNErrs(len(storageDisks))
@ -697,7 +732,7 @@ func saveFormatErasureAll(ctx context.Context, storageDisks []StorageAPI, format
if formats[index] == nil {
return errDiskNotFound
}
return saveFormatErasure(storageDisks[index], formats[index])
return saveFormatErasure(storageDisks[index], formats[index], false)
}, index)
}
@ -771,7 +806,8 @@ func fixFormatErasureV3(storageDisks []StorageAPI, endpoints Endpoints, formats
}
if formats[i].Erasure.This == "" {
formats[i].Erasure.This = formats[i].Erasure.Sets[0][i]
if err := saveFormatErasure(storageDisks[i], formats[i]); err != nil {
// Heal the drive if drive has .This empty.
if err := saveFormatErasure(storageDisks[i], formats[i], true); err != nil {
return err
}
}

View File

@ -130,6 +130,9 @@ func (srv *Server) Shutdown() error {
srv.listenerMutex.Lock()
err := srv.listener.Close()
srv.listenerMutex.Unlock()
if err != nil {
return err
}
// Wait for opened connection to be closed up to Shutdown timeout.
shutdownTimeout := srv.ShutdownTimeout
@ -144,12 +147,12 @@ func (srv *Server) Shutdown() error {
if err == nil {
_ = pprof.Lookup("goroutine").WriteTo(tmp, 1)
tmp.Close()
return errors.New("timed out. some connections are still active. doing abnormal shutdown. goroutines written to " + tmp.Name())
return errors.New("timed out. some connections are still active. goroutines written to " + tmp.Name())
}
return errors.New("timed out. some connections are still active. doing abnormal shutdown")
return errors.New("timed out. some connections are still active")
case <-ticker.C:
if atomic.LoadInt32(&srv.requestCount) <= 0 {
return err
return nil
}
}
}

View File

@ -58,10 +58,18 @@ func (d *naughtyDisk) IsLocal() bool {
return d.disk.IsLocal()
}
func (d *naughtyDisk) Endpoint() Endpoint {
return d.disk.Endpoint()
}
func (d *naughtyDisk) Hostname() string {
return d.disk.Hostname()
}
func (d *naughtyDisk) Healing() bool {
return d.disk.Healing()
}
func (d *naughtyDisk) Close() (err error) {
if err = d.calcError(); err != nil {
return err

View File

@ -141,7 +141,7 @@ func formatErasureCleanupTmpLocalEndpoints(endpoints Endpoints) error {
return fmt.Errorf("unable to rename (%s -> %s) %w",
pathJoin(epPath, minioMetaTmpBucket),
tmpOld,
err)
osErrToFileErr(err))
}
// Removal of tmp-old folder is backgrounded completely.

View File

@ -464,12 +464,12 @@ func serverMain(ctx *cli.Context) {
}
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
logger.SetDeploymentID(globalDeploymentID)
if err != nil {
globalHTTPServer.Shutdown()
logger.Fatal(err, "Unable to initialize backend")
logFatalErrs(err, Endpoint{}, true)
}
logger.SetDeploymentID(globalDeploymentID)
// Once endpoints are finalized, initialize the new object api in safe mode.
globalObjLayerMutex.Lock()
globalSafeMode = true

View File

@ -24,7 +24,6 @@ import (
func setMaxResources() (err error) {
// Set the Go runtime max threads threshold to 90% of kernel setting.
// Do not return when an error when encountered since it is not a crucial task.
sysMaxThreads, mErr := sys.GetMaxThreads()
if mErr == nil {
minioMaxThreads := (sysMaxThreads * 90) / 100

View File

@ -55,7 +55,7 @@ var errFileNotFound = StorageErr("file not found")
var errFileVersionNotFound = StorageErr("file version not found")
// errTooManyOpenFiles - too many open files.
var errTooManyOpenFiles = StorageErr("too many open files")
var errTooManyOpenFiles = StorageErr("too many open files, please increase 'ulimit -n'")
// errFileNameTooLong - given file name is too long than supported length.
var errFileNameTooLong = StorageErr("file name too long")

View File

@ -30,10 +30,13 @@ type StorageAPI interface {
IsOnline() bool // Returns true if disk is online.
IsLocal() bool
Hostname() string // Returns host name if remote host.
Hostname() string // Returns host name if remote host.
Endpoint() Endpoint // Returns endpoint.
Close() error
GetDiskID() (string, error)
SetDiskID(id string)
Healing() bool // Returns if disk is healing.
DiskInfo(ctx context.Context) (info DiskInfo, err error)
CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error)

View File

@ -156,6 +156,14 @@ func (client *storageRESTClient) Hostname() string {
return client.endpoint.Host
}
func (client *storageRESTClient) Endpoint() Endpoint {
return client.endpoint
}
func (client *storageRESTClient) Healing() bool {
return false
}
func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
b := cache.serialize()
respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, bytes.NewBuffer(b), int64(len(b)))

View File

@ -826,6 +826,69 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
w.(http.Flusher).Flush()
}
// A single function to write certain errors to be fatal
// or informative based on the `exit` flag, please look
// at each implementation of error for added hints.
//
// FIXME: This is an unusual function but serves its purpose for
// now, need to revist the overall erroring structure here.
// Do not like it :-(
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
if errors.Is(err, errMinDiskSize) {
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend")
} else if errors.Is(err, errUnsupportedDisk) {
var hint string
if endpoint.URL != nil {
hint = fmt.Sprintf("Disk '%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
} else {
hint = "Disks do not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support"
}
logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
} else if errors.Is(err, errDiskNotDir) {
var hint string
if endpoint.URL != nil {
hint = fmt.Sprintf("Disk '%s' is not a directory, MinIO erasure coding needs a directory", endpoint.Path)
} else {
hint = "Disks are not directories, MinIO erasure coding needs directories"
}
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
} else if errors.Is(err, errFileAccessDenied) {
// Show a descriptive error with a hint about how to fix it.
var username string
if u, err := user.Current(); err == nil {
username = u.Username
} else {
username = "<your-username>"
}
var hint string
if endpoint.URL != nil {
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`",
username, endpoint.Path, endpoint.Path)
} else {
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
}
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
} else if errors.Is(err, errFaultyDisk) {
if !exit {
logger.LogIf(GlobalContext, fmt.Errorf("disk is faulty at %s, please replace the drive - disk will be offline", endpoint))
} else {
logger.Fatal(err, "Unable to initialize backend")
}
} else if errors.Is(err, errDiskFull) {
if !exit {
logger.LogIf(GlobalContext, fmt.Errorf("disk is already full at %s, incoming I/O will fail - disk will be offline", endpoint))
} else {
logger.Fatal(err, "Unable to initialize backend")
}
} else {
if !exit {
logger.LogIf(GlobalContext, fmt.Errorf("disk returned an unexpected error at %s, please investigate - disk will be offline", endpoint))
} else {
logger.Fatal(err, "Unable to initialize backend")
}
}
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
for _, ep := range endpointZones {
@ -835,32 +898,9 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
}
storage, err := newXLStorage(endpoint)
if err != nil {
switch err {
case errMinDiskSize:
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend")
case errUnsupportedDisk:
hint := fmt.Sprintf("'%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
case errDiskNotDir:
hint := fmt.Sprintf("'%s' MinIO erasure coding needs a directory", endpoint.Path)
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
case errFileAccessDenied:
// Show a descriptive error with a hint about how to fix it.
var username string
if u, err := user.Current(); err == nil {
username = u.Username
} else {
username = "<your-username>"
}
hint := fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`", username, endpoint.Path, endpoint.Path)
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize posix backend")
case errFaultyDisk:
logger.LogIf(GlobalContext, fmt.Errorf("disk is faulty at %s, please replace the drive", endpoint))
case errDiskFull:
logger.LogIf(GlobalContext, fmt.Errorf("disk is already full at %s, incoming I/O will fail", endpoint))
default:
logger.LogIf(GlobalContext, fmt.Errorf("disk returned an unexpected error at %s, please investigate", endpoint))
}
// if supported errors don't fail, we proceed to
// printing message and moving forward.
logFatalErrs(err, endpoint, false)
}
server := &storageRESTServer{storage: storage}

View File

@ -43,10 +43,18 @@ func (p *xlStorageDiskIDCheck) IsLocal() bool {
return p.storage.IsLocal()
}
func (p *xlStorageDiskIDCheck) Endpoint() Endpoint {
return p.storage.Endpoint()
}
func (p *xlStorageDiskIDCheck) Hostname() string {
return p.storage.Hostname()
}
func (p *xlStorageDiskIDCheck) Healing() bool {
return p.storage.Healing()
}
func (p *xlStorageDiskIDCheck) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
if err := p.checkDiskStale(); err != nil {
return dataUsageCache{}, err

View File

@ -92,8 +92,7 @@ type xlStorage struct {
activeIOCount int32
diskPath string
hostname string
endpoint string
endpoint Endpoint
pool sync.Pool
@ -249,7 +248,6 @@ func newLocalXLStorage(path string) (*xlStorage, error) {
// Initialize a new storage disk.
func newXLStorage(ep Endpoint) (*xlStorage, error) {
path := ep.Path
hostname := ep.Host
var err error
if path, err = getValidPath(path, true); err != nil {
return nil, err
@ -262,8 +260,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
p := &xlStorage{
diskPath: path,
hostname: hostname,
endpoint: ep.String(),
endpoint: ep,
pool: sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(readBlockSize)
@ -319,7 +316,11 @@ func (s *xlStorage) String() string {
}
func (s *xlStorage) Hostname() string {
return s.hostname
return s.endpoint.Host
}
func (s *xlStorage) Endpoint() Endpoint {
return s.endpoint
}
func (*xlStorage) Close() error {
@ -334,6 +335,13 @@ func (s *xlStorage) IsLocal() bool {
return true
}
func (s *xlStorage) Healing() bool {
healingFile := pathJoin(s.diskPath, minioMetaBucket,
bucketMetaPrefix, healingTrackerFilename)
_, err := os.Stat(healingFile)
return err == nil
}
func (s *xlStorage) waitForLowActiveIO() {
max := lowActiveIOWaitMaxN
for atomic.LoadInt32(&s.activeIOCount) >= s.maxActiveIOCount {
@ -439,6 +447,7 @@ type DiskInfo struct {
Free uint64
Used uint64
RootDisk bool
Healing bool
Endpoint string
MountPath string
ID string
@ -462,9 +471,10 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
Total: di.Total,
Free: di.Free,
Used: di.Total - di.Free,
Healing: s.Healing(),
RootDisk: s.rootDisk,
MountPath: s.diskPath,
Endpoint: s.endpoint,
Endpoint: s.endpoint.String(),
}
diskID, err := s.GetDiskID()

4
go.sum
View File

@ -469,7 +469,6 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
@ -532,7 +531,6 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@ -601,8 +599,6 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200425043458-8463f397d07c h1:iHhCR0b26amDCiiO+kBguKZom9aMF+NrFxh9zeKR/XU=
golang.org/x/tools v0.0.0-20200425043458-8463f397d07c/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200814172026-c4923e618c08 h1:sfBQLM20fzeXhOixVQirwEbuW4PGStP773EXQpsBB6E=
golang.org/x/tools v0.0.0-20200814172026-c4923e618c08/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200925191224-5d1fdd8fa346 h1:hzJjkvxUIF3bSt+v8N5tBQNx/605vszZJ+3XsIamzZo=
golang.org/x/tools v0.0.0-20200925191224-5d1fdd8fa346/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=