Skip to content

Commit

Permalink
postgres agent sql
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakharlan committed Aug 20, 2024
1 parent 9e63378 commit 4cae7d3
Show file tree
Hide file tree
Showing 32 changed files with 256 additions and 8 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ test-postgres: bootstrap run-postgres sleep
$(DOCKER_TEST) tests/test_input/test_1/test_postgres_http.py
$(DOCKER_TEST) tests/test_pipelines/test_1/test_postgres_http.py

test-postgres-py: bootstrap run-postgres sleep
$(DOCKER_TEST) tests/test_input/test_1/test_postgres_py.py
$(DOCKER_TEST) tests/test_pipelines/test_1/test_postgres_py.py

test-clickhouse: bootstrap run-clickhouse sleep
$(DOCKER_TEST) tests/test_input/test_2/test_clickhouse.py
$(DOCKER_TEST) tests/test_pipelines/test_2/test_clickhouse.py
Expand Down
2 changes: 1 addition & 1 deletion agent/Dockerfile.local
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ RUN groupadd -g 10001 agent && useradd -r -u 10001 -g agent agent
RUN mkdir /var/log/agent /opt/metrics

ENTRYPOINT ["/custom_entrypoint.sh"]
CMD flask run --host=0.0.0.0 --port=8080
CMD flask run --host=0.0.0.0 --port=80
1 change: 1 addition & 0 deletions agent/src/agent/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
app.register_blueprint(data_extractors.actian)
app.register_blueprint(data_extractors.cacti)
app.register_blueprint(data_extractors.observium)
app.register_blueprint(data_extractors.postgres)
app.register_blueprint(data_extractors.rrd)
app.register_blueprint(data_extractors.snmp)
app.register_blueprint(data_extractors.topology)
Expand Down
1 change: 1 addition & 0 deletions agent/src/agent/api/routes/data_extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
from .snmp import snmp
from .topology import topology
from .rrd import rrd
from .postgres import postgres
16 changes: 16 additions & 0 deletions agent/src/agent/api/routes/data_extractors/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from flask import jsonify, Blueprint, request
from agent import pipeline, data_extractor
from agent.api.routes import needs_pipeline

postgres = Blueprint('postgres_source', __name__)


@postgres.route('/data_extractors/postgresql/<pipeline_id>', methods=['GET'])
@needs_pipeline
def read(pipeline_id: str):
    """Extract metrics for the given pipeline starting at `offset`.

    Query params:
        offset: start of the extraction window (assumed unix seconds, matching
            the `unix` timestamp type required by the schema — confirm).

    Returns:
        JSON list of metric rows, or 400 when the offset is missing or
        not an integer.
    """
    pipeline_ = pipeline.repository.get_by_id(pipeline_id)
    offset = request.args.get('offset')
    if not offset:
        return jsonify('No offset provided'), 400
    try:
        offset = int(offset)
    except ValueError:
        # previously a non-numeric offset bubbled up as an unhandled 500
        return jsonify('Offset must be an integer'), 400
    metrics = data_extractor.postgres.extract_metrics(pipeline_, offset)
    return jsonify(metrics)
1 change: 1 addition & 0 deletions agent/src/agent/data_extractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
from . import topology
from . import rrd
from . import actian
from . import postgres
1 change: 1 addition & 0 deletions agent/src/agent/data_extractor/postgres/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .postgres import extract_metrics
19 changes: 19 additions & 0 deletions agent/src/agent/data_extractor/postgres/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import psycopg2
import psycopg2.extras
from agent import source
from agent.pipeline import Pipeline, TimestampType
from agent.modules import logger

logger_ = logger.get_logger(__name__)


def extract_metrics(pipeline_: Pipeline, offset: int) -> list:
    """Run the pipeline's query against its Postgres source and return rows.

    Substitutes the `{TIMESTAMP_CONDITION}` placeholder in the configured
    query with a half-open window `[offset, offset + interval)` on the
    pipeline's timestamp column, executes it, and returns the rows.

    Args:
        pipeline_: pipeline whose source config holds the connection string.
        offset: start of the extraction window (assumed unix seconds — the
            validation schema only allows the `unix` timestamp type).

    Returns:
        One dict per result row, keyed by column name.
    """
    cnx = psycopg2.connect(pipeline_.source.config[source.PostgresPySource.CONNECTION_STRING])
    try:
        # NOTE(review): timestamp_path and interval come from pipeline config
        # and are interpolated into SQL unescaped — safe only for trusted config.
        timestamp_condition = (
            f'{pipeline_.timestamp_path} >= {offset}'
            f' AND {pipeline_.timestamp_path} < {offset} + {pipeline_.interval}'
        )
        query = pipeline_.query.replace('{TIMESTAMP_CONDITION}', timestamp_condition)
        logger_.info(f'Executing query: {query}')
        with cnx.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute(query)
            return [dict(row) for row in cursor]
    finally:
        # the original leaked the connection; always close it
        cnx.close()


1 change: 1 addition & 0 deletions agent/src/agent/pipeline/config/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
from . import prtg
from . import zabbix
from . import actian
from . import postgres
1 change: 1 addition & 0 deletions agent/src/agent/pipeline/config/handlers/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def _get_schema_handler(pipeline_: Pipeline, base_config: dict) -> SchemaConfigH
source.TYPE_OBSERVIUM: pipeline.config.handlers.observium.ObserviumConfigHandler,
source.TYPE_ORACLE: pipeline.config.handlers.jdbc.JDBCSchemaConfigHandler,
source.TYPE_POSTGRES: pipeline.config.handlers.jdbc.JDBCSchemaConfigHandler,
source.TYPE_POSTGRES_PY: pipeline.config.handlers.postgres.PostgresPyConfigHandler,
source.TYPE_PROMETHEUS: pipeline.config.handlers.promql.PromQLSchemaConfigHandler,
source.TYPE_PRTG: pipeline.config.handlers.prtg.PRTGSchemaConfigHandler,
source.TYPE_SAGE: pipeline.config.handlers.sage.SageSchemaConfigHandler,
Expand Down
16 changes: 16 additions & 0 deletions agent/src/agent/pipeline/config/handlers/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from agent.modules.logger import get_logger
from agent.pipeline.config import stages
from agent.pipeline.config.handlers.base import SchemaConfigHandler

logger = get_logger(__name__)


class PostgresPyConfigHandler(SchemaConfigHandler):
    """Schema config handler for the python-based (`postgres-py`) source.

    Overrides the base pipeline stages so extraction goes through the
    agent-data-extractor Jython source stage while reusing the JDBC-style
    offset script and the standard schema destination stages.
    """
    # stage-name -> stage class overrides applied to the base config
    stages_to_override = {
        'offset': stages.jdbc.JDBCOffsetScript,
        'source': stages.source.postgres.PostgresPySource,
        'JavaScriptEvaluator_01': stages.js_convert_metrics.JSConvertMetrics30,
        'ExpressionEvaluator_02': stages.expression_evaluator.AddProperties30,
        'destination': stages.destination.Destination,
        'destination_watermark': stages.destination.WatermarkDestination,
    }
3 changes: 2 additions & 1 deletion agent/src/agent/pipeline/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _check_pipeline(self, pipeline_: Pipeline):
@classmethod
def _get_config_file(cls, pipeline: Pipeline) -> str:
return {
source.TYPE_ACTIAN: 'actian.json',
source.TYPE_ACTIAN: 'agent_data_extractor_sql.json',
source.TYPE_CLICKHOUSE: 'jdbc_schema.json',
source.TYPE_DIRECTORY: 'directory_schema.json',
source.TYPE_DATABRICKS: 'jdbc_schema.json',
Expand All @@ -78,6 +78,7 @@ def _get_config_file(cls, pipeline: Pipeline) -> str:
source.TYPE_ORACLE: 'jdbc_schema.json',
source.TYPE_OBSERVIUM: 'observium_schema.json',
source.TYPE_POSTGRES: 'jdbc_schema.json',
source.TYPE_POSTGRES_PY: 'agent_data_extractor_sql.json',
source.TYPE_PROMETHEUS: 'promql_schema.json',
source.TYPE_PRTG: 'prtg_schema.json',
source.TYPE_SAGE: 'sage_schema.json',
Expand Down
3 changes: 2 additions & 1 deletion agent/src/agent/pipeline/config/stages/source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
from . import prtg
from . import zabbix
from . import elastic
from . import actian
from . import actian
from . import postgres
2 changes: 1 addition & 1 deletion agent/src/agent/pipeline/config/stages/source/actian.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


class ActianSource(JythonProcessor):
JYTHON_SCRIPT = 'actian.py'
JYTHON_SCRIPT = 'agent_data_extractor_sql.py'
DATA_EXTRACTOR_API_ENDPOINT = 'data_extractors/actian'

def _get_script_params(self) -> list[dict]:
Expand Down
29 changes: 29 additions & 0 deletions agent/src/agent/pipeline/config/stages/source/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import urllib.parse
from agent import monitoring
from agent.pipeline.config.stages.base import JythonProcessor


class PostgresPySource(JythonProcessor):
    """Jython source stage that delegates extraction to the agent's
    `data_extractors/postgresql/<pipeline_id>` HTTP endpoint."""

    JYTHON_SCRIPT = 'agent_data_extractor_sql.py'
    DATA_EXTRACTOR_API_ENDPOINT = 'data_extractors/postgresql'

    def _get_script_params(self) -> list[dict]:
        """Build the key/value parameter records passed to the Jython script."""
        # URL the script polls for metrics; `${pipeline:id()}` is expanded
        # by StreamSets at runtime
        extractor_path = '/'.join([self.DATA_EXTRACTOR_API_ENDPOINT, '${pipeline:id()}'])
        extractor_url = urllib.parse.urljoin(
            self.pipeline.streamsets.agent_external_url, extractor_path
        )
        script_params = {
            'AGENT_DATA_EXTRACTOR_URL': extractor_url,
            'TIMEOUT': str(self.pipeline.source.query_timeout),
            'MONITORING_URL': monitoring.get_monitoring_source_error_url(self.pipeline),
        }
        return [{'key': key, 'value': value} for key, value in script_params.items()]
5 changes: 3 additions & 2 deletions agent/src/agent/pipeline/config/validators/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ def validate(pipeline_):


def get_config_validator(pipeline_: Pipeline) -> Validator:
jdbc_sources = [
sql_sources = [
source.TYPE_ACTIAN,
source.TYPE_DATABRICKS,
source.TYPE_DRUID,
source.TYPE_IMPALA,
source.TYPE_MYSQL,
source.TYPE_MSSQL,
source.TYPE_POSTGRES,
source.TYPE_POSTGRES_PY,
source.TYPE_CLICKHOUSE,
source.TYPE_ORACLE,
]
Expand All @@ -69,7 +70,7 @@ def get_config_validator(pipeline_: Pipeline) -> Validator:
return validators.TopologyValidator()
if pipeline_.source.type == source.TYPE_ELASTIC:
return ElasticValidator()
if pipeline_.source.type in jdbc_sources:
if pipeline_.source.type in sql_sources:
return JDBCValidator()
if pipeline_.source.type == source.TYPE_VICTORIA:
return PromQLValidator()
Expand Down
4 changes: 3 additions & 1 deletion agent/src/agent/pipeline/json_builder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .elastic import ElasticBuilder
from .influx import InfluxBuilder, Influx2Builder
from .jdbc import JDBCBuilder, JDBCRawBuilder, JDBCEventBuilder
from .agent_data_extractor_sql_builder import AgentDataExtractorSQLBuilder
from .kafka import KafkaBuilder, KafkaRawBuilder
from .mongo import MongoBuilder
from .observium import ObserviumBuilder
Expand Down Expand Up @@ -37,7 +38,7 @@ def get(pipeline_: Pipeline, config: dict, is_edit=False) -> IBuilder:

def _get_builder(pipeline_: Pipeline, config: dict, is_edit: bool) -> IBuilder:
loaders = {
source.TYPE_ACTIAN: JDBCBuilder,
source.TYPE_ACTIAN: AgentDataExtractorSQLBuilder,
source.TYPE_CACTI: CactiBuilder,
source.TYPE_CLICKHOUSE: JDBCBuilder,
source.TYPE_DIRECTORY: DirectoryBuilder,
Expand All @@ -53,6 +54,7 @@ def _get_builder(pipeline_: Pipeline, config: dict, is_edit: bool) -> IBuilder:
source.TYPE_MYSQL: JDBCBuilder,
source.TYPE_ORACLE: JDBCBuilder,
source.TYPE_POSTGRES: JDBCBuilder,
source.TYPE_POSTGRES_PY: AgentDataExtractorSQLBuilder,
source.TYPE_PROMETHEUS: PromQLBuilder,
source.TYPE_PRTG: PRTGBuilder,
source.TYPE_OBSERVIUM: ObserviumBuilder,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from agent.pipeline.json_builder import Builder


class AgentDataExtractorSQLBuilder(Builder):
    """Pipeline JSON builder for SQL sources that extract through the agent's
    own data-extractor endpoints; configs are validated against the shared
    `agent_data_extractor_sql` schema."""
    VALIDATION_SCHEMA_FILE_NAME = 'agent_data_extractor_sql'
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"type": "object",
"properties": {
"query": {"type": "string"},
"days_to_backfill": {"type": "integer"},
"interval": {"type": "integer"},
"delay": {"type": "integer"},
"values": {"type": "object"},
"count_records": {"type": "boolean"},
"dimensions": {"type": "array", "items": {"type": "string"}},
"watermark_in_local_timezone": {"type": "boolean"},
"query_missing_data_interval": {"type": "number"},
"timestamp": {"type": "object", "properties": {
"name": {"type": "string", "minLength": 1},
"type": {"type": "string", "enum": ["unix"]}
}, "required": ["name", "type"]},
"properties": {"type": "object"},
"tags": {
"type": "object",
"patternProperties": {"[^ \\.]+": {"type": "array", "items": {"type": "string"}}}
}
},
"required": ["query", "interval", "dimensions", "timestamp"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"query_missing_data_interval": {"type": "number"},
"timestamp": {"type": "object", "properties": {
"name": {"type": "string", "minLength": 1},
"type": {"type": "string", "enum": ["datetime", "unix", "unix_ms", "string"]}
"type": {"type": "string", "enum": ["datetime", "unix", "unix_ms"]}
}, "required": ["name", "type"]},
"properties": {"type": "object"},
"tags": {
Expand Down
1 change: 1 addition & 0 deletions agent/src/agent/pipeline/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def supports_schema(pipeline_: Pipeline) -> bool:
source.TYPE_OBSERVIUM: False,
source.TYPE_ORACLE: True,
source.TYPE_POSTGRES: True,
source.TYPE_POSTGRES_PY: True,
source.TYPE_PROMETHEUS: True,
source.TYPE_PRTG: True,
source.TYPE_RRD: False,
Expand Down
2 changes: 2 additions & 0 deletions agent/src/agent/source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TYPE_ORACLE = 'oracle'
TYPE_OBSERVIUM = 'observium'
TYPE_POSTGRES = 'postgres'
TYPE_POSTGRES_PY = 'postgres-py'
TYPE_PROMETHEUS = 'prometheus'
TYPE_PRTG = 'prtg'
TYPE_RRD = 'rrd'
Expand Down Expand Up @@ -58,6 +59,7 @@
TYPE_OBSERVIUM: ObserviumSource,
TYPE_ORACLE: JDBCSource,
TYPE_POSTGRES: JDBCSource,
TYPE_POSTGRES_PY: PostgresPySource,
TYPE_PROMETHEUS: PromQLSource,
TYPE_PRTG: PRTGSource,
TYPE_RRD: RRDSource,
Expand Down
1 change: 1 addition & 0 deletions agent/src/agent/source/sensitive_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _get_keywords(source_type: str) -> List[str]:
source.TYPE_OBSERVIUM: ['username', 'password'],
source.TYPE_ORACLE: ['connection_string', 'hikariConfigBean.username', 'hikariConfigBean.password'],
source.TYPE_POSTGRES: ['connection_string', 'hikariConfigBean.username', 'hikariConfigBean.password'],
source.TYPE_POSTGRES_PY: ['connection_string'],
source.TYPE_PROMETHEUS: ['username', 'password'],
source.TYPE_PRTG: ['username', 'password'],
source.TYPE_RRD: [],
Expand Down
4 changes: 4 additions & 0 deletions agent/src/agent/source/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ class ActianSource(Source):
CONNECTION_STRING = 'connection_string'


class PostgresPySource(Source):
    """Source config container for the python-based (`postgres-py`) type."""
    # config key holding the libpq-style connection string passed to psycopg2
    CONNECTION_STRING = 'connection_string'


class SourceException(Exception):
pass

Expand Down
9 changes: 9 additions & 0 deletions agent/src/agent/source/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ def validate_connection_string(self):
raise ValidationException('Wrong connection string format')


class PostgresPyValidator(JDBCValidator):
    """Validator for `postgres-py` sources.

    Both connection checks are intentionally no-ops — presumably because the
    JDBC-style checks in the base class do not apply to the psycopg2
    connection string (NOTE(review): confirm; failures will only surface at
    extraction time).
    """
    def validate_connection(self):
        pass

    def validate_connection_string(self):
        pass


class ValidationException(Exception):
pass

Expand All @@ -444,6 +452,7 @@ def get_validator(source_: Source) -> Validator:
source.TYPE_OBSERVIUM: ObserviumValidator,
source.TYPE_ORACLE: OracleValidator,
source.TYPE_POSTGRES: JDBCValidator,
source.TYPE_POSTGRES_PY: PostgresPyValidator,
source.TYPE_PROMETHEUS: PromQLValidator,
source.TYPE_PRTG: PrtgValidator,
source.TYPE_RRD: RRDValidator,
Expand Down
32 changes: 32 additions & 0 deletions agent/tests/input_files/postgres_py_pipelines.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[
{
"source": "test_postgres_py",
"pipeline_id": "test_postgres_py_file_short",
"values": {"clicks": "gauge", "impressions": "gauge"},
"dimensions": ["adsize", "country"],
"timestamp": {
"type": "unix",
"name": "timestamp_unix"
},
"interval": 86400,
"query": "SELECT * FROM test WHERE {TIMESTAMP_CONDITION}",
"days_to_backfill": 2305
},
{
"source": "test_postgres_py",
"pipeline_id": "test_postgres_py_file_full",
"values": {"clicks": "gauge", "impressions": "gauge"},
"count_records": true,
"count_records_measurement_name": "test",
"dimensions": ["adsize", "country"],
"timestamp": {
"type": "unix",
"name": "extract(epoch from timestamp_datetime)",
"alias": "timestamp_unix_converted"
},
"properties": {"key1": "val1", "key2": "val2"},
"interval": 86400,
"query": "SELECT extract(epoch from timestamp_datetime) as timestamp_unix_converted, adsize, country, clicks, impressions FROM test WHERE {TIMESTAMP_CONDITION} AND not country is NULL",
"days_to_backfill": 2305
}
]
9 changes: 9 additions & 0 deletions agent/tests/input_files/postgres_py_sources.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[
{
"type": "postgres-py",
"name": "test_postgres_py",
"config": {
"connection_string": "postgresql://postgres:password@postgres:5432/test"
}
}
]
17 changes: 17 additions & 0 deletions agent/tests/test_input/test_1/test_postgres_py.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest

from datetime import datetime
from ..test_zpipeline_base import TestInputBase


def _get_days_to_backfill():
return (datetime.now() - datetime(year=2017, month=12, day=10)).days + 1


class TestPostgreSQL(TestInputBase):
    # opt this subclass in to pytest collection — presumably the base class
    # disables it; confirm against TestInputBase
    __test__ = True
    # parametrization consumed by the base class: fixture file names (without
    # extension) and per-pipeline config overrides
    params = {
        'test_create_source_with_file': [{'file_name': 'postgres_py_sources'}],
        'test_create_with_file': [{'file_name': 'postgres_py_pipelines',
                                   'override_config': {'days_to_backfill': _get_days_to_backfill()}}],
    }
Loading

0 comments on commit 4cae7d3

Please sign in to comment.