-
Notifications
You must be signed in to change notification settings - Fork 0
/
tutorial_xcom.py
64 lines (55 loc) · 1.69 KB
/
tutorial_xcom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from datetime import datetime, timedelta
from airflow.hooks.jdbc_hook import JdbcHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
import requests
from airflow.models import Variable
# Arguments handed to the DAG below as its ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    # Evaluated when this file is parsed.
    start_date=airflow.utils.dates.days_ago(0),
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# The 'xcomtest' DAG; every operator in this file attaches to it via ``dag=dag``.
dag = DAG(
    dag_id='xcomtest',
    description='xcomtest',
    schedule_interval="@once",
    default_args=default_args,
)
# Accept **kwargs and read the externally supplied data (the dag_run conf) from it.
def dateEnd(ds, **kwargs):
    """Return the ``date_end`` entry of the triggering DAG run's conf.

    Args:
        ds: Accepted but not used by this callable.
        **kwargs: Must contain ``dag_run``; its ``conf`` mapping is indexed
            with ``'date_end'``.

    Returns:
        Whatever value is stored under ``'date_end'`` in ``dag_run.conf``.

    Raises:
        KeyError: If ``dag_run`` is absent from ``kwargs`` or ``date_end``
            is absent from the conf.
    """
    run_conf = kwargs['dag_run'].conf
    return run_conf['date_end']
def dateStart(ds, **kwargs):
    """Fetch the configured test URL, print the response, and return ``date_start``.

    The URL is read from the Airflow Variable ``http_url_test``. The response
    body is only printed; it does not influence the return value.

    Args:
        ds: Accepted but not used by this callable.
        **kwargs: Must contain ``dag_run``; its ``conf`` mapping is indexed
            with ``'date_start'``.

    Returns:
        Whatever value is stored under ``'date_start'`` in ``dag_run.conf``.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails,
            including when it exceeds the timeout.
        KeyError: If ``dag_run`` or ``date_start`` is missing.
    """
    # requests applies no timeout by default, so an unresponsive endpoint
    # would block this task forever; bound the wait so the task fails and
    # can be retried instead.
    r = requests.get(Variable.get("http_url_test"), timeout=30)
    print(r.content)
    return kwargs['dag_run'].conf['date_start']
def getMacLists(ds, **kwargs):
    """Look up the ``businessid``, ``shopid`` and ``mac`` conf entries and return a fixed string.

    The three values are read from ``dag_run.conf`` but not used further, so
    the lookups only have the effect of failing when a key is missing.

    Args:
        ds: Accepted but not used by this callable.
        **kwargs: Must contain ``dag_run`` whose ``conf`` holds the three keys.

    Returns:
        The literal string ``"getMacLists_result"``.

    Raises:
        KeyError: If ``dag_run`` or any of the three conf keys is missing.
    """
    for required_key in ('businessid', 'shopid', 'mac'):
        # Value is intentionally discarded; only the lookup itself matters.
        _ = kwargs['dag_run'].conf[required_key]
    return "getMacLists_result"
# One PythonOperator per callable, created in the order dateStart, dateEnd,
# getMacLists. provide_context=True so that each callable receives the
# keyword arguments (including ``dag_run``) it reads from.
t1, t2, t3 = (
    PythonOperator(
        task_id=task_name,
        python_callable=task_callable,
        provide_context=True,
        dag=dag,
    )
    for task_name, task_callable in (
        ('dateStart', dateStart),
        ('dateEnd', dateEnd),
        ('getMacLists', getMacLists),
    )
)
# Echo the values the three upstream tasks returned, pulled from XCom by task id.
# NOTE(review): the pulled values originate from dag_run.conf and are
# interpolated into the shell command unquoted; confirm that whoever can
# trigger this DAG is trusted.
t4 = BashOperator(
    task_id='xcomview',
    bash_command=(
        "echo {{ task_instance.xcom_pull(task_ids='getMacLists') }} "
        "{{ task_instance.xcom_pull(task_ids='dateStart') }} "
        "{{ task_instance.xcom_pull(task_ids='dateEnd') }} "
    ),
    dag=dag,
)

# xcomview runs only after all three Python tasks.
t4.set_upstream([t1, t2, t3])