Merge pull request #288 from datosgobar/280-metadata-catalog-id-filter
Metadata filtering by catalog_id
lucaslavandeira authored Jun 18, 2018
2 parents 4f82f51 + f2ac5bd commit 20356ed
Showing 11 changed files with 938 additions and 86 deletions.
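
In short: this PR threads a catalog_id through the whole metadata pipeline. Each indexed Field document now records which catalog it came from, the indexer runs once per indexable Node instead of once per URL, and the metadata search can filter results by catalog. As a rough sketch of the intended client-side effect (the host, endpoint path and parameter values below are assumptions for illustration, not shown in this diff):

import requests

# Hypothetical query: catalog_id is accepted like the other filter
# arguments and narrows metadata search results to a single catalog.
response = requests.get(
    'https://example.com/search',
    params={'units': 'porcentaje', 'catalog_id': 'first_catalog'},
)
print(response.json())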
3 changes: 2 additions & 1 deletion series_tiempo_ar_api/apps/metadata/constants.py
@@ -11,5 +11,6 @@
     'dataset_theme': 'dataset_theme',
     'units': 'units',
     'dataset_publisher_name': 'dataset_publisher_name',
-    'dataset_source': 'dataset_source_keyword'
+    'dataset_source': 'dataset_source_keyword',
+    'catalog_id': 'catalog_id',
 }
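
For context, this constants.py dict maps public query arguments to the Elasticsearch fields they filter on: catalog_id maps straight through, while dataset_source is redirected to its keyword copy so filtering hits an exact-match field. A minimal sketch of how such a mapping is typically applied with elasticsearch-dsl (the dict's name falls outside the shown hunk and the query-builder code is not part of this diff, so the names below are stand-ins):

from elasticsearch_dsl import Search

# Stand-in for the mapping defined in constants.py above.
FILTER_FIELDS = {
    'dataset_source': 'dataset_source_keyword',
    'catalog_id': 'catalog_id',
}

def apply_filters(search, params):
    # Turn each recognized query argument into an exact-match term
    # filter on the Elasticsearch field it maps to.
    for arg, field in FILTER_FIELDS.items():
        if arg in params:
            search = search.filter('term', **{field: params[arg]})
    return search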
73 changes: 73 additions & 0 deletions series_tiempo_ar_api/apps/metadata/indexer/catalog_metadata_indexer.py
@@ -0,0 +1,73 @@
#! coding: utf-8
from __future__ import print_function

import logging
from elasticsearch.helpers import parallel_bulk

from series_tiempo_ar_api.apps.metadata.indexer.doc_types import Field
from series_tiempo_ar_api.libs.indexing.elastic import ElasticInstance

from . import strings

logger = logging.getLogger(__name__)


class CatalogMetadataIndexer(object):

def __init__(self, data_json, catalog_id, doc_type=Field):
self.data_json = data_json
self.catalog_id = catalog_id
self.doc_type = doc_type
self.elastic = ElasticInstance.get()
logger.info('Hosts de ES: %s', self.elastic.transport.hosts)

def index(self):
logger.info(u'Inicio de la indexación de metadatos de %s', self.catalog_id)

actions = self.scrap_datajson()

self.index_actions(actions)
logger.info(u'Fin de la indexación de metadatos')

def index_actions(self, actions):
for success, info in parallel_bulk(self.elastic, actions):
if not success:
logger.info(strings.INDEXING_ERROR, info)

def scrap_datajson(self):
themes = self.get_themes(self.data_json['themeTaxonomy'])
datasets = {}
actions = []
for field in self.data_json.get_fields(only_time_series=True):
dataset = datasets.setdefault(field['dataset_identifier'],
self.get_dataset(identifier=field['dataset_identifier']))

doc = self.doc_type(
title=field.get('title'),
description=field.get('description'),
id=field.get('id'),
units=field.get('units'),
dataset_title=dataset.get('title'),
dataset_source=dataset.get('source'),
dataset_source_keyword=dataset.get('source'),
dataset_description=dataset.get('description'),
dataset_publisher_name=dataset.get('publisher', {}).get('name'),
dataset_theme=themes.get(dataset.get('theme', [None])[0]),
catalog_id=self.catalog_id
)
actions.append(doc.to_dict(include_meta=True))
return actions

def get_dataset(self, identifier):
for dataset in self.data_json['dataset']:
if dataset['identifier'] == identifier:
return dataset
raise ValueError(u'Identifier no encontrado: {}'.format(identifier))

@staticmethod
def get_themes(theme_taxonomy):
themes = {}
for theme in theme_taxonomy:
themes[theme['id']] = theme['label']

return themes
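
A usage sketch for the new per-catalog indexer (the catalog URL and id below are placeholders; in production MetadataIndexer, shown next, supplies them from each Node). Note that to_dict(include_meta=True) returns action dicts carrying document metadata such as the target index and id, which is the shape parallel_bulk expects:

from pydatajson import DataJson

from series_tiempo_ar_api.apps.metadata.indexer.catalog_metadata_indexer import \
    CatalogMetadataIndexer

# Placeholder catalog: any data.json path or URL readable by pydatajson.
data_json = DataJson('https://example.com/data.json')

# Index every time-series field of the catalog, tagging each document
# with the catalog it came from so results can be filtered later.
CatalogMetadataIndexer(data_json, catalog_id='example_catalog').index()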
1 change: 1 addition & 0 deletions series_tiempo_ar_api/apps/metadata/indexer/doc_types.py
@@ -15,6 +15,7 @@ class Field(DocType):
     dataset_theme = Keyword()
     units = Keyword()
     dataset_publisher_name = Keyword()
+    catalog_id = Keyword()
 
     # Guardamos una copia como keyword para poder usar en aggregations
     dataset_source = Text()
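
Since catalog_id is mapped as a Keyword, it supports exact term filters and aggregations but no full-text analysis; dataset_source keeps an analyzed Text copy for matching plus dataset_source_keyword for aggregations. A sketch of both uses with elasticsearch-dsl (the field values are assumptions for illustration):

from series_tiempo_ar_api.apps.metadata.indexer.doc_types import Field
from series_tiempo_ar_api.libs.indexing.elastic import ElasticInstance

search = Field.search(using=ElasticInstance.get())

# Exact-match filter on the new keyword field.
search = search.filter('term', catalog_id='first_catalog')

# Aggregations must target the keyword copy, not the analyzed Text field.
search.aggs.bucket('by_source', 'terms', field='dataset_source_keyword')

response = search.execute()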
83 changes: 20 additions & 63 deletions series_tiempo_ar_api/apps/metadata/indexer/metadata_indexer.py
@@ -1,77 +1,34 @@
 #! coding: utf-8
 from __future__ import print_function
 
 import logging
-from elasticsearch.helpers import parallel_bulk
+from pydatajson import DataJson
 
+from series_tiempo_ar_api.apps.management.models import Node
+from series_tiempo_ar_api.apps.metadata.indexer.catalog_metadata_indexer import \
+    CatalogMetadataIndexer
 from series_tiempo_ar_api.apps.metadata.indexer.doc_types import Field
 from series_tiempo_ar_api.libs.indexing.elastic import ElasticInstance
 
-from . import strings
-
 logger = logging.getLogger(__name__)
 
 
-class MetadataIndexer(object):
+class MetadataIndexer:
 
-    def __init__(self, data_json):
-        self.data_json = data_json
+    def __init__(self, doc_type=Field):
         self.elastic = ElasticInstance.get()
-        logger.info('Hosts de ES: %s', self.elastic.transport.hosts)
-
-    def index(self):
-        logger.info(u'Inicio de la indexación de metadatos')
-        self.init_index()
-
-        actions = self.scrap_datajson()
-
-        self.index_actions(actions)
-        logger.info(u'Fin de la indexación de metadatos')
+        self.doc_type = doc_type
+        self.index = self.doc_type._doc_type.index
 
     # noinspection PyProtectedMember
     def init_index(self):
-        if self.elastic.indices.exists(Field._doc_type.index):
-            self.elastic.indices.delete(Field._doc_type.index)
-        Field.init(using=self.elastic)
+        if not self.elastic.indices.exists(self.index):
+            self.doc_type.init(using=self.elastic)
 
-    def index_actions(self, actions):
-        for success, info in parallel_bulk(self.elastic, actions):
-            if not success:
-                logger.info(strings.INDEXING_ERROR, info)
-
-    def scrap_datajson(self):
-        themes = self.get_themes(self.data_json['themeTaxonomy'])
-        datasets = {}
-        actions = []
-        for field in self.data_json.get_fields(only_time_series=True):
-            dataset = datasets.setdefault(field['dataset_identifier'],
-                                          self.get_dataset(identifier=field['dataset_identifier']))
-
-            doc = Field(
-                title=field.get('title'),
-                description=field.get('description'),
-                id=field.get('id'),
-                units=field.get('units'),
-                dataset_title=dataset.get('title'),
-                dataset_source=dataset.get('source'),
-                dataset_source_keyword=dataset.get('source'),
-                dataset_description=dataset.get('description'),
-                dataset_publisher_name=dataset.get('publisher', {}).get('name'),
-                dataset_theme=themes.get(dataset.get('theme', [None])[0])
-            )
-            actions.append(doc.to_dict(include_meta=True))
-        return actions
-
-    def get_dataset(self, identifier):
-        for dataset in self.data_json['dataset']:
-            if dataset['identifier'] == identifier:
-                return dataset
-        raise ValueError(u'Identifier no encontrado: {}'.format(identifier))
-
-    @staticmethod
-    def get_themes(theme_taxonomy):
-        themes = {}
-        for theme in theme_taxonomy:
-            themes[theme['id']] = theme['label']
-
-        return themes
+    def run(self):
+        self.init_index()
+        for node in Node.objects.filter(indexable=True):
+            try:
+                data_json = DataJson(node.catalog_url)
+                CatalogMetadataIndexer(data_json, node.catalog_id, self.doc_type).index()
+            except Exception as e:
+                logger.exception(u'Error en la lectura del catálogo: %s', e)
+
+        self.elastic.indices.forcemerge(self.index)
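
With this refactor, per-catalog scraping lives in CatalogMetadataIndexer and MetadataIndexer only orchestrates: it creates the index if missing (no longer deleting it first, so re-runs add or update documents rather than starting from scratch), walks every indexable Node, and force-merges segments once the bulk writes are done. The entry point is now:

from series_tiempo_ar_api.apps.metadata.indexer.metadata_indexer import MetadataIndexer

# Index metadata for all indexable nodes; a failure on one catalog is
# logged and does not abort the remaining catalogs.
MetadataIndexer().run()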
@@ -2,12 +2,8 @@
 
 import logging
 from django.core.management import BaseCommand
-from pydatajson import DataJson
 
-from django_datajsonar.models import Node
 from series_tiempo_ar_api.apps.metadata.indexer.metadata_indexer import MetadataIndexer
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -17,14 +13,4 @@ def add_arguments(self, parser):
         parser.add_argument('datajson_url', nargs='*')
 
     def handle(self, *args, **options):
-        if options['datajson_url']:
-            urls = options['datajson_url']
-        else:
-            urls = [node.catalog_url for node in Node.objects.filter(indexable=True)]
-
-        for url in urls:
-            try:
-                data_json = DataJson(url)
-                MetadataIndexer(data_json).index()
-            except Exception as e:
-                logger.exception(u'Error en la lectura del catálogo: %s', e)
+        MetadataIndexer().run()
47 changes: 47 additions & 0 deletions series_tiempo_ar_api/apps/metadata/tests/indexer_tests.py
@@ -0,0 +1,47 @@
#! coding: utf-8
import os

from django.test import TestCase

from series_tiempo_ar_api.apps.management.models import Node
from series_tiempo_ar_api.apps.metadata.indexer.doc_types import Field
from series_tiempo_ar_api.apps.metadata.indexer.metadata_indexer import MetadataIndexer
from series_tiempo_ar_api.apps.metadata.tests.tests import SAMPLES_DIR
from series_tiempo_ar_api.libs.indexing.elastic import ElasticInstance

from faker import Faker

fake = Faker()


class IndexerTests(TestCase):
class FakeField(Field):
class Meta:
index = fake.word()

def setUp(self):
self.elastic = ElasticInstance.get()

def test_index_two_nodes(self):
Node(catalog_id='first_catalog',
catalog_url=os.path.join(SAMPLES_DIR, 'single_distribution.json'),
indexable=True).save()
Node(catalog_id='second_catalog',
catalog_url=os.path.join(SAMPLES_DIR, 'second_single_distribution.json'),
indexable=True).save()

MetadataIndexer(doc_type=self.FakeField).run()

first_catalog_fields = self.FakeField.search(using=ElasticInstance.get())\
.filter('term', catalog_id='first_catalog')\
.execute()

second_catalog_fields = self.FakeField.search(using=ElasticInstance.get())\
.filter('term', catalog_id='second_catalog')\
.execute()

self.assertTrue(first_catalog_fields)
self.assertTrue(second_catalog_fields)

def tearDown(self):
self.elastic.indices.delete(self.FakeField._doc_type.index)