From fd1b8491dbe541d780197064e945c451ad733b73 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 5 Sep 2018 16:47:14 -0700 Subject: [PATCH] Drain response body properly for http connection pool (#6415) Currently Go http connection pool was not being properly utilized leading to degrading performance as the number of concurrent requests increased. As recommended by Go implementation, we have to drain the response body and close it. --- cmd/gateway/oss/gateway-oss.go | 8 ++------ cmd/gateway/sia/gateway-sia.go | 14 +++++++------- cmd/logger/http.go | 4 ++++ cmd/rpc/client.go | 26 +++++++++++++++++++++++++- cmd/rpc/pool.go | 7 +++++++ cmd/update-main.go | 4 ++-- cmd/utils.go | 31 +++++++++++++++++++++++++++++++ pkg/event/target/webhook.go | 3 +++ pkg/madmin/config-commands.go | 2 +- 9 files changed, 82 insertions(+), 17 deletions(-) diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index 1423d6b7a..4a7c4c5a9 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -21,7 +21,6 @@ import ( "encoding/xml" "fmt" "io" - "io/ioutil" "net/http" "strconv" "strings" @@ -816,11 +815,8 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par return lupr, err } - defer func() { - // always drain output (response body) - io.CopyN(ioutil.Discard, resp.Body, 512) - resp.Body.Close() - }() + // always drain output (response body) + defer minio.CloseResponse(resp.Body) err = xml.NewDecoder(resp.Body).Decode(&lupr) if err != nil { diff --git a/cmd/gateway/sia/gateway-sia.go b/cmd/gateway/sia/gateway-sia.go index f2d2f49dd..8695b9b85 100644 --- a/cmd/gateway/sia/gateway-sia.go +++ b/cmd/gateway/sia/gateway-sia.go @@ -233,13 +233,13 @@ func apiGet(ctx context.Context, addr, call, apiPassword string) (*http.Response return nil, err } if resp.StatusCode == http.StatusNotFound { - resp.Body.Close() + minio.CloseResponse(resp.Body) logger.LogIf(ctx, MethodNotSupported{call}) return nil, MethodNotSupported{call} } if non2xx(resp.StatusCode) { err := decodeError(resp) - resp.Body.Close() + minio.CloseResponse(resp.Body) logger.LogIf(ctx, err) return nil, err } @@ -266,13 +266,13 @@ func apiPost(ctx context.Context, addr, call, vals, apiPassword string) (*http.R } if resp.StatusCode == http.StatusNotFound { - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil, MethodNotSupported{call} } if non2xx(resp.StatusCode) { err := decodeError(resp) - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil, err } return resp, nil @@ -285,7 +285,7 @@ func post(ctx context.Context, addr, call, vals, apiPassword string) error { if err != nil { return err } - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil } @@ -295,7 +295,7 @@ func list(ctx context.Context, addr string, apiPassword string, obj *renterFiles if err != nil { return err } - defer resp.Body.Close() + defer minio.CloseResponse(resp.Body) if resp.StatusCode == http.StatusNoContent { logger.LogIf(ctx, fmt.Errorf("Expecting a response, but API returned %s", resp.Status)) @@ -313,7 +313,7 @@ func get(ctx context.Context, addr, call, apiPassword string) error { if err != nil { return err } - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil } diff --git a/cmd/logger/http.go b/cmd/logger/http.go index d331eca36..5c5b9b72c 100644 --- a/cmd/logger/http.go +++ b/cmd/logger/http.go @@ -20,6 +20,8 @@ import ( "bytes" "encoding/json" "errors" + "io" + "io/ioutil" "net/http" ) @@ -54,6 +56,8 @@ func (h *HTTPTarget) startHTTPLogger() { continue } if resp.Body != nil { + buf := make([]byte, 512) + io.CopyBuffer(ioutil.Discard, resp.Body, buf) resp.Body.Close() } } diff --git a/cmd/rpc/client.go b/cmd/rpc/client.go index 44025986d..efa45ff69 100644 --- a/cmd/rpc/client.go +++ b/cmd/rpc/client.go @@ -22,6 +22,8 @@ import ( "encoding/gob" "errors" "fmt" + "io" + "io/ioutil" "net" "net/http" "reflect" @@ -40,6 +42,28 @@ type Client struct { serviceURL *xnet.URL } +// closeResponse close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to re-use the same connection for future requests. +func closeResponse(body io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to re-use a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if body != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + bufp := b512pool.Get().(*[]byte) + defer b512pool.Put(bufp) + io.CopyBuffer(ioutil.Discard, body, *bufp) + body.Close() + } +} + // Call - calls service method on RPC server. func (client *Client) Call(serviceMethod string, args, reply interface{}) error { replyKind := reflect.TypeOf(reply).Kind() @@ -69,7 +93,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error if err != nil { return err } - defer response.Body.Close() + defer closeResponse(response.Body) if response.StatusCode != http.StatusOK { return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode) diff --git a/cmd/rpc/pool.go b/cmd/rpc/pool.go index 84dcbf6e1..72f627316 100644 --- a/cmd/rpc/pool.go +++ b/cmd/rpc/pool.go @@ -21,6 +21,13 @@ import ( "sync" ) +var b512pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 512) + return &buf + }, +} + // A Pool is a type-safe wrapper around a sync.Pool. type Pool struct { p *sync.Pool diff --git a/cmd/update-main.go b/cmd/update-main.go index dae3b87c7..4f31ce2f3 100644 --- a/cmd/update-main.go +++ b/cmd/update-main.go @@ -324,7 +324,7 @@ func downloadReleaseURL(releaseChecksumURL string, timeout time.Duration, mode s if resp == nil { return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL) } - defer resp.Body.Close() + defer CloseResponse(resp.Body) if resp.StatusCode != http.StatusOK { return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status) @@ -471,7 +471,7 @@ func doUpdate(sha256Hex string, latestReleaseTime time.Time, ok bool) (updateSta if err != nil { return updateStatusMsg, err } - defer resp.Body.Close() + defer CloseResponse(resp.Body) // FIXME: add support for gpg verification as well. if err = update.Apply(resp.Body, diff --git a/cmd/utils.go b/cmd/utils.go index 904d00a91..17aa2b966 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -25,12 +25,14 @@ import ( "encoding/xml" "fmt" "io" + "io/ioutil" "net" "net/http" "net/url" "os" "reflect" "strings" + "sync" "time" "github.com/minio/minio/cmd/logger" @@ -381,3 +383,32 @@ func isNetworkOrHostDown(err error) bool { } return false } + +var b512pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 512) + return &buf + }, +} + +// CloseResponse close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to re-use the same connection for future requests. +func CloseResponse(respBody io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to re-use a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if respBody != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + bufp := b512pool.Get().(*[]byte) + defer b512pool.Put(bufp) + io.CopyBuffer(ioutil.Discard, respBody, *bufp) + respBody.Close() + } +} diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index a3bb03877..9ca49f6de 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -23,6 +23,8 @@ import ( "encoding/json" "errors" "fmt" + "io" + "io/ioutil" "net" "net/http" "net/url" @@ -89,6 +91,7 @@ func (target *WebhookTarget) Send(eventData event.Event) error { } // FIXME: log returned error. ignore time being. + io.Copy(ioutil.Discard, resp.Body) _ = resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { diff --git a/pkg/madmin/config-commands.go b/pkg/madmin/config-commands.go index 7719fb57c..429a7aaf0 100644 --- a/pkg/madmin/config-commands.go +++ b/pkg/madmin/config-commands.go @@ -85,7 +85,7 @@ func (adm *AdminClient) GetConfig() ([]byte, error) { if resp.StatusCode != http.StatusOK { return nil, httpRespToErrorResponse(resp) } - defer resp.Body.Close() + defer closeResponse(resp) return DecryptServerConfigData(adm.secretAccessKey, resp.Body) }