diff --git a/CHANGES b/CHANGES index 0b48a1a..1e524d5 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,7 @@ Initial version ---- Release version 1.02 (27.12.2013) ------------------------------------- +--- Release version 1.02 (29.12.2013) ------------------------------------- 1. Add support of MONEY type 2. Add support of VARCHAR type @@ -16,3 +16,4 @@ Initial version 10. Add more overloaded binary operators accepting numeric parameter 11. Support grand, group and hash ALL/ANY aggregates. 12. Make timestamp join more flexible: add direction parameter +13. Reimplement insert trigger in C \ No newline at end of file diff --git a/Makefile b/Makefile index 641e2bc..e583f79 100644 --- a/Makefile +++ b/Makefile @@ -31,5 +31,7 @@ include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif -distrib: clean +distrib: + rm -f *.o + rm -rf results/ regression.diffs regression.out tmp_check/ log/ cd .. ; tar --exclude=.svn -chvzf imcs-1.02.tar.gz imcs \ No newline at end of file diff --git a/imcs--1.1.sql b/imcs--1.1.sql index 0d74394..9f759f0 100644 --- a/imcs--1.1.sql +++ b/imcs--1.1.sql @@ -56,6 +56,7 @@ declare create_drop_func text; create_truncate_func text; create_project_func text; + trigger_args text := ''; sep text; perf text; id_type text; @@ -278,7 +279,9 @@ begin create_delete_trigger_func := create_delete_trigger_func||table_name||'_delete(OLD."'||timestamp_id||'",OLD."'||timestamp_id||'"); return OLD; end; $$ language plpgsql'; end if; - create_insert_trigger := 'create trigger '||table_name||'_insert after insert on '||table_name||' for each row execute procedure '||table_name||'_insert_trigger()'; + -- PL/pgSQL version of trigger functions atr too slow + -- create_insert_trigger := 'create trigger '||table_name||'_insert after insert on '||table_name||' for each row execute procedure '||table_name||'_insert_trigger()'; + create_insert_trigger := 'create trigger '||table_name||'_insert after insert on '||table_name||' for each row execute procedure columnar_store_insert_trigger('''||table_name||''','||id_attnum||','||timestamp_attnum; create_delete_trigger := 'create trigger '||table_name||'_delete before delete on '||table_name||' for each row execute procedure '||table_name||'_delete_trigger()'; create_truncate_trigger := 'create trigger '||table_name||'_truncate before truncate on '||table_name||' for each statement execute procedure '||table_name||'_truncate_trigger()'; @@ -292,6 +295,8 @@ begin raise exception 'Size is not specified for attribute %',meta.attname; end if; end if; + trigger_args := trigger_args||','''||meta.attname||''','||meta.atttypid||','||attr_len; + if (meta.attname = timestamp_id) then is_timestamp := true; if (timeseries_id is not null) then @@ -362,6 +367,8 @@ begin create_concat_func := create_concat_func||'end loop; return root; end; $$ language plpgsql stable'; create_insert_trigger_func := create_insert_trigger_func||'; return NEW; end; $$ language plpgsql'; + create_insert_trigger := create_insert_trigger||trigger_args||')'; + execute create_type; execute create_load_func; execute create_get_func; @@ -402,6 +409,8 @@ create function columnar_store_span(cs_id cstring, from_pos bigint, till_pos big create function columnar_store_delete(cs_id cstring, search_result timeseries, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME' language C strict; create function columnar_store_truncate(table_name cstring) returns void as 'MODULE_PATHNAME' language C strict; +create function columnar_store_insert_trigger() returns trigger as 'MODULE_PATHNAME' language C; + create function columnar_store_load(table_name cstring, timeseries_attnum integer, timestamp_attnum integer, already_sorted bool,filter cstring) returns bigint as 'MODULE_PATHNAME' language C; create function columnar_store_append_char(cs_id cstring, val "char", field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int8' language C strict; create function columnar_store_append_int2(cs_id cstring, val int2, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int16' language C strict; @@ -410,9 +419,12 @@ create function columnar_store_append_int8(cs_id cstring, val int8, field_type i create function columnar_store_append_date(cs_id cstring, val date, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int32' language C strict; create function columnar_store_append_time(cs_id cstring, val time, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int64' language C strict; create function columnar_store_append_timestamp(cs_id cstring, val timestamp, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int64' language C strict; +create function columnar_store_append_money(cs_id cstring, val timestamp, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_int64' language C strict; create function columnar_store_append_float4(cs_id cstring, val float4, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_float' language C strict; create function columnar_store_append_float8(cs_id cstring, val float8, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_double' language C strict; create function columnar_store_append_bpchar(cs_id cstring, val text, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_char' language C strict; +create function columnar_store_append_varchar(cs_id cstring, val text, field_type integer, is_timestamp bool, field_size integer) returns void as 'MODULE_PATHNAME','columnar_store_append_char' language C strict; + create function columnar_store_search_char(cs_id cstring, from_ts "char", till_ts "char", field_type integer) returns timeseries as 'MODULE_PATHNAME','columnar_store_search_int8' language C stable; create function columnar_store_search_int2(cs_id cstring, from_ts int2, till_ts int2, field_type integer) returns timeseries as 'MODULE_PATHNAME','columnar_store_search_int16' language C stable; @@ -993,9 +1005,11 @@ create function cs_to_date_array(timeseries) returns date[] as 'MODULE_PATHNAME' create function cs_to_int8_array(timeseries) returns int8[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; create function cs_to_time_array(timeseries) returns time[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; create function cs_to_timestamp_array(timeseries) returns timestamp[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; +create function cs_to_money_array(timeseries) returns money[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; create function cs_to_float4_array(timeseries) returns float4[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; create function cs_to_float8_array(timeseries) returns float8[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; create function cs_to_bpchar_array(timeseries) returns bpchar[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; +create function cs_to_varchar_array(timeseries) returns varchar[] as 'MODULE_PATHNAME','cs_to_array' language C stable strict; create function cs_from_array(anyarray, elem_size integer default 0) returns timeseries as 'MODULE_PATHNAME' language C stable strict; diff --git a/imcs.c b/imcs.c index 6b48d93..74b6c73 100644 --- a/imcs.c +++ b/imcs.c @@ -4,7 +4,9 @@ #include "func.h" #include "smp.h" #include "executor/spi.h" +#include "commands/trigger.h" #include "utils/timestamp.h" +#include "utils/rel.h" #include "utils/date.h" #include "utils/datetime.h" #include "utils/nabstime.h" @@ -255,6 +257,7 @@ PG_FUNCTION_INFO_V1(columnar_store_span); PG_FUNCTION_INFO_V1(columnar_store_load); PG_FUNCTION_INFO_V1(columnar_store_delete); PG_FUNCTION_INFO_V1(columnar_store_truncate); +PG_FUNCTION_INFO_V1(columnar_store_insert_trigger); PG_FUNCTION_INFO_V1(columnar_store_search_int8); PG_FUNCTION_INFO_V1(columnar_store_search_int16); PG_FUNCTION_INFO_V1(columnar_store_search_int32); @@ -452,6 +455,7 @@ Datum columnar_store_span(PG_FUNCTION_ARGS); Datum columnar_store_load(PG_FUNCTION_ARGS); Datum columnar_store_delete(PG_FUNCTION_ARGS); Datum columnar_store_truncate(PG_FUNCTION_ARGS); +Datum columnar_store_insert_trigger(PG_FUNCTION_ARGS); Datum columnar_store_search_int8(PG_FUNCTION_ARGS); Datum columnar_store_search_int16(PG_FUNCTION_ARGS); Datum columnar_store_search_int32(PG_FUNCTION_ARGS); @@ -3805,6 +3809,172 @@ Datum columnar_store_load(PG_FUNCTION_ARGS) PG_RETURN_INT64(n_records); } +Datum columnar_store_insert_trigger(PG_FUNCTION_ARGS) +{ + TriggerData* trigger_data; + Trigger* trigger; + char const* table_name; + int id_attnum; + int timestamp_attnum; + int table_name_len; + int i, n_attrs; + Oid* attr_type_oid; + imcs_elem_typeid_t* attr_type; + int* attr_size; + char** attr_name; + char** cs_id_prefix; + int* cs_id_prefix_len; + int cs_id_max_len = 256; + char* cs_id = (char*)palloc(cs_id_max_len); + text* t; + Datum* values; + bool* nulls; + imcs_timeseries_t* ts; + char* id = NULL; + int id_len = 0; + int len; + char id_buf[32]; + + if (!CALLED_AS_TRIGGER(fcinfo)) { + ereport(ERROR, (errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), (errmsg("columnar_store_insert_trigger can be called only by trigger")))); + } + trigger_data = (TriggerData*)fcinfo->context; + trigger = trigger_data->tg_trigger; + table_name = trigger->tgargs[0]; + id_attnum = atoi(trigger->tgargs[1]); + timestamp_attnum = atoi(trigger->tgargs[2]); + table_name_len = strlen(table_name); + n_attrs = trigger->tgnargs/3-1; + + attr_type_oid = (imcs_elem_typeid_t*)palloc(n_attrs*sizeof(imcs_elem_typeid_t)); + attr_type = (Oid*)palloc(n_attrs*sizeof(Oid)); + attr_size = (int*)palloc(n_attrs*sizeof(int)); + attr_name = (char**)palloc(n_attrs*sizeof(char*)); + cs_id_prefix = (char**)palloc(n_attrs*sizeof(char*)); + cs_id_prefix_len = (int*)palloc(n_attrs*sizeof(int)); + + values = (Datum*)palloc(sizeof(Datum)*n_attrs); + nulls = (bool*)palloc(sizeof(bool)*n_attrs); + + for (i = 0; i < n_attrs; i++) { + attr_name[i] = trigger->tgargs[i*3+3]; + attr_type_oid[i] = atoi(trigger->tgargs[i*3+4]); + attr_type[i] = imcs_oid_to_typeid(attr_type_oid[i]); + attr_size[i] = atoi(trigger->tgargs[i*3+5]); + cs_id_prefix_len[i] = table_name_len + strlen(attr_name[i]) + 1; + cs_id_prefix[i] = (char*)palloc(cs_id_prefix_len[i]+1); + sprintf(cs_id_prefix[i], "%s-%s", table_name, attr_name[i]); + } + + ts = imcs_get_timeseries(cs_id_prefix[timestamp_attnum-1], attr_type[timestamp_attnum-1], true, attr_size[timestamp_attnum-1], true); + + if (id_attnum != 0) { /* in case of single timeseries, dummy hash entry to check if timeseries was already initialized is not needed: use entry for timestamp */ + ts->count = 1; + } + heap_deform_tuple(trigger_data->tg_trigtuple, trigger_data->tg_relation->rd_att, values, nulls); + if (id_attnum != 0) { + if (nulls[id_attnum-1]) { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), (errmsg("Timseries identifier can not be NULL")))); + } + switch (attr_type[id_attnum-1]) { + case TID_int8: + id_len = sprintf(id_buf, "%d", DatumGetChar(values[id_attnum-1])); + break; + case TID_int16: + id_len = sprintf(id_buf, "%d", DatumGetInt16(values[id_attnum-1])); + break; + case TID_int32: + id_len = sprintf(id_buf, "%d", DatumGetInt32(values[id_attnum-1])); + break; + case TID_int64: + id_len = sprintf(id_buf, "%lld", (long long)DatumGetInt64(values[id_attnum-1])); + break; + case TID_char: + t = DatumGetTextP(values[id_attnum-1]); + id = (char*)VARDATA(t); + id_len = VARSIZE(t) - VARHDRSZ; + if (attr_type_oid[id_attnum-1] == BPCHAROID) { + while (id_len != 0 && id[id_len-1] == ' ') { + id_len -= 1; + } + } + break; + default: + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("Unsupported timeseries ID type %d", attr_type_oid[id_attnum-1])))); + } + } + for (i = 0; i < n_attrs; i++) { + if (nulls[i]) { + if (imcs_substitute_nulls) { + values[i] = 0; + } else { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), (errmsg("NULL values are not supported by columnar store")))); + } + } + if (i+1 != id_attnum) { + bool is_timestamp = i+1 == timestamp_attnum; + char *str; + if (id_attnum != 0) { + int prefix_len = cs_id_prefix_len[i]; + while (cs_id_max_len < prefix_len + id_len + 2) { + cs_id_max_len *= 2; + pfree(cs_id); + cs_id = (char*)palloc(cs_id_max_len); + } + memcpy(cs_id, cs_id_prefix[i], prefix_len); + cs_id[prefix_len] = '-'; + memcpy(cs_id + prefix_len + 1, id, id_len); + cs_id[prefix_len + id_len + 1] = '\0'; + } else { + cs_id = cs_id_prefix[i]; + } + ts = imcs_get_timeseries(cs_id, attr_type[i], is_timestamp, attr_size[i], true); + switch (attr_type[i]) { + case TID_int8: + imcs_append_int8(ts, DatumGetChar(values[i])); + break; + case TID_int16: + imcs_append_int16(ts, DatumGetInt16(values[i])); + break; + case TID_int32: + case TID_date: + imcs_append_int32(ts, DatumGetInt32(values[i])); + break; + case TID_int64: + case TID_time: + case TID_timestamp: + case TID_money: + imcs_append_int64(ts, DatumGetInt64(values[i])); + break; + case TID_float: + imcs_append_float(ts, DatumGetFloat4(values[i])); + break; + case TID_double: + imcs_append_double(ts, DatumGetFloat8(values[i])); + break; + case TID_char: + if (nulls[i]) { /* substitute NULL with empty string */ + imcs_append_char(ts, NULL, 0); + } else { + t = DatumGetTextP(values[i]); + str = (char*)VARDATA(t); + len = VARSIZE(t) - VARHDRSZ; + if (attr_type_oid[i] == BPCHAROID) { + while (len != 0 && str[len-1] == ' ') { + len -= 1; + } + } + imcs_append_char(ts, str, len); + } + break; + default: + Assert(false); + } + } + } + PG_RETURN_POINTER(NULL); +} + Datum cs_cut(PG_FUNCTION_ARGS) { bytea* str = PG_GETARG_BYTEA_P(0);