mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
fix: avoid writing more content on network with O_DIRECT reads (#11659)
There was an io.LimitReader was missing for the 'length' parameter for ranged requests, that would cause client to get truncated responses and errors. fixes #11651
This commit is contained in:
parent
c67d1bf120
commit
37960cbc2f
@ -54,6 +54,7 @@ import (
|
|||||||
"github.com/minio/minio/pkg/hash"
|
"github.com/minio/minio/pkg/hash"
|
||||||
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
||||||
"github.com/minio/minio/pkg/ioutil"
|
"github.com/minio/minio/pkg/ioutil"
|
||||||
|
xnet "github.com/minio/minio/pkg/net"
|
||||||
"github.com/minio/minio/pkg/s3select"
|
"github.com/minio/minio/pkg/s3select"
|
||||||
"github.com/minio/sio"
|
"github.com/minio/sio"
|
||||||
)
|
)
|
||||||
@ -507,6 +508,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
if !httpWriter.HasWritten() && !statusCodeWritten {
|
if !httpWriter.HasWritten() && !statusCodeWritten {
|
||||||
// write error response only if no data or headers has been written to client yet
|
// write error response only if no data or headers has been written to client yet
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to write all the data to client %w", err))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -516,6 +521,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to write all the data to client %w", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify object accessed via a GET request.
|
// Notify object accessed via a GET request.
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
xhttp "github.com/minio/minio/cmd/http"
|
xhttp "github.com/minio/minio/cmd/http"
|
||||||
xjwt "github.com/minio/minio/cmd/jwt"
|
xjwt "github.com/minio/minio/cmd/jwt"
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
|
xnet "github.com/minio/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errDiskStale = errors.New("disk stale")
|
var errDiskStale = errors.New("disk stale")
|
||||||
@ -527,7 +528,12 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
|
|||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
|
|
||||||
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
|
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
|
||||||
io.Copy(w, rc)
|
if _, err = io.Copy(w, rc); err != nil {
|
||||||
|
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
||||||
|
logger.LogIf(r.Context(), err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1423,9 +1423,6 @@ func (o *odirectReader) Close() error {
|
|||||||
} else {
|
} else {
|
||||||
o.s.poolLarge.Put(o.bufp)
|
o.s.poolLarge.Put(o.bufp)
|
||||||
}
|
}
|
||||||
defer func() {
|
|
||||||
atomic.AddInt32(&o.s.activeIOCount, -1)
|
|
||||||
}()
|
|
||||||
return o.f.Close()
|
return o.f.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1449,10 +1446,10 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||||||
var file *os.File
|
var file *os.File
|
||||||
// O_DIRECT only supported if offset is zero
|
// O_DIRECT only supported if offset is zero
|
||||||
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite && s.readODirectSupported {
|
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite && s.readODirectSupported {
|
||||||
file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666)
|
file, err = disk.OpenFileDirectIO(filePath, readMode, 0666)
|
||||||
} else {
|
} else {
|
||||||
// Open the file for reading.
|
// Open the file for reading.
|
||||||
file, err = os.Open(filePath)
|
file, err = os.OpenFile(filePath, readMode, 0666)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch {
|
switch {
|
||||||
@ -1479,27 +1476,33 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||||||
|
|
||||||
st, err := file.Stat()
|
st, err := file.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
file.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify it is a regular file, otherwise subsequent Seek is
|
// Verify it is a regular file, otherwise subsequent Seek is
|
||||||
// undefined.
|
// undefined.
|
||||||
if !st.Mode().IsRegular() {
|
if !st.Mode().IsRegular() {
|
||||||
|
file.Close()
|
||||||
return nil, errIsNotRegular
|
return nil, errIsNotRegular
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddInt32(&s.activeIOCount, 1)
|
atomic.AddInt32(&s.activeIOCount, 1)
|
||||||
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite && s.readODirectSupported {
|
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite && s.readODirectSupported {
|
||||||
|
or := &odirectReader{file, nil, nil, true, false, s, nil}
|
||||||
if length <= smallFileThreshold {
|
if length <= smallFileThreshold {
|
||||||
return &odirectReader{file, nil, nil, true, true, s, nil}, nil
|
or = &odirectReader{file, nil, nil, true, true, s, nil}
|
||||||
}
|
|
||||||
return &odirectReader{file, nil, nil, true, false, s, nil}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if offset > 0 {
|
|
||||||
if _, err = file.Seek(offset, io.SeekStart); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
r := struct {
|
||||||
|
io.Reader
|
||||||
|
io.Closer
|
||||||
|
}{Reader: io.LimitReader(or, length), Closer: closeWrapper(func() error {
|
||||||
|
defer func() {
|
||||||
|
atomic.AddInt32(&s.activeIOCount, -1)
|
||||||
|
}()
|
||||||
|
return or.Close()
|
||||||
|
})}
|
||||||
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
r := struct {
|
r := struct {
|
||||||
@ -1512,6 +1515,13 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||||||
return file.Close()
|
return file.Close()
|
||||||
})}
|
})}
|
||||||
|
|
||||||
|
if offset > 0 {
|
||||||
|
if _, err = file.Seek(offset, io.SeekStart); err != nil {
|
||||||
|
r.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add readahead to big reads
|
// Add readahead to big reads
|
||||||
if length >= readAheadSize {
|
if length >= readAheadSize {
|
||||||
rc, err := readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize)
|
rc, err := readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user