From 25a55bae6fa0ccd1815647d89625f69fbd2d07bd Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 30 Jul 2020 19:45:12 -0700 Subject: [PATCH] fix: avoid buffering of server sent events by proxies (#10164) --- cmd/admin-handlers.go | 11 +++++++---- cmd/api-headers.go | 7 +++++++ cmd/listen-notification-handlers.go | 3 +-- cmd/object-handlers.go | 4 +++- cmd/storage-rest-server.go | 13 +++++++------ 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 9f2af6a6f..7d1e7884b 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -695,7 +695,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { // Start writing response to client started = true setCommonHeaders(w) - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) // Set 200 OK status w.WriteHeader(200) } @@ -995,7 +995,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) // Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers. // Use buffered channel to take care of burst sends or slow w.Write() @@ -1064,7 +1064,8 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque // This is needed to make r.Context().Done() work as // expected in case of read timeout w.Header().Add("Connection", "close") - w.Header().Set(xhttp.ContentType, "text/event-stream") + + setEventStreamHeaders(w) logCh := make(chan interface{}, 4000) @@ -1223,7 +1224,9 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request) } setCommonHeaders(w) - w.Header().Set(xhttp.ContentType, "text/event-stream") + + setEventStreamHeaders(w) + w.WriteHeader(http.StatusOK) errResp := func(err error) { diff --git a/cmd/api-headers.go b/cmd/api-headers.go index 293ce5169..8eeacf374 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -38,6 +38,13 @@ func mustGetRequestID(t time.Time) string { return fmt.Sprintf("%X", t.UnixNano()) } +// setEventStreamHeaders to allow proxies to avoid buffering proxy responses +func setEventStreamHeaders(w http.ResponseWriter) { + w.Header().Set(xhttp.ContentType, "text/event-stream") + w.Header().Set(xhttp.CacheControl, "no-cache") // nginx to turn off buffering + w.Header().Set("X-Accel-Buffering", "no") // nginx to turn off buffering +} + // Write http common headers func setCommonHeaders(w http.ResponseWriter) { w.Header().Set(xhttp.ServerInfo, "MinIO/"+ReleaseTag) diff --git a/cmd/listen-notification-handlers.go b/cmd/listen-notification-handlers.go index 6ec5a99e7..9ae8cee6b 100644 --- a/cmd/listen-notification-handlers.go +++ b/cmd/listen-notification-handlers.go @@ -22,7 +22,6 @@ import ( "time" "github.com/gorilla/mux" - xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" policy "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/event" @@ -119,7 +118,7 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()}) - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) // Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers. // Use buffered channel to take care of burst sends or slow w.Write() diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index f398d9029..61e096aad 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2615,7 +2615,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.Write(encodedErrorResponse) w.(http.Flusher).Flush() } - w.Header().Set(xhttp.ContentType, "text/event-stream") + + setEventStreamHeaders(w) + w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} completeDoneCh := sendWhiteSpace(w) objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index d3baebc13..780bdb05c 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -151,7 +151,8 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r return } - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) + var cache dataUsageCache err := cache.deserialize(r.Body) if err != nil { @@ -520,7 +521,7 @@ func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Req dirPath := vars[storageRESTDirPath] markerPath := vars[storageRESTMarkerPath] - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) encoder := gob.NewEncoder(w) fch, err := s.storage.WalkSplunk(volume, dirPath, markerPath, r.Context().Done()) @@ -548,7 +549,7 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R return } - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) encoder := gob.NewEncoder(w) fch, err := s.storage.WalkVersions(volume, dirPath, markerPath, recursive, r.Context().Done()) @@ -576,7 +577,7 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) return } - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) encoder := gob.NewEncoder(w) fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, r.Context().Done()) @@ -659,7 +660,7 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http dErrsResp := &DeleteVersionsErrsResp{Errs: make([]error, totalVersions)} - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) errs := s.storage.DeleteVersions(volume, versions) @@ -805,7 +806,7 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req return } - w.Header().Set(xhttp.ContentType, "text/event-stream") + setEventStreamHeaders(w) encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) err = s.storage.VerifyFile(volume, filePath, fi)