Add 'tiedot' integration package, first cut - more improvements needed

This commit is contained in:
Harshavardhana 2015-01-27 17:20:51 -08:00
parent 25ad24997e
commit c63a6dba2f
28 changed files with 3786 additions and 0 deletions

25
Godeps/Godeps.json generated
View File

@ -5,6 +5,31 @@
"./..."
],
"Deps": [
{
"ImportPath": "github.com/HouzuoGuo/tiedot/data",
"Comment": "3.1.3-33-g9cd340d",
"Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a"
},
{
"ImportPath": "github.com/HouzuoGuo/tiedot/db",
"Comment": "3.1.3-33-g9cd340d",
"Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a"
},
{
"ImportPath": "github.com/HouzuoGuo/tiedot/dberr",
"Comment": "3.1.3-33-g9cd340d",
"Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a"
},
{
"ImportPath": "github.com/HouzuoGuo/tiedot/gommap",
"Comment": "3.1.3-33-g9cd340d",
"Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a"
},
{
"ImportPath": "github.com/HouzuoGuo/tiedot/tdlog",
"Comment": "3.1.3-33-g9cd340d",
"Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a"
},
{
"ImportPath": "github.com/codegangsta/cli",
"Comment": "1.2.0-42-gfbda1ce",

View File

@ -0,0 +1,144 @@
/*
Collection data file contains document data. Every document has a binary header and UTF-8 text content.
Documents are inserted one after another, and each occupies 2x its original size to leave room for future updates.
Deleted documents are marked as deleted and the space is irrecoverable until a "scrub" action (in DB logic) is carried out.
When update takes place, the new document may overwrite original document if there is enough space, otherwise the
original document is marked as deleted and the updated document is inserted as a new document.
*/
package data
import (
"encoding/binary"
"github.com/HouzuoGuo/tiedot/dberr"
)
// Sizing constants for collection data files and the documents stored in them.
const (
COL_FILE_GROWTH = 32 * 1048576 // Collection file initial size & size growth (32 MBytes)
DOC_MAX_ROOM = 2 * 1048576 // Max document room (2 MBytes); Insert reserves twice the document length as room
DOC_HEADER = 1 + 10 // Document header size - validity (single byte), document room (int 10 bytes)
// Pre-compiled document padding (128 spaces)
// NOTE(review): LEN_PADDING is derived from this literal; confirm the literal
// really holds 128 spaces, since whitespace may have been collapsed in transit.
PADDING = " "
LEN_PADDING = len(PADDING)
)
// Collection file contains document headers and document text data.
// It embeds *DataFile, so the file's Buf, Size and Used fields and its
// EnsureSize/Close/Clear methods are available directly on a Collection.
type Collection struct {
*DataFile
}
// OpenCollection opens the collection data file at path through OpenDataFile,
// using COL_FILE_GROWTH as both the initial size and the growth increment.
// A non-nil *Collection is returned even when err is set.
func OpenCollection(path string) (col *Collection, err error) {
	dataFile, err := OpenDataFile(path, COL_FILE_GROWTH)
	return &Collection{DataFile: dataFile}, err
}
// Read finds a document by ID (its physical location in the file) and returns
// a copy of its room, i.e. the document text followed by padding.
//
// It returns nil when the ID is out of range, when the document is not marked
// valid, or when the header's room value is negative, larger than
// DOC_MAX_ROOM, or reaches the end of the file.
func (col *Collection) Read(id int) []byte {
	if id < 0 || id > col.Used-DOC_HEADER || col.Buf[id] != 1 {
		return nil
	}
	room, _ := binary.Varint(col.Buf[id+1 : id+11])
	// Varint is signed: a corrupted header can decode to a negative room,
	// which would make the allocation below panic.
	if room < 0 || room > DOC_MAX_ROOM {
		return nil
	}
	docEnd := id + DOC_HEADER + int(room)
	if docEnd >= col.Size {
		return nil
	}
	docCopy := make([]byte, room)
	copy(docCopy, col.Buf[id+DOC_HEADER:docEnd])
	return docCopy
}
// Insert appends a new document at the current end of used space and returns
// its ID, which is the byte offset of its header.
//
// Twice the document length is reserved as "room"; the unused half is filled
// with PADDING. ErrorDocTooLarge is returned when the room would exceed
// DOC_MAX_ROOM, and any error from growing the file is passed through.
func (col *Collection) Insert(data []byte) (id int, err error) {
	room := 2 * len(data)
	if room > DOC_MAX_ROOM {
		return 0, dberr.New(dberr.ErrorDocTooLarge, DOC_MAX_ROOM, room)
	}
	total := DOC_HEADER + room
	id = col.Used
	if err = col.EnsureSize(total); err != nil {
		return
	}
	col.Used += total
	// Header: validity byte followed by the room size.
	col.Buf[id] = 1
	binary.PutVarint(col.Buf[id+1:id+11], int64(room))
	// Body: document text, then padding up to the end of the room.
	body := id + DOC_HEADER
	copy(col.Buf[body:col.Used], data)
	for pos := body + len(data); pos < col.Used; pos += LEN_PADDING {
		end := pos + LEN_PADDING
		if end >= col.Used {
			end = col.Used
		}
		copy(col.Buf[pos:end], PADDING)
	}
	return
}
// Update overwrites a document in place when the new content fits into its
// existing room; otherwise it inserts the content as a new document and marks
// the old one deleted. The returned ID equals id for an in-place update and is
// the new document's ID after a re-insert.
//
// ErrorDocTooLarge is returned when data is longer than DOC_MAX_ROOM (or when
// a required re-insert is rejected by Insert). ErrorNoDoc is returned when id
// does not address a valid document or its header is corrupted (negative or
// oversized room, or a room that reaches the end of the file).
func (col *Collection) Update(id int, data []byte) (newID int, err error) {
	dataLen := len(data)
	if dataLen > DOC_MAX_ROOM {
		return 0, dberr.New(dberr.ErrorDocTooLarge, DOC_MAX_ROOM, dataLen)
	}
	if id < 0 || id >= col.Used-DOC_HEADER || col.Buf[id] != 1 {
		return 0, dberr.New(dberr.ErrorNoDoc, id)
	}
	currentDocRoom, _ := binary.Varint(col.Buf[id+1 : id+11])
	// Varint is signed, so a corrupted header may decode to a negative room.
	if currentDocRoom < 0 || currentDocRoom > DOC_MAX_ROOM {
		return 0, dberr.New(dberr.ErrorNoDoc, id)
	}
	docStart := id + DOC_HEADER
	docEnd := docStart + int(currentDocRoom)
	if docEnd >= col.Size {
		return 0, dberr.New(dberr.ErrorNoDoc, id)
	}
	if dataLen > int(currentDocRoom) {
		// Not enough room - re-insert the document. Insert first and delete
		// afterwards: Insert can fail (it needs twice the data length as room,
		// and may have to grow the file), and the original document must not
		// be lost when that happens.
		if newID, err = col.Insert(data); err != nil {
			return 0, err
		}
		return newID, col.Delete(id)
	}
	// Overwrite data and then overwrite the rest of the room with padding.
	padding := docStart + dataLen
	copy(col.Buf[docStart:padding], data)
	for ; padding < docEnd; padding += LEN_PADDING {
		copySize := LEN_PADDING
		if padding+LEN_PADDING >= docEnd {
			copySize = docEnd - padding
		}
		copy(col.Buf[padding:padding+copySize], PADDING)
	}
	return id, nil
}
// Delete marks the document at id as deleted by clearing its validity byte.
// ErrorNoDoc is returned when id is out of range or the byte at id is not 1.
func (col *Collection) Delete(id int) error {
	inRange := id >= 0 && id <= col.Used-DOC_HEADER
	if !inRange || col.Buf[id] != 1 {
		return dberr.New(dberr.ErrorNoDoc, id)
	}
	col.Buf[id] = 0
	return nil
}
// ForEachDoc walks the used part of the file and calls fun for every valid
// document; iteration stops when fun returns false.
//
// The doc slice passed to fun is a view into the file buffer, not a copy.
// Deleted documents are skipped. When a header does not look sane the scan
// advances one byte at a time until it finds one that does.
func (col *Collection) ForEachDoc(fun func(id int, doc []byte) bool) {
	for id := 0; id < col.Used-DOC_HEADER && id >= 0; {
		validity := col.Buf[id]
		room, _ := binary.Varint(col.Buf[id+1 : id+11])
		docEnd := id + DOC_HEADER + int(room)
		// Varint is signed. A negative room must count as corruption,
		// otherwise docEnd can land at or before id (the scan would never
		// advance) or before the document start (the slice below would panic).
		wellFormed := (validity == 0 || validity == 1) &&
			room >= 0 && room <= DOC_MAX_ROOM &&
			docEnd > 0 && docEnd <= col.Used
		if !wellFormed {
			// Corrupted document - move on
			id++
			continue
		}
		if validity == 1 && !fun(id, col.Buf[id+DOC_HEADER:docEnd]) {
			break
		}
		id = docEnd
	}
}

View File

@ -0,0 +1,263 @@
package data
import (
"os"
"strings"
"testing"
"github.com/HouzuoGuo/tiedot/dberr"
)
// TestInsertRead inserts two documents into a fresh collection file and reads
// them back, trimming the padding that Read returns along with the text.
func TestInsertRead(t *testing.T) {
tmp := "/tmp/tiedot_test_col"
os.Remove(tmp)
defer os.Remove(tmp)
col, err := OpenCollection(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
defer col.Close()
docs := [][]byte{
[]byte("abc"),
[]byte("1234")}
ids := [2]int{}
// The first document of an empty file is expected at offset 0.
if ids[0], err = col.Insert(docs[0]); ids[0] != 0 || err != nil {
t.Fatalf("Failed to insert: %d %v", ids[0], err)
}
if ids[1], err = col.Insert(docs[1]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if doc0 := col.Read(ids[0]); doc0 == nil || strings.TrimSpace(string(doc0)) != string(docs[0]) {
t.Fatal("Failed to read", doc0)
}
if doc1 := col.Read(ids[1]); doc1 == nil || strings.TrimSpace(string(doc1)) != string(docs[1]) {
t.Fatalf("Failed to read")
}
// it shall not panic
col.Read(col.Size)
}
// TestInsertUpdateRead covers both update paths: an in-place overwrite that
// keeps the document ID, and a re-insert that yields a new ID.
func TestInsertUpdateRead(t *testing.T) {
tmp := "/tmp/tiedot_test_col"
os.Remove(tmp)
defer os.Remove(tmp)
col, err := OpenCollection(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
defer col.Close()
docs := [][]byte{
[]byte("abc"),
[]byte("1234")}
ids := [2]int{}
if ids[0], err = col.Insert(docs[0]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if ids[1], err = col.Insert(docs[1]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
updated := [2]int{}
// "abcdef" is 6 bytes and fits the 6-byte room of "abc": same ID expected.
if updated[0], err = col.Update(ids[0], []byte("abcdef")); err != nil || updated[0] != ids[0] {
t.Fatalf("Failed to update: %v", err)
}
// The long value exceeds the 8-byte room of "1234": a new ID is expected.
if updated[1], err = col.Update(ids[1], []byte("longlonglonglonglonglonglong")); err != nil || updated[1] == ids[1] {
t.Fatalf("Failed to update: %v", err)
}
if doc0 := col.Read(updated[0]); doc0 == nil || strings.TrimSpace(string(doc0)) != "abcdef" {
t.Fatalf("Failed to read")
}
if doc1 := col.Read(updated[1]); doc1 == nil || strings.TrimSpace(string(doc1)) != "longlonglonglonglonglonglong" {
t.Fatalf("Failed to read")
}
// it shall not panic
col.Update(col.Size, []byte("abcdef"))
}
// TestInsertDeleteRead deletes the middle one of three documents and checks
// that only that document becomes unreadable.
func TestInsertDeleteRead(t *testing.T) {
tmp := "/tmp/tiedot_test_col"
os.Remove(tmp)
defer os.Remove(tmp)
col, err := OpenCollection(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
defer col.Close()
docs := [][]byte{
[]byte("abc"),
[]byte("1234"),
[]byte("2345")}
ids := [3]int{}
if ids[0], err = col.Insert(docs[0]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if ids[1], err = col.Insert(docs[1]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if ids[2], err = col.Insert(docs[2]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if doc0 := col.Read(ids[0]); doc0 == nil || strings.TrimSpace(string(doc0)) != string(docs[0]) {
t.Fatalf("Failed to read")
}
if err = col.Delete(ids[1]); err != nil {
t.Fatal(err)
}
if doc1 := col.Read(ids[1]); doc1 != nil {
t.Fatalf("Did not delete")
}
if doc2 := col.Read(ids[2]); doc2 == nil || strings.TrimSpace(string(doc2)) != string(docs[2]) {
t.Fatalf("Failed to read")
}
// it shall not panic
if err = col.Delete(col.Size); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("did not error")
}
}
// TestInsertReadAll checks that ForEachDoc visits every inserted document and
// that it skips documents whose header has been corrupted.
func TestInsertReadAll(t *testing.T) {
tmp := "/tmp/tiedot_test_col"
os.Remove(tmp)
defer os.Remove(tmp)
col, err := OpenCollection(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
defer col.Close()
var ids [5]int
ids[0], err = col.Insert([]byte("abc"))
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
ids[1], err = col.Insert([]byte("abc"))
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
ids[2], err = col.Insert([]byte("abc"))
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
ids[3], err = col.Insert([]byte("abc"))
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
ids[4], err = col.Insert([]byte("abc"))
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
successfullyRead := 0
t.Log(ids)
col.ForEachDoc(func(_ int, _ []byte) bool {
successfullyRead++
return true
})
if successfullyRead != 5 {
t.Fatalf("Should have read 5 documents, but only got %d", successfullyRead)
}
successfullyRead = 0
// intentionally corrupt two documents
col.Buf[ids[4]] = 3 // corrupted validity
col.Buf[ids[2]+1] = 255 // corrupted room
col.ForEachDoc(func(_ int, _ []byte) bool {
successfullyRead++
return true
})
if successfullyRead != 3 {
t.Fatalf("Should have read 3 documents, but %d are recovered", successfullyRead)
}
}
// TestCollectionGrowAndOutOfBoundAccess checks the Used accounting after a few
// inserts, verifies that Read/Update/Delete reject invalid locations, and then
// inserts enough large documents to force the file to grow once.
func TestCollectionGrowAndOutOfBoundAccess(t *testing.T) {
tmp := "/tmp/tiedot_test_col"
os.Remove(tmp)
defer os.Remove(tmp)
col, err := OpenCollection(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
// Note: the file is also closed explicitly at the end of the test, so this
// deferred Close runs on an already closed file.
defer col.Close()
// Insert several documents
docs := [][]byte{
[]byte("abc"),
[]byte("1234"),
[]byte("2345")}
if _, err = col.Insert(docs[0]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if _, err = col.Insert(docs[1]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
if _, err = col.Insert(docs[2]); err != nil {
t.Fatalf("Failed to insert: %v", err)
}
// Test UsedSize
// Each document occupies a header plus twice its length.
calculatedUsedSize := (DOC_HEADER + 3*2) + (DOC_HEADER+4*2)*2
if col.Used != calculatedUsedSize {
t.Fatalf("Invalid UsedSize")
}
// Read invalid location
if doc := col.Read(1); doc != nil {
t.Fatalf("Read invalid location")
}
if doc := col.Read(col.Used); doc != nil {
t.Fatalf("Read invalid location")
}
if doc := col.Read(col.Size); doc != nil {
t.Fatalf("Read invalid location")
}
if doc := col.Read(999999999); doc != nil {
t.Fatalf("Read invalid location")
}
// Update invalid location
if _, err := col.Update(1, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatalf("Update invalid location")
}
if _, err := col.Update(col.Used, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatalf("Update invalid location")
}
if _, err := col.Update(col.Size, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatalf("Update invalid location")
}
if _, err := col.Update(999999999, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatalf("Update invalid location")
}
// Delete invalid location
if err = col.Delete(1); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("did not error")
}
if err = col.Delete(col.Used); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("did not error")
}
if err = col.Delete(col.Size); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("did not error")
}
if err = col.Delete(999999999); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("did not error")
}
// Insert - not enough room
// Each of these documents takes DOC_HEADER + DOC_MAX_ROOM bytes, so the
// inserts below need more than COL_FILE_GROWTH bytes in total.
count := 0
for i := 0; i < COL_FILE_GROWTH; i += DOC_MAX_ROOM {
if _, err := col.Insert(make([]byte, DOC_MAX_ROOM/2)); err != nil {
t.Fatal(err)
}
count++
}
if _, err := col.Insert(make([]byte, DOC_MAX_ROOM/2)); err != nil {
t.Fatal(err)
}
count++
calculatedUsedSize += count * (DOC_HEADER + DOC_MAX_ROOM)
if col.Used != calculatedUsedSize {
t.Fatalf("Wrong UsedSize %d %d", col.Used, calculatedUsedSize)
}
// Exactly one growth step is expected on top of the initial size.
if col.Size != COL_FILE_GROWTH+col.Growth {
t.Fatalf("Size changed?! %d %d %d", col.Size, COL_FILE_GROWTH, col.Growth)
}
if err = col.Close(); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,120 @@
// Common data file features - enlarge, close, clear, etc.
package data
import (
"github.com/HouzuoGuo/tiedot/gommap"
"github.com/HouzuoGuo/tiedot/tdlog"
"os"
)
// Data file keeps track of the amount of total and used space.
type DataFile struct {
// Path is the file location given to OpenDataFile.
Path string
// Size is the current file size in bytes, Used is the number of bytes
// considered in use, and Growth is the number of bytes added each time
// the file is enlarged.
Size, Used, Growth int
// Fh is the open handle of the underlying file.
Fh *os.File
// Buf is the memory mapping of the file created by gommap.Map.
Buf gommap.MMap
}
// LooksEmpty reports whether the beginning of buf consists of zero bytes only.
// It inspects the first 1024 bytes when buf is longer than that, and
// otherwise every byte except the last one. Buffers of length 0 or 1 are
// therefore always reported as empty.
func LooksEmpty(buf gommap.MMap) bool {
	scan := len(buf) - 1
	if len(buf) > 1024 {
		scan = 1024
	}
	i := 0
	for i < scan && buf[i] == 0 {
		i++
	}
	return i >= scan
}
// OpenDataFile opens (creating it if necessary) the data file at path, makes
// sure it is at least growth bytes large, memory-maps it and estimates how
// much of it is in use.
//
// The in-use size is found by bisecting the mapped buffer with LooksEmpty,
// looking for the point after which the file only holds zero bytes.
//
// A non-nil *DataFile is returned even on failure. In that case err is set
// and the file handle has already been closed.
func OpenDataFile(path string, growth int) (file *DataFile, err error) {
	file = &DataFile{Path: path, Growth: growth}
	if file.Fh, err = os.OpenFile(file.Path, os.O_CREATE|os.O_RDWR, 0600); err != nil {
		return
	}
	// Do not leak the descriptor when a later step fails. The close error is
	// deliberately dropped: the error that made us bail out is the useful one.
	defer func() {
		if err != nil {
			file.Fh.Close()
		}
	}()
	var size int64
	if size, err = file.Fh.Seek(0, os.SEEK_END); err != nil {
		return
	}
	// Ensure the file is not smaller than file growth
	if file.Size = int(size); file.Size < file.Growth {
		if err = file.EnsureSize(file.Growth); err != nil {
			return
		}
	}
	// EnsureSize maps the file when it had to grow it; map it here otherwise.
	if file.Buf == nil {
		if file.Buf, err = gommap.Map(file.Fh); err != nil {
			return
		}
	}
	// Bi-sect file buffer to find out how much space is in-use
scan:
	for low, mid, high := 0, file.Size/2, file.Size; ; {
		switch {
		case high-mid == 1:
			if LooksEmpty(file.Buf[mid:]) {
				if mid > 0 && LooksEmpty(file.Buf[mid-1:]) {
					file.Used = mid - 1
				} else {
					file.Used = mid
				}
			} else {
				file.Used = high
			}
			// Leave the loop instead of returning so the summary below is logged.
			break scan
		case LooksEmpty(file.Buf[mid:]):
			high = mid
			mid = low + (mid-low)/2
		default:
			low = mid
			mid = mid + (high-mid)/2
		}
	}
	tdlog.Infof("%s opened: %d of %d bytes in-use", file.Path, file.Used, file.Size)
	return
}
// EnsureSize makes sure at least `more` bytes are available beyond the used
// part of the file. While there is not enough room the file is unmapped,
// enlarged by Growth bytes, mapped again and the growth is logged.
func (file *DataFile) EnsureSize(more int) (err error) {
	for file.Used+more > file.Size {
		if file.Buf != nil {
			if err = file.Buf.Unmap(); err != nil {
				return
			}
		}
		if err = os.Truncate(file.Path, int64(file.Size+file.Growth)); err != nil {
			return
		}
		if file.Buf, err = gommap.Map(file.Fh); err != nil {
			return
		}
		previous := file.Size
		file.Size += file.Growth
		tdlog.Infof("%s grown: %d -> %d bytes (%d bytes in-use)", file.Path, previous, file.Size, file.Used)
	}
	return
}
// Close un-maps the file buffer and closes the file handle.
//
// The handle is closed even when un-mapping fails, so the descriptor is never
// leaked. The un-map error takes precedence in the returned value.
func (file *DataFile) Close() (err error) {
	err = file.Buf.Unmap()
	if closeErr := file.Fh.Close(); err == nil {
		err = closeErr
	}
	return
}
// Clear wipes the file: it is closed, truncated to zero, resized to Growth
// bytes, then reopened and mapped again. Used is reset to 0 and Size to
// Growth. The first failing step aborts the sequence and its error is
// returned.
func (file *DataFile) Clear() (err error) {
	steps := []func() error{
		file.Close,
		func() error { return os.Truncate(file.Path, 0) },
		func() error { return os.Truncate(file.Path, int64(file.Growth)) },
		func() (e error) {
			file.Fh, e = os.OpenFile(file.Path, os.O_CREATE|os.O_RDWR, 0600)
			return
		},
		func() (e error) {
			file.Buf, e = gommap.Map(file.Fh)
			return
		},
	}
	for _, step := range steps {
		if err = step(); err != nil {
			return
		}
	}
	file.Used, file.Size = 0, file.Growth
	tdlog.Infof("%s cleared: %d of %d bytes in-use", file.Path, file.Used, file.Size)
	return
}

View File

@ -0,0 +1,119 @@
package data
import (
"os"
"testing"
)
// TestOpenFlushClose opens a fresh data file and checks the fields that
// OpenDataFile is expected to populate, then closes it.
func TestOpenFlushClose(t *testing.T) {
tmp := "/tmp/tiedot_test_file"
os.Remove(tmp)
defer os.Remove(tmp)
tmpFile, err := OpenDataFile(tmp, 999)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
// Note: the file is also closed explicitly below, so this deferred Close
// runs on an already closed file.
defer tmpFile.Close()
if tmpFile.Path != tmp {
t.Fatal("Name not set")
}
if tmpFile.Used != 0 {
t.Fatal("Incorrect Used")
}
if tmpFile.Growth != 999 {
t.Fatal("Growth not set")
}
if tmpFile.Fh == nil || tmpFile.Buf == nil {
t.Fatal("Not mmapped")
}
if err := tmpFile.Close(); err != nil {
t.Fatalf("Failed to close: %v", err)
}
}
// TestFindingAppendAndClear writes into the mapped buffer, reopens the file
// and checks that the in-use size is rediscovered; it then verifies that
// Clear resets both the content and the size bookkeeping.
func TestFindingAppendAndClear(t *testing.T) {
tmp := "/tmp/tiedot_test_file"
os.Remove(tmp)
defer os.Remove(tmp)
// Open
tmpFile, err := OpenDataFile(tmp, 1024)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
if tmpFile.Used != 0 {
t.Fatal("Incorrect Used", tmpFile.Used)
}
// Write something
tmpFile.Buf[500] = 1
tmpFile.Close()
// Re-open
tmpFile, err = OpenDataFile(tmp, 1024)
if err != nil {
t.Fatalf("Failed to open: %v", err)
}
// The last non-zero byte is at index 500, so 501 bytes are in use.
if tmpFile.Used != 501 {
t.Fatal("Incorrect Used")
}
// Write something again
for i := 750; i < 800; i++ {
tmpFile.Buf[i] = byte('a')
}
tmpFile.Close()
// Re-open again
tmpFile, err = OpenDataFile(tmp, 1024)
if err != nil {
t.Fatalf("Failed to open: %v", err)
}
if tmpFile.Used != 800 {
t.Fatal("Incorrect Append", tmpFile.Used)
}
// Clear the file and test size
if err = tmpFile.Clear(); err != nil {
t.Fatal(err)
}
if !(len(tmpFile.Buf) == 1024 && tmpFile.Buf[750] == 0 && tmpFile.Growth == 1024 && tmpFile.Size == 1024 && tmpFile.Used == 0) {
t.Fatal("Did not clear", len(tmpFile.Buf), tmpFile.Growth, tmpFile.Size, tmpFile.Used)
}
// Can still write to the buffer?
tmpFile.Buf[999] = 1
tmpFile.Close()
}
// TestFileGrow checks that EnsureSize enlarges and re-maps the file in steps
// of Growth bytes without touching Used.
func TestFileGrow(t *testing.T) {
tmp := "/tmp/tiedot_test_file"
os.Remove(tmp)
defer os.Remove(tmp)
// Open and write something
tmpFile, err := OpenDataFile(tmp, 4)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
tmpFile.Buf[2] = 1
tmpFile.Used = 3
if tmpFile.Size != 4 {
t.Fatal("Incorrect Size", tmpFile.Size)
}
// 3 bytes used + 8 more do not fit into 4 or 8 bytes, so two growth steps
// are needed. The error returned by EnsureSize is not checked here.
tmpFile.EnsureSize(8)
if tmpFile.Size != 12 { // 3 times file growth = 12 bytes
t.Fatalf("Incorrect Size")
}
if tmpFile.Used != 3 { // Used should not change
t.Fatalf("Incorrect Used")
}
if len(tmpFile.Buf) != 12 {
t.Fatal("Did not remap")
}
if tmpFile.Growth != 4 {
t.Fatalf("Incorrect Growth")
}
// Can write to the new (now larger) region
tmpFile.Buf[10] = 1
tmpFile.Buf[11] = 1
tmpFile.Close()
}

View File

@ -0,0 +1,258 @@
/*
Hash table file contains binary content; it implements a static hash table made of hash buckets and integer entries.
Every bucket has a fixed number of entries. When a bucket becomes full, a new bucket is chained to it in order to store
more entries. Every entry has an integer key and value.
An entry key may have multiple values assigned to it, however the combination of entry key and value must be unique
across the entire hash table.
*/
package data
import (
"encoding/binary"
"github.com/HouzuoGuo/tiedot/tdlog"
"sync"
)
// Layout constants of the hash table file: a sequence of fixed-size buckets,
// each made of a header followed by PER_BUCKET fixed-size entries.
const (
HT_FILE_GROWTH = 32 * 1048576 // Hash table file initial size & file growth
ENTRY_SIZE = 1 + 10 + 10 // Hash entry size: validity (single byte), key (int 10 bytes), value (int 10 bytes)
BUCKET_HEADER = 10 // Bucket header size: next chained bucket number (int 10 bytes)
PER_BUCKET = 16 // Entries per bucket
HASH_BITS = 16 // Number of hash key bits
BUCKET_SIZE = BUCKET_HEADER + PER_BUCKET*ENTRY_SIZE // Size of a bucket
INITIAL_BUCKETS = 65536 // Initial number of buckets == 2 ^ HASH_BITS
)
// Hash table file is a binary file containing buckets of hash entries.
type HashTable struct {
// The embedded data file provides the mapped buffer and size bookkeeping.
*DataFile
// numBuckets is the number of buckets currently in use, including chained ones.
numBuckets int
// Lock is created by OpenHashTable. None of the methods visible in this
// file acquire it; presumably callers are expected to - verify.
Lock *sync.RWMutex
}
// HashKey smears the integer entry key and returns its lowest HASH_BITS bits,
// which select the head bucket used for the entry.
func HashKey(key int) int {
	/*
		tiedot should be compiled/run on x86-64 systems.
		If you decide to compile tiedot on 32-bit systems, the following integer-smear algorithm will cause compilation failure
		due to 32-bit integer overflow; therefore you must modify the algorithm.
		Do not remove the integer-smear process, and remember to run test cases to verify your mods.
	*/
	const bucketMask = 1<<HASH_BITS - 1
	// ========== Integer-smear start =======
	smeared := key ^ (key >> 4)
	smeared = (smeared ^ 0xdeadbeef) + (smeared << 5)
	smeared ^= smeared >> 11
	// ========== Integer-smear end =========
	return smeared & bucketMask // The mask must stay in sync with the number of head buckets
}
// OpenHashTable opens the hash table file at path and, when that succeeds,
// works out how many buckets are in use. A non-nil *HashTable is returned
// even when err is set.
func OpenHashTable(path string) (ht *HashTable, err error) {
	ht = new(HashTable)
	ht.Lock = new(sync.RWMutex)
	ht.DataFile, err = OpenDataFile(path, HT_FILE_GROWTH)
	if err == nil {
		ht.calculateNumBuckets()
	}
	return
}
// Follow the longest bucket chain to calculate total number of buckets, hence the "used size" of hash table file.
// Both ht.numBuckets and ht.Used are overwritten with the result.
func (ht *HashTable) calculateNumBuckets() {
// Start with the number of buckets the file could hold, so that nextBucket
// (which rejects bucket numbers >= numBuckets) can follow existing chains.
ht.numBuckets = ht.Size / BUCKET_SIZE
largestBucketNum := INITIAL_BUCKETS - 1
for i := 0; i < INITIAL_BUCKETS; i++ {
lastBucket := ht.lastBucket(i)
if lastBucket > largestBucketNum && lastBucket < ht.numBuckets {
largestBucketNum = lastBucket
}
}
ht.numBuckets = largestBucketNum + 1
usedSize := ht.numBuckets * BUCKET_SIZE
if usedSize > ht.Size {
// The file is too small for the buckets in use: mark it full and grow it
// by the missing amount. NOTE(review): the error returned by EnsureSize
// is dropped here.
ht.Used = ht.Size
ht.EnsureSize(usedSize - ht.Used)
}
ht.Used = usedSize
tdlog.Infof("%s: calculated used size is %d", ht.Path, usedSize)
}
// nextBucket returns the number of the bucket chained after the given one,
// or 0 when there is none.
//
// 0 is also returned, together with a critical log message, when the stored
// link cannot be decoded or points somewhere implausible: not past the
// current bucket, beyond the known buckets, or into the initial bucket range.
func (ht *HashTable) nextBucket(bucket int) int {
	if bucket >= ht.numBuckets {
		return 0
	}
	addr := bucket * BUCKET_SIZE
	raw, bytesRead := binary.Varint(ht.Buf[addr : addr+10])
	next := int(raw)
	switch {
	case next == 0:
		return 0
	case bytesRead < 0, next <= bucket, next >= ht.numBuckets, next < INITIAL_BUCKETS:
		tdlog.CritNoRepeat("Bad hash table - repair ASAP %s", ht.Path)
		return 0
	}
	return next
}
// lastBucket follows the chain starting at bucket and returns the number of
// its final bucket (which is bucket itself when nothing is chained to it).
func (ht *HashTable) lastBucket(bucket int) int {
	curr := bucket
	for next := ht.nextBucket(curr); next != 0; next = ht.nextBucket(curr) {
		curr = next
	}
	return curr
}
// growBucket appends a new bucket to the file and links it to the end of the
// chain that starts at bucket. The result of EnsureSize is not checked.
func (ht *HashTable) growBucket(bucket int) {
	// Grow first: enlarging the file may replace ht.Buf with a new mapping.
	ht.EnsureSize(BUCKET_SIZE)
	tail := ht.lastBucket(bucket) * BUCKET_SIZE
	newBucket := ht.numBuckets
	binary.PutVarint(ht.Buf[tail:tail+BUCKET_HEADER], int64(newBucket))
	ht.numBuckets++
	ht.Used += BUCKET_SIZE
}
// Clear wipes the underlying data file and, on success, recalculates the
// bucket bookkeeping for the now empty table.
func (ht *HashTable) Clear() (err error) {
	err = ht.DataFile.Clear()
	if err == nil {
		ht.calculateNumBuckets()
	}
	return
}
// Put stores the key/value pair in the first entry slot not marked valid,
// searching the key's head bucket and then its chained buckets. When every
// slot in the chain is taken, a new bucket is chained and the insertion is
// retried from the head.
func (ht *HashTable) Put(key, val int) {
	head := HashKey(key)
	for bucket := head; ; {
		base := bucket*BUCKET_SIZE + BUCKET_HEADER
		for entry := 0; entry < PER_BUCKET; entry++ {
			addr := base + entry*ENTRY_SIZE
			if ht.Buf[addr] == 1 {
				continue
			}
			ht.Buf[addr] = 1
			binary.PutVarint(ht.Buf[addr+1:addr+11], int64(key))
			binary.PutVarint(ht.Buf[addr+11:addr+21], int64(val))
			return
		}
		if bucket = ht.nextBucket(bucket); bucket == 0 {
			ht.growBucket(head)
			ht.Put(key, val)
			return
		}
	}
}
// Get returns the values stored under key, in the order they are found along
// the bucket chain. At most limit values are returned; a limit of 0 means no
// limit. The search stops at the first slot that is not valid and holds a
// zero key and zero value, or at the end of the chain.
func (ht *HashTable) Get(key, limit int) (vals []int) {
	capacity := limit
	if capacity == 0 {
		capacity = 10
	}
	vals = make([]int, 0, capacity)
	found := 0
	for bucket := HashKey(key); ; {
		base := bucket*BUCKET_SIZE + BUCKET_HEADER
		for entry := 0; entry < PER_BUCKET; entry++ {
			addr := base + entry*ENTRY_SIZE
			storedKey, _ := binary.Varint(ht.Buf[addr+1 : addr+11])
			storedVal, _ := binary.Varint(ht.Buf[addr+11 : addr+21])
			switch {
			case ht.Buf[addr] == 1:
				if int(storedKey) == key {
					vals = append(vals, int(storedVal))
					if found++; found == limit {
						return
					}
				}
			case storedKey == 0 && storedVal == 0:
				// Never-used slot: nothing is stored beyond this point.
				return
			}
		}
		if bucket = ht.nextBucket(bucket); bucket == 0 {
			return
		}
	}
}
// Remove marks the first valid entry matching both key and val as invalid, so
// that Get no longer returns it. Nothing happens when no such entry exists.
func (ht *HashTable) Remove(key, val int) {
	for bucket := HashKey(key); ; {
		base := bucket*BUCKET_SIZE + BUCKET_HEADER
		for entry := 0; entry < PER_BUCKET; entry++ {
			addr := base + entry*ENTRY_SIZE
			storedKey, _ := binary.Varint(ht.Buf[addr+1 : addr+11])
			storedVal, _ := binary.Varint(ht.Buf[addr+11 : addr+21])
			switch {
			case ht.Buf[addr] == 1:
				if int(storedKey) == key && int(storedVal) == val {
					ht.Buf[addr] = 0
					return
				}
			case storedKey == 0 && storedVal == 0:
				// Never-used slot: nothing is stored beyond this point.
				return
			}
		}
		if bucket = ht.nextBucket(bucket); bucket == 0 {
			return
		}
	}
}
// GetPartitionRange divides the INITIAL_BUCKETS head buckets into totalParts
// roughly equal, contiguous partitions and returns the half-open bucket range
// [start, end) of partition partNum. When the buckets do not divide evenly,
// the leading partitions receive one extra bucket each; the last partition
// always ends at INITIAL_BUCKETS.
func GetPartitionRange(partNum, totalParts int) (start int, end int) {
	share := INITIAL_BUCKETS / totalParts
	remainder := INITIAL_BUCKETS % totalParts
	extra := 0
	start = partNum * share
	switch {
	case remainder == 0:
		// Even split, nothing to adjust.
	case partNum == 0:
		extra = 1
	case partNum < remainder:
		// Every earlier partition took one extra bucket.
		start += partNum
		extra = 1
	default:
		start += remainder
	}
	if partNum == totalParts-1 {
		return start, INITIAL_BUCKETS
	}
	return start, start + share + extra
}
// collectEntries gathers the keys and values of all valid entries found in
// the chain starting at the head bucket. keys[i] belongs to vals[i].
// Collection stops at the first slot that is not valid and holds a zero key
// and zero value, or at the end of the chain.
func (ht *HashTable) collectEntries(head int) (keys, vals []int) {
	keys = make([]int, 0, PER_BUCKET)
	vals = make([]int, 0, PER_BUCKET)
	for bucket := head; ; {
		base := bucket*BUCKET_SIZE + BUCKET_HEADER
		for entry := 0; entry < PER_BUCKET; entry++ {
			addr := base + entry*ENTRY_SIZE
			storedKey, _ := binary.Varint(ht.Buf[addr+1 : addr+11])
			storedVal, _ := binary.Varint(ht.Buf[addr+11 : addr+21])
			if ht.Buf[addr] == 1 {
				keys = append(keys, int(storedKey))
				vals = append(vals, int(storedVal))
			} else if storedKey == 0 && storedVal == 0 {
				return
			}
		}
		if bucket = ht.nextBucket(bucket); bucket == 0 {
			return
		}
	}
}
// GetPartition returns the keys and values of every entry whose head bucket
// falls into partition partNum out of partSize partitions. keys[i] belongs
// to vals[i].
func (ht *HashTable) GetPartition(partNum, partSize int) (keys, vals []int) {
	first, limit := GetPartitionRange(partNum, partSize)
	capacity := PER_BUCKET * (limit - first)
	keys, vals = make([]int, 0, capacity), make([]int, 0, capacity)
	for bucket := first; bucket < limit; bucket++ {
		bucketKeys, bucketVals := ht.collectEntries(bucket)
		keys = append(keys, bucketKeys...)
		vals = append(vals, bucketVals...)
	}
	return
}

View File

@ -0,0 +1,172 @@
package data
import (
"math"
"os"
"testing"
)
// TestPutGetReopenClear fills a hash table with one million entries, checks
// that they survive closing and reopening the file, and finally verifies that
// Clear leaves an empty table of the initial size.
func TestPutGetReopenClear(t *testing.T) {
	tmp := "/tmp/tiedot_test_hash"
	os.Remove(tmp)
	defer os.Remove(tmp)
	ht, err := OpenHashTable(tmp)
	if err != nil {
		t.Fatalf("Failed to open: %v", err)
	}
	// Test initial size information
	if !(ht.numBuckets == INITIAL_BUCKETS && ht.Used == INITIAL_BUCKETS*BUCKET_SIZE && ht.Size == HT_FILE_GROWTH) {
		t.Fatal("Wrong size", ht.numBuckets, INITIAL_BUCKETS, ht.Used, INITIAL_BUCKETS*BUCKET_SIZE, ht.Size, HT_FILE_GROWTH)
	}
	for i := int(0); i < 1024*1024; i++ {
		ht.Put(i, i)
	}
	for i := int(0); i < 1024*1024; i++ {
		vals := ht.Get(i, 0)
		if !(len(vals) == 1 && vals[0] == i) {
			t.Fatalf("Get failed on key %d, got %v", i, vals)
		}
	}
	numBuckets := ht.numBuckets
	// Reopen the hash table and test the features.
	// The Close error has to be assigned before it is tested; a failure is
	// reported through the test framework rather than by panicking.
	if err = ht.Close(); err != nil {
		t.Fatal(err)
	}
	reopened, err := OpenHashTable(tmp)
	if err != nil {
		t.Fatalf("Failed to open: %v", err)
	}
	if reopened.numBuckets != numBuckets {
		t.Fatalf("Wrong.numBuckets")
	}
	if reopened.Used != numBuckets*BUCKET_SIZE {
		t.Fatalf("Wrong UsedSize")
	}
	for i := int(0); i < 1024*1024; i++ {
		vals := reopened.Get(i, 0)
		if !(len(vals) == 1 && vals[0] == i) {
			t.Fatalf("Get failed on key %d, got %v", i, vals)
		}
	}
	// Clear the hash table
	if err = reopened.Clear(); err != nil {
		t.Fatal(err)
	}
	if !(reopened.numBuckets == INITIAL_BUCKETS && reopened.Used == INITIAL_BUCKETS*BUCKET_SIZE) {
		t.Fatal("Did not clear the hash table")
	}
	allKV := make(map[int]int)
	for part := 0; part < 10; part++ {
		keys, vals := reopened.GetPartition(part, 10)
		for i, key := range keys {
			allKV[key] = vals[i]
		}
	}
	if len(allKV) != 0 {
		t.Fatal("Did not clear the hash table")
	}
	if err = reopened.Close(); err != nil {
		t.Fatal(err)
	}
}
// TestPutGet2 stores several values under the same key and checks that Get
// returns all of them, or only as many as the limit allows.
func TestPutGet2(t *testing.T) {
tmp := "/tmp/tiedot_test_hash"
os.Remove(tmp)
defer os.Remove(tmp)
ht, err := OpenHashTable(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
defer ht.Close()
ht.Put(1, 1)
ht.Put(1, 2)
ht.Put(1, 3)
ht.Put(2, 1)
ht.Put(2, 2)
ht.Put(2, 3)
// A limit of 0 means "no limit".
vals := ht.Get(1, 0)
if !(len(vals) == 3) {
t.Fatalf("Get failed, got %v", vals)
}
vals = ht.Get(2, 2)
if !(len(vals) == 2) {
t.Fatalf("Get failed, got %v", vals)
}
}
// TestPutRemove removes one value from each of two keys and checks that the
// remaining two values per key are still returned.
func TestPutRemove(t *testing.T) {
tmp := "/tmp/tiedot_test_hash"
os.Remove(tmp)
defer os.Remove(tmp)
ht, err := OpenHashTable(tmp)
if err != nil {
t.Fatalf("Failed to open: %v", err)
return
}
defer ht.Close()
ht.Put(1, 1)
ht.Put(1, 2)
ht.Put(1, 3)
ht.Put(2, 1)
ht.Put(2, 2)
ht.Put(2, 3)
ht.Remove(1, 1)
ht.Remove(2, 2)
vals := ht.Get(1, 0)
if !(len(vals) == 2) {
t.Fatalf("Did not delete, still have %v", vals)
}
vals = ht.Get(2, 0)
if !(len(vals) == 2) {
t.Fatalf("Did not delete, still have %v", vals)
}
}
// TestPartitionEntries inserts two million entries and then, for 2 to 18
// partitions, checks that the partitions are balanced (within 10% of the
// ideal share) and that together they return every entry exactly once with
// the right value.
func TestPartitionEntries(t *testing.T) {
	tmp := "/tmp/tiedot_test_hash"
	os.Remove(tmp)
	defer os.Remove(tmp)
	ht, err := OpenHashTable(tmp)
	if err != nil {
		t.Fatalf("Failed to open: %v", err)
		return
	}
	defer ht.Close()
	number := 2000000
	for i := 1; i <= number; i++ {
		ht.Put(i, i*2)
		if gotBack := ht.Get(i, 0); len(gotBack) != 1 || gotBack[0] != i*2 {
			t.Fatal("Written ", i, i*2, "got back", gotBack)
		}
	}
	for parts := 2; parts < 19; parts++ {
		t.Log("parts is", parts)
		allKV := make(map[int]int)
		counter := 0
		for part := 0; part < parts; part++ {
			start, end := GetPartitionRange(part, parts)
			keys, vals := ht.GetPartition(part, parts)
			t.Log("Between ", start, end, " there are ", len(keys))
			sizeDev := math.Abs(float64(len(keys)-number/parts)) / float64(number/parts)
			t.Log("sizeDev", sizeDev)
			if sizeDev > 0.1 {
				t.Fatal("imbalanced keys")
			}
			for i, key := range keys {
				allKV[key] = vals[i]
			}
			counter += len(keys)
		}
		// Verify read back
		if counter != number {
			t.Fatal("Number of entries does not match, got ", counter)
		}
		// Keys 1..number were inserted, so exactly that range is verified.
		// Starting at 0 would let the missing key 0 pass by accident (a map
		// miss yields 0) and would leave the last key unchecked.
		for i := 1; i <= number; i++ {
			if allKV[i] != i*2 {
				t.Fatal("Wrong readback", i, allKV[i])
			}
		}
	}
}

View File

@ -0,0 +1,168 @@
/*
(Collection) Partition is a collection data file accompanied by a hash table in order to allow addressing of a
document using an unchanging ID:
The hash table stores the unchanging ID as entry key and the physical document location as entry value.
*/
package data
import (
"sync"
"github.com/HouzuoGuo/tiedot/dberr"
"github.com/HouzuoGuo/tiedot/tdlog"
)
// Partition associates a hash table with collection documents, allowing addressing of a document using an unchanging ID.
type Partition struct {
// col holds the document data.
col *Collection
// lookup maps a document ID to the document's physical location in col.
lookup *HashTable
// updating is the set of document IDs currently locked via LockUpdate.
updating map[int]struct{}
// Lock is created by OpenPartition. None of the methods visible in this
// file acquire it; presumably callers are expected to - verify.
Lock *sync.RWMutex
}
// OpenPartition opens a collection partition made of the collection data file
// at colPath and the lookup hash table at lookupPath.
//
// A non-nil *Partition is returned even when err is set. If the hash table
// cannot be opened, the collection file that was already opened is closed
// again so that its handle and mapping are not leaked.
func OpenPartition(colPath, lookupPath string) (part *Partition, err error) {
	part = &Partition{updating: make(map[int]struct{}), Lock: new(sync.RWMutex)}
	if part.col, err = OpenCollection(colPath); err != nil {
		return
	}
	if part.lookup, err = OpenHashTable(lookupPath); err != nil {
		// Best-effort cleanup; the open error is the one worth reporting.
		part.col.Close()
		return
	}
	return
}
// Insert stores the document in the collection file and records its physical
// location under id in the lookup table. The ID may be used to
// retrieve/update/delete the document later on. Nothing is recorded when the
// collection insert fails.
func (part *Partition) Insert(id int, data []byte) (physID int, err error) {
	if physID, err = part.col.Insert(data); err == nil {
		part.lookup.Put(id, physID)
	}
	return
}
// Read looks up the physical location of the document with the given ID and
// returns the document data. ErrorNoDoc is returned when the ID is unknown or
// the collection file has no readable document at the recorded location.
func (part *Partition) Read(id int) ([]byte, error) {
	if locations := part.lookup.Get(id, 1); len(locations) > 0 {
		if doc := part.col.Read(locations[0]); doc != nil {
			return doc, nil
		}
	}
	return nil, dberr.New(dberr.ErrorNoDoc, id)
}
// Update replaces the content of the document with the given ID. When the
// collection file had to move the document, the lookup table is re-pointed to
// the new location. ErrorNoDoc is returned when the ID is unknown; errors
// from the collection update are passed through.
func (part *Partition) Update(id int, data []byte) (err error) {
	locations := part.lookup.Get(id, 1)
	if len(locations) == 0 {
		return dberr.New(dberr.ErrorNoDoc, id)
	}
	oldLoc := locations[0]
	var newLoc int
	if newLoc, err = part.col.Update(oldLoc, data); err != nil || newLoc == oldLoc {
		return
	}
	part.lookup.Remove(id, oldLoc)
	part.lookup.Put(id, newLoc)
	return
}
// LockUpdate marks the document as being updated. ErrorDocLocked is returned
// when the document is already marked.
func (part *Partition) LockUpdate(id int) (err error) {
	if _, held := part.updating[id]; !held {
		part.updating[id] = struct{}{}
		return nil
	}
	return dberr.New(dberr.ErrorDocLocked, id)
}
// Unlock a document to make it ready for the next update.
// It removes id from the set of documents marked by LockUpdate; removing an
// ID that is not in the set has no effect.
func (part *Partition) UnlockUpdate(id int) {
delete(part.updating, id)
}
// Delete marks the document as deleted in the collection file and removes its
// lookup entry. ErrorNoDoc is returned when the ID is unknown. The result of
// the collection-level delete is not checked.
func (part *Partition) Delete(id int) (err error) {
	locations := part.lookup.Get(id, 1)
	if len(locations) < 1 {
		return dberr.New(dberr.ErrorNoDoc, id)
	}
	loc := locations[0]
	part.col.Delete(loc)
	part.lookup.Remove(id, loc)
	return nil
}
// ForEachDoc splits the documents into totalPart roughly equally sized
// portions and runs fun on every readable document of portion partNum.
// It returns false as soon as fun returns false, and true after all
// documents of the portion have been visited.
func (part *Partition) ForEachDoc(partNum, totalPart int, fun func(id int, doc []byte) bool) (moveOn bool) {
	ids, locations := part.lookup.GetPartition(partNum, totalPart)
	for i := range ids {
		doc := part.col.Read(locations[i])
		if doc == nil {
			// The lookup entry points at nothing readable; skip it.
			continue
		}
		if !fun(ids[i], doc) {
			return false
		}
	}
	return true
}
// ApproxDocCount estimates the number of documents in the partition by
// counting the keys in one portion of the lookup table and multiplying by the
// number of portions.
func (part *Partition) ApproxDocCount() int {
	// 24 is not magic; a larger number makes estimation less accurate, but improves performance.
	// Each time the sampled portion turns out empty the number of portions is halved,
	// i.e. a larger portion of the table is sampled on the next attempt.
	for totalPart := 24; ; totalPart /= 2 {
		keys, _ := part.lookup.GetPartition(0, totalPart)
		if sampled := len(keys); sampled > 0 {
			return int(float64(sampled) * float64(totalPart))
		}
		if totalPart < 8 {
			return 0 // the hash table is really really empty
		}
	}
}
// Clear empties both the data file and the lookup hash table.
// Both are always attempted; each failure is logged and the method then
// returns a generic ErrorIO error.
func (part *Partition) Clear() error {
	var result error
	if colErr := part.col.Clear(); colErr != nil {
		tdlog.CritNoRepeat("Failed to clear %s: %v", part.col.Path, colErr)
		result = dberr.New(dberr.ErrorIO)
	}
	if lookupErr := part.lookup.Clear(); lookupErr != nil {
		tdlog.CritNoRepeat("Failed to clear %s: %v", part.lookup.Path, lookupErr)
		result = dberr.New(dberr.ErrorIO)
	}
	return result
}
// Close closes the data file and the lookup hash table.
// Both are always attempted; each failure is logged and the method then
// returns a generic ErrorIO error.
func (part *Partition) Close() error {
	var result error
	if colErr := part.col.Close(); colErr != nil {
		tdlog.CritNoRepeat("Failed to close %s: %v", part.col.Path, colErr)
		result = dberr.New(dberr.ErrorIO)
	}
	if lookupErr := part.lookup.Close(); lookupErr != nil {
		tdlog.CritNoRepeat("Failed to close %s: %v", part.lookup.Path, lookupErr)
		result = dberr.New(dberr.ErrorIO)
	}
	return result
}

View File

@ -0,0 +1,137 @@
package data
import (
"math/rand"
"os"
"strconv"
"testing"
"time"
"github.com/HouzuoGuo/tiedot/dberr"
)
// TestPartitionDocCRUD walks a single partition through insert, read, update,
// delete, update-lock handling and iteration, and finally clears and closes it.
// The steps share state, so their order matters.
func TestPartitionDocCRUD(t *testing.T) {
colPath := "/tmp/tiedot_test_col"
htPath := "/tmp/tiedot_test_ht"
// Start from a clean slate and clean up again when the test ends
os.Remove(colPath)
os.Remove(htPath)
defer os.Remove(colPath)
defer os.Remove(htPath)
part, err := OpenPartition(colPath, htPath)
if err != nil {
t.Fatal(err)
}
// Insert & read
if _, err = part.Insert(1, []byte("1")); err != nil {
t.Fatal(err)
}
if _, err = part.Insert(2, []byte("2")); err != nil {
t.Fatal(err)
}
// The expected read-back values carry trailing padding after the inserted text
if readback, err := part.Read(1); err != nil || string(readback) != "1 " {
t.Fatal(err, readback)
}
if readback, err := part.Read(2); err != nil || string(readback) != "2 " {
t.Fatal(err, readback)
}
// Update & read
if err = part.Update(1, []byte("abcdef")); err != nil {
t.Fatal(err)
}
// Updating an unknown ID must report ErrorNoDoc
if err := part.Update(1234, []byte("abcdef")); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
if readback, err := part.Read(1); err != nil || string(readback) != "abcdef " {
t.Fatal(err, readback)
}
// Delete & read
if err = part.Delete(1); err != nil {
t.Fatal(err)
}
if _, err = part.Read(1); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
if err = part.Delete(123); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
// Lock & unlock - a second lock on the same ID must fail until it is unlocked
if err = part.LockUpdate(123); err != nil {
t.Fatal(err)
}
if err = part.LockUpdate(123); dberr.Type(err) != dberr.ErrorDocLocked {
t.Fatal("Did not error")
}
part.UnlockUpdate(123)
if err = part.LockUpdate(123); err != nil {
t.Fatal(err)
}
// Foreach - document 1 was deleted above, so only document 2 may be visited
part.ForEachDoc(0, 1, func(id int, doc []byte) bool {
if id != 2 || string(doc) != "2 " {
t.Fatal("ID 2 should be the only remaining document")
}
return true
})
// Finish up
if err = part.Clear(); err != nil {
t.Fatal(err)
}
if err = part.Close(); err != nil {
t.Fatal(err)
}
}
// TestApproxDocCount inserts documents under random IDs in three batches
// (100, then 900, then 2000) and checks after each batch that the estimate
// falls inside a generous range around the running total. It finishes with a
// coarse speed check of 1000 consecutive estimations.
func TestApproxDocCount(t *testing.T) {
rand.Seed(time.Now().UnixNano())
colPath := "/tmp/tiedot_test_col"
htPath := "/tmp/tiedot_test_ht"
// Start from a clean slate and clean up again when the test ends
os.Remove(colPath)
os.Remove(htPath)
defer os.Remove(colPath)
defer os.Remove(htPath)
part, err := OpenPartition(colPath, htPath)
if err != nil {
t.Fatal(err)
}
defer part.Close()
// Insert 100 documents
for i := 0; i < 100; i++ {
if _, err = part.Insert(rand.Int(), []byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}
t.Log("ApproxDocCount", part.ApproxDocCount())
if part.ApproxDocCount() < 10 || part.ApproxDocCount() > 300 {
t.Fatal("Approximate is way off", part.ApproxDocCount())
}
// Insert 900 documents (1000 in total)
for i := 0; i < 900; i++ {
if _, err = part.Insert(rand.Int(), []byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}
t.Log("ApproxDocCount", part.ApproxDocCount())
if part.ApproxDocCount() < 500 || part.ApproxDocCount() > 1500 {
t.Fatal("Approximate is way off", part.ApproxDocCount())
}
// Insert another 2000 documents (3000 in total)
for i := 0; i < 2000; i++ {
if _, err = part.Insert(rand.Int(), []byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}
t.Log("ApproxDocCount", part.ApproxDocCount())
if part.ApproxDocCount() < 2000 || part.ApproxDocCount() > 4000 {
t.Fatal("Approximate is way off", part.ApproxDocCount())
}
// See how fast doc count is: 1000 estimations must finish within 3500 milliseconds
start := time.Now().UnixNano()
for i := 0; i < 1000; i++ {
part.ApproxDocCount()
}
timediff := time.Now().UnixNano() - start
t.Log("It took", timediff/1000000, "milliseconds")
if timediff/1000000 > 3500 {
t.Fatal("Algorithm is way too slow")
}
}

View File

@ -0,0 +1,233 @@
/* Collection schema and index management. */
package db
import (
"encoding/json"
"fmt"
"github.com/HouzuoGuo/tiedot/data"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
)
// File and directory naming conventions used inside a collection directory.
// The partition number is appended to the two file name prefixes.
const (
DOC_DATA_FILE = "dat_" // Prefix of partition collection data file name.
DOC_LOOKUP_FILE = "id_" // Prefix of partition hash table (ID lookup) file name.
INDEX_PATH_SEP = "!" // Separator between index keys in index directory name.
)
// Collection has data partitions and some index meta information.
// parts and hts are both indexed by partition number; each hts element maps
// an index name (path segments joined by INDEX_PATH_SEP) to that partition's
// hash table for the index.
type Col struct {
db *DB // Database the collection belongs to
name string // Collection name, also the name of its directory under the database path
parts []*data.Partition // Collection partitions
hts []map[string]*data.HashTable // Index partitions
indexPaths map[string][]string // Index names and paths
}
// OpenCol opens the collection called name inside db and loads its document
// partitions and indexes. The collection value is returned even when loading
// fails, so callers must check the error before using it.
func OpenCol(db *DB, name string) (*Col, error) {
	col := &Col{db: db, name: name}
	err := col.load()
	return col, err
}
// load creates the collection directory if necessary, opens one document
// partition per database partition, and then opens the per-partition hash
// tables of every index. Every sub-directory of the collection directory is
// treated as an index whose path is the directory name split on INDEX_PATH_SEP.
func (col *Col) load() error {
	colDir := path.Join(col.db.path, col.name)
	if err := os.MkdirAll(colDir, 0700); err != nil {
		return err
	}
	numParts := col.db.numParts
	col.parts = make([]*data.Partition, numParts)
	col.hts = make([]map[string]*data.HashTable, numParts)
	for i := range col.hts {
		col.hts[i] = make(map[string]*data.HashTable)
	}
	col.indexPaths = make(map[string][]string)
	// Open collection document partitions
	for i := range col.parts {
		suffix := strconv.Itoa(i)
		part, err := data.OpenPartition(
			path.Join(colDir, DOC_DATA_FILE+suffix),
			path.Join(colDir, DOC_LOOKUP_FILE+suffix))
		col.parts[i] = part
		if err != nil {
			return err
		}
	}
	// Look for index directories
	entries, err := ioutil.ReadDir(colDir)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		// Open index partitions
		idxName := entry.Name()
		col.indexPaths[idxName] = strings.Split(idxName, INDEX_PATH_SEP)
		for i := 0; i < numParts; i++ {
			ht, err := data.OpenHashTable(path.Join(colDir, idxName, strconv.Itoa(i)))
			col.hts[i][idxName] = ht
			if err != nil {
				return err
			}
		}
	}
	return nil
}
// close closes every document partition and every index hash table of the
// collection, holding each partition's write lock while its files are closed.
// All files are attempted; the collected errors are returned as one error.
// Do not use the collection afterwards!
func (col *Col) close() error {
	var errs []error
	for i := 0; i < col.db.numParts; i++ {
		part := col.parts[i]
		part.Lock.Lock()
		if err := part.Close(); err != nil {
			errs = append(errs, err)
		}
		for _, ht := range col.hts[i] {
			if err := ht.Close(); err != nil {
				errs = append(errs, err)
			}
		}
		part.Lock.Unlock()
	}
	if len(errs) > 0 {
		return fmt.Errorf("%v", errs)
	}
	return nil
}
// forEachDoc runs fun on every document of the collection, partition by
// partition, and stops as soon as fun returns false. Each partition is read
// under its read lock. The schema read lock is taken only when
// placeSchemaLock is true.
func (col *Col) forEachDoc(fun func(id int, doc []byte) (moveOn bool), placeSchemaLock bool) {
	if placeSchemaLock {
		col.db.schemaLock.RLock()
		defer col.db.schemaLock.RUnlock()
	}
	// Process approx.4k documents in each iteration
	partDiv := col.approxDocCount(false) / col.db.numParts / 4000
	if partDiv == 0 {
		partDiv = 1
	}
	for partIdx := 0; partIdx < col.db.numParts; partIdx++ {
		part := col.parts[partIdx]
		part.Lock.RLock()
		keepGoing := true
		for chunk := 0; keepGoing && chunk < partDiv; chunk++ {
			keepGoing = part.ForEachDoc(chunk, partDiv, fun)
		}
		part.Lock.RUnlock()
		if !keepGoing {
			return
		}
	}
}
// ForEachDoc runs fun on all documents in the collection while holding the schema read lock.
// Iteration stops early when fun returns false.
func (col *Col) ForEachDoc(fun func(id int, doc []byte) (moveOn bool)) {
col.forEachDoc(fun, true)
}
// Index creates an index on idxPath and populates it from every document
// already in the collection.
//
// The schema lock is held exclusively for the whole operation. An error is
// returned when idxPath is empty, when the path is already indexed, or when
// the index directory or any of its per-partition hash tables cannot be
// created or opened. The index is registered on the collection only after
// every partition's hash table opened successfully, so a failure never leaves
// a half-registered index in memory. Documents that do not parse as JSON
// objects are skipped during population.
func (col *Col) Index(idxPath []string) (err error) {
	col.db.schemaLock.Lock()
	defer col.db.schemaLock.Unlock()
	if len(idxPath) == 0 {
		// An empty path yields an empty index name, which would map the index
		// directory onto the collection directory itself.
		return fmt.Errorf("Index path may not be empty")
	}
	idxName := strings.Join(idxPath, INDEX_PATH_SEP)
	if _, exists := col.indexPaths[idxName]; exists {
		return fmt.Errorf("Path %v is already indexed", idxPath)
	}
	idxDir := path.Join(col.db.path, col.name, idxName)
	if err = os.MkdirAll(idxDir, 0700); err != nil {
		return err
	}
	// Open the hash table of every partition before touching collection state.
	tables := make([]*data.HashTable, 0, col.db.numParts)
	for i := 0; i < col.db.numParts; i++ {
		ht, openErr := data.OpenHashTable(path.Join(idxDir, strconv.Itoa(i)))
		if openErr != nil {
			for _, opened := range tables {
				opened.Close() // best effort; the open failure is the error worth reporting
			}
			return openErr
		}
		tables = append(tables, ht)
	}
	// Keep a private copy of the path so later changes to the caller's slice
	// cannot alter the registered index.
	ownPath := make([]string, len(idxPath))
	copy(ownPath, idxPath)
	for i, ht := range tables {
		col.hts[i][idxName] = ht
	}
	col.indexPaths[idxName] = ownPath
	// Put all documents on the new index. The schema lock is already held, hence placeSchemaLock=false.
	col.forEachDoc(func(id int, doc []byte) (moveOn bool) {
		var docObj map[string]interface{}
		if err := json.Unmarshal(doc, &docObj); err != nil {
			// Skip corrupted document
			return true
		}
		for _, idxVal := range GetIn(docObj, ownPath) {
			if idxVal != nil {
				hashKey := StrHash(fmt.Sprint(idxVal))
				col.hts[hashKey%col.db.numParts][idxName].Put(hashKey, id)
			}
		}
		return true
	}, false)
	return nil
}
// AllIndexes returns a copy of every indexed path of the collection.
// The returned slices are independent of the collection's own state.
func (col *Col) AllIndexes() (ret [][]string) {
	col.db.schemaLock.RLock()
	defer col.db.schemaLock.RUnlock()
	ret = make([][]string, 0, len(col.indexPaths))
	for _, idxPath := range col.indexPaths {
		dup := make([]string, len(idxPath))
		copy(dup, idxPath)
		ret = append(ret, dup)
	}
	return ret
}
// Unindex removes the index on idxPath: it unregisters the index, closes its
// hash table in every partition, and deletes the index directory.
// An error is returned when the path is not indexed or the directory cannot be removed.
func (col *Col) Unindex(idxPath []string) error {
	col.db.schemaLock.Lock()
	defer col.db.schemaLock.Unlock()
	idxName := strings.Join(idxPath, INDEX_PATH_SEP)
	_, indexed := col.indexPaths[idxName]
	if !indexed {
		return fmt.Errorf("Path %v is not indexed", idxPath)
	}
	delete(col.indexPaths, idxName)
	for i := 0; i < col.db.numParts; i++ {
		partTables := col.hts[i]
		partTables[idxName].Close()
		delete(partTables, idxName)
	}
	return os.RemoveAll(path.Join(col.db.path, col.name, idxName))
}
// approxDocCount sums the approximate document count of every partition,
// reading each one under its read lock. The schema read lock is taken only
// when placeSchemaLock is true.
func (col *Col) approxDocCount(placeSchemaLock bool) int {
	if placeSchemaLock {
		col.db.schemaLock.RLock()
		defer col.db.schemaLock.RUnlock()
	}
	sum := 0
	for i := range col.parts {
		part := col.parts[i]
		part.Lock.RLock()
		sum += part.ApproxDocCount()
		part.Lock.RUnlock()
	}
	return sum
}
// ApproxDocCount returns the approximate number of documents in the collection.
// It holds the schema read lock while the partitions are counted.
func (col *Col) ApproxDocCount() int {
return col.approxDocCount(true)
}
// ForEachDocInPage runs fun on the documents that fall into page number page
// out of total pages, visiting that page of every partition in turn.
// Iteration stops as soon as fun returns false. The schema read lock is held
// throughout and each partition is read under its read lock.
func (col *Col) ForEachDocInPage(page, total int, fun func(id int, doc []byte) bool) {
	col.db.schemaLock.RLock()
	defer col.db.schemaLock.RUnlock()
	for partIdx := 0; partIdx < col.db.numParts; partIdx++ {
		part := col.parts[partIdx]
		part.Lock.RLock()
		keepGoing := part.ForEachDoc(page, total, fun)
		part.Lock.RUnlock()
		if !keepGoing {
			return
		}
	}
}

View File

@ -0,0 +1,287 @@
/* Collection and DB storage management. */
package db
import (
"encoding/json"
"fmt"
"github.com/HouzuoGuo/tiedot/tdlog"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
// Name of the text file, stored directly under the database directory, that
// records how many partitions every collection of the database has.
const (
PART_NUM_FILE = "number_of_partitions" // DB-collection-partition-number-configuration file name
)
// DB is an open database: a directory holding one sub-directory per collection
// plus the partition number configuration file.
type DB struct {
path string // Root path of database directory
numParts int // Total number of partitions
cols map[string]*Col // All collections
schemaLock *sync.RWMutex // Control access to collection instances.
}
// OpenDB opens the database rooted at dbPath and loads all of its collections
// and indexes. The database value is returned even when loading fails, so
// callers must check the error before using it.
func OpenDB(dbPath string) (*DB, error) {
	// Document ID generation relies on this RNG.
	rand.Seed(time.Now().UnixNano())
	db := &DB{path: dbPath, schemaLock: new(sync.RWMutex)}
	err := db.load()
	return db, err
}
// load prepares the database directory, determines the number of partitions
// and opens every collection found in the directory.
//
// When PART_NUM_FILE cannot be stat'ed it is written with the number of CPUs;
// in that case finding any sub-directory (a would-be collection) is an error,
// because the real partition count of existing data is unknown.
func (db *DB) load() error {
// Create DB directory and PART_NUM_FILE if necessary
var numPartsAssumed = false
numPartsFilePath := path.Join(db.path, PART_NUM_FILE)
if err := os.MkdirAll(db.path, 0700); err != nil {
return err
}
// Any Stat failure is treated as "the file does not exist yet"
if partNumFile, err := os.Stat(numPartsFilePath); err != nil {
// The new database has as many partitions as number of CPUs recognized by OS
if err := ioutil.WriteFile(numPartsFilePath, []byte(strconv.Itoa(runtime.NumCPU())), 0600); err != nil {
return err
}
numPartsAssumed = true
} else if partNumFile.IsDir() {
return fmt.Errorf("Database config file %s is actually a directory, is database path correct?", PART_NUM_FILE)
}
// Get number of partitions from the text file (surrounding CR, LF and spaces are ignored)
if numParts, err := ioutil.ReadFile(numPartsFilePath); err != nil {
return err
} else if db.numParts, err = strconv.Atoi(strings.Trim(string(numParts), "\r\n ")); err != nil {
return err
}
// Look for collection directories and open the collections
db.cols = make(map[string]*Col)
dirContent, err := ioutil.ReadDir(db.path)
if err != nil {
return err
}
for _, maybeColDir := range dirContent {
if !maybeColDir.IsDir() {
continue
}
// NOTE: the assumed partition number has already been written to disk at this point
if numPartsAssumed {
return fmt.Errorf("Please manually repair database partition number config file %s", numPartsFilePath)
}
if db.cols[maybeColDir.Name()], err = OpenCol(db, maybeColDir.Name()); err != nil {
return err
}
}
return err
}
// Close closes the files of every collection while holding the schema lock
// exclusively. All collections are attempted; the collected errors are
// returned as one error. Do not use the DB afterwards!
func (db *DB) Close() error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	var errs []error
	for _, col := range db.cols {
		if err := col.close(); err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("%v", errs)
	}
	return nil
}
// Create creates a new, empty collection called name.
//
// An error is returned when a collection of that name already exists, when
// its directory cannot be created, or when the collection fails to open.
// The collection is registered in the database only after it opened
// successfully, so a failed Create leaves no unusable collection behind for
// Use or AllCols to hand out.
func (db *DB) Create(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	if _, exists := db.cols[name]; exists {
		return fmt.Errorf("Collection %s already exists", name)
	}
	if err := os.MkdirAll(path.Join(db.path, name), 0700); err != nil {
		return err
	}
	// OpenCol returns a non-nil collection even on failure, so open into a
	// local first and register it only on success.
	col, err := OpenCol(db, name)
	if err != nil {
		return err
	}
	db.cols[name] = col
	return nil
}
// AllCols returns the names of all collections, in no particular order.
func (db *DB) AllCols() (ret []string) {
	db.schemaLock.RLock()
	defer db.schemaLock.RUnlock()
	names := make([]string, 0, len(db.cols))
	for name := range db.cols {
		names = append(names, name)
	}
	return names
}
// Use returns the collection called name for the caller to interact with.
// The return value is nil when the collection does not exist.
func (db *DB) Use(name string) *Col {
	db.schemaLock.RLock()
	defer db.schemaLock.RUnlock()
	col, exists := db.cols[name]
	if !exists {
		return nil
	}
	return col
}
// Rename renames collection oldName to newName: the collection is closed, its
// directory is renamed, and it is re-opened under the new name.
// An error is returned when oldName does not exist, when newName is already
// taken, or when any of the three steps fails.
func (db *DB) Rename(oldName, newName string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	oldCol, oldExists := db.cols[oldName]
	if !oldExists {
		return fmt.Errorf("Collection %s does not exist", oldName)
	}
	if _, newExists := db.cols[newName]; newExists {
		return fmt.Errorf("Collection %s already exists", newName)
	}
	if newName == oldName {
		return fmt.Errorf("Old and new names are the same")
	}
	if err := oldCol.close(); err != nil {
		return err
	}
	if err := os.Rename(path.Join(db.path, oldName), path.Join(db.path, newName)); err != nil {
		return err
	}
	var err error
	if db.cols[newName], err = OpenCol(db, newName); err != nil {
		return err
	}
	delete(db.cols, oldName)
	return nil
}
// Truncate deletes all documents of collection name and clears all of its
// index hash tables. It stops at, and returns, the first error encountered.
func (db *DB) Truncate(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	col, exists := db.cols[name]
	if !exists {
		return fmt.Errorf("Collection %s does not exist", name)
	}
	for partIdx := 0; partIdx < db.numParts; partIdx++ {
		if err := col.parts[partIdx].Clear(); err != nil {
			return err
		}
		for _, ht := range col.hts[partIdx] {
			if err := ht.Clear(); err != nil {
				return err
			}
		}
	}
	return nil
}
// Scrub a collection - fix corrupted documents and de-fragment free space.
//
// All readable documents are re-inserted into a freshly created temporary
// collection that mirrors the original's indexes; the temporary collection
// then replaces the original on disk and is re-opened under the original name.
// The schema lock is held exclusively throughout. On the early error returns
// below, the temporary collection directory is not removed.
func (db *DB) Scrub(name string) error {
db.schemaLock.Lock()
defer db.schemaLock.Unlock()
if _, exists := db.cols[name]; !exists {
return fmt.Errorf("Collection %s does not exist", name)
}
// Prepare a temporary collection in file system
tmpColName := fmt.Sprintf("scrub-%s-%d", name, time.Now().UnixNano())
tmpColDir := path.Join(db.path, tmpColName)
if err := os.MkdirAll(tmpColDir, 0700); err != nil {
return err
}
// Mirror indexes from original collection (index directories are what OpenCol picks up as indexes)
for _, idxPath := range db.cols[name].indexPaths {
if err := os.MkdirAll(path.Join(tmpColDir, strings.Join(idxPath, INDEX_PATH_SEP)), 0700); err != nil {
return err
}
}
// Iterate through all documents and put them into the temporary collection
tmpCol, err := OpenCol(db, tmpColName)
if err != nil {
return err
}
// The schema lock is already held, hence placeSchemaLock=false
db.cols[name].forEachDoc(func(id int, doc []byte) bool {
var docObj map[string]interface{}
if err := json.Unmarshal([]byte(doc), &docObj); err != nil {
// Skip corrupted document
return true
}
// A failed re-insert is only logged; the scrub carries on with the next document
if err := tmpCol.InsertRecovery(id, docObj); err != nil {
tdlog.Noticef("Scrub %s: failed to insert back document %v", name, docObj)
}
return true
}, false)
if err := tmpCol.close(); err != nil {
return err
}
// Replace the original collection with the "temporary" one
// (an error from closing the original collection is not checked)
db.cols[name].close()
if err := os.RemoveAll(path.Join(db.path, name)); err != nil {
return err
}
if err := os.Rename(path.Join(db.path, tmpColName), path.Join(db.path, name)); err != nil {
return err
}
if db.cols[name], err = OpenCol(db, name); err != nil {
return err
}
return nil
}
// Drop closes collection name, deletes its directory with all documents and
// indexes, and unregisters it from the database.
// The collection stays registered when closing or deleting fails.
func (db *DB) Drop(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	col, exists := db.cols[name]
	if !exists {
		return fmt.Errorf("Collection %s does not exist", name)
	}
	if err := col.close(); err != nil {
		return err
	}
	if err := os.RemoveAll(path.Join(db.path, name)); err != nil {
		return err
	}
	delete(db.cols, name)
	return nil
}
// Dump copies this database into the destination directory (for backup).
//
// The database directory is walked recursively: directories are re-created
// under dest and files are copied into them. The copy refuses to overwrite: an
// error is returned as soon as a destination file already exists. The schema
// lock is held exclusively for the duration of the copy. Every file handle
// opened along the way is closed before the walk moves on.
func (db *DB) Dump(dest string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	cpFun := func(currPath string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			relPath, err := filepath.Rel(db.path, currPath)
			if err != nil {
				return err
			}
			destDir := path.Join(dest, relPath)
			if err := os.MkdirAll(destDir, 0700); err != nil {
				return err
			}
			tdlog.Noticef("Dump: created directory %s", destDir)
			return nil
		}
		src, err := os.Open(currPath)
		if err != nil {
			return err
		}
		defer src.Close()
		relPath, err := filepath.Rel(db.path, currPath)
		if err != nil {
			return err
		}
		destPath := path.Join(dest, relPath)
		// Being able to open the destination means it already exists; release the probe handle right away.
		if existing, openErr := os.Open(destPath); openErr == nil {
			existing.Close()
			return fmt.Errorf("Destination file %s already exists", destPath)
		}
		destFile, err := os.Create(destPath)
		if err != nil {
			return err
		}
		written, err := io.Copy(destFile, src)
		// Close explicitly rather than by defer so that a failure to flush the copy is reported.
		if closeErr := destFile.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			return err
		}
		tdlog.Noticef("Dump: copied file %s, size is %d", destPath, written)
		return nil
	}
	return filepath.Walk(db.path, cpFun)
}

View File

@ -0,0 +1,255 @@
package db
import (
"io/ioutil"
"os"
"path"
"runtime"
"testing"
)
// Directory the tests of this package create their databases in.
// Tests remove it before they start and again when they finish.
const (
TEST_DATA_DIR = "/tmp/tiedot_test"
)
// touchFile creates directory dir (including parents) and writes an empty
// file called filename into it, truncating the file if it already exists.
// It panics when either step fails.
func touchFile(dir, filename string) {
	err := os.MkdirAll(dir, 0700)
	if err == nil {
		err = ioutil.WriteFile(path.Join(dir, filename), []byte{}, 0600)
	}
	if err != nil {
		panic(err)
	}
}
// TestOpenEmptyDB opens a database in a fresh directory and verifies that the
// partition count defaults to the number of CPUs, both for the database and
// for a collection created in it.
func TestOpenEmptyDB(t *testing.T) {
	os.RemoveAll(TEST_DATA_DIR)
	defer os.RemoveAll(TEST_DATA_DIR)
	db, err := OpenDB(TEST_DATA_DIR)
	if err != nil {
		t.Fatal(err)
	}
	if db.numParts != runtime.NumCPU() {
		t.Fatal(db.numParts)
	}
	if err := db.Create("a"); err != nil {
		t.Fatal(err)
	}
	// Report the offending partition count; err is nil here and would say nothing.
	if numColParts := len(db.cols["a"].parts); numColParts != runtime.NumCPU() {
		t.Fatal(numColParts)
	}
	if err := db.Close(); err != nil {
		t.Fatal(err)
	}
}
// TestOpenErrDB verifies that opening a directory which already contains a
// collection directory but no partition number file is reported as an error,
// and that the returned database can still be closed.
func TestOpenErrDB(t *testing.T) {
os.RemoveAll(TEST_DATA_DIR)
defer os.RemoveAll(TEST_DATA_DIR)
if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil {
t.Fatal(err)
}
// Lay out a collection with one index, but deliberately no number_of_partitions file
touchFile(TEST_DATA_DIR+"/ColA", "dat_0")
touchFile(TEST_DATA_DIR+"/ColA/a!b!c", "0")
if db, err := OpenDB(TEST_DATA_DIR); err == nil {
t.Fatal("Did not error")
} else if err := db.Close(); err != nil {
t.Fatal(err)
}
}
// TestOpenCloseDB prepares a two-partition database directory by hand - ColA
// with an index on a!b!c and an empty ColB - and verifies that OpenDB picks up
// both collections, their partitions and ColA's index.
func TestOpenCloseDB(t *testing.T) {
os.RemoveAll(TEST_DATA_DIR)
defer os.RemoveAll(TEST_DATA_DIR)
if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil {
t.Fatal(err)
}
touchFile(TEST_DATA_DIR+"/ColA", "dat_0")
touchFile(TEST_DATA_DIR+"/ColA/a!b!c", "0")
if err := os.MkdirAll(TEST_DATA_DIR+"/ColB", 0700); err != nil {
panic(err)
}
db, err := OpenDB(TEST_DATA_DIR)
if err != nil {
t.Fatal(err)
}
if db.path != TEST_DATA_DIR || db.numParts != 2 || db.cols["ColA"] == nil || db.cols["ColB"] == nil {
t.Fatal(db.cols)
}
colA := db.cols["ColA"]
colB := db.cols["ColB"]
if len(colA.parts) != 2 || len(colA.hts) != 2 {
t.Fatal(colA)
}
// The index directory name a!b!c must have been split into its three path segments
if colA.indexPaths["a!b!c"][0] != "a" || colA.indexPaths["a!b!c"][1] != "b" || colA.indexPaths["a!b!c"][2] != "c" {
t.Fatal(colA.indexPaths)
}
// Every partition must have a hash table for the index
if colA.hts[0]["a!b!c"] == nil || colA.hts[1]["a!b!c"] == nil {
t.Fatal(colA.hts)
}
if len(colB.parts) != 2 || len(colB.hts) != 2 {
t.Fatal(colB)
}
if err := db.Close(); err != nil {
t.Fatal(err)
}
}
// TestColCrud exercises collection management on a two-partition database:
// create, list, use, rename, truncate, scrub and drop, including the error
// cases for names that do not exist or are already taken.
// The steps share state, so their order matters.
func TestColCrud(t *testing.T) {
os.RemoveAll(TEST_DATA_DIR)
defer os.RemoveAll(TEST_DATA_DIR)
if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil {
t.Fatal(err)
}
db, err := OpenDB(TEST_DATA_DIR)
if err != nil {
t.Fatal(err)
}
if len(db.AllCols()) != 0 {
t.Fatal(db.AllCols())
}
// Create
if err := db.Create("a"); err != nil {
t.Fatal(err)
}
// Creating the same collection twice must fail
if db.Create("a") == nil {
t.Fatal("Did not error")
}
if err := db.Create("b"); err != nil {
t.Fatal(err)
}
// Get all names & use (names come back in no particular order)
if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "a" && allNames[1] == "b" || allNames[0] == "b" && allNames[1] == "a") {
t.Fatal(allNames)
}
if db.Use("a") == nil || db.Use("b") == nil || db.Use("abcde") != nil {
t.Fatal(db.cols)
}
// Rename - same name, taken name and unknown source must all fail
if db.Rename("a", "a") == nil {
t.Fatal("Did not error")
}
if db.Rename("a", "b") == nil {
t.Fatal("Did not error")
}
if db.Rename("abc", "b") == nil {
t.Fatal("Did not error")
}
if err := db.Rename("a", "c"); err != nil {
t.Fatal(err)
}
if err := db.Rename("b", "d"); err != nil {
t.Fatal(err)
}
// Rename - verify
if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "d" && allNames[1] == "c" || allNames[0] == "c" && allNames[1] == "d") {
t.Fatal(allNames)
}
if db.Use("c") == nil || db.Use("d") == nil || db.Use("a") != nil {
t.Fatal(db.cols)
}
// Truncate - "a" no longer exists after the rename, so truncating it must fail
if db.Truncate("a") == nil {
t.Fatal("Did not error")
}
if err := db.Truncate("c"); err != nil {
t.Fatal(err)
}
if err := db.Truncate("d"); err != nil {
t.Fatal(err)
}
// Truncate - verify
if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "d" && allNames[1] == "c" || allNames[0] == "c" && allNames[1] == "d") {
t.Fatal(allNames)
}
if db.Use("c") == nil || db.Use("d") == nil || db.Use("a") != nil {
t.Fatal(db.cols)
}
// Scrub
if err := db.Scrub("c"); err != nil {
t.Fatal(err)
}
// Scrub - verify
if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "d" && allNames[1] == "c" || allNames[0] == "c" && allNames[1] == "d") {
t.Fatal(allNames)
}
if db.Use("c") == nil || db.Use("d") == nil || db.Use("a") != nil {
t.Fatal(db.cols)
}
// More scrub tests are in doc_test.go
// Drop - dropping an unknown collection must fail
if db.Drop("a") == nil {
t.Fatal("Did not error")
}
if err := db.Drop("c"); err != nil {
t.Fatal(err)
}
if allNames := db.AllCols(); len(allNames) != 1 || allNames[0] != "d" {
t.Fatal(allNames)
}
if db.Use("d") == nil {
t.Fatal(db.cols)
}
if err := db.Drop("d"); err != nil {
t.Fatal(err)
}
if allNames := db.AllCols(); len(allNames) != 0 {
t.Fatal(allNames)
}
if db.Use("d") != nil {
t.Fatal(db.cols)
}
if err := db.Close(); err != nil {
t.Fatal(err)
}
}
// TestDumpDB dumps a two-collection database holding one document into a
// backup directory, opens the backup as a database of its own, and verifies
// that both collections and the document are present in it.
func TestDumpDB(t *testing.T) {
// The backup lives next to the test database, in TEST_DATA_DIR + "bak"
os.RemoveAll(TEST_DATA_DIR)
os.RemoveAll(TEST_DATA_DIR + "bak")
defer os.RemoveAll(TEST_DATA_DIR)
defer os.RemoveAll(TEST_DATA_DIR + "bak")
if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil {
t.Fatal(err)
}
db, err := OpenDB(TEST_DATA_DIR)
if err != nil {
t.Fatal(err)
}
if err := db.Create("a"); err != nil {
t.Fatal(err)
} else if err := db.Create("b"); err != nil {
t.Fatal(err)
}
id1, err := db.Use("a").Insert(map[string]interface{}{"whatever": "1"})
if err != nil {
t.Fatal(err)
} else if err := db.Dump(TEST_DATA_DIR + "bak"); err != nil {
t.Fatal(err)
}
// Open the new database
db2, err := OpenDB(TEST_DATA_DIR + "bak")
if err != nil {
t.Fatal(err)
}
// Collection names come back in no particular order
if allCols := db2.AllCols(); !(allCols[0] == "a" && allCols[1] == "b" || allCols[0] == "b" && allCols[1] == "a") {
t.Fatal(allCols)
}
// The document inserted before the dump must be readable from the backup under the same ID
if doc, err := db2.Use("a").Read(id1); err != nil || doc["whatever"].(string) != "1" {
t.Fatal(doc, err)
}
if err := db.Close(); err != nil {
t.Fatal(err)
}
if err := db2.Close(); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,246 @@
/* Document management and index maintenance. */
package db
import (
"encoding/json"
"fmt"
"github.com/HouzuoGuo/tiedot/tdlog"
"math/rand"
)
// GetIn resolves the attribute(s) found in the document structure along the
// given path and returns them as a flat list.
//
// doc must be a JSON object (map[string]interface{}); anything else yields
// nil. When an array is met before the path is exhausted, the remaining path
// is resolved inside every element and the results are concatenated. When the
// path ends at an array, its elements are returned. A missing attribute is
// returned as a single nil value.
func GetIn(doc interface{}, path []string) (ret []interface{}) {
	root, isObject := doc.(map[string]interface{})
	if !isObject {
		return
	}
	var current interface{} = root
	// Get into each path segment
	for depth, segment := range path {
		switch node := current.(type) {
		case map[string]interface{}:
			current = node[segment]
		case []interface{}:
			for _, item := range node {
				ret = append(ret, GetIn(item, path[depth:])...)
			}
			return ret
		default:
			// Neither object nor array: the path cannot be followed any further.
			return nil
		}
	}
	if list, isList := current.([]interface{}); isList {
		return append(ret, list...)
	}
	return append(ret, current)
}
// StrHash hashes a string using the sdbm algorithm and returns the absolute
// value of the result. The string is hashed rune by rune.
func StrHash(str string) int {
	hash := 0
	for _, r := range str {
		// 65599 == (1 << 6) + (1 << 16) - 1, the sdbm multiplier.
		hash = hash*65599 + int(r)
	}
	if hash >= 0 {
		return hash
	}
	return -hash
}
// indexDoc puts document id on every user-created index of the collection.
// For each index, every non-nil value found at the index path is hashed and
// stored in the hash table of the partition the hash maps to, under that
// table's lock.
func (col *Col) indexDoc(id int, doc map[string]interface{}) {
	for idxName, idxPath := range col.indexPaths {
		for _, idxVal := range GetIn(doc, idxPath) {
			if idxVal == nil {
				continue
			}
			hashKey := StrHash(fmt.Sprint(idxVal))
			ht := col.hts[hashKey%col.db.numParts][idxName]
			ht.Lock.Lock()
			ht.Put(hashKey, id)
			ht.Lock.Unlock()
		}
	}
}
// unindexDoc removes document id from every user-created index of the
// collection. For each index, every non-nil value found at the index path is
// hashed and removed from the hash table of the partition the hash maps to,
// under that table's lock.
func (col *Col) unindexDoc(id int, doc map[string]interface{}) {
	for idxName, idxPath := range col.indexPaths {
		for _, idxVal := range GetIn(doc, idxPath) {
			if idxVal == nil {
				continue
			}
			hashKey := StrHash(fmt.Sprint(idxVal))
			ht := col.hts[hashKey%col.db.numParts][idxName]
			ht.Lock.Lock()
			ht.Remove(hashKey, id)
			ht.Lock.Unlock()
		}
	}
}
// InsertRecovery inserts a document under the specified ID into the
// collection and puts it on all indexes. The partition is chosen from the ID.
// It does not place the partition or schema lock.
func (col *Col) InsertRecovery(id int, doc map[string]interface{}) (err error) {
	docJS, err := json.Marshal(doc)
	if err != nil {
		return
	}
	// Put document data into collection, and index it only when that succeeded
	if _, err = col.parts[id%col.db.numParts].Insert(id, docJS); err == nil {
		col.indexDoc(id, doc)
	}
	return
}
// Insert a document into the collection.
//
// The document is stored under a randomly generated ID, which also selects the
// partition, and is then put on all indexes. The schema read lock is held for
// the whole call; the partition lock is held while the data is written and
// released while the indexes are maintained. The new ID is returned.
func (col *Col) Insert(doc map[string]interface{}) (id int, err error) {
docJS, err := json.Marshal(doc)
if err != nil {
return
}
id = rand.Int()
partNum := id % col.db.numParts
col.db.schemaLock.RLock()
part := col.parts[partNum]
// Put document data into collection
part.Lock.Lock()
if _, err = part.Insert(id, []byte(docJS)); err != nil {
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return
}
// If another thread is updating the document in the meanwhile, let it take over index maintenance
// (the insert is still reported as successful in that case)
if err = part.LockUpdate(id); err != nil {
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return id, nil
}
part.Lock.Unlock()
// Index the document
col.indexDoc(id, doc)
// Release the update mark, which must be done under the partition lock
part.Lock.Lock()
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return
}
// read fetches document id from its partition (under the partition's read
// lock) and decodes it from JSON. The schema read lock is taken only when
// placeSchemaLock is true, and is then held until decoding has finished.
func (col *Col) read(id int, placeSchemaLock bool) (doc map[string]interface{}, err error) {
	if placeSchemaLock {
		col.db.schemaLock.RLock()
	}
	part := col.parts[id%col.db.numParts]
	part.Lock.RLock()
	raw, err := part.Read(id)
	part.Lock.RUnlock()
	if err == nil {
		err = json.Unmarshal(raw, &doc)
	}
	if placeSchemaLock {
		col.db.schemaLock.RUnlock()
	}
	return
}
// Read finds and retrieves a document by ID, holding the schema read lock for the duration of the call.
func (col *Col) Read(id int) (doc map[string]interface{}, err error) {
return col.read(id, true)
}
// Update a document.
//
// The stored content of document id is replaced by doc, and the indexes are
// brought up to date: the original content is taken off the indexes and the
// new content is put on them. doc may not be nil. The schema read lock is held
// for the whole call; the partition lock is held while the data is rewritten
// and released while the indexes are maintained. The document stays marked as
// being updated until index maintenance is done.
func (col *Col) Update(id int, doc map[string]interface{}) error {
if doc == nil {
return fmt.Errorf("Updating %d: input doc may not be nil", id)
}
docJS, err := json.Marshal(doc)
if err != nil {
return err
}
col.db.schemaLock.RLock()
part := col.parts[id%col.db.numParts]
part.Lock.Lock()
// Place lock, read back original document and update
if err := part.LockUpdate(id); err != nil {
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return err
}
originalB, err := part.Read(id)
if err != nil {
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return err
}
// An original that does not parse is not fatal: the update goes ahead, only the unindex step is skipped
var original map[string]interface{}
if err = json.Unmarshal(originalB, &original); err != nil {
tdlog.Noticef("Will not attempt to unindex document %d during update", id)
}
if err = part.Update(id, []byte(docJS)); err != nil {
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return err
}
// Done with the collection data, next is to maintain indexed values
part.Lock.Unlock()
if original != nil {
col.unindexDoc(id, original)
}
col.indexDoc(id, doc)
// Done with the document
part.Lock.Lock()
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return nil
}
// Delete a document.
//
// Document id is removed from its partition and its original content is taken
// off all indexes. The schema read lock is held for the whole call; the
// partition lock is held while the data is deleted and released while the
// indexes are maintained. The document stays marked as being updated until
// index maintenance is done.
func (col *Col) Delete(id int) error {
col.db.schemaLock.RLock()
part := col.parts[id%col.db.numParts]
part.Lock.Lock()
// Place lock, read back original document and delete document
if err := part.LockUpdate(id); err != nil {
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return err
}
originalB, err := part.Read(id)
if err != nil {
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return err
}
// An original that does not parse is not fatal: the delete goes ahead, only the unindex step is skipped
var original map[string]interface{}
if err = json.Unmarshal(originalB, &original); err != nil {
tdlog.Noticef("Will not attempt to unindex document %d during delete", id)
}
if err = part.Delete(id); err != nil {
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return err
}
// Done with the collection data, next is to remove indexed values
part.Lock.Unlock()
if original != nil {
col.unindexDoc(id, original)
}
// Release the update mark, which must be done under the partition lock
part.Lock.Lock()
part.UnlockUpdate(id)
part.Lock.Unlock()
col.db.schemaLock.RUnlock()
return nil
}

View File

@ -0,0 +1,279 @@
package db
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"time"
"github.com/HouzuoGuo/tiedot/dberr"
)
// StrHashTest checks StrHash against known hash values.
// Its name does not match the TestXxx pattern, so `go test` does not run it
// directly; TestStrHash below is the discoverable entry point.
func StrHashTest(t *testing.T) {
	// Named inputs rather than strings, so the imported strings package is not shadowed.
	inputs := []string{"", " ", "abc", "123"}
	hashes := []int{0, 32, 417419622498, 210861491250}
	for i := range inputs {
		if got := StrHash(inputs[i]); got != hashes[i] {
			t.Fatalf("Hash of %s equals to %d, it should equal to %d", inputs[i], got, hashes[i])
		}
	}
}

// TestStrHash makes the StrHash checks part of the regular test run.
func TestStrHash(t *testing.T) {
	StrHashTest(t)
}
// GetInTest checks GetIn against nested JSON objects, arrays, and objects
// contained in arrays.
// Its name does not match the TestXxx pattern, so `go test` does not run it
// directly; TestGetIn below is the discoverable entry point.
func GetInTest(t *testing.T) {
	var obj interface{}
	// Get inside a JSON object
	json.Unmarshal([]byte(`{"a": {"b": {"c": 1}}}`), &obj)
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 1 {
		t.Fatal()
	}
	// Get inside a JSON array
	json.Unmarshal([]byte(`{"a": {"b": {"c": [1, 2, 3]}}}`), &obj)
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 1 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[1].(float64); !ok || val != 2 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[2].(float64); !ok || val != 3 {
		t.Fatal()
	}
	// Get inside JSON objects contained in JSON array
	json.Unmarshal([]byte(`{"a": [{"b": {"c": [1]}}, {"b": {"c": [2, 3]}}]}`), &obj)
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 1 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[1].(float64); !ok || val != 2 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[2].(float64); !ok || val != 3 {
		t.Fatal()
	}
	// Get inside a JSON array and fetch attributes from array elements, which are JSON objects
	json.Unmarshal([]byte(`{"a": [{"b": {"c": [4]}}, {"b": {"c": [5, 6]}}], "d": [0, 9]}`), &obj)
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 4 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[1].(float64); !ok || val != 5 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[2].(float64); !ok || val != 6 {
		t.Fatal()
	}
	if len(GetIn(obj, []string{"a", "b", "c"})) != 3 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"d"})[0].(float64); !ok || val != 0 {
		t.Fatal()
	}
	if val, ok := GetIn(obj, []string{"d"})[1].(float64); !ok || val != 9 {
		t.Fatal()
	}
	if len(GetIn(obj, []string{"d"})) != 2 {
		t.Fatal()
	}
	// Another example
	json.Unmarshal([]byte(`{"a": {"b": [{"c": 2}]}, "d": 0}`), &obj)
	if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 2 {
		t.Fatal()
	}
	if len(GetIn(obj, []string{"a", "b", "c"})) != 1 {
		t.Fatal()
	}
}

// TestGetIn makes the GetIn checks part of the regular test run.
func TestGetIn(t *testing.T) {
	GetInTest(t)
}
// idxHas returns nil only when the hash table indexing path holds exactly one
// entry under the hash of idxVal and that entry is docID. Otherwise it returns
// an error describing what was found.
//
// The partition is chosen as hash modulo the database's partition count.
func idxHas(col *Col, path []string, idxVal interface{}, docID int) error {
	hashKey := StrHash(fmt.Sprint(idxVal))
	partNum := hashKey % col.db.numParts
	found := col.hts[partNum][strings.Join(path, INDEX_PATH_SEP)].Get(hashKey, 0)
	if len(found) == 1 && found[0] == docID {
		return nil
	}
	return fmt.Errorf("Looking for %v (%v) docID %v in %v partition %d, but got result %v", idxVal, hashKey, docID, path, partNum, found)
}
// idxHasNot returns an error if docID appears among the entries stored under
// the hash of idxVal in the hash table indexing path; otherwise it returns nil.
func idxHasNot(col *Col, path []string, idxVal, docID int) error {
	hashKey := StrHash(fmt.Sprint(idxVal))
	table := col.hts[hashKey%col.db.numParts][strings.Join(path, INDEX_PATH_SEP)]
	entries := table.Get(hashKey, 0)
	for i := range entries {
		if entries[i] != docID {
			continue
		}
		return fmt.Errorf("Looking for %v %v %v in %v (should not return any), but got result %v", idxVal, hashKey, docID, path, entries)
	}
	return nil
}
// TestDocCrudAndIdx walks a two-partition collection through the full document
// life cycle - insert, read, update, delete, re-index and scrub - and after
// every stage verifies both the stored documents and the hash index on a,b.
// It finishes by iterating the collection in full and page by page.
func TestDocCrudAndIdx(t *testing.T) {
// Start from an empty data directory configured for two partitions.
os.RemoveAll(TEST_DATA_DIR)
defer os.RemoveAll(TEST_DATA_DIR)
if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil {
t.Fatal(err)
}
db, err := OpenDB(TEST_DATA_DIR)
if err != nil {
t.Fatal(err)
}
// Prepare collection and index
if err = db.Create("col"); err != nil {
t.Fatal(err)
}
col := db.Use("col")
if err = col.Index([]string{"a", "b"}); err != nil {
t.Fatal(err)
}
numDocs := 2011
docIDs := make([]int, numDocs)
// Insert documents of the form {"a": {"b": i}}
for i := 0; i < numDocs; i++ {
if docIDs[i], err = col.Insert(map[string]interface{}{"a": map[string]interface{}{"b": i}}); err != nil {
t.Fatal(err)
}
}
// Read documents and verify index; reading an unknown ID must report ErrorNoDoc
if _, err = col.Read(123456); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
for i, docID := range docIDs {
if doc, err := col.Read(docID); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i) {
t.Fatal(docID, doc)
}
if err = idxHas(col, []string{"a", "b"}, i, docID); err != nil {
t.Fatal(err)
}
}
// Update document; updating an unknown ID must report ErrorNoDoc
if err = col.Update(654321, map[string]interface{}{}); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
for i, docID := range docIDs {
// i -> i * 2
if err = col.Update(docID, map[string]interface{}{"a": map[string]interface{}{"b": i * 2}}); err != nil {
t.Fatal(err)
}
}
// After update - verify documents and index
for i, docID := range docIDs {
if doc, err := col.Read(docID); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) {
t.Fatal(docID, doc)
}
if i == 0 {
// 0 * 2 == 0, so the first document keeps its original index entry.
if err = idxHas(col, []string{"a", "b"}, 0, docID); err != nil {
t.Fatal(err)
}
} else {
// The old value must no longer point at this document, the new value must.
if err = idxHasNot(col, []string{"a", "b"}, i, docID); err != nil {
t.Fatal(err)
}
if err = idxHas(col, []string{"a", "b"}, i*2, docID); err != nil {
t.Fatal(err)
}
}
}
// Delete half of those documents; deleting an unknown ID must report ErrorNoDoc
if err = col.Delete(654321); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
// Deletes the first numDocs/2+1 documents; a second delete of the same ID must fail.
for i := 0; i < numDocs/2+1; i++ {
if err := col.Delete(docIDs[i]); err != nil {
t.Fatal(err)
}
if err := col.Delete(docIDs[i]); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not error")
}
}
// After delete - verify
for i, docID := range docIDs {
if i < numDocs/2+1 {
// After delete - verify deleted documents and index
if _, err := col.Read(docID); dberr.Type(err) != dberr.ErrorNoDoc {
t.Fatal("Did not delete", i, docID)
}
if err = idxHasNot(col, []string{"a", "b"}, i*2, docID); err != nil {
t.Fatal(err)
}
} else {
// After delete - verify unaffected documents and index
if doc, err := col.Read(docID); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) {
t.Fatal(docID, doc)
}
if err = idxHas(col, []string{"a", "b"}, i*2, docID); err != nil {
t.Fatal(err)
}
}
}
// Recreate index and verify that the surviving documents are indexed again
if err = col.Unindex([]string{"a", "b"}); err != nil {
t.Fatal(err)
}
if err = col.Index([]string{"a", "b"}); err != nil {
t.Fatal(err)
}
for i := numDocs/2 + 1; i < numDocs; i++ {
if doc, err := col.Read(docIDs[i]); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) {
t.Fatal(doc, err)
}
if err = idxHas(col, []string{"a", "b"}, i*2, docIDs[i]); err != nil {
t.Fatal(err)
}
}
// Verify that there are approximately 1000 documents (the count is an estimate, so a wide band is accepted)
t.Log("ApproxDocCount", col.ApproxDocCount())
if col.ApproxDocCount() < 600 || col.ApproxDocCount() > 1400 {
t.Fatal("Approximate is way off", col.ApproxDocCount())
}
// Scrub and verify
if err = db.Scrub("col"); err != nil {
t.Fatal(err)
}
// Fetch the collection handle again after the scrub before reading from it.
col = db.Use("col")
for i := numDocs/2 + 1; i < numDocs; i++ {
if doc, err := col.Read(docIDs[i]); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) {
t.Fatal(doc, err)
}
if err = idxHas(col, []string{"a", "b"}, i*2, docIDs[i]); err != nil {
t.Fatal(err)
}
}
// Iterate over all documents 10 times and log how long it took
start := time.Now().UnixNano()
for i := 0; i < 10; i++ {
col.ForEachDoc(func(_ int, _ []byte) bool {
return true
})
}
timediff := time.Now().UnixNano() - start
t.Log("It took", timediff/1000000, "milliseconds")
// Verify again that there are approximately 1000 documents
t.Log("ApproxDocCount", col.ApproxDocCount())
if col.ApproxDocCount() < 600 || col.ApproxDocCount() > 1400 {
t.Fatal("Approximate is way off", col.ApproxDocCount())
}
// Read back all documents page by page
totalPage := col.ApproxDocCount() / 100
collectedIDs := make(map[int]struct{})
for page := 0; page < totalPage; page++ {
col.ForEachDocInPage(page, totalPage, func(id int, _ []byte) bool {
collectedIDs[id] = struct{}{}
return true
})
t.Log("Went through page ", page, " got ", len(collectedIDs), " documents so far")
}
// 2011 inserted minus 1006 deleted leaves 1005 documents, which equals numDocs/2.
if len(collectedIDs) != numDocs/2 {
t.Fatal("Wrong number of docs", len(collectedIDs))
}
if err = db.Close(); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,68 @@
package db
import (
"io/ioutil"
"os"
"strings"
"testing"
)
// TestIdxCRUD creates and removes indexes on a fresh two-partition collection
// and checks AllIndexes after every step. Indexing a path twice and removing
// an index that does not exist must both return an error.
func TestIdxCRUD(t *testing.T) {
	os.RemoveAll(TEST_DATA_DIR)
	defer os.RemoveAll(TEST_DATA_DIR)
	if mkErr := os.MkdirAll(TEST_DATA_DIR, 0700); mkErr != nil {
		t.Fatal(mkErr)
	}
	if wrErr := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); wrErr != nil {
		t.Fatal(wrErr)
	}
	db, err := OpenDB(TEST_DATA_DIR)
	if err != nil {
		t.Fatal(err)
	}
	if err = db.Create("col"); err != nil {
		t.Fatal(err)
	}
	col := db.Use("col")

	// onlyAB reports whether the collection's sole index is the one on a,b.
	onlyAB := func() bool {
		return len(col.AllIndexes()) == 1 && col.AllIndexes()[0][0] == "a" && col.AllIndexes()[0][1] == "b"
	}

	// A new collection starts without any index.
	if len(col.AllIndexes()) != 0 {
		t.Fatal(col.AllIndexes())
	}

	// Create index & verify
	if err = col.Index([]string{"a", "b"}); err != nil {
		t.Fatal(err)
	}
	// Indexing the same path a second time has to be rejected.
	if col.Index([]string{"a", "b"}) == nil {
		t.Fatal(col.indexPaths, "Did not error")
	}
	if !onlyAB() {
		t.Fatal(col.AllIndexes())
	}
	if err = col.Index([]string{"c"}); err != nil {
		t.Fatal(err)
	}
	// The two indexes may be listed in either order.
	allIndexes := col.AllIndexes()
	first := strings.Join(allIndexes[0], ",")
	second := strings.Join(allIndexes[1], ",")
	switch {
	case first == "a,b" && second == "c":
	case first == "c" && second == "a,b":
	default:
		t.Fatal(allIndexes)
	}

	// Unindex & verify
	if col.Unindex([]string{"%&^*"}) == nil {
		t.Fatal("Did not error")
	}
	if err = col.Unindex([]string{"c"}); err != nil {
		t.Fatal(err)
	}
	if !onlyAB() {
		t.Fatal(col.AllIndexes())
	}
	if err = col.Unindex([]string{"a", "b"}); err != nil {
		t.Fatal(err)
	}
	if len(col.AllIndexes()) != 0 {
		t.Fatal(col.AllIndexes())
	}
	if closeErr := db.Close(); closeErr != nil {
		t.Fatal(closeErr)
	}
}

View File

@ -0,0 +1,343 @@
/* Query processor. */
package db
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/HouzuoGuo/tiedot/dberr"
"github.com/HouzuoGuo/tiedot/tdlog"
)
// EvalUnion evaluates each sub-query in exprs against src and accumulates the
// matching document IDs in result, which yields the union of all sub-results.
// Evaluation stops at, and returns, the first error.
func EvalUnion(exprs []interface{}, src *Col, result *map[int]struct{}) (err error) {
	for i := range exprs {
		err = evalQuery(exprs[i], src, result, false)
		if err != nil {
			break
		}
	}
	return err
}
// EvalAllIDs adds the ID of every document visited by src.forEachDoc to
// result. It always returns nil.
func EvalAllIDs(src *Col, result *map[int]struct{}) (err error) {
	// Returning true asks the iteration to carry on with the next document.
	collect := func(id int, _ []byte) bool {
		(*result)[id] = struct{}{}
		return true
	}
	src.forEachDoc(collect, false)
	return nil
}
// Lookup performs a value equality check ("attribute == value") through the
// hash index on the path given in expr["in"].
//
// expr["in"] must be a JSON array naming the attribute path, and that path
// must already be indexed. expr["limit"], when present, must be a number and
// is passed to the hash table lookup. Every candidate returned by the hash
// table is read back and compared by its printed value, so that hash
// collisions do not end up in result. Candidates that cannot be read are
// skipped silently.
func Lookup(lookupValue interface{}, expr map[string]interface{}, src *Col, result *map[int]struct{}) (err error) {
	// Figure out lookup path - JSON array "in"
	path, hasPath := expr["in"]
	if !hasPath {
		return errors.New("Missing lookup path `in`")
	}
	pathElems, isVec := path.([]interface{})
	if !isVec {
		return errors.New(fmt.Sprintf("Expecting vector lookup path `in`, but %v given", path))
	}
	vecPath := make([]string, 0)
	for _, elem := range pathElems {
		vecPath = append(vecPath, fmt.Sprint(elem))
	}

	// Figure out result number limit
	intLimit := 0
	if limit, hasLimit := expr["limit"]; hasLimit {
		switch typed := limit.(type) {
		case float64:
			intLimit = int(typed)
		case int:
			intLimit = typed
		default:
			return dberr.New(dberr.ErrorExpectingInt, "limit", limit)
		}
	}

	wanted := fmt.Sprint(lookupValue) // the value to look for
	wantedHash := StrHash(wanted)
	scanPath := strings.Join(vecPath, INDEX_PATH_SEP)
	if _, indexed := src.indexPaths[scanPath]; !indexed {
		return dberr.New(dberr.ErrorNeedIndex, scanPath, expr)
	}

	// The partition holding the value is chosen by hash modulo partition count.
	ht := src.hts[wantedHash%src.db.numParts][scanPath]
	ht.Lock.RLock()
	candidates := ht.Get(wantedHash, intLimit)
	ht.Lock.RUnlock()

	for _, docID := range candidates {
		doc, readErr := src.read(docID, false)
		if readErr != nil {
			continue
		}
		// Filter result to avoid hash collision
		for _, v := range GetIn(doc, vecPath) {
			if fmt.Sprint(v) == wanted {
				(*result)[docID] = struct{}{}
			}
		}
	}
	return nil
}
// PathExistence collects the IDs of documents that have an entry in the hash
// index on the given path (value existence check).
//
// hasPath must be a JSON array naming an indexed attribute path.
// expr["limit"], when present, must be a number; collection stops as soon as
// that many IDs have been added. A limit of 0 never matches the running count
// and therefore means "no limit".
func PathExistence(hasPath interface{}, expr map[string]interface{}, src *Col, result *map[int]struct{}) (err error) {
	// Figure out the path
	pathElems, isVec := hasPath.([]interface{})
	if !isVec {
		return errors.New(fmt.Sprintf("Expecting vector path, but %v given", hasPath))
	}
	vecPath := make([]string, 0)
	for _, elem := range pathElems {
		vecPath = append(vecPath, fmt.Sprint(elem))
	}

	// Figure out result number limit
	intLimit := 0
	if limit, hasLimit := expr["limit"]; hasLimit {
		switch typed := limit.(type) {
		case float64:
			intLimit = int(typed)
		case int:
			intLimit = typed
		default:
			return dberr.New(dberr.ErrorExpectingInt, "limit", limit)
		}
	}

	jointPath := strings.Join(vecPath, INDEX_PATH_SEP)
	if _, indexed := src.indexPaths[jointPath]; !indexed {
		return dberr.New(dberr.ErrorNeedIndex, vecPath, expr)
	}

	// Walk each hash table in slices so that roughly 4k document IDs are
	// collected per iteration.
	slices := src.approxDocCount(false) / src.db.numParts / 4000
	if slices == 0 {
		slices = 1
	}
	collected := 0
	for partNum := 0; partNum < src.db.numParts; partNum++ {
		ht := src.hts[partNum][jointPath]
		ht.Lock.RLock()
		for slice := 0; slice < slices; slice++ {
			_, ids := ht.GetPartition(slice, slices)
			for _, id := range ids {
				(*result)[id] = struct{}{}
				collected++
				if collected == intLimit {
					// Release the read lock before leaving early.
					ht.Lock.RUnlock()
					return nil
				}
			}
		}
		ht.Lock.RUnlock()
	}
	return nil
}
// Intersect evaluates every sub-query in subExprs and adds to result the
// document IDs that are present in all of the sub-results.
//
// subExprs must be a JSON array of sub-queries, otherwise an
// ErrorExpectingSubQuery error is returned. The first failing sub-query aborts
// the evaluation and leaves result untouched.
func Intersect(subExprs interface{}, src *Col, result *map[int]struct{}) (err error) {
	subExprVecs, isVec := subExprs.([]interface{})
	if !isVec {
		return dberr.New(dberr.ErrorExpectingSubQuery, subExprs)
	}
	var common map[int]struct{}
	for idx, subExpr := range subExprVecs {
		subResult := make(map[int]struct{})
		if err = evalQuery(subExpr, src, &subResult, false); err != nil {
			return err
		}
		if idx == 0 {
			// The first sub-result seeds the intersection.
			common = subResult
			continue
		}
		narrowed := make(map[int]struct{})
		for id := range subResult {
			if _, inBoth := common[id]; inBoth {
				narrowed[id] = struct{}{}
			}
		}
		common = narrowed
	}
	for docID := range common {
		(*result)[docID] = struct{}{}
	}
	return nil
}
// Complement folds the sub-query results with a symmetric difference: after
// each sub-query the running set keeps the IDs found in exactly one of
// "running set so far" and "this sub-result". The final set is added to
// result.
//
// subExprs must be a JSON array of sub-queries, otherwise an
// ErrorExpectingSubQuery error is returned. The first failing sub-query aborts
// the evaluation and leaves result untouched.
func Complement(subExprs interface{}, src *Col, result *map[int]struct{}) (err error) {
	subExprVecs, isVec := subExprs.([]interface{})
	if !isVec {
		return dberr.New(dberr.ErrorExpectingSubQuery, subExprs)
	}
	running := make(map[int]struct{})
	for _, subExpr := range subExprVecs {
		subResult := make(map[int]struct{})
		if err = evalQuery(subExpr, src, &subResult, false); err != nil {
			return err
		}
		next := make(map[int]struct{})
		// IDs only in the new sub-result ...
		for id := range subResult {
			if _, inBoth := running[id]; !inBoth {
				next[id] = struct{}{}
			}
		}
		// ... plus IDs only in the running set.
		for id := range running {
			if _, inBoth := subResult[id]; !inBoth {
				next[id] = struct{}{}
			}
		}
		running = next
	}
	for docID := range running {
		(*result)[docID] = struct{}{}
	}
	return nil
}
// hashScan looks key up in the hash table of index idxName, in the partition
// selected by key modulo the partition count, while holding that table's read
// lock. limit is handed to the hash table lookup unchanged.
func (col *Col) hashScan(idxName string, key, limit int) []int {
	partNum := key % col.db.numParts
	table := col.hts[partNum][idxName]
	table.Lock.RLock()
	matches := table.Get(key, limit)
	table.Lock.RUnlock()
	return matches
}
// IntRange looks for indexed integer values within the specified integer
// range, bounds included, and adds the matching document IDs to result.
//
// intFrom is the start of the range. The end is read from expr["int-to"] or,
// failing that, expr["int to"]. Both must be numbers. expr["in"] must be a
// JSON array naming an indexed attribute path. expr["limit"], when present and
// greater than zero, caps the number of IDs collected.
//
// The range is walked one integer at a time from intFrom towards the end
// value, ascending or descending as needed, with one hash lookup per integer.
// Ranges spanning more than 1000 values are therefore logged as inefficient.
func IntRange(intFrom interface{}, expr map[string]interface{}, src *Col, result *map[int]struct{}) (err error) {
	path, hasPath := expr["in"]
	if !hasPath {
		return errors.New("Missing path `in`")
	}
	// Figure out the path
	vecPathInterface, isVec := path.([]interface{})
	if !isVec {
		return fmt.Errorf("Expecting vector path `in`, but %v given", path)
	}
	vecPath := make([]string, 0, len(vecPathInterface))
	for _, v := range vecPathInterface {
		vecPath = append(vecPath, fmt.Sprint(v))
	}

	// asInt accepts the two numeric representations a query value may carry:
	// float64 (decoded JSON) and int (built programmatically).
	asInt := func(v interface{}) (int, bool) {
		switch n := v.(type) {
		case float64:
			return int(n), true
		case int:
			return n, true
		}
		return 0, false
	}

	// Figure out result number limit
	intLimit := 0
	if limit, hasLimit := expr["limit"]; hasLimit {
		var ok bool
		if intLimit, ok = asInt(limit); !ok {
			return dberr.New(dberr.ErrorExpectingInt, "limit", limit)
		}
	}

	// Figure out the range ("from" value & "to" value). Errors report the
	// value that was actually supplied.
	from, fromOK := asInt(intFrom)
	if !fromOK {
		return dberr.New(dberr.ErrorExpectingInt, "int-from", intFrom)
	}
	toKey := "int-to"
	rawTo, hasTo := expr[toKey]
	if !hasTo {
		// Same operand, spelled without the dash.
		toKey = "int to"
		if rawTo, hasTo = expr[toKey]; !hasTo {
			return dberr.New(dberr.ErrorMissing, "int-to")
		}
	}
	to, toOK := asInt(rawTo)
	if !toOK {
		return dberr.New(dberr.ErrorExpectingInt, toKey, rawTo)
	}
	if to > from && to-from > 1000 || from > to && from-to > 1000 {
		tdlog.CritNoRepeat("Query %v involves index lookup on more than 1000 values, which can be very inefficient", expr)
	}

	// Index names are the path elements joined with INDEX_PATH_SEP, the same
	// key the other index-backed operations use.
	htPath := strings.Join(vecPath, INDEX_PATH_SEP)
	if _, indexScan := src.indexPaths[htPath]; !indexScan {
		return dberr.New(dberr.ErrorNeedIndex, vecPath, expr)
	}

	// Forward scan when from < to, backward scan otherwise.
	step := 1
	if from >= to {
		step = -1
	}
	counter := 0 // Number of results already collected
	for lookupValue := from; ; lookupValue += step {
		hashValue := StrHash(fmt.Sprint(lookupValue))
		for _, docID := range src.hashScan(htPath, hashValue, intLimit) {
			(*result)[docID] = struct{}{}
			counter++
			if intLimit > 0 && counter == intLimit {
				// Limit reached - no point in probing the rest of the range.
				return nil
			}
		}
		// Comparing for equality (rather than <= / >=) keeps the loop finite
		// even when the end value is the largest or smallest int.
		if lookupValue == to {
			break
		}
	}
	return nil
}
// evalQuery evaluates query q against src and adds the matching document IDs
// to result (as map keys).
//
// When placeSchemaLock is true the database schema read lock is held for the
// duration of the evaluation; nested evaluations pass false.
//
// Supported query shapes:
//   - a JSON array: union of the sub-queries it contains
//   - the string "all": every document ID
//   - any other string: a single document ID, which must parse as an integer
//   - a JSON object holding one of "eq", "has", "n", "c", "int-from" or
//     "int from", checked in that order
//
// A value of any other type yields no results and a nil error.
func evalQuery(q interface{}, src *Col, result *map[int]struct{}, placeSchemaLock bool) (err error) {
	if placeSchemaLock {
		src.db.schemaLock.RLock()
		defer src.db.schemaLock.RUnlock()
	}
	switch expr := q.(type) {
	case []interface{}: // [sub query 1, sub query 2, etc]
		return EvalUnion(expr, src, result)
	case string:
		if expr == "all" {
			return EvalAllIDs(src, result)
		}
		// Might be single document number
		docID, parseErr := strconv.ParseInt(expr, 10, 64)
		if parseErr != nil {
			// Report the text that was given, not the zero produced by the failed parse.
			return dberr.New(dberr.ErrorExpectingInt, "Single Document ID", expr)
		}
		(*result)[int(docID)] = struct{}{}
	case map[string]interface{}:
		if lookupValue, lookup := expr["eq"]; lookup { // eq - lookup
			return Lookup(lookupValue, expr, src, result)
		}
		if hasPath, exist := expr["has"]; exist { // has - path existence test
			return PathExistence(hasPath, expr, src, result)
		}
		if subExprs, intersect := expr["n"]; intersect { // n - intersection
			return Intersect(subExprs, src, result)
		}
		if subExprs, complement := expr["c"]; complement { // c - complement
			return Complement(subExprs, src, result)
		}
		if intFrom, htRange := expr["int-from"]; htRange { // int-from, int-to - integer range query
			return IntRange(intFrom, expr, src, result)
		}
		if intFrom, htRange := expr["int from"]; htRange { // "int from", "int to" - same as above, just without dash
			return IntRange(intFrom, expr, src, result)
		}
		return fmt.Errorf("Query %v does not contain any operation (lookup/union/etc)", expr)
	}
	return nil
}
// EvalQuery is the main entrance to the query processor: it evaluates q
// against src while holding the schema read lock and puts the matching
// document IDs into result (as map keys).
func EvalQuery(q interface{}, src *Col, result *map[int]struct{}) (err error) {
	err = evalQuery(q, src, result, true)
	return err
}
// TODO: How to bring back regex matcher?
// TODO: How to bring back JSON parameterized query?

View File

@ -0,0 +1,279 @@
package db
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/HouzuoGuo/tiedot/dberr"
)
// ensureMapHasKeys reports whether m has as many entries as there are keys and
// contains every one of them. keys are not de-duplicated before the size
// comparison.
func ensureMapHasKeys(m map[int]struct{}, keys ...int) bool {
	if len(keys) != len(m) {
		return false
	}
	for i := range keys {
		if _, present := m[keys[i]]; !present {
			return false
		}
	}
	return true
}
// runQuery parses query as JSON, evaluates it against col and returns the set
// of matching document IDs.
//
// A query that is not valid JSON is reported as an error together with an
// empty result, rather than being evaluated as an empty query.
func runQuery(query string, col *Col) (map[int]struct{}, error) {
	result := make(map[int]struct{})
	var jq interface{}
	if err := json.Unmarshal([]byte(query), &jq); err != nil {
		return result, fmt.Errorf("cannot parse query %s: %v", query, err)
	}
	err := EvalQuery(jq, col, &result)
	return result, err
}
// TestQuery inserts a fixed set of documents into a two-partition collection
// and runs every query operation against it: document ID expansion, hash
// lookup, path existence, integer range, "all", union, intersection and
// complement, including their error cases.
//
// Every query and index creation that is expected to succeed fails the test
// when it returns an error.
func TestQuery(t *testing.T) {
	os.RemoveAll(TEST_DATA_DIR)
	defer os.RemoveAll(TEST_DATA_DIR)
	if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil {
		t.Fatal(err)
	}
	if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil {
		t.Fatal(err)
	}
	db, err := OpenDB(TEST_DATA_DIR)
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()
	// Prepare collection
	if err = db.Create("col"); err != nil {
		t.Fatal(err)
	}
	col := db.Use("col")
	docs := []string{
		`{"a": {"b": [1]}, "c": 1, "d": 1, "f": 1, "g": 1, "special": {"thing": null}, "h": 1}`,
		`{"a": {"b": 1}, "c": [1], "d": 2, "f": 2, "g": 2}`,
		`{"a": [{"b": [2]}], "c": 2, "d": 1, "f": 3, "g": 3, "h": 3}`,
		`{"a": {"b": 3}, "c": [3], "d": 2, "f": 4, "g": 4}`,
		`{"a": {"b": [4]}, "c": 4, "d": 1, "f": 5, "g": 5}`,
		`{"a": [{"b": 5}, {"b": 6}], "c": 4, "d": 1, "f": 5, "g": 5, "h": 2}`,
		`{"a": [{"b": "val1"}, {"b": "val2"}]}`,
		`{"a": [{"b": "val3"}, {"b": ["val4", "val5"]}]}`}
	ids := make([]int, len(docs))
	for i, doc := range docs {
		var jsonDoc map[string]interface{}
		if err := json.Unmarshal([]byte(doc), &jsonDoc); err != nil {
			t.Fatal(err)
		}
		if ids[i], err = col.Insert(jsonDoc); err != nil {
			t.Fatal(err)
		}
	}

	// mustQuery runs a query that is expected to succeed.
	mustQuery := func(query string) map[int]struct{} {
		res, queryErr := runQuery(query, col)
		if queryErr != nil {
			t.Fatal(query, queryErr)
		}
		return res
	}
	// expectKeys runs a query and requires the result to be exactly keys.
	expectKeys := func(query string, keys ...int) {
		if got := mustQuery(query); !ensureMapHasKeys(got, keys...) {
			t.Fatal(query, got, keys)
		}
	}
	// mustIndex creates an index that is expected not to exist yet.
	mustIndex := func(path ...string) {
		if idxErr := col.Index(path); idxErr != nil {
			t.Fatal(path, idxErr)
		}
	}

	// "all" inside a union, before any index exists
	expectKeys(`["all"]`, ids...)
	mustIndex("a", "b")
	mustIndex("f")
	mustIndex("h")
	mustIndex("special")
	mustIndex("e")
	// expand numbers
	expectKeys(`["1", "2", ["3", "4"], "5"]`, 1, 2, 3, 4, 5)
	// hash scan
	expectKeys(`{"eq": 1, "in": ["a", "b"]}`, ids[0], ids[1])
	expectKeys(`{"eq": 5, "in": ["a", "b"]}`, ids[5])
	expectKeys(`{"eq": 6, "in": ["a", "b"]}`, ids[5])
	q := mustQuery(`{"eq": 1, "limit": 1, "in": ["a", "b"]}`)
	if !ensureMapHasKeys(q, ids[1]) && !ensureMapHasKeys(q, ids[0]) {
		t.Fatal(q, ids[1], ids[0])
	}
	// collection scan
	if _, err = runQuery(`{"eq": 1, "in": ["c"]}`, col); dberr.Type(err) != dberr.ErrorNeedIndex {
		t.Fatal("Collection scan should not happen")
	}
	// lookup on "special" (null)
	expectKeys(`{"eq": {"thing": null}, "in": ["special"]}`, ids[0])
	// lookup in list
	expectKeys(`{"eq": "val1", "in": ["a", "b"]}`, ids[6])
	expectKeys(`{"eq": "val5", "in": ["a", "b"]}`, ids[7])
	// "e" should not exist
	expectKeys(`{"has": ["e"]}`)
	// existence test, hash scan, with limit - any two of the three documents having "h"
	q = mustQuery(`{"has": ["h"], "limit": 2}`)
	if !ensureMapHasKeys(q, ids[0], ids[2]) && !ensureMapHasKeys(q, ids[2], ids[5]) && !ensureMapHasKeys(q, ids[5], ids[0]) {
		t.Fatal(q, ids[0], ids[2], ids[5])
	}
	// existence test with incorrect input
	if _, err = runQuery(`{"has": ["c"], "limit": "a"}`, col); dberr.Type(err) != dberr.ErrorExpectingInt {
		t.Fatal(err)
	}
	// existence test, collection scan & PK
	if _, err = runQuery(`{"has": ["c"], "limit": 2}`, col); dberr.Type(err) != dberr.ErrorNeedIndex {
		t.Fatal("Existence test should return error")
	}
	if _, err = runQuery(`{"has": ["@id"], "limit": 2}`, col); dberr.Type(err) != dberr.ErrorNeedIndex {
		t.Fatal("Existence test should return error")
	}
	// int range scan with incorrect input
	if _, err = runQuery(`{"int-from": "a", "int-to": 4, "in": ["f"], "limit": 1}`, col); dberr.Type(err) != dberr.ErrorExpectingInt {
		t.Fatal(err)
	}
	if _, err = runQuery(`{"int-from": 1, "int-to": "a", "in": ["f"], "limit": 1}`, col); dberr.Type(err) != dberr.ErrorExpectingInt {
		t.Fatal(err)
	}
	if _, err = runQuery(`{"int-from": 1, "int-to": 2, "in": ["f"], "limit": "a"}`, col); dberr.Type(err) != dberr.ErrorExpectingInt {
		t.Fatal(err)
	}
	// int range scan
	expectKeys(`{"int-from": 2, "int-to": 4, "in": ["f"]}`, ids[1], ids[2], ids[3])
	expectKeys(`{"int-from": 2, "int-to": 4, "in": ["f"], "limit": 2}`, ids[1], ids[2])
	// int hash scan using reversed range and limit
	expectKeys(`{"int-from": 10, "int-to": 0, "in": ["f"]}`, ids[5], ids[4], ids[3], ids[2], ids[1], ids[0])
	expectKeys(`{"int-from": 10, "int-to": 0, "in": ["f"], "limit": 2}`, ids[5], ids[4])
	// all documents
	expectKeys(`"all"`, ids...)
	// union
	mustIndex("c")
	q = mustQuery(`[{"eq": 4, "limit": 1, "in": ["a", "b"]}, {"eq": 1, "limit": 1, "in": ["c"]}]`)
	if !ensureMapHasKeys(q, ids[0], ids[4]) && !ensureMapHasKeys(q, ids[1], ids[4]) {
		t.Fatal(q)
	}
	// intersection
	mustIndex("d")
	expectKeys(`{"n": [{"eq": 2, "in": ["d"]}, "all"]}`, ids[1], ids[3])
	// intersection with incorrect input
	if _, err = runQuery(`{"n": null}`, col); dberr.Type(err) != dberr.ErrorExpectingSubQuery {
		t.Fatal(err)
	}
	// complement
	expectKeys(`{"c": [{"eq": 4, "in": ["c"]}, {"eq": 2, "in": ["d"]}, "all"]}`, ids[0], ids[2], ids[6], ids[7])
	// complement with incorrect input
	if _, err = runQuery(`{"c": null}`, col); dberr.Type(err) != dberr.ErrorExpectingSubQuery {
		t.Fatal(err)
	}
	// union of intersection
	expectKeys(`[{"n": [{"eq": 3, "in": ["c"]}]}, {"n": [{"eq": 2, "in": ["c"]}]}]`, ids[2], ids[3])
	// union of complement
	expectKeys(`[{"c": [{"eq": 3, "in": ["c"]}]}, {"c": [{"eq": 2, "in": ["c"]}]}]`, ids[2], ids[3])
	// union of complement of intersection
	expectKeys(`[{"c": [{"n": [{"eq": 1, "in": ["d"]},{"eq": 1, "in": ["c"]}]},{"eq": 1, "in": ["d"]}]},{"eq": 2, "in": ["c"]}]`, ids[2], ids[4], ids[5])
}

View File

@ -0,0 +1,48 @@
package dberr
import "fmt"
// errorType identifies a category of error. Its string value doubles as the
// fmt-style template that Error.Error fills in with the error's details.
type errorType string
// Error categories. The verbs in each template are filled from the details
// passed to New, in order.
const (
// ErrorNil is the category reported by Type for a nil error.
ErrorNil errorType = ""
// ErrorUndefined is the category reported by Type for errors that are not of type Error.
ErrorUndefined errorType = "Unknown Error."
// IO error
ErrorIO errorType = "IO error has occured, see log for more details."
ErrorNoDoc errorType = "Document `%d` does not exist"
// Document errors
ErrorDocTooLarge errorType = "Document is too large. Max: `%d`, Given: `%d`"
ErrorDocLocked errorType = "Document `%d` is locked for update - try again later"
// Query input errors
ErrorNeedIndex errorType = "Please index %v and retry query %v."
ErrorExpectingSubQuery errorType = "Expecting a vector of sub-queries, but %v given."
ErrorExpectingInt errorType = "Expecting `%s` as an integer, but %v given."
ErrorMissing errorType = "Missing `%s`"
)
// New builds an Error of category err. details supply the values for the
// verbs in err's message template, in order.
func New(err errorType, details ...interface{}) Error {
	return Error{err: err, details: details}
}
// Error is an error that carries its category (err) together with the values
// (details) used to render that category's message template.
type Error struct {
err errorType // category, also the message template
details []interface{} // arguments for the template's verbs
}
// Error renders the message by using the category as a format string and the
// details as its arguments.
func (e Error) Error() string {
	template := string(e.err)
	return fmt.Sprintf(template, e.details...)
}
// Type returns the category of e: ErrorNil for a nil error, the stored
// category for a value of type Error, and ErrorUndefined for anything else.
func Type(e error) errorType {
	switch typed := e.(type) {
	case nil:
		return ErrorNil
	case Error:
		return typed.err
	default:
		return ErrorUndefined
	}
}

View File

@ -0,0 +1,24 @@
Copyright (c) 2011, Evan Shaw <edsrzf@gmail.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,4 @@
This work is based on [mmap-go][] (BSD-style license) written by [Evan Shaw][].
[mmap-go]: https://github.com/edsrzf/mmap-go
[Evan Shaw]: https://github.com/edsrzf/

View File

@ -0,0 +1,58 @@
// Copyright 2011 Evan Shaw. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This file defines the common package interface and contains a little bit of
// factored out logic.
// Package gommap allows mapping files into memory. It tries to provide a simple, reasonably portable interface,
// but doesn't go out of its way to abstract away every little platform detail.
// This specifically means:
// * forked processes may or may not inherit mappings
// * a file's timestamp may or may not be updated by writes through mappings
// * specifying a size larger than the file's actual size can increase the file's size
// * If the mapped file is being modified by another process while your program's running, don't expect consistent results between platforms
package gommap
import (
"errors"
"os"
"reflect"
"unsafe"
)
// MMap represents a file mapped into memory. It is a plain byte slice, so the
// mapped contents can be read and written with ordinary slice operations.
type MMap []byte
// Map maps an entire file into memory, using the file's current size as the
// length of the mapping.
// Note that because of runtime limitations, no file larger than about 2GB can
// be completely mapped into memory. A file whose size does not fit in an int
// is rejected with an error.
func Map(f *os.File) (MMap, error) {
	fd := uintptr(f.Fd())
	info, err := f.Stat()
	if err != nil {
		return nil, err
	}
	size := info.Size()
	length := int(size)
	// Converting back and comparing detects truncation on platforms where int is narrower than int64.
	if int64(length) != size {
		return nil, errors.New("memory map file length overflow")
	}
	return mmap(length, fd)
}
// header reinterprets m as a *reflect.SliceHeader, giving direct access to the
// slice's Data, Len and Cap fields.
func (m *MMap) header() *reflect.SliceHeader {
return (*reflect.SliceHeader)(unsafe.Pointer(m))
}
// Unmap deletes the memory mapped region, flushes any remaining changes, and sets
// m to nil, whether or not the unmapping succeeded.
// Trying to read or write any remaining references to m after Unmap is called will
// result in undefined behavior.
// Unmap should only be called on the slice value that was originally returned from
// a call to Map. Calling Unmap on a derived slice may cause errors.
func (m *MMap) Unmap() error {
	hdr := m.header()
	// Runs after the unmap call below has produced the return value.
	defer func() { *m = nil }()
	return unmap(hdr.Data, uintptr(hdr.Len))
}

View File

@ -0,0 +1,23 @@
// Copyright 2011 Evan Shaw. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin freebsd linux netbsd openbsd
package gommap
import (
"syscall"
)
// mmap maps len bytes of the file behind descriptor fd, starting at offset 0,
// as a shared mapping that is both readable and writable.
func mmap(len int, fd uintptr) ([]byte, error) {
return syscall.Mmap(int(fd), 0, len, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
}
// unmap removes the mapping of len bytes starting at addr by issuing the
// munmap system call directly. It returns the call's errno as an error, or nil
// on success.
func unmap(addr, len uintptr) error {
	if _, _, errno := syscall.Syscall(syscall.SYS_MUNMAP, addr, len, 0); errno != 0 {
		return syscall.Errno(errno)
	}
	return nil
}

View File

@ -0,0 +1,59 @@
// Copyright 2011 Evan Shaw. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gommap
import (
"os"
"sync"
"syscall"
)
// mmap on Windows is a two-step process.
// First, we call CreateFileMapping to get a handle.
// Then, we call MapViewOfFile to get an actual pointer into memory.
// Because we want to emulate a POSIX-style mmap, we don't want to expose
// the handle -- only the pointer. We also want to return only a byte slice,
// not a struct, so it's convenient to manipulate.
// We keep this map so that we can get back the original handle from the memory address.
// handleLock guards handleMap.
var handleLock sync.Mutex
// handleMap associates the start address of each mapped view with the
// file-mapping handle it was created from.
var handleMap = map[uintptr]syscall.Handle{}
// mmap maps the file behind handle hfile into memory and returns the view as a
// byte slice of the given length.
//
// Windows mmap always maps the entire file regardless of the specified length;
// length only determines the length and capacity of the returned slice.
// The file-mapping handle is remembered in handleMap, keyed by the view's
// address, so that unmap can close it later.
func mmap(length int, hfile uintptr) ([]byte, error) {
	h, errno := syscall.CreateFileMapping(syscall.Handle(hfile), nil, syscall.PAGE_READWRITE, 0, 0, nil)
	if h == 0 {
		return nil, os.NewSyscallError("CreateFileMapping", errno)
	}
	addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_WRITE, 0, 0, 0)
	if addr == 0 {
		// Without a view nobody will ever look this handle up again, so close
		// it here instead of leaking it. The mapping error is the one reported.
		syscall.CloseHandle(h)
		return nil, os.NewSyscallError("MapViewOfFile", errno)
	}
	handleLock.Lock()
	handleMap[addr] = h
	handleLock.Unlock()
	// Build the slice by hand around the address returned by the OS.
	m := MMap{}
	dh := m.header()
	dh.Data = addr
	dh.Len = length
	dh.Cap = length
	return m, nil
}
// unmap removes the view that starts at addr and then closes the file-mapping
// handle that mmap recorded for that address, dropping the handleMap entry.
// The len argument is not used on Windows. A failure to unmap is returned
// as is; a failure to close the handle is wrapped as a "CloseHandle" syscall
// error.
func unmap(addr, len uintptr) error {
	err := syscall.UnmapViewOfFile(addr)
	if err != nil {
		return err
	}

	handleLock.Lock()
	h := handleMap[addr]
	delete(handleMap, addr)
	closeErr := syscall.CloseHandle(h)
	handleLock.Unlock()

	// os.NewSyscallError yields nil when closeErr is nil.
	return os.NewSyscallError("CloseHandle", closeErr)
}

View File

@ -0,0 +1,54 @@
package tdlog
import (
"fmt"
"log"
"sync"
)
// VerboseLog controls whether INFO log messages (Info, Infof) are generated.
// It is off by default.
var VerboseLog bool
// Infof (LVL 6) writes a printf-style formatted message to the standard
// logger, but only while VerboseLog is true; otherwise it does nothing.
func Infof(template string, params ...interface{}) {
	if !VerboseLog {
		return
	}
	log.Printf(template, params...)
}
// Info (LVL 6) writes its arguments to the standard logger in the manner of
// log.Print, but only while VerboseLog is true; otherwise it does nothing.
func Info(params ...interface{}) {
	if !VerboseLog {
		return
	}
	log.Print(params...)
}
// Noticef (LVL 5) writes a printf-style formatted message to the standard
// logger. Unlike Infof, the message is always emitted: no verbosity flag is
// consulted.
func Noticef(template string, params ...interface{}) {
	log.Printf(template, params...)
}
// Notice (LVL 5) writes its arguments to the standard logger in the manner of
// log.Print. The message is always emitted: no verbosity flag is consulted.
func Notice(params ...interface{}) {
	log.Print(params...)
}
var (
	// critHistory is the set of distinct messages CritNoRepeat has logged
	// since the history was last reset.
	critHistory = make(map[string]struct{})
	// critLock serialises access to critHistory.
	critLock = new(sync.Mutex)
)

// CritNoRepeat (LVL 2) formats a printf-style message and logs it unless the
// very same text is already present in the history of logged messages.
//
// The history is discarded entirely as soon as it holds more than 100
// distinct messages, so a message may be logged again after such a reset.
func CritNoRepeat(template string, params ...interface{}) {
	msg := fmt.Sprintf(template, params...)

	critLock.Lock()
	// Deferred so the lock is released even when logging panics (for example
	// because the log output writer panics); a manual unlock at the end
	// would leave the mutex held and deadlock every later call.
	defer critLock.Unlock()

	if _, seen := critHistory[msg]; !seen {
		log.Print(msg)
		critHistory[msg] = struct{}{}
	}
	if len(critHistory) > 100 {
		critHistory = make(map[string]struct{})
	}
}
// Panicf (LVL 1) hands a printf-style message to log.Panicf, which logs the
// formatted message via the standard logger and then panics with it. Control
// therefore never returns normally to the caller.
func Panicf(template string, params ...interface{}) {
	log.Panicf(template, params...)
}

View File

@ -0,0 +1,23 @@
package tdlog
import (
"testing"
)
// TestAllLogLevels calls every logging helper once, checks that CritNoRepeat
// records the formatted message in critHistory, and verifies that Panicf
// really panics.
func TestAllLogLevels(t *testing.T) {
	// The final Panicf call is expected to panic; reaching this handler
	// without a panic value means it did not.
	defer func() {
		if r := recover(); r == nil {
			t.Fatal("Did not catch Panicf")
		}
	}()

	Infof("a %s %s", "b", "c")
	Info("a", "b", "c")
	Noticef("a %s %s", "b", "c")
	Notice("a", "b", "c")

	CritNoRepeat("a %s %s", "b", "c")
	_, recorded := critHistory["a b c"]
	if !recorded {
		t.Fatal("did not record history")
	}

	Panicf("a %s %s", "b", "c")
	t.Fatal("Cannot reach here")
}

View File

@ -0,0 +1,51 @@
package database
import (
"github.com/HouzuoGuo/tiedot/db"
)
// NewDatabase returns a pointer to a zero-valued Database. No directory is
// set and no database is opened yet; call GetDBHandle to do that.
func NewDatabase() *Database {
	return new(Database)
}
// setdbdir records dirname as the database directory in data.DBdir. It only
// stores the value; nothing is opened or created here.
func (data *Database) setdbdir(dirname string) {
	data.DBdir = dirname
}
// GetDBHandle remembers dirname as the database directory, opens it with
// db.OpenDB and stores the result in data.DBhandle. Despite its name it does
// not return the handle: the only return value is the error from db.OpenDB,
// nil on success.
func (data *Database) GetDBHandle(dirname string) error {
	data.setdbdir(dirname)
	handle, err := db.OpenDB(data.DBdir)
	// The handle is stored whether or not opening succeeded, so callers
	// must check the returned error before using data.DBhandle.
	data.DBhandle = handle
	return err
}
// InitCollection asks the open database handle to create one collection per
// name in colls, in the order given. Calling it without names does nothing.
//
// NOTE(review): whatever Create returns is not inspected, so a failure to
// create a collection goes unnoticed — confirm this best-effort behaviour is
// intended.
func (data *Database) InitCollection(colls ...string) {
	for i := range colls {
		data.DBhandle.Create(colls[i])
	}
}
// GetCollections returns the collection names reported by the database
// handle's AllCols, copied into a fresh slice. The result is nil when there
// are no collections.
func (data *Database) GetCollections() []string {
	names := data.DBhandle.AllCols()
	// Appending to a nil slice copies the names and stays nil when there
	// is nothing to copy.
	return append([]string(nil), names...)
}
// getCollectionHandle looks up the collection named coll on the open database
// handle and returns whatever DBhandle.Use yields for it.
func (data *Database) getCollectionHandle(coll string) *db.Col {
	handle := data.DBhandle.Use(coll)
	return handle
}
// collectionNotFoundError is returned when an operation names a collection
// for which the database yields no handle. Its value is the collection name.
type collectionNotFoundError string

// Error implements the error interface.
func (e collectionNotFoundError) Error() string {
	return "collection " + string(e) + " does not exist"
}

// GetCollectionData reads the document with ID docid from the collection
// named coll and returns its content.
//
// An error is returned when the collection does not exist, or when the
// underlying Read fails.
func (data *Database) GetCollectionData(coll string, docid int) (map[string]interface{}, error) {
	collHandle := data.getCollectionHandle(coll)
	if collHandle == nil {
		// The handle lookup yields nil for an unknown collection; calling
		// Read on it would dereference nil and panic.
		return nil, collectionNotFoundError(coll)
	}
	return collHandle.Read(docid)
}

// InsertToCollection inserts model as a new document into the collection
// named coll and returns the ID of the inserted document.
//
// An error is returned when the collection does not exist, or when the
// underlying Insert fails.
func (data *Database) InsertToCollection(coll string, model map[string]interface{}) (docid int, err error) {
	collHandle := data.getCollectionHandle(coll)
	if collHandle == nil {
		// Same guard as in GetCollectionData: avoid a nil-pointer panic for
		// a collection that was never created.
		return 0, collectionNotFoundError(coll)
	}
	return collHandle.Insert(model)
}

View File

@ -0,0 +1,36 @@
package database
import (
"os"
"testing"
. "gopkg.in/check.v1"
)
// Test is the `go test` entry point of this package; it passes control to
// TestingT from the dot-imported check package, which runs the suites
// registered through Suite.
func Test(t *testing.T) {
	TestingT(t)
}

// MySuite is the (stateless) suite type carrying this package's check tests.
type MySuite struct{}

// Register the suite with the check package.
var _ = Suite(new(MySuite))
// Testing opens a database under /tmp/testdata, creates the "Matrix"
// collection, inserts one document and checks that reading it back by the
// returned ID yields the same content.
func (s *MySuite) Testing(c *C) {
	const dbDir = "/tmp/testdata"

	d := NewDatabase()
	// Registered before the open is checked, so the directory is cleaned
	// up even when opening fails halfway.
	defer os.RemoveAll(dbDir)
	err := d.GetDBHandle(dbDir)
	// A failed open leaves no usable handle; stop here instead of hitting a
	// nil-pointer panic further down.
	c.Assert(err, IsNil)
	// Deferred last so it runs first: release the database files before
	// their directory is removed.
	defer d.DBhandle.Close()

	d.InitCollection("Matrix")

	data := map[string]interface{}{
		"version":  "1.4",
		"url":      "golang.org",
		"language": "Go",
	}

	docId, err1 := d.InsertToCollection("Matrix", data)
	c.Assert(err1, IsNil)

	retdata, err2 := d.GetCollectionData("Matrix", docId)
	c.Assert(err2, IsNil)
	// Obtained value first, expected value last, so failure reports are
	// labelled correctly.
	c.Assert(retdata, DeepEquals, data)
}

View File

@ -0,0 +1,10 @@
package database
import (
"github.com/HouzuoGuo/tiedot/db"
)
// Database bundles the location of a tiedot database with the handle used to
// operate on it.
type Database struct {
	// DBdir is the database directory path, as last passed to GetDBHandle.
	DBdir string
	// DBhandle is the handle obtained from db.OpenDB in GetDBHandle; it is
	// nil until GetDBHandle has been called.
	DBhandle *db.DB
}