add filtering options to ckan dataset fetcher
OriHoch committed Jul 15, 2024
1 parent c564f18 commit fd7c3ab
Showing 3 changed files with 129 additions and 31 deletions.
9 changes: 7 additions & 2 deletions configuration.template.json
@@ -133,11 +133,16 @@
},
{
"name": "generic_fetcher",
"display": "Fetch/Update data from different source types into a CKAN Package",
"display": "Fetch/Update data from different source types into a CKAN Package. See details about the fetchers in DESCRIPTION of each fetcher at https://github.com/hasadna/datacity-ckan-dgp/blob/main/datacity_ckan_dgp/generic_fetchers/",
"fields": [
{
"name": "source_url",
"display": "Source URL (source type will be inferred from the URL, see https://github.com/hasadna/datacity-ckan-dgp/blob/main/datacity_ckan_dgp/operators/generic_fetcher.py for details)",
"display": "Source URL (source type will be inferred from the URL)",
"type": "text"
},
{
"name": "source_filter",
"display": "Source Filter (optional, value depends on the source type)",
"type": "text"
},
{
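For reference, the params given to the fetcher end up looking like the local development examples in the fetcher's DESCRIPTION (a hypothetical filled-in value for these template fields; source_filter maps column/property names to required values):

{
    "source_url": "https://data.gov.il/dataset/automated-devices",
    "source_filter": {"City": "חיפה"},
    "target_instance_name": "LOCAL_DEVELOPMENT",
    "target_package_id": "automated-devices",
    "target_organization_id": "israel-gov",
    "tmpdir": ".data/ckan_fetcher_tmpdir"
}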
144 changes: 119 additions & 25 deletions datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
@@ -1,12 +1,104 @@
import os
import json
import uuid
import shutil

import requests
import dataflows as DF

from .. import ckan
from ..utils import http_stream_download


def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir):
DESCRIPTION = """
Fetch from CKAN Dataset
URL example: https://data.gov.il/dataset/automated-devices
If source filter is not provided, it will copy all resources as-is
If source filter is provided, it will do the following:
1. Find the source resource for tabular data - either CSV or XLSX
2. Create the filtered tabular data as a CSV (XLSX will be created by our other automation)
3. If a GEOJSON resource is available, it will be copied and filtered separately
4. All other resources will be ignored
Local development examples:
without source filter:
python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
with source filter:
python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir", "source_filter": {"City": "חיפה"}}'
"""

DEVEL_SKIP_DOWNLOAD = os.getenv('DEVEL_SKIP_DOWNLOAD', 'false').lower() == 'true'


def get_filtered_tabular_resources_to_update(tmpdir, source_filter, id_, name, format_, hash_, description, filename):
print(f'filtering tabular data from {filename} with format {format_}...')
resources_to_update = []
DF.Flow(
DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower()),
DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
DF.printer(),
DF.dump_to_path(f'{tmpdir}/{id_}-filtered')
).process()
with open(f'{tmpdir}/{id_}-filtered/datapackage.json', 'r') as f:
hash_ = json.load(f)['hash']
shutil.copyfile(f'{tmpdir}/{id_}-filtered/filtered.csv', f'{tmpdir}/{id_}')
resources_to_update.append((id_, name, 'CSV', hash_, description, filename))
return resources_to_update


def get_filtered_geojson_resources_to_update(tmpdir, source_filter, id_, name, format_, hash_, description, filename):
print(f'filtering geojson data from {filename} with format {format_}...')
resources_to_update = []
with open(f'{tmpdir}/{id_}', 'r') as f:
data = json.load(f)
features = data.get('features') or []
features = [feature for feature in features if all(feature['properties'].get(k) == v for k, v in source_filter.items())]
data['features'] = features
with open(f'{tmpdir}/{id_}', 'w') as f:
json.dump(data, f)
resources_to_update.append((id_, name, 'GEOJSON', hash_, description, filename))
return resources_to_update


def get_resources_to_update(resources, tmpdir, headers, existing_target_resources, source_filter):
resources_to_update = []
for resource in resources:
id_ = resource.get('id') or ''
url = resource.get('url') or ''
if url and id_:
if 'e.data.gov.il' in url:
url = url.replace('e.data.gov.il', 'data.gov.il')
filename = url.split('/')[-1]
if DEVEL_SKIP_DOWNLOAD:
print(f'skipping download of {filename} from {url}')
source_hash = ''
else:
source_hash = http_stream_download(f'{tmpdir}/{id_}', {'url': url, 'headers': headers})
source_format = resource.get('format') or ''
source_name = resource.get('name') or ''
description = resource.get('description') or ''
if source_filter or existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
if source_filter:
prefiltered_resources = resources_to_update
resources_to_update = []
names = set(args[1].lower() for args in prefiltered_resources)
for name in names:
print(f'filtering resources for {name}')
source_resources_by_format = {args[2].lower(): args for args in prefiltered_resources if args[1].lower() == name}
if 'csv' in source_resources_by_format:
resources_to_update.extend(get_filtered_tabular_resources_to_update(tmpdir, source_filter, *source_resources_by_format['csv']))
elif 'xlsx' in source_resources_by_format:
resources_to_update.extend(get_filtered_tabular_resources_to_update(tmpdir, source_filter, *source_resources_by_format['xlsx']))
if 'geojson' in source_resources_by_format:
resources_to_update.extend(get_filtered_geojson_resources_to_update(tmpdir, source_filter, *source_resources_by_format['geojson']))
return resources_to_update


def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter):
res = ckan.package_show(target_instance_name, target_package_id)
target_package_exists = False
existing_target_resources = {}
@@ -17,31 +109,30 @@ def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir):
name = resource.get('name') or ''
hash_ = resource.get('hash') or ''
id_ = resource.get('id') or ''
if format_ and name and hash_ and id_:
if format_ and name and id_:
existing_target_resources[f'{name}.{format_}'] = {'hash': hash_, 'id': id_}
source_package_id = source_url.split('/dataset/')[1].split('/')[0]
source_instance_baseurl = source_url.split('/dataset/')[0]
if 'data.gov.il' in source_instance_baseurl:
headers = {'user-agent': 'datagov-external-client'}
else:
headers = None
res = requests.get(f'{source_instance_baseurl}/api/3/action/package_show?id={source_package_id}', headers=headers).json()
assert res['success']
if DEVEL_SKIP_DOWNLOAD:
print('skipping download of package metadata')
with open(f'{tmpdir}/package.json', 'r') as f:
res = json.load(f)
else:
try:
res = requests.get(f'{source_instance_baseurl}/api/3/action/package_show?id={source_package_id}', headers=headers)
res_json = res.json()
assert res_json['success']
except Exception as e:
raise Exception(f'Failed to fetch source package\n{getattr(res, "text", "")}') from e
res = res_json
with open(f'{tmpdir}/package.json', 'w') as f:
json.dump(res, f)
package_title = res['result']['title']
resources_to_update = []
for resource in res['result']['resources']:
id_ = resource.get('id') or ''
url = resource.get('url') or ''
if url and id_:
if 'e.data.gov.il' in url:
url = url.replace('e.data.gov.il', 'data.gov.il')
filename = url.split('/')[-1]
source_hash = http_stream_download(f'{tmpdir}/{id_}', {'url': url, 'headers': headers})
source_format = resource.get('format') or ''
source_name = resource.get('name') or ''
description = resource.get('description') or ''
if existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
resources_to_update = get_resources_to_update(res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter)
if resources_to_update:
print(f'updating {len(resources_to_update)} resources')
if not target_package_exists:
@@ -58,13 +149,16 @@ def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir):
os.unlink(f'{tmpdir}/{filename}')
os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
if f'{name}.{format_}' in existing_target_resources:
print('existing resource found, but hash is different, updating resource data')
res = ckan.resource_update(target_instance_name, {
'id': existing_target_resources[f'{name}.{format_}']['id'],
'hash': hash_,
'description': description
}, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
assert res['success'], str(res)
if existing_target_resources[f'{name}.{format_}'].get('hash') and existing_target_resources[f'{name}.{format_}']['hash'] == hash_:
print('existing resource found and hash is the same, skipping resource data update')
else:
print('existing resource found, but hash is different, updating resource data')
res = ckan.resource_update(target_instance_name, {
'id': existing_target_resources[f'{name}.{format_}']['id'],
'hash': hash_,
'description': description
}, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
assert res['success'], str(res)
else:
print('no existing resource found, creating new resource')
res = ckan.resource_create(target_instance_name, {
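To see what the source_filter match does in isolation, here is a minimal runnable sketch of the predicate shared by the tabular and GEOJSON filters above (the sample rows are made up for illustration):

source_filter = {"City": "חיפה"}

def row_matches(row):
    # A row (or a GeoJSON feature's properties dict) matches only if every
    # filter key is present with exactly the required value.
    return all(row.get(k) == v for k, v in source_filter.items())

rows = [
    {"City": "חיפה", "Devices": 12},
    {"City": "תל אביב", "Devices": 7},
]
print([row for row in rows if row_matches(row)])  # keeps only the חיפה row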
7 changes: 3 additions & 4 deletions datacity_ckan_dgp/operators/generic_fetcher.py
@@ -6,10 +6,8 @@
from importlib import import_module


# the source url will be checked against the following types in order to determine which type of source it is
FETCHERS = [
{
# python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
'fetcher': 'ckan_dataset',
'match': {
'url_contains': '/dataset/'
@@ -30,21 +28,22 @@ def tempdir(tmpdir):

def operator(name, params):
source_url = params['source_url']
source_filter = params.get('source_filter')
target_instance_name = params['target_instance_name']
target_package_id = params['target_package_id']
target_organization_id = params['target_organization_id']
tmpdir = params.get('tmpdir')
with tempdir(tmpdir) as tmpdir:
print('starting generic_fetcher operator')
print(f'source_url={source_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}')
print(f'source_filter={source_filter}')
print(f'tmpdir={tmpdir}')
for fetcher in FETCHERS:
assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
if fetcher['match']['url_contains'] in source_url:
import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir)
import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter)
break


# python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
if __name__ == '__main__':
operator('_', json.loads(sys.argv[1]))
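For local testing, the operator can also be invoked directly from Python with the same params the CLI entrypoint accepts (a sketch mirroring the example above; source_filter is optional):

from datacity_ckan_dgp.operators.generic_fetcher import operator

operator('_', {
    'source_url': 'https://data.gov.il/dataset/automated-devices',
    'target_instance_name': 'LOCAL_DEVELOPMENT',
    'target_package_id': 'automated-devices',
    'target_organization_id': 'israel-gov',
    'tmpdir': '.data/ckan_fetcher_tmpdir',
    'source_filter': {'City': 'חיפה'},
})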
