Event persistence for MQTT (#7268)

- The events will be persisted in queueStore if `queueDir` is set.
- Else, if queueDir is not set events persist in memory.

The events are replayed back when the mqtt broker is back online.
This commit is contained in:
Praveen raj Mani
2019-02-26 07:31:13 +05:30
committed by Harshavardhana
parent 2fc341394d
commit 78d116c487
11 changed files with 684 additions and 28 deletions

View File

@@ -0,0 +1,104 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"sync"
"github.com/minio/minio/pkg/event"
)
const (
maxStoreLimit = 10000
)
// MemoryStore persists events in memory.
type MemoryStore struct {
sync.RWMutex
events map[string]event.Event
eC uint16
limit uint16
}
// NewMemoryStore creates a memory store instance.
func NewMemoryStore(limit uint16) *MemoryStore {
if limit == 0 || limit > maxStoreLimit {
limit = maxStoreLimit
}
memoryStore := &MemoryStore{
events: make(map[string]event.Event),
limit: limit,
}
return memoryStore
}
// Open is in-effective here.
// Implemented for interface compatibility.
func (store *MemoryStore) Open() error {
return nil
}
// Put - puts the event in store.
func (store *MemoryStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.eC == store.limit {
return ErrLimitExceeded
}
key, kErr := getNewUUID()
if kErr != nil {
return kErr
}
store.events[key] = e
store.eC++
return nil
}
// Get - retrieves the event from store.
func (store *MemoryStore) Get(key string) (event.Event, error) {
store.RLock()
defer store.RUnlock()
if event, exist := store.events[key]; exist {
return event, nil
}
return event.Event{}, ErrNoSuchKey
}
// Del - deletes the event from store.
func (store *MemoryStore) Del(key string) {
store.Lock()
defer store.Unlock()
delete(store.events, key)
store.eC--
}
// ListAll - lists all the keys in the store.
func (store *MemoryStore) ListAll() []string {
store.RLock()
defer store.RUnlock()
keys := []string{}
for k := range store.events {
keys = append(keys, k)
}
return keys
}

View File

@@ -0,0 +1,106 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"reflect"
"testing"
)
// TestMemoryStorePut - Tests for store.Put
func TestMemoryStorePut(t *testing.T) {
store := NewMemoryStore(1000)
defer func() {
store = nil
}()
for i := 0; i < 100; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
if len(store.ListAll()) != 100 {
t.Fatalf("ListAll() Expected: 100, got %d", len(store.ListAll()))
}
}
// TestMemoryStoreGet - Tests for store.Get.
func TestMemoryStoreGet(t *testing.T) {
store := NewMemoryStore(1000)
defer func() {
store = nil
}()
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
if len(eventKeys) == 10 {
for _, key := range eventKeys {
event, eErr := store.Get(key)
if eErr != nil {
t.Fatal("Failed to Get the event from the queue store ", eErr)
}
if !reflect.DeepEqual(testEvent, event) {
t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event)
}
}
} else {
t.Fatalf("ListAll() Expected: 10, got %d", len(eventKeys))
}
}
// TestMemoryStoreDel - Tests for store.Del.
func TestMemoryStoreDel(t *testing.T) {
store := NewMemoryStore(1000)
defer func() {
store = nil
}()
for i := 0; i < 20; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
if len(eventKeys) == 20 {
for _, key := range eventKeys {
store.Del(key)
}
} else {
t.Fatalf("ListAll() Expected: 20, got %d", len(eventKeys))
}
if len(store.ListAll()) != 0 {
t.Fatalf("ListAll() Expected: 0, got %d", len(store.ListAll()))
}
}
// TestMemoryStoreLimit - tests for store limit.
func TestMemoryStoreLimit(t *testing.T) {
store := NewMemoryStore(5)
defer func() {
store = nil
}()
for i := 0; i < 5; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
if err := store.Put(testEvent); err == nil {
t.Fatalf("Expected to fail with %s, but passes", ErrLimitExceeded)
}
}

View File

@@ -30,6 +30,11 @@ import (
xnet "github.com/minio/minio/pkg/net"
)
const (
retryInterval = 3 // In Seconds
reconnectInterval = 5 // In Seconds
)
// MQTTArgs - MQTT target arguments.
type MQTTArgs struct {
Enable bool `json:"enable"`
@@ -42,6 +47,7 @@ type MQTTArgs struct {
KeepAlive time.Duration `json:"keepAliveInterval"`
RootCAs *x509.CertPool `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint16 `json:"queueLimit"`
}
// Validate MQTTArgs fields
@@ -75,6 +81,8 @@ type MQTTTarget struct {
id event.TargetID
args MQTTArgs
client mqtt.Client
store Store
reconn bool
}
// ID - returns target ID.
@@ -82,16 +90,33 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to MQTT.
func (target *MQTTTarget) Send(eventData event.Event) error {
if !target.client.IsConnected() {
token := target.client.Connect()
if token.Wait() {
if err := token.Error(); err != nil {
return err
// Reads persisted events from the store and re-plays.
func (target *MQTTTarget) retry() {
target.reconn = true
events := target.store.ListAll()
for len(events) != 0 {
for _, key := range events {
event, eErr := target.store.Get(key)
if eErr != nil {
continue
}
for !target.client.IsConnectionOpen() {
time.Sleep(retryInterval * time.Second)
}
// The connection is open.
if err := target.send(event); err != nil {
continue
}
// Delete after a successful publish.
target.store.Del(key)
}
events = target.store.ListAll()
}
// Release the reconn state.
target.reconn = false
}
func (target *MQTTTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
@@ -105,16 +130,27 @@ func (target *MQTTTarget) Send(eventData event.Event) error {
}
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
token.Wait()
return token.Error()
// In-case if the queueDir is configured OR mqtt broker is offline, the token.Wait() waits indefinitely
if target.args.QueueDir == "" {
token.Wait()
return token.Error()
}
// Send - sends event to MQTT when the connection is active.
func (target *MQTTTarget) Send(eventData event.Event) error {
// Persist the events if the connection is not active.
if !target.client.IsConnectionOpen() {
if err := target.store.Put(eventData); err != nil {
return err
}
// Ignore if retry is triggered already.
if !target.reconn {
go target.retry()
}
return nil
}
// Need a fix from the paho.mqtt.golang library - https://github.com/eclipse/paho.mqtt.golang/issues/274
// Right now the server panics on IO File store errors, returning nil for now.
return nil
// Publishes to the broker as the connection is active.
return target.send(eventData)
}
// Close - does nothing and available for interface compatibility.
@@ -134,19 +170,49 @@ func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) {
SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}).
AddBroker(args.Broker.String())
var store Store
if args.QueueDir != "" {
options = options.SetStore(mqtt.NewFileStore(args.QueueDir))
store = NewQueueStore(args.QueueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
} else {
store = NewMemoryStore(args.QueueLimit)
}
client := mqtt.NewClient(options)
token := client.Connect()
if token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return &MQTTTarget{
// The client should establish a first time connection.
// Connect() should be successful atleast once to publish events.
token := client.Connect()
go func() {
// Repeat the pings until the client registers the clientId and receives a token.
for {
if token.Wait() && token.Error() == nil {
// Connected
break
}
// Reconnecting
time.Sleep(reconnectInterval * time.Second)
token = client.Connect()
}
}()
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
client: client,
}, nil
store: store,
reconn: false,
}
// Replay any previously persisted events in the store.
if len(target.store.ListAll()) != 0 {
go target.retry()
}
return target, nil
}

View File

@@ -0,0 +1,178 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"github.com/minio/minio/pkg/event"
)
const (
maxLimit = 10000 // Max store limit.
eventExt = ".event"
)
// QueueStore - Filestore for persisting events.
type QueueStore struct {
sync.RWMutex
directory string
eC uint16
limit uint16
}
// NewQueueStore - Creates an instance for QueueStore.
func NewQueueStore(directory string, limit uint16) *QueueStore {
if limit == 0 {
limit = maxLimit
}
queueStore := &QueueStore{
directory: directory,
limit: limit,
}
return queueStore
}
// Open - Creates the directory if not present.
func (store *QueueStore) Open() error {
store.Lock()
defer store.Unlock()
if terr := os.MkdirAll(store.directory, os.FileMode(0770)); terr != nil {
return terr
}
eCount := uint16(len(store.listAll()))
if eCount >= store.limit {
return ErrLimitExceeded
}
store.eC = eCount
return nil
}
// write - writes event to the directory.
func (store *QueueStore) write(directory string, key string, e event.Event) error {
// Marshalls the event.
eventData, err := json.Marshal(e)
if err != nil {
return err
}
path := filepath.Join(store.directory, key+eventExt)
if err := ioutil.WriteFile(path, eventData, os.FileMode(0770)); err != nil {
return err
}
// Increment the event count.
store.eC++
return nil
}
// Put - puts a event to the store.
func (store *QueueStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.eC >= store.limit {
return ErrLimitExceeded
}
key, kErr := getNewUUID()
if kErr != nil {
return kErr
}
return store.write(store.directory, key, e)
}
// Get - gets a event from the store.
func (store *QueueStore) Get(key string) (event.Event, error) {
store.RLock()
defer store.RUnlock()
var event event.Event
filepath := filepath.Join(store.directory, key+eventExt)
eventData, rerr := ioutil.ReadFile(filepath)
if rerr != nil {
store.del(key)
return event, rerr
}
if len(eventData) == 0 {
store.del(key)
}
uerr := json.Unmarshal(eventData, &event)
if uerr != nil {
store.del(key)
return event, uerr
}
return event, nil
}
// Del - Deletes an entry from the store.
func (store *QueueStore) Del(key string) {
store.Lock()
defer store.Unlock()
store.del(key)
}
// lockless call
func (store *QueueStore) del(key string) {
p := filepath.Join(store.directory, key+eventExt)
rerr := os.Remove(p)
if rerr != nil {
return
}
// Decrement the event count.
store.eC--
}
// ListAll - lists all the keys in the directory.
func (store *QueueStore) ListAll() []string {
store.RLock()
defer store.RUnlock()
return store.listAll()
}
// lockless call.
func (store *QueueStore) listAll() []string {
var err error
var keys []string
var files []os.FileInfo
files, err = ioutil.ReadDir(store.directory)
if err != nil {
return nil
}
for _, f := range files {
keys = append(keys, strings.TrimSuffix(f.Name(), eventExt))
}
return keys
}

View File

@@ -0,0 +1,162 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"os"
"path/filepath"
"reflect"
"testing"
"github.com/minio/minio/pkg/event"
)
// TestDir
var queueDir = filepath.Join(os.TempDir(), "minio_test")
// Sample test event.
var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet}
// Initialize the store.
func setUpStore(directory string, limit uint16) (Store, error) {
store := NewQueueStore(queueDir, limit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
return store, nil
}
// Tear down store
func tearDownStore() error {
if err := os.RemoveAll(queueDir); err != nil {
return err
}
return nil
}
// TestQueueStorePut - tests for store.Put
func TestQueueStorePut(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10000)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
// Put 100 events.
for i := 0; i < 100; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Count the events.
if len(store.ListAll()) != 100 {
t.Fatalf("ListAll() Expected: 100, got %d", len(store.ListAll()))
}
}
// TestQueueStoreGet - tests for store.Get
func TestQueueStoreGet(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10000)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
// Put 10 events
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
// Get 10 events.
if len(eventKeys) == 10 {
for _, key := range eventKeys {
event, eErr := store.Get(key)
if eErr != nil {
t.Fatal("Failed to Get the event from the queue store ", eErr)
}
if !reflect.DeepEqual(testEvent, event) {
t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event)
}
}
} else {
t.Fatalf("ListAll() Expected: 10, got %d", len(eventKeys))
}
}
// TestQueueStoreDel - tests for store.Del
func TestQueueStoreDel(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10000)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
// Put 20 events.
for i := 0; i < 20; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
// Remove all the events.
if len(eventKeys) == 20 {
for _, key := range eventKeys {
store.Del(key)
}
} else {
t.Fatalf("ListAll() Expected: 20, got %d", len(eventKeys))
}
if len(store.ListAll()) != 0 {
t.Fatalf("ListAll() Expected: 0, got %d", len(store.ListAll()))
}
}
// TestQueueStoreLimit - tests the event limit for the store.
func TestQueueStoreLimit(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
// The max limit is set to 5.
store, err := setUpStore(queueDir, 5)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
for i := 0; i < 5; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Should not allow 6th Put.
if err := store.Put(testEvent); err == nil {
t.Fatalf("Expected to fail with %s, but passes", ErrLimitExceeded)
}
}

37
pkg/event/target/store.go Normal file
View File

@@ -0,0 +1,37 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"errors"
"github.com/minio/minio/pkg/event"
)
// ErrLimitExceeded error is sent when the maximum limit is reached.
var ErrLimitExceeded = errors.New("[Store] The maximum limit reached")
// ErrNoSuchKey error is sent in Get when the key is not found.
var ErrNoSuchKey = errors.New("[Store] No such key found")
// Store - To persist the events.
type Store interface {
Put(event event.Event) error
Get(key string) (event.Event, error)
ListAll() []string
Del(key string)
Open() error
}