diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index b33c822b1..8210ce097 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -611,6 +611,8 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http return } + w.Header().Set(xhttp.ContentLength, strconv.Itoa(length)) + rc, err := s.storage.ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length)) if err != nil { s.writeErrorResponse(w, err) @@ -618,12 +620,28 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http } defer rc.Close() - w.Header().Set(xhttp.ContentLength, strconv.Itoa(length)) - if _, err = xioutil.Copy(w, rc); err != nil { - if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients - logger.LogIf(r.Context(), err) + rf, ok := w.(io.ReaderFrom) + if ok { + // Attempt to use splice/sendfile() optimization. A very specific behavior mentioned below is necessary. + // See https://github.com/golang/go/blob/f7c5cbb82087c55aa82081e931e0142783700ce8/src/net/sendfile_linux.go#L20 + dr, ok := rc.(*xioutil.DeadlineReader) + if ok { + sr, ok := dr.ReadCloser.(*sendFileReader) + if ok { + _, err = rf.ReadFrom(sr.Reader) + if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients + logger.LogIf(r.Context(), err) + } + if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) { + return + } + } } - return + } // Fallback to regular copy + + _, err = xioutil.Copy(w, rc) + if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients + logger.LogIf(r.Context(), err) } } diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 916281299..86611aee8 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -421,7 +421,13 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path } defer done(&err) - return p.storage.ReadFile(ctx, volume, path, 
offset, buf, verifier) + w := xioutil.NewDeadlineWorker(diskMaxTimeout) + err = w.Run(func() error { + n, err = p.storage.ReadFile(ctx, volume, path, offset, buf, verifier) + return err + }) + + return n, err } // Legacy API - does not have any deadlines @@ -432,7 +438,10 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa } defer done(&err) - return p.storage.AppendFile(ctx, volume, path, buf) + w := xioutil.NewDeadlineWorker(diskMaxTimeout) + return w.Run(func() error { + return p.storage.AppendFile(ctx, volume, path, buf) + }) } func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) (err error) { @@ -442,7 +451,7 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri } defer done(&err) - return p.storage.CreateFile(ctx, volume, path, size, reader) + return p.storage.CreateFile(ctx, volume, path, size, xioutil.NewDeadlineReader(io.NopCloser(reader), diskMaxTimeout)) } func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) { @@ -452,9 +461,16 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path } defer done(&err) - rc, err := p.storage.ReadFileStream(ctx, volume, path, offset, length) + w := xioutil.NewDeadlineWorker(diskMaxTimeout) + + var rc io.ReadCloser + err = w.Run(func() error { + var ierr error + rc, ierr = p.storage.ReadFileStream(ctx, volume, path, offset, length) + return ierr + }) if err != nil { - return rc, err + return nil, err } return xioutil.NewDeadlineReader(rc, diskMaxTimeout), nil diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 67981d475..8edd7d2cf 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1779,6 +1779,11 @@ func (s *xlStorage) openFile(filePath string, mode int) (f *os.File, err error) return w, nil } +type sendFileReader struct { + io.Reader + io.Closer +} + // ReadFileStream - Returns the 
read stream of the file. func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) { if offset < 0 { @@ -1796,14 +1801,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, err } - odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect && length >= 0 - - var file *os.File - if odirectEnabled { - file, err = OpenFileDirectIO(filePath, readMode, 0o666) - } else { - file, err = OpenFile(filePath, readMode, 0o666) - } + file, err := OpenFile(filePath, readMode, 0o666) if err != nil { switch { case osIsNotExist(err): @@ -1852,14 +1850,6 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, errFileCorrupt } - alignment := offset%xioutil.DirectioAlignSize == 0 - if !alignment && odirectEnabled { - if err = disk.DisableDirectIO(file); err != nil { - file.Close() - return nil, err - } - } - if offset > 0 { if _, err = file.Seek(offset, io.SeekStart); err != nil { file.Close() @@ -1867,26 +1857,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off } } - or := &xioutil.ODirectReader{ - File: file, - // Select bigger blocks when reading at least 50% of a big block. - SmallFile: length <= xioutil.BlockSizeLarge/2, - } - - r := struct { - io.Reader - io.Closer - }{Reader: io.LimitReader(diskHealthReader(ctx, or), length), Closer: closeWrapper(func() error { - if (!alignment || offset+length%xioutil.DirectioAlignSize != 0) && odirectEnabled { - // invalidate page-cache for unaligned reads. - // skip removing from page-cache only - // if O_DIRECT was disabled. 
- disk.FadviseDontNeed(file) - } - return or.Close() - })} - - return r, nil + return &sendFileReader{Reader: io.LimitReader(file, length), Closer: file}, nil } // closeWrapper converts a function to an io.Closer diff --git a/internal/config/api/help.go b/internal/config/api/help.go index 84b34e10b..d87fdef2c 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -94,7 +94,7 @@ var ( }, config.HelpKV{ Key: apiODirect, - Description: "set to enable or disable O_DIRECT for read and writes under special conditions. NOTE: do not disable O_DIRECT without prior testing" + defaultHelpPostfix(apiODirect), + Description: "set to enable or disable O_DIRECT for writes under special conditions. NOTE: do not disable O_DIRECT without prior testing" + defaultHelpPostfix(apiODirect), Optional: true, Type: "boolean", }, diff --git a/internal/http/response-recorder.go b/internal/http/response-recorder.go index 0f4b29d94..9ab47d5e8 100644 --- a/internal/http/response-recorder.go +++ b/internal/http/response-recorder.go @@ -19,6 +19,7 @@ package http import ( "bytes" + "errors" "fmt" "io" "net/http" @@ -29,6 +30,7 @@ import ( // status code and to record the response body type ResponseRecorder struct { http.ResponseWriter + io.ReaderFrom StatusCode int // Log body of 4xx or 5xx responses LogErrBody bool @@ -51,13 +53,27 @@ type ResponseRecorder struct { // NewResponseRecorder - returns a wrapped response writer to trap // http status codes for auditing purposes. 
func NewResponseRecorder(w http.ResponseWriter) *ResponseRecorder { + rf, _ := w.(io.ReaderFrom) return &ResponseRecorder{ ResponseWriter: w, + ReaderFrom: rf, StatusCode: http.StatusOK, StartTime: time.Now().UTC(), } } + +// ErrNotImplemented is returned when a functionality is not implemented +var ErrNotImplemented = errors.New("not implemented") + +// ReadFrom implements support for calling internal io.ReaderFrom implementations, +// and returns ErrNotImplemented if the underlying ResponseWriter does not implement io.ReaderFrom +func (lrw *ResponseRecorder) ReadFrom(r io.Reader) (int64, error) { + if lrw.ReaderFrom != nil { + return lrw.ReaderFrom.ReadFrom(r) + } + return 0, ErrNotImplemented +} + func (lrw *ResponseRecorder) Write(p []byte) (int, error) { if !lrw.headersLogged { // We assume the response code to be '200 OK' when WriteHeader() is not called,