mirror of
https://github.com/minio/minio.git
synced 2025-02-02 17:35:58 -05:00
Fix CreateFile shared buffer corruption. (#18652)
`(*xlStorageDiskIDCheck).CreateFile` wraps the incoming reader in `xioutil.NewDeadlineReader`. The wrapped reader is handed to `(*xlStorage).CreateFile`. This performs a Read call via `writeAllDirect`, which reads into an `ODirectPool` buffer. `(*DeadlineReader).Read` spawns an async read into the buffer. If a timeout is hit while reading, the read operation returns to `writeAllDirect`. The operation returns an error and the buffer is reused. However, if the async `Read` call unblocks, it will write to the now recycled buffer. Fix: Remove the `DeadlineReader` - it is inherently unsafe. Instead, rely on the network timeouts. This is not a disk timeout, anyway. Regression in https://github.com/minio/minio/pull/17745
This commit is contained in:
parent
8fa2898ff1
commit
6c89a81af4
@ -639,17 +639,14 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
|
||||
// Attempt to use splice/sendfile() optimization, A very specific behavior mentioned below is necessary.
|
||||
// See https://github.com/golang/go/blob/f7c5cbb82087c55aa82081e931e0142783700ce8/src/net/sendfile_linux.go#L20
|
||||
// Windows can lock up with this optimization, so we fall back to regular copy.
|
||||
dr, ok := rc.(*xioutil.DeadlineReader)
|
||||
sr, ok := rc.(*sendFileReader)
|
||||
if ok {
|
||||
sr, ok := dr.ReadCloser.(*sendFileReader)
|
||||
if ok {
|
||||
_, err = rf.ReadFrom(sr.Reader)
|
||||
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
||||
logger.LogIf(r.Context(), err)
|
||||
}
|
||||
if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) {
|
||||
return
|
||||
}
|
||||
_, err = rf.ReadFrom(sr.Reader)
|
||||
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
||||
logger.LogIf(r.Context(), err)
|
||||
}
|
||||
if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) {
|
||||
return
|
||||
}
|
||||
}
|
||||
} // Fallback to regular copy
|
||||
|
@ -458,7 +458,7 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri
|
||||
}
|
||||
defer done(&err)
|
||||
|
||||
return p.storage.CreateFile(ctx, volume, path, size, xioutil.NewDeadlineReader(io.NopCloser(reader), globalDriveConfig.GetMaxTimeout()))
|
||||
return p.storage.CreateFile(ctx, volume, path, size, io.NopCloser(reader))
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
|
||||
@ -480,7 +480,7 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return xioutil.NewDeadlineReader(rc, globalDriveConfig.GetMaxTimeout()), nil
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
|
||||
|
@ -104,53 +104,6 @@ type ioret struct {
|
||||
err error
|
||||
}
|
||||
|
||||
// DeadlineReader deadline reader with timeout
|
||||
type DeadlineReader struct {
|
||||
io.ReadCloser
|
||||
timeout time.Duration
|
||||
err error
|
||||
}
|
||||
|
||||
// NewDeadlineReader wraps a writer to make it respect given deadline
|
||||
// value per Write(). If there is a blocking write, the returned Reader
|
||||
// will return whenever the timer hits (the return values are n=0
|
||||
// and err=context.DeadlineExceeded.)
|
||||
func NewDeadlineReader(r io.ReadCloser, timeout time.Duration) io.ReadCloser {
|
||||
return &DeadlineReader{ReadCloser: r, timeout: timeout}
|
||||
}
|
||||
|
||||
func (r *DeadlineReader) Read(buf []byte) (int, error) {
|
||||
if r.err != nil {
|
||||
return 0, r.err
|
||||
}
|
||||
|
||||
c := make(chan ioret, 1)
|
||||
t := time.NewTimer(r.timeout)
|
||||
go func() {
|
||||
n, err := r.ReadCloser.Read(buf)
|
||||
c <- ioret{n, err}
|
||||
close(c)
|
||||
}()
|
||||
|
||||
select {
|
||||
case res := <-c:
|
||||
if !t.Stop() {
|
||||
<-t.C
|
||||
}
|
||||
r.err = res.err
|
||||
return res.n, res.err
|
||||
case <-t.C:
|
||||
r.ReadCloser.Close()
|
||||
r.err = context.DeadlineExceeded
|
||||
return 0, context.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
|
||||
// Close closer interface to close the underlying closer
|
||||
func (r *DeadlineReader) Close() error {
|
||||
return r.ReadCloser.Close()
|
||||
}
|
||||
|
||||
// DeadlineWriter deadline writer with timeout
|
||||
type DeadlineWriter struct {
|
||||
io.WriteCloser
|
||||
|
@ -28,19 +28,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type sleepReader struct {
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func (r *sleepReader) Read(p []byte) (n int, err error) {
|
||||
time.Sleep(r.timeout)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (r *sleepReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type sleepWriter struct {
|
||||
timeout time.Duration
|
||||
}
|
||||
@ -54,29 +41,6 @@ func (w *sleepWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDeadlineReader(t *testing.T) {
|
||||
r := NewDeadlineReader(&sleepReader{timeout: 500 * time.Millisecond}, 450*time.Millisecond)
|
||||
buf := make([]byte, 4)
|
||||
_, err := r.Read(buf)
|
||||
r.Close()
|
||||
if err != context.DeadlineExceeded {
|
||||
t.Errorf("DeadlineReader shouldn't be successful %v - should return context.DeadlineExceeded", err)
|
||||
}
|
||||
_, err = r.Read(buf)
|
||||
if err != context.DeadlineExceeded {
|
||||
t.Errorf("DeadlineReader shouldn't be successful %v - should return context.DeadlineExceeded", err)
|
||||
}
|
||||
r = NewDeadlineReader(&sleepReader{timeout: 100 * time.Millisecond}, 600*time.Millisecond)
|
||||
n, err := r.Read(buf)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
t.Errorf("DeadlineReader should succeed but failed with %s", err)
|
||||
}
|
||||
if n != 4 {
|
||||
t.Errorf("DeadlineReader should succeed but should have only read 4 bytes, but returned %d instead", n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeadlineWriter(t *testing.T) {
|
||||
w := NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 450*time.Millisecond)
|
||||
_, err := w.Write([]byte("1"))
|
||||
|
Loading…
x
Reference in New Issue
Block a user