Create bq_ingest_atlassian_stream_dag.py
Commit 5831d3e (1 parent: 98add37)
Showing 1 changed file with 102 additions and 0 deletions.
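The new file defines a single-task Airflow DAG that runs every 30 minutes, pulls the latest JSON results for a fixed set of dashboard queries, and appends each result set to a per-query BigQuery table.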
bq_ingest_atlassian_stream_dag.py
from __future__ import print_function

import datetime
import io
import itertools
import json
import os
import pathlib
import re

import pendulum
import pytz
import requests
import six
import tablib
from requests.exceptions import HTTPError

from airflow import models
from airflow.operators.python_operator import PythonOperator
from google.cloud import bigquery

from galileo import galileo, searchconsole, ga


def get_streams():
    # Data stream sources - dashboard query IDs
    query_numbers = ("1", "5", "6", "7", "8", "9", "10")
    # Base URL of the dashboard from Atlassian
    url = "https://dashboard.platform.aus-gov.com.au"
    for q in query_numbers:
        try:
            # Look up the ID of the latest result set for this query
            res = requests.get(url + "/api/queries/" + q, auth=('', ''))
            res.raise_for_status()
            data_id = res.json()['latest_query_data_id']
            # Fetch the result set itself as JSON
            json_url = url + "/api/queries/" + q + \
                "/results/" + str(data_id) + ".json"
            data_stream = requests.get(json_url, auth=('', ''))
            data_stream.raise_for_status()
        except HTTPError as http_err:
            print(http_err)
            continue
        except (ValueError, KeyError) as parse_err:
            print(parse_err)
            continue
        except Exception as err:
            print(err)
            continue
        # load_table_from_file expects a binary file object, so wrap the
        # raw response bytes rather than the decoded text
        json_stream = io.BytesIO(data_stream.content)
        # Append this result set to its BigQuery destination table
        write_bq_table(json_stream, q)
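
# A minimal sketch of the two-step lookup above, split out for local
# debugging. This helper is hypothetical (not referenced by the DAG), and
# it assumes the dashboard exposes the Redash-style endpoints the URLs
# above imply; credentials are elided exactly as in get_streams().
def _example_fetch_latest_result_url(q="1"):
    base = "https://dashboard.platform.aus-gov.com.au"
    meta = requests.get(base + "/api/queries/" + q, auth=('', '')).json()
    return base + "/api/queries/" + q + \
        "/results/" + str(meta['latest_query_data_id']) + ".json"
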
def write_bq_table(data_stream, query_number):
    client = bigquery.Client()
    table_id = "dta-ga-bigquery.dta_customers_ausgov" + \
        ".atlassian_dashboard_q" + query_number

    # Append to the per-query table and let BigQuery infer the schema
    # from the JSON payload on each load
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        autodetect=True,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )

    job = client.load_table_from_file(
        data_stream, table_id, job_config=job_config
    )
    # Block until the load finishes so a failure surfaces in the task
    job.result()
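
# A hypothetical spot-check helper (not referenced by the DAG): it prints
# the destination table's current row count after a load, assuming the
# same application default credentials that Client() picks up above.
def _example_check_table(query_number="1"):
    client = bigquery.Client()
    table = client.get_table(
        "dta-ga-bigquery.dta_customers_ausgov"
        ".atlassian_dashboard_q" + query_number)
    print(table.num_rows)
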
default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2020, 4, 8),
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=10)
    # 'start_date': pendulum.create(2020, 1, 15, tz="Australia/Sydney")
}
with models.DAG(
        'ingestion_stream_atlassian',
        # schedule_interval=datetime.timedelta(days=1),
        # Run every 30 minutes
        schedule_interval='*/30 * * * *',
        catchup=False,
        default_args=default_dag_args) as dag:
    project_id = models.Variable.get('GCP_PROJECT', 'dta-ga-bigquery')

    # Data stream ingestion task
    ingest_stream = PythonOperator(
        task_id='ingest_stream',
        python_callable=get_streams,
        provide_context=False,
        dag=dag
    )

    # Single task, so there are no dependencies to chain
    ingest_stream
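
# A quick way to exercise the task once without waiting on the scheduler
# is the Airflow 1.x CLI, e.g.:
#
#     airflow test ingestion_stream_atlassian ingest_stream 2020-04-08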