Skip to content

Commit

Permalink
feat: ♻️ Refactoring models to own queries against tables
Browse files Browse the repository at this point in the history
  • Loading branch information
awhipp committed Apr 26, 2024
1 parent 851134c commit 89f0813
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 156 deletions.
14 changes: 0 additions & 14 deletions foresight/models/forex_data.py

This file was deleted.

34 changes: 7 additions & 27 deletions foresight/stream_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
import dotenv
import requests

from foresight.models.forex_data import ForexData
from foresight.models.stream import Stream
from foresight.utils.database import TimeScaleService
from foresight.stream_service.models.forex_data import ForexData
from foresight.stream_service.models.stream import Stream
from foresight.utils.logger import generate_logger


Expand Down Expand Up @@ -59,7 +58,7 @@ def open_random_walk_stream(
ask=round(initial_price + 0.0001, 5),
)

TimeScaleService().insert_forex_data(record, table_name=table_name)
record.insert_forex_data(table_name=table_name)

logger.info(record)

Expand All @@ -83,8 +82,8 @@ def process_stream_data(line: str, table_name: str = "forex_data"):
logger.error(record.errorMessage)
elif record.type == "PRICE" and record.tradeable:

TimeScaleService().insert_forex_data(
record.to_forex_data(),
forex_data = record.to_forex_data()
forex_data.insert_forex_data(
table_name=table_name,
)
logger.info(record)
Expand Down Expand Up @@ -135,28 +134,9 @@ def open_stream():
execute_stream()


def create_table(table_name: str = "forex_data") -> str:
"""Create a table in the data store."""

# Execute SQL queries here
TimeScaleService().create_table(
query=f"""CREATE TABLE IF NOT EXISTS {table_name} (
instrument VARCHAR(10) NOT NULL,
time TIMESTAMPTZ NOT NULL,
bid FLOAT NOT NULL,
ask FLOAT NOT NULL,
PRIMARY KEY (instrument, time)
)""",
table_name=table_name,
column_name="time",
)

return table_name


if __name__ == "__main__":
# Execute SQL queries here
create_table()
# Create the table in the data store if it does not exist.
ForexData.create_table()

# Open a stream to the OANDA API and send the data to the data store.
while True:
Expand Down
File renamed without changes.
73 changes: 73 additions & 0 deletions foresight/stream_service/models/forex_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Forex Data Model used in TimeScaleDB"""

from datetime import datetime

from pydantic import BaseModel

from foresight.utils.database import TimeScaleService


class ForexData(BaseModel):
"""TimescaleDB model for forex data.
Args:
instrument (str): The currency pair.
time (datetime): The time of the record.
bid (float): The bid price.
ask (float): The ask price.
"""

instrument: str
time: datetime
bid: float
ask: float

@staticmethod
def create_table(table_name: str = "forex_data") -> str:
"""Create a table in the data store if it does not exist.
Args:
table_name (str): The name of the table to create.
Returns:
str: The name of the table created.
"""

# Execute SQL queries here
TimeScaleService().create_table(
query=f"""CREATE TABLE IF NOT EXISTS {table_name} (
instrument VARCHAR(10) NOT NULL,
time TIMESTAMPTZ NOT NULL,
bid FLOAT NOT NULL,
ask FLOAT NOT NULL,
PRIMARY KEY (instrument, time)
)""",
table_name=table_name,
column_name="time",
)

return table_name

def insert_forex_data(self, table_name: str = "forex_data"):
"""Insert forex data into the database."""
TimeScaleService().execute(
query=f"""INSERT INTO {table_name} (instrument, time, bid, ask)
VALUES (%s, %s, %s, %s)""",
params=(
self.instrument,
self.time,
self.bid,
self.ask,
),
)

@staticmethod
def drop_table(table_name: str = "forex_data"):
"""Drop a table in the data store.
Args:
table_name (str): The name of the table to drop.
"""

# Execute SQL queries here
TimeScaleService().execute(query=f"DROP TABLE {table_name}")
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from pydantic import BaseModel

from foresight.models.forex_data import ForexData
from foresight.models.pricing import Pricing
from foresight.stream_service.models.forex_data import ForexData
from foresight.stream_service.models.pricing import Pricing


class Stream(BaseModel):
Expand Down
19 changes: 3 additions & 16 deletions foresight/utils/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import psycopg2
import psycopg2.extras

from foresight.models.forex_data import ForexData


dotenv.load_dotenv(".env")

Expand Down Expand Up @@ -73,7 +71,9 @@ def create_table(self, query, table_name=None, column_name=None):
except psycopg2.DatabaseError:
logger.info("Already created the hyper table. Skipping.")
except Exception as table_create_exception:
raise Exception(f"Failed to create table: {table_create_exception}")
raise ValueError(
f"Failed to create table: {table_create_exception}",
) from table_create_exception

else:
raise Exception("Database connection not established.")
Expand All @@ -98,19 +98,6 @@ def close(self):
self.connection.close()
self.connection = None

def insert_forex_data(self, forex_data: ForexData, table_name: str = "forex_data"):
"""Insert forex data into the database."""
self.execute(
query=f"""INSERT INTO {table_name} (instrument, time, bid, ask)
VALUES (%s, %s, %s, %s)""",
params=(
forex_data.instrument,
forex_data.time,
forex_data.bid,
forex_data.ask,
),
)


# Example usage:
# # Execute a sample query against native tables.
Expand Down
59 changes: 23 additions & 36 deletions foresight/window_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
logger = logging.getLogger(__name__)


timeMap: dict = {"S": "second", "M": "minute", "H": "hour", "D": "day"}


def fetch_data(instrument: str = "EUR_USD", timescale: str = "M") -> pd.DataFrame:
"""
Fetch all data from the database and return a DataFrame.
Expand All @@ -30,42 +33,26 @@ def fetch_data(instrument: str = "EUR_USD", timescale: str = "M") -> pd.DataFram
dict: The data from the database
"""
try:
# Fetch all data from the database based on the parameters
if timescale == "M":
df = pd.DataFrame(
TimeScaleService().execute(
query=f"""
SELECT TO_CHAR(date_trunc('minute', time), 'YYYY-MM-DD HH24:MI:SS') as time,
AVG(ask) as ask, AVG(bid) as bid
FROM forex_data
WHERE instrument = '{instrument}'
AND time >= NOW() - INTERVAL '60 minute'
GROUP BY time
ORDER BY time ASC
""",
),
)

return (
df.groupby(df["time"]).agg({"ask": "mean", "bid": "mean"}).reset_index()
)
elif timescale == "T":
return pd.DataFrame(
TimeScaleService().execute(
query=f"""
SELECT TO_CHAR(time, 'YYYY-MM-DD HH24:MI:SS') as time,
ask, bid
FROM forex_data
WHERE instrument = '{instrument}'
AND time >= NOW() - INTERVAL '60 minute'
ORDER BY time ASC
""",
),
)
else:
raise Exception(f"Invalid timescale: {timescale}")
except Exception as fetch_exception:
logger.error(f"Error: {fetch_exception}")
df = pd.DataFrame(
TimeScaleService().execute(
query=f"""
SELECT
TO_CHAR(
date_trunc('{timeMap[timescale.lower()]}', time), 'YYYY-MM-DD HH24:MI:SS'
) as time,
AVG(ask) as ask, AVG(bid) as bid
FROM forex_data
WHERE instrument = '{instrument}'
AND time >= NOW() - INTERVAL '60 minute'
GROUP BY time
ORDER BY time ASC
""",
),
)

return df.groupby(df["time"]).agg({"ask": "mean", "bid": "mean"}).reset_index()
except Exception as fetch_exception: # pylint: disable=broad-except
logger.error("Error fetching data: %s", fetch_exception)


if __name__ == "__main__":
Expand Down
16 changes: 0 additions & 16 deletions tests/conftest.py

This file was deleted.

41 changes: 41 additions & 0 deletions tests/stream_service/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Test configuration for Foresight."""

import pytest

from foresight.stream_service.models.forex_data import ForexData
from foresight.utils.database import TimeScaleService


@pytest.fixture()
def create_forex_data_table():
"""Create a table in the TimescaleDB."""
table_name = ForexData.create_table()
yield table_name
ForexData.drop_table()


@pytest.fixture()
def insert_forex_data():
"""Insert data into the TimescaleDB."""
# ARRANGE
table_name = create_forex_data_table
data = ForexData(
instrument="EUR_USD",
time="2021-01-01T00:00:00",
bid=1.0,
ask=1.0001,
)

# ACT
data.insert_forex_data()

# ASSERT
records = TimeScaleService().execute(
query=f"SELECT * FROM {table_name}",
)

assert len(records) == 1 # 1 record
assert records[0]["instrument"] == "EUR_USD"
assert records[0]["bid"] == 1.0
assert records[0]["ask"] == 1.0001
assert records[0]["time"] == "2021-01-01 00:00:00+00"
Empty file.
Loading

1 comment on commit 89f0813

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Coverage

Code Coverage
FileStmtsMissCoverMissing
foresight
   __init__.py00100% 
foresight/indicator_services
   __init__.py00100% 
   indicator.py65650%3–4, 7–8, 10–13, 16, 21, 24, 27–30, 32, 39–44, 50, 52–56, 58, 60, 63, 69, 71, 73–75, 79, 83–84, 88–90, 94, 96, 98, 100, 102, 114, 116–117, 123, 125, 128, 131, 134, 136, 138, 140–142, 144–145, 147, 149, 151
   moving_average_indicator.py19190%2, 4–5, 8, 13, 16, 24–25, 32, 34, 37–38, 40–41, 44, 46, 49–50, 55
foresight/interface_service
   __init__.py00100% 
   app.py32320%3–5, 7–10, 12, 15, 18, 21, 26, 29, 31, 39–40, 42, 45–46, 48, 51–52, 54–57, 59–60, 62, 65–67
foresight/stream_service
   __init__.py00100% 
   app.py711381%82, 110, 131, 133–134, 139, 142–148
foresight/stream_service/models
   __init__.py00100% 
   forex_data.py170100% 
   pricing.py40100% 
   stream.py180100% 
foresight/utils
   __init__.py00100% 
   aws.py12120%3, 5, 8, 11, 13–14, 21, 27, 30, 32–33, 40
   database.py521375%55–56, 71–74, 79, 90–91, 93, 97–99
   logger.py90100% 
foresight/window_service
   __init__.py00100% 
   app.py39390%3–4, 6–7, 9–10, 13, 18, 21, 24, 35–36, 53–55, 58, 60, 70, 72–74, 79–80, 85–94, 98–100, 104–105, 111
tests
   __init__.py00100% 
tests/stream_service
   __init__.py00100% 
   conftest.py19952%21–22, 30, 33, 37–41
   test_app.py400100% 
tests/stream_service/models
   __init__.py00100% 
   test_forex_data.py300100% 
tests/window_service
   __init__.py00100% 
   test_app.py00100% 
TOTAL42720252% 

Please sign in to comment.