Skip to content

Commit

Permalink
First draft for microtransactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai Mast committed Feb 12, 2015
1 parent d9f8b88 commit 1e03328
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ python_wrappers =
python_wrappers += test/sh/bindings.python.BasicSearch.sh
python_wrappers += test/sh/bindings.python.Basic.sh
python_wrappers += test/sh/bindings.python.CondPut.sh
#python_wrappers += test/sh/bindings.python.DataTypeDocument.sh
python_wrappers += test/sh/bindings.python.DataTypeDocument.sh
python_wrappers += test/sh/bindings.python.DataTypeFloat.sh
python_wrappers += test/sh/bindings.python.DataTypeInt.sh
python_wrappers += test/sh/bindings.python.DataTypeListFloat.sh
Expand Down
45 changes: 44 additions & 1 deletion bindings/python/hyperdex/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ cdef extern from "stdint.h":
ctypedef unsigned long int uint64_t
ctypedef long unsigned int size_t


cdef extern from "stdlib.h":

void* malloc(size_t size)
Expand Down Expand Up @@ -113,6 +112,8 @@ cdef extern from "hyperdex/client.h":

cdef struct hyperdex_client

cdef struct hyperdex_microtransaction

cdef struct hyperdex_client_attribute:
const char* attr
const char* value
Expand Down Expand Up @@ -173,6 +174,9 @@ cdef extern from "hyperdex/client.h":
int64_t hyperdex_client_get(hyperdex_client* client, const char* space, const char* key, size_t key_sz, hyperdex_client_returncode* status, const hyperdex_client_attribute** attrs, size_t* attrs_sz)
int64_t hyperdex_client_get_partial(hyperdex_client* client, const char* space, const char* key, size_t key_sz, const char** attrnames, size_t attrnames_sz, hyperdex_client_returncode* status, const hyperdex_client_attribute** attrs, size_t* attrs_sz)
int64_t hyperdex_client_put(hyperdex_client* client, const char* space, const char* key, size_t key_sz, const hyperdex_client_attribute* attrs, size_t attrs_sz, hyperdex_client_returncode* status)
hyperdex_microtransaction* hyperdex_client_microtransaction_init(hyperdex_client* _cl, const char* space, hyperdex_client_returncode *status);
int64_t hyperdex_client_microtransaction_commit(hyperdex_client* _cl, hyperdex_microtransaction *transaction, const char* key, size_t key_sz);
int64_t hyperdex_client_microtransaction_put(hyperdex_client* client, hyperdex_microtransaction* tx, const char* space, const char* key, size_t key_sz, const hyperdex_client_attribute* attrs, size_t attrs_sz, hyperdex_client_returncode* status)
int64_t hyperdex_client_cond_put(hyperdex_client* client, const char* space, const char* key, size_t key_sz, const hyperdex_client_attribute_check* checks, size_t checks_sz, const hyperdex_client_attribute* attrs, size_t attrs_sz, hyperdex_client_returncode* status)
int64_t hyperdex_client_group_put(hyperdex_client* client, const char* space, const hyperdex_client_attribute_check* checks, size_t checks_sz, const hyperdex_client_attribute* attrs, size_t attrs_sz, hyperdex_client_returncode* status, uint64_t* count)
int64_t hyperdex_client_put_if_not_exist(hyperdex_client* client, const char* space, const char* key, size_t key_sz, const hyperdex_client_attribute* attrs, size_t attrs_sz, hyperdex_client_returncode* status)
Expand Down Expand Up @@ -1162,6 +1166,42 @@ cdef class Iterator:
self.attrs_sz = 0



cdef class Microtransaction:
def __cinit__(self, Client c, bytes spacename):
cdef const char* in_space
self.client = c
self.deferred = Deferred(self)
self.client.convert_spacename(self.deferred.arena, spacename, &in_space);
self.transaction = hyperdex_client_microtransaction_init(self.client.client, in_space, &self.deferred.status)

def put(self, dict attributes):
cdef hyperdex_client_attribute* in_attrs
cdef size_t in_attrs_sz
self.client.convert_attributes(self.deferred.arena, attributes, &in_attrs, &in_attrs_sz);

def async_commit(self, bytes key):
cdef const char* in_key
cdef size_t in_key_sz
self.client.convert_key(self.deferred.arena, key, &in_key, &in_key_sz)

self.deferred.reqid = hyperdex_client_microtransaction_commit(self.client.client, self.transaction, in_key, in_key_sz)

self.clear_auth_context()
if self.deferred.reqid < 0:
raise HyperDexClientException(self.deferred.status, hyperdex_client_error_message(self.client.client))
self.deferred.encode_return = hyperdex_python_client_deferred_encode_status

self.client.ops[self.deferred.reqid] = self.deferred
return self.deferred

def commit(self, bytes key):
return self.async_commit(key).wait()

cdef Deferred deferred
cdef Client client
cdef hyperdex_microtransaction* transaction

cdef class Client:
cdef hyperdex_client* client
cdef dict ops
Expand Down Expand Up @@ -1562,6 +1602,9 @@ cdef class Client:
self.ops[it.reqid] = it
return it

cdef microtransaction_init(self, bytes spacename):
return Microtransaction(self, spacename)

def async_get(self, bytes spacename, key, auth=None):
return self.asynccall__spacename_key__status_attributes(hyperdex_client_get, spacename, key, auth)
def get(self, bytes spacename, key, auth=None):
Expand Down
43 changes: 42 additions & 1 deletion client/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ hyperdex_client_error_message(hyperdex_client* _cl)
HYPERDEX_API void
hyperdex_client_set_type_conversion(hyperdex_client* _cl, bool enabled)
{
FAKE_STATUS;
hyperdex::client* cl = reinterpret_cast<hyperdex::client*>(_cl);
cl->set_type_conversion(enabled);
}
Expand Down Expand Up @@ -1934,6 +1933,48 @@ hyperdex_client_sorted_search(struct hyperdex_client* _cl,
);
}

HYPERDEX_API struct hyperdex_microtransaction*
hyperdex_client_microtransaction_init(struct hyperdex_client* _cl,
const char* space,
enum hyperdex_client_returncode *status)
{

SIGNAL_PROTECT_ERR(NULL);
hyperdex::client *cl = reinterpret_cast<hyperdex::client*>(_cl);
hyperdex::microtransaction *tx = cl->microtransaction_init(space, status);

return reinterpret_cast<struct hyperdex_microtransaction*>(tx);
}

HYPERDEX_API int64_t
hyperdex_client_microtransaction_commit(struct hyperdex_client* _cl,
struct hyperdex_microtransaction *transaction,
const char* key, size_t key_sz)
{
hyperdex::microtransaction* tx = reinterpret_cast<hyperdex::microtransaction*>(transaction);
hyperdex_client_returncode *status = tx->status;

C_WRAP_EXCEPT(
return cl->microtransaction_commit(tx, key, key_sz);
);
}

HYPERDEX_API int64_t
hyperdex_client_microtransaction_put(struct hyperdex_client* _cl,
struct hyperdex_microtransaction *transaction,
const struct hyperdex_client_attribute* attrs, size_t attrs_sz)
{
hyperdex::microtransaction* tx = reinterpret_cast<hyperdex::microtransaction*>(transaction);
hyperdex_client_returncode *status = tx->status;

C_WRAP_EXCEPT(
const hyperdex_client_keyop_info* opinfo;
opinfo = hyperdex_client_keyop_info_lookup(XSTR(put), strlen(XSTR(put)));
return cl->microtransaciton_add_funcall(tx, opinfo, attrs, attrs_sz, NULL, 0);
);
}


HYPERDEX_API int64_t
hyperdex_client_count(struct hyperdex_client* _cl,
const char* space,
Expand Down
109 changes: 109 additions & 0 deletions client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
return false;

using hyperdex::client;
using hyperdex::microtransaction;

client :: client(const char* coordinator, uint16_t port)
: m_coord(coordinator, port)
Expand Down Expand Up @@ -1234,6 +1235,96 @@ client :: handle_disruption(const server_id& si)
m_busybee.drop(si.get());
}

microtransaction* client::microtransaction_init(const char* space, hyperdex_client_returncode *status)
{
if (!maintain_coord_connection(status))
{
return NULL;
}

const schema* sc = m_coord.config()->get_schema(space);

if (!sc)
{
ERROR(UNKNOWNSPACE) << "space \"" << e::strescape(space) << "\" does not exist";
return NULL;
}

return new microtransaction(space, *sc, status);
}

int64_t client::microtransaciton_add_funcall(microtransaction *transaction,
const hyperdex_client_keyop_info* opinfo,
const hyperdex_client_attribute* attrs, size_t attrs_sz,
const hyperdex_client_map_attribute* mapattrs, size_t mapattrs_sz)
{
size_t idx = 0;
e::arena memory;

// Prepare the attrs
idx = prepare_funcs(transaction->space, transaction->sc, opinfo, attrs, attrs_sz, &memory, transaction->status, &transaction->funcalls);

if (idx < attrs_sz)
{
return -2 - idx;
}

// Prepare the mapattrs
idx = prepare_funcs(transaction->space, transaction->sc, opinfo, mapattrs, mapattrs_sz, &memory, transaction->status, &transaction->funcalls);

if (idx < mapattrs_sz)
{
return -2 - attrs_sz - idx;
}

return 0;
}

int64_t client::microtransaction_commit(microtransaction *transaction,
const char* _key, size_t _key_sz)
{
hyperdex_client_returncode *status = transaction->status;

datatype_info* di = datatype_info::lookup(transaction->sc.attrs[0].type);
assert(di);
e::slice key(_key, _key_sz);

if (!di->validate(key))
{
ERROR(WRONGTYPE) << "key must be type " << transaction->sc.attrs[0].type;
return -1;
}

e::intrusive_ptr<pending> op;
op = new pending_atomic(m_next_client_id++, status);
std::auto_ptr<e::buffer> msg;
auth_wallet aw(m_macaroons, m_macaroons_sz);
size_t header_sz = HYPERDEX_CLIENT_HEADER_SIZE_REQ
+ pack_size(key);
size_t footer_sz = 0;

if (m_macaroons_sz)
{
footer_sz += pack_size(aw);
}

int ret = transaction->generate_message(header_sz, footer_sz, &msg);

if (ret < 0)
{
return ret;
}

msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ) << key;

if (m_macaroons_sz)
{
msg->pack_at(msg->capacity() - footer_sz) << aw;
}

return send_keyop(transaction->space, key, REQ_ATOMIC, msg, op, status);
}

HYPERDEX_API std::ostream&
operator << (std::ostream& lhs, hyperdex_client_returncode rhs)
{
Expand Down Expand Up @@ -1278,3 +1369,21 @@ client :: set_type_conversion(bool enabled)
{
m_convert_types = enabled;
}

int64_t
microtransaction::generate_message(size_t header_sz, size_t footer_sz,
std::auto_ptr<e::buffer>* msg)
{
const bool fail_if_not_found = true;
std::vector<attribute_check> checks;

std::stable_sort(funcalls.begin(), funcalls.end());
size_t sz = header_sz + footer_sz
+ sizeof(uint8_t)
+ pack_size(checks)
+ pack_size(funcalls);
msg->reset(e::buffer::create(sz));
uint8_t flags = (fail_if_not_found ? 1 : 0);
(*msg)->pack_at(header_sz) << flags << checks << funcalls;
return 0;
}
47 changes: 45 additions & 2 deletions client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@

BEGIN_HYPERDEX_NAMESPACE

struct microtransaction
{
microtransaction(const char* space_, const schema& sc_,
hyperdex_client_returncode *status_)
: space(space_), sc(sc_), status(status_), funcalls()
{}

PO6_NONCOPYABLE(microtransaction);

int64_t generate_message(size_t header_sz, size_t footer_sz,
std::auto_ptr<e::buffer>* msg);

const char* space;
const schema& sc;
hyperdex_client_returncode* status;
std::vector<funcall> funcalls;

};

class client
{
public:
Expand Down Expand Up @@ -90,24 +109,48 @@ class client
int64_t count(const char* space,
const hyperdex_client_attribute_check* checks, size_t checks_sz,
hyperdex_client_returncode* status, uint64_t* result);
// general keyop call

// General keyop call
// This will be called by the bindings from c.cc
int64_t perform_funcall(const hyperdex_client_keyop_info* opinfo,
const char* space, const char* key, size_t key_sz,
const hyperdex_client_attribute_check* checks, size_t checks_sz,
const hyperdex_client_attribute* attrs, size_t attrs_sz,
const hyperdex_client_map_attribute* mapattrs, size_t mapattrs_sz,
hyperdex_client_returncode* status);
// general grouped keyop call

// General keyop call for group operations
// This will be called by the bindings from c.cc
int64_t perform_group_funcall(const hyperdex_client_keyop_info* opinfo,
const char* space,
const hyperdex_client_attribute_check* checks, size_t checks_sz,
const hyperdex_client_attribute* attrs, size_t attrs_sz,
const hyperdex_client_map_attribute* mapattrs, size_t mapattrs_sz,
hyperdex_client_returncode* status,
uint64_t* update_count);

// Initiate a microtransaction
// Status and transaction object must remain valid until the operation has completed
microtransaction *microtransaction_init(const char* space,
hyperdex_client_returncode *status);

// Add a new funcall to the microstransaction
int64_t microtransaciton_add_funcall(microtransaction *transaction,
const hyperdex_client_keyop_info* opinfo,
const hyperdex_client_attribute* attrs, size_t attrs_sz,
const hyperdex_client_map_attribute* mapattrs, size_t mapattrs_sz);

// Issue a microtransaction to a specific key using specific checks
int64_t microtransaction_commit(microtransaction *transaction,
const char* key, size_t key_sz);

// looping/polling
int64_t loop(int timeout, hyperdex_client_returncode* status);

// Return the fildescriptor that hyperdex uses for networking
int poll();

// Block unitl there is incoming data or the timeout is reached
int block(int timeout);
// error handling
const char* error_message();
Expand Down
16 changes: 16 additions & 0 deletions include/hyperdex/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extern "C"
#endif /* __cplusplus */

struct hyperdex_client;
struct hyperdex_microtransaction;

struct hyperdex_client_attribute
{
Expand Down Expand Up @@ -968,6 +969,21 @@ hyperdex_client_group_map_atomic_max(struct hyperdex_client* client,
enum hyperdex_client_returncode* status,
uint64_t* count);

struct hyperdex_microtransaction*
hyperdex_client_microtransaction_init(struct hyperdex_client* _cl,
const char* space,
enum hyperdex_client_returncode *status);

int64_t
hyperdex_client_microtransaction_commit(struct hyperdex_client* _cl,
struct hyperdex_microtransaction *transaction,
const char* key, size_t key_sz);

int64_t
hyperdex_client_microtransaction_put(struct hyperdex_client* _cl,
struct hyperdex_microtransaction *transaction,
const struct hyperdex_client_attribute* attrs, size_t attrs_sz);

int64_t
hyperdex_client_search(struct hyperdex_client* client,
const char* space,
Expand Down
Loading

0 comments on commit 1e03328

Please sign in to comment.