Skip to content

Commit

Permalink
refactor: ♻️ Continued Indicator Refactor and Creation of `Indicato…
Browse files Browse the repository at this point in the history
…rResult`
  • Loading branch information
awhipp committed May 2, 2024
1 parent cab8382 commit 5fc6705
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 116 deletions.
100 changes: 50 additions & 50 deletions coverage.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" ?>
<coverage version="7.4.4" timestamp="1714683889344" lines-valid="638" lines-covered="466" line-rate="0.7304" branches-covered="0" branches-valid="0" branch-rate="0" complexity="0">
<coverage version="7.4.4" timestamp="1714684221274" lines-valid="638" lines-covered="466" line-rate="0.7304" branches-covered="0" branches-valid="0" branch-rate="0" complexity="0">
<!-- Generated by coverage.py: https://coverage.readthedocs.io/en/7.4.4 -->
<!-- Based on https://raw.githubusercontent.com/cobertura/web/master/htdocs/xml/coverage-04.dtd -->
<sources>
Expand All @@ -26,73 +26,73 @@
<line number="3" hits="0"/>
<line number="4" hits="0"/>
<line number="5" hits="0"/>
<line number="7" hits="0"/>
<line number="6" hits="0"/>
<line number="8" hits="0"/>
<line number="9" hits="0"/>
<line number="10" hits="0"/>
<line number="11" hits="0"/>
<line number="12" hits="0"/>
<line number="14" hits="0"/>
<line number="16" hits="0"/>
<line number="19" hits="0"/>
<line number="22" hits="0"/>
<line number="15" hits="0"/>
<line number="18" hits="0"/>
<line number="21" hits="0"/>
<line number="24" hits="0"/>
<line number="25" hits="0"/>
<line number="26" hits="0"/>
<line number="27" hits="0"/>
<line number="28" hits="0"/>
<line number="29" hits="0"/>
<line number="30" hits="0"/>
<line number="32" hits="0"/>
<line number="31" hits="0"/>
<line number="38" hits="0"/>
<line number="39" hits="0"/>
<line number="40" hits="0"/>
<line number="41" hits="0"/>
<line number="42" hits="0"/>
<line number="43" hits="0"/>
<line number="44" hits="0"/>
<line number="45" hits="0"/>
<line number="46" hits="0"/>
<line number="47" hits="0"/>
<line number="48" hits="0"/>
<line number="50" hits="0"/>
<line number="49" hits="0"/>
<line number="51" hits="0"/>
<line number="52" hits="0"/>
<line number="53" hits="0"/>
<line number="54" hits="0"/>
<line number="56" hits="0"/>
<line number="58" hits="0"/>
<line number="60" hits="0"/>
<line number="62" hits="0"/>
<line number="69" hits="0"/>
<line number="71" hits="0"/>
<line number="73" hits="0"/>
<line number="55" hits="0"/>
<line number="57" hits="0"/>
<line number="59" hits="0"/>
<line number="61" hits="0"/>
<line number="68" hits="0"/>
<line number="70" hits="0"/>
<line number="72" hits="0"/>
<line number="74" hits="0"/>
<line number="75" hits="0"/>
<line number="76" hits="0"/>
<line number="80" hits="0"/>
<line number="79" hits="0"/>
<line number="83" hits="0"/>
<line number="84" hits="0"/>
<line number="85" hits="0"/>
<line number="88" hits="0"/>
<line number="89" hits="0"/>
<line number="90" hits="0"/>
<line number="91" hits="0"/>
<line number="95" hits="0"/>
<line number="97" hits="0"/>
<line number="99" hits="0"/>
<line number="101" hits="0"/>
<line number="104" hits="0"/>
<line number="116" hits="0"/>
<line number="94" hits="0"/>
<line number="96" hits="0"/>
<line number="98" hits="0"/>
<line number="100" hits="0"/>
<line number="103" hits="0"/>
<line number="115" hits="0"/>
<line number="118" hits="0"/>
<line number="119" hits="0"/>
<line number="120" hits="0"/>
<line number="126" hits="0"/>
<line number="128" hits="0"/>
<line number="132" hits="0"/>
<line number="135" hits="0"/>
<line number="138" hits="0"/>
<line number="140" hits="0"/>
<line number="142" hits="0"/>
<line number="125" hits="0"/>
<line number="127" hits="0"/>
<line number="131" hits="0"/>
<line number="134" hits="0"/>
<line number="137" hits="0"/>
<line number="139" hits="0"/>
<line number="141" hits="0"/>
<line number="143" hits="0"/>
<line number="144" hits="0"/>
<line number="145" hits="0"/>
<line number="146" hits="0"/>
<line number="147" hits="0"/>
<line number="148" hits="0"/>
<line number="149" hits="0"/>
<line number="151" hits="0"/>
<line number="153" hits="0"/>
<line number="155" hits="0"/>
<line number="150" hits="0"/>
<line number="152" hits="0"/>
<line number="154" hits="0"/>
</lines>
</class>
<class name="moving_average_indicator.py" filename="foresight/indicator_services/moving_average_indicator.py" complexity="0" line-rate="0" branch-rate="0">
Expand Down Expand Up @@ -374,7 +374,7 @@
<class name="exceptions.py" filename="foresight/utils/exceptions.py" complexity="0" line-rate="0" branch-rate="0">
<methods/>
<lines>
<line number="4" hits="0"/>
<line number="5" hits="0"/>
</lines>
</class>
<class name="logger.py" filename="foresight/utils/logger.py" complexity="0" line-rate="1" branch-rate="0">
Expand Down Expand Up @@ -486,13 +486,13 @@
<line number="58" hits="1"/>
<line number="60" hits="1"/>
<line number="68" hits="1"/>
<line number="79" hits="1"/>
<line number="80" hits="1"/>
<line number="90" hits="1"/>
<line number="91" hits="1"/>
<line number="95" hits="1"/>
<line number="96" hits="0"/>
<line number="97" hits="0"/>
<line number="81" hits="1"/>
<line number="82" hits="1"/>
<line number="92" hits="1"/>
<line number="93" hits="1"/>
<line number="97" hits="1"/>
<line number="98" hits="0"/>
<line number="99" hits="0"/>
</lines>
</class>
</classes>
Expand Down
82 changes: 22 additions & 60 deletions foresight/indicator_services/indicator.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
"""Indicator Superclass"""

import datetime
import json
import time
from typing import Literal

import pandas as pd
from boto3_type_annotations.sqs import Client
from utils.aws import get_client
from utils.database import TimeScaleService
from utils.models.subscription_feed import SubscriptionFeed

from foresight.utils.aws import get_client
from foresight.utils.exceptions import AbstractClassError
from foresight.utils.logger import generate_logger
from foresight.utils.models.indicator_result import IndicatorResult
from foresight.utils.models.subscription_feed import SubscriptionFeed


logger = generate_logger(name=__name__)
Expand All @@ -35,52 +34,38 @@ def __init__(
timescale: str,
order_type: str = "mid",
):
"""Initialize the Indicator Class"""
if isinstance(self, Indicator):
raise AbstractClassError("<Indicator> must be subclassed.")

# Instantiate variables
self.component_name = component_name
self.order_type = order_type
self.instrument = instrument
self.timescale = timescale

self.queue_url = self.create_queue()
self.add_subscription_record()

def create_queue(self) -> str:
"""Create a queue."""
# Create Queue
sqs_client: Client = get_client("sqs")
queue_name = f"{self.component_name}_{self.instrument}_indicator_queue"
response = sqs_client.create_queue(QueueName=queue_name)

logger.info(f"Created queue: {queue_name}")
self.queue_url = response["QueueUrl"]

return response["QueueUrl"]

def add_subscription_record(self):
"""Add a subscription record."""
# Add subscription to feed for window service
SubscriptionFeed(
queue_url=self.queue_url,
instrument=self.instrument,
timescale=self.timescale,
order_type=self.order_type,
).insertOrUpdate()

).insert_or_update()
logger.info(f"Added subscription record for {self.component_name}")

def pull_from_queue(self):
def poll(self):
"""Pulls from Queue and returns a DataFrame."""
sqs_client: Client = get_client("sqs")

logger.info(f"Counting messages in queue: {self.queue_url}")
response = sqs_client.get_queue_attributes(
QueueUrl=self.queue_url,
AttributeNames=["ApproximateNumberOfMessages"],
)
logger.info(
f"Messages in queue: {response['Attributes']['ApproximateNumberOfMessages']}",
)

logger.info(f"Pulling from queue: {self.queue_url}")
logger.info(f"Polling from queue: {self.queue_url}")
response = sqs_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=1,
Expand All @@ -93,37 +78,6 @@ def pull_from_queue(self):
)
self.pricing = json.loads(message["Body"])

def do_work(self) -> dict:
"""Calculate the value of the indicator."""
raise NotImplementedError("Subclasses must implement this method.")

def create_indicator_table(self):
"""Create a table in the data store."""
# ! TODO - Move to Indicator Results Model
TimeScaleService().create_table(
query="""
CREATE TABLE IF NOT EXISTS indicator_results (
component_name VARCHAR(255) NOT NULL,
time TIMESTAMPTZ NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (component_name, time)
)""",
hyper_table_name="indicator_results",
hyper_table_column="time",
)

def save_indicator_results(self, value: str):
"""Save the results of the indicator."""
# ! TODO - Move to Indicator Results Model
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
TimeScaleService().execute(
query=f"""
INSERT INTO indicator_results (component_name, time, value)
VALUES ('{self.component_name}', '{timestamp}', '{value}')
""",
)
logger.info(f"Saved indicator results for {self.component_name}")

def format_pricing_data(self) -> dict:
"""'Calculate the all price data for the instrument as a list of json objects"""
# ! TODO - Should be handled in previous stage
Expand All @@ -138,17 +92,25 @@ def format_pricing_data(self) -> dict:

self.pricing = data.to_dict("records")

def do_work(self) -> dict:
"""Calculate the value of the indicator."""
raise NotImplementedError("Subclasses must implement this method.")

def schedule_work(self):
"""Scheduled for every minute."""
self.create_indicator_table()
IndicatorResult.create_table()

while True:
self.pull_from_queue()
self.poll()

if len(self.pricing) > 0:
self.format_pricing_data()

result = self.do_work()

self.save_indicator_results(value=json.dumps(result))
IndicatorResult(
component_name=self.component_name,
value=json.dumps(result),
).insert()

time.sleep(5)
51 changes: 51 additions & 0 deletions foresight/utils/models/indicator_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Pydantic model that stores the Indicator results."""

from datetime import datetime
from typing import Optional

from pydantic import BaseModel

from foresight.utils.database import TimeScaleService


class IndicatorResult(BaseModel):
"""Model representing a result for a given component and its value.
Args:
"""

component_name: str
time: Optional[datetime] = datetime.now()
value: str

@staticmethod
def create_table(table_name: str = "indicator_results") -> str:
"""Create a table in the data store if it does not exist.
Args:
table_name (str): The name of the table to create.
Returns:
str: The name of the table created.
"""
TimeScaleService().create_table(
query=f"""
CREATE TABLE IF NOT EXISTS {table_name} (
component_name VARCHAR(255) NOT NULL,
time TIMESTAMPTZ NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (component_name, time)
)""",
table_name=table_name,
column_name="time",
)

return table_name

def insert(self, table_name: str = "indicator_results"):
"""Insert indicator results into the database."""
TimeScaleService().execute(
query=f"""INSERT INTO {table_name} (component_name, time, value)
VALUES (%s, %s, %s)""",
params=(self.component_name, self.time, self.value),
)
2 changes: 1 addition & 1 deletion foresight/utils/models/subscription_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def drop_table(table_name: str = "subscription_feed"):
# Execute SQL queries here
TimeScaleService().execute(query=f"DROP TABLE {table_name}")

def insertOrUpdate(self, table_name: str = "subscription_feed"):
def insert_or_update(self, table_name: str = "subscription_feed"):
"""
Insert or replaces a feed record in the database.
Expand Down
8 changes: 4 additions & 4 deletions tests/utils/models/subscription_feeds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def test_create_table():


@pytest.mark.usefixtures("setup_subscription_feed_table")
def test_insert_and_fetch():
"""Insert and fetch subscription feed."""
def test_insert_or_update_and_fetch():
"""Testing `insert_or_update` and `fetch` subscription feed."""

# ARRANGE
feed = SubscriptionFeed(
Expand All @@ -89,7 +89,7 @@ def test_insert_and_fetch():
)

# ACT
feed.insertOrUpdate()
feed.insert_or_update()

data = SubscriptionFeed.fetch()

Expand All @@ -106,7 +106,7 @@ def test_insert_and_fetch():
feed.order_type = "ask"

# ACT AGAIN
feed.insertOrUpdate()
feed.insert_or_update()

data = SubscriptionFeed.fetch()

Expand Down
Loading

1 comment on commit 5fc6705

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Coverage

Code Coverage
FileStmtsMissCoverMissing
foresight
   __init__.py00100% 
foresight/indicator_services
   __init__.py00100% 
   indicator.py56560%3–5, 7–8, 10–14, 17, 20, 23–28, 30, 38–39, 42–45, 48–50, 52–53, 56, 62, 64, 66, 68–69, 73–75, 79, 81, 85, 88, 91, 93, 95, 97, 99, 101, 103–104, 106–107, 109, 111, 116
   moving_average_indicator.py18180%3, 5–6, 9, 12, 20–21, 28, 30, 33–34, 36–37, 40, 42, 45–46, 51
foresight/interface_service
   __init__.py00100% 
   app.py31310%3–4, 6–9, 11–12, 15, 18, 20, 23, 25, 33–34, 36, 39–40, 42, 45–46, 48–51, 53–54, 56, 59–61
foresight/stream_service
   __init__.py00100% 
   app.py711381%82, 110, 131, 133–134, 139, 142–148
foresight/stream_service/models
   __init__.py00100% 
   pricing.py40100% 
   stream.py180100% 
foresight/utils
   __init__.py00100% 
   aws.py12558%14, 33, 35–36, 43
   database.py551180%52–53, 70–71, 76, 91–92, 94, 98–100
   exceptions.py110%5
   logger.py90100% 
foresight/utils/models
   __init__.py00100% 
   forex_data.py61296%164–165
   indicator_result.py14140%3–4, 6, 8, 11, 17–19, 21–22, 31, 43, 45, 47
   subscription_feed.py25292%98–99
foresight/window_service
   __init__.py00100% 
   app.py351071%20, 25–27, 70–71, 75, 77–78, 80
tests
   conftest.py240100% 
tests/stream_service
   conftest.py19952%21–22, 30, 33, 37–41
   stream_service_test.py410100% 
tests/utils/models
   forex_data_test.py610100% 
   subscription_feeds_test.py460100% 
tests/window_service
   window_service_test.py370100% 
TOTAL63817273% 

Please sign in to comment.