Skip to content

Commit

Permalink
Move segment to the common class infrastructure
Browse files Browse the repository at this point in the history
Begin simplifying redundant code.
  • Loading branch information
PerilousApricot committed Jul 11, 2016
1 parent 1359d87 commit ef08ece
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 195 deletions.
12 changes: 6 additions & 6 deletions src/lio/ex3.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ gop_op_status_t lio_exnode_clone_func(void *arg, int gid)

if (gop_completed_successfully(gop) != OP_STATE_SUCCESS) {
nfailed++;
segment_destroy(segptr[1]);
tbx_obj_put(&segptr[1]->obj);
} else {
if (did == segment_id(segptr[0])) ex->default_seg = segptr[1];
tbx_list_insert(ex->view, &segment_id(segptr[1]), segptr[1]);
tbx_atomic_inc(segptr[1]->ref_count);
tbx_obj_get(&segptr[1]->obj);
}

gop_free(gop, OP_DESTROY);
Expand Down Expand Up @@ -456,7 +456,7 @@ int lio_exnode_deserialize_text(lio_exnode_t *ex, lio_exnode_exchange_t *exp, li
log_printf(15, "exnode_load_text: Loading view segment " XIDT "\n", id);
seg = load_segment(ess, id, exp);
if (seg != NULL) {
tbx_atomic_inc(seg->ref_count);
tbx_obj_get(&seg->obj);
tbx_list_insert(ex->view, &segment_id(seg), seg);
} else {
log_printf(0, "Bad segment! sid=" XIDT "\n", id);
Expand Down Expand Up @@ -588,9 +588,9 @@ void lio_exnode_destroy(lio_exnode_t *ex)
//** Remove the views
it = tbx_list_iter_search(ex->view, (tbx_sl_key_t *)NULL, 0);
while (tbx_list_next(&it, (tbx_sl_key_t *)&id, (tbx_sl_data_t *)&seg) == 0) {
tbx_atomic_dec(seg->ref_count);
log_printf(15, "lio_exnode_destroy: seg->id=" XIDT " ref_count=%d\n", segment_id(seg), seg->ref_count);
segment_destroy(seg);
if (tbx_obj_put(&seg->obj)) {
log_printf(15, "lio_exnode_destroy: seg->id=" XIDT "\n", segment_id(seg));
}
tbx_list_next(&it, (tbx_sl_key_t *)&id, (tbx_sl_data_t *)&seg);
}

Expand Down
18 changes: 1 addition & 17 deletions src/lio/ex3.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "ex3/types.h"
#include "os.h"
#include "rs.h"
#include "segment.h"
#include "service_manager.h"

#ifdef __cplusplus
Expand All @@ -58,17 +59,6 @@ struct lio_segment_rw_hints_t { //** Structure for contaiing hints to the va
};

//#define inspect_printf(fd, ...) if ((fd) != NULL) fprintf(fd, __VA_ARGS__)

#define lio_segment_type(s) (s)->header.type
#define segment_destroy(s) (s)->fn.destroy(s)
#define segment_remove(s, da, to) (s)->fn.remove(s, da, to)
#define segment_clone(s, da, clone_ex, mode, attr, to) (s)->fn.clone(s, da, clone_ex, mode, attr, to)
#define segment_block_size(s) (s)->fn.block_size(s)
#define segment_serialize(s, exp) (s)->fn.serialize(s, exp)
#define segment_deserialize(s, id, exp) (s)->fn.deserialize(s, id, exp)
#define segment_lock(s) apr_thread_mutex_lock((s)->lock)
#define segment_unlock(s) apr_thread_mutex_unlock((s)->lock)

struct lio_exnode_t {
lio_ex_header_t header;
lio_segment_t *default_seg;
Expand Down Expand Up @@ -104,12 +94,6 @@ lio_segment_t *view_search_by_name(lio_exnode_t *ex, char *name);
lio_segment_t *view_search_by_id(lio_exnode_t *ex, ex_id_t id);

//** Segment related functions
#define segment_get_header(seg) &((seg)->header)
#define segment_set_header(seg, new_head) (seg)->header = *(new_head)
gop_op_generic_t *segment_put(gop_thread_pool_context_t *tpc, data_attr_t *da, lio_segment_rw_hints_t *rw_hints, FILE *fd, lio_segment_t *dest_seg, ex_off_t dest_offset, ex_off_t len, ex_off_t bufsize, char *buffer, int do_truncate, int timeout);
gop_op_generic_t *segment_get(gop_thread_pool_context_t *tpc, data_attr_t *da, lio_segment_rw_hints_t *rw_hints, lio_segment_t *src_seg, FILE *fd, ex_off_t src_offset, ex_off_t len, ex_off_t bufsize, char *buffer, int timeout);
lio_segment_t *load_segment(lio_service_manager_t *ess, ex_id_t id, lio_exnode_exchange_t *ex);

void generate_ex_id(ex_id_t *id);

#ifdef __cplusplus
Expand Down
24 changes: 12 additions & 12 deletions src/lio/lio/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ limitations under the License.
#include <lio/ex3.h>
#include <lio/visibility.h>
#include <lio/rs.h>
#include <tbx/object.h>

#ifdef __cplusplus
extern "C" {
#endif

// Typedefs
typedef struct lio_segment_vtable_t lio_segment_vtable_t;
typedef gop_op_generic_t *(*lio_segment_read_fn_t)(lio_segment_t *seg, data_attr_t *da, lio_segment_rw_hints_t *hints, int n_iov, ex_tbx_iovec_t *iov, tbx_tbuf_t *buffer, ex_off_t boff, int timeout);
typedef gop_op_generic_t *(*lio_segment_write_fn_t)(lio_segment_t *seg, data_attr_t *da, lio_segment_rw_hints_t *hints, int n_iov, ex_tbx_iovec_t *iov, tbx_tbuf_t *buffer, ex_off_t boff, int timeout);
typedef gop_op_generic_t *(*lio_segment_inspect_fn_t)(lio_segment_t *seg, data_attr_t *da, tbx_log_fd_t *fd, int mode, ex_off_t buffer_size, lio_inspect_args_t *args, int timeout);
Expand Down Expand Up @@ -63,17 +65,18 @@ LIO_API gop_op_generic_t *lio_slog_merge_with_base(lio_segment_t *seg, data_attr
#define SEGMENT_TYPE_LINEAR "linear"

// Preprocessor macros
#define segment_flush(s, da, lo, hi, to) (s)->fn.flush(s, da, lo, hi, to)
#define segment_flush(s, da, lo, hi, to) ((lio_segment_vtable_t *)(s)->obj.vtable)->flush(s, da, lo, hi, to)
#define segment_id(s) (s)->header.id
#define segment_inspect(s, da, fd, mode, bsize, query, to) (s)->fn.inspect(s, da, fd, mode, bsize, query, to)
#define segment_read(s, da, hints, n_iov, iov, tbuf, boff, to) (s)->fn.read(s, da, hints, n_iov, iov, tbuf, boff, to)
#define segment_signature(s, buffer, used, bufsize) (s)->fn.signature(s, buffer, used, bufsize)
#define segment_size(s) (s)->fn.size(s)
#define lio_segment_truncate(s, da, new_size, to) (s)->fn.truncate(s, da, new_size, to)
#define segment_write(s, da, hints, n_iov, iov, tbuf, boff, to) (s)->fn.write(s, da, hints, n_iov, iov, tbuf, boff, to)
#define segment_inspect(s, da, fd, mode, bsize, query, to) ((lio_segment_vtable_t *)(s)->obj.vtable)->inspect(s, da, fd, mode, bsize, query, to)
#define segment_read(s, da, hints, n_iov, iov, tbuf, boff, to) ((lio_segment_vtable_t *)(s)->obj.vtable)->read(s, da, hints, n_iov, iov, tbuf, boff, to)
#define segment_signature(s, buffer, used, bufsize) ((lio_segment_vtable_t *)(s)->obj.vtable)->signature(s, buffer, used, bufsize)
#define segment_size(s) ((lio_segment_vtable_t *)(s)->obj.vtable)->size(s)
#define lio_segment_truncate(s, da, new_size, to) ((lio_segment_vtable_t *)(s)->obj.vtable)->truncate(s, da, new_size, to)
#define segment_write(s, da, hints, n_iov, iov, tbuf, boff, to) ((lio_segment_vtable_t *)(s)->obj.vtable)->write(s, da, hints, n_iov, iov, tbuf, boff, to)

// Exported types. To be obscured
struct lio_segment_fn_t {
struct lio_segment_vtable_t {
tbx_vtable_t base;
lio_segment_read_fn_t read;
lio_segment_write_fn_t write;
lio_segment_inspect_fn_t inspect;
Expand All @@ -86,15 +89,13 @@ struct lio_segment_fn_t {
lio_segment_size_fn_t size;
lio_segment_serialize_fn_t serialize;
lio_segment_deserialize_fn_t deserialize;
lio_segment_destroy_fn_t destroy;
};

struct lio_segment_t {
tbx_obj_t obj;
lio_ex_header_t header;
tbx_atomic_unit32_t ref_count;
segment_priv_t *priv;
lio_service_manager_t *ess;
lio_segment_fn_t fn;
apr_thread_mutex_t *lock;
apr_thread_cond_t *cond;
apr_pool_t *mpool;
Expand All @@ -106,7 +107,6 @@ struct lio_segment_errors_t {
int write;
};


#ifdef __cplusplus
}
#endif
Expand Down
59 changes: 31 additions & 28 deletions src/lio/segment/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

#define XOT_MAX (LONG_MAX-2)

const lio_segment_vtable_t lio_cacheseg_vtable;
typedef struct {
lio_segment_t *seg;
data_attr_t *da;
Expand Down Expand Up @@ -3168,7 +3169,7 @@ gop_op_status_t segcache_clone_func(void *arg, int id)
status = (gop_waitall(cop->gop) == OP_STATE_SUCCESS) ? gop_success_status : gop_failure_status;
gop_free(cop->gop, OP_DESTROY);

tbx_atomic_inc(ds->child_seg->ref_count);
tbx_obj_get(&ds->child_seg->obj);
return(status);
}

Expand Down Expand Up @@ -3212,7 +3213,7 @@ gop_op_generic_t *segcache_clone(lio_segment_t *seg, data_attr_t *da, lio_segmen
tbx_type_malloc(cop, cache_clone_t, 1);
cop->sseg = seg;
cop->dseg = clone;
if (use_existing == 1) tbx_atomic_dec(sd->child_seg->ref_count);
if (use_existing == 1) tbx_obj_put(&sd->child_seg->obj);
cop->gop = segment_clone(ss->child_seg, da, &(sd->child_seg), mode, arg, timeout);

log_printf(5, "child_clone gid=%d\n", gop_id(cop->gop));
Expand Down Expand Up @@ -3285,7 +3286,8 @@ int segcache_serialize_text(lio_segment_t *seg, lio_exnode_exchange_t *exp)
free(etext);
}
tbx_append_printf(segbuf, &sused, bufsize, "type=%s\n", SEGMENT_TYPE_CACHE);
tbx_append_printf(segbuf, &sused, bufsize, "ref_count=%d\n", seg->ref_count);
// FIXME
//tbx_append_printf(segbuf, &sused, bufsize, "ref_count=%d\n", seg->ref_count);

//** Basic size info
tbx_append_printf(segbuf, &sused, bufsize, "used_size=" XOT "\n", s->total_size);
Expand Down Expand Up @@ -3388,7 +3390,7 @@ int segcache_deserialize_text(lio_segment_t *seg, ex_id_t myid, lio_exnode_excha
seg->header.type = SEGMENT_TYPE_CACHE;
seg->header.name = tbx_inip_get_string(fd, seggrp, "name", "");

tbx_atomic_inc(s->child_seg->ref_count);
tbx_obj_get(&s->child_seg->obj);

//** Tweak the page size
s->page_size = segment_block_size(s->child_seg);
Expand Down Expand Up @@ -3469,16 +3471,16 @@ int segcache_deserialize(lio_segment_t *seg, ex_id_t id, lio_exnode_exchange_t *
// segcache_destroy - Destroys the cache segment
//***********************************************************************

void segcache_destroy(lio_segment_t *seg)
void segcache_destroy(tbx_ref_t *ref)
{
tbx_obj_t *obj = container_of(ref, tbx_obj_t, refcount);
lio_segment_t *seg = container_of(obj, lio_segment_t, obj);
lio_cache_lio_segment_t *s = (lio_cache_lio_segment_t *)seg->priv;
gop_op_generic_t *gop;
int i;

//** Check if it's still in use
log_printf(2, "segcache_destroy: seg->id=" XIDT " ref_count=%d sptr=%p\n", segment_id(seg), seg->ref_count, seg);

if (seg->ref_count > 0) return;
log_printf(2, "segcache_destroy: seg->id=" XIDT " sptr=%p\n", segment_id(seg), seg);

CACHE_PRINT;

Expand Down Expand Up @@ -3545,8 +3547,7 @@ void segcache_destroy(lio_segment_t *seg)

//** Destroy the child segment as well
if (s->child_seg != NULL) {
tbx_atomic_dec(s->child_seg->ref_count);
segment_destroy(s->child_seg);
tbx_obj_put(&s->child_seg->obj);
}

//** and finally the misc stuff
Expand Down Expand Up @@ -3587,7 +3588,7 @@ lio_segment_t *segment_cache_create(void *arg)
//** Make the space
tbx_type_malloc_clear(seg, lio_segment_t, 1);
tbx_type_malloc_clear(s, lio_cache_lio_segment_t, 1);

tbx_obj_init(&seg->obj, (tbx_vtable_t *) &lio_cacheseg_vtable);
assert_result(apr_pool_create(&(seg->mpool), NULL), APR_SUCCESS);
apr_thread_mutex_create(&(seg->lock), APR_THREAD_MUTEX_DEFAULT, seg->mpool);
apr_thread_cond_create(&(seg->cond), seg->mpool);
Expand All @@ -3596,7 +3597,7 @@ lio_segment_t *segment_cache_create(void *arg)

s->flush_stack = tbx_stack_new();
s->tpc_unlimited = lio_lookup_service(es, ESS_RUNNING, ESS_TPC_CACHE);
FATAL_UNLESS(s->tpc_unlimited != NULL);
FATAL_UNLESS(s->tpc_unlimited != NULL);

s->pages = tbx_list_create(0, &skiplist_compare_ex_off, NULL, NULL, NULL);

Expand All @@ -3611,28 +3612,13 @@ lio_segment_t *segment_cache_create(void *arg)
log_printf(2, "CACHE-PTR seg=" XIDT " s->c=%p\n", segment_id(seg), s->c);

generate_ex_id(&(seg->header.id));
tbx_atomic_set(seg->ref_count, 0);
seg->header.type = SEGMENT_TYPE_CACHE;

snprintf(qname, sizeof(qname), XIDT HP_HOSTPORT_SEPARATOR "1" HP_HOSTPORT_SEPARATOR "0" HP_HOSTPORT_SEPARATOR "0", seg->header.id);
s->qname = strdup(qname);

seg->ess = es;
seg->priv = s;
seg->fn.read = cache_read;
seg->fn.write = cache_write;
seg->fn.inspect = segcache_inspect;
seg->fn.truncate = seglio_cache_truncate;
seg->fn.remove = segcache_remove;
seg->fn.flush = cache_flush_range;
seg->fn.clone = segcache_clone;
seg->fn.signature = segcache_signature;
seg->fn.size = segcache_size;
seg->fn.block_size = segcache_block_size;
seg->fn.serialize = segcache_serialize;
seg->fn.deserialize = segcache_deserialize;
seg->fn.destroy = segcache_destroy;

if (s->c != NULL) { //** If no cache backend skip this only used for temporary deseril/serial
cache_lock(s->c);
CACHE_PRINT;
Expand All @@ -3655,9 +3641,26 @@ lio_segment_t *segment_cache_load(void *arg, ex_id_t id, lio_exnode_exchange_t *
{
lio_segment_t *seg = segment_cache_create(arg);
if (segment_deserialize(seg, id, ex) != 0) {
segment_destroy(seg);
tbx_obj_put(&seg->obj);
seg = NULL;
}
return(seg);
}

const lio_segment_vtable_t lio_cacheseg_vtable = {
.base.name = "segment_cache",
.base.free_fn = segcache_destroy,
.read = cache_read,
.write = cache_write,
.inspect = segcache_inspect,
.truncate = seglio_cache_truncate,
.remove = segcache_remove,
.flush = cache_flush_range,
.clone = segcache_clone,
.signature = segcache_signature,
.size = segcache_size,
.block_size = segcache_block_size,
.serialize = segcache_serialize,
.deserialize = segcache_deserialize,
};

52 changes: 28 additions & 24 deletions src/lio/segment/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <tbx/atomic_counter.h>
#include <tbx/iniparse.h>
#include <tbx/log.h>
#include <tbx/object.h>
#include <tbx/string_token.h>
#include <tbx/transfer_buffer.h>
#include <tbx/type_malloc.h>
Expand All @@ -46,6 +47,10 @@
#include "segment/file.h"
#include "service_manager.h"


// Forward declaration
const lio_segment_vtable_t lio_fileseg_vtable;

typedef struct {
char *fname;
char *qname;
Expand Down Expand Up @@ -476,7 +481,6 @@ int segfile_serialize_text(lio_segment_t *seg, lio_exnode_exchange_t *exp)
free(etext);
}
tbx_append_printf(segbuf, &sused, bufsize, "type=%s\n", seg->header.type);
tbx_append_printf(segbuf, &sused, bufsize, "ref_count=%d\n", seg->ref_count);
tbx_append_printf(segbuf, &sused, bufsize, "file=%s\n\n", s->fname);

exnode_exchange_append_text(exp, segbuf);
Expand Down Expand Up @@ -579,14 +583,12 @@ int segfile_deserialize(lio_segment_t *seg, ex_id_t id, lio_exnode_exchange_t *e
// segfile_destroy - Destroys a linear segment struct (not the data)
//***********************************************************************

void segfile_destroy(lio_segment_t *seg)
void segfile_destroy(tbx_ref_t *ref)
{
segfile_priv_t *s = (segfile_priv_t *)seg->priv;

//** Check if it's still in use
log_printf(15, "segfile_destroy: seg->id=" XIDT " ref_count=%d\n", segment_id(seg), seg->ref_count);
tbx_obj_t *obj = container_of(ref, tbx_obj_t, refcount);
lio_segment_t *seg = container_of(obj, lio_segment_t, obj);

if (seg->ref_count > 0) return;
segfile_priv_t *s = (segfile_priv_t *)seg->priv;

if (s->fname != NULL) free(s->fname);
if (s->qname != NULL) free(s->qname);
Expand Down Expand Up @@ -633,11 +635,10 @@ lio_segment_t *segment_file_create(void *arg)
//** Make the space
tbx_type_malloc_clear(seg, lio_segment_t, 1);
tbx_type_malloc_clear(s, segfile_priv_t, 1);

tbx_obj_init(&seg->obj, (tbx_vtable_t *) &lio_fileseg_vtable);
s->fname = NULL;

generate_ex_id(&(seg->header.id));
tbx_atomic_set(seg->ref_count, 0);
seg->header.type = SEGMENT_TYPE_FILE;

s->tpc = lio_lookup_service(es, ESS_RUNNING, ESS_TPC_UNLIMITED);
Expand All @@ -646,20 +647,6 @@ lio_segment_t *segment_file_create(void *arg)

seg->priv = s;
seg->ess = es;
seg->fn.read = segfile_read;
seg->fn.write = segfile_write;
seg->fn.inspect = segfile_inspect;
seg->fn.truncate = segfile_truncate;
seg->fn.remove = segfile_remove;
seg->fn.flush = segfile_flush;
seg->fn.clone = segfile_clone;
seg->fn.signature = segfile_signature;
seg->fn.size = segfile_size;
seg->fn.block_size = segfile_block_size;
seg->fn.serialize = segfile_serialize;
seg->fn.deserialize = segfile_deserialize;
seg->fn.destroy = segfile_destroy;

return(seg);
}

Expand All @@ -671,8 +658,25 @@ lio_segment_t *segment_file_load(void *arg, ex_id_t id, lio_exnode_exchange_t *e
{
lio_segment_t *seg = segment_file_create(arg);
if (segment_deserialize(seg, id, ex) != 0) {
segment_destroy(seg);
tbx_obj_put(&seg->obj);
seg = NULL;
}
return(seg);
}

const lio_segment_vtable_t lio_fileseg_vtable = {
.base.name = "segment_file",
.base.free_fn = segfile_destroy,
.read = segfile_read,
.write = segfile_write,
.inspect = segfile_inspect,
.truncate = segfile_truncate,
.remove = segfile_remove,
.flush = segfile_flush,
.clone = segfile_clone,
.signature = segfile_signature,
.size = segfile_size,
.block_size = segfile_block_size,
.serialize = segfile_serialize,
.deserialize = segfile_deserialize,
};
Loading

0 comments on commit ef08ece

Please sign in to comment.