fix: console log peer API from its broken implementation (#14873)

console logging peer API was broken as it would
timeout after 15minutes, this never really worked
beyond this value and basically failed to provide
the streaming "log" functionality that was expected
from this implementation.

also fix convoluted channel handling by keeping things
simple, this is rewritten.
This commit is contained in:
Harshavardhana 2022-05-06 12:39:58 -07:00 committed by GitHub
parent e55104a155
commit 35dea24ffd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 32 deletions

View File

@ -903,43 +903,54 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan stru
}() }()
} }
func (client *peerRESTClient) doConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
// To cancel the REST request in case doneCh gets closed.
ctx, cancel := context.WithCancel(GlobalContext)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
select {
case <-doneCh:
case <-cancelCh:
// There was an error in the REST request.
}
cancel()
}()
respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1)
defer http.DrainBody(respBody)
if err != nil {
return
}
dec := gob.NewDecoder(respBody)
for {
var lg madmin.LogInfo
if err = dec.Decode(&lg); err != nil {
break
}
if lg.DeploymentID != "" {
select {
case logCh <- lg:
default:
// Do not block on slow receivers.
}
}
}
}
// ConsoleLog - sends request to peer nodes to get console logs // ConsoleLog - sends request to peer nodes to get console logs
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) { func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
go func() { go func() {
for { for {
// get cancellation context to properly unsubscribe peers client.doConsoleLog(logCh, doneCh)
ctx, cancel := context.WithCancel(GlobalContext)
respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1)
if err != nil {
// Retry the failed request.
time.Sleep(5 * time.Second)
} else {
dec := gob.NewDecoder(respBody)
go func() {
<-doneCh
cancel()
}()
for {
var log madmin.LogInfo
if err = dec.Decode(&log); err != nil {
break
}
select {
case logCh <- log:
default:
}
}
}
select { select {
case <-doneCh: case <-doneCh:
cancel()
http.DrainBody(respBody)
return return
default: default:
// There was error in the REST request, retry. // There was error in the REST request, retry after sometime as probably the peer is down.
time.Sleep(5 * time.Second)
} }
} }
}() }()

View File

@ -932,6 +932,8 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case <-r.Context().Done():
return
case <-keepAliveTicker.C: case <-keepAliveTicker.C:
if err := enc.Encode(&event.Event{}); err != nil { if err := enc.Encode(&event.Event{}); err != nil {
return return
@ -993,6 +995,8 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case <-r.Context().Done():
return
case <-keepAliveTicker.C: case <-keepAliveTicker.C:
if err := enc.Encode(&madmin.TraceInfo{}); err != nil { if err := enc.Encode(&madmin.TraceInfo{}); err != nil {
return return
@ -1059,15 +1063,15 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
return return
} }
w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusOK)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
ch := make(chan interface{}, 2000) ch := make(chan interface{}, 2000)
globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil) globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil)
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := gob.NewEncoder(w) enc := gob.NewEncoder(w)
for { for {
select { select {
@ -1076,6 +1080,11 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
return return
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case <-keepAliveTicker.C:
if err := enc.Encode(&madmin.LogInfo{}); err != nil {
return
}
w.(http.Flusher).Flush()
case <-r.Context().Done(): case <-r.Context().Done():
return return
} }