Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alembic #57

Closed
wants to merge 12 commits into from
185 changes: 185 additions & 0 deletions postgresql_audit/alembic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import re

from alembic.autogenerate import comparators, rewriter
from alembic.operations import ops

from postgresql_audit.alembic.init_activity_table_triggers import (
InitActivityTableTriggersOp,
RemoveActivityTableTriggersOp
)
from postgresql_audit.alembic.migration_ops import (
AddColumnToActivityOp,
RemoveColumnFromRemoveActivityOp
)
from postgresql_audit.alembic.register_table_for_version_tracking import (
DeregisterTableForVersionTrackingOp,
RegisterTableForVersionTrackingOp
)


@comparators.dispatch_for('schema')
def compare_timestamp_schema(autogen_context, upgrade_ops, schemas):
routines = set()
for schema in schemas:
schema_name = (
autogen_context.dialect.default_schema_name if schema is None
else schema
)
routines.update([
(schema, *row) for row in autogen_context.connection.execute(f'''
SELECT routine_name, routine_definition
FROM information_schema.routines
WHERE routines.specific_schema='{schema_name}'
''')
])

for schema in schemas:
should_track_versions = any(
'versioned' in table.info
for table in autogen_context.sorted_tables
if table.info and table.schema == schema
)
schema_prefix = f'{schema}.' if schema else ''
tracked = f'{schema_prefix}audit_table' in [
routine[1] for routine in routines if routine[0] == schema
]

if should_track_versions:
if not tracked:
upgrade_ops.ops.insert(
0,
InitActivityTableTriggersOp(False, schema=schema)
)
else:
if tracked:
upgrade_ops.ops.append(
RemoveActivityTableTriggersOp(False, schema=schema)
)


@comparators.dispatch_for('table')
def compare_timestamp_table(
autogen_context,
modify_ops,
schemaname,
tablename,
conn_table,
metadata_table
):
if metadata_table is None:
return
meta_info = metadata_table.info or {}
schema_name = (
autogen_context.dialect.default_schema_name
if schemaname is None else schemaname
)

triggers = [row for row in autogen_context.connection.execute(
'SELECT event_object_schema AS table_schema,'
'event_object_table AS table_name,'
'trigger_schema,'
'trigger_name,'
"STRING_AGG(event_manipulation, ',') AS event,"
'action_timing AS activation,'
'action_condition AS condition,'
'action_statement AS definition '
'FROM information_schema.triggers '
f"WHERE event_object_table = '{tablename}' "
f"AND trigger_schema = '{schema_name}' "
'GROUP BY 1,2,3,4,6,7,8 '
'ORDER BY table_schema, table_name'
)]

trigger_name = 'audit_trigger'

if 'versioned' in meta_info:
excluded_columns = (
metadata_table.info['versioned'].get('exclude', tuple())
)
trigger = next(
(trigger for trigger in triggers if trigger_name in trigger[3]),
None
)
original_excluded_columns = __get_existing_excluded_columns(trigger)

if trigger and set(original_excluded_columns) == set(excluded_columns):
return

modify_ops.ops.insert(
0,
RegisterTableForVersionTrackingOp(
tablename,
excluded_columns,
original_excluded_columns,
schema=schema_name
)
)
else:
trigger = next(
(trigger for trigger in triggers if trigger_name in trigger[3]),
None
)
original_excluded_columns = __get_existing_excluded_columns(trigger)

if trigger:
modify_ops.ops.append(
DeregisterTableForVersionTrackingOp(
tablename,
original_excluded_columns,
schema=schema_name
)
)


def __get_existing_excluded_columns(trigger):
original_excluded_columns = ()
if trigger:
arguments_match = re.search(
r"EXECUTE FUNCTION create_activity\('{(.+)}'\)",
trigger[7]
)
if arguments_match:
original_excluded_columns = arguments_match.group(1).split(',')
return original_excluded_columns


writer = rewriter.Rewriter()


@writer.rewrites(ops.AddColumnOp)
def add_column_rewrite(context, revision, op):
table_info = op.column.table.info or {}
if (
'versioned' in table_info
and op.column.name not in table_info['versioned'].get('exclude', [])
):
return [
op,
AddColumnToActivityOp(
op.table_name,
op.column.name,
schema=op.column.table.schema,
),
]
else:
return op


@writer.rewrites(ops.DropColumnOp)
def drop_column_rewrite(context, revision, op):
column = op.to_column()
table_info = column.table.info or {}
if (
'versioned' in table_info
and column.name not in table_info['versioned'].get('exclude', [])
):
return [
op,
RemoveColumnFromRemoveActivityOp(
op.table_name,
column.name,
schema=column.table.schema,
),
]
else:
return op
120 changes: 120 additions & 0 deletions postgresql_audit/alembic/init_activity_table_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from alembic.autogenerate import renderers
from alembic.operations import MigrateOperation, Operations

from postgresql_audit.utils import (
create_audit_table,
create_operators,
render_tmpl
)


@Operations.register_operation('init_activity_table_triggers')
class InitActivityTableTriggersOp(MigrateOperation):
"""Initialize Activity Table Triggers"""

def __init__(self, use_statement_level_triggers, schema=None):
self.schema = schema
self.use_statement_level_triggers = use_statement_level_triggers

@classmethod
def init_activity_table_triggers(
cls, operations, use_statement_level_triggers, **kwargs
):
op = InitActivityTableTriggersOp(
use_statement_level_triggers, **kwargs
)
return operations.invoke(op)

def reverse(self):
# only needed to support autogenerate
return RemoveActivityTableTriggersOp(
self.use_statement_level_triggers, schema=self.schema
)


@Operations.register_operation('remove_activity_table_triggers')
class RemoveActivityTableTriggersOp(MigrateOperation):
"""Drop Activity Table Triggers"""

def __init__(self, use_statement_level_triggers, schema=None):
self.schema = schema
self.use_statement_level_triggers = use_statement_level_triggers

@classmethod
def remove_activity_table_triggers(cls, operations, **kwargs):
op = RemoveActivityTableTriggersOp(False, **kwargs)
return operations.invoke(op)

def reverse(self):
# only needed to support autogenerate
return InitActivityTableTriggersOp(
self.use_statement_level_triggers, schema=self.schema
)


@Operations.implementation_for(InitActivityTableTriggersOp)
def init_activity_table_triggers(operations, operation):
conn = operations
bind = conn.get_bind()

if operation.schema:
conn.execute(render_tmpl('create_schema.sql', operation.schema))

conn.execute(render_tmpl('jsonb_change_key_name.sql', operation.schema))
create_audit_table(
None, bind, operation.schema, operation.use_statement_level_triggers
)
create_operators(None, bind, operation.schema)


@Operations.implementation_for(RemoveActivityTableTriggersOp)
def remove_activity_table_triggers(operations, operation):
conn = operations
bind = conn.get_bind()

if operation.schema:
conn.execute(render_tmpl('drop_schema.sql', operation.schema))

conn.execute('''
DROP FUNCTION jsonb_change_key_name(
data jsonb, old_key text, new_key text
)
''')
schema_prefix = f'{operation.schema}.' if operation.schema else ''

conn.execute(f'''
DROP FUNCTION {schema_prefix}audit_table(
target_table regclass, ignored_cols text[]
)
''')
conn.execute(f'DROP FUNCTION {schema_prefix}create_activity()')

if bind.dialect.server_version_info < (9, 5, 0):
conn.execute('DROP FUNCTION jsonb_subtract(jsonb, TEXT)')
conn.execute('DROP OPERATOR IF EXISTS - (jsonb, text);')
conn.execute('DROP FUNCTION jsonb_merge(jsonb, jsonb)')
conn.execute('DROP OPERATOR IF EXISTS || (jsonb, jsonb);')
if bind.dialect.server_version_info < (9, 6, 0):
conn.execute('DROP FUNCTION current_setting(TEXT, BOOL)')
if bind.dialect.server_version_info < (10, 0):
conn.execute('DROP FUNCTION jsonb_subtract(jsonb, TEXT[])')
conn.execute('DROP OPERATOR IF EXISTS - (jsonb, text[])')

conn.execute('DROP OPERATOR IF EXISTS - (jsonb, jsonb)')
conn.execute('DROP FUNCTION get_setting(text, text)')
conn.execute('DROP FUNCTION jsonb_subtract(jsonb,jsonb)')


@renderers.dispatch_for(InitActivityTableTriggersOp)
def render_init_activity_table_triggers(autogen_context, op):
return 'op.init_activity_table_triggers(%r, **%r)' % (
op.use_statement_level_triggers,
{'schema': op.schema}
)


@renderers.dispatch_for(RemoveActivityTableTriggersOp)
def render_remove_activity_table_triggers(autogen_context, op):
return 'op.remove_activity_table_triggers(**%r)' % (
{'schema': op.schema}
)
Loading