From 9e6b5344317724e77dc1b34d8c36f7cb62803f3d Mon Sep 17 00:00:00 2001 From: Jay Tilala Date: Wed, 27 Jul 2022 16:33:06 +0530 Subject: [PATCH] add integration test for Braintree --- tests/base.py | 365 +++++++++++++++++++++++ tests/test_braintree_all_fields.py | 85 ++++++ tests/test_braintree_automatic_fields.py | 73 +++++ tests/test_braintree_bookmark.py | 189 ++++++++++++ tests/test_braintree_discovery.py | 142 +++++++++ tests/test_braintree_pagination.py | 73 +++++ tests/test_braintree_start_date.py | 161 ++++++++++ 7 files changed, 1088 insertions(+) create mode 100644 tests/base.py create mode 100644 tests/test_braintree_all_fields.py create mode 100644 tests/test_braintree_automatic_fields.py create mode 100644 tests/test_braintree_bookmark.py create mode 100644 tests/test_braintree_discovery.py create mode 100644 tests/test_braintree_pagination.py create mode 100644 tests/test_braintree_start_date.py diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..bd48cde --- /dev/null +++ b/tests/base.py @@ -0,0 +1,365 @@ +import unittest +import os +from datetime import timedelta +from datetime import datetime as dt +import time +import dateutil.parser +from dateutil.parser import parse +import singer +from tap_tester import connections, menagerie, runner + +logger = singer.get_logger() + + +class BraintreeBaseTest(unittest.TestCase): + + """ + Setup expectations for test sub classes. + Metadata describing streams. + A bunch of shared methods that are used in tap-tester tests. + Shared tap-specific methods (as needed). + """ + + AUTOMATIC_FIELDS = "automatic" + PRIMARY_KEYS = "table-key-properties" + REPLICATION_METHOD = "forced-replication-method" + INCREMENTAL = "INCREMENTAL" + FULL_TABLE = "FULL_TABLE" + BOOKMARK = "bookmark" + REPLICATION_KEYS = "REPLICATION_KEYS" + START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S" + start_date = "2022-06-25T00:00:00Z" + + def setUp(self): + required_creds = { + "merchant_id": "TAP_BRAINTREE_MERCHANT_ID", + "public_key": "TAP_BRAINTREE_PUBLIC_KEY", + "private_key": "TAP_BRAINTREE_PRIVATE_KEY", + "start_date": "TAP_BRAINTREE_START_DATE", + "environment": "TAP_BRAINTREE_ENVIRONMENT", + } + missing_creds = [v for v in required_creds.values() if not os.getenv(v)] + if missing_creds: + raise Exception("set " + ", ".join(missing_creds)) + self._credentials = {k: os.getenv(v) for k, v in required_creds.items()} + + def get_credentials(self): + self._credentials["merchant_id"] = os.getenv("TAP_BRAINTREE_MERCHANT_ID") + self._credentials["public_key"] = os.getenv("TAP_BRAINTREE_PUBLIC_KEY") + self._credentials["private_key"] = os.getenv("TAP_BRAINTREE_PRIVATE_KEY") + self._credentials["start_date"] = os.getenv("TAP_BRAINTREE_START_DATE") + self._credentials["environment"] = os.getenv("TAP_BRAINTREE_ENVIRONMENT") + return self._credentials + + @staticmethod + def tap_name(): + """The name of the tap""" + return "tap-braintree" + + @staticmethod + def get_type(): + return "platform.braintree" + + def get_properties(self): + """Configuration properties required for the tap.""" + return {"start_date": self.start_date} + + def expected_metadata(self): + """The expected primary key of the streams""" + + return { + "add_ons": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.BOOKMARK: {"updated_at"}, + }, + "customers": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"created_at"}, + self.BOOKMARK: {"created_at"}, + }, + 
"discounts": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.BOOKMARK: {"updated_at"}, + }, + "disputes": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"received_date"}, + self.BOOKMARK: {"received_date"}, + }, + "merchant_accounts": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + }, + "plans": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.BOOKMARK: {"updated_at"}, + }, + "settlement_batch_summary": { + self.PRIMARY_KEYS: {"settlement_date"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"settlement_date"}, + self.BOOKMARK: {"settlement_date"}, + }, + "subscriptions": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"created_at"}, + self.BOOKMARK: {"created_at"}, + }, + "transactions": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"created_at"}, + self.BOOKMARK: {"created_at"}, + }, + } + + def expected_streams(self): + """A set of expected stream names""" + return set(self.expected_metadata().keys()) + + def expected_primary_keys(self): + """return a dictionary with key of table name and value as a set of primary key fields""" + return { + table: properties.get(self.PRIMARY_KEYS, set()) + for table, properties in self.expected_metadata().items() + } + + def expected_bookmark_keys(self): + """return a dictionary with key of table name and value as a set of bookmark key fields""" + return { + table: properties.get(self.BOOKMARK, set()) + for table, properties in self.expected_metadata().items() + } + + def expected_replication_keys(self): + """return a dictionary with key of table name and value as a set of replication key fields""" + return { + table: properties.get(self.REPLICATION_KEYS, set()) + for table, properties in self.expected_metadata().items() + } + + def expected_automatic_fields(self): + """return a dictionary with key of table name and set of value of automatic(primary key and bookmark field) fields""" + auto_fields = {} + for k, v in self.expected_metadata().items(): + auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get( + self.BOOKMARK, set() + ) + return auto_fields + + def expected_replication_method(self): + """return a dictionary with key of table name and value of replication method""" + return { + table: properties.get(self.REPLICATION_METHOD, None) + for table, properties in self.expected_metadata().items() + } + + def run_and_verify_check_mode(self, conn_id): + """ + Run the tap in check mode and verify it succeeds. + This should be ran prior to field selection and initial sync. + Return the connection id and found catalogs from menagerie. 
+ """ + # run in check mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater( + len(found_catalogs), + 0, + msg="unable to locate schemas for connection {}".format(conn_id), + ) + + found_catalog_names = set(map(lambda c: c["stream_name"], found_catalogs)) + print(found_catalog_names) + + self.assertSetEqual( + self.expected_streams(), + found_catalog_names, + msg="discovered schemas do not match", + ) + print("discovered schemas are OK") + + return found_catalogs + + def run_and_verify_sync(self, conn_id): + """ + Run a sync job and make sure it exited properly. + Return a dictionary with keys of streams synced + and values of records synced for each stream + """ + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + sync_record_count = runner.examine_target_output_file( + self, conn_id, self.expected_streams(), self.expected_primary_keys() + ) + self.assertGreater( + sum(sync_record_count.values()), + 0, + msg="failed to replicate any data: {}".format(sync_record_count), + ) + print("total replicated row count: {}".format(sum(sync_record_count.values()))) + + return sync_record_count + + def perform_and_verify_table_and_field_selection( + self, conn_id, test_catalogs, select_all_fields=True + ): + """ + Perform table and field selection based off of the streams to select + set and field selection parameters. + + Verify this results in the expected streams selected and all or no + fields selected for those streams. 
+ """ + + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields( + conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields + ) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get("stream_name") for tc in test_catalogs] + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema(conn_id, cat["stream_id"]) + + # Verify all testable streams are selected + selected = catalog_entry.get("annotated-schema").get("selected") + print("Validating selection on {}: {}".format(cat["stream_name"], selected)) + if cat["stream_name"] not in expected_selected: + self.assertFalse(selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in ( + catalog_entry.get("annotated-schema").get("properties").items() + ): + field_selected = field_props.get("selected") + print( + "\tValidating selection on {}.{}: {}".format( + cat["stream_name"], field, field_selected + ) + ) + self.assertTrue(field_selected, msg="Field not selected.") + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get( + cat["stream_name"] + ) + selected_fields = self.get_selected_fields_from_metadata( + catalog_entry["metadata"] + ) + self.assertEqual(expected_automatic_fields, selected_fields) + + @staticmethod + def get_selected_fields_from_metadata(metadata): + """Get selected fields from metadata""" + selected_fields = set() + for field in metadata: + is_field_metadata = len(field["breadcrumb"]) > 1 + inclusion_automatic_or_selected = ( + field["metadata"]["selected"] is True + or field["metadata"]["inclusion"] == "automatic" + ) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field["breadcrumb"][1]) + return selected_fields + + @staticmethod + def select_all_streams_and_fields( + conn_id, catalogs, select_all_fields: bool = True + ): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog["stream_id"]) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = ( + schema.get("annotated-schema", {}).get("properties", {}).keys() + ) + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties + ) + + def calculated_states_by_stream(self, current_state): + """Return calculated states for all the streams""" + timedelta_by_stream = { + stream: [0, 0, 1] # {stream_name: [days, hours, minutes], ...} + for stream in self.expected_streams() + } + + stream_to_calculated_state = { + stream: "" for stream in current_state["bookmarks"].keys() + } + for stream, state in current_state["bookmarks"].items(): + state_key, state_value = next(iter(state.keys())), next( + iter(state.values()) + ) + state_as_datetime = dateutil.parser.parse(state_value) + + days, hours, minutes = timedelta_by_stream[stream] + calculated_state_as_datetime = state_as_datetime - timedelta( + days=days, hours=hours, minutes=minutes + ) + + state_format = "%Y-%m-%dT%H:%M:%SZ" + calculated_state_formatted = dt.strftime( + calculated_state_as_datetime, state_format + ) + + 
stream_to_calculated_state[stream] = {state_key: calculated_state_formatted} + + return stream_to_calculated_state + + def timedelta_formatted(self, dtime, days=0): + """Return the given date with the specified number of days added, in START_DATE_FORMAT""" + date_stripped = dt.strptime(dtime, self.START_DATE_FORMAT) + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, self.START_DATE_FORMAT) + + def is_incremental(self, stream): + """Check whether the given stream is INCREMENTAL""" + return ( + self.expected_metadata()[stream][self.REPLICATION_METHOD] + == self.INCREMENTAL + ) + + def dt_to_ts(self, dtime): + return int( + time.mktime(dt.strptime(dtime[:19], self.START_DATE_FORMAT).timetuple()) + ) + + def timedelta_format(self, datetime, days=0, hours=0, minutes=0): + + """Add the specified days, hours, and minutes to the given date and return it in START_DATE_FORMAT""" + return_date = parse(datetime) + timedelta( + days=days, hours=hours, minutes=minutes + ) + return dt.strftime(return_date, self.START_DATE_FORMAT) diff --git a/tests/test_braintree_all_fields.py b/tests/test_braintree_all_fields.py new file mode 100644 index 0000000..594710c --- /dev/null +++ b/tests/test_braintree_all_fields.py @@ -0,0 +1,85 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +import tap_tester.menagerie as menagerie +from base import BraintreeBaseTest + + +class AllFieldsTest(BraintreeBaseTest): + """Ensure running the tap with all streams and fields selected results in the replication of all fields.""" + + def name(self): + """Name of test""" + + return "tap_tester_braintree_all_fields" + + def test_run(self): + """ + • Verify no unexpected streams were replicated + • Verify that more than just the automatic fields are replicated for each stream. + • Verify all fields for each stream are replicated + """ + + # Streams to verify in the all fields test + # Here certain streams are removed as we are not able to generate data for those streams + expected_streams = self.expected_streams() + expected_automatic_fields = self.expected_automatic_fields() + conn_id = connections.ensure_connection(self) + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [ + catalog + for catalog in found_catalogs + if catalog.get("tap_stream_id") in expected_streams + ] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields + ) + + # grab metadata after performing table-and-field selection to set expectations + # used for asserting all fields are replicated + stream_to_all_catalog_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog["stream_id"], catalog["stream_name"] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [ + md_entry["breadcrumb"][1] + for md_entry in catalog_entry["metadata"] + if md_entry["breadcrumb"] != [] + ] + stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md) + + self.run_and_verify_sync(conn_id) + + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(expected_streams, synced_stream_names) + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_all_keys = stream_to_all_catalog_fields[stream] + expected_automatic_keys = expected_automatic_fields.get(stream, set()) + + # Verify that more than just the automatic fields are
replicated for each stream. + self.assertTrue( + expected_automatic_keys.issubset(expected_all_keys), + msg='{} is not in "expected_all_keys"'.format( + expected_automatic_keys - expected_all_keys + ), + ) + + messages = synced_records.get(stream) + # collect actual values + actual_all_keys = set() + + for message in messages["messages"]: + if message["action"] == "upsert": + actual_all_keys.update(message["data"].keys()) + + # verify all fields for each stream are replicated + self.assertSetEqual(expected_all_keys, actual_all_keys) diff --git a/tests/test_braintree_automatic_fields.py b/tests/test_braintree_automatic_fields.py new file mode 100644 index 0000000..4ad41d8 --- /dev/null +++ b/tests/test_braintree_automatic_fields.py @@ -0,0 +1,73 @@ +from tap_tester import runner, connections +from base import BraintreeBaseTest + + +class AutomaticFields(BraintreeBaseTest): + """Test that with no fields selected for a stream automatic fields are still replicated""" + + @staticmethod + def name(): + """Name of test""" + + return "tap_tester_braintree_automatic_fields" + + def test_run(self): + """ + • Verify that for each stream you can get multiple pages of data + when no fields are selected and only the automatic fields are replicated. + + • PREREQUISITE + For EACH stream add enough data that you surpass the limit of a single + fetch of data. For instance if you have a limit of 250 records ensure + that 251 (or more) records have been posted for that stream. + """ + + # Here certain streams are removed as we are not able to generate data for those streams + expected_streams = self.expected_streams() + + # instantiate connection + conn_id = connections.ensure_connection(self) + + # run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_automatic_fields = [ + catalog + for catalog in found_catalogs + if catalog.get("stream_name") in expected_streams + ] + + self.perform_and_verify_table_and_field_selection( + conn_id, + test_catalogs_automatic_fields, + select_all_fields=False, + ) + + # run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_keys = self.expected_automatic_fields().get(stream) + + # collect actual values + data = synced_records.get(stream) + keys = data["schema"]["properties"].keys() + actual_keys = set() + for key in keys: + if data["schema"]["properties"][key]["inclusion"] == "automatic": + actual_keys.add(key) + + # Verify that you get some records for each stream + self.assertGreater( + record_count_by_stream.get(stream, -1), + 0, + msg="The number of records is not over the stream max limit", + ) + + # Verify that only the automatic fields are sent to the target + self.assertSetEqual(expected_keys, actual_keys) diff --git a/tests/test_braintree_bookmark.py b/tests/test_braintree_bookmark.py new file mode 100644 index 0000000..2da2b65 --- /dev/null +++ b/tests/test_braintree_bookmark.py @@ -0,0 +1,189 @@ +from tap_tester import menagerie +from tap_tester import connections, runner +from base import BraintreeBaseTest + + +class BookmarkTest(BraintreeBaseTest): + """Test tap sets a bookmark and respects it for the next sync of a stream""" + + @staticmethod + def name(): + """Name of test""" + + return "tap_tester_braintree_bookmark_test" + + def test_run(self): + """ + • Verify that for each stream you can do a sync which 
records bookmarks. + • Verify that the bookmark is the maximum value sent to the target for the replication key. + • Verify that a second sync respects the bookmark + -> All data of the second sync is >= the bookmark from the first sync + -> The number of records in the 2nd sync is less than in the first + + • Verify that for full table streams, all data replicated in sync 1 is replicated again in sync 2. + + -> PREREQUISITE + For EACH stream that is incrementally replicated there are multiple rows of data with + different values for the replication key + """ + + # Here certain streams are removed as we are not able to generate data for those streams + expected_streams = self.expected_streams() + expected_bookmark_keys = self.expected_bookmark_keys() + expected_replication_keys = self.expected_replication_keys() + expected_replication_methods = self.expected_replication_method() + + ########################################################################## + # First Sync + ########################################################################## + conn_id = connections.ensure_connection(self) + + # Run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + self.perform_and_verify_table_and_field_selection(conn_id, found_catalogs) + + # Run a first sync job using orchestrator + first_sync_record_count = self.run_and_verify_sync(conn_id) + first_sync_records = runner.get_records_from_target_output() + first_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Update State Between Syncs + ########################################################################## + new_states = {"bookmarks": dict()} + simulated_states = self.calculated_states_by_stream(first_sync_bookmarks) + for stream, new_state in simulated_states.items(): + new_states["bookmarks"][stream] = new_state + menagerie.set_state(conn_id, new_states) + + ########################################################################## + # Second Sync + ########################################################################## + + second_sync_record_count = self.run_and_verify_sync(conn_id) + second_sync_records = runner.get_records_from_target_output() + second_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Test By Stream + ########################################################################## + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_replication_method = expected_replication_methods[stream] + + # collect information for assertions from syncs 1 & 2 based on expected values + first_sync_count = first_sync_record_count.get(stream, 0) + second_sync_count = second_sync_record_count.get(stream, 0) + first_sync_messages = [ + record.get("data") + for record in first_sync_records.get(stream, {}).get("messages", []) + if record.get("action") == "upsert" + ] + second_sync_messages = [ + record.get("data") + for record in second_sync_records.get(stream, {}).get( + "messages", [] + ) + if record.get("action") == "upsert" + ] + first_bookmark_key_value = first_sync_bookmarks.get( + "bookmarks", {stream: None} + ).get(stream) + second_bookmark_key_value = second_sync_bookmarks.get( + "bookmarks", {stream: None} + ).get(stream) + + if expected_replication_method == self.INCREMENTAL: + + # collect information specific to incremental streams from syncs 1 & 2 + replication_key =
list(expected_replication_keys[stream])[ + 0 + ] # Key in which state has been saved in state file + record_replication_key = list(expected_bookmark_keys[stream])[0] + first_bookmark_value = first_bookmark_key_value.get(replication_key) + second_bookmark_value = second_bookmark_key_value.get( + replication_key + ) + + first_bookmark_value_ts = self.dt_to_ts(first_bookmark_value) + + second_bookmark_value_ts = self.dt_to_ts(second_bookmark_value) + + simulated_bookmark_value = self.dt_to_ts( + new_states["bookmarks"][stream][replication_key] + ) + + # Verify the first sync sets a bookmark of the expected form + self.assertIsNotNone(first_bookmark_key_value) + self.assertIsNotNone(first_bookmark_value) + + self.assertIsNotNone(second_bookmark_key_value) + self.assertIsNotNone(second_bookmark_value) + + # Verify the second sync bookmark is Equal to the first sync bookmark + # assumes no changes to data during test + self.assertEqual(second_bookmark_value, first_bookmark_value) + for record in first_sync_messages: + + # Verify the first sync bookmark value is the max replication key value for a given stream + replication_key_value = self.dt_to_ts( + record.get(record_replication_key) + ) + self.assertLessEqual( + replication_key_value, + first_bookmark_value_ts, + msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced.", + ) + + for record in second_sync_messages: + # Verify the second sync replication key value is Greater or Equal to the first sync bookmark + replication_key_value = self.dt_to_ts( + record.get(record_replication_key) + ) + self.assertGreaterEqual( + replication_key_value, + simulated_bookmark_value, + msg="Second sync records do not respect the previous bookmark.", + ) + + # Verify the second sync bookmark value is the max replication key value for a given stream + self.assertLessEqual( + replication_key_value, + second_bookmark_value_ts, + msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced.", + ) + + # verify that you get less data the 2nd time around + self.assertLess( + second_sync_count, + first_sync_count, + msg="second sync didn't have less records, bookmark usage not verified", + ) + + elif expected_replication_method == self.FULL_TABLE: + + # Verify the syncs do not set a bookmark for full table streams + self.assertIsNone(first_bookmark_key_value) + self.assertIsNone(second_bookmark_key_value) + + # Verify the number of records in the second sync is the same as the first + self.assertEqual(second_sync_count, first_sync_count) + + else: + + raise NotImplementedError( + "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format( + stream, expected_replication_method + ) + ) + + # Verify at least 1 record was replicated in the second sync + self.assertGreater( + second_sync_count, + 0, + msg="We are not fully testing bookmarking for {}".format(stream), + ) diff --git a/tests/test_braintree_discovery.py b/tests/test_braintree_discovery.py new file mode 100644 index 0000000..319a182 --- /dev/null +++ b/tests/test_braintree_discovery.py @@ -0,0 +1,142 @@ +import re +from tap_tester import menagerie, connections +from base import BraintreeBaseTest + + +class DiscoveryTest(BraintreeBaseTest): + """Test whether tap discovery mode and metadata conforms to standards or not.""" + + @staticmethod + def name(): + return "tap_tester_braintree_discovery_test" + + def test_run(self): + """ + Testing that discovery creates the appropriate catalog with valid metadata. 
+ • Verify number of actual streams discovered match expected + • Verify the stream names discovered were what we expect + • Verify stream names follow naming convention streams should only have lowercase alphas + and underscores + • verify there is only 1 top level breadcrumb + • verify replication key(s) + • verify primary key(s) + • verify that if there is a replication key we are doing INCREMENTAL replication otherwise FULL TABLE replication + • verify the actual replication matches our expected replication method + • verify that primary, replication and replication keys are given the inclusion of automatic. + • verify that all other fields have inclusion of available metadata. + """ + # Here certain streams are removed as we are not able to generate data for those streams + streams_to_test = self.expected_streams() + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Verify stream names follow naming convention + # streams should only have lowercase alphas and underscores + found_catalog_names = {c["tap_stream_id"] for c in found_catalogs} + self.assertTrue( + all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming", + ) + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Ensure that the catalog is found for a given stream + catalog = next( + iter( + [ + catalog + for catalog in found_catalogs + if catalog["stream_name"] == stream + ] + ) + ) + self.assertIsNotNone(catalog) + + # collecting expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_replication_keys = self.expected_replication_keys()[stream] + expected_automatic_fields = ( + expected_primary_keys | expected_replication_keys + ) + expected_replication_method = self.expected_replication_method()[stream] + + # collecting actual values... 
+ schema_and_metadata = menagerie.get_annotated_schema( + conn_id, catalog["stream_id"] + ) + metadata = schema_and_metadata["metadata"] + stream_properties = [ + item for item in metadata if item.get("breadcrumb") == [] + ] + if len(stream_properties) == 0: + stream_properties.append({}) + actual_primary_keys = set( + stream_properties[0] + .get("metadata", {self.PRIMARY_KEYS: []}) + .get(self.PRIMARY_KEYS, []) + ) + actual_replication_method = ( + stream_properties[0] + .get("metadata", {self.REPLICATION_METHOD: None}) + .get(self.REPLICATION_METHOD) + ) + actual_automatic_fields = set( + item.get("breadcrumb", ["properties", None])[1] + for item in metadata + if item.get("metadata").get("inclusion") == "automatic" + ) + + ########################################################################## + ### metadata assertions + ########################################################################## + + # verify there is only 1 top level breadcrumb in metadata + self.assertTrue( + len(stream_properties) == 1, + msg="There is NOT only one top level breadcrumb for {}".format( + stream + ) + + "\nstream_properties | {}".format(stream_properties), + ) + + # verify primary key(s) match expectations + self.assertSetEqual( + expected_primary_keys, + actual_primary_keys, + ) + + # verify replication key(s) match expectations + # self.assertSetEqual( + # expected_replication_keys, actual_replication_keys, + # ) + + # verify the actual replication method matches our expected replication method + self.assertEqual(expected_replication_method, actual_replication_method) + + # verify that if there is a replication key we are doing INCREMENTAL replication otherwise FULL TABLE replication + if expected_replication_keys is not None: + self.assertEqual(expected_replication_method, "INCREMENTAL") + else: + self.assertEqual(expected_replication_method, "FULL_TABLE") + + # verify that primary keys and replication keys + # are given the inclusion of automatic in metadata. + self.assertSetEqual(expected_automatic_fields, actual_automatic_fields) + + # verify that all other fields have inclusion of available + # This assumes there are no unsupported fields for SaaS sources + self.assertTrue( + all( + { + item.get("metadata").get("inclusion") == "available" + for item in metadata + if item.get("breadcrumb", []) != [] + and item.get("breadcrumb", ["properties", None])[1] + not in actual_automatic_fields + } + ), + msg="Not all non key properties are set to available in metadata", + ) diff --git a/tests/test_braintree_pagination.py b/tests/test_braintree_pagination.py new file mode 100644 index 0000000..43a4629 --- /dev/null +++ b/tests/test_braintree_pagination.py @@ -0,0 +1,73 @@ +from tap_tester import runner, connections +from base import BraintreeBaseTest + + +class PaginationTest(BraintreeBaseTest): + """Test to ensure tap can replicate multiple pages of data for streams that use pagination.""" + + @staticmethod + def name(): + """Name of test""" + + return "tap_tester_braintree_pagination_test" + + def test_run(self): + """ + • Verify records are more than page size i.e. 
multiple pages of data are present + • Verify there is no duplicate data on a page + • Verify there is no duplicate data between different pages + """ + page_size = 0 # page_size is set to zero here because the add_ons, discounts, and merchant_accounts streams have only one record for the given config + conn_id = connections.ensure_connection(self) + + # Checking pagination for streams with enough data + # Here certain streams are removed as we are not able to generate data for those streams + expected_streams = self.expected_streams() + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs = [ + catalog + for catalog in found_catalogs + if catalog.get("stream_name") in expected_streams + ] + + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + # expected values + expected_primary_keys = self.expected_primary_keys() + + # collect information for assertions from syncs 1 & 2 based on expected values + record_count_sync = record_count_by_stream.get(stream, 0) + primary_keys_list = [ + tuple( + message.get("data").get(expected_pk) + for expected_pk in expected_primary_keys[stream] + ) + for message in synced_records.get(stream).get("messages") + if message.get("action") == "upsert" + ] + + # verify records are more than page size so pagination is working + self.assertGreater(record_count_sync, page_size) + + primary_keys_list_1 = primary_keys_list[:page_size] + primary_keys_list_2 = primary_keys_list[page_size : 2 * page_size] + + primary_keys_page_1 = set(primary_keys_list_1) + primary_keys_page_2 = set(primary_keys_list_2) + + # Verify by primary keys that data is unique for the page + self.assertEqual( + len(primary_keys_page_1), page_size + ) # verify there are no dupes on a page + self.assertTrue( + primary_keys_page_1.isdisjoint(primary_keys_page_2) + ) # verify there are no dupes between pages diff --git a/tests/test_braintree_start_date.py b/tests/test_braintree_start_date.py new file mode 100644 index 0000000..5f14f05 --- /dev/null +++ b/tests/test_braintree_start_date.py @@ -0,0 +1,161 @@ +from tap_tester import connections, runner +from base import BraintreeBaseTest + + +class BraintreeStartDateTest(BraintreeBaseTest): + + start_date_1 = "2022-06-19T00:00:00Z" + start_date_2 = "2022-07-20T00:00:00Z" + + @staticmethod + def name(): + return "tap_tester_braintree_start_date_test" + + def test_run(self): + """Instantiate start date according to the desired data set and run the test""" + + self.start_date_1 = self.get_properties().get("start_date") + self.start_date_2 = self.timedelta_formatted(self.start_date_1, days=3) + + start_date_1_epoch = self.dt_to_ts(self.start_date_1) + start_date_2_epoch = self.dt_to_ts(self.start_date_2) + + self.start_date = self.start_date_1 + + expected_streams = self.expected_streams() + + # First Sync + + # instantiate connection + conn_id_1 = connections.ensure_connection(self) + + # run check mode + found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1) + + # table and field selection + test_catalogs_1_all_fields = [ + catalog + for catalog in found_catalogs_1 + if catalog.get("stream_name") in expected_streams + ] + self.perform_and_verify_table_and_field_selection( + conn_id_1, test_catalogs_1_all_fields, select_all_fields=True + ) + + # run initial sync +
record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1) + synced_records_1 = runner.get_records_from_target_output() + + # Update START DATE Between Syncs + + print( + "REPLICATION START DATE CHANGE: {} ===>>> {} ".format( + self.start_date, self.start_date_2 + ) + ) + self.start_date = self.start_date_2 + + # Second Sync + + # create a new connection with the new start_date + conn_id_2 = connections.ensure_connection(self, original_properties=False) + + # run check mode + found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2) + + # table and field selection + test_catalogs_2_all_fields = [ + catalog + for catalog in found_catalogs_2 + if catalog.get("stream_name") in expected_streams + ] + self.perform_and_verify_table_and_field_selection( + conn_id_2, test_catalogs_2_all_fields, select_all_fields=True + ) + + # run sync + record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2) + synced_records_2 = runner.get_records_from_target_output() + + # Verify the total number of records replicated in sync 1 is greater than the number + # of records replicated in sync 2 + self.assertGreater( + sum(record_count_by_stream_1.values()), + sum(record_count_by_stream_2.values()), + ) + + for stream in expected_streams: + + # WE ARE NOT ABLE TO GENERATE TEST DATA SO SKIPPING TWO STREAMS(mark_as_spam, dropped_email) + if stream in ["mark_as_spam", "dropped_email"]: + continue + + with self.subTest(stream=stream): + + # expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_bookmark_keys = self.expected_bookmark_keys()[stream] + + # collect information for assertions from syncs 1 & 2 base on expected values + record_count_sync_1 = record_count_by_stream_1.get(stream, 0) + record_count_sync_2 = record_count_by_stream_2.get(stream, 0) + primary_keys_list_1 = [ + tuple( + message.get("data").get(expected_pk) + for expected_pk in expected_primary_keys + ) + for message in synced_records_1.get(stream).get("messages") + if message.get("action") == "upsert" + ] + primary_keys_list_2 = [ + tuple( + message.get("data").get(expected_pk) + for expected_pk in expected_primary_keys + ) + for message in synced_records_2.get(stream).get("messages") + if message.get("action") == "upsert" + ] + + primary_keys_sync_1 = set(primary_keys_list_1) + primary_keys_sync_2 = set(primary_keys_list_2) + + if self.is_incremental(stream): + + # Expected bookmark key is one element in set so directly access it + bookmark_keys_list_1 = [ + message.get("data").get(next(iter(expected_bookmark_keys))) + for message in synced_records_1.get(stream).get("messages") + if message.get("action") == "upsert" + ] + bookmark_keys_list_2 = [ + message.get("data").get(next(iter(expected_bookmark_keys))) + for message in synced_records_2.get(stream).get("messages") + if message.get("action") == "upsert" + ] + + bookmark_key_sync_1 = set(bookmark_keys_list_1) + bookmark_key_sync_2 = set(bookmark_keys_list_2) + + # Verify bookmark key values are greater than or equal to start date of sync 1 + for bookmark_key_value in bookmark_key_sync_1: + self.assertGreaterEqual(bookmark_key_value, start_date_1_epoch) + + # Verify bookmark key values are greater than or equal to start date of sync 2 + for bookmark_key_value in bookmark_key_sync_2: + self.assertGreaterEqual(bookmark_key_value, start_date_2_epoch) + + # Verify the number of records replicated in sync 1 is greater than the number + # of records replicated in sync 2 for stream + self.assertGreater(record_count_sync_1, record_count_sync_2) + + # Verify 
the records replicated in sync 2 were also replicated in sync 1 + self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1)) + + else: + + # Verify that the 2nd sync with a later start date replicates the same number of + # records as the 1st sync. + self.assertEqual(record_count_sync_2, record_count_sync_1) + + # Verify by primary key the same records are replicated in the 1st and 2nd syncs + self.assertSetEqual(primary_keys_sync_1, primary_keys_sync_2)
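These suites read the Braintree sandbox credentials and config from environment variables in tests/base.py setUp. A minimal sketch of the setup needed before running them, assuming a Braintree sandbox account and access to the tap-tester orchestration services (connections, menagerie, runner); the placeholder values and the pytest invocation are illustrative only and not part of this patch:

    export TAP_BRAINTREE_MERCHANT_ID=<sandbox-merchant-id>    # placeholder
    export TAP_BRAINTREE_PUBLIC_KEY=<sandbox-public-key>      # placeholder
    export TAP_BRAINTREE_PRIVATE_KEY=<sandbox-private-key>    # placeholder
    export TAP_BRAINTREE_START_DATE=2022-06-25T00:00:00Z
    export TAP_BRAINTREE_ENVIRONMENT=sandbox
    # The test classes are unittest.TestCase subclasses, so any unittest-compatible
    # runner can collect them, but they only pass end-to-end when the tap-tester
    # orchestration environment is available.
    python -m pytest tests/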