Skip to content

Commit

Permalink
Merge pull request #14 from awhipp/tests/add_stream_service_tests
Browse files Browse the repository at this point in the history
fix: ✅ Added tests for the stream service
  • Loading branch information
awhipp authored Mar 13, 2024
2 parents 00a1a1a + 5148c06 commit 2edec1a
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 96 deletions.
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
14 changes: 14 additions & 0 deletions foresight/models/forex_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Forex Data Model used in TimeScaleDB"""

from datetime import datetime

from pydantic import BaseModel


class ForexData(BaseModel):
"""TimescaleDB model for forex data."""

instrument: str
time: datetime
bid: float
ask: float
4 changes: 2 additions & 2 deletions foresight/models/pricing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
class Pricing(BaseModel):
"""A Pydantic model for the pricing API."""

price: str
liquidity: int
price: float
liquidity: int = 1
18 changes: 14 additions & 4 deletions foresight/models/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,34 @@
This module contains the Pydantic models for the Steam API.
"""

from datetime import datetime
from typing import Optional

from pydantic import BaseModel

from foresight.models.forex_data import ForexData
from foresight.models.pricing import Pricing


class Stream(BaseModel):
"""A Pydantic model for the Steam API."""

type: str
type: str = "PRICE"
instrument: Optional[str] = None
time: str
tradeable: Optional[bool] = None
time: datetime
tradeable: Optional[bool] = True
bids: Optional[list[Pricing]] = []
asks: Optional[list[Pricing]] = []
closeoutBid: Optional[str] = None
closeoutAsk: Optional[str] = None
status: Optional[str] = None
tradeable: Optional[bool] = False
errorMessage: Optional[str] = None

def to_forex_data(self):
"""Convert the stream data to forex data."""
return ForexData(
instrument=self.instrument,
time=self.time,
bid=self.bids[0].price,
ask=self.asks[0].price,
)
189 changes: 102 additions & 87 deletions foresight/stream_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,131 +3,146 @@
import json
import os
import traceback
from datetime import datetime
from random import random
from time import sleep
from typing import Union

import dotenv
import requests

from foresight.models.forex_data import ForexData
from foresight.models.stream import Stream
from foresight.utils.database import TimeScaleService

# Setup logging and log timestamp prepend
from foresight.utils.logger import generate_logger


logger = generate_logger(__name__)
dotenv.load_dotenv(".env")


def open_stream():
"""Open a stream to the OANDA API and send the data to the data store."""
def open_random_walk_stream(
sleep_between: Union[int, float] = 5,
instrument: str = "EUR_USD",
max_walk: int = -1,
table_name: str = "forex_data",
):
"""
Open a random walk stream and send the data to the data store.
Args:
sleep_between (Union[int, float]): The time to sleep between each record.
Set to 0 if max_walk is defined.
max_walk (int): The maximum number of walks to complete.
table_name (str): The name of the table to send the data to.
"""

if max_walk > 0:
sleep_between = 0

logger.info(
f"""Starting random walk stream with sleep between:
{sleep_between} and max_walk: {max_walk}.""",
)

initial_price = 1.0
walks_completed = 0

while True:
initial_price = initial_price * (1.0 + (random() - 0.5) * 0.1)

record = ForexData(
instrument=instrument,
time=datetime.now().isoformat(),
bid=round(initial_price, 5),
ask=round(initial_price + 0.0001, 5),
)

TimeScaleService().insert_forex_data(record, table_name=table_name)

logger.info(record)

if max_walk > 0:
walks_completed += 1
if walks_completed >= max_walk:
break

sleep(sleep_between)


def process_stream_data(line: str, table_name: str = "forex_data"):
"""
Process the stream data and send it to the data store.
"""
if line:
decoded_line = line.decode("utf-8")
record: Stream = Stream.model_validate(json.loads(decoded_line))

if record.errorMessage not in [None, ""]:
logger.error(record.errorMessage)
elif record.type == "PRICE" and record.tradeable:

TimeScaleService().insert_forex_data(
record.to_forex_data(),
table_name=table_name,
)
logger.info(record)


def open_oanda_stream():
"""
Open a stream to the OANDA API and send the data to the data store.
"""
account_id = os.getenv("OANDA_ACCOUNT_ID")
api_token = os.getenv("OANDA_TOKEN")
OANDA_API = os.getenv("OANDA_API", "https://stream-fxpractice.oanda.com/v3/")
if not OANDA_API.endswith("/"):
OANDA_API += "/"

url = f"{OANDA_API}accounts/{account_id}/pricing/stream?instruments=EUR_USD"
head = {
"Content-type": "application/json",
"Accept-Datetime-Format": "RFC3339",
"Authorization": f"Bearer {api_token}",
}
resp = requests.get(url, headers=head, stream=True, timeout=30).iter_lines()
for line in resp:
process_stream_data(line)


def open_stream():
"""Stream the data send the data to the data store.
Uses a random walk or the OANDA API endpoint based on env."""

random_walk = os.getenv("APP_RANDOM_WALK", "False").lower() == "true"

if random_walk:
logger.info("Random walk mode...")
from datetime import datetime
from random import random
from time import sleep

initial_price = 1.0

while True:
initial_price = initial_price * (1.0 + (random() - 0.5) * 0.1)
record = {
"instrument": "EUR_USD",
"time": datetime.now().isoformat(),
"bid": round(initial_price, 5),
"ask": round(initial_price + 0.0001, 5),
}

TimeScaleService().execute(
query="""INSERT INTO forex_data (instrument, time, bid, ask)
VALUES (%s, %s, %s, %s)""",
params=(
record["instrument"],
record["time"],
record["bid"],
record["ask"],
),
)
execute_stream = open_random_walk_stream if random_walk else open_oanda_stream
execute_stream()

logger.info(record)

sleep(5)

else:
url = f"{OANDA_API}accounts/{account_id}/pricing/stream?instruments=EUR_USD"
head = {
"Content-type": "application/json",
"Accept-Datetime-Format": "RFC3339",
"Authorization": f"Bearer {api_token}",
}
resp = requests.get(url, headers=head, stream=True, timeout=30).iter_lines()
for line in resp:
if line:
decoded_line = line.decode("utf-8")
obj = json.loads(decoded_line)
obj: Stream = Stream.model_validate(obj)
if obj.errorMessage:
logger.error(obj.errorMessage)
continue
if obj.type == "PRICE" and obj.tradeable:
record = {
"instrument": obj.instrument,
"time": obj.time,
"bid": float(obj.bids[0].price),
"ask": float(obj.asks[0].price),
}

TimeScaleService().execute(
query="""INSERT INTO forex_data (instrument, time, bid, ask)
VALUES (%s, %s, %s, %s)""",
params=(
record["instrument"],
record["time"],
record["bid"],
record["ask"],
),
)

logger.info(record)


def create_table():
def create_table(table_name: str = "forex_data") -> str:
"""Create a table in the data store."""

# Execute SQL queries here
TimeScaleService().create_table(
query="""CREATE TABLE IF NOT EXISTS forex_data (
query=f"""CREATE TABLE IF NOT EXISTS {table_name} (
instrument VARCHAR(10) NOT NULL,
time TIMESTAMPTZ NOT NULL,
bid FLOAT NOT NULL,
ask FLOAT NOT NULL,
PRIMARY KEY (instrument, time)
)""",
hyper_table_name="forex_data",
hyper_table_column="time",
table_name=table_name,
column_name="time",
)

return table_name


if __name__ == "__main__":
# Execute SQL queries here
TimeScaleService().create_table(
query="""CREATE TABLE IF NOT EXISTS forex_data (
instrument VARCHAR(10) NOT NULL,
time TIMESTAMPTZ NOT NULL,
bid FLOAT NOT NULL,
ask FLOAT NOT NULL,
PRIMARY KEY (instrument, time)
)""",
hyper_table_name="forex_data",
hyper_table_column="time",
)
create_table()

# Open a stream to the OANDA API and send the data to the data store.
while True:
Expand Down
21 changes: 18 additions & 3 deletions foresight/utils/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import psycopg2
import psycopg2.extras

from foresight.models.forex_data import ForexData


dotenv.load_dotenv(".env")

Expand Down Expand Up @@ -57,16 +59,16 @@ def __init__(self):
f"Failed to connect to the database: {connection_exception}",
)

def create_table(self, query, ht_name=None, ht_column=None):
def create_table(self, query, table_name=None, column_name=None):
"""Create a table in the database."""
if self.connection is not None:
try:
with self.connection.cursor() as cursor:
cursor.execute(query)
if ht_name is not None and ht_column is not None:
if table_name is not None and column_name is not None:
try:
cursor.execute(
f"SELECT create_hypertable('{ht_name}', '{ht_column}')",
f"SELECT create_hypertable('{table_name}', '{column_name}')",
)
except psycopg2.DatabaseError:
logger.info("Already created the hyper table. Skipping.")
Expand Down Expand Up @@ -96,6 +98,19 @@ def close(self):
self.connection.close()
self.connection = None

def insert_forex_data(self, forex_data: ForexData, table_name: str = "forex_data"):
"""Insert forex data into the database."""
self.execute(
query=f"""INSERT INTO {table_name} (instrument, time, bid, ask)
VALUES (%s, %s, %s, %s)""",
params=(
forex_data.instrument,
forex_data.time,
forex_data.bid,
forex_data.ask,
),
)


# Example usage:
# # Execute a sample query against native tables.
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Test configuration for Foresight."""

import uuid

import pytest

from foresight.stream_service.app import create_table
from foresight.utils.database import TimeScaleService


@pytest.fixture()
def create_timescale_table():
"""Create a table in the TimescaleDB."""
table_name = create_table(table_name=f"forex_data_{str(uuid.uuid4())[0:4]}")
yield table_name
TimeScaleService().execute(query=f"DROP TABLE {table_name}")
Empty file.
Loading

0 comments on commit 2edec1a

Please sign in to comment.