diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py
index de764fe5..5f8061aa 100644
--- a/airflow/dags/ingestion/ask-astro-load.py
+++ b/airflow/dags/ingestion/ask-astro-load.py
@@ -31,7 +31,7 @@
     {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
 ]
 issues_docs_sources = [
-    {"repo_base": "apache/airflow", "cutoff_date": datetime.date(2020, 1, 1), "cutoff_issue_number": 30000}
+    "apache/airflow",
 ]
 slack_channel_sources = [
     {
@@ -218,7 +218,7 @@ def extract_astro_blogs():
         return [df]
 
     md_docs = extract_github_markdown.expand(source=markdown_docs_sources)
-    issues_docs = extract_github_issues.expand(source=issues_docs_sources)
+    issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources)
     stackoverflow_docs = extract_stack_overflow.expand(tag=stackoverflow_tags)
     # slack_docs = extract_slack_archive.expand(source=slack_channel_sources)
     registry_cells_docs = extract_astro_registry_cell_types()
diff --git a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
index 25bc8d83..7c5149e8 100644
--- a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
+++ b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
@@ -254,8 +254,7 @@ def batch_process_data(
                     self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
                     batch_errors.append({"row_id": row_id, "uuid": uuid, "error": str(e)})
 
-        results = batch.create_objects()
-        return batch_errors + [item for result in results for item in result.get("errors", [])], results
+        return batch_errors
 
     def process_batch_errors(self, results: list, verbose: bool) -> list:
         """
@@ -339,11 +338,11 @@ def ingest_data(
 
         self.logger.info(f"Passing {len(df)} objects for ingest.")
 
-        batch_errors, results = self.batch_process_data(
+        batch_errors = self.batch_process_data(
             df, class_name, uuid_column, vector_column, batch_params, existing, verbose
         )
 
-        batch_errors += self.process_batch_errors(results, verbose)
+        batch_errors += self.process_batch_errors(batch_errors, verbose)
 
         if existing == "upsert" and batch_errors:
             self.logger.warning("Error during upsert. Rolling back all inserts.")
diff --git a/airflow/include/tasks/ingest.py b/airflow/include/tasks/ingest.py
index 4b4acc3d..2ab14dac 100644
--- a/airflow/include/tasks/ingest.py
+++ b/airflow/include/tasks/ingest.py
@@ -2,90 +2,6 @@
 
 import pandas as pd
 import requests
-from weaviate.util import generate_uuid5
-
-
-def import_upsert_data(dfs: list[pd.DataFrame], class_name: str, primary_key: str) -> list:
-    """
-    This task concatenates multiple dataframes from upstream dynamic tasks and vectorizes with import to weaviate.
-    This function is used as a python_callable with the weaviate_import decorator. The returned dictionary is passed
-    to the WeaviateImportDataOperator for ingest. The operator returns a list of any objects that failed to import.
-
-    A 'uuid' is generated based on the content and metadata (the git sha, document url, the document source and a
-    concatenation of the headers).
-
-    Any existing documents with the same primary_key but differing UUID or sha will be deleted prior to import.
-
-    param dfs: A list of dataframes from downstream dynamic tasks
-    type dfs: list[pd.DataFrame]
-
-    param class_name: The name of the class to import data. Class should be created with weaviate schema.
-    type class_name: str
-
-    param primary_key: The name of a column to use as a primary key for upsert logic.
-    type primary_key: str
-    """
-
-    df = pd.concat(dfs, ignore_index=True)
-
-    df["uuid"] = df.apply(lambda x: generate_uuid5(identifier=x.to_dict(), namespace=class_name), axis=1)
-
-    if df[["docLink", "uuid"]].duplicated().any():
-        df.drop_duplicates(subset=["docLink", "uuid"], keep="first", inplace=True)
-        df.reset_index(drop=True, inplace=True)
-
-    print(f"Passing {len(df)} objects for import.")
-
-    return {
-        "data": df,
-        "class_name": class_name,
-        "existing": "upsert",
-        "primary_key": primary_key,
-        "uuid_column": "uuid",
-        "error_threshold": 0,
-        "verbose": True,
-    }
-
-
-def import_data(dfs: list[pd.DataFrame], class_name: str) -> list:
-    """
-    This task concatenates multiple dataframes from upstream dynamic tasks and vectorizes with import to weaviate.
-    This function is used as a python_callable with the weaviate_import decorator. The returned dictionary is passed
-    to the WeaviateImportDataOperator for ingest. The operator returns a list of any objects that failed to import.
-
-    A 'uuid' is generated based on the content and metadata (the git sha, document url, the document source and a
-    concatenation of the headers) and Weaviate will create the vectors.
-
-    Any existing documents are skipped. The assumption is that this is a first
-    import of data and skipping upsert checks will speed up import.
-
-    param dfs: A list of dataframes from downstream dynamic tasks
-    type dfs: list[pd.DataFrame]
-
-    param class_name: The name of the class to import data. Class should be created with weaviate schema.
-    type class_name: str
-    """
-
-    df = pd.concat(dfs, ignore_index=True)
-
-    df["uuid"] = df.apply(lambda x: generate_uuid5(identifier=x.to_dict(), namespace=class_name), axis=1)
-
-    if df[["docLink", "uuid"]].duplicated().any():
-        df.drop_duplicates(subset=["docLink", "uuid"], keep="first", inplace=True)
-        df.reset_index(drop=True, inplace=True)
-
-    print(f"Passing {len(df)} objects for import.")
-
-    return {
-        "data": df,
-        "class_name": class_name,
-        "existing": "skip",
-        "uuid_column": "uuid",
-        "error_threshold": 0,
-        "batched_mode": True,
-        "batch_size": 1000,
-        "verbose": False,
-    }
 
 
 def import_baseline(class_name: str, seed_baseline_url: str) -> list: