control: Implement service command 'stop,restart,status'. (#2883)

- stop - stops all the servers.
- restart - restart all the servers.
- status - prints status of storage info about the cluster.
This commit is contained in:
Harshavardhana 2016-10-09 23:03:10 -07:00 committed by GitHub
parent 57f75b1d9b
commit 3cfb23750a
21 changed files with 635 additions and 384 deletions

View File

@ -27,7 +27,7 @@ var controlCmd = cli.Command{
Subcommands: []cli.Command{
lockCmd,
healCmd,
shutdownCmd,
serviceCmd,
},
CustomHelpTemplate: `NAME:
{{.Name}} - {{.Usage}}

View File

@ -72,12 +72,17 @@ func TestControlLockMain(t *testing.T) {
}
}
// Test to call shutdownControl() in control-shutdown-main.go
func TestControlShutdownMain(t *testing.T) {
// Test to call serviceControl(stop) in control-service-main.go
func TestControlServiceStopMain(t *testing.T) {
// create cli app for testing
app := cli.NewApp()
app.Commands = []cli.Command{controlCmd}
// Initialize done channel specifically for each tests.
globalServiceDoneCh = make(chan struct{}, 1)
// Initialize signal channel specifically for each tests.
globalServiceSignalCh = make(chan serviceSignal, 1)
// start test server
testServer := StartTestServer(t, "XL")
@ -88,12 +93,95 @@ func TestControlShutdownMain(t *testing.T) {
url := testServer.Server.URL
// create args to call
args := []string{"./minio", "control", "shutdown", url}
args := []string{"./minio", "control", "service", "stop", url}
// run app
err := app.Run(args)
if err != nil {
t.Errorf("Control-Shutdown-Main test failed with - %s", err)
t.Errorf("Control-Service-Stop-Main test failed with - %s", err)
}
}
// Test to call serviceControl(status) in control-service-main.go
func TestControlServiceStatusMain(t *testing.T) {
// create cli app for testing
app := cli.NewApp()
app.Commands = []cli.Command{controlCmd}
// Initialize done channel specifically for each tests.
globalServiceDoneCh = make(chan struct{}, 1)
// Initialize signal channel specifically for each tests.
globalServiceSignalCh = make(chan serviceSignal, 1)
// start test server
testServer := StartTestServer(t, "XL")
// schedule cleanup at the end
defer testServer.Stop()
// fetch http server endpoint
url := testServer.Server.URL
// Create args to call
args := []string{"./minio", "control", "service", "status", url}
// run app
err := app.Run(args)
if err != nil {
t.Errorf("Control-Service-Status-Main test failed with - %s", err)
}
// Create args to call
args = []string{"./minio", "control", "service", "stop", url}
// run app
err = app.Run(args)
if err != nil {
t.Errorf("Control-Service-Stop-Main test failed with - %s", err)
}
}
// Test to call serviceControl(restart) in control-service-main.go
func TestControlServiceRestartMain(t *testing.T) {
// create cli app for testing
app := cli.NewApp()
app.Commands = []cli.Command{controlCmd}
// Initialize done channel specifically for each tests.
globalServiceDoneCh = make(chan struct{}, 1)
// Initialize signal channel specifically for each tests.
globalServiceSignalCh = make(chan serviceSignal, 1)
// start test server
testServer := StartTestServer(t, "XL")
// schedule cleanup at the end
defer testServer.Stop()
// fetch http server endpoint
url := testServer.Server.URL
// Create args to call
args := []string{"./minio", "control", "service", "restart", url}
// run app
err := app.Run(args)
if err != nil {
t.Errorf("Control-Service-Restart-Main test failed with - %s", err)
}
// Initialize done channel specifically for each tests.
globalServiceDoneCh = make(chan struct{}, 1)
// Initialize signal channel specifically for each tests.
globalServiceSignalCh = make(chan serviceSignal, 1)
// Create args to call
args = []string{"./minio", "control", "service", "stop", url}
// run app
err = app.Run(args)
if err != nil {
t.Errorf("Control-Service-Stop-Main test failed with - %s", err)
}
}

View File

@ -0,0 +1,96 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/url"
"path"
"github.com/minio/cli"
"github.com/minio/mc/pkg/console"
)
var serviceCmd = cli.Command{
Name: "service",
Usage: "Service command line to manage Minio server.",
Action: serviceControl,
Flags: globalFlags,
CustomHelpTemplate: `NAME:
minio control {{.Name}} - {{.Usage}}
USAGE:
minio control {{.Name}} [status|restart|stop] URL
FLAGS:
{{range .Flags}}{{.}}
{{end}}
EXAMPLES:
1. Prints current status information of the cluster.
$ minio control service status http://10.1.10.92:9000/
2. Restarts the url and all the servers in the cluster.
$ minio control service restart http://localhost:9000/
3. Shuts down the url and all the servers in the cluster.
$ minio control service stop http://localhost:9000/
`,
}
// "minio control service" entry point.
func serviceControl(c *cli.Context) {
if !c.Args().Present() && len(c.Args()) != 2 {
cli.ShowCommandHelpAndExit(c, "service", 1)
}
var signal serviceSignal
switch c.Args().Get(0) {
case "status":
signal = serviceStatus
case "restart":
signal = serviceRestart
case "stop":
signal = serviceStop
default:
fatalIf(errInvalidArgument, "Unsupported signalling requested %s", c.Args().Get(0))
}
parsedURL, err := url.Parse(c.Args().Get(1))
fatalIf(err, "Unable to parse URL %s", c.Args().Get(1))
authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
secureConn: parsedURL.Scheme == "https",
address: parsedURL.Host,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
args := &ServiceArgs{
Signal: signal,
// This is necessary so that the remotes,
// don't end up sending requests back and forth.
Remote: true,
}
reply := &ServiceReply{}
err = client.Call("Controller.ServiceHandler", args, reply)
fatalIf(err, "Service command %s failed for %s", c.Args().Get(0), parsedURL.Host)
if signal == serviceStatus {
console.Println(getStorageInfoMsg(reply.StorageInfo))
}
}

View File

@ -1,79 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/url"
"path"
"github.com/minio/cli"
)
var shutdownFlags = []cli.Flag{
cli.BoolFlag{
Name: "restart",
Usage: "Restart the server.",
},
}
var shutdownCmd = cli.Command{
Name: "shutdown",
Usage: "Shutdown or restart the server.",
Action: shutdownControl,
Flags: append(shutdownFlags, globalFlags...),
CustomHelpTemplate: `NAME:
minio control {{.Name}} - {{.Usage}}
USAGE:
minio control {{.Name}} http://localhost:9000/
FLAGS:
{{range .Flags}}{{.}}
{{end}}
EXAMPLES:
1. Shutdown the server:
$ minio control shutdown http://localhost:9000/
2. Reboot the server:
$ minio control shutdown --restart http://localhost:9000/
`,
}
// "minio control shutdown" entry point.
func shutdownControl(c *cli.Context) {
if len(c.Args()) != 1 {
cli.ShowCommandHelpAndExit(c, "shutdown", 1)
}
parsedURL, err := url.Parse(c.Args()[0])
fatalIf(err, "Unable to parse URL.")
authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
secureConn: parsedURL.Scheme == "https",
address: parsedURL.Host,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
args := &ShutdownArgs{Restart: c.Bool("restart")}
err = client.Call("Controller.ShutdownHandler", args, &GenericReply{})
errorIf(err, "Shutting down Minio server at %s failed.", parsedURL.Host)
}

View File

@ -16,7 +16,11 @@
package cmd
import "errors"
import (
"errors"
"sync"
"time"
)
// errServerNotInitialized - server not initialized.
var errServerNotInitialized = errors.New("Server not initialized, please try again.")
@ -130,31 +134,76 @@ func (c *controllerAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply
return err
}
// ShutdownArgs - argument for Shutdown RPC.
type ShutdownArgs struct {
// ServiceArgs - argument for Service RPC.
type ServiceArgs struct {
// Authentication token generated by Login.
GenericArgs
// Should the server be restarted, all active connections are
// served before server is restarted.
Restart bool
// Represents the type of operation server is requested
// to perform. Currently supported signals are
// stop, restart and status.
Signal serviceSignal
// Make remote calls.
Remote bool
}
// Shutdown - Shutsdown the server.
func (c *controllerAPIHandlers) ShutdownHandler(args *ShutdownArgs, reply *GenericReply) error {
// ServiceReply - represents service operation success info.
type ServiceReply struct {
StorageInfo StorageInfo
}
func (c *controllerAPIHandlers) remoteCall(serviceMethod string, args interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}, reply interface{}) {
var wg sync.WaitGroup
for index, clnt := range c.RemoteControllers {
wg.Add(1)
go func(index int, client *AuthRPCClient) {
defer wg.Done()
err := client.Call(serviceMethod, args, reply)
errorIf(err, "Unable to initiate %s", serviceMethod)
}(index, clnt)
}
wg.Wait()
}
// Service - handler for sending service signals across many servers.
func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *ServiceReply) error {
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
if args.Restart {
globalShutdownSignalCh <- shutdownRestart
} else {
globalShutdownSignalCh <- shutdownHalt
objAPI := c.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
if args.Signal == serviceStatus {
reply.StorageInfo = objAPI.StorageInfo()
return nil
}
switch args.Signal {
case serviceRestart:
if args.Remote {
// Set remote as false for remote calls.
args.Remote = false
// Send remote call to all neighboring peers to restart minio servers.
c.remoteCall("Controller.ServiceHandler", args, reply)
}
globalServiceSignalCh <- serviceRestart
case serviceStop:
if args.Remote {
// Set remote as false for remote calls.
args.Remote = false
// Send remote call to all neighboring peers to stop minio servers.
c.remoteCall("Controller.ServiceHandler", args, reply)
}
globalServiceSignalCh <- serviceStop
}
return nil
}
// LockInfo - RPC control handler for `minio control lock`.
// Returns the info of the locks held in the system.
// LockInfo - RPC control handler for `minio control lock`. Returns the info of the locks held in the system.
func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockState) error {
// obtain the lock state information.
lockInfo, err := generateSystemLockResponse()
@ -162,7 +211,7 @@ func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockStat
if err != nil {
return err
}
// the response containing the lock info.
// The response containing the lock info.
*reply = lockInfo
return nil
}

View File

@ -17,10 +17,14 @@
package cmd
import (
"fmt"
"net/rpc"
"path"
"strings"
"time"
router "github.com/gorilla/mux"
"github.com/minio/minio-go/pkg/set"
)
// Routes paths for "minio control" commands.
@ -28,15 +32,66 @@ const (
controlPath = "/controller"
)
// Initializes remote controller clients for making remote requests.
func initRemoteControllerClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient {
if !srvCmdConfig.isDistXL {
return nil
}
var newExports []string
// Initialize auth rpc clients.
exports := srvCmdConfig.disks
ignoredExports := srvCmdConfig.ignoredDisks
remoteHosts := set.NewStringSet()
// Initialize ignored disks in a new set.
ignoredSet := set.NewStringSet()
if len(ignoredExports) > 0 {
ignoredSet = set.CreateStringSet(ignoredExports...)
}
var authRPCClients []*AuthRPCClient
for _, export := range exports {
if ignoredSet.Contains(export) {
// Ignore initializing ignored export.
continue
}
// Validates if remote disk is local.
if isLocalStorage(export) {
continue
}
newExports = append(newExports, export)
}
for _, export := range newExports {
var host string
if idx := strings.LastIndex(export, ":"); idx != -1 {
host = export[:idx]
}
remoteHosts.Add(fmt.Sprintf("%s:%d", host, globalMinioPort))
}
for host := range remoteHosts {
authRPCClients = append(authRPCClients, newAuthClient(&authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
secureConn: isSSL(),
address: host,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}))
}
return authRPCClients
}
// Register controller RPC handlers.
func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) {
// Initialize Controller.
// Initialize controller.
ctrlHandlers := &controllerAPIHandlers{
ObjectAPI: newObjectLayerFn,
StorageDisks: srvCmdConfig.storageDisks,
timestamp: time.Now().UTC(),
}
// Initializes remote controller clients.
ctrlHandlers.RemoteControllers = initRemoteControllerClients(srvCmdConfig)
ctrlRPCServer := rpc.NewServer()
ctrlRPCServer.RegisterName("Controller", ctrlHandlers)
@ -48,5 +103,6 @@ func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfi
type controllerAPIHandlers struct {
ObjectAPI func() ObjectLayer
StorageDisks []StorageAPI
RemoteControllers []*AuthRPCClient
timestamp time.Time
}

View File

@ -0,0 +1,91 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "testing"
// Tests initialization of remote controller clients.
func TestInitRemoteControllerClients(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatal("Unable to initialize config", err)
}
defer removeAll(rootPath)
testCases := []struct {
srvCmdConfig serverCmdConfig
totalClients int
}{
// Test - 1 no allocation if server config is not distributed XL.
{
srvCmdConfig: serverCmdConfig{
isDistXL: false,
},
totalClients: 0,
},
// Test - 2 two clients allocated with 4 disks with 2 disks on same node each.
{
srvCmdConfig: serverCmdConfig{
isDistXL: true,
disks: []string{
"10.1.10.1:/mnt/disk1",
"10.1.10.1:/mnt/disk2",
"10.1.10.2:/mnt/disk3",
"10.1.10.2:/mnt/disk4",
},
},
totalClients: 2,
},
// Test - 3 4 clients allocated with 4 disks with 1 disk on each node.
{
srvCmdConfig: serverCmdConfig{
isDistXL: true,
disks: []string{
"10.1.10.1:/mnt/disk1",
"10.1.10.2:/mnt/disk2",
"10.1.10.3:/mnt/disk3",
"10.1.10.4:/mnt/disk4",
},
},
totalClients: 4,
},
// Test - 4 2 clients allocated with 4 disks with 1 disk ignored.
{
srvCmdConfig: serverCmdConfig{
isDistXL: true,
disks: []string{
"10.1.10.1:/mnt/disk1",
"10.1.10.2:/mnt/disk2",
"10.1.10.3:/mnt/disk3",
"10.1.10.4:/mnt/disk4",
},
ignoredDisks: []string{
"10.1.10.1:/mnt/disk1",
},
},
totalClients: 3,
},
}
// Evaluate and validate all test cases.
for i, testCase := range testCases {
rclients := initRemoteControllerClients(testCase.srvCmdConfig)
if len(rclients) != testCase.totalClients {
t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients))
}
}
}

View File

@ -33,6 +33,17 @@ const (
bucketMetaPrefix = "buckets"
)
// Global object layer mutex, used for safely updating object layer.
var globalObjLayerMutex *sync.Mutex
// Global object layer, only accessed by newObjectLayerFn().
var globalObjectAPI ObjectLayer
func init() {
// Initialize this once per server initialization.
globalObjLayerMutex = &sync.Mutex{}
}
// isErrIgnored should we ignore this error?, takes a list of errors which can be ignored.
func isErrIgnored(err error, ignoredErrs []error) bool {
err = errorCause(err)

View File

@ -45,7 +45,8 @@ type StorageInfo struct {
// Following fields are only meaningful if BackendType is XL.
OnlineDisks int // Online disks during server startup.
OfflineDisks int // Offline disks during server startup.
Quorum int // Minimum disks required for successful operations.
ReadQuorum int // Minimum disks required for successful read operations.
WriteQuorum int // Minimum disks required for successful write operations.
}
}

View File

@ -27,18 +27,10 @@ import (
"sort"
"strconv"
"strings"
"sync"
mux "github.com/gorilla/mux"
)
var objLayerMutex *sync.Mutex
var globalObjectAPI ObjectLayer
func init() {
objLayerMutex = &sync.Mutex{}
}
// supportedGetReqParams - supported request parameters for GET presigned request.
var supportedGetReqParams = map[string]string{
"response-expires": "Expires",

View File

@ -25,8 +25,8 @@ import (
)
func newObjectLayerFn() ObjectLayer {
objLayerMutex.Lock()
defer objLayerMutex.Unlock()
globalObjLayerMutex.Lock()
defer globalObjLayerMutex.Unlock()
return globalObjectAPI
}
@ -58,19 +58,6 @@ func newObjectLayer(storageDisks []StorageAPI) (ObjectLayer, error) {
return nil, err
}
if globalShutdownCBs != nil {
// Register the callback that should be called when the process shuts down.
globalShutdownCBs.AddObjectLayerCB(func() errCode {
if objAPI != nil {
if sErr := objAPI.Shutdown(); sErr != nil {
errorIf(err, "Unable to shutdown object API.")
return exitFailure
}
}
return exitSuccess
})
}
// Initialize and load bucket policies.
err = initBucketPolicies(objAPI)
fatalIf(err, "Unable to load all bucket policies.")
@ -85,7 +72,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
mux := router.NewRouter()
// Initialize distributed NS lock.
if isDistributedSetup(srvCmdConfig.disks) {
if srvCmdConfig.isDistXL {
// Register storage rpc router only if its a distributed setup.
registerStorageRPCRouters(mux, srvCmdConfig)

View File

@ -99,6 +99,7 @@ type serverCmdConfig struct {
serverAddr string
disks []string
ignoredDisks []string
isDistXL bool // True only if its distributed XL.
storageDisks []StorageAPI
}
@ -308,10 +309,10 @@ func serverMain(c *cli.Context) {
}
// Server address.
serverAddress := c.String("address")
serverAddr := c.String("address")
// Check if requested port is available.
port := getPort(serverAddress)
port := getPort(serverAddr)
fatalIf(checkPortAvailability(port), "Port unavailable %d", port)
// Saves port in a globally accessible value.
@ -335,69 +336,55 @@ func serverMain(c *cli.Context) {
// First disk argument check if it is local.
firstDisk := isLocalStorage(disks[0])
// Initialize and monitor shutdown signals.
err := initGracefulShutdown(os.Exit)
fatalIf(err, "Unable to initialize graceful shutdown operation")
// Configure server.
srvConfig := serverCmdConfig{
serverAddr: serverAddress,
serverAddr: serverAddr,
disks: disks,
ignoredDisks: ignoredDisks,
storageDisks: storageDisks,
isDistXL: isDistributedSetup(disks),
}
// Configure server.
handler := configureServerHandler(srvConfig)
// Set nodes for dsync for distributed setup.
isDist := isDistributedSetup(disks)
if isDist {
err = initDsyncNodes(disks, port)
fatalIf(err, "Unable to initialize distributed locking")
if srvConfig.isDistXL {
fatalIf(initDsyncNodes(disks, port), "Unable to initialize distributed locking")
}
// Initialize name space lock.
initNSLock(isDist)
initNSLock(srvConfig.isDistXL)
// Initialize a new HTTP server.
apiServer := NewServerMux(serverAddress, handler)
apiServer := NewServerMux(serverAddr, handler)
// Fetch endpoints which we are going to serve from.
endPoints := finalizeEndpoints(tls, &apiServer.Server)
// Register generic callbacks.
globalShutdownCBs.AddGenericCB(func() errCode {
// apiServer.Stop()
return exitSuccess
})
// Start server.
// Configure TLS if certs are available.
wait := make(chan struct{}, 1)
go func(tls bool, wait chan<- struct{}) {
fatalIf(func() error {
defer func() {
wait <- struct{}{}
}()
// Start server, automatically configures TLS if certs are available.
go func(tls bool) {
var lerr error
if tls {
return apiServer.ListenAndServeTLS(mustGetCertFile(), mustGetKeyFile())
} // Fallback to http.
return apiServer.ListenAndServe()
}(), "Failed to start minio server.")
}(tls, wait)
lerr = apiServer.ListenAndServeTLS(mustGetCertFile(), mustGetKeyFile())
} else {
// Fallback to http.
lerr = apiServer.ListenAndServe()
}
fatalIf(lerr, "Failed to start minio server.")
}(tls)
// Wait for formatting of disks.
err = waitForFormatDisks(firstDisk, endPoints[0], storageDisks)
err := waitForFormatDisks(firstDisk, endPoints[0], storageDisks)
fatalIf(err, "formatting storage disks failed")
// Once formatted, initialize object layer.
newObject, err := newObjectLayer(storageDisks)
fatalIf(err, "intializing object layer failed")
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = newObject
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
// Initialize a new event notifier.
err = initEventNotifier(newObjectLayerFn())
@ -407,5 +394,5 @@ func serverMain(c *cli.Context) {
printStartupMessage(endPoints)
// Waits on the server.
<-wait
<-globalServiceDoneCh
}

View File

@ -187,6 +187,8 @@ func NewServerMux(addr string, handler http.Handler) *ServerMux {
MaxHeaderBytes: 1 << 20,
},
WaitGroup: &sync.WaitGroup{},
// Wait for 5 seconds for new incoming connnections, otherwise
// forcibly close them during graceful stop or restart.
GracefulTimeout: 5 * time.Second,
}
@ -200,12 +202,7 @@ func NewServerMux(addr string, handler http.Handler) *ServerMux {
// ListenAndServeTLS - similar to the http.Server version. However, it has the
// ability to redirect http requests to the correct HTTPS url if the client
// mistakenly initiates a http connection over the https port
func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) error {
listener, err := net.Listen("tcp", m.Server.Addr)
if err != nil {
return err
}
func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) (err error) {
config := &tls.Config{} // Always instantiate.
if config.NextProtos == nil {
config.NextProtos = []string{"http/1.1", "h2"}
@ -216,6 +213,13 @@ func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) error {
return err
}
go m.handleServiceSignals()
listener, err := net.Listen("tcp", m.Server.Addr)
if err != nil {
return err
}
listenerMux := &ListenerMux{Listener: listener, config: config}
m.mu.Lock()
@ -240,12 +244,20 @@ func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) error {
// Execute registered handlers
m.Server.Handler.ServeHTTP(w, r)
}
}))
}),
)
if nerr, ok := err.(*net.OpError); ok {
if nerr.Op == "accept" && nerr.Net == "tcp" {
return nil
}
}
return err
}
// ListenAndServe - Same as the http.Server version
func (m *ServerMux) ListenAndServe() error {
go m.handleServiceSignals()
listener, err := net.Listen("tcp", m.Server.Addr)
if err != nil {
return err
@ -257,7 +269,13 @@ func (m *ServerMux) ListenAndServe() error {
m.listener = listenerMux
m.mu.Unlock()
return m.Server.Serve(listenerMux)
err = m.Server.Serve(listenerMux)
if nerr, ok := err.(*net.OpError); ok {
if nerr.Op == "accept" && nerr.Net == "tcp" {
return nil
}
}
return err
}
// Close initiates the graceful shutdown
@ -266,17 +284,18 @@ func (m *ServerMux) Close() error {
if m.closed {
return errors.New("Server has been closed")
}
// Closed completely.
m.closed = true
// Make sure a listener was set
// Close the listener.
if err := m.listener.Close(); err != nil {
return err
}
m.SetKeepAlivesEnabled(false)
for c, st := range m.conns {
// Force close any idle and new connections. Waiting for other connections
// to close on their own (within the timeout period)
for c, st := range m.conns {
if st == http.StateIdle || st == http.StateNew {
c.Close()
}
@ -288,12 +307,15 @@ func (m *ServerMux) Close() error {
c.Close()
}
})
// Wait for graceful timeout of connections.
defer t.Stop()
m.mu.Unlock()
// Block until all connections are closed
m.WaitGroup.Wait()
return nil
}

View File

@ -24,7 +24,6 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"math/big"
@ -158,6 +157,11 @@ func TestListenAndServePlain(t *testing.T) {
errc := make(chan error)
once := &sync.Once{}
// Initialize done channel specifically for each tests.
globalServiceDoneCh = make(chan struct{}, 1)
// Initialize signal channel specifically for each tests.
globalServiceSignalCh = make(chan serviceSignal, 1)
// Create ServerMux and when we receive a request we stop waiting
m := NewServerMux(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "hello")
@ -167,9 +171,6 @@ func TestListenAndServePlain(t *testing.T) {
// ListenAndServe in a goroutine, but we don't know when it's ready
go func() { errc <- m.ListenAndServe() }()
// Make sure we don't block by closing wait after a timeout
tf := time.AfterFunc(time.Millisecond*500, func() { errc <- errors.New("Unable to connect to server") })
wg := &sync.WaitGroup{}
wg.Add(1)
// Keep trying the server until it's accepting connections
@ -184,7 +185,6 @@ func TestListenAndServePlain(t *testing.T) {
}
wg.Done()
tf.Stop() // Cancel the timeout since we made a successful request
}()
wg.Wait()
@ -207,6 +207,9 @@ func TestListenAndServeTLS(t *testing.T) {
errc := make(chan error)
once := &sync.Once{}
// Initialize done channel specifically for each tests.
globalServiceDoneCh = make(chan struct{}, 1)
// Create ServerMux and when we receive a request we stop waiting
m := NewServerMux(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "hello")
@ -232,11 +235,8 @@ func TestListenAndServeTLS(t *testing.T) {
// ListenAndServe in a goroutine, but we don't know when it's ready
go func() { errc <- m.ListenAndServeTLS(certFile, keyFile) }()
// Make sure we don't block by closing wait after a timeout
tf := time.AfterFunc(time.Millisecond*500, func() { errc <- errors.New("Unable to connect to server") })
wg := &sync.WaitGroup{}
wg.Add(1)
// Keep trying the server until it's accepting connections
go func() {
tr := &http.Transport{
@ -255,7 +255,6 @@ func TestListenAndServeTLS(t *testing.T) {
}
wg.Done()
tf.Stop() // Cancel the timeout since we made a successful request
}()
wg.Wait()

View File

@ -46,7 +46,10 @@ func printStartupMessage(endPoints []string) {
printServerCommonMsg(endPoints)
printCLIAccessMsg(endPoints[0])
printObjectAPIMsg()
printStorageInfo()
objAPI := newObjectLayerFn()
if objAPI != nil {
printStorageInfo(objAPI.StorageInfo())
}
}
// Prints common server startup message. Prints credential, region and browser access.
@ -120,13 +123,12 @@ func printObjectAPIMsg() {
}
// Get formatted disk/storage info message.
func getStorageInfoMsg() string {
storageInfo := newObjectLayerFn().StorageInfo()
func getStorageInfoMsg(storageInfo StorageInfo) string {
msg := fmt.Sprintf("%s %s Free", colorBlue("Drive Capacity:"), humanize.IBytes(uint64(storageInfo.Free)))
diskInfo := fmt.Sprintf(" %d Online, %d Offline. We can withstand [%d] more drive failure(s).",
storageInfo.Backend.OnlineDisks,
storageInfo.Backend.OfflineDisks,
storageInfo.Backend.Quorum,
storageInfo.Backend.ReadQuorum,
)
if storageInfo.Backend.Type == XL {
msg += colorBlue("\nStatus:") + fmt.Sprintf(getFormatStr(len(diskInfo), 8), diskInfo)
@ -135,7 +137,7 @@ func getStorageInfoMsg() string {
}
// Prints startup message of storage capacity and erasure information.
func printStorageInfo() {
func printStorageInfo(storageInfo StorageInfo) {
console.Println()
console.Println(getStorageInfoMsg())
console.Println(getStorageInfoMsg(storageInfo))
}

View File

@ -24,11 +24,11 @@ func TestStorageInfoMsg(t *testing.T) {
if err != nil {
t.Fatal("Unable to initialize XL backend", err)
}
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = obj
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
if msg := getStorageInfoMsg(); msg == "" {
if msg := getStorageInfoMsg(obj.StorageInfo()); msg == "" {
t.Fatal("Empty message string is not implemented")
}
}

121
cmd/service.go Normal file
View File

@ -0,0 +1,121 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"os"
"os/exec"
"syscall"
)
// Represents a type of an exit func which will be invoked upon service signal.
type onExitFunc func(err error)
// Represents a type for all the the callback functions invoked upon service signal.
type cleanupOnExitFunc func() error
// Type of service signals currently supported.
type serviceSignal int
const (
serviceStatus = iota // Gets status about the service.
serviceRestart // Restarts the service.
serviceStop // Stops the server.
// Add new service requests here.
)
// Global service signal channel.
var globalServiceSignalCh chan serviceSignal
// Global service done channel.
var globalServiceDoneCh chan struct{}
// Initialize service mutex once.
func init() {
globalServiceDoneCh = make(chan struct{}, 1)
globalServiceSignalCh = make(chan serviceSignal, 1)
}
// restartProcess starts a new process passing it the active fd's. It
// doesn't fork, but starts a new process using the same environment and
// arguments as when it was originally started. This allows for a newly
// deployed binary to be started. It returns the pid of the newly started
// process when successful.
func restartProcess() error {
// Use the original binary location. This works with symlinks such that if
// the file it points to has been changed we will use the updated symlink.
argv0, err := exec.LookPath(os.Args[0])
if err != nil {
return err
}
// Pass on the environment and replace the old count key with the new one.
cmd := exec.Command(argv0, os.Args[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Start()
}
// Handles all serviceSignal and execute service functions.
func (m *ServerMux) handleServiceSignals() error {
// Custom exit function
runExitFn := func(err error) {
// If global profiler is set stop before we exit.
if globalProfiler != nil {
globalProfiler.Stop()
}
// Call user supplied user exit function
fatalIf(err, "Unable to gracefully complete service operation.")
// We are usually done here, close global service done channel.
globalServiceDoneCh <- struct{}{}
}
// Start listening on service signal. Monitor signals.
trapCh := signalTrap(os.Interrupt, syscall.SIGTERM)
for {
select {
case <-trapCh:
// Initiate graceful stop.
globalServiceSignalCh <- serviceStop
case signal := <-globalServiceSignalCh:
switch signal {
case serviceStatus:
/// We don't do anything for this.
case serviceRestart:
if err := m.Close(); err != nil {
errorIf(err, "Unable to close server gracefully")
}
if err := restartProcess(); err != nil {
errorIf(err, "Unable to restart the server.")
}
runExitFn(nil)
case serviceStop:
if err := m.Close(); err != nil {
errorIf(err, "Unable to close server gracefully")
}
objAPI := newObjectLayerFn()
if objAPI == nil {
// Server not initialized yet, exit happily.
runExitFn(nil)
}
runExitFn(objAPI.Shutdown())
}
}
}
}

View File

@ -189,9 +189,9 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
))
testServer.Obj = objLayer
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
return testServer
}
@ -275,9 +275,9 @@ func StartTestControlRPCServer(t TestErrHandler, instanceType string) TestServer
t.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
}
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
// Run TestServer.
testRPCServer.Server = httptest.NewServer(initTestControlRPCEndPoint(serverCmdConfig{
@ -1488,9 +1488,9 @@ func ExecObjectLayerAPINilTest(t TestErrHandler, bucketName, objectName, instanc
// The API handler gets the referece to the object layer via the global object Layer,
// setting it to `nil` in order test for handlers response for uninitialized object layer.
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = nil
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
// call the HTTP handler.
apiRouter.ServeHTTP(rec, req)
@ -1680,9 +1680,9 @@ func registerAPIFunctions(muxRouter *router.Router, objLayer ObjectLayer, apiFun
// All object storage operations are registered as HTTP handlers on `objectAPIHandlers`.
// When the handlers get a HTTP request they use the underlyting ObjectLayer to perform operations.
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
api := objectAPIHandlers{
ObjectAPI: newObjectLayerFn,
@ -1712,9 +1712,9 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand
// Initialize Web RPC Handlers for testing
func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler {
objLayerMutex.Lock()
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
objLayerMutex.Unlock()
globalObjLayerMutex.Unlock()
// Initialize router.
muxRouter := router.NewRouter()

View File

@ -18,14 +18,6 @@ package cmd
import "errors"
// errCode represents the return status of shutdown functions
type errCode int
const (
exitFailure errCode = -1
exitSuccess errCode = 0
)
// errSyslogNotSupported - this message is only meaningful on windows
var errSyslogNotSupported = errors.New("Syslog logger not supported on windows")
@ -42,7 +34,7 @@ var errInvalidToken = errors.New("Invalid token")
var errInvalidTimestamp = errors.New("Timestamps don't match, server may have restarted.")
// If x-amz-content-sha256 header value mismatches with what we calculate.
var errContentSHA256Mismatch = errors.New("sha256 mismatch")
var errContentSHA256Mismatch = errors.New("Content checksum SHA256 mismatch")
// used when we deal with data larger than expected
var errSizeUnexpected = errors.New("data size larger than expected")
var errSizeUnexpected = errors.New("Data size larger than expected")

View File

@ -19,18 +19,14 @@ package cmd
import (
"encoding/base64"
"encoding/xml"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
"encoding/json"
@ -148,95 +144,6 @@ func contains(stringList []string, element string) bool {
return false
}
// Represents a type of an exit func which will be invoked upon shutdown signal.
type onExitFunc func(code int)
// Represents a type for all the the callback functions invoked upon shutdown signal.
type cleanupOnExitFunc func() errCode
// Represents a collection of various callbacks executed upon exit signals.
type shutdownCallbacks struct {
// Protect callbacks list from a concurrent access
*sync.RWMutex
// genericCallbacks - is the list of function callbacks executed one by one
// when a shutdown starts. A callback returns 0 for success and 1 for failure.
// Failure is considered an emergency error that needs an immediate exit
genericCallbacks []cleanupOnExitFunc
// objectLayerCallbacks - contains the list of function callbacks that
// need to be invoked when a shutdown starts. These callbacks will be called before
// the general callback shutdowns
objectLayerCallbacks []cleanupOnExitFunc
}
// globalShutdownCBs stores regular and object storages callbacks
var globalShutdownCBs *shutdownCallbacks
func (s *shutdownCallbacks) RunObjectLayerCBs() errCode {
s.RLock()
defer s.RUnlock()
exitCode := exitSuccess
for _, callback := range s.objectLayerCallbacks {
exitCode = callback()
if exitCode != exitSuccess {
break
}
}
return exitCode
}
func (s *shutdownCallbacks) RunGenericCBs() errCode {
s.RLock()
defer s.RUnlock()
exitCode := exitSuccess
for _, callback := range s.genericCallbacks {
exitCode = callback()
if exitCode != exitSuccess {
break
}
}
return exitCode
}
func (s *shutdownCallbacks) AddObjectLayerCB(callback cleanupOnExitFunc) error {
s.Lock()
defer s.Unlock()
if callback == nil {
return errInvalidArgument
}
s.objectLayerCallbacks = append(s.objectLayerCallbacks, callback)
return nil
}
func (s *shutdownCallbacks) AddGenericCB(callback cleanupOnExitFunc) error {
s.Lock()
defer s.Unlock()
if callback == nil {
return errInvalidArgument
}
s.genericCallbacks = append(s.genericCallbacks, callback)
return nil
}
// Initialize graceful shutdown mechanism.
func initGracefulShutdown(onExitFn onExitFunc) error {
// Validate exit func.
if onExitFn == nil {
return errInvalidArgument
}
globalShutdownCBs = &shutdownCallbacks{
RWMutex: &sync.RWMutex{},
}
// Return start monitor shutdown signal.
return startMonitorShutdownSignal(onExitFn)
}
type shutdownSignal int
const (
shutdownHalt = iota
shutdownRestart
)
// Starts a profiler returns nil if profiler is not enabled, caller needs to handle this.
func startProfiler(profiler string) interface {
Stop()
@ -256,84 +163,11 @@ func startProfiler(profiler string) interface {
}
}
// Global shutdown signal channel.
var globalShutdownSignalCh = make(chan shutdownSignal, 1)
// Global profiler to be used by shutdown go-routine.
// Global profiler to be used by service go-routine.
var globalProfiler interface {
Stop()
}
// Start to monitor shutdownSignal to execute shutdown callbacks
func startMonitorShutdownSignal(onExitFn onExitFunc) error {
// Validate exit func.
if onExitFn == nil {
return errInvalidArgument
}
// Custom exit function
runExitFn := func(exitCode errCode) {
// If global profiler is set stop before we exit.
if globalProfiler != nil {
globalProfiler.Stop()
}
// Call user supplied user exit function
onExitFn(int(exitCode))
}
// Start listening on shutdown signal.
go func() {
defer close(globalShutdownSignalCh)
// Monitor signals.
trapCh := signalTrap(os.Interrupt, syscall.SIGTERM)
for {
select {
case <-trapCh:
// Initiate graceful shutdown.
globalShutdownSignalCh <- shutdownHalt
case signal := <-globalShutdownSignalCh:
// Call all object storage shutdown
// callbacks and exit for emergency
exitCode := globalShutdownCBs.RunObjectLayerCBs()
if exitCode != exitSuccess {
runExitFn(exitCode)
}
exitCode = globalShutdownCBs.RunGenericCBs()
if exitCode != exitSuccess {
runExitFn(exitCode)
}
// All shutdown callbacks ensure that
// the server is safely terminated and
// any concurrent process could be
// started again
if signal == shutdownRestart {
path := os.Args[0]
cmdArgs := os.Args[1:]
cmd := exec.Command(path, cmdArgs...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Start()
if err != nil {
errorIf(errors.New("Unable to reboot."), err.Error())
}
// Successfully forked.
runExitFn(exitSuccess)
}
// Exit as success if no errors.
runExitFn(exitSuccess)
}
}
}()
// Successfully started routine.
return nil
}
// dump the request into a string in JSON format.
func dumpRequest(r *http.Request) string {
header := cloneHeader(r.Header)

View File

@ -221,11 +221,13 @@ func getStorageInfo(disks []StorageAPI) StorageInfo {
storageInfo.Backend.Type = XL
storageInfo.Backend.OnlineDisks = onlineDisks
storageInfo.Backend.OfflineDisks = offlineDisks
storageInfo.Backend.Quorum = len(disks) / 2
return storageInfo
}
// StorageInfo - returns underlying storage statistics.
func (xl xlObjects) StorageInfo() StorageInfo {
return getStorageInfo(xl.storageDisks)
storageInfo := getStorageInfo(xl.storageDisks)
storageInfo.Backend.ReadQuorum = xl.readQuorum
storageInfo.Backend.WriteQuorum = xl.writeQuorum
return storageInfo
}