Add compression functions, features, docs, tests
This adds a dependency on Hydra columnar and pg_cron.
jasonmp85 committed Apr 18, 2024
1 parent 9cea508 commit 1e9b0f7
Showing 11 changed files with 159 additions and 16 deletions.
23 changes: 20 additions & 3 deletions .github/workflows/installcheck.yml
@@ -24,16 +24,33 @@ jobs:
sudo apt-get install -y --no-install-recommends postgresql-common &&
sudo sed -ri 's/#(create_main_cluster) .*$/\1 = false/' /etc/postgresql-common/createcluster.conf
# rust needed to install trunk
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable

- name: Install stoml and pg-trunk
shell: bash
run: |
set -xe
wget https://github.com/freshautomations/stoml/releases/download/v0.7.1/stoml_linux_amd64 &> /dev/null
mv stoml_linux_amd64 stoml
chmod +x stoml
sudo mv stoml /usr/local/bin/
cargo install pg-trunk
- name: Install PostgreSQL and server-dev
run: >
sudo apt-get install -y --no-install-recommends \
postgresql-${{matrix.pg_version}} \
postgresql-server-dev-${{matrix.pg_version}}
- name: Install dependencies
run: >
sudo apt-get install -y --no-install-recommends \
postgresql-${{matrix.pg_version}}-partman
run: |
sudo ~/.cargo/bin/trunk install --pg-version=${{matrix.pg_version}} pg_partman
sudo ~/.cargo/bin/trunk install --pg-version=${{matrix.pg_version}} pg_cron
sudo ~/.cargo/bin/trunk install --pg-version=${{matrix.pg_version}} hydra_columnar
- uses: actions/checkout@v3

6 changes: 4 additions & 2 deletions META.json
@@ -2,7 +2,7 @@
"name": "timeseries",
"abstract": "Convenience API for Tembo time series stack",
"description": "Convenience API for Tembo time series stack",
"version": "0.1.2",
"version": "0.1.3",
"maintainer": [
"Jason Petersen <[email protected]>"
],
@@ -12,7 +12,7 @@
"abstract": "Convenience API for Tembo time series stack",
"file": "sql/timeseries.sql",
"docfile": "doc/timeseries.md",
"version": "0.1.2"
"version": "0.1.3"
}
},
"prereqs": {
@@ -21,6 +21,8 @@
"PostgreSQL": "14.0.0"
},
"recommends": {
"columnar": "1.1.2",
"pg_cron": "1.6.2",
"pg_partman": "4.5.1"
}
}
6 changes: 3 additions & 3 deletions Makefile
@@ -5,15 +5,15 @@ EXTVERSION = $(shell grep -m 1 '[[:space:]]\{6\}"version":' META.json | \
DISTVERSION = $(shell grep -m 1 '[[:space:]]\{3\}"version":' META.json | \
sed -e 's/[[:space:]]*"version":[[:space:]]*"\([^"]*\)",\{0,1\}/\1/')

EXTVERSIONS = 0.1.2
EXTVERSIONS = 0.1.3

DATA = $(wildcard sql/*--*.sql)
DATA_built = $(foreach v,$(EXTVERSIONS),sql/$(EXTENSION)--$(v).sql)

DOCS = $(wildcard doc/*.md)
TESTS = $(wildcard test/sql/*.sql)
REGRESS = $(patsubst test/sql/%.sql,%,$(TESTS))
REGRESS_OPTS = --inputdir=test
REGRESS_OPTS = --temp-config=./timeseries.conf --temp-instance=./tmp_check --inputdir=test
PG_CONFIG = pg_config
EXTRA_CLEAN = sql/$(EXTENSION)--$(EXTVERSION).sql

@@ -30,5 +30,5 @@ latest-changes.md: Changes

# generate each version's installation file by concatenating
# previous upgrade scripts
sql/$(EXTENSION)--0.1.2.sql: sql/$(EXTENSION).sql
sql/$(EXTENSION)--0.1.3.sql: sql/$(EXTENSION).sql
cat $^ > $@
2 changes: 1 addition & 1 deletion Trunk.toml
@@ -6,7 +6,7 @@ description = "Open source timeseries extension for Postgres."
homepage = "https://github.com/tembo-io/pg_timeseries"
documentation = "https://github.com/tembo-io/pg_timeseries"
categories = ["analytics"]
version = "0.1.2"
version = "0.1.3"

[build]
postgres_version = "15"
2 changes: 1 addition & 1 deletion doc/guide.md
@@ -4,7 +4,7 @@ In this guide, you will become familiar with the functions and features of the t

## Preparing your database

You'll need a PostgreSQL instance running timeseries `0.1.2` or later. An easy way to have one set up for you is to deploy one from Tembo Cloud [here](https://cloud.tembo.io). The free tier will perform well enough for the data set we'll be using.
You'll need a PostgreSQL instance running timeseries `0.1.3` or later. An easy way to have one set up for you is to deploy one from Tembo Cloud [here](https://cloud.tembo.io). The free tier will perform well enough for the data set we'll be using.

Once that's up and running, you'll need a client machine with `psql` (to connect to your database) and [the Divvy dataset](https://tembo-demo-bucket.s3.amazonaws.com/202004--202402-divvy-tripdata-slim.csv.gz), which will total about 50MiB of CSV after decompression.

8 changes: 7 additions & 1 deletion doc/timeseries.md
@@ -39,13 +39,19 @@ On the other hand, you may be worried about plugging a firehose of data into you

Fortunately, it's incredibly easy to simply drop time-series partitions on a schedule. Call `set_ts_retention_policy` with your time-series table and an interval (say, `'90 days'`) to establish such a policy. Once an hour, any partitions falling entirely outside the retention window will be dropped. Use `clear_ts_retention_policy` to revert to the default behavior (infinite retention). Each of these functions will return the previous retention policy when called.
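
As a minimal sketch of these calls, assuming a hypothetical table named `measurements` that has already been set up with `enable_ts_table`:

```sql
-- Drop partitions once they fall entirely outside a 90-day window.
-- Returns the previous retention policy (NULL if none was set).
SELECT set_ts_retention_policy('measurements', '90 days');

-- Revert to the default (infinite retention).
-- Returns the policy that was in effect before the call.
SELECT clear_ts_retention_policy('measurements');
```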

### Compression

Sometimes you know older data isn't queried very often, but still don't want to commit to just dropping older partitions. In this case, compression may be what you desire.

Call `set_ts_compression_policy` on a time-series table with an appropriate interval (perhaps `'1 month'`), and once an hour this extension will compress any partitions older than that interval using a columnar storage method. As with the retention policy functionality, a function is also provided for clearing any existing policy (existing partitions will not be decompressed, however).
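
The following is a sketch rather than a definitive recipe. It assumes a hypothetical time-series table named `measurements`, and it assumes the clearing function is named `clear_ts_compression_policy`, by analogy with `clear_ts_retention_policy`. The catalog query in the middle uses only standard PostgreSQL catalogs to show which partitions have been switched to the `columnar` access method.

```sql
-- Compress partitions that are entirely older than one month.
-- Returns the previous compression policy (NULL if none was set).
SELECT set_ts_compression_policy('measurements', '1 month');

-- After the hourly job has run, list each partition with its table
-- access method; compressed partitions report 'columnar'.
SELECT c.oid::regclass AS partition,
       am.amname       AS access_method
  FROM pg_inherits i
  JOIN pg_class c ON c.oid = i.inhrelid
  JOIN pg_am am   ON am.oid = c.relam
 WHERE i.inhparent = 'measurements'::regclass
 ORDER BY 1;

-- Stop compressing further partitions (assumed function name);
-- partitions that are already columnar stay that way.
SELECT clear_ts_compression_policy('measurements');
```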

## Roadmap

While `timeseries` is still in its early days, we have a concrete vision for the features we will be including in the future. Feedback on the importance of a given feature to customer use cases will help us better prioritize the following list.

This list is somewhat ordered by likelihood of near-term delivery, or maybe difficulty, but that property is only loosely intended and no guarantee of priority. Again, feedback from users will take precedence.

- Columnar storage and storage type management — will enable some degree of compression and accelerate column-oriented analytics workloads
- Assorted "analytic" functions frequently associated with time-series workloads
- Periodic `REFRESH MATERIALIZED VIEW` — set schedules for background refresh of materialized views (useful for dashboarding, etc.)
- Roll-off to `TABLESPACE` — as data ages, it will be moved into a specified table space
- Use of "tiered storage", i.e. moving older partitions to be stored in S3 rather than on-disk
120 changes: 117 additions & 3 deletions sql/timeseries.sql
@@ -6,7 +6,8 @@ CREATE TABLE @extschema@.ts_config(
table_id regclass PRIMARY KEY,
partition_duration interval NOT NULL,
partition_lead_time interval NOT NULL,
retention_duration interval);
retention_duration interval,
compression_duration interval);

-- Enhances an existing table with our time-series best practices. Basically
-- the entry point to this extension. Minimally, a user must create a table
@@ -157,8 +158,7 @@ $function$;
--
-- In the future, similar functions will manage schedules for moving partitions
-- to non-default TABLESPACEs, issuing CLUSTER commands to reorganize data in
-- non-active partitions along a chosen index, or even applying compressed
-- storage methods and rollup logic.
-- non-active partitions along a chosen index.
--
-- Returns the previous retention duration, or NULL if none was set.
CREATE OR REPLACE
@@ -295,3 +295,117 @@ SELECT table_id,
SUM(index_size_bytes) AS index_size_bytes,
SUM(total_size_bytes) AS total_size_bytes
FROM @[email protected]_part_info GROUP BY 1 ORDER BY 2 ASC;

-- This function sets a compression policy on an existing time-series table, which
-- ensures that all partitions older than a particular offset (from present) are
-- automatically converted to a columnar storage mechanism, which offers some
-- degree of data compression.
--
-- Returns the previous compression duration, or NULL if none was set.
CREATE OR REPLACE FUNCTION @extschema@.set_ts_compression_policy(target_table_id regclass, new_compression interval)
RETURNS interval
LANGUAGE plpgsql
AS $function$
DECLARE
table_name text;
prev_compression interval;
BEGIN
SELECT compression_duration
INTO prev_compression
FROM @extschema@.ts_config
WHERE "table_id"=target_table_id
FOR UPDATE;
IF NOT FOUND THEN
RAISE object_not_in_prerequisite_state USING
MESSAGE = 'could not fetch compression policy',
DETAIL = 'Target table was not time-series enhanced',
HINT = format('Call %L to enable time-series enhancements', 'enable_ts_table');
END IF;

UPDATE @extschema@.ts_config
SET "compression_duration"=new_compression
WHERE "table_id"=target_table_id;

RETURN prev_compression;
END;
$function$;

-- Unsets any compression policy on the specified table. Returns the old policy,
-- if one was set.
CREATE OR REPLACE
FUNCTION @[email protected]_ts_compression_policy(target_table_id regclass)
RETURNS interval
LANGUAGE plpgsql
AS $function$
DECLARE
prev_compression interval;
BEGIN
SELECT set_ts_compression_policy(target_table_id, NULL) INTO prev_compression;

RETURN prev_compression;
END;
$function$;

-- This function implements the core of compression application: given a target table ID
-- (which must be time-series enabled) and a compression offset, all partitions falling
-- entirely behind the offset (from the present time) will be converted to using columnar
-- storage. This function is "idempotent" in the sense that repeated calls will behave
-- identically given the same wall clock time and arguments.
CREATE OR REPLACE FUNCTION @[email protected]_compression_policy(target_table_id regclass, comp_offset interval)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
table_name text;
part_row record;
part_beg timestamptz;
part_end timestamptz;
part_am name;
BEGIN
IF comp_offset IS NULL THEN
RETURN;
END IF;

SELECT format('%s.%s', n.nspname, c.relname)
INTO table_name
FROM pg_class c
LEFT JOIN pg_namespace n
ON n.oid = c.relnamespace
WHERE c.oid=target_table_id;

FOR part_row IN
SELECT
partition_schemaname,
partition_tablename
FROM @[email protected]_partitions(table_name, 'ASC')
LOOP
SELECT child_start_time, child_end_time
INTO part_beg, part_end
FROM @[email protected]_partition_info(
part_row.partition_schemaname || '.' ||
part_row.partition_tablename);

SELECT am.amname
INTO part_am
FROM pg_class c, pg_am am
WHERE c.oid = (part_row.partition_schemaname || '.' ||
part_row.partition_tablename)::regclass AND
c.relam = am.oid;

IF part_am <> 'columnar' AND
part_end < (now() - comp_offset) THEN
PERFORM columnar.alter_table_set_access_method(
part_row.partition_schemaname || '.' ||
part_row.partition_tablename, 'columnar');
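-- a regclass value is already quoted (and schema-qualified when needed), so use %s rather than %I for it
EXECUTE format('ALTER TABLE %s ATTACH PARTITION %I.%I FOR VALUES FROM (%L) TO (%L)', target_table_id, part_row.partition_schemaname, part_row.partition_tablename, part_beg, part_end);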
END IF;
END LOOP;
END;
$function$;

-- Since we're using pg_cron, might as well schedule the maintenance through its bgw
-- rather than run a duplicate one through pg_partman.
SELECT cron.schedule('partman-maintenance', '@hourly', $$SELECT partman.run_maintenance();$$);

-- Scan the time-series config table and apply compression policies once an hour.
SELECT cron.schedule('timeseries-compression', '@hourly', $$SELECT @[email protected]_compression_policy(table_id, compression_duration) FROM ts_config;$$);
2 changes: 2 additions & 0 deletions test/expected/basic_usage.out
@@ -2,9 +2,11 @@
BEGIN;
CREATE SCHEMA partman;
CREATE EXTENSION IF NOT EXISTS pg_partman WITH SCHEMA partman CASCADE;
CREATE EXTENSION IF NOT EXISTS pg_cron CASCADE;
CREATE EXTENSION timeseries;
ROLLBACK;
CREATE EXTENSION timeseries CASCADE;
NOTICE: installing required extension "pg_cron"
NOTICE: installing required extension "pg_partman"
CREATE TABLE simple ();
SELECT enable_ts_table('simple');
1 change: 1 addition & 0 deletions test/sql/basic_usage.sql
@@ -3,6 +3,7 @@
BEGIN;
CREATE SCHEMA partman;
CREATE EXTENSION IF NOT EXISTS pg_partman WITH SCHEMA partman CASCADE;
CREATE EXTENSION IF NOT EXISTS pg_cron CASCADE;
CREATE EXTENSION timeseries;
ROLLBACK;

1 change: 1 addition & 0 deletions timeseries.conf
@@ -0,0 +1 @@
shared_preload_libraries = 'pg_cron'
4 changes: 2 additions & 2 deletions timeseries.control
@@ -1,5 +1,5 @@
# time series extension
comment = 'Convenience API for Tembo time series stack'
default_version = '0.1.2'
default_version = '0.1.3'
module_pathname = '$libdir/timeseries'
requires = 'pg_partman'
requires = 'columnar, pg_cron, pg_partman'
