From c4c79f61cef1d4689206249c10a569c7f7cc90e1 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Thu, 23 May 2019 02:04:48 +0530 Subject: [PATCH] Notification: Changes to persistent event store (#7658) This patch includes the following changes in event store interface - Removes memory store. We will not persist events in memory anymore, if `queueDir` is not set. - Orders the events before replaying to the broker. --- pkg/event/target/memorystore.go | 118 ------------------------- pkg/event/target/memorystore_test.go | 125 --------------------------- pkg/event/target/mqtt.go | 85 ++++++++++++------ pkg/event/target/queuestore.go | 24 +++-- pkg/event/target/queuestore_test.go | 24 +++-- pkg/event/target/store.go | 7 +- 6 files changed, 87 insertions(+), 296 deletions(-) delete mode 100644 pkg/event/target/memorystore.go delete mode 100644 pkg/event/target/memorystore_test.go diff --git a/pkg/event/target/memorystore.go b/pkg/event/target/memorystore.go deleted file mode 100644 index e35f5ae26..000000000 --- a/pkg/event/target/memorystore.go +++ /dev/null @@ -1,118 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package target - -import ( - "sync" - - "github.com/minio/minio/pkg/event" -) - -const ( - maxStoreLimit = 10000 -) - -// MemoryStore persists events in memory. -type MemoryStore struct { - sync.RWMutex - events map[string]event.Event - eC uint16 - limit uint16 -} - -// NewMemoryStore creates a memory store instance. -func NewMemoryStore(limit uint16) *MemoryStore { - if limit == 0 || limit > maxStoreLimit { - limit = maxStoreLimit - } - memoryStore := &MemoryStore{ - events: make(map[string]event.Event), - limit: limit, - } - return memoryStore -} - -// Open is in-effective here. -// Implemented for interface compatibility. -func (store *MemoryStore) Open() error { - return nil -} - -// Put - puts the event in store. -func (store *MemoryStore) Put(e event.Event) error { - store.Lock() - defer store.Unlock() - if store.eC == store.limit { - return errLimitExceeded - } - key, kErr := getNewUUID() - if kErr != nil { - return kErr - } - store.events[key] = e - store.eC++ - return nil -} - -// Get - retrieves the event from store. -func (store *MemoryStore) Get(key string) (event.Event, error) { - store.RLock() - defer store.RUnlock() - - if event, exist := store.events[key]; exist { - return event, nil - } - - return event.Event{}, errNoSuchKey -} - -// Del - deletes the event from store. -func (store *MemoryStore) Del(key string) error { - store.Lock() - defer store.Unlock() - - delete(store.events, key) - - store.eC-- - - return nil -} - -// ListN - lists atmost N keys in the store. -func (store *MemoryStore) ListN(n int) []string { - store.RLock() - defer store.RUnlock() - - var i int - - if n == -1 { - n = len(store.events) - } - - keys := []string{} - for k := range store.events { - if i < n { - keys = append(keys, k) - i++ - continue - } else { - break - } - } - - return keys -} diff --git a/pkg/event/target/memorystore_test.go b/pkg/event/target/memorystore_test.go deleted file mode 100644 index 1745aeeab..000000000 --- a/pkg/event/target/memorystore_test.go +++ /dev/null @@ -1,125 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package target - -import ( - "reflect" - "testing" -) - -// TestMemoryStorePut - Tests for store.Put -func TestMemoryStorePut(t *testing.T) { - store := NewMemoryStore(100) - defer func() { - store = nil - }() - for i := 0; i < 100; i++ { - if err := store.Put(testEvent); err != nil { - t.Fatal("Failed to put to queue store ", err) - } - } - if len(store.ListN(-1)) != 100 { - t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1))) - } -} - -// TestMemoryStoreGet - Tests for store.Get. -func TestMemoryStoreGet(t *testing.T) { - store := NewMemoryStore(10) - defer func() { - store = nil - }() - for i := 0; i < 10; i++ { - if err := store.Put(testEvent); err != nil { - t.Fatal("Failed to put to queue store ", err) - } - } - eventKeys := store.ListN(-1) - if len(eventKeys) == 10 { - for _, key := range eventKeys { - event, eErr := store.Get(key) - if eErr != nil { - t.Fatal("Failed to Get the event from the queue store ", eErr) - } - if !reflect.DeepEqual(testEvent, event) { - t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event) - } - } - } else { - t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys)) - } -} - -// TestMemoryStoreDel - Tests for store.Del. -func TestMemoryStoreDel(t *testing.T) { - store := NewMemoryStore(20) - defer func() { - store = nil - }() - for i := 0; i < 20; i++ { - if err := store.Put(testEvent); err != nil { - t.Fatal("Failed to put to queue store ", err) - } - } - eventKeys := store.ListN(-1) - if len(eventKeys) == 20 { - for _, key := range eventKeys { - _ = store.Del(key) - } - } else { - t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys)) - } - - if len(store.ListN(-1)) != 0 { - t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1))) - } -} - -// TestMemoryStoreLimit - tests for store limit. -func TestMemoryStoreLimit(t *testing.T) { - store := NewMemoryStore(5) - defer func() { - store = nil - }() - for i := 0; i < 5; i++ { - if err := store.Put(testEvent); err != nil { - t.Fatal("Failed to put to queue store ", err) - } - } - if err := store.Put(testEvent); err == nil { - t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded) - } -} - -// TestMemoryStoreListN - tests for store.ListN. -func TestMemoryStoreListN(t *testing.T) { - store := NewMemoryStore(10) - defer func() { - store = nil - }() - for i := 0; i < 10; i++ { - if err := store.Put(testEvent); err != nil { - t.Fatal("Failed to put to queue store ", err) - } - } - if len(store.ListN(5)) != 5 { - t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5))) - } - if len(store.ListN(-1)) != 10 { - t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1))) - } -} diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 9d9b79d2d..6de5d8fe1 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -73,6 +73,9 @@ func (m MQTTArgs) Validate() error { return errors.New("qos should be set to 1 or 2 if queueDir is set") } } + if m.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } return nil } @@ -90,23 +93,8 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } -// Send - sends event to MQTT. -func (target *MQTTTarget) Send(eventKey string) error { - - if !target.client.IsConnectionOpen() { - return errNotConnected - } - - eventData, eErr := target.store.Get(eventKey) - if eErr != nil { - // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() - // Such events will not exist and wouldve been already been sent successfully. - if os.IsNotExist(eErr) { - return nil - } - return eErr - } - +// send - sends an event to the mqtt. +func (target *MQTTTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -124,13 +112,46 @@ func (target *MQTTTarget) Send(eventKey string) error { return token.Error() } + return nil +} + +// Send - reads an event from store and sends it to MQTT. +func (target *MQTTTarget) Send(eventKey string) error { + + if !target.client.IsConnectionOpen() { + return errNotConnected + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData); err != nil { + return err + } + // Delete the event from store. return target.store.Del(eventKey) } -// Save - saves the events to the store which will be replayed when the mqtt connection is active. +// Save - saves the events to the store if questore is configured, which will be replayed when the mqtt connection is active. func (target *MQTTTarget) Save(eventData event.Event) error { - return target.store.Put(eventData) + if target.store != nil { + return target.store.Put(eventData) + } + + // Do not send if the connection is not active. + if !target.client.IsConnectionOpen() { + return errNotConnected + } + + return target.send(eventData) } // Close - does nothing and available for interface compatibility. @@ -158,8 +179,6 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge if oErr := store.Open(); oErr != nil { return nil, oErr } - } else { - store = NewMemoryStore(args.QueueLimit) } client := mqtt.NewClient(options) @@ -168,7 +187,8 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge // Connect() should be successful atleast once to publish events. token := client.Connect() - go func() { + // Retries until the clientID gets registered. + retryRegister := func() { // Repeat the pings until the client registers the clientId and receives a token. for { if token.Wait() && token.Error() == nil { @@ -179,7 +199,15 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge time.Sleep(reconnectInterval * time.Second) token = client.Connect() } - }() + } + + if store == nil { + if token.Wait() && token.Error() != nil { + return nil, token.Error() + } + } else { + go retryRegister() + } target := &MQTTTarget{ id: event.TargetID{ID: id, Name: "mqtt"}, @@ -188,11 +216,12 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge store: store, } - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) - - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) + } return target, nil } diff --git a/pkg/event/target/queuestore.go b/pkg/event/target/queuestore.go index 06210a911..3b55970ac 100644 --- a/pkg/event/target/queuestore.go +++ b/pkg/event/target/queuestore.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sort" "sync" "github.com/minio/minio/pkg/event" @@ -60,7 +61,7 @@ func (store *QueueStore) Open() error { return terr } - eCount := uint16(len(store.listN(-1))) + eCount := uint16(len(store.list())) if eCount >= store.limit { return errLimitExceeded } @@ -154,17 +155,28 @@ func (store *QueueStore) del(key string) error { return nil } -// ListN - lists atmost N files from the directory. -func (store *QueueStore) ListN(n int) []string { +// List - lists all files from the directory. +func (store *QueueStore) List() []string { store.RLock() defer store.RUnlock() - return store.listN(n) + return store.list() } // lockless call. -func (store *QueueStore) listN(n int) []string { +func (store *QueueStore) list() []string { + var names []string storeDir, _ := os.Open(store.directory) - names, _ := storeDir.Readdirnames(n) + files, _ := storeDir.Readdir(-1) + + // Sort the dentries. + sort.Slice(files, func(i, j int) bool { + return files[i].ModTime().Unix() < files[j].ModTime().Unix() + }) + + for _, file := range files { + names = append(names, file.Name()) + } + _ = storeDir.Close() return names } diff --git a/pkg/event/target/queuestore_test.go b/pkg/event/target/queuestore_test.go index 04892573b..9f2bcc109 100644 --- a/pkg/event/target/queuestore_test.go +++ b/pkg/event/target/queuestore_test.go @@ -68,8 +68,8 @@ func TestQueueStorePut(t *testing.T) { } } // Count the events. - if len(store.ListN(-1)) != 100 { - t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1))) + if len(store.List()) != 100 { + t.Fatalf("List() Expected: 100, got %d", len(store.List())) } } @@ -90,7 +90,7 @@ func TestQueueStoreGet(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.ListN(-1) + eventKeys := store.List() // Get 10 events. if len(eventKeys) == 10 { for _, key := range eventKeys { @@ -103,7 +103,7 @@ func TestQueueStoreGet(t *testing.T) { } } } else { - t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys)) + t.Fatalf("List() Expected: 10, got %d", len(eventKeys)) } } @@ -124,7 +124,7 @@ func TestQueueStoreDel(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.ListN(-1) + eventKeys := store.List() // Remove all the events. if len(eventKeys) == 20 { for _, key := range eventKeys { @@ -134,11 +134,11 @@ func TestQueueStoreDel(t *testing.T) { } } } else { - t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys)) + t.Fatalf("List() Expected: 20, got %d", len(eventKeys)) } - if len(store.ListN(-1)) != 0 { - t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1))) + if len(store.List()) != 0 { + t.Fatalf("List() Expected: 0, got %d", len(store.List())) } } @@ -181,12 +181,8 @@ func TestQueueStoreListN(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - // Should return only 5 event keys. - if len(store.ListN(5)) != 5 { - t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5))) - } // Should return all the event keys in the store. - if len(store.ListN(-1)) != 10 { - t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1))) + if len(store.List()) != 10 { + t.Fatalf("List() Expected: 10, got %d", len(store.List())) } } diff --git a/pkg/event/target/store.go b/pkg/event/target/store.go index 316de4ff5..16b125c9b 100644 --- a/pkg/event/target/store.go +++ b/pkg/event/target/store.go @@ -33,14 +33,11 @@ var errNotConnected = errors.New("not connected to target server/service") // errLimitExceeded error is sent when the maximum limit is reached. var errLimitExceeded = errors.New("the maximum store limit reached") -// errNoSuchKey error is sent in Get when the key is not found. -var errNoSuchKey = errors.New("no such key found in store") - // Store - To persist the events. type Store interface { Put(event event.Event) error Get(key string) (event.Event, error) - ListN(n int) []string + List() []string Del(key string) error Open() error } @@ -55,7 +52,7 @@ func replayEvents(store Store, doneCh <-chan struct{}) <-chan string { defer retryTimer.Stop() defer close(eventKeyCh) for { - names = store.ListN(100) + names = store.List() for _, name := range names { select { case eventKeyCh <- strings.TrimSuffix(name, eventExt):