Commit ea9396c

Merge pull request #2 from adgramigna/add-multiprocessing

Add multiprocessing

adgramigna authored Oct 14, 2023
2 parents 8012e1f + c47944b commit ea9396c
Showing 5 changed files with 53 additions and 55 deletions.
11 changes: 4 additions & 7 deletions .github/workflows/daily_job_board_scrape.yml
@@ -20,12 +20,9 @@ jobs:
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Make shell script executable
        run: cd job_board_scraper/shell_scripts && chmod +x run_job_scraper_chunked.sh

      - name: Run Shell script
        run: cd job_board_scraper/shell_scripts && ./run_job_scraper_chunked.sh
      - name: Run Scrapy script
        run: cd job_board_scraper && python run_job_scraper.py
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_REGION: ${{ secrets.AWS_REGION }}
@@ -36,7 +33,7 @@ jobs:
          PG_HOST: ${{ secrets.PG_HOST }}
          PG_PASSWORD: ${{ secrets.PG_PASSWORD }}
          PG_USER: ${{ secrets.PG_USER }}
          ID_COL: ${{ secrets.ID_COL }}
          CHUNK_SIZE: ${{ secrets.CHUNK_SIZE }}
          RAW_HTML_S3_BUCKET: ${{ secrets.RAW_HTML_S3_BUCKET }}

  run_dbt:
3 changes: 3 additions & 0 deletions job_board_scraper/job_board_scraper/settings.py
@@ -47,6 +47,9 @@
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Multiprocessing needs dynamic port allocation for the telnet console
TELNETCONSOLE_PORT = None  # https://docs.scrapy.org/en/latest/topics/telnetconsole.html

# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
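With several CrawlerProcess instances running in parallel, a fixed telnet port would leave every process after the first unable to bind it; per the Scrapy docs linked above, setting TELNETCONSOLE_PORT to None (or 0) makes each process use a dynamically assigned free port instead. A minimal sketch of the equivalent programmatic override, purely illustrative and not part of this commit:

from scrapy.utils.project import get_project_settings

settings = get_project_settings()
# None (or 0) tells the telnet console extension to pick a dynamically
# assigned port, so parallel crawler processes do not fight over one port.
settings.set("TELNETCONSOLE_PORT", None)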
27 changes: 12 additions & 15 deletions job_board_scraper/job_board_scraper/utils/scraper_util.py
@@ -1,15 +1,12 @@
import os
import sys
from dotenv import load_dotenv
load_dotenv()

def parse_args():
    try:
        id_lower_bound = int(sys.argv[1])
        id_upper_bound = int(sys.argv[2])
        id_restriction = f""" and {os.environ.get("ID_COL")} >= %s and {os.environ.get("ID_COL")} < %s"""
        id_values = tuple([id_lower_bound, id_upper_bound])
    except:
        id_restriction = ""
        id_values = tuple()
    return id_restriction, id_values
def get_url_chunks(careers_page_urls, chunk_size):
    url_chunks = []
    single_chunk = []
    for i, url in enumerate(careers_page_urls):
        careers_page_url = url[0]  # UnTuple-ify
        single_chunk.append(careers_page_url)
        if i % chunk_size == chunk_size - 1:
            url_chunks.append(single_chunk)
            single_chunk = []
    if len(single_chunk) > 0:
        url_chunks.append(single_chunk)
    return url_chunks
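A quick usage sketch of the new helper, with hypothetical URLs: the rows come back from cursor.fetchall() as one-element tuples, and get_url_chunks un-tuples them and groups them into lists of at most chunk_size URLs, with any remainder forming a final shorter chunk.

rows = [  # hypothetical fetchall() result: one-element tuples
    ("https://boards.greenhouse.io/acme",),
    ("https://jobs.lever.co/acme",),
    ("https://boards.greenhouse.io/examplecorp",),
]
print(get_url_chunks(rows, chunk_size=2))
# [['https://boards.greenhouse.io/acme', 'https://jobs.lever.co/acme'],
#  ['https://boards.greenhouse.io/examplecorp']]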
52 changes: 34 additions & 18 deletions job_board_scraper/run_job_scraper.py
@@ -4,32 +4,48 @@
import logging
import psycopg2
import time
import multiprocessing
from scrapy.crawler import CrawlerProcess
from job_board_scraper.spiders.greenhouse_jobs_outline_spider import GreenhouseJobsOutlineSpider
from job_board_scraper.spiders.greenhouse_job_departments_spider import GreenhouseJobDepartmentsSpider
from job_board_scraper.spiders.lever_jobs_outline_spider import LeverJobsOutlineSpider
from job_board_scraper.utils.postgres_wrapper import PostgresWrapper
from job_board_scraper.utils import general as util
from job_board_scraper.utils.scraper_util import parse_args
from job_board_scraper.utils.scraper_util import get_url_chunks
from scrapy.utils.project import get_project_settings

logger = logging.getLogger("logger")
process = CrawlerProcess(get_project_settings())
connection = psycopg2.connect(host=os.environ.get("PG_HOST"), user=os.environ.get("PG_USER"), password=os.environ.get("PG_PASSWORD"), dbname=os.environ.get("PG_DATABASE"))
cursor = connection.cursor()
id_restriction, id_values = parse_args()
cursor.execute(os.environ.get("PAGES_TO_SCRAPE_QUERY")+id_restriction, id_values)
careers_page_urls = cursor.fetchall()
cursor.close()
connection.close()
run_hash = util.hash_ids.encode(int(time.time()))

for i, url in enumerate(careers_page_urls):
    careers_page_url = url[0]  # UnTuple-ify
    logger.info(f"url = {careers_page_url} {careers_page_urls}")
    if careers_page_url.split(".")[1] == "greenhouse":
        process.crawl(GreenhouseJobDepartmentsSpider, careers_page_url=careers_page_url, use_existing_html=True, run_hash=run_hash, url_id=i)
        process.crawl(GreenhouseJobsOutlineSpider, careers_page_url=careers_page_url, use_existing_html=True, run_hash=run_hash, url_id=i)
    elif careers_page_url.split(".")[1] == "lever":
        process.crawl(LeverJobsOutlineSpider, careers_page_url=careers_page_url, use_existing_html=True, run_hash=run_hash, url_id=i)
process.start()
def run_spider(single_url_chunk, chunk_number):
    process = CrawlerProcess(get_project_settings())
    for i, careers_page_url in enumerate(single_url_chunk):
        logger.info(f"url = {careers_page_url}")
        if careers_page_url.split(".")[1] == "greenhouse":
            process.crawl(GreenhouseJobDepartmentsSpider, careers_page_url=careers_page_url, use_existing_html=True, run_hash=run_hash, url_id=chunk_number*i+i)
            process.crawl(GreenhouseJobsOutlineSpider, careers_page_url=careers_page_url, use_existing_html=True, run_hash=run_hash, url_id=chunk_number*i+i)
        elif careers_page_url.split(".")[1] == "lever":
            process.crawl(LeverJobsOutlineSpider, careers_page_url=careers_page_url, use_existing_html=True, run_hash=run_hash, url_id=chunk_number*i+i)
    process.start()

if __name__ == '__main__':
    chunk_size = int(os.environ.get("CHUNK_SIZE"))

    connection = psycopg2.connect(host=os.environ.get("PG_HOST"), user=os.environ.get("PG_USER"), password=os.environ.get("PG_PASSWORD"), dbname=os.environ.get("PG_DATABASE"))
    cursor = connection.cursor()
    cursor.execute(os.environ.get("PAGES_TO_SCRAPE_QUERY"))
    careers_page_urls = cursor.fetchall()
    cursor.close()
    connection.close()
    url_chunks = get_url_chunks(careers_page_urls, chunk_size)

    num_processes = len(url_chunks)
    processes = []

    for i, single_url_chunk in enumerate(url_chunks):
        p = multiprocessing.Process(target=run_spider, args=(single_url_chunk, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
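Why one OS process per chunk rather than a single CrawlerProcess for everything: CrawlerProcess drives a Twisted reactor, which can only be started once per Python process, and process.start() blocks until every crawl queued in that process finishes. Spawning a multiprocessing.Process per chunk gives each chunk its own reactor, so the chunks crawl in parallel. Stripped to its essentials, the pattern is the following sketch (the spider import and URLs are placeholders, not the project's real ones):

import multiprocessing
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from myproject.spiders.example import ExampleSpider  # placeholder spider, not from this repo

def crawl_chunk(urls):
    process = CrawlerProcess(get_project_settings())  # one reactor per OS process
    for url in urls:
        process.crawl(ExampleSpider, careers_page_url=url)
    process.start()  # blocks until all crawls queued in this process finish

if __name__ == "__main__":
    chunks = [["https://example.com/a"], ["https://example.com/b"]]  # placeholder chunks
    workers = [multiprocessing.Process(target=crawl_chunk, args=(chunk,)) for chunk in chunks]
    for w in workers:
        w.start()
    for w in workers:
        w.join()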
15 changes: 0 additions & 15 deletions job_board_scraper/shell_scripts/run_job_scraper_chunked.sh

This file was deleted.
