
Commit

Remove the code which is not in scope for this PR
sunank200 committed Nov 21, 2023
1 parent 0d12313 commit c5cd78d
Showing 9 changed files with 96 additions and 232 deletions.
14 changes: 7 additions & 7 deletions airflow/dags/ingestion/ask-astro-load.py
@@ -17,7 +17,7 @@

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
_GITHUB_CONN_ID = "github_ro"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDevAnkit")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

@@ -65,7 +65,7 @@ def ask_astro_load_bulk():
This DAG performs the initial load of data from sources.
If seed_baseline_url (set above) points to a parquet file with pre-embedded data it will be
ingested. Otherwise, new data is extracted, split, embedded and ingested.
The first time this DAG runs (without seeded baseline) it will take at least 90 minutes to
extract data from all sources. Extracted data is then serialized to disk in the project
@@ -81,7 +81,7 @@ def get_schema(schema_file: str) -> list:
:param schema_file: path to the schema JSON file
"""
return ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json")
return ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json", weaviate_class=WEAVIATE_CLASS)

@task.branch
def check_schema(class_objects: list) -> list[str]:
@@ -178,12 +178,12 @@ def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverf
# return df

@task(trigger_rule="none_failed")
def extract_github_issues(source: dict):
def extract_github_issues(repo_base: str):
try:
df = pd.read_parquet(f"include/data/{source['repo_base']}/issues.parquet")
df = pd.read_parquet(f"include/data/{repo_base}/issues.parquet")
except Exception:
df = github.extract_github_issues(source, _GITHUB_CONN_ID)
df.to_parquet(f"include/data/{source['repo_base']}/issues.parquet")
df = github.extract_github_issues(repo_base, _GITHUB_CONN_ID)
df.to_parquet(f"include/data/{repo_base}/issues.parquet")

return df

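For reference, the refactored task above keeps the same cache-then-extract pattern while taking a plain repo_base string instead of a source dict. A minimal standalone sketch of that pattern (the helper name and the empty-frame fallback are illustrative, not part of this commit):

```python
from pathlib import Path

import pandas as pd


def load_or_extract_issues(repo_base: str) -> pd.DataFrame:
    """Return cached issues for repo_base if a parquet file exists, otherwise extract and cache them."""
    cache_path = Path(f"include/data/{repo_base}/issues.parquet")  # mirrors the path used in the DAG
    try:
        return pd.read_parquet(cache_path)  # reuse previously serialized data
    except Exception:
        # Stand-in for github.extract_github_issues(repo_base, _GITHUB_CONN_ID) in the DAG above.
        df = pd.DataFrame(columns=["docLink", "sha", "content", "docSource"])
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(cache_path)
        return df
```

Caching to parquet keeps reruns of the bulk-load DAG from re-hitting the GitHub API for repositories that were already extracted.
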
2 changes: 1 addition & 1 deletion airflow/include/data/schema.json
@@ -54,7 +54,7 @@
"name": "content",
"description": "Document content",
"dataType": ["text"],
"tokenization": "whitespace",
"tokenization": "word",
"moduleConfig": {
"text2vec-openai": {
"skip": "False",
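The only change in the schema is the tokenization of the content property. If one wants to confirm the value locally, a small standard-library check might look like this (the top-level layout of schema.json is assumed, since it is not shown in this diff):

```python
import json

with open("airflow/include/data/schema.json") as f:
    schema = json.load(f)

# schema.json may hold a single class object or a {"classes": [...]} wrapper; handle both.
classes = schema.get("classes", [schema]) if isinstance(schema, dict) else schema
for cls in classes:
    for prop in cls.get("properties", []):
        if prop["name"] == "content":
            print(cls.get("class"), prop.get("tokenization"))  # expected: "word" after this commit
```
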
34 changes: 18 additions & 16 deletions airflow/include/tasks/extract/blogs.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import datetime
from urllib.parse import urljoin
from datetime import datetime

import pandas as pd
import requests
@@ -14,7 +13,7 @@
page_url = base_url + "/blog/{page}/#archive"


def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:
def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
"""
This task downloads blog posts from the Astronomer website and returns a list of pandas dataframes. Return
type is a list in order to map to downstream dynamic tasks.
@@ -31,34 +30,37 @@ def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:

headers = {}
links = []
dates = []
page = 1

response = requests.get(page_url.format(page=page), headers=headers)
while response.ok:
soup = BeautifulSoup(response.text, "lxml")
for card in soup.find_all(class_="post-card__meta"):
blog_date = datetime.datetime.strptime(card.find("time")["datetime"], "%Y-%m-%dT%H:%M:%S.%fZ")
if blog_date.date() >= blog_cutoff_date:
url = urljoin(base_url, card.find("a", href=True)["href"])
response = requests.head(url, allow_redirects=True)
if response.ok:
links.append(
{
"docLink": response.url,
"title": card.find(class_="title").get_text(),
}
)
cards = soup.find_all(class_="post-card__cover")
card_links = [base_url + card.find("a", href=True)["href"] for card in cards]
links.extend(card_links)
meta = soup.find_all(class_="post-card__meta")
dates.extend([post.find("time")["datetime"] for post in meta])

page = page + 1
response = requests.get(page_url.format(page=page), headers=headers)

df = pd.DataFrame(links)
df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])

df["date"] = pd.to_datetime(df["date"]).dt.date
df = df[df["date"] > blog_cutoff_date.date()]
df.drop("date", inplace=True, axis=1)
df.drop_duplicates(inplace=True)

df["content"] = df["docLink"].apply(lambda x: requests.get(x).content)
df["title"] = df["content"].apply(
lambda x: BeautifulSoup(x, "lxml").find(class_="post-card__meta").find(class_="title").get_text()
)

df["content"] = df["content"].apply(lambda x: BeautifulSoup(x, "lxml").find(class_="prose").get_text())
df["content"] = df.apply(lambda x: blog_format.format(title=x.title, content=x.content), axis=1)

df.drop("title", axis=1, inplace=True)
df["sha"] = df["content"].apply(generate_uuid5)
df["docSource"] = "astro blog"
df.reset_index(drop=True, inplace=True)
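The rewritten blog extractor collects card links and publish dates first, then filters and deduplicates in pandas before fetching page content. A toy sketch of just the filter-and-dedupe step (the sample links and dates are made up for illustration):

```python
from datetime import datetime

import pandas as pd

blog_cutoff_date = datetime(2023, 1, 1)  # example cutoff, standing in for the task argument
links = ["https://www.astronomer.io/blog/example-post", "https://www.astronomer.io/blog/older-post"]
dates = ["2023-06-01T00:00:00.000Z", "2022-06-01T00:00:00.000Z"]

df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])
df["date"] = pd.to_datetime(df["date"]).dt.date
df = df[df["date"] > blog_cutoff_date.date()]  # keep only posts newer than the cutoff
df.drop("date", inplace=True, axis=1)
df.drop_duplicates(inplace=True)
print(df)  # only the 2023 post survives
```

Filtering on the whole frame, rather than inside the scraping loop, avoids an extra HEAD request per card and keeps the date handling in one place.
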
66 changes: 30 additions & 36 deletions airflow/include/tasks/extract/github.py
@@ -180,19 +180,14 @@ def extract_github_python(source: dict, github_conn_id: str) -> pd.DataFrame:
return df


def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:
def extract_github_issues(repo_base: str, github_conn_id: str) -> pd.DataFrame:
"""
This task downloads github issues as markdown documents in a pandas dataframe. Text from templated
auto responses for issues are removed while building a markdown document for each issue. Extraction will
only pull issues after a provided "cutoff_date" AND "cutoff_issue_number".
auto responses for issues is removed while building a markdown document for each issue.
param source: A dictionary specifying what to ingest.
example:
{"repo_base": "apache/airflow",
"cutoff_date": datetime.date.today() - relativedelta(months=1),
"cutoff_issue_number": 30000
}
type repo_base: dict
param repo_base: The name of the organization/repository (i.e. "apache/airflow") from which to extract
issues.
type repo_base: str
param github_conn_id: The connection ID to use with the GithubHook
type github_conn_id: str
@@ -207,8 +202,8 @@ def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:

gh_hook = GithubHook(github_conn_id)

repo = gh_hook.client.get_repo(source["repo_base"])
issues = repo.get_issues(state="all")
repo = gh_hook.client.get_repo(repo_base)
issues = repo.get_issues()

issue_autoresponse_text = "Thanks for opening your first issue here!"
pr_autoresponse_text = "Congratulations on your first Pull Request and welcome to the Apache Airflow community!"
@@ -248,31 +243,30 @@ def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:

while page:
for issue in page:
if issue.updated_at.date() >= source["cutoff_date"] and issue.number >= source["cutoff_issue_number"]:
print(issue.number)
comments = []
for comment in issue.get_comments():
if not any(substring in comment.body for substring in drop_content):
comments.append(
comment_markdown_template.format(
user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
)
print(issue.number)
comments = []
for comment in issue.get_comments():
if not any(substring in comment.body for substring in drop_content):
comments.append(
comment_markdown_template.format(
user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
)
downloaded_docs.append(
{
"docLink": issue.html_url,
"sha": "",
"content": issue_markdown_template.format(
title=issue.title,
date=issue.created_at.strftime("%m-%d-%Y"),
user=issue.user.login,
state=issue.state,
body=issue.body,
comments="\n".join(comments),
),
"docSource": f"{source['repo_base']}/issues",
}
)
)
downloaded_docs.append(
{
"docLink": issue.html_url,
"sha": "",
"content": issue_markdown_template.format(
title=issue.title,
date=issue.created_at.strftime("%m-%d-%Y"),
user=issue.user.login,
state=issue.state,
body=issue.body,
comments="\n".join(comments),
),
"docSource": f"{repo_base}/issues",
}
)
page_num = page_num + 1
page = issues.get_page(page_num)

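The extractor builds one markdown document per issue, dropping templated autoresponse comments before joining the rest. An offline sketch of that assembly step; the two template strings and the sample issue data are illustrative stand-ins, while the two autoresponse phrases come from the diff above:

```python
issue_markdown_template = "## {title}\nopened {date} by {user} (state: {state})\n\n{body}\n\n{comments}"
comment_markdown_template = "{user} commented on {date}:\n{body}"

drop_content = [
    "Thanks for opening your first issue here!",
    "Congratulations on your first Pull Request and welcome to the Apache Airflow community!",
]

comments = [
    {"user": "alice", "date": "11-21-2023", "body": "Thanks for opening your first issue here! Be sure to..."},
    {"user": "bob", "date": "11-21-2023", "body": "Reproduced on 2.7.3 with the default scheduler settings."},
]

# Keep only comments that do not contain any templated autoresponse text.
kept = [
    comment_markdown_template.format(**c)
    for c in comments
    if not any(text in c["body"] for text in drop_content)
]

doc = issue_markdown_template.format(
    title="Scheduler crashes on start",
    date="11-20-2023",
    user="carol",
    state="open",
    body="Traceback attached below.",
    comments="\n\n".join(kept),
)
print(doc)
```
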
71 changes: 0 additions & 71 deletions airflow/include/tasks/extract/stack_overflow.py
@@ -1,19 +1,13 @@
from __future__ import annotations

import datetime

import pandas as pd
from stackapi import StackAPI
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.stack_overflow_helpers import (
process_stack_answers,
process_stack_answers_api,
process_stack_comments,
process_stack_comments_api,
process_stack_posts,
process_stack_questions,
process_stack_questions_api,
)


@@ -70,68 +64,3 @@ def extract_stack_overflow_archive(tag: str, stackoverflow_cutoff_date: str) ->
df = df[["docSource", "sha", "content", "docLink"]]

return df


def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str) -> pd.DataFrame:
"""
This task generates stack overflow documents as a single markdown document per question with associated comments
and answers. The task returns a pandas dataframe with all documents.
param tag: The tag names to include in extracting from stack overflow.
This is used for populating the 'docSource'
type tag: str
param stackoverflow_cutoff_date: Only messages from after this date will be extracted.
type stackoverflow_cutoff_date: str
returned dataframe fields are:
'docSource': 'stackoverflow' plus the tag name (ie. 'airflow')
'docLink': URL for the base question.
'content': The question (plus answers) in markdown format.
'sha': a UUID based on the other fields. This is for compatibility with other document types.
"""

SITE = StackAPI(name="stackoverflow", max_pagesize=100, max_pages=10000000)

fromdate = datetime.datetime.strptime(stackoverflow_cutoff_date, "%Y-%m-%d")

# https://api.stackexchange.com/docs/read-filter#filters=!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE&filter=default&run=true
filter_ = "!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE"

questions_dict = SITE.fetch(endpoint="questions", tagged=tag, fromdate=fromdate, filter=filter_)
items = questions_dict.pop("items")

# TODO: check if we need to paginate
len(items)
# TODO: add backoff logic. For now just fail the task if we can't fetch all results due to api rate limits.
assert not questions_dict["has_more"]

posts_df = pd.DataFrame(items)
posts_df = posts_df[posts_df["answer_count"] >= 1]
posts_df = posts_df[posts_df["score"] >= 1]
posts_df.reset_index(inplace=True, drop=True)

# process questions
questions_df = posts_df
questions_df["comments"] = questions_df["comments"].fillna("")
questions_df["question_comments"] = questions_df["comments"].apply(lambda x: process_stack_comments_api(x))
questions_df = process_stack_questions_api(questions_df=questions_df, tag=tag)

# process associated answers
answers_df = posts_df.explode("answers").reset_index(drop=True)
answers_df["comments"] = answers_df["answers"].apply(lambda x: x.get("comments"))
answers_df["comments"] = answers_df["comments"].fillna("")
answers_df["answer_comments"] = answers_df["comments"].apply(lambda x: process_stack_comments_api(x))
answers_df = process_stack_answers_api(answers_df=answers_df)

# combine questions and answers
df = questions_df.join(answers_df).reset_index(drop=True)
df["content"] = df[["question_text", "answer_text"]].apply("\n".join, axis=1)

df["sha"] = df.apply(generate_uuid5, axis=1)

# column order matters for uuid generation
df = df[["docSource", "sha", "content", "docLink"]]

return df
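
The extractor removed above derives each document's sha by hashing the whole row with weaviate's generate_uuid5, which is why its comment notes that column order matters; the retained archive extractor presumably does the same. A minimal sketch with a one-row frame (sample values are illustrative):

```python
import pandas as pd
from weaviate.util import generate_uuid5

df = pd.DataFrame(
    {
        "docSource": ["stackoverflow airflow"],
        "docLink": ["https://stackoverflow.com/questions/1"],
        "content": ["# Example question\nExample answer"],
    }
)

# The UUID is derived from the row's values, so reordering columns changes the hash.
df["sha"] = df.apply(generate_uuid5, axis=1)
print(df[["docSource", "sha", "content", "docLink"]])
```
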
56 changes: 34 additions & 22 deletions airflow/include/tasks/extract/utils/html_helpers.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import urllib.parse
from time import sleep

@@ -12,10 +13,10 @@ def get_links(url: str, exclude_docs: list) -> set:
Given a HTML url this function scrapes the page for any HTML links (<a> tags) and returns a set of links which:
a) starts with the same base (ie. scheme + netloc)
b) is a relative link from the currently read page
Relative links are converted to absolute links.Note that the absolute link may not be unique due to redirects.
Relative links are converted to absolute links.
Note: The absolute link may not be unique due to redirects. Need to check for redirects in calling function.
:param url: The url to scrape for links.
:param exclude_docs: A list of strings to exclude from the returned links.
"""
response = requests.get(url)
data = response.text
@@ -39,27 +40,38 @@ def get_all_links(url: str, all_links: set, exclude_docs: list):
return links


def get_all_links(url: str, all_links: set, exclude_docs: list):
def get_all_links(url: str, all_links: set, exclude_docs: list, retry_count: int = 0, max_retries: int = 5):
"""
This is a recursive function to find all the sub-pages of a webpage. Given a starting URL the function
recurses through all child links referenced in the page.
Recursive function to find all sub-pages of a webpage.
The all_links set is updated during recursion, so no return set is passed.
:param url: The url to scrape for links.
:param all_links: A set of all links found so far.
:param exclude_docs: A list of strings to exclude from the returned links.
:param retry_count: Current retry attempt.
:param max_retries: Maximum number of retries allowed for a single URL.
"""
links = get_links(url=url, exclude_docs=exclude_docs)
for link in links:
# check if the linked page actually exists and get the redirect which is hopefully unique
try:
links = get_links(url=url, exclude_docs=exclude_docs)
for link in links:
# check if the linked page actually exists and get the redirect which is hopefully unique

response = requests.head(link, allow_redirects=True)
if response.ok:
redirect_url = response.url
if redirect_url not in all_links:
print(redirect_url)
all_links.add(redirect_url)
try:
get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs)
except Exception as e:
print(e)
print("Retrying")
sleep(5)
response = requests.head(link, allow_redirects=True)
if response.ok:
redirect_url = response.url
if redirect_url not in all_links:
logging.info(redirect_url)
all_links.add(redirect_url)
get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs)
except requests.exceptions.ConnectionError as ce:
if retry_count < max_retries:
logging.warning(f"Connection error for {url}: {ce}. Retrying ({retry_count + 1}/{max_retries})")
sleep(2**retry_count) # Exponential backoff
get_all_links(
url=url,
all_links=all_links,
exclude_docs=exclude_docs,
retry_count=retry_count + 1,
max_retries=max_retries,
)
else:
logging.warning(f"Max retries reached for {url}. Skipping this URL.")
