Skip to content

Commit

Permalink
Add another batch key chunk try with a smaller chunk size
Browse files Browse the repository at this point in the history
  • Loading branch information
butkeraites-hotglue committed Oct 7, 2024
1 parent ab6a26b commit 08e07f4
Showing 1 changed file with 59 additions and 46 deletions.
105 changes: 59 additions & 46 deletions tap_salesforce/salesforce/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
BATCH_STATUS_POLLING_SLEEP = 20
PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP = 60
ITER_CHUNK_SIZE = 1024
DEFAULT_CHUNK_SIZE = 100000 # Max is 250000
DEFAULT_CHUNK_SIZE = 250000 # Max is 250000
FALLBACK_CHUNK_SIZE = 2000

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -113,49 +114,60 @@ def _can_pk_chunk_job(self, failure_message): # pylint: disable=no-self-use
"Retried more than 15 times" in failure_message or \
"Failed to write query result" in failure_message

def try_bulking_with_pk_chunking(self, catalog_entry, state, use_fall_back_chunk_size=False):
start_date = self.sf.get_start_date(state, catalog_entry)
batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date, use_fall_back_chunk_size)
job_id = batch_status['job_id']
self.sf.pk_chunking = True
# Write job ID and batch state for resumption
tap_stream_id = catalog_entry['tap_stream_id']
self.tap_stream_id = tap_stream_id
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

# Parallelize the batch result processing
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state)
for completed_batch_id in batch_status['completed']
]

# Process the results as they complete
for future in futures:
for result in future.result():
yield result

def _bulk_query(self, catalog_entry, state):
try:
start_date = self.sf.get_start_date(state, catalog_entry)
batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date)
job_id = batch_status['job_id']
self.sf.pk_chunking = True
# Write job ID and batch state for resumption
tap_stream_id = catalog_entry['tap_stream_id']
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

# Parallelize the batch result processing
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state)
for completed_batch_id in batch_status['completed']
]

# Process the results as they complete
for future in futures:
for result in future.result():
yield result
yield from self.try_bulking_with_pk_chunking(catalog_entry, state)
except Exception as e:
if job_id in self.closed_jobs:
LOGGER.info(f"Another batch failed before. Ignoring this new job...")
pass
LOGGER.info(f"PK Chunking failled on job {job_id}. Trying without it...")
self._close_job(job_id)

job_id = self._create_job(catalog_entry)
start_date = self.sf.get_start_date(state, catalog_entry)
self.sf.pk_chunking = False

batch_id = self._add_batch(catalog_entry, job_id, start_date)

self._close_job(job_id)

batch_status = self._poll_on_batch_status(job_id, batch_id)
if batch_status['state'] == 'Failed':
raise TapSalesforceException(batch_status['stateMessage'])
else:
for result in self.get_batch_results(job_id, batch_id, catalog_entry):
yield result
try:
yield from self.try_bulking_with_pk_chunking(catalog_entry, state, True)
except Exception as e:
if job_id in self.closed_jobs:
LOGGER.info(f"Another batch failed before. Ignoring this new job...")
pass
LOGGER.info(f"PK Chunking failled on job {job_id}. Trying without it...")
self._close_job(job_id)

if hasattr(self,"tap_stream_id"):
with open("streams_pk_chunking_failing.txt", "a") as file:
file.write(self.tap_stream_id + "\n") # Append data with a newline character

job_id = self._create_job(catalog_entry)
start_date = self.sf.get_start_date(state, catalog_entry)
self.sf.pk_chunking = False

batch_id = self._add_batch(catalog_entry, job_id, start_date)

self._close_job(job_id)

batch_status = self._poll_on_batch_status(job_id, batch_id)
if batch_status['state'] == 'Failed':
raise TapSalesforceException(batch_status['stateMessage'])
else:
for result in self.get_batch_results(job_id, batch_id, catalog_entry):
yield result

def process_batch(self, job_id, batch_id, catalog_entry, state):
"""Process a single batch and yield results."""
Expand All @@ -168,11 +180,11 @@ def process_batch(self, job_id, batch_id, catalog_entry, state):
LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"]))
singer.write_state(state)

def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
def _bulk_query_with_pk_chunking(self, catalog_entry, start_date, use_fall_back_chunk_size=False):
LOGGER.info("Trying Bulk Query with PK Chunking")

# Create a new job
job_id = self._create_job(catalog_entry, True)
job_id = self._create_job(catalog_entry, True, use_fall_back_chunk_size)

self._add_batch(catalog_entry, job_id, start_date, False)

Expand All @@ -187,7 +199,7 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):

return batch_status

def _create_job(self, catalog_entry, pk_chunking=False):
def _create_job(self, catalog_entry, pk_chunking=False, use_fall_back_chunk_size=False):
url = self.bulk_url.format(self.sf.instance_url, "job")
body = {"operation": "queryAll", "object": catalog_entry['stream'], "contentType": "CSV"}

Expand All @@ -196,8 +208,9 @@ def _create_job(self, catalog_entry, pk_chunking=False):

if pk_chunking:
LOGGER.info("ADDING PK CHUNKING HEADER")

headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(DEFAULT_CHUNK_SIZE)
chunk_size = DEFAULT_CHUNK_SIZE if not use_fall_back_chunk_size else FALLBACK_CHUNK_SIZE
headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(chunk_size)
LOGGER.info(f"[use_fall_back_chunk_size:{use_fall_back_chunk_size}] HEADERS: {headers}")

# If the stream ends with 'CleanInfo' or 'History', we can PK Chunk on the object's parent
if any(catalog_entry['stream'].endswith(suffix) for suffix in ["CleanInfo", "History"]):
Expand Down

0 comments on commit 08e07f4

Please sign in to comment.