From 69f8977ed8c2e1eb8920f942d7b8aa546cd0130e Mon Sep 17 00:00:00 2001 From: Daniel Palma Date: Thu, 9 Jun 2022 09:23:41 +0200 Subject: [PATCH] Update paging to cursor based & add Customers stream (#4) * Add Customers Stream * Switch to cursor-based pagination * Add basic tests to verify paging Co-authored-by: Niall Woodward --- .gitignore | 3 + tap_gorgias/client.py | 50 +++++++- tap_gorgias/streams.py | 208 ++++++++++++------------------- tap_gorgias/tap.py | 6 +- tap_gorgias/tests/__init__.py | 0 tap_gorgias/tests/integration.py | 78 ++++++++++++ 6 files changed, 216 insertions(+), 129 deletions(-) create mode 100644 tap_gorgias/tests/__init__.py create mode 100644 tap_gorgias/tests/integration.py diff --git a/.gitignore b/.gitignore index 475019c..4b907a2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# IntelliJ local settings +.idea + # Secrets and internal config files **/.secrets/* diff --git a/tap_gorgias/client.py b/tap_gorgias/client.py index b728b33..d65efc9 100644 --- a/tap_gorgias/client.py +++ b/tap_gorgias/client.py @@ -1,9 +1,10 @@ """REST client handling, including GorgiasStream base class.""" import time -from typing import Dict +from typing import Dict, Optional, Any import requests +from singer_sdk.helpers.jsonpath import extract_jsonpath from singer_sdk.streams import RESTStream from singer_sdk.authenticators import BasicAuthenticator @@ -13,6 +14,16 @@ class GorgiasStream(RESTStream): """Gorgias stream class.""" + # Most of the endpoints of the API returning a large number of resources are paginated. + # Cursor-based pagination provides lower latency when listing resources. + # Views use a custom path for the cursor value. + # https://developers.gorgias.com/reference/pagination + next_page_token_jsonpath = "$.meta.next_cursor" + + # Generic jsonpath, a list of resources. E.g: a list of tickets. + # https://developers.gorgias.com/reference/pagination#response-attributes + records_jsonpath = "$.data[*]" + http_headers = {"Accept": "application/json", "Content-Type": "application/json"} _LOG_REQUEST_METRIC_URLS = True @@ -83,3 +94,40 @@ def validate_response(self, response: requests.Response) -> None: f"{response.reason} for path: {self.path}" ) raise RetriableAPIError(msg) + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + """Return a dictionary of values to be used in URL parameterization. + + If paging is supported, developers may override with specific paging logic. + + Args: + context: Stream partition or context dictionary. + next_page_token: Token, page number or any request argument to request the + next page of data. + + Returns: + Dictionary of URL query parameters to use in the request. + """ + return {"cursor": next_page_token, "limit": self.config["page_size"]} + + def get_next_page_token( + self, response: requests.Response, previous_token: Optional[Any] + ) -> Any: + """Return token identifying next page or None if all records have been read. + + Args: + response: A raw `requests.Response`_ object. + previous_token: Previous pagination reference. + + Returns: + Reference value to retrieve next page. + + .. _requests.Response: + https://docs.python-requests.org/en/latest/api/#requests.Response + """ + all_matches = extract_jsonpath(self.next_page_token_jsonpath, response.json()) + first_match = next(iter(all_matches), None) + next_page_token = first_match + return next_page_token diff --git a/tap_gorgias/streams.py b/tap_gorgias/streams.py index 17737a1..3d1d2e9 100644 --- a/tap_gorgias/streams.py +++ b/tap_gorgias/streams.py @@ -1,5 +1,5 @@ """Stream type classes for tap-gorgias.""" - +from urllib import parse from datetime import datetime import logging import requests @@ -8,18 +8,37 @@ from tap_gorgias.client import GorgiasStream +logger = logging.getLogger(__name__) + class TicketsStream(GorgiasStream): """Define custom stream.""" name = "tickets" path = "/api/views/{view_id}/items" - records_jsonpath = "$.data[*]" - next_page_token_jsonpath = "$.meta.next_items" primary_keys = ["id"] replication_key = "updated_datetime" is_sorted = True + # Link to the next items, if any. + next_page_token_jsonpath = "$.meta.next_items" + + customer_schema = [ + th.ObjectType( + th.Property("id", th.IntegerType), + th.Property("email", th.StringType), + th.Property("name", th.StringType), + th.Property("first_name", th.StringType), + th.Property("last_name", th.StringType), + th.Property( + "meta", + th.ObjectType( + th.Property("name_set_via", th.StringType), + ), + ), + ) + ] + schema = th.PropertiesList( th.Property( "id", @@ -59,78 +78,15 @@ class TicketsStream(GorgiasStream): ), th.Property( "requester", - th.ObjectType( - th.Property( - "id", - th.IntegerType - ), - th.Property( - "email", - th.StringType - ), - th.Property( - "name", - th.StringType - ), - th.Property( - "firstname", - th.StringType - ), - th.Property( - "lastname", - th.StringType - ), - ) + *customer_schema ), th.Property( "customer", - th.ObjectType( - th.Property( - "id", - th.IntegerType - ), - th.Property( - "email", - th.StringType - ), - th.Property( - "name", - th.StringType - ), - th.Property( - "firstname", - th.StringType - ), - th.Property( - "lastname", - th.StringType - ), - ) + *customer_schema ), th.Property( "assignee_user", - th.ObjectType( - th.Property( - "id", - th.IntegerType - ), - th.Property( - "email", - th.StringType - ), - th.Property( - "name", - th.StringType - ), - th.Property( - "firstname", - th.StringType - ), - th.Property( - "lastname", - th.StringType - ), - ) + *customer_schema ), th.Property( "assignee_team", @@ -264,7 +220,7 @@ class TicketsStream(GorgiasStream): ).to_dict() def prepare_request( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[dict], next_page_token: Optional[Any] ) -> requests.PreparedRequest: """Prepare a request object. @@ -282,20 +238,8 @@ def prepare_request( HTTP headers and authenticator. """ http_method = self.rest_method - url: str = "" - params: dict = {} - # The next page token is actually a url path returned - # by the API, so append it to the url base - if not next_page_token: - url = ( - self.get_url(context) + f"?limit={self.config['ticket_view_page_size']}" - ) - else: - url = ( - self.url_base - + next_page_token - + f"&limit={self.config['ticket_view_page_size']}" - ) + params = self.get_url_params(context, next_page_token) + request_data = self.prepare_request_payload(context, next_page_token) headers = self.get_headers() @@ -305,7 +249,7 @@ def prepare_request( self.requests_session.prepare_request( requests.Request( method=http_method, - url=url, + url=self.get_url(context), params=params, headers=headers, json=request_data, @@ -348,7 +292,7 @@ def create_ticket_view(self, sync_start_datetime: datetime) -> int: "filters": f"gte(ticket.updated_datetime, '{sync_start_datetime.isoformat()}')" } ) - logging.info(f"Creating ticket view with parameters {payload}") + logger.info(f"Creating ticket view with parameters {payload}") decorated_request = self.request_decorator(self._request) prepared_request = cast( requests.PreparedRequest, @@ -362,7 +306,7 @@ def create_ticket_view(self, sync_start_datetime: datetime) -> int: ), ) resp = decorated_request(prepared_request, None) - logging.info("View successfully created.") + logger.info("View successfully created.") view_id = resp.json()["id"] return view_id @@ -380,7 +324,7 @@ def delete_ticket_view(self, view_id: int) -> None: ), ) resp = decorated_request(prepared_request, None) - logging.info(f"Deleted ticket view {view_id}") + logger.info(f"Deleted ticket view {view_id}") def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: """Return a generator of row-type dictionary objects. @@ -394,7 +338,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: One item per (possibly processed) record in the API. """ sync_start_datetime = self.get_starting_timestamp(context) - logging.info(f"Starting timestamp: {sync_start_datetime}") + logger.info(f"Starting timestamp: {sync_start_datetime}") view_id = self.create_ticket_view(sync_start_datetime) context = context or {} context["view_id"] = view_id @@ -413,47 +357,23 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: """Return the ticket_id for use by child streams.""" return {"ticket_id": record["id"]} - -class PaginatedGorgiasStream(GorgiasStream): def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: - """Return a dictionary of values to be used in URL parameterization. - - If paging is supported, developers may override with specific paging logic. + """Return the URL parameters for the request. - Args: - context: Stream partition or context dictionary. - next_page_token: Token, page number or any request argument to request the - next page of data. - - Returns: - Dictionary of URL query parameters to use in the request. - """ - return {"page": next_page_token} + For the Tickets stream, the next cursor is returned in a querystring parameter under the path $.meta.next_items + so here we parse the whole url query string in order to extract the cursor. - def get_next_page_token( - self, response: requests.Response, previous_token: Optional[Any] - ) -> Any: - """Return token identifying next page or None if all records have been read. - - Args: - response: A raw `requests.Response`_ object. - previous_token: Previous pagination reference. - - Returns: - Reference value to retrieve next page. - - .. _requests.Response: - https://docs.python-requests.org/en/latest/api/#requests.Response """ - num_pages = response.json()["meta"]["nb_pages"] - page = response.json()["meta"]["page"] - if num_pages > page: - return page + 1 + next_page_url_query = parse.parse_qs(next_page_token) + if not next_page_url_query: + return {"limit": self.config["page_size"]} + else: + return {"limit": self.config["page_size"], "cursor": next_page_url_query["cursor"][0], "direction": "next"} -class MessagesStream(PaginatedGorgiasStream): +class MessagesStream(GorgiasStream): """Messages stream. Uses tickets as a parent stream. Consequently, only retrieves @@ -465,7 +385,6 @@ class MessagesStream(PaginatedGorgiasStream): name = "messages" parent_stream_type = TicketsStream path = "/api/tickets/{ticket_id}/messages" - records_jsonpath = "$.data[*]" primary_keys = ["id"] state_partitioning_keys = [] @@ -654,7 +573,7 @@ class MessagesStream(PaginatedGorgiasStream): ).to_dict() -class SatisfactionSurveysStream(PaginatedGorgiasStream): +class SatisfactionSurveysStream(GorgiasStream): """Satisfaction surveys. The satisfaction survey API endpoint does not allow any filtering or @@ -669,7 +588,7 @@ class SatisfactionSurveysStream(PaginatedGorgiasStream): name = "satisfaction_surveys" path = "/api/satisfaction-surveys" - records_jsonpath = "$.data[*]" + primary_keys = ["id"] schema = th.PropertiesList( th.Property( @@ -713,3 +632,40 @@ class SatisfactionSurveysStream(PaginatedGorgiasStream): th.StringType ) ).to_dict() + + +class CustomersStream(GorgiasStream): + """Customers. + + The customers API endpoint does not allow any filtering or + custom ordering of the results, only on created datetime. + This has to be run as a full refresh for each extraction, due to the + inability to filter and lack of ordering by updated_datetime.. + https://developers.gorgias.com/reference/get_api-customers + """ + + name = "customers" + path = "/api/customers" + primary_keys = ["id"] + + schema = th.PropertiesList( + th.Property("id", th.IntegerType), + th.Property("created_datetime", th.DateTimeType), + th.Property("email", th.StringType), + th.Property("external_id", th.StringType), + th.Property("firstname", th.StringType), + th.Property("language", th.StringType), + th.Property("lastname", th.StringType), + th.Property("name", th.StringType), + th.Property("timezone", th.StringType), + th.Property("updated_datetime", th.DateTimeType), + th.Property("note", th.StringType), + th.Property("active", th.BooleanType), + th.Property( + "meta", + th.ObjectType( + th.Property("name_set_via", th.StringType), + ), + ), + th.Property("error", th.StringType), + ).to_dict() diff --git a/tap_gorgias/tap.py b/tap_gorgias/tap.py index 08da708..5adf91c 100644 --- a/tap_gorgias/tap.py +++ b/tap_gorgias/tap.py @@ -9,12 +9,14 @@ TicketsStream, MessagesStream, SatisfactionSurveysStream, + CustomersStream ) STREAM_TYPES = [ TicketsStream, MessagesStream, SatisfactionSurveysStream, + CustomersStream ] @@ -48,10 +50,10 @@ class TapGorgias(Tap): description="The earliest record date to sync", ), th.Property( - "ticket_view_page_size", + "page_size", th.IntegerType, default=100, - description="The page size for each list view items call", + description="The page size for each list endpoint call", ), ).to_dict() diff --git a/tap_gorgias/tests/__init__.py b/tap_gorgias/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tap_gorgias/tests/integration.py b/tap_gorgias/tests/integration.py new file mode 100644 index 0000000..6159e5a --- /dev/null +++ b/tap_gorgias/tests/integration.py @@ -0,0 +1,78 @@ +"""Tests standard tap features using the built-in SDK tests library.""" + +import datetime +import io +import os +from contextlib import redirect_stderr, redirect_stdout + +import singer +from singer_sdk.testing import get_standard_tap_tests + +from tap_gorgias.tap import TapGorgias + +SAMPLE_CONFIG = { + "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"), + "subdomain": os.getenv("SUBDOMAIN", default=None), + "email_address": os.getenv("EMAIL_ADDRESS", default=None), + "api_key": os.getenv("API_KEY", default=None), +} + +# Edit these values to match your environment: +EXPECTED_RECORD_COUNT = 363 +PAGE_SIZE = 50 + + +# Run standard built-in tap tests from the SDK: +def test_standard_tap_tests(): + """Run standard tap tests from the SDK.""" + + tests = get_standard_tap_tests( + TapGorgias, + config=SAMPLE_CONFIG + ) + for test in tests: + test() + + +def get_all_records(): + tap = TapGorgias(config={**SAMPLE_CONFIG, "page_size": PAGE_SIZE}, parse_env_config=True) + + page_count = 0 + + def counter(fn): + def inner(*args, **kwargs): + nonlocal page_count + page_count += 1 + return fn(*args, **kwargs) + + return inner + + stdout_buf = io.StringIO() + stderr_buf = io.StringIO() + with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): + streams = tap.load_streams() + for stream in streams: + if stream.tap_stream_id == "satisfaction_surveys": + stream.prepare_request = counter(stream.prepare_request) + stream.sync() + stdout_buf.seek(0) + stderr_buf.seek(0) + + record_count = 0 + for message in stdout_buf: + o = singer.parse_message(message).asdict() + if o['type'] == 'RECORD': + record_count += 1 + return record_count, page_count + + +def test_if_getting_all_records(): + """Test if we get All Record from the Satisfaction Surveys endpoint.""" + all_record_count, page_count = get_all_records() + assert all_record_count == EXPECTED_RECORD_COUNT + + +def test_paging(): + """Test if we get every page from the Satisfaction Surveys endpoint.""" + all_record_count, page_count = get_all_records() + assert all_record_count // PAGE_SIZE + 1 == page_count