Skip to content

Commit

Permalink
wip: add unit test scaffolding and CI unit-test job
Browse files Browse the repository at this point in the history
  • Loading branch information
IanRFerguson committed Mar 24, 2024
1 parent c41ba16 commit 554ee97
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: checks
name: formatting_checks

on:
pull_request:
Expand All @@ -7,7 +7,7 @@ on:
branches: ["master", "dev"]

jobs:
run_checks:
check_linting_and_formatting:
runs-on: macos-latest
steps:

Expand All @@ -32,9 +32,23 @@ jobs:
run: |
python -m pip install black flake8 isort
- name: Run Python checks
- name: Run Python formatting and linting
if: steps.changed-python-files.outputs.any_changed == 'true'
run: |
flake8 ${{ steps.changed-python-files.outputs.all_changed_files }} --extend-ignore=E203,W503 --max-line-length=120
black --check ${{ steps.changed-python-files.outputs.all_changed_files }}
isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
isort --profile black ${{ steps.changed-python-files.outputs.all_changed_files }}
run_unit_tests:
runs-on: macos-latest
steps:
- name: Checkout branch
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Setup Python environment
uses: actions/setup-python@v3

- name: Run Unit Tests
run: pytest -rf tests/*
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<img src="https://upload.wikimedia.org/wikipedia/en/d/d5/Klondike_logo.svg">

Klondike offers a lightweight API to read and write data to Google BigQuery using rust-optimized Polars DataFrames.
Klondike offers a lightweight API to read and write data to Google BigQuery using Polars DataFrames.

## Installation

Expand Down
14 changes: 11 additions & 3 deletions klondike/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import polars as pl
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig

from klondike import logger

##########
Expand All @@ -17,7 +18,7 @@
"https://www.googleapis.com/auth/bigquery",
]
},
)
)[0]


class BigQueryConnector:
Expand All @@ -29,12 +30,14 @@ def __init__(
self,
app_creds: Optional[Union[str, dict]] = None,
project: Optional[str] = None,
location: Optional[str] = None,
timeout: int = 60,
client_options: dict = SCOPES,
google_environment_variable: str = "GOOGLE_APPLICATION_CREDENTIALS",
):
self.app_creds = app_creds
self.project = project
self.location = location
self.timeout = timeout
self.client_options = client_options

Expand Down Expand Up @@ -163,9 +166,13 @@ def read_dataframe_from_bigquery(self, sql: str) -> pl.DataFrame:
# Execute and wait for results
result = query_job.result()

if not result:
logger.info("Nothing to see here! No results to return")
return

# Populate DataFrame using PyArrow
df = pl.from_arrow(result.to_arrow())
logger.info(f"Successfully read {len(df)} from BigQuery")
logger.info(f"Successfully read {len(df)} rows from BigQuery")

return df

Expand All @@ -189,7 +196,8 @@ def write_dataframe_to_bigquery(
max_bad_records: Tolerance for bad records in the load job, defaults to 0
table_schema: List of column names, types, and optional flags to include
if_exists: One of `fail`, `drop`, `append`, `truncate`
load_kwargs: See here for list of accepted values - https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig #noqa
load_kwargs: See here for list of accepted values
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig
"""

if if_exists == "drop":
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
google-cloud-bigquery==3.19.0
polars==0.20.16
polars==0.20.16
pyarrow==15.0.2
Empty file added tests/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions tests/test_bigquery_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
from unittest import mock

import polars as pl
import pyarrow as pa

from klondike import BigQueryConnector

from .test_utils import KlondikeTestCase

##########


class TestBigQuery(KlondikeTestCase):
def setUp(self):
super().setUp()
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path

def tearDown(self):
super().tearDown()
del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]

def _build_mock_cursor(self, query_results=None):
cursor = mock.MagicMock()
cursor.execute.return_value = None
cursor.fetchmany.side_effect = [query_results, []]

if query_results:
cursor.description = query_results

# Create a mock that will play the role of the connection
connection = mock.MagicMock()
connection.cursor.return_value = cursor

# Create a mock that will play the role of our GoogleBigQuery client
client = mock.MagicMock()

bq = BigQueryConnector()
bq._client = client

return bq

def test_read_dataframe_from_bigquery(self):
# sql = "select * from my_table"
# tbl = pa.table(
# {
# "city": ["Brooklyn", "San Francisco", "Richmond"],
# "state": ["New York", "California", "Virginia"],
# }
# )

# bq = self._build_mock_cursor(query_results=tbl)
# df = bq.read_dataframe_from_bigquery(sql=sql)

# assert isinstance(df, pl.DataFrame)
pass

def test_write_dataframe_to_bigquery(self):
pass
35 changes: 35 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import json
import os
import tempfile
import unittest

##########


class KlondikeTestCase(unittest.TestCase):
def setUp(self):
self._temp_directory = tempfile.TemporaryDirectory()

self._credentials_path = os.path.join(
self._temp_directory.name, "service_account.json"
)

self._service_account = {
"type": "foo",
"project_id": "bar",
"private_key_id": "biz",
"private_key": "bap",
"client_email": "bim",
"client_id": "top",
"auth_uri": "hat",
"token_uri": "tap",
"auth_provider_x509_cert_url": "dance",
"client_x509_cert_url": "good",
"universe_domain": "stuff",
}

with open(self._credentials_path, "w") as f:
json.dump(self._service_account, f)

def tearDown(self):
self._temp_directory.cleanup()

0 comments on commit 554ee97

Please sign in to comment.