diff --git a/pkg/server/nimble/http.go b/pkg/server/nimble/http.go new file mode 100644 index 000000000..0d477b9c6 --- /dev/null +++ b/pkg/server/nimble/http.go @@ -0,0 +1,147 @@ +// Based on https://github.com/facebookgo/grace and https://github.com/facebookgo/httpdown +// +// Modified for Minio's internal use +package nimble + +import ( + "crypto/tls" + "net" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/facebookgo/httpdown" + "github.com/minio/minio/pkg/iodine" +) + +// An app contains one or more servers and associated configuration. +type app struct { + servers []*http.Server + net *nimbleNet + listeners []net.Listener + sds []httpdown.Server + errors chan error +} + +func newApp(servers []*http.Server) *app { + return &app{ + servers: servers, + net: &nimbleNet{}, + listeners: make([]net.Listener, 0, len(servers)), + sds: make([]httpdown.Server, 0, len(servers)), + errors: make(chan error, 1+(len(servers)*2)), + } +} + +func (a *app) listen() error { + for _, s := range a.servers { + l, err := a.net.Listen("tcp", s.Addr) + if err != nil { + return iodine.New(err, nil) + } + if s.TLSConfig != nil { + l = tls.NewListener(l, s.TLSConfig) + } + a.listeners = append(a.listeners, l) + } + return nil +} + +func (a *app) serve() { + h := &httpdown.HTTP{} + for i, s := range a.servers { + a.sds = append(a.sds, h.Serve(s, a.listeners[i])) + } +} + +func (a *app) wait() { + var wg sync.WaitGroup + wg.Add(len(a.sds) * 2) // Wait & Stop + go a.signalHandler(&wg) + for _, s := range a.sds { + go func(s httpdown.Server) { + defer wg.Done() + if err := s.Wait(); err != nil { + a.errors <- err + } + }(s) + } + wg.Wait() +} + +func (a *app) term(wg *sync.WaitGroup) { + for _, s := range a.sds { + go func(s httpdown.Server) { + defer wg.Done() + if err := s.Stop(); err != nil { + a.errors <- err + } + }(s) + } +} + +func (a *app) signalHandler(wg *sync.WaitGroup) { + ch := make(chan os.Signal, 10) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2) + for { + sig := <-ch + switch sig { + case syscall.SIGTERM: + // this ensures a subsequent TERM will trigger standard go behaviour of + // terminating. + signal.Stop(ch) + a.term(wg) + return + case syscall.SIGUSR2: + // we only return here if there's an error, otherwise the new process + // will send us a TERM when it's ready to trigger the actual shutdown. + if _, err := a.net.StartProcess(); err != nil { + a.errors <- iodine.New(err, nil) + } + } + } +} + +// ListenAndServe will serve the given http.Servers and will monitor for signals +// allowing for graceful termination (SIGTERM) or restart (SIGHUP). +func ListenAndServe(servers ...*http.Server) error { + ppid := os.Getppid() + + a := newApp(servers) + + // Acquire Listeners + if err := a.listen(); err != nil { + return iodine.New(err, nil) + } + + // Start serving. + a.serve() + + // Close the parent if we inherited and it wasn't init that started us. + if os.Getenv("LISTEN_FDS") != "" && ppid != 1 { + if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil { + return iodine.New(err, nil) + } + } + + waitDone := make(chan struct{}) + go func() { + defer close(waitDone) + a.wait() + // do not use closing a channel as a way of communicating over + // channel send an appropriate message + waitDone <- struct{}{} + }() + + select { + case err := <-a.errors: + if err == nil { + panic("unexpected nil error") + } + return iodine.New(err, nil) + case <-waitDone: + return nil + } +} diff --git a/pkg/server/nimble/net.go b/pkg/server/nimble/net.go new file mode 100644 index 000000000..bc5421916 --- /dev/null +++ b/pkg/server/nimble/net.go @@ -0,0 +1,248 @@ +// Package gracenet provides a family of Listen functions that either open a +// fresh connection or provide an inherited connection from when the process +// was started. The behave like their counterparts in the net pacakge, but +// transparently provide support for graceful restarts without dropping +// connections. This is provided in a systemd socket activation compatible form +// to allow using socket activation. +// +// BUG: Doesn't handle closing of listeners. +package nimble + +import ( + "fmt" + "net" + "os" + "os/exec" + "strconv" + "strings" + "sync" + + "github.com/minio/minio/pkg/iodine" +) + +const ( + // Used to indicate a graceful restart in the new process. + envCountKey = "LISTEN_FDS" + envCountKeyPrefix = envCountKey + "=" +) + +// In order to keep the working directory the same as when we started we record +// it at startup. +var originalWD, _ = os.Getwd() + +// Net provides the family of Listen functions and maintains the associated +// state. Typically you will have only once instance of Net per application. +type nimbleNet struct { + inherited []net.Listener + active []net.Listener + mutex sync.Mutex + inheritOnce sync.Once +} + +// fileListener using this to extract underlying file descriptor from net.Listener +type fileListener interface { + File() (*os.File, error) +} + +func (n *nimbleNet) inherit() error { + var retErr error + n.inheritOnce.Do(func() { + n.mutex.Lock() + defer n.mutex.Unlock() + countStr := os.Getenv(envCountKey) + if countStr == "" { + return + } + count, err := strconv.Atoi(countStr) + if err != nil { + retErr = fmt.Errorf("found invalid count value: %s=%s", envCountKey, countStr) + return + } + + // In normal operations if we are inheriting, the listeners will begin at fd 3. + fdStart := 3 + for i := fdStart; i < fdStart+count; i++ { + file := os.NewFile(uintptr(i), "listener") + l, err := net.FileListener(file) + if err != nil { + file.Close() + retErr = fmt.Errorf("error inheriting socket fd %d: %s", i, err) + return + } + if err := file.Close(); err != nil { + retErr = fmt.Errorf("error closing inherited socket fd %d: %s", i, err) + return + } + n.inherited = append(n.inherited, l) + } + }) + return iodine.New(retErr, nil) +} + +// Listen announces on the local network address laddr. The network net must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It +// returns an inherited net.Listener for the matching network and address, or +// creates a new one using net.Listen. +func (n *nimbleNet) Listen(nett, laddr string) (net.Listener, error) { + switch nett { + default: + return nil, net.UnknownNetworkError(nett) + case "tcp", "tcp4", "tcp6": + addr, err := net.ResolveTCPAddr(nett, laddr) + if err != nil { + return nil, iodine.New(err, nil) + } + return n.ListenTCP(nett, addr) + case "unix", "unixpacket", "invalid_unix_net_for_test": + addr, err := net.ResolveUnixAddr(nett, laddr) + if err != nil { + return nil, iodine.New(err, nil) + } + return n.ListenUnix(nett, addr) + } +} + +// ListenTCP announces on the local network address laddr. The network net must +// be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the +// matching network and address, or creates a new one using net.ListenTCP. +func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) { + if err := n.inherit(); err != nil { + return nil, iodine.New(err, nil) + } + + n.mutex.Lock() + defer n.mutex.Unlock() + + // look for an inherited listener + for i, l := range n.inherited { + if l == nil { // we nil used inherited listeners + continue + } + if isSameAddr(l.Addr(), laddr) { + n.inherited[i] = nil + n.active = append(n.active, l) + return l.(*net.TCPListener), nil + } + } + + // make a fresh listener + l, err := net.ListenTCP(nett, laddr) + if err != nil { + return nil, iodine.New(err, nil) + } + n.active = append(n.active, l) + return l, nil +} + +// ListenUnix announces on the local network address laddr. The network net +// must be a: "unix" or "unixpacket". It returns an inherited net.Listener for +// the matching network and address, or creates a new one using net.ListenUnix. +func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { + if err := n.inherit(); err != nil { + return nil, err + } + + n.mutex.Lock() + defer n.mutex.Unlock() + + // look for an inherited listener + for i, l := range n.inherited { + if l == nil { // we nil used inherited listeners + continue + } + if isSameAddr(l.Addr(), laddr) { + n.inherited[i] = nil + n.active = append(n.active, l) + return l.(*net.UnixListener), nil + } + } + + // make a fresh listener + l, err := net.ListenUnix(nett, laddr) + if err != nil { + return nil, iodine.New(err, nil) + } + n.active = append(n.active, l) + return l, nil +} + +// activeListeners returns a snapshot copy of the active listeners. +func (n *nimbleNet) activeListeners() ([]net.Listener, error) { + n.mutex.Lock() + defer n.mutex.Unlock() + listeners := make([]net.Listener, len(n.active)) + copy(listeners, n.active) + return listeners, nil +} + +func isSameAddr(a1, a2 net.Addr) bool { + if a1.Network() != a2.Network() { + return false + } + if a1.String() == a2.String() { + return true + } + // This allows for ipv6 vs ipv4 local addresses to compare as equal. This + // scenario is common when listening on localhost. + a1host, a1port, _ := net.SplitHostPort(a1.String()) + a2host, a2port, _ := net.SplitHostPort(a2.String()) + if a1host == a2host { + if a1port == a2port { + return true + } + } + return false +} + +// StartProcess starts a new process passing it the active listeners. It +// doesn't fork, but starts a new process using the same environment and +// arguments as when it was originally started. This allows for a newly +// deployed binary to be started. It returns the pid of the newly started +// process when successful. +func (n *nimbleNet) StartProcess() (int, error) { + listeners, err := n.activeListeners() + if err != nil { + return 0, iodine.New(err, nil) + } + + // Extract the fds from the listeners. + files := make([]*os.File, len(listeners)) + for i, l := range listeners { + files[i], err = l.(fileListener).File() + if err != nil { + return 0, iodine.New(err, nil) + } + defer files[i].Close() + } + + // Use the original binary location. This works with symlinks such that if + // the file it points to has been changed we will use the updated symlink. + argv0, err := exec.LookPath(os.Args[0]) + if err != nil { + return 0, iodine.New(err, nil) + } + + // Pass on the environment and replace the old count key with the new one. + var env []string + for _, v := range os.Environ() { + if !strings.HasPrefix(v, envCountKeyPrefix) { + env = append(env, v) + } + } + env = append(env, fmt.Sprintf("%s%d", envCountKeyPrefix, len(listeners))) + + inheritedFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) + process, err := os.StartProcess( + argv0, + os.Args, + &os.ProcAttr{ + Dir: originalWD, + Env: env, + Files: inheritedFiles, + }, + ) + if err != nil { + return 0, iodine.New(err, nil) + } + return process.Pid, nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 1ebe9a243..0d068f3b9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -25,12 +25,17 @@ import ( "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/server/api" + "github.com/minio/minio/pkg/server/nimble" ) -// Start API listener -func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { +// startServices start all services +func startServices(errCh chan error, servers ...*http.Server) { defer close(errCh) + errCh <- nimble.ListenAndServe(servers...) +} +// getAPI server instance +func getAPIServer(conf api.Config, apiHandler http.Handler) (*http.Server, error) { // Minio server config httpServer := &http.Server{ Addr: conf.Address, @@ -48,14 +53,13 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { config.Certificates = make([]tls.Certificate, 1) config.Certificates[0], err = tls.LoadX509KeyPair(conf.CertFile, conf.KeyFile) if err != nil { - errCh <- iodine.New(err, nil) + return nil, iodine.New(err, nil) } } host, port, err := net.SplitHostPort(conf.Address) if err != nil { - errCh <- iodine.New(err, nil) - return + return nil, iodine.New(err, nil) } var hosts []string @@ -65,8 +69,7 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { default: addrs, err := net.InterfaceAddrs() if err != nil { - errCh <- iodine.New(err, nil) - return + return nil, iodine.New(err, nil) } for _, addr := range addrs { if addr.Network() == "ip+net" { @@ -86,20 +89,18 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) { } } - errCh <- httpServer.ListenAndServe() + return httpServer, nil } -// Start RPC listener -func startRPC(errCh chan error, rpcHandler http.Handler) { - defer close(errCh) - +// getRPCServer instance +func getRPCServer(rpcHandler http.Handler) *http.Server { // Minio server config httpServer := &http.Server{ Addr: "127.0.0.1:9001", // TODO make this configurable Handler: rpcHandler, MaxHeaderBytes: 1 << 20, } - errCh <- httpServer.ListenAndServe() + return httpServer } // Start ticket master @@ -113,18 +114,18 @@ func startTM(a api.Minio) { // StartServices starts basic services for a server func StartServices(conf api.Config, doneCh chan struct{}) error { - apiErrCh := make(chan error) - rpcErrCh := make(chan error) - + errCh := make(chan error) apiHandler, minioAPI := getAPIHandler(conf) - go startAPI(apiErrCh, conf, apiHandler) - go startRPC(rpcErrCh, getRPCHandler()) + apiServer, err := getAPIServer(conf, apiHandler) + if err != nil { + return iodine.New(err, nil) + } + rpcServer := getRPCServer(getRPCHandler()) + go startServices(errCh, apiServer, rpcServer) go startTM(minioAPI) select { - case err := <-apiErrCh: - return iodine.New(err, nil) - case err := <-rpcErrCh: + case err := <-errCh: return iodine.New(err, nil) case <-doneCh: return nil