Skip to content

Commit

Permalink
feat(airflow): wrap up fetch_questions_through_stack_api
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Dec 6, 2023
1 parent b15edd9 commit a18ad0e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
4 changes: 2 additions & 2 deletions airflow/include/tasks/extract/stack_overflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


def extract_stack_overflow(
tag: str, stackoverflow_cutoff_date: str, *, max_pagesize: int = 100, max_pages: int = 10000000
tag: str, stackoverflow_cutoff_date: str, from_start: False, *, page_size: int = 100, max_pages: int = 10000000
) -> pd.DataFrame:
"""
This task generates stack overflow documents as a single markdown document per question with associated comments
Expand All @@ -35,7 +35,7 @@ def extract_stack_overflow(
questions = fetch_questions_through_stack_api(
tag=tag,
stackoverflow_cutoff_date=stackoverflow_cutoff_date,
max_pagesize=max_pagesize,
page_size=page_size,
max_pages=max_pages,
)
posts_df = process_stack_api_posts(questions)
Expand Down
92 changes: 63 additions & 29 deletions airflow/include/tasks/extract/utils/stack_overflow_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,58 @@


def fetch_questions_through_stack_api(
tag: str, stackoverflow_cutoff_date: str, *, max_pagesize: int = 100, max_pages: int = 10000000
tag: str, stackoverflow_cutoff_date: str, *, page_size: int = 100, max_pages: int = 10000000
) -> dict:
stack_api = StackAPI(name="stackoverflow", max_pagesize=max_pagesize, max_pages=max_pages)
fromdate = datetime.strptime(stackoverflow_cutoff_date, "%Y-%m-%d")
first_question_id, first_question_creation_date = fetch_first_question_after_fromdate(tag=tag, fromdate=fromdate)

stack_api = StackAPI(name="stackoverflow", page_size=page_size, max_pages=max_pages)

# https://api.stackexchange.com/docs/read-filter#filters=!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE&filter=default&run=true
filter_ = "!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE"

questions_resp = stack_api.fetch(endpoint="questions", tagged=tag, fromdate=fromdate, filter=filter_)
questions_resp = stack_api.fetch(
endpoint="questions",
filter=filter_,
tagged=tag,
fromdate=fromdate,
order="desc",
sort="creation",
)
questions = questions_resp.pop("items")
while questions_resp["quota_remaining"] > 0 and questions[-1]["question_id"] != first_question_id:
todate = questions[-1]["creation_date"]
questions_resp = stack_api.fetch(
endpoint="questions",
filter=filter_,
tagged=tag,
fromdate=fromdate,
todate=todate,
order="desc",
sort="creation",
)
questions.extend(questions_resp.pop("items"))

return questions

# TODO: check if we need to paginate
len(questions)

# TODO: add backoff logic. For now just fail the task if we can't fetch all results due to api rate limits.
assert not questions_resp["has_more"]
def fetch_first_question_after_fromdate(*, tag: str, fromdate: datetime.date) -> tuple[int, int]:
"""Get the first question id after fromdate"""
stack_api = StackAPI(name="stackoverflow")
filter_ = "!*1PUVE3_-UtLf0rvavrile9fyVsn*T)jdVaO6_P)K"
questions_resp = stack_api.fetch(
endpoint="questions",
page=1,
pagesize=1,
order="asc",
sort="creation",
fromdate=fromdate,
tagged=tag,
filter=filter_,
)
first_question = questions_resp["items"][0]

return questions
return first_question["question_id"], first_question["creation_date"]


def process_stack_api_posts(questions: dict) -> pd.DataFrame:
Expand All @@ -60,27 +94,6 @@ def process_stack_api_posts(questions: dict) -> pd.DataFrame:
return posts_df


def process_stack_api_comments(comments: list) -> str:
"""
This helper function processes a list of slack comments for a question or answer
param comments: a list of stack overflow comments from the api
type comments: list
"""
return "".join(
[
comment_template.format(
user=comment["owner"]["user_id"],
date=datetime.fromtimestamp(comment["creation_date"]).strftime("%Y-%m-%d"),
score=comment["score"],
body=comment["body_markdown"],
)
for comment in comments
]
)


def process_stack_api_questions(posts_df: dict, tag: str) -> pd.DataFrame:
"""
This helper function processes a dataframe of slack posts into a set format.
Expand Down Expand Up @@ -111,6 +124,27 @@ def process_stack_api_questions(posts_df: dict, tag: str) -> pd.DataFrame:
return questions_df


def process_stack_api_comments(comments: list) -> str:
"""
This helper function processes a list of slack comments for a question or answer
param comments: a list of stack overflow comments from the api
type comments: list
"""
return "".join(
[
comment_template.format(
user=comment["owner"].get("user_id", ""),
date=datetime.fromtimestamp(comment["creation_date"]).strftime("%Y-%m-%d"),
score=comment["score"],
body=comment["body_markdown"],
)
for comment in comments
]
)


def process_stack_api_answers(posts_df: pd.DataFrame) -> pd.DataFrame:
"""
This helper function builds a dataframe of slack answers based on posts.
Expand Down

0 comments on commit a18ad0e

Please sign in to comment.