use sendfile/splice implementation to perform DMA (#18411)

sendfile implementation to perform DMA on all platforms

Go stdlib already supports sendfile/splice implementations
for

- Linux
- Windows
- *BSD
- Solaris

Along with this change however O_DIRECT for reads() must be
removed as well since we need to use sendfile() implementation

The main reason to add O_DIRECT for reads was to reduce the
chances of page-cache causing OOMs for MinIO, however it would
seem that avoiding buffer copies from user-space to kernel space
this issue is not a problem anymore.

There is no Go based memory allocation required, and neither
the page-cache is referenced back to MinIO. This page-
cache reference is fully owned by kernel at this point, this
essentially should solve the problem of page-cache build up.

With this now we also support SG - when NIC supports Scatter/Gather
https://en.wikipedia.org/wiki/Gather/scatter_(vector_addressing)
This commit is contained in:
Harshavardhana 2023-11-10 10:10:14 -08:00 committed by GitHub
parent 80adc87a14
commit 91d8bddbd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 47 deletions

View File

@ -611,6 +611,8 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
return
}
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
rc, err := s.storage.ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length))
if err != nil {
s.writeErrorResponse(w, err)
@ -618,12 +620,28 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
}
defer rc.Close()
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
if _, err = xioutil.Copy(w, rc); err != nil {
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
logger.LogIf(r.Context(), err)
rf, ok := w.(io.ReaderFrom)
if ok {
// Attempt to use splice/sendfile() optimization, A very specific behavior mentioned below is necessary.
// See https://github.com/golang/go/blob/f7c5cbb82087c55aa82081e931e0142783700ce8/src/net/sendfile_linux.go#L20
dr, ok := rc.(*xioutil.DeadlineReader)
if ok {
sr, ok := dr.ReadCloser.(*sendFileReader)
if ok {
_, err = rf.ReadFrom(sr.Reader)
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
logger.LogIf(r.Context(), err)
}
if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) {
return
}
}
}
return
} // Fallback to regular copy
_, err = xioutil.Copy(w, rc)
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
logger.LogIf(r.Context(), err)
}
}

View File

@ -421,7 +421,13 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
}
defer done(&err)
return p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
err = w.Run(func() error {
n, err = p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
return err
})
return n, err
}
// Legacy API - does not have any deadlines
@ -432,7 +438,10 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa
}
defer done(&err)
return p.storage.AppendFile(ctx, volume, path, buf)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
return w.Run(func() error {
return p.storage.AppendFile(ctx, volume, path, buf)
})
}
func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) (err error) {
@ -442,7 +451,7 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri
}
defer done(&err)
return p.storage.CreateFile(ctx, volume, path, size, reader)
return p.storage.CreateFile(ctx, volume, path, size, xioutil.NewDeadlineReader(io.NopCloser(reader), diskMaxTimeout))
}
func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
@ -452,9 +461,16 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
}
defer done(&err)
rc, err := p.storage.ReadFileStream(ctx, volume, path, offset, length)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
var rc io.ReadCloser
err = w.Run(func() error {
var ierr error
rc, ierr = p.storage.ReadFileStream(ctx, volume, path, offset, length)
return ierr
})
if err != nil {
return rc, err
return nil, err
}
return xioutil.NewDeadlineReader(rc, diskMaxTimeout), nil

View File

@ -1779,6 +1779,11 @@ func (s *xlStorage) openFile(filePath string, mode int) (f *os.File, err error)
return w, nil
}
type sendFileReader struct {
io.Reader
io.Closer
}
// ReadFileStream - Returns the read stream of the file.
func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
if offset < 0 {
@ -1796,14 +1801,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
return nil, err
}
odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect && length >= 0
var file *os.File
if odirectEnabled {
file, err = OpenFileDirectIO(filePath, readMode, 0o666)
} else {
file, err = OpenFile(filePath, readMode, 0o666)
}
file, err := OpenFile(filePath, readMode, 0o666)
if err != nil {
switch {
case osIsNotExist(err):
@ -1852,14 +1850,6 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
return nil, errFileCorrupt
}
alignment := offset%xioutil.DirectioAlignSize == 0
if !alignment && odirectEnabled {
if err = disk.DisableDirectIO(file); err != nil {
file.Close()
return nil, err
}
}
if offset > 0 {
if _, err = file.Seek(offset, io.SeekStart); err != nil {
file.Close()
@ -1867,26 +1857,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
}
}
or := &xioutil.ODirectReader{
File: file,
// Select bigger blocks when reading at least 50% of a big block.
SmallFile: length <= xioutil.BlockSizeLarge/2,
}
r := struct {
io.Reader
io.Closer
}{Reader: io.LimitReader(diskHealthReader(ctx, or), length), Closer: closeWrapper(func() error {
if (!alignment || offset+length%xioutil.DirectioAlignSize != 0) && odirectEnabled {
// invalidate page-cache for unaligned reads.
// skip removing from page-cache only
// if O_DIRECT was disabled.
disk.FadviseDontNeed(file)
}
return or.Close()
})}
return r, nil
return &sendFileReader{Reader: io.LimitReader(file, length), Closer: file}, nil
}
// closeWrapper converts a function to an io.Closer

View File

@ -94,7 +94,7 @@ var (
},
config.HelpKV{
Key: apiODirect,
Description: "set to enable or disable O_DIRECT for read and writes under special conditions. NOTE: do not disable O_DIRECT without prior testing" + defaultHelpPostfix(apiODirect),
Description: "set to enable or disable O_DIRECT for writes under special conditions. NOTE: do not disable O_DIRECT without prior testing" + defaultHelpPostfix(apiODirect),
Optional: true,
Type: "boolean",
},

View File

@ -19,6 +19,7 @@ package http
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
@ -29,6 +30,7 @@ import (
// status code and to record the response body
type ResponseRecorder struct {
http.ResponseWriter
io.ReaderFrom
StatusCode int
// Log body of 4xx or 5xx responses
LogErrBody bool
@ -51,13 +53,27 @@ type ResponseRecorder struct {
// NewResponseRecorder - returns a wrapped response writer to trap
// http status codes for auditing purposes.
func NewResponseRecorder(w http.ResponseWriter) *ResponseRecorder {
rf, _ := w.(io.ReaderFrom)
return &ResponseRecorder{
ResponseWriter: w,
ReaderFrom: rf,
StatusCode: http.StatusOK,
StartTime: time.Now().UTC(),
}
}
// ErrNotImplemented when a functionality is not implemented
var ErrNotImplemented = errors.New("not implemented")
// ReadFrom implements support for calling internal io.ReaderFrom implementations
// returns an error if the underlying ResponseWriter does not implement io.ReaderFrom
func (lrw *ResponseRecorder) ReadFrom(r io.Reader) (int64, error) {
if lrw.ReaderFrom != nil {
return lrw.ReaderFrom.ReadFrom(r)
}
return 0, ErrNotImplemented
}
func (lrw *ResponseRecorder) Write(p []byte) (int, error) {
if !lrw.headersLogged {
// We assume the response code to be '200 OK' when WriteHeader() is not called,