From 91805bcab69f270e1b75a755658ce36423ee4f7e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 23 Jul 2024 03:53:03 -0700 Subject: [PATCH] add optimizations to bring performance on unversioned READS (#20128) Allow objects stored non-inlined on disk to be returned inline via an unversioned ReadVersion() call; ReadXL() is only needed to resolve objects with multiple versions. The inline block size used for this decision is dynamic and can be chosen by the user via `mc admin config set`. Other bonus changes: - Start measuring internode TTFB performance. - Set TCP_NODELAY and clear TCP_CORK for low latency. --- cmd/erasure-healing-common_test.go | 7 ++++++- cmd/erasure-object.go | 10 ++++++---- cmd/storage-datatypes.go | 18 ++++++++++++++++++ cmd/storage-rest-client.go | 2 +- cmd/xl-storage.go | 29 +++++++++++++++++------------ internal/http/dial_linux.go | 5 +++++ internal/http/listener.go | 2 ++ internal/rest/rpc-stats.go | 18 ++++++++++++++---- 8 files changed, 69 insertions(+), 22 deletions(-) diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 99b1ca99e..7cf755059 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -23,6 +23,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "testing" "time" @@ -147,6 +148,10 @@ func TestCommonTime(t *testing.T) { // TestListOnlineDisks - checks if listOnlineDisks and outDatedDisks // are consistent with each other. 
func TestListOnlineDisks(t *testing.T) { + if runtime.GOOS == globalWindowsOSName { + t.Skip() + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -230,7 +235,7 @@ func TestListOnlineDisks(t *testing.T) { } object := "object" - data := bytes.Repeat([]byte("a"), smallFileThreshold*16) + data := bytes.Repeat([]byte("a"), smallFileThreshold*32) z := obj.(*erasureServerPools) erasureDisks, err := z.GetDisks(0, 0) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 0ed33a04b..f3f7953a5 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -283,7 +283,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri } if unlockOnDefer { - unlockOnDefer = fi.InlineData() + unlockOnDefer = fi.InlineData() || len(fi.Data) > 0 } pr, pw := xioutil.WaitPipe() @@ -908,6 +908,8 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } rw.Lock() + // when it's a versioned bucket and empty versionID - at totalResp == setDriveCount + // we must use rawFileInfo to resolve versions to figure out the latest version. 
if opts.VersionID == "" && totalResp == er.setDriveCount { fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(pickLatestQuorumFilesInfo(ctx, rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true)) @@ -915,7 +917,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(metaArr, errs) } rw.Unlock() - if err == nil && fi.InlineData() { + if err == nil && (fi.InlineData() || len(fi.Data) > 0) { break } } @@ -1399,7 +1401,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st writers := make([]io.Writer, len(onlineDisks)) var inlineBuffers []*bytes.Buffer if shardFileSize >= 0 { - if !opts.Versioned && shardFileSize < inlineBlock { + if !opts.Versioned && shardFileSize <= inlineBlock { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } else if shardFileSize < inlineBlock/8 { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) @@ -1407,7 +1409,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } else { // If compressed, use actual size to determine. if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 { - if !opts.Versioned && sz < inlineBlock { + if !opts.Versioned && sz <= inlineBlock { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } else if sz < inlineBlock/8 { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index eb74d9e77..8a0abcabe 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -264,6 +264,24 @@ type FileInfo struct { Versioned bool `msg:"vs"` } +func (fi FileInfo) shardSize() int64 { + return ceilFrac(fi.Erasure.BlockSize, int64(fi.Erasure.DataBlocks)) +} + +// ShardFileSize - returns final erasure size from original size. 
+func (fi FileInfo) ShardFileSize(totalLength int64) int64 { + if totalLength == 0 { + return 0 + } + if totalLength == -1 { + return -1 + } + numShards := totalLength / fi.Erasure.BlockSize + lastBlockSize := totalLength % fi.Erasure.BlockSize + lastShardSize := ceilFrac(lastBlockSize, int64(fi.Erasure.DataBlocks)) + return numShards*fi.shardSize() + lastShardSize +} + // ShallowCopy - copies minimal information for READ MRF checks. func (fi FileInfo) ShallowCopy() (n FileInfo) { n.Volume = fi.Volume diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 9350178d0..7a71c174e 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -499,7 +499,7 @@ var readMsgpReaderPool = sync.Pool{New: func() interface{} { return &msgp.Reader func msgpNewReader(r io.Reader) *msgp.Reader { p := readMsgpReaderPool.Get().(*msgp.Reader) if p.R == nil { - p.R = xbufio.NewReaderSize(r, 4<<10) + p.R = xbufio.NewReaderSize(r, 32<<10) } else { p.R.Reset(r) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index b6b8a1bcd..dbef68ad2 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1703,18 +1703,23 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v fi.Data = nil } + attemptInline := fi.TransitionStatus == "" && fi.DataDir != "" && len(fi.Parts) == 1 // Reading data for small objects when // - object has not yet transitioned - // - object size lesser than 128KiB // - object has maximum of 1 parts - if fi.TransitionStatus == "" && - fi.DataDir != "" && fi.Size <= smallFileThreshold && - len(fi.Parts) == 1 { - partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number) - dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath) - fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false) - if err != nil { - return FileInfo{}, err + if attemptInline { + inlineBlock := globalStorageClass.InlineBlock() + if inlineBlock <= 0 { + inlineBlock = 128 * humanize.KiByte + } + + canInline := 
fi.ShardFileSize(fi.Parts[0].ActualSize) <= inlineBlock + if canInline { + dataPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number)) + fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false) + if err != nil { + return FileInfo{}, err + } } } } @@ -1768,7 +1773,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f } if discard { - // This discard is mostly true for DELETEEs + // This discard is mostly true for deletes // so we need to make sure we do not keep // page-cache references after. defer disk.Fdatasync(f) @@ -2993,7 +2998,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp if req.MetadataOnly { data, mt, err = s.readMetadataWithDMTime(ctx, fullPath) } else { - data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, true) + data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, false) } return err }); err != nil { @@ -3096,7 +3101,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path } baseDir := pathJoin(volumeDir, path+slashSeparator) metaPath := pathutil.Join(baseDir, xlStorageFormatFile) - buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, true) + buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, false) if err != nil { return err } diff --git a/internal/http/dial_linux.go b/internal/http/dial_linux.go index f271e36c2..fd279aab5 100644 --- a/internal/http/dial_linux.go +++ b/internal/http/dial_linux.go @@ -49,6 +49,11 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, opts.RecvBufSize) } + if opts.NoDelay { + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_NODELAY, 1) + _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_CORK, 0) + } + // Enable TCP open // https://lwn.net/Articles/508865/ - 32k queue size. 
_ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 32*1024) diff --git a/internal/http/listener.go b/internal/http/listener.go index f7e7e65d4..b1a497332 100644 --- a/internal/http/listener.go +++ b/internal/http/listener.go @@ -125,6 +125,7 @@ type TCPOptions struct { SendBufSize int // SO_SNDBUF size for the socket connection, NOTE: this sets server and client connection RecvBufSize int // SO_RECVBUF size for the socket connection, NOTE: this sets server and client connection + NoDelay bool // Indicates callers to enable TCP_NODELAY on the net.Conn Interface string // This is a VRF device passed via `--interface` flag Trace func(msg string) // Trace when starting. } @@ -136,6 +137,7 @@ func (t TCPOptions) ForWebsocket() TCPOptions { Interface: t.Interface, SendBufSize: t.SendBufSize, RecvBufSize: t.RecvBufSize, + NoDelay: true, } } diff --git a/internal/rest/rpc-stats.go b/internal/rest/rpc-stats.go index ab0f6fe48..38e102c01 100644 --- a/internal/rest/rpc-stats.go +++ b/internal/rest/rpc-stats.go @@ -22,14 +22,17 @@ import ( "net/http/httptrace" "sync/atomic" "time" + + "github.com/minio/minio/internal/logger" ) var globalStats = struct { errs uint64 - tcpDialErrs uint64 - tcpDialCount uint64 - tcpDialTotalDur uint64 + tcpDialErrs uint64 + tcpDialCount uint64 + tcpDialTotalDur uint64 + tcpTimeForFirstByteTotalDur uint64 }{} // RPCStats holds information about the DHCP/TCP metrics and errors @@ -37,6 +40,7 @@ type RPCStats struct { Errs uint64 DialAvgDuration uint64 + TTFBAvgDuration uint64 DialErrs uint64 } @@ -48,6 +52,7 @@ func GetRPCStats() RPCStats { } if v := atomic.LoadUint64(&globalStats.tcpDialCount); v > 0 { s.DialAvgDuration = atomic.LoadUint64(&globalStats.tcpDialTotalDur) / v + s.TTFBAvgDuration = atomic.LoadUint64(&globalStats.tcpTimeForFirstByteTotalDur) / v } return s } @@ -55,14 +60,19 @@ func GetRPCStats() RPCStats { // Return a function which update the global stats related to tcp connections func setupReqStatsUpdate(req 
*http.Request) (*http.Request, func()) { var dialStart, dialEnd int64 - + start := time.Now() trace := &httptrace.ClientTrace{ + GotFirstResponseByte: func() { + atomic.AddUint64(&globalStats.tcpTimeForFirstByteTotalDur, uint64(time.Since(start))) + }, ConnectStart: func(network, addr string) { atomic.StoreInt64(&dialStart, time.Now().UnixNano()) }, ConnectDone: func(network, addr string, err error) { if err == nil { atomic.StoreInt64(&dialEnd, time.Now().UnixNano()) + } else { + logger.LogOnceIf(req.Context(), logSubsys, err, req.URL.Hostname()) } }, }