mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: hanging operations on PUT with slow IO (#13087)
#11878 added "keepHTTPResponseAlive" to CreateFile requests. The problem is that after 10 seconds it begins writing to the response, even if the request body has not yet been fully read. This aborts the upload on the client side, since the client assumes the server has already received everything it wants. The proposed solution here is to wait until the body has been read completely before beginning to send keepalive pings. Fixes the observed high number of goroutines stuck in `io.Copy` in `github.com/minio/minio/cmd.(*xlStorage).CreateFile`, and of `(*storageRESTClient).CreateFile` calls stuck in `http.DrainBody`.
This commit is contained in:
parent
06b71c99ee
commit
2451b9a75a
@ -57,6 +57,10 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
|
|||||||
b.closeWithErr(err)
|
b.closeWithErr(err)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
if n != len(p) {
|
||||||
|
err = io.ErrShortWrite
|
||||||
|
b.closeWithErr(err)
|
||||||
|
}
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,6 +377,10 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
|
|||||||
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
||||||
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
||||||
defer xhttp.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = waitForHTTPResponse(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,8 +326,8 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
done := keepHTTPResponseAlive(w)
|
done, body := keepHTTPReqResponseAlive(w, r)
|
||||||
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
|
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteVersion delete updated metadata.
|
// DeleteVersion delete updated metadata.
|
||||||
@ -719,8 +719,99 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// closeNotifier wraps an io.ReadCloser and signals, by closing its done
// channel, the first time a Read on the wrapped reader returns an error
// or Close is called, whichever happens first.
//
// The done field is read and cleared without any synchronization.
type closeNotifier struct {
	rc   io.ReadCloser
	done chan struct{}
}

// notify closes the done channel if it has not been closed yet and clears
// the field so that later calls are no-ops.
func (c *closeNotifier) notify() {
	if c.done == nil {
		return
	}
	close(c.done)
	c.done = nil
}

// Read forwards to the wrapped reader and signals completion on any
// non-nil error. The byte count and error are returned unchanged.
func (c *closeNotifier) Read(p []byte) (n int, err error) {
	if n, err = c.rc.Read(p); err != nil {
		c.notify()
	}
	return n, err
}

// Close signals completion and then closes the wrapped reader, returning
// the wrapped reader's Close error.
func (c *closeNotifier) Close() error {
	c.notify()
	return c.rc.Close()
}
|
||||||
|
|
||||||
|
// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
|
||||||
|
// operations, such as bitrot verification or data usage scanning.
|
||||||
|
// Every 10 seconds a space character is sent.
|
||||||
|
// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
|
||||||
|
// The returned function should always be called to release resources.
|
||||||
|
// An optional error can be sent which will be picked as text only error,
|
||||||
|
// without its original type by the receiver.
|
||||||
|
// waitForHTTPResponse should be used to the receiving side.
|
||||||
|
func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
|
||||||
|
bodyDoneCh := make(chan struct{})
|
||||||
|
doneCh := make(chan error)
|
||||||
|
ctx := r.Context()
|
||||||
|
go func() {
|
||||||
|
// Wait for body to be read.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-bodyDoneCh:
|
||||||
|
case err := <-doneCh:
|
||||||
|
if err != nil {
|
||||||
|
w.Write([]byte{1})
|
||||||
|
w.Write([]byte(err.Error()))
|
||||||
|
} else {
|
||||||
|
w.Write([]byte{0})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer close(doneCh)
|
||||||
|
// Initiate ticker after body has been read.
|
||||||
|
ticker := time.NewTicker(time.Second * 10)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
// Response not ready, write a filler byte.
|
||||||
|
w.Write([]byte{32})
|
||||||
|
w.(http.Flusher).Flush()
|
||||||
|
case err := <-doneCh:
|
||||||
|
if err != nil {
|
||||||
|
w.Write([]byte{1})
|
||||||
|
w.Write([]byte(err.Error()))
|
||||||
|
} else {
|
||||||
|
w.Write([]byte{0})
|
||||||
|
}
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return func(err error) {
|
||||||
|
if doneCh == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Indicate we are ready to write.
|
||||||
|
doneCh <- err
|
||||||
|
|
||||||
|
// Wait for channel to be closed so we don't race on writes.
|
||||||
|
<-doneCh
|
||||||
|
|
||||||
|
// Clear so we can be called multiple times without crashing.
|
||||||
|
doneCh = nil
|
||||||
|
}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
|
||||||
|
}
|
||||||
|
|
||||||
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
|
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
|
||||||
// operations, such as bitrot verification or data usage scanning.
|
// operations, such as bitrot verification or data usage scanning.
|
||||||
|
// keepHTTPResponseAlive may NOT be used until the request body has been read,
|
||||||
|
// use keepHTTPReqResponseAlive instead.
|
||||||
// Every 10 seconds a space character is sent.
|
// Every 10 seconds a space character is sent.
|
||||||
// The returned function should always be called to release resources.
|
// The returned function should always be called to release resources.
|
||||||
// An optional error can be sent which will be picked as text only error,
|
// An optional error can be sent which will be picked as text only error,
|
||||||
|
Loading…
Reference in New Issue
Block a user