From 2451b9a75a2495666281cb8f8e5ab25a9bacb770 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 27 Aug 2021 18:16:36 +0200 Subject: [PATCH] fix: hanging operations on PUT with slow IO (#13087) #11878 added "keepHTTPResponseAlive" to CreateFile requests. The problem is that it will begin writing to the response before the body is read after 10 seconds. This will abort the writes on the client-side, since it assumes the server has received what it wants. The proposed solution here is to monitor the completion of the body before beginning to send keepalive pings. Fixes observed high number of goroutines stuck in `io.Copy` in `github.com/minio/minio/cmd.(*xlStorage).CreateFile` and `(*storageRESTClient).CreateFile` stuck in `http.DrainBody`. --- cmd/bitrot-streaming.go | 4 ++ cmd/storage-rest-client.go | 4 ++ cmd/storage-rest-server.go | 95 +++++++++++++++++++++++++++++++++++++- 3 files changed, 101 insertions(+), 2 deletions(-) diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index 54300e9ec..91cddcb0d 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -57,6 +57,10 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) { b.closeWithErr(err) return n, err } + if n != len(p) { + err = io.ErrShortWrite + b.closeWithErr(err) + } return n, err } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index dfe57899e..4fff5da07 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -377,6 +377,10 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st values.Set(storageRESTLength, strconv.Itoa(int(size))) respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size) defer xhttp.DrainBody(respBody) + if err != nil { + return err + } + _, err = waitForHTTPResponse(respBody) return err } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 50b9a9ff1..a43119155 100644 --- a/cmd/storage-rest-server.go +++ 
b/cmd/storage-rest-server.go
@@ -326,8 +326,8 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
 		return
 	}
 
-	done := keepHTTPResponseAlive(w)
-	done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
+	done, body := keepHTTPReqResponseAlive(w, r)
+	done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
 }
 
 // DeleteVersion delete updated metadata.
@@ -719,8 +719,99 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
 	}
 }
 
+// closeNotifier is itself a ReadCloser that will notify, by closing done, when either
+// Read returns an error (io.EOF included) or the Close() function is called.
+type closeNotifier struct {
+	rc   io.ReadCloser
+	done chan struct{}
+}
+
+func (c *closeNotifier) Read(p []byte) (n int, err error) {
+	n, err = c.rc.Read(p)
+	if err != nil {
+		if c.done != nil {
+			close(c.done)
+			c.done = nil
+		}
+	}
+	return n, err
+}
+
+func (c *closeNotifier) Close() error {
+	if c.done != nil {
+		close(c.done)
+		c.done = nil
+	}
+	return c.rc.Close()
+}
+
+// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
+// operations, such as bitrot verification or data usage scanning.
+// Every 10 seconds a space character is sent.
+// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
+// The returned function should always be called to release resources.
+// An optional error can be sent which will be picked as text only error,
+// without its original type by the receiver.
+// waitForHTTPResponse should be used on the receiving side.
+func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
+	bodyDoneCh := make(chan struct{})
+	doneCh := make(chan error)
+	ctx := r.Context()
+	go func() {
+		defer close(doneCh) // Closed on every exit path; resp blocks on this close after sending.
+		// Wait for body to be read; a result arriving first is written without starting the ticker.
+		select {
+		case <-ctx.Done():
+		case <-bodyDoneCh:
+		case err := <-doneCh:
+			if err != nil {
+				w.Write([]byte{1})
+				w.Write([]byte(err.Error()))
+			} else {
+				w.Write([]byte{0})
+			}
+			return
+		}
+		// Initiate ticker after body has been read.
+		ticker := time.NewTicker(time.Second * 10)
+		for {
+			select {
+			case <-ticker.C:
+				// Response not ready, write a filler byte.
+				w.Write([]byte{32})
+				w.(http.Flusher).Flush()
+			case err := <-doneCh:
+				if err != nil {
+					w.Write([]byte{1})
+					w.Write([]byte(err.Error()))
+				} else {
+					w.Write([]byte{0})
+				}
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+	return func(err error) {
+		if doneCh == nil {
+			return
+		}
+
+		// Indicate we are ready to write.
+		doneCh <- err
+
+		// Wait for channel to be closed so we don't race on writes.
+		<-doneCh
+
+		// Clear so we can be called multiple times without crashing.
+		doneCh = nil
+	}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
+}
+
 // keepHTTPResponseAlive can be used to avoid timeouts with long storage
 // operations, such as bitrot verification or data usage scanning.
+// keepHTTPResponseAlive may NOT be used until the request body has been read,
+// use keepHTTPReqResponseAlive instead.
 // Every 10 seconds a space character is sent.
 // The returned function should always be called to release resources.
 // An optional error can be sent which will be picked as text only error,