Add LOCF and gap-fill logic #12

Merged
merged 4 commits into from
May 15, 2024
35 changes: 35 additions & 0 deletions doc/timeseries.md
@@ -48,6 +48,41 @@ Sometimes you know older data isn't queried very often, but still don't want to

When you call `set_ts_compression_policy` on a time-series table with an appropriate interval (perhaps `'1 month'`), this extension will, once an hour, compress any partitions older than the specified interval (using a columnar storage method). As with the retention policy functionality, a function is also provided for clearing any existing policy (existing partitions will not be decompressed, however).

### Analytics Helpers

This extension includes several functions intended to make writing correct time-series queries easier. Certain concepts can be difficult to express in standard SQL and helper functions can aid in readability and maintainability.

#### `first` and `last`

These two functions help clean up the syntax of a fairly common pattern: a query is grouped by one dimension, but a user wants to know what the first or last row in a group is when ordered by a _different_ dimension.

For instance, you might have a cloud computing platform reporting metrics and wish to know the latest (in time) CPU utilization metric for each machine in the platform:

```sql
SELECT machine_id,
last(cpu_util, recorded_at)
FROM events
GROUP BY machine_id;
```
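
For comparison, here is a rough sketch of the same question written without the helper, using PostgreSQL's `DISTINCT ON`. The table and column names follow the example above, but the temporary table and its sample rows are made up for illustration. This is only one of several equivalent formulations, not a description of how `last` is implemented.

```sql
-- Hypothetical sample data matching the columns used in the example above.
CREATE TEMP TABLE events (
    machine_id  bigint,
    cpu_util    float,
    recorded_at timestamptz
);

INSERT INTO events VALUES
    (1, 0.25, '2024-01-01 00:00:00+00'),
    (1, 0.75, '2024-01-01 00:05:00+00'),
    (2, 0.50, '2024-01-01 00:03:00+00');

-- Keep one row per machine: the one with the greatest recorded_at.
SELECT DISTINCT ON (machine_id)
       machine_id,
       cpu_util
FROM events
ORDER BY machine_id, recorded_at DESC;
```

The helper version keeps the grouping column and the ordering column visibly separate, which is the readability gain the section describes.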

#### `date_bin_table`

This function automates the tedium of aligning time-series values to a given width, or "stride", and makes sure to include NULL rows for any time periods where the source table has no data points.

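It must be called against a time-series table and takes three arguments: a NULL value cast to the table's row type, the stride, and the time range to cover. Apart from that, using it is pretty straightforward:

```sql
SELECT * FROM date_bin_table(NULL::target_table, '1 hour',
                             '[2020-11-04 00:00:00-08, 2020-11-05 00:00:00-08]');
```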

The output of this query will differ from simply hitting the target table directly in three ways:

* Rows will be sorted by time, ascending
* The time column's values will be binned to the provided width
* Extra rows will be added for periods with no data. They will include the timestamp for that bin and NULL in all other columns
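
The binning and gap-filling described above can be sketched in plain PostgreSQL (version 14 or later, for `date_bin`) without the extension. The `readings` table, its columns, and the sample timestamps are hypothetical. The query only mirrors the general shape of the one `date_bin_table` builds in `sql/timeseries.sql` (a `generate_series` of bin starts left-joined to the binned data); it is not a replacement for that function.

```sql
-- Hypothetical readings; any table with a timestamptz column would do.
CREATE TEMP TABLE readings (
    recorded_at timestamptz NOT NULL,
    value       float
);

INSERT INTO readings VALUES
    ('2024-01-01 00:10:00+00', 1.0),
    ('2024-01-01 02:30:00+00', 2.0);

WITH binned AS (
    -- Align each reading to the start of its one-hour bin,
    -- counting bins from the start of the requested range.
    SELECT date_bin('1 hour'::interval,
                    recorded_at,
                    '2024-01-01 00:00:00+00'::timestamptz) AS bin_start,
           value
    FROM readings
    WHERE recorded_at BETWEEN '2024-01-01 00:00:00+00'::timestamptz
                          AND '2024-01-01 03:00:00+00'::timestamptz
)
-- One row per bin in the range; bins without readings get NULL values.
SELECT bins.bin_start,
       binned.value
FROM generate_series('2024-01-01 00:00:00+00'::timestamptz,
                     '2024-01-01 03:00:00+00'::timestamptz,
                     '1 hour'::interval) AS bins(bin_start)
LEFT JOIN binned ON binned.bin_start = bins.bin_start
ORDER BY bins.bin_start;
```

With this sample data the range produces four bins, and the 01:00 and 03:00 bins have no readings, so they come back with NULL in `value`.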


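The NULL rows that `date_bin_table` emits pair naturally with the last-observed-carry-forward (LOCF) aggregate this change adds in `sql/timeseries.sql`. The sketch below is a minimal, standalone illustration of that idea. The names `locf_sketch_agg` and `locf_sketch` are hypothetical stand-ins, written as a plain SQL function so the block runs without the extension, and the inline rows are made-up sample data.

```sql
-- State function: keep the newest non-NULL value seen so far.
-- (Hypothetical name; it follows the same COALESCE logic as locf_agg.)
CREATE FUNCTION locf_sketch_agg(state anyelement, value anyelement)
RETURNS anyelement
LANGUAGE sql
AS $$ SELECT COALESCE(value, state) $$;

-- Aggregate wrapper (hypothetical name), meant to be used as a window aggregate.
CREATE AGGREGATE locf_sketch(anyelement) (
    SFUNC = locf_sketch_agg,
    STYPE = anyelement
);

-- Gap-filled input: the 01:00 and 03:00 bins have no observed value.
SELECT t.bin_start,
       t.value,
       locf_sketch(t.value) OVER (ORDER BY t.bin_start) AS filled
FROM (VALUES
        ('2024-01-01 00:00:00+00'::timestamptz, 1.0::float),
        ('2024-01-01 01:00:00+00'::timestamptz, NULL),
        ('2024-01-01 02:00:00+00'::timestamptz, 2.0),
        ('2024-01-01 03:00:00+00'::timestamptz, NULL)
     ) AS t(bin_start, value);
```

Because the window is ordered by time, each NULL bin picks up the most recent non-NULL value before it; a NULL bin at the very start of the range stays NULL, since there is nothing to carry forward yet.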

## Roadmap

While `timeseries` is still in its early days, we have a concrete vision for the features we will be including in the future. Feedback on the importance of a given feature to customer use cases will help us better prioritize the following lists.
122 changes: 119 additions & 3 deletions sql/timeseries.sql
@@ -281,11 +281,13 @@ SELECT pt.parentrelid as table_id,
pg_get_expr(c.relpartbound, c.oid) AS part_range,
pg_table_size(pt.relid) AS table_size_bytes,
pg_indexes_size(pt.relid) AS index_size_bytes,
pg_total_relation_size(pt.relid) AS total_size_bytes
pg_total_relation_size(pt.relid) AS total_size_bytes,
am.amname AS access_method
FROM @extschema@.ts_config tsc,
pg_partition_tree(tsc.table_id) pt,
pg_class c
WHERE pt.isleaf AND pt.relid = c.oid
pg_class c,
pg_am am
WHERE pt.isleaf AND pt.relid = c.oid AND c.relam = am.oid
ORDER BY 2 ASC;

-- Unlike the above view, this sums partitions for each time-series table.
@@ -496,3 +498,117 @@ CREATE OR REPLACE AGGREGATE last(value anyelement, rank anycompatible) (
FINALFUNC = endpoint_final,
FINALFUNC_EXTRA
);

-- When provided with a table, interval, and time range,
-- this function will bin all data within the specified
-- range into rows spaced according to the stride.
--
-- Time bins which lack data in the input table will still
-- receive a single row of output with NULLs in columns
-- other than that row's date bin value.
--
-- The target table must be time-series enabled.
CREATE OR REPLACE FUNCTION @extschema@.date_bin_table (target_table_elem anyelement, time_stride interval, time_range tstzrange)
RETURNS SETOF anyelement
LANGUAGE plpgsql
AS $function$
#print_strict_params on
DECLARE
target_table_id regclass;
tl_sql text;
part_col_name name;
part_col_type regtype;
col_name name;
prev_lead_time interval;
part_duration interval;
leading_partitions numeric;
BEGIN
SELECT oid INTO target_table_id
FROM pg_class cl
WHERE reltype=pg_typeof(target_table_elem);

IF target_table_id IS NULL THEN
RAISE invalid_parameter_value USING
MESSAGE = 'invalid table type',
DETAIL = 'Target table element must be a table type',
HINT = 'Provide a value with a type corresponding to the table to query.';
END IF;

PERFORM *
FROM @extschema@.ts_config
WHERE "table_id"=target_table_id;
IF NOT FOUND THEN
RAISE object_not_in_prerequisite_state USING
MESSAGE = 'target table must be time-series enhanced',
DETAIL = 'Cannot query tables without time-series enhancements',
HINT = format('Call %L to enable time-series enhancements', 'enable_ts_table');
END IF;

SELECT
at.attname,
format_type(at.atttypid, at.atttypmod)::regtype
INTO STRICT part_col_name, part_col_type
FROM (
SELECT partrelid, unnest(pt.partattrs) partattnum
FROM pg_partitioned_table pt
WHERE pt.partrelid = target_table_id
) pat
JOIN pg_attribute at ON at.attrelid = pat.partrelid AND
at.attnum = pat.partattnum;

FOR col_name IN
SELECT attname FROM pg_attribute
WHERE attnum > 0 AND
NOT attisdropped AND
attrelid=target_table_id
ORDER BY attnum ASC
LOOP
IF col_name = part_col_name
THEN
tl_sql := format('%sCAST(date_series.date AS %s) %I, ',
tl_sql, part_col_type, col_name);
ELSE
tl_sql := format('%sdata.%I, ',
tl_sql, col_name);
END IF;
END LOOP;

tl_sql := rtrim(tl_sql, ', ');

RETURN QUERY EXECUTE format($query$
WITH data AS (
SELECT *, date_bin($1, %I, $2) binned_date
FROM %s
WHERE %I BETWEEN $2 AND $3
ORDER BY binned_date
)
SELECT %s
FROM generate_series($2, $3, $1) date_series(date)
LEFT JOIN data
ON data.binned_date = date_series.date;$query$,
part_col_name,
target_table_id,
part_col_name,
tl_sql)
USING time_stride, lower(time_range), upper(time_range);
RETURN;
END;
$function$;

-- Function implementation for LOCF: last-observed carry-forward.
-- Intended for use on the output of date_bin_table in order to
-- fill NULL rows with the last observed value.
CREATE OR REPLACE FUNCTION @extschema@.locf_agg(state anyelement, value anyelement)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN COALESCE(value, state);
END;
$function$;

-- Aggregate for LOCF. For use in a WINDOW clause
CREATE AGGREGATE locf(anyelement) (
SFUNC = locf_agg,
STYPE = anyelement
);
52 changes: 49 additions & 3 deletions test/expected/basic_usage.out
@@ -65,6 +65,12 @@ SELECT COUNT(*) > 10 AS has_partitions FROM ts_part_info;
t
(1 row)

SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar';
compressed?
-------------
f
(1 row)

SELECT set_ts_retention_policy('measurements', '90 days');
set_ts_retention_policy
-------------------------
@@ -119,7 +125,19 @@ SELECT premake FROM part_config WHERE parent_table='public.measurements';
1
(1 row)

SELECT set_ts_retention_policy('measurements', '1 days');
SELECT apply_compression_policy('measurements', '1 day');
apply_compression_policy
--------------------------

(1 row)

SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar';
compressed?
-------------
t
(1 row)

SELECT set_ts_retention_policy('measurements', '1 day');
set_ts_retention_policy
-------------------------

@@ -140,9 +158,15 @@ SELECT COUNT(*) < 10 AS fewer_partitions FROM ts_part_info;
CREATE TABLE events (
user_id bigint,
event_id bigint,
event_time timestamptz,
event_time timestamptz NOT NULL,
value float
);
) PARTITION BY RANGE (event_time);
SELECT enable_ts_table('events');
enable_ts_table
-----------------

(1 row)

COPY events FROM STDIN WITH (FORMAT 'csv');
SELECT first(value, event_time), user_id FROM events GROUP BY user_id;
first | user_id
@@ -172,3 +196,25 @@ SELECT last(event_id, event_time), user_id FROM events GROUP BY user_id;
6 | 1
(2 rows)

SELECT last(user_id, value) top_performer,
locf(avg(value)) OVER (ORDER BY event_time),
event_time
FROM date_bin_table(NULL::events, '1 minute',
'[2020-11-04 15:50:00-08, 2020-11-04 16:00:00-08]')
GROUP BY 3
ORDER BY 3;
top_performer | locf | event_time
---------------+--------------------+------------------------------
| | Wed Nov 04 15:50:00 2020 PST
2 | 1.4 | Wed Nov 04 15:51:00 2020 PST
| 1.4 | Wed Nov 04 15:52:00 2020 PST
2 | 1.5 | Wed Nov 04 15:53:00 2020 PST
| 1.5 | Wed Nov 04 15:54:00 2020 PST
2 | 1.6 | Wed Nov 04 15:55:00 2020 PST
| 1.6 | Wed Nov 04 15:56:00 2020 PST
2 | 1.7 | Wed Nov 04 15:57:00 2020 PST
2 | 1.8 | Wed Nov 04 15:58:00 2020 PST
2 | 1.9000000000000001 | Wed Nov 04 15:59:00 2020 PST
| 1.9000000000000001 | Wed Nov 04 16:00:00 2020 PST
(11 rows)

31 changes: 22 additions & 9 deletions test/sql/basic_usage.sql
@@ -32,6 +32,7 @@ SELECT partition_duration, partition_lead_time FROM ts_config ORDER BY table_id:
SELECT partition_interval, premake FROM part_config WHERE parent_table='public.measurements';

SELECT COUNT(*) > 10 AS has_partitions FROM ts_part_info;
SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar';

SELECT set_ts_retention_policy('measurements', '90 days');
SELECT retention_duration FROM ts_config WHERE table_id='measurements'::regclass;
@@ -45,27 +46,31 @@ SELECT set_ts_lead_time('measurements', '1 day');
SELECT partition_lead_time FROM ts_config WHERE table_id='measurements'::regclass;
SELECT premake FROM part_config WHERE parent_table='public.measurements';

SELECT set_ts_retention_policy('measurements', '1 days');
SELECT apply_compression_policy('measurements', '1 day');
SELECT COUNT(*) > 0 AS "compressed?" FROM ts_part_info WHERE access_method = 'columnar';

SELECT set_ts_retention_policy('measurements', '1 day');
SELECT run_maintenance();
SELECT COUNT(*) < 10 AS fewer_partitions FROM ts_part_info;

CREATE TABLE events (
user_id bigint,
event_id bigint,
event_time timestamptz,
event_time timestamptz NOT NULL,
value float
);
) PARTITION BY RANGE (event_time);
SELECT enable_ts_table('events');

COPY events FROM STDIN WITH (FORMAT 'csv');
1,1,"2020-11-04 15:54:02.226999-08",1.1
1,2,"2020-11-04 15:55:02.226999-08",1.2
1,3,"2020-11-04 15:56:02.226999-08",1.3
1,1,"2020-11-04 15:51:02.226999-08",1.1
1,2,"2020-11-04 15:53:02.226999-08",1.2
1,3,"2020-11-04 15:55:02.226999-08",1.3
1,4,"2020-11-04 15:57:02.226999-08",1.4
1,5,"2020-11-04 15:58:02.226999-08",1.5
1,6,"2020-11-04 15:59:02.226999-08",1.6
2,7,"2020-11-04 15:54:02.226999-08",1.7
2,8,"2020-11-04 15:55:02.226999-08",1.8
2,9,"2020-11-04 15:56:02.226999-08",1.9
2,7,"2020-11-04 15:51:02.226999-08",1.7
2,8,"2020-11-04 15:53:02.226999-08",1.8
2,9,"2020-11-04 15:55:02.226999-08",1.9
2,10,"2020-11-04 15:57:02.226999-08",2.0
2,11,"2020-11-04 15:58:02.226999-08",2.1
2,12,"2020-11-04 15:59:02.226999-08",2.2
@@ -78,3 +83,11 @@ SELECT last(value, event_time), user_id FROM events GROUP BY user_id;
SELECT first(event_id, event_time), user_id FROM events GROUP BY user_id;

SELECT last(event_id, event_time), user_id FROM events GROUP BY user_id;

SELECT last(user_id, value) top_performer,
locf(avg(value)) OVER (ORDER BY event_time),
event_time
FROM date_bin_table(NULL::events, '1 minute',
'[2020-11-04 15:50:00-08, 2020-11-04 16:00:00-08]')
GROUP BY 3
ORDER BY 3;