Remove the code which is not in scope for this PR
sunank200 committed Nov 21, 2023
1 parent 0d12313 commit e5d0cc8
Showing 7 changed files with 60 additions and 208 deletions.
12 changes: 6 additions & 6 deletions airflow/dags/ingestion/ask-astro-load.py
@@ -65,7 +65,7 @@ def ask_astro_load_bulk():
This DAG performs the initial load of data from sources.
If seed_baseline_url (set above) points to a parquet file with pre-embedded data it will be
ingested. Otherwise, new data is extracted, split, embedded and ingested.
The first time this DAG runs (without seeded baseline) it will take at least 90 minutes to
extract data from all sources. Extracted data is then serialized to disk in the project
@@ -81,7 +81,7 @@ def get_schema(schema_file: str) -> list:
:param schema_file: path to the schema JSON file
"""
return ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json")
return ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json", weaviate_class=WEAVIATE_CLASS)

@task.branch
def check_schema(class_objects: list) -> list[str]:
@@ -178,12 +178,12 @@ def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverf
# return df

@task(trigger_rule="none_failed")
def extract_github_issues(source: dict):
def extract_github_issues(repo_base: str):
try:
df = pd.read_parquet(f"include/data/{source['repo_base']}/issues.parquet")
df = pd.read_parquet(f"include/data/{repo_base}/issues.parquet")
except Exception:
df = github.extract_github_issues(source, _GITHUB_CONN_ID)
df.to_parquet(f"include/data/{source['repo_base']}/issues.parquet")
df = github.extract_github_issues(repo_base, _GITHUB_CONN_ID)
df.to_parquet(f"include/data/{repo_base}/issues.parquet")

return df

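With the task signature changed from source: dict to repo_base: str, the DAG can map this task directly over plain "org/repo" strings using Airflow's dynamic task mapping. A minimal sketch, not part of this commit; the repository list is illustrative:

    # Sketch only: map the refactored task over repository name strings.
    github_repo_bases = ["apache/airflow", "astronomer/astronomer-cosmos"]  # illustrative list

    gh_issues_docs = extract_github_issues.expand(repo_base=github_repo_bases)
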
34 changes: 18 additions & 16 deletions airflow/include/tasks/extract/blogs.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import datetime
from urllib.parse import urljoin
from datetime import datetime

import pandas as pd
import requests
@@ -14,7 +13,7 @@
page_url = base_url + "/blog/{page}/#archive"


def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:
def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
"""
This task downloads Blogs from the Astronomer website and returns a list of pandas dataframes. Return
type is a list in order to map to upstream dynamic tasks.
@@ -31,34 +30,37 @@ def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:

headers = {}
links = []
dates = []
page = 1

response = requests.get(page_url.format(page=page), headers=headers)
while response.ok:
soup = BeautifulSoup(response.text, "lxml")
for card in soup.find_all(class_="post-card__meta"):
blog_date = datetime.datetime.strptime(card.find("time")["datetime"], "%Y-%m-%dT%H:%M:%S.%fZ")
if blog_date.date() >= blog_cutoff_date:
url = urljoin(base_url, card.find("a", href=True)["href"])
response = requests.head(url, allow_redirects=True)
if response.ok:
links.append(
{
"docLink": response.url,
"title": card.find(class_="title").get_text(),
}
)
cards = soup.find_all(class_="post-card__cover")
card_links = [base_url + card.find("a", href=True)["href"] for card in cards]
links.extend(card_links)
meta = soup.find_all(class_="post-card__meta")
dates.extend([post.find("time")["datetime"] for post in meta])

page = page + 1
response = requests.get(page_url.format(page=page), headers=headers)

df = pd.DataFrame(links)
df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])

df["date"] = pd.to_datetime(df["date"]).dt.date
df = df[df["date"] > blog_cutoff_date.date()]
df.drop("date", inplace=True, axis=1)
df.drop_duplicates(inplace=True)

df["content"] = df["docLink"].apply(lambda x: requests.get(x).content)
df["title"] = df["content"].apply(
lambda x: BeautifulSoup(x, "lxml").find(class_="post-card__meta").find(class_="title").get_text()
)

df["content"] = df["content"].apply(lambda x: BeautifulSoup(x, "lxml").find(class_="prose").get_text())
df["content"] = df.apply(lambda x: blog_format.format(title=x.title, content=x.content), axis=1)

df.drop("title", axis=1, inplace=True)
df["sha"] = df["content"].apply(generate_uuid5)
df["docSource"] = "astro blog"
df.reset_index(drop=True, inplace=True)
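For context on the sha column above: generate_uuid5 from weaviate.util derives a deterministic UUID from the content string, so re-ingesting an unchanged blog post produces the same identifier. A minimal sketch with hypothetical content:

    # Sketch only: identical content always yields the same UUID5 string.
    from weaviate.util import generate_uuid5

    content = "# Example post\n\nbody text"  # hypothetical document content
    assert generate_uuid5(content) == generate_uuid5(content)
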
66 changes: 30 additions & 36 deletions airflow/include/tasks/extract/github.py
@@ -180,19 +180,14 @@ def extract_github_python(source: dict, github_conn_id: str) -> pd.DataFrame:
return df


def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:
def extract_github_issues(repo_base: str, github_conn_id: str) -> pd.DataFrame:
"""
This task downloads github issues as markdown documents in a pandas dataframe. Text from templated
auto responses for issues is removed while building a markdown document for each issue. Extraction will
only pull issues after a provided "cutoff_date" AND "cutoff_issue_number".
auto responses for issues is removed while building a markdown document for each issue.
param source: A dictionary specifying what to ingest.
example:
{"repo_base": "apache/airflow",
"cutoff_date": datetime.date.today() - relativedelta(months=1),
"cutoff_issue_number": 30000
}
type repo_base: dict
param repo_base: The name of the organization/repository (e.g. "apache/airflow") from which to extract
issues.
type repo_base: str
param github_conn_id: The connection ID to use with the GithubHook
type github_conn_id: str
@@ -207,8 +202,8 @@ def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:

gh_hook = GithubHook(github_conn_id)

repo = gh_hook.client.get_repo(source["repo_base"])
issues = repo.get_issues(state="all")
repo = gh_hook.client.get_repo(repo_base)
issues = repo.get_issues()

issue_autoresponse_text = "Thanks for opening your first issue here!"
pr_autoresponse_text = "Congratulations on your first Pull Request and welcome to the Apache Airflow community!"
@@ -248,31 +243,30 @@ def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:

while page:
for issue in page:
if issue.updated_at.date() >= source["cutoff_date"] and issue.number >= source["cutoff_issue_number"]:
print(issue.number)
comments = []
for comment in issue.get_comments():
if not any(substring in comment.body for substring in drop_content):
comments.append(
comment_markdown_template.format(
user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
)
print(issue.number)
comments = []
for comment in issue.get_comments():
if not any(substring in comment.body for substring in drop_content):
comments.append(
comment_markdown_template.format(
user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
)
downloaded_docs.append(
{
"docLink": issue.html_url,
"sha": "",
"content": issue_markdown_template.format(
title=issue.title,
date=issue.created_at.strftime("%m-%d-%Y"),
user=issue.user.login,
state=issue.state,
body=issue.body,
comments="\n".join(comments),
),
"docSource": f"{source['repo_base']}/issues",
}
)
)
downloaded_docs.append(
{
"docLink": issue.html_url,
"sha": "",
"content": issue_markdown_template.format(
title=issue.title,
date=issue.created_at.strftime("%m-%d-%Y"),
user=issue.user.login,
state=issue.state,
body=issue.body,
comments="\n".join(comments),
),
"docSource": f"{repo_base}/issues",
}
)
page_num = page_num + 1
page = issues.get_page(page_num)

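A minimal usage sketch for the refactored extractor; the connection ID is hypothetical, and the asserted columns mirror the dataframe built above:

    # Sketch only: call the simplified extractor with an "org/repo" string.
    from include.tasks.extract import github

    issues_df = github.extract_github_issues(
        repo_base="apache/airflow",
        github_conn_id="github_default",  # hypothetical Airflow connection ID
    )
    assert {"docLink", "sha", "content", "docSource"}.issubset(issues_df.columns)
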
71 changes: 0 additions & 71 deletions airflow/include/tasks/extract/stack_overflow.py
@@ -1,19 +1,13 @@
from __future__ import annotations

import datetime

import pandas as pd
from stackapi import StackAPI
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.stack_overflow_helpers import (
process_stack_answers,
process_stack_answers_api,
process_stack_comments,
process_stack_comments_api,
process_stack_posts,
process_stack_questions,
process_stack_questions_api,
)


@@ -70,68 +64,3 @@ def extract_stack_overflow_archive(tag: str, stackoverflow_cutoff_date: str) ->
df = df[["docSource", "sha", "content", "docLink"]]

return df


def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str) -> pd.DataFrame:
"""
This task generates stack overflow documents as a single markdown document per question with associated comments
and answers. The task returns a pandas dataframe with all documents.
param tag: The tag names to include in extracting from stack overflow.
This is used for populating the 'docSource'
type tag: str
param stackoverflow_cutoff_date: Only messages from after this date will be extracted.
type stackoverflow_cutoff_date: str
returned dataframe fields are:
'docSource': 'stackoverflow' plus the tag name (e.g. 'airflow')
'docLink': URL for the base question.
'content': The question (plus answers) in markdown format.
'sha': a UUID based on the other fields. This is for compatibility with other document types.
"""

SITE = StackAPI(name="stackoverflow", max_pagesize=100, max_pages=10000000)

fromdate = datetime.datetime.strptime(stackoverflow_cutoff_date, "%Y-%m-%d")

# https://api.stackexchange.com/docs/read-filter#filters=!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE&filter=default&run=true
filter_ = "!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE"

questions_dict = SITE.fetch(endpoint="questions", tagged=tag, fromdate=fromdate, filter=filter_)
items = questions_dict.pop("items")

# TODO: check if we need to paginate
len(items)
# TODO: add backoff logic. For now just fail the task if we can't fetch all results due to api rate limits.
assert not questions_dict["has_more"]

posts_df = pd.DataFrame(items)
posts_df = posts_df[posts_df["answer_count"] >= 1]
posts_df = posts_df[posts_df["score"] >= 1]
posts_df.reset_index(inplace=True, drop=True)

# process questions
questions_df = posts_df
questions_df["comments"] = questions_df["comments"].fillna("")
questions_df["question_comments"] = questions_df["comments"].apply(lambda x: process_stack_comments_api(x))
questions_df = process_stack_questions_api(questions_df=questions_df, tag=tag)

# process associated answers
answers_df = posts_df.explode("answers").reset_index(drop=True)
answers_df["comments"] = answers_df["answers"].apply(lambda x: x.get("comments"))
answers_df["comments"] = answers_df["comments"].fillna("")
answers_df["answer_comments"] = answers_df["comments"].apply(lambda x: process_stack_comments_api(x))
answers_df = process_stack_answers_api(answers_df=answers_df)

# combine questions and answers
df = questions_df.join(answers_df).reset_index(drop=True)
df["content"] = df[["question_text", "answer_text"]].apply("\n".join, axis=1)

df["sha"] = df.apply(generate_uuid5, axis=1)

# column order matters for uuid generation
df = df[["docSource", "sha", "content", "docLink"]]

return df
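For reference, the extractor removed above was built on StackAPI's fetch pattern; a minimal standalone sketch, with illustrative tag, cutoff date, and page limits:

    # Sketch only: fetch tagged questions newer than a cutoff via StackAPI.
    from datetime import datetime
    from stackapi import StackAPI

    site = StackAPI(name="stackoverflow", max_pagesize=100, max_pages=10)
    response = site.fetch(
        endpoint="questions",
        tagged="airflow",               # illustrative tag
        fromdate=datetime(2023, 1, 1),  # illustrative cutoff
        filter="withbody",              # built-in filter that includes question bodies
    )
    questions = response["items"]
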
71 changes: 0 additions & 71 deletions airflow/include/tasks/extract/utils/stack_overflow_helpers.py
@@ -1,4 +1,3 @@
import datetime
from textwrap import dedent

import pandas as pd
@@ -191,73 +190,3 @@ def process_stack_answers(posts_df: pd.DataFrame, comments_df: pd.DataFrame) ->
answers_df = answers_df.groupby("question_id")["answer_text"].apply(lambda x: "".join(x))

return answers_df


def process_stack_comments_api(comments: list) -> str:
"""
This helper function processes a list of stack overflow comments for a question or answer
param comments: a list of stack overflow comments from the api
type comments: list
"""
return "".join(
[
comment_template.format(
user=comment["owner"]["user_id"],
date=datetime.datetime.fromtimestamp(comment["creation_date"]).strftime("%Y-%m-%d"),
score=comment["score"],
body=comment["body_markdown"],
)
for comment in comments
]
)


def process_stack_questions_api(questions_df: pd.DataFrame, tag: str) -> pd.DataFrame:
"""
This helper function formats a questions dataframe pulled from the stack overflow api.
The column question_text is created in markdown format based on question_template.
"""

# format question content
questions_df["question_text"] = questions_df.apply(
lambda x: question_template.format(
title=x.title,
user=x.owner["user_id"],
date=datetime.datetime.fromtimestamp(x.creation_date).strftime("%Y-%m-%d"),
score=x.score,
body=x.body_markdown,
question_comments=x.question_comments,
),
axis=1,
)
questions_df = questions_df[["link", "question_id", "question_text"]].set_index("question_id")

questions_df["docSource"] = f"stackoverflow {tag}"

questions_df.rename({"link": "docLink"}, axis=1, inplace=True)

return questions_df


def process_stack_answers_api(answers_df: pd.DataFrame) -> pd.DataFrame:
"""
This helper function formats answers into markdown documents and returns
a dataframe with question_id as index.
"""
answers_df["answer_text"] = answers_df[["answers", "answer_comments"]].apply(
lambda x: answer_template.format(
score=x.answers["score"],
date=datetime.datetime.fromtimestamp(x.answers["creation_date"]).strftime("%Y-%m-%d"),
user=x.answers["owner"]["user_id"],
body=x.answers["body_markdown"],
answer_comments=x.answer_comments,
),
axis=1,
)
answers_df = answers_df.groupby("question_id")["answer_text"].apply(lambda x: "".join(x))

return answers_df