fix: remove ODirectReader entirely since we do not need it anymore (#18619)

This commit is contained in:
Harshavardhana 2023-12-09 10:17:51 -08:00 committed by GitHub
parent 196e7e072b
commit 65f34cd823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 205 deletions

View File

@ -1005,8 +1005,10 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
return err return err
} }
discard := true
var legacyJSON bool var legacyJSON bool
buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile), false) buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile), discard)
if err != nil { if err != nil {
if !errors.Is(err, errFileNotFound) { if !errors.Is(err, errFileNotFound) {
return err return err
@ -1016,7 +1018,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
legacy := s.formatLegacy legacy := s.formatLegacy
s.RUnlock() s.RUnlock()
if legacy { if legacy {
buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1), false) buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1), discard)
if err != nil { if err != nil {
return err return err
} }
@ -1163,7 +1165,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
} }
var legacyJSON bool var legacyJSON bool
buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), false) buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFile), true)
if err != nil { if err != nil {
if !errors.Is(err, errFileNotFound) { if !errors.Is(err, errFileNotFound) {
return err return err
@ -1401,7 +1403,7 @@ func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath str
xlPath := pathJoin(filePath, xlStorageFormatFile) xlPath := pathJoin(filePath, xlStorageFormatFile)
if readData { if readData {
buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, xlPath, true) buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, xlPath, false)
} else { } else {
buf, dmTime, err = s.readMetadataWithDMTime(ctx, xlPath) buf, dmTime, err = s.readMetadataWithDMTime(ctx, xlPath)
if err != nil { if err != nil {
@ -1421,7 +1423,7 @@ func (s *xlStorage) readRaw(ctx context.Context, volume, volumeDir, filePath str
s.RUnlock() s.RUnlock()
if err != nil && errors.Is(err, errFileNotFound) && legacy { if err != nil && errors.Is(err, errFileNotFound) && legacy {
buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), true) buf, dmTime, err = s.readAllData(ctx, volume, volumeDir, pathJoin(filePath, xlStorageFormatFileV1), false)
if err != nil { if err != nil {
return nil, time.Time{}, err return nil, time.Time{}, err
} }
@ -1538,7 +1540,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
len(fi.Parts) == 1 { len(fi.Parts) == 1 {
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number) partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath) dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, true) fi.Data, _, err = s.readAllData(ctx, volume, volumeDir, dataPath, false)
if err != nil { if err != nil {
return FileInfo{}, err return FileInfo{}, err
} }
@ -1564,7 +1566,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, nil return fi, nil
} }
func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string, sync bool) (buf []byte, dmTime time.Time, err error) { func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, filePath string, discard bool) (buf []byte, dmTime time.Time, err error) {
if filePath == "" { if filePath == "" {
return nil, dmTime, errFileNotFound return nil, dmTime, errFileNotFound
} }
@ -1573,13 +1575,7 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f
return nil, time.Time{}, ctx.Err() return nil, time.Time{}, ctx.Err()
} }
odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect f, err := OpenFile(filePath, readMode, 0o666)
var f *os.File
if odirectEnabled && sync {
f, err = OpenFileDirectIO(filePath, readMode, 0o666)
} else {
f, err = OpenFile(filePath, readMode, 0o666)
}
if err != nil { if err != nil {
switch { switch {
case osIsNotExist(err): case osIsNotExist(err):
@ -1614,23 +1610,20 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f
} }
return nil, dmTime, err return nil, dmTime, err
} }
r := io.Reader(f)
var dr *xioutil.ODirectReader if discard {
if odirectEnabled { // This discard is mostly true for DELETEs
dr = &xioutil.ODirectReader{ // so we need to make sure we do not keep
File: f, // page-cache references after.
SmallFile: true, defer disk.Fdatasync(f)
} }
defer dr.Close()
r = dr
} else {
defer f.Close() defer f.Close()
}
// Get size for precise allocation. // Get size for precise allocation.
stat, err := f.Stat() stat, err := f.Stat()
if err != nil { if err != nil {
buf, err = io.ReadAll(r) buf, err = io.ReadAll(f)
return buf, dmTime, osErrToFileErr(err) return buf, dmTime, osErrToFileErr(err)
} }
if stat.IsDir() { if stat.IsDir() {
@ -1645,11 +1638,9 @@ func (s *xlStorage) readAllData(ctx context.Context, volume, volumeDir string, f
} else { } else {
buf = make([]byte, sz) buf = make([]byte, sz)
} }
if dr != nil {
dr.SmallFile = sz <= xioutil.BlockSizeSmall*2
}
// Read file... // Read file...
_, err = io.ReadFull(r, buf) _, err = io.ReadFull(f, buf)
return buf, stat.ModTime().UTC(), osErrToFileErr(err) return buf, stat.ModTime().UTC(), osErrToFileErr(err)
} }
@ -2780,7 +2771,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
if req.MetadataOnly { if req.MetadataOnly {
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath) data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
} else { } else {
data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, false) data, mt, err = s.readAllData(ctx, req.Bucket, volumeDir, fullPath, true)
} }
return err return err
}); err != nil { }); err != nil {
@ -2875,7 +2866,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
} }
baseDir := pathJoin(volumeDir, path+slashSeparator) baseDir := pathJoin(volumeDir, path+slashSeparator)
metaPath := pathutil.Join(baseDir, xlStorageFormatFile) metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, false) buf, _, err := s.readAllData(ctx, volume, volumeDir, metaPath, true)
if err != nil { if err != nil {
return err return err
} }

View File

@ -28,9 +28,39 @@ import (
"sync" "sync"
"time" "time"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/disk" "github.com/minio/minio/internal/disk"
) )
// Block sizes constant.
const (
BlockSizeSmall = 32 * humanize.KiByte // Default r/w block size for smaller objects.
BlockSizeLarge = 2 * humanize.MiByte // Default r/w block size for larger objects.
BlockSizeReallyLarge = 4 * humanize.MiByte // Default write block size for objects per shard >= 64MiB
)
// aligned sync.Pool's
var (
ODirectPoolXLarge = sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(BlockSizeReallyLarge)
return &b
},
}
ODirectPoolLarge = sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(BlockSizeLarge)
return &b
},
}
ODirectPoolSmall = sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(BlockSizeSmall)
return &b
},
}
)
// WriteOnCloser implements io.WriteCloser and always // WriteOnCloser implements io.WriteCloser and always
// executes at least one write operation if it is closed. // executes at least one write operation if it is closed.
// //

View File

@ -1,136 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ioutil
import (
"errors"
"io"
"os"
"sync"
"syscall"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/disk"
)
// ODirectReader - to support O_DIRECT reads for erasure backends.
//
// It stages reads through a pooled, aligned buffer so the underlying
// *os.File can be read with O_DIRECT; see Read for the fallback when the
// kernel rejects direct I/O.
type ODirectReader struct {
	File      *os.File // underlying file; read in Read, closed by Close
	SmallFile bool     // selects the small (32 KiB) pool instead of the large (2 MiB) one
	err       error    // sticky error from a previous Read, or the poison set by Close
	buf       []byte   // unread window into *bufp when seenRead is true
	bufp      *[]byte  // pooled aligned buffer, returned to its pool in Close
	seenRead  bool     // true while buf still holds bytes from the last File.Read
}
// Block size constants for the pooled, aligned r/w buffers below.
const (
	BlockSizeSmall       = 32 * humanize.KiByte // Default r/w block size for smaller objects.
	BlockSizeLarge       = 2 * humanize.MiByte  // Default r/w block size for larger objects.
	BlockSizeReallyLarge = 4 * humanize.MiByte  // Default write block size for objects per shard >= 64MiB
)
// O_DIRECT aligned sync.Pool's — reusable buffers allocated via
// disk.AlignedBlock so direct-I/O reads and writes do not pay a fresh
// aligned allocation per call. Each pool hands out *[]byte of one of the
// block sizes declared above.
var (
	// ODirectPoolXLarge serves BlockSizeReallyLarge (4 MiB) buffers.
	ODirectPoolXLarge = sync.Pool{
		New: func() interface{} {
			b := disk.AlignedBlock(BlockSizeReallyLarge)
			return &b
		},
	}
	// ODirectPoolLarge serves BlockSizeLarge (2 MiB) buffers.
	ODirectPoolLarge = sync.Pool{
		New: func() interface{} {
			b := disk.AlignedBlock(BlockSizeLarge)
			return &b
		},
	}
	// ODirectPoolSmall serves BlockSizeSmall (32 KiB) buffers.
	ODirectPoolSmall = sync.Pool{
		New: func() interface{} {
			b := disk.AlignedBlock(BlockSizeSmall)
			return &b
		},
	}
)
// Invalid argument, unsupported flags such as O_DIRECT
func isSysErrInvalidArg(err error) bool {
return errors.Is(err, syscall.EINVAL)
}
// Read - Implements Reader interface.
//
// Bytes are staged through the pooled aligned buffer: each call either
// drains leftovers from the previous File.Read or performs exactly one new
// File.Read into the staging buffer. If the kernel rejects the read with
// EINVAL (O_DIRECT unsupported), direct I/O is disabled on the fd and the
// read is retried once. Errors (including io.EOF) are held in o.err and
// only surfaced once the staged bytes have been fully handed out.
func (o *ODirectReader) Read(buf []byte) (n int, err error) {
	if o.err != nil && (len(o.buf) == 0 || !o.seenRead) {
		// Sticky error, and nothing staged left to drain: fail now.
		return 0, o.err
	}
	if o.buf == nil {
		// First Read on this reader: borrow an aligned buffer from the
		// pool matching the caller's SmallFile hint.
		if o.SmallFile {
			o.bufp = ODirectPoolSmall.Get().(*[]byte)
		} else {
			o.bufp = ODirectPoolLarge.Get().(*[]byte)
		}
	}
	if !o.seenRead {
		// Refill the staging buffer with a single read.
		o.buf = *o.bufp
		n, err = o.File.Read(o.buf)
		if err != nil && err != io.EOF {
			if isSysErrInvalidArg(err) {
				// O_DIRECT not supported here: switch the fd to
				// buffered I/O and retry the same read once.
				if err = disk.DisableDirectIO(o.File); err != nil {
					o.err = err
					return n, err
				}
				n, err = o.File.Read(o.buf)
			}
			if err != nil && err != io.EOF {
				o.err = err
				return n, err
			}
		}
		if n == 0 {
			// err is likely io.EOF
			o.err = err
			return n, err
		}
		// Remember the (possibly deferred) error and expose only the
		// bytes actually read.
		o.err = err
		o.buf = o.buf[:n]
		o.seenRead = true
	}
	if len(buf) >= len(o.buf) {
		// Caller takes everything staged: mark the buffer consumed and
		// surface any deferred error (e.g. io.EOF) with the final bytes.
		n = copy(buf, o.buf)
		o.seenRead = false
		return n, o.err
	}
	n = copy(buf, o.buf)
	o.buf = o.buf[n:]
	// There is more left in buffer, do not return any EOF yet.
	return n, nil
}
// Close - Release the buffer and close the file.
func (o *ODirectReader) Close() error {
	if p := o.bufp; p != nil {
		// Return the staging buffer to the pool it came from.
		pool := &ODirectPoolLarge
		if o.SmallFile {
			pool = &ODirectPoolSmall
		}
		pool.Put(p)
		o.bufp, o.buf = nil, nil
	}
	// Poison the reader so any Read after Close fails loudly.
	o.err = errors.New("internal error: ODirectReader Read after Close")
	return o.File.Close()
}

View File

@ -62,7 +62,6 @@ func ReadFileWithFileInfo(name string) ([]byte, fs.FileInfo, error) {
// //
// passes NOATIME flag for reads on Unix systems to avoid atime updates. // passes NOATIME flag for reads on Unix systems to avoid atime updates.
func ReadFile(name string) ([]byte, error) { func ReadFile(name string) ([]byte, error) {
if !disk.ODirectPlatform {
// Don't wrap with un-needed buffer. // Don't wrap with un-needed buffer.
// Don't use os.ReadFile, since it doesn't pass NO_ATIME when present. // Don't use os.ReadFile, since it doesn't pass NO_ATIME when present.
f, err := OsOpenFile(name, readMode, 0o666) f, err := OsOpenFile(name, readMode, 0o666)
@ -78,32 +77,3 @@ func ReadFile(name string) ([]byte, error) {
_, err = io.ReadFull(f, dst) _, err = io.ReadFull(f, dst)
return dst, err return dst, err
} }
f, err := OpenFileDirectIO(name, readMode, 0o666)
if err != nil {
// fallback if there is an error to read
// 'name' with O_DIRECT
f, err = OsOpenFile(name, readMode, 0o666)
if err != nil {
return nil, err
}
}
r := &ODirectReader{
File: f,
SmallFile: true,
}
defer r.Close()
st, err := f.Stat()
if err != nil {
return io.ReadAll(r)
}
// Select bigger blocks when reading would do more than 2 reads.
r.SmallFile = st.Size() <= BlockSizeSmall*2
dst := make([]byte, st.Size())
_, err = io.ReadFull(r, dst)
return dst, err
}