-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.py
executable file
·73 lines (59 loc) · 1.92 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import json
import bson
import click
from os import path
import requests
from requests.auth import HTTPBasicAuth
from app import app, db
def _get_aggreagation(name):
    """Load an aggregation pipeline definition from ``<name>.json``.

    The file is looked up relative to the current working directory.

    Args:
        name: Base name of the JSON file, without the ``.json`` extension
            (e.g. ``'distributions'`` reads ``distributions.json``).

    Returns:
        The parsed JSON content of the file.

    Raises:
        IOError/OSError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON.
    """
    # Build the path from `name`; a hard-coded file name would silently
    # ignore whichever pipeline the caller asked for.
    with open('{}.json'.format(name)) as json_data:
        return json.load(json_data)
@click.argument('password')
@click.argument('username')
@app.cli.command()
def import_form_data(
    url='http://unisyncgtwy.westeurope.cloudapp.azure.com:4984/leb-winter-prd',
    username='',
    password='',
    collection='submissions'
):
    """Replace the stored submissions with assessment documents from URL.

    Fetches ``<url>/_all_docs?include_docs=true`` using HTTP basic auth,
    drops the target collection and re-inserts every returned document whose
    ``type`` is ``'assessment'``.

    Args:
        url: Base URL of the remote database to read from.
        username: Basic-auth user name (positional CLI argument).
        password: Basic-auth password (positional CLI argument).
        collection: Name of the collection in the default database that is
            dropped and repopulated.

    Raises:
        requests.HTTPError: If the remote endpoint answers with an error
            status; the collection is left untouched in that case.
        KeyError: If the response body has no ``rows`` key; the collection
            is left untouched in that case as well.
    """
    click.echo('Staring import from Kobo...')
    # Join with '/' explicitly: os.path.join would insert '\\' on Windows.
    url = url.rstrip('/') + '/_all_docs?include_docs=true'
    print(url)
    response = requests.get(
        url,
        auth=HTTPBasicAuth(username, password)
    )
    # Fail before touching the database: an error response (e.g. 401) can
    # still carry a JSON body, and must not lead to the collection being
    # dropped with nothing to put back.
    response.raise_for_status()
    rows = response.json()['rows']

    # Rows may lack a 'doc', carry a null 'doc', or hold documents without a
    # 'type'; none of those are assessments, so skip them instead of raising.
    candidates = (row.get('doc') for row in rows)
    assessments = [
        doc for doc in candidates
        if doc and doc.get('type') == 'assessment'
    ]

    target = db.connection.get_default_database()[collection]
    target.drop()
    # insert_many rejects an empty list, so only call it when there is data.
    if assessments:
        target.insert_many(assessments)

    # Report what was actually stored, not the total number of rows returned.
    click.echo('{} submissions imported from Kobo'.format(len(assessments)))
# @app.cli.command()
# def convert_to_integer(
# collection='submissions',
# field_name='Metadata_section/site_data/sample_tents'
# ):
# for form in db.connection.get_default_database()[collection].find():
# if field_name not in form:
# continue
# form[field_name] = int(form[field_name])
# db.connection.get_default_database()[collection].save(form)
@app.cli.command()
def run_aggregation(
    collection='submissions',
    file_name='distributions'
):
    # Documented with comments rather than a docstring on purpose: click
    # would surface a docstring as this command's --help text.
    #
    # Runs the pipeline returned by _get_aggreagation(file_name) against
    # `collection` in the default database through the raw 'aggregate'
    # command and echoes the server's reply.
    database = db.connection.get_default_database()
    run_command = database.command
    stages = _get_aggreagation(file_name)
    options = {
        'pipeline': stages,
        'allowDiskUse': True,
        # An empty cursor document requests the default cursor settings.
        'cursor': {},
        # Add 'explain': True here to inspect the query plan instead.
    }
    reply = run_command('aggregate', collection, **options)
    click.echo(reply)