Skip to content

Commit

Permalink
Add implementation of first/last aggregates
Browse files Browse the repository at this point in the history
Possibly to rewrite in Rust later, but supplies the functionality
for now
  • Loading branch information
jasonmp85 committed Apr 23, 2024
1 parent 0df1853 commit 016d28b
Showing 1 changed file with 87 additions and 0 deletions.
87 changes: 87 additions & 0 deletions sql/timeseries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,90 @@ SELECT cron.schedule('partman-maintenance', '@hourly', $$SELECT partman.run_main

-- Scan the time-series config table and apply compression policies once an hour.
SELECT cron.schedule('timeseries-compression', '@hourly', $$SELECT @[email protected]_compression_policy(table_id, compression_duration) FROM ts_config;$$);


-- Incremental function to make progress on the calculation of the 'first' aggregate.
-- The state is stored as two text values: the current "winning" value and the rank
-- for that value. Text is used because the polymorphic functions here are limited
-- to arrays, and `value` and `rank` may have different types.
CREATE OR REPLACE FUNCTION public.first_agg(state text[], value anyelement, rank anycompatible)
RETURNS text[]
LANGUAGE plpgsql
AS $function$
DECLARE
new_state text[] := state;
old_rank rank%TYPE;
BEGIN
IF rank IS NULL THEN
RETURN new_state;
END IF;

SELECT state[2] INTO old_rank;
IF (state IS NULL) OR
(rank < old_rank) THEN
new_state := ARRAY[value::text, rank::text];
END IF;

RETURN new_state;
END;
$function$;

-- This function is identical to the above, but with the sort order reversed
-- TODO: combine these two step functions
CREATE OR REPLACE FUNCTION public.last_agg(state text[], value anyelement, rank anycompatible)
RETURNS text[]
LANGUAGE plpgsql
AS $function$
DECLARE
new_state text[] := state;
old_rank rank%TYPE;
BEGIN
IF rank IS NULL THEN
RETURN new_state;
END IF;

SELECT state[2] INTO old_rank;
IF (state IS NULL) OR
(rank > old_rank) THEN
new_state := ARRAY[value::text, rank::text];
END IF;

RETURN new_state;
END;
$function$;

-- The final pass for the endpoint aggregate functions. this just checks whether the
-- aggregation state is present, and, if so, returns the winning value.
CREATE OR REPLACE FUNCTION public.endpoint_final(state text[], value anyelement, rank anycompatible)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
DECLARE
final_value value%TYPE;
BEGIN
IF state IS NULL THEN
RETURN NULL;
END IF;

SELECT state[1] INTO final_value;
RETURN final_value;
END;
$function$;

-- Defines an aggregate that returns the first value in a table from one column
-- (the "value" column) when sorted by a second column (the "rank" column)
CREATE OR REPLACE AGGREGATE first(value anyelement, rank anycompatible) (
SFUNC = first_agg,
STYPE = text[],
FINALFUNC = endpoint_final,
FINALFUNC_EXTRA
);

-- Defines an aggregate that returns the last value in a table from one column
-- (the "value" column) when sorted by a second column (the "rank" column)
CREATE OR REPLACE AGGREGATE last(value anyelement, rank anycompatible) (
SFUNC = last_agg,
STYPE = text[],
FINALFUNC = endpoint_final,
FINALFUNC_EXTRA
);

0 comments on commit 016d28b

Please sign in to comment.