From c5cd78d84827f6499d829f42c7f34396fabc046a Mon Sep 17 00:00:00 2001
From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com>
Date: Tue, 21 Nov 2023 20:49:54 +0545
Subject: [PATCH] Remove the code which is not in scope for this PR

---
 airflow/dags/ingestion/ask-astro-load.py      | 14 ++--
 airflow/include/data/schema.json              |  2 +-
 airflow/include/tasks/extract/blogs.py        | 34 ++++-----
 airflow/include/tasks/extract/github.py       | 66 ++++++++---------
 .../include/tasks/extract/stack_overflow.py   | 71 -------------------
 .../tasks/extract/utils/html_helpers.py       | 56 +++++++++------
 .../extract/utils/stack_overflow_helpers.py   | 71 -------------------
 .../utils/weaviate/ask_astro_weaviate_hook.py | 10 ++-
 airflow/include/tasks/ingest.py               |  4 +-
 9 files changed, 96 insertions(+), 232 deletions(-)

diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py
index 97eca3f5..de764fe5 100644
--- a/airflow/dags/ingestion/ask-astro-load.py
+++ b/airflow/dags/ingestion/ask-astro-load.py
@@ -17,7 +17,7 @@
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 _GITHUB_CONN_ID = "github_ro"

-WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDevAnkit")
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

 ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

@@ -65,7 +65,7 @@ def ask_astro_load_bulk():
     This DAG performs the initial load of data from sources.

     If seed_baseline_url (set above) points to a parquet file with pre-embedded data it will be
-    ingested. Otherwise, new data is extracted, split, embedded and ingested.
+    ingested. Otherwise, new data is extracted, split, embedded and ingested.
     The first time this DAG runs (without seeded baseline) it will take at least 90 minutes to extract data from all sources.
     Extracted data is then serialized to disk in the project
@@ -81,7 +81,7 @@ def get_schema(schema_file: str) -> list:
         :param schema_file: path to the schema JSON file
         """
-        return ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json")
+        return ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json", weaviate_class=WEAVIATE_CLASS)

     @task.branch
     def check_schema(class_objects: list) -> list[str]:
         """
@@ -178,12 +178,12 @@ def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverf
     #     return df

     @task(trigger_rule="none_failed")
-    def extract_github_issues(source: dict):
+    def extract_github_issues(repo_base: str):
         try:
-            df = pd.read_parquet(f"include/data/{source['repo_base']}/issues.parquet")
+            df = pd.read_parquet(f"include/data/{repo_base}/issues.parquet")
         except Exception:
-            df = github.extract_github_issues(source, _GITHUB_CONN_ID)
-            df.to_parquet(f"include/data/{source['repo_base']}/issues.parquet")
+            df = github.extract_github_issues(repo_base, _GITHUB_CONN_ID)
+            df.to_parquet(f"include/data/{repo_base}/issues.parquet")

         return df
diff --git a/airflow/include/data/schema.json b/airflow/include/data/schema.json
index 25d0cf1d..d49b9a6f 100644
--- a/airflow/include/data/schema.json
+++ b/airflow/include/data/schema.json
@@ -54,7 +54,7 @@
       "name": "content",
       "description": "Document content",
       "dataType": ["text"],
-      "tokenization": "whitespace",
+      "tokenization": "word",
       "moduleConfig": {
         "text2vec-openai": {
           "skip": "False",
diff --git a/airflow/include/tasks/extract/blogs.py b/airflow/include/tasks/extract/blogs.py
index 30aeef70..bec05921 100644
--- a/airflow/include/tasks/extract/blogs.py
+++ b/airflow/include/tasks/extract/blogs.py
@@ -1,7 +1,6 @@
 from __future__ import annotations

-import datetime
-from urllib.parse import urljoin
+from datetime import datetime

 import pandas as pd
 import requests
@@ -14,7 +13,7 @@
 page_url = base_url + "/blog/{page}/#archive"


-def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:
+def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
     """
     This task downloads Blogs from the Astronomer website and returns a list of pandas dataframes.
     Return type is a list in order to map to upstream dynamic tasks.
@@ -31,34 +30,37 @@ def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:
     headers = {}
     links = []
+    dates = []
     page = 1

     response = requests.get(page_url.format(page=page), headers=headers)
     while response.ok:
         soup = BeautifulSoup(response.text, "lxml")
-        for card in soup.find_all(class_="post-card__meta"):
-            blog_date = datetime.datetime.strptime(card.find("time")["datetime"], "%Y-%m-%dT%H:%M:%S.%fZ")
-            if blog_date.date() >= blog_cutoff_date:
-                url = urljoin(base_url, card.find("a", href=True)["href"])
-                response = requests.head(url, allow_redirects=True)
-                if response.ok:
-                    links.append(
-                        {
-                            "docLink": response.url,
-                            "title": card.find(class_="title").get_text(),
-                        }
-                    )
+        cards = soup.find_all(class_="post-card__cover")
+        card_links = [base_url + card.find("a", href=True)["href"] for card in cards]
+        links.extend(card_links)
+        meta = soup.find_all(class_="post-card__meta")
+        dates.extend([post.find("time")["datetime"] for post in meta])

         page = page + 1
         response = requests.get(page_url.format(page=page), headers=headers)

-    df = pd.DataFrame(links)
+    df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])
+
+    df["date"] = pd.to_datetime(df["date"]).dt.date
+    df = df[df["date"] > blog_cutoff_date.date()]
+    df.drop("date", inplace=True, axis=1)
     df.drop_duplicates(inplace=True)

     df["content"] = df["docLink"].apply(lambda x: requests.get(x).content)
+    df["title"] = df["content"].apply(
+        lambda x: BeautifulSoup(x, "lxml").find(class_="post-card__meta").find(class_="title").get_text()
+    )
+    df["content"] = df["content"].apply(lambda x: BeautifulSoup(x, "lxml").find(class_="prose").get_text())

     df["content"] = df.apply(lambda x: blog_format.format(title=x.title, content=x.content), axis=1)
+    df.drop("title", axis=1, inplace=True)

     df["sha"] = df["content"].apply(generate_uuid5)
     df["docSource"] = "astro blog"
     df.reset_index(drop=True, inplace=True)
diff --git a/airflow/include/tasks/extract/github.py b/airflow/include/tasks/extract/github.py
index 69501856..cac06619 100644
--- a/airflow/include/tasks/extract/github.py
+++ b/airflow/include/tasks/extract/github.py
@@ -180,19 +180,14 @@ def extract_github_python(source: dict, github_conn_id: str) -> pd.DataFrame:
     return df


-def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:
+def extract_github_issues(repo_base: str, github_conn_id: str) -> pd.DataFrame:
     """
     This task downloads github issues as markdown documents in a pandas dataframe. Text from templated
-    auto responses for issues are removed while building a markdown document for each issue. Extraction will
-    only pull issues after a provided "cutoff_date" AND "cutoff_issue_number".
+    auto responses for issues are removed while building a markdown document for each issue.

-    param source: A dictionary specifying what to ingest.
-    example:
-        {"repo_base": "apache/airflow",
-         "cutoff_date": datetime.date.today() - relativedelta(months=1),
-         "cutoff_issue_number": 30000
-         }
-    type repo_base: dict
+    param repo_base: The name of the organization/repository (ie. "apache/airflow") from which to extract
+    issues.
+    type repo_base: str

     param github_conn_id: The connection ID to use with the GithubHook
     param github_conn_id: str
@@ -207,8 +202,8 @@ def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:
     gh_hook = GithubHook(github_conn_id)

-    repo = gh_hook.client.get_repo(source["repo_base"])
-    issues = repo.get_issues(state="all")
+    repo = gh_hook.client.get_repo(repo_base)
+    issues = repo.get_issues()

     issue_autoresponse_text = "Thanks for opening your first issue here!"
     pr_autoresponse_text = "Congratulations on your first Pull Request and welcome to the Apache Airflow community!"
@@ -248,31 +243,30 @@ def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:
     while page:
         for issue in page:
-            if issue.updated_at.date() >= source["cutoff_date"] and issue.number >= source["cutoff_issue_number"]:
-                print(issue.number)
-                comments = []
-                for comment in issue.get_comments():
-                    if not any(substring in comment.body for substring in drop_content):
-                        comments.append(
-                            comment_markdown_template.format(
-                                user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
-                            )
+            print(issue.number)
+            comments = []
+            for comment in issue.get_comments():
+                if not any(substring in comment.body for substring in drop_content):
+                    comments.append(
+                        comment_markdown_template.format(
+                            user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
                         )
-                downloaded_docs.append(
-                    {
-                        "docLink": issue.html_url,
-                        "sha": "",
-                        "content": issue_markdown_template.format(
-                            title=issue.title,
-                            date=issue.created_at.strftime("%m-%d-%Y"),
-                            user=issue.user.login,
-                            state=issue.state,
-                            body=issue.body,
-                            comments="\n".join(comments),
-                        ),
-                        "docSource": f"{source['repo_base']}/issues",
-                    }
-                )
+                    )
+            downloaded_docs.append(
+                {
+                    "docLink": issue.html_url,
+                    "sha": "",
+                    "content": issue_markdown_template.format(
+                        title=issue.title,
+                        date=issue.created_at.strftime("%m-%d-%Y"),
+                        user=issue.user.login,
+                        state=issue.state,
+                        body=issue.body,
+                        comments="\n".join(comments),
+                    ),
+                    "docSource": f"{repo_base}/issues",
+                }
+            )

         page_num = page_num + 1
         page = issues.get_page(page_num)
diff --git a/airflow/include/tasks/extract/stack_overflow.py b/airflow/include/tasks/extract/stack_overflow.py
index d646c963..264edf26 100644
--- a/airflow/include/tasks/extract/stack_overflow.py
+++ b/airflow/include/tasks/extract/stack_overflow.py
@@ -1,19 +1,13 @@
 from __future__ import annotations

-import datetime
-
 import pandas as pd
-from stackapi import StackAPI
 from weaviate.util import generate_uuid5

 from include.tasks.extract.utils.stack_overflow_helpers import (
     process_stack_answers,
-    process_stack_answers_api,
     process_stack_comments,
-    process_stack_comments_api,
     process_stack_posts,
     process_stack_questions,
-    process_stack_questions_api,
 )
@@ -70,68 +64,3 @@ def extract_stack_overflow_archive(tag: str, stackoverflow_cutoff_date: str) ->
     df = df[["docSource", "sha", "content", "docLink"]]

     return df
-
-
-def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str) -> pd.DataFrame:
-    """
-    This task generates stack overflow documents as a single markdown document per question with associated comments
-    and answers. The task returns a pandas dataframe with all documents.
-
-    param tag: The tag names to include in extracting from stack overflow.
-    This is used for populating the 'docSource'
-    type tag: str
-
-    param stackoverflow_cutoff_date: Only messages from after this date will be extracted.
-    type stackoverflow_cutoff_date: str
-
-    returned dataframe fields are:
-    'docSource': 'stackoverflow' plus the tag name (ie. 'airflow')
-    'docLink': URL for the base question.
-    'content': The question (plus answers) in markdown format.
-    'sha': a UUID based on the other fields. This is for compatibility with other document types.
-
-    """
-
-    SITE = StackAPI(name="stackoverflow", max_pagesize=100, max_pages=10000000)
-
-    fromdate = datetime.datetime.strptime(stackoverflow_cutoff_date, "%Y-%m-%d")
-
-    # https://api.stackexchange.com/docs/read-filter#filters=!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE&filter=default&run=true
-    filter_ = "!-(5KXGCFLp3w9.-7QsAKFqaf5yFPl**9q*_hsHzYGjJGQ6BxnCMvDYijFE"
-
-    questions_dict = SITE.fetch(endpoint="questions", tagged=tag, fromdate=fromdate, filter=filter_)
-    items = questions_dict.pop("items")
-
-    # TODO: check if we need to paginate
-    len(items)
-
-    # TODO: add backoff logic. For now just fail the task if we can't fetch all results due to api rate limits.
-    assert not questions_dict["has_more"]
-
-    posts_df = pd.DataFrame(items)
-    posts_df = posts_df[posts_df["answer_count"] >= 1]
-    posts_df = posts_df[posts_df["score"] >= 1]
-    posts_df.reset_index(inplace=True, drop=True)
-
-    # process questions
-    questions_df = posts_df
-    questions_df["comments"] = questions_df["comments"].fillna("")
-    questions_df["question_comments"] = questions_df["comments"].apply(lambda x: process_stack_comments_api(x))
-    questions_df = process_stack_questions_api(questions_df=questions_df, tag=tag)
-
-    # process associated answers
-    answers_df = posts_df.explode("answers").reset_index(drop=True)
-    answers_df["comments"] = answers_df["answers"].apply(lambda x: x.get("comments"))
-    answers_df["comments"] = answers_df["comments"].fillna("")
-    answers_df["answer_comments"] = answers_df["comments"].apply(lambda x: process_stack_comments_api(x))
-    answers_df = process_stack_answers_api(answers_df=answers_df)
-
-    # combine questions and answers
-    df = questions_df.join(answers_df).reset_index(drop=True)
-    df["content"] = df[["question_text", "answer_text"]].apply("\n".join, axis=1)
-
-    df["sha"] = df.apply(generate_uuid5, axis=1)
-
-    # column order matters for uuid generation
-    df = df[["docSource", "sha", "content", "docLink"]]
-
-    return df
diff --git a/airflow/include/tasks/extract/utils/html_helpers.py b/airflow/include/tasks/extract/utils/html_helpers.py
index 070e43a7..34029303 100644
--- a/airflow/include/tasks/extract/utils/html_helpers.py
+++ b/airflow/include/tasks/extract/utils/html_helpers.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import logging
 import urllib.parse
 from time import sleep

@@ -12,10 +13,10 @@ def get_links(url: str, exclude_docs: list) -> set:
     Given a HTML url this function scrapes the page for any HTML links (<a> tags) and returns a set of links which:
     a) starts with the same base (ie. scheme + netloc)
     b) is a relative link from the currently read page
+    Relative links are converted to absolute links. Note that the absolute link may not be unique due to redirects.

-    Relative links are converted to absolute links.
-
-    Note: The absolute link may not be unique due to redirects. Need to check for redirects in calling function.
+    :param url: The url to scrape for links.
+    :param exclude_docs: A list of strings to exclude from the returned links.
""" response = requests.get(url) data = response.text @@ -39,27 +40,38 @@ def get_links(url: str, exclude_docs: list) -> set: return links -def get_all_links(url: str, all_links: set, exclude_docs: list): +def get_all_links(url: str, all_links: set, exclude_docs: list, retry_count: int = 0, max_retries: int = 5): """ - This is a recursive function to find all the sub-pages of a webpage. Given a starting URL the function - recurses through all child links referenced in the page. + Recursive function to find all sub-pages of a webpage. - The all_links set is updated in recursion so no return set is passed. + :param url: The url to scrape for links. + :param all_links: A set of all links found so far. + :param exclude_docs: A list of strings to exclude from the returned links. + :param retry_count: Current retry attempt. + :param max_retries: Maximum number of retries allowed for a single URL. """ - links = get_links(url=url, exclude_docs=exclude_docs) - for link in links: - # check if the linked page actually exists and get the redirect which is hopefully unique + try: + links = get_links(url=url, exclude_docs=exclude_docs) + for link in links: + # check if the linked page actually exists and get the redirect which is hopefully unique - response = requests.head(link, allow_redirects=True) - if response.ok: - redirect_url = response.url - if redirect_url not in all_links: - print(redirect_url) - all_links.add(redirect_url) - try: - get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs) - except Exception as e: - print(e) - print("Retrying") - sleep(5) + response = requests.head(link, allow_redirects=True) + if response.ok: + redirect_url = response.url + if redirect_url not in all_links: + logging.info(redirect_url) + all_links.add(redirect_url) get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs) + except requests.exceptions.ConnectionError as ce: + if retry_count < max_retries: + logging.warning(f"Connection error for {url}: {ce}. Retrying ({retry_count + 1}/{max_retries})") + sleep(2**retry_count) # Exponential backoff + get_all_links( + url=url, + all_links=all_links, + exclude_docs=exclude_docs, + retry_count=retry_count + 1, + max_retries=max_retries, + ) + else: + logging.warning(f"Max retries reached for {url}. 
diff --git a/airflow/include/tasks/extract/utils/stack_overflow_helpers.py b/airflow/include/tasks/extract/utils/stack_overflow_helpers.py
index 3ae27415..9b171e6e 100644
--- a/airflow/include/tasks/extract/utils/stack_overflow_helpers.py
+++ b/airflow/include/tasks/extract/utils/stack_overflow_helpers.py
@@ -1,4 +1,3 @@
-import datetime
 from textwrap import dedent

 import pandas as pd
@@ -191,73 +190,3 @@ def process_stack_answers(posts_df: pd.DataFrame, comments_df: pd.DataFrame) ->
     answers_df = answers_df.groupby("question_id")["answer_text"].apply(lambda x: "".join(x))

     return answers_df
-
-
-def process_stack_comments_api(comments: list) -> str:
-    """
-    This helper function processes a list of slack comments for a question or answer
-
-    param comments: a list of stack overflow comments from the api
-    type comments: list
-
-    """
-    return "".join(
-        [
-            comment_template.format(
-                user=comment["owner"]["user_id"],
-                date=datetime.datetime.fromtimestamp(comment["creation_date"]).strftime("%Y-%m-%d"),
-                score=comment["score"],
-                body=comment["body_markdown"],
-            )
-            for comment in comments
-        ]
-    )
-
-
-def process_stack_questions_api(questions_df: pd.DataFrame, tag: str) -> pd.DataFrame:
-    """
-    This helper function formats a questions dataframe pulled from slack api.
-
-    The column question_text is created in markdown format based on question_template.
-
-    """
-
-    # format question content
-    questions_df["question_text"] = questions_df.apply(
-        lambda x: question_template.format(
-            title=x.title,
-            user=x.owner["user_id"],
-            date=datetime.datetime.fromtimestamp(x.creation_date).strftime("%Y-%m-%d"),
-            score=x.score,
-            body=x.body_markdown,
-            question_comments=x.question_comments,
-        ),
-        axis=1,
-    )
-    questions_df = questions_df[["link", "question_id", "question_text"]].set_index("question_id")
-
-    questions_df["docSource"] = f"stackoverflow {tag}"
-
-    questions_df.rename({"link": "docLink"}, axis=1, inplace=True)
-
-    return questions_df
-
-
-def process_stack_answers_api(answers_df: pd.DataFrame) -> pd.DataFrame:
-    """
-    This helper function formats answers into markdownd documents and returns
-    a dataframe with question_id as index.
-    """
-    answers_df["answer_text"] = answers_df[["answers", "answer_comments"]].apply(
-        lambda x: answer_template.format(
-            score=x.answers["score"],
-            date=datetime.datetime.fromtimestamp(x.answers["creation_date"]).strftime("%Y-%m-%d"),
-            user=x.answers["owner"]["user_id"],
-            body=x.answers["body_markdown"],
-            answer_comments=x.answer_comments,
-        ),
-        axis=1,
-    )
-    answers_df = answers_df.groupby("question_id")["answer_text"].apply(lambda x: "".join(x))
-
-    return answers_df
diff --git a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
index 0a1702e0..25bc8d83 100644
--- a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
+++ b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
@@ -2,7 +2,6 @@
 import json
 import logging
-import os
 from typing import Any

 import pandas as pd
@@ -13,8 +12,6 @@
 from airflow.exceptions import AirflowException
 from airflow.providers.weaviate.hooks.weaviate import WeaviateHook

-WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDevAnkit")
-

 class AskAstroWeaviateHook(WeaviateHook):
     """Extends the WeaviateHook to include specific methods for handling Ask-Astro."""
@@ -24,11 +21,12 @@ def __init__(self, *args, **kwargs):
         self.logger = logging.getLogger("airflow.task")
         self.client = self.get_client()

-    def get_schema(self, schema_file: str) -> list:
+    def get_schema(self, schema_file: str, weaviate_class: str) -> list:
         """
         Reads and processes the schema from a JSON file.

-        :param schema_file: path to the schema JSON file
+        :param schema_file: path to the schema JSON file.
+        :param weaviate_class: The name of the class to import data. Class should be created with weaviate schema.
         """
         try:
             with open(schema_file) as file:
@@ -42,7 +40,7 @@ def get_schema(self, schema_file: str) -> list:
         classes = schema_data.get("classes", [schema_data])

         for class_object in classes:
-            class_object.update({"class": WEAVIATE_CLASS})
+            class_object.update({"class": weaviate_class})

         self.logger.info("Schema processing completed.")
         return classes
diff --git a/airflow/include/tasks/ingest.py b/airflow/include/tasks/ingest.py
index e3c98f19..4b4acc3d 100644
--- a/airflow/include/tasks/ingest.py
+++ b/airflow/include/tasks/ingest.py
@@ -56,7 +56,7 @@ def import_data(dfs: list[pd.DataFrame], class_name: str) -> list:
     A 'uuid' is generated based on the content and metadata (the git sha, document url, the document source and
     a concatenation of the headers) and Weaviate will create the vectors.

-    Any existing documents are skipped. The assumption is that this is a first
+    Any existing documents are skipped. The assumption is that this is a first
     import of data and skipping upsert checks will speed up import.

     param dfs: A list of dataframes from downstream dynamic tasks
@@ -97,7 +97,7 @@ def import_baseline(class_name: str, seed_baseline_url: str) -> list:
     seed_baseline_url is a URI for a parquet file of pre-embedded data.

-    Any existing documents are replaced. The assumption is that this is a first import of data and older data
+    Any existing documents are replaced. The assumption is that this is a first import of data and older data
     should be removed.

     param class_name: The name of the class to import data. Class should be created with weaviate schema.
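
Not part of the patch above: a minimal, hypothetical usage sketch of the two signatures this change narrows, assuming the repo's include.tasks package layout is importable and that Airflow connections exist for the IDs shown (the Weaviate connection ID here is an assumption; the DAG builds it from ASK_ASTRO_ENV).

# Hypothetical sketch only -- not taken from this patch.
from include.tasks.extract import github
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

# The target class is now passed explicitly instead of being read from a
# module-level WEAVIATE_CLASS constant inside the hook.
hook = AskAstroWeaviateHook("weaviate_dev")  # connection ID is an assumption
class_objects = hook.get_schema(schema_file="include/data/schema.json", weaviate_class="DocsDev")

# extract_github_issues now takes a bare "org/repo" string rather than a source
# dict carrying cutoff_date / cutoff_issue_number.
issues_df = github.extract_github_issues(repo_base="apache/airflow", github_conn_id="github_ro")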