mirror of
https://github.com/minio/minio.git
synced 2025-11-10 14:09:48 -05:00
Add Put input readahead (#14084)
When reading input for PutObject or PutObjectPart, add a readahead buffer for big inputs. This makes network reads and hashing run asynchronously, separate from erasure coding and writes. This will reduce overall latency in distributed setups where the input is from upstream and writes go to other servers. We read up to 2 buffers ahead, meaning one buffer will always be ready/waiting while the other is being read into. This improves PutObject and PutObjectPart for these cases.
This commit is contained in:
@@ -29,6 +29,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/klauspost/readahead"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
@@ -568,7 +569,22 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum)
|
||||
toEncode := io.Reader(data)
|
||||
if data.Size() > bigFileThreshold {
|
||||
// Add input readahead.
|
||||
// We use 2 buffers, so we always have a full buffer of input.
|
||||
bufA := er.bp.Get()
|
||||
bufB := er.bp.Get()
|
||||
defer er.bp.Put(bufA)
|
||||
defer er.bp.Put(bufB)
|
||||
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
||||
if err == nil {
|
||||
toEncode = ra
|
||||
defer ra.Close()
|
||||
}
|
||||
}
|
||||
|
||||
n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum)
|
||||
closeBitrotWriters(writers)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/klauspost/readahead"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
@@ -884,7 +885,21 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
||||
toEncode := io.Reader(data)
|
||||
if data.Size() > bigFileThreshold {
|
||||
// We use 2 buffers, so we always have a full buffer of input.
|
||||
bufA := er.bp.Get()
|
||||
bufB := er.bp.Get()
|
||||
defer er.bp.Put(bufA)
|
||||
defer er.bp.Put(bufB)
|
||||
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
||||
if err == nil {
|
||||
toEncode = ra
|
||||
defer ra.Close()
|
||||
}
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
|
||||
closeBitrotWriters(writers)
|
||||
if erasureErr != nil {
|
||||
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
|
||||
|
||||
@@ -60,6 +60,9 @@ const (
|
||||
// For hard drives it is possible to set this to a lower value to avoid any
|
||||
// spike in latency. But currently we are simply keeping it optimal for SSDs.
|
||||
|
||||
// bigFileThreshold is the point where we add readahead to put operations.
|
||||
bigFileThreshold = 128 * humanize.MiByte
|
||||
|
||||
// XL metadata file carries per object metadata.
|
||||
xlStorageFormatFile = "xl.meta"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user