Skip to content

Commit

Permalink
add pythonvirtualenvoperator
Browse files Browse the repository at this point in the history
  • Loading branch information
kieronellis committed Jun 24, 2024
1 parent 26882ed commit eb58b7b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ RUN pip install pandas-gbq
RUN pip install pytest
RUN pip install PyGithub

RUN pip install virtualenv

RUN mkdir /action
COPY dag_validation.py /action/dag_validation.py
COPY alert.py /action/alert.py
Expand Down
56 changes: 37 additions & 19 deletions tests/dags/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
from airflow.operators.python import PythonVirtualenvOperator

from shared_var import image
import numpy as np
Expand Down Expand Up @@ -51,6 +52,8 @@ def test_access_var():
print("my var message : {}".format(my_var))
return ("Access Var Success!")

def get_clv():
return ("CLV Success!")

access_var = PythonOperator(
task_id = 'test_access_var',
Expand All @@ -65,22 +68,37 @@ def test_access_var():
dag = dag
)

r_value = '{"foo": "bar"\n, "buzz": 2}'

k8s_image = KubernetesPodOperator(
namespace="default",
image=image,
cmds=["bash", "-cx"],
arguments=["echo '{}' > /airflow/xcom/return.json".format(r_value)],
name="test-k8s-xcom-sidecar",
task_id="task-test",
labels={"dag": "test-xcom-sidecar"},
get_logs=True,
do_xcom_push=True,
is_delete_operator_pod=True,
log_events_on_failure=True,
dag=dag
)
BaseHook.get_connection("test_conn")

access_var >> import_module >> k8s_image


virtual_env = PythonVirtualenvOperator(
task_id="add_gcp_connection_python_new",
python_callable=get_clv,
system_site_packages=False,
requirements=[
"apache-airflow",
"apache-airflow-providers-google==5.0.0",
"Lifetimes==0.11.3",
],
)


access_var >> import_module >> virtual_env

# r_value = '{"foo": "bar"\n, "buzz": 2}'

# k8s_image = KubernetesPodOperator(
# namespace="default",
# image=image,
# cmds=["bash", "-cx"],
# arguments=["echo '{}' > /airflow/xcom/return.json".format(r_value)],
# name="test-k8s-xcom-sidecar",
# task_id="task-test",
# labels={"dag": "test-xcom-sidecar"},
# get_logs=True,
# do_xcom_push=True,
# is_delete_operator_pod=True,
# log_events_on_failure=True,
# dag=dag
# )
# BaseHook.get_connection("test_conn")
# access_var >> import_module >> k8s_image

0 comments on commit eb58b7b

Please sign in to comment.