mirror of
https://github.com/minio/minio.git
synced 2025-04-23 11:55:47 -04:00
fix: add support for O_DIRECT reads for erasure backends (#10718)
This commit is contained in:
parent
6135f072d2
commit
3a2f89b3c0
@ -18,6 +18,7 @@ package storageclass
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -32,17 +33,26 @@ const (
|
|||||||
RRS = "REDUCED_REDUNDANCY"
|
RRS = "REDUCED_REDUNDANCY"
|
||||||
// Standard storage class
|
// Standard storage class
|
||||||
STANDARD = "STANDARD"
|
STANDARD = "STANDARD"
|
||||||
|
// DMA storage class
|
||||||
|
DMA = "DMA"
|
||||||
|
|
||||||
|
// Valid values are "write" and "read-write"
|
||||||
|
DMAWrite = "write"
|
||||||
|
DMAReadWrite = "read-write"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Standard constats for config info storage class
|
// Standard constats for config info storage class
|
||||||
const (
|
const (
|
||||||
ClassStandard = "standard"
|
ClassStandard = "standard"
|
||||||
ClassRRS = "rrs"
|
ClassRRS = "rrs"
|
||||||
|
ClassDMA = "dma"
|
||||||
|
|
||||||
// Reduced redundancy storage class environment variable
|
// Reduced redundancy storage class environment variable
|
||||||
RRSEnv = "MINIO_STORAGE_CLASS_RRS"
|
RRSEnv = "MINIO_STORAGE_CLASS_RRS"
|
||||||
// Standard storage class environment variable
|
// Standard storage class environment variable
|
||||||
StandardEnv = "MINIO_STORAGE_CLASS_STANDARD"
|
StandardEnv = "MINIO_STORAGE_CLASS_STANDARD"
|
||||||
|
// DMA storage class environment variable
|
||||||
|
DMAEnv = "MINIO_STORAGE_CLASS_DMA"
|
||||||
|
|
||||||
// Supported storage class scheme is EC
|
// Supported storage class scheme is EC
|
||||||
schemePrefix = "EC"
|
schemePrefix = "EC"
|
||||||
@ -52,6 +62,9 @@ const (
|
|||||||
|
|
||||||
// Default RRS parity is always minimum parity.
|
// Default RRS parity is always minimum parity.
|
||||||
defaultRRSParity = minParityDisks
|
defaultRRSParity = minParityDisks
|
||||||
|
|
||||||
|
// Default DMA value
|
||||||
|
defaultDMA = DMAWrite
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultKVS - default storage class config
|
// DefaultKVS - default storage class config
|
||||||
@ -65,18 +78,24 @@ var (
|
|||||||
Key: ClassRRS,
|
Key: ClassRRS,
|
||||||
Value: "EC:2",
|
Value: "EC:2",
|
||||||
},
|
},
|
||||||
|
config.KV{
|
||||||
|
Key: ClassDMA,
|
||||||
|
Value: defaultDMA,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// StorageClass - holds storage class information
|
// StorageClass - holds storage class information
|
||||||
type StorageClass struct {
|
type StorageClass struct {
|
||||||
Parity int
|
Parity int
|
||||||
|
DMA string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config storage class configuration
|
// Config storage class configuration
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Standard StorageClass `json:"standard"`
|
Standard StorageClass `json:"standard"`
|
||||||
RRS StorageClass `json:"rrs"`
|
RRS StorageClass `json:"rrs"`
|
||||||
|
DMA StorageClass `json:"dma"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
||||||
@ -93,7 +112,7 @@ func (sCfg *Config) UnmarshalJSON(data []byte) error {
|
|||||||
// IsValid - returns true if input string is a valid
|
// IsValid - returns true if input string is a valid
|
||||||
// storage class kind supported.
|
// storage class kind supported.
|
||||||
func IsValid(sc string) bool {
|
func IsValid(sc string) bool {
|
||||||
return sc == RRS || sc == STANDARD
|
return sc == RRS || sc == STANDARD || sc == DMA
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalText unmarshals storage class from its textual form into
|
// UnmarshalText unmarshals storage class from its textual form into
|
||||||
@ -103,6 +122,14 @@ func (sc *StorageClass) UnmarshalText(b []byte) error {
|
|||||||
if scStr == "" {
|
if scStr == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if scStr == DMAWrite {
|
||||||
|
sc.DMA = DMAWrite
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if scStr == DMAReadWrite {
|
||||||
|
sc.DMA = DMAReadWrite
|
||||||
|
return nil
|
||||||
|
}
|
||||||
s, err := parseStorageClass(scStr)
|
s, err := parseStorageClass(scStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -116,14 +143,14 @@ func (sc *StorageClass) MarshalText() ([]byte, error) {
|
|||||||
if sc.Parity != 0 {
|
if sc.Parity != 0 {
|
||||||
return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil
|
return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil
|
||||||
}
|
}
|
||||||
return []byte(""), nil
|
return []byte(sc.DMA), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *StorageClass) String() string {
|
func (sc *StorageClass) String() string {
|
||||||
if sc.Parity != 0 {
|
if sc.Parity != 0 {
|
||||||
return fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)
|
return fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)
|
||||||
}
|
}
|
||||||
return ""
|
return sc.DMA
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parses given storageClassEnv and returns a storageClass structure.
|
// Parses given storageClassEnv and returns a storageClass structure.
|
||||||
@ -212,6 +239,11 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetDMA - returns DMA configuration.
|
||||||
|
func (sCfg Config) GetDMA() string {
|
||||||
|
return sCfg.DMA.DMA
|
||||||
|
}
|
||||||
|
|
||||||
// Enabled returns if etcd is enabled.
|
// Enabled returns if etcd is enabled.
|
||||||
func Enabled(kvs config.KVS) bool {
|
func Enabled(kvs config.KVS) bool {
|
||||||
ssc := kvs.Get(ClassStandard)
|
ssc := kvs.Get(ClassStandard)
|
||||||
@ -231,6 +263,7 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
|||||||
|
|
||||||
ssc := env.Get(StandardEnv, kvs.Get(ClassStandard))
|
ssc := env.Get(StandardEnv, kvs.Get(ClassStandard))
|
||||||
rrsc := env.Get(RRSEnv, kvs.Get(ClassRRS))
|
rrsc := env.Get(RRSEnv, kvs.Get(ClassRRS))
|
||||||
|
dma := env.Get(DMAEnv, kvs.Get(ClassDMA))
|
||||||
// Check for environment variables and parse into storageClass struct
|
// Check for environment variables and parse into storageClass struct
|
||||||
if ssc != "" {
|
if ssc != "" {
|
||||||
cfg.Standard, err = parseStorageClass(ssc)
|
cfg.Standard, err = parseStorageClass(ssc)
|
||||||
@ -252,6 +285,14 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
|||||||
cfg.RRS.Parity = defaultRRSParity
|
cfg.RRS.Parity = defaultRRSParity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if dma == "" {
|
||||||
|
dma = defaultDMA
|
||||||
|
}
|
||||||
|
if dma != DMAReadWrite && dma != DMAWrite {
|
||||||
|
return Config{}, errors.New(`valid dma values are "read-write" and "write"`)
|
||||||
|
}
|
||||||
|
cfg.DMA.DMA = dma
|
||||||
|
|
||||||
// Validation is done after parsing both the storage classes. This is needed because we need one
|
// Validation is done after parsing both the storage classes. This is needed because we need one
|
||||||
// storage class value to deduce the correct value of the other storage class.
|
// storage class value to deduce the correct value of the other storage class.
|
||||||
if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil {
|
if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil {
|
||||||
|
@ -42,6 +42,7 @@ import (
|
|||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
"github.com/klauspost/readahead"
|
"github.com/klauspost/readahead"
|
||||||
"github.com/minio/minio/cmd/config"
|
"github.com/minio/minio/cmd/config"
|
||||||
|
"github.com/minio/minio/cmd/config/storageclass"
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/minio/minio/pkg/disk"
|
"github.com/minio/minio/pkg/disk"
|
||||||
"github.com/minio/minio/pkg/env"
|
"github.com/minio/minio/pkg/env"
|
||||||
@ -1584,6 +1585,56 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err
|
|||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// To support O_DIRECT reads for erasure backends.
|
||||||
|
type odirectreader struct {
|
||||||
|
f *os.File
|
||||||
|
buf []byte
|
||||||
|
bufp *[]byte
|
||||||
|
freshRead bool
|
||||||
|
s *xlStorage
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read - Implements Reader interface.
|
||||||
|
func (o *odirectreader) Read(buf []byte) (n int, err error) {
|
||||||
|
if o.err != nil {
|
||||||
|
return 0, o.err
|
||||||
|
}
|
||||||
|
if o.buf == nil {
|
||||||
|
o.bufp = o.s.pool.Get().(*[]byte)
|
||||||
|
}
|
||||||
|
if o.freshRead {
|
||||||
|
o.buf = *o.bufp
|
||||||
|
n, err = o.f.Read(o.buf)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
o.err = err
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
// err is io.EOF
|
||||||
|
o.err = err
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
o.buf = o.buf[:n]
|
||||||
|
o.freshRead = false
|
||||||
|
}
|
||||||
|
if len(buf) >= len(o.buf) {
|
||||||
|
n = copy(buf, o.buf)
|
||||||
|
o.freshRead = true
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
n = copy(buf, o.buf)
|
||||||
|
o.buf = o.buf[n:]
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close - Release the buffer and close the file.
|
||||||
|
func (o *odirectreader) Close() error {
|
||||||
|
o.s.pool.Put(o.bufp)
|
||||||
|
atomic.AddInt32(&o.s.activeIOCount, -1)
|
||||||
|
return o.f.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// ReadFileStream - Returns the read stream of the file.
|
// ReadFileStream - Returns the read stream of the file.
|
||||||
func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
|
func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
|
||||||
if offset < 0 {
|
if offset < 0 {
|
||||||
@ -1611,6 +1662,29 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
|
||||||
|
file, err := disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666)
|
||||||
|
if err != nil {
|
||||||
|
switch {
|
||||||
|
case os.IsNotExist(err):
|
||||||
|
return nil, errFileNotFound
|
||||||
|
case os.IsPermission(err):
|
||||||
|
return nil, errFileAccessDenied
|
||||||
|
case isSysErrNotDir(err):
|
||||||
|
return nil, errFileAccessDenied
|
||||||
|
case isSysErrIO(err):
|
||||||
|
return nil, errFaultyDisk
|
||||||
|
case isSysErrTooManyFiles(err):
|
||||||
|
return nil, errTooManyOpenFiles
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.AddInt32(&s.activeIOCount, 1)
|
||||||
|
return &odirectreader{file, nil, nil, true, s, nil}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Open the file for reading.
|
// Open the file for reading.
|
||||||
file, err := os.Open(filePath)
|
file, err := os.Open(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/minio/minio/cmd/config/storageclass"
|
||||||
"github.com/minio/minio/pkg/disk"
|
"github.com/minio/minio/pkg/disk"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1063,6 +1064,11 @@ func TestXLStorageReadFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for l := 0; l < 2; l++ {
|
||||||
|
// 1st loop tests with dma=write, 2nd loop tests with dma=read-write.
|
||||||
|
if l == 1 {
|
||||||
|
globalStorageClass.DMA.DMA = storageclass.DMAReadWrite
|
||||||
|
}
|
||||||
// Following block validates all ReadFile test cases.
|
// Following block validates all ReadFile test cases.
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
var n int64
|
var n int64
|
||||||
@ -1118,6 +1124,10 @@ func TestXLStorageReadFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset the flag.
|
||||||
|
globalStorageClass.DMA.DMA = storageclass.DMAWrite
|
||||||
|
|
||||||
// TestXLStorage for permission denied.
|
// TestXLStorage for permission denied.
|
||||||
if runtime.GOOS != globalWindowsOSName {
|
if runtime.GOOS != globalWindowsOSName {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user