diff --git a/cmd/http-stats.go b/cmd/http-stats.go index 4497e9c73..e83f7fece 100644 --- a/cmd/http-stats.go +++ b/cmd/http-stats.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "net/http" + "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -26,22 +27,50 @@ import ( "go.uber.org/atomic" ) +func getRequestResource(r *http.Request) string { + if r == nil { + // http.Request is nil when non-HTTP data (like TLS record) is read/written. + return "" + } + + if globalDomainName != "" { + host := r.Header.Get("Host") + if strings.HasSuffix(host, "."+globalDomainName) { + return "/" + strings.TrimSuffix(host, "."+globalDomainName) + r.URL.Path + } + } + + return r.URL.Path +} + // ConnStats - Network statistics // Count total input/output transferred bytes during // the server's life. type ConnStats struct { - totalInputBytes atomic.Uint64 - totalOutputBytes atomic.Uint64 + totalInputBytes atomic.Uint64 + totalOutputBytes atomic.Uint64 + totalRPCInputBytes atomic.Uint64 + totalRPCOutputBytes atomic.Uint64 } // Increase total input bytes -func (s *ConnStats) incInputBytes(n int) { - s.totalInputBytes.Add(uint64(n)) +func (s *ConnStats) incInputBytes(r *http.Request, n int) { + resource := getRequestResource(r) + if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") { + s.totalRPCInputBytes.Add(uint64(n)) + } else { + s.totalInputBytes.Add(uint64(n)) + } } // Increase total output bytes -func (s *ConnStats) incOutputBytes(n int) { - s.totalOutputBytes.Add(uint64(n)) +func (s *ConnStats) incOutputBytes(r *http.Request, n int) { + resource := getRequestResource(r) + if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") { + s.totalRPCOutputBytes.Add(uint64(n)) + } else { + s.totalOutputBytes.Add(uint64(n)) + } } // Return total input bytes diff --git a/cmd/http/bufconn.go b/cmd/http/bufconn.go index b21c1ac2c..c57a68337 100644 --- a/cmd/http/bufconn.go +++ b/cmd/http/bufconn.go @@ -19,17 +19,28 @@ package http import ( "bufio" "net" + "net/http" "time" ) // BufConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout. type BufConn struct { QuirkConn - bufReader *bufio.Reader // buffered reader wraps reader in net.Conn. - readTimeout time.Duration // sets the read timeout in the connection. - writeTimeout time.Duration // sets the write timeout in the connection. - updateBytesReadFunc func(int) // function to be called to update bytes read. - updateBytesWrittenFunc func(int) // function to be called to update bytes written. + bufReader *bufio.Reader // buffered reader wraps reader in net.Conn. + readTimeout time.Duration // sets the read timeout in the connection. + writeTimeout time.Duration // sets the write timeout in the connection. + request *http.Request // HTTP request information. + updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read. + updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written. +} + +func (c *BufConn) setRequest(request *http.Request) { + c.request = request +} + +func (c *BufConn) setUpdateFuncs(updateBytesReadFunc, updateBytesWrittenFunc func(*http.Request, int)) { + c.updateBytesReadFunc = updateBytesReadFunc + c.updateBytesWrittenFunc = updateBytesWrittenFunc } // Sets read timeout @@ -70,7 +81,7 @@ func (c *BufConn) Read(b []byte) (n int, err error) { c.setReadTimeout() n, err = c.bufReader.Read(b) if err == nil && c.updateBytesReadFunc != nil { - c.updateBytesReadFunc(n) + c.updateBytesReadFunc(c.request, n) } return n, err @@ -81,21 +92,18 @@ func (c *BufConn) Write(b []byte) (n int, err error) { c.setWriteTimeout() n, err = c.Conn.Write(b) if err == nil && c.updateBytesWrittenFunc != nil { - c.updateBytesWrittenFunc(n) + c.updateBytesWrittenFunc(c.request, n) } return n, err } // newBufConn - creates a new connection object wrapping net.Conn. -func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration, - updateBytesReadFunc, updateBytesWrittenFunc func(int)) *BufConn { +func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration) *BufConn { return &BufConn{ - QuirkConn: QuirkConn{Conn: c}, - bufReader: bufio.NewReader(c), - readTimeout: readTimeout, - writeTimeout: writeTimeout, - updateBytesReadFunc: updateBytesReadFunc, - updateBytesWrittenFunc: updateBytesWrittenFunc, + QuirkConn: QuirkConn{Conn: c}, + bufReader: bufio.NewReader(c), + readTimeout: readTimeout, + writeTimeout: writeTimeout, } } diff --git a/cmd/http/bufconn_test.go b/cmd/http/bufconn_test.go index 4be9aaed8..f6a366c0e 100644 --- a/cmd/http/bufconn_test.go +++ b/cmd/http/bufconn_test.go @@ -48,7 +48,7 @@ func TestBuffConnReadTimeout(t *testing.T) { if terr != nil { t.Fatalf("failed to accept new connection. %v", terr) } - bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second, nil, nil) + bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second) defer bufconn.Close() // Read a line diff --git a/cmd/http/listener.go b/cmd/http/listener.go index 47959fdc0..75df1c551 100644 --- a/cmd/http/listener.go +++ b/cmd/http/listener.go @@ -23,6 +23,7 @@ import ( "io" "net" "net/http" + "net/url" "os" "strings" "sync" @@ -72,6 +73,41 @@ func isHTTPMethod(s string) bool { return false } +func getResourceHost(bufConn *BufConn, maxHeaderBytes, methodLen int) (resource string, host string, err error) { + var data []byte + for dataLen := 0; methodLen+dataLen < maxHeaderBytes; dataLen += 8 { + if data, err = bufConn.Peek(methodLen + dataLen); err != nil { + return "", "", err + } + + tokens := strings.Split(string(data), "\n") + if len(tokens) < 2 { + continue + } + + if resource == "" { + resource = strings.SplitN(tokens[0][methodLen:], " ", 2)[0] + } + + for _, token := range tokens[1:] { + if token == "" || !strings.HasSuffix(token, "\r") { + continue + } + + if strings.HasPrefix(token, "Host: ") { + host = strings.TrimPrefix(strings.TrimSuffix(token, "\r"), "Host: ") + return resource, host, nil + } + } + + if tokens[len(tokens)-1] == "\r" { + break + } + } + + return resource, host, nil +} + type acceptResult struct { conn net.Conn err error @@ -87,8 +123,9 @@ type httpListener struct { tcpKeepAliveTimeout time.Duration readTimeout time.Duration writeTimeout time.Duration - updateBytesReadFunc func(int) // function to be called to update bytes read in BufConn. - updateBytesWrittenFunc func(int) // function to be called to update bytes written in BufConn. + maxHeaderBytes int + updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in BufConn. + updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in BufConn. } // isRoutineNetErr returns true if error is due to a network timeout, @@ -134,8 +171,7 @@ func (listener *httpListener) start() { tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout) - bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout, - listener.updateBytesReadFunc, listener.updateBytesWrittenFunc) + bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout) // Peek bytes of maximum length of all HTTP methods. data, err := bufconn.Peek(methodMaxLen) @@ -158,14 +194,39 @@ func (listener *httpListener) start() { // Return bufconn if read data is a valid HTTP method. tokens := strings.SplitN(string(data), " ", 2) if isHTTPMethod(tokens[0]) { - if listener.tlsConfig == nil { - send(acceptResult{bufconn, nil}, doneCh) - } else { + if listener.tlsConfig != nil { // As TLS is configured and we got plain text HTTP request, // return 403 (forbidden) error. bufconn.Write(sslRequiredErrMsg) bufconn.Close() + return } + + var resource, host string + if resource, host, err = getResourceHost(bufconn, listener.maxHeaderBytes, len(tokens[0])+1); err != nil { + if !isRoutineNetErr(err) { + reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String()) + reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String()) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + } + bufconn.Close() + return + } + + header := make(http.Header) + if host != "" { + header.Add("Host", host) + } + bufconn.setRequest(&http.Request{ + Method: tokens[0], + URL: &url.URL{Path: resource}, + Host: bufconn.LocalAddr().String(), + Header: header, + }) + bufconn.setUpdateFuncs(listener.updateBytesReadFunc, listener.updateBytesWrittenFunc) + + send(acceptResult{bufconn, nil}, doneCh) return } @@ -182,8 +243,7 @@ func (listener *httpListener) start() { } // Check whether the connection contains HTTP request or not. - bufconn = newBufConn(tlsConn, listener.readTimeout, listener.writeTimeout, - listener.updateBytesReadFunc, listener.updateBytesWrittenFunc) + bufconn = newBufConn(tlsConn, listener.readTimeout, listener.writeTimeout) // Peek bytes of maximum length of all HTTP methods. data, err = bufconn.Peek(methodMaxLen) @@ -201,6 +261,30 @@ func (listener *httpListener) start() { // Return bufconn if read data is a valid HTTP method. tokens := strings.SplitN(string(data), " ", 2) if isHTTPMethod(tokens[0]) { + var resource, host string + if resource, host, err = getResourceHost(bufconn, listener.maxHeaderBytes, len(tokens[0])+1); err != nil { + if !isRoutineNetErr(err) { + reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String()) + reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String()) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + } + bufconn.Close() + return + } + + header := make(http.Header) + if host != "" { + header.Add("Host", host) + } + bufconn.setRequest(&http.Request{ + Method: tokens[0], + URL: &url.URL{Path: resource}, + Host: bufconn.LocalAddr().String(), + Header: header, + }) + bufconn.setUpdateFuncs(listener.updateBytesReadFunc, listener.updateBytesWrittenFunc) + send(acceptResult{bufconn, nil}, doneCh) return } @@ -296,8 +380,9 @@ func newHTTPListener(serverAddrs []string, tcpKeepAliveTimeout time.Duration, readTimeout time.Duration, writeTimeout time.Duration, - updateBytesReadFunc func(int), - updateBytesWrittenFunc func(int)) (listener *httpListener, err error) { + maxHeaderBytes int, + updateBytesReadFunc func(*http.Request, int), + updateBytesWrittenFunc func(*http.Request, int)) (listener *httpListener, err error) { var tcpListeners []*net.TCPListener @@ -333,6 +418,7 @@ func newHTTPListener(serverAddrs []string, tcpKeepAliveTimeout: tcpKeepAliveTimeout, readTimeout: readTimeout, writeTimeout: writeTimeout, + maxHeaderBytes: maxHeaderBytes, updateBytesReadFunc: updateBytesReadFunc, updateBytesWrittenFunc: updateBytesWrittenFunc, } diff --git a/cmd/http/listener_test.go b/cmd/http/listener_test.go index 62d2a1cc2..42d47d350 100644 --- a/cmd/http/listener_test.go +++ b/cmd/http/listener_test.go @@ -19,12 +19,12 @@ package http import ( "bufio" "bytes" - "context" "crypto/tls" "errors" "fmt" "io" "net" + "net/http" "runtime" "strconv" "strings" @@ -212,7 +212,7 @@ func TestNewHTTPListener(t *testing.T) { } remoteUnknownErr := "lookup unknown-host" + errMsg - if runtime.GOOS == "wpindows" { + if runtime.GOOS == "windows" { remoteUnknownErr = "listen tcp: lookup unknown-host" + errMsg } @@ -224,19 +224,18 @@ func TestNewHTTPListener(t *testing.T) { tcpKeepAliveTimeout time.Duration readTimeout time.Duration writeTimeout time.Duration - updateBytesReadFunc func(int) - updateBytesWrittenFunc func(int) - errorLogFunc func(context.Context, error) + updateBytesReadFunc func(*http.Request, int) + updateBytesWrittenFunc func(*http.Request, int) expectedErr error }{ - {[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteAddrErrMsgIP)}, - {[]string{"example.org:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteAddrErrMsgHost)}, - {[]string{"unknown-host"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteMissingErr)}, - {[]string{"unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteUnknownErr)}, - {[]string{"localhost:65432", "93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteAddrErrMsgIP)}, - {[]string{"localhost:65432", "unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteUnknownErr)}, - {[]string{"localhost:0"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, nil}, - {[]string{"localhost:0"}, tlsConfig, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, nil}, + {[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteAddrErrMsgIP)}, + {[]string{"example.org:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteAddrErrMsgHost)}, + {[]string{"unknown-host"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteMissingErr)}, + {[]string{"unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteUnknownErr)}, + {[]string{"localhost:65432", "93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteAddrErrMsgIP)}, + {[]string{"localhost:65432", "unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteUnknownErr)}, + {[]string{"localhost:0"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil}, + {[]string{"localhost:0"}, tlsConfig, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil}, } for _, testCase := range testCases { @@ -246,6 +245,7 @@ func TestNewHTTPListener(t *testing.T) { testCase.tcpKeepAliveTimeout, testCase.readTimeout, testCase.writeTimeout, + DefaultMaxHeaderBytes, testCase.updateBytesReadFunc, testCase.updateBytesWrittenFunc, ) @@ -297,6 +297,7 @@ func TestHTTPListenerStartClose(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -344,6 +345,7 @@ func TestHTTPListenerAddr(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -388,6 +390,7 @@ func TestHTTPListenerAddrs(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -414,17 +417,18 @@ func TestHTTPListenerAccept(t *testing.T) { nonLoopBackIP := getNonLoopBackIP(t) testCases := []struct { - serverAddrs []string - tlsConfig *tls.Config - request string - reply string + serverAddrs []string + tlsConfig *tls.Config + request string + reply string + expectedRequestLine string }{ - {[]string{"localhost:0"}, nil, "GET / HTTP/1.0\n", "200 OK\n"}, - {[]string{nonLoopBackIP + ":0"}, nil, "POST / HTTP/1.0\n", "200 OK\n"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONNECT \n", "200 OK\n"}, - {[]string{"localhost:0"}, tlsConfig, "GET / HTTP/1.0\n", "200 OK\n"}, - {[]string{nonLoopBackIP + ":0"}, tlsConfig, "POST / HTTP/1.0\n", "200 OK\n"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONNECT \n", "200 OK\n"}, + {[]string{"localhost:0"}, nil, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"}, + {[]string{nonLoopBackIP + ":0"}, nil, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"}, + {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONNECT \r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT \r\n"}, + {[]string{"localhost:0"}, tlsConfig, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"}, + {[]string{nonLoopBackIP + ":0"}, tlsConfig, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"}, + {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONNECT \r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT \r\n"}, } for i, testCase := range testCases { @@ -434,6 +438,7 @@ func TestHTTPListenerAccept(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -463,13 +468,13 @@ func TestHTTPListenerAccept(t *testing.T) { t.Fatalf("Test %d: accept: expected = , got = %v", i+1, err) } - request, err := bufio.NewReader(serverConn).ReadString('\n') + requestLine, err := bufio.NewReader(serverConn).ReadString('\n') if err != nil { t.Fatalf("Test %d: request read: expected = , got = %v", i+1, err) } - if testCase.request != request { - t.Fatalf("Test %d: request: expected = %v, got = %v", i+1, testCase.request, request) + if requestLine != testCase.expectedRequestLine { + t.Fatalf("Test %d: request: expected = %v, got = %v", i+1, testCase.expectedRequestLine, requestLine) } if _, err = io.WriteString(serverConn, testCase.reply); err != nil { @@ -513,6 +518,7 @@ func TestHTTPListenerAcceptPeekError(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -566,6 +572,7 @@ func TestHTTPListenerAcceptTLSError(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -632,6 +639,7 @@ func TestHTTPListenerAcceptError(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -757,6 +765,7 @@ func TestHTTPListenerAcceptParallel(t *testing.T) { time.Duration(0), time.Duration(0), time.Duration(0), + DefaultMaxHeaderBytes, nil, nil, ) @@ -765,8 +774,8 @@ func TestHTTPListenerAcceptParallel(t *testing.T) { } for _, serverAddr := range listener.Addrs() { - go connect(i, serverAddr.String(), testCase.tlsConfig != nil, true, "GET /1 HTTP/1.0\n", testCase.reply) - go connect(i, serverAddr.String(), testCase.tlsConfig != nil, false, "GET /2 HTTP/1.0\n", testCase.reply) + go connect(i, serverAddr.String(), testCase.tlsConfig != nil, true, "GET /1 HTTP/1.0\r\nHost: example.org\r\nr\n", testCase.reply) + go connect(i, serverAddr.String(), testCase.tlsConfig != nil, false, "GET /2 HTTP/1.0\r\nHost: example.org\r\n\r\n", testCase.reply) var wg sync.WaitGroup @@ -775,14 +784,14 @@ func TestHTTPListenerAcceptParallel(t *testing.T) { t.Fatalf("Test %d: accept: expected = , got = %v", i+1, err) } wg.Add(1) - go handleConnection(i, &wg, serverConn, "GET /2 HTTP/1.0\n", testCase.reply) + go handleConnection(i, &wg, serverConn, "GET /2 HTTP/1.0\r\n", testCase.reply) serverConn, err = listener.Accept() if err != nil { t.Fatalf("Test %d: accept: expected = , got = %v", i+1, err) } wg.Add(1) - go handleConnection(i, &wg, serverConn, "GET /1 HTTP/1.0\n", testCase.reply) + go handleConnection(i, &wg, serverConn, "GET /1 HTTP/1.0\r\n", testCase.reply) wg.Wait() } diff --git a/cmd/http/server.go b/cmd/http/server.go index 4aa0749fe..3598f1ca9 100644 --- a/cmd/http/server.go +++ b/cmd/http/server.go @@ -51,15 +51,15 @@ const ( // Server - extended http.Server supports multiple addresses to serve and enhanced connection handling. type Server struct { http.Server - Addrs []string // addresses on which the server listens for new connection. - ShutdownTimeout time.Duration // timeout used for graceful server shutdown. - TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection. - UpdateBytesReadFunc func(int) // function to be called to update bytes read in bufConn. - UpdateBytesWrittenFunc func(int) // function to be called to update bytes written in bufConn. - listenerMutex *sync.Mutex // to guard 'listener' field. - listener *httpListener // HTTP listener for all 'Addrs' field. - inShutdown uint32 // indicates whether the server is in shutdown or not - requestCount int32 // counter holds no. of request in progress. + Addrs []string // addresses on which the server listens for new connection. + ShutdownTimeout time.Duration // timeout used for graceful server shutdown. + TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection. + UpdateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in bufConn. + UpdateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in bufConn. + listenerMutex *sync.Mutex // to guard 'listener' field. + listener *httpListener // HTTP listener for all 'Addrs' field. + inShutdown uint32 // indicates whether the server is in shutdown or not + requestCount int32 // counter holds no. of request in progress. } // GetRequestCount - returns number of request in progress. @@ -91,6 +91,7 @@ func (srv *Server) Start() (err error) { tcpKeepAliveTimeout, readTimeout, writeTimeout, + srv.MaxHeaderBytes, updateBytesReadFunc, updateBytesWrittenFunc, )