From cefc43e4daa4cbb490ef6726ea374e26a93eb85e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 6 Nov 2024 16:32:39 -0800 Subject: [PATCH] simplify the Get()/GetMultiple() re-use GetRaw() for both (#179) Remember GetMultiple() must be used if your target is calling PutMultiple(), without that the multiple events will not be replayed. --- internal/store/queuestore.go | 50 +++------------------- internal/store/queuestore_test.go | 70 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 45 deletions(-) diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go index 2fb2ac475..640f821f5 100644 --- a/internal/store/queuestore.go +++ b/internal/store/queuestore.go @@ -251,61 +251,21 @@ func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error) { // Get - gets an item from the store. func (store *QueueStore[I]) Get(key Key) (item I, err error) { - store.RLock() - - defer func(store *QueueStore[I]) { - store.RUnlock() - if err != nil && !os.IsNotExist(err) { - // Upon error we remove the entry. - store.Del(key) - } - }(store) - - var eventData []byte - eventData, err = os.ReadFile(filepath.Join(store.directory, key.String())) + items, err := store.GetMultiple(key) if err != nil { return item, err } - - if len(eventData) == 0 { - return item, os.ErrNotExist - } - - if err = json.Unmarshal(eventData, &item); err != nil { - return item, err - } - - return item, nil + return items[0], nil } // GetMultiple will read the multi payload file and fetch the items func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) { - store.RLock() - - defer func(store *QueueStore[I]) { - store.RUnlock() - if err != nil && !os.IsNotExist(err) { - // Upon error we remove the entry. - store.Del(key) - } - }(store) - - raw, err := os.ReadFile(filepath.Join(store.directory, key.String())) + raw, err := store.GetRaw(key) if err != nil { - return - } - - var decoder *jsoniter.Decoder - if key.Compress { - decodedBytes, err := s2.Decode(nil, raw) - if err != nil { - return nil, err - } - decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(decodedBytes)) - } else { - decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw)) + return nil, err } + decoder := jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw)) for decoder.More() { var item I if err := decoder.Decode(&item); err != nil { diff --git a/internal/store/queuestore_test.go b/internal/store/queuestore_test.go index bcb47049d..8e50d5964 100644 --- a/internal/store/queuestore_test.go +++ b/internal/store/queuestore_test.go @@ -18,11 +18,15 @@ package store import ( + "bytes" "fmt" "os" "path/filepath" "reflect" "testing" + + jsoniter "github.com/json-iterator/go" + "github.com/valyala/bytebufferpool" ) type TestItem struct { @@ -221,6 +225,72 @@ func TestQueueStoreListN(t *testing.T) { } } +func TestMultiplePutGetRaw(t *testing.T) { + defer func() { + if err := tearDownQueueStore(); err != nil { + t.Fatalf("Failed to tear down store; %v", err) + } + }() + store, err := setUpQueueStore(queueDir, 10) + if err != nil { + t.Fatalf("Failed to create a queue store; %v", err) + } + // TestItem{Name: "test-item", Property: "property"} + var items []TestItem + for i := 0; i < 10; i++ { + items = append(items, TestItem{ + Name: fmt.Sprintf("test-item-%d", i), + Property: "property", + }) + } + + buf := bytebufferpool.Get() + defer bytebufferpool.Put(buf) + + enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) + for i := range items { + if err = enc.Encode(items[i]); err != nil { + t.Fatal(err) + } + } + + if _, err := store.PutMultiple(items); err != nil { + t.Fatalf("failed to put multiple; %v", err) + } + + keys := store.List() + if len(keys) != 1 { + t.Fatalf("expected len(keys)=1, but found %d", len(keys)) + } + + key := keys[0] + if !key.Compress { + t.Fatal("expected the item to be compressed") + } + if key.ItemCount != 10 { + t.Fatalf("expected itemcount=10 but found %v", key.ItemCount) + } + + raw, err := store.GetRaw(key) + if err != nil { + t.Fatalf("unable to get multiple items; %v", err) + } + + if !bytes.Equal(buf.Bytes(), raw) { + t.Fatalf("expected bytes: %d vs read bytes is wrong %d", len(buf.Bytes()), len(raw)) + } + + if err := store.Del(key); err != nil { + t.Fatalf("unable to Del; %v", err) + } + + // Re-list + keys = store.List() + if len(keys) > 0 || err != nil { + t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err) + } +} + func TestMultiplePutGets(t *testing.T) { defer func() { if err := tearDownQueueStore(); err != nil {