diff --git a/elementary/monitor/api/groups/groups.py b/elementary/monitor/api/groups/groups.py
index 083da6d78..ba1d8a804 100644
--- a/elementary/monitor/api/groups/groups.py
+++ b/elementary/monitor/api/groups/groups.py
@@ -14,6 +14,7 @@
from elementary.monitor.api.models.schema import (
NormalizedExposureSchema,
NormalizedModelSchema,
+ NormalizedSeedSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
@@ -28,6 +29,7 @@
NormalizedSourceSchema,
NormalizedExposureSchema,
NormalizedTestSchema,
+ NormalizedSeedSchema,
]
diff --git a/elementary/monitor/api/lineage/schema.py b/elementary/monitor/api/lineage/schema.py
index b24270ad4..e8c341bbf 100644
--- a/elementary/monitor/api/lineage/schema.py
+++ b/elementary/monitor/api/lineage/schema.py
@@ -1,4 +1,3 @@
-import re
from typing import List, Literal, Optional, Tuple
import networkx as nx
@@ -6,13 +5,10 @@
from elementary.utils.pydantic_shim import BaseModel, validator
NodeUniqueIdType = str
-NodeType = Literal["model", "source", "exposure"]
+NodeType = Literal["seed", "model", "source", "exposure"]
NodeSubType = Literal["table", "view"]
-_SEED_PATH_PATTERN = re.compile(r"^seed\.")
-
-
class LineageNodeSchema(BaseModel):
id: NodeUniqueIdType
type: NodeType
@@ -51,15 +47,4 @@ class NodeDependsOnNodesSchema(BaseModel):
@validator("depends_on_nodes", pre=True, always=True)
def set_depends_on_nodes(cls, depends_on_nodes):
formatted_depends_on = depends_on_nodes or []
- formatted_depends_on = [
- cls._format_node_id(node_id) for node_id in formatted_depends_on
- ]
return [node_id for node_id in formatted_depends_on if node_id]
-
- @classmethod
- def _format_node_id(cls, node_id: str):
- # Currently we don't save seeds in our artifacts.
- # We remove seeds from the lineage graph (as long as we don't support them).
- if re.search(_SEED_PATH_PATTERN, node_id):
- return None
- return node_id
diff --git a/elementary/monitor/api/models/models.py b/elementary/monitor/api/models/models.py
index 8fd4f7b7b..5d9c54d15 100644
--- a/elementary/monitor/api/models/models.py
+++ b/elementary/monitor/api/models/models.py
@@ -2,7 +2,7 @@
import os
import statistics
from collections import defaultdict
-from typing import Dict, List, Optional, Set, Union, overload
+from typing import Dict, List, Optional, Set, Union, cast, overload
from elementary.clients.api.api_client import APIClient
from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
@@ -13,6 +13,7 @@
ModelRunsWithTotalsSchema,
NormalizedExposureSchema,
NormalizedModelSchema,
+ NormalizedSeedSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
@@ -22,7 +23,11 @@
from elementary.monitor.fetchers.models.schema import (
ModelRunSchema as FetcherModelRunSchema,
)
-from elementary.monitor.fetchers.models.schema import ModelSchema, SourceSchema
+from elementary.monitor.fetchers.models.schema import (
+ ModelSchema,
+ SeedSchema,
+ SourceSchema,
+)
from elementary.utils.log import get_logger
logger = get_logger(__name__)
@@ -30,6 +35,7 @@
class ModelsAPI(APIClient):
_ARTIFACT_TYPE_DIR_MAP = {
+ SeedSchema: "seeds",
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
@@ -117,6 +123,16 @@ def _get_model_runs_totals(
success_runs = len([run for run in runs if run.status == "success"])
return TotalsModelRunsSchema(errors=error_runs, success=success_runs)
+ def get_seeds(self) -> Dict[str, NormalizedSeedSchema]:
+ seed_results = self.models_fetcher.get_seeds()
+ seeds = dict()
+ if seed_results:
+ for seed_result in seed_results:
+ normalized_seed = self._normalize_dbt_artifact_dict(seed_result)
+ seed_unique_id = cast(str, normalized_seed.unique_id)
+ seeds[seed_unique_id] = normalized_seed
+ return seeds
+
def get_models(
self, exclude_elementary_models: bool = False
) -> Dict[str, NormalizedModelSchema]:
@@ -127,12 +143,7 @@ def get_models(
if models_results:
for model_result in models_results:
normalized_model = self._normalize_dbt_artifact_dict(model_result)
-
- model_unique_id = normalized_model.unique_id
- if model_unique_id is None:
- # Shouldn't happen, but handling this case for mypy
- continue
-
+ model_unique_id = cast(str, normalized_model.unique_id)
models[model_unique_id] = normalized_model
return models
@@ -222,6 +233,12 @@ def _exposure_has_upstream_node(
for dep in exposure.depends_on_nodes
)
+ @overload
+ def _normalize_dbt_artifact_dict(
+ self, artifact: SeedSchema
+ ) -> NormalizedSeedSchema:
+ ...
+
@overload
def _normalize_dbt_artifact_dict(
self, artifact: ModelSchema
@@ -241,9 +258,15 @@ def _normalize_dbt_artifact_dict(
...
def _normalize_dbt_artifact_dict(
- self, artifact: Union[ModelSchema, ExposureSchema, SourceSchema]
- ) -> Union[NormalizedModelSchema, NormalizedExposureSchema, NormalizedSourceSchema]:
+ self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
+ ) -> Union[
+ NormalizedSeedSchema,
+ NormalizedModelSchema,
+ NormalizedExposureSchema,
+ NormalizedSourceSchema,
+ ]:
schema_to_normalized_schema_map = {
+ SeedSchema: NormalizedSeedSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
@@ -285,7 +308,7 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
- artifact: Union[ModelSchema, ExposureSchema, SourceSchema],
+ artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
diff --git a/elementary/monitor/api/models/schema.py b/elementary/monitor/api/models/schema.py
index c1bc1c694..f9bbf9f19 100644
--- a/elementary/monitor/api/models/schema.py
+++ b/elementary/monitor/api/models/schema.py
@@ -6,6 +6,7 @@
from elementary.monitor.fetchers.models.schema import (
ExposureSchema,
ModelSchema,
+ SeedSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
@@ -35,6 +36,11 @@ def format_normalized_full_path_sep(cls, normalized_full_path: str) -> str:
return posixpath.sep.join(normalized_full_path.split(os.path.sep))
+# NormalizedArtifactSchema must be first in the inheritance order
+class NormalizedSeedSchema(NormalizedArtifactSchema, SeedSchema):
+ artifact_type: str = Field("seed", const=True) # type: ignore # noqa
+
+
# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedModelSchema(NormalizedArtifactSchema, ModelSchema):
artifact_type: str = Field("model", const=True) # type: ignore # noqa
diff --git a/elementary/monitor/api/report/report.py b/elementary/monitor/api/report/report.py
index 02a747816..df09cb410 100644
--- a/elementary/monitor/api/report/report.py
+++ b/elementary/monitor/api/report/report.py
@@ -13,6 +13,7 @@
ModelRunsSchema,
NormalizedExposureSchema,
NormalizedModelSchema,
+ NormalizedSeedSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
@@ -41,11 +42,12 @@ def _get_groups(
models: Iterable[NormalizedModelSchema],
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
+ seeds: Iterable[NormalizedSeedSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
- artifacts=[*models, *sources, *exposures, *singular_tests]
+ artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
)
def get_report_data(
@@ -78,6 +80,8 @@ def get_report_data(
invocations_api = InvocationsAPI(dbt_runner=self.dbt_runner)
lineage_node_ids: List[str] = []
+ seeds = models_api.get_seeds()
+ lineage_node_ids.extend(seeds.keys())
models = models_api.get_models(exclude_elementary_models)
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
@@ -87,7 +91,11 @@ def get_report_data(
singular_tests = tests_api.get_singular_tests()
groups = self._get_groups(
- models.values(), sources.values(), exposures.values(), singular_tests
+ models.values(),
+ sources.values(),
+ exposures.values(),
+ seeds.values(),
+ singular_tests,
)
models_runs = models_api.get_models_runs(
@@ -133,7 +141,9 @@ def get_report_data(
)
serializable_groups = groups.dict()
- serializable_models = self._serialize_models(models, sources, exposures)
+ serializable_models = self._serialize_models(
+ models, sources, exposures, seeds
+ )
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
"totals"
@@ -191,8 +201,9 @@ def _serialize_models(
models: Dict[str, NormalizedModelSchema],
sources: Dict[str, NormalizedSourceSchema],
exposures: Dict[str, NormalizedExposureSchema],
+ seeds: Dict[str, NormalizedSeedSchema],
) -> Dict[str, dict]:
- nodes = dict(**models, **sources, **exposures)
+ nodes = dict(**models, **sources, **exposures, **seeds)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
diff --git a/elementary/monitor/data_monitoring/report/index.html b/elementary/monitor/data_monitoring/report/index.html
index 462ab1af5..04d502768 100644
--- a/elementary/monitor/data_monitoring/report/index.html
+++ b/elementary/monitor/data_monitoring/report/index.html
@@ -30,7 +30,7 @@