Consolidating more codebase and cleanup in server / controller

This commit is contained in:
Harshavardhana 2015-09-19 02:36:50 -07:00
parent d9328d25e9
commit e510e97f28
13 changed files with 206 additions and 149 deletions

View File

@ -16,7 +16,17 @@
package main
import "github.com/minio/cli"
import (
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/minio/cli"
"github.com/minio/minio/pkg/minhttp"
"github.com/minio/minio/pkg/probe"
)
var controllerCmd = cli.Command{
Name: "controller",
@ -35,11 +45,51 @@ EXAMPLES:
`,
}
// getRPCServer instance
func getRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) {
// Minio server config
httpServer := &http.Server{
Addr: ":9001", // TODO make this configurable
Handler: rpcHandler,
MaxHeaderBytes: 1 << 20,
}
var hosts []string
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, probe.NewError(err)
}
for _, addr := range addrs {
if addr.Network() == "ip+net" {
host := strings.Split(addr.String(), "/")[0]
if ip := net.ParseIP(host); ip.To4() != nil {
hosts = append(hosts, host)
}
}
}
for _, host := range hosts {
fmt.Printf("Starting minio server on: http://%s:9001/rpc, PID: %d\n", host, os.Getpid())
}
return httpServer, nil
}
// startController starts a minio controller
func startController() *probe.Error {
rpcServer, err := getRPCServer(getRPCHandler())
if err != nil {
return err.Trace()
}
// Setting rate limit to 'zero' no ratelimiting implemented
if err := minhttp.ListenAndServeLimited(0, rpcServer); err != nil {
return err.Trace()
}
return nil
}
func controllerMain(c *cli.Context) {
if c.Args().Present() {
cli.ShowCommandHelpAndExit(c, "controller", 1)
}
err := StartController()
err := startController()
errorIf(err.Trace(), "Failed to start minio controller.", nil)
}

View File

@ -1,68 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/minio/minio/pkg/minhttp"
"github.com/minio/minio/pkg/probe"
)
// getRPCServer instance
func getControllerRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) {
// Minio server config
httpServer := &http.Server{
Addr: ":9001", // TODO make this configurable
Handler: rpcHandler,
MaxHeaderBytes: 1 << 20,
}
var hosts []string
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, probe.NewError(err)
}
for _, addr := range addrs {
if addr.Network() == "ip+net" {
host := strings.Split(addr.String(), "/")[0]
if ip := net.ParseIP(host); ip.To4() != nil {
hosts = append(hosts, host)
}
}
}
for _, host := range hosts {
fmt.Printf("Starting minio server on: http://%s:9001/rpc, PID: %d\n", host, os.Getpid())
}
return httpServer, nil
}
// StartController starts a minio controller
func StartController() *probe.Error {
rpcServer, err := getControllerRPCServer(getRPCCtrlHandler())
if err != nil {
return err.Trace()
}
// Setting rate limit to 'zero' no ratelimiting implemented
if err := minhttp.ListenAndServeLimited(0, rpcServer); err != nil {
return err.Trace()
}
return nil
}

View File

@ -54,7 +54,21 @@ func errorIf(err *probe.Error, msg string, fields map[string]interface{}) {
fields["probe"] = string(jsonErr)
}
log.WithFields(fields).Error(msg)
}
func fatalIf(err *probe.Error, msg string, fields map[string]interface{}) {
if err == nil {
return
}
if fields == nil {
fields = make(map[string]interface{})
}
fields["error"] = err.ToGoError()
if jsonErr, e := json.Marshal(err); e == nil {
fields["probe"] = string(jsonErr)
}
log.WithFields(fields).Fatal(msg)
}
func audit(msg string, fields logrus.Fields) {

View File

@ -18,16 +18,6 @@ package main
import "encoding/xml"
// APIConfig - http server config
type APIConfig struct {
Address string
AddressRPC string
TLS bool
CertFile string
KeyFile string
RateLimit int
}
// Limit number of objects in a given response
const (
maxObjectList = 1000

View File

@ -18,7 +18,6 @@ package main
import (
"net/http"
"sort"
"github.com/minio/minio/pkg/donut"
)
@ -55,13 +54,6 @@ func generateListBucketsResponse(buckets []donut.BucketMetadata) ListBucketsResp
return data
}
// itemKey
type itemKey []*Object
func (b itemKey) Len() int { return len(b) }
func (b itemKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b itemKey) Less(i, j int) bool { return b[i].Key < b[j].Key }
// takes a set of objects and prepares the objects for serialization
// input:
// bucket name
@ -92,7 +84,6 @@ func generateListObjectsResponse(bucket string, objects []donut.ObjectMetadata,
content.Owner = owner
contents = append(contents, content)
}
sort.Sort(itemKey(contents))
// TODO - support EncodingType in xml decoding
data.Name = bucket
data.Contents = contents
@ -201,7 +192,7 @@ func writeErrorResponse(w http.ResponseWriter, req *http.Request, errorType int,
setCommonHeaders(w, getContentTypeString(acceptsContentType), len(encodedErrorResponse))
// write Header
w.WriteHeader(error.HTTPStatusCode)
// HEAD should have no body
// HEAD should have no body, do not attempt to write to it
if req.Method != "HEAD" {
// write error body
w.Write(encodedErrorResponse)

View File

@ -1,43 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "github.com/minio/minio/pkg/donut"
// APIOperation container for individual operations read by Ticket Master
type APIOperation struct {
ProceedCh chan struct{}
}
// MinioAPI container for API and also carries OP (operation) channel
type MinioAPI struct {
OP chan APIOperation
Donut donut.Interface
}
// NewAPI instantiate a new minio API
func NewAPI() MinioAPI {
// ignore errors for now
d, err := donut.New()
if err != nil {
panic(err)
}
return MinioAPI{
OP: make(chan APIOperation),
Donut: d,
}
}

View File

@ -16,7 +16,18 @@
package main
import "github.com/minio/cli"
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/minio/cli"
"github.com/minio/minio/pkg/minhttp"
"github.com/minio/minio/pkg/probe"
)
var serverCmd = cli.Command{
Name: "server",
@ -35,17 +46,102 @@ EXAMPLES:
`,
}
func getServerConfig(c *cli.Context) APIConfig {
// apiConfig - http server config
type apiConfig struct {
Address string
TLS bool
CertFile string
KeyFile string
RateLimit int
}
// getAPI server instance
func getAPIServer(conf apiConfig, apiHandler http.Handler) (*http.Server, *probe.Error) {
// Minio server config
httpServer := &http.Server{
Addr: conf.Address,
Handler: apiHandler,
MaxHeaderBytes: 1 << 20,
}
if conf.TLS {
var err error
httpServer.TLSConfig = &tls.Config{}
httpServer.TLSConfig.Certificates = make([]tls.Certificate, 1)
httpServer.TLSConfig.Certificates[0], err = tls.LoadX509KeyPair(conf.CertFile, conf.KeyFile)
if err != nil {
return nil, probe.NewError(err)
}
}
host, port, err := net.SplitHostPort(conf.Address)
if err != nil {
return nil, probe.NewError(err)
}
var hosts []string
switch {
case host != "":
hosts = append(hosts, host)
default:
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, probe.NewError(err)
}
for _, addr := range addrs {
if addr.Network() == "ip+net" {
host := strings.Split(addr.String(), "/")[0]
if ip := net.ParseIP(host); ip.To4() != nil {
hosts = append(hosts, host)
}
}
}
}
for _, host := range hosts {
if conf.TLS {
fmt.Printf("Starting minio server on: https://%s:%s, PID: %d\n", host, port, os.Getpid())
} else {
fmt.Printf("Starting minio server on: http://%s:%s, PID: %d\n", host, port, os.Getpid())
}
}
return httpServer, nil
}
// Start ticket master
func startTM(a MinioAPI) {
for {
for op := range a.OP {
op.ProceedCh <- struct{}{}
}
}
}
// startServer starts an s3 compatible cloud storage server
func startServer(conf apiConfig) *probe.Error {
apiHandler, minioAPI := getAPIHandler()
apiServer, err := getAPIServer(conf, apiHandler)
if err != nil {
return err.Trace()
}
// start ticket master
go startTM(minioAPI)
if err := minhttp.ListenAndServeLimited(conf.RateLimit, apiServer); err != nil {
return err.Trace()
}
return nil
}
func getServerConfig(c *cli.Context) apiConfig {
certFile := c.GlobalString("cert")
keyFile := c.GlobalString("key")
if (certFile != "" && keyFile == "") || (certFile == "" && keyFile != "") {
Fatalln("Both certificate and key are required to enable https.")
}
tls := (certFile != "" && keyFile != "")
return APIConfig{
return apiConfig{
Address: c.GlobalString("address"),
AddressRPC: c.GlobalString("address-rpcserver"),
TLS: tls,
CertFile: certFile,
KeyFile: keyFile,
@ -59,6 +155,6 @@ func serverMain(c *cli.Context) {
}
apiServerConfig := getServerConfig(c)
err := StartServer(apiServerConfig)
err := startServer(apiServerConfig)
errorIf(err.Trace(), "Failed to start the minio server.", nil)
}

View File

@ -22,6 +22,7 @@ import (
router "github.com/gorilla/mux"
jsonrpc "github.com/gorilla/rpc/v2"
"github.com/gorilla/rpc/v2/json"
"github.com/minio/minio/pkg/donut"
)
// registerAPI - register all the object API handlers to their respective paths
@ -55,8 +56,31 @@ func registerCustomMiddleware(mux *router.Router, mwHandlers ...MiddlewareHandle
return f
}
// APIOperation container for individual operations read by Ticket Master
type APIOperation struct {
ProceedCh chan struct{}
}
// MinioAPI container for API and also carries OP (operation) channel
type MinioAPI struct {
OP chan APIOperation
Donut donut.Interface
}
// getNewAPI instantiate a new minio API
func getNewAPI() MinioAPI {
// ignore errors for now
d, err := donut.New()
fatalIf(err.Trace(), "Instantiating donut failed.", nil)
return MinioAPI{
OP: make(chan APIOperation),
Donut: d,
}
}
// getAPIHandler api handler
func getAPIHandler(conf APIConfig) (http.Handler, MinioAPI) {
func getAPIHandler() (http.Handler, MinioAPI) {
var mwHandlers = []MiddlewareHandler{
ValidContentTypeHandler,
TimeValidityHandler,
@ -67,7 +91,7 @@ func getAPIHandler(conf APIConfig) (http.Handler, MinioAPI) {
}
mux := router.NewRouter()
minioAPI := NewAPI()
minioAPI := getNewAPI()
registerAPI(mux, minioAPI)
apiHandler := registerCustomMiddleware(mux, mwHandlers...)
return apiHandler, minioAPI

View File

@ -15,6 +15,7 @@
*/
package main
<<<<<<< HEAD
import (
"crypto/tls"
@ -127,3 +128,5 @@ func StartServer(conf APIConfig) *probe.Error {
}
return nil
}
=======
>>>>>>> Consolidating more codebase and cleanup in server / controller

View File

@ -51,7 +51,7 @@ func (s *MyAPIDonutCacheSuite) SetUpSuite(c *C) {
perr := donut.SaveConfig(conf)
c.Assert(perr, IsNil)
httpHandler, minioAPI := getAPIHandler(APIConfig{RateLimit: 16})
httpHandler, minioAPI := getAPIHandler()
go startTM(minioAPI)
testAPIDonutCacheServer = httptest.NewServer(httpHandler)
}

View File

@ -70,7 +70,7 @@ func (s *MyAPIDonutSuite) SetUpSuite(c *C) {
perr := donut.SaveConfig(conf)
c.Assert(perr, IsNil)
httpHandler, minioAPI := getAPIHandler(APIConfig{RateLimit: 16})
httpHandler, minioAPI := getAPIHandler()
go startTM(minioAPI)
testAPIDonutServer = httptest.NewServer(httpHandler)
}

View File

@ -78,7 +78,7 @@ func (s *MyAPISignatureV4Suite) SetUpSuite(c *C) {
perr = auth.SaveConfig(authConf)
c.Assert(perr, IsNil)
httpHandler, minioAPI := getAPIHandler(APIConfig{RateLimit: 16})
httpHandler, minioAPI := getAPIHandler()
go startTM(minioAPI)
testSignatureV4Server = httptest.NewServer(httpHandler)
}