diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 99b1ca99e..7cf755059 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -23,6 +23,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "testing" "time" @@ -147,6 +148,10 @@ func TestCommonTime(t *testing.T) { // TestListOnlineDisks - checks if listOnlineDisks and outDatedDisks // are consistent with each other. func TestListOnlineDisks(t *testing.T) { + if runtime.GOOS == globalWindowsOSName { + t.Skip() + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -230,7 +235,7 @@ func TestListOnlineDisks(t *testing.T) { } object := "object" - data := bytes.Repeat([]byte("a"), smallFileThreshold*16) + data := bytes.Repeat([]byte("a"), smallFileThreshold*32) z := obj.(*erasureServerPools) erasureDisks, err := z.GetDisks(0, 0) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 0ed33a04b..f3f7953a5 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -283,7 +283,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri } if unlockOnDefer { - unlockOnDefer = fi.InlineData() + unlockOnDefer = fi.InlineData() || len(fi.Data) > 0 } pr, pw := xioutil.WaitPipe() @@ -908,6 +908,8 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } rw.Lock() + // when it's a versioned bucket and empty versionID - at totalResp == setDriveCount + // we must use rawFileInfo to resolve versions to figure out the latest version. 
if opts.VersionID == "" && totalResp == er.setDriveCount { fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(pickLatestQuorumFilesInfo(ctx, rawArr, errs, bucket, object, readData, opts.InclFreeVersions, true)) @@ -915,7 +917,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s fi, onlineMeta, onlineDisks, modTime, etag, err = calcQuorum(metaArr, errs) } rw.Unlock() - if err == nil && fi.InlineData() { + if err == nil && (fi.InlineData() || len(fi.Data) > 0) { break } } @@ -1399,7 +1401,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st writers := make([]io.Writer, len(onlineDisks)) var inlineBuffers []*bytes.Buffer if shardFileSize >= 0 { - if !opts.Versioned && shardFileSize < inlineBlock { + if !opts.Versioned && shardFileSize <= inlineBlock { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } else if shardFileSize < inlineBlock/8 { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) @@ -1407,7 +1409,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } else { // If compressed, use actual size to determine. if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 { - if !opts.Versioned && sz < inlineBlock { + if !opts.Versioned && sz <= inlineBlock { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } else if sz < inlineBlock/8 { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index eb74d9e77..8a0abcabe 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -264,6 +264,24 @@ type FileInfo struct { Versioned bool `msg:"vs"` } +func (fi FileInfo) shardSize() int64 { + return ceilFrac(fi.Erasure.BlockSize, int64(fi.Erasure.DataBlocks)) +} + +// ShardFileSize - returns final erasure size from original size. 
+func (fi FileInfo) ShardFileSize(totalLength int64) int64 { + if totalLength == 0 { + return 0 + } + if totalLength == -1 { + return -1 + } + numShards := totalLength / fi.Erasure.BlockSize + lastBlockSize := totalLength % fi.Erasure.BlockSize + lastShardSize := ceilFrac(lastBlockSize, int64(fi.Erasure.DataBlocks)) + return numShards*fi.shardSize() + lastShardSize +} + // ShallowCopy - copies minimal information for READ MRF checks. func (fi FileInfo) ShallowCopy() (n FileInfo) { n.Volume = fi.Volume diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 9350178d0..7a71c174e 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -499,7 +499,7 @@ var readMsgpReaderPool = sync.Pool{New: func() interface{} { return &msgp.Reader func msgpNewReader(r io.Reader) *msgp.Reader { p := readMsgpReaderPool.Get().(*msgp.Reader) if p.R == nil { - p.R = xbufio.NewReaderSize(r, 4<<10) + p.R = xbufio.NewReaderSize(r, 32<<10) } else { p.R.Reset(r) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index b6b8a1bcd..dbef68ad2 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1703,18 +1703,23 @@ func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, v fi.Data = nil } + attemptInline := fi.TransitionStatus == "" && fi.DataDir != "" && len(fi.Parts) == 1 // Reading data for small objects when // - object has not yet transitioned - // - object size lesser than 128KiB // - object has maximum of 1 parts - if fi.TransitionStatus == "" && - fi.DataDir != "" && fi.Size <= smallFileThreshold && - len(fi.Parts) == 1 { - partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number) - dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath) - fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false) - if err != nil { - return FileInfo{}, err + if attemptInline { + inlineBlock := globalStorageClass.InlineBlock() + if inlineBlock <= 0 { + inlineBlock = 128 * humanize.KiByte + } + + canInline := 
fi.ShardFileSize(fi.Parts[0].ActualSize) <= inlineBlock + if canInline { + dataPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number)) + fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false) + if err != nil { + return FileInfo{}, err + } } } } @@ -1768,7 +1773,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f } if discard { - // This discard is mostly true for DELETEEs + // This discard is mostly true for deletes // so we need to make sure we do not keep // page-cache references after. defer disk.Fdatasync(f) @@ -2993,7 +2998,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp if req.MetadataOnly { data, mt, err = s.readMetadataWithDMTime(ctx, fullPath) } else { - data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, true) + data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, false) } return err }); err != nil { @@ -3096,7 +3101,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path } baseDir := pathJoin(volumeDir, path+slashSeparator) metaPath := pathutil.Join(baseDir, xlStorageFormatFile) - buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, true) + buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, false) if err != nil { return err } diff --git a/internal/http/dial_linux.go b/internal/http/dial_linux.go index f271e36c2..fd279aab5 100644 --- a/internal/http/dial_linux.go +++ b/internal/http/dial_linux.go @@ -49,6 +49,11 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, opts.RecvBufSize) } + if opts.NoDelay { + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_NODELAY, 1) + _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_CORK, 0) + } + // Enable TCP open // https://lwn.net/Articles/508865/ - 32k queue size. 
_ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 32*1024) diff --git a/internal/http/listener.go b/internal/http/listener.go index f7e7e65d4..b1a497332 100644 --- a/internal/http/listener.go +++ b/internal/http/listener.go @@ -125,6 +125,7 @@ type TCPOptions struct { SendBufSize int // SO_SNDBUF size for the socket connection, NOTE: this sets server and client connection RecvBufSize int // SO_RECVBUF size for the socket connection, NOTE: this sets server and client connection + NoDelay bool // Indicates callers to enable TCP_NODELAY on the net.Conn Interface string // This is a VRF device passed via `--interface` flag Trace func(msg string) // Trace when starting. } @@ -136,6 +137,7 @@ func (t TCPOptions) ForWebsocket() TCPOptions { Interface: t.Interface, SendBufSize: t.SendBufSize, RecvBufSize: t.RecvBufSize, + NoDelay: true, } } diff --git a/internal/rest/rpc-stats.go b/internal/rest/rpc-stats.go index ab0f6fe48..38e102c01 100644 --- a/internal/rest/rpc-stats.go +++ b/internal/rest/rpc-stats.go @@ -22,14 +22,17 @@ import ( "net/http/httptrace" "sync/atomic" "time" + + "github.com/minio/minio/internal/logger" ) var globalStats = struct { errs uint64 - tcpDialErrs uint64 - tcpDialCount uint64 - tcpDialTotalDur uint64 + tcpDialErrs uint64 + tcpDialCount uint64 + tcpDialTotalDur uint64 + tcpTimeForFirstByteTotalDur uint64 }{} // RPCStats holds information about the DHCP/TCP metrics and errors @@ -37,6 +40,7 @@ type RPCStats struct { Errs uint64 DialAvgDuration uint64 + TTFBAvgDuration uint64 DialErrs uint64 } @@ -48,6 +52,7 @@ func GetRPCStats() RPCStats { } if v := atomic.LoadUint64(&globalStats.tcpDialCount); v > 0 { s.DialAvgDuration = atomic.LoadUint64(&globalStats.tcpDialTotalDur) / v + s.TTFBAvgDuration = atomic.LoadUint64(&globalStats.tcpTimeForFirstByteTotalDur) / v } return s } @@ -55,14 +60,19 @@ func GetRPCStats() RPCStats { // Return a function which update the global stats related to tcp connections func setupReqStatsUpdate(req 
*http.Request) (*http.Request, func()) { var dialStart, dialEnd int64 - + start := time.Now() trace := &httptrace.ClientTrace{ + GotFirstResponseByte: func() { + atomic.AddUint64(&globalStats.tcpTimeForFirstByteTotalDur, uint64(time.Since(start))) + }, ConnectStart: func(network, addr string) { atomic.StoreInt64(&dialStart, time.Now().UnixNano()) }, ConnectDone: func(network, addr string, err error) { if err == nil { atomic.StoreInt64(&dialEnd, time.Now().UnixNano()) + } else { + logger.LogOnceIf(req.Context(), logSubsys, err, req.URL.Hostname()) } }, }