mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Remove setting net.Conn Deadlines as its not needed anymore (#8269)
This commit fixes a bug introduced in af6c6a2b35
.
Setting deadlines in Go results in arbitrary hangs as reported here
https://github.com/golang/go/issues/34385
Fixes https://github.com/minio/minio/issues/7852
This commit is contained in:
parent
520552ffa9
commit
4780fa5a58
57
cmd/http/accountingconn.go
Normal file
57
cmd/http/accountingconn.go
Normal file
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2017-2019 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"net"
|
||||
)
|
||||
|
||||
// AccountingConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout.
|
||||
type AccountingConn struct {
|
||||
net.Conn
|
||||
updateBytesReadFunc func(int) // function to be called to update bytes read.
|
||||
updateBytesWrittenFunc func(int) // function to be called to update bytes written.
|
||||
}
|
||||
|
||||
// Read - reads data from the connection using wrapped buffered reader.
|
||||
func (c *AccountingConn) Read(b []byte) (n int, err error) {
|
||||
n, err = c.Conn.Read(b)
|
||||
if err == nil && c.updateBytesReadFunc != nil {
|
||||
c.updateBytesReadFunc(n)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write - writes data to the connection.
|
||||
func (c *AccountingConn) Write(b []byte) (n int, err error) {
|
||||
n, err = c.Conn.Write(b)
|
||||
if err == nil && c.updateBytesWrittenFunc != nil {
|
||||
c.updateBytesWrittenFunc(n)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// newAccountingConn - creates a new connection object wrapping net.Conn with deadlines.
|
||||
func newAccountingConn(c net.Conn, updateBytesReadFunc, updateBytesWrittenFunc func(int)) *AccountingConn {
|
||||
return &AccountingConn{
|
||||
Conn: c,
|
||||
updateBytesReadFunc: updateBytesReadFunc,
|
||||
updateBytesWrittenFunc: updateBytesWrittenFunc,
|
||||
}
|
||||
}
|
@ -1,77 +0,0 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2017 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout.
|
||||
type DeadlineConn struct {
|
||||
net.Conn
|
||||
readTimeout time.Duration // sets the read timeout in the connection.
|
||||
writeTimeout time.Duration // sets the write timeout in the connection.
|
||||
updateBytesReadFunc func(int) // function to be called to update bytes read.
|
||||
updateBytesWrittenFunc func(int) // function to be called to update bytes written.
|
||||
}
|
||||
|
||||
// Sets read timeout
|
||||
func (c *DeadlineConn) setReadTimeout() {
|
||||
if c.readTimeout != 0 {
|
||||
c.SetReadDeadline(time.Now().UTC().Add(c.readTimeout))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DeadlineConn) setWriteTimeout() {
|
||||
if c.writeTimeout != 0 {
|
||||
c.SetWriteDeadline(time.Now().UTC().Add(c.writeTimeout))
|
||||
}
|
||||
}
|
||||
|
||||
// Read - reads data from the connection using wrapped buffered reader.
|
||||
func (c *DeadlineConn) Read(b []byte) (n int, err error) {
|
||||
c.setReadTimeout()
|
||||
n, err = c.Conn.Read(b)
|
||||
if err == nil && c.updateBytesReadFunc != nil {
|
||||
c.updateBytesReadFunc(n)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write - writes data to the connection.
|
||||
func (c *DeadlineConn) Write(b []byte) (n int, err error) {
|
||||
c.setWriteTimeout()
|
||||
n, err = c.Conn.Write(b)
|
||||
if err == nil && c.updateBytesWrittenFunc != nil {
|
||||
c.updateBytesWrittenFunc(n)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// newDeadlineConn - creates a new connection object wrapping net.Conn with deadlines.
|
||||
func newDeadlineConn(c net.Conn, readTimeout, writeTimeout time.Duration, updateBytesReadFunc, updateBytesWrittenFunc func(int)) *DeadlineConn {
|
||||
return &DeadlineConn{
|
||||
Conn: c,
|
||||
readTimeout: readTimeout,
|
||||
writeTimeout: writeTimeout,
|
||||
updateBytesReadFunc: updateBytesReadFunc,
|
||||
updateBytesWrittenFunc: updateBytesWrittenFunc,
|
||||
}
|
||||
}
|
@ -1,114 +0,0 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2017 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Test deadlineconn handles read timeout properly by reading two messages beyond deadline.
|
||||
func TestBuffConnReadTimeout(t *testing.T) {
|
||||
l, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create listener. %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
serverAddr := l.Addr().String()
|
||||
|
||||
tcpListener, ok := l.(*net.TCPListener)
|
||||
if !ok {
|
||||
t.Fatalf("failed to assert to net.TCPListener")
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
tcpConn, terr := tcpListener.AcceptTCP()
|
||||
if terr != nil {
|
||||
t.Errorf("failed to accept new connection. %v", terr)
|
||||
return
|
||||
}
|
||||
deadlineconn := newDeadlineConn(tcpConn, 1*time.Second, 1*time.Second, nil, nil)
|
||||
defer deadlineconn.Close()
|
||||
|
||||
// Read a line
|
||||
var b = make([]byte, 12)
|
||||
_, terr = deadlineconn.Read(b)
|
||||
if terr != nil {
|
||||
t.Errorf("failed to read from client. %v", terr)
|
||||
return
|
||||
}
|
||||
received := string(b)
|
||||
if received != "message one\n" {
|
||||
t.Errorf(`server: expected: "message one\n", got: %v`, received)
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for more than read timeout to simulate processing.
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
_, terr = deadlineconn.Read(b)
|
||||
if terr != nil {
|
||||
t.Errorf("failed to read from client. %v", terr)
|
||||
return
|
||||
}
|
||||
received = string(b)
|
||||
if received != "message two\n" {
|
||||
t.Errorf(`server: expected: "message two\n", got: %v`, received)
|
||||
return
|
||||
}
|
||||
|
||||
// Send a response.
|
||||
_, terr = io.WriteString(deadlineconn, "messages received\n")
|
||||
if terr != nil {
|
||||
t.Errorf("failed to write to client. %v", terr)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
c, err := net.Dial("tcp", serverAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to connect to server. %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
_, err = io.WriteString(c, "message one\n")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to write to server. %v", err)
|
||||
}
|
||||
_, err = io.WriteString(c, "message two\n")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to write to server. %v", err)
|
||||
}
|
||||
|
||||
received, err := bufio.NewReader(c).ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read from server. %v", err)
|
||||
}
|
||||
if received != "messages received\n" {
|
||||
t.Fatalf(`client: expected: "messages received\n", got: %v`, received)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
@ -38,8 +38,6 @@ type httpListener struct {
|
||||
acceptCh chan acceptResult // channel where all TCP listeners write accepted connection.
|
||||
doneCh chan struct{} // done channel for TCP listener goroutines.
|
||||
tcpKeepAliveTimeout time.Duration
|
||||
readTimeout time.Duration
|
||||
writeTimeout time.Duration
|
||||
updateBytesReadFunc func(int) // function to be called to update bytes read in Deadlineconn.
|
||||
updateBytesWrittenFunc func(int) // function to be called to update bytes written in Deadlineconn.
|
||||
}
|
||||
@ -91,10 +89,9 @@ func (listener *httpListener) start() {
|
||||
tcpConn.SetKeepAlive(true)
|
||||
tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout)
|
||||
|
||||
deadlineConn := newDeadlineConn(tcpConn, listener.readTimeout,
|
||||
listener.writeTimeout, listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
|
||||
acctConn := newAccountingConn(tcpConn, listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
|
||||
|
||||
send(acceptResult{deadlineConn, nil}, doneCh)
|
||||
send(acceptResult{acctConn, nil}, doneCh)
|
||||
}
|
||||
|
||||
// Closure to handle TCPListener until done channel is closed.
|
||||
@ -176,8 +173,6 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) {
|
||||
// * controls incoming connections only doing HTTP protocol
|
||||
func newHTTPListener(serverAddrs []string,
|
||||
tcpKeepAliveTimeout time.Duration,
|
||||
readTimeout time.Duration,
|
||||
writeTimeout time.Duration,
|
||||
updateBytesReadFunc func(int),
|
||||
updateBytesWrittenFunc func(int)) (listener *httpListener, err error) {
|
||||
|
||||
@ -214,8 +209,6 @@ func newHTTPListener(serverAddrs []string,
|
||||
listener = &httpListener{
|
||||
tcpListeners: tcpListeners,
|
||||
tcpKeepAliveTimeout: tcpKeepAliveTimeout,
|
||||
readTimeout: readTimeout,
|
||||
writeTimeout: writeTimeout,
|
||||
updateBytesReadFunc: updateBytesReadFunc,
|
||||
updateBytesWrittenFunc: updateBytesWrittenFunc,
|
||||
}
|
||||
|
@ -154,8 +154,6 @@ func TestNewHTTPListener(t *testing.T) {
|
||||
listener, err := newHTTPListener(
|
||||
testCase.serverAddrs,
|
||||
testCase.tcpKeepAliveTimeout,
|
||||
testCase.readTimeout,
|
||||
testCase.writeTimeout,
|
||||
testCase.updateBytesReadFunc,
|
||||
testCase.updateBytesWrittenFunc,
|
||||
)
|
||||
@ -192,8 +190,6 @@ func TestHTTPListenerStartClose(t *testing.T) {
|
||||
listener, err := newHTTPListener(
|
||||
testCase.serverAddrs,
|
||||
time.Duration(0),
|
||||
time.Duration(0),
|
||||
time.Duration(0),
|
||||
nil, nil,
|
||||
)
|
||||
if err != nil {
|
||||
@ -235,8 +231,6 @@ func TestHTTPListenerAddr(t *testing.T) {
|
||||
listener, err := newHTTPListener(
|
||||
testCase.serverAddrs,
|
||||
time.Duration(0),
|
||||
time.Duration(0),
|
||||
time.Duration(0),
|
||||
nil, nil,
|
||||
)
|
||||
if err != nil {
|
||||
@ -275,8 +269,6 @@ func TestHTTPListenerAddrs(t *testing.T) {
|
||||
listener, err := newHTTPListener(
|
||||
testCase.serverAddrs,
|
||||
time.Duration(0),
|
||||
time.Duration(0),
|
||||
time.Duration(0),
|
||||
nil, nil,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -39,12 +39,6 @@ const (
|
||||
// DefaultTCPKeepAliveTimeout - default TCP keep alive timeout for accepted connection.
|
||||
DefaultTCPKeepAliveTimeout = 30 * time.Second
|
||||
|
||||
// DefaultReadTimeout - default timout to read data from accepted connection.
|
||||
DefaultReadTimeout = 5 * time.Minute
|
||||
|
||||
// DefaultWriteTimeout - default timout to write data to accepted connection.
|
||||
DefaultWriteTimeout = 5 * time.Minute
|
||||
|
||||
// DefaultMaxHeaderBytes - default maximum HTTP header size in bytes.
|
||||
DefaultMaxHeaderBytes = 1 * humanize.MiByte
|
||||
)
|
||||
@ -53,8 +47,6 @@ const (
|
||||
type Server struct {
|
||||
http.Server
|
||||
Addrs []string // addresses on which the server listens for new connection.
|
||||
ReadTimeout time.Duration // timeout used for net.Conn.Read() deadlines.
|
||||
WriteTimeout time.Duration // timeout used for net.Conn.Write() deadlines.
|
||||
ShutdownTimeout time.Duration // timeout used for graceful server shutdown.
|
||||
TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection.
|
||||
UpdateBytesReadFunc func(int) // function to be called to update bytes read in bufConn.
|
||||
@ -77,8 +69,6 @@ func (srv *Server) Start() (err error) {
|
||||
if srv.TLSConfig != nil {
|
||||
tlsConfig = srv.TLSConfig.Clone()
|
||||
}
|
||||
readTimeout := srv.ReadTimeout
|
||||
writeTimeout := srv.WriteTimeout
|
||||
handler := srv.Handler // if srv.Handler holds non-synced state -> possible data race
|
||||
|
||||
addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates
|
||||
@ -89,8 +79,6 @@ func (srv *Server) Start() (err error) {
|
||||
listener, err = newHTTPListener(
|
||||
addrs,
|
||||
tcpKeepAliveTimeout,
|
||||
readTimeout,
|
||||
writeTimeout,
|
||||
srv.UpdateBytesReadFunc,
|
||||
srv.UpdateBytesWrittenFunc,
|
||||
)
|
||||
@ -213,8 +201,6 @@ func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificat
|
||||
}
|
||||
httpServer.Handler = handler
|
||||
httpServer.TLSConfig = tlsConfig
|
||||
httpServer.ReadTimeout = DefaultReadTimeout
|
||||
httpServer.WriteTimeout = DefaultWriteTimeout
|
||||
httpServer.MaxHeaderBytes = DefaultMaxHeaderBytes
|
||||
|
||||
return httpServer
|
||||
|
@ -77,14 +77,6 @@ func TestNewServer(t *testing.T) {
|
||||
t.Fatalf("Case %v: server.TCPKeepAliveTimeout: expected: %v, got: %v", (i + 1), DefaultTCPKeepAliveTimeout, server.TCPKeepAliveTimeout)
|
||||
}
|
||||
|
||||
if server.ReadTimeout != DefaultReadTimeout {
|
||||
t.Fatalf("Case %v: server.ReadTimeout: expected: %v, got: %v", (i + 1), DefaultReadTimeout, server.ReadTimeout)
|
||||
}
|
||||
|
||||
if server.WriteTimeout != DefaultWriteTimeout {
|
||||
t.Fatalf("Case %v: server.WriteTimeout: expected: %v, got: %v", (i + 1), DefaultWriteTimeout, server.WriteTimeout)
|
||||
}
|
||||
|
||||
if server.MaxHeaderBytes != DefaultMaxHeaderBytes {
|
||||
t.Fatalf("Case %v: server.MaxHeaderBytes: expected: %v, got: %v", (i + 1), DefaultMaxHeaderBytes, server.MaxHeaderBytes)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user