Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AP-2038] implementing reset_state command #1192

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/user_guide/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ Validates a project directory with YAML tap and target files.
:--dir: relative path to the project directory with YAML taps and targets.


reset_state
"""""""""""

Reset state file for log based tables. It works only for PostgresSQL databases!

:--target: Target connector id

:--tap: Tap connector id

.. _cli_reset_state:


Environment variables
---------------------

Expand Down
5 changes: 3 additions & 2 deletions pipelinewise/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
'import_config', # This is for backward compatibility; use 'import' instead
'validate',
'encrypt_string',
'partial_sync_table'
'partial_sync_table',
'reset_state'
]


Expand Down Expand Up @@ -150,7 +151,7 @@ def _validate_command_specific_arguments(args):
if args.command == 'init' and args.name == '*':
raise CommandSpecificArgumentsException('You must specify a project name using the argument --name')

if args.command in ['discover_tap', 'test_tap_connection', 'run_tap', 'stop_tap', 'sync_tables']:
if args.command in ['discover_tap', 'test_tap_connection', 'run_tap', 'stop_tap', 'sync_tables', 'reset_state']:
if args.tap == '*':
raise CommandSpecificArgumentsException('You must specify a source name using the argument --tap')
if args.target == '*':
Expand Down
34 changes: 32 additions & 2 deletions pipelinewise/cli/pipelinewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import json
import copy

import psutil
import pidfile

Expand Down Expand Up @@ -1863,6 +1864,35 @@ def sync_tables_partial_sync(self, defined_tables=None):
if cons_target_config:
utils.silentremove(cons_target_config)

def reset_state(self):
"""Reset state file"""

if self.tap.get('type') == 'tap-postgres':
self._update_state_file('lsn', 1)
self.logger.info('state file is reset for log based tables!')
else:
self.logger.error('state reset is not supported for %s (%s)!',
self.tap.get('id'), self.tap.get('type'))
raise SystemExit(1)

def _update_state_file(self, table_property, new_value):
tap_state = self.tap['files']['state']
try:
with open(tap_state, 'r', encoding='utf8') as state_file:
state_content = json.load(state_file)
bookmarks = state_content.get('bookmarks')
for table, properties in bookmarks.items():
if table_property in properties:
bookmarks[table][table_property] = new_value
state_content['bookmarks'] = bookmarks

with open(tap_state, 'w', encoding='utf8') as state_file:
json.dump(state_content, state_file, indent=4)

except Exception as exp:
self.logger.error(exp)
raise SystemExit(1) from exp

@staticmethod
def _remove_not_partial_synced_tables_from_properties(tap_params, not_synced_tables):
"""" Remove partial sync table which are not synced yet from properties """
Expand All @@ -1888,7 +1918,7 @@ def _reset_state_file_for_partial_sync(self, selected_tables):
filtered_bookmarks = dict(filter(lambda k: k[0] not in selected_partial_sync_tables, bookmarks.items()))
state_content['bookmarks'] = filtered_bookmarks
with open(tap_state, 'w', encoding='utf8') as state_file:
json.dump(state_content, state_file)
json.dump(state_content, state_file, indent=4)

def _check_supporting_tap_and_target_for_partial_sync(self):
tap_type = self.tap['type']
Expand Down Expand Up @@ -2088,7 +2118,7 @@ def _clean_tables_from_bookmarks_in_state_file(state_file_to_clean: str, tables:
bookmarks.pop(table_name.replace('"', ''), None)

state_file.seek(0)
json.dump(state_data, state_file)
json.dump(state_data, state_file, indent=4)
state_file.truncate()

except FileNotFoundError:
Expand Down
18 changes: 18 additions & 0 deletions tests/units/cli/resources/test_reset_state/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"targets": [
{
"id": "target_foo",
"type": "target-snowflake",
"taps": [
{
"id": "tap_pg",
"type": "tap-postgres"
},
{
"id": "tap_bar",
"type": "tap-bar"
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"bookmarks": {
"foo_table": {
"lsn": 456321
},
"bar_table": {
"foo": "bar"
}
}
}
74 changes: 74 additions & 0 deletions tests/units/cli/test_reset_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import json
import os

from unittest import TestCase, mock

from pipelinewise import cli


class TestResetState(TestCase):
"""Testcases for reset state CLI"""
def setUp(self):
resources_dir = f'{os.path.dirname(__file__)}/resources'
self.test_cli = cli
self.test_cli.CONFIG_DIR = f'{resources_dir}/test_reset_state'
self.test_cli.VENV_DIR = './virtualenvs-dummy'

def _run_cli(self, arguments_dict: dict) -> None:
"""Running the test CLI application"""
argv_list = ['main', 'reset_state']
if arguments_dict.get('tap'):
argv_list.extend(['--tap', arguments_dict['tap']])
if arguments_dict.get('target'):
argv_list.extend(['--target', arguments_dict['target']])

with mock.patch('sys.argv', argv_list):
self.test_cli.main()

def test_reset_state_file_if_tap_is_pg(self):
""" Test reset_state command for Postgres taps"""
state_content = {
'bookmarks': {
'foo_table': {
'lsn': 54321
},
'bar_table': {
'foo': 'bar'
}
}
}
with open(f'{self.test_cli.CONFIG_DIR}/target_foo/tap_pg/state.json', 'w', encoding='utf-8') as state_file:
json.dump(state_content, state_file)

arguments = {
'tap': 'tap_pg',
'target': 'target_foo'
}
self._run_cli(arguments)

with open(f'{self.test_cli.CONFIG_DIR}/target_foo/tap_pg/state.json', 'r', encoding='utf-8') as state_file:
actual_state = json.load(state_file)

expected_state = {
'bookmarks': {
'foo_table': {
'lsn': 1
},
'bar_table': {
'foo': 'bar'
}
}
}
self.assertDictEqual(expected_state, actual_state)

def test_exit_with_error_1_if_tap_is_not_allowed(self):
""" Test reset_state command exit with error 1 if tap is not allowed for it"""
arguments = {
'tap': 'tap_bar',
'target': 'target_foo'
}

with self.assertRaises(SystemExit) as system_exit:
self._run_cli(arguments)

self.assertEqual(system_exit.exception.code, 1)
Loading