-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e524921
commit 048156f
Showing
4 changed files
with
116 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 59 additions & 0 deletions
59
dags/bq_scripts_covid19_ausgov/dta_python_atlassian_ingest_data_bq_0.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# Python 3 script to ingest data streams from the Atlassian dashboard
# into the DTA data warehouse (BigQuery).
#
# Schedule intervals for the datasets:
#   Q1:  Total number of unique users                  : 300s
#   Q5:  Unique users that trigger action              : 1800s
#   Q6:  Received messages                             : 300s
#   Q7:  Total messages in plus out, include all       : 300s
#   Q8:  Hourly active users                           : 1800s
#   Q9:  Unique users split by amount of messages sent : 1800s
#   Q10: Daily active users                            : 1800s

import json
import re
from urllib.request import urlopen

# Data stream sources - query numbers exposed by the dashboard API.
QUERY_NUMBERS = ("1", "5", "6", "7", "8", "9", "10")

# Base URL of the Atlassian dashboard.
# NOTE(review): the original file also carried a second URL with hard-coded
# basic-auth credentials ("https://dta:...@..."); credentials must come from
# a secret store / environment variable, never source control.
BASE_URL = "https://dashboard.platform.aus-gov.com.au"

# Pattern that pulls the latest result-set id out of a query's JSON payload.
# Compiled once since it is applied for every query.
_LATEST_ID_RE = re.compile(r'"latest_query_data_id": ([0-9]+),')


def extract_latest_query_data_id(payload):
    """Return the latest_query_data_id from a query JSON string, or None.

    Prefers json.loads for well-formed payloads; falls back to a regex
    scan (the original approach) so a partially valid payload still
    yields an id. Returns an int, not a Match object — the original
    appended the raw re.Match, which corrupted the result URLs.
    """
    try:
        value = json.loads(payload).get("latest_query_data_id")
        if value is not None:
            return int(value)
    except (ValueError, AttributeError):
        pass
    match = _LATEST_ID_RE.search(payload)
    return int(match.group(1)) if match else None


def fetch_query_data_ids(base_url=BASE_URL, query_numbers=QUERY_NUMBERS):
    """Fetch each query's metadata and collect its latest dataset id.

    Returns a list parallel to query_numbers; entries are None when a
    query has no materialised result yet.
    """
    data_ids = []
    for q in query_numbers:
        query_url = "{0}/api/queries/{1}".format(base_url, q)
        # urllib.request.urlopen — the original used the Python 2 spelling
        # urllib.urlopen, which does not exist in Python 3.
        with urlopen(query_url) as res:
            # Read the body exactly once: a second .read() on the same
            # response returns an empty string (bug in the original,
            # which printed res.read() and then read again).
            body = res.read().decode("utf-8")
        data_ids.append(extract_latest_query_data_id(body))
    return data_ids


def download_results(data_ids, base_url=BASE_URL, query_numbers=QUERY_NUMBERS):
    """Download each query's result JSON and write it to its own file.

    Result URL shape:
      <base_url>/api/queries/<query_number>/results/<json_number>.json

    The original opened "temp_json.txt" in "w" mode on every iteration
    and never closed it, so only the last query's (possibly unflushed)
    data survived; one file per query plus a context manager fixes that.
    """
    for data_id, q in zip(data_ids, query_numbers):
        if data_id is None:
            continue  # query has no result set to fetch yet
        json_url = "{0}/api/queries/{1}/results/{2}.json".format(
            base_url, q, data_id)
        with urlopen(json_url) as res:
            payload = res.read().decode("utf-8")
        with open("temp_json_q{0}.txt".format(q), "w") as f:
            f.write(payload)


if __name__ == "__main__":
    ids = fetch_query_data_ids()
    print(ids)
    download_results(ids)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
"""Airflow DAG that mirrors a git repository into the GCS dags bucket.

Every minute it clones the repo (keyed by an SSH deploy key held in an
Airflow Variable) and rsyncs it into gs://$GCS_BUCKET/dags/git.
"""
from datetime import datetime, timedelta

from airflow.models import Variable
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Deploy credentials live in Airflow Variables, not in source control.
private_key = Variable.get("git_deploy_private_key_secret")
repo_url = Variable.get("git_remote_url")

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 9, 27, 18, 30),
    'retries': 0,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'depends_on_past': False,
}

name = 'git_sync'

# Run every minute so the bucket tracks the repository closely.
schedule = '* * * * *'

# BUG FIX: 'catchup' is a DAG-constructor argument, not a task-level
# default_arg — inside default_args it was silently ignored, so Airflow
# would backfill every minute since start_date on first deploy.
dag = DAG(name, schedule_interval=schedule, default_args=default_args,
          catchup=False)

# NOTE(review): the key is written to the working directory before the
# chmod 0600 lands; consider umask 077 (or mktemp -d) to avoid the
# brief world-readable window — confirm against deployment policy.
git_sync_bash = """
set -e
# setup ssh key and command for git
pwd=$(pwd)
cat <<EOF > $pwd/id_rsa
{private_key}
EOF
chmod 0600 $pwd/id_rsa
cat <<EOF > $pwd/ssh
ssh -i $pwd/id_rsa -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \\$*
EOF
chmod +x $pwd/ssh
export GIT_SSH="$pwd/ssh"
# clone repo
repo_url='{repo_url}'
repo_dir=$(basename $repo_url .git)
git clone --depth 1 $repo_url
# gsutil rsync into the gcs bucket dags/git dir
gsutil -m rsync -x "\\.git\\/.*$" -d -r $repo_dir gs://$GCS_BUCKET/dags/git
""".format(private_key=private_key, repo_url=repo_url)

t1 = BashOperator(
    task_id='git_pull',
    bash_command=git_sync_bash,
    dag=dag,
)