Protect shutdown callbacks lists with a mutex (#2432)

This commit is contained in:
Anis Elleuch 2016-08-15 07:55:48 +01:00 committed by Harshavardhana
parent 9606cb9bcd
commit 5526ac13d2
9 changed files with 150 additions and 97 deletions

View File

@ -19,12 +19,5 @@ script:
after_success: after_success:
- bash <(curl -s https://codecov.io/bash) - bash <(curl -s https://codecov.io/bash)
after_success:
- bash <(curl -s https://codecov.io/bash)
go: go:
- 1.6 - 1.6.2
notifications:
slack:
secure: K9tsn5MvrCAxuEZTxn+m3Kq1K2NG2xMEJFSv/sTp+RQBW7TslPHzv859GsIvrm8mU1y1btOU9RlOzqrRUczI5cJpE8IL1oljPZbXrIXgetE0kbsw0Wpy99g27UQ2VGp933WDu8tfj7zU4cZv+BI0RltNLwqYO6GWXmcWP0IueCU=

View File

@ -62,36 +62,6 @@ func loadFormatFS(storageDisk StorageAPI) (format formatConfigV1, err error) {
return format, nil return format, nil
} }
// Should be called when process shuts down.
func shutdownFS(storage StorageAPI) errCode {
// List if there are any multipart entries.
_, err := storage.ListDir(minioMetaBucket, mpartMetaPrefix)
if err != errFileNotFound {
// Multipart directory is not empty hence do not remove '.minio.sys' volume.
return exitSuccess
}
// List if there are any bucket configuration entries.
_, err = storage.ListDir(minioMetaBucket, bucketConfigPrefix)
if err != errFileNotFound {
// Bucket config directory is not empty hence do not remove '.minio.sys' volume.
return exitSuccess
}
// Cleanup everything else.
prefix := ""
if err = cleanupDir(storage, minioMetaBucket, prefix); err != nil {
errorIf(err, "Unable to cleanup minio meta bucket")
return exitFailure
}
if err = storage.DeleteVol(minioMetaBucket); err != nil {
if err != errVolumeNotEmpty {
errorIf(err, "Unable to delete minio meta bucket %s", minioMetaBucket)
return exitFailure
}
}
// Successful exit.
return exitSuccess
}
// newFSObjects - initialize new fs object layer. // newFSObjects - initialize new fs object layer.
func newFSObjects(disk string) (ObjectLayer, error) { func newFSObjects(disk string) (ObjectLayer, error) {
storage, err := newStorageAPI(disk) storage, err := newStorageAPI(disk)
@ -132,11 +102,6 @@ func newFSObjects(disk string) (ObjectLayer, error) {
return nil, errFSDiskFormat return nil, errFSDiskFormat
} }
// Register the callback that should be called when the process shuts down.
registerObjectStorageShutdown(func() errCode {
return shutdownFS(storage)
})
// Initialize fs objects. // Initialize fs objects.
fs := fsObjects{ fs := fsObjects{
storage: storage, storage: storage,
@ -148,6 +113,36 @@ func newFSObjects(disk string) (ObjectLayer, error) {
return fs, nil return fs, nil
} }
// Should be called when process shuts down.
func (fs fsObjects) Shutdown() error {
// List if there are any multipart entries.
_, err := fs.storage.ListDir(minioMetaBucket, mpartMetaPrefix)
if err != errFileNotFound {
// Multipart directory is not empty hence do not remove '.minio.sys' volume.
return nil
}
// List if there are any bucket configuration entries.
_, err = fs.storage.ListDir(minioMetaBucket, bucketConfigPrefix)
if err != errFileNotFound {
// Bucket config directory is not empty hence do not remove '.minio.sys' volume.
return nil
}
// Cleanup everything else.
prefix := ""
if err = cleanupDir(fs.storage, minioMetaBucket, prefix); err != nil {
errorIf(err, "Unable to cleanup minio meta bucket")
return err
}
if err = fs.storage.DeleteVol(minioMetaBucket); err != nil {
if err != errVolumeNotEmpty {
errorIf(err, "Unable to delete minio meta bucket %s", minioMetaBucket)
return err
}
}
// Successful.
return nil
}
// StorageInfo - returns underlying storage statistics. // StorageInfo - returns underlying storage statistics.
func (fs fsObjects) StorageInfo() StorageInfo { func (fs fsObjects) StorageInfo() StorageInfo {
info, err := disk.GetInfo(fs.physicalDisk) info, err := disk.GetInfo(fs.physicalDisk)

View File

@ -158,9 +158,6 @@ func main() {
// Enable all loggers by now. // Enable all loggers by now.
enableLoggers() enableLoggers()
// Initialize name space lock.
initNSLock()
// Set global quiet flag. // Set global quiet flag.
globalQuiet = c.Bool("quiet") || c.GlobalBool("quiet") globalQuiet = c.Bool("quiet") || c.GlobalBool("quiet")
@ -189,10 +186,6 @@ func main() {
defer profile.Start(profile.BlockProfile, profile.ProfilePath(profileDir)).Stop() defer profile.Start(profile.BlockProfile, profile.ProfilePath(profileDir)).Stop()
} }
// Initialize and monitor shutdown signal
shutdownSignal = make(chan bool, 1)
monitorShutdownSignal(os.Exit)
// Run the app - exit on error. // Run the app - exit on error.
app.RunAndExitOnError() app.RunAndExitOnError()
} }

View File

@ -21,6 +21,7 @@ import "io"
// ObjectLayer implements primitives for object API layer. // ObjectLayer implements primitives for object API layer.
type ObjectLayer interface { type ObjectLayer interface {
// Storage operations. // Storage operations.
Shutdown() error
StorageInfo() StorageInfo StorageInfo() StorageInfo
// Bucket operations. // Bucket operations.

View File

@ -42,6 +42,9 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
// configureServer handler returns final handler for the http server. // configureServer handler returns final handler for the http server.
func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
// Initialize name space lock.
initNSLock()
objAPI, err := newObjectLayer(srvCmdConfig.disks, srvCmdConfig.ignoredDisks) objAPI, err := newObjectLayer(srvCmdConfig.disks, srvCmdConfig.ignoredDisks)
fatalIf(err, "Unable to intialize object layer.") fatalIf(err, "Unable to intialize object layer.")
@ -66,6 +69,18 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
ObjectAPI: objAPI, ObjectAPI: objAPI,
} }
// Initialize and monitor shutdown signals.
err = initGracefulShutdown(os.Exit)
fatalIf(err, "Unable to initialize graceful shutdown operation")
// Register the callback that should be called when the process shuts down.
globalShutdownCBs.AddObjectLayerCB(func() errCode {
if sErr := objAPI.Shutdown(); sErr != nil {
return exitFailure
}
return exitSuccess
})
// Initialize a new event notifier. // Initialize a new event notifier.
err = initEventNotifier(objAPI) err = initEventNotifier(objAPI)
fatalIf(err, "Unable to initialize event notification queue") fatalIf(err, "Unable to initialize event notification queue")

View File

@ -259,7 +259,8 @@ func serverMain(c *cli.Context) {
// Prints the formatted startup message. // Prints the formatted startup message.
printStartupMessage(endPoints) printStartupMessage(endPoints)
registerShutdown(func() errCode { // Register generic callbacks.
globalShutdownCBs.AddGenericCB(func() errCode {
// apiServer.Stop() // apiServer.Stop()
return exitSuccess return exitSuccess
}) })

127
utils.go
View File

@ -22,6 +22,7 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"syscall" "syscall"
) )
@ -76,62 +77,112 @@ func contains(stringList []string, element string) bool {
return false return false
} }
// shutdownSignal - is the channel that receives any boolean when // Represents a type of an exit func which will be invoked upon shutdown signal.
// we want broadcast the start of shutdown
var shutdownSignal chan bool
// shutdownCallbacks - is the list of function callbacks executed one by one
// when a shutdown starts. A callback returns 0 for success and 1 for failure.
// Failure is considered an emergency error that needs an immediate exit
var shutdownCallbacks []func() errCode
// shutdownObjectStorageCallbacks - contains the list of function callbacks that
// need to be invoked when a shutdown starts. These callbacks will be called before
// the general callback shutdowns
var shutdownObjectStorageCallbacks []func() errCode
// Register callback functions that need to be called when process terminates.
func registerShutdown(callback func() errCode) {
shutdownCallbacks = append(shutdownCallbacks, callback)
}
// Register object storagecallback functions that need to be called when process terminates.
func registerObjectStorageShutdown(callback func() errCode) {
shutdownObjectStorageCallbacks = append(shutdownObjectStorageCallbacks, callback)
}
// Represents a type of an exit func which will be invoked during shutdown signal.
type onExitFunc func(code int) type onExitFunc func(code int)
// Represents a type for all the the callback functions invoked upon shutdown signal.
type cleanupOnExitFunc func() errCode
// Represents a collection of various callbacks executed upon exit signals.
type shutdownCallbacks struct {
// Protect callbacks list from a concurrent access
*sync.RWMutex
// genericCallbacks - is the list of function callbacks executed one by one
// when a shutdown starts. A callback returns 0 for success and 1 for failure.
// Failure is considered an emergency error that needs an immediate exit
genericCallbacks []cleanupOnExitFunc
// objectLayerCallbacks - contains the list of function callbacks that
// need to be invoked when a shutdown starts. These callbacks will be called before
// the general callback shutdowns
objectLayerCallbacks []cleanupOnExitFunc
}
// globalShutdownCBs stores regular and object storages callbacks
var globalShutdownCBs *shutdownCallbacks
func (s shutdownCallbacks) GetObjectLayerCBs() []cleanupOnExitFunc {
s.RLock()
defer s.RUnlock()
return s.objectLayerCallbacks
}
func (s shutdownCallbacks) GetGenericCBs() []cleanupOnExitFunc {
s.RLock()
defer s.RUnlock()
return s.genericCallbacks
}
func (s *shutdownCallbacks) AddObjectLayerCB(callback cleanupOnExitFunc) error {
s.Lock()
defer s.Unlock()
if callback == nil {
return errInvalidArgument
}
s.objectLayerCallbacks = append(s.objectLayerCallbacks, callback)
return nil
}
func (s *shutdownCallbacks) AddGenericCB(callback cleanupOnExitFunc) error {
s.Lock()
defer s.Unlock()
if callback == nil {
return errInvalidArgument
}
s.genericCallbacks = append(s.genericCallbacks, callback)
return nil
}
// Initialize graceful shutdown mechanism.
func initGracefulShutdown(onExitFn onExitFunc) error {
// Validate exit func.
if onExitFn == nil {
return errInvalidArgument
}
globalShutdownCBs = &shutdownCallbacks{
RWMutex: &sync.RWMutex{},
}
// Return start monitor shutdown signal.
return startMonitorShutdownSignal(onExitFn)
}
// Global shutdown signal channel.
var globalShutdownSignalCh = make(chan struct{})
// Start to monitor shutdownSignal to execute shutdown callbacks // Start to monitor shutdownSignal to execute shutdown callbacks
func monitorShutdownSignal(onExitFn onExitFunc) { func startMonitorShutdownSignal(onExitFn onExitFunc) error {
// Validate exit func.
if onExitFn == nil {
return errInvalidArgument
}
go func() { go func() {
defer close(globalShutdownSignalCh)
// Monitor signals. // Monitor signals.
trapCh := signalTrap(os.Interrupt, syscall.SIGTERM) trapCh := signalTrap(os.Interrupt, syscall.SIGTERM)
for { for {
select { select {
case <-trapCh: case <-trapCh:
// Start a graceful shutdown call // Initiate graceful shutdown.
shutdownSignal <- true globalShutdownSignalCh <- struct{}{}
case <-shutdownSignal: case <-globalShutdownSignalCh:
// Call all callbacks and exit for emergency
for _, callback := range shutdownCallbacks {
exitCode := callback()
if exitCode != exitSuccess {
onExitFn(int(exitCode))
}
}
// Call all object storage shutdown callbacks and exit for emergency // Call all object storage shutdown callbacks and exit for emergency
for _, callback := range shutdownObjectStorageCallbacks { for _, callback := range globalShutdownCBs.GetObjectLayerCBs() {
exitCode := callback() exitCode := callback()
if exitCode != exitSuccess { if exitCode != exitSuccess {
onExitFn(int(exitCode)) onExitFn(int(exitCode))
} }
} }
// Call all callbacks and exit for emergency
for _, callback := range globalShutdownCBs.GetGenericCBs() {
exitCode := callback()
if exitCode != exitSuccess {
onExitFn(int(exitCode))
}
}
onExitFn(int(exitSuccess)) onExitFn(int(exitSuccess))
} }
} }
}() }()
// Successfully started routine.
return nil
} }

View File

@ -20,21 +20,19 @@ import "testing"
// ShutdownCallback simulates a successful and failure exit here. // ShutdownCallback simulates a successful and failure exit here.
func TestShutdownCallbackSuccess(t *testing.T) { func TestShutdownCallbackSuccess(t *testing.T) {
// Register two callbacks that return success // initialize graceful shutdown
registerObjectStorageShutdown(func() errCode {
return exitSuccess
})
registerShutdown(func() errCode {
return exitSuccess
})
shutdownSignal = make(chan bool, 1)
shutdownSignal <- true
// Start executing callbacks and exitFunc receives a success.
dummySuccess := func(code int) { dummySuccess := func(code int) {
if code != int(exitSuccess) { if code != int(exitSuccess) {
t.Fatalf("Expected %d, got %d instead.", code, exitSuccess) t.Fatalf("Expected %d, got %d instead.", code, exitSuccess)
} }
} }
monitorShutdownSignal(dummySuccess) initGracefulShutdown(dummySuccess)
// Register two callbacks that return success
globalShutdownCBs.AddObjectLayerCB(func() errCode {
return exitSuccess
})
globalShutdownCBs.AddGenericCB(func() errCode {
return exitSuccess
})
globalShutdownSignalCh <- struct{}{}
} }

View File

@ -206,6 +206,12 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
return xl, nil return xl, nil
} }
// Shutdown function for object storage interface.
func (xl xlObjects) Shutdown() error {
// Add any object layer shutdown activities here.
return nil
}
// byDiskTotal is a collection satisfying sort.Interface. // byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []disk.Info type byDiskTotal []disk.Info