Skip to content

Commit

Permalink
Use AskAstroWeaviateHook for github DAG
Browse files (browse the repository at this point in the history)
  • Loading branch information
sunank200 committed Nov 20, 2023
1 parent 0d12313 commit 9b0ec5b
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions airflow/dags/ingestion/ask-astro-load-github.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,16 +2,20 @@
import os

from dateutil.relativedelta import relativedelta
from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import github
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
_GITHUB_CONN_ID = "github_ro"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

markdown_docs_sources = [
{"doc_dir": "learn", "repo_base": "astronomer/docs"},
{"doc_dir": "astro", "repo_base": "astronomer/docs"},
Expand Down Expand Up @@ -66,12 +70,17 @@ def ask_astro_load_github():

split_code_docs = task(split.split_python).expand(dfs=[code_samples])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
).partial(
class_name=WEAVIATE_CLASS, primary_key="docLink"
).expand(dfs=[split_md_docs, split_code_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs, split_code_docs])
)


ask_astro_load_github()

0 comments on commit 9b0ec5b

Please sign in to comment.