Implement an offline mode for a distributed node (#4646)

Implement an offline mode for remote storage to cache the
offline status of a node in order to prevent network calls
that are bound to fail. After a time interval an attempt
will be made to restore the connection and mark the node
as online if successful.

Fixes #4183
This commit is contained in:
Frank Wessels 2017-08-11 11:38:46 -07:00 committed by Harshavardhana
parent 1978b9d8f9
commit 98b62cbec8
6 changed files with 227 additions and 125 deletions

View File

@ -341,6 +341,7 @@ func waitForFormatXLDisks(firstDisk bool, endpoints EndpointList, storageDisks [
maxRetryAttempts: globalStorageInitRetryThreshold, maxRetryAttempts: globalStorageInitRetryThreshold,
retryUnit: time.Second, retryUnit: time.Second,
retryCap: time.Second * 30, // 30 seconds. retryCap: time.Second * 30, // 30 seconds.
offlineTimestamp: UTCNow(),
} }
} }
@ -361,6 +362,7 @@ func waitForFormatXLDisks(firstDisk bool, endpoints EndpointList, storageDisks [
maxRetryAttempts: globalStorageRetryThreshold, maxRetryAttempts: globalStorageRetryThreshold,
retryUnit: time.Millisecond, retryUnit: time.Millisecond,
retryCap: time.Millisecond * 5, // 5 milliseconds. retryCap: time.Millisecond * 5, // 5 milliseconds.
offlineTimestamp: UTCNow(), // Set timestamp to prevent immediate marking as offline
} }
} }

View File

@ -30,210 +30,275 @@ const (
// Attempt to retry only this many number of times before // Attempt to retry only this many number of times before
// giving up on the remote disk entirely after initialization. // giving up on the remote disk entirely after initialization.
globalStorageRetryThreshold = 1 globalStorageRetryThreshold = 1
// Interval at which a node marked offline is re-checked to see
// whether it has come back online.
globalStorageHealthCheckInterval = 5 * time.Minute
) )
// retryToStorageErr collapses the specialised disk-not-found errors
// (errDiskNotFoundFromNetError and errDiskNotFoundFromRPCShutdown) into
// the generic errDiskNotFound, keeping storageAPI errors consistent
// across network disks as well. Any other error, including nil, is
// returned unchanged.
func retryToStorageErr(err error) error {
	switch err {
	case errDiskNotFoundFromNetError, errDiskNotFoundFromRPCShutdown:
		return errDiskNotFound
	default:
		return err
	}
}
// Retry storage is an instance of StorageAPI which // Retry storage is an instance of StorageAPI which
// additionally verifies upon network shutdown if the // additionally verifies upon network shutdown if the
// underlying storage is available and is really // underlying storage is available and is really
// formatted. // formatted. After the initialization phase it will
// also cache when the underlying storage is offline
// to prevent needless calls and recheck the health of
// underlying storage in regular intervals.
type retryStorage struct { type retryStorage struct {
remoteStorage StorageAPI remoteStorage StorageAPI
maxRetryAttempts int maxRetryAttempts int
retryUnit time.Duration retryUnit time.Duration
retryCap time.Duration retryCap time.Duration
offline bool // Mark whether node is offline
offlineTimestamp time.Time // Last timestamp of checking status of node
} }
// String representation of remoteStorage. // String representation of remoteStorage.
func (f retryStorage) String() string { func (f *retryStorage) String() string {
return f.remoteStorage.String() return f.remoteStorage.String()
} }
// Reconncts to underlying remote storage. // Reconnects to underlying remote storage.
func (f retryStorage) Init() (err error) { func (f *retryStorage) Init() (err error) {
return f.remoteStorage.Init() return retryToStorageErr(f.remoteStorage.Init())
} }
// Closes the underlying remote storage connection. // Closes the underlying remote storage connection.
func (f retryStorage) Close() (err error) { func (f *retryStorage) Close() (err error) {
return f.remoteStorage.Close() return retryToStorageErr(f.remoteStorage.Close())
}
// IsOffline reports whether the underlying remote storage is currently
// marked offline. If it is marked offline and at least
// globalStorageHealthCheckInterval has elapsed since offlineTimestamp,
// the timestamp is reset and a reconnect is attempted via reInit; when
// that succeeds the offline mark is cleared before the result is returned.
func (f *retryStorage) IsOffline() bool {
// Only probe the node again once a full health-check interval has passed since the last check
if f.offline && UTCNow().Sub(f.offlineTimestamp) >= globalStorageHealthCheckInterval {
f.offlineTimestamp = UTCNow() // reset timestamp so the next probe waits another interval
// nil is passed so reInit does not take its mark-offline early return,
// which is only triggered for errDiskNotFoundFromNetError.
if e := f.reInit(nil); e == nil {
// Connection has been re-established
f.offline = false // Mark node as back online
}
}
return f.offline
} }
// DiskInfo - a retryable implementation of disk info. // DiskInfo - a retryable implementation of disk info.
func (f retryStorage) DiskInfo() (info disk.Info, err error) { func (f *retryStorage) DiskInfo() (info disk.Info, err error) {
if f.IsOffline() {
return info, errDiskNotFound
}
info, err = f.remoteStorage.DiskInfo() info, err = f.remoteStorage.DiskInfo()
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() info, err = f.remoteStorage.DiskInfo()
if err == nil { return info, retryToStorageErr(err)
return f.remoteStorage.DiskInfo()
} }
} return info, retryToStorageErr(err)
return info, err
} }
// MakeVol - a retryable implementation of creating a volume. // MakeVol - a retryable implementation of creating a volume.
func (f retryStorage) MakeVol(volume string) (err error) { func (f *retryStorage) MakeVol(volume string) (err error) {
if f.IsOffline() {
return errDiskNotFound
}
err = f.remoteStorage.MakeVol(volume) err = f.remoteStorage.MakeVol(volume)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() return retryToStorageErr(f.remoteStorage.MakeVol(volume))
if err == nil {
return f.remoteStorage.MakeVol(volume)
} }
} return retryToStorageErr(err)
return err
} }
// ListVols - a retryable implementation of listing all the volumes. // ListVols - a retryable implementation of listing all the volumes.
func (f retryStorage) ListVols() (vols []VolInfo, err error) { func (f *retryStorage) ListVols() (vols []VolInfo, err error) {
if f.IsOffline() {
return vols, errDiskNotFound
}
vols, err = f.remoteStorage.ListVols() vols, err = f.remoteStorage.ListVols()
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() vols, err = f.remoteStorage.ListVols()
if err == nil { return vols, retryToStorageErr(err)
return f.remoteStorage.ListVols()
} }
} return vols, retryToStorageErr(err)
return vols, err
} }
// StatVol - a retryable implementation of stating a volume. // StatVol - a retryable implementation of stating a volume.
func (f retryStorage) StatVol(volume string) (vol VolInfo, err error) { func (f *retryStorage) StatVol(volume string) (vol VolInfo, err error) {
if f.IsOffline() {
return vol, errDiskNotFound
}
vol, err = f.remoteStorage.StatVol(volume) vol, err = f.remoteStorage.StatVol(volume)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() vol, err = f.remoteStorage.StatVol(volume)
if err == nil { return vol, retryToStorageErr(err)
return f.remoteStorage.StatVol(volume)
} }
} return vol, retryToStorageErr(err)
return vol, err
} }
// DeleteVol - a retryable implementation of deleting a volume. // DeleteVol - a retryable implementation of deleting a volume.
func (f retryStorage) DeleteVol(volume string) (err error) { func (f *retryStorage) DeleteVol(volume string) (err error) {
if f.IsOffline() {
return errDiskNotFound
}
err = f.remoteStorage.DeleteVol(volume) err = f.remoteStorage.DeleteVol(volume)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() return retryToStorageErr(f.remoteStorage.DeleteVol(volume))
if err == nil {
return f.remoteStorage.DeleteVol(volume)
} }
} return retryToStorageErr(err)
return err
} }
// PrepareFile - a retryable implementation of preparing a file. // PrepareFile - a retryable implementation of preparing a file.
func (f retryStorage) PrepareFile(volume, path string, length int64) (err error) { func (f *retryStorage) PrepareFile(volume, path string, length int64) (err error) {
if f.IsOffline() {
return errDiskNotFound
}
err = f.remoteStorage.PrepareFile(volume, path, length) err = f.remoteStorage.PrepareFile(volume, path, length)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() return retryToStorageErr(f.remoteStorage.PrepareFile(volume, path, length))
if err == nil {
return f.remoteStorage.PrepareFile(volume, path, length)
} }
} return retryToStorageErr(err)
return err
} }
// AppendFile - a retryable implementation of append to a file. // AppendFile - a retryable implementation of append to a file.
func (f retryStorage) AppendFile(volume, path string, buffer []byte) (err error) { func (f *retryStorage) AppendFile(volume, path string, buffer []byte) (err error) {
if f.IsOffline() {
return errDiskNotFound
}
err = f.remoteStorage.AppendFile(volume, path, buffer) err = f.remoteStorage.AppendFile(volume, path, buffer)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() return retryToStorageErr(f.remoteStorage.AppendFile(volume, path, buffer))
if err == nil {
return f.remoteStorage.AppendFile(volume, path, buffer)
} }
} return retryToStorageErr(err)
return err
} }
// StatFile - a retryable implementation of stating a file. // StatFile - a retryable implementation of stating a file.
func (f retryStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { func (f *retryStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) {
if f.IsOffline() {
return fileInfo, errDiskNotFound
}
fileInfo, err = f.remoteStorage.StatFile(volume, path) fileInfo, err = f.remoteStorage.StatFile(volume, path)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() fileInfo, err = f.remoteStorage.StatFile(volume, path)
if err == nil { return fileInfo, retryToStorageErr(err)
return f.remoteStorage.StatFile(volume, path)
} }
} return fileInfo, retryToStorageErr(err)
return fileInfo, err
} }
// ReadAll - a retryable implementation of reading all the content from a file. // ReadAll - a retryable implementation of reading all the content from a file.
func (f retryStorage) ReadAll(volume, path string) (buf []byte, err error) { func (f *retryStorage) ReadAll(volume, path string) (buf []byte, err error) {
if f.IsOffline() {
return buf, errDiskNotFound
}
buf, err = f.remoteStorage.ReadAll(volume, path) buf, err = f.remoteStorage.ReadAll(volume, path)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() buf, err = f.remoteStorage.ReadAll(volume, path)
if err == nil { return buf, retryToStorageErr(err)
return f.remoteStorage.ReadAll(volume, path)
} }
} return buf, retryToStorageErr(err)
return buf, err
} }
// ReadFile - a retryable implementation of reading at offset from a file. // ReadFile - a retryable implementation of reading at offset from a file.
func (f retryStorage) ReadFile(volume, path string, offset int64, buffer []byte) (m int64, err error) { func (f *retryStorage) ReadFile(volume, path string, offset int64, buffer []byte) (m int64, err error) {
if f.IsOffline() {
return m, errDiskNotFound
}
m, err = f.remoteStorage.ReadFile(volume, path, offset, buffer) m, err = f.remoteStorage.ReadFile(volume, path, offset, buffer)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() m, err = f.remoteStorage.ReadFile(volume, path, offset, buffer)
if err == nil { return m, retryToStorageErr(err)
return f.remoteStorage.ReadFile(volume, path, offset, buffer)
} }
} return m, retryToStorageErr(err)
return m, err
} }
// ReadFileWithVerify - a retryable implementation of reading at // ReadFileWithVerify - a retryable implementation of reading at
// offset from a file with verification. // offset from a file with verification.
func (f retryStorage) ReadFileWithVerify(volume, path string, offset int64, buffer []byte, func (f *retryStorage) ReadFileWithVerify(volume, path string, offset int64, buffer []byte,
algo HashAlgo, expectedHash string) (m int64, err error) { algo HashAlgo, expectedHash string) (m int64, err error) {
if f.IsOffline() {
return m, errDiskNotFound
}
m, err = f.remoteStorage.ReadFileWithVerify(volume, path, offset, buffer, m, err = f.remoteStorage.ReadFileWithVerify(volume, path, offset, buffer,
algo, expectedHash) algo, expectedHash)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() m, err = f.remoteStorage.ReadFileWithVerify(volume, path,
if err == nil {
return f.remoteStorage.ReadFileWithVerify(volume, path,
offset, buffer, algo, expectedHash) offset, buffer, algo, expectedHash)
return m, retryToStorageErr(err)
} }
} return m, retryToStorageErr(err)
return m, err
} }
// ListDir - a retryable implementation of listing directory entries. // ListDir - a retryable implementation of listing directory entries.
func (f retryStorage) ListDir(volume, path string) (entries []string, err error) { func (f *retryStorage) ListDir(volume, path string) (entries []string, err error) {
if f.IsOffline() {
return entries, errDiskNotFound
}
entries, err = f.remoteStorage.ListDir(volume, path) entries, err = f.remoteStorage.ListDir(volume, path)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() entries, err = f.remoteStorage.ListDir(volume, path)
if err == nil { return entries, retryToStorageErr(err)
return f.remoteStorage.ListDir(volume, path)
} }
} return entries, retryToStorageErr(err)
return entries, err
} }
// DeleteFile - a retryable implementation of deleting a file. // DeleteFile - a retryable implementation of deleting a file.
func (f retryStorage) DeleteFile(volume, path string) (err error) { func (f *retryStorage) DeleteFile(volume, path string) (err error) {
if f.IsOffline() {
return errDiskNotFound
}
err = f.remoteStorage.DeleteFile(volume, path) err = f.remoteStorage.DeleteFile(volume, path)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() return retryToStorageErr(f.remoteStorage.DeleteFile(volume, path))
if err == nil {
return f.remoteStorage.DeleteFile(volume, path)
} }
} return retryToStorageErr(err)
return err
} }
// RenameFile - a retryable implementation of renaming a file. // RenameFile - a retryable implementation of renaming a file.
func (f retryStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { func (f *retryStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
if f.IsOffline() {
return errDiskNotFound
}
err = f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath) err = f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath)
if err == errDiskNotFound { if f.reInitUponDiskNotFound(err) {
err = f.reInit() return retryToStorageErr(f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath))
if err == nil {
return f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath)
} }
return retryToStorageErr(err)
}
// Try to reinitialize the connection when we have some form of DiskNotFound error
func (f *retryStorage) reInitUponDiskNotFound(err error) bool {
if err == errDiskNotFound || err == errDiskNotFoundFromNetError || err == errDiskNotFoundFromRPCShutdown {
return f.reInit(err) == nil
} }
return err return false
} }
// Connect and attempt to load the format from a disconnected node, // Connect and attempt to load the format from a disconnected node,
// attempts three times before giving up. // attempts three times before giving up.
func (f retryStorage) reInit() (err error) { func (f *retryStorage) reInit(e error) (err error) {
// Only after initialization and minimum of one interval
// has passed (to prevent marking a node as offline right
// after initialization), check whether node has gone offline
if f.maxRetryAttempts == globalStorageRetryThreshold &&
UTCNow().Sub(f.offlineTimestamp) >= globalStorageHealthCheckInterval {
if e == errDiskNotFoundFromNetError { // Make node offline due to network error
f.offline = true // Marking node offline
f.offlineTimestamp = UTCNow()
return errDiskNotFound
}
// Continue for other errors like RPC shutdown (and retry connection below)
}
// Close the underlying connection. // Close the underlying connection.
f.remoteStorage.Close() // Error here is purposefully ignored. f.remoteStorage.Close() // Error here is purposefully ignored.

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors"
"reflect" "reflect"
"testing" "testing"
"time" "time"
@ -423,3 +424,31 @@ func TestRetryStorage(t *testing.T) {
} }
} }
} }
// Tests reply storage error transformation.
func TestReplyStorageErr(t *testing.T) {
unknownErr := errors.New("Unknown error")
testCases := []struct {
expectedErr error
err error
}{
{
expectedErr: errDiskNotFound,
err: errDiskNotFoundFromNetError,
},
{
expectedErr: errDiskNotFound,
err: errDiskNotFoundFromRPCShutdown,
},
{
expectedErr: unknownErr,
err: unknownErr,
},
}
for i, testCase := range testCases {
resultErr := retryToStorageErr(testCase.err)
if testCase.expectedErr != resultErr {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.expectedErr, resultErr)
}
}
}

View File

@ -37,9 +37,15 @@ var errUnformattedDisk = errors.New("unformatted disk found")
// errDiskFull - cannot create volume or files when disk is full. // errDiskFull - cannot create volume or files when disk is full.
var errDiskFull = errors.New("disk path full") var errDiskFull = errors.New("disk path full")
// errDiskNotFount - cannot find the underlying configured disk anymore. // errDiskNotFound - cannot find the underlying configured disk anymore.
var errDiskNotFound = errors.New("disk not found") var errDiskNotFound = errors.New("disk not found")
// errDiskNotFoundFromNetError - cannot find the underlying configured disk anymore due to network error.
var errDiskNotFoundFromNetError = errors.New("disk not found from net error")
// errDiskNotFoundFromRPCShutdown - cannot find the underlying configured disk anymore due to rpc shutdown.
var errDiskNotFoundFromRPCShutdown = errors.New("disk not found from rpc shutdown")
// errFaultyRemoteDisk - remote disk is faulty. // errFaultyRemoteDisk - remote disk is faulty.
var errFaultyRemoteDisk = errors.New("remote disk is faulty") var errFaultyRemoteDisk = errors.New("remote disk is faulty")

View File

@ -45,11 +45,11 @@ func toStorageErr(err error) error {
switch err.(type) { switch err.(type) {
case *net.OpError: case *net.OpError:
return errDiskNotFound return errDiskNotFoundFromNetError
} }
if err == rpc.ErrShutdown { if err == rpc.ErrShutdown {
return errDiskNotFound return errDiskNotFoundFromRPCShutdown
} }
switch err.Error() { switch err.Error() {

View File

@ -127,11 +127,11 @@ func TestStorageErr(t *testing.T) {
err: fmt.Errorf("%s", io.ErrUnexpectedEOF.Error()), err: fmt.Errorf("%s", io.ErrUnexpectedEOF.Error()),
}, },
{ {
expectedErr: errDiskNotFound, expectedErr: errDiskNotFoundFromNetError,
err: &net.OpError{}, err: &net.OpError{},
}, },
{ {
expectedErr: errDiskNotFound, expectedErr: errDiskNotFoundFromRPCShutdown,
err: rpc.ErrShutdown, err: rpc.ErrShutdown,
}, },
{ {