1
0
mirror of https://github.com/minio/minio.git synced 2025-04-10 22:47:52 -04:00

replace io.Discard usage to fix some NUMA copy() latencies ()

replace io.Discard usage to fix NUMA copy() latencies

On NUMA systems copying from 8K buffer allocated via
io.Discard leads to large latency build-up for every

```
copy(new8kbuf, largebuf)
```

can in-cur upto 1ms worth of latencies on NUMA systems
due to memory sharding across NUMA nodes.
This commit is contained in:
Harshavardhana 2023-11-06 14:26:08 -08:00 committed by GitHub
parent 64bafe1dfe
commit 754f7a8a39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 78 additions and 57 deletions

@ -31,6 +31,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v3"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/mux"
"github.com/minio/pkg/v2/policy"
@ -537,7 +538,7 @@ func (a adminAPIHandlers) SiteReplicationDevNull(w http.ResponseWriter, r *http.
connectTime := time.Now()
for {
n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte)
n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte)
atomic.AddUint64(&globalSiteNetPerfRX.RX, uint64(n))
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
// If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec)

@ -52,6 +52,7 @@ import (
"github.com/minio/minio/internal/dsync"
"github.com/minio/minio/internal/handlers"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/mux"
@ -756,11 +757,8 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request)
return
}
}
// read request body
io.CopyN(io.Discard, r.Body, 1)
globalProfilerMu.Lock()
if globalProfiler == nil {
globalProfiler = make(map[string]minioProfiler, 10)
}
@ -1220,7 +1218,7 @@ func (a adminAPIHandlers) ClientDevNull(w http.ResponseWriter, r *http.Request)
totalRx := int64(0)
connectTime := time.Now()
for {
n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte)
n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
// would mean the network is not stable. Logging here will help in debugging network issues.
if time.Since(connectTime) < (globalNetPerfMinDuration - time.Second) {

@ -18,7 +18,6 @@
package cmd
import (
"bytes"
"context"
"encoding/base64"
"fmt"
@ -26,7 +25,6 @@ import (
"strings"
"github.com/klauspost/reedsolomon"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
)
@ -85,7 +83,7 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
// We have written all the blocks, write the last remaining block.
if write < int64(len(block)) {
n, err := xioutil.Copy(dst, bytes.NewReader(block[:write]))
n, err := dst.Write(block[:write])
if err != nil {
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
// The reader pipe might be closed at ListObjects io.EOF ignore it.
@ -94,12 +92,12 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
}
return 0, err
}
totalWritten += n
totalWritten += int64(n)
break
}
// Copy the block.
n, err := xioutil.Copy(dst, bytes.NewReader(block))
n, err := dst.Write(block)
if err != nil {
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
// The reader pipe might be closed at ListObjects io.EOF ignore it.
@ -110,10 +108,10 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
}
// Decrement output size.
write -= n
write -= int64(n)
// Increment written.
totalWritten += n
totalWritten += int64(n)
}
// Success.

@ -33,6 +33,7 @@ import (
"github.com/minio/madmin-go/v3"
b "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/pubsub"
"github.com/minio/mux"
@ -1423,7 +1424,7 @@ func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) {
connectTime := time.Now()
ctx := newContext(r, w, "DevNull")
for {
n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte)
n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte)
atomic.AddUint64(&globalNetPerfRX.RX, uint64(n))
if err != nil && err != io.EOF {
// If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec)

@ -34,6 +34,7 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio-go/v7"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/pkg/v2/randreader"
)
@ -148,6 +149,8 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er
var downloadTimes madmin.TimeDurations
var downloadTTFB madmin.TimeDurations
wg.Add(opts.concurrency)
c := minio.Core{Client: globalMinioClient}
for i := 0; i < opts.concurrency; i++ {
go func(i int) {
defer wg.Done()
@ -161,7 +164,8 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er
}
tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, j))
t := time.Now()
r, err := globalMinioClient.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts)
r, _, _, err := c.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts)
if err != nil {
errResp, ok := err.(minio.ErrorResponse)
if ok && errResp.StatusCode == http.StatusNotFound {
@ -178,7 +182,7 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er
fbr := firstByteRecorder{
r: r,
}
n, err := io.Copy(io.Discard, &fbr)
n, err := xioutil.Copy(xioutil.Discard, &fbr)
r.Close()
if err == nil {
response := time.Since(t)

@ -856,10 +856,7 @@ func (client *storageRESTClient) CleanAbandonedData(ctx context.Context, volume
return err
}
defer xhttp.DrainBody(respBody)
respReader, err := waitForHTTPResponse(respBody)
if err == nil {
io.Copy(io.Discard, respReader)
}
_, err = waitForHTTPResponse(respBody)
return err
}

2
go.mod

@ -45,7 +45,7 @@ require (
github.com/minio/console v0.41.0
github.com/minio/csvparser v1.0.0
github.com/minio/dnscache v0.1.1
github.com/minio/dperf v0.5.1
github.com/minio/dperf v0.5.2
github.com/minio/highwayhash v1.0.2
github.com/minio/kes-go v0.2.0
github.com/minio/madmin-go/v3 v3.0.29

4
go.sum

@ -470,8 +470,8 @@ github.com/minio/csvparser v1.0.0 h1:xJEHcYK8ZAjeW4hNV9Zu30u+/2o4UyPnYgyjWp8b7ZU
github.com/minio/csvparser v1.0.0/go.mod h1:lKXskSLzPgC5WQyzP7maKH7Sl1cqvANXo9YCto8zbtM=
github.com/minio/dnscache v0.1.1 h1:AMYLqomzskpORiUA1ciN9k7bZT1oB3YZN4cEIi88W5o=
github.com/minio/dnscache v0.1.1/go.mod h1:WCumm6offO4rQ/82oTCSoHnlhTmc81+vXgKpBtSYRbg=
github.com/minio/dperf v0.5.1 h1:oXX1og7sajPlcnr0c/cJ9qBeoPFNlbCqt0WAd5t3TPY=
github.com/minio/dperf v0.5.1/go.mod h1:Y8HlbQ90LbtybYXEyMfsoVyOWndB1kGUo707uCwEuxo=
github.com/minio/dperf v0.5.2 h1:ZTCyWE9jnngK55w6eU9u4VvQktJGYsUP6kaaUYORDyE=
github.com/minio/dperf v0.5.2/go.mod h1:Y8HlbQ90LbtybYXEyMfsoVyOWndB1kGUo707uCwEuxo=
github.com/minio/filepath v1.0.0 h1:fvkJu1+6X+ECRA6G3+JJETj4QeAYO9sV43I79H8ubDY=
github.com/minio/filepath v1.0.0/go.mod h1:/nRZA2ldl5z6jT9/KQuvZcQlxZIMQoFFQPvEXx9T/Bw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=

@ -23,7 +23,6 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"io"
"net/http"
"strings"
"sync/atomic"
@ -31,6 +30,7 @@ import (
"time"
"github.com/minio/minio/internal/config/lambda/event"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/certs"
xnet "github.com/minio/pkg/v2/net"
@ -133,8 +133,7 @@ func (target *WebhookTarget) isActive() (bool, error) {
}
return false, err
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
xioutil.DiscardReader(resp.Body)
// No network failure i.e response from the target means its up
return true, nil
}

@ -34,6 +34,7 @@ import (
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
"github.com/minio/highwayhash"
"github.com/minio/minio/internal/event"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/once"
"github.com/minio/minio/internal/store"
@ -474,7 +475,7 @@ func (c *esClientV7) createIndex(args ElasticsearchArgs) error {
if err != nil {
return err
}
defer DrainBody(resp.Body)
defer xioutil.DiscardReader(resp.Body)
if resp.IsError() {
return fmt.Errorf("Create index err: %v", res)
}
@ -490,7 +491,7 @@ func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error
if err != nil {
return false, store.ErrNotConnected
}
DrainBody(resp.Body)
xioutil.DiscardReader(resp.Body)
return !resp.IsError(), nil
}
@ -503,7 +504,7 @@ func (c *esClientV7) entryExists(ctx context.Context, index string, key string)
if err != nil {
return false, err
}
DrainBody(res.Body)
xioutil.DiscardReader(res.Body)
return !res.IsError(), nil
}
@ -518,7 +519,7 @@ func (c *esClientV7) removeEntry(ctx context.Context, index string, key string)
if err != nil {
return err
}
defer DrainBody(res.Body)
defer xioutil.DiscardReader(res.Body)
if res.IsError() {
return fmt.Errorf("Delete err: %s", res.String())
}
@ -546,7 +547,7 @@ func (c *esClientV7) updateEntry(ctx context.Context, index string, key string,
if err != nil {
return err
}
defer DrainBody(res.Body)
defer xioutil.DiscardReader(res.Body)
if res.IsError() {
return fmt.Errorf("Update err: %s", res.String())
}
@ -572,7 +573,7 @@ func (c *esClientV7) addEntry(ctx context.Context, index string, eventData event
if err != nil {
return err
}
defer DrainBody(res.Body)
defer xioutil.DiscardReader(res.Body)
if res.IsError() {
return fmt.Errorf("Add err: %s", res.String())
}

@ -24,7 +24,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
@ -34,8 +33,8 @@ import (
"syscall"
"time"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/event"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/once"
"github.com/minio/minio/internal/store"
@ -197,7 +196,7 @@ func (target *WebhookTarget) send(eventData event.Event) error {
if err != nil {
return err
}
defer DrainBody(resp.Body)
defer xioutil.DiscardReader(resp.Body)
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return fmt.Errorf("sending event failed with %v", resp.Status)
@ -312,23 +311,3 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
return target, nil
}
// DrainBody close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to re-use the same connection for future requests.
func DrainBody(respBody io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if respBody != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
defer respBody.Close()
io.CopyN(io.Discard, respBody, 1*humanize.MiByte)
}
}

@ -19,6 +19,8 @@ package http
import (
"io"
xioutil "github.com/minio/minio/internal/ioutil"
)
// DrainBody close non nil response with any response Body.
@ -37,6 +39,6 @@ func DrainBody(respBody io.ReadCloser) {
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
defer respBody.Close()
io.Copy(io.Discard, respBody)
xioutil.DiscardReader(respBody)
}
}

@ -0,0 +1,40 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ioutil
import (
"io"
)
// Discard is just like io.Discard without the io.ReaderFrom compatible
// implementation which is buggy on NUMA systems, we have to use a simpler
// io.Writer implementation alone avoids also unnecessary buffer copies,
// and as such incurred latencies.
var Discard io.Writer = discard{}
// discard is /dev/null for Golang.
type discard struct{}
func (discard) Write(p []byte) (int, error) {
return len(p), nil
}
// DiscardReader discarded reader
func DiscardReader(r io.Reader) {
Copy(Discard, r)
}

@ -33,6 +33,7 @@ import (
"github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip"
"github.com/minio/minio/internal/config"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/s3select/csv"
"github.com/minio/minio/internal/s3select/json"
"github.com/minio/minio/internal/s3select/parquet"
@ -91,7 +92,7 @@ var bufioWriterPool = sync.Pool{
New: func() interface{} {
// io.Discard is just used to create the writer. Actual destination
// writer is set later by Reset() before using it.
return bufio.NewWriter(io.Discard)
return bufio.NewWriter(xioutil.Discard)
},
}
@ -467,7 +468,7 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
// Use bufio Writer to prevent csv.Writer from allocating a new buffer.
bufioWriter := bufioWriterPool.Get().(*bufio.Writer)
defer func() {
bufioWriter.Reset(io.Discard)
bufioWriter.Reset(xioutil.Discard)
bufioWriterPool.Put(bufioWriter)
}()