diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 7763d0c1d..656687437 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -5,6 +5,31 @@ "./..." ], "Deps": [ + { + "ImportPath": "github.com/HouzuoGuo/tiedot/data", + "Comment": "3.1.3-33-g9cd340d", + "Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a" + }, + { + "ImportPath": "github.com/HouzuoGuo/tiedot/db", + "Comment": "3.1.3-33-g9cd340d", + "Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a" + }, + { + "ImportPath": "github.com/HouzuoGuo/tiedot/dberr", + "Comment": "3.1.3-33-g9cd340d", + "Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a" + }, + { + "ImportPath": "github.com/HouzuoGuo/tiedot/gommap", + "Comment": "3.1.3-33-g9cd340d", + "Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a" + }, + { + "ImportPath": "github.com/HouzuoGuo/tiedot/tdlog", + "Comment": "3.1.3-33-g9cd340d", + "Rev": "9cd340d7fda74e630319c58254b04787f3ffe43a" + }, { "ImportPath": "github.com/codegangsta/cli", "Comment": "1.2.0-42-gfbda1ce", diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/collection.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/collection.go new file mode 100644 index 000000000..de7ddf859 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/collection.go @@ -0,0 +1,144 @@ +/* +Collection data file contains document data. Every document has a binary header and UTF-8 text content. +Documents are inserted one after another, and occupies 2x original document size to leave room for future updates. +Deleted documents are marked as deleted and the space is irrecoverable until a "scrub" action (in DB logic) is carried out. +When update takes place, the new document may overwrite original document if there is enough space, otherwise the +original document is marked as deleted and the updated document is inserted as a new document. +*/ +package data + +import ( + "encoding/binary" + + "github.com/HouzuoGuo/tiedot/dberr" +) + +const ( + COL_FILE_GROWTH = 32 * 1048576 // Collection file initial size & size growth (32 MBytes) + DOC_MAX_ROOM = 2 * 1048576 // Max document size (2 MBytes) + DOC_HEADER = 1 + 10 // Document header size - validity (single byte), document room (int 10 bytes) + // Pre-compiled document padding (128 spaces) + PADDING = " " + LEN_PADDING = len(PADDING) +) + +// Collection file contains document headers and document text data. +type Collection struct { + *DataFile +} + +// Open a collection file. +func OpenCollection(path string) (col *Collection, err error) { + col = new(Collection) + col.DataFile, err = OpenDataFile(path, COL_FILE_GROWTH) + return +} + +// Find and retrieve a document by ID (physical document location). Return value is a copy of the document. +func (col *Collection) Read(id int) []byte { + if id < 0 || id > col.Used-DOC_HEADER || col.Buf[id] != 1 { + return nil + } else if room, _ := binary.Varint(col.Buf[id+1 : id+11]); room > DOC_MAX_ROOM { + return nil + } else if docEnd := id + DOC_HEADER + int(room); docEnd >= col.Size { + return nil + } else { + docCopy := make([]byte, room) + copy(docCopy, col.Buf[id+DOC_HEADER:docEnd]) + return docCopy + } +} + +// Insert a new document, return the new document ID. 
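To make the layout described in the file header concrete, here is a rough, self-contained sketch (illustration only, not part of the vendored file) of a single document slot as Read above expects it: one validity byte, a 10-byte varint holding the room, then the document text padded with spaces. The 3-byte document is an arbitrary example.

package main

import (
	"encoding/binary"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical slot for the 3-byte document "abc": room = 2*3 = 6,
	// so the whole slot takes DOC_HEADER (11) + 6 = 17 bytes.
	slot := make([]byte, 17)
	slot[0] = 1                     // validity byte
	binary.PutVarint(slot[1:11], 6) // room, as a varint in 10 bytes
	copy(slot[11:], "abc   ")       // document text followed by space padding
	room, _ := binary.Varint(slot[1:11])
	doc := strings.TrimSpace(string(slot[11 : 11+int(room)]))
	fmt.Println(room, doc) // 6 abc
}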
+func (col *Collection) Insert(data []byte) (id int, err error) { + room := len(data) << 1 + if room > DOC_MAX_ROOM { + return 0, dberr.New(dberr.ErrorDocTooLarge, DOC_MAX_ROOM, room) + } + id = col.Used + docSize := DOC_HEADER + room + if err = col.EnsureSize(docSize); err != nil { + return + } + col.Used += docSize + // Write validity, room, document data and padding + col.Buf[id] = 1 + binary.PutVarint(col.Buf[id+1:id+11], int64(room)) + copy(col.Buf[id+DOC_HEADER:col.Used], data) + for padding := id + DOC_HEADER + len(data); padding < col.Used; padding += LEN_PADDING { + copySize := LEN_PADDING + if padding+LEN_PADDING >= col.Used { + copySize = col.Used - padding + } + copy(col.Buf[padding:padding+copySize], PADDING) + } + return +} + +// Overwrite or re-insert a document, return the new document ID if re-inserted. +func (col *Collection) Update(id int, data []byte) (newID int, err error) { + dataLen := len(data) + if dataLen > DOC_MAX_ROOM { + return 0, dberr.New(dberr.ErrorDocTooLarge, DOC_MAX_ROOM, dataLen) + } + if id < 0 || id >= col.Used-DOC_HEADER || col.Buf[id] != 1 { + return 0, dberr.New(dberr.ErrorNoDoc, id) + } + currentDocRoom, _ := binary.Varint(col.Buf[id+1 : id+11]) + if currentDocRoom > DOC_MAX_ROOM { + return 0, dberr.New(dberr.ErrorNoDoc, id) + } + if docEnd := id + DOC_HEADER + int(currentDocRoom); docEnd >= col.Size { + return 0, dberr.New(dberr.ErrorNoDoc, id) + } + if dataLen <= int(currentDocRoom) { + padding := id + DOC_HEADER + len(data) + paddingEnd := id + DOC_HEADER + int(currentDocRoom) + // Overwrite data and then overwrite padding + copy(col.Buf[id+DOC_HEADER:padding], data) + for ; padding < paddingEnd; padding += LEN_PADDING { + copySize := LEN_PADDING + if padding+LEN_PADDING >= paddingEnd { + copySize = paddingEnd - padding + } + copy(col.Buf[padding:padding+copySize], PADDING) + } + return id, nil + } else { + // No enough room - re-insert the document + col.Delete(id) + return col.Insert(data) + } +} + +// Delete a document by ID. +func (col *Collection) Delete(id int) error { + + if id < 0 || id > col.Used-DOC_HEADER || col.Buf[id] != 1 { + return dberr.New(dberr.ErrorNoDoc, id) + } + + if col.Buf[id] == 1 { + col.Buf[id] = 0 + } + + return nil +} + +// Run the function on every document; stop when the function returns false. 
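A minimal usage sketch of the Collection API above; the /tmp path is an arbitrary example and errors are mostly ignored for brevity.

package main

import (
	"fmt"
	"strings"

	"github.com/HouzuoGuo/tiedot/data"
)

func main() {
	col, err := data.OpenCollection("/tmp/example_col")
	if err != nil {
		panic(err)
	}
	defer col.Close()

	id, _ := col.Insert([]byte("hello")) // id is the physical location
	fmt.Println(strings.TrimSpace(string(col.Read(id))))

	// An update that no longer fits the reserved room re-inserts the
	// document, so always keep using the returned ID afterwards.
	newID, _ := col.Update(id, []byte("a considerably longer replacement"))
	fmt.Println(newID == id)

	_ = col.Delete(newID)
}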
+func (col *Collection) ForEachDoc(fun func(id int, doc []byte) bool) { + for id := 0; id < col.Used-DOC_HEADER && id >= 0; { + validity := col.Buf[id] + room, _ := binary.Varint(col.Buf[id+1 : id+11]) + docEnd := id + DOC_HEADER + int(room) + if (validity == 0 || validity == 1) && room <= DOC_MAX_ROOM && docEnd > 0 && docEnd <= col.Used { + if validity == 1 && !fun(id, col.Buf[id+DOC_HEADER:docEnd]) { + break + } + id = docEnd + } else { + // Corrupted document - move on + id++ + } + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/collection_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/collection_test.go new file mode 100644 index 000000000..10b865ed6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/collection_test.go @@ -0,0 +1,263 @@ +package data + +import ( + "os" + "strings" + "testing" + + "github.com/HouzuoGuo/tiedot/dberr" +) + +func TestInsertRead(t *testing.T) { + tmp := "/tmp/tiedot_test_col" + os.Remove(tmp) + defer os.Remove(tmp) + col, err := OpenCollection(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer col.Close() + docs := [][]byte{ + []byte("abc"), + []byte("1234")} + ids := [2]int{} + if ids[0], err = col.Insert(docs[0]); ids[0] != 0 || err != nil { + t.Fatalf("Failed to insert: %d %v", ids[0], err) + } + if ids[1], err = col.Insert(docs[1]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + if doc0 := col.Read(ids[0]); doc0 == nil || strings.TrimSpace(string(doc0)) != string(docs[0]) { + t.Fatal("Failed to read", doc0) + } + if doc1 := col.Read(ids[1]); doc1 == nil || strings.TrimSpace(string(doc1)) != string(docs[1]) { + t.Fatalf("Failed to read") + } + // it shall not panic + col.Read(col.Size) +} + +func TestInsertUpdateRead(t *testing.T) { + tmp := "/tmp/tiedot_test_col" + os.Remove(tmp) + defer os.Remove(tmp) + col, err := OpenCollection(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer col.Close() + docs := [][]byte{ + []byte("abc"), + []byte("1234")} + ids := [2]int{} + if ids[0], err = col.Insert(docs[0]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + if ids[1], err = col.Insert(docs[1]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + updated := [2]int{} + if updated[0], err = col.Update(ids[0], []byte("abcdef")); err != nil || updated[0] != ids[0] { + t.Fatalf("Failed to update: %v", err) + } + if updated[1], err = col.Update(ids[1], []byte("longlonglonglonglonglonglong")); err != nil || updated[1] == ids[1] { + t.Fatalf("Failed to update: %v", err) + } + if doc0 := col.Read(updated[0]); doc0 == nil || strings.TrimSpace(string(doc0)) != "abcdef" { + t.Fatalf("Failed to read") + } + if doc1 := col.Read(updated[1]); doc1 == nil || strings.TrimSpace(string(doc1)) != "longlonglonglonglonglonglong" { + t.Fatalf("Failed to read") + } + // it shall not panic + col.Update(col.Size, []byte("abcdef")) +} + +func TestInsertDeleteRead(t *testing.T) { + tmp := "/tmp/tiedot_test_col" + os.Remove(tmp) + defer os.Remove(tmp) + col, err := OpenCollection(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer col.Close() + docs := [][]byte{ + []byte("abc"), + []byte("1234"), + []byte("2345")} + ids := [3]int{} + if ids[0], err = col.Insert(docs[0]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + if ids[1], err = col.Insert(docs[1]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + if ids[2], err = col.Insert(docs[2]); err != nil { + t.Fatalf("Failed to insert: 
%v", err) + } + if doc0 := col.Read(ids[0]); doc0 == nil || strings.TrimSpace(string(doc0)) != string(docs[0]) { + t.Fatalf("Failed to read") + } + if err = col.Delete(ids[1]); err != nil { + t.Fatal(err) + } + if doc1 := col.Read(ids[1]); doc1 != nil { + t.Fatalf("Did not delete") + } + if doc2 := col.Read(ids[2]); doc2 == nil || strings.TrimSpace(string(doc2)) != string(docs[2]) { + t.Fatalf("Failed to read") + } + // it shall not panic + if err = col.Delete(col.Size); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("did not error") + } +} + +func TestInsertReadAll(t *testing.T) { + tmp := "/tmp/tiedot_test_col" + os.Remove(tmp) + defer os.Remove(tmp) + col, err := OpenCollection(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer col.Close() + var ids [5]int + ids[0], err = col.Insert([]byte("abc")) + if err != nil { + t.Fatalf("Insert failed: %v", err) + } + ids[1], err = col.Insert([]byte("abc")) + if err != nil { + t.Fatalf("Insert failed: %v", err) + } + ids[2], err = col.Insert([]byte("abc")) + if err != nil { + t.Fatalf("Insert failed: %v", err) + } + ids[3], err = col.Insert([]byte("abc")) + if err != nil { + t.Fatalf("Insert failed: %v", err) + } + ids[4], err = col.Insert([]byte("abc")) + if err != nil { + t.Fatalf("Insert failed: %v", err) + } + successfullyRead := 0 + t.Log(ids) + col.ForEachDoc(func(_ int, _ []byte) bool { + successfullyRead++ + return true + }) + if successfullyRead != 5 { + t.Fatalf("Should have read 5 documents, but only got %d", successfullyRead) + } + successfullyRead = 0 + // intentionally corrupt two docuemnts + col.Buf[ids[4]] = 3 // corrupted validity + col.Buf[ids[2]+1] = 255 // corrupted room + col.ForEachDoc(func(_ int, _ []byte) bool { + successfullyRead++ + return true + }) + if successfullyRead != 3 { + t.Fatalf("Should have read 3 documents, but %d are recovered", successfullyRead) + } +} + +func TestCollectionGrowAndOutOfBoundAccess(t *testing.T) { + tmp := "/tmp/tiedot_test_col" + os.Remove(tmp) + defer os.Remove(tmp) + col, err := OpenCollection(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer col.Close() + // Insert several documents + docs := [][]byte{ + []byte("abc"), + []byte("1234"), + []byte("2345")} + if _, err = col.Insert(docs[0]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + if _, err = col.Insert(docs[1]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + if _, err = col.Insert(docs[2]); err != nil { + t.Fatalf("Failed to insert: %v", err) + } + // Test UsedSize + calculatedUsedSize := (DOC_HEADER + 3*2) + (DOC_HEADER+4*2)*2 + if col.Used != calculatedUsedSize { + t.Fatalf("Invalid UsedSize") + } + // Read invalid location + if doc := col.Read(1); doc != nil { + t.Fatalf("Read invalid location") + } + if doc := col.Read(col.Used); doc != nil { + t.Fatalf("Read invalid location") + } + if doc := col.Read(col.Size); doc != nil { + t.Fatalf("Read invalid location") + } + if doc := col.Read(999999999); doc != nil { + t.Fatalf("Read invalid location") + } + // Update invalid location + if _, err := col.Update(1, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatalf("Update invalid location") + } + if _, err := col.Update(col.Used, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatalf("Update invalid location") + } + if _, err := col.Update(col.Size, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatalf("Update invalid location") + } + if _, err := col.Update(999999999, []byte{}); dberr.Type(err) != dberr.ErrorNoDoc { + 
t.Fatalf("Update invalid location") + } + // Delete invalid location + if err = col.Delete(1); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("did not error") + } + if err = col.Delete(col.Used); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("did not error") + } + if err = col.Delete(col.Size); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("did not error") + } + if err = col.Delete(999999999); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("did not error") + } + // Insert - not enough room + count := 0 + for i := 0; i < COL_FILE_GROWTH; i += DOC_MAX_ROOM { + if _, err := col.Insert(make([]byte, DOC_MAX_ROOM/2)); err != nil { + t.Fatal(err) + } + count++ + } + if _, err := col.Insert(make([]byte, DOC_MAX_ROOM/2)); err != nil { + t.Fatal(err) + } + count++ + calculatedUsedSize += count * (DOC_HEADER + DOC_MAX_ROOM) + if col.Used != calculatedUsedSize { + t.Fatalf("Wrong UsedSize %d %d", col.Used, calculatedUsedSize) + } + if col.Size != COL_FILE_GROWTH+col.Growth { + t.Fatalf("Size changed?! %d %d %d", col.Size, COL_FILE_GROWTH, col.Growth) + } + if err = col.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/file.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/file.go new file mode 100644 index 000000000..b429f8783 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/file.go @@ -0,0 +1,120 @@ +// Common data file features - enlarge, close, close, etc. +package data + +import ( + "github.com/HouzuoGuo/tiedot/gommap" + "github.com/HouzuoGuo/tiedot/tdlog" + "os" +) + +// Data file keeps track of the amount of total and used space. +type DataFile struct { + Path string + Size, Used, Growth int + Fh *os.File + Buf gommap.MMap +} + +// Return true if the buffer begins with 64 consecutive zero bytes. +func LooksEmpty(buf gommap.MMap) bool { + upTo := 1024 + if upTo >= len(buf) { + upTo = len(buf) - 1 + } + for i := 0; i < upTo; i++ { + if buf[i] != 0 { + return false + } + } + return true +} + +// Open a data file that grows by the specified size. +func OpenDataFile(path string, growth int) (file *DataFile, err error) { + file = &DataFile{Path: path, Growth: growth} + if file.Fh, err = os.OpenFile(file.Path, os.O_CREATE|os.O_RDWR, 0600); err != nil { + return + } + var size int64 + if size, err = file.Fh.Seek(0, os.SEEK_END); err != nil { + return + } + // Ensure the file is not smaller than file growth + if file.Size = int(size); file.Size < file.Growth { + if err = file.EnsureSize(file.Growth); err != nil { + return + } + } + if file.Buf == nil { + file.Buf, err = gommap.Map(file.Fh) + } + // Bi-sect file buffer to find out how much space is in-use + for low, mid, high := 0, file.Size/2, file.Size; ; { + switch { + case high-mid == 1: + if LooksEmpty(file.Buf[mid:]) { + if mid > 0 && LooksEmpty(file.Buf[mid-1:]) { + file.Used = mid - 1 + } else { + file.Used = mid + } + return + } + file.Used = high + return + case LooksEmpty(file.Buf[mid:]): + high = mid + mid = low + (mid-low)/2 + default: + low = mid + mid = mid + (high-mid)/2 + } + } + tdlog.Infof("%s opened: %d of %d bytes in-use", file.Path, file.Used, file.Size) + return +} + +// Ensure there is enough room for that many bytes of data. 
+func (file *DataFile) EnsureSize(more int) (err error) { + if file.Used+more <= file.Size { + return + } else if file.Buf != nil { + if err = file.Buf.Unmap(); err != nil { + return + } + } + if err = os.Truncate(file.Path, int64(file.Size+file.Growth)); err != nil { + return + } else if file.Buf, err = gommap.Map(file.Fh); err != nil { + return + } + file.Size += file.Growth + tdlog.Infof("%s grown: %d -> %d bytes (%d bytes in-use)", file.Path, file.Size-file.Growth, file.Size, file.Used) + return file.EnsureSize(more) +} + +// Un-map the file buffer and close the file handle. +func (file *DataFile) Close() (err error) { + if err = file.Buf.Unmap(); err != nil { + return + } + return file.Fh.Close() +} + +// Clear the entire file and resize it to initial size. +func (file *DataFile) Clear() (err error) { + if err = file.Close(); err != nil { + return + } else if err = os.Truncate(file.Path, 0); err != nil { + return + } else if err = os.Truncate(file.Path, int64(file.Growth)); err != nil { + return + } else if file.Fh, err = os.OpenFile(file.Path, os.O_CREATE|os.O_RDWR, 0600); err != nil { + return + } else if file.Buf, err = gommap.Map(file.Fh); err != nil { + return + } + file.Used, file.Size = 0, file.Growth + tdlog.Infof("%s cleared: %d of %d bytes in-use", file.Path, file.Used, file.Size) + return +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/file_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/file_test.go new file mode 100644 index 000000000..26b98e60e --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/file_test.go @@ -0,0 +1,119 @@ +package data + +import ( + "os" + "testing" +) + +func TestOpenFlushClose(t *testing.T) { + tmp := "/tmp/tiedot_test_file" + os.Remove(tmp) + defer os.Remove(tmp) + tmpFile, err := OpenDataFile(tmp, 999) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer tmpFile.Close() + if tmpFile.Path != tmp { + t.Fatal("Name not set") + } + if tmpFile.Used != 0 { + t.Fatal("Incorrect Used") + } + if tmpFile.Growth != 999 { + t.Fatal("Growth not set") + } + if tmpFile.Fh == nil || tmpFile.Buf == nil { + t.Fatal("Not mmapped") + } + if err := tmpFile.Close(); err != nil { + t.Fatalf("Failed to close: %v", err) + } +} + +func TestFindingAppendAndClear(t *testing.T) { + tmp := "/tmp/tiedot_test_file" + os.Remove(tmp) + defer os.Remove(tmp) + // Open + tmpFile, err := OpenDataFile(tmp, 1024) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + if tmpFile.Used != 0 { + t.Fatal("Incorrect Used", tmpFile.Used) + } + // Write something + tmpFile.Buf[500] = 1 + tmpFile.Close() + + // Re-open + tmpFile, err = OpenDataFile(tmp, 1024) + if err != nil { + t.Fatalf("Failed to open: %v", err) + } + if tmpFile.Used != 501 { + t.Fatal("Incorrect Used") + } + + // Write something again + for i := 750; i < 800; i++ { + tmpFile.Buf[i] = byte('a') + } + tmpFile.Close() + + // Re-open again + tmpFile, err = OpenDataFile(tmp, 1024) + if err != nil { + t.Fatalf("Failed to open: %v", err) + } + if tmpFile.Used != 800 { + t.Fatal("Incorrect Append", tmpFile.Used) + } + // Clear the file and test size + if err = tmpFile.Clear(); err != nil { + t.Fatal(err) + } + if !(len(tmpFile.Buf) == 1024 && tmpFile.Buf[750] == 0 && tmpFile.Growth == 1024 && tmpFile.Size == 1024 && tmpFile.Used == 0) { + t.Fatal("Did not clear", len(tmpFile.Buf), tmpFile.Growth, tmpFile.Size, tmpFile.Used) + } + // Can still write to the buffer? 
+ tmpFile.Buf[999] = 1 + tmpFile.Close() +} + +func TestFileGrow(t *testing.T) { + tmp := "/tmp/tiedot_test_file" + os.Remove(tmp) + defer os.Remove(tmp) + // Open and write something + tmpFile, err := OpenDataFile(tmp, 4) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + tmpFile.Buf[2] = 1 + tmpFile.Used = 3 + if tmpFile.Size != 4 { + t.Fatal("Incorrect Size", tmpFile.Size) + } + tmpFile.EnsureSize(8) + if tmpFile.Size != 12 { // 3 times file growth = 12 bytes + t.Fatalf("Incorrect Size") + } + if tmpFile.Used != 3 { // Used should not change + t.Fatalf("Incorrect Used") + } + if len(tmpFile.Buf) != 12 { + t.Fatal("Did not remap") + } + if tmpFile.Growth != 4 { + t.Fatalf("Incorrect Growth") + } + // Can write to the new (now larger) region + tmpFile.Buf[10] = 1 + tmpFile.Buf[11] = 1 + tmpFile.Close() +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/hashtable.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/hashtable.go new file mode 100644 index 000000000..d94480196 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/hashtable.go @@ -0,0 +1,258 @@ +/* +Hash table file contains binary content; it implements a static hash table made of hash buckets and integer entries. +Every bucket has a fixed number of entries. When a bucket becomes full, a new bucket is chained to it in order to store +more entries. Every entry has an integer key and value. +An entry key may have multiple values assigned to it, however the combination of entry key and value must be unique +across the entire hash table. +*/ +package data + +import ( + "encoding/binary" + "github.com/HouzuoGuo/tiedot/tdlog" + "sync" +) + +const ( + HT_FILE_GROWTH = 32 * 1048576 // Hash table file initial size & file growth + ENTRY_SIZE = 1 + 10 + 10 // Hash entry size: validity (single byte), key (int 10 bytes), value (int 10 bytes) + BUCKET_HEADER = 10 // Bucket header size: next chained bucket number (int 10 bytes) + PER_BUCKET = 16 // Entries per bucket + HASH_BITS = 16 // Number of hash key bits + BUCKET_SIZE = BUCKET_HEADER + PER_BUCKET*ENTRY_SIZE // Size of a bucket + INITIAL_BUCKETS = 65536 // Initial number of buckets == 2 ^ HASH_BITS +) + +// Hash table file is a binary file containing buckets of hash entries. +type HashTable struct { + *DataFile + numBuckets int + Lock *sync.RWMutex +} + +// Smear the integer entry key and return the portion (first HASH_BITS bytes) used for allocating the entry. +func HashKey(key int) int { + /* + tiedot should be compiled/run on x86-64 systems. + If you decide to compile tiedot on 32-bit systems, the following integer-smear algorithm will cause compilation failure + due to 32-bit interger overflow; therefore you must modify the algorithm. + Do not remove the integer-smear process, and remember to run test cases to verify your mods. + */ + // ========== Integer-smear start ======= + key = key ^ (key >> 4) + key = (key ^ 0xdeadbeef) + (key << 5) + key = key ^ (key >> 11) + // ========== Integer-smear end ========= + return key & ((1 << HASH_BITS) - 1) // Do not modify this line +} + +// Open a hash table file. +func OpenHashTable(path string) (ht *HashTable, err error) { + ht = &HashTable{Lock: new(sync.RWMutex)} + if ht.DataFile, err = OpenDataFile(path, HT_FILE_GROWTH); err != nil { + return + } + ht.calculateNumBuckets() + return +} + +// Follow the longest bucket chain to calculate total number of buckets, hence the "used size" of hash table file. 
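For reference while reading the bucket arithmetic in this file, a small sketch that reproduces the offset formula used by Put, Get and Remove below; the constant names are lower-cased copies of the ones defined above.

package main

import "fmt"

const (
	entrySize    = 1 + 10 + 10                        // ENTRY_SIZE
	bucketHeader = 10                                 // BUCKET_HEADER
	perBucket    = 16                                 // PER_BUCKET
	bucketSize   = bucketHeader + perBucket*entrySize // BUCKET_SIZE = 346
	initBuckets  = 65536                              // INITIAL_BUCKETS
)

// entryAddr mirrors the offset calculation used by Put, Get and Remove.
func entryAddr(bucket, entry int) int {
	return bucket*bucketSize + bucketHeader + entry*entrySize
}

func main() {
	fmt.Println(bucketSize)                // 346 bytes per bucket
	fmt.Println(entryAddr(0, 0))           // 10: first entry of bucket 0
	fmt.Println(entryAddr(1, perBucket-1)) // last entry of bucket 1
	fmt.Println(initBuckets * bucketSize)  // bytes used by a freshly created table
}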
+func (ht *HashTable) calculateNumBuckets() { + ht.numBuckets = ht.Size / BUCKET_SIZE + largestBucketNum := INITIAL_BUCKETS - 1 + for i := 0; i < INITIAL_BUCKETS; i++ { + lastBucket := ht.lastBucket(i) + if lastBucket > largestBucketNum && lastBucket < ht.numBuckets { + largestBucketNum = lastBucket + } + } + ht.numBuckets = largestBucketNum + 1 + usedSize := ht.numBuckets * BUCKET_SIZE + if usedSize > ht.Size { + ht.Used = ht.Size + ht.EnsureSize(usedSize - ht.Used) + } + ht.Used = usedSize + tdlog.Infof("%s: calculated used size is %d", ht.Path, usedSize) +} + +// Return number of the next chained bucket. +func (ht *HashTable) nextBucket(bucket int) int { + if bucket >= ht.numBuckets { + return 0 + } + bucketAddr := bucket * BUCKET_SIZE + nextUint, err := binary.Varint(ht.Buf[bucketAddr : bucketAddr+10]) + next := int(nextUint) + if next == 0 { + return 0 + } else if err < 0 || next <= bucket || next >= ht.numBuckets || next < INITIAL_BUCKETS { + tdlog.CritNoRepeat("Bad hash table - repair ASAP %s", ht.Path) + return 0 + } else { + return next + } +} + +// Return number of the last bucket in chain. +func (ht *HashTable) lastBucket(bucket int) int { + for curr := bucket; ; { + next := ht.nextBucket(curr) + if next == 0 { + return curr + } + curr = next + } +} + +// Create and chain a new bucket. +func (ht *HashTable) growBucket(bucket int) { + ht.EnsureSize(BUCKET_SIZE) + lastBucketAddr := ht.lastBucket(bucket) * BUCKET_SIZE + binary.PutVarint(ht.Buf[lastBucketAddr:lastBucketAddr+10], int64(ht.numBuckets)) + ht.Used += BUCKET_SIZE + ht.numBuckets++ +} + +// Clear the entire hash table. +func (ht *HashTable) Clear() (err error) { + if err = ht.DataFile.Clear(); err != nil { + return + } + ht.calculateNumBuckets() + return +} + +// Store the entry into a vacant (invalidated or empty) place in the appropriate bucket. +func (ht *HashTable) Put(key, val int) { + for bucket, entry := HashKey(key), 0; ; { + entryAddr := bucket*BUCKET_SIZE + BUCKET_HEADER + entry*ENTRY_SIZE + if ht.Buf[entryAddr] != 1 { + ht.Buf[entryAddr] = 1 + binary.PutVarint(ht.Buf[entryAddr+1:entryAddr+11], int64(key)) + binary.PutVarint(ht.Buf[entryAddr+11:entryAddr+21], int64(val)) + return + } + if entry++; entry == PER_BUCKET { + entry = 0 + if bucket = ht.nextBucket(bucket); bucket == 0 { + ht.growBucket(HashKey(key)) + ht.Put(key, val) + return + } + } + } +} + +// Look up values by key. +func (ht *HashTable) Get(key, limit int) (vals []int) { + if limit == 0 { + vals = make([]int, 0, 10) + } else { + vals = make([]int, 0, limit) + } + for count, entry, bucket := 0, 0, HashKey(key); ; { + entryAddr := bucket*BUCKET_SIZE + BUCKET_HEADER + entry*ENTRY_SIZE + entryKey, _ := binary.Varint(ht.Buf[entryAddr+1 : entryAddr+11]) + entryVal, _ := binary.Varint(ht.Buf[entryAddr+11 : entryAddr+21]) + if ht.Buf[entryAddr] == 1 { + if int(entryKey) == key { + vals = append(vals, int(entryVal)) + if count++; count == limit { + return + } + } + } else if entryKey == 0 && entryVal == 0 { + return + } + if entry++; entry == PER_BUCKET { + entry = 0 + if bucket = ht.nextBucket(bucket); bucket == 0 { + return + } + } + } +} + +// Flag an entry as invalid, so that Get will not return it later on. 
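A minimal usage sketch of the hash table API (Put and Get above, Remove just below); the key and value numbers and the /tmp path are arbitrary examples.

package main

import (
	"fmt"

	"github.com/HouzuoGuo/tiedot/data"
)

func main() {
	ht, err := data.OpenHashTable("/tmp/example_ht")
	if err != nil {
		panic(err)
	}
	defer ht.Close()

	// One key may carry several values; each (key, value) pair is unique.
	ht.Put(10, 100)
	ht.Put(10, 200)
	fmt.Println(ht.Get(10, 0)) // [100 200]; a limit of 0 means "return all"

	ht.Remove(10, 100)
	fmt.Println(ht.Get(10, 0)) // [200]
}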
+func (ht *HashTable) Remove(key, val int) { + for entry, bucket := 0, HashKey(key); ; { + entryAddr := bucket*BUCKET_SIZE + BUCKET_HEADER + entry*ENTRY_SIZE + entryKey, _ := binary.Varint(ht.Buf[entryAddr+1 : entryAddr+11]) + entryVal, _ := binary.Varint(ht.Buf[entryAddr+11 : entryAddr+21]) + if ht.Buf[entryAddr] == 1 { + if int(entryKey) == key && int(entryVal) == val { + ht.Buf[entryAddr] = 0 + return + } + } else if entryKey == 0 && entryVal == 0 { + return + } + if entry++; entry == PER_BUCKET { + entry = 0 + if bucket = ht.nextBucket(bucket); bucket == 0 { + return + } + } + } +} + +// Divide the entire hash table into roughly equally sized partitions, and return the start/end key range of the chosen partition. +func GetPartitionRange(partNum, totalParts int) (start int, end int) { + perPart := INITIAL_BUCKETS / totalParts + leftOver := INITIAL_BUCKETS % totalParts + start = partNum * perPart + if leftOver > 0 { + if partNum == 0 { + end += 1 + } else if partNum < leftOver { + start += partNum + end += 1 + } else { + start += leftOver + } + } + end += start + perPart + if partNum == totalParts-1 { + end = INITIAL_BUCKETS + } + return +} + +// Collect entries all the way from "head" bucket to the end of its chained buckets. +func (ht *HashTable) collectEntries(head int) (keys, vals []int) { + keys = make([]int, 0, PER_BUCKET) + vals = make([]int, 0, PER_BUCKET) + var entry, bucket int = 0, head + for { + entryAddr := bucket*BUCKET_SIZE + BUCKET_HEADER + entry*ENTRY_SIZE + entryKey, _ := binary.Varint(ht.Buf[entryAddr+1 : entryAddr+11]) + entryVal, _ := binary.Varint(ht.Buf[entryAddr+11 : entryAddr+21]) + if ht.Buf[entryAddr] == 1 { + keys = append(keys, int(entryKey)) + vals = append(vals, int(entryVal)) + } else if entryKey == 0 && entryVal == 0 { + return + } + if entry++; entry == PER_BUCKET { + entry = 0 + if bucket = ht.nextBucket(bucket); bucket == 0 { + return + } + } + } +} + +// Return all entries in the chosen partition. +func (ht *HashTable) GetPartition(partNum, partSize int) (keys, vals []int) { + rangeStart, rangeEnd := GetPartitionRange(partNum, partSize) + prealloc := (rangeEnd - rangeStart) * PER_BUCKET + keys = make([]int, 0, prealloc) + vals = make([]int, 0, prealloc) + for head := rangeStart; head < rangeEnd; head++ { + k, v := ht.collectEntries(head) + keys = append(keys, k...) + vals = append(vals, v...) 
+ } + return +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/hashtable_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/hashtable_test.go new file mode 100644 index 000000000..addf1a35a --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/hashtable_test.go @@ -0,0 +1,172 @@ +package data + +import ( + "math" + "os" + "testing" +) + +func TestPutGetReopenClear(t *testing.T) { + tmp := "/tmp/tiedot_test_hash" + os.Remove(tmp) + defer os.Remove(tmp) + ht, err := OpenHashTable(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + } + // Test initial size information + if !(ht.numBuckets == INITIAL_BUCKETS && ht.Used == INITIAL_BUCKETS*BUCKET_SIZE && ht.Size == HT_FILE_GROWTH) { + t.Fatal("Wrong size", ht.numBuckets, INITIAL_BUCKETS, ht.Used, INITIAL_BUCKETS*BUCKET_SIZE, ht.Size, HT_FILE_GROWTH) + } + for i := int(0); i < 1024*1024; i++ { + ht.Put(i, i) + } + for i := int(0); i < 1024*1024; i++ { + vals := ht.Get(i, 0) + if !(len(vals) == 1 && vals[0] == i) { + t.Fatalf("Get failed on key %d, got %v", i, vals) + } + } + numBuckets := ht.numBuckets + // Reopen the hash table and test the features + if ht.Close(); err != nil { + panic(err) + } + reopened, err := OpenHashTable(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + } + if reopened.numBuckets != numBuckets { + t.Fatalf("Wrong.numBuckets") + } + if reopened.Used != numBuckets*BUCKET_SIZE { + t.Fatalf("Wrong UsedSize") + } + for i := int(0); i < 1024*1024; i++ { + vals := reopened.Get(i, 0) + if !(len(vals) == 1 && vals[0] == i) { + t.Fatalf("Get failed on key %d, got %v", i, vals) + } + } + // Clear the hash table + if err = reopened.Clear(); err != nil { + t.Fatal(err) + } + if !(reopened.numBuckets == INITIAL_BUCKETS && reopened.Used == INITIAL_BUCKETS*BUCKET_SIZE) { + t.Fatal("Did not clear the hash table") + } + allKV := make(map[int]int) + for i := 0; i < 10; i++ { + keys, vals := reopened.GetPartition(i, 10) + for i, key := range keys { + allKV[key] = vals[i] + } + } + if len(allKV) != 0 { + t.Fatal("Did not clear the hash table") + } + if err = reopened.Close(); err != nil { + t.Fatal(err) + } +} + +func TestPutGet2(t *testing.T) { + tmp := "/tmp/tiedot_test_hash" + os.Remove(tmp) + defer os.Remove(tmp) + ht, err := OpenHashTable(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer ht.Close() + ht.Put(1, 1) + ht.Put(1, 2) + ht.Put(1, 3) + ht.Put(2, 1) + ht.Put(2, 2) + ht.Put(2, 3) + vals := ht.Get(1, 0) + if !(len(vals) == 3) { + t.Fatalf("Get failed, got %v", vals) + } + vals = ht.Get(2, 2) + if !(len(vals) == 2) { + t.Fatalf("Get failed, got %v", vals) + } +} + +func TestPutRemove(t *testing.T) { + tmp := "/tmp/tiedot_test_hash" + os.Remove(tmp) + defer os.Remove(tmp) + ht, err := OpenHashTable(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer ht.Close() + ht.Put(1, 1) + ht.Put(1, 2) + ht.Put(1, 3) + ht.Put(2, 1) + ht.Put(2, 2) + ht.Put(2, 3) + ht.Remove(1, 1) + ht.Remove(2, 2) + vals := ht.Get(1, 0) + if !(len(vals) == 2) { + t.Fatalf("Did not delete, still have %v", vals) + } + vals = ht.Get(2, 0) + if !(len(vals) == 2) { + t.Fatalf("Did not delete, still have %v", vals) + } +} + +func TestPartitionEntries(t *testing.T) { + tmp := "/tmp/tiedot_test_hash" + os.Remove(tmp) + defer os.Remove(tmp) + ht, err := OpenHashTable(tmp) + if err != nil { + t.Fatalf("Failed to open: %v", err) + return + } + defer ht.Close() + number := 2000000 + for i := 1; i <= number; i++ { + ht.Put(i, i*2) + if 
gotBack := ht.Get(i, 0); len(gotBack) != 1 || gotBack[0] != i*2 { + t.Fatal("Written ", i, i*2, "got back", gotBack) + } + } + for parts := 2; parts < 19; parts++ { + t.Log("parts is", parts) + allKV := make(map[int]int) + counter := 0 + for i := 0; i < parts; i++ { + start, end := GetPartitionRange(i, parts) + keys, vals := ht.GetPartition(i, parts) + t.Log("Between ", start, end, " there are ", len(keys)) + sizeDev := math.Abs(float64(len(keys)-number/parts)) / float64(number/parts) + t.Log("sizeDev", sizeDev) + if sizeDev > 0.1 { + t.Fatal("imbalanced keys") + } + for i, key := range keys { + allKV[key] = vals[i] + } + counter += len(keys) + } + // Verify read back + if counter != number { + t.Fatal("Number of entries does not match, got ", counter) + } + for i := 0; i < number; i++ { + if allKV[i] != i*2 { + t.Fatal("Wrong readback", i, allKV[i]) + } + } + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/partition.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/partition.go new file mode 100644 index 000000000..5ce85c0f4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/partition.go @@ -0,0 +1,168 @@ +/* +(Collection) Partition is a collection data file accompanied by a hash table in order to allow addressing of a +document using an unchanging ID: +The hash table stores the unchanging ID as entry key and the physical document location as entry value. +*/ +package data + +import ( + "sync" + + "github.com/HouzuoGuo/tiedot/dberr" + "github.com/HouzuoGuo/tiedot/tdlog" +) + +// Partition associates a hash table with collection documents, allowing addressing of a document using an unchanging ID. +type Partition struct { + col *Collection + lookup *HashTable + updating map[int]struct{} + Lock *sync.RWMutex +} + +// Open a collection partition. +func OpenPartition(colPath, lookupPath string) (part *Partition, err error) { + part = &Partition{updating: make(map[int]struct{}), Lock: new(sync.RWMutex)} + if part.col, err = OpenCollection(colPath); err != nil { + return + } else if part.lookup, err = OpenHashTable(lookupPath); err != nil { + return + } + return +} + +// Insert a document. The ID may be used to retrieve/update/delete the document later on. +func (part *Partition) Insert(id int, data []byte) (physID int, err error) { + physID, err = part.col.Insert(data) + if err != nil { + return + } + part.lookup.Put(id, physID) + return +} + +// Find and retrieve a document by ID. +func (part *Partition) Read(id int) ([]byte, error) { + physID := part.lookup.Get(id, 1) + + if len(physID) == 0 { + return nil, dberr.New(dberr.ErrorNoDoc, id) + } + + data := part.col.Read(physID[0]) + + if data == nil { + return nil, dberr.New(dberr.ErrorNoDoc, id) + } + + return data, nil +} + +// Update a document. +func (part *Partition) Update(id int, data []byte) (err error) { + physID := part.lookup.Get(id, 1) + if len(physID) == 0 { + return dberr.New(dberr.ErrorNoDoc, id) + } + newID, err := part.col.Update(physID[0], data) + if err != nil { + return + } + if newID != physID[0] { + part.lookup.Remove(id, physID[0]) + part.lookup.Put(id, newID) + } + return +} + +// Lock a document for exclusive update. +func (part *Partition) LockUpdate(id int) (err error) { + if _, alreadyLocked := part.updating[id]; alreadyLocked { + return dberr.New(dberr.ErrorDocLocked, id) + } + part.updating[id] = struct{}{} + return +} + +// Unlock a document to make it ready for the next update. 
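A minimal usage sketch of the Partition API defined in this file; the document ID 42, the JSON payloads and the /tmp paths are arbitrary examples, and errors are mostly ignored for brevity.

package main

import (
	"fmt"

	"github.com/HouzuoGuo/tiedot/data"
)

func main() {
	part, err := data.OpenPartition("/tmp/example_col", "/tmp/example_id")
	if err != nil {
		panic(err)
	}
	defer part.Close()

	// The caller picks a stable document ID; the physical location is kept
	// in the lookup hash table and may change when the document is updated.
	if _, err := part.Insert(42, []byte(`{"name":"tiedot"}`)); err != nil {
		panic(err)
	}
	doc, _ := part.Read(42)
	fmt.Println(string(doc))

	if err := part.LockUpdate(42); err == nil {
		_ = part.Update(42, []byte(`{"name":"tiedot","rev":2}`))
		part.UnlockUpdate(42)
	}
	_ = part.Delete(42)
}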
+func (part *Partition) UnlockUpdate(id int) { + delete(part.updating, id) +} + +// Delete a document. +func (part *Partition) Delete(id int) (err error) { + physID := part.lookup.Get(id, 1) + if len(physID) == 0 { + return dberr.New(dberr.ErrorNoDoc, id) + } + part.col.Delete(physID[0]) + part.lookup.Remove(id, physID[0]) + return +} + +// Partition documents into roughly equally sized portions, and run the function on every document in the portion. +func (part *Partition) ForEachDoc(partNum, totalPart int, fun func(id int, doc []byte) bool) (moveOn bool) { + ids, physIDs := part.lookup.GetPartition(partNum, totalPart) + for i, id := range ids { + data := part.col.Read(physIDs[i]) + if data != nil { + if !fun(id, data) { + return false + } + } + } + return true +} + +// Return approximate number of documents in the partition. +func (part *Partition) ApproxDocCount() int { + totalPart := 24 // not magic; a larger number makes estimation less accurate, but improves performance + for { + keys, _ := part.lookup.GetPartition(0, totalPart) + if len(keys) == 0 { + if totalPart < 8 { + return 0 // the hash table is really really empty + } + // Try a larger partition size + totalPart = totalPart / 2 + } else { + return int(float64(len(keys)) * float64(totalPart)) + } + } +} + +// Clear data file and lookup hash table. +func (part *Partition) Clear() error { + + var err error + + if e := part.col.Clear(); e != nil { + tdlog.CritNoRepeat("Failed to clear %s: %v", part.col.Path, e) + + err = dberr.New(dberr.ErrorIO) + } + + if e := part.lookup.Clear(); e != nil { + tdlog.CritNoRepeat("Failed to clear %s: %v", part.lookup.Path, e) + + err = dberr.New(dberr.ErrorIO) + } + + return err +} + +// Close all file handles. +func (part *Partition) Close() error { + + var err error + + if e := part.col.Close(); e != nil { + tdlog.CritNoRepeat("Failed to close %s: %v", part.col.Path, e) + err = dberr.New(dberr.ErrorIO) + } + if e := part.lookup.Close(); e != nil { + tdlog.CritNoRepeat("Failed to close %s: %v", part.lookup.Path, e) + err = dberr.New(dberr.ErrorIO) + } + return err +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/partition_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/partition_test.go new file mode 100644 index 000000000..95373b75b --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/data/partition_test.go @@ -0,0 +1,137 @@ +package data + +import ( + "math/rand" + "os" + "strconv" + "testing" + "time" + + "github.com/HouzuoGuo/tiedot/dberr" +) + +func TestPartitionDocCRUD(t *testing.T) { + colPath := "/tmp/tiedot_test_col" + htPath := "/tmp/tiedot_test_ht" + os.Remove(colPath) + os.Remove(htPath) + defer os.Remove(colPath) + defer os.Remove(htPath) + part, err := OpenPartition(colPath, htPath) + if err != nil { + t.Fatal(err) + } + // Insert & read + if _, err = part.Insert(1, []byte("1")); err != nil { + t.Fatal(err) + } + if _, err = part.Insert(2, []byte("2")); err != nil { + t.Fatal(err) + } + if readback, err := part.Read(1); err != nil || string(readback) != "1 " { + t.Fatal(err, readback) + } + if readback, err := part.Read(2); err != nil || string(readback) != "2 " { + t.Fatal(err, readback) + } + // Update & read + if err = part.Update(1, []byte("abcdef")); err != nil { + t.Fatal(err) + } + if err := part.Update(1234, []byte("abcdef")); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + if readback, err := part.Read(1); err != nil || string(readback) != "abcdef " { + t.Fatal(err, readback) + } + // Delete & 
read + if err = part.Delete(1); err != nil { + t.Fatal(err) + } + if _, err = part.Read(1); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + if err = part.Delete(123); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + // Lock & unlock + if err = part.LockUpdate(123); err != nil { + t.Fatal(err) + } + if err = part.LockUpdate(123); dberr.Type(err) != dberr.ErrorDocLocked { + t.Fatal("Did not error") + } + part.UnlockUpdate(123) + if err = part.LockUpdate(123); err != nil { + t.Fatal(err) + } + // Foreach + part.ForEachDoc(0, 1, func(id int, doc []byte) bool { + if id != 2 || string(doc) != "2 " { + t.Fatal("ID 2 should be the only remaining document") + } + return true + }) + // Finish up + if err = part.Clear(); err != nil { + t.Fatal(err) + } + if err = part.Close(); err != nil { + t.Fatal(err) + } +} + +func TestApproxDocCount(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + colPath := "/tmp/tiedot_test_col" + htPath := "/tmp/tiedot_test_ht" + os.Remove(colPath) + os.Remove(htPath) + defer os.Remove(colPath) + defer os.Remove(htPath) + part, err := OpenPartition(colPath, htPath) + if err != nil { + t.Fatal(err) + } + defer part.Close() + // Insert 100 documents + for i := 0; i < 100; i++ { + if _, err = part.Insert(rand.Int(), []byte(strconv.Itoa(i))); err != nil { + t.Fatal(err) + } + } + t.Log("ApproxDocCount", part.ApproxDocCount()) + if part.ApproxDocCount() < 10 || part.ApproxDocCount() > 300 { + t.Fatal("Approximate is way off", part.ApproxDocCount()) + } + // Insert 900 documents + for i := 0; i < 900; i++ { + if _, err = part.Insert(rand.Int(), []byte(strconv.Itoa(i))); err != nil { + t.Fatal(err) + } + } + t.Log("ApproxDocCount", part.ApproxDocCount()) + if part.ApproxDocCount() < 500 || part.ApproxDocCount() > 1500 { + t.Fatal("Approximate is way off", part.ApproxDocCount()) + } + // Insert another 2000 documents + for i := 0; i < 2000; i++ { + if _, err = part.Insert(rand.Int(), []byte(strconv.Itoa(i))); err != nil { + t.Fatal(err) + } + } + t.Log("ApproxDocCount", part.ApproxDocCount()) + if part.ApproxDocCount() < 2000 || part.ApproxDocCount() > 4000 { + t.Fatal("Approximate is way off", part.ApproxDocCount()) + } + // See how fast doc count is + start := time.Now().UnixNano() + for i := 0; i < 1000; i++ { + part.ApproxDocCount() + } + timediff := time.Now().UnixNano() - start + t.Log("It took", timediff/1000000, "milliseconds") + if timediff/1000000 > 3500 { + t.Fatal("Algorithm is way too slow") + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/col.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/col.go new file mode 100644 index 000000000..f24815c1a --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/col.go @@ -0,0 +1,233 @@ +/* Collection schema and index management. */ +package db + +import ( + "encoding/json" + "fmt" + "github.com/HouzuoGuo/tiedot/data" + "io/ioutil" + "os" + "path" + "strconv" + "strings" +) + +const ( + DOC_DATA_FILE = "dat_" // Prefix of partition collection data file name. + DOC_LOOKUP_FILE = "id_" // Prefix of partition hash table (ID lookup) file name. + INDEX_PATH_SEP = "!" // Separator between index keys in index directory name. +) + +// Collection has data partitions and some index meta information. 
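Putting the file-name prefixes above together, the loading logic below expects a collection directory roughly like the following; the database path, collection name, partition count and indexed path are arbitrary examples.

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

func main() {
	dbPath, colName, numParts := "/tmp/exampledb", "Feeds", 2
	idxName := strings.Join([]string{"author", "name"}, "!") // INDEX_PATH_SEP
	for i := 0; i < numParts; i++ {
		part := strconv.Itoa(i)
		fmt.Println(path.Join(dbPath, colName, "dat_"+part))   // document data (DOC_DATA_FILE)
		fmt.Println(path.Join(dbPath, colName, "id_"+part))    // ID lookup table (DOC_LOOKUP_FILE)
		fmt.Println(path.Join(dbPath, colName, idxName, part)) // one hash table per index per partition
	}
}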
+type Col struct { + db *DB + name string + parts []*data.Partition // Collection partitions + hts []map[string]*data.HashTable // Index partitions + indexPaths map[string][]string // Index names and paths +} + +// Open a collection and load all indexes. +func OpenCol(db *DB, name string) (*Col, error) { + col := &Col{db: db, name: name} + return col, col.load() +} + +// Load collection schema including index schema. +func (col *Col) load() error { + if err := os.MkdirAll(path.Join(col.db.path, col.name), 0700); err != nil { + return err + } + col.parts = make([]*data.Partition, col.db.numParts) + col.hts = make([]map[string]*data.HashTable, col.db.numParts) + for i := 0; i < col.db.numParts; i++ { + col.hts[i] = make(map[string]*data.HashTable) + } + col.indexPaths = make(map[string][]string) + // Open collection document partitions + for i := 0; i < col.db.numParts; i++ { + var err error + if col.parts[i], err = data.OpenPartition( + path.Join(col.db.path, col.name, DOC_DATA_FILE+strconv.Itoa(i)), + path.Join(col.db.path, col.name, DOC_LOOKUP_FILE+strconv.Itoa(i))); err != nil { + return err + } + } + // Look for index directories + colDirContent, err := ioutil.ReadDir(path.Join(col.db.path, col.name)) + if err != nil { + return err + } + for _, htDir := range colDirContent { + if !htDir.IsDir() { + continue + } + // Open index partitions + idxName := htDir.Name() + idxPath := strings.Split(idxName, INDEX_PATH_SEP) + col.indexPaths[idxName] = idxPath + for i := 0; i < col.db.numParts; i++ { + if col.hts[i] == nil { + col.hts[i] = make(map[string]*data.HashTable) + } + if col.hts[i][idxName], err = data.OpenHashTable( + path.Join(col.db.path, col.name, idxName, strconv.Itoa(i))); err != nil { + return err + } + } + } + return nil +} + +// Close all collection files. Do not use the collection afterwards! +func (col *Col) close() error { + errs := make([]error, 0, 0) + for i := 0; i < col.db.numParts; i++ { + col.parts[i].Lock.Lock() + if err := col.parts[i].Close(); err != nil { + errs = append(errs, err) + } + for _, ht := range col.hts[i] { + if err := ht.Close(); err != nil { + errs = append(errs, err) + } + } + col.parts[i].Lock.Unlock() + } + if len(errs) == 0 { + return nil + } + return fmt.Errorf("%v", errs) +} + +func (col *Col) forEachDoc(fun func(id int, doc []byte) (moveOn bool), placeSchemaLock bool) { + if placeSchemaLock { + col.db.schemaLock.RLock() + defer col.db.schemaLock.RUnlock() + } + // Process approx.4k documents in each iteration + partDiv := col.approxDocCount(false) / col.db.numParts / 4000 + if partDiv == 0 { + partDiv++ + } + for iteratePart := 0; iteratePart < col.db.numParts; iteratePart++ { + part := col.parts[iteratePart] + part.Lock.RLock() + for i := 0; i < partDiv; i++ { + if !part.ForEachDoc(i, partDiv, fun) { + part.Lock.RUnlock() + return + } + } + part.Lock.RUnlock() + } +} + +// Do fun for all documents in the collection. +func (col *Col) ForEachDoc(fun func(id int, doc []byte) (moveOn bool)) { + col.forEachDoc(fun, true) +} + +// Create an index on the path. 
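A usage sketch of the index management methods that follow (Index, AllIndexes, Unindex), together with OpenDB, Create and Use from db.go later in this change; the collection name and indexed path are arbitrary examples.

package main

import (
	"fmt"

	"github.com/HouzuoGuo/tiedot/db"
)

func main() {
	myDB, err := db.OpenDB("/tmp/exampledb")
	if err != nil {
		panic(err)
	}
	defer myDB.Close()
	if err := myDB.Create("Feeds"); err != nil {
		fmt.Println(err)
	}
	feeds := myDB.Use("Feeds")

	// Index the nested JSON path author -> name; existing documents are
	// put onto the new index as part of the call.
	if err := feeds.Index([]string{"author", "name"}); err != nil {
		fmt.Println(err)
	}
	fmt.Println(feeds.AllIndexes()) // [[author name]]
	_ = feeds.Unindex([]string{"author", "name"})
}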
+func (col *Col) Index(idxPath []string) (err error) { + col.db.schemaLock.Lock() + defer col.db.schemaLock.Unlock() + idxName := strings.Join(idxPath, INDEX_PATH_SEP) + if _, exists := col.indexPaths[idxName]; exists { + return fmt.Errorf("Path %v is already indexed", idxPath) + } + col.indexPaths[idxName] = idxPath + idxDir := path.Join(col.db.path, col.name, idxName) + if err = os.MkdirAll(idxDir, 0700); err != nil { + return err + } + for i := 0; i < col.db.numParts; i++ { + if col.hts[i][idxName], err = data.OpenHashTable(path.Join(idxDir, strconv.Itoa(i))); err != nil { + return err + } + } + // Put all documents on the new index + col.forEachDoc(func(id int, doc []byte) (moveOn bool) { + var docObj map[string]interface{} + if err := json.Unmarshal(doc, &docObj); err != nil { + // Skip corrupted document + return true + } + for _, idxVal := range GetIn(docObj, idxPath) { + if idxVal != nil { + hashKey := StrHash(fmt.Sprint(idxVal)) + col.hts[hashKey%col.db.numParts][idxName].Put(hashKey, id) + } + } + return true + }, false) + return +} + +// Return all indexed paths. +func (col *Col) AllIndexes() (ret [][]string) { + col.db.schemaLock.RLock() + defer col.db.schemaLock.RUnlock() + ret = make([][]string, 0, len(col.indexPaths)) + for _, path := range col.indexPaths { + pathCopy := make([]string, len(path)) + for i, p := range path { + pathCopy[i] = p + } + ret = append(ret, pathCopy) + } + return ret +} + +// Remove an index. +func (col *Col) Unindex(idxPath []string) error { + col.db.schemaLock.Lock() + defer col.db.schemaLock.Unlock() + idxName := strings.Join(idxPath, INDEX_PATH_SEP) + if _, exists := col.indexPaths[idxName]; !exists { + return fmt.Errorf("Path %v is not indexed", idxPath) + } + delete(col.indexPaths, idxName) + for i := 0; i < col.db.numParts; i++ { + col.hts[i][idxName].Close() + delete(col.hts[i], idxName) + } + if err := os.RemoveAll(path.Join(col.db.path, col.name, idxName)); err != nil { + return err + } + return nil +} + +func (col *Col) approxDocCount(placeSchemaLock bool) int { + if placeSchemaLock { + col.db.schemaLock.RLock() + defer col.db.schemaLock.RUnlock() + } + total := 0 + for _, part := range col.parts { + part.Lock.RLock() + total += part.ApproxDocCount() + part.Lock.RUnlock() + } + return total +} + +// Return approximate number of documents in the collection. +func (col *Col) ApproxDocCount() int { + return col.approxDocCount(true) +} + +// Divide the collection into roughly equally sized pages, and do fun on all documents in the specified page. +func (col *Col) ForEachDocInPage(page, total int, fun func(id int, doc []byte) bool) { + col.db.schemaLock.RLock() + defer col.db.schemaLock.RUnlock() + for iteratePart := 0; iteratePart < col.db.numParts; iteratePart++ { + part := col.parts[iteratePart] + part.Lock.RLock() + if !part.ForEachDoc(page, total, fun) { + part.Lock.RUnlock() + return + } + part.Lock.RUnlock() + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/db.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/db.go new file mode 100644 index 000000000..5c6767ad4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/db.go @@ -0,0 +1,287 @@ +/* Collection and DB storage management. 
*/ +package db + +import ( + "encoding/json" + "fmt" + "github.com/HouzuoGuo/tiedot/tdlog" + "io" + "io/ioutil" + "math/rand" + "os" + "path" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "time" +) + +const ( + PART_NUM_FILE = "number_of_partitions" // DB-collection-partition-number-configuration file name +) + +// Database structures. +type DB struct { + path string // Root path of database directory + numParts int // Total number of partitions + cols map[string]*Col // All collections + schemaLock *sync.RWMutex // Control access to collection instances. +} + +// Open database and load all collections & indexes. +func OpenDB(dbPath string) (*DB, error) { + rand.Seed(time.Now().UnixNano()) // document ID generation relies on this RNG + db := &DB{path: dbPath, schemaLock: new(sync.RWMutex)} + return db, db.load() +} + +// Load all collection schema. +func (db *DB) load() error { + // Create DB directory and PART_NUM_FILE if necessary + var numPartsAssumed = false + numPartsFilePath := path.Join(db.path, PART_NUM_FILE) + if err := os.MkdirAll(db.path, 0700); err != nil { + return err + } + if partNumFile, err := os.Stat(numPartsFilePath); err != nil { + // The new database has as many partitions as number of CPUs recognized by OS + if err := ioutil.WriteFile(numPartsFilePath, []byte(strconv.Itoa(runtime.NumCPU())), 0600); err != nil { + return err + } + numPartsAssumed = true + } else if partNumFile.IsDir() { + return fmt.Errorf("Database config file %s is actually a directory, is database path correct?", PART_NUM_FILE) + } + // Get number of partitions from the text file + if numParts, err := ioutil.ReadFile(numPartsFilePath); err != nil { + return err + } else if db.numParts, err = strconv.Atoi(strings.Trim(string(numParts), "\r\n ")); err != nil { + return err + } + // Look for collection directories and open the collections + db.cols = make(map[string]*Col) + dirContent, err := ioutil.ReadDir(db.path) + if err != nil { + return err + } + for _, maybeColDir := range dirContent { + if !maybeColDir.IsDir() { + continue + } + if numPartsAssumed { + return fmt.Errorf("Please manually repair database partition number config file %s", numPartsFilePath) + } + if db.cols[maybeColDir.Name()], err = OpenCol(db, maybeColDir.Name()); err != nil { + return err + } + } + return err +} + +// Close all database files. Do not use the DB afterwards! +func (db *DB) Close() error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + errs := make([]error, 0, 0) + for _, col := range db.cols { + if err := col.close(); err != nil { + errs = append(errs, err) + } + } + if len(errs) == 0 { + return nil + } + return fmt.Errorf("%v", errs) +} + +// Create a new collection. +func (db *DB) Create(name string) error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + if _, exists := db.cols[name]; exists { + return fmt.Errorf("Collection %s already exists", name) + } else if err := os.MkdirAll(path.Join(db.path, name), 0700); err != nil { + return err + } else if db.cols[name], err = OpenCol(db, name); err != nil { + return err + } + return nil +} + +// Return all collection names. +func (db *DB) AllCols() (ret []string) { + db.schemaLock.RLock() + defer db.schemaLock.RUnlock() + ret = make([]string, 0, len(db.cols)) + for name, _ := range db.cols { + ret = append(ret, name) + } + return +} + +// Use the return value to interact with collection. Return value may be nil if the collection does not exist. 
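A minimal sketch of the collection management calls above (OpenDB, Create, AllCols) and Use just below; the directory and collection names are arbitrary examples.

package main

import (
	"fmt"

	"github.com/HouzuoGuo/tiedot/db"
)

func main() {
	// The first OpenDB on a directory writes number_of_partitions with one
	// partition per CPU; later opens reuse that value.
	myDB, err := db.OpenDB("/tmp/exampledb")
	if err != nil {
		panic(err)
	}
	defer myDB.Close()

	_ = myDB.Create("Feeds")
	fmt.Println(myDB.AllCols())           // [Feeds]
	fmt.Println(myDB.Use("Feeds") != nil) // true
	fmt.Println(myDB.Use("missing"))      // <nil> for unknown collections
}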
+func (db *DB) Use(name string) *Col { + db.schemaLock.RLock() + defer db.schemaLock.RUnlock() + if col, exists := db.cols[name]; exists { + return col + } + return nil +} + +// Rename a collection. +func (db *DB) Rename(oldName, newName string) error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + if _, exists := db.cols[oldName]; !exists { + return fmt.Errorf("Collection %s does not exist", oldName) + } else if _, exists := db.cols[newName]; exists { + return fmt.Errorf("Collection %s already exists", newName) + } else if newName == oldName { + return fmt.Errorf("Old and new names are the same") + } else if err := db.cols[oldName].close(); err != nil { + return err + } else if err := os.Rename(path.Join(db.path, oldName), path.Join(db.path, newName)); err != nil { + return err + } else if db.cols[newName], err = OpenCol(db, newName); err != nil { + return err + } + delete(db.cols, oldName) + return nil +} + +// Truncate a collection - delete all documents and clear +func (db *DB) Truncate(name string) error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + if _, exists := db.cols[name]; !exists { + return fmt.Errorf("Collection %s does not exist", name) + } + col := db.cols[name] + for i := 0; i < db.numParts; i++ { + if err := col.parts[i].Clear(); err != nil { + return err + } + for _, ht := range col.hts[i] { + if err := ht.Clear(); err != nil { + return err + } + } + } + return nil +} + +// Scrub a collection - fix corrupted documents and de-fragment free space. +func (db *DB) Scrub(name string) error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + if _, exists := db.cols[name]; !exists { + return fmt.Errorf("Collection %s does not exist", name) + } + // Prepare a temporary collection in file system + tmpColName := fmt.Sprintf("scrub-%s-%d", name, time.Now().UnixNano()) + tmpColDir := path.Join(db.path, tmpColName) + if err := os.MkdirAll(tmpColDir, 0700); err != nil { + return err + } + // Mirror indexes from original collection + for _, idxPath := range db.cols[name].indexPaths { + if err := os.MkdirAll(path.Join(tmpColDir, strings.Join(idxPath, INDEX_PATH_SEP)), 0700); err != nil { + return err + } + } + // Iterate through all documents and put them into the temporary collection + tmpCol, err := OpenCol(db, tmpColName) + if err != nil { + return err + } + db.cols[name].forEachDoc(func(id int, doc []byte) bool { + var docObj map[string]interface{} + if err := json.Unmarshal([]byte(doc), &docObj); err != nil { + // Skip corrupted document + return true + } + if err := tmpCol.InsertRecovery(id, docObj); err != nil { + tdlog.Noticef("Scrub %s: failed to insert back document %v", name, docObj) + } + return true + }, false) + if err := tmpCol.close(); err != nil { + return err + } + // Replace the original collection with the "temporary" one + db.cols[name].close() + if err := os.RemoveAll(path.Join(db.path, name)); err != nil { + return err + } + if err := os.Rename(path.Join(db.path, tmpColName), path.Join(db.path, name)); err != nil { + return err + } + if db.cols[name], err = OpenCol(db, name); err != nil { + return err + } + return nil +} + +// Drop a collection and lose all of its documents and indexes. 
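A short sketch of the housekeeping operations above (Rename, Truncate, Scrub) and Drop just below; the names are arbitrary and errors are ignored for brevity.

package main

import "github.com/HouzuoGuo/tiedot/db"

func main() {
	myDB, err := db.OpenDB("/tmp/exampledb")
	if err != nil {
		panic(err)
	}
	defer myDB.Close()

	_ = myDB.Create("Old")
	_ = myDB.Rename("Old", "New") // moves the collection directory
	_ = myDB.Truncate("New")      // removes all documents, keeps the indexes
	_ = myDB.Scrub("New")         // rewrites files, dropping corrupted documents and reclaiming space
	_ = myDB.Drop("New")          // removes the collection and its indexes entirely
}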
+func (db *DB) Drop(name string) error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + if _, exists := db.cols[name]; !exists { + return fmt.Errorf("Collection %s does not exist", name) + } else if err := db.cols[name].close(); err != nil { + return err + } else if err := os.RemoveAll(path.Join(db.path, name)); err != nil { + return err + } + delete(db.cols, name) + return nil +} + +// Copy this database into destination directory (for backup). +func (db *DB) Dump(dest string) error { + db.schemaLock.Lock() + defer db.schemaLock.Unlock() + cpFun := func(currPath string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + relPath, err := filepath.Rel(db.path, currPath) + if err != nil { + return err + } + destDir := path.Join(dest, relPath) + if err := os.MkdirAll(destDir, 0700); err != nil { + return err + } + tdlog.Noticef("Dump: created directory %s", destDir) + } else { + src, err := os.Open(currPath) + if err != nil { + return err + } + relPath, err := filepath.Rel(db.path, currPath) + if err != nil { + return err + } + destPath := path.Join(dest, relPath) + if _, fileExists := os.Open(destPath); fileExists == nil { + return fmt.Errorf("Destination file %s already exists", destPath) + } + destFile, err := os.Create(destPath) + if err != nil { + return err + } + written, err := io.Copy(destFile, src) + if err != nil { + return err + } + tdlog.Noticef("Dump: copied file %s, size is %d", destPath, written) + } + return nil + } + return filepath.Walk(db.path, cpFun) +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/db_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/db_test.go new file mode 100644 index 000000000..5b90fab19 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/db_test.go @@ -0,0 +1,255 @@ +package db + +import ( + "io/ioutil" + "os" + "path" + "runtime" + "testing" +) + +const ( + TEST_DATA_DIR = "/tmp/tiedot_test" +) + +func touchFile(dir, filename string) { + if err := os.MkdirAll(dir, 0700); err != nil { + panic(err) + } + if err := ioutil.WriteFile(path.Join(dir, filename), make([]byte, 0), 0600); err != nil { + panic(err) + } +} + +func TestOpenEmptyDB(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + db, err := OpenDB(TEST_DATA_DIR) + if err != nil { + t.Fatal(err) + } + if db.numParts != runtime.NumCPU() { + t.Fatal(db.numParts) + } + if err := db.Create("a"); err != nil { + t.Fatal(err) + } + if len(db.cols["a"].parts) != runtime.NumCPU() { + t.Fatal(err) + } + if err := db.Close(); err != nil { + t.Fatal(err) + } +} + +func TestOpenErrDB(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + touchFile(TEST_DATA_DIR+"/ColA", "dat_0") + touchFile(TEST_DATA_DIR+"/ColA/a!b!c", "0") + if db, err := OpenDB(TEST_DATA_DIR); err == nil { + t.Fatal("Did not error") + } else if err := db.Close(); err != nil { + t.Fatal(err) + } +} + +func TestOpenCloseDB(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil { + t.Fatal(err) + } + touchFile(TEST_DATA_DIR+"/ColA", "dat_0") + touchFile(TEST_DATA_DIR+"/ColA/a!b!c", "0") + if err := os.MkdirAll(TEST_DATA_DIR+"/ColB", 0700); err != nil { + panic(err) + } + db, err := OpenDB(TEST_DATA_DIR) 
+ if err != nil { + t.Fatal(err) + } + if db.path != TEST_DATA_DIR || db.numParts != 2 || db.cols["ColA"] == nil || db.cols["ColB"] == nil { + t.Fatal(db.cols) + } + colA := db.cols["ColA"] + colB := db.cols["ColB"] + if len(colA.parts) != 2 || len(colA.hts) != 2 { + t.Fatal(colA) + } + if colA.indexPaths["a!b!c"][0] != "a" || colA.indexPaths["a!b!c"][1] != "b" || colA.indexPaths["a!b!c"][2] != "c" { + t.Fatal(colA.indexPaths) + } + if colA.hts[0]["a!b!c"] == nil || colA.hts[1]["a!b!c"] == nil { + t.Fatal(colA.hts) + } + if len(colB.parts) != 2 || len(colB.hts) != 2 { + t.Fatal(colB) + } + if err := db.Close(); err != nil { + t.Fatal(err) + } +} + +func TestColCrud(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil { + t.Fatal(err) + } + db, err := OpenDB(TEST_DATA_DIR) + if err != nil { + t.Fatal(err) + } + if len(db.AllCols()) != 0 { + t.Fatal(db.AllCols()) + } + // Create + if err := db.Create("a"); err != nil { + t.Fatal(err) + } + if db.Create("a") == nil { + t.Fatal("Did not error") + } + if err := db.Create("b"); err != nil { + t.Fatal(err) + } + // Get all names & use + if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "a" && allNames[1] == "b" || allNames[0] == "b" && allNames[1] == "a") { + t.Fatal(allNames) + } + if db.Use("a") == nil || db.Use("b") == nil || db.Use("abcde") != nil { + t.Fatal(db.cols) + } + // Rename + if db.Rename("a", "a") == nil { + t.Fatal("Did not error") + } + if db.Rename("a", "b") == nil { + t.Fatal("Did not error") + } + if db.Rename("abc", "b") == nil { + t.Fatal("Did not error") + } + if err := db.Rename("a", "c"); err != nil { + t.Fatal(err) + } + if err := db.Rename("b", "d"); err != nil { + t.Fatal(err) + } + // Rename - verify + if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "d" && allNames[1] == "c" || allNames[0] == "c" && allNames[1] == "d") { + t.Fatal(allNames) + } + if db.Use("c") == nil || db.Use("d") == nil || db.Use("a") != nil { + t.Fatal(db.cols) + } + // Truncate + if db.Truncate("a") == nil { + t.Fatal("Did not error") + } + if err := db.Truncate("c"); err != nil { + t.Fatal(err) + } + if err := db.Truncate("d"); err != nil { + t.Fatal(err) + } + // Truncate - verify + if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "d" && allNames[1] == "c" || allNames[0] == "c" && allNames[1] == "d") { + t.Fatal(allNames) + } + if db.Use("c") == nil || db.Use("d") == nil || db.Use("a") != nil { + t.Fatal(db.cols) + } + // Scrub + if err := db.Scrub("c"); err != nil { + t.Fatal(err) + } + // Scrub - verify + if allNames := db.AllCols(); len(allNames) != 2 || !(allNames[0] == "d" && allNames[1] == "c" || allNames[0] == "c" && allNames[1] == "d") { + t.Fatal(allNames) + } + if db.Use("c") == nil || db.Use("d") == nil || db.Use("a") != nil { + t.Fatal(db.cols) + } + // More scrub tests are in doc_test.go + // Drop + if db.Drop("a") == nil { + t.Fatal("Did not error") + } + if err := db.Drop("c"); err != nil { + t.Fatal(err) + } + if allNames := db.AllCols(); len(allNames) != 1 || allNames[0] != "d" { + t.Fatal(allNames) + } + if db.Use("d") == nil { + t.Fatal(db.cols) + } + if err := db.Drop("d"); err != nil { + t.Fatal(err) + } + if allNames := db.AllCols(); len(allNames) != 0 { + t.Fatal(allNames) + } + if db.Use("d") != nil { + t.Fatal(db.cols) + } + if err := db.Close(); err != nil { + 
t.Fatal(err) + } +} + +func TestDumpDB(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + os.RemoveAll(TEST_DATA_DIR + "bak") + defer os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR + "bak") + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil { + t.Fatal(err) + } + db, err := OpenDB(TEST_DATA_DIR) + if err != nil { + t.Fatal(err) + } + if err := db.Create("a"); err != nil { + t.Fatal(err) + } else if err := db.Create("b"); err != nil { + t.Fatal(err) + } + id1, err := db.Use("a").Insert(map[string]interface{}{"whatever": "1"}) + if err != nil { + t.Fatal(err) + } else if err := db.Dump(TEST_DATA_DIR + "bak"); err != nil { + t.Fatal(err) + } + // Open the new database + db2, err := OpenDB(TEST_DATA_DIR + "bak") + if err != nil { + t.Fatal(err) + } + if allCols := db2.AllCols(); !(allCols[0] == "a" && allCols[1] == "b" || allCols[0] == "b" && allCols[1] == "a") { + t.Fatal(allCols) + } + if doc, err := db2.Use("a").Read(id1); err != nil || doc["whatever"].(string) != "1" { + t.Fatal(doc, err) + } + if err := db.Close(); err != nil { + t.Fatal(err) + } + if err := db2.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/doc.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/doc.go new file mode 100644 index 000000000..99c6100b5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/doc.go @@ -0,0 +1,246 @@ +/* Document management and index maintenance. */ +package db + +import ( + "encoding/json" + "fmt" + "github.com/HouzuoGuo/tiedot/tdlog" + "math/rand" +) + +// Resolve the attribute(s) in the document structure along the given path. +func GetIn(doc interface{}, path []string) (ret []interface{}) { + docMap, ok := doc.(map[string]interface{}) + if !ok { + return + } + var thing interface{} = docMap + // Get into each path segment + for i, seg := range path { + if aMap, ok := thing.(map[string]interface{}); ok { + thing = aMap[seg] + } else if anArray, ok := thing.([]interface{}); ok { + for _, element := range anArray { + ret = append(ret, GetIn(element, path[i:])...) + } + return ret + } else { + return nil + } + } + switch thing.(type) { + case []interface{}: + return append(ret, thing.([]interface{})...) + default: + return append(ret, thing) + } +} + +// Hash a string using sdbm algorithm. +func StrHash(str string) int { + var hash int + for _, c := range str { + hash = int(c) + (hash << 6) + (hash << 16) - hash + } + if hash < 0 { + return -hash + } + return hash +} + +// Put a document on all user-created indexes. +func (col *Col) indexDoc(id int, doc map[string]interface{}) { + for idxName, idxPath := range col.indexPaths { + for _, idxVal := range GetIn(doc, idxPath) { + if idxVal != nil { + hashKey := StrHash(fmt.Sprint(idxVal)) + partNum := hashKey % col.db.numParts + ht := col.hts[partNum][idxName] + ht.Lock.Lock() + ht.Put(hashKey, id) + ht.Lock.Unlock() + } + } + } +} + +// Remove a document from all user-created indexes. +func (col *Col) unindexDoc(id int, doc map[string]interface{}) { + for idxName, idxPath := range col.indexPaths { + for _, idxVal := range GetIn(doc, idxPath) { + if idxVal != nil { + hashKey := StrHash(fmt.Sprint(idxVal)) + partNum := hashKey % col.db.numParts + ht := col.hts[partNum][idxName] + ht.Lock.Lock() + ht.Remove(hashKey, id) + ht.Lock.Unlock() + } + } + } +} + +// Insert a document with the specified ID into the collection (incl. 
index). Does not place partition/schema lock. +func (col *Col) InsertRecovery(id int, doc map[string]interface{}) (err error) { + docJS, err := json.Marshal(doc) + if err != nil { + return + } + partNum := id % col.db.numParts + part := col.parts[partNum] + // Put document data into collection + if _, err = part.Insert(id, []byte(docJS)); err != nil { + return + } + // Index the document + col.indexDoc(id, doc) + return +} + +// Insert a document into the collection. +func (col *Col) Insert(doc map[string]interface{}) (id int, err error) { + docJS, err := json.Marshal(doc) + if err != nil { + return + } + id = rand.Int() + partNum := id % col.db.numParts + col.db.schemaLock.RLock() + part := col.parts[partNum] + // Put document data into collection + part.Lock.Lock() + if _, err = part.Insert(id, []byte(docJS)); err != nil { + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return + } + // If another thread is updating the document in the meanwhile, let it take over index maintenance + if err = part.LockUpdate(id); err != nil { + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return id, nil + } + part.Lock.Unlock() + // Index the document + col.indexDoc(id, doc) + part.Lock.Lock() + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return +} + +func (col *Col) read(id int, placeSchemaLock bool) (doc map[string]interface{}, err error) { + if placeSchemaLock { + col.db.schemaLock.RLock() + } + part := col.parts[id%col.db.numParts] + part.Lock.RLock() + docB, err := part.Read(id) + part.Lock.RUnlock() + if err != nil { + if placeSchemaLock { + col.db.schemaLock.RUnlock() + } + return + } + err = json.Unmarshal(docB, &doc) + if placeSchemaLock { + col.db.schemaLock.RUnlock() + } + return +} + +// Find and retrieve a document by ID. +func (col *Col) Read(id int) (doc map[string]interface{}, err error) { + return col.read(id, true) +} + +// Update a document. +func (col *Col) Update(id int, doc map[string]interface{}) error { + if doc == nil { + return fmt.Errorf("Updating %d: input doc may not be nil", id) + } + docJS, err := json.Marshal(doc) + if err != nil { + return err + } + col.db.schemaLock.RLock() + part := col.parts[id%col.db.numParts] + part.Lock.Lock() + // Place lock, read back original document and update + if err := part.LockUpdate(id); err != nil { + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return err + } + originalB, err := part.Read(id) + if err != nil { + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return err + } + var original map[string]interface{} + if err = json.Unmarshal(originalB, &original); err != nil { + tdlog.Noticef("Will not attempt to unindex document %d during update", id) + } + if err = part.Update(id, []byte(docJS)); err != nil { + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return err + } + // Done with the collection data, next is to maintain indexed values + part.Lock.Unlock() + if original != nil { + col.unindexDoc(id, original) + } + col.indexDoc(id, doc) + // Done with the document + part.Lock.Lock() + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return nil +} + +// Delete a document. 
+func (col *Col) Delete(id int) error { + col.db.schemaLock.RLock() + part := col.parts[id%col.db.numParts] + part.Lock.Lock() + // Place lock, read back original document and delete document + if err := part.LockUpdate(id); err != nil { + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return err + } + originalB, err := part.Read(id) + if err != nil { + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return err + } + var original map[string]interface{} + if err = json.Unmarshal(originalB, &original); err != nil { + tdlog.Noticef("Will not attempt to unindex document %d during delete", id) + } + if err = part.Delete(id); err != nil { + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return err + } + // Done with the collection data, next is to remove indexed values + part.Lock.Unlock() + if original != nil { + col.unindexDoc(id, original) + } + part.Lock.Lock() + part.UnlockUpdate(id) + part.Lock.Unlock() + col.db.schemaLock.RUnlock() + return nil +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/doc_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/doc_test.go new file mode 100644 index 000000000..df72d67cc --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/doc_test.go @@ -0,0 +1,279 @@ +package db + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "strings" + "testing" + "time" + + "github.com/HouzuoGuo/tiedot/dberr" +) + +func StrHashTest(t *testing.T) { + strings := []string{"", " ", "abc", "123"} + hashes := []int{0, 32, 417419622498, 210861491250} + for i := range strings { + if StrHash(strings[i]) != hashes[i] { + t.Fatalf("Hash of %s equals to %d, it should equal to %d", strings[i], StrHash(strings[i]), hashes[i]) + } + } +} + +func GetInTest(t *testing.T) { + var obj interface{} + // Get inside a JSON object + json.Unmarshal([]byte(`{"a": {"b": {"c": 1}}}`), &obj) + if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 1 { + t.Fatal() + } + // Get inside a JSON array + json.Unmarshal([]byte(`{"a": {"b": {"c": [1, 2, 3]}}}`), &obj) + if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 1 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"a", "b", "c"})[1].(float64); !ok || val != 2 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"a", "b", "c"})[2].(float64); !ok || val != 3 { + t.Fatal() + } + // Get inside JSON objects contained in JSON array + json.Unmarshal([]byte(`{"a": [{"b": {"c": [1]}}, {"b": {"c": [2, 3]}}]}`), &obj) + if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 1 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"a", "b", "c"})[1].(float64); !ok || val != 2 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"a", "b", "c"})[2].(float64); !ok || val != 3 { + t.Fatal() + } + // Get inside a JSON array and fetch attributes from array elements, which are JSON objects + json.Unmarshal([]byte(`{"a": [{"b": {"c": [4]}}, {"b": {"c": [5, 6]}}], "d": [0, 9]}`), &obj) + if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 4 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"a", "b", "c"})[1].(float64); !ok || val != 5 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"a", "b", "c"})[2].(float64); !ok || val != 6 { + t.Fatal() + } + if len(GetIn(obj, []string{"a", "b", "c"})) != 3 { + t.Fatal() + } + if val, ok := GetIn(obj, []string{"d"})[0].(float64); !ok || val != 0 { + t.Fatal() + } + if val, ok := GetIn(obj, 
[]string{"d"})[1].(float64); !ok || val != 9 { + t.Fatal() + } + if len(GetIn(obj, []string{"d"})) != 2 { + t.Fatal() + } + // Another example + json.Unmarshal([]byte(`{"a": {"b": [{"c": 2}]}, "d": 0}`), &obj) + if val, ok := GetIn(obj, []string{"a", "b", "c"})[0].(float64); !ok || val != 2 { + t.Fatal() + } + if len(GetIn(obj, []string{"a", "b", "c"})) != 1 { + t.Fatal() + } +} + +func idxHas(col *Col, path []string, idxVal interface{}, docID int) error { + idxName := strings.Join(path, INDEX_PATH_SEP) + hashKey := StrHash(fmt.Sprint(idxVal)) + vals := col.hts[hashKey%col.db.numParts][idxName].Get(hashKey, 0) + if len(vals) != 1 || vals[0] != docID { + return fmt.Errorf("Looking for %v (%v) docID %v in %v partition %d, but got result %v", idxVal, hashKey, docID, path, hashKey%col.db.numParts, vals) + } + return nil +} + +func idxHasNot(col *Col, path []string, idxVal, docID int) error { + idxName := strings.Join(path, INDEX_PATH_SEP) + hashKey := StrHash(fmt.Sprint(idxVal)) + vals := col.hts[hashKey%col.db.numParts][idxName].Get(hashKey, 0) + for _, v := range vals { + if v == docID { + return fmt.Errorf("Looking for %v %v %v in %v (should not return any), but got result %v", idxVal, hashKey, docID, path, vals) + } + } + return nil +} + +func TestDocCrudAndIdx(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil { + t.Fatal(err) + } + db, err := OpenDB(TEST_DATA_DIR) + if err != nil { + t.Fatal(err) + } + // Prepare collection and index + if err = db.Create("col"); err != nil { + t.Fatal(err) + } + col := db.Use("col") + if err = col.Index([]string{"a", "b"}); err != nil { + t.Fatal(err) + } + numDocs := 2011 + docIDs := make([]int, numDocs) + // Insert documents + for i := 0; i < numDocs; i++ { + if docIDs[i], err = col.Insert(map[string]interface{}{"a": map[string]interface{}{"b": i}}); err != nil { + t.Fatal(err) + } + } + // Read documents and verify index + if _, err = col.Read(123456); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + for i, docID := range docIDs { + if doc, err := col.Read(docID); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i) { + t.Fatal(docID, doc) + } + if err = idxHas(col, []string{"a", "b"}, i, docID); err != nil { + t.Fatal(err) + } + } + // Update document + if err = col.Update(654321, map[string]interface{}{}); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + for i, docID := range docIDs { + // i -> i * 2 + if err = col.Update(docID, map[string]interface{}{"a": map[string]interface{}{"b": i * 2}}); err != nil { + t.Fatal(err) + } + } + // After update - verify documents and index + for i, docID := range docIDs { + if doc, err := col.Read(docID); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) { + t.Fatal(docID, doc) + } + if i == 0 { + if err = idxHas(col, []string{"a", "b"}, 0, docID); err != nil { + t.Fatal(err) + } + } else { + if err = idxHasNot(col, []string{"a", "b"}, i, docID); err != nil { + t.Fatal(err) + } + if err = idxHas(col, []string{"a", "b"}, i*2, docID); err != nil { + t.Fatal(err) + } + } + } + // Delete half of those documents + if err = col.Delete(654321); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + for i := 0; i < numDocs/2+1; i++ { + if err := col.Delete(docIDs[i]); err != nil { + t.Fatal(err) + } + if 
err := col.Delete(docIDs[i]); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not error") + } + } + // After delete - verify + for i, docID := range docIDs { + if i < numDocs/2+1 { + // After delete - verify deleted documents and index + if _, err := col.Read(docID); dberr.Type(err) != dberr.ErrorNoDoc { + t.Fatal("Did not delete", i, docID) + } + if err = idxHasNot(col, []string{"a", "b"}, i*2, docID); err != nil { + t.Fatal(err) + } + } else { + // After delete - verify unaffected documents and index + if doc, err := col.Read(docID); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) { + t.Fatal(docID, doc) + } + if err = idxHas(col, []string{"a", "b"}, i*2, docID); err != nil { + t.Fatal(err) + } + } + } + // Recreate index and verify + if err = col.Unindex([]string{"a", "b"}); err != nil { + t.Fatal(err) + } + if err = col.Index([]string{"a", "b"}); err != nil { + t.Fatal(err) + } + for i := numDocs/2 + 1; i < numDocs; i++ { + if doc, err := col.Read(docIDs[i]); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) { + t.Fatal(doc, err) + } + if err = idxHas(col, []string{"a", "b"}, i*2, docIDs[i]); err != nil { + t.Fatal(err) + } + } + + // Verify that there are approximately 1000 documents + t.Log("ApproxDocCount", col.ApproxDocCount()) + if col.ApproxDocCount() < 600 || col.ApproxDocCount() > 1400 { + t.Fatal("Approximate is way off", col.ApproxDocCount()) + } + + // Scrub and verify + if err = db.Scrub("col"); err != nil { + t.Fatal(err) + } + col = db.Use("col") + for i := numDocs/2 + 1; i < numDocs; i++ { + if doc, err := col.Read(docIDs[i]); err != nil || doc["a"].(map[string]interface{})["b"].(float64) != float64(i*2) { + t.Fatal(doc, err) + } + if err = idxHas(col, []string{"a", "b"}, i*2, docIDs[i]); err != nil { + t.Fatal(err) + } + } + + // Iterate over all documents 10 times + start := time.Now().UnixNano() + for i := 0; i < 10; i++ { + col.ForEachDoc(func(_ int, _ []byte) bool { + return true + }) + } + timediff := time.Now().UnixNano() - start + t.Log("It took", timediff/1000000, "milliseconds") + + // Verify again that there are approximately 1000 documents + t.Log("ApproxDocCount", col.ApproxDocCount()) + if col.ApproxDocCount() < 600 || col.ApproxDocCount() > 1400 { + t.Fatal("Approximate is way off", col.ApproxDocCount()) + } + + // Read back all documents page by pabe + totalPage := col.ApproxDocCount() / 100 + collectedIDs := make(map[int]struct{}) + for page := 0; page < totalPage; page++ { + col.ForEachDocInPage(page, totalPage, func(id int, _ []byte) bool { + collectedIDs[id] = struct{}{} + return true + }) + t.Log("Went through page ", page, " got ", len(collectedIDs), " documents so far") + } + if len(collectedIDs) != numDocs/2 { + t.Fatal("Wrong number of docs", len(collectedIDs)) + } + + if err = db.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/idx_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/idx_test.go new file mode 100644 index 000000000..8a8224cb8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/idx_test.go @@ -0,0 +1,68 @@ +package db + +import ( + "io/ioutil" + "os" + "strings" + "testing" +) + +func TestIdxCRUD(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil { + t.Fatal(err) + } + 
db, err := OpenDB(TEST_DATA_DIR) + if err != nil { + t.Fatal(err) + } + if err = db.Create("col"); err != nil { + t.Fatal(err) + } + col := db.Use("col") + if len(col.AllIndexes()) != 0 { + t.Fatal(col.AllIndexes()) + } + // Create index & verify + if err = col.Index([]string{"a", "b"}); err != nil { + t.Fatal(err) + } + if col.Index([]string{"a", "b"}) == nil { + t.Fatal(col.indexPaths, "Did not error") + } + if len(col.AllIndexes()) != 1 || col.AllIndexes()[0][0] != "a" || col.AllIndexes()[0][1] != "b" { + t.Fatal(col.AllIndexes()) + } + if err = col.Index([]string{"c"}); err != nil { + t.Fatal(err) + } + allIndexes := col.AllIndexes() + idx0 := strings.Join(allIndexes[0], ",") + idx1 := strings.Join(allIndexes[1], ",") + if !(idx0 == "a,b" && idx1 == "c" || idx0 == "c" && idx1 == "a,b") { + t.Fatal(allIndexes) + } + // Unindex & verify + if col.Unindex([]string{"%&^*"}) == nil { + t.Fatal("Did not error") + } + if err = col.Unindex([]string{"c"}); err != nil { + t.Fatal(err) + } + if len(col.AllIndexes()) != 1 || col.AllIndexes()[0][0] != "a" || col.AllIndexes()[0][1] != "b" { + t.Fatal(col.AllIndexes()) + } + if err = col.Unindex([]string{"a", "b"}); err != nil { + t.Fatal(err) + } + if len(col.AllIndexes()) != 0 { + t.Fatal(col.AllIndexes()) + } + if err := db.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/query.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/query.go new file mode 100644 index 000000000..8ff19a4dd --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/query.go @@ -0,0 +1,343 @@ +/* Query processor. */ +package db + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/HouzuoGuo/tiedot/dberr" + "github.com/HouzuoGuo/tiedot/tdlog" +) + +// Calculate union of sub-query results. +func EvalUnion(exprs []interface{}, src *Col, result *map[int]struct{}) (err error) { + for _, subExpr := range exprs { + if err = evalQuery(subExpr, src, result, false); err != nil { + return + } + } + return +} + +// Put all document IDs into result. +func EvalAllIDs(src *Col, result *map[int]struct{}) (err error) { + src.forEachDoc(func(id int, _ []byte) bool { + (*result)[id] = struct{}{} + return true + }, false) + return +} + +// Value equity check ("attribute == value") using hash lookup. 
+func Lookup(lookupValue interface{}, expr map[string]interface{}, src *Col, result *map[int]struct{}) (err error) { + // Figure out lookup path - JSON array "in" + path, hasPath := expr["in"] + if !hasPath { + return errors.New("Missing lookup path `in`") + } + vecPath := make([]string, 0) + if vecPathInterface, ok := path.([]interface{}); ok { + for _, v := range vecPathInterface { + vecPath = append(vecPath, fmt.Sprint(v)) + } + } else { + return errors.New(fmt.Sprintf("Expecting vector lookup path `in`, but %v given", path)) + } + // Figure out result number limit + intLimit := int(0) + if limit, hasLimit := expr["limit"]; hasLimit { + if floatLimit, ok := limit.(float64); ok { + intLimit = int(floatLimit) + } else if _, ok := limit.(int); ok { + intLimit = limit.(int) + } else { + return dberr.New(dberr.ErrorExpectingInt, "limit", limit) + } + } + lookupStrValue := fmt.Sprint(lookupValue) // the value to look for + lookupValueHash := StrHash(lookupStrValue) + scanPath := strings.Join(vecPath, INDEX_PATH_SEP) + if _, indexed := src.indexPaths[scanPath]; !indexed { + return dberr.New(dberr.ErrorNeedIndex, scanPath, expr) + } + num := lookupValueHash % src.db.numParts + ht := src.hts[num][scanPath] + ht.Lock.RLock() + vals := ht.Get(lookupValueHash, intLimit) + ht.Lock.RUnlock() + for _, match := range vals { + // Filter result to avoid hash collision + if doc, err := src.read(match, false); err == nil { + for _, v := range GetIn(doc, vecPath) { + if fmt.Sprint(v) == lookupStrValue { + (*result)[match] = struct{}{} + } + } + } + } + return +} + +// Value existence check (value != nil) using hash lookup. +func PathExistence(hasPath interface{}, expr map[string]interface{}, src *Col, result *map[int]struct{}) (err error) { + // Figure out the path + vecPath := make([]string, 0) + if vecPathInterface, ok := hasPath.([]interface{}); ok { + for _, v := range vecPathInterface { + vecPath = append(vecPath, fmt.Sprint(v)) + } + } else { + return errors.New(fmt.Sprintf("Expecting vector path, but %v given", hasPath)) + } + // Figure out result number limit + intLimit := 0 + if limit, hasLimit := expr["limit"]; hasLimit { + if floatLimit, ok := limit.(float64); ok { + intLimit = int(floatLimit) + } else if _, ok := limit.(int); ok { + intLimit = limit.(int) + } else { + return dberr.New(dberr.ErrorExpectingInt, "limit", limit) + } + } + jointPath := strings.Join(vecPath, INDEX_PATH_SEP) + if _, indexed := src.indexPaths[jointPath]; !indexed { + return dberr.New(dberr.ErrorNeedIndex, vecPath, expr) + } + counter := 0 + partDiv := src.approxDocCount(false) / src.db.numParts / 4000 // collect approx. 4k document IDs in each iteration + if partDiv == 0 { + partDiv++ + } + for iteratePart := 0; iteratePart < src.db.numParts; iteratePart++ { + ht := src.hts[iteratePart][jointPath] + ht.Lock.RLock() + for i := 0; i < partDiv; i++ { + _, ids := ht.GetPartition(i, partDiv) + for _, id := range ids { + (*result)[id] = struct{}{} + counter++ + if counter == intLimit { + ht.Lock.RUnlock() + return nil + } + } + } + ht.Lock.RUnlock() + } + return nil +} + +// Calculate intersection of sub-query results. 
+func Intersect(subExprs interface{}, src *Col, result *map[int]struct{}) (err error) { + myResult := make(map[int]struct{}) + if subExprVecs, ok := subExprs.([]interface{}); ok { + first := true + for _, subExpr := range subExprVecs { + subResult := make(map[int]struct{}) + intersection := make(map[int]struct{}) + if err = evalQuery(subExpr, src, &subResult, false); err != nil { + return + } + if first { + myResult = subResult + first = false + } else { + for k, _ := range subResult { + if _, inBoth := myResult[k]; inBoth { + intersection[k] = struct{}{} + } + } + myResult = intersection + } + } + for docID := range myResult { + (*result)[docID] = struct{}{} + } + } else { + return dberr.New(dberr.ErrorExpectingSubQuery, subExprs) + } + return +} + +// Calculate complement of sub-query results. +func Complement(subExprs interface{}, src *Col, result *map[int]struct{}) (err error) { + myResult := make(map[int]struct{}) + if subExprVecs, ok := subExprs.([]interface{}); ok { + for _, subExpr := range subExprVecs { + subResult := make(map[int]struct{}) + complement := make(map[int]struct{}) + if err = evalQuery(subExpr, src, &subResult, false); err != nil { + return + } + for k, _ := range subResult { + if _, inBoth := myResult[k]; !inBoth { + complement[k] = struct{}{} + } + } + for k, _ := range myResult { + if _, inBoth := subResult[k]; !inBoth { + complement[k] = struct{}{} + } + } + myResult = complement + } + for docID := range myResult { + (*result)[docID] = struct{}{} + } + } else { + return dberr.New(dberr.ErrorExpectingSubQuery, subExprs) + } + return +} + +func (col *Col) hashScan(idxName string, key, limit int) []int { + ht := col.hts[key%col.db.numParts][idxName] + ht.Lock.RLock() + vals := ht.Get(key, limit) + ht.Lock.RUnlock() + return vals +} + +// Look for indexed integer values within the specified integer range. 
+func IntRange(intFrom interface{}, expr map[string]interface{}, src *Col, result *map[int]struct{}) (err error) { + path, hasPath := expr["in"] + if !hasPath { + return errors.New("Missing path `in`") + } + // Figure out the path + vecPath := make([]string, 0) + if vecPathInterface, ok := path.([]interface{}); ok { + for _, v := range vecPathInterface { + vecPath = append(vecPath, fmt.Sprint(v)) + } + } else { + return errors.New(fmt.Sprintf("Expecting vector path `in`, but %v given", path)) + } + // Figure out result number limit + intLimit := int(0) + if limit, hasLimit := expr["limit"]; hasLimit { + if floatLimit, ok := limit.(float64); ok { + intLimit = int(floatLimit) + } else if _, ok := limit.(int); ok { + intLimit = limit.(int) + } else { + return dberr.New(dberr.ErrorExpectingInt, limit) + } + } + // Figure out the range ("from" value & "to" value) + from, to := int(0), int(0) + if floatFrom, ok := intFrom.(float64); ok { + from = int(floatFrom) + } else if _, ok := intFrom.(int); ok { + from = intFrom.(int) + } else { + return dberr.New(dberr.ErrorExpectingInt, "int-from", from) + } + if intTo, ok := expr["int-to"]; ok { + if floatTo, ok := intTo.(float64); ok { + to = int(floatTo) + } else if _, ok := intTo.(int); ok { + to = intTo.(int) + } else { + return dberr.New(dberr.ErrorExpectingInt, "int-to", to) + } + } else if intTo, ok := expr["int to"]; ok { + if floatTo, ok := intTo.(float64); ok { + to = int(floatTo) + } else if _, ok := intTo.(int); ok { + to = intTo.(int) + } else { + return dberr.New(dberr.ErrorExpectingInt, "int to", to) + } + } else { + return dberr.New(dberr.ErrorMissing, "int-to") + } + if to > from && to-from > 1000 || from > to && from-to > 1000 { + tdlog.CritNoRepeat("Query %v involves index lookup on more than 1000 values, which can be very inefficient", expr) + } + counter := int(0) // Number of results already collected + htPath := strings.Join(vecPath, ",") + if _, indexScan := src.indexPaths[htPath]; !indexScan { + return dberr.New(dberr.ErrorNeedIndex, vecPath, expr) + } + if from < to { + // Forward scan - from low value to high value + for lookupValue := from; lookupValue <= to; lookupValue++ { + lookupStrValue := fmt.Sprint(lookupValue) + hashValue := StrHash(lookupStrValue) + vals := src.hashScan(htPath, hashValue, int(intLimit)) + for _, docID := range vals { + if intLimit > 0 && counter == intLimit { + break + } + counter += 1 + (*result)[docID] = struct{}{} + } + } + } else { + // Backward scan - from high value to low value + for lookupValue := from; lookupValue >= to; lookupValue-- { + lookupStrValue := fmt.Sprint(lookupValue) + hashValue := StrHash(lookupStrValue) + vals := src.hashScan(htPath, hashValue, int(intLimit)) + for _, docID := range vals { + if intLimit > 0 && counter == intLimit { + break + } + counter += 1 + (*result)[docID] = struct{}{} + } + } + } + return +} + +func evalQuery(q interface{}, src *Col, result *map[int]struct{}, placeSchemaLock bool) (err error) { + if placeSchemaLock { + src.db.schemaLock.RLock() + defer src.db.schemaLock.RUnlock() + } + switch expr := q.(type) { + case []interface{}: // [sub query 1, sub query 2, etc] + return EvalUnion(expr, src, result) + case string: + if expr == "all" { + return EvalAllIDs(src, result) + } else { + // Might be single document number + docID, err := strconv.ParseInt(expr, 10, 64) + if err != nil { + return dberr.New(dberr.ErrorExpectingInt, "Single Document ID", docID) + } + (*result)[int(docID)] = struct{}{} + } + case map[string]interface{}: + if lookupValue, lookup 
:= expr["eq"]; lookup { // eq - lookup + return Lookup(lookupValue, expr, src, result) + } else if hasPath, exist := expr["has"]; exist { // has - path existence test + return PathExistence(hasPath, expr, src, result) + } else if subExprs, intersect := expr["n"]; intersect { // n - intersection + return Intersect(subExprs, src, result) + } else if subExprs, complement := expr["c"]; complement { // c - complement + return Complement(subExprs, src, result) + } else if intFrom, htRange := expr["int-from"]; htRange { // int-from, int-to - integer range query + return IntRange(intFrom, expr, src, result) + } else if intFrom, htRange := expr["int from"]; htRange { // "int from, "int to" - integer range query - same as above, just without dash + return IntRange(intFrom, expr, src, result) + } else { + return errors.New(fmt.Sprintf("Query %v does not contain any operation (lookup/union/etc)", expr)) + } + } + return nil +} + +// Main entrance to query processor - evaluate a query and put result into result map (as map keys). +func EvalQuery(q interface{}, src *Col, result *map[int]struct{}) (err error) { + return evalQuery(q, src, result, true) +} + +// TODO: How to bring back regex matcher? +// TODO: How to bring back JSON parameterized query? diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/query_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/query_test.go new file mode 100644 index 000000000..d27bb40de --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/db/query_test.go @@ -0,0 +1,279 @@ +package db + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/HouzuoGuo/tiedot/dberr" +) + +func ensureMapHasKeys(m map[int]struct{}, keys ...int) bool { + if len(m) != len(keys) { + return false + } + for _, v := range keys { + if _, ok := m[v]; !ok { + return false + } + } + return true +} + +func runQuery(query string, col *Col) (map[int]struct{}, error) { + result := make(map[int]struct{}) + var jq interface{} + if err := json.Unmarshal([]byte(query), &jq); err != nil { + fmt.Println(err) + } + return result, EvalQuery(jq, col, &result) +} + +func TestQuery(t *testing.T) { + os.RemoveAll(TEST_DATA_DIR) + defer os.RemoveAll(TEST_DATA_DIR) + if err := os.MkdirAll(TEST_DATA_DIR, 0700); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(TEST_DATA_DIR+"/number_of_partitions", []byte("2"), 0600); err != nil { + t.Fatal(err) + } + db, err := OpenDB(TEST_DATA_DIR) + if err != nil { + t.Fatal(err) + } + defer db.Close() + // Prepare collection and index + if err = db.Create("col"); err != nil { + t.Fatal(err) + } + col := db.Use("col") + docs := []string{ + `{"a": {"b": [1]}, "c": 1, "d": 1, "f": 1, "g": 1, "special": {"thing": null}, "h": 1}`, + `{"a": {"b": 1}, "c": [1], "d": 2, "f": 2, "g": 2}`, + `{"a": [{"b": [2]}], "c": 2, "d": 1, "f": 3, "g": 3, "h": 3}`, + `{"a": {"b": 3}, "c": [3], "d": 2, "f": 4, "g": 4}`, + `{"a": {"b": [4]}, "c": 4, "d": 1, "f": 5, "g": 5}`, + `{"a": [{"b": 5}, {"b": 6}], "c": 4, "d": 1, "f": 5, "g": 5, "h": 2}`, + `{"a": [{"b": "val1"}, {"b": "val2"}]}`, + `{"a": [{"b": "val3"}, {"b": ["val4", "val5"]}]}`} + ids := make([]int, len(docs)) + for i, doc := range docs { + var jsonDoc map[string]interface{} + if err := json.Unmarshal([]byte(doc), &jsonDoc); err != nil { + panic(err) + } + if ids[i], err = col.Insert(jsonDoc); err != nil { + t.Fatal(err) + return + } + } + q, err := runQuery(`["all"]`, col) + if err != nil { + t.Fatal(err) + } + col.Index([]string{"a", "b"}) + 
col.Index([]string{"f"}) + col.Index([]string{"h"}) + col.Index([]string{"special"}) + col.Index([]string{"e"}) + // expand numbers + q, err = runQuery(`["1", "2", ["3", "4"], "5"]`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, 1, 2, 3, 4, 5) { + t.Fatal(q) + } + // hash scan + q, err = runQuery(`{"eq": 1, "in": ["a", "b"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[0], ids[1]) { + t.Fatal(q) + } + q, err = runQuery(`{"eq": 5, "in": ["a", "b"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[5]) { + t.Fatal(q) + } + q, err = runQuery(`{"eq": 6, "in": ["a", "b"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[5]) { + t.Fatal(q) + } + q, err = runQuery(`{"eq": 1, "limit": 1, "in": ["a", "b"]}`, col) + if err != nil { + fmt.Println(err) + } + if !ensureMapHasKeys(q, ids[1]) && !ensureMapHasKeys(q, ids[0]) { + t.Fatal(q, ids[1], ids[0]) + } + // collection scan + q, err = runQuery(`{"eq": 1, "in": ["c"]}`, col) + if dberr.Type(err) != dberr.ErrorNeedIndex { + t.Fatal("Collection scan should not happen") + } + // lookup on "special" (null) + q, err = runQuery(`{"eq": {"thing": null}, "in": ["special"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[0]) { + t.Fatal(q) + } + // lookup in list + q, err = runQuery(`{"eq": "val1", "in": ["a", "b"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[6]) { + t.Fatal(q) + } + q, err = runQuery(`{"eq": "val5", "in": ["a", "b"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[7]) { + t.Fatal(q) + } + // "e" should not exist + q, err = runQuery(`{"has": ["e"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q) { + t.Fatal(q) + } + // existence test, hash scan, with limit + q, err = runQuery(`{"has": ["h"], "limit": 2}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[0], ids[2]) && !ensureMapHasKeys(q, ids[2], ids[5]) && !ensureMapHasKeys(q, ids[5], ids[0]) { + t.Fatal(q, ids[0], ids[1], ids[2]) + } + // existence test with incorrect input + q, err = runQuery(`{"has": ["c"], "limit": "a"}`, col) + if dberr.Type(err) != dberr.ErrorExpectingInt { + t.Fatal(err) + } + // existence test, collection scan & PK + q, err = runQuery(`{"has": ["c"], "limit": 2}`, col) + if dberr.Type(err) != dberr.ErrorNeedIndex { + t.Fatal("Existence test should return error") + } + q, err = runQuery(`{"has": ["@id"], "limit": 2}`, col) + if dberr.Type(err) != dberr.ErrorNeedIndex { + t.Fatal("Existence test should return error") + } + // int range scan with incorrect input + q, err = runQuery(`{"int-from": "a", "int-to": 4, "in": ["f"], "limit": 1}`, col) + if dberr.Type(err) != dberr.ErrorExpectingInt { + t.Fatal(err) + } + q, err = runQuery(`{"int-from": 1, "int-to": "a", "in": ["f"], "limit": 1}`, col) + if dberr.Type(err) != dberr.ErrorExpectingInt { + t.Fatal(err) + } + q, err = runQuery(`{"int-from": 1, "int-to": 2, "in": ["f"], "limit": "a"}`, col) + if dberr.Type(err) != dberr.ErrorExpectingInt { + t.Fatal(err) + } + // int range scan + q, err = runQuery(`{"int-from": 2, "int-to": 4, "in": ["f"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[1], ids[2], ids[3]) { + t.Fatal(q) + } + q, err = runQuery(`{"int-from": 2, "int-to": 4, "in": ["f"], "limit": 2}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[1], ids[2]) { + t.Fatal(q, ids[1], ids[2]) + } + // int hash scan using reversed range and limit + 
q, err = runQuery(`{"int-from": 10, "int-to": 0, "in": ["f"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[5], ids[4], ids[3], ids[2], ids[1], ids[0]) { + t.Fatal(q) + } + q, err = runQuery(`{"int-from": 10, "int-to": 0, "in": ["f"], "limit": 2}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[5], ids[4]) { + t.Fatal(q) + } + // all documents + q, err = runQuery(`"all"`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[0], ids[1], ids[2], ids[3], ids[4], ids[5], ids[6], ids[7]) { + t.Fatal(q) + } + // union + col.Index([]string{"c"}) + q, err = runQuery(`[{"eq": 4, "limit": 1, "in": ["a", "b"]}, {"eq": 1, "limit": 1, "in": ["c"]}]`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[0], ids[4]) && !ensureMapHasKeys(q, ids[1], ids[4]) { + t.Fatal(q) + } + // intersection + col.Index([]string{"d"}) + q, err = runQuery(`{"n": [{"eq": 2, "in": ["d"]}, "all"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[1], ids[3]) { + t.Fatal(q) + } + // intersection with incorrect input + q, err = runQuery(`{"c": null}`, col) + if dberr.Type(err) != dberr.ErrorExpectingSubQuery { + t.Fatal(err) + } + // complement + q, err = runQuery(`{"c": [{"eq": 4, "in": ["c"]}, {"eq": 2, "in": ["d"]}, "all"]}`, col) + if err != nil { + t.Fatal(err) + } + if !ensureMapHasKeys(q, ids[0], ids[2], ids[6], ids[7]) { + t.Fatal(q) + } + // complement with incorrect input + q, err = runQuery(`{"c": null}`, col) + if dberr.Type(err) != dberr.ErrorExpectingSubQuery { + t.Fatal(err) + } + // union of intersection + q, err = runQuery(`[{"n": [{"eq": 3, "in": ["c"]}]}, {"n": [{"eq": 2, "in": ["c"]}]}]`, col) + if !ensureMapHasKeys(q, ids[2], ids[3]) { + t.Fatal(q) + } + // union of complement + q, err = runQuery(`[{"c": [{"eq": 3, "in": ["c"]}]}, {"c": [{"eq": 2, "in": ["c"]}]}]`, col) + if !ensureMapHasKeys(q, ids[2], ids[3]) { + t.Fatal(q) + } + // union of complement of intersection + q, err = runQuery(`[{"c": [{"n": [{"eq": 1, "in": ["d"]},{"eq": 1, "in": ["c"]}]},{"eq": 1, "in": ["d"]}]},{"eq": 2, "in": ["c"]}]`, col) + if !ensureMapHasKeys(q, ids[2], ids[4], ids[5]) { + t.Fatal(q) + } +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/dberr/errors.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/dberr/errors.go new file mode 100644 index 000000000..4102d92b2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/dberr/errors.go @@ -0,0 +1,48 @@ +package dberr + +import "fmt" + +type errorType string + +const ( + ErrorNil errorType = "" + ErrorUndefined errorType = "Unknown Error." + + // IO error + ErrorIO errorType = "IO error has occured, see log for more details." + ErrorNoDoc errorType = "Document `%d` does not exist" + + // Document errors + ErrorDocTooLarge errorType = "Document is too large. Max: `%d`, Given: `%d`" + ErrorDocLocked errorType = "Document `%d` is locked for update - try again later" + + // Query input errors + ErrorNeedIndex errorType = "Please index %v and retry query %v." + ErrorExpectingSubQuery errorType = "Expecting a vector of sub-queries, but %v given." + ErrorExpectingInt errorType = "Expecting `%s` as an integer, but %v given." + ErrorMissing errorType = "Missing `%s`" +) + +func New(err errorType, details ...interface{}) Error { + return Error{err, details} +} + +type Error struct { + err errorType + details []interface{} +} + +func (e Error) Error() string { + return fmt.Sprintf(string(e.err), e.details...) 
+} + +func Type(e error) errorType { + if e == nil { + return ErrorNil + } + + if err, ok := e.(Error); ok { + return err.err + } + return ErrorUndefined +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/LICENSE b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/LICENSE new file mode 100644 index 000000000..50b26cf61 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2011, Evan Shaw +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/README.md b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/README.md new file mode 100644 index 000000000..b9eecba1b --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/README.md @@ -0,0 +1,4 @@ +This work is based on [mmap-go][] (BSD-style license) written by [Evan Shaw][]. + +[mmap-go]: https://github.com/edsrzf/mmap-go +[Evan Shaw]: https://github.com/edsrzf/ diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap.go new file mode 100644 index 000000000..640828d70 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap.go @@ -0,0 +1,58 @@ +// Copyright 2011 Evan Shaw. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file defines the common package interface and contains a little bit of +// factored out logic. + +// Package gommap allows mapping files into memory. It tries to provide a simple, reasonably portable interface, +// but doesn't go out of its way to abstract away every little platform detail. 
+// This specifically means: +// * forked processes may or may not inherit mappings +// * a file's timestamp may or may not be updated by writes through mappings +// * specifying a size larger than the file's actual size can increase the file's size +// * If the mapped file is being modified by another process while your program's running, don't expect consistent results between platforms +package gommap + +import ( + "errors" + "os" + "reflect" + "unsafe" +) + +// MMap represents a file mapped into memory. +type MMap []byte + +// Map maps an entire file into memory. +// Note that because of runtime limitations, no file larger than about 2GB can +// be completely mapped into memory. +func Map(f *os.File) (MMap, error) { + fd := uintptr(f.Fd()) + fi, err := f.Stat() + if err != nil { + return nil, err + } + length := int(fi.Size()) + if int64(length) != fi.Size() { + return nil, errors.New("memory map file length overflow") + } + return mmap(length, fd) +} + +func (m *MMap) header() *reflect.SliceHeader { + return (*reflect.SliceHeader)(unsafe.Pointer(m)) +} + +// Unmap deletes the memory mapped region, flushes any remaining changes, and sets +// m to nil. +// Trying to read or write any remaining references to m after Unmap is called will +// result in undefined behavior. +// Unmap should only be called on the slice value that was originally returned from +// a call to Map. Calling Unmap on a derived slice may cause errors. +func (m *MMap) Unmap() error { + dh := m.header() + err := unmap(dh.Data, uintptr(dh.Len)) + *m = nil + return err +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap_unix.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap_unix.go new file mode 100644 index 000000000..387724cc0 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap_unix.go @@ -0,0 +1,23 @@ +// Copyright 2011 Evan Shaw. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin freebsd linux netbsd openbsd + +package gommap + +import ( + "syscall" +) + +func mmap(len int, fd uintptr) ([]byte, error) { + return syscall.Mmap(int(fd), 0, len, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED) +} + +func unmap(addr, len uintptr) error { + _, _, errno := syscall.Syscall(syscall.SYS_MUNMAP, addr, len, 0) + if errno != 0 { + return syscall.Errno(errno) + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap_windows.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap_windows.go new file mode 100644 index 000000000..ae1889ccc --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/gommap/mmap_windows.go @@ -0,0 +1,59 @@ +// Copyright 2011 Evan Shaw. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package gommap + +import ( + "os" + "sync" + "syscall" +) + +// mmap on Windows is a two-step process. +// First, we call CreateFileMapping to get a handle. +// Then, we call MapviewToFile to get an actual pointer into memory. +// Because we want to emulate a POSIX-style mmap, we don't want to expose +// the handle -- only the pointer. We also want to return only a byte slice, +// not a struct, so it's convenient to manipulate. + +// We keep this map so that we can get back the original handle from the memory address. 
+var handleLock sync.Mutex +var handleMap = map[uintptr]syscall.Handle{} + +// Windows mmap always mapes the entire file regardless of the specified length. +func mmap(length int, hfile uintptr) ([]byte, error) { + h, errno := syscall.CreateFileMapping(syscall.Handle(hfile), nil, syscall.PAGE_READWRITE, 0, 0, nil) + if h == 0 { + return nil, os.NewSyscallError("CreateFileMapping", errno) + } + + addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_WRITE, 0, 0, 0) + if addr == 0 { + return nil, os.NewSyscallError("MapViewOfFile", errno) + } + handleLock.Lock() + handleMap[addr] = h + handleLock.Unlock() + + m := MMap{} + dh := m.header() + dh.Data = addr + dh.Len = length + dh.Cap = length + + return m, nil +} + +func unmap(addr, len uintptr) error { + if err := syscall.UnmapViewOfFile(addr); err != nil { + return err + } + + handleLock.Lock() + defer handleLock.Unlock() + handle := handleMap[addr] + delete(handleMap, addr) + + return os.NewSyscallError("CloseHandle", syscall.CloseHandle(syscall.Handle(handle))) +} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/tdlog/tdlog.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/tdlog/tdlog.go new file mode 100644 index 000000000..5b8d51832 --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/tdlog/tdlog.go @@ -0,0 +1,54 @@ +package tdlog + +import ( + "fmt" + "log" + "sync" +) + +// Controls whether INFO log messages are generated +var VerboseLog = false + +// LVL 6 +func Infof(template string, params ...interface{}) { + if VerboseLog { + log.Printf(template, params...) + } +} + +func Info(params ...interface{}) { + if VerboseLog { + log.Print(params...) + } +} + +// LVL 5 +func Noticef(template string, params ...interface{}) { + log.Printf(template, params...) +} + +func Notice(params ...interface{}) { + log.Print(params...) +} + +var critHistory = make(map[string]struct{}) +var critLock = new(sync.Mutex) + +// LVL 2 - will not repeat a message twice over the past 100 distinct crit messages +func CritNoRepeat(template string, params ...interface{}) { + msg := fmt.Sprintf(template, params...) + critLock.Lock() + if _, exists := critHistory[msg]; !exists { + log.Print(msg) + critHistory[msg] = struct{}{} + } + if len(critHistory) > 100 { + critHistory = make(map[string]struct{}) + } + critLock.Unlock() +} + +// LVL 1 +func Panicf(template string, params ...interface{}) { + log.Panicf(template, params...) 
+} diff --git a/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/tdlog/tdlog_test.go b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/tdlog/tdlog_test.go new file mode 100644 index 000000000..da638ae5d --- /dev/null +++ b/Godeps/_workspace/src/github.com/HouzuoGuo/tiedot/tdlog/tdlog_test.go @@ -0,0 +1,23 @@ +package tdlog + +import ( + "testing" +) + +func TestAllLogLevels(t *testing.T) { + defer func() { + if recover() == nil { + t.Fatal("Did not catch Panicf") + } + }() + Infof("a %s %s", "b", "c") + Info("a", "b", "c") + Noticef("a %s %s", "b", "c") + Notice("a", "b", "c") + CritNoRepeat("a %s %s", "b", "c") + if _, exists := critHistory["a b c"]; !exists { + t.Fatal("did not record history") + } + Panicf("a %s %s", "b", "c") + t.Fatal("Cannot reach here") +} diff --git a/pkg/utils/database/database.go b/pkg/utils/database/database.go new file mode 100644 index 000000000..2b693e79b --- /dev/null +++ b/pkg/utils/database/database.go @@ -0,0 +1,51 @@ +package database + +import ( + "github.com/HouzuoGuo/tiedot/db" +) + +func NewDatabase() *Database { + return &Database{} +} + +func (data *Database) setdbdir(dirname string) { + data.DBdir = dirname +} + +func (data *Database) GetDBHandle(dirname string) error { + var err error + data.setdbdir(dirname) + data.DBhandle, err = db.OpenDB(data.DBdir) + if err != nil { + return err + } + return nil +} + +func (data *Database) InitCollection(colls ...string) { + for _, str := range colls { + data.DBhandle.Create(str) + } +} + +func (data *Database) GetCollections() []string { + var colls []string + for _, name := range data.DBhandle.AllCols() { + colls = append(colls, name) + } + return colls +} + +func (data *Database) getCollectionHandle(coll string) *db.Col { + return data.DBhandle.Use(coll) +} + +func (data *Database) GetCollectionData(coll string, docid int) (map[string]interface{}, error) { + collHandle := data.getCollectionHandle(coll) + return collHandle.Read(docid) +} + +func (data *Database) InsertToCollection(coll string, model map[string]interface{}) (docid int, err error) { + collHandle := data.getCollectionHandle(coll) + return collHandle.Insert(model) +} diff --git a/pkg/utils/database/database_test.go b/pkg/utils/database/database_test.go new file mode 100644 index 000000000..9361bb092 --- /dev/null +++ b/pkg/utils/database/database_test.go @@ -0,0 +1,36 @@ +package database + +import ( + "os" + "testing" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func (s *MySuite) Testing(c *C) { + d := NewDatabase() + d.GetDBHandle("/tmp/testdata") + defer os.RemoveAll("/tmp/testdata") + + d.InitCollection("Matrix") + + data := map[string]interface{}{ + "version": "1.4", + "url": "golang.org", + "language": "Go", + } + + docId, err1 := d.InsertToCollection("Matrix", data) + c.Assert(err1, IsNil) + + retdata, err2 := d.GetCollectionData("Matrix", docId) + + c.Assert(err2, IsNil) + c.Assert(data, DeepEquals, retdata) +} diff --git a/pkg/utils/database/structure.go b/pkg/utils/database/structure.go new file mode 100644 index 000000000..e1a5849d8 --- /dev/null +++ b/pkg/utils/database/structure.go @@ -0,0 +1,10 @@ +package database + +import ( + "github.com/HouzuoGuo/tiedot/db" +) + +type Database struct { + DBdir string + DBhandle *db.DB +}
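
For reference, a minimal sketch of how the vendored tiedot API introduced above fits together — the same calls the new pkg/utils/database wrapper makes internally: open a database directory, create and index a collection, insert a document, and evaluate an "eq" lookup query with EvalQuery. The directory path and collection name are illustrative, and error handling is abbreviated to panics; only calls shown in the files above are used.

package main

import (
	"fmt"

	"github.com/HouzuoGuo/tiedot/db"
)

func main() {
	// Open (or create) a database directory; the path is illustrative.
	myDB, err := db.OpenDB("/tmp/tiedot_example")
	if err != nil {
		panic(err)
	}
	defer myDB.Close()

	// Create a collection and index the "language" attribute so "eq" lookups can use the hash index.
	if err := myDB.Create("Projects"); err != nil {
		panic(err)
	}
	projects := myDB.Use("Projects")
	if err := projects.Index([]string{"language"}); err != nil {
		panic(err)
	}

	// Insert a document; the returned integer ID is the key for later reads.
	id, err := projects.Insert(map[string]interface{}{"url": "golang.org", "language": "Go"})
	if err != nil {
		panic(err)
	}

	// Evaluate the lookup query {"eq": "Go", "in": ["language"]} against the collection.
	query := map[string]interface{}{"eq": "Go", "in": []interface{}{"language"}}
	result := make(map[int]struct{})
	if err := db.EvalQuery(query, projects, &result); err != nil {
		panic(err)
	}
	if _, found := result[id]; found {
		doc, _ := projects.Read(id)
		fmt.Println("matched:", doc)
	}
}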