Skip to content

Commit

Permalink
[AP-1241]Fixed where clause for partial sync (#988)
Browse files Browse the repository at this point in the history
* fixed where clause for partial sync

* fix for where clause

* fixed unittest
  • Loading branch information
amofakhar authored Jul 8, 2022
1 parent fdb6bc0 commit 24a7146
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pipelinewise/fastsync/commons/tap_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,9 @@ def copy_table(
table_dict = utils.tablename_to_dict(table_name)
where_clause_sql = ''
if where_clause_setting:
where_clause_sql = f' WHERE {where_clause_setting["column"]} >= {where_clause_setting["start_value"]}'
where_clause_sql = f' WHERE {where_clause_setting["column"]} >= \'{where_clause_setting["start_value"]}\''
if where_clause_setting['end_value']:
where_clause_sql += f' AND {where_clause_setting["column"]} <= {where_clause_setting["end_value"]}'
where_clause_sql += f' AND {where_clause_setting["column"]} <= \'{where_clause_setting["end_value"]}\''

sql = """SELECT {}
,CONVERT_TZ( NOW(),@@session.time_zone,'+00:00') AS _SDC_EXTRACTED_AT
Expand Down
4 changes: 2 additions & 2 deletions pipelinewise/fastsync/commons/tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ def copy_table(

where_clause_sql = ''
if where_clause_setting:
where_clause_sql = f' WHERE {where_clause_setting["column"]} >= {where_clause_setting["start_value"]}'
where_clause_sql = f' WHERE {where_clause_setting["column"]} >= \'{where_clause_setting["start_value"]}\''
if where_clause_setting['end_value']:
where_clause_sql += f' AND {where_clause_setting["column"]} <= {where_clause_setting["end_value"]}'
where_clause_sql += f' AND {where_clause_setting["column"]} <= \'{where_clause_setting["end_value"]}\''

sql = """COPY (SELECT {}
,now() AT TIME ZONE 'UTC'
Expand Down
4 changes: 2 additions & 2 deletions pipelinewise/fastsync/partialsync/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ def load_into_snowflake(
target_schema = common_utils.get_target_schema(args.target, args.table)
table_dict = common_utils.tablename_to_dict(args.table)
target_table = table_dict.get('table_name')
where_clause = f'WHERE {args.column} >= {args.start_value}'
where_clause = f'WHERE {args.column} >= \'{args.start_value}\''
if args.end_value:
where_clause += f' AND {args.column} <= {args.end_value}'
where_clause += f' AND {args.column} <= \'{args.end_value}\''

snowflake.query(f'DELETE FROM {target_schema}."{target_table.upper()}" {where_clause}')
# copy partial data into the table
Expand Down
4 changes: 2 additions & 2 deletions tests/units/partialsync/test_partial_sync_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ def test_load_into_snowflake(self):
test_s3_keys = ['s3_key_foo']
test_tap_id = args.target['tap_id']
test_bucket = args.target['s3_bucket']
where_clause_for_end = f' AND {args.column} <= {args.end_value}' if args.end_value else ''
where_clause_for_end = f" AND {args.column} <= '{args.end_value}'" if args.end_value else ''

mocked_snowflake = mock.MagicMock()

load_into_snowflake(mocked_snowflake, args, test_s3_keys, test_s3_key_pattern, test_size_byte)

mocked_snowflake.query.assert_called_with(
f'DELETE FROM {test_target_schema}."{test_table.upper()}"'
f' WHERE {args.column} >= {args.start_value}{where_clause_for_end}')
f' WHERE {args.column} >= \'{args.start_value}\'{where_clause_for_end}')

mocked_snowflake.copy_to_table.assert_called_with(
test_s3_key_pattern, test_target_schema, args.table, test_size_byte, is_temporary=False
Expand Down

0 comments on commit 24a7146

Please sign in to comment.