add optimizations to improve performance of unversioned READs (#20128)

allow non-inlined data on disk to be inlined via
an unversioned ReadVersion() call; we only
need ReadXL() to resolve objects with multiple
versions.

The inline block size is dynamic and can be
chosen by the user via `mc admin config set`

Other bonus things

- Start measuring internode TTFB performance.
- Set TCP_NODELAY, TCP_CORK for low latency
This commit is contained in:
Harshavardhana 2024-07-23 03:53:03 -07:00 committed by GitHub
parent c0e2886e37
commit 91805bcab6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 69 additions and 22 deletions

View File

@ -23,6 +23,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"
@ -147,6 +148,10 @@ func TestCommonTime(t *testing.T) {
// TestListOnlineDisks - checks if listOnlineDisks and outDatedDisks
// are consistent with each other.
func TestListOnlineDisks(t *testing.T) {
if runtime.GOOS == globalWindowsOSName {
t.Skip()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -230,7 +235,7 @@ func TestListOnlineDisks(t *testing.T) {
}
object := "object"
data := bytes.Repeat([]byte("a"), smallFileThreshold*16)
data := bytes.Repeat([]byte("a"), smallFileThreshold*32)
z := obj.(*erasureServerPools)
erasureDisks, err := z.GetDisks(0, 0)

View File

@ -283,7 +283,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
}
if unlockOnDefer {
unlockOnDefer = fi.InlineData()
unlockOnDefer = fi.InlineData() || len(fi.Data) > 0
}
pr, pw := xioutil.WaitPipe()
@ -908,6 +908,8 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
rw.Lock()
// when it's a versioned bucket and empty versionID - at totalResp == setDriveCount
// we must use rawFileInfo to resolve versions to figure out the latest version.
if opts.VersionID == "" && totalResp == er.setDriveCount {
fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(pickLatestQuorumFilesInfo(ctx,
rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true))
@ -915,7 +917,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(metaArr, errs)
}
rw.Unlock()
if err == nil && fi.InlineData() {
if err == nil && (fi.InlineData() || len(fi.Data) > 0) {
break
}
}
@ -1399,7 +1401,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
writers := make([]io.Writer, len(onlineDisks))
var inlineBuffers []*bytes.Buffer
if shardFileSize >= 0 {
if !opts.Versioned && shardFileSize < inlineBlock {
if !opts.Versioned && shardFileSize <= inlineBlock {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
} else if shardFileSize < inlineBlock/8 {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
@ -1407,7 +1409,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
} else {
// If compressed, use actual size to determine.
if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 {
if !opts.Versioned && sz < inlineBlock {
if !opts.Versioned && sz <= inlineBlock {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
} else if sz < inlineBlock/8 {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))

View File

@ -264,6 +264,24 @@ type FileInfo struct {
Versioned bool `msg:"vs"`
}
// shardSize returns the size of a single erasure shard for one block:
// the erasure block size spread across the data blocks, rounded up.
func (fi FileInfo) shardSize() int64 {
	dataBlocks := int64(fi.Erasure.DataBlocks)
	return ceilFrac(fi.Erasure.BlockSize, dataBlocks)
}
// ShardFileSize - returns final erasure size from original size.
//
// A totalLength of 0 maps to 0, and the sentinel -1 (unknown size)
// is passed through unchanged. Otherwise the object is split into
// full erasure blocks plus a trailing partial block, and the
// per-shard contribution of each is summed.
func (fi FileInfo) ShardFileSize(totalLength int64) int64 {
	switch totalLength {
	case 0:
		return 0
	case -1:
		// Unknown length is propagated as-is.
		return -1
	}
	fullBlocks := totalLength / fi.Erasure.BlockSize
	tailLength := totalLength % fi.Erasure.BlockSize
	// Tail shard is the remainder divided across data blocks, rounded up.
	tailShard := ceilFrac(tailLength, int64(fi.Erasure.DataBlocks))
	return fullBlocks*fi.shardSize() + tailShard
}
// ShallowCopy - copies minimal information for READ MRF checks.
func (fi FileInfo) ShallowCopy() (n FileInfo) {
n.Volume = fi.Volume

View File

@ -499,7 +499,7 @@ var readMsgpReaderPool = sync.Pool{New: func() interface{} { return &msgp.Reader
func msgpNewReader(r io.Reader) *msgp.Reader {
p := readMsgpReaderPool.Get().(*msgp.Reader)
if p.R == nil {
p.R = xbufio.NewReaderSize(r, 4<<10)
p.R = xbufio.NewReaderSize(r, 32<<10)
} else {
p.R.Reset(r)
}

View File

@ -1703,18 +1703,23 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v
fi.Data = nil
}
attemptInline := fi.TransitionStatus == "" && fi.DataDir != "" && len(fi.Parts) == 1
// Reading data for small objects when
// - object has not yet transitioned
// - object size lesser than 128KiB
// - object has maximum of 1 parts
if fi.TransitionStatus == "" &&
fi.DataDir != "" && fi.Size <= smallFileThreshold &&
len(fi.Parts) == 1 {
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false)
if err != nil {
return FileInfo{}, err
if attemptInline {
inlineBlock := globalStorageClass.InlineBlock()
if inlineBlock <= 0 {
inlineBlock = 128 * humanize.KiByte
}
canInline := fi.ShardFileSize(fi.Parts[0].ActualSize) <= inlineBlock
if canInline {
dataPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number))
fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false)
if err != nil {
return FileInfo{}, err
}
}
}
}
@ -1768,7 +1773,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f
}
if discard {
// This discard is mostly true for DELETEEs
// This discard is mostly true for deletes
// so we need to make sure we do not keep
// page-cache references after.
defer disk.Fdatasync(f)
@ -2993,7 +2998,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
if req.MetadataOnly {
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
} else {
data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, true)
data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, false)
}
return err
}); err != nil {
@ -3096,7 +3101,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
}
baseDir := pathJoin(volumeDir, path+slashSeparator)
metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, true)
buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, false)
if err != nil {
return err
}

View File

@ -49,6 +49,11 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall
_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, opts.RecvBufSize)
}
if opts.NoDelay {
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_NODELAY, 1)
_ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_CORK, 0)
}
// Enable TCP open
// https://lwn.net/Articles/508865/ - 32k queue size.
_ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 32*1024)

View File

@ -125,6 +125,7 @@ type TCPOptions struct {
SendBufSize int // SO_SNDBUF size for the socket connection, NOTE: this sets server and client connection
RecvBufSize int // SO_RECVBUF size for the socket connection, NOTE: this sets server and client connection
NoDelay bool // Indicates callers to enable TCP_NODELAY on the net.Conn
Interface string // This is a VRF device passed via `--interface` flag
Trace func(msg string) // Trace when starting.
}
@ -136,6 +137,7 @@ func (t TCPOptions) ForWebsocket() TCPOptions {
Interface: t.Interface,
SendBufSize: t.SendBufSize,
RecvBufSize: t.RecvBufSize,
NoDelay: true,
}
}

View File

@ -22,14 +22,17 @@ import (
"net/http/httptrace"
"sync/atomic"
"time"
"github.com/minio/minio/internal/logger"
)
var globalStats = struct {
errs uint64
tcpDialErrs uint64
tcpDialCount uint64
tcpDialTotalDur uint64
tcpDialErrs uint64
tcpDialCount uint64
tcpDialTotalDur uint64
tcpTimeForFirstByteTotalDur uint64
}{}
// RPCStats holds information about the DHCP/TCP metrics and errors
@ -37,6 +40,7 @@ type RPCStats struct {
Errs uint64
DialAvgDuration uint64
TTFBAvgDuration uint64
DialErrs uint64
}
@ -48,6 +52,7 @@ func GetRPCStats() RPCStats {
}
if v := atomic.LoadUint64(&globalStats.tcpDialCount); v > 0 {
s.DialAvgDuration = atomic.LoadUint64(&globalStats.tcpDialTotalDur) / v
s.TTFBAvgDuration = atomic.LoadUint64(&globalStats.tcpTimeForFirstByteTotalDur) / v
}
return s
}
@ -55,14 +60,19 @@ func GetRPCStats() RPCStats {
// Return a function which update the global stats related to tcp connections
func setupReqStatsUpdate(req *http.Request) (*http.Request, func()) {
var dialStart, dialEnd int64
start := time.Now()
trace := &httptrace.ClientTrace{
GotFirstResponseByte: func() {
atomic.AddUint64(&globalStats.tcpTimeForFirstByteTotalDur, uint64(time.Since(start)))
},
ConnectStart: func(network, addr string) {
atomic.StoreInt64(&dialStart, time.Now().UnixNano())
},
ConnectDone: func(network, addr string, err error) {
if err == nil {
atomic.StoreInt64(&dialEnd, time.Now().UnixNano())
} else {
logger.LogOnceIf(req.Context(), logSubsys, err, req.URL.Hostname())
}
},
}