Skip to content

Commit

Permalink
feat: ✨ Adding ability to insert multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
awhipp committed Apr 30, 2024
1 parent 3f3c41d commit a67185b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 36 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ repos:
- id: check-json
- id: check-merge-conflict
- id: check-toml
- id: detect-aws-credentials
- id: name-tests-test

- repo: https://github.com/PyCQA/flake8
Expand Down
9 changes: 7 additions & 2 deletions foresight/utils/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Provides a singleton class to interact with the TimescaleDB database."""

import os
from typing import Union

import dotenv
import psycopg2
Expand Down Expand Up @@ -74,12 +75,16 @@ def create_table(self, query, table_name=None, column_name=None):
else:
raise Exception("Database connection not established.")

def execute(self, query, params=None):
def execute(self, query, params: Union[tuple, list] = None):
"""Execute a query on the database."""
if self.connection is not None:
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params)
if params is None or isinstance(params, tuple):
cursor.execute(query, params)
elif isinstance(params, list):
psycopg2.extras.execute_values(cursor, query, params)

if cursor.description is not None:
data = cursor.fetchall()
return [dict(row) for row in data]
Expand Down
73 changes: 40 additions & 33 deletions foresight/utils/models/forex_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from datetime import datetime

import pandas as pd
from pydantic import BaseModel

from foresight.utils.database import TimeScaleService
Expand All @@ -11,7 +10,15 @@

logger = generate_logger(name=__name__)

timeMap: dict = {"S": "second", "M": "minute", "H": "hour", "D": "day"}
time_map: dict = {"S": "second", "M": "minute", "H": "hour", "D": "day"}

# Generate the interval based on the timescale
interval_map: dict = {
"S": "60 minute",
"M": "24 hour",
"H": "14 day",
"D": "120 day",
}


class ForexData(BaseModel):
Expand Down Expand Up @@ -68,6 +75,23 @@ def insert_forex_data(self, table_name: str = "forex_data"):
),
)

@staticmethod
def insert_multiple(data: list["ForexData"], table_name: str = "forex_data"):
"""Insert list of multiple forex data efficiently."""
if len(data) > 0:
TimeScaleService().execute(
query=f"""INSERT INTO {table_name} (instrument, time, bid, ask) VALUES %s""",
params=[
(
row.instrument,
row.time,
row.bid,
row.ask,
)
for row in data
],
)

@staticmethod
def drop_table(table_name: str = "forex_data"):
"""Drop a table in the data store.
Expand All @@ -84,6 +108,8 @@ def fetch(instrument: str = "EUR_USD", timescale: str = "M") -> list["ForexData"
"""
Fetch all data from the database and return a DataFrame.
The goal is to great the moving average at the timescale granularity.
Parameters:
instrument (str): The instrument to fetch
timescale (str): The timescale to fetch (S = Second, M = Minute)
Expand All @@ -92,38 +118,19 @@ def fetch(instrument: str = "EUR_USD", timescale: str = "M") -> list["ForexData"
dict: The data from the database
"""
try:
df = pd.DataFrame(
TimeScaleService().execute(
query=f"""
SELECT
TO_CHAR(
date_trunc(
'{timeMap[timescale.lower()]}', time
),
'YYYY-MM-DD HH24:MI:SS'
) as time,
AVG(ask) as ask, AVG(bid) as bid
FROM forex_data
WHERE instrument = '{instrument}'
AND time >= NOW() - INTERVAL '60 minute'
GROUP BY time
ORDER BY time ASC
""",
),
)
query = f"""SELECT
instrument,
time_bucket('{interval_map[timescale]}', time) as time,
AVG(bid) as bid,
AVG(ask) as ask
FROM forex_data
WHERE instrument = '{instrument}'
GROUP BY instrument, time
ORDER BY time ASC"""
results = TimeScaleService().execute(query=query)

return [ForexData(**row) for row in results]

df = (
df.groupby(df["time"]).agg({"ask": "mean", "bid": "mean"}).reset_index()
)
return [
ForexData(
instrument=instrument,
time=datetime.strptime(row["time"], "%Y-%m-%d %H:%M:%S"),
bid=row["bid"],
ask=row["ask"],
)
for _, row in df.iterrows()
]
except Exception as fetch_exception: # pylint: disable=broad-except
logger.error("Error fetching data: %s", fetch_exception)

Expand Down
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Test configuration file for fixtures."""

import pytest

from foresight.utils.models.forex_data import ForexData


@pytest.fixture()
def setup_forex_data():
"""Setup forex data for testing."""
ForexData.create_table()
yield
ForexData.drop_table()
41 changes: 41 additions & 0 deletions tests/utils/models/forex_data_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,47 @@ def test_create_table():
assert column["data_type"] == expected_columns[column["column_name"]]


@pytest.mark.usefixtures("setup_forex_data")
@pytest.mark.parametrize(
"timescale",
[
"S",
"M",
"H",
"D",
],
)
def test_insert_and_fetch(timescale):
"""Insert and fetch forex data."""

# ! TODO Better Test of Aggregate (across timescales)

# ARRANGE
dt_end = datetime.datetime.now()

# Create random data for the last 1000 seconds
data_array = []
for i in range(1000):
dt = dt_end - datetime.timedelta(seconds=i)
dt_end = dt
data_array.append(
ForexData(
instrument="EUR_USD",
time=dt,
bid=1.0 + i,
ask=2.0 + i,
),
)

# ACT
ForexData.insert_multiple(data=data_array)

data = ForexData.fetch(timescale=timescale)

# ASSERT
assert len(data) == 1000


def test_to_price_json():
"""Test the to_price_json method."""

Expand Down
Empty file.

1 comment on commit a67185b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Coverage

Code Coverage
FileStmtsMissCoverMissing
foresight
   __init__.py00100% 
foresight/indicator_services
   __init__.py00100% 
   indicator.py64640%3–5, 7–10, 12, 15, 18, 21–24, 26, 33–38, 44, 46–50, 52, 54, 57, 63, 65, 67–69, 73, 77–78, 82–84, 88, 90, 92, 94, 96, 108, 110–111, 117, 119, 122, 125, 128, 130, 132, 134–136, 138–139, 141, 143, 145
   moving_average_indicator.py18180%3, 5–6, 9, 12, 20–21, 28, 30, 33–34, 36–37, 40, 42, 45–46, 51
foresight/interface_service
   __init__.py00100% 
   app.py31310%3–4, 6–9, 11–12, 15, 18, 20, 23, 25, 33–34, 36, 39–40, 42, 45–46, 48–51, 53–54, 56, 59–61
foresight/stream_service
   __init__.py00100% 
   app.py711381%82, 110, 131, 133–134, 139, 142–148
foresight/stream_service/models
   __init__.py00100% 
   pricing.py40100% 
   stream.py180100% 
foresight/utils
   __init__.py00100% 
   aws.py12120%3, 5, 8, 11, 13–14, 21, 27, 30, 32–33, 40
   database.py551180%52–53, 70–71, 76, 91–92, 94, 98–100
   logger.py90100% 
foresight/utils/models
   __init__.py00100% 
   forex_data.py43295%134–135
   subscription_feeds.py24240%3–4, 6–7, 10, 13, 23–26, 28–29, 39, 48, 50–51, 59, 61–62, 72–73, 77–79
foresight/window_service
   __init__.py00100% 
   app.py25250%3, 5, 7–10, 13, 16–17, 19, 21–23, 26–27, 32, 34, 40–42, 46–47, 50–52
tests
   conftest.py70100% 
tests/stream_service
   conftest.py19952%21–22, 30, 33, 37–41
   stream_service_test.py410100% 
tests/utils/models
   forex_data_test.py530100% 
   subscription_feeds_test.py00100% 
tests/window_service
   window_service_test.py00100% 
TOTAL49420957% 

Please sign in to comment.