mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Print storage errors during distributed initialization (#6441)
This commit will print connection failures to other disks in other nodes after 5 retries. It is useful for users to understand why the distributed cluster fails to boot up.
This commit is contained in:
parent
12b4971b70
commit
7571582000
@ -52,6 +52,10 @@ func (d *naughtyDisk) IsOnline() bool {
|
||||
return d.disk.IsOnline()
|
||||
}
|
||||
|
||||
// LastError always returns nil for naughtyDisk.
// NOTE(review): unlike IsOnline, which forwards to d.disk, this does not
// consult the wrapped disk's LastError - confirm that is intended.
func (d *naughtyDisk) LastError() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) Close() (err error) {
|
||||
if err = d.calcError(); err != nil {
|
||||
return err
|
||||
|
@ -274,6 +274,10 @@ func (s *posix) String() string {
|
||||
return s.diskPath
|
||||
}
|
||||
|
||||
// LastError always returns nil for posix; no error state is tracked here.
func (s *posix) LastError() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *posix) Close() error {
|
||||
close(s.stopUsageCh)
|
||||
s.connected = false
|
||||
|
@ -120,13 +120,25 @@ var errXLV3ThisEmpty = fmt.Errorf("XL format version 3 has This field empty")
|
||||
// connect to list of endpoints and load all XL disk formats, validate the formats are correct
|
||||
// and are in quorum, if no formats are found attempt to initialize all of them for the first
|
||||
// time. additionally make sure to close all the disks used in this attempt.
|
||||
func connectLoadInitFormats(firstDisk bool, endpoints EndpointList, setCount, drivesPerSet int) (*formatXLV3, error) {
|
||||
func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints EndpointList, setCount, drivesPerSet int) (*formatXLV3, error) {
|
||||
// Initialize all storage disks
|
||||
storageDisks, err := initStorageDisks(endpoints)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer closeStorageDisks(storageDisks)
|
||||
|
||||
// Connect to all storage disks, a connection failure will be
|
||||
// only logged after some retries.
|
||||
for _, disk := range storageDisks {
|
||||
if disk != nil {
|
||||
connectErr := disk.LastError()
|
||||
if connectErr != nil && retryCount >= 5 {
|
||||
logger.Info("Unable to connect to %s: %v\n", disk.String(), connectErr.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to load all `format.json` from all disks.
|
||||
formatConfigs, sErrs := loadFormatXLAll(storageDisks)
|
||||
// Check if we have
|
||||
@ -238,8 +250,8 @@ func waitForFormatXL(ctx context.Context, firstDisk bool, endpoints EndpointList
|
||||
retryTimerCh := newRetryTimerSimple(doneCh)
|
||||
for {
|
||||
select {
|
||||
case _ = <-retryTimerCh:
|
||||
format, err := connectLoadInitFormats(firstDisk, endpoints, setCount, disksPerSet)
|
||||
case retryCount := <-retryTimerCh:
|
||||
format, err := connectLoadInitFormats(retryCount, firstDisk, endpoints, setCount, disksPerSet)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case errNotFirstDisk:
|
||||
|
@ -27,7 +27,9 @@ type StorageAPI interface {
|
||||
|
||||
// Storage operations.
|
||||
IsOnline() bool // Returns true if disk is online.
|
||||
LastError() error
|
||||
Close() error
|
||||
|
||||
DiskInfo() (info DiskInfo, err error)
|
||||
|
||||
// Volume operations.
|
||||
|
@ -104,6 +104,8 @@ func toStorageErr(err error) error {
|
||||
type StorageRPCClient struct {
|
||||
*RPCClient
|
||||
// connected is the value reported by IsOnline; it is set from the result
// of the Connect RPC and cleared by Close.
connected bool
|
||||
// Plain error of the last RPC call
|
||||
lastRPCError error
|
||||
}
|
||||
|
||||
// Stringer provides a canonicalized representation of network device.
|
||||
@ -114,6 +116,11 @@ func (client *StorageRPCClient) String() string {
|
||||
return url.String()
|
||||
}
|
||||
|
||||
// LastError - returns the last RPC call result, nil or error if any
|
||||
func (client *StorageRPCClient) LastError() error {
|
||||
return client.lastRPCError
|
||||
}
|
||||
|
||||
// Close - closes underneath RPC client.
|
||||
func (client *StorageRPCClient) Close() error {
|
||||
client.connected = false
|
||||
@ -125,14 +132,22 @@ func (client *StorageRPCClient) IsOnline() bool {
|
||||
return client.connected
|
||||
}
|
||||
|
||||
// connect issues the storage service's Connect RPC with empty auth arguments
// and records the outcome on the client: lastRPCError receives the call's
// error (nil on success) and connected is true only when the call succeeded.
func (client *StorageRPCClient) connect() {
|
||||
err := client.Call(storageServiceName+".Connect", &AuthArgs{}, &VoidReply{})
|
||||
client.lastRPCError = err
|
||||
client.connected = err == nil
|
||||
}
|
||||
|
||||
func (client *StorageRPCClient) call(handler string, args interface {
|
||||
SetAuthArgs(args AuthArgs)
|
||||
}, reply interface{}) error {
|
||||
|
||||
if !client.connected {
|
||||
return errDiskNotFound
|
||||
}
|
||||
|
||||
err := client.Call(handler, args, reply)
|
||||
client.lastRPCError = err
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
@ -318,6 +333,7 @@ func newStorageRPC(endpoint Endpoint) *StorageRPCClient {
|
||||
logger.FatalIf(err, "Unable to parse storage RPC Host", context.Background())
|
||||
rpcClient, err := NewStorageRPCClient(host, endpoint.Path)
|
||||
logger.FatalIf(err, "Unable to initialize storage RPC client", context.Background())
|
||||
rpcClient.connected = rpcClient.Call(storageServiceName+".Connect", &AuthArgs{}, &VoidReply{}) == nil
|
||||
// Attempt first try connection and save error if any.
|
||||
rpcClient.connect()
|
||||
return rpcClient
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user