From b5e394163e6cdf8b8c99aadd1a5d7b9ddd0f7516 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sun, 12 May 2024 22:45:30 -0600 Subject: [PATCH 1/4] Add LOCF and gap-fill logic --- doc/timeseries.md | 35 +++++++++++++++ sql/timeseries.sql | 104 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/doc/timeseries.md b/doc/timeseries.md index 37491db..e2ef44a 100644 --- a/doc/timeseries.md +++ b/doc/timeseries.md @@ -48,6 +48,41 @@ Sometimes you know older data isn't queried very often, but still don't want to By calling `set_ts_compression_policy` on a time-series table with an appropriate interval (perhaps`'1 month'`), this extension will take care of compressing partitions (using a columnar storage method) older than the specified interval, once an hour. As with the retention policy functionality, a function is also provided for clearing any existing policy (existing partitions will not be decompressed, however). +### Analytics Helpers + +This extension includes several functions intended to make writing correct time-series queries easier. Certain concepts can be difficult to express in standard SQL and helper functions can aid in readability and maintainability. + +#### `first` and `last` + +These two functions help clean up the syntax of a fairly common pattern: a query is grouped by one dimension, but a user wants to know what the first or last row in a group is when ordered by a _different_ dimension. + +For instance, you might have a cloud computing platform reporting metrics and wish to know the latest (in time) CPU utilization metric for each machine in the platform: + +```sql +SELECT machine_id, + last(cpu_util, recorded_at) +FROM events +GROUP BY machine_id; +``` + +#### `date_bin_table` + +This function automates the tedium of aligning time-series values to a given width, or "stride", and makes sure to include NULL rows for any time periods where the source table has no data points. 
+ +It must be called against a time-series table, but apart from that consideration using it is pretty straightforward: + +```sql +SELECT * FROM date_bin_table(NULL::target_table, '1 hour', '[2024-01-01, 2024-01-02]'); +``` + +The output of this query will differ from simply hitting the target table directly in three ways: + + * Rows will be sorted by time, ascending + * The time column's values will be binned to the provided width + * Extra rows will be added for periods with no data. They will include the time stamp for that bin and NULL in all other columns + + + ## Roadmap While `timeseries` is still in its early days, we have a concrete vision for the features we will be including in the future. Feedback on the importance of a given feature to customer use cases will help us better prioritize the following lists. diff --git a/sql/timeseries.sql b/sql/timeseries.sql index 3d83545..09ec9bb 100644 --- a/sql/timeseries.sql +++ b/sql/timeseries.sql @@ -496,3 +496,107 @@ CREATE OR REPLACE AGGREGATE last(value anyelement, rank anycompatible) ( FINALFUNC = endpoint_final, FINALFUNC_EXTRA ); + + +-- When provided with a table, interval, and time range, +-- this function will bin all data within the specified +-- range into rows spaced according to the stride. +-- +-- Time bins which lack data in the input table will still +-- receive a single row of output with NULLs in columns +-- other than that row's date bin value. 
+CREATE OR REPLACE FUNCTION public.date_bin_table (target_table_elem anyelement, time_stride interval, time_range tstzrange)
+ RETURNS SETOF anyelement
+ LANGUAGE plpgsql
+AS $function$
+#print_strict_params on
+DECLARE
+  -- target_table_elem: typed NULL naming the table (e.g. NULL::events);
+  -- time_stride: bin width; time_range: lower()/upper() give the first and
+  -- last bin. Returns rows shaped like the table, one or more per bin.
+  target_table_id regclass;
+  tl_sql text; -- select list under construction; format() renders its initial NULL as ''
+  part_col_name name;
+  part_col_type regtype;
+  col_name name;
+BEGIN
+  SELECT oid INTO target_table_id -- map the argument's row type back to its table
+  FROM pg_class cl
+  WHERE reltype=pg_typeof(target_table_elem);
+
+  RAISE NOTICE 'target_table_id = %', target_table_id;
+
+  SELECT
+    at.attname,
+    format_type(at.atttypid, at.atttypmod)::regtype
+  INTO STRICT part_col_name, part_col_type -- STRICT: error unless exactly one partition-key column is found
+  FROM (
+    SELECT partrelid, unnest(pt.partattrs) partattnum
+    FROM pg_partitioned_table pt
+    WHERE pt.partrelid = target_table_id
+  ) pat
+  JOIN pg_attribute at ON at.attrelid = pat.partrelid AND
+                          at.attnum = pat.partattnum;
+
+  RAISE NOTICE 'part_col_name = %', part_col_name;
+  RAISE NOTICE 'part_col_type = %', part_col_type;
+
+  FOR col_name IN
+    SELECT attname FROM pg_attribute
+    WHERE attnum > 0 AND
+          NOT attisdropped AND
+          attrelid=target_table_id
+    ORDER BY attnum ASC
+  LOOP
+    RAISE NOTICE 'col_name = %', col_name;
+
+    IF col_name = part_col_name
+    THEN
+      tl_sql := format('%sCAST(date_series.date AS %s) %I, ', tl_sql, part_col_type, col_name);
+    ELSE
+      tl_sql := format('%sdata.%I, ', tl_sql, col_name);
+    END IF;
+
+    RAISE NOTICE 'tl_sql: %', tl_sql;
+  END LOOP;
+
+  tl_sql := rtrim(tl_sql, ', ');
+  RAISE NOTICE 'final tl_sql: %', tl_sql;
+
+  RETURN QUERY EXECUTE format($query$
+    WITH data AS (
+      SELECT *, date_bin($1, %I, $2) binned_date
+      FROM %s
+      WHERE %I BETWEEN $2 AND $3
+    )
+    SELECT %s
+    FROM generate_series($2, $3, $1) date_series(date)
+    LEFT JOIN data
+      ON data.binned_date = date_series.date
+    ORDER BY date_series.date;$query$,
+    part_col_name,
+    target_table_id, -- regclass text is already quoted/schema-qualified, so it is spliced with the plain-string specifier, not the identifier one
+    part_col_name,
+    tl_sql)
+    USING time_stride, lower(time_range), upper(time_range); -- both bounds are used inclusively; the range's own bracket flags are ignored
+  RETURN;
+END;
+$function$;
+ +-- Function implementation for LOCF: last-observed carry-forward. +-- Intended for use on the output of date_bin_table in order to +-- fill NULL rows with the last observed value. +CREATE OR REPLACE FUNCTION public.locf_agg(state anyelement, value anyelement) + RETURNS anyelement + LANGUAGE plpgsql +AS $function$ +BEGIN + RETURN COALESCE(value, state); +END; +$function$; + +-- Aggregate for LOCF. For use in a WINDOW clause +CREATE AGGREGATE locf(anyelement) ( + SFUNC = locf_agg, + STYPE = anyelement +); From 8d27e8361cdae264420541dce86bb09e7fcbeaec Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 15 May 2024 15:56:59 -0600 Subject: [PATCH 2/4] Remove debugging statements, add error-handling --- sql/timeseries.sql | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/sql/timeseries.sql b/sql/timeseries.sql index 09ec9bb..c2d9661 100644 --- a/sql/timeseries.sql +++ b/sql/timeseries.sql @@ -497,7 +497,6 @@ CREATE OR REPLACE AGGREGATE last(value anyelement, rank anycompatible) ( FINALFUNC_EXTRA ); - -- When provided with a table, interval, and time range, -- this function will bin all data within the specified -- range into rows spaced according to the stride. @@ -505,6 +504,8 @@ CREATE OR REPLACE AGGREGATE last(value anyelement, rank anycompatible) ( -- Time bins which lack data in the input table will still -- receive a single row of output with NULLs in columns -- other than that row's date bin value. +-- +-- The target table must be time-series enabled. 
CREATE OR REPLACE FUNCTION public.date_bin_table (target_table_elem anyelement, time_stride interval, time_range tstzrange) RETURNS SETOF anyelement LANGUAGE plpgsql @@ -524,7 +525,22 @@ BEGIN FROM pg_class cl WHERE reltype=pg_typeof(target_table_elem); - RAISE NOTICE 'target_table_id = %', target_table_id; + IF target_table_id IS NULL THEN + RAISE invalid_parameter_value USING + MESSAGE = 'invalid table type', + DETAIL = 'Target table element must be a table type', + HINT = 'Provide a value with a type corresponding to the table to query.'; + END IF; + + PERFORM * + FROM public.ts_config + WHERE "table_id"=target_table_id; + IF NOT FOUND THEN + RAISE object_not_in_prerequisite_state USING + MESSAGE = 'target table must be time-series enhanced', + DETAIL = 'Cannot query tables without time-series enhancements', + HINT = format('Call %L to enable time-series enhancements', 'enable_ts_table'); + END IF; SELECT at.attname, @@ -538,9 +554,6 @@ BEGIN JOIN pg_attribute at ON at.attrelid = pat.partrelid AND at.attnum = pat.partattnum; - RAISE NOTICE 'part_col_name = %', part_col_name; - RAISE NOTICE 'part_col_type = %', part_col_type; - FOR col_name IN SELECT attname FROM pg_attribute WHERE attnum > 0 AND @@ -548,20 +561,17 @@ BEGIN attrelid=target_table_id ORDER BY attnum ASC LOOP - RAISE NOTICE 'col_name = %', col_name; - IF col_name = part_col_name THEN - tl_sql := format('%sCAST(date_series.date AS %s) %I, ', tl_sql, part_col_type, col_name); + tl_sql := format('%sCAST(date_series.date AS %s) %I, ', + tl_sql, part_col_type, col_name); ELSE - tl_sql := format('%sdata.%I, ', tl_sql, col_name); + tl_sql := format('%sdata.%I, ', + tl_sql, col_name); END IF; - - RAISE NOTICE 'tl_sql: %', tl_sql; END LOOP; tl_sql := rtrim(tl_sql, ', '); - RAISE NOTICE 'final tl_sql: %', tl_sql; RETURN QUERY EXECUTE format($query$ WITH data AS ( From e11ca55b92e6039881732df068f425970c08c820 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 15 May 2024 16:07:08 -0600 Subject: [PATCH 
3/4] Add analytics tests --- test/expected/basic_usage.out | 32 ++++++++++++++++++++++++++++++-- test/sql/basic_usage.sql | 25 +++++++++++++++++-------- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/test/expected/basic_usage.out b/test/expected/basic_usage.out index 7826867..8d7bfb4 100644 --- a/test/expected/basic_usage.out +++ b/test/expected/basic_usage.out @@ -140,9 +140,15 @@ SELECT COUNT(*) < 10 AS fewer_partitions FROM ts_part_info; CREATE TABLE events ( user_id bigint, event_id bigint, - event_time timestamptz, + event_time timestamptz NOT NULL, value float -); +) PARTITION BY RANGE (event_time); +SELECT enable_ts_table('events'); + enable_ts_table +----------------- + +(1 row) + COPY events FROM STDIN WITH (FORMAT 'csv'); SELECT first(value, event_time), user_id FROM events GROUP BY user_id; first | user_id @@ -172,3 +178,25 @@ SELECT last(event_id, event_time), user_id FROM events GROUP BY user_id; 6 | 1 (2 rows) +SELECT last(user_id, value) top_performer, + locf(avg(value)) OVER (ORDER BY event_time), + event_time +FROM date_bin_table(NULL::events, '1 minute', + '[2020-11-04 15:50:00-08, 2020-11-04 16:00:00-08]') +GROUP BY 3 +ORDER BY 3; + top_performer | locf | event_time +---------------+--------------------+------------------------------ + | | Wed Nov 04 15:50:00 2020 PST + 2 | 1.4 | Wed Nov 04 15:51:00 2020 PST + | 1.4 | Wed Nov 04 15:52:00 2020 PST + 2 | 1.5 | Wed Nov 04 15:53:00 2020 PST + | 1.5 | Wed Nov 04 15:54:00 2020 PST + 2 | 1.6 | Wed Nov 04 15:55:00 2020 PST + | 1.6 | Wed Nov 04 15:56:00 2020 PST + 2 | 1.7 | Wed Nov 04 15:57:00 2020 PST + 2 | 1.8 | Wed Nov 04 15:58:00 2020 PST + 2 | 1.9000000000000001 | Wed Nov 04 15:59:00 2020 PST + | 1.9000000000000001 | Wed Nov 04 16:00:00 2020 PST +(11 rows) + diff --git a/test/sql/basic_usage.sql b/test/sql/basic_usage.sql index 47d3d03..81a5a04 100644 --- a/test/sql/basic_usage.sql +++ b/test/sql/basic_usage.sql @@ -52,20 +52,21 @@ SELECT COUNT(*) < 10 AS fewer_partitions FROM 
ts_part_info; CREATE TABLE events ( user_id bigint, event_id bigint, - event_time timestamptz, + event_time timestamptz NOT NULL, value float -); +) PARTITION BY RANGE (event_time); +SELECT enable_ts_table('events'); COPY events FROM STDIN WITH (FORMAT 'csv'); -1,1,"2020-11-04 15:54:02.226999-08",1.1 -1,2,"2020-11-04 15:55:02.226999-08",1.2 -1,3,"2020-11-04 15:56:02.226999-08",1.3 +1,1,"2020-11-04 15:51:02.226999-08",1.1 +1,2,"2020-11-04 15:53:02.226999-08",1.2 +1,3,"2020-11-04 15:55:02.226999-08",1.3 1,4,"2020-11-04 15:57:02.226999-08",1.4 1,5,"2020-11-04 15:58:02.226999-08",1.5 1,6,"2020-11-04 15:59:02.226999-08",1.6 -2,7,"2020-11-04 15:54:02.226999-08",1.7 -2,8,"2020-11-04 15:55:02.226999-08",1.8 -2,9,"2020-11-04 15:56:02.226999-08",1.9 +2,7,"2020-11-04 15:51:02.226999-08",1.7 +2,8,"2020-11-04 15:53:02.226999-08",1.8 +2,9,"2020-11-04 15:55:02.226999-08",1.9 2,10,"2020-11-04 15:57:02.226999-08",2.0 2,11,"2020-11-04 15:58:02.226999-08",2.1 2,12,"2020-11-04 15:59:02.226999-08",2.2 @@ -78,3 +79,11 @@ SELECT last(value, event_time), user_id FROM events GROUP BY user_id; SELECT first(event_id, event_time), user_id FROM events GROUP BY user_id; SELECT last(event_id, event_time), user_id FROM events GROUP BY user_id; + +SELECT last(user_id, value) top_performer, + locf(avg(value)) OVER (ORDER BY event_time), + event_time +FROM date_bin_table(NULL::events, '1 minute', + '[2020-11-04 15:50:00-08, 2020-11-04 16:00:00-08]') +GROUP BY 3 +ORDER BY 3; From 7dc0c4f3737b7eb8bfd89967bb7609d3b850e8c9 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 15 May 2024 16:23:06 -0600 Subject: [PATCH 4/4] Add compression tests --- sql/timeseries.sql | 14 ++++++++------ test/expected/basic_usage.out | 20 +++++++++++++++++++- test/sql/basic_usage.sql | 6 +++++- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/sql/timeseries.sql b/sql/timeseries.sql index c2d9661..016bf1c 100644 --- a/sql/timeseries.sql +++ b/sql/timeseries.sql @@ -281,11 +281,13 @@ SELECT pt.parentrelid 
as table_id, pg_get_expr(c.relpartbound, c.oid) AS part_range, pg_table_size(pt.relid) AS table_size_bytes, pg_indexes_size(pt.relid) AS index_size_bytes, - pg_total_relation_size(pt.relid) AS total_size_bytes + pg_total_relation_size(pt.relid) AS total_size_bytes, + am.amname AS access_method FROM @extschema@.ts_config tsc, pg_partition_tree(tsc.table_id) pt, - pg_class c - WHERE pt.isleaf AND pt.relid = c.oid + pg_class c, + pg_am am + WHERE pt.isleaf AND pt.relid = c.oid AND c.relam = am.oid ORDER BY 2 ASC; -- Unlike the above view, this sums partitions for each time-series table. @@ -506,7 +508,7 @@ CREATE OR REPLACE AGGREGATE last(value anyelement, rank anycompatible) ( -- other than that row's date bin value. -- -- The target table must be time-series enabled. -CREATE OR REPLACE FUNCTION public.date_bin_table (target_table_elem anyelement, time_stride interval, time_range tstzrange) +CREATE OR REPLACE FUNCTION @extschema@.date_bin_table (target_table_elem anyelement, time_stride interval, time_range tstzrange) RETURNS SETOF anyelement LANGUAGE plpgsql AS $function$ @@ -533,7 +535,7 @@ BEGIN END IF; PERFORM * - FROM public.ts_config + FROM @extschema@.ts_config WHERE "table_id"=target_table_id; IF NOT FOUND THEN RAISE object_not_in_prerequisite_state USING @@ -596,7 +598,7 @@ $function$; -- Function implementation for LOCF: last-observed carry-forward. -- Intended for use on the output of date_bin_table in order to -- fill NULL rows with the last observed value. 
-CREATE OR REPLACE FUNCTION public.locf_agg(state anyelement, value anyelement) +CREATE OR REPLACE FUNCTION @extschema@.locf_agg(state anyelement, value anyelement) RETURNS anyelement LANGUAGE plpgsql AS $function$ diff --git a/test/expected/basic_usage.out b/test/expected/basic_usage.out index 8d7bfb4..57e2ac7 100644 --- a/test/expected/basic_usage.out +++ b/test/expected/basic_usage.out @@ -65,6 +65,12 @@ SELECT COUNT(*) > 10 AS has_partitions FROM ts_part_info; t (1 row) +SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar'; + compressed? +------------- + f +(1 row) + SELECT set_ts_retention_policy('measurements', '90 days'); set_ts_retention_policy ------------------------- @@ -119,7 +125,19 @@ SELECT premake FROM part_config WHERE parent_table='public.measurements'; 1 (1 row) -SELECT set_ts_retention_policy('measurements', '1 days'); +SELECT apply_compression_policy('measurements', '1 day'); + apply_compression_policy +-------------------------- + +(1 row) + +SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar'; + compressed? +------------- + t +(1 row) + +SELECT set_ts_retention_policy('measurements', '1 day'); set_ts_retention_policy ------------------------- diff --git a/test/sql/basic_usage.sql b/test/sql/basic_usage.sql index 81a5a04..bd35e05 100644 --- a/test/sql/basic_usage.sql +++ b/test/sql/basic_usage.sql @@ -32,6 +32,7 @@ SELECT partition_duration, partition_lead_time FROM ts_config ORDER BY table_id: SELECT partition_interval, premake FROM part_config WHERE parent_table='public.measurements'; SELECT COUNT(*) > 10 AS has_partitions FROM ts_part_info; +SELECT COUNT(*) > 0 AS "compressed?" 
FROM ts_part_info WHERE access_method = 'columnar'; SELECT set_ts_retention_policy('measurements', '90 days'); SELECT retention_duration FROM ts_config WHERE table_id='measurements'::regclass; @@ -45,7 +46,10 @@ SELECT set_ts_lead_time('measurements', '1 day'); SELECT partition_lead_time FROM ts_config WHERE table_id='measurements'::regclass; SELECT premake FROM part_config WHERE parent_table='public.measurements'; -SELECT set_ts_retention_policy('measurements', '1 days'); +SELECT apply_compression_policy('measurements', '1 day'); +SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar'; + +SELECT set_ts_retention_policy('measurements', '1 day'); SELECT run_maintenance(); SELECT COUNT(*) < 10 AS fewer_partitions FROM ts_part_info;