Commit 71ed230

Adds back in pyspark individual metrics

We had captured all of these metrics together. Now we also capture the
individual results (cost explain and extended explain) as separate
observations, which allows for easy comparison. Since the introspection
would otherwise run more than once per dataframe, we wrap it in a
functools.lru_cache (which should cache based on the PySpark DataFrame's
object identity).
elijahbenizzy committed Jul 15, 2024
1 parent 32b1ef1 commit 71ed230
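
For readers wondering why the cache deduplicates the work: functools.lru_cache keys on its arguments' hashes, and PySpark DataFrames fall back to default object-identity hashing, so a repeated call with the same DataFrame object is served from the cache. Below is a minimal, hypothetical sketch of that behavior; the introspect_once stand-in and the local Spark session are illustrative assumptions, not code from this commit.

import functools

import pyspark.sql as ps
from pyspark.sql import SparkSession


@functools.lru_cache(maxsize=128)
def introspect_once(df: ps.DataFrame) -> dict:
    # Hypothetical stand-in for _introspect; it only demonstrates the caching.
    print("introspecting...")
    return {"num_columns": len(df.columns)}


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, "a")], ["id", "letter"])
    introspect_once(df)  # runs the body, prints "introspecting..."
    introspect_once(df)  # same DataFrame object -> cache hit, nothing recomputed
    assert introspect_once.cache_info().hits == 1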
Showing 1 changed file with 31 additions and 1 deletion.
ui/sdk/src/hamilton_sdk/tracking/pyspark_stats.py (31 additions, 1 deletion)

@@ -1,4 +1,5 @@
-from typing import Any, Dict, Optional
+import functools
+from typing import Any, Dict, List, Optional
 
 import pyspark.sql as ps
 from hamilton_sdk.tracking import data_observation
@@ -43,6 +44,8 @@
 }
 
 
+# quick cache to ensure we don't compute twice
+@functools.lru_cache(maxsize=128)
 def _introspect(df: ps.DataFrame) -> Dict[str, Any]:
     """Introspect a PySpark dataframe and return a dictionary of statistics.
@@ -105,6 +108,33 @@ def compute_schema_psdf(
     return None
 
 
+@data_observation.compute_additional_results.register
+def compute_additional_psdf(
+    result: ps.DataFrame, node_name: str, node_tags: dict
+) -> List[ObservationType]:
+    o_value = _introspect(result)
+    return [
+        {
+            "observability_type": "primitive",
+            "observability_value": {
+                "type": str(str),
+                "value": o_value["cost_explain"],
+            },
+            "observability_schema_version": "0.0.1",
+            "name": "Cost Explain",
+        },
+        {
+            "observability_type": "primitive",
+            "observability_value": {
+                "type": str(str),
+                "value": o_value["extended_explain"],
+            },
+            "observability_schema_version": "0.0.1",
+            "name": "Extended Explain",
+        },
+    ]
+
+
 if __name__ == "__main__":
     import numpy as np
     import pandas as pd
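
A hedged usage sketch: the .register decorator suggests that data_observation.compute_additional_results is a functools.singledispatch entry point, so passing a PySpark DataFrame should dispatch to the new compute_additional_psdf and yield one "primitive" observation per explain variant. The Spark session, node_name, and node_tags values below are illustrative assumptions, and the keyword arguments simply mirror the registered implementation's signature.

from hamilton_sdk.tracking import data_observation
from hamilton_sdk.tracking import pyspark_stats  # noqa: F401 -- importing the module registers compute_additional_psdf
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.range(10)

# Assumption: dispatch on the type of the first argument picks the
# DataFrame-specific implementation added in this commit.
observations = data_observation.compute_additional_results(
    df, node_name="example_node", node_tags={}
)
print([obs["name"] for obs in observations])  # expected: ['Cost Explain', 'Extended Explain']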
