Skip to content

Commit

Permalink
Merge pull request #8 from esgf-nimbus/fix_intake_es
Browse files Browse the repository at this point in the history
Fixes intake_es catalog
  • Loading branch information
jasonb5 authored Aug 12, 2023
2 parents af8059a + 8877c11 commit e353a7c
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 55 deletions.
2 changes: 1 addition & 1 deletion dockerfiles/minimal-notebook/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.DEFAULT_GOAL = build

VERSION = 0.1.12
VERSION = 0.1.13

run: ARGS = -p 8888:8888 -v $(PWD)/../dask-gateway/conda-envs:/conda-envs

Expand Down
199 changes: 146 additions & 53 deletions dockerfiles/minimal-notebook/intake_es/src/intake_es/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
logging.basicConfig(level=logging.DEBUG)


class ESCatalogError(Exception):
    """Base exception for errors raised by the Elasticsearch catalog."""


class NoResultError(ESCatalogError):
    """Catalog error subclass.

    NOTE(review): the name suggests it signals a query that matched no
    documents; confirm against the code that raises it.
    """


class ElasticSearchCatalog(intake.Catalog):
name = "elasticsearch"
container = "xarray"
Expand Down Expand Up @@ -50,7 +58,64 @@ def __init__(
self._df = None
self._entries = {}

def search(self, **kwargs):
@property
def describe_index(self):
    """Return Elasticsearch's description of the catalog's index.

    Returns:
        The ``body`` of the ``indices.get`` response for ``self._index``.

    Raises:
        ESCatalogError: If the request, or reading its body, fails for any
            reason. The underlying exception is attached as ``__cause__``.
    """
    try:
        return self._client.indices.get(index=self._index).body
    except Exception as err:
        # Chain explicitly so the real client failure is reported as the
        # direct cause instead of being an incidental context.
        raise ESCatalogError(
            "Unable to query information about the index"
        ) from err

@property
def fields(self):
    """Map each top-level property of the index to a sub-field or type.

    For every entry under ``mappings.properties`` of ``self._index``:

    * if the entry declares multi-fields (a non-empty ``fields`` dict), the
      name of the first one is used;
    * otherwise its ``type`` is used;
    * entries without a ``type`` (Elasticsearch omits it for object fields
      that only carry nested ``properties``) are reported as ``"object"``
      instead of failing the whole lookup.

    Returns:
        dict: Property name -> first multi-field name, or the field type.

    Raises:
        ESCatalogError: If the index description cannot be fetched or does
            not have the expected layout. The underlying exception is
            attached as ``__cause__``.
    """
    try:
        properties = self.describe_index[self._index]["mappings"]["properties"]

        resolved = {}
        for name, spec in properties.items():
            sub_fields = spec.get("fields")
            if sub_fields:
                resolved[name] = next(iter(sub_fields))
            else:
                resolved[name] = spec.get("type", "object")
        return resolved
    except Exception as err:
        raise ESCatalogError(
            "Unable to query field mapping for the index"
        ) from err

def field_top(self, field):
    """Return document counts per value of ``field``.

    Issues a ``terms`` aggregation named ``unique`` against ``self._index``
    with ``size=0`` and collects the returned buckets. How many buckets come
    back, and in which order, is left to Elasticsearch's ``terms`` defaults.

    Args:
        field: Name of the index field to aggregate on.

    Returns:
        dict: Bucket key -> ``doc_count`` for every returned bucket.
    """
    response = self._client.search(
        index=self._index,
        aggs={"unique": {"terms": {"field": field}}},
        size=0,
    )

    counts = {}
    for bucket in response["aggregations"]["unique"]["buckets"]:
        counts[bucket["key"]] = bucket["doc_count"]
    return counts

def field_nunique(self, field):
    """Return document counts for every distinct value of ``field``.

    Pages through a ``composite`` aggregation named ``unique`` on
    ``self._index``, feeding each response's ``after_key`` back in as
    ``after`` until a page comes back with no buckets.

    Args:
        field: Name of the index field to aggregate on.

    Returns:
        dict: Distinct value -> ``doc_count``. Empty when the aggregation
        yields no buckets at all.
    """
    aggs = {
        "unique": {
            "composite": {"sources": {"values": {"terms": {"field": field}}}}
        }
    }

    counts = {}

    while True:
        result = self._client.search(index=self._index, aggs=aggs, size=0)
        unique = result["aggregations"]["unique"]
        buckets = unique["buckets"]

        if not buckets:
            break

        for bucket in buckets:
            counts[bucket["key"]["values"]] = bucket["doc_count"]

        # An empty page carries no "after_key", so it must never be read
        # unconditionally; a missing key also means there is nothing left
        # to page through.
        after_key = unique.get("after_key")
        if after_key is None:
            break

        aggs["unique"]["composite"]["after"] = after_key

    return counts

def search(self, dry_run=False, **kwargs):
"""Search the Elasticsearch index.
The values of the search arguments can be a string or list. If a list
Expand All @@ -62,6 +127,67 @@ def search(self, **kwargs):
Examples:
>>> cat.search(activity_drs='ScenarioMIP', variable_id=['pr', 'tas'])
"""
query = self._build_query(**kwargs)

logging.debug(f"Built query {query}")

if dry_run:
result = self._client.count(index=self._index, query=query)

logging.debug(f"Raw result {result}")

return result["count"]
else:
logging.debug(f"Using query {query}")

sort = [{"_doc": "asc"}]

result = self._client.search(
index=self._index, query=query, sort=sort, size=10000
)
data = result["hits"]["hits"]
search_after = result["hits"]["hits"][-1]["sort"]

while True:
result = self._client.search(
index=self._index,
query=query,
sort=sort,
search_after=search_after,
size=10000,
)

new_data = result["hits"]["hits"]

if len(new_data) == 0:
break

search_after = new_data[-1]["sort"]

data += new_data

data = [x["_source"] for x in data]

df = pd.DataFrame(data)

cat = ElasticSearchCatalog(
self._index,
host=self._host,
es_kwargs=self._es_kwargs,
search_kwargs=self._search_kwargs,
skip_client=True,
)

cat._df = df

cat._entries = {
".".join(x.values[:-1].tolist()): NetCDFSource(f'{x["path"]}/*.nc')
for _, x in df.iterrows()
}

return cat

def _build_query(self, **kwargs):
if len(kwargs) == 0:
warnings.warn(
"No search arguments, this may take awhile",
Expand All @@ -76,67 +202,34 @@ def search(self, **kwargs):
else:
self._search_kwargs.update(kwargs)

must_terms = []
filter_terms = []
if len(self._search_kwargs) == 1:
k, v = list(self._search_kwargs.items())[0]

for k, v in self._search_kwargs.items():
if isinstance(v, (list, set)):
filter_terms.append({"terms": {k: v}})
query = {"terms": {k: {"value": v}}}
else:
must_terms.append({"term": {k: v}})

query = {
"bool": {
"must": must_terms,
"filter": filter_terms,
}
}

logging.debug(f"Using query {query}")

sort = [{"_doc": "asc"}]
result = self._client.search(
index=self._index, query=query, sort=sort, size=10000
)
data = [x["_source"] for x in result["hits"]["hits"]]

total_hits = result["hits"]["total"]["value"]
relation = result["hits"]["total"]["relation"]

if total_hits == 10000 and relation == "gte":
logging.debug("Pulling additional documents")
query = {"term": {k: {"value": v}}}
else:
must_terms = []
filter_terms = []

for idx in range(10):
if len(result["hits"]["hits"]) == 0:
break
search_after = result["hits"]["hits"][-1]["sort"]
result = self._client.search(
index=self._index,
query=query,
sort=sort,
search_after=search_after,
size=10000,
)
data += [x["_source"] for x in result["hits"]["hits"]]
for k, v in self._search_kwargs.items():
if isinstance(v, (list, set)):
filter_terms.append({"terms": {k: v}})
else:
must_terms.append({"term": {k: v}})

df = pd.DataFrame(data)
query_bool = {}

cat = ElasticSearchCatalog(
self._index,
host=self._host,
es_kwargs=self._es_kwargs,
search_kwargs=self._search_kwargs,
skip_client=True,
)
if len(must_terms) > 0:
query_bool["must"] = must_terms

cat._df = df
if len(filter_terms) > 0:
query_bool["filter"] = filter_terms

cat._entries = {
".".join(x.values[:-1].tolist()): NetCDFSource(f'{x["path"]}/*.nc')
for _, x in df.iterrows()
}
query = {"bool": query_bool}

return cat
return query

def keys(self):
try:
Expand Down
2 changes: 1 addition & 1 deletion dockerfiles/minimal-notebook/tbump.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[version]
current = "0.1.12"
current = "0.1.13"

regex = '''
(?P<major>\d+)
Expand Down

0 comments on commit e353a7c

Please sign in to comment.