
feat: add support for opensearch (#238)
* add support for opensearch

* make auth optional

* use proper http_auth arg

---------

Co-authored-by: Manuel Garrido <[email protected]>
manugarri and Manuel Garrido authored Oct 12, 2023
1 parent cb35607 commit 4eab947
Showing 13 changed files with 464 additions and 3 deletions.
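In short, the commit adds an OpenSearch adapter that mirrors the existing Elasticsearch one, makes authentication optional, and passes credentials through opensearch-py's http_auth argument. A minimal sketch of that auth handling, assuming a config object with the same fields the new client.py (below) reads — username, password (presumably a pydantic SecretStr), and the TLS flags; build_client is just an illustrative helper name:

from opensearchpy import OpenSearch

def build_client(config) -> OpenSearch:
    # Auth stays optional: with no credentials, http_auth receives None and
    # the client connects anonymously.
    auth = None
    if config.username and config.password:
        auth = (config.username, config.password.get_secret_value())
    return OpenSearch(
        hosts=[f"{config.host}:{config.port}"],
        http_auth=auth,
        use_ssl=config.use_ssl,
        verify_certs=config.verify_certs,
        ca_certs=config.ca_certs,
    )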
Empty file.
152 changes: 152 additions & 0 deletions odd_collector/adapters/opensearch/adapter.py
@@ -0,0 +1,152 @@
from typing import Optional
from urllib.parse import urlparse

from funcy import get_in, get_lax
from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import ElasticSearchGenerator, Generator

from odd_collector.domain.plugin import OpensearchPlugin

from .client import Client
from .logger import logger
from .mappers.indices import map_index
from .mappers.stream import map_data_stream
from .mappers.template import TemplateEntity, map_template


class Adapter(BaseAdapter):
    config: OpensearchPlugin
generator: ElasticSearchGenerator

    def __init__(self, config: OpensearchPlugin) -> None:
super().__init__(config)
self.client = Client(config)

def create_generator(self) -> Generator:
return ElasticSearchGenerator(host_settings=urlparse(self.config.host).netloc)

def get_data_entity_list(self) -> DataEntityList:
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=list(self.get_datasets()),
)

def get_datasets(self) -> list[DataEntity]:
logger.debug(
f"Start collecting datasets from Elasticsearch at {self.config.host} with port {self.config.port}"
)

indices = self.client.get_indices("*")
templates = self.client.get_index_template("*")

mappings = self.client.get_mapping()
data_streams = self.client.get_data_streams()

indices = [
index for index in indices if not index["index"].startswith(".internal")
]
logger.success(f"Got {len(indices)} indices")

index_by_names = {index["index"]: index for index in indices}
templates_by_names = {
tmpl["name"]: tmpl for tmpl in templates if not tmpl["name"].startswith(".")
}
streams_by_names = {stream["name"]: stream for stream in data_streams}
mappings_by_names = dict(mappings.items())

indices_entities: dict[str, DataEntity] = {}
for index_name, index in index_by_names.items():
indices_entities[index_name] = map_index(
index=index,
generator=self.generator,
properties=get_in(
mappings_by_names,
[index_name, "mappings", "properties"],
default={},
),
)

# map templates
template_entities: dict[str, TemplateEntity] = {}
for tmpl_name, tmpl in templates_by_names.items():
data_entity = map_template(tmpl, self.generator)
pattern = tmpl["index_template"]["index_patterns"]

            # Fetch all indices matching the template's patterns so the template
            # can be linked to the indices it applies to; if the lookup fails,
            # skip linking for this template.
try:
for index_name in self.client.get_indices(index=pattern, h="index"):
if index_entity := indices_entities.get(index_name["index"]):
data_entity.add_output(index_entity)
except Exception as e:
logger.warning(e)
continue

template_entities[tmpl_name] = data_entity

# map data streams
stream_entities = {}
for stream_name, stream in streams_by_names.items():
stream_data_entity = map_data_stream(stream, self.generator)
stream_entities[stream_name] = stream_data_entity

if template_entity := template_entities.get(stream["template"]):
template_entity.add_input(stream_data_entity)

return [
*indices_entities.values(),
*stream_entities.values(),
*template_entities.values(),
]

# TODO: implement mapping rollover policies
def _get_rollover_policy(self, stream_data: dict) -> Optional[dict]:
try:
backing_indices = [
index_info["index_name"] for index_info in stream_data["indices"]
]
for index in backing_indices:
index_settings = self.client.get_indices(index)
lifecycle_policy = get_lax(
index_settings, [index, "settings", "index", "lifecycle"]
)

if lifecycle_policy:
logger.debug(
f"Index {index} has Lifecycle Policy {lifecycle_policy['name']}"
)
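                    # NOTE: `self.client` is the thin wrapper from client.py and exposes no
                    # `ilm` attribute; ILM is Elasticsearch-specific (OpenSearch ships ISM
                    # instead), so this TODO-marked helper is not functional as written.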
lifecycle_policy_data = self.client.ilm.get_lifecycle(
name=lifecycle_policy["name"]
)

logger.debug(f"Lifecycle policy metadata {lifecycle_policy_data}")

rollover = get_lax(
lifecycle_policy_data,
[
lifecycle_policy["name"],
"policy",
"phases",
"hot",
"actions",
"rollover",
],
)

if rollover is not None:
max_size = rollover.get("max_size")
max_age = rollover.get("max_age")
else:
max_size = None
max_age = None

lifecycle_metadata = {"max_age": max_age, "max_size": max_size}
return lifecycle_metadata
else:
logger.debug(f"No lifecycle policy exists for this index {index}.")
return None
except KeyError:
logger.debug(f"Incorrect fields. Got fields: {stream_data}")
return None
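The nested lookup in get_datasets above leans on funcy's get_in; a quick, self-contained illustration with a made-up mapping payload:

from funcy import get_in

mappings = {"logs-2023": {"mappings": {"properties": {"timestamp": {"type": "date"}}}}}
props = get_in(mappings, ["logs-2023", "mappings", "properties"], default={})
print(props)  # {'timestamp': {'type': 'date'}}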
41 changes: 41 additions & 0 deletions odd_collector/adapters/opensearch/client.py
@@ -0,0 +1,41 @@
from typing import Optional

from opensearchpy import OpenSearch

from odd_collector.domain.plugin import OpensearchPlugin


class Client:
def __init__(self, config: OpensearchPlugin):
auth = None
if config.username and config.password:
username, password = config.username, config.password.get_secret_value()
            auth = (username, password)

        self._os = OpenSearch(
hosts=[f"{config.host}:{config.port}"],
http_auth=auth,
verify_certs=config.verify_certs,
ca_certs=config.ca_certs,
use_ssl=config.use_ssl,
)
assert self._os.ping()

def get_indices(self, index: Optional[str] = None, h=None) -> list:
return self._os.cat.indices(format="json", index=index, h=h)

def get_mapping(self, index_name: Optional[str] = None) -> dict:
return self._os.indices.get_mapping(index=index_name)

def get_index_settings(self, index_name: str) -> dict:
return self._os.indices.get_settings(index=index_name)

def get_data_streams(self, name: Optional[str] = None) -> dict:
response = self._os.indices.get_data_stream(name=name)
return response["data_streams"]

def get_index_template(self, template_name: str) -> list[dict]:
return self._os.indices.get_index_template(name=template_name).get(
"index_templates"
)
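A hedged usage sketch for the Client above; SimpleNamespace merely stands in for an OpensearchPlugin instance (the field names match the ones __init__ reads, the values are made up):

from types import SimpleNamespace

config = SimpleNamespace(
    host="https://localhost",
    port=9200,
    username=None,    # auth is optional: None leaves http_auth unset
    password=None,
    use_ssl=True,
    verify_certs=False,
    ca_certs=None,
)
client = Client(config)  # raises AssertionError if the cluster is unreachable
print([row["index"] for row in client.get_indices("*")])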
3 changes: 3 additions & 0 deletions odd_collector/adapters/opensearch/logger.py
@@ -0,0 +1,3 @@
from odd_collector_sdk.logger import logger

logger = logger
Empty file.
83 changes: 83 additions & 0 deletions odd_collector/adapters/opensearch/mappers/fields.py
@@ -0,0 +1,83 @@
from typing import Any, Dict

from odd_models.models import DataSetField, DataSetFieldType, Type
from oddrn_generator import ElasticSearchGenerator

# Supported field types as of Elasticsearch 7.x are listed at
# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
TYPES_ELASTIC_TO_ODD = {
"blob": Type.TYPE_STRING,
"boolean": Type.TYPE_BOOLEAN,
"constant_keyword": Type.TYPE_STRING,
"date": Type.TYPE_DATETIME,
"date_nanos": Type.TYPE_INTEGER,
"double": Type.TYPE_NUMBER,
"float": Type.TYPE_NUMBER,
"geo_point": Type.TYPE_MAP,
"flattened": Type.TYPE_MAP,
"half_float": Type.TYPE_NUMBER,
"integer": Type.TYPE_INTEGER,
"ip": Type.TYPE_STRING,
"keyword": Type.TYPE_STRING,
"long": Type.TYPE_INTEGER,
"nested": Type.TYPE_LIST,
"object": Type.TYPE_MAP,
"text": Type.TYPE_STRING,
"wildcard": Type.TYPE_STRING,
}


def is_logical(type_property: str) -> bool:
return type_property == "boolean"


def __get_field_type(props: Dict[str, Any]) -> str:
"""
Sample mapping for field types
{'@timestamp' : {'type' : "alias","path" : "timestamp"},
'timestamp" : {"type" : "date"},
'bool_var': {'type': 'boolean'},
'data_stream': {'properties': {'dataset': {'type': 'constant_keyword'},
'namespace': {'type': 'constant_keyword'},
'type': {'type': 'constant_keyword',
'value': 'logs'}}},
'event1': {'properties': {'dataset': {'ignore_above': 1024, 'type': 'keyword'}}},
'event2': {'properties': {'dataset': {'ignore_above': 1024, 'type': 'constant_keyword'}}},
'event3': {'properties': {'dataset': {'ignore_above': 1024, 'type': 'wildcard'}}},
'host': {'type': 'object'},
'int_field': {'type': 'long'},
'float_field': {'type': 'float'},
"""
if "type" in props:
return props["type"]
elif "properties" in props:
return "object"
else:
return "unknown"


def map_field(
field_name: str,
field_metadata: dict,
oddrn_generator: ElasticSearchGenerator,
path: str,
) -> DataSetField:
data_type: str = __get_field_type(field_metadata)
oddrn_path: str = oddrn_generator.get_oddrn_by_path(path, field_name)
field_type = TYPES_ELASTIC_TO_ODD.get(data_type, Type.TYPE_UNKNOWN)

dsf: DataSetField = DataSetField(
oddrn=oddrn_path,
name=field_name,
metadata=[],
type=DataSetFieldType(
type=field_type,
logical_type=data_type,
is_nullable=True,
),
default_value=None,
description=None,
owner=None,
)

return dsf
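A small illustration of map_field and the type table in action; the host and index name are made up, and the generator paths mirror how indices.py (below) calls it:

from odd_models.models import Type
from oddrn_generator import ElasticSearchGenerator

gen = ElasticSearchGenerator(host_settings="localhost:9200")
gen.set_oddrn_paths(indices="logs-2023")

field = map_field("timestamp", {"type": "date"}, gen, "indices_fields")
assert field.type.type == Type.TYPE_DATETIME  # "date" maps to TYPE_DATETIME
assert field.type.logical_type == "date"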
30 changes: 30 additions & 0 deletions odd_collector/adapters/opensearch/mappers/indices.py
@@ -0,0 +1,30 @@
from odd_models.models import DataEntity, DataEntityType, DataSet
from oddrn_generator import ElasticSearchGenerator

from .fields import map_field
from .metadata import extract_index_metadata


def map_index(
index: dict,
properties: dict,
generator: ElasticSearchGenerator,
) -> DataEntity:
generator.set_oddrn_paths(indices=index["index"])
index_oddrn = generator.get_oddrn_by_path("indices")

    # a field whose name starts with `@` is an alias for another field in the same index
field_list = [
map_field(name, value, generator, "indices_fields")
for name, value in properties.items()
if not name.startswith("@")
]

return DataEntity(
oddrn=index_oddrn,
name=index["index"],
owner=None,
type=DataEntityType.TABLE,
metadata=[extract_index_metadata(index)],
dataset=DataSet(parent_oddrn=None, rows_number=0, field_list=field_list),
)
44 changes: 44 additions & 0 deletions odd_collector/adapters/opensearch/mappers/metadata.py
@@ -0,0 +1,44 @@
import json
from dataclasses import dataclass
from typing import Any

from funcy import walk_values
from odd_collector_sdk.utils.metadata import (
DefinitionType,
HasMetadata,
MetadataExtension,
extract_metadata,
)

from ..logger import logger


@dataclass
class MetadataWrapper(HasMetadata):
odd_metadata: dict[str, Any]


def extract_index_metadata(data: dict[str, Any]) -> MetadataExtension:
meta_wrapper = MetadataWrapper(odd_metadata=data)
return extract_metadata("elasticsearch", meta_wrapper, DefinitionType.DATASET)


def extract_template_metadata(data: dict[str, Any]) -> MetadataExtension:
metadata = data

try:
metadata = walk_values(json.dumps, metadata)
except Exception as e:
logger.warning(f"Can't convert template metadata to json. {str(e)}")
logger.debug(f"Template metadata: {data!r}")

meta_wrapper = MetadataWrapper(odd_metadata=metadata)

return extract_metadata("elasticsearch", meta_wrapper, DefinitionType.DATASET)


def extract_data_stream_metadata(data: dict[str, Any]) -> MetadataExtension:
meta_wrapper = MetadataWrapper(odd_metadata=data)
return extract_metadata(
"elasticsearch", meta_wrapper, DefinitionType.DATASET, flatten=True
)
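Why extract_template_metadata runs walk_values(json.dumps, ...): template metadata carries nested dicts and lists, and JSON-encoding each top-level value keeps the metadata extension flat. A quick illustration (sample payload is made up):

import json

from funcy import walk_values

raw = {"index_patterns": ["logs-*"], "priority": 100}
print(walk_values(json.dumps, raw))
# {'index_patterns': '["logs-*"]', 'priority': '100'}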
22 changes: 22 additions & 0 deletions odd_collector/adapters/opensearch/mappers/stream.py
@@ -0,0 +1,22 @@
from odd_models.models import DataEntity, DataEntityType, DataSet
from oddrn_generator import ElasticSearchGenerator

from .metadata import extract_data_stream_metadata


def map_data_stream(
stream_data: dict,
generator: ElasticSearchGenerator,
) -> DataEntity:
generator.set_oddrn_paths(streams=stream_data["name"])
stream_oddrn = generator.get_oddrn_by_path("streams")

return DataEntity(
oddrn=stream_oddrn,
name=stream_data["name"],
owner=None,
# TODO: Change to stream type
type=DataEntityType.FILE,
metadata=[extract_data_stream_metadata(stream_data)],
dataset=DataSet(parent_oddrn=None, rows_number=0, field_list=[]),
)