mirror of
https://github.com/minio/minio.git
synced 2025-11-09 13:39:46 -05:00
use O_DIRECT for all ReadFileStream (#13324)
This PR also removes #13312 to ensure that we can use a better mechanism to handle page-cache, using O_DIRECT even for Range GETs.
This commit is contained in:
@@ -27,8 +27,10 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/minio/highwayhash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"golang.org/x/crypto/blake2b"
|
||||
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
)
|
||||
|
||||
// magic HH-256 key as HH-256 hash of the first 100 decimals of π as utf-8 string with a zero key.
|
||||
@@ -172,8 +174,8 @@ func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, w
|
||||
return errFileCorrupt
|
||||
}
|
||||
|
||||
bufp := xlPoolSmall.Get().(*[]byte)
|
||||
defer xlPoolSmall.Put(bufp)
|
||||
bufp := xioutil.ODirectPoolSmall.Get().(*[]byte)
|
||||
defer xioutil.ODirectPoolSmall.Put(bufp)
|
||||
|
||||
for left > 0 {
|
||||
// Read expected hash...
|
||||
|
||||
@@ -22,9 +22,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/config/api"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
mem "github.com/shirou/gopsutil/v3/mem"
|
||||
|
||||
"github.com/minio/minio/internal/config/api"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
)
|
||||
|
||||
type apiConfig struct {
|
||||
@@ -71,7 +73,8 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
|
||||
// total_ram / ram_per_request
|
||||
// ram_per_request is (2MiB+128KiB) * driveCount \
|
||||
// + 2 * 10MiB (default erasure block size v1) + 2 * 1MiB (default erasure block size v2)
|
||||
apiRequestsMaxPerNode = int(maxMem / uint64(maxSetDrives*(blockSizeLarge+blockSizeSmall)+int(blockSizeV1*2+blockSizeV2*2)))
|
||||
blockSize := xioutil.BlockSizeLarge + xioutil.BlockSizeSmall
|
||||
apiRequestsMaxPerNode = int(maxMem / uint64(maxSetDrives*blockSize+int(blockSizeV1*2+blockSizeV2*2)))
|
||||
|
||||
if globalIsErasure {
|
||||
logger.Info("Automatically configured API requests per node based on available memory on the system: %d", apiRequestsMaxPerNode)
|
||||
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"github.com/minio/minio/internal/disk"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
@@ -111,11 +110,6 @@ func readDirFn(dirPath string, fn func(name string, typ os.FileMode) error) erro
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
if err := disk.Fadvise(f, disk.FadvSequential); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer disk.Fadvise(f, disk.FadvNoReuse)
|
||||
defer f.Close()
|
||||
|
||||
bufp := direntPool.Get().(*[]byte)
|
||||
@@ -191,12 +185,6 @@ func readDirWithOpts(dirPath string, opts readDirOpts) (entries []string, err er
|
||||
if err != nil {
|
||||
return nil, osErrToFileErr(err)
|
||||
}
|
||||
|
||||
if err := disk.Fadvise(f, disk.FadvSequential); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer disk.Fadvise(f, disk.FadvNoReuse)
|
||||
defer f.Close()
|
||||
|
||||
bufp := direntPool.Get().(*[]byte)
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
@@ -39,7 +38,6 @@ import (
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/klauspost/readahead"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
"github.com/minio/minio/internal/color"
|
||||
"github.com/minio/minio/internal/config"
|
||||
@@ -51,18 +49,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
nullVersionID = "null"
|
||||
blockSizeSmall = 128 * humanize.KiByte // Default r/w block size for smaller objects.
|
||||
blockSizeLarge = 2 * humanize.MiByte // Default r/w block size for larger objects.
|
||||
blockSizeReallyLarge = 4 * humanize.MiByte // Default write block size for objects per shard >= 64MiB
|
||||
|
||||
// Add readahead to reads whose length is at least this many bytes.
|
||||
readAheadSize = 16 << 20
|
||||
// Read this many buffers ahead.
|
||||
readAheadBuffers = 4
|
||||
// Size of each buffer.
|
||||
readAheadBufSize = 1 << 20
|
||||
|
||||
nullVersionID = "null"
|
||||
// Really large streams threshold per shard.
|
||||
reallyLargeFileThreshold = 64 * humanize.MiByte // Optimized for HDDs
|
||||
|
||||
@@ -78,7 +65,7 @@ const (
|
||||
var alignedBuf []byte
|
||||
|
||||
func init() {
|
||||
alignedBuf = disk.AlignedBlock(4096)
|
||||
alignedBuf = disk.AlignedBlock(xioutil.DirectioAlignSize)
|
||||
_, _ = rand.Read(alignedBuf)
|
||||
}
|
||||
|
||||
@@ -97,27 +84,6 @@ func isValidVolname(volname string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
var (
|
||||
xlPoolReallyLarge = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(blockSizeReallyLarge)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
xlPoolLarge = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(blockSizeLarge)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
xlPoolSmall = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(blockSizeSmall)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// xlStorage - implements StorageAPI interface.
|
||||
type xlStorage struct {
|
||||
diskPath string
|
||||
@@ -410,12 +376,6 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := disk.Fadvise(f, disk.FadvSequential); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer disk.Fadvise(f, disk.FadvNoReuse)
|
||||
defer f.Close()
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
@@ -1234,11 +1194,10 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string) (buf []byte,
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if err := disk.Fadvise(f, disk.FadvSequential); err != nil {
|
||||
return nil, err
|
||||
r := &xioutil.ODirectReader{
|
||||
File: f,
|
||||
SmallFile: true,
|
||||
}
|
||||
defer disk.Fadvise(f, disk.FadvNoReuse)
|
||||
r := &odirectReader{f, nil, nil, true, true, s, nil}
|
||||
defer r.Close()
|
||||
buf, err = ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
@@ -1428,75 +1387,6 @@ func (s *xlStorage) openFileNoSync(filePath string, mode int) (f *os.File, err e
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// To support O_DIRECT reads for erasure backends.
|
||||
type odirectReader struct {
|
||||
f *os.File
|
||||
buf []byte
|
||||
bufp *[]byte
|
||||
freshRead bool
|
||||
smallFile bool
|
||||
s *xlStorage
|
||||
err error
|
||||
}
|
||||
|
||||
// Read - Implements Reader interface.
|
||||
func (o *odirectReader) Read(buf []byte) (n int, err error) {
|
||||
if o.err != nil && (len(o.buf) == 0 || o.freshRead) {
|
||||
return 0, o.err
|
||||
}
|
||||
if o.buf == nil {
|
||||
if o.smallFile {
|
||||
o.bufp = xlPoolSmall.Get().(*[]byte)
|
||||
} else {
|
||||
o.bufp = xlPoolLarge.Get().(*[]byte)
|
||||
}
|
||||
}
|
||||
if o.freshRead {
|
||||
o.buf = *o.bufp
|
||||
n, err = o.f.Read(o.buf)
|
||||
if err != nil && err != io.EOF {
|
||||
if isSysErrInvalidArg(err) {
|
||||
if err = disk.DisableDirectIO(o.f); err != nil {
|
||||
o.err = err
|
||||
return n, err
|
||||
}
|
||||
n, err = o.f.Read(o.buf)
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
o.err = err
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
if n == 0 {
|
||||
// err is likely io.EOF
|
||||
o.err = err
|
||||
return n, err
|
||||
}
|
||||
o.err = err
|
||||
o.buf = o.buf[:n]
|
||||
o.freshRead = false
|
||||
}
|
||||
if len(buf) >= len(o.buf) {
|
||||
n = copy(buf, o.buf)
|
||||
o.freshRead = true
|
||||
return n, o.err
|
||||
}
|
||||
n = copy(buf, o.buf)
|
||||
o.buf = o.buf[n:]
|
||||
// There is more left in buffer, do not return any EOF yet.
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Close - Release the buffer and close the file.
|
||||
func (o *odirectReader) Close() error {
|
||||
if o.smallFile {
|
||||
xlPoolSmall.Put(o.bufp)
|
||||
} else {
|
||||
xlPoolLarge.Put(o.bufp)
|
||||
}
|
||||
return o.f.Close()
|
||||
}
|
||||
|
||||
// ReadFileStream - Returns the read stream of the file.
|
||||
func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
|
||||
if offset < 0 {
|
||||
@@ -1514,14 +1404,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var file *os.File
|
||||
// O_DIRECT only supported if offset is zero
|
||||
if offset == 0 {
|
||||
file, err = OpenFileDirectIO(filePath, readMode, 0666)
|
||||
} else {
|
||||
// Open the file for reading.
|
||||
file, err = OpenFile(filePath, readMode, 0666)
|
||||
}
|
||||
file, err := OpenFileDirectIO(filePath, readMode, 0666)
|
||||
if err != nil {
|
||||
switch {
|
||||
case osIsNotExist(err):
|
||||
@@ -1557,52 +1440,44 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
||||
return nil, errIsNotRegular
|
||||
}
|
||||
|
||||
// Enable sequential read access pattern - only applicable on Linux.
|
||||
if err := disk.Fadvise(file, disk.FadvSequential); err != nil {
|
||||
return nil, err
|
||||
alignment := offset%xioutil.DirectioAlignSize == 0
|
||||
if !alignment {
|
||||
if err = disk.DisableDirectIO(file); err != nil {
|
||||
file.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if offset == 0 {
|
||||
or := &odirectReader{file, nil, nil, true, false, s, nil}
|
||||
if length <= smallFileThreshold {
|
||||
or = &odirectReader{file, nil, nil, true, true, s, nil}
|
||||
if offset > 0 {
|
||||
if _, err = file.Seek(offset, io.SeekStart); err != nil {
|
||||
file.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
or := &xioutil.ODirectReader{
|
||||
File: file,
|
||||
SmallFile: false,
|
||||
}
|
||||
|
||||
if length <= smallFileThreshold {
|
||||
or = &xioutil.ODirectReader{
|
||||
File: file,
|
||||
SmallFile: true,
|
||||
}
|
||||
r := struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}{Reader: io.LimitReader(or, length), Closer: closeWrapper(func() error {
|
||||
return or.Close()
|
||||
})}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
r := struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}{Reader: io.LimitReader(file, length), Closer: closeWrapper(func() error {
|
||||
disk.Fadvise(file, disk.FadvNoReuse)
|
||||
return file.Close()
|
||||
}{Reader: io.LimitReader(or, length), Closer: closeWrapper(func() error {
|
||||
if !alignment || (offset+length)%xioutil.DirectioAlignSize != 0 {
	// invalidate page-cache for unaligned reads.
	//
	// The parentheses are required: `%` binds tighter than `+`, so the
	// unparenthesized form evaluated offset + (length % align), which is
	// non-zero for every non-zero offset, instead of testing whether the end
	// of the requested range is aligned.
	disk.FadviseDontNeed(file)
}
|
||||
return or.Close()
|
||||
})}
|
||||
|
||||
if offset > 0 {
|
||||
if _, err = file.Seek(offset, io.SeekStart); err != nil {
|
||||
r.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Add readahead to big reads
|
||||
if length >= readAheadSize {
|
||||
rc, err := readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize)
|
||||
if err != nil {
|
||||
r.Close()
|
||||
return nil, err
|
||||
}
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
// Just add a small 64k buffer.
|
||||
r.Reader = bufio.NewReaderSize(r.Reader, 64<<10)
|
||||
return r, nil
|
||||
}
|
||||
|
||||
@@ -1683,11 +1558,11 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
||||
var bufp *[]byte
|
||||
if fileSize > 0 && fileSize >= reallyLargeFileThreshold {
|
||||
// use a larger 4MiB buffer for really large streams.
|
||||
bufp = xlPoolReallyLarge.Get().(*[]byte)
|
||||
defer xlPoolReallyLarge.Put(bufp)
|
||||
bufp = xioutil.ODirectPoolXLarge.Get().(*[]byte)
|
||||
defer xioutil.ODirectPoolXLarge.Put(bufp)
|
||||
} else {
|
||||
bufp = xlPoolLarge.Get().(*[]byte)
|
||||
defer xlPoolLarge.Put(bufp)
|
||||
bufp = xioutil.ODirectPoolLarge.Get().(*[]byte)
|
||||
defer xioutil.ODirectPoolLarge.Put(bufp)
|
||||
}
|
||||
|
||||
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
|
||||
|
||||
Reference in New Issue
Block a user