Skip to content

Commit

Permalink
Reimplement insert trigger in C
Browse files Browse the repository at this point in the history
  • Loading branch information
knizhnik committed Dec 29, 2013
1 parent 7006ec1 commit e97a5be
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Initial version

--- Release version 1.02 (27.12.2013) -------------------------------------
--- Release version 1.02 (29.12.2013) -------------------------------------

1. Add support of MONEY type
2. Add support of VARCHAR type
Expand All @@ -16,3 +16,4 @@ Initial version
10. Add more overloaded binary operators accepting numeric parameter
11. Support grand, group and hash ALL/ANY aggregates.
12. Make timestamp join more flexible: add direction parameter
13. Reimplement insert trigger in C
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

distrib: clean
distrib:
rm -f *.o
rm -rf results/ regression.diffs regression.out tmp_check/ log/
cd .. ; tar --exclude=.svn -chvzf imcs-1.02.tar.gz imcs
16 changes: 15 additions & 1 deletion imcs--1.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ declare
create_drop_func text;
create_truncate_func text;
create_project_func text;
trigger_args text := '';
sep text;
perf text;
id_type text;
Expand Down Expand Up @@ -278,7 +279,9 @@ begin
create_delete_trigger_func := create_delete_trigger_func||table_name||'_delete(OLD."'||timestamp_id||'",OLD."'||timestamp_id||'"); return OLD; end; $$ language plpgsql';
end if;

create_insert_trigger := 'create trigger '||table_name||'_insert after insert on '||table_name||' for each row execute procedure '||table_name||'_insert_trigger()';
-- PL/pgSQL version of trigger functions atr too slow
-- create_insert_trigger := 'create trigger '||table_name||'_insert after insert on '||table_name||' for each row execute procedure '||table_name||'_insert_trigger()';
create_insert_trigger := 'create trigger '||table_name||'_insert after insert on '||table_name||' for each row execute procedure columnar_store_insert_trigger('''||table_name||''','||id_attnum||','||timestamp_attnum;
create_delete_trigger := 'create trigger '||table_name||'_delete before delete on '||table_name||' for each row execute procedure '||table_name||'_delete_trigger()';
create_truncate_trigger := 'create trigger '||table_name||'_truncate before truncate on '||table_name||' for each statement execute procedure '||table_name||'_truncate_trigger()';

Expand All @@ -292,6 +295,8 @@ begin
raise exception 'Size is not specified for attribute %',meta.attname;
end if;
end if;
trigger_args := trigger_args||','''||meta.attname||''','||meta.atttypid||','||attr_len;

if (meta.attname = timestamp_id) then
is_timestamp := true;
if (timeseries_id is not null) then
Expand Down Expand Up @@ -362,6 +367,8 @@ begin
create_concat_func := create_concat_func||'end loop; return root; end; $$ language plpgsql stable';
create_insert_trigger_func := create_insert_trigger_func||'; return NEW; end; $$ language plpgsql';

create_insert_trigger := create_insert_trigger||trigger_args||')';

execute create_type;
execute create_load_func;
execute create_get_func;
Expand Down Expand Up @@ -402,6 +409,8 @@ create function columnar_store_span(cs_id cstring, from_pos bigint, till_pos big
create function columnar_store_delete(cs_id cstring, search_result timeseries, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME' language C strict;
create function columnar_store_truncate(table_name cstring) returns void as 'MODULE_PATHNAME' language C strict;

create function columnar_store_insert_trigger() returns trigger as 'MODULE_PATHNAME' language C;

create function columnar_store_load(table_name cstring, timeseries_attnum integer, timestamp_attnum integer, already_sorted bool,filter cstring) returns bigint as 'MODULE_PATHNAME' language C;
create function columnar_store_append_char(cs_id cstring, val "char", field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int8' language C strict;
create function columnar_store_append_int2(cs_id cstring, val int2, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int16' language C strict;
Expand All @@ -410,9 +419,12 @@ create function columnar_store_append_int8(cs_id cstring, val int8, field_type i
create function columnar_store_append_date(cs_id cstring, val date, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int32' language C strict;
create function columnar_store_append_time(cs_id cstring, val time, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int64' language C strict;
create function columnar_store_append_timestamp(cs_id cstring, val timestamp, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int64' language C strict;
create function columnar_store_append_money(cs_id cstring, val timestamp, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int64' language C strict;
create function columnar_store_append_float4(cs_id cstring, val float4, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_float' language C strict;
create function columnar_store_append_float8(cs_id cstring, val float8, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_double' language C strict;
create function columnar_store_append_bpchar(cs_id cstring, val text, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_char' language C strict;
create function columnar_store_append_varchar(cs_id cstring, val text, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_char' language C strict;


create function columnar_store_search_char(cs_id cstring, from_ts "char", till_ts "char", field_type integer) returns timeseries as 'MODULE_PATHNAME','columnar_store_search_int8' language C stable;
create function columnar_store_search_int2(cs_id cstring, from_ts int2, till_ts int2, field_type integer) returns timeseries as 'MODULE_PATHNAME','columnar_store_search_int16' language C stable;
Expand Down Expand Up @@ -993,9 +1005,11 @@ create function cs_to_date_array(timeseries) returns date[] as 'MODULE_PATHNAME'
create function cs_to_int8_array(timeseries) returns int8[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_time_array(timeseries) returns time[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_timestamp_array(timeseries) returns timestamp[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_money_array(timeseries) returns money[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_float4_array(timeseries) returns float4[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_float8_array(timeseries) returns float8[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_bpchar_array(timeseries) returns bpchar[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;
create function cs_to_varchar_array(timeseries) returns varchar[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict;

create function cs_from_array(anyarray, elem_size integer default 0) returns timeseries as 'MODULE_PATHNAME' language C stable strict;

Expand Down
170 changes: 170 additions & 0 deletions imcs.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include "func.h"
#include "smp.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "utils/timestamp.h"
#include "utils/rel.h"
#include "utils/date.h"
#include "utils/datetime.h"
#include "utils/nabstime.h"
Expand Down Expand Up @@ -255,6 +257,7 @@ PG_FUNCTION_INFO_V1(columnar_store_span);
PG_FUNCTION_INFO_V1(columnar_store_load);
PG_FUNCTION_INFO_V1(columnar_store_delete);
PG_FUNCTION_INFO_V1(columnar_store_truncate);
PG_FUNCTION_INFO_V1(columnar_store_insert_trigger);
PG_FUNCTION_INFO_V1(columnar_store_search_int8);
PG_FUNCTION_INFO_V1(columnar_store_search_int16);
PG_FUNCTION_INFO_V1(columnar_store_search_int32);
Expand Down Expand Up @@ -452,6 +455,7 @@ Datum columnar_store_span(PG_FUNCTION_ARGS);
Datum columnar_store_load(PG_FUNCTION_ARGS);
Datum columnar_store_delete(PG_FUNCTION_ARGS);
Datum columnar_store_truncate(PG_FUNCTION_ARGS);
Datum columnar_store_insert_trigger(PG_FUNCTION_ARGS);
Datum columnar_store_search_int8(PG_FUNCTION_ARGS);
Datum columnar_store_search_int16(PG_FUNCTION_ARGS);
Datum columnar_store_search_int32(PG_FUNCTION_ARGS);
Expand Down Expand Up @@ -3805,6 +3809,172 @@ Datum columnar_store_load(PG_FUNCTION_ARGS)
PG_RETURN_INT64(n_records);
}

Datum columnar_store_insert_trigger(PG_FUNCTION_ARGS)
{
TriggerData* trigger_data;
Trigger* trigger;
char const* table_name;
int id_attnum;
int timestamp_attnum;
int table_name_len;
int i, n_attrs;
Oid* attr_type_oid;
imcs_elem_typeid_t* attr_type;
int* attr_size;
char** attr_name;
char** cs_id_prefix;
int* cs_id_prefix_len;
int cs_id_max_len = 256;
char* cs_id = (char*)palloc(cs_id_max_len);
text* t;
Datum* values;
bool* nulls;
imcs_timeseries_t* ts;
char* id = NULL;
int id_len = 0;
int len;
char id_buf[32];

if (!CALLED_AS_TRIGGER(fcinfo)) {
ereport(ERROR, (errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), (errmsg("columnar_store_insert_trigger can be called only by trigger"))));
}
trigger_data = (TriggerData*)fcinfo->context;
trigger = trigger_data->tg_trigger;
table_name = trigger->tgargs[0];
id_attnum = atoi(trigger->tgargs[1]);
timestamp_attnum = atoi(trigger->tgargs[2]);
table_name_len = strlen(table_name);
n_attrs = trigger->tgnargs/3-1;

attr_type_oid = (imcs_elem_typeid_t*)palloc(n_attrs*sizeof(imcs_elem_typeid_t));
attr_type = (Oid*)palloc(n_attrs*sizeof(Oid));
attr_size = (int*)palloc(n_attrs*sizeof(int));
attr_name = (char**)palloc(n_attrs*sizeof(char*));
cs_id_prefix = (char**)palloc(n_attrs*sizeof(char*));
cs_id_prefix_len = (int*)palloc(n_attrs*sizeof(int));

values = (Datum*)palloc(sizeof(Datum)*n_attrs);
nulls = (bool*)palloc(sizeof(bool)*n_attrs);

for (i = 0; i < n_attrs; i++) {
attr_name[i] = trigger->tgargs[i*3+3];
attr_type_oid[i] = atoi(trigger->tgargs[i*3+4]);
attr_type[i] = imcs_oid_to_typeid(attr_type_oid[i]);
attr_size[i] = atoi(trigger->tgargs[i*3+5]);
cs_id_prefix_len[i] = table_name_len + strlen(attr_name[i]) + 1;
cs_id_prefix[i] = (char*)palloc(cs_id_prefix_len[i]+1);
sprintf(cs_id_prefix[i], "%s-%s", table_name, attr_name[i]);
}

ts = imcs_get_timeseries(cs_id_prefix[timestamp_attnum-1], attr_type[timestamp_attnum-1], true, attr_size[timestamp_attnum-1], true);

if (id_attnum != 0) { /* in case of single timeseries, dummy hash entry to check if timeseries was already initialized is not needed: use entry for timestamp */
ts->count = 1;
}
heap_deform_tuple(trigger_data->tg_trigtuple, trigger_data->tg_relation->rd_att, values, nulls);
if (id_attnum != 0) {
if (nulls[id_attnum-1]) {
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), (errmsg("Timseries identifier can not be NULL"))));
}
switch (attr_type[id_attnum-1]) {
case TID_int8:
id_len = sprintf(id_buf, "%d", DatumGetChar(values[id_attnum-1]));
break;
case TID_int16:
id_len = sprintf(id_buf, "%d", DatumGetInt16(values[id_attnum-1]));
break;
case TID_int32:
id_len = sprintf(id_buf, "%d", DatumGetInt32(values[id_attnum-1]));
break;
case TID_int64:
id_len = sprintf(id_buf, "%lld", (long long)DatumGetInt64(values[id_attnum-1]));
break;
case TID_char:
t = DatumGetTextP(values[id_attnum-1]);
id = (char*)VARDATA(t);
id_len = VARSIZE(t) - VARHDRSZ;
if (attr_type_oid[id_attnum-1] == BPCHAROID) {
while (id_len != 0 && id[id_len-1] == ' ') {
id_len -= 1;
}
}
break;
default:
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("Unsupported timeseries ID type %d", attr_type_oid[id_attnum-1]))));
}
}
for (i = 0; i < n_attrs; i++) {
if (nulls[i]) {
if (imcs_substitute_nulls) {
values[i] = 0;
} else {
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), (errmsg("NULL values are not supported by columnar store"))));
}
}
if (i+1 != id_attnum) {
bool is_timestamp = i+1 == timestamp_attnum;
char *str;
if (id_attnum != 0) {
int prefix_len = cs_id_prefix_len[i];
while (cs_id_max_len < prefix_len + id_len + 2) {
cs_id_max_len *= 2;
pfree(cs_id);
cs_id = (char*)palloc(cs_id_max_len);
}
memcpy(cs_id, cs_id_prefix[i], prefix_len);
cs_id[prefix_len] = '-';
memcpy(cs_id + prefix_len + 1, id, id_len);
cs_id[prefix_len + id_len + 1] = '\0';
} else {
cs_id = cs_id_prefix[i];
}
ts = imcs_get_timeseries(cs_id, attr_type[i], is_timestamp, attr_size[i], true);
switch (attr_type[i]) {
case TID_int8:
imcs_append_int8(ts, DatumGetChar(values[i]));
break;
case TID_int16:
imcs_append_int16(ts, DatumGetInt16(values[i]));
break;
case TID_int32:
case TID_date:
imcs_append_int32(ts, DatumGetInt32(values[i]));
break;
case TID_int64:
case TID_time:
case TID_timestamp:
case TID_money:
imcs_append_int64(ts, DatumGetInt64(values[i]));
break;
case TID_float:
imcs_append_float(ts, DatumGetFloat4(values[i]));
break;
case TID_double:
imcs_append_double(ts, DatumGetFloat8(values[i]));
break;
case TID_char:
if (nulls[i]) { /* substitute NULL with empty string */
imcs_append_char(ts, NULL, 0);
} else {
t = DatumGetTextP(values[i]);
str = (char*)VARDATA(t);
len = VARSIZE(t) - VARHDRSZ;
if (attr_type_oid[i] == BPCHAROID) {
while (len != 0 && str[len-1] == ' ') {
len -= 1;
}
}
imcs_append_char(ts, str, len);
}
break;
default:
Assert(false);
}
}
}
PG_RETURN_POINTER(NULL);
}

Datum cs_cut(PG_FUNCTION_ARGS)
{
bytea* str = PG_GETARG_BYTEA_P(0);
Expand Down

0 comments on commit e97a5be

Please sign in to comment.