From 3cfb23750a3bc7f125bfa1f058da371d240b36f9 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 9 Oct 2016 23:03:10 -0700 Subject: [PATCH] control: Implement service command 'stop,restart,status'. (#2883) - stop - stops all the servers. - restart - restart all the servers. - status - prints status of storage info about the cluster. --- cmd/control-main.go | 2 +- cmd/control-mains_test.go | 96 ++++++++++++++++++- cmd/control-service-main.go | 96 +++++++++++++++++++ cmd/control-shutdown-main.go | 79 ---------------- cmd/controller-handlers.go | 79 +++++++++++++--- cmd/controller-router.go | 64 ++++++++++++- cmd/controller-router_test.go | 91 ++++++++++++++++++ cmd/object-common.go | 11 +++ cmd/object-datatypes.go | 3 +- cmd/object-handlers.go | 8 -- cmd/routers.go | 19 +--- cmd/server-main.go | 61 +++++------- cmd/server-mux.go | 46 ++++++--- cmd/server-mux_test.go | 17 ++-- cmd/server-startup-msg.go | 14 +-- cmd/server-startup-msg_test.go | 6 +- cmd/service.go | 121 ++++++++++++++++++++++++ cmd/test-utils_test.go | 20 ++-- cmd/typed-errors.go | 12 +-- cmd/utils.go | 168 +-------------------------------- cmd/xl-v1.go | 6 +- 21 files changed, 635 insertions(+), 384 deletions(-) create mode 100644 cmd/control-service-main.go delete mode 100644 cmd/control-shutdown-main.go create mode 100644 cmd/controller-router_test.go create mode 100644 cmd/service.go diff --git a/cmd/control-main.go b/cmd/control-main.go index 90987ab77..c25d49cbf 100644 --- a/cmd/control-main.go +++ b/cmd/control-main.go @@ -27,7 +27,7 @@ var controlCmd = cli.Command{ Subcommands: []cli.Command{ lockCmd, healCmd, - shutdownCmd, + serviceCmd, }, CustomHelpTemplate: `NAME: {{.Name}} - {{.Usage}} diff --git a/cmd/control-mains_test.go b/cmd/control-mains_test.go index 161f5cf43..d8af71574 100644 --- a/cmd/control-mains_test.go +++ b/cmd/control-mains_test.go @@ -72,12 +72,17 @@ func TestControlLockMain(t *testing.T) { } } -// Test to call shutdownControl() in control-shutdown-main.go -func TestControlShutdownMain(t *testing.T) { +// Test to call serviceControl(stop) in control-service-main.go +func TestControlServiceStopMain(t *testing.T) { // create cli app for testing app := cli.NewApp() app.Commands = []cli.Command{controlCmd} + // Initialize done channel specifically for each tests. + globalServiceDoneCh = make(chan struct{}, 1) + // Initialize signal channel specifically for each tests. + globalServiceSignalCh = make(chan serviceSignal, 1) + // start test server testServer := StartTestServer(t, "XL") @@ -88,12 +93,95 @@ func TestControlShutdownMain(t *testing.T) { url := testServer.Server.URL // create args to call - args := []string{"./minio", "control", "shutdown", url} + args := []string{"./minio", "control", "service", "stop", url} // run app err := app.Run(args) if err != nil { - t.Errorf("Control-Shutdown-Main test failed with - %s", err) + t.Errorf("Control-Service-Stop-Main test failed with - %s", err) + } +} + +// Test to call serviceControl(status) in control-service-main.go +func TestControlServiceStatusMain(t *testing.T) { + // create cli app for testing + app := cli.NewApp() + app.Commands = []cli.Command{controlCmd} + + // Initialize done channel specifically for each tests. + globalServiceDoneCh = make(chan struct{}, 1) + // Initialize signal channel specifically for each tests. + globalServiceSignalCh = make(chan serviceSignal, 1) + + // start test server + testServer := StartTestServer(t, "XL") + + // schedule cleanup at the end + defer testServer.Stop() + + // fetch http server endpoint + url := testServer.Server.URL + + // Create args to call + args := []string{"./minio", "control", "service", "status", url} + + // run app + err := app.Run(args) + if err != nil { + t.Errorf("Control-Service-Status-Main test failed with - %s", err) + } + + // Create args to call + args = []string{"./minio", "control", "service", "stop", url} + + // run app + err = app.Run(args) + if err != nil { + t.Errorf("Control-Service-Stop-Main test failed with - %s", err) + } +} + +// Test to call serviceControl(restart) in control-service-main.go +func TestControlServiceRestartMain(t *testing.T) { + // create cli app for testing + app := cli.NewApp() + app.Commands = []cli.Command{controlCmd} + + // Initialize done channel specifically for each tests. + globalServiceDoneCh = make(chan struct{}, 1) + // Initialize signal channel specifically for each tests. + globalServiceSignalCh = make(chan serviceSignal, 1) + + // start test server + testServer := StartTestServer(t, "XL") + + // schedule cleanup at the end + defer testServer.Stop() + + // fetch http server endpoint + url := testServer.Server.URL + + // Create args to call + args := []string{"./minio", "control", "service", "restart", url} + + // run app + err := app.Run(args) + if err != nil { + t.Errorf("Control-Service-Restart-Main test failed with - %s", err) + } + + // Initialize done channel specifically for each tests. + globalServiceDoneCh = make(chan struct{}, 1) + // Initialize signal channel specifically for each tests. + globalServiceSignalCh = make(chan serviceSignal, 1) + + // Create args to call + args = []string{"./minio", "control", "service", "stop", url} + + // run app + err = app.Run(args) + if err != nil { + t.Errorf("Control-Service-Stop-Main test failed with - %s", err) } } diff --git a/cmd/control-service-main.go b/cmd/control-service-main.go new file mode 100644 index 000000000..b1bf51255 --- /dev/null +++ b/cmd/control-service-main.go @@ -0,0 +1,96 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/url" + "path" + + "github.com/minio/cli" + "github.com/minio/mc/pkg/console" +) + +var serviceCmd = cli.Command{ + Name: "service", + Usage: "Service command line to manage Minio server.", + Action: serviceControl, + Flags: globalFlags, + CustomHelpTemplate: `NAME: + minio control {{.Name}} - {{.Usage}} + +USAGE: + minio control {{.Name}} [status|restart|stop] URL + +FLAGS: + {{range .Flags}}{{.}} + {{end}} +EXAMPLES: + 1. Prints current status information of the cluster. + $ minio control service status http://10.1.10.92:9000/ + + 2. Restarts the url and all the servers in the cluster. + $ minio control service restart http://localhost:9000/ + + 3. Shuts down the url and all the servers in the cluster. + $ minio control service stop http://localhost:9000/ +`, +} + +// "minio control service" entry point. +func serviceControl(c *cli.Context) { + if !c.Args().Present() && len(c.Args()) != 2 { + cli.ShowCommandHelpAndExit(c, "service", 1) + } + + var signal serviceSignal + switch c.Args().Get(0) { + case "status": + signal = serviceStatus + case "restart": + signal = serviceRestart + case "stop": + signal = serviceStop + default: + fatalIf(errInvalidArgument, "Unsupported signalling requested %s", c.Args().Get(0)) + } + + parsedURL, err := url.Parse(c.Args().Get(1)) + fatalIf(err, "Unable to parse URL %s", c.Args().Get(1)) + + authCfg := &authConfig{ + accessKey: serverConfig.GetCredential().AccessKeyID, + secretKey: serverConfig.GetCredential().SecretAccessKey, + secureConn: parsedURL.Scheme == "https", + address: parsedURL.Host, + path: path.Join(reservedBucket, controlPath), + loginMethod: "Controller.LoginHandler", + } + client := newAuthClient(authCfg) + + args := &ServiceArgs{ + Signal: signal, + // This is necessary so that the remotes, + // don't end up sending requests back and forth. + Remote: true, + } + reply := &ServiceReply{} + err = client.Call("Controller.ServiceHandler", args, reply) + fatalIf(err, "Service command %s failed for %s", c.Args().Get(0), parsedURL.Host) + if signal == serviceStatus { + console.Println(getStorageInfoMsg(reply.StorageInfo)) + } +} diff --git a/cmd/control-shutdown-main.go b/cmd/control-shutdown-main.go deleted file mode 100644 index 806be942a..000000000 --- a/cmd/control-shutdown-main.go +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "net/url" - "path" - - "github.com/minio/cli" -) - -var shutdownFlags = []cli.Flag{ - cli.BoolFlag{ - Name: "restart", - Usage: "Restart the server.", - }, -} - -var shutdownCmd = cli.Command{ - Name: "shutdown", - Usage: "Shutdown or restart the server.", - Action: shutdownControl, - Flags: append(shutdownFlags, globalFlags...), - CustomHelpTemplate: `NAME: - minio control {{.Name}} - {{.Usage}} - -USAGE: - minio control {{.Name}} http://localhost:9000/ - -FLAGS: - {{range .Flags}}{{.}} - {{end}} - -EXAMPLES: - 1. Shutdown the server: - $ minio control shutdown http://localhost:9000/ - - 2. Reboot the server: - $ minio control shutdown --restart http://localhost:9000/ -`, -} - -// "minio control shutdown" entry point. -func shutdownControl(c *cli.Context) { - if len(c.Args()) != 1 { - cli.ShowCommandHelpAndExit(c, "shutdown", 1) - } - - parsedURL, err := url.Parse(c.Args()[0]) - fatalIf(err, "Unable to parse URL.") - - authCfg := &authConfig{ - accessKey: serverConfig.GetCredential().AccessKeyID, - secretKey: serverConfig.GetCredential().SecretAccessKey, - secureConn: parsedURL.Scheme == "https", - address: parsedURL.Host, - path: path.Join(reservedBucket, controlPath), - loginMethod: "Controller.LoginHandler", - } - client := newAuthClient(authCfg) - - args := &ShutdownArgs{Restart: c.Bool("restart")} - err = client.Call("Controller.ShutdownHandler", args, &GenericReply{}) - errorIf(err, "Shutting down Minio server at %s failed.", parsedURL.Host) -} diff --git a/cmd/controller-handlers.go b/cmd/controller-handlers.go index 0741046f1..330be1e22 100644 --- a/cmd/controller-handlers.go +++ b/cmd/controller-handlers.go @@ -16,7 +16,11 @@ package cmd -import "errors" +import ( + "errors" + "sync" + "time" +) // errServerNotInitialized - server not initialized. var errServerNotInitialized = errors.New("Server not initialized, please try again.") @@ -130,31 +134,76 @@ func (c *controllerAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply return err } -// ShutdownArgs - argument for Shutdown RPC. -type ShutdownArgs struct { +// ServiceArgs - argument for Service RPC. +type ServiceArgs struct { // Authentication token generated by Login. GenericArgs - // Should the server be restarted, all active connections are - // served before server is restarted. - Restart bool + // Represents the type of operation server is requested + // to perform. Currently supported signals are + // stop, restart and status. + Signal serviceSignal + + // Make remote calls. + Remote bool } -// Shutdown - Shutsdown the server. -func (c *controllerAPIHandlers) ShutdownHandler(args *ShutdownArgs, reply *GenericReply) error { +// ServiceReply - represents service operation success info. +type ServiceReply struct { + StorageInfo StorageInfo +} + +func (c *controllerAPIHandlers) remoteCall(serviceMethod string, args interface { + SetToken(token string) + SetTimestamp(tstamp time.Time) +}, reply interface{}) { + var wg sync.WaitGroup + for index, clnt := range c.RemoteControllers { + wg.Add(1) + go func(index int, client *AuthRPCClient) { + defer wg.Done() + err := client.Call(serviceMethod, args, reply) + errorIf(err, "Unable to initiate %s", serviceMethod) + }(index, clnt) + } + wg.Wait() +} + +// Service - handler for sending service signals across many servers. +func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *ServiceReply) error { if !isRPCTokenValid(args.Token) { return errInvalidToken } - if args.Restart { - globalShutdownSignalCh <- shutdownRestart - } else { - globalShutdownSignalCh <- shutdownHalt + objAPI := c.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + if args.Signal == serviceStatus { + reply.StorageInfo = objAPI.StorageInfo() + return nil + } + switch args.Signal { + case serviceRestart: + if args.Remote { + // Set remote as false for remote calls. + args.Remote = false + // Send remote call to all neighboring peers to restart minio servers. + c.remoteCall("Controller.ServiceHandler", args, reply) + } + globalServiceSignalCh <- serviceRestart + case serviceStop: + if args.Remote { + // Set remote as false for remote calls. + args.Remote = false + // Send remote call to all neighboring peers to stop minio servers. + c.remoteCall("Controller.ServiceHandler", args, reply) + } + globalServiceSignalCh <- serviceStop } return nil } -// LockInfo - RPC control handler for `minio control lock`. -// Returns the info of the locks held in the system. +// LockInfo - RPC control handler for `minio control lock`. Returns the info of the locks held in the system. func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockState) error { // obtain the lock state information. lockInfo, err := generateSystemLockResponse() @@ -162,7 +211,7 @@ func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockStat if err != nil { return err } - // the response containing the lock info. + // The response containing the lock info. *reply = lockInfo return nil } diff --git a/cmd/controller-router.go b/cmd/controller-router.go index b7025cfbd..0afdf883b 100644 --- a/cmd/controller-router.go +++ b/cmd/controller-router.go @@ -17,10 +17,14 @@ package cmd import ( + "fmt" "net/rpc" + "path" + "strings" "time" router "github.com/gorilla/mux" + "github.com/minio/minio-go/pkg/set" ) // Routes paths for "minio control" commands. @@ -28,15 +32,66 @@ const ( controlPath = "/controller" ) +// Initializes remote controller clients for making remote requests. +func initRemoteControllerClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { + if !srvCmdConfig.isDistXL { + return nil + } + var newExports []string + // Initialize auth rpc clients. + exports := srvCmdConfig.disks + ignoredExports := srvCmdConfig.ignoredDisks + remoteHosts := set.NewStringSet() + + // Initialize ignored disks in a new set. + ignoredSet := set.NewStringSet() + if len(ignoredExports) > 0 { + ignoredSet = set.CreateStringSet(ignoredExports...) + } + var authRPCClients []*AuthRPCClient + for _, export := range exports { + if ignoredSet.Contains(export) { + // Ignore initializing ignored export. + continue + } + // Validates if remote disk is local. + if isLocalStorage(export) { + continue + } + newExports = append(newExports, export) + } + for _, export := range newExports { + var host string + if idx := strings.LastIndex(export, ":"); idx != -1 { + host = export[:idx] + } + remoteHosts.Add(fmt.Sprintf("%s:%d", host, globalMinioPort)) + } + for host := range remoteHosts { + authRPCClients = append(authRPCClients, newAuthClient(&authConfig{ + accessKey: serverConfig.GetCredential().AccessKeyID, + secretKey: serverConfig.GetCredential().SecretAccessKey, + secureConn: isSSL(), + address: host, + path: path.Join(reservedBucket, controlPath), + loginMethod: "Controller.LoginHandler", + })) + } + return authRPCClients +} + // Register controller RPC handlers. func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) { - // Initialize Controller. + // Initialize controller. ctrlHandlers := &controllerAPIHandlers{ ObjectAPI: newObjectLayerFn, StorageDisks: srvCmdConfig.storageDisks, timestamp: time.Now().UTC(), } + // Initializes remote controller clients. + ctrlHandlers.RemoteControllers = initRemoteControllerClients(srvCmdConfig) + ctrlRPCServer := rpc.NewServer() ctrlRPCServer.RegisterName("Controller", ctrlHandlers) @@ -46,7 +101,8 @@ func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfi // Handler for object healing. type controllerAPIHandlers struct { - ObjectAPI func() ObjectLayer - StorageDisks []StorageAPI - timestamp time.Time + ObjectAPI func() ObjectLayer + StorageDisks []StorageAPI + RemoteControllers []*AuthRPCClient + timestamp time.Time } diff --git a/cmd/controller-router_test.go b/cmd/controller-router_test.go new file mode 100644 index 000000000..24982d943 --- /dev/null +++ b/cmd/controller-router_test.go @@ -0,0 +1,91 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "testing" + +// Tests initialization of remote controller clients. +func TestInitRemoteControllerClients(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatal("Unable to initialize config", err) + } + defer removeAll(rootPath) + + testCases := []struct { + srvCmdConfig serverCmdConfig + totalClients int + }{ + // Test - 1 no allocation if server config is not distributed XL. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: false, + }, + totalClients: 0, + }, + // Test - 2 two clients allocated with 4 disks with 2 disks on same node each. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "10.1.10.1:/mnt/disk1", + "10.1.10.1:/mnt/disk2", + "10.1.10.2:/mnt/disk3", + "10.1.10.2:/mnt/disk4", + }, + }, + totalClients: 2, + }, + // Test - 3 4 clients allocated with 4 disks with 1 disk on each node. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "10.1.10.1:/mnt/disk1", + "10.1.10.2:/mnt/disk2", + "10.1.10.3:/mnt/disk3", + "10.1.10.4:/mnt/disk4", + }, + }, + totalClients: 4, + }, + // Test - 4 2 clients allocated with 4 disks with 1 disk ignored. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "10.1.10.1:/mnt/disk1", + "10.1.10.2:/mnt/disk2", + "10.1.10.3:/mnt/disk3", + "10.1.10.4:/mnt/disk4", + }, + ignoredDisks: []string{ + "10.1.10.1:/mnt/disk1", + }, + }, + totalClients: 3, + }, + } + + // Evaluate and validate all test cases. + for i, testCase := range testCases { + rclients := initRemoteControllerClients(testCase.srvCmdConfig) + if len(rclients) != testCase.totalClients { + t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients)) + } + } +} diff --git a/cmd/object-common.go b/cmd/object-common.go index b109a5e4a..2c076d668 100644 --- a/cmd/object-common.go +++ b/cmd/object-common.go @@ -33,6 +33,17 @@ const ( bucketMetaPrefix = "buckets" ) +// Global object layer mutex, used for safely updating object layer. +var globalObjLayerMutex *sync.Mutex + +// Global object layer, only accessed by newObjectLayerFn(). +var globalObjectAPI ObjectLayer + +func init() { + // Initialize this once per server initialization. + globalObjLayerMutex = &sync.Mutex{} +} + // isErrIgnored should we ignore this error?, takes a list of errors which can be ignored. func isErrIgnored(err error, ignoredErrs []error) bool { err = errorCause(err) diff --git a/cmd/object-datatypes.go b/cmd/object-datatypes.go index d93b2af3f..93b512e81 100644 --- a/cmd/object-datatypes.go +++ b/cmd/object-datatypes.go @@ -45,7 +45,8 @@ type StorageInfo struct { // Following fields are only meaningful if BackendType is XL. OnlineDisks int // Online disks during server startup. OfflineDisks int // Offline disks during server startup. - Quorum int // Minimum disks required for successful operations. + ReadQuorum int // Minimum disks required for successful read operations. + WriteQuorum int // Minimum disks required for successful write operations. } } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d4187b08f..008bb199c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -27,18 +27,10 @@ import ( "sort" "strconv" "strings" - "sync" mux "github.com/gorilla/mux" ) -var objLayerMutex *sync.Mutex -var globalObjectAPI ObjectLayer - -func init() { - objLayerMutex = &sync.Mutex{} -} - // supportedGetReqParams - supported request parameters for GET presigned request. var supportedGetReqParams = map[string]string{ "response-expires": "Expires", diff --git a/cmd/routers.go b/cmd/routers.go index ec4d80903..ebf7bda4e 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -25,8 +25,8 @@ import ( ) func newObjectLayerFn() ObjectLayer { - objLayerMutex.Lock() - defer objLayerMutex.Unlock() + globalObjLayerMutex.Lock() + defer globalObjLayerMutex.Unlock() return globalObjectAPI } @@ -58,19 +58,6 @@ func newObjectLayer(storageDisks []StorageAPI) (ObjectLayer, error) { return nil, err } - if globalShutdownCBs != nil { - // Register the callback that should be called when the process shuts down. - globalShutdownCBs.AddObjectLayerCB(func() errCode { - if objAPI != nil { - if sErr := objAPI.Shutdown(); sErr != nil { - errorIf(err, "Unable to shutdown object API.") - return exitFailure - } - } - return exitSuccess - }) - } - // Initialize and load bucket policies. err = initBucketPolicies(objAPI) fatalIf(err, "Unable to load all bucket policies.") @@ -85,7 +72,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { mux := router.NewRouter() // Initialize distributed NS lock. - if isDistributedSetup(srvCmdConfig.disks) { + if srvCmdConfig.isDistXL { // Register storage rpc router only if its a distributed setup. registerStorageRPCRouters(mux, srvCmdConfig) diff --git a/cmd/server-main.go b/cmd/server-main.go index d203ed061..e7de50e3b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -99,6 +99,7 @@ type serverCmdConfig struct { serverAddr string disks []string ignoredDisks []string + isDistXL bool // True only if its distributed XL. storageDisks []StorageAPI } @@ -308,10 +309,10 @@ func serverMain(c *cli.Context) { } // Server address. - serverAddress := c.String("address") + serverAddr := c.String("address") // Check if requested port is available. - port := getPort(serverAddress) + port := getPort(serverAddr) fatalIf(checkPortAvailability(port), "Port unavailable %d", port) // Saves port in a globally accessible value. @@ -335,69 +336,55 @@ func serverMain(c *cli.Context) { // First disk argument check if it is local. firstDisk := isLocalStorage(disks[0]) - // Initialize and monitor shutdown signals. - err := initGracefulShutdown(os.Exit) - fatalIf(err, "Unable to initialize graceful shutdown operation") - // Configure server. srvConfig := serverCmdConfig{ - serverAddr: serverAddress, + serverAddr: serverAddr, disks: disks, ignoredDisks: ignoredDisks, storageDisks: storageDisks, + isDistXL: isDistributedSetup(disks), } // Configure server. handler := configureServerHandler(srvConfig) // Set nodes for dsync for distributed setup. - isDist := isDistributedSetup(disks) - if isDist { - err = initDsyncNodes(disks, port) - fatalIf(err, "Unable to initialize distributed locking") + if srvConfig.isDistXL { + fatalIf(initDsyncNodes(disks, port), "Unable to initialize distributed locking") } // Initialize name space lock. - initNSLock(isDist) + initNSLock(srvConfig.isDistXL) // Initialize a new HTTP server. - apiServer := NewServerMux(serverAddress, handler) + apiServer := NewServerMux(serverAddr, handler) // Fetch endpoints which we are going to serve from. endPoints := finalizeEndpoints(tls, &apiServer.Server) - // Register generic callbacks. - globalShutdownCBs.AddGenericCB(func() errCode { - // apiServer.Stop() - return exitSuccess - }) - - // Start server. - // Configure TLS if certs are available. - wait := make(chan struct{}, 1) - go func(tls bool, wait chan<- struct{}) { - fatalIf(func() error { - defer func() { - wait <- struct{}{} - }() - if tls { - return apiServer.ListenAndServeTLS(mustGetCertFile(), mustGetKeyFile()) - } // Fallback to http. - return apiServer.ListenAndServe() - }(), "Failed to start minio server.") - }(tls, wait) + // Start server, automatically configures TLS if certs are available. + go func(tls bool) { + var lerr error + if tls { + lerr = apiServer.ListenAndServeTLS(mustGetCertFile(), mustGetKeyFile()) + } else { + // Fallback to http. + lerr = apiServer.ListenAndServe() + } + fatalIf(lerr, "Failed to start minio server.") + }(tls) // Wait for formatting of disks. - err = waitForFormatDisks(firstDisk, endPoints[0], storageDisks) + err := waitForFormatDisks(firstDisk, endPoints[0], storageDisks) fatalIf(err, "formatting storage disks failed") // Once formatted, initialize object layer. newObject, err := newObjectLayer(storageDisks) fatalIf(err, "intializing object layer failed") - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = newObject - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() // Initialize a new event notifier. err = initEventNotifier(newObjectLayerFn()) @@ -407,5 +394,5 @@ func serverMain(c *cli.Context) { printStartupMessage(endPoints) // Waits on the server. - <-wait + <-globalServiceDoneCh } diff --git a/cmd/server-mux.go b/cmd/server-mux.go index e4a4c91a7..9409b95de 100644 --- a/cmd/server-mux.go +++ b/cmd/server-mux.go @@ -186,7 +186,9 @@ func NewServerMux(addr string, handler http.Handler) *ServerMux { Handler: handler, MaxHeaderBytes: 1 << 20, }, - WaitGroup: &sync.WaitGroup{}, + WaitGroup: &sync.WaitGroup{}, + // Wait for 5 seconds for new incoming connnections, otherwise + // forcibly close them during graceful stop or restart. GracefulTimeout: 5 * time.Second, } @@ -200,12 +202,7 @@ func NewServerMux(addr string, handler http.Handler) *ServerMux { // ListenAndServeTLS - similar to the http.Server version. However, it has the // ability to redirect http requests to the correct HTTPS url if the client // mistakenly initiates a http connection over the https port -func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) error { - listener, err := net.Listen("tcp", m.Server.Addr) - if err != nil { - return err - } - +func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) (err error) { config := &tls.Config{} // Always instantiate. if config.NextProtos == nil { config.NextProtos = []string{"http/1.1", "h2"} @@ -216,6 +213,13 @@ func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) error { return err } + go m.handleServiceSignals() + + listener, err := net.Listen("tcp", m.Server.Addr) + if err != nil { + return err + } + listenerMux := &ListenerMux{Listener: listener, config: config} m.mu.Lock() @@ -240,12 +244,20 @@ func (m *ServerMux) ListenAndServeTLS(certFile, keyFile string) error { // Execute registered handlers m.Server.Handler.ServeHTTP(w, r) } - })) + }), + ) + if nerr, ok := err.(*net.OpError); ok { + if nerr.Op == "accept" && nerr.Net == "tcp" { + return nil + } + } return err } // ListenAndServe - Same as the http.Server version func (m *ServerMux) ListenAndServe() error { + go m.handleServiceSignals() + listener, err := net.Listen("tcp", m.Server.Addr) if err != nil { return err @@ -257,7 +269,13 @@ func (m *ServerMux) ListenAndServe() error { m.listener = listenerMux m.mu.Unlock() - return m.Server.Serve(listenerMux) + err = m.Server.Serve(listenerMux) + if nerr, ok := err.(*net.OpError); ok { + if nerr.Op == "accept" && nerr.Net == "tcp" { + return nil + } + } + return err } // Close initiates the graceful shutdown @@ -266,17 +284,18 @@ func (m *ServerMux) Close() error { if m.closed { return errors.New("Server has been closed") } + // Closed completely. m.closed = true - // Make sure a listener was set + // Close the listener. if err := m.listener.Close(); err != nil { return err } m.SetKeepAlivesEnabled(false) + // Force close any idle and new connections. Waiting for other connections + // to close on their own (within the timeout period) for c, st := range m.conns { - // Force close any idle and new connections. Waiting for other connections - // to close on their own (within the timeout period) if st == http.StateIdle || st == http.StateNew { c.Close() } @@ -288,12 +307,15 @@ func (m *ServerMux) Close() error { c.Close() } }) + + // Wait for graceful timeout of connections. defer t.Stop() m.mu.Unlock() // Block until all connections are closed m.WaitGroup.Wait() + return nil } diff --git a/cmd/server-mux_test.go b/cmd/server-mux_test.go index 072068ffa..d209ce587 100644 --- a/cmd/server-mux_test.go +++ b/cmd/server-mux_test.go @@ -24,7 +24,6 @@ import ( "crypto/x509" "crypto/x509/pkix" "encoding/pem" - "errors" "fmt" "io/ioutil" "math/big" @@ -158,6 +157,11 @@ func TestListenAndServePlain(t *testing.T) { errc := make(chan error) once := &sync.Once{} + // Initialize done channel specifically for each tests. + globalServiceDoneCh = make(chan struct{}, 1) + // Initialize signal channel specifically for each tests. + globalServiceSignalCh = make(chan serviceSignal, 1) + // Create ServerMux and when we receive a request we stop waiting m := NewServerMux(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "hello") @@ -167,9 +171,6 @@ func TestListenAndServePlain(t *testing.T) { // ListenAndServe in a goroutine, but we don't know when it's ready go func() { errc <- m.ListenAndServe() }() - // Make sure we don't block by closing wait after a timeout - tf := time.AfterFunc(time.Millisecond*500, func() { errc <- errors.New("Unable to connect to server") }) - wg := &sync.WaitGroup{} wg.Add(1) // Keep trying the server until it's accepting connections @@ -184,7 +185,6 @@ func TestListenAndServePlain(t *testing.T) { } wg.Done() - tf.Stop() // Cancel the timeout since we made a successful request }() wg.Wait() @@ -207,6 +207,9 @@ func TestListenAndServeTLS(t *testing.T) { errc := make(chan error) once := &sync.Once{} + // Initialize done channel specifically for each tests. + globalServiceDoneCh = make(chan struct{}, 1) + // Create ServerMux and when we receive a request we stop waiting m := NewServerMux(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "hello") @@ -232,11 +235,8 @@ func TestListenAndServeTLS(t *testing.T) { // ListenAndServe in a goroutine, but we don't know when it's ready go func() { errc <- m.ListenAndServeTLS(certFile, keyFile) }() - // Make sure we don't block by closing wait after a timeout - tf := time.AfterFunc(time.Millisecond*500, func() { errc <- errors.New("Unable to connect to server") }) wg := &sync.WaitGroup{} wg.Add(1) - // Keep trying the server until it's accepting connections go func() { tr := &http.Transport{ @@ -255,7 +255,6 @@ func TestListenAndServeTLS(t *testing.T) { } wg.Done() - tf.Stop() // Cancel the timeout since we made a successful request }() wg.Wait() diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index 5ca03c504..da9250891 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -46,7 +46,10 @@ func printStartupMessage(endPoints []string) { printServerCommonMsg(endPoints) printCLIAccessMsg(endPoints[0]) printObjectAPIMsg() - printStorageInfo() + objAPI := newObjectLayerFn() + if objAPI != nil { + printStorageInfo(objAPI.StorageInfo()) + } } // Prints common server startup message. Prints credential, region and browser access. @@ -120,13 +123,12 @@ func printObjectAPIMsg() { } // Get formatted disk/storage info message. -func getStorageInfoMsg() string { - storageInfo := newObjectLayerFn().StorageInfo() +func getStorageInfoMsg(storageInfo StorageInfo) string { msg := fmt.Sprintf("%s %s Free", colorBlue("Drive Capacity:"), humanize.IBytes(uint64(storageInfo.Free))) diskInfo := fmt.Sprintf(" %d Online, %d Offline. We can withstand [%d] more drive failure(s).", storageInfo.Backend.OnlineDisks, storageInfo.Backend.OfflineDisks, - storageInfo.Backend.Quorum, + storageInfo.Backend.ReadQuorum, ) if storageInfo.Backend.Type == XL { msg += colorBlue("\nStatus:") + fmt.Sprintf(getFormatStr(len(diskInfo), 8), diskInfo) @@ -135,7 +137,7 @@ func getStorageInfoMsg() string { } // Prints startup message of storage capacity and erasure information. -func printStorageInfo() { +func printStorageInfo(storageInfo StorageInfo) { console.Println() - console.Println(getStorageInfoMsg()) + console.Println(getStorageInfoMsg(storageInfo)) } diff --git a/cmd/server-startup-msg_test.go b/cmd/server-startup-msg_test.go index da15eaf11..37b6b17e6 100644 --- a/cmd/server-startup-msg_test.go +++ b/cmd/server-startup-msg_test.go @@ -24,11 +24,11 @@ func TestStorageInfoMsg(t *testing.T) { if err != nil { t.Fatal("Unable to initialize XL backend", err) } - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = obj - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() - if msg := getStorageInfoMsg(); msg == "" { + if msg := getStorageInfoMsg(obj.StorageInfo()); msg == "" { t.Fatal("Empty message string is not implemented") } } diff --git a/cmd/service.go b/cmd/service.go new file mode 100644 index 000000000..ceb2b23d0 --- /dev/null +++ b/cmd/service.go @@ -0,0 +1,121 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "os" + "os/exec" + "syscall" +) + +// Represents a type of an exit func which will be invoked upon service signal. +type onExitFunc func(err error) + +// Represents a type for all the the callback functions invoked upon service signal. +type cleanupOnExitFunc func() error + +// Type of service signals currently supported. +type serviceSignal int + +const ( + serviceStatus = iota // Gets status about the service. + serviceRestart // Restarts the service. + serviceStop // Stops the server. + // Add new service requests here. +) + +// Global service signal channel. +var globalServiceSignalCh chan serviceSignal + +// Global service done channel. +var globalServiceDoneCh chan struct{} + +// Initialize service mutex once. +func init() { + globalServiceDoneCh = make(chan struct{}, 1) + globalServiceSignalCh = make(chan serviceSignal, 1) +} + +// restartProcess starts a new process passing it the active fd's. It +// doesn't fork, but starts a new process using the same environment and +// arguments as when it was originally started. This allows for a newly +// deployed binary to be started. It returns the pid of the newly started +// process when successful. +func restartProcess() error { + // Use the original binary location. This works with symlinks such that if + // the file it points to has been changed we will use the updated symlink. + argv0, err := exec.LookPath(os.Args[0]) + if err != nil { + return err + } + + // Pass on the environment and replace the old count key with the new one. + cmd := exec.Command(argv0, os.Args[1:]...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Start() +} + +// Handles all serviceSignal and execute service functions. +func (m *ServerMux) handleServiceSignals() error { + // Custom exit function + runExitFn := func(err error) { + // If global profiler is set stop before we exit. + if globalProfiler != nil { + globalProfiler.Stop() + } + + // Call user supplied user exit function + fatalIf(err, "Unable to gracefully complete service operation.") + + // We are usually done here, close global service done channel. + globalServiceDoneCh <- struct{}{} + } + + // Start listening on service signal. Monitor signals. + trapCh := signalTrap(os.Interrupt, syscall.SIGTERM) + for { + select { + case <-trapCh: + // Initiate graceful stop. + globalServiceSignalCh <- serviceStop + case signal := <-globalServiceSignalCh: + switch signal { + case serviceStatus: + /// We don't do anything for this. + case serviceRestart: + if err := m.Close(); err != nil { + errorIf(err, "Unable to close server gracefully") + } + if err := restartProcess(); err != nil { + errorIf(err, "Unable to restart the server.") + } + runExitFn(nil) + case serviceStop: + if err := m.Close(); err != nil { + errorIf(err, "Unable to close server gracefully") + } + objAPI := newObjectLayerFn() + if objAPI == nil { + // Server not initialized yet, exit happily. + runExitFn(nil) + } + runExitFn(objAPI.Shutdown()) + } + } + } +} diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index dee6e77f5..0dc11467e 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -189,9 +189,9 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer { )) testServer.Obj = objLayer - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = objLayer - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() return testServer } @@ -275,9 +275,9 @@ func StartTestControlRPCServer(t TestErrHandler, instanceType string) TestServer t.Fatalf("Failed obtaining Temp Backend: %s", err) } - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = objLayer - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() // Run TestServer. testRPCServer.Server = httptest.NewServer(initTestControlRPCEndPoint(serverCmdConfig{ @@ -1488,9 +1488,9 @@ func ExecObjectLayerAPINilTest(t TestErrHandler, bucketName, objectName, instanc // The API handler gets the referece to the object layer via the global object Layer, // setting it to `nil` in order test for handlers response for uninitialized object layer. - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = nil - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() // call the HTTP handler. apiRouter.ServeHTTP(rec, req) @@ -1680,9 +1680,9 @@ func registerAPIFunctions(muxRouter *router.Router, objLayer ObjectLayer, apiFun // All object storage operations are registered as HTTP handlers on `objectAPIHandlers`. // When the handlers get a HTTP request they use the underlyting ObjectLayer to perform operations. - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = objLayer - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() api := objectAPIHandlers{ ObjectAPI: newObjectLayerFn, @@ -1712,9 +1712,9 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand // Initialize Web RPC Handlers for testing func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler { - objLayerMutex.Lock() + globalObjLayerMutex.Lock() globalObjectAPI = objLayer - objLayerMutex.Unlock() + globalObjLayerMutex.Unlock() // Initialize router. muxRouter := router.NewRouter() diff --git a/cmd/typed-errors.go b/cmd/typed-errors.go index 6d6225376..df85cc7da 100644 --- a/cmd/typed-errors.go +++ b/cmd/typed-errors.go @@ -18,14 +18,6 @@ package cmd import "errors" -// errCode represents the return status of shutdown functions -type errCode int - -const ( - exitFailure errCode = -1 - exitSuccess errCode = 0 -) - // errSyslogNotSupported - this message is only meaningful on windows var errSyslogNotSupported = errors.New("Syslog logger not supported on windows") @@ -42,7 +34,7 @@ var errInvalidToken = errors.New("Invalid token") var errInvalidTimestamp = errors.New("Timestamps don't match, server may have restarted.") // If x-amz-content-sha256 header value mismatches with what we calculate. -var errContentSHA256Mismatch = errors.New("sha256 mismatch") +var errContentSHA256Mismatch = errors.New("Content checksum SHA256 mismatch") // used when we deal with data larger than expected -var errSizeUnexpected = errors.New("data size larger than expected") +var errSizeUnexpected = errors.New("Data size larger than expected") diff --git a/cmd/utils.go b/cmd/utils.go index 2c289421d..cae58fbaf 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -19,18 +19,14 @@ package cmd import ( "encoding/base64" "encoding/xml" - "errors" "fmt" "io" "net" "net/http" "os" - "os/exec" "path/filepath" "runtime" "strings" - "sync" - "syscall" "encoding/json" @@ -148,95 +144,6 @@ func contains(stringList []string, element string) bool { return false } -// Represents a type of an exit func which will be invoked upon shutdown signal. -type onExitFunc func(code int) - -// Represents a type for all the the callback functions invoked upon shutdown signal. -type cleanupOnExitFunc func() errCode - -// Represents a collection of various callbacks executed upon exit signals. -type shutdownCallbacks struct { - // Protect callbacks list from a concurrent access - *sync.RWMutex - // genericCallbacks - is the list of function callbacks executed one by one - // when a shutdown starts. A callback returns 0 for success and 1 for failure. - // Failure is considered an emergency error that needs an immediate exit - genericCallbacks []cleanupOnExitFunc - // objectLayerCallbacks - contains the list of function callbacks that - // need to be invoked when a shutdown starts. These callbacks will be called before - // the general callback shutdowns - objectLayerCallbacks []cleanupOnExitFunc -} - -// globalShutdownCBs stores regular and object storages callbacks -var globalShutdownCBs *shutdownCallbacks - -func (s *shutdownCallbacks) RunObjectLayerCBs() errCode { - s.RLock() - defer s.RUnlock() - exitCode := exitSuccess - for _, callback := range s.objectLayerCallbacks { - exitCode = callback() - if exitCode != exitSuccess { - break - } - } - return exitCode -} - -func (s *shutdownCallbacks) RunGenericCBs() errCode { - s.RLock() - defer s.RUnlock() - exitCode := exitSuccess - for _, callback := range s.genericCallbacks { - exitCode = callback() - if exitCode != exitSuccess { - break - } - } - return exitCode -} - -func (s *shutdownCallbacks) AddObjectLayerCB(callback cleanupOnExitFunc) error { - s.Lock() - defer s.Unlock() - if callback == nil { - return errInvalidArgument - } - s.objectLayerCallbacks = append(s.objectLayerCallbacks, callback) - return nil -} - -func (s *shutdownCallbacks) AddGenericCB(callback cleanupOnExitFunc) error { - s.Lock() - defer s.Unlock() - if callback == nil { - return errInvalidArgument - } - s.genericCallbacks = append(s.genericCallbacks, callback) - return nil -} - -// Initialize graceful shutdown mechanism. -func initGracefulShutdown(onExitFn onExitFunc) error { - // Validate exit func. - if onExitFn == nil { - return errInvalidArgument - } - globalShutdownCBs = &shutdownCallbacks{ - RWMutex: &sync.RWMutex{}, - } - // Return start monitor shutdown signal. - return startMonitorShutdownSignal(onExitFn) -} - -type shutdownSignal int - -const ( - shutdownHalt = iota - shutdownRestart -) - // Starts a profiler returns nil if profiler is not enabled, caller needs to handle this. func startProfiler(profiler string) interface { Stop() @@ -256,84 +163,11 @@ func startProfiler(profiler string) interface { } } -// Global shutdown signal channel. -var globalShutdownSignalCh = make(chan shutdownSignal, 1) - -// Global profiler to be used by shutdown go-routine. +// Global profiler to be used by service go-routine. var globalProfiler interface { Stop() } -// Start to monitor shutdownSignal to execute shutdown callbacks -func startMonitorShutdownSignal(onExitFn onExitFunc) error { - // Validate exit func. - if onExitFn == nil { - return errInvalidArgument - } - - // Custom exit function - runExitFn := func(exitCode errCode) { - // If global profiler is set stop before we exit. - if globalProfiler != nil { - globalProfiler.Stop() - } - // Call user supplied user exit function - onExitFn(int(exitCode)) - } - - // Start listening on shutdown signal. - go func() { - defer close(globalShutdownSignalCh) - - // Monitor signals. - trapCh := signalTrap(os.Interrupt, syscall.SIGTERM) - for { - select { - case <-trapCh: - // Initiate graceful shutdown. - globalShutdownSignalCh <- shutdownHalt - case signal := <-globalShutdownSignalCh: - // Call all object storage shutdown - // callbacks and exit for emergency - exitCode := globalShutdownCBs.RunObjectLayerCBs() - if exitCode != exitSuccess { - runExitFn(exitCode) - } - - exitCode = globalShutdownCBs.RunGenericCBs() - if exitCode != exitSuccess { - runExitFn(exitCode) - } - - // All shutdown callbacks ensure that - // the server is safely terminated and - // any concurrent process could be - // started again - if signal == shutdownRestart { - path := os.Args[0] - cmdArgs := os.Args[1:] - cmd := exec.Command(path, cmdArgs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err := cmd.Start() - if err != nil { - errorIf(errors.New("Unable to reboot."), err.Error()) - } - - // Successfully forked. - runExitFn(exitSuccess) - } - - // Exit as success if no errors. - runExitFn(exitSuccess) - } - } - }() - // Successfully started routine. - return nil -} - // dump the request into a string in JSON format. func dumpRequest(r *http.Request) string { header := cloneHeader(r.Header) diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index f64927b7e..34dc85fea 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -221,11 +221,13 @@ func getStorageInfo(disks []StorageAPI) StorageInfo { storageInfo.Backend.Type = XL storageInfo.Backend.OnlineDisks = onlineDisks storageInfo.Backend.OfflineDisks = offlineDisks - storageInfo.Backend.Quorum = len(disks) / 2 return storageInfo } // StorageInfo - returns underlying storage statistics. func (xl xlObjects) StorageInfo() StorageInfo { - return getStorageInfo(xl.storageDisks) + storageInfo := getStorageInfo(xl.storageDisks) + storageInfo.Backend.ReadQuorum = xl.readQuorum + storageInfo.Backend.WriteQuorum = xl.writeQuorum + return storageInfo }