From 5ff30777e149fdd300b09791cbec2bb40f78ac3a Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Sun, 6 Nov 2016 20:41:01 +0100 Subject: [PATCH] Rewrite connection muxer peek process to avoid server blocking by silent clients (#3187) --- cmd/server-mux.go | 65 +++++++++++++++++++++++++++++------------- cmd/server-mux_test.go | 7 ++--- 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/cmd/server-mux.go b/cmd/server-mux.go index 3c1c33e51..a3167dae0 100644 --- a/cmd/server-mux.go +++ b/cmd/server-mux.go @@ -134,11 +134,52 @@ func (c *ConnMux) Close() (err error) { type ListenerMux struct { net.Listener config *tls.Config + // acceptResCh is a channel for transporting wrapped net.Conn (regular or tls) + // after peeking the content of the latter + acceptResCh chan ListenerMuxAcceptRes // Cond is used to signal Close when there are no references to the listener. cond *sync.Cond refs int } +// ListenerMuxAcceptRes contains then final net.Conn data (wrapper by tls or not) to be sent to the http handler +type ListenerMuxAcceptRes struct { + conn net.Conn + err error +} + +// newListenerMux listens and wraps accepted connections with tls after protocol peeking +func newListenerMux(listener net.Listener, config *tls.Config) *ListenerMux { + l := ListenerMux{ + Listener: listener, + config: config, + cond: sync.NewCond(&sync.Mutex{}), + acceptResCh: make(chan ListenerMuxAcceptRes), + } + // Start listening, wrap connections with tls when needed + go func() { + // Loop for accepting new connections + for { + conn, err := l.Listener.Accept() + if err != nil { + l.acceptResCh <- ListenerMuxAcceptRes{err: err} + return + } + // Wrap the connection with ConnMux to be able to peek the data in the incoming connection + // and decide if we need to wrap the connection itself with a TLS or not + go func(conn net.Conn) { + connMux := NewConnMux(conn) + if connMux.PeekProtocol() == "tls" { + l.acceptResCh <- ListenerMuxAcceptRes{conn: tls.Server(connMux, l.config)} + } else { + l.acceptResCh <- ListenerMuxAcceptRes{conn: connMux} + } + }(conn) + } + }() + return &l +} + // IsClosed - Returns if the underlying listener is closed fully. func (l *ListenerMux) IsClosed() bool { l.cond.L.Lock() @@ -187,16 +228,8 @@ func (l *ListenerMux) Accept() (net.Conn, error) { l.incRef() defer l.decRef() - conn, err := l.Listener.Accept() - if err != nil { - return conn, err - } - connMux := NewConnMux(conn) - protocol := connMux.PeekProtocol() - if protocol == "tls" { - return tls.Server(connMux, l.config), nil - } - return connMux, nil + res := <-l.acceptResCh + return res.conn, res.err } // ServerMux - the main mux server @@ -247,11 +280,7 @@ func initListeners(serverAddr string, tls *tls.Config) ([]*ListenerMux, error) { if err != nil { return nil, err } - listeners = append(listeners, &ListenerMux{ - Listener: listener, - config: tls, - cond: sync.NewCond(&sync.Mutex{}), - }) + listeners = append(listeners, newListenerMux(listener, tls)) return listeners, nil } var addrs []string @@ -272,11 +301,7 @@ func initListeners(serverAddr string, tls *tls.Config) ([]*ListenerMux, error) { if err != nil { return nil, err } - listeners = append(listeners, &ListenerMux{ - Listener: listener, - config: tls, - cond: sync.NewCond(&sync.Mutex{}), - }) + listeners = append(listeners, newListenerMux(listener, tls)) } return listeners, nil } diff --git a/cmd/server-mux_test.go b/cmd/server-mux_test.go index af29fb29f..62ba9af48 100644 --- a/cmd/server-mux_test.go +++ b/cmd/server-mux_test.go @@ -59,11 +59,8 @@ func runTest(t *testing.T) { if err != nil { t.Fatal(err) } - ln = &ListenerMux{ - Listener: ln, - config: &tls.Config{}, - cond: sync.NewCond(&sync.Mutex{}), - } + + ln = newListenerMux(ln, &tls.Config{}) addr := ln.Addr().String() waitForListener := make(chan error)