Skip to content

Commit

Permalink
Transaction support column families, tecbot#168
Browse files Browse the repository at this point in the history
  • Loading branch information
flier committed Mar 19, 2021
1 parent f0fad39 commit da506dd
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 0 deletions.
72 changes: 72 additions & 0 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro
return NewSlice(cValue, cValLen), nil
}

// Get returns the data associated with the key from the database given this transaction and column family.
func (transaction *Transaction) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) {
var (
cErr *C.char
cValLen C.size_t
cKey = byteToChar(key)
)
cValue := C.rocksdb_transaction_get_cf(
transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr,
)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return NewSlice(cValue, cValLen), nil
}

// GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction.
func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Slice, error) {
var (
Expand All @@ -80,6 +97,23 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl
return NewSlice(cValue, cValLen), nil
}

// GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction and column family.
func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) {
var (
cErr *C.char
cValLen C.size_t
cKey = byteToChar(key)
)
cValue := C.rocksdb_transaction_get_for_update_cf(
transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr,
)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return NewSlice(cValue, cValLen), nil
}

// Put writes data associated with a key to the transaction.
func (transaction *Transaction) Put(key, value []byte) error {
var (
Expand All @@ -97,6 +131,23 @@ func (transaction *Transaction) Put(key, value []byte) error {
return nil
}

// Put writes data associated with a key to the transaction and column family.
func (transaction *Transaction) PutCF(cf *ColumnFamilyHandle, key, value []byte) error {
var (
cErr *C.char
cKey = byteToChar(key)
cValue = byteToChar(value)
)
C.rocksdb_transaction_put_cf(
transaction.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr,
)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}

// Delete removes the data associated with the key from the transaction.
func (transaction *Transaction) Delete(key []byte) error {
var (
Expand All @@ -111,13 +162,34 @@ func (transaction *Transaction) Delete(key []byte) error {
return nil
}

// Delete removes the data associated with the key from the transaction and column family.
func (transaction *Transaction) DeleteCF(cf *ColumnFamilyHandle, key []byte) error {
var (
cErr *C.char
cKey = byteToChar(key)
)
C.rocksdb_transaction_delete_cf(transaction.c, cf.c, cKey, C.size_t(len(key)), &cErr)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}

// NewIterator returns an Iterator over the database that uses the
// ReadOptions given.
func (transaction *Transaction) NewIterator(opts *ReadOptions) *Iterator {
return NewNativeIterator(
unsafe.Pointer(C.rocksdb_transaction_create_iterator(transaction.c, opts.c)))
}

// NewIterator returns an Iterator over the database that uses the
// ReadOptions given and column family.
func (transaction *Transaction) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator {
return NewNativeIterator(
unsafe.Pointer(C.rocksdb_transaction_create_iterator_cf(transaction.c, opts.c, cf.c)))
}

// Destroy deallocates the transaction object.
func (transaction *Transaction) Destroy() {
C.rocksdb_transaction_destroy(transaction.c)
Expand Down
143 changes: 143 additions & 0 deletions transactiondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,87 @@ func OpenTransactionDb(
}, nil
}

// OpenTransactionDbColumnFamilies opens a database with the specified options.
func OpenTransactionDbColumnFamilies(
opts *Options,
transactionDBOpts *TransactionDBOptions,
name string,
cfNames []string,
cfOpts []*Options,
) (*TransactionDB, []*ColumnFamilyHandle, error) {
numColumnFamilies := len(cfNames)
if numColumnFamilies != len(cfOpts) {
return nil, nil, errors.New("must provide the same number of column family names and options")
}
cNames := make([]*C.char, numColumnFamilies)
for i, s := range cfNames {
cNames[i] = C.CString(s)
}
defer func() {
for _, s := range cNames {
C.free(unsafe.Pointer(s))
}
}()

cOpts := make([]*C.rocksdb_options_t, numColumnFamilies)
for i, o := range cfOpts {
cOpts[i] = o.c
}

var (
cErr *C.char
cName = C.CString(name)
)
defer C.free(unsafe.Pointer(cName))

cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies)

db := C.rocksdb_transactiondb_open_column_families(
opts.c,
transactionDBOpts.c,
cName,
C.int(numColumnFamilies),
&cNames[0],
&cOpts[0],
&cHandles[0],
&cErr)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, nil, errors.New(C.GoString(cErr))
}

cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies)
for i, c := range cHandles {
cfHandles[i] = NewNativeColumnFamilyHandle(c)
}

return &TransactionDB{
name: name,
c: db,
opts: opts,
transactionDBOpts: transactionDBOpts,
}, cfHandles, nil
}

func (db *TransactionDB) CreateColumnFamily(opts *Options, name string) (*ColumnFamilyHandle, error) {
var (
cErr *C.char
cName = C.CString(name)
)
defer C.free(unsafe.Pointer(cName))

h := C.rocksdb_transactiondb_create_column_family(
db.c,
opts.c,
cName,
&cErr)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return NewNativeColumnFamilyHandle(h), nil
}

// NewSnapshot creates a new snapshot of the database.
func (db *TransactionDB) NewSnapshot() *Snapshot {
return NewNativeSnapshot(C.rocksdb_transactiondb_create_snapshot(db.c))
Expand Down Expand Up @@ -89,6 +170,23 @@ func (db *TransactionDB) Get(opts *ReadOptions, key []byte) (*Slice, error) {
return NewSlice(cValue, cValLen), nil
}

// Get returns the data associated with the key from the database and column family.
func (db *TransactionDB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) {
var (
cErr *C.char
cValLen C.size_t
cKey = byteToChar(key)
)
cValue := C.rocksdb_transactiondb_get_cf(
db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr,
)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return NewSlice(cValue, cValLen), nil
}

// Put writes data associated with a key to the database.
func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error {
var (
Expand All @@ -106,6 +204,23 @@ func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error {
return nil
}

// Put writes data associated with a key to the database and column family.
func (db *TransactionDB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) error {
var (
cErr *C.char
cKey = byteToChar(key)
cValue = byteToChar(value)
)
C.rocksdb_transactiondb_put_cf(
db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr,
)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}

// Delete removes the data associated with the key from the database.
func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error {
var (
Expand All @@ -120,6 +235,34 @@ func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error {
return nil
}

// Delete removes the data associated with the key from the database and column family.
func (db *TransactionDB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) error {
var (
cErr *C.char
cKey = byteToChar(key)
)
C.rocksdb_transactiondb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}

// NewIterator returns an Iterator over the database that uses the
// ReadOptions given.
func (db *TransactionDB) NewIterator(opts *ReadOptions) *Iterator {
return NewNativeIterator(
unsafe.Pointer(C.rocksdb_transactiondb_create_iterator(db.c, opts.c)))
}

// NewIterator returns an Iterator over the database that uses the
// ReadOptions given and column family.
func (db *TransactionDB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator {
return NewNativeIterator(
unsafe.Pointer(C.rocksdb_transactiondb_create_iterator_cf(db.c, opts.c, cf.c)))
}

// NewCheckpoint creates a new Checkpoint for this db.
func (db *TransactionDB) NewCheckpoint() (*Checkpoint, error) {
var (
Expand Down

0 comments on commit da506dd

Please sign in to comment.