mirror of https://github.com/minio/minio.git
simplify the Get()/GetMultiple() re-use GetRaw() for both (#179)
Remember GetMultiple() must be used if your target is calling PutMultiple(), without that the multiple events will not be replayed.
This commit is contained in:
parent
25e34fda5f
commit
cefc43e4da
|
@ -251,61 +251,21 @@ func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error) {
|
||||||
|
|
||||||
// Get - gets an item from the store.
|
// Get - gets an item from the store.
|
||||||
func (store *QueueStore[I]) Get(key Key) (item I, err error) {
|
func (store *QueueStore[I]) Get(key Key) (item I, err error) {
|
||||||
store.RLock()
|
items, err := store.GetMultiple(key)
|
||||||
|
|
||||||
defer func(store *QueueStore[I]) {
|
|
||||||
store.RUnlock()
|
|
||||||
if err != nil && !os.IsNotExist(err) {
|
|
||||||
// Upon error we remove the entry.
|
|
||||||
store.Del(key)
|
|
||||||
}
|
|
||||||
}(store)
|
|
||||||
|
|
||||||
var eventData []byte
|
|
||||||
eventData, err = os.ReadFile(filepath.Join(store.directory, key.String()))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return item, err
|
return item, err
|
||||||
}
|
}
|
||||||
|
return items[0], nil
|
||||||
if len(eventData) == 0 {
|
|
||||||
return item, os.ErrNotExist
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = json.Unmarshal(eventData, &item); err != nil {
|
|
||||||
return item, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return item, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMultiple will read the multi payload file and fetch the items
|
// GetMultiple will read the multi payload file and fetch the items
|
||||||
func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) {
|
func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) {
|
||||||
store.RLock()
|
raw, err := store.GetRaw(key)
|
||||||
|
|
||||||
defer func(store *QueueStore[I]) {
|
|
||||||
store.RUnlock()
|
|
||||||
if err != nil && !os.IsNotExist(err) {
|
|
||||||
// Upon error we remove the entry.
|
|
||||||
store.Del(key)
|
|
||||||
}
|
|
||||||
}(store)
|
|
||||||
|
|
||||||
raw, err := os.ReadFile(filepath.Join(store.directory, key.String()))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return nil, err
|
||||||
}
|
|
||||||
|
|
||||||
var decoder *jsoniter.Decoder
|
|
||||||
if key.Compress {
|
|
||||||
decodedBytes, err := s2.Decode(nil, raw)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(decodedBytes))
|
|
||||||
} else {
|
|
||||||
decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
decoder := jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw))
|
||||||
for decoder.More() {
|
for decoder.More() {
|
||||||
var item I
|
var item I
|
||||||
if err := decoder.Decode(&item); err != nil {
|
if err := decoder.Decode(&item); err != nil {
|
||||||
|
|
|
@ -18,11 +18,15 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
"github.com/valyala/bytebufferpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TestItem struct {
|
type TestItem struct {
|
||||||
|
@ -221,6 +225,72 @@ func TestQueueStoreListN(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultiplePutGetRaw(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if err := tearDownQueueStore(); err != nil {
|
||||||
|
t.Fatalf("Failed to tear down store; %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
store, err := setUpQueueStore(queueDir, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create a queue store; %v", err)
|
||||||
|
}
|
||||||
|
// TestItem{Name: "test-item", Property: "property"}
|
||||||
|
var items []TestItem
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
items = append(items, TestItem{
|
||||||
|
Name: fmt.Sprintf("test-item-%d", i),
|
||||||
|
Property: "property",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := bytebufferpool.Get()
|
||||||
|
defer bytebufferpool.Put(buf)
|
||||||
|
|
||||||
|
enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
|
||||||
|
for i := range items {
|
||||||
|
if err = enc.Encode(items[i]); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := store.PutMultiple(items); err != nil {
|
||||||
|
t.Fatalf("failed to put multiple; %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := store.List()
|
||||||
|
if len(keys) != 1 {
|
||||||
|
t.Fatalf("expected len(keys)=1, but found %d", len(keys))
|
||||||
|
}
|
||||||
|
|
||||||
|
key := keys[0]
|
||||||
|
if !key.Compress {
|
||||||
|
t.Fatal("expected the item to be compressed")
|
||||||
|
}
|
||||||
|
if key.ItemCount != 10 {
|
||||||
|
t.Fatalf("expected itemcount=10 but found %v", key.ItemCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, err := store.GetRaw(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to get multiple items; %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(buf.Bytes(), raw) {
|
||||||
|
t.Fatalf("expected bytes: %d vs read bytes is wrong %d", len(buf.Bytes()), len(raw))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Del(key); err != nil {
|
||||||
|
t.Fatalf("unable to Del; %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-list
|
||||||
|
keys = store.List()
|
||||||
|
if len(keys) > 0 || err != nil {
|
||||||
|
t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMultiplePutGets(t *testing.T) {
|
func TestMultiplePutGets(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tearDownQueueStore(); err != nil {
|
if err := tearDownQueueStore(); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue