diff --git a/cmd/config-current.go b/cmd/config-current.go index c3c0b328a..3a6886d67 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -336,7 +336,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewMQTTTarget(k, v) + t, err := target.NewMQTTTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("mqtt(%s): %s", k, err.Error()) } @@ -682,7 +682,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.MQTT { if args.Enable { args.RootCAs = globalRootCAs - newTarget, err := target.NewMQTTTarget(id, args) + newTarget, err := target.NewMQTTTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/peer-rest-client-target.go b/cmd/peer-rest-client-target.go index 6adc48a57..27b0e01f6 100644 --- a/cmd/peer-rest-client-target.go +++ b/cmd/peer-rest-client-target.go @@ -31,8 +31,18 @@ func (target *PeerRESTClientTarget) ID() event.TargetID { return target.id } -// Send - sends event to remote peer by making RPC call. -func (target *PeerRESTClientTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *PeerRESTClientTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +// Send - interface compatible method does no-op. +func (target *PeerRESTClientTarget) Send(eventKey string) error { + return nil +} + +// sends event to remote peer by making RPC call. +func (target *PeerRESTClientTarget) send(eventData event.Event) error { return target.restClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData) } diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index e752ae91a..8322557bd 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -107,8 +107,12 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) { return ch, nil } -// Send - sends event to AMQP. -func (target *AMQPTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *AMQPTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *AMQPTarget) send(eventData event.Event) error { ch, err := target.channel() if err != nil { return err @@ -142,6 +146,11 @@ func (target *AMQPTarget) Send(eventData event.Event) error { }) } +// Send - interface compatible method does no-op. +func (target *AMQPTarget) Send(eventKey string) error { + return nil +} + // Close - does nothing and available for interface compatibility. func (target *AMQPTarget) Close() error { return nil diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 8cab4e2a1..46eef6409 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -69,8 +69,13 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } -// Send - sends event to Elasticsearch. -func (target *ElasticsearchTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *ElasticsearchTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *ElasticsearchTarget) send(eventData event.Event) error { + var key string remove := func() error { @@ -111,6 +116,11 @@ func (target *ElasticsearchTarget) Send(eventData event.Event) error { return nil } +// Send - interface compatible method does no-op. +func (target *ElasticsearchTarget) Send(eventKey string) error { + return nil +} + // Close - does nothing and available for interface compatibility. func (target *ElasticsearchTarget) Close() error { return nil diff --git a/pkg/event/target/httpclient.go b/pkg/event/target/httpclient.go index 72591779c..68420ff01 100644 --- a/pkg/event/target/httpclient.go +++ b/pkg/event/target/httpclient.go @@ -89,8 +89,12 @@ func (target *HTTPClientTarget) start() { }() } -// Send - sends event to HTTP client. -func (target *HTTPClientTarget) Send(eventData event.Event) error { +// Save - sends event to HTTP client. +func (target *HTTPClientTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *HTTPClientTarget) send(eventData event.Event) error { if atomic.LoadUint32(&target.isRunning) != 0 { return errors.New("closed http connection") } @@ -109,6 +113,11 @@ func (target *HTTPClientTarget) Send(eventData event.Event) error { } } +// Send - interface compatible method does no-op. +func (target *HTTPClientTarget) Send(eventKey string) error { + return nil +} + // Close - closes underneath goroutine. func (target *HTTPClientTarget) Close() error { atomic.AddUint32(&target.isStopped, 1) diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 20963c076..fe1a01b68 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -73,8 +73,12 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } -// Send - sends event to Kafka. -func (target *KafkaTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *KafkaTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *KafkaTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -96,6 +100,11 @@ func (target *KafkaTarget) Send(eventData event.Event) error { return err } +// Send - interface compatible method does no-op. +func (target *KafkaTarget) Send(eventKey string) error { + return nil +} + // Close - closes underneath kafka connection. func (target *KafkaTarget) Close() error { return target.producer.Close() diff --git a/pkg/event/target/memorystore.go b/pkg/event/target/memorystore.go index f790cbbbb..e35f5ae26 100644 --- a/pkg/event/target/memorystore.go +++ b/pkg/event/target/memorystore.go @@ -57,7 +57,7 @@ func (store *MemoryStore) Put(e event.Event) error { store.Lock() defer store.Unlock() if store.eC == store.limit { - return ErrLimitExceeded + return errLimitExceeded } key, kErr := getNewUUID() if kErr != nil { @@ -77,27 +77,41 @@ func (store *MemoryStore) Get(key string) (event.Event, error) { return event, nil } - return event.Event{}, ErrNoSuchKey + return event.Event{}, errNoSuchKey } // Del - deletes the event from store. -func (store *MemoryStore) Del(key string) { +func (store *MemoryStore) Del(key string) error { store.Lock() defer store.Unlock() delete(store.events, key) store.eC-- + + return nil } -// ListAll - lists all the keys in the store. -func (store *MemoryStore) ListAll() []string { +// ListN - lists atmost N keys in the store. +func (store *MemoryStore) ListN(n int) []string { store.RLock() defer store.RUnlock() + var i int + + if n == -1 { + n = len(store.events) + } + keys := []string{} for k := range store.events { - keys = append(keys, k) + if i < n { + keys = append(keys, k) + i++ + continue + } else { + break + } } return keys diff --git a/pkg/event/target/memorystore_test.go b/pkg/event/target/memorystore_test.go index 09848a33e..1745aeeab 100644 --- a/pkg/event/target/memorystore_test.go +++ b/pkg/event/target/memorystore_test.go @@ -23,7 +23,7 @@ import ( // TestMemoryStorePut - Tests for store.Put func TestMemoryStorePut(t *testing.T) { - store := NewMemoryStore(1000) + store := NewMemoryStore(100) defer func() { store = nil }() @@ -32,14 +32,14 @@ func TestMemoryStorePut(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - if len(store.ListAll()) != 100 { - t.Fatalf("ListAll() Expected: 100, got %d", len(store.ListAll())) + if len(store.ListN(-1)) != 100 { + t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1))) } } // TestMemoryStoreGet - Tests for store.Get. func TestMemoryStoreGet(t *testing.T) { - store := NewMemoryStore(1000) + store := NewMemoryStore(10) defer func() { store = nil }() @@ -48,7 +48,7 @@ func TestMemoryStoreGet(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.ListAll() + eventKeys := store.ListN(-1) if len(eventKeys) == 10 { for _, key := range eventKeys { event, eErr := store.Get(key) @@ -60,13 +60,13 @@ func TestMemoryStoreGet(t *testing.T) { } } } else { - t.Fatalf("ListAll() Expected: 10, got %d", len(eventKeys)) + t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys)) } } // TestMemoryStoreDel - Tests for store.Del. func TestMemoryStoreDel(t *testing.T) { - store := NewMemoryStore(1000) + store := NewMemoryStore(20) defer func() { store = nil }() @@ -75,17 +75,17 @@ func TestMemoryStoreDel(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.ListAll() + eventKeys := store.ListN(-1) if len(eventKeys) == 20 { for _, key := range eventKeys { - store.Del(key) + _ = store.Del(key) } } else { - t.Fatalf("ListAll() Expected: 20, got %d", len(eventKeys)) + t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys)) } - if len(store.ListAll()) != 0 { - t.Fatalf("ListAll() Expected: 0, got %d", len(store.ListAll())) + if len(store.ListN(-1)) != 0 { + t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1))) } } @@ -101,6 +101,25 @@ func TestMemoryStoreLimit(t *testing.T) { } } if err := store.Put(testEvent); err == nil { - t.Fatalf("Expected to fail with %s, but passes", ErrLimitExceeded) + t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded) + } +} + +// TestMemoryStoreListN - tests for store.ListN. +func TestMemoryStoreListN(t *testing.T) { + store := NewMemoryStore(10) + defer func() { + store = nil + }() + for i := 0; i < 10; i++ { + if err := store.Put(testEvent); err != nil { + t.Fatal("Failed to put to queue store ", err) + } + } + if len(store.ListN(5)) != 5 { + t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5))) + } + if len(store.ListN(-1)) != 10 { + t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1))) } } diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 20246ec36..9d9b79d2d 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "net/url" + "os" "path/filepath" "time" @@ -31,8 +32,8 @@ import ( ) const ( - retryInterval = 3 // In Seconds reconnectInterval = 5 // In Seconds + storePrefix = "minio" ) // MQTTArgs - MQTT target arguments. @@ -82,7 +83,6 @@ type MQTTTarget struct { args MQTTArgs client mqtt.Client store Store - reconn bool } // ID - returns target ID. @@ -90,33 +90,22 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } -// Reads persisted events from the store and re-plays. -func (target *MQTTTarget) retry() { - target.reconn = true - events := target.store.ListAll() - for len(events) != 0 { - for _, key := range events { - event, eErr := target.store.Get(key) - if eErr != nil { - continue - } - for !target.client.IsConnectionOpen() { - time.Sleep(retryInterval * time.Second) - } - // The connection is open. - if err := target.send(event); err != nil { - continue - } - // Delete after a successful publish. - target.store.Del(key) - } - events = target.store.ListAll() - } - // Release the reconn state. - target.reconn = false -} +// Send - sends event to MQTT. +func (target *MQTTTarget) Send(eventKey string) error { -func (target *MQTTTarget) send(eventData event.Event) error { + if !target.client.IsConnectionOpen() { + return errNotConnected + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -131,26 +120,17 @@ func (target *MQTTTarget) send(eventData event.Event) error { token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data)) token.Wait() - return token.Error() - -} - -// Send - sends event to MQTT when the connection is active. -func (target *MQTTTarget) Send(eventData event.Event) error { - // Persist the events if the connection is not active. - if !target.client.IsConnectionOpen() { - if err := target.store.Put(eventData); err != nil { - return err - } - // Ignore if retry is triggered already. - if !target.reconn { - go target.retry() - } - return nil + if token.Error() != nil { + return token.Error() } - // Publishes to the broker as the connection is active. - return target.send(eventData) + // Delete the event from store. + return target.store.Del(eventKey) +} + +// Save - saves the events to the store which will be replayed when the mqtt connection is active. +func (target *MQTTTarget) Save(eventData event.Event) error { + return target.store.Put(eventData) } // Close - does nothing and available for interface compatibility. @@ -159,7 +139,7 @@ func (target *MQTTTarget) Close() error { } // NewMQTTTarget - creates new MQTT target. -func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) { +func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarget, error) { options := mqtt.NewClientOptions(). SetClientID(""). SetCleanSession(true). @@ -173,7 +153,8 @@ func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) { var store Store if args.QueueDir != "" { - store = NewQueueStore(args.QueueDir, args.QueueLimit) + queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) if oErr := store.Open(); oErr != nil { return nil, oErr } @@ -205,14 +186,13 @@ func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) { args: args, client: client, store: store, - reconn: false, } - // Replay any previously persisted events in the store. - if len(target.store.ListAll()) != 0 { - go target.retry() + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) - } + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) return target, nil } diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index 09bda6996..cbe01c235 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -141,8 +141,12 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } -// Send - sends event to MySQL. -func (target *MySQLTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *MySQLTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *MySQLTarget) send(eventData event.Event) error { if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -181,6 +185,11 @@ func (target *MySQLTarget) Send(eventData event.Event) error { return nil } +// Send - interface compatible method does no-op. +func (target *MySQLTarget) Send(eventKey string) error { + return nil +} + // Close - closes underneath connections to MySQL database. func (target *MySQLTarget) Close() error { if target.updateStmt != nil { diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index d0c25eb59..830962dbd 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -81,8 +81,12 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } -// Send - sends event to NATS. -func (target *NATSTarget) Send(eventData event.Event) (err error) { +// Save - Sends event directly without persisting. +func (target *NATSTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *NATSTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -107,6 +111,11 @@ func (target *NATSTarget) Send(eventData event.Event) (err error) { return err } +// Send - interface compatible method does no-op. +func (target *NATSTarget) Send(eventKey string) error { + return nil +} + // Close - closes underneath connections to NATS server. func (target *NATSTarget) Close() (err error) { if target.stanConn != nil { diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index 600ab50a0..1d5e34d08 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -68,8 +68,12 @@ func (target *NSQTarget) ID() event.TargetID { return target.id } -// Send - sends event to NSQD. -func (target *NSQTarget) Send(eventData event.Event) (err error) { +// Save - Sends event directly without persisting. +func (target *NSQTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *NSQTarget) send(eventData event.Event) (err error) { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -86,6 +90,11 @@ func (target *NSQTarget) Send(eventData event.Event) (err error) { return err } +// Send - interface compatible method does no-op. +func (target *NSQTarget) Send(eventKey string) error { + return nil +} + // Close - closes underneath connections to NSQD server. func (target *NSQTarget) Close() (err error) { // this blocks until complete: diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index 193227341..da2fbba29 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -140,8 +140,12 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } -// Send - sends event to PostgreSQL. -func (target *PostgreSQLTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *PostgreSQLTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *PostgreSQLTarget) send(eventData event.Event) error { if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -180,6 +184,11 @@ func (target *PostgreSQLTarget) Send(eventData event.Event) error { return nil } +// Send - interface compatible method does no-op. +func (target *PostgreSQLTarget) Send(eventKey string) error { + return nil +} + // Close - closes underneath connections to PostgreSQL database. func (target *PostgreSQLTarget) Close() error { if target.updateStmt != nil { diff --git a/pkg/event/target/queuestore.go b/pkg/event/target/queuestore.go index 79f5d8b89..06210a911 100644 --- a/pkg/event/target/queuestore.go +++ b/pkg/event/target/queuestore.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "sync" "github.com/minio/minio/pkg/event" @@ -61,9 +60,9 @@ func (store *QueueStore) Open() error { return terr } - eCount := uint16(len(store.listAll())) + eCount := uint16(len(store.listN(-1))) if eCount >= store.limit { - return ErrLimitExceeded + return errLimitExceeded } store.eC = eCount @@ -96,7 +95,7 @@ func (store *QueueStore) Put(e event.Event) error { store.Lock() defer store.Unlock() if store.eC >= store.limit { - return ErrLimitExceeded + return errLimitExceeded } key, kErr := getNewUUID() if kErr != nil { @@ -134,45 +133,38 @@ func (store *QueueStore) Get(key string) (event.Event, error) { } // Del - Deletes an entry from the store. -func (store *QueueStore) Del(key string) { +func (store *QueueStore) Del(key string) error { store.Lock() defer store.Unlock() - store.del(key) + return store.del(key) } // lockless call -func (store *QueueStore) del(key string) { +func (store *QueueStore) del(key string) error { p := filepath.Join(store.directory, key+eventExt) rerr := os.Remove(p) if rerr != nil { - return + return rerr } // Decrement the event count. store.eC-- + + return nil } -// ListAll - lists all the keys in the directory. -func (store *QueueStore) ListAll() []string { +// ListN - lists atmost N files from the directory. +func (store *QueueStore) ListN(n int) []string { store.RLock() defer store.RUnlock() - return store.listAll() + return store.listN(n) } // lockless call. -func (store *QueueStore) listAll() []string { - var err error - var keys []string - var files []os.FileInfo - - files, err = ioutil.ReadDir(store.directory) - if err != nil { - return nil - } - - for _, f := range files { - keys = append(keys, strings.TrimSuffix(f.Name(), eventExt)) - } - return keys +func (store *QueueStore) listN(n int) []string { + storeDir, _ := os.Open(store.directory) + names, _ := storeDir.Readdirnames(n) + _ = storeDir.Close() + return names } diff --git a/pkg/event/target/queuestore_test.go b/pkg/event/target/queuestore_test.go index b57b18bdd..04892573b 100644 --- a/pkg/event/target/queuestore_test.go +++ b/pkg/event/target/queuestore_test.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "github.com/minio/minio/pkg/event" @@ -55,7 +56,7 @@ func TestQueueStorePut(t *testing.T) { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 10000) + store, err := setUpStore(queueDir, 100) if err != nil { t.Fatal("Failed to create a queue store ", err) @@ -67,8 +68,8 @@ func TestQueueStorePut(t *testing.T) { } } // Count the events. - if len(store.ListAll()) != 100 { - t.Fatalf("ListAll() Expected: 100, got %d", len(store.ListAll())) + if len(store.ListN(-1)) != 100 { + t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1))) } } @@ -79,7 +80,7 @@ func TestQueueStoreGet(t *testing.T) { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 10000) + store, err := setUpStore(queueDir, 10) if err != nil { t.Fatal("Failed to create a queue store ", err) } @@ -89,11 +90,11 @@ func TestQueueStoreGet(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.ListAll() + eventKeys := store.ListN(-1) // Get 10 events. if len(eventKeys) == 10 { for _, key := range eventKeys { - event, eErr := store.Get(key) + event, eErr := store.Get(strings.TrimSuffix(key, eventExt)) if eErr != nil { t.Fatal("Failed to Get the event from the queue store ", eErr) } @@ -102,7 +103,7 @@ func TestQueueStoreGet(t *testing.T) { } } } else { - t.Fatalf("ListAll() Expected: 10, got %d", len(eventKeys)) + t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys)) } } @@ -113,7 +114,7 @@ func TestQueueStoreDel(t *testing.T) { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 10000) + store, err := setUpStore(queueDir, 20) if err != nil { t.Fatal("Failed to create a queue store ", err) } @@ -123,18 +124,21 @@ func TestQueueStoreDel(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.ListAll() + eventKeys := store.ListN(-1) // Remove all the events. if len(eventKeys) == 20 { for _, key := range eventKeys { - store.Del(key) + err := store.Del(strings.TrimSuffix(key, eventExt)) + if err != nil { + t.Fatal("queue store Del failed with ", err) + } } } else { - t.Fatalf("ListAll() Expected: 20, got %d", len(eventKeys)) + t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys)) } - if len(store.ListAll()) != 0 { - t.Fatalf("ListAll() Expected: 0, got %d", len(store.ListAll())) + if len(store.ListN(-1)) != 0 { + t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1))) } } @@ -157,6 +161,32 @@ func TestQueueStoreLimit(t *testing.T) { } // Should not allow 6th Put. if err := store.Put(testEvent); err == nil { - t.Fatalf("Expected to fail with %s, but passes", ErrLimitExceeded) + t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded) + } +} + +// TestQueueStoreLimit - tests for store.LimitN. +func TestQueueStoreListN(t *testing.T) { + defer func() { + if err := tearDownStore(); err != nil { + t.Fatal("Failed to tear down store ", err) + } + }() + store, err := setUpStore(queueDir, 10) + if err != nil { + t.Fatal("Failed to create a queue store ", err) + } + for i := 0; i < 10; i++ { + if err := store.Put(testEvent); err != nil { + t.Fatal("Failed to put to queue store ", err) + } + } + // Should return only 5 event keys. + if len(store.ListN(5)) != 5 { + t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5))) + } + // Should return all the event keys in the store. + if len(store.ListN(-1)) != 10 { + t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1))) } } diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index f37905e04..2db62ba97 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -69,8 +69,12 @@ func (target *RedisTarget) ID() event.TargetID { return target.id } -// Send - sends event to Redis. -func (target *RedisTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *RedisTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *RedisTarget) send(eventData event.Event) error { conn := target.pool.Get() defer func() { // FIXME: log returned error. ignore time being. @@ -109,6 +113,11 @@ func (target *RedisTarget) Send(eventData event.Event) error { return nil } +// Send - interface compatible method does no-op. +func (target *RedisTarget) Send(eventKey string) error { + return nil +} + // Close - does nothing and available for interface compatibility. func (target *RedisTarget) Close() error { return nil diff --git a/pkg/event/target/store.go b/pkg/event/target/store.go index ee1824e95..316de4ff5 100644 --- a/pkg/event/target/store.go +++ b/pkg/event/target/store.go @@ -18,20 +18,106 @@ package target import ( "errors" + "fmt" + "strings" + "time" + "github.com/minio/minio/pkg/event" ) -// ErrLimitExceeded error is sent when the maximum limit is reached. -var ErrLimitExceeded = errors.New("[Store] The maximum limit reached") +const retryInterval = 3 * time.Second -// ErrNoSuchKey error is sent in Get when the key is not found. -var ErrNoSuchKey = errors.New("[Store] No such key found") +// errNotConnected - indicates that the target connection is not active. +var errNotConnected = errors.New("not connected to target server/service") + +// errLimitExceeded error is sent when the maximum limit is reached. +var errLimitExceeded = errors.New("the maximum store limit reached") + +// errNoSuchKey error is sent in Get when the key is not found. +var errNoSuchKey = errors.New("no such key found in store") // Store - To persist the events. type Store interface { Put(event event.Event) error Get(key string) (event.Event, error) - ListAll() []string - Del(key string) + ListN(n int) []string + Del(key string) error Open() error } + +// replayEvents - Reads the events from the store and replays. +func replayEvents(store Store, doneCh <-chan struct{}) <-chan string { + var names []string + eventKeyCh := make(chan string) + + go func() { + retryTimer := time.NewTimer(retryInterval) + defer retryTimer.Stop() + defer close(eventKeyCh) + for { + names = store.ListN(100) + for _, name := range names { + select { + case eventKeyCh <- strings.TrimSuffix(name, eventExt): + // Get next key. + case <-doneCh: + return + } + } + + if len(names) < 2 { + retryTimer.Reset(retryInterval) + select { + case <-retryTimer.C: + case <-doneCh: + return + } + } + } + }() + + return eventKeyCh +} + +// sendEvents - Reads events from the store and re-plays. +func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}) { + retryTimer := time.NewTimer(retryInterval) + defer retryTimer.Stop() + + send := func(eventKey string) bool { + for { + err := target.Send(eventKey) + if err == nil { + break + } + + if err != errNotConnected { + panic(fmt.Errorf("target.Send() failed with '%v'", err)) + } + + retryTimer.Reset(retryInterval) + select { + case <-retryTimer.C: + case <-doneCh: + return false + } + } + return true + } + + for { + select { + case eventKey, ok := <-eventKeyCh: + if !ok { + // closed channel. + return + } + + if !send(eventKey) { + return + } + case <-doneCh: + return + } + } +} diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 6e629944b..78fe19575 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -64,8 +64,12 @@ func (target WebhookTarget) ID() event.TargetID { return target.id } -// Send - sends event to Webhook. -func (target *WebhookTarget) Send(eventData event.Event) error { +// Save - Sends event directly without persisting. +func (target *WebhookTarget) Save(eventData event.Event) error { + return target.send(eventData) +} + +func (target *WebhookTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -100,6 +104,11 @@ func (target *WebhookTarget) Send(eventData event.Event) error { return nil } +// Send - interface compatible method does no-op. +func (target *WebhookTarget) Send(eventKey string) error { + return nil +} + // Close - does nothing and available for interface compatibility. func (target *WebhookTarget) Close() error { return nil diff --git a/pkg/event/targetlist.go b/pkg/event/targetlist.go index 3fb5a4d61..c2082c386 100644 --- a/pkg/event/targetlist.go +++ b/pkg/event/targetlist.go @@ -24,7 +24,8 @@ import ( // Target - event target interface type Target interface { ID() TargetID - Send(Event) error + Save(Event) error + Send(string) error Close() error } @@ -130,7 +131,7 @@ func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetID wg.Add(1) go func(id TargetID, target Target) { defer wg.Done() - if err := target.Send(event); err != nil { + if err := target.Save(event); err != nil { errCh <- TargetIDErr{ ID: id, Err: err, diff --git a/pkg/event/targetlist_test.go b/pkg/event/targetlist_test.go index ec7912566..149973cf1 100644 --- a/pkg/event/targetlist_test.go +++ b/pkg/event/targetlist_test.go @@ -34,7 +34,12 @@ func (target ExampleTarget) ID() TargetID { return target.id } -func (target ExampleTarget) Send(eventData Event) error { +// Save - Sends event directly without persisting. +func (target ExampleTarget) Save(eventData Event) error { + return target.send(eventData) +} + +func (target ExampleTarget) send(eventData Event) error { b := make([]byte, 1) if _, err := rand.Read(b); err != nil { panic(err) @@ -49,6 +54,11 @@ func (target ExampleTarget) Send(eventData Event) error { return nil } +// Send - interface compatible method does no-op. +func (target ExampleTarget) Send(eventKey string) error { + return nil +} + func (target ExampleTarget) Close() error { if target.closeErr { return errors.New("close error")