Add active disk health checks (#17539)

Add an active check, run every 15 seconds, that verifies a small write+read operation can complete against the drive.

If the drive is unresponsive for 2 minutes or returns errFaultyDisk, take it offline.
Klaus Post 2023-07-13 11:41:55 -07:00 committed by GitHub
parent 183428db03
commit 4f89e5bba9
5 changed files with 89 additions and 6 deletions
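
A minimal, self-contained sketch of the probe pattern this change implements: a blocking write+read guarded by a watchdog timer that reports the drive as faulty when the operation does not finish in time. The names probe and markOffline are illustrative only; the real implementation lives in xlStorageDiskIDCheck in the diff below.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// probe performs one write+read against dir and reports via markOffline if the
// operation errors or does not finish within timeout.
func probe(dir string, timeout time.Duration, markOffline func(error)) {
	done := make(chan struct{})
	started := time.Now()

	// Watchdog: fires only if the write+read below is still running after timeout.
	go func() {
		t := time.NewTimer(timeout)
		defer t.Stop()
		select {
		case <-t.C:
			markOffline(fmt.Errorf("probe stuck for %v", time.Since(started).Round(time.Millisecond)))
		case <-done:
		}
	}()
	defer close(done)

	fn := filepath.Join(dir, "health-check")
	payload := []byte("probe")
	if err := os.WriteFile(fn, payload, 0o644); err != nil {
		markOffline(fmt.Errorf("unable to write: %w", err))
		return
	}
	if b, err := os.ReadFile(fn); err != nil || len(b) != len(payload) {
		markOffline(fmt.Errorf("unable to read back: %v", err))
	}
}

func main() {
	dir, err := os.MkdirTemp("", "probe")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	probe(dir, 2*time.Minute, func(err error) {
		fmt.Println("taking drive offline:", err)
	})
	fmt.Println("probe round complete")
}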


@@ -38,7 +38,7 @@ func TestFixFormatV3(t *testing.T) {
}
endpoints := mustGetNewEndpoints(0, erasureDirs...)
- storageDisks, errs := initStorageDisksWithErrors(endpoints, true)
+ storageDisks, errs := initStorageDisksWithErrors(endpoints, false)
for _, err := range errs {
if err != nil && err != errDiskNotFound {
t.Fatal(err)
@@ -559,7 +559,7 @@ func benchmarkInitStorageDisksN(b *testing.B, nDisks int) {
b.RunParallel(func(pb *testing.PB) {
endpoints := endpoints
for pb.Next() {
- initStorageDisksWithErrors(endpoints, true)
+ initStorageDisksWithErrors(endpoints, false)
}
})
}


@@ -62,7 +62,7 @@ func newStorageAPI(endpoint Endpoint, healthCheck bool) (storage StorageAPI, err
if err != nil {
return nil, err
}
- return newXLStorageDiskIDCheck(storage), nil
+ return newXLStorageDiskIDCheck(storage, healthCheck), nil
}
return newStorageRESTClient(endpoint, healthCheck), nil


@@ -1361,7 +1361,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
endpoint := storage.Endpoint()
- server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage)}
+ server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage, true)}
server.storage.SetDiskID(storage.diskID)
subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()


@@ -31,6 +31,7 @@ import (
"time"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/env"
)
@@ -83,6 +84,8 @@ type xlStorageDiskIDCheck struct {
storage *xlStorage
health *diskHealthTracker
metricsCache timedValue
diskCtx context.Context
cancel context.CancelFunc
}
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
@@ -160,14 +163,18 @@ func (e *lockedLastMinuteLatency) total() AccElem {
return e.lastMinuteLatency.getTotal()
}
- func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck {
+ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDiskIDCheck {
xl := xlStorageDiskIDCheck{
storage: storage,
health: newDiskHealthTracker(),
}
xl.diskCtx, xl.cancel = context.WithCancel(context.TODO())
for i := range xl.apiLatencies[:] {
xl.apiLatencies[i] = &lockedLastMinuteLatency{}
}
if healthCheck {
go xl.monitorDiskWritable(xl.diskCtx)
}
return &xl
}
@@ -225,6 +232,7 @@ func (p *xlStorageDiskIDCheck) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
}
func (p *xlStorageDiskIDCheck) Close() error {
p.cancel()
return p.storage.Close()
}
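
The diskCtx/cancel fields, the healthCheck flag in the constructor, and the p.cancel() call in Close() above follow the usual Go lifecycle for an optional background goroutine: create a cancellable context at construction, start the monitor only when requested, and cancel it on Close so the goroutine exits. A standalone sketch of that pattern, with checker and monitor as made-up names rather than MinIO APIs:

package main

import (
	"context"
	"fmt"
	"time"
)

// checker owns an optional background monitor tied to its lifetime via a
// cancellable context.
type checker struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newChecker(healthCheck bool) *checker {
	c := &checker{}
	c.ctx, c.cancel = context.WithCancel(context.Background())
	if healthCheck {
		go c.monitor(c.ctx) // runs until Close cancels the context
	}
	return c
}

func (c *checker) monitor(ctx context.Context) {
	t := time.NewTicker(100 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return // stopped by Close
		case <-t.C:
			fmt.Println("periodic check")
		}
	}
}

// Close stops the monitor goroutine and releases resources.
func (c *checker) Close() error {
	c.cancel()
	return nil
}

func main() {
	c := newChecker(true)
	time.Sleep(250 * time.Millisecond)
	_ = c.Close()
}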
@@ -859,6 +867,81 @@ func (p *xlStorageDiskIDCheck) monitorDiskStatus() {
}
}
// monitorDiskWritable periodically verifies that a small write+read against the drive
// completes. If the probe times out or returns errFaultyDisk, the drive is taken offline
// and monitorDiskStatus brings it back online once it is healthy again.
func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
const (
// We check every 15 seconds if the disk is writable and we can read back.
checkEvery = 15 * time.Second
// Disk has 2 minutes to complete write+read.
timeoutOperation = 2 * time.Minute
// If the disk has completed an operation successfully within last 5 seconds, don't check it.
skipIfSuccessBefore = 5 * time.Second
)
t := time.NewTicker(checkEvery)
defer t.Stop()
fn := mustGetUUID()
// Be just above directio size.
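// The indexed composite literal []byte{n: 42} creates a slice of length n+1 with only
// the last byte set, so the payload is just over the direct I/O alignment size.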
toWrite := []byte{xioutil.DirectioAlignSize + 1: 42}
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for range t.C {
if contextCanceled(ctx) {
break
}
if atomic.LoadInt32(&p.health.status) != diskHealthOK {
continue
}
if time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))) < skipIfSuccessBefore {
// We recently saw a success - no need to check.
continue
}
goOffline := func(err error) {
if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
go p.monitorDiskStatus()
}
}
// Offset checks a bit.
time.Sleep(time.Duration(rng.Int63n(int64(1 * time.Second))))
done := make(chan struct{})
started := time.Now()
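// Watchdog: if the write+read below has not completed within timeoutOperation,
// assume the drive is hung and take it offline.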
go func() {
timeout := time.NewTimer(timeoutOperation)
select {
case <-timeout.C:
spent := time.Since(started)
goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)))
case <-done:
if !timeout.Stop() {
<-timeout.C
}
}
}()
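// The probe runs in its own closure so the deferred close(done) fires on every
// return path, stopping the watchdog above.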
func() {
defer close(done)
err := p.storage.WriteAll(ctx, minioMetaTmpBucket, fn, toWrite)
if err != nil {
if osErrToFileErr(err) == errFaultyDisk {
goOffline(fmt.Errorf("unable to write: %w", err))
}
return
}
b, err := p.storage.ReadAll(context.Background(), minioMetaTmpBucket, fn)
if err != nil || len(b) != len(toWrite) {
if osErrToFileErr(err) == errFaultyDisk {
goOffline(fmt.Errorf("unable to read: %w", err))
}
return
}
}()
}
}
// diskHealthCheckOK will check if the provided error is nil
// and update disk status if good.
// For convenience a bool is returned to indicate any error state


@@ -130,7 +130,7 @@ func newXLStorageTestSetup(tb testing.TB) (*xlStorageDiskIDCheck, string, error)
return nil, "", err
}
- disk := newXLStorageDiskIDCheck(storage)
+ disk := newXLStorageDiskIDCheck(storage, false)
disk.SetDiskID("da017d62-70e3-45f1-8a1a-587707e69ad1")
return disk, diskPath, nil
}