From 89f081379432e6eac9002b2684171a715e6b7570 Mon Sep 17 00:00:00 2001 From: awhipp Date: Fri, 26 Apr 2024 00:05:33 -0400 Subject: [PATCH] feat: :recycle: Refactoring models to own queries against tables --- foresight/models/forex_data.py | 14 ---- foresight/stream_service/app.py | 34 ++------ .../{ => stream_service}/models/__init__.py | 0 foresight/stream_service/models/forex_data.py | 73 +++++++++++++++++ .../{ => stream_service}/models/pricing.py | 0 .../{ => stream_service}/models/stream.py | 4 +- foresight/utils/database.py | 19 +---- foresight/window_service/app.py | 59 ++++++-------- tests/conftest.py | 16 ---- tests/stream_service/conftest.py | 41 ++++++++++ tests/stream_service/models/__init__.py | 0 .../stream_service/models/test_forex_data.py | 79 +++++++++++++++++++ tests/stream_service/test_app.py | 52 ++---------- tests/window_service/__init__.py | 0 tests/window_service/test_app.py | 0 15 files changed, 235 insertions(+), 156 deletions(-) delete mode 100644 foresight/models/forex_data.py rename foresight/{ => stream_service}/models/__init__.py (100%) create mode 100644 foresight/stream_service/models/forex_data.py rename foresight/{ => stream_service}/models/pricing.py (100%) rename foresight/{ => stream_service}/models/stream.py (86%) delete mode 100644 tests/conftest.py create mode 100644 tests/stream_service/conftest.py create mode 100644 tests/stream_service/models/__init__.py create mode 100644 tests/stream_service/models/test_forex_data.py create mode 100644 tests/window_service/__init__.py create mode 100644 tests/window_service/test_app.py diff --git a/foresight/models/forex_data.py b/foresight/models/forex_data.py deleted file mode 100644 index baa8fe5..0000000 --- a/foresight/models/forex_data.py +++ /dev/null @@ -1,14 +0,0 @@ -"""Forex Data Model used in TimeScaleDB""" - -from datetime import datetime - -from pydantic import BaseModel - - -class ForexData(BaseModel): - """TimescaleDB model for forex data.""" - - instrument: str - time: datetime - bid: float - ask: float diff --git a/foresight/stream_service/app.py b/foresight/stream_service/app.py index a0c2847..aab06cb 100644 --- a/foresight/stream_service/app.py +++ b/foresight/stream_service/app.py @@ -12,9 +12,8 @@ import dotenv import requests -from foresight.models.forex_data import ForexData -from foresight.models.stream import Stream -from foresight.utils.database import TimeScaleService +from foresight.stream_service.models.forex_data import ForexData +from foresight.stream_service.models.stream import Stream from foresight.utils.logger import generate_logger @@ -59,7 +58,7 @@ def open_random_walk_stream( ask=round(initial_price + 0.0001, 5), ) - TimeScaleService().insert_forex_data(record, table_name=table_name) + record.insert_forex_data(table_name=table_name) logger.info(record) @@ -83,8 +82,8 @@ def process_stream_data(line: str, table_name: str = "forex_data"): logger.error(record.errorMessage) elif record.type == "PRICE" and record.tradeable: - TimeScaleService().insert_forex_data( - record.to_forex_data(), + forex_data = record.to_forex_data() + forex_data.insert_forex_data( table_name=table_name, ) logger.info(record) @@ -135,28 +134,9 @@ def open_stream(): execute_stream() -def create_table(table_name: str = "forex_data") -> str: - """Create a table in the data store.""" - - # Execute SQL queries here - TimeScaleService().create_table( - query=f"""CREATE TABLE IF NOT EXISTS {table_name} ( - instrument VARCHAR(10) NOT NULL, - time TIMESTAMPTZ NOT NULL, - bid FLOAT NOT NULL, - ask FLOAT NOT NULL, - PRIMARY KEY (instrument, time) - )""", - table_name=table_name, - column_name="time", - ) - - return table_name - - if __name__ == "__main__": - # Execute SQL queries here - create_table() + # Create the table in the data store if it does not exist. + ForexData.create_table() # Open a stream to the OANDA API and send the data to the data store. while True: diff --git a/foresight/models/__init__.py b/foresight/stream_service/models/__init__.py similarity index 100% rename from foresight/models/__init__.py rename to foresight/stream_service/models/__init__.py diff --git a/foresight/stream_service/models/forex_data.py b/foresight/stream_service/models/forex_data.py new file mode 100644 index 0000000..aa6b0ec --- /dev/null +++ b/foresight/stream_service/models/forex_data.py @@ -0,0 +1,73 @@ +"""Forex Data Model used in TimeScaleDB""" + +from datetime import datetime + +from pydantic import BaseModel + +from foresight.utils.database import TimeScaleService + + +class ForexData(BaseModel): + """TimescaleDB model for forex data. + + Args: + instrument (str): The currency pair. + time (datetime): The time of the record. + bid (float): The bid price. + ask (float): The ask price. + """ + + instrument: str + time: datetime + bid: float + ask: float + + @staticmethod + def create_table(table_name: str = "forex_data") -> str: + """Create a table in the data store if it does not exist. + + Args: + table_name (str): The name of the table to create. + + Returns: + str: The name of the table created. + """ + + # Execute SQL queries here + TimeScaleService().create_table( + query=f"""CREATE TABLE IF NOT EXISTS {table_name} ( + instrument VARCHAR(10) NOT NULL, + time TIMESTAMPTZ NOT NULL, + bid FLOAT NOT NULL, + ask FLOAT NOT NULL, + PRIMARY KEY (instrument, time) + )""", + table_name=table_name, + column_name="time", + ) + + return table_name + + def insert_forex_data(self, table_name: str = "forex_data"): + """Insert forex data into the database.""" + TimeScaleService().execute( + query=f"""INSERT INTO {table_name} (instrument, time, bid, ask) + VALUES (%s, %s, %s, %s)""", + params=( + self.instrument, + self.time, + self.bid, + self.ask, + ), + ) + + @staticmethod + def drop_table(table_name: str = "forex_data"): + """Drop a table in the data store. + + Args: + table_name (str): The name of the table to drop. + """ + + # Execute SQL queries here + TimeScaleService().execute(query=f"DROP TABLE {table_name}") diff --git a/foresight/models/pricing.py b/foresight/stream_service/models/pricing.py similarity index 100% rename from foresight/models/pricing.py rename to foresight/stream_service/models/pricing.py diff --git a/foresight/models/stream.py b/foresight/stream_service/models/stream.py similarity index 86% rename from foresight/models/stream.py rename to foresight/stream_service/models/stream.py index 52cd495..82cbb07 100644 --- a/foresight/models/stream.py +++ b/foresight/stream_service/models/stream.py @@ -7,8 +7,8 @@ from pydantic import BaseModel -from foresight.models.forex_data import ForexData -from foresight.models.pricing import Pricing +from foresight.stream_service.models.forex_data import ForexData +from foresight.stream_service.models.pricing import Pricing class Stream(BaseModel): diff --git a/foresight/utils/database.py b/foresight/utils/database.py index 978f378..e02aac2 100644 --- a/foresight/utils/database.py +++ b/foresight/utils/database.py @@ -7,8 +7,6 @@ import psycopg2 import psycopg2.extras -from foresight.models.forex_data import ForexData - dotenv.load_dotenv(".env") @@ -73,7 +71,9 @@ def create_table(self, query, table_name=None, column_name=None): except psycopg2.DatabaseError: logger.info("Already created the hyper table. Skipping.") except Exception as table_create_exception: - raise Exception(f"Failed to create table: {table_create_exception}") + raise ValueError( + f"Failed to create table: {table_create_exception}", + ) from table_create_exception else: raise Exception("Database connection not established.") @@ -98,19 +98,6 @@ def close(self): self.connection.close() self.connection = None - def insert_forex_data(self, forex_data: ForexData, table_name: str = "forex_data"): - """Insert forex data into the database.""" - self.execute( - query=f"""INSERT INTO {table_name} (instrument, time, bid, ask) - VALUES (%s, %s, %s, %s)""", - params=( - forex_data.instrument, - forex_data.time, - forex_data.bid, - forex_data.ask, - ), - ) - # Example usage: # # Execute a sample query against native tables. diff --git a/foresight/window_service/app.py b/foresight/window_service/app.py index cefdd2d..a908bf4 100644 --- a/foresight/window_service/app.py +++ b/foresight/window_service/app.py @@ -18,6 +18,9 @@ logger = logging.getLogger(__name__) +timeMap: dict = {"S": "second", "M": "minute", "H": "hour", "D": "day"} + + def fetch_data(instrument: str = "EUR_USD", timescale: str = "M") -> pd.DataFrame: """ Fetch all data from the database and return a DataFrame. @@ -30,42 +33,26 @@ def fetch_data(instrument: str = "EUR_USD", timescale: str = "M") -> pd.DataFram dict: The data from the database """ try: - # Fetch all data from the database based on the parameters - if timescale == "M": - df = pd.DataFrame( - TimeScaleService().execute( - query=f""" - SELECT TO_CHAR(date_trunc('minute', time), 'YYYY-MM-DD HH24:MI:SS') as time, - AVG(ask) as ask, AVG(bid) as bid - FROM forex_data - WHERE instrument = '{instrument}' - AND time >= NOW() - INTERVAL '60 minute' - GROUP BY time - ORDER BY time ASC - """, - ), - ) - - return ( - df.groupby(df["time"]).agg({"ask": "mean", "bid": "mean"}).reset_index() - ) - elif timescale == "T": - return pd.DataFrame( - TimeScaleService().execute( - query=f""" - SELECT TO_CHAR(time, 'YYYY-MM-DD HH24:MI:SS') as time, - ask, bid - FROM forex_data - WHERE instrument = '{instrument}' - AND time >= NOW() - INTERVAL '60 minute' - ORDER BY time ASC - """, - ), - ) - else: - raise Exception(f"Invalid timescale: {timescale}") - except Exception as fetch_exception: - logger.error(f"Error: {fetch_exception}") + df = pd.DataFrame( + TimeScaleService().execute( + query=f""" + SELECT + TO_CHAR( + date_trunc('{timeMap[timescale.lower()]}', time), 'YYYY-MM-DD HH24:MI:SS' + ) as time, + AVG(ask) as ask, AVG(bid) as bid + FROM forex_data + WHERE instrument = '{instrument}' + AND time >= NOW() - INTERVAL '60 minute' + GROUP BY time + ORDER BY time ASC + """, + ), + ) + + return df.groupby(df["time"]).agg({"ask": "mean", "bid": "mean"}).reset_index() + except Exception as fetch_exception: # pylint: disable=broad-except + logger.error("Error fetching data: %s", fetch_exception) if __name__ == "__main__": diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index a9d612b..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,16 +0,0 @@ -"""Test configuration for Foresight.""" - -import uuid - -import pytest - -from foresight.stream_service.app import create_table -from foresight.utils.database import TimeScaleService - - -@pytest.fixture() -def create_timescale_table(): - """Create a table in the TimescaleDB.""" - table_name = create_table(table_name=f"forex_data_{str(uuid.uuid4())[0:4]}") - yield table_name - TimeScaleService().execute(query=f"DROP TABLE {table_name}") diff --git a/tests/stream_service/conftest.py b/tests/stream_service/conftest.py new file mode 100644 index 0000000..ccc77da --- /dev/null +++ b/tests/stream_service/conftest.py @@ -0,0 +1,41 @@ +"""Test configuration for Foresight.""" + +import pytest + +from foresight.stream_service.models.forex_data import ForexData +from foresight.utils.database import TimeScaleService + + +@pytest.fixture() +def create_forex_data_table(): + """Create a table in the TimescaleDB.""" + table_name = ForexData.create_table() + yield table_name + ForexData.drop_table() + + +@pytest.fixture() +def insert_forex_data(): + """Insert data into the TimescaleDB.""" + # ARRANGE + table_name = create_forex_data_table + data = ForexData( + instrument="EUR_USD", + time="2021-01-01T00:00:00", + bid=1.0, + ask=1.0001, + ) + + # ACT + data.insert_forex_data() + + # ASSERT + records = TimeScaleService().execute( + query=f"SELECT * FROM {table_name}", + ) + + assert len(records) == 1 # 1 record + assert records[0]["instrument"] == "EUR_USD" + assert records[0]["bid"] == 1.0 + assert records[0]["ask"] == 1.0001 + assert records[0]["time"] == "2021-01-01 00:00:00+00" diff --git a/tests/stream_service/models/__init__.py b/tests/stream_service/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/stream_service/models/test_forex_data.py b/tests/stream_service/models/test_forex_data.py new file mode 100644 index 0000000..a691aac --- /dev/null +++ b/tests/stream_service/models/test_forex_data.py @@ -0,0 +1,79 @@ +"""Test forex data model.""" + +import datetime + +import pytest + +from foresight.stream_service.models.forex_data import ForexData +from foresight.utils.database import TimeScaleService + + +def test_valid_forex_data(): + """Test valid forex data.""" + + forex_data = ForexData( + instrument="EUR_USD", + time="2021-01-01T00:00:00", + bid=1.0, + ask=1.0001, + ) + assert forex_data.instrument == "EUR_USD" + assert forex_data.time == datetime.datetime(2021, 1, 1, 0, 0) + assert forex_data.bid == 1.0 + assert forex_data.ask == 1.0001 + + +def test_invalid_forex_data(): + """Test invalid forex data. All 4 fields are required.""" + + valid_definition = { + "instrument": "EUR_USD", + "time": "2021-01-01T00:00:00", + "bid": 1.0, + "ask": 1.0001, + } + + # Loop through all fields and remove one at a time + for key in valid_definition.keys(): + invalid_definition = valid_definition.copy() + invalid_definition.pop(key) + with pytest.raises(ValueError): + ForexData(**invalid_definition) + + +def test_create_table(): + """Test the create_table method.""" + + # ARRANGE / ACT + table_name = ForexData.create_table() + + # ASSERT + assert table_name == table_name + + hyper_table_details = TimeScaleService().execute( + query=f"""SELECT * FROM timescaledb_information.hypertables + WHERE hypertable_name = '{table_name}'""", + ) + + assert len(hyper_table_details) == 1 # Only one record + + hyper_table_details = hyper_table_details[0] + + assert hyper_table_details["hypertable_schema"] == "public" + assert hyper_table_details["hypertable_name"] == table_name + + table_schema = TimeScaleService().execute( + query=f"""SELECT + column_name, data_type, character_maximum_length, column_default, is_nullable + FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}'""", + ) + + expected_columns = { + "instrument": "character varying", + "time": "timestamp with time zone", + "bid": "double precision", + "ask": "double precision", + } + assert len(table_schema) == 4 # 4 columns + for column in table_schema: + assert column["data_type"] == expected_columns[column["column_name"]] diff --git a/tests/stream_service/test_app.py b/tests/stream_service/test_app.py index 6340438..3eea0ae 100644 --- a/tests/stream_service/test_app.py +++ b/tests/stream_service/test_app.py @@ -5,56 +5,18 @@ import pytest -from foresight.models.stream import Stream from foresight.stream_service.app import open_oanda_stream from foresight.stream_service.app import open_random_walk_stream from foresight.stream_service.app import process_stream_data +from foresight.stream_service.models.stream import Stream from foresight.utils.database import TimeScaleService -def test_create_table(create_timescale_table): - """Test the create_table method.""" - - # ARRANGE / ACT - table_name = create_timescale_table - - # ASSERT - assert table_name == table_name - - hyper_table_details = TimeScaleService().execute( - query=f"""SELECT * FROM timescaledb_information.hypertables - WHERE hypertable_name = '{table_name}'""", - ) - - assert len(hyper_table_details) == 1 # Only one record - - hyper_table_details = hyper_table_details[0] - - assert hyper_table_details["hypertable_schema"] == "public" - assert hyper_table_details["hypertable_name"] == table_name - - table_schema = TimeScaleService().execute( - query=f"""SELECT - column_name, data_type, character_maximum_length, column_default, is_nullable - FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}'""", - ) - - expected_columns = { - "instrument": "character varying", - "time": "timestamp with time zone", - "bid": "double precision", - "ask": "double precision", - } - assert len(table_schema) == 4 # 4 columns - for column in table_schema: - assert column["data_type"] == expected_columns[column["column_name"]] - - -def test_open_random_walk_stream(create_timescale_table): +def test_open_random_walk_stream(create_forex_data_table): """Ensure that the random walk stream is working as expected.""" # ARRANGE - table_name = create_timescale_table + table_name = create_forex_data_table # ACT open_random_walk_stream(max_walk=10, table_name=table_name) @@ -72,11 +34,11 @@ def test_open_random_walk_stream(create_timescale_table): assert record["ask"] > 0 -def test_process_steam_data(create_timescale_table): +def test_process_steam_data(create_forex_data_table): """Test the process_stream_data method.""" # ARRANGE - table_name = create_timescale_table + table_name = create_forex_data_table sample_stream_record = Stream( instrument="EUR_USD", time=datetime.now().isoformat(), @@ -107,10 +69,10 @@ def test_process_steam_data(create_timescale_table): ) -def test_open_oanda_stream(create_timescale_table): +def test_open_oanda_stream(create_forex_data_table): """Test the open_oanda_stream method.""" # ARRANGE - table_name = create_timescale_table + table_name = create_forex_data_table # ACT open_oanda_stream(run_forever=False, limit=1) diff --git a/tests/window_service/__init__.py b/tests/window_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/window_service/test_app.py b/tests/window_service/test_app.py new file mode 100644 index 0000000..e69de29