Add e2e test for deletion with pg log-based replication #958

Open · wants to merge 1 commit into master
1 change: 1 addition & 0 deletions setup.py
@@ -46,6 +46,7 @@
     extras_require={
         'test': [
             'pre-commit==2.18.1',
+            'python-dateutil==2.8.2',
             'flake8==4.0.1',
             'pytest==7.1.1',
             'pytest-dependency==0.4.0',
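The new python-dateutil pin is presumably there so test assertions can parse the _SDC_DELETED_AT timestamps returned by Snowflake; a minimal, hypothetical example of such parsing (not taken from this PR):

# Hypothetical use of the newly pinned python-dateutil: parsing an
# _SDC_DELETED_AT value (an ISO 8601 string) into a timezone-aware datetime.
import dateutil.parser

deleted_at = dateutil.parser.isoparse('2022-05-04T12:34:56.789000Z')
print(deleted_at.tzinfo is not None)  # True: the parsed timestamp is timezone-aware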
@@ -0,0 +1,61 @@
from tests.end_to_end.helpers import assertions
from tests.end_to_end.target_snowflake.tap_postgres import TapPostgres

TAP_ID = 'postgres_to_sf_without_delete_in_target'
TARGET_ID = 'snowflake'


class TestReplicatePGToSFWithoutDeleteInTarget(TapPostgres):
    """
    Replicate tables from Postgres to Snowflake with hard_delete disabled,
    so rows deleted in the source are soft deleted in the target
    """

    # pylint: disable=arguments-differ
    def setUp(self):
        super().setUp(tap_id=TAP_ID, target_id=TARGET_ID)

    def test_replicate_pg_to_sf_without_delete_in_target(self):
        # Initial load: fastsync for the snapshot, then singer for log-based changes
        assertions.assert_run_tap_success(
            self.tap_id, self.target_id, ['fastsync', 'singer']
        )

        result = self.e2e_env.run_query_target_snowflake(
            f'SELECT _SDC_DELETED_AT FROM '
            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
            f" WHERE cvarchar='A';"
        )[0][0]

        # The row has not been deleted in the source yet, so _sdc_deleted_at is empty
        self.assertIsNone(result)

        # Delete the row in the source database
        self.e2e_env.run_query_tap_postgres(
            'DELETE FROM public."table_with_space and UPPERCase" WHERE cvarchar = \'A\';'
        )

        # Run the tap a second time: only the singer (log-based) phase is needed
        assertions.assert_run_tap_success(
            self.tap_id, self.target_id, ['singer'], profiling=True
        )

        # Validate that the entire row is still present in the target
        deleted_rows = self.e2e_env.run_query_target_snowflake(
            f'SELECT * FROM '
            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
            f" WHERE cvarchar='A';"
        )
        self.assertEqual(len(deleted_rows), 1)

        deleted_at = self.e2e_env.run_query_target_snowflake(
            f'SELECT _SDC_DELETED_AT FROM '
            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
            f" WHERE cvarchar='A';"
        )[0][0]

        # Validate that the _sdc_deleted_at metadata column has been set
        self.assertIsNotNone(deleted_at)
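For context on what the second tap run replicates: under LOG_BASED replication, tap-postgres emits the deleted row as a Singer RECORD message carrying an _sdc_deleted_at timestamp, which the target applies as an update rather than a physical delete. A rough, illustrative sketch of such a message (the exact payload shape and stream naming are assumptions, not taken from this PR):

# Illustrative Singer RECORD message for a DELETE captured from the WAL.
# Field values and the stream id format are assumed for illustration only.
import json

delete_record = {
    'type': 'RECORD',
    'stream': 'public-table_with_space and UPPERCase',
    'record': {
        'cvarchar': 'A',
        '_sdc_deleted_at': '2022-05-04T12:34:56.789000+00:00',
    },
}
print(json.dumps(delete_record, indent=2))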
@@ -0,0 +1,47 @@
---

# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "postgres_to_sf_without_delete_in_target"
name: "PostgreSQL source test database"
type: "tap-postgres"
owner: "test-runner"


# ------------------------------------------------------------------------------
# Source (Tap) - PostgreSQL connection details
# ------------------------------------------------------------------------------
db_conn:
  host: "${TAP_POSTGRES_HOST}"           # PostgreSQL host
  logical_poll_total_seconds: 30         # Time out if no LOG_BASED changes received for 30 seconds
  port: ${TAP_POSTGRES_PORT}             # PostgreSQL port
  user: "${TAP_POSTGRES_USER}"           # PostgreSQL user
  password: "${TAP_POSTGRES_PASSWORD}"   # Plain string or vault encrypted
  dbname: "${TAP_POSTGRES_DB}"           # PostgreSQL database name

hard_delete: false                       # Keep rows deleted in the source as soft deletes in the target
add_metadata_columns: true               # Add _sdc_* metadata columns (including _sdc_deleted_at)

# ------------------------------------------------------------------------------
# Destination (Target) - Target properties
# Connection details should be in the relevant target YAML file
# ------------------------------------------------------------------------------
target: "snowflake" # ID of the target connector where the data will be loaded
batch_size_rows: 1000 # Batch size for the stream to optimise load performance
stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes


# ------------------------------------------------------------------------------
# Source to target Schema mapping
# ------------------------------------------------------------------------------
schemas:

  ### SOURCE SCHEMA 1: public
  - source_schema: "public"
    target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}"

    tables:
      ### Table with space and mixed upper and lowercase characters
      - table_name: "table_with_space and UPPERCase"
        replication_method: "LOG_BASED"
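Taken together, hard_delete: false and add_metadata_columns: true mean a source DELETE leaves the row in Snowflake with only _SDC_DELETED_AT stamped. A minimal sketch of that invariant as a standalone check, assuming a generic DB-API cursor rather than the e2e helper used in the test above:

# Minimal sketch, assuming a DB-API compatible Snowflake cursor; the schema
# argument and helper function are illustrative, not part of this PR.
def is_soft_deleted(cursor, schema: str) -> bool:
    """True if the row deleted in the source still exists in the target
    with its _SDC_DELETED_AT metadata column populated."""
    cursor.execute(
        f'SELECT _SDC_DELETED_AT FROM {schema}."TABLE_WITH_SPACE AND UPPERCASE" '
        "WHERE cvarchar = 'A'"
    )
    row = cursor.fetchone()
    # hard_delete: false keeps the row; add_metadata_columns: true stamps it
    return row is not None and row[0] is not None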