Fix crash in console logger and also handle bucket DNS updates (#8654)

Also fix listenBucketNotification bugs seen by minio-js
listen bucket notification API.
This commit is contained in:
Harshavardhana
2019-12-16 20:30:57 -08:00
committed by kannappanr
parent 842d0241ed
commit c8d82588c2
19 changed files with 208 additions and 273 deletions

View File

@@ -24,6 +24,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@@ -915,6 +916,53 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
return
}
values := r.URL.Query()
var prefix string
if len(values[peerRESTListenPrefix]) > 1 {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
if len(values[peerRESTListenPrefix]) == 1 {
if err := event.ValidateFilterRuleValue(values[peerRESTListenPrefix][0]); err != nil {
s.writeErrorResponse(w, err)
return
}
prefix = values[peerRESTListenPrefix][0]
}
var suffix string
if len(values[peerRESTListenSuffix]) > 1 {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
if len(values[peerRESTListenSuffix]) == 1 {
if err := event.ValidateFilterRuleValue(values[peerRESTListenSuffix][0]); err != nil {
s.writeErrorResponse(w, err)
return
}
suffix = values[peerRESTListenSuffix][0]
}
pattern := event.NewPattern(prefix, suffix)
var eventNames []event.Name
for _, ev := range values[peerRESTListenEvents] {
eventName, err := event.ParseName(ev)
if err != nil {
s.writeErrorResponse(w, err)
return
}
eventNames = append(eventNames, eventName)
}
rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()})
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
@@ -925,8 +973,16 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
// Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000)
globalHTTPListen.Subscribe(ch, doneCh, func(entry interface{}) bool {
return true
globalHTTPListen.Subscribe(ch, doneCh, func(evI interface{}) bool {
ev, ok := evI.(event.Event)
if !ok {
return false
}
objectName, uerr := url.QueryUnescape(ev.S3.Object.Key)
if uerr != nil {
objectName = ev.S3.Object.Key
}
return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0
})
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
@@ -1108,7 +1164,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundOpsStatus).HandlerFunc(server.BackgroundOpsStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(server.ListenHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(httpTraceHdrs(server.ListenHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)