Refactor Heal RPC and add Shutdown RPC (#2488)

This commit is contained in:
Anis Elleuch 2016-08-21 20:06:53 +01:00 committed by Harshavardhana
parent 975eb31973
commit 07506358ff
8 changed files with 312 additions and 132 deletions

View File

@ -88,7 +88,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
_, err := api.ObjectAPI.GetBucketInfo(bucket) _, err := api.ObjectAPI.GetBucketInfo(bucket)
if err != nil { if err != nil {
errorIf(err, "Unable to bucket info.") errorIf(err, "Unable to find bucket info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return return
} }

120
cmd/control-heal-main.go Normal file
View File

@ -0,0 +1,120 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"net/rpc"
"net/url"
"path"
"strings"
"github.com/minio/cli"
)
var healCmd = cli.Command{
Name: "heal",
Usage: "To heal objects.",
Action: healControl,
CustomHelpTemplate: `NAME:
minio control {{.Name}} - {{.Usage}}
USAGE:
minio control {{.Name}}
EAMPLES:
1. Heal an object.
$ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3
2. Heal all objects in a bucket recursively.
$ minio control {{.Name}} http://localhost:9000/songs
3. Heall all objects with a given prefix recursively.
$ minio control {{.Name}} http://localhost:9000/songs/classical/
`,
}
// "minio control heal" entry point.
func healControl(ctx *cli.Context) {
// Parse bucket and object from url.URL.Path
parseBucketObject := func(path string) (bucketName string, objectName string) {
splits := strings.SplitN(path, string(slashSeparator), 3)
switch len(splits) {
case 0, 1:
bucketName = ""
objectName = ""
case 2:
bucketName = splits[1]
objectName = ""
case 3:
bucketName = splits[1]
objectName = splits[2]
}
return bucketName, objectName
}
if len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
parsedURL, err := url.Parse(ctx.Args()[0])
fatalIf(err, "Unable to parse URL")
bucketName, objectName := parseBucketObject(parsedURL.Path)
if bucketName == "" {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, path.Join(reservedBucket, controlPath))
fatalIf(err, "Unable to connect to %s", parsedURL.Host)
// If object does not have trailing "/" then it's an object, hence heal it.
if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
fmt.Printf("Healing : /%s/%s", bucketName, objectName)
args := &HealObjectArgs{bucketName, objectName}
reply := &HealObjectReply{}
err = client.Call("Control.HealObject", args, reply)
fatalIf(err, "RPC Control.HealObject call failed")
fmt.Println()
return
}
// Recursively list and heal the objects.
prefix := objectName
marker := ""
for {
args := HealListArgs{bucketName, prefix, marker, "", 1000}
reply := &HealListReply{}
err = client.Call("Control.ListObjectsHeal", args, reply)
fatalIf(err, "RPC Heal.ListObjects call failed")
// Heal the objects returned in the ListObjects reply.
for _, obj := range reply.Objects {
fmt.Printf("Healing : /%s/%s", bucketName, obj)
reply := &HealObjectReply{}
err = client.Call("Control.HealObject", HealObjectArgs{bucketName, obj}, reply)
fatalIf(err, "RPC Heal.HealObject call failed")
fmt.Println()
}
if !reply.IsTruncated {
// End of listing.
break
}
marker = reply.NextMarker
}
}

View File

@ -16,15 +16,7 @@
package cmd package cmd
import ( import "github.com/minio/cli"
"fmt"
"strings"
"net/rpc"
"net/url"
"github.com/minio/cli"
)
// "minio control" command. // "minio control" command.
var controlCmd = cli.Command{ var controlCmd = cli.Command{
@ -33,102 +25,28 @@ var controlCmd = cli.Command{
Action: mainControl, Action: mainControl,
Subcommands: []cli.Command{ Subcommands: []cli.Command{
healCmd, healCmd,
shutdownCmd,
}, },
}
func mainControl(c *cli.Context) {
cli.ShowCommandHelp(c, "")
}
var healCmd = cli.Command{
Name: "heal",
Usage: "To heal objects.",
Action: healControl,
CustomHelpTemplate: `NAME: CustomHelpTemplate: `NAME:
minio control {{.Name}} - {{.Usage}} {{.Name}} - {{.Usage}}
USAGE: USAGE:
minio control {{.Name}} {{.Name}} [FLAGS] COMMAND
EAMPLES: FLAGS:
1. Heal an object. {{range .Flags}}{{.}}
$ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3 {{end}}
COMMANDS:
2. Heal all objects in a bucket recursively. {{range .Commands}}{{join .Names ", "}}{{ "\t" }}{{.Usage}}
$ minio control {{.Name}} http://localhost:9000/songs {{end}}
3. Heall all objects with a given prefix recursively.
$ minio control {{.Name}} http://localhost:9000/songs/classical/
`, `,
} }
// "minio control heal" entry point. func mainControl(ctx *cli.Context) {
func healControl(c *cli.Context) { if ctx.Args().First() != "" { // command help.
// Parse bucket and object from url.URL.Path cli.ShowCommandHelp(ctx, ctx.Args().First())
parseBucketObject := func(path string) (bucketName string, objectName string) { } else {
splits := strings.SplitN(path, string(slashSeparator), 3) // command with Subcommands is an App.
switch len(splits) { cli.ShowAppHelp(ctx)
case 0, 1:
bucketName = ""
objectName = ""
case 2:
bucketName = splits[1]
objectName = ""
case 3:
bucketName = splits[1]
objectName = splits[2]
}
return bucketName, objectName
}
if len(c.Args()) != 1 {
cli.ShowCommandHelpAndExit(c, "heal", 1)
}
parsedURL, err := url.ParseRequestURI(c.Args()[0])
fatalIf(err, "Unable to parse URL")
bucketName, objectName := parseBucketObject(parsedURL.Path)
if bucketName == "" {
cli.ShowCommandHelpAndExit(c, "heal", 1)
}
client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, healPath)
fatalIf(err, "Unable to connect to %s", parsedURL.Host)
// If object does not have trailing "/" then it's an object, hence heal it.
if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
fmt.Printf("Healing : /%s/%s", bucketName, objectName)
args := &HealObjectArgs{bucketName, objectName}
reply := &HealObjectReply{}
err = client.Call("Heal.HealObject", args, reply)
fatalIf(err, "RPC Heal.HealObject call failed")
fmt.Println()
return
}
// Recursively list and heal the objects.
prefix := objectName
marker := ""
for {
args := HealListArgs{bucketName, prefix, marker, "", 1000}
reply := &HealListReply{}
err = client.Call("Heal.ListObjects", args, reply)
fatalIf(err, "RPC Heal.ListObjects call failed")
// Heal the objects returned in the ListObjects reply.
for _, obj := range reply.Objects {
fmt.Printf("Healing : /%s/%s", bucketName, obj)
reply := &HealObjectReply{}
err = client.Call("Heal.HealObject", HealObjectArgs{bucketName, obj}, reply)
fatalIf(err, "RPC Heal.HealObject call failed")
fmt.Println()
}
if !reply.IsTruncated {
// End of listing.
break
}
marker = reply.NextMarker
} }
} }

View File

@ -0,0 +1,68 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/rpc"
"net/url"
"path"
"github.com/minio/cli"
)
var shutdownCmd = cli.Command{
Name: "shutdown",
Usage: "Shutdown or restart the server.",
Action: shutdownControl,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "restart",
Usage: "Restart the server.",
},
},
CustomHelpTemplate: `NAME:
minio control {{.Name}} - {{.Usage}}
USAGE:
minio control {{.Name}} http://localhost:9000/
EAMPLES:
1. Shutdown the server:
$ minio control shutdown http://localhost:9000/
2. Reboot the server:
$ minio control shutdown --restart http://localhost:9000/
`,
}
// "minio control shutdown" entry point.
func shutdownControl(c *cli.Context) {
if len(c.Args()) != 1 {
cli.ShowCommandHelpAndExit(c, "shutdown", 1)
}
parsedURL, err := url.ParseRequestURI(c.Args()[0])
fatalIf(err, "Unable to parse URL")
client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, path.Join(reservedBucket, controlPath))
fatalIf(err, "Unable to connect to %s", parsedURL.Host)
args := &ShutdownArgs{Reboot: c.Bool("restart")}
reply := &ShutdownReply{}
err = client.Call("Control.Shutdown", args, reply)
fatalIf(err, "RPC Control.Shutdown call failed")
}

View File

@ -16,30 +16,6 @@
package cmd package cmd
import (
"net/rpc"
router "github.com/gorilla/mux"
)
// Routes paths for "minio control" commands.
const (
controlRPCPath = reservedBucket + "/control"
healPath = controlRPCPath + "/heal"
)
// Register control RPC handlers.
func registerControlRPCRouter(mux *router.Router, objAPI ObjectLayer) {
healRPCServer := rpc.NewServer()
healRPCServer.RegisterName("Heal", &healHandler{objAPI})
mux.Path(healPath).Handler(healRPCServer)
}
// Handler for object healing.
type healHandler struct {
ObjectAPI ObjectLayer
}
// HealListArgs - argument for ListObjects RPC. // HealListArgs - argument for ListObjects RPC.
type HealListArgs struct { type HealListArgs struct {
Bucket string Bucket string
@ -56,9 +32,13 @@ type HealListReply struct {
Objects []string Objects []string
} }
// ListObjects - list objects. // ListObjects - list all objects that needs healing.
func (h healHandler) ListObjects(arg *HealListArgs, reply *HealListReply) error { func (c *controllerAPIHandlers) ListObjectsHeal(arg *HealListArgs, reply *HealListReply) error {
info, err := h.ObjectAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys) objAPI := c.ObjectAPI
if objAPI == nil {
return errInvalidArgument
}
info, err := objAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys)
if err != nil { if err != nil {
return err return err
} }
@ -80,6 +60,29 @@ type HealObjectArgs struct {
type HealObjectReply struct{} type HealObjectReply struct{}
// HealObject - heal the object. // HealObject - heal the object.
func (h healHandler) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error { func (c *controllerAPIHandlers) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error {
return h.ObjectAPI.HealObject(arg.Bucket, arg.Object) objAPI := c.ObjectAPI
if objAPI == nil {
return errInvalidArgument
}
return objAPI.HealObject(arg.Bucket, arg.Object)
}
// ShutdownArgs - argument for Shutdown RPC.
type ShutdownArgs struct {
Reboot bool
}
// ShutdownReply - reply by Shutdown RPC.
type ShutdownReply struct{}
// Shutdown - Shutdown the server.
func (c *controllerAPIHandlers) Shutdown(arg *ShutdownArgs, reply *ShutdownReply) error {
if arg.Reboot {
globalShutdownSignalCh <- shutdownRestart
} else {
globalShutdownSignalCh <- shutdownHalt
}
return nil
} }

42
cmd/controller-router.go Normal file
View File

@ -0,0 +1,42 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/rpc"
router "github.com/gorilla/mux"
)
// Routes paths for "minio control" commands.
const (
controlPath = "/controller"
)
// Register control RPC handlers.
func registerControlRPCRouter(mux *router.Router, ctrlHandlers *controllerAPIHandlers) {
ctrlRPCServer := rpc.NewServer()
ctrlRPCServer.RegisterName("Control", ctrlHandlers)
ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
ctrlRouter.Path(controlPath).Handler(ctrlRPCServer)
}
// Handler for object healing.
type controllerAPIHandlers struct {
ObjectAPI ObjectLayer
}

View File

@ -69,6 +69,11 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
ObjectAPI: objAPI, ObjectAPI: objAPI,
} }
// Initialize Controller.
ctrlHandlers := &controllerAPIHandlers{
ObjectAPI: objAPI,
}
// Initialize and monitor shutdown signals. // Initialize and monitor shutdown signals.
err = initGracefulShutdown(os.Exit) err = initGracefulShutdown(os.Exit)
fatalIf(err, "Unable to initialize graceful shutdown operation") fatalIf(err, "Unable to initialize graceful shutdown operation")
@ -98,7 +103,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
// FIXME: till net/rpc auth is brought in "minio control" can be enabled only though // FIXME: till net/rpc auth is brought in "minio control" can be enabled only though
// this env variable. // this env variable.
if os.Getenv("MINIO_CONTROL") != "" { if os.Getenv("MINIO_CONTROL") != "" {
registerControlRPCRouter(mux, objAPI) registerControlRPCRouter(mux, ctrlHandlers)
} }
// set environmental variable MINIO_BROWSER=off to disable minio web browser. // set environmental variable MINIO_BROWSER=off to disable minio web browser.

View File

@ -19,8 +19,10 @@ package cmd
import ( import (
"encoding/base64" "encoding/base64"
"encoding/xml" "encoding/xml"
"errors"
"io" "io"
"os" "os"
"os/exec"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
@ -145,8 +147,15 @@ func initGracefulShutdown(onExitFn onExitFunc) error {
return startMonitorShutdownSignal(onExitFn) return startMonitorShutdownSignal(onExitFn)
} }
type shutdownSignal int
const (
shutdownHalt = iota
shutdownRestart
)
// Global shutdown signal channel. // Global shutdown signal channel.
var globalShutdownSignalCh = make(chan struct{}, 1) var globalShutdownSignalCh = make(chan shutdownSignal, 1)
// Start to monitor shutdownSignal to execute shutdown callbacks // Start to monitor shutdownSignal to execute shutdown callbacks
func startMonitorShutdownSignal(onExitFn onExitFunc) error { func startMonitorShutdownSignal(onExitFn onExitFunc) error {
@ -162,8 +171,8 @@ func startMonitorShutdownSignal(onExitFn onExitFunc) error {
select { select {
case <-trapCh: case <-trapCh:
// Initiate graceful shutdown. // Initiate graceful shutdown.
globalShutdownSignalCh <- struct{}{} globalShutdownSignalCh <- shutdownHalt
case <-globalShutdownSignalCh: case signal := <-globalShutdownSignalCh:
// Call all object storage shutdown callbacks and exit for emergency // Call all object storage shutdown callbacks and exit for emergency
for _, callback := range globalShutdownCBs.GetObjectLayerCBs() { for _, callback := range globalShutdownCBs.GetObjectLayerCBs() {
exitCode := callback() exitCode := callback()
@ -179,6 +188,21 @@ func startMonitorShutdownSignal(onExitFn onExitFunc) error {
onExitFn(int(exitCode)) onExitFn(int(exitCode))
} }
} }
// All shutdown callbacks ensure that the server is safely terminated
// and any concurrent process could be started again
if signal == shutdownRestart {
path := os.Args[0]
cmdArgs := os.Args[1:]
cmd := exec.Command(path, cmdArgs...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Start()
if err != nil {
errorIf(errors.New("Unable to reboot."), err.Error())
}
onExitFn(int(exitSuccess))
}
onExitFn(int(exitSuccess)) onExitFn(int(exitSuccess))
} }
} }