mirror of https://github.com/minio/minio.git
Merge pull request #810 from harshavardhana/restructure
Restructure server code, controller now runs in silo
This commit is contained in:
commit
dcf0c71ca3
|
@ -23,52 +23,27 @@ import (
|
|||
|
||||
var controllerCmd = cli.Command{
|
||||
Name: "controller",
|
||||
Usage: "Get|Set server configuration",
|
||||
Usage: "Start minio controller",
|
||||
Action: controllerMain,
|
||||
CustomHelpTemplate: `NAME:
|
||||
minio {{.Name}} - {{.Description}}
|
||||
|
||||
USAGE:
|
||||
minio {{.Name}} [get|set] [INFOTYPE] [SERVERURL]
|
||||
minio {{.Name}}
|
||||
|
||||
EXAMPLES:
|
||||
1. Get disks from controller
|
||||
$ minio {{.Name}} get disks http://localhost:9001/rpc
|
||||
|
||||
2. Get memstats from controller
|
||||
$ minio {{.Name}} get mem http://localhost:9001/rpc
|
||||
1. Start minio controller
|
||||
$ minio {{.Name}}
|
||||
|
||||
`,
|
||||
}
|
||||
|
||||
func controllerMain(c *cli.Context) {
|
||||
if len(c.Args()) < 2 || c.Args().First() == "help" {
|
||||
cli.ShowCommandHelpAndExit(c, "controller", 1) // last argument is exit code
|
||||
if c.Args().Present() {
|
||||
cli.ShowCommandHelpAndExit(c, "controller", 1)
|
||||
}
|
||||
if c.Args().First() == "get" {
|
||||
newArgs := c.Args().Tail()
|
||||
switch newArgs.First() {
|
||||
case "mem":
|
||||
memstats, err := controller.GetMemStats(newArgs.Tail().First())
|
||||
if err != nil {
|
||||
Fatalln(err)
|
||||
}
|
||||
Println(string(memstats))
|
||||
case "sysinfo":
|
||||
sysinfo, err := controller.GetSysInfo(newArgs.Tail().First())
|
||||
if err != nil {
|
||||
Fatalln(err)
|
||||
}
|
||||
Println(string(sysinfo))
|
||||
case "auth":
|
||||
keys, err := controller.GetAuthKeys(newArgs.Tail().First())
|
||||
if err != nil {
|
||||
Fatalln(err)
|
||||
}
|
||||
Println(string(keys))
|
||||
}
|
||||
}
|
||||
if c.Args().First() == "set" {
|
||||
Fatalln("Not supported yet")
|
||||
err := controller.StartController()
|
||||
if err != nil {
|
||||
Fatalln(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,124 +0,0 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
jsonrpc "github.com/gorilla/rpc/v2/json"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/minio/minio/pkg/probe"
|
||||
"github.com/minio/minio/pkg/server/rpc"
|
||||
)
|
||||
|
||||
func closeResp(resp *http.Response) {
|
||||
if resp != nil && resp.Body != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// GetMemStats get memory status of the server at given url
|
||||
func GetMemStats(url string) ([]byte, *probe.Error) {
|
||||
op := RPCOps{
|
||||
Method: "MemStats.Get",
|
||||
Request: rpc.Args{Request: ""},
|
||||
}
|
||||
req, perr := NewRequest(url, op, http.DefaultTransport)
|
||||
if perr != nil {
|
||||
return nil, perr.Trace()
|
||||
}
|
||||
resp, perr := req.Do()
|
||||
defer closeResp(resp)
|
||||
if perr != nil {
|
||||
return nil, perr.Trace()
|
||||
}
|
||||
var reply rpc.MemStatsReply
|
||||
if err := jsonrpc.DecodeClientResponse(resp.Body, &reply); err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
jsonRespBytes, err := json.MarshalIndent(reply, "", "\t")
|
||||
if err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
return jsonRespBytes, nil
|
||||
}
|
||||
|
||||
// GetSysInfo get system status of the server at given url
|
||||
func GetSysInfo(url string) ([]byte, *probe.Error) {
|
||||
op := RPCOps{
|
||||
Method: "SysInfo.Get",
|
||||
Request: rpc.Args{Request: ""},
|
||||
}
|
||||
req, perr := NewRequest(url, op, http.DefaultTransport)
|
||||
if perr != nil {
|
||||
return nil, perr.Trace()
|
||||
}
|
||||
resp, perr := req.Do()
|
||||
defer closeResp(resp)
|
||||
if perr != nil {
|
||||
return nil, perr.Trace()
|
||||
}
|
||||
var reply rpc.SysInfoReply
|
||||
if err := jsonrpc.DecodeClientResponse(resp.Body, &reply); err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
jsonRespBytes, err := json.MarshalIndent(reply, "", "\t")
|
||||
if err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
return jsonRespBytes, nil
|
||||
}
|
||||
|
||||
// GetAuthKeys get access key id and secret access key
|
||||
func GetAuthKeys(url string) ([]byte, *probe.Error) {
|
||||
op := RPCOps{
|
||||
Method: "Auth.Get",
|
||||
Request: rpc.Args{Request: ""},
|
||||
}
|
||||
req, perr := NewRequest(url, op, http.DefaultTransport)
|
||||
if perr != nil {
|
||||
return nil, perr.Trace()
|
||||
}
|
||||
resp, perr := req.Do()
|
||||
defer closeResp(resp)
|
||||
if perr != nil {
|
||||
return nil, perr.Trace()
|
||||
}
|
||||
var reply rpc.AuthReply
|
||||
if err := jsonrpc.DecodeClientResponse(resp.Body, &reply); err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
authConfig := &auth.Config{}
|
||||
authConfig.Version = "0.0.1"
|
||||
authConfig.Users = make(map[string]*auth.User)
|
||||
user := &auth.User{}
|
||||
user.Name = "testuser"
|
||||
user.AccessKeyID = reply.AccessKeyID
|
||||
user.SecretAccessKey = reply.SecretAccessKey
|
||||
authConfig.Users[reply.AccessKeyID] = user
|
||||
if err := auth.SaveConfig(authConfig); err != nil {
|
||||
return nil, err.Trace()
|
||||
}
|
||||
jsonRespBytes, err := json.MarshalIndent(reply, "", "\t")
|
||||
if err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
return jsonRespBytes, nil
|
||||
}
|
||||
|
||||
// Add more functions here for other RPC messages
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
router "github.com/gorilla/mux"
|
||||
"github.com/minio/minio/pkg/rpc"
|
||||
)
|
||||
|
||||
// getRPCHandler rpc handler
|
||||
func getRPCHandler() http.Handler {
|
||||
s := rpc.NewServer()
|
||||
s.RegisterJSONCodec()
|
||||
s.RegisterService(new(rpc.VersionService), "Version")
|
||||
s.RegisterService(new(rpc.SysInfoService), "SysInfo")
|
||||
s.RegisterService(new(rpc.MemStatsService), "MemStats")
|
||||
s.RegisterService(new(rpc.DonutService), "Donut")
|
||||
s.RegisterService(new(rpc.AuthService), "Auth")
|
||||
// Add new RPC services here
|
||||
return registerRPC(router.NewRouter(), s)
|
||||
}
|
||||
|
||||
// registerRPC - register rpc handlers
|
||||
func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
|
||||
mux.Handle("/rpc", s)
|
||||
return mux
|
||||
}
|
|
@ -14,38 +14,41 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
package controller
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
jsonrpc "github.com/gorilla/rpc/v2/json"
|
||||
"github.com/minio/minio/pkg/controller"
|
||||
"github.com/minio/minio/pkg/server/rpc"
|
||||
"github.com/minio/minio/pkg/rpc"
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
type MyRPCSuite struct{}
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { TestingT(t) }
|
||||
|
||||
var _ = Suite(&MyRPCSuite{})
|
||||
type MySuite struct{}
|
||||
|
||||
var _ = Suite(&MySuite{})
|
||||
|
||||
var testRPCServer *httptest.Server
|
||||
|
||||
func (s *MyRPCSuite) SetUpSuite(c *C) {
|
||||
func (s *MySuite) SetUpSuite(c *C) {
|
||||
testRPCServer = httptest.NewServer(getRPCHandler())
|
||||
}
|
||||
|
||||
func (s *MyRPCSuite) TearDownSuite(c *C) {
|
||||
func (s *MySuite) TearDownSuite(c *C) {
|
||||
testRPCServer.Close()
|
||||
}
|
||||
|
||||
func (s *MyRPCSuite) TestMemStats(c *C) {
|
||||
op := controller.RPCOps{
|
||||
func (s *MySuite) TestMemStats(c *C) {
|
||||
op := rpc.Operation{
|
||||
Method: "MemStats.Get",
|
||||
Request: rpc.Args{Request: ""},
|
||||
}
|
||||
req, err := controller.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
|
||||
req, err := rpc.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(req.Get("Content-Type"), Equals, "application/json")
|
||||
resp, err := req.Do()
|
||||
|
@ -58,12 +61,12 @@ func (s *MyRPCSuite) TestMemStats(c *C) {
|
|||
c.Assert(reply, Not(DeepEquals), rpc.MemStatsReply{})
|
||||
}
|
||||
|
||||
func (s *MyRPCSuite) TestSysInfo(c *C) {
|
||||
op := controller.RPCOps{
|
||||
func (s *MySuite) TestSysInfo(c *C) {
|
||||
op := rpc.Operation{
|
||||
Method: "SysInfo.Get",
|
||||
Request: rpc.Args{Request: ""},
|
||||
}
|
||||
req, err := controller.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
|
||||
req, err := rpc.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(req.Get("Content-Type"), Equals, "application/json")
|
||||
resp, err := req.Do()
|
||||
|
@ -76,12 +79,12 @@ func (s *MyRPCSuite) TestSysInfo(c *C) {
|
|||
c.Assert(reply, Not(DeepEquals), rpc.SysInfoReply{})
|
||||
}
|
||||
|
||||
func (s *MyRPCSuite) TestAuth(c *C) {
|
||||
op := controller.RPCOps{
|
||||
func (s *MySuite) TestAuth(c *C) {
|
||||
op := rpc.Operation{
|
||||
Method: "Auth.Get",
|
||||
Request: rpc.Args{Request: ""},
|
||||
}
|
||||
req, err := controller.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
|
||||
req, err := rpc.NewRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(req.Get("Content-Type"), Equals, "application/json")
|
||||
resp, err := req.Do()
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio/pkg/minhttp"
|
||||
"github.com/minio/minio/pkg/probe"
|
||||
)
|
||||
|
||||
// getRPCServer instance
|
||||
func getRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) {
|
||||
// Minio server config
|
||||
httpServer := &http.Server{
|
||||
Addr: ":9001", // TODO make this configurable
|
||||
Handler: rpcHandler,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
var hosts []string
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
if addr.Network() == "ip+net" {
|
||||
host := strings.Split(addr.String(), "/")[0]
|
||||
if ip := net.ParseIP(host); ip.To4() != nil {
|
||||
hosts = append(hosts, host)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, host := range hosts {
|
||||
fmt.Printf("Starting minio server on: http://%s:9001/rpc, PID: %d\n", host, os.Getpid())
|
||||
}
|
||||
return httpServer, nil
|
||||
}
|
||||
|
||||
func StartController() *probe.Error {
|
||||
rpcServer, err := getRPCServer(getRPCHandler())
|
||||
if err != nil {
|
||||
return err.Trace()
|
||||
}
|
||||
// Setting rate limit to 'zero' no ratelimiting implemented
|
||||
if err := minhttp.ListenAndServeLimited(0, rpcServer); err != nil {
|
||||
return err.Trace()
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -24,20 +24,20 @@ import (
|
|||
"github.com/minio/minio/pkg/probe"
|
||||
)
|
||||
|
||||
// RPCOps RPC operation
|
||||
type RPCOps struct {
|
||||
// Operation RPC operation
|
||||
type Operation struct {
|
||||
Method string
|
||||
Request interface{}
|
||||
}
|
||||
|
||||
// RPCRequest rpc client request
|
||||
type RPCRequest struct {
|
||||
// Request rpc client request
|
||||
type Request struct {
|
||||
req *http.Request
|
||||
transport http.RoundTripper
|
||||
}
|
||||
|
||||
// NewRequest initiate a new client RPC request
|
||||
func NewRequest(url string, op RPCOps, transport http.RoundTripper) (*RPCRequest, *probe.Error) {
|
||||
func NewRequest(url string, op Operation, transport http.RoundTripper) (*Request, *probe.Error) {
|
||||
params, err := json.EncodeClientRequest(op.Method, op.Request)
|
||||
if err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
|
@ -46,7 +46,7 @@ func NewRequest(url string, op RPCOps, transport http.RoundTripper) (*RPCRequest
|
|||
if err != nil {
|
||||
return nil, probe.NewError(err)
|
||||
}
|
||||
rpcReq := &RPCRequest{}
|
||||
rpcReq := &Request{}
|
||||
rpcReq.req = req
|
||||
rpcReq.req.Header.Set("Content-Type", "application/json")
|
||||
if transport == nil {
|
||||
|
@ -57,7 +57,7 @@ func NewRequest(url string, op RPCOps, transport http.RoundTripper) (*RPCRequest
|
|||
}
|
||||
|
||||
// Do - make a http connection
|
||||
func (r RPCRequest) Do() (*http.Response, *probe.Error) {
|
||||
func (r Request) Do() (*http.Response, *probe.Error) {
|
||||
resp, err := r.transport.RoundTrip(r.req)
|
||||
if err != nil {
|
||||
if err, ok := probe.UnwrapError(err); ok {
|
||||
|
@ -69,11 +69,11 @@ func (r RPCRequest) Do() (*http.Response, *probe.Error) {
|
|||
}
|
||||
|
||||
// Get - get value of requested header
|
||||
func (r RPCRequest) Get(key string) string {
|
||||
func (r Request) Get(key string) string {
|
||||
return r.req.Header.Get(key)
|
||||
}
|
||||
|
||||
// Set - set value of a header key
|
||||
func (r *RPCRequest) Set(key, value string) {
|
||||
func (r *Request) Set(key, value string) {
|
||||
r.req.Header.Set(key, value)
|
||||
}
|
|
@ -20,8 +20,8 @@ import (
|
|||
"net/http"
|
||||
|
||||
router "github.com/gorilla/mux"
|
||||
"github.com/minio/minio/pkg/rpc"
|
||||
"github.com/minio/minio/pkg/server/api"
|
||||
"github.com/minio/minio/pkg/server/rpc"
|
||||
)
|
||||
|
||||
// registerAPI - register all the object API handlers to their respective paths
|
||||
|
@ -92,12 +92,6 @@ func registerCustomMiddleware(mux http.Handler, conf api.Config) http.Handler {
|
|||
return mux
|
||||
}
|
||||
|
||||
// registerRPC - register rpc handlers
|
||||
func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
|
||||
mux.Handle("/rpc", s)
|
||||
return mux
|
||||
}
|
||||
|
||||
// getAPIHandler api handler
|
||||
func getAPIHandler(conf api.Config) (http.Handler, api.Minio) {
|
||||
mux := router.NewRouter()
|
||||
|
@ -111,11 +105,12 @@ func getAPIHandler(conf api.Config) (http.Handler, api.Minio) {
|
|||
func getRPCHandler() http.Handler {
|
||||
s := rpc.NewServer()
|
||||
s.RegisterJSONCodec()
|
||||
s.RegisterService(new(rpc.VersionService), "Version")
|
||||
s.RegisterService(new(rpc.SysInfoService), "SysInfo")
|
||||
s.RegisterService(new(rpc.MemStatsService), "MemStats")
|
||||
s.RegisterService(new(rpc.DonutService), "Donut")
|
||||
s.RegisterService(new(rpc.AuthService), "Auth")
|
||||
// Add new RPC services here
|
||||
return registerRPC(router.NewRouter(), s)
|
||||
}
|
||||
|
||||
// registerRPC - register rpc handlers
|
||||
func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
|
||||
mux.Handle("/rpc", s)
|
||||
return mux
|
||||
}
|
||||
|
|
|
@ -24,9 +24,9 @@ import (
|
|||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio/pkg/minhttp"
|
||||
"github.com/minio/minio/pkg/probe"
|
||||
"github.com/minio/minio/pkg/server/api"
|
||||
"github.com/minio/minio/pkg/server/minhttp"
|
||||
)
|
||||
|
||||
// getAPI server instance
|
||||
|
|
Loading…
Reference in New Issue