mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: incorrect O_DIRECT behavior for reads (#12811)
O_DIRECT behavior was broken and it was still caching all the reads, this change properly fixes this behavior.
This commit is contained in:
parent
397637a042
commit
bfbdb8f0a8
@ -43,7 +43,6 @@ import (
|
|||||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||||
"github.com/minio/minio/internal/color"
|
"github.com/minio/minio/internal/color"
|
||||||
"github.com/minio/minio/internal/config"
|
"github.com/minio/minio/internal/config"
|
||||||
"github.com/minio/minio/internal/config/storageclass"
|
|
||||||
"github.com/minio/minio/internal/disk"
|
"github.com/minio/minio/internal/disk"
|
||||||
xioutil "github.com/minio/minio/internal/ioutil"
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
@ -1109,11 +1108,12 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||||||
// - object has not yet transitioned
|
// - object has not yet transitioned
|
||||||
// - object size lesser than 128KiB
|
// - object size lesser than 128KiB
|
||||||
// - object has maximum of 1 parts
|
// - object has maximum of 1 parts
|
||||||
if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 {
|
if fi.TransitionStatus == "" &&
|
||||||
// Enable O_DIRECT optionally only if drive supports it.
|
fi.DataDir != "" && fi.Size <= smallFileThreshold &&
|
||||||
requireDirectIO := globalStorageClass.GetDMA() == storageclass.DMAReadWrite
|
len(fi.Parts) == 1 {
|
||||||
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
|
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
|
||||||
fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, partPath), requireDirectIO)
|
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
|
||||||
|
fi.Data, err = s.readAllData(volumeDir, dataPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FileInfo{}, err
|
return FileInfo{}, err
|
||||||
}
|
}
|
||||||
@ -1123,15 +1123,8 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||||||
return fi, nil
|
return fi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirectIO bool) (buf []byte, err error) {
|
func (s *xlStorage) readAllData(volumeDir string, filePath string) (buf []byte, err error) {
|
||||||
var r io.ReadCloser
|
f, err := OpenFileDirectIO(filePath, readMode, 0666)
|
||||||
if requireDirectIO {
|
|
||||||
var f *os.File
|
|
||||||
f, err = OpenFileDirectIO(filePath, readMode, 0666)
|
|
||||||
r = &odirectReader{f, nil, nil, true, true, s, nil}
|
|
||||||
} else {
|
|
||||||
r, err = OpenFile(filePath, readMode, 0)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if osIsNotExist(err) {
|
if osIsNotExist(err) {
|
||||||
// Check if the object doesn't exist because its bucket
|
// Check if the object doesn't exist because its bucket
|
||||||
@ -1163,7 +1156,7 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirect
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
r := &odirectReader{f, nil, nil, true, true, s, nil}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
buf, err = ioutil.ReadAll(r)
|
buf, err = ioutil.ReadAll(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1191,8 +1184,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
requireDirectIO := globalStorageClass.GetDMA() == storageclass.DMAReadWrite
|
return s.readAllData(volumeDir, filePath)
|
||||||
return s.readAllData(volumeDir, filePath, requireDirectIO)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadFile reads exactly len(buf) bytes into buf. It returns the
|
// ReadFile reads exactly len(buf) bytes into buf. It returns the
|
||||||
@ -1415,7 +1407,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||||||
|
|
||||||
var file *os.File
|
var file *os.File
|
||||||
// O_DIRECT only supported if offset is zero
|
// O_DIRECT only supported if offset is zero
|
||||||
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
|
if offset == 0 {
|
||||||
file, err = OpenFileDirectIO(filePath, readMode, 0666)
|
file, err = OpenFileDirectIO(filePath, readMode, 0666)
|
||||||
} else {
|
} else {
|
||||||
// Open the file for reading.
|
// Open the file for reading.
|
||||||
@ -1456,7 +1448,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||||||
return nil, errIsNotRegular
|
return nil, errIsNotRegular
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
|
if offset == 0 {
|
||||||
or := &odirectReader{file, nil, nil, true, false, s, nil}
|
or := &odirectReader{file, nil, nil, true, false, s, nil}
|
||||||
if length <= smallFileThreshold {
|
if length <= smallFileThreshold {
|
||||||
or = &odirectReader{file, nil, nil, true, true, s, nil}
|
or = &odirectReader{file, nil, nil, true, true, s, nil}
|
||||||
|
@ -30,8 +30,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/config/storageclass"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCheckPathLength(t *testing.T) {
|
func TestCheckPathLength(t *testing.T) {
|
||||||
@ -1152,10 +1150,6 @@ func TestXLStorageReadFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for l := 0; l < 2; l++ {
|
for l := 0; l < 2; l++ {
|
||||||
// 1st loop tests with dma=write, 2nd loop tests with dma=read-write.
|
|
||||||
if l == 1 {
|
|
||||||
globalStorageClass.DMA = storageclass.DMAReadWrite
|
|
||||||
}
|
|
||||||
// Following block validates all ReadFile test cases.
|
// Following block validates all ReadFile test cases.
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
var n int64
|
var n int64
|
||||||
@ -1213,9 +1207,6 @@ func TestXLStorageReadFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset the flag.
|
|
||||||
globalStorageClass.DMA = storageclass.DMAWrite
|
|
||||||
|
|
||||||
// TestXLStorage for permission denied.
|
// TestXLStorage for permission denied.
|
||||||
if runtime.GOOS != globalWindowsOSName {
|
if runtime.GOOS != globalWindowsOSName {
|
||||||
permDeniedDir := createPermDeniedFile(t)
|
permDeniedDir := createPermDeniedFile(t)
|
||||||
|
@ -34,12 +34,6 @@ var (
|
|||||||
Optional: true,
|
Optional: true,
|
||||||
Type: "string",
|
Type: "string",
|
||||||
},
|
},
|
||||||
config.HelpKV{
|
|
||||||
Key: ClassDMA,
|
|
||||||
Description: `enable O_DIRECT for both read and write, defaults to "write" e.g. "read+write"`,
|
|
||||||
Optional: true,
|
|
||||||
Type: "string",
|
|
||||||
},
|
|
||||||
config.HelpKV{
|
config.HelpKV{
|
||||||
Key: config.Comment,
|
Key: config.Comment,
|
||||||
Description: config.DefaultComment,
|
Description: config.DefaultComment,
|
||||||
|
@ -19,7 +19,6 @@ package storageclass
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -35,26 +34,17 @@ const (
|
|||||||
RRS = "REDUCED_REDUNDANCY"
|
RRS = "REDUCED_REDUNDANCY"
|
||||||
// Standard storage class
|
// Standard storage class
|
||||||
STANDARD = "STANDARD"
|
STANDARD = "STANDARD"
|
||||||
// DMA storage class
|
|
||||||
DMA = "DMA"
|
|
||||||
|
|
||||||
// Valid values are "write" and "read+write"
|
|
||||||
DMAWrite = "write"
|
|
||||||
DMAReadWrite = "read+write"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Standard constats for config info storage class
|
// Standard constats for config info storage class
|
||||||
const (
|
const (
|
||||||
ClassStandard = "standard"
|
ClassStandard = "standard"
|
||||||
ClassRRS = "rrs"
|
ClassRRS = "rrs"
|
||||||
ClassDMA = "dma"
|
|
||||||
|
|
||||||
// Reduced redundancy storage class environment variable
|
// Reduced redundancy storage class environment variable
|
||||||
RRSEnv = "MINIO_STORAGE_CLASS_RRS"
|
RRSEnv = "MINIO_STORAGE_CLASS_RRS"
|
||||||
// Standard storage class environment variable
|
// Standard storage class environment variable
|
||||||
StandardEnv = "MINIO_STORAGE_CLASS_STANDARD"
|
StandardEnv = "MINIO_STORAGE_CLASS_STANDARD"
|
||||||
// DMA storage class environment variable
|
|
||||||
DMAEnv = "MINIO_STORAGE_CLASS_DMA"
|
|
||||||
|
|
||||||
// Supported storage class scheme is EC
|
// Supported storage class scheme is EC
|
||||||
schemePrefix = "EC"
|
schemePrefix = "EC"
|
||||||
@ -64,9 +54,6 @@ const (
|
|||||||
|
|
||||||
// Default RRS parity is always minimum parity.
|
// Default RRS parity is always minimum parity.
|
||||||
defaultRRSParity = minParityDisks
|
defaultRRSParity = minParityDisks
|
||||||
|
|
||||||
// Default DMA value
|
|
||||||
defaultDMA = DMAReadWrite
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultKVS - default storage class config
|
// DefaultKVS - default storage class config
|
||||||
@ -80,10 +67,6 @@ var (
|
|||||||
Key: ClassRRS,
|
Key: ClassRRS,
|
||||||
Value: "EC:2",
|
Value: "EC:2",
|
||||||
},
|
},
|
||||||
config.KV{
|
|
||||||
Key: ClassDMA,
|
|
||||||
Value: defaultDMA,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -99,7 +82,6 @@ var ConfigLock = sync.RWMutex{}
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
Standard StorageClass `json:"standard"`
|
Standard StorageClass `json:"standard"`
|
||||||
RRS StorageClass `json:"rrs"`
|
RRS StorageClass `json:"rrs"`
|
||||||
DMA string `json:"dma"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
||||||
@ -256,17 +238,9 @@ func (sCfg Config) Update(newCfg Config) {
|
|||||||
ConfigLock.Lock()
|
ConfigLock.Lock()
|
||||||
defer ConfigLock.Unlock()
|
defer ConfigLock.Unlock()
|
||||||
sCfg.RRS = newCfg.RRS
|
sCfg.RRS = newCfg.RRS
|
||||||
sCfg.DMA = newCfg.DMA
|
|
||||||
sCfg.Standard = newCfg.Standard
|
sCfg.Standard = newCfg.Standard
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDMA - returns DMA configuration.
|
|
||||||
func (sCfg Config) GetDMA() string {
|
|
||||||
ConfigLock.RLock()
|
|
||||||
defer ConfigLock.RUnlock()
|
|
||||||
return sCfg.DMA
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enabled returns if etcd is enabled.
|
// Enabled returns if etcd is enabled.
|
||||||
func Enabled(kvs config.KVS) bool {
|
func Enabled(kvs config.KVS) bool {
|
||||||
ssc := kvs.Get(ClassStandard)
|
ssc := kvs.Get(ClassStandard)
|
||||||
@ -278,13 +252,14 @@ func Enabled(kvs config.KVS) bool {
|
|||||||
func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
||||||
cfg = Config{}
|
cfg = Config{}
|
||||||
|
|
||||||
|
kvs.Delete("dma")
|
||||||
|
|
||||||
if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil {
|
if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ssc := env.Get(StandardEnv, kvs.Get(ClassStandard))
|
ssc := env.Get(StandardEnv, kvs.Get(ClassStandard))
|
||||||
rrsc := env.Get(RRSEnv, kvs.Get(ClassRRS))
|
rrsc := env.Get(RRSEnv, kvs.Get(ClassRRS))
|
||||||
dma := env.Get(DMAEnv, kvs.Get(ClassDMA))
|
|
||||||
// Check for environment variables and parse into storageClass struct
|
// Check for environment variables and parse into storageClass struct
|
||||||
if ssc != "" {
|
if ssc != "" {
|
||||||
cfg.Standard, err = parseStorageClass(ssc)
|
cfg.Standard, err = parseStorageClass(ssc)
|
||||||
@ -303,14 +278,6 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
|||||||
cfg.RRS.Parity = defaultRRSParity
|
cfg.RRS.Parity = defaultRRSParity
|
||||||
}
|
}
|
||||||
|
|
||||||
if dma == "" {
|
|
||||||
dma = defaultDMA
|
|
||||||
}
|
|
||||||
if dma != DMAReadWrite && dma != DMAWrite {
|
|
||||||
return Config{}, errors.New(`valid dma values are "read-write" and "write"`)
|
|
||||||
}
|
|
||||||
cfg.DMA = dma
|
|
||||||
|
|
||||||
// Validation is done after parsing both the storage classes. This is needed because we need one
|
// Validation is done after parsing both the storage classes. This is needed because we need one
|
||||||
// storage class value to deduce the correct value of the other storage class.
|
// storage class value to deduce the correct value of the other storage class.
|
||||||
if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil {
|
if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user