connection muxer should use bufio.Reader to be simpler. (#3177)

This commit is contained in:
Harshavardhana 2016-11-05 12:57:31 -07:00 committed by GitHub
parent 1ba497950c
commit 1105508453
2 changed files with 34 additions and 66 deletions

View File

@ -86,10 +86,7 @@ func isKeyFileExists() bool {
// isSSL - returns true with both cert and key exists. // isSSL - returns true with both cert and key exists.
func isSSL() bool { func isSSL() bool {
if isCertFileExists() && isKeyFileExists() { return isCertFileExists() && isKeyFileExists()
return true
}
return false
} }
// Reads certificated file and returns a list of parsed certificates. // Reads certificated file and returns a list of parsed certificates.

View File

@ -17,9 +17,9 @@
package cmd package cmd
import ( import (
"bufio"
"crypto/tls" "crypto/tls"
"errors" "errors"
"io"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -28,6 +28,14 @@ import (
"time" "time"
) )
// The value choosen below is longest word choosen
// from all the http verbs comprising of
// "PRI", "OPTIONS", "GET", "HEAD", "POST",
// "PUT", "DELETE", "TRACE", "CONNECT".
const (
maxHTTPVerbLen = 7
)
var defaultHTTP2Methods = []string{ var defaultHTTP2Methods = []string{
"PRI", "PRI",
} }
@ -43,64 +51,40 @@ var defaultHTTP1Methods = []string{
"CONNECT", "CONNECT",
} }
// ConnBuf - contains network buffer to record data // ConnMux - Peeks into the incoming connection for relevant
type ConnBuf struct { // protocol without advancing the underlying net.Conn (io.Reader).
buffer []byte // ConnMux - allows us to multiplex between TLS and Regular HTTP
unRead bool // connections on the same listeners.
offset int
}
// ConnMux - implements a Read() which streams twice the firs bytes from
// the incoming connection, to help peeking protocol
type ConnMux struct { type ConnMux struct {
net.Conn net.Conn
lastError error bufrw *bufio.ReadWriter
dataBuf ConnBuf
}
func longestWord(strings []string) int {
maxLen := 0
for _, m := range defaultHTTP1Methods {
if maxLen < len(m) {
maxLen = len(m)
}
}
for _, m := range defaultHTTP2Methods {
if maxLen < len(m) {
maxLen = len(m)
}
}
return maxLen
} }
// NewConnMux - creates a new ConnMux instance // NewConnMux - creates a new ConnMux instance
func NewConnMux(c net.Conn) *ConnMux { func NewConnMux(c net.Conn) *ConnMux {
h1 := longestWord(defaultHTTP1Methods) br := bufio.NewReader(c)
h2 := longestWord(defaultHTTP2Methods) bw := bufio.NewWriter(c)
max := h1 return &ConnMux{
if h2 > max { Conn: c,
max = h2 bufrw: bufio.NewReadWriter(br, bw),
} }
return &ConnMux{Conn: c, dataBuf: ConnBuf{buffer: make([]byte, max+1)}}
} }
// PeekProtocol - reads the first bytes, then checks if it is similar // PeekProtocol - reads the first bytes, then checks if it is similar
// to one of the default http methods // to one of the default http methods
func (c *ConnMux) PeekProtocol() string { func (c *ConnMux) PeekProtocol() string {
var n int buf, err := c.bufrw.Peek(maxHTTPVerbLen)
n, c.lastError = c.Conn.Read(c.dataBuf.buffer) if err != nil {
if n == 0 || (c.lastError != nil && c.lastError != io.EOF) { errorIf(err, "Unable to peek into the protocol")
return "" return "http"
} }
c.dataBuf.unRead = true
for _, m := range defaultHTTP1Methods { for _, m := range defaultHTTP1Methods {
if strings.HasPrefix(string(c.dataBuf.buffer), m) { if strings.HasPrefix(string(buf), m) {
return "http" return "http"
} }
} }
for _, m := range defaultHTTP2Methods { for _, m := range defaultHTTP2Methods {
if strings.HasPrefix(string(c.dataBuf.buffer), m) { if strings.HasPrefix(string(buf), m) {
return "http2" return "http2"
} }
} }
@ -110,28 +94,15 @@ func (c *ConnMux) PeekProtocol() string {
// Read - streams the ConnMux buffer when reset flag is activated, otherwise // Read - streams the ConnMux buffer when reset flag is activated, otherwise
// streams from the incoming network connection // streams from the incoming network connection
func (c *ConnMux) Read(b []byte) (int, error) { func (c *ConnMux) Read(b []byte) (int, error) {
if c.dataBuf.unRead { return c.bufrw.Read(b)
n := copy(b, c.dataBuf.buffer[c.dataBuf.offset:]) }
c.dataBuf.offset += n
if c.dataBuf.offset == len(c.dataBuf.buffer) { // Close the connection.
// We finished copying all c.buffer, reset all func (c *ConnMux) Close() (err error) {
c.dataBuf.unRead = false if err = c.bufrw.Flush(); err != nil {
c.dataBuf.offset = 0 return err
c.dataBuf.buffer = c.dataBuf.buffer[:]
if n < len(b) {
// Continue copying from socket if b still has room for data
tmpBuffer := make([]byte, len(b)-n-1)
nr, err := c.Conn.Read(tmpBuffer)
for idx, val := range tmpBuffer {
b[n+idx] = val
} }
return n + nr, err return c.Conn.Close()
}
}
// We here return the last error
return n, c.lastError
}
return c.Conn.Read(b)
} }
// ListenerMux wraps the standard net.Listener to inspect // ListenerMux wraps the standard net.Listener to inspect