Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a filter safeguard in case a batch fails because of a malformed query #34

Open
wants to merge 1 commit into
base: feature/hgi-4408
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions tap_salesforce/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def __init__(self,
self.version = api_version or "41.0"
self.data_url = "{}/services/data/v" + self.version + "/{}"
self.pk_chunking = False
self.filters_tried = []

# validate start_date
singer_utils.strptime(default_start_date)
Expand Down Expand Up @@ -409,25 +410,41 @@ def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by
query = "SELECT {} FROM {}".format(",".join(selected_properties), catalog_entry['stream'])

catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_key = catalog_metadata.get((), {}).get('replication-key')

if replication_key:
valid_filtering_replication_key = self.get_valid_filtering_replication_key(catalog_metadata)
if valid_filtering_replication_key:
where_clause = " WHERE {} > {} ".format(
replication_key,
valid_filtering_replication_key,
start_date)
if end_date:
end_date_clause = " AND {} < {}".format(replication_key, end_date)
end_date_clause = " AND {} < {}".format(valid_filtering_replication_key, end_date)
else:
end_date_clause = ""

order_by = " ORDER BY {} ASC".format(replication_key)
if order_by_clause:
order_by = " ORDER BY {} ASC".format(valid_filtering_replication_key)
return query + where_clause + end_date_clause + order_by

return query + where_clause + end_date_clause
else:
return query

def filtering_can_be_done_with(self, replication_key):
    """Report whether ``replication_key`` is still untried, and claim it.

    The first call for a given key records it in ``self.filters_tried``
    and returns True; every later call for the same key returns False, so
    a key is offered as a query filter at most once per instance.

    Args:
        replication_key: Field name being considered as the filter column.

    Returns:
        True if the key was not tried before (it is now recorded);
        False if it was already tried, or if it is empty/None.
    """
    # An empty or missing key can never yield a usable filter. Recording it
    # would also inflate ``filters_tried``, whose length the bulk retry
    # logic compares against its retry limit.
    if not replication_key:
        return False
    if replication_key in self.filters_tried:
        return False
    self.filters_tried.append(replication_key)
    return True

def get_valid_filtering_replication_key(self, catalog_metadata):
    """Return the next replication key not yet tried as a query filter.

    The stream's configured ``'replication-key'`` is preferred. If it has
    already been tried (or is missing/empty), the entries of
    ``'valid-replication-keys'`` are offered in order. Each candidate is
    claimed through ``self.filtering_can_be_done_with``, so repeated calls
    walk through the candidates one at a time.

    Args:
        catalog_metadata: Metadata map; the stream-level entry is read
            from the empty-tuple key ``()``.

    Returns:
        The chosen field name, or None when no untried candidate remains.
    """
    base_catalog_metadata = catalog_metadata.get((), {})

    # Skip a null/empty configured key instead of registering it as "tried".
    configured_key = base_catalog_metadata.get('replication-key')
    if configured_key and self.filtering_can_be_done_with(configured_key):
        return configured_key

    # ``or ()`` tolerates the entry being present but set to None.
    for candidate in base_catalog_metadata.get('valid-replication-keys') or ():
        if candidate and self.filtering_can_be_done_with(candidate):
            return candidate
    return None

def query(self, catalog_entry, state, query_override=None):
if state["bookmarks"].get("ListView"):
if state["bookmarks"]["ListView"].get("SystemModstamp"):
Expand Down
8 changes: 7 additions & 1 deletion tap_salesforce/salesforce/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# NOTE(review): presumably a polling interval in seconds — confirm against the polling loop.
PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP = 60
ITER_CHUNK_SIZE = 1024
DEFAULT_CHUNK_SIZE = 100000 # Max is 250000
# Upper bound on the number of replication-key filters that may be tried
# before a MALFORMED_QUERY batch failure is raised instead of retried.
MAX_RETRIES = 3

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -142,8 +143,13 @@ def _bulk_query(self, catalog_entry, state):
LOGGER.info("Finished syncing batch %s. Removing batch from state.", completed_batch_id)
LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"]))
singer.write_state(state)
elif "MALFORMED_QUERY" in batch_status['stateMessage'] and len(self.sf.filters_tried) < MAX_RETRIES:
LOGGER.warning(f"{batch_status['stateMessage']}")
LOGGER.warning(f"Trying again ...")
for record in self._bulk_query(catalog_entry, state):
yield record
else:
raise TapSalesforceException(batch_status['stateMessage'])
raise TapSalesforceException(batch_status['stateMessage'] + f" [Filters tried] {self.sf.filters_tried}")
else:
for result in self.get_batch_results(job_id, batch_id, catalog_entry):
yield result
Expand Down