TDL-20724 : Use Incremental Export endpoint for Users stream (#127)
* Added code for Incremental export API for Users stream

* removed unused import

* Fixed the expected keys for all fields test.
shantanu73 authored May 25, 2023
1 parent 78872f1 commit 63dd6af
Showing 4 changed files with 17 additions and 134 deletions.
5 changes: 4 additions & 1 deletion tap_zendesk/http.py
@@ -223,7 +223,10 @@ def get_incremental_export(url, access_token, request_timeout, start_time):
         'Authorization': 'Bearer {}'.format(access_token),
     }

-    params = {'start_time': start_time.timestamp()}
+    params = {'start_time': start_time}
+
+    if not isinstance(start_time, int):
+        params = {'start_time': start_time.timestamp()}

     response = call_api(url, request_timeout, params=params, headers=headers)
     response_json = response.json()
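The hunk above makes get_incremental_export tolerant of both representations of start_time: the new cursor-based Users stream passes an epoch integer, while other callers still pass a datetime, which only then needs converting. A minimal standalone sketch of that normalization; the helper name build_export_params is illustrative, not part of the tap:

import datetime

def build_export_params(start_time):
    # Cursor-based callers already pass an integer epoch; only a
    # datetime needs converting to a Unix timestamp.
    params = {'start_time': start_time}
    if not isinstance(start_time, int):
        params = {'start_time': start_time.timestamp()}
    return params

# Both call styles yield an epoch-valued 'start_time' parameter:
build_export_params(1684972800)
build_export_params(datetime.datetime(2023, 5, 25, tzinfo=datetime.timezone.utc))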
74 changes: 12 additions & 62 deletions tap_zendesk/streams.py
@@ -1,7 +1,6 @@
import os
import json
import datetime
-import time
import pytz
import zenpy
import singer
@@ -32,8 +31,6 @@
    'checkbox': 'boolean',
}

-DEFAULT_SEARCH_WINDOW_SIZE = (60 * 60 * 24) * 30 # defined in seconds, default to a month (30 days)
-
def get_abs_path(path):
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
@@ -218,10 +215,12 @@ def check_access(self):
        start_time = datetime.datetime.utcnow().strftime(START_DATE_FORMAT)
        self.client.organizations.incremental(start_time=start_time)

-class Users(Stream):
+class Users(CursorBasedExportStream):
    name = "users"
    replication_method = "INCREMENTAL"
    replication_key = "updated_at"
+    item_key = "users"
+    endpoint = "https://{}.zendesk.com/api/v2/incremental/users/cursor.json"

    def _add_custom_fields(self, schema):
        try:
@@ -235,65 +234,16 @@ def _add_custom_fields(self, schema):
        return schema

    def sync(self, state):
-        original_search_window_size = int(self.config.get('search_window_size', DEFAULT_SEARCH_WINDOW_SIZE))
-        search_window_size = original_search_window_size
        bookmark = self.get_bookmark(state)
-        start = bookmark - datetime.timedelta(seconds=1)
-        end = start + datetime.timedelta(seconds=search_window_size)
-        sync_end = singer.utils.now() - datetime.timedelta(minutes=1)
-        parsed_sync_end = singer.strftime(sync_end, "%Y-%m-%dT%H:%M:%SZ")
-
-        # ASSUMPTION: updated_at value always comes back in utc
-        num_retries = 0
-        while start < sync_end:
-            parsed_start = singer.strftime(start, "%Y-%m-%dT%H:%M:%SZ")
-            parsed_end = min(singer.strftime(end, "%Y-%m-%dT%H:%M:%SZ"), parsed_sync_end)
-            LOGGER.info("Querying for users with window of exclusive boundaries between %s and %s", parsed_start, parsed_end)
-            users = self.client.search("", updated_after=parsed_start, updated_before=parsed_end, type="user")
-
-            # NB: Zendesk will return an error on the 1001st record, so we
-            # need to check total response size before iterating
-            # See: https://develop.zendesk.com/hc/en-us/articles/360022563994--BREAKING-New-Search-API-Result-Limits
-            if users.count > 1000:
-                # To avoid infinite loop behavior we should reduce the window if it is greater than 2
-                if search_window_size > 2:
-                    search_window_size = search_window_size // 2
-                    end = start + datetime.timedelta(seconds=search_window_size)
-                    LOGGER.info("users - Detected Search API response size too large. Cutting search window in half to %s seconds.", search_window_size)
-                    continue
-
-                raise Exception("users - Unable to get all users within minimum window of a single second ({}), found {} users within this timestamp. Zendesk can only provide a maximum of 1000 users per request. See: https://develop.zendesk.com/hc/en-us/articles/360022563994--BREAKING-New-Search-API-Result-Limits".format(datetime.datetime.strftime(datetime.datetime.strptime(parsed_start, "%Y-%m-%dT%H:%M:%SZ") + datetime.timedelta(seconds=1), "%Y-%m-%dT%H:%M:%SZ"), users.count))
-
-            # Consume the records to account for dates lower than window start
-            users = [user for user in users] # pylint: disable=unnecessary-comprehension
-
-            if not all(parsed_start <= user.updated_at for user in users):
-                # Only retry up to 30 minutes (60 attempts at 30 seconds each)
-                if num_retries < 60:
-                    LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency. (Retry #%s)", num_retries + 1)
-                    time.sleep(30)
-                    num_retries += 1
-                    continue
-                bad_users = [user for user in users if user.updated_at < parsed_start]
-                raise AssertionError("users - Record (user-id: {}) found before date window start and did not resolve after 30 minutes of retrying. Details: window start ({}) is not less than or equal to updated_at value(s) {}".format(
-                    [user.id for user in bad_users],
-                    parsed_start,
-                    [str(user.updated_at) for user in bad_users]))
-
-            # If we make it here, all quality checks have passed. Reset retry count.
-            num_retries = 0
-            for user in users:
-                if parsed_start <= user.updated_at <= parsed_end:
-                    yield (self.stream, user)
-            self.update_bookmark(state, parsed_end)
-
-            # Assumes that the for loop got everything
-            singer.write_state(state)
-            if search_window_size <= original_search_window_size // 2:
-                search_window_size = search_window_size * 2
-                LOGGER.info("Successfully requested records. Doubling search window to %s seconds", search_window_size)
-            start = end - datetime.timedelta(seconds=1)
-            end = start + datetime.timedelta(seconds=search_window_size)
+        epoch_bookmark = int(bookmark.timestamp())
+        users = self.get_objects(epoch_bookmark)
+
+        for user in users:
+            self.update_bookmark(state, user["updated_at"])
+            yield (self.stream, user)
+
+        singer.write_state(state)
+

    def check_access(self):
        '''
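With Users now subclassing CursorBasedExportStream, paging is delegated to the base class's get_objects, whose implementation this diff does not show. Assuming it follows Zendesk's documented cursor-based incremental export flow (start_time on the first request, then the returned after_url until end_of_stream), a hedged sketch of that loop; all names here are illustrative, not the tap's actual code:

import requests

def get_objects(subdomain, access_token, epoch_bookmark, item_key='users', timeout=300):
    # First page is addressed by start_time; later pages by after_url.
    url = 'https://{}.zendesk.com/api/v2/incremental/users/cursor.json'.format(subdomain)
    headers = {'Authorization': 'Bearer {}'.format(access_token)}
    params = {'start_time': epoch_bookmark}

    while True:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        payload = response.json()

        for record in payload.get(item_key, []):
            yield record

        # Zendesk marks the final page with end_of_stream; until then,
        # after_url points at the next page of the export.
        if payload.get('end_of_stream'):
            break
        url = payload['after_url']
        params = {}

With paging handled in the base class, the new sync body above reduces to advancing the updated_at bookmark as records stream through and writing state once the export is drained.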
2 changes: 1 addition & 1 deletion test/test_all_fields.py
@@ -75,7 +75,7 @@ def test_run(self):
        if stream == "ticket_fields":
            expected_all_keys = expected_all_keys - {'system_field_options', 'sub_type_id'}
        elif stream == "users":
-            expected_all_keys = expected_all_keys - {'permanently_deleted'}
+            expected_all_keys = expected_all_keys - {'chat_only'}
        elif stream == "ticket_metrics":
            expected_all_keys = expected_all_keys - {'status', 'instance_id', 'metric', 'type', 'time'}

70 changes: 0 additions & 70 deletions test/unittests/test_user_infinite_loop.py

This file was deleted.
