mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
Remove old ListenBucketNotification API (#8645)
This commit is contained in:
parent
39e8e4f4aa
commit
cc02bf0442
@ -288,23 +288,14 @@ func TestAdminServerInfo(t *testing.T) {
|
|||||||
t.Errorf("Expected to succeed but failed with %d", rec.Code)
|
t.Errorf("Expected to succeed but failed with %d", rec.Code)
|
||||||
}
|
}
|
||||||
|
|
||||||
results := []ServerInfo{}
|
results := madmin.InfoMessage{}
|
||||||
err = json.NewDecoder(rec.Body).Decode(&results)
|
err = json.NewDecoder(rec.Body).Decode(&results)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to decode set config result json %v", err)
|
t.Fatalf("Failed to decode set config result json %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(results) == 0 {
|
if results.Region != globalMinioDefaultRegion {
|
||||||
t.Error("Expected at least one server info result")
|
t.Errorf("Expected %s, got %s", globalMinioDefaultRegion, results.Region)
|
||||||
}
|
|
||||||
|
|
||||||
for _, serverInfo := range results {
|
|
||||||
if serverInfo.Error != "" {
|
|
||||||
t.Errorf("Unexpected error = %v\n", serverInfo.Error)
|
|
||||||
}
|
|
||||||
if serverInfo.Data.Properties.Region != globalMinioDefaultRegion {
|
|
||||||
t.Errorf("Expected %s, got %s", globalMinioDefaultRegion, serverInfo.Data.Properties.Region)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,8 +166,6 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
|
|||||||
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "")
|
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "")
|
||||||
// ListenBucketNotification
|
// ListenBucketNotification
|
||||||
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}")
|
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}")
|
||||||
// ListenBucketNotificationV2
|
|
||||||
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotificationv2", httpTraceAll(api.ListenBucketNotificationHandlerV2))).Queries("type", "2", "events", "{events:.*}")
|
|
||||||
// ListMultipartUploads
|
// ListMultipartUploads
|
||||||
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "")
|
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "")
|
||||||
// ListObjectsV2M
|
// ListObjectsV2M
|
||||||
|
@ -31,7 +31,6 @@ import (
|
|||||||
xhttp "github.com/minio/minio/cmd/http"
|
xhttp "github.com/minio/minio/cmd/http"
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/minio/minio/pkg/event"
|
"github.com/minio/minio/pkg/event"
|
||||||
"github.com/minio/minio/pkg/event/target"
|
|
||||||
xnet "github.com/minio/minio/pkg/net"
|
xnet "github.com/minio/minio/pkg/net"
|
||||||
"github.com/minio/minio/pkg/policy"
|
"github.com/minio/minio/pkg/policy"
|
||||||
)
|
)
|
||||||
@ -39,7 +38,6 @@ import (
|
|||||||
const (
|
const (
|
||||||
bucketConfigPrefix = "buckets"
|
bucketConfigPrefix = "buckets"
|
||||||
bucketNotificationConfig = "notification.xml"
|
bucketNotificationConfig = "notification.xml"
|
||||||
bucketListenerConfig = "listener.json"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNoSuchNotifications = errors.New("The specified bucket does not have bucket notifications")
|
var errNoSuchNotifications = errors.New("The specified bucket does not have bucket notifications")
|
||||||
@ -174,10 +172,10 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
|
|||||||
writeSuccessResponseHeadersOnly(w)
|
writeSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api objectAPIHandlers) ListenBucketNotificationHandlerV2(w http.ResponseWriter, r *http.Request) {
|
func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := newContext(r, w, "ListenBucketNotificationV2")
|
ctx := newContext(r, w, "ListenBucketNotification")
|
||||||
|
|
||||||
defer logger.AuditLog(w, r, "ListenBucketNotificationV2", mustGetClaimsFromToken(r))
|
defer logger.AuditLog(w, r, "ListenBucketNotification", mustGetClaimsFromToken(r))
|
||||||
|
|
||||||
// Validate if bucket exists.
|
// Validate if bucket exists.
|
||||||
objAPI := api.ObjectAPI()
|
objAPI := api.ObjectAPI()
|
||||||
@ -311,131 +309,3 @@ func (api objectAPIHandlers) ListenBucketNotificationHandlerV2(w http.ResponseWr
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenBucketNotificationHandler - This HTTP handler sends events to the connected HTTP client.
|
|
||||||
// Client should send prefix/suffix object name to match and events to watch as query parameters.
|
|
||||||
func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "ListenBucketNotification")
|
|
||||||
|
|
||||||
defer logger.AuditLog(w, r, "ListenBucketNotification", mustGetClaimsFromToken(r))
|
|
||||||
|
|
||||||
// Validate if bucket exists.
|
|
||||||
objAPI := api.ObjectAPI()
|
|
||||||
if objAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !objAPI.IsNotificationSupported() {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !objAPI.IsListenBucketSupported() {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucketName := vars["bucket"]
|
|
||||||
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.ListenBucketNotificationAction, bucketName, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
values := r.URL.Query()
|
|
||||||
|
|
||||||
var prefix string
|
|
||||||
if len(values["prefix"]) > 1 {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNamePrefix), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(values["prefix"]) == 1 {
|
|
||||||
if err := event.ValidateFilterRuleValue(values["prefix"][0]); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
prefix = values["prefix"][0]
|
|
||||||
}
|
|
||||||
|
|
||||||
var suffix string
|
|
||||||
if len(values["suffix"]) > 1 {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNameSuffix), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(values["suffix"]) == 1 {
|
|
||||||
if err := event.ValidateFilterRuleValue(values["suffix"][0]); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
suffix = values["suffix"][0]
|
|
||||||
}
|
|
||||||
|
|
||||||
pattern := event.NewPattern(prefix, suffix)
|
|
||||||
|
|
||||||
eventNames := []event.Name{}
|
|
||||||
for _, s := range values["events"] {
|
|
||||||
eventName, err := event.ParseName(s)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
eventNames = append(eventNames, eventName)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := objAPI.GetBucketInfo(ctx, bucketName); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
host, err := xnet.ParseHost(r.RemoteAddr)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set(xhttp.ContentType, "text/event-stream")
|
|
||||||
|
|
||||||
target, err := target.NewHTTPClientTarget(*host, w)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
rulesMap := event.NewRulesMap(eventNames, pattern, target.ID())
|
|
||||||
|
|
||||||
if err = globalNotificationSys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
|
|
||||||
logger.GetReqInfo(ctx).AppendTags("target", target.ID().Name)
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer globalNotificationSys.RemoveRemoteTarget(bucketName, target.ID())
|
|
||||||
defer globalNotificationSys.RemoveRulesMap(bucketName, rulesMap)
|
|
||||||
|
|
||||||
thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = SaveListener(objAPI, bucketName, eventNames, pattern, target.ID(), *thisAddr); err != nil {
|
|
||||||
logger.GetReqInfo(ctx).AppendTags("target", target.ID().Name)
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
globalNotificationSys.ListenBucketNotification(ctx, bucketName, eventNames, pattern, target.ID(), *thisAddr)
|
|
||||||
|
|
||||||
<-target.DoneCh
|
|
||||||
|
|
||||||
if err = RemoveListener(objAPI, bucketName, target.ID(), *thisAddr); err != nil {
|
|
||||||
logger.GetReqInfo(ctx).AppendTags("target", target.ID().Name)
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -19,7 +19,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -590,24 +589,6 @@ func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketNam
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
|
|
||||||
func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string,
|
|
||||||
eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) {
|
|
||||||
go func() {
|
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
|
||||||
for idx, client := range sys.peerClients {
|
|
||||||
if client == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
client := client
|
|
||||||
ng.Go(ctx, func() error {
|
|
||||||
return client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer)
|
|
||||||
}, idx, *client.host)
|
|
||||||
}
|
|
||||||
ng.Wait()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddNotificationTargetsFromConfig - adds notification targets from server config.
|
// AddNotificationTargetsFromConfig - adds notification targets from server config.
|
||||||
func (sys *NotificationSys) AddNotificationTargetsFromConfig(cfg config.Config) error {
|
func (sys *NotificationSys) AddNotificationTargetsFromConfig(cfg config.Config) error {
|
||||||
targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport())
|
targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport())
|
||||||
@ -660,82 +641,6 @@ func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
|
|
||||||
type ListenBucketNotificationArgs struct {
|
|
||||||
BucketName string `json:"-"`
|
|
||||||
EventNames []event.Name `json:"eventNames"`
|
|
||||||
Pattern string `json:"pattern"`
|
|
||||||
TargetID event.TargetID `json:"targetId"`
|
|
||||||
Addr xnet.Host `json:"addr"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// initListeners - initializes PeerREST clients available in listener.json.
|
|
||||||
func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error {
|
|
||||||
// listener.json is available/applicable only in DistXL mode.
|
|
||||||
if !globalIsDistXL {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Construct path to listener.json for the given bucket.
|
|
||||||
configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
|
|
||||||
configData, e := readConfig(ctx, objAPI, configFile)
|
|
||||||
if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) {
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
|
|
||||||
listenerList := []ListenBucketNotificationArgs{}
|
|
||||||
if configData != nil {
|
|
||||||
if err := json.Unmarshal(configData, &listenerList); err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(listenerList) == 0 {
|
|
||||||
// Nothing to initialize for empty listener list.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, args := range listenerList {
|
|
||||||
found, err := isLocalHost(args.Addr.Name, args.Addr.Port.String(), args.Addr.Port.String())
|
|
||||||
if err != nil {
|
|
||||||
logger.GetReqInfo(ctx).AppendTags("host", args.Addr.String())
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if found {
|
|
||||||
// As this function is called at startup, skip HTTP listener to this host.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := newPeerRESTClient(&args.Addr)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to find PeerHost by address %v in listener.json for bucket %v", args.Addr, bucketName)
|
|
||||||
}
|
|
||||||
|
|
||||||
exist, err := client.RemoteTargetExist(bucketName, args.TargetID)
|
|
||||||
if err != nil {
|
|
||||||
logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name)
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !exist {
|
|
||||||
// Skip previously connected HTTP listener which is not found in remote peer.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
target := NewPeerRESTClientTarget(bucketName, args.TargetID, client)
|
|
||||||
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
|
|
||||||
if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
|
|
||||||
logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name)
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Loads notification policies for all buckets into NotificationSys.
|
// Loads notification policies for all buckets into NotificationSys.
|
||||||
func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
|
func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
@ -751,9 +656,6 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
|
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
|
||||||
if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -1283,7 +1185,6 @@ func (args eventArgs) ToEvent() event.Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func sendEvent(args eventArgs) {
|
func sendEvent(args eventArgs) {
|
||||||
|
|
||||||
// remove sensitive encryption entries in metadata.
|
// remove sensitive encryption entries in metadata.
|
||||||
switch {
|
switch {
|
||||||
case crypto.IsEncrypted(args.Object.UserDefined):
|
case crypto.IsEncrypted(args.Object.UserDefined):
|
||||||
@ -1297,16 +1198,15 @@ func sendEvent(args eventArgs) {
|
|||||||
crypto.RemoveSensitiveEntries(args.Object.UserDefined)
|
crypto.RemoveSensitiveEntries(args.Object.UserDefined)
|
||||||
crypto.RemoveInternalEntries(args.Object.UserDefined)
|
crypto.RemoveInternalEntries(args.Object.UserDefined)
|
||||||
|
|
||||||
if globalHTTPListen.HasSubscribers() {
|
|
||||||
globalHTTPListen.Publish(args.ToEvent())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// globalNotificationSys is not initialized in gateway mode.
|
// globalNotificationSys is not initialized in gateway mode.
|
||||||
if globalNotificationSys == nil {
|
if globalNotificationSys == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if globalHTTPListen.HasSubscribers() {
|
||||||
|
globalHTTPListen.Publish(args.ToEvent())
|
||||||
|
}
|
||||||
|
|
||||||
notifyCh := globalNotificationSys.Send(args)
|
notifyCh := globalNotificationSys.Send(args)
|
||||||
go func() {
|
go func() {
|
||||||
for _, err := range notifyCh {
|
for _, err := range notifyCh {
|
||||||
@ -1345,92 +1245,3 @@ func saveNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName
|
|||||||
configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig)
|
configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig)
|
||||||
return saveConfig(ctx, objAPI, configFile, data)
|
return saveConfig(ctx, objAPI, configFile, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveListener - saves HTTP client currently listening for events to listener.json.
|
|
||||||
func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, addr xnet.Host) error {
|
|
||||||
// listener.json is available/applicable only in DistXL mode.
|
|
||||||
if !globalIsDistXL {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName})
|
|
||||||
|
|
||||||
// Construct path to listener.json for the given bucket.
|
|
||||||
configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
|
|
||||||
|
|
||||||
configData, err := readConfig(ctx, objAPI, configFile)
|
|
||||||
if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
listenerList := []ListenBucketNotificationArgs{}
|
|
||||||
if configData != nil {
|
|
||||||
if err = json.Unmarshal(configData, &listenerList); err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
listenerList = append(listenerList, ListenBucketNotificationArgs{
|
|
||||||
EventNames: eventNames,
|
|
||||||
Pattern: pattern,
|
|
||||||
TargetID: targetID,
|
|
||||||
Addr: addr,
|
|
||||||
})
|
|
||||||
|
|
||||||
data, err := json.Marshal(listenerList)
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return saveConfig(ctx, objAPI, configFile, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveListener - removes HTTP client currently listening for events from listener.json.
|
|
||||||
func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.TargetID, addr xnet.Host) error {
|
|
||||||
// listener.json is available/applicable only in DistXL mode.
|
|
||||||
if !globalIsDistXL {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName})
|
|
||||||
|
|
||||||
// Construct path to listener.json for the given bucket.
|
|
||||||
configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig)
|
|
||||||
configData, err := readConfig(ctx, objAPI, configFile)
|
|
||||||
if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
listenerList := []ListenBucketNotificationArgs{}
|
|
||||||
if configData != nil {
|
|
||||||
if err = json.Unmarshal(configData, &listenerList); err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(listenerList) == 0 {
|
|
||||||
// Nothing to remove.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
activeListenerList := []ListenBucketNotificationArgs{}
|
|
||||||
for _, args := range listenerList {
|
|
||||||
if args.TargetID == targetID && args.Addr.Equal(addr) {
|
|
||||||
// Skip if matches
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
activeListenerList = append(activeListenerList, args)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(activeListenerList)
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return saveConfig(ctx, objAPI, configFile, data)
|
|
||||||
}
|
|
||||||
|
@ -91,9 +91,6 @@ func deleteBucketMetadata(ctx context.Context, bucket string, objAPI ObjectLayer
|
|||||||
|
|
||||||
// Delete notification config, if present - ignore any errors.
|
// Delete notification config, if present - ignore any errors.
|
||||||
removeNotificationConfig(ctx, objAPI, bucket)
|
removeNotificationConfig(ctx, objAPI, bucket)
|
||||||
|
|
||||||
// Delete listener config, if present - ignore any errors.
|
|
||||||
removeListenerConfig(ctx, objAPI, bucket)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Depending on the disk type network or local, initialize storage API.
|
// Depending on the disk type network or local, initialize storage API.
|
||||||
@ -233,13 +230,6 @@ func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket st
|
|||||||
return objAPI.DeleteObject(ctx, minioMetaBucket, ncPath)
|
return objAPI.DeleteObject(ctx, minioMetaBucket, ncPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove listener configuration from storage layer. Used when a bucket is deleted.
|
|
||||||
func removeListenerConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error {
|
|
||||||
// make the path
|
|
||||||
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
|
||||||
return objAPI.DeleteObject(ctx, minioMetaBucket, lcPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||||
endWalkCh := make(chan struct{})
|
endWalkCh := make(chan struct{})
|
||||||
defer close(endWalkCh)
|
defer close(endWalkCh)
|
||||||
|
@ -264,34 +264,6 @@ func (client *peerRESTClient) ReloadFormat(dryRun bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenBucketNotification - send listen bucket notification to peer nodes.
|
|
||||||
func (client *peerRESTClient) ListenBucketNotification(bucket string, eventNames []event.Name,
|
|
||||||
pattern string, targetID event.TargetID, addr xnet.Host) error {
|
|
||||||
args := listenBucketNotificationReq{
|
|
||||||
EventNames: eventNames,
|
|
||||||
Pattern: pattern,
|
|
||||||
TargetID: targetID,
|
|
||||||
Addr: addr,
|
|
||||||
}
|
|
||||||
|
|
||||||
values := make(url.Values)
|
|
||||||
values.Set(peerRESTBucket, bucket)
|
|
||||||
|
|
||||||
var reader bytes.Buffer
|
|
||||||
err := gob.NewEncoder(&reader).Encode(args)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
respBody, err := client.call(peerRESTMethodBucketNotificationListen, values, &reader, -1)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer http.DrainBody(respBody)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendEvent - calls send event RPC.
|
// SendEvent - calls send event RPC.
|
||||||
func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
|
func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
|
||||||
numTries := 10
|
numTries := 10
|
||||||
|
@ -32,7 +32,6 @@ import (
|
|||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/minio/minio/pkg/event"
|
"github.com/minio/minio/pkg/event"
|
||||||
"github.com/minio/minio/pkg/lifecycle"
|
"github.com/minio/minio/pkg/lifecycle"
|
||||||
xnet "github.com/minio/minio/pkg/net"
|
|
||||||
"github.com/minio/minio/pkg/policy"
|
"github.com/minio/minio/pkg/policy"
|
||||||
trace "github.com/minio/minio/pkg/trace"
|
trace "github.com/minio/minio/pkg/trace"
|
||||||
)
|
)
|
||||||
@ -847,58 +846,6 @@ func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter,
|
|||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
type listenBucketNotificationReq struct {
|
|
||||||
EventNames []event.Name `json:"eventNames"`
|
|
||||||
Pattern string `json:"pattern"`
|
|
||||||
TargetID event.TargetID `json:"targetId"`
|
|
||||||
Addr xnet.Host `json:"addr"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListenBucketNotificationHandler - Listen bucket notification handler.
|
|
||||||
func (s *peerRESTServer) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !s.IsValid(w, r) {
|
|
||||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucketName := vars[peerRESTBucket]
|
|
||||||
if bucketName == "" {
|
|
||||||
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var args listenBucketNotificationReq
|
|
||||||
if r.ContentLength <= 0 {
|
|
||||||
s.writeErrorResponse(w, errInvalidArgument)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err := gob.NewDecoder(r.Body).Decode(&args)
|
|
||||||
if err != nil {
|
|
||||||
s.writeErrorResponse(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
restClient, err := newPeerRESTClient(&args.Addr)
|
|
||||||
if err != nil {
|
|
||||||
s.writeErrorResponse(w, fmt.Errorf("unable to find PeerRESTClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
target := NewPeerRESTClientTarget(bucketName, args.TargetID, restClient)
|
|
||||||
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
|
|
||||||
if err := globalNotificationSys.AddRemoteTarget(bucketName, target, rulesMap); err != nil {
|
|
||||||
reqInfo := &logger.ReqInfo{BucketName: target.bucketName}
|
|
||||||
reqInfo.AppendTags("target", target.id.Name)
|
|
||||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
s.writeErrorResponse(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w.(http.Flusher).Flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerUpdateHandler - updates the current server.
|
// ServerUpdateHandler - updates the current server.
|
||||||
func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) {
|
func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if !s.IsValid(w, r) {
|
if !s.IsValid(w, r) {
|
||||||
@ -1154,7 +1101,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
|
|||||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...)
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...)
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...)
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationListen).HandlerFunc(httpTraceHdrs(server.ListenBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...)
|
|
||||||
|
|
||||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...)
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketLifecycleSet).HandlerFunc(httpTraceHdrs(server.SetBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketLifecycleSet).HandlerFunc(httpTraceHdrs(server.SetBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||||
|
Loading…
Reference in New Issue
Block a user