Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge #679
Browse files Browse the repository at this point in the history
679: Improve visibility into maintenance tasks r=cevian a=cevian



Co-authored-by: Matvey Arye <[email protected]>
  • Loading branch information
bors[bot] and cevian authored Jul 6, 2021
2 parents 1d0749b + 8838af6 commit 6f6fddc
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 38 deletions.
12 changes: 10 additions & 2 deletions pkg/migrations/migration_files_generated.go

Large diffs are not rendered by default.

158 changes: 134 additions & 24 deletions pkg/migrations/sql/idempotent/base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.drop_metric_chunk_data(text, timestampt

--drop chunks from metrics tables and delete the appropriate series.
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.drop_metric_chunks(
metric_name TEXT, older_than TIMESTAMPTZ, ran_at TIMESTAMPTZ DEFAULT now()
metric_name TEXT, older_than TIMESTAMPTZ, ran_at TIMESTAMPTZ = now(), log_verbose BOOLEAN = FALSE
) AS $func$
DECLARE
metric_id int;
Expand All @@ -1619,6 +1619,8 @@ DECLARE
time_dimension_id INT;
last_updated TIMESTAMPTZ;
present_epoch BIGINT;
lastT TIMESTAMPTZ;
startT TIMESTAMPTZ;
BEGIN
SELECT id, table_name
INTO STRICT metric_id, metric_table
Expand All @@ -1627,6 +1629,13 @@ BEGIN
SELECT older_than + INTERVAL '1 hour'
INTO check_time;

startT := clock_timestamp();

PERFORM set_config('application_name', format('promscale maintenance: data retention: metric %s', metric_name), false);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: starting', metric_name;
END IF;

-- transaction 1
IF SCHEMA_CATALOG.is_timescaledb_installed() THEN
--Get the time dimension id for the time dimension
Expand Down Expand Up @@ -1659,27 +1668,49 @@ BEGIN
IF older_than IS NULL THEN
    -- even though there are no new Ids in need of deletion,
    -- we may still have old ones to delete
    lastT := clock_timestamp();
    PERFORM set_config('application_name', format('promscale maintenance: data retention: metric %s: delete expired series', metric_name), false);
    PERFORM SCHEMA_CATALOG.delete_expired_series(metric_table, ran_at, present_epoch, last_updated);
    IF log_verbose THEN
        -- clock_timestamp must be invoked with parentheses; a bare identifier is
        -- resolved as a column/variable reference and raises an error at runtime.
        RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series as only action in %', metric_name, clock_timestamp()-lastT;
        RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, clock_timestamp()-startT;
    END IF;
    -- nothing else to do for this metric
    RETURN;
END IF;

-- transaction 2
lastT := clock_timestamp();
PERFORM set_config('application_name', format('promscale maintenance: data retention: metric %s: mark unused series', metric_name), false);
PERFORM SCHEMA_CATALOG.mark_unused_series(metric_table, older_than, check_time);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done marking unused series in %', metric_name, clock_timestamp()-lastT;
END IF;
COMMIT;

-- transaction 3
lastT := clock_timestamp();
PERFORM set_config('application_name', format('promscale maintenance: data retention: metric %s: drop chunks', metric_name), false);
PERFORM SCHEMA_CATALOG.drop_metric_chunk_data(metric_name, older_than);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done dropping chunks in %', metric_name, clock_timestamp()-lastT;
END IF;
SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM
SCHEMA_CATALOG.ids_epoch LIMIT 1;
COMMIT;

-- transaction 4
lastT := clock_timestamp();
PERFORM set_config('application_name', format('promscale maintenance: data retention: metric %s: delete expired series', metric_name), false);
PERFORM SCHEMA_CATALOG.delete_expired_series(metric_table, ran_at, present_epoch, last_updated);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series in %', metric_name, clock_timestamp()-lastT;
RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, clock_timestamp()-startT;
END IF;
RETURN;
END
$func$
LANGUAGE PLPGSQL;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.drop_metric_chunks(text, timestamptz, timestamptz) TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.drop_metric_chunks(text, timestamptz, timestamptz, boolean) TO prom_maintenance;

--Order by random with stable marking gives us same order in a statement and different
-- orderings in different statements
Expand Down Expand Up @@ -1710,10 +1741,11 @@ $$
LANGUAGE PLPGSQL STABLE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_metrics_that_need_drop_chunk() TO prom_reader;

CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy()
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy(log_verbose boolean)
AS $$
DECLARE
r RECORD;
remaining_metrics SCHEMA_CATALOG.metric[] DEFAULT '{}';
BEGIN
--Do one loop with metric that could be locked without waiting.
--This allows you to do everything you can while avoiding lock contention.
Expand All @@ -1724,71 +1756,126 @@ BEGIN
SELECT *
FROM SCHEMA_CATALOG.get_metrics_that_need_drop_chunk()
LOOP
CONTINUE WHEN NOT SCHEMA_CATALOG.lock_metric_for_maintenance(r.id, wait=>false);
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name));
IF NOT SCHEMA_CATALOG.lock_metric_for_maintenance(r.id, wait=>false) THEN
remaining_metrics := remaining_metrics || r;
CONTINUE;
END IF;
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name), log_verbose=>log_verbose);
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
END LOOP;

IF log_verbose AND array_length(remaining_metrics, 1) > 0 THEN
RAISE LOG 'promscale maintenance: data retention: need to wait to grab locks on % metrics', array_length(remaining_metrics, 1);
END IF;

FOR r IN
SELECT *
FROM SCHEMA_CATALOG.get_metrics_that_need_drop_chunk()
FROM unnest(remaining_metrics)
LOOP
PERFORM set_config('application_name', format('promscale maintenance: data retention: metric %s: wait for lock', r.metric_name), false);
PERFORM SCHEMA_CATALOG.lock_metric_for_maintenance(r.id);
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name));
CALL SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name), log_verbose=>log_verbose);
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy()
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy(boolean)
IS 'drops old data according to the data retention policy. This procedure should be run regularly in a cron job';
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy() TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_data_retention_policy(boolean) TO prom_maintenance;

--public procedure to be called by cron
--runs data retention and (on TimescaleDB 2.x and later) compression; the name is generic so that
--we can add stuff later without needing people to change their cron scripts
--should be the last thing run in a session so that all session locks
--are guaranteed released on error.
CREATE OR REPLACE PROCEDURE SCHEMA_PROM.execute_maintenance()
CREATE OR REPLACE PROCEDURE SCHEMA_PROM.execute_maintenance(log_verbose boolean = false)
AS $$
-- Runs the data retention policy and, when get_timescale_major_version() >= 2,
-- the compression policy.
--   log_verbose: when true, progress and timing messages are emitted with RAISE LOG
--                and the flag is passed through to the policy procedures.
DECLARE
-- wall-clock time at which this run started; used for the elapsed-time messages
startT TIMESTAMPTZ;
BEGIN
CALL SCHEMA_CATALOG.execute_data_retention_policy();
startT := clock_timestamp();
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: starting';
END IF;
-- expose the current phase through application_name; the third argument
-- (is_local = false) makes the setting apply to the session rather than
-- only the current transaction
PERFORM set_config('application_name', format('promscale maintenance: data retention'), false);
CALL SCHEMA_CATALOG.execute_data_retention_policy(log_verbose=>log_verbose);
-- compression is only driven from here for TimescaleDB major version 2 and up
IF SCHEMA_CATALOG.get_timescale_major_version() >= 2 THEN
CALL SCHEMA_CATALOG.execute_compression_policy();
IF log_verbose THEN
RAISE LOG 'promscale maintenance: compression: starting';
END IF;
PERFORM set_config('application_name', format('promscale maintenance: compression'), false);
CALL SCHEMA_CATALOG.execute_compression_policy(log_verbose=>log_verbose);
END IF;
IF log_verbose THEN
RAISE LOG 'promscale maintenance: finished in %', clock_timestamp()-startT;
END IF;
-- this warning is raised regardless of log_verbose when a single run
-- exceeded 12 hours
IF clock_timestamp()-startT > INTERVAL '12 hours' THEN
RAISE WARNING 'promscale maintenance jobs are taking too long (one run took %)', clock_timestamp()-startT
USING HINT = 'Please consider increasing the number of maintenance jobs using config_maintenance_jobs()';
END IF;
END;
$$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_PROM.execute_maintenance()
COMMENT ON PROCEDURE SCHEMA_PROM.execute_maintenance(boolean)
IS 'Execute maintenance tasks like dropping data according to retention policy. This procedure should be run regularly in a cron job';
GRANT EXECUTE ON PROCEDURE SCHEMA_PROM.execute_maintenance() TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_PROM.execute_maintenance(boolean) TO prom_maintenance;

CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_maintenance_job(job_id int, config jsonb)
AS $$
-- Job entry point taking (job_id, config): reads options from the jsonb config
-- and calls SCHEMA_PROM.execute_maintenance.
--   job_id: not referenced in this body.
--   config: jsonb; the keys read here are 'log_verbose' (cast to boolean,
--           treated as false when absent) and 'auto_explain' (an object whose
--           key/value pairs are applied as auto_explain.<key> settings).
DECLARE
log_verbose boolean;
ae_key text;
ae_value text;
-- tracks whether LOAD 'auto_explain' has already been issued in this call
ae_load boolean := FALSE;
BEGIN
CALL SCHEMA_PROM.execute_maintenance();
log_verbose := coalesce(config->>'log_verbose', 'false')::boolean;

--if auto_explain enabled in config, turn it on in a best-effort way
--i.e. if it fails (most likely due to lack of superuser privileges) move on anyway.
BEGIN
FOR ae_key, ae_value IN
SELECT * FROM jsonb_each_text(config->'auto_explain')
LOOP
-- load the module once, on the first setting encountered
IF NOT ae_load THEN
ae_load := true;
LOAD 'auto_explain';
END IF;

PERFORM set_config('auto_explain.'|| ae_key, ae_value, FALSE);
END LOOP;
EXCEPTION WHEN OTHERS THEN
-- any error in the block above is downgraded to a warning so that
-- maintenance still runs
RAISE WARNING 'could not set auto_explain options';
END;


CALL SCHEMA_PROM.execute_maintenance(log_verbose=>log_verbose);
END
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE FUNCTION SCHEMA_PROM.config_maintenance_jobs(number_jobs int, new_schedule_interval interval)
CREATE OR REPLACE FUNCTION SCHEMA_PROM.config_maintenance_jobs(number_jobs int, new_schedule_interval interval, new_config jsonb = NULL)
RETURNS BOOLEAN
AS $func$
DECLARE
cnt int;
log_verbose boolean;
BEGIN
--check format of config
log_verbose := coalesce(new_config->>'log_verbose', 'false')::boolean;

PERFORM SCHEMA_TIMESCALE.delete_job(job_id)
FROM timescaledb_information.jobs
WHERE proc_schema = 'SCHEMA_CATALOG' AND proc_name = 'execute_maintenance_job' AND schedule_interval != new_schedule_interval;
WHERE proc_schema = 'SCHEMA_CATALOG' AND proc_name = 'execute_maintenance_job' AND (schedule_interval != new_schedule_interval OR new_config IS DISTINCT FROM config) ;


SELECT count(*) INTO cnt
FROM timescaledb_information.jobs
WHERE proc_schema = 'SCHEMA_CATALOG' AND proc_name = 'execute_maintenance_job';

IF cnt < number_jobs THEN
PERFORM SCHEMA_TIMESCALE.add_job('SCHEMA_CATALOG.execute_maintenance_job', new_schedule_interval)
PERFORM SCHEMA_TIMESCALE.add_job('SCHEMA_CATALOG.execute_maintenance_job', new_schedule_interval, config=>new_config)
FROM generate_series(1, number_jobs-cnt);
END IF;

Expand All @@ -1808,9 +1895,9 @@ SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval) TO prom_admin;
COMMENT ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval)
REVOKE ALL ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval, jsonb) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval, jsonb) TO prom_admin;
-- catalog description for the three-argument signature (number of jobs, schedule interval, job config)
COMMENT ON FUNCTION SCHEMA_PROM.config_maintenance_jobs(int, interval, jsonb)
IS 'Configure the number of maintenance jobs run by the job scheduler, as well as their scheduled interval and optional job config';


Expand Down Expand Up @@ -2478,11 +2565,13 @@ LANGUAGE PLPGSQL STABLE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_metrics_that_need_compression() TO prom_maintenance;

--only for timescaledb 2.0; in 1.x we use compression policies
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_compression_policy()
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.execute_compression_policy(log_verbose boolean = false)
AS $$
DECLARE
r SCHEMA_CATALOG.metric;
remaining_metrics SCHEMA_CATALOG.metric[] DEFAULT '{}';
startT TIMESTAMPTZ;
lockStartT TIMESTAMPTZ;
BEGIN
--Do one loop with metric that could be locked without waiting.
--This allows you to do everything you can while avoiding lock contention.
Expand All @@ -2497,7 +2586,15 @@ BEGIN
remaining_metrics := remaining_metrics || r;
CONTINUE;
END IF;
IF log_verbose THEN
startT := clock_timestamp();
RAISE LOG 'promscale maintenance: compression: metric %: starting, without lock wait', r.metric_name;
END IF;
PERFORM set_config('application_name', format('promscale maintenance: compression: metric %s', r.metric_name), false);
CALL SCHEMA_CATALOG.compress_metric_chunks(r.metric_name);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: compression: metric %: finished in %', r.metric_name, clock_timestamp()-startT;
END IF;
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
Expand All @@ -2507,17 +2604,30 @@ BEGIN
SELECT *
FROM unnest(remaining_metrics)
LOOP
IF log_verbose THEN
lockStartT := clock_timestamp();
RAISE LOG 'promscale maintenance: compression: metric %: waiting for lock', r.metric_name;
END IF;
PERFORM set_config('application_name', format('promscale maintenance: compression: metric %s: waiting on lock', r.metric_name), false);
PERFORM SCHEMA_CATALOG.lock_metric_for_maintenance(r.id);
IF log_verbose THEN
startT := clock_timestamp();
RAISE LOG 'promscale maintenance: compression: metric %: starting', r.metric_name;
END IF;
PERFORM set_config('application_name', format('promscale maintenance: compression: metric %s', r.metric_name), false);
CALL SCHEMA_CATALOG.compress_metric_chunks(r.metric_name);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: compression: metric %: finished in % (lock took %; compression took %)', r.metric_name, clock_timestamp()-lockStartT, startT-lockStartT, clock_timestamp()-startT;
END IF;
PERFORM SCHEMA_CATALOG.unlock_metric_for_maintenance(r.id);

COMMIT;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy()
COMMENT ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy(boolean)
IS 'compress data according to the policy. This procedure should be run regularly in a cron job';
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy() TO prom_maintenance;
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.execute_compression_policy(boolean) TO prom_maintenance;

CREATE OR REPLACE PROCEDURE SCHEMA_PROM.add_prom_node(node_name TEXT, attach_to_existing_metrics BOOLEAN = true)
AS $func$
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP PROCEDURE IF EXISTS SCHEMA_CATALOG.drop_metric_chunks(TEXT, TIMESTAMPTZ, TIMESTAMPTZ);
DROP PROCEDURE IF EXISTS SCHEMA_CATALOG.execute_data_retention_policy();
DROP PROCEDURE IF EXISTS SCHEMA_PROM.execute_maintenance();
DROP FUNCTION IF EXISTS SCHEMA_PROM.config_maintenance_jobs(int, interval);
DROP PROCEDURE IF EXISTS SCHEMA_CATALOG.execute_compression_policy();
Loading

0 comments on commit 6f6fddc

Please sign in to comment.