Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/flipkart-incubator/gorocksdb

go 1.13

require (
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/stretchr/testify v1.7.0
)
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0=
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64=
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A=
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg=
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk=
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
72 changes: 72 additions & 0 deletions optim_transactiondb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package gorocksdb

// #include <stdlib.h>
// #include "rocksdb/c.h"
import "C"
import (
"errors"
"unsafe"
)

// OptimisticTransactionDB is a reusable handle to a RocksDB optimistic transactional database on disk, created by OpenOptimisticTransactionDb.
type OptimisticTransactionDB struct {
c *C.rocksdb_optimistictransactiondb_t
name string
opts *Options
}

// OpenOptimisticTransactionDb opens a database with the specified options.
func OpenOptimisticTransactionDb(opts *Options, name string) (*OptimisticTransactionDB, error) {
var (
cErr *C.char
cName = C.CString(name)
)
defer C.free(unsafe.Pointer(cName))
db := C.rocksdb_optimistictransactiondb_open(
opts.c, cName, &cErr)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return &OptimisticTransactionDB{
name: name,
c: db,
opts: opts,
}, nil
}

// GetBaseDb returns the handle to the underlying DB instance.
func (db *OptimisticTransactionDB) GetBaseDb() *DB {
baseDb := C.rocksdb_optimistictransactiondb_get_base_db(db.c)
return &DB{
name: db.name,
c: baseDb,
opts: db.opts,
}
}

// TransactionBegin begins a new transaction
// with the WriteOptions and TransactionOptions given.
func (db *OptimisticTransactionDB) TransactionBegin(
opts *WriteOptions,
transactionOpts *OptimisticTransactionOptions,
oldTransaction *Transaction,
) *Transaction {
if oldTransaction != nil {
return NewNativeTransaction(C.rocksdb_optimistictransaction_begin(
db.c,
opts.c,
transactionOpts.c,
oldTransaction.c,
))
}

return NewNativeTransaction(C.rocksdb_optimistictransaction_begin(
db.c, opts.c, transactionOpts.c, nil))
}

// Close closes the database.
func (transactionDB *OptimisticTransactionDB) Close() {
C.rocksdb_optimistictransactiondb_close(transactionDB.c)
transactionDB.c = nil
}
127 changes: 127 additions & 0 deletions optim_transactiondb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package gorocksdb

import (
"io/ioutil"
"sync"
"testing"

"github.com/facebookgo/ensure"
)

func TestOpenOptimisticTransactionDb(t *testing.T) {
db := newTestOptimisticTransactionDB(t, "TestOpenTransactionDb")
defer db.Close()
}

func TestOptimisticTransactionDBCRUD(t *testing.T) {
db := newTestOptimisticTransactionDB(t, "TestTransactionDbCRUD")
defer db.Close()

var (
givenTxnKey = []byte("hello2")
givenTxnKey2 = []byte("hello3")
givenTxnVal1 = []byte("whatawonderful")
wo = NewDefaultWriteOptions()
ro = NewDefaultReadOptions()
to = NewDefaultOptimisticTransactionOptions()
)

bdb := db.GetBaseDb()

// transaction
txn := db.TransactionBegin(wo, to, nil)
defer txn.Destroy()
// create
ensure.Nil(t, txn.Put(givenTxnKey, givenTxnVal1))
v4, err := txn.Get(ro, givenTxnKey)
defer v4.Free()
ensure.Nil(t, err)
ensure.DeepEqual(t, v4.Data(), givenTxnVal1)
ensure.Nil(t, txn.Commit())

v5, err := bdb.Get(ro, givenTxnKey)
defer v5.Free()
ensure.Nil(t, err)
ensure.DeepEqual(t, v5.Data(), givenTxnVal1)

// transaction
txn2 := db.TransactionBegin(wo, to, nil)
defer txn2.Destroy()
// create
ensure.Nil(t, txn2.Put(givenTxnKey2, givenTxnVal1))
// rollback
ensure.Nil(t, txn2.Rollback())

v6, err := bdb.Get(ro, givenTxnKey2)
defer v6.Free()
ensure.Nil(t, err)
ensure.True(t, v6.Data() == nil)

// transaction
txn3 := db.TransactionBegin(wo, to, nil)
defer txn3.Destroy()
// delete
ensure.Nil(t, txn3.Delete(givenTxnKey))
ensure.Nil(t, txn3.Commit())

v7, err := bdb.Get(ro, givenTxnKey)
defer v7.Free()
ensure.Nil(t, err)
ensure.True(t, v7.Data() == nil)
}

func TestOptimisticTransactionDBConflicts(t *testing.T) {
db := newTestOptimisticTransactionDB(t, "TestOptimisticConflicts")
defer db.Close()

var (
ctrKey = []byte("num")
wo = NewDefaultWriteOptions()
ro = NewDefaultReadOptions()
to = NewDefaultOptimisticTransactionOptions()
)

bdb := db.GetBaseDb()
ensure.Nil(t, bdb.Put(wo, ctrKey, []byte{0}))
targetCnt := 10

var wg sync.WaitGroup
for i := 1; i <= targetCnt; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
txn := db.TransactionBegin(wo, to, nil)
cnt, err := txn.GetForUpdate(ro, ctrKey)
ensure.Nil(t, err)
val := cnt.Data()[0]
newVal := val + 1
ensure.Nil(t, txn.Put(ctrKey, []byte{newVal}))
err = txn.Commit()
cnt.Free()
txn.Destroy()
if err == nil {
break
}
}
}()
}
wg.Wait()
cnt, err := bdb.Get(ro, ctrKey)
defer cnt.Free()
ensure.Nil(t, err)
val := cnt.Data()[0]
ensure.True(t, val == byte(targetCnt))
}

func newTestOptimisticTransactionDB(t *testing.T, name string) *OptimisticTransactionDB {
dir, err := ioutil.TempDir("", "gorocksoptimistictransactiondb-"+name)
ensure.Nil(t, err)

opts := NewDefaultOptions()
opts.SetCreateIfMissing(true)
db, err := OpenOptimisticTransactionDb(opts, dir)
ensure.Nil(t, err)

return db
}
32 changes: 32 additions & 0 deletions options_optim_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package gorocksdb

// #include "rocksdb/c.h"
import "C"

// TransactionOptions represent all of the available options options for
// a transaction on the database.
type OptimisticTransactionOptions struct {
c *C.rocksdb_optimistictransaction_options_t
}

// NewDefaultTransactionOptions creates a default TransactionOptions object.
func NewDefaultOptimisticTransactionOptions() *OptimisticTransactionOptions {
return NewNativeOptimisticTransactionOptions(C.rocksdb_optimistictransaction_options_create())
}

// NewNativeTransactionOptions creates a TransactionOptions object.
func NewNativeOptimisticTransactionOptions(c *C.rocksdb_optimistictransaction_options_t) *OptimisticTransactionOptions {
return &OptimisticTransactionOptions{c}
}

// SetSetSnapshot to true is the same as calling
// Transaction::SetSnapshot().
func (opts *OptimisticTransactionOptions) SetSetSnapshot(value bool) {
C.rocksdb_optimistictransaction_options_set_set_snapshot(opts.c, boolToChar(value))
}

// Destroy deallocates the TransactionOptions object.
func (opts *OptimisticTransactionOptions) Destroy() {
C.rocksdb_optimistictransaction_options_destroy(opts.c)
opts.c = nil
}