From 69a131cd796d2af4425e6d5b8d7f55d211939cc7 Mon Sep 17 00:00:00 2001
From: kellymondor
Date: Tue, 23 Aug 2022 16:30:15 -0600
Subject: [PATCH] Fix bug where LinkedIn api returns less than expected but
 still has more leads to go through

---
 catalog.json                          | 471 ++++++++++++++++++++++++++++++++++
 tap_linkedin/__init__.py              |  10 +-
 tap_linkedin/client.py                |  12 +-
 tap_linkedin/filter_criteria.py       |  34 +-
 tap_linkedin/streams/people_stream.py |  17 +-
 tap_linkedin/sync.py                  |  44 +--
 tap_linkedin/utils.py                 |  10 +
 7 files changed, 537 insertions(+), 61 deletions(-)
 create mode 100644 tap_linkedin/utils.py

diff --git a/catalog.json b/catalog.json
index f993d44..1bc3952 100644
--- a/catalog.json
+++ b/catalog.json
@@ -586,4 +586,475 @@
       ]
     }
   ]
+}{
+  "streams": [
+    {
+      "tap_stream_id": "companies",
+      "replication_key": "id",
+      "key_properties": [
+        "id"
+      ],
+      "schema": {
+        "properties": {
+          "id": {
+            "type": [
+              "null",
+              "integer"
+            ]
+          },
+          "name": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "website": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "location": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "headquarters": {
+            "properties": {
+              "country": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "geographicArea": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "city": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "postalCode": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "line1": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "line2": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              }
+            },
+            "type": [
+              "null",
+              "object"
+            ]
+          },
+          "description": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "industry": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "employeeCount": {
+            "type": [
+              "null",
+              "integer"
+            ]
+          },
+          "employeeCountRange": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "revenue": {
+            "properties": {
+              "amount": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "currencyCode": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              }
+            },
+            "type": [
+              "null",
+              "object"
+            ]
+          },
+          "formattedRevenue": {
+            "properties": {
+              "amount": {
+                "type": [
+                  "null",
+                  "integer"
+                ]
+              },
+              "unit": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              },
+              "currencyCode": {
+                "type": [
+                  "null",
+                  "string"
+                ]
+              }
+            },
+            "type": [
+              "null",
+              "object"
+            ]
+          },
+          "flagshipCompanyUrl": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "employeesSearchPageUrl": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "type": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "yearFounded": {
+            "type": [
+              "null",
+              "integer"
+            ]
+          }
+        },
+        "type": [
+          "null",
+          "object"
+        ],
+        "additionalProperties": true
+      },
+      "stream": "companies",
+      "metadata": [
+        {
+          "metadata": {
+            "selected": null
+          },
+          "breadcrumb": []
+        }
+      ]
+    },
+    {
+      "tap_stream_id": "people",
+      "replication_key": "start",
+      "key_properties": [
+        "id"
+      ],
+      "schema": {
+        "properties": {
+          "id": {
+            "type": [
+              "null",
+              "integer"
+            ]
+          },
+          "start": {
+            "type": [
+              "null",
+              "integer"
+            ]
+          },
+          "firstName": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "lastName": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "fullName": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "geoRegion": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "searchRegion": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "entityUrn": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "summary": {
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          "currentPositions": {
+            "items": {
+              "properties": {
+                "companyName": {
+                  "type": [
+                    "null",
+                    "string"
+                  ]
+                },
+                "title": {
+                  "type": [
+                    "null",
+                    "string"
+                  ]
+                },
+                "companyUrn": {
+                  "type": [
+                    "null",
"string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "tenureAtPosition": { + "properties": { + "numYears": { + "type": [ + "null", + "integer" + ] + }, + "numMonths": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "tenureAtCompany": { + "properties": { + "numYears": { + "type": [ + "null", + "integer" + ] + }, + "numMonths": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "startedOn": { + "properties": { + "month": { + "type": [ + "null", + "integer" + ] + }, + "year": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "companyUrnResolutionResult": { + "properties": { + "entityUrn": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "industry": { + "type": [ + "null", + "string" + ] + }, + "location": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + }, + "pastPositions": { + "items": { + "properties": { + "endedOn": { + "properties": { + "month": { + "type": [ + "null", + "integer" + ] + }, + "year": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "startedOn": { + "properties": { + "month": { + "type": [ + "null", + "integer" + ] + }, + "year": { + "type": [ + "null", + "integer" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "companyName": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "companyUrn": { + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "type": [ + "null", + "array" + ] + } + }, + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "stream": "people", + "metadata": [ + { + "metadata": { + "selected": null + }, + "breadcrumb": [] + } + ] + } + ] } \ No newline at end of file diff --git a/tap_linkedin/__init__.py b/tap_linkedin/__init__.py index 5dde87e..fe1ae6d 100644 --- a/tap_linkedin/__init__.py +++ b/tap_linkedin/__init__.py @@ -1,7 +1,5 @@ -import os -import json import singer -from singer import utils + from tap_linkedin.client import LinkedInClient from tap_linkedin.discover import discover from tap_linkedin.sync import sync @@ -16,10 +14,10 @@ LOGGER = singer.get_logger() -@utils.handle_top_exception(LOGGER) +@singer.utils.handle_top_exception(LOGGER) def main(): # Parse command line arguments - args = utils.parse_args(REQUIRED_CONFIG_KEYS) + args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) Context.state = args.state @@ -35,7 +33,7 @@ def main(): else: catalog = discover(client, args.config) - sync(client, args.config) + sync(client) if __name__ == "__main__": main() \ No newline at end of file diff --git a/tap_linkedin/client.py b/tap_linkedin/client.py index d264657..e7ff97a 100644 --- a/tap_linkedin/client.py +++ b/tap_linkedin/client.py @@ -3,7 +3,6 @@ from tap_linkedin.exceptions import raise_for_error, LinkedInError, ReadTimeoutError, Server5xxError, LinkedInTooManyRequestsError import backoff -import json import singer LOGGER = singer.get_logger() @@ -32,11 +31,10 @@ def __enter__(self): return self - def __exit__(self, exception_type, exception_value, traceback): + def __exit__(self): self.__session.close() def __headers(self): - headers = {} headers["accept"] = "*/*" headers["accept-language"] = "en-US,en;q=0.9" @@ -57,15 +55,14 @@ def __headers(self): return 
-    def get_company_url(self, linkedinId):
-
-        url = f"{self.BASE_URL}/{self.COMPANY_URL_PREFIX}/{linkedinId}?{self.COMPANY_URL_SUFFIX}"
+    def get_company_url(self, linkedin_id):
+        url = f"{self.BASE_URL}/{self.COMPANY_URL_PREFIX}/{linkedin_id}?{self.COMPANY_URL_SUFFIX}"
 
         return url
 
     def get_people_search_url(self, company_size, region, years_of_experience, tenure):
 
-        filters = f"filters:List((type:COMPANY_HEADCOUNT,values:List((id:{company_size}))),(type:COMPANY_HEADQUARTERS,values:List((id:{region}))),(type:YEARS_AT_CURRENT_COMPANY,values:List((id:{tenure}))),(type:YEARS_OF_EXPERIENCE,values:List((id:{years_of_experience})))),keywords:{self.keyword})"
+        filters = f"filters:List((type:COMPANY_HEADCOUNT,values:List((id:{company_size}))),(type:REGION,values:List((id:{region}))),(type:YEARS_AT_CURRENT_COMPANY,values:List((id:{tenure}))),(type:YEARS_OF_EXPERIENCE,values:List((id:{years_of_experience})))),keywords:{self.keyword})"
 
         url = f"{self.BASE_URL}/{self.PEOPLE_URL_PREFIX},{filters}&{self.PEOPLE_URL_SUFFIX}"
 
         return url
@@ -124,6 +121,7 @@ def perform_request(self,
             if response.status_code != 200:
                 LOGGER.error(f'Error status_code = {response.status_code}')
                 raise_for_error(response)
+
             return response
 
         except requests.exceptions.Timeout as err:
diff --git a/tap_linkedin/filter_criteria.py b/tap_linkedin/filter_criteria.py
index 57f407d..57c5539 100644
--- a/tap_linkedin/filter_criteria.py
+++ b/tap_linkedin/filter_criteria.py
@@ -1,3 +1,8 @@
+import singer
+import itertools
+
+LOGGER = singer.get_logger()
+
 REGIONS = {
     "northamerica": '102221843',
     "southamerica": '104514572',
@@ -10,15 +15,15 @@
 COMPANY_SIZE = {
     'selfemployed': 'A',
-    '1-10': 'B', 
+    '1-10': 'B',
     '11-50': 'C',
     '51-200': 'D',
     '201-500': 'E',
     '501-1000': 'F',
     '1001-5000': 'G',
     '5001-10000': 'H',
-    '10000plus': 'I' 
-}
+    '10000plus': 'I',
+}
 
 YEARS_OF_EXPERIENCE = {
     '10plus': '5',
@@ -35,3 +40,26 @@
     '1-2': '2',
     'under1': '1'
 }
+
+def get_facet_combinations():
+    # create all combinations for searching sales nav
+    # this allows us to segment search results and capture as many as possible without overlapping
+    facets = []
+    facets.append(list(REGIONS.values()))
+    facets.append(list(YEARS_OF_EXPERIENCE.values()))
+    facets.append(list(TENURE.values()))
+    facet_combinations = list(itertools.product(*facets))
+
+    # create a map of all combinations so we have a key for each combo
+    # this will allow us to start up the sync again in the right spot in case it stops
+    combo_list = {}
+
+    # loop through with company_size first to try to avoid getting data for the same company twice
+    # we use a set to capture company ids in the company stream context
+    for key, value in COMPANY_SIZE.items():
+        for idx, combination in enumerate(facet_combinations):
+            if combination[2] <= combination[1]:
+                combo_dict = {"company_size": value, "region": combination[0], "years_of_experience": combination[1], "tenure": combination[2]}
+                combo_list[f"{value}-{idx}"] = combo_dict
+
+    return combo_list
\ No newline at end of file
diff --git a/tap_linkedin/streams/people_stream.py b/tap_linkedin/streams/people_stream.py
index c8c6be5..70a1ac8 100644
--- a/tap_linkedin/streams/people_stream.py
+++ b/tap_linkedin/streams/people_stream.py
@@ -1,7 +1,7 @@
 import singer
 from .base_stream import BaseStream
 from tap_linkedin.context import Context
-from tap_linkedin.filter_criteria import REGIONS
+from tap_linkedin.utils import sleep
 
 LOGGER = singer.get_logger()
@@ -23,17 +23,21 @@ def get_company_ids(self):
         else:
             pass
 
-    def sync_page(self, url, page_size, region, start):
+    def sync_page(self, url, page_size, company_size, region, years_of_experience, tenure, start):
 
         params = {"count": page_size, "start": start}
         time_extracted = singer.utils.now()
 
         response = self.client.get_request(url, params)
+        result_count = response.get('paging').get("total")
         records = response.get('elements')
 
         for idx, record in enumerate(records):
             record["id"] = int(record.get("objectUrn").replace("urn:li:member:", ""))
             record["searchRegion"] = region
+            record["searchCompanySize"] = company_size
+            record["searchYearsOfExperience"] = years_of_experience
+            record["searchTenure"] = tenure
 
             self.write_record(record, time_extracted)
             Context.set_bookmark(self.stream_id, self.replication_key, start + idx)
@@ -55,14 +59,14 @@ def sync_page(self, url, page_size, region, start):
             Context.set_bookmark(self.stream_id, self.replication_key, start)
             self.write_state()
 
-        if len(records) < page_size:
+        if start >= result_count or len(records) == 0:
             start = None
 
         return start
 
     def sync_records(self, **kwargs):
 
-        start = Context.get_bookmark(PeopleStream.stream_id).get(PeopleStream.replication_key, 0)
+        start = 0
         self.write_state()
 
         region = kwargs.get("region")
@@ -71,10 +75,11 @@
         tenure = kwargs.get("tenure")
 
         url = self.client.get_people_search_url(company_size, region, years_of_experience, tenure)
-        start = self.sync_page(url, PAGE_SIZE, region, start)
+        start = self.sync_page(url, PAGE_SIZE, company_size, region, years_of_experience, tenure, start)
 
         while start:
-            start = self.sync_page(url, PAGE_SIZE, region, start)
+            start = self.sync_page(url, PAGE_SIZE, company_size, region, years_of_experience, tenure, start)
+            sleep(3, 10)
 
         LOGGER.info(f"{PeopleStream.count} people found with GraphQL skills.")
diff --git a/tap_linkedin/sync.py b/tap_linkedin/sync.py
index 1722f71..137878f 100644
--- a/tap_linkedin/sync.py
+++ b/tap_linkedin/sync.py
@@ -1,18 +1,14 @@
-from re import L
 import singer
-import itertools
-import time
-import random
 from tap_linkedin.streams.people_stream import PeopleStream
 from tap_linkedin.streams.company_stream import CompanyStream
-from tap_linkedin.client import LinkedInClient
 from tap_linkedin.context import Context
-from tap_linkedin.filter_criteria import REGIONS, COMPANY_SIZE, YEARS_OF_EXPERIENCE, TENURE
+from tap_linkedin.utils import sleep
+from tap_linkedin.filter_criteria import get_facet_combinations
 
 LOGGER = singer.get_logger()
 
-def sync(client, config):
+def sync(client):
 
     combination_list = get_facet_combinations()
@@ -32,15 +28,12 @@ def sync(client, config):
     if currently_syncing_query:
         currently_syncing_query_split = currently_syncing_query.split("-")
         currently_syncing_query_company_size = currently_syncing_query_split[0]
-        currently_syncing_query_facet = int(currently_syncing_query_split[1])
-
-    if currently_syncing_stream == "companies":
-        company_stream = CompanyStream(client)
-        company_stream.sync()
+        currently_syncing_query_facet = 0
 
     people_stream = PeopleStream(client)
 
     for key, value in combination_list.items():
+
         key_split = key.split("-")
         key_company_size = key_split[0]
         key_facet = int(key_split[1])
@@ -59,33 +52,6 @@
             company_stream = CompanyStream(client)
             company_stream.sync()
             sleep()
-
-def sleep():
-    delay = random.randint(45, 90)
-    LOGGER.info(f"Sleeping for {delay} seconds.")
-    time.sleep(delay)
-
-def get_facet_combinations():
-    # create all combinations for searching sales nav
-    # this allows us to segment search results and capture as many as possible without overlapping
-    facets = []
-    facets.append(list(REGIONS.values()))
-    facets.append(list(YEARS_OF_EXPERIENCE.values()))
-    facets.append(list(TENURE.values()))
-    facet_combinations = list(itertools.product(*facets))
-
-    # create a map of all combinations so we have a key for each combo
-    # this will allow us to start up the sync again in the right spot in case it stops
-    combo_list = {}
-
-    # loop through with company_size first to try to avoid getting data for the same company twice
-    # we use a set to capture company ids in the company stream context
-    for key, value in COMPANY_SIZE.items():
-        for idx, combination in enumerate(facet_combinations):
-            combo_dict = {"company_size": value, "region": combination[0], "years_of_experience": combination[1], "tenure": combination[2]}
-            combo_list[f"{value}-{idx}"] = combo_dict
-
-    return combo_list
diff --git a/tap_linkedin/utils.py b/tap_linkedin/utils.py
new file mode 100644
index 0000000..f5fd3cd
--- /dev/null
+++ b/tap_linkedin/utils.py
@@ -0,0 +1,10 @@
+import singer
+import time
+import random
+
+LOGGER = singer.get_logger()
+
+def sleep(range_start: int = 45, range_end: int = 90):
+    delay = random.randint(range_start, range_end)
+    LOGGER.info(f"Sleeping for {delay} seconds.")
+    time.sleep(delay)
\ No newline at end of file