Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AP-1927] added new argument for selecting replication method to sync tables #1189

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-project/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ services:
--tlsMode requireTLS
--tlsAllowConnectionsWithoutCertificates
--tlsCertificateKeyFile /etc/ssl/mongodb.pem
--tlsAllowInvalidHostnames
--tlsCAFile /etc/ssl/rootCA.pem
--quiet
networks:
Expand Down
2 changes: 1 addition & 1 deletion dev-project/mongo/create-pipelinewise-user.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
echo 'CREATE MONGODB PIPELINEWISE USER'

mongo --tls --tlsAllowInvalidCertificates -u $MONGO_INITDB_ROOT_USERNAME -p $MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin admin <<EOF
mongo --tls --tlsAllowInvalidCertificates --tlsAllowInvalidHostnames -u $MONGO_INITDB_ROOT_USERNAME -p $MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin admin <<EOF
db.getName();
db.createUser({
user: $(jq --arg 'user' $MONGO_USERNAME --null-input '$user'),
Expand Down
4 changes: 4 additions & 0 deletions docs/user_guide/resync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ add the ``--tables`` argument:
table_mb: <integer/float>


.. attention::

There is an option to chose tables for re-sync which has a specific replication method by ``--replication_method_only <name of replication method>``

$ pipelinewise sync_tables --target <target_id> --tap <tap_id> --replication_method_only log_based



Expand Down
2 changes: 2 additions & 0 deletions pipelinewise/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ def main():
parser.add_argument('--force', default=False, required=False,
help='Force sync_tables for full sync', action='store_true'
)
parser.add_argument('--replication_method_only', default='*', type=str,
help='Sync only tables which their replication method is as entered value')

args = parser.parse_args()

Expand Down
16 changes: 10 additions & 6 deletions pipelinewise/cli/pipelinewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,9 @@ def do_sync_tables(self, fastsync_stream_ids=None):
else:
tables_to_sync = self.args.tables

selected_tables = self._get_sync_tables_setting_from_selection_file(tables_to_sync)
selected_tables = self._get_sync_tables_setting_from_selection_file(
tables_to_sync, self.args.replication_method_only)

processes_list = []
if selected_tables['partial_sync']:
self._reset_state_file_for_partial_sync(selected_tables)
Expand Down Expand Up @@ -2098,7 +2100,8 @@ def _clean_tables_from_bookmarks_in_state_file(state_file_to_clean: str, tables:
def _get_fixed_name_of_table(stream_id):
return stream_id.replace('-', '.', 1)

def _get_sync_tables_setting_from_selection_file(self, tables):
def _get_sync_tables_setting_from_selection_file(self, tables, replication_method_only='*'):
replication_method = replication_method_only.upper()
selection = utils.load_json(self.tap['files']['selection'])
selection = selection.get('selection')
all_tables = {'full_sync': [], 'partial_sync': {}}
Expand All @@ -2107,10 +2110,11 @@ def _get_sync_tables_setting_from_selection_file(self, tables):
for table in selection:
table_name = self._get_fixed_name_of_table(table['tap_stream_id'])
if tables_list is None or table_name in tables_list:
if table.get('sync_start_from'):
all_tables['partial_sync'][table_name] = table['sync_start_from']
else:
all_tables['full_sync'].append(table_name)
if replication_method in ['*', table.get('replication_method')]:
if table.get('sync_start_from'):
all_tables['partial_sync'][table_name] = table['sync_start_from']
else:
all_tables['full_sync'].append(table_name)
return all_tables

def __check_if_table_is_selected(self, table_in_properties):
Expand Down
2 changes: 1 addition & 1 deletion scripts/publish_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ if [ -z "$GH_TOKEN" ]; then
fi

# Install dependencies in a virtual env
python3.8 -m venv ~/venv-doc
python3 -m venv ~/venv-doc
. ~/venv-doc/bin/activate
pip install --upgrade pip
pip install sphinx sphinx-rtd-theme
Expand Down
4 changes: 3 additions & 1 deletion tests/units/cli/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def __init__(
extra_log=False,
debug=False,
profiler=False,
force=False
force=False,
replication_method_only='*'
):
self.target = target
self.tap = tap
Expand All @@ -36,6 +37,7 @@ def __init__(
self.debug = debug
self.profiler = profiler
self.force = force
self.replication_method_only = replication_method_only

# "log" Getters and setters
@property
Expand Down
30 changes: 30 additions & 0 deletions tests/units/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,3 +1077,33 @@ def test_get_sync_tables_setting_from_selection_file_if_tables_parameter_is_none
'partial_sync': {'baz': 'PARTIAL', 'par': 'PARTIAL_par'}
}
assert actual_selected_tables == expected_selected_tables

def test_get_sync_tables_if_using_replication_method_only(self):
"""Test if the method for getting list of tables for syncing returns only tables with selected
replication method"""
tables = 'foo,foo_bar,foo_bar_baz,bar_baz,bar,par,par_foo'
with patch('pipelinewise.cli.pipelinewise.utils.load_json') as mocked_load_json:
self.pipelinewise.tap = {
'files': {
'selection': 'foo.json'
}
}
mocked_load_json.return_value = {
'selection': [
{'tap_stream_id': 'foo', 'replication_method': 'FULL_TABLE'},
{'tap_stream_id': 'foo_bar', 'replication_method': 'LOG_BASED'},
{'tap_stream_id': 'foo_bar_baz', 'replication_method': 'INCREMENTAL'},
{'tap_stream_id': 'bar_baz', 'replication_method': 'FULL_TABLE'},
{'tap_stream_id': 'bar', 'replication_method': 'LOG_BASED'},
{'tap_stream_id': 'par', 'sync_start_from': 'PARTIAL_par', 'replication_method': 'INCREMENTAL'},
{'tap_stream_id': 'par_foo', 'sync_start_from': 'PARTIAL_par_foo',
'replication_method': 'LOG_BASED'}
]
}
actual_selected_tables = self.pipelinewise._get_sync_tables_setting_from_selection_file(
tables, replication_method_only='log_based')
expected_selected_tables = {
'full_sync': ['foo_bar', 'bar'],
'partial_sync': {'par_foo': 'PARTIAL_par_foo'}
}
assert actual_selected_tables == expected_selected_tables
Loading