mirror of
https://github.com/minio/minio.git
synced 2025-01-22 20:23:14 -05:00
MRF: Better detection of non stable disks (#12252)
MRF does not detect when a node is disconnected and reconnected quickly this change will ensure that MRF is alerted by comparing the last disk reconnection timestamp with the last MRF check time. Signed-off-by: Anis Elleuch <anis@min.io> Co-authored-by: Klaus Post <klauspost@gmail.com>
This commit is contained in:
parent
e84f533c6c
commit
56d4d7b8b1
@ -95,16 +95,24 @@ type erasureSets struct {
|
||||
|
||||
disksStorageInfoCache timedValue
|
||||
|
||||
mrfMU sync.Mutex
|
||||
mrfOperations map[healSource]int
|
||||
mrfMU sync.Mutex
|
||||
mrfOperations map[healSource]int
|
||||
lastConnectDisksOpTime time.Time
|
||||
}
|
||||
|
||||
func isEndpointConnected(diskMap map[string]StorageAPI, endpoint string) bool {
|
||||
// Return false if endpoint is not connected or has been reconnected after last check
|
||||
func isEndpointConnectionStable(diskMap map[string]StorageAPI, endpoint string, lastCheck time.Time) bool {
|
||||
disk := diskMap[endpoint]
|
||||
if disk == nil {
|
||||
return false
|
||||
}
|
||||
return disk.IsOnline()
|
||||
if !disk.IsOnline() {
|
||||
return false
|
||||
}
|
||||
if disk.LastConn().After(lastCheck) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *erasureSets) getDiskMap() map[string]StorageAPI {
|
||||
@ -196,6 +204,10 @@ func findDiskIndex(refFormat, format *formatErasureV3) (int, int, error) {
|
||||
// connectDisks - attempt to connect all the endpoints, loads format
|
||||
// and re-arranges the disks in proper position.
|
||||
func (s *erasureSets) connectDisks() {
|
||||
defer func() {
|
||||
s.lastConnectDisksOpTime = time.Now()
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var setsJustConnected = make([]bool, s.setCount)
|
||||
diskMap := s.getDiskMap()
|
||||
@ -204,7 +216,7 @@ func (s *erasureSets) connectDisks() {
|
||||
if endpoint.IsLocal {
|
||||
diskPath = endpoint.Path
|
||||
}
|
||||
if isEndpointConnected(diskMap, diskPath) {
|
||||
if isEndpointConnectionStable(diskMap, diskPath, s.lastConnectDisksOpTime) {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// naughtyDisk wraps a POSIX disk and returns programmed errors
|
||||
@ -55,6 +56,10 @@ func (d *naughtyDisk) IsOnline() bool {
|
||||
return d.disk.IsOnline()
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) LastConn() time.Time {
|
||||
return d.disk.LastConn()
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) IsLocal() bool {
|
||||
return d.disk.IsLocal()
|
||||
}
|
||||
|
@ -75,6 +75,8 @@ func (n *NetworkError) Unwrap() error {
|
||||
// Client - http based RPC client.
|
||||
type Client struct {
|
||||
connected int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
||||
_ int32 // For 64 bits alignment
|
||||
lastConn int64
|
||||
|
||||
// HealthCheckFn is the function set to test for health.
|
||||
// If not set the client will not keep track of health.
|
||||
@ -196,6 +198,7 @@ func NewClient(url *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
|
||||
url: url,
|
||||
newAuthToken: newAuthToken,
|
||||
connected: online,
|
||||
lastConn: time.Now().UnixNano(),
|
||||
MaxErrResponseSize: 4096,
|
||||
HealthCheckInterval: 200 * time.Millisecond,
|
||||
HealthCheckTimeout: time.Second,
|
||||
@ -207,6 +210,11 @@ func (c *Client) IsOnline() bool {
|
||||
return atomic.LoadInt32(&c.connected) == online
|
||||
}
|
||||
|
||||
// LastConn returns when the disk was (re-)connected
|
||||
func (c *Client) LastConn() time.Time {
|
||||
return time.Unix(0, atomic.LoadInt64(&c.lastConn))
|
||||
}
|
||||
|
||||
// MarkOffline - will mark a client as being offline and spawns
|
||||
// a goroutine that will attempt to reconnect if HealthCheckFn is set.
|
||||
// returns true if the node changed state from online to offline
|
||||
@ -223,6 +231,7 @@ func (c *Client) MarkOffline() bool {
|
||||
if c.HealthCheckFn() {
|
||||
if atomic.CompareAndSwapInt32(&c.connected, offline, online) {
|
||||
logger.Info("Client %s online", c.url.String())
|
||||
atomic.StoreInt64(&c.lastConn, time.Now().UnixNano())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// StorageAPI interface.
|
||||
@ -28,7 +29,9 @@ type StorageAPI interface {
|
||||
String() string
|
||||
|
||||
// Storage operations.
|
||||
IsOnline() bool // Returns true if disk is online.
|
||||
IsOnline() bool // Returns true if disk is online.
|
||||
LastConn() time.Time // Returns the last time this disk (re)-connected
|
||||
|
||||
IsLocal() bool
|
||||
|
||||
Hostname() string // Returns host name if remote host.
|
||||
|
@ -167,6 +167,11 @@ func (client *storageRESTClient) IsOnline() bool {
|
||||
return client.restClient.IsOnline()
|
||||
}
|
||||
|
||||
// LastConn - returns when the disk is seen to be connected the last time
|
||||
func (client *storageRESTClient) LastConn() time.Time {
|
||||
return client.restClient.LastConn()
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) IsLocal() bool {
|
||||
return false
|
||||
}
|
||||
|
@ -138,6 +138,10 @@ func (p *xlStorageDiskIDCheck) IsOnline() bool {
|
||||
return storedDiskID == p.diskID
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) LastConn() time.Time {
|
||||
return p.storage.LastConn()
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) IsLocal() bool {
|
||||
return p.storage.IsLocal()
|
||||
}
|
||||
|
@ -335,6 +335,10 @@ func (s *xlStorage) IsOnline() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *xlStorage) LastConn() time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func (s *xlStorage) IsLocal() bool {
|
||||
return true
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user