Skip to content

Commit

Permalink
bookmark after each page of conversation records
Browse files Browse the repository at this point in the history
  • Loading branch information
dsprayberry committed Jan 8, 2024
1 parent 9edf65f commit cb8dc00
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions tap_intercom/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ def sync(self,
counter.increment()
max_datetime = max(record_datetime, max_datetime)

# Sync child stream, if the child is selected and if we have records greater than the child stream bookmark
if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts):
state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata, record[self.replication_key], state)

if self.to_write_intermediate_bookmark and record_counter == MAX_PAGE_SIZE:
# Write bookmark and state after every page of records
state = singer.write_bookmark(state,
Expand All @@ -238,10 +242,6 @@ def sync(self,
# Reset counter
record_counter = 0

# Sync child stream, if the child is selected and if we have records greater than the child stream bookmark
if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts):
state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata, record[self.replication_key], state)

bookmark_date = singer.utils.strftime(max_datetime)
LOGGER.info("FINISHED Syncing: {}, total_records: {}.".format(self.tap_stream_id, counter.value))

Expand Down Expand Up @@ -507,6 +507,7 @@ class Conversations(IncrementalStream):
data_key = 'conversations'
per_page = MAX_PAGE_SIZE
child = 'conversation_parts'
to_write_intermediate_bookmark = True

def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]:
paging = True
Expand Down

0 comments on commit cb8dc00

Please sign in to comment.