mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
re-use io.Copy buffers with 32k pools (#13553)
Borrowed idea from Go's usage of this optimization for ReadFrom() on client side, we should re-use the 32k buffers io.Copy() allocates for generic copy from a reader to writer. the performance increase for reads for really tiny objects is at this range after this change. > * Fastest: +7.89% (+1.3 MiB/s) throughput, +7.89% (+1308.1) obj/s
This commit is contained in:
parent
30ba85bc67
commit
14d8a931fe
@ -24,6 +24,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/klauspost/reedsolomon"
|
"github.com/klauspost/reedsolomon"
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -82,7 +83,7 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
|||||||
|
|
||||||
// We have written all the blocks, write the last remaining block.
|
// We have written all the blocks, write the last remaining block.
|
||||||
if write < int64(len(block)) {
|
if write < int64(len(block)) {
|
||||||
n, err := io.Copy(dst, bytes.NewReader(block[:write]))
|
n, err := xioutil.Copy(dst, bytes.NewReader(block[:write]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
||||||
// The reader pipe might be closed at ListObjects io.EOF ignore it.
|
// The reader pipe might be closed at ListObjects io.EOF ignore it.
|
||||||
@ -96,7 +97,7 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy the block.
|
// Copy the block.
|
||||||
n, err := io.Copy(dst, bytes.NewReader(block))
|
n, err := xioutil.Copy(dst, bytes.NewReader(block))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
||||||
// The reader pipe might be closed at ListObjects io.EOF ignore it.
|
// The reader pipe might be closed at ListObjects io.EOF ignore it.
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/lock"
|
"github.com/minio/minio/internal/lock"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
)
|
)
|
||||||
@ -334,7 +335,7 @@ func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, falloc
|
|||||||
}
|
}
|
||||||
defer writer.Close()
|
defer writer.Close()
|
||||||
|
|
||||||
bytesWritten, err := io.Copy(writer, reader)
|
bytesWritten, err := xioutil.Copy(writer, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -26,8 +26,10 @@ import (
|
|||||||
"github.com/minio/madmin-go"
|
"github.com/minio/madmin-go"
|
||||||
"github.com/minio/minio-go/v7/pkg/encrypt"
|
"github.com/minio/minio-go/v7/pkg/encrypt"
|
||||||
"github.com/minio/minio-go/v7/pkg/tags"
|
"github.com/minio/minio-go/v7/pkg/tags"
|
||||||
"github.com/minio/minio/internal/bucket/replication"
|
|
||||||
"github.com/minio/pkg/bucket/policy"
|
"github.com/minio/pkg/bucket/policy"
|
||||||
|
|
||||||
|
"github.com/minio/minio/internal/bucket/replication"
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CheckPreconditionFn returns true if precondition check failed.
|
// CheckPreconditionFn returns true if precondition check failed.
|
||||||
@ -261,6 +263,6 @@ func GetObject(ctx context.Context, api ObjectLayer, bucket, object string, star
|
|||||||
}
|
}
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
_, err = io.Copy(writer, reader)
|
_, err = xioutil.Copy(writer, reader)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ import (
|
|||||||
"github.com/minio/minio/internal/handlers"
|
"github.com/minio/minio/internal/handlers"
|
||||||
"github.com/minio/minio/internal/hash"
|
"github.com/minio/minio/internal/hash"
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/ioutil"
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/kms"
|
"github.com/minio/minio/internal/kms"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/s3select"
|
"github.com/minio/minio/internal/s3select"
|
||||||
@ -504,14 +504,14 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
|
|||||||
setHeadGetRespHeaders(w, r.Form)
|
setHeadGetRespHeaders(w, r.Form)
|
||||||
|
|
||||||
statusCodeWritten := false
|
statusCodeWritten := false
|
||||||
httpWriter := ioutil.WriteOnClose(w)
|
httpWriter := xioutil.WriteOnClose(w)
|
||||||
if rs != nil || opts.PartNumber > 0 {
|
if rs != nil || opts.PartNumber > 0 {
|
||||||
statusCodeWritten = true
|
statusCodeWritten = true
|
||||||
w.WriteHeader(http.StatusPartialContent)
|
w.WriteHeader(http.StatusPartialContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write object content to response body
|
// Write object content to response body
|
||||||
if _, err = io.Copy(httpWriter, gr); err != nil {
|
if _, err = xioutil.Copy(httpWriter, gr); err != nil {
|
||||||
if !httpWriter.HasWritten() && !statusCodeWritten {
|
if !httpWriter.HasWritten() && !statusCodeWritten {
|
||||||
// write error response only if no data or headers has been written to client yet
|
// write error response only if no data or headers has been written to client yet
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
@ -23,13 +23,13 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
stdioutil "io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/crypto"
|
"github.com/minio/minio/internal/crypto"
|
||||||
"github.com/minio/minio/internal/ioutil"
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/pkg/bucket/policy"
|
"github.com/minio/pkg/bucket/policy"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
@ -187,7 +187,7 @@ func (api objectAPIHandlers) getObjectInArchiveFileHandler(ctx context.Context,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
rc = stdioutil.NopCloser(bytes.NewReader([]byte{}))
|
rc = ioutil.NopCloser(bytes.NewReader([]byte{}))
|
||||||
}
|
}
|
||||||
|
|
||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
@ -199,10 +199,10 @@ func (api objectAPIHandlers) getObjectInArchiveFileHandler(ctx context.Context,
|
|||||||
|
|
||||||
setHeadGetRespHeaders(w, r.Form)
|
setHeadGetRespHeaders(w, r.Form)
|
||||||
|
|
||||||
httpWriter := ioutil.WriteOnClose(w)
|
httpWriter := xioutil.WriteOnClose(w)
|
||||||
|
|
||||||
// Write object content to response body
|
// Write object content to response body
|
||||||
if _, err = io.Copy(httpWriter, rc); err != nil {
|
if _, err = xioutil.Copy(httpWriter, rc); err != nil {
|
||||||
if !httpWriter.HasWritten() {
|
if !httpWriter.HasWritten() {
|
||||||
// write error response only if no data or headers has been written to client yet
|
// write error response only if no data or headers has been written to client yet
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
@ -324,7 +324,7 @@ func getFilesListFromZIPObject(ctx context.Context, objectAPI ObjectLayer, bucke
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ObjectInfo{}, err
|
return nil, ObjectInfo{}, err
|
||||||
}
|
}
|
||||||
b, err := stdioutil.ReadAll(gr)
|
b, err := ioutil.ReadAll(gr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gr.Close()
|
gr.Close()
|
||||||
return nil, ObjectInfo{}, err
|
return nil, ObjectInfo{}, err
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/minio/minio/internal/config"
|
"github.com/minio/minio/internal/config"
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
xjwt "github.com/minio/minio/internal/jwt"
|
xjwt "github.com/minio/minio/internal/jwt"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
@ -574,7 +575,7 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
|
|||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
|
|
||||||
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
|
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
|
||||||
if _, err = io.Copy(w, rc); err != nil {
|
if _, err = xioutil.Copy(w, rc); err != nil {
|
||||||
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
||||||
logger.LogIf(r.Context(), err)
|
logger.LogIf(r.Context(), err)
|
||||||
}
|
}
|
||||||
@ -954,13 +955,22 @@ func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
|
|||||||
return &h
|
return &h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var poolBuf8k = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
b := make([]byte, 8192)
|
||||||
|
return &b
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// waitForHTTPStream will wait for responses where
|
// waitForHTTPStream will wait for responses where
|
||||||
// streamHTTPResponse has been used.
|
// streamHTTPResponse has been used.
|
||||||
// The returned reader contains the payload and must be closed if no error is returned.
|
// The returned reader contains the payload and must be closed if no error is returned.
|
||||||
func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
||||||
var tmp [1]byte
|
var tmp [1]byte
|
||||||
// 8K copy buffer, reused for less allocs...
|
// 8K copy buffer, reused for less allocs...
|
||||||
var buf [8 << 10]byte
|
bufp := poolBuf8k.Get().(*[]byte)
|
||||||
|
buf := *bufp
|
||||||
|
defer poolBuf8k.Put(bufp)
|
||||||
for {
|
for {
|
||||||
_, err := io.ReadFull(respBody, tmp[:])
|
_, err := io.ReadFull(respBody, tmp[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -970,7 +980,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||||||
switch tmp[0] {
|
switch tmp[0] {
|
||||||
case 0:
|
case 0:
|
||||||
// 0 is unbuffered, copy the rest.
|
// 0 is unbuffered, copy the rest.
|
||||||
_, err := io.CopyBuffer(w, respBody, buf[:])
|
_, err := io.CopyBuffer(w, respBody, buf)
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -989,7 +999,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
length := binary.LittleEndian.Uint32(tmp[:])
|
length := binary.LittleEndian.Uint32(tmp[:])
|
||||||
_, err = io.CopyBuffer(w, io.LimitReader(respBody, int64(length)), buf[:])
|
_, err = io.CopyBuffer(w, io.LimitReader(respBody, int64(length)), buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,8 @@ import (
|
|||||||
"google.golang.org/api/googleapi"
|
"google.golang.org/api/googleapi"
|
||||||
"google.golang.org/api/iterator"
|
"google.golang.org/api/iterator"
|
||||||
"google.golang.org/api/option"
|
"google.golang.org/api/option"
|
||||||
|
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type warmBackendGCS struct {
|
type warmBackendGCS struct {
|
||||||
@ -54,7 +56,7 @@ func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader,
|
|||||||
if gcs.StorageClass != "" {
|
if gcs.StorageClass != "" {
|
||||||
w.ObjectAttrs.StorageClass = gcs.StorageClass
|
w.ObjectAttrs.StorageClass = gcs.StorageClass
|
||||||
}
|
}
|
||||||
if _, err := io.Copy(w, data); err != nil {
|
if _, err := xioutil.Copy(w, data); err != nil {
|
||||||
return "", gcsToObjectError(err, gcs.Bucket, key)
|
return "", gcsToObjectError(err, gcs.Bucket, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/disk"
|
"github.com/minio/minio/internal/disk"
|
||||||
@ -210,6 +211,22 @@ func NewSkipReader(r io.Reader, n int64) io.Reader {
|
|||||||
return &SkipReader{r, n}
|
return &SkipReader{r, n}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var copyBufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
b := make([]byte, 32*1024)
|
||||||
|
return &b
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy is exactly like io.Copy but with re-usable buffers.
|
||||||
|
func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
|
||||||
|
bufp := copyBufPool.Get().(*[]byte)
|
||||||
|
buf := *bufp
|
||||||
|
defer copyBufPool.Put(bufp)
|
||||||
|
|
||||||
|
return io.CopyBuffer(dst, src, buf)
|
||||||
|
}
|
||||||
|
|
||||||
// SameFile returns if the files are same.
|
// SameFile returns if the files are same.
|
||||||
func SameFile(fi1, fi2 os.FileInfo) bool {
|
func SameFile(fi1, fi2 os.FileInfo) bool {
|
||||||
if !os.SameFile(fi1, fi2) {
|
if !os.SameFile(fi1, fi2) {
|
||||||
@ -250,6 +267,7 @@ func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte, totalSize int64) (i
|
|||||||
if err = disk.DisableDirectIO(w); err != nil {
|
if err = disk.DisableDirectIO(w); err != nil {
|
||||||
return remainingWritten, err
|
return remainingWritten, err
|
||||||
}
|
}
|
||||||
|
// Since w is *os.File io.Copy shall use ReadFrom() call.
|
||||||
return io.Copy(w, bytes.NewReader(buf))
|
return io.Copy(w, bytes.NewReader(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user