From 9547b7d0e97662ccbe56b6f1ff9970f2a9b8dbb5 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 5 Nov 2022 11:09:21 -0700 Subject: [PATCH] add deadlineConnections on remoteTransport (#16010) --- cmd/globals.go | 2 + cmd/server-main.go | 17 +++ cmd/utils.go | 31 +++++- internal/deadlineconn/deadlineconn.go | 77 ++++++++++++++ internal/deadlineconn/deadlineconn_test.go | 117 +++++++++++++++++++++ 5 files changed, 239 insertions(+), 5 deletions(-) create mode 100644 internal/deadlineconn/deadlineconn.go create mode 100644 internal/deadlineconn/deadlineconn_test.go diff --git a/cmd/globals.go b/cmd/globals.go index ec3be18ff..c7fe45601 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -373,6 +373,8 @@ var ( // Public key for subnet confidential information subnetAdminPublicKey = []byte("-----BEGIN PUBLIC KEY-----\nMIIBCgKCAQEAyC+ol5v0FP+QcsR6d1KypR/063FInmNEFsFzbEwlHQyEQN3O7kNI\nwVDN1vqp1wDmJYmv4VZGRGzfFw1q+QV7K1TnysrEjrqpVxfxzDQCoUadAp8IxLLc\ns2fjyDNxnZjoC6fTID9C0khKnEa5fPZZc3Ihci9SiCGkPmyUyCGVSxWXIKqL2Lrj\nyDc0pGeEhWeEPqw6q8X2jvTC246tlzqpDeNsPbcv2KblXRcKniQNbBrizT37CKHQ\nM6hc9kugrZbFuo8U5/4RQvZPJnx/DVjLDyoKo2uzuVQs4s+iBrA5sSSLp8rPED/3\n6DgWw3e244Dxtrg972dIT1IOqgn7KUJzVQIDAQAB\n-----END PUBLIC KEY-----") + globalConnReadDeadline time.Duration + globalConnWriteDeadline time.Duration // Add new variable global values here. ) diff --git a/cmd/server-main.go b/cmd/server-main.go index a4edfca0b..a3a0d28f4 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -91,6 +91,20 @@ var ServerFlags = []cli.Flag{ EnvVar: "MINIO_READ_HEADER_TIMEOUT", Hidden: true, }, + cli.DurationFlag{ + Name: "conn-read-deadline", + Usage: "custom connection READ deadline", + Hidden: true, + Value: 10 * time.Minute, + EnvVar: "MINIO_CONN_READ_DEADLINE", + }, + cli.DurationFlag{ + Name: "conn-write-deadline", + Usage: "custom connection WRITE deadline", + Hidden: true, + Value: 10 * time.Minute, + EnvVar: "MINIO_CONN_WRITE_DEADLINE", + }, } var gatewayCmd = cli.Command{ @@ -249,6 +263,9 @@ func serverHandleCmdArgs(ctx *cli.Context) { globalIsErasure = true } globalIsErasureSD = (setupType == ErasureSDSetupType) + + globalConnReadDeadline = ctx.Duration("conn-read-deadline") + globalConnWriteDeadline = ctx.Duration("conn-write-deadline") } func serverHandleEnvVars() { diff --git a/cmd/utils.go b/cmd/utils.go index 97f6b5e9e..5465ad585 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -51,6 +51,7 @@ import ( "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/api" xtls "github.com/minio/minio/internal/config/identity/tls" + "github.com/minio/minio/internal/deadlineconn" "github.com/minio/minio/internal/fips" "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/hash" @@ -726,17 +727,37 @@ func newHTTPTransport(timeout time.Duration) *http.Transport { return tr } +type dialContext func(ctx context.Context, network, addr string) (net.Conn, error) + +// newCustomDialContext setups a custom dialer for any external communication and proxies. +func newCustomDialContext() dialContext { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + dialer := &net.Dialer{ + Timeout: 15 * time.Second, + KeepAlive: 30 * time.Second, + } + + conn, err := dialer.DialContext(ctx, network, addr) + if err != nil { + return nil, err + } + + dconn := deadlineconn.New(conn). + WithReadDeadline(globalConnReadDeadline). + WithWriteDeadline(globalConnWriteDeadline) + + return dconn, nil + } +} + // NewRemoteTargetHTTPTransport returns a new http configuration // used while communicating with the remote replication targets. func NewRemoteTargetHTTPTransport() func() *http.Transport { // For more details about various values used here refer // https://golang.org/pkg/net/http/#Transport documentation tr := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 15 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, + Proxy: http.ProxyFromEnvironment, + DialContext: newCustomDialContext(), MaxIdleConnsPerHost: 1024, WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default diff --git a/internal/deadlineconn/deadlineconn.go b/internal/deadlineconn/deadlineconn.go new file mode 100644 index 000000000..25b479ea3 --- /dev/null +++ b/internal/deadlineconn/deadlineconn.go @@ -0,0 +1,77 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package deadlineconn implements net.Conn wrapper with configured deadlines. +package deadlineconn + +import ( + "net" + "time" +) + +// DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout. +type DeadlineConn struct { + net.Conn + readDeadline time.Duration // sets the read deadline on a connection. + writeDeadline time.Duration // sets the write deadline on a connection. +} + +// Sets read deadline +func (c *DeadlineConn) setReadDeadline() { + if c.readDeadline > 0 { + c.SetReadDeadline(time.Now().UTC().Add(c.readDeadline)) + } +} + +func (c *DeadlineConn) setWriteDeadline() { + if c.writeDeadline > 0 { + c.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline)) + } +} + +// Read - reads data from the connection using wrapped buffered reader. +func (c *DeadlineConn) Read(b []byte) (n int, err error) { + c.setReadDeadline() + n, err = c.Conn.Read(b) + return n, err +} + +// Write - writes data to the connection. +func (c *DeadlineConn) Write(b []byte) (n int, err error) { + c.setWriteDeadline() + n, err = c.Conn.Write(b) + return n, err +} + +// WithReadDeadline sets a new read side net.Conn deadline. +func (c *DeadlineConn) WithReadDeadline(d time.Duration) *DeadlineConn { + c.readDeadline = d + return c +} + +// WithWriteDeadline sets a new write side net.Conn deadline. +func (c *DeadlineConn) WithWriteDeadline(d time.Duration) *DeadlineConn { + c.writeDeadline = d + return c +} + +// New - creates a new connection object wrapping net.Conn with deadlines. +func New(c net.Conn) *DeadlineConn { + return &DeadlineConn{ + Conn: c, + } +} diff --git a/internal/deadlineconn/deadlineconn_test.go b/internal/deadlineconn/deadlineconn_test.go new file mode 100644 index 000000000..c8269f97b --- /dev/null +++ b/internal/deadlineconn/deadlineconn_test.go @@ -0,0 +1,117 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package deadlineconn + +import ( + "bufio" + "io" + "net" + "sync" + "testing" + "time" +) + +// Test deadlineconn handles read timeout properly by reading two messages beyond deadline. +func TestBuffConnReadTimeout(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("unable to create listener. %v", err) + } + defer l.Close() + serverAddr := l.Addr().String() + + tcpListener, ok := l.(*net.TCPListener) + if !ok { + t.Fatalf("failed to assert to net.TCPListener") + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + tcpConn, terr := tcpListener.AcceptTCP() + if terr != nil { + t.Errorf("failed to accept new connection. %v", terr) + return + } + deadlineconn := New(tcpConn) + deadlineconn.WithReadDeadline(time.Second) + deadlineconn.WithWriteDeadline(time.Second) + defer deadlineconn.Close() + + // Read a line + b := make([]byte, 12) + _, terr = deadlineconn.Read(b) + if terr != nil { + t.Errorf("failed to read from client. %v", terr) + return + } + received := string(b) + if received != "message one\n" { + t.Errorf(`server: expected: "message one\n", got: %v`, received) + return + } + + // Wait for more than read timeout to simulate processing. + time.Sleep(3 * time.Second) + + _, terr = deadlineconn.Read(b) + if terr != nil { + t.Errorf("failed to read from client. %v", terr) + return + } + received = string(b) + if received != "message two\n" { + t.Errorf(`server: expected: "message two\n", got: %v`, received) + return + } + + // Send a response. + _, terr = io.WriteString(deadlineconn, "messages received\n") + if terr != nil { + t.Errorf("failed to write to client. %v", terr) + return + } + }() + + c, err := net.Dial("tcp", serverAddr) + if err != nil { + t.Fatalf("unable to connect to server. %v", err) + } + defer c.Close() + + _, err = io.WriteString(c, "message one\n") + if err != nil { + t.Fatalf("failed to write to server. %v", err) + } + _, err = io.WriteString(c, "message two\n") + if err != nil { + t.Fatalf("failed to write to server. %v", err) + } + + received, err := bufio.NewReader(c).ReadString('\n') + if err != nil { + t.Fatalf("failed to read from server. %v", err) + } + if received != "messages received\n" { + t.Fatalf(`client: expected: "messages received\n", got: %v`, received) + } + + wg.Wait() +}