mirror of https://github.com/minio/minio.git
storage write call path optimizations (#11805)
- write in o_dsync instead of o_direct for smaller objects to avoid unaligned double Write() situations that may arise for smaller objects < 128KiB - avoid fallocate() as its not useful since we do not use Append() semantics anymore, fallocate is not useful for streaming I/O we can save on a syscall - createFile() doesn't need to validate `bucket` name with a Lstat() call since createFile() is only used to write at `minioTmpBucket` - use io.Copy() when writing unAligned writes to allow usage of ReadFrom() from *os.File providing zero buffer writes().
This commit is contained in:
parent
0eb146e1b2
commit
60b0f2324e
2
Makefile
2
Makefile
|
@ -38,7 +38,7 @@ lint:
|
||||||
check: test
|
check: test
|
||||||
test: verifiers build
|
test: verifiers build
|
||||||
@echo "Running unit tests"
|
@echo "Running unit tests"
|
||||||
@GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./... 1>/dev/null
|
@GOGC=25 GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./... 1>/dev/null
|
||||||
|
|
||||||
test-race: verifiers build
|
test-race: verifiers build
|
||||||
@echo "Running unit tests under -race"
|
@echo "Running unit tests under -race"
|
||||||
|
|
|
@ -33,6 +33,7 @@ export ACCESS_KEY="minio"
|
||||||
export SECRET_KEY="minio123"
|
export SECRET_KEY="minio123"
|
||||||
export ENABLE_HTTPS=0
|
export ENABLE_HTTPS=0
|
||||||
export GO111MODULE=on
|
export GO111MODULE=on
|
||||||
|
export GOGC=25
|
||||||
|
|
||||||
MINIO_CONFIG_DIR="$WORK_DIR/.minio"
|
MINIO_CONFIG_DIR="$WORK_DIR/.minio"
|
||||||
MINIO=( "$PWD/minio" --config-dir "$MINIO_CONFIG_DIR" )
|
MINIO=( "$PWD/minio" --config-dir "$MINIO_CONFIG_DIR" )
|
||||||
|
|
|
@ -28,6 +28,8 @@ WORK_DIR="$PWD/.verify-$RANDOM"
|
||||||
MINIO_CONFIG_DIR="$WORK_DIR/.minio"
|
MINIO_CONFIG_DIR="$WORK_DIR/.minio"
|
||||||
MINIO=( "$PWD/minio" --config-dir "$MINIO_CONFIG_DIR" server )
|
MINIO=( "$PWD/minio" --config-dir "$MINIO_CONFIG_DIR" server )
|
||||||
|
|
||||||
|
export GOGC=25
|
||||||
|
|
||||||
function start_minio_3_node() {
|
function start_minio_3_node() {
|
||||||
export MINIO_ROOT_USER=minio
|
export MINIO_ROOT_USER=minio
|
||||||
export MINIO_ROOT_PASSWORD=minio123
|
export MINIO_ROOT_PASSWORD=minio123
|
||||||
|
|
|
@ -224,17 +224,17 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
|
||||||
// is emphemeral and we should treat it
|
// is emphemeral and we should treat it
|
||||||
// as root disk from the baremetal
|
// as root disk from the baremetal
|
||||||
// terminology.
|
// terminology.
|
||||||
rootDisk, err = disk.IsRootDisk(path, "/")
|
rootDisk, err = disk.IsRootDisk(path, SlashSeparator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !rootDisk {
|
if !rootDisk {
|
||||||
// No root disk was found, its possible that
|
// No root disk was found, its possible that
|
||||||
// path is referenced at "/data" which has
|
// path is referenced at "/etc/hosts" which has
|
||||||
// different device ID that points to the original
|
// different device ID that points to the original
|
||||||
// "/" on the host system, fall back to that instead
|
// "/" on the host system, fall back to that instead
|
||||||
// to verify of the device id is same.
|
// to verify of the device id is same.
|
||||||
rootDisk, err = disk.IsRootDisk(path, "/data")
|
rootDisk, err = disk.IsRootDisk(path, "/etc/hosts")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -242,7 +242,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// On baremetal setups its always "/" is the root disk.
|
// On baremetal setups its always "/" is the root disk.
|
||||||
rootDisk, err = disk.IsRootDisk(path, "/")
|
rootDisk, err = disk.IsRootDisk(path, SlashSeparator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1233,7 +1233,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err
|
||||||
// Create top level directories if they don't exist.
|
// Create top level directories if they don't exist.
|
||||||
// with mode 0777 mkdir honors system umask.
|
// with mode 0777 mkdir honors system umask.
|
||||||
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
|
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
|
||||||
return nil, err
|
return nil, osErrToFileErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := os.OpenFile(filePath, mode|writeMode, 0666)
|
w, err := os.OpenFile(filePath, mode|writeMode, 0666)
|
||||||
|
@ -1432,82 +1432,42 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
||||||
return errInvalidArgument
|
return errInvalidArgument
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if fileSize <= smallFileThreshold {
|
||||||
|
// For streams smaller than 128KiB we simply write them as O_DSYNC (fdatasync)
|
||||||
|
// and not O_DIRECT to avoid the complexities of aligned I/O.
|
||||||
|
w, err := s.openFile(volume, path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
written, err := io.Copy(w, r)
|
||||||
|
if err != nil {
|
||||||
|
return osErrToFileErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if written > fileSize {
|
||||||
|
return errMoreData
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
volumeDir, err := s.getVolDir(volume)
|
volumeDir, err := s.getVolDir(volume)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat a volume entry.
|
|
||||||
_, err = os.Lstat(volumeDir)
|
|
||||||
if err != nil {
|
|
||||||
if osIsNotExist(err) {
|
|
||||||
return errVolumeNotFound
|
|
||||||
} else if isSysErrIO(err) {
|
|
||||||
return errFaultyDisk
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
filePath := pathJoin(volumeDir, path)
|
filePath := pathJoin(volumeDir, path)
|
||||||
if err = checkPathLength(filePath); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create top level directories if they don't exist.
|
// Create top level directories if they don't exist.
|
||||||
// with mode 0777 mkdir honors system umask.
|
// with mode 0777 mkdir honors system umask.
|
||||||
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
|
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
|
||||||
switch {
|
return osErrToFileErr(err)
|
||||||
case osIsPermission(err):
|
|
||||||
return errFileAccessDenied
|
|
||||||
case osIsExist(err):
|
|
||||||
return errFileAccessDenied
|
|
||||||
case isSysErrIO(err):
|
|
||||||
return errFaultyDisk
|
|
||||||
case isSysErrInvalidArg(err):
|
|
||||||
return errUnsupportedDisk
|
|
||||||
case isSysErrNoSpace(err):
|
|
||||||
return errDiskFull
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := disk.OpenFileDirectIO(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0666)
|
w, err := disk.OpenFileDirectIO(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch {
|
return osErrToFileErr(err)
|
||||||
case osIsPermission(err):
|
|
||||||
return errFileAccessDenied
|
|
||||||
case osIsExist(err):
|
|
||||||
return errFileAccessDenied
|
|
||||||
case isSysErrIO(err):
|
|
||||||
return errFaultyDisk
|
|
||||||
case isSysErrInvalidArg(err):
|
|
||||||
return errUnsupportedDisk
|
|
||||||
case isSysErrNoSpace(err):
|
|
||||||
return errDiskFull
|
|
||||||
default:
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var e error
|
|
||||||
if fileSize > 0 {
|
|
||||||
// Allocate needed disk space to append data
|
|
||||||
e = Fallocate(int(w.Fd()), 0, fileSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ignore errors when Fallocate is not supported in the current system
|
|
||||||
if e != nil && !isSysErrNoSys(e) && !isSysErrOpNotSupported(e) {
|
|
||||||
switch {
|
|
||||||
case isSysErrNoSpace(e):
|
|
||||||
err = errDiskFull
|
|
||||||
case isSysErrIO(e):
|
|
||||||
err = errFaultyDisk
|
|
||||||
default:
|
|
||||||
// For errors: EBADF, EINTR, EINVAL, ENODEV, EPERM, ESPIPE and ETXTBSY
|
|
||||||
// Appending was failed anyway, returns unexpected error
|
|
||||||
err = errUnexpected
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -1515,14 +1475,8 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
||||||
w.Close()
|
w.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var bufp *[]byte
|
bufp := s.poolLarge.Get().(*[]byte)
|
||||||
if fileSize <= smallFileThreshold {
|
defer s.poolLarge.Put(bufp)
|
||||||
bufp = s.poolSmall.Get().(*[]byte)
|
|
||||||
defer s.poolSmall.Put(bufp)
|
|
||||||
} else {
|
|
||||||
bufp = s.poolLarge.Get().(*[]byte)
|
|
||||||
defer s.poolLarge.Put(bufp)
|
|
||||||
}
|
|
||||||
|
|
||||||
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
|
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package ioutil
|
package ioutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
@ -188,33 +189,15 @@ const directioAlignSize = 4096
|
||||||
// the file opened for writes with syscall.O_DIRECT flag.
|
// the file opened for writes with syscall.O_DIRECT flag.
|
||||||
func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte, totalSize int64) (int64, error) {
|
func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte, totalSize int64) (int64, error) {
|
||||||
// Writes remaining bytes in the buffer.
|
// Writes remaining bytes in the buffer.
|
||||||
writeUnaligned := func(w *os.File, buf []byte) (remainingWritten int, err error) {
|
writeUnaligned := func(w *os.File, buf []byte) (remainingWritten int64, err error) {
|
||||||
var n int
|
// Disable O_DIRECT on fd's on unaligned buffer
|
||||||
remaining := len(buf)
|
// perform an amortized Fdatasync(fd) on the fd at
|
||||||
// The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer)
|
// the end, this is performed by the caller before
|
||||||
// in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode.
|
// closing 'w'.
|
||||||
remainingAligned := (remaining / directioAlignSize) * directioAlignSize
|
if err = disk.DisableDirectIO(w); err != nil {
|
||||||
remainingAlignedBuf := buf[:remainingAligned]
|
return remainingWritten, err
|
||||||
remainingUnalignedBuf := buf[remainingAligned:]
|
|
||||||
if len(remainingAlignedBuf) > 0 {
|
|
||||||
n, err = w.Write(remainingAlignedBuf)
|
|
||||||
if err != nil {
|
|
||||||
return remainingWritten, err
|
|
||||||
}
|
|
||||||
remainingWritten += n
|
|
||||||
}
|
}
|
||||||
if len(remainingUnalignedBuf) > 0 {
|
return io.Copy(w, bytes.NewReader(buf))
|
||||||
// Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT.
|
|
||||||
if err = disk.DisableDirectIO(w); err != nil {
|
|
||||||
return remainingWritten, err
|
|
||||||
}
|
|
||||||
n, err = w.Write(remainingUnalignedBuf)
|
|
||||||
if err != nil {
|
|
||||||
return remainingWritten, err
|
|
||||||
}
|
|
||||||
remainingWritten += n
|
|
||||||
}
|
|
||||||
return remainingWritten, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var written int64
|
var written int64
|
||||||
|
@ -232,21 +215,23 @@ func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte, totalSize int64) (i
|
||||||
return written, err
|
return written, err
|
||||||
}
|
}
|
||||||
buf = buf[:nr]
|
buf = buf[:nr]
|
||||||
var nw int
|
var nw int64
|
||||||
if len(buf)%directioAlignSize == 0 {
|
if len(buf)%directioAlignSize == 0 {
|
||||||
|
var n int
|
||||||
// buf is aligned for directio write()
|
// buf is aligned for directio write()
|
||||||
nw, err = w.Write(buf)
|
n, err = w.Write(buf)
|
||||||
|
nw = int64(n)
|
||||||
} else {
|
} else {
|
||||||
// buf is not aligned, hence use writeUnaligned()
|
// buf is not aligned, hence use writeUnaligned()
|
||||||
nw, err = writeUnaligned(w, buf)
|
nw, err = writeUnaligned(w, buf)
|
||||||
}
|
}
|
||||||
if nw > 0 {
|
if nw > 0 {
|
||||||
written += int64(nw)
|
written += nw
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return written, err
|
return written, err
|
||||||
}
|
}
|
||||||
if nw != len(buf) {
|
if nw != int64(len(buf)) {
|
||||||
return written, io.ErrShortWrite
|
return written, io.ErrShortWrite
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue