From 5035c1eabeb30705ef37a271ded2ea95aceea008 Mon Sep 17 00:00:00 2001 From: Tim Reichard Date: Tue, 19 Mar 2024 11:04:20 -0500 Subject: [PATCH] Remove databricks session spark object --- HISTORY.rst | 5 +++++ aioradio/ds_utils.py | 13 +++---------- setup.py | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index af1573a..3116117 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ History ======= +v0.20.16 (2024-03-19) + +* Remove databricks session spark object and use original method to convert spark to polars df. + + v0.20.15 (2024-03-16) * Explicitly add databricks config to DatabricksSession. diff --git a/aioradio/ds_utils.py b/aioradio/ds_utils.py index cbc0680..a9ee92b 100644 --- a/aioradio/ds_utils.py +++ b/aioradio/ds_utils.py @@ -30,6 +30,7 @@ import numpy as np import pandas as pd import polars as pl +import pyarrow as pa from haversine import haversine, Unit from mlflow.entities.model_registry.model_version_status import ModelVersionStatus from mlflow.tracking.client import MlflowClient @@ -48,15 +49,7 @@ c_handler.setFormatter(c_format) logger.addHandler(c_handler) -try: - from databricks.connect import DatabricksSession - spark = DatabricksSession.builder.remote( - host=os.environ['DATABRICKS_HOST'], - token=os.environ['DATABRICKS_TOKEN'], - cluster_id=os.environ['DATABRICKS_CLUSTER_ID'] - ).getOrCreate() -except Exception: - spark = SparkSession.builder.getOrCreate() +spark = SparkSession.builder.getOrCreate() ############################### Databricks functions ################################ @@ -91,7 +84,7 @@ def ese_db_catalog(env): def sql_to_polars_df(sql): """Get polars DataFrame from SQL query results.""" - return pl.from_pandas(spark.sql(sql).toPandas()) + return pl.from_arrow(pa.Table.from_batches(spark.sql(sql)._collect_as_arrow())) def does_db_table_exists(name): diff --git a/setup.py b/setup.py index 0f3d312..2a3cfd9 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.20.15', + version='0.20.16', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown",