Skip to content

Commit

Permalink
Add QuasarDB backend (#1007)
Browse files Browse the repository at this point in the history
* added quasar backend (not finished)

* added demo for quasar backend

* deleted unused imports

* added Ticker, Candles, Trades support

* Trades are now saved to two tables depending on side

* updated quasar demo

* removed unused callbacks

* added shard_size to QuasarCallback,
added table creation,
added comments explaining _set_table_name method

* added candles channel,
moved opening connection from init to write

* writing to db is now accomplished with connection pool,
formated file

* added console output for channels

* fixed issue where callbacks dict keys was overwritten

* symbol column is now of symbol type,
only one table per callback,
create table query is created for each callback type

* removed binance

* changed symbol tables, exchange is now of symbol type

* add more channels

* add initial BookQuasar implementation

* change plugin name from quasar to quasardb

* add quasrdb backend to setup

* add quasardb to INSTALL.md

* add quasardb to README.md

* Update INSTALL.md

* change "book" create table query

* BookQuasar now stores only best ask, best bid

* fix best_ask_amount

* add more examples

* add comment to BookQuasar format method

* fix naming

* change queries

* use numpy instead of pandas

* swap timestamp with receipt_timestamp

* update quasardb requirements

* small changes

* Add QuasarDB to AUTHORS.md

* use binance for TICKER channel

* replace host and port args with uri in QuasarCallback
  • Loading branch information
igorniebylski authored Feb 27, 2024
1 parent 7a982de commit db8d22d
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 1 deletion.
3 changes: 2 additions & 1 deletion AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ Cryptofeed was originally created by Bryant Moscon, but many others have contrib
* [O. Janche](https://github.com/toyan) - <[email protected]>
* [Bastien Enjalbert](https://github.com/bastienjalbert) - <[email protected]>
* [Jonggyun Kim](https://github.com/gyunt) - <[email protected]>
* [Thomas Bouamoud](https://github.com/thomasbs17) - <[email protected]>
* [QuasarDB](https://quasar.ai/)
* [Thomas Bouamoud](https://github.com/thomasbs17) - <[email protected]>
5 changes: 5 additions & 0 deletions INSTALL.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ for the exhaustive list of these *extra* dependencies.

pip install --user --upgrade cryptofeed[postgres]

* QuasarDB backend
To install Cryptofeed along with [QuasarDB](https://quasar.ai/) in one bundle:

pip install --user --upgrade cryptofeed[quasardb]

* RabbitMQ backend

pip install --user --upgrade cryptofeed[rabbit]
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Supported Backends:
* Kafka
* RabbitMQ
* PostgreSQL
* [QuasarDB](https://quasar.ai/)
* GCP Pub/Sub
* [QuestDB](https://questdb.io/)

Expand Down
175 changes: 175 additions & 0 deletions cryptofeed/backends/quasardb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from datetime import datetime, timedelta
import quasardb.pool as pool
import quasardb.numpy as qdbnp
import numpy as np
from cryptofeed.backends.backend import BackendCallback


class QuasarCallback(BackendCallback):
def __init__(self, uri="qdb://127.0.0.1:2836", username: str = "", private_key: str = "", public_key: str = "", none_to=None, shard_size: timedelta = timedelta(minutes=15)):
self.numeric_type = float
self.table = ""
self.running = True
self.none_to = none_to
self.shard_size = self._get_str_timedelta(shard_size)

pool.initialize(uri=uri, user_name=username, user_private_key=private_key, cluster_public_key=public_key)

def _get_str_timedelta(self, delta: timedelta):
# calculate the number of hours, minutes, and remaining seconds from timedelta, return it in correct format for query
hours, remainder = divmod(delta.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)
return f"{int(hours)}hour {int(minutes)}min {int(seconds)}s"

def format(self, data: dict):
data['timestamp'] = np.datetime64(datetime.utcfromtimestamp(data['timestamp']), 'ns')
data['receipt_timestamp'] = np.datetime64(datetime.utcfromtimestamp(data['receipt_timestamp']), 'ns')
data['timestamp'], data['receipt_timestamp'] = data['receipt_timestamp'], data['timestamp']
index = data['timestamp']
data.pop('timestamp')
return index, data

def _set_table_name(self, data: dict):
# setting table name
# {channel}/{exchange}/{symbol_1-symbol_2}
# eg. ticker/coinbase/btc-usd
self.table = f"{self.table_prefix.lower()}/{data['exchange'].lower()}/{data['symbol'].lower()}"

def _create_table(self, conn):
if not conn.table(self.table).exists():
conn.query(self.query)

def _insert_format(self, date: np.datetime64, data: dict):
# converts values to np.array
for key, value in data.items():
data[key] = np.array([value])
return np.array([date]), data

async def write(self, data: dict):
self._set_table_name(data)
self._create_query()
index, data = self.format(data)
idx, np_array = self._insert_format(index, data)
# write to table, if table doesnt exist it will be created with specified shard_size value
with pool.instance().connect() as conn:
self._create_table(conn)
qdbnp.write_arrays(np_array, conn, conn.table(self.table), index=idx, fast=True, _async=True)


class TickerQuasar(QuasarCallback):
table_prefix = "ticker"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), bid DOUBLE, ask DOUBLE, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class TradeQuasar(QuasarCallback):
table_prefix = "trades"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), side SYMBOL(side), amount DOUBLE, price DOUBLE, id STRING, type SYMBOL(type), receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class CandlesQuasar(QuasarCallback):
table_prefix = "candles"

def format(self, data: dict):
index, data = super().format(data)
data['start'] = datetime.utcfromtimestamp(data['start'])
data['stop'] = datetime.utcfromtimestamp(data['stop'])
data['closed'] = int(data['closed'])
return index, data

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), start TIMESTAMP, stop TIMESTAMP, interval STRING, trades STRING, open DOUBLE, close DOUBLE, high DOUBLE, low DOUBLE, volume DOUBLE, closed INT64, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class FundingQuasar(QuasarCallback):
table_prefix = "funding"

def format(self, data: dict):
index, data = super().format(data)
data['next_funding_time'] = datetime.utcfromtimestamp(data['next_funding_time'])
return index, data

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), mark_price DOUBLE, rate DOUBLE, next_funding_time TIMESTAMP, predicted_rate DOUBLE, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class BookQuasar(QuasarCallback):
table_prefix = "book"

def format(self, data: dict):
index, data = super().format(data)
# store only best bid and best ask
if not data['book']:
best_bid = max(data["delta"]["bid"], key=lambda x: x[0])
best_ask = min(data["delta"]["ask"], key=lambda x: x[0])

data['best_bid_price'] = best_bid[0]
data['best_bid_amount'] = best_bid[1]
data['best_ask_price'] = best_ask[0]
data['best_ask_amount'] = best_ask[1]
data.pop('delta')
else:
best_bid = max(data["book"]["bid"].keys())
best_ask = min(data["book"]["ask"].keys())

data['best_bid_price'] = best_bid
data['best_bid_amount'] = data["book"]["bid"][best_bid]
data['best_ask_price'] = best_ask
data['best_ask_amount'] = data["book"]["ask"][best_ask]
data.pop('book')
return index, data

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), best_bid_price DOUBLE, best_bid_amount DOUBLE, best_ask_price DOUBLE, best_ask_amount DOUBLE, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class LiquidationsQuasar(QuasarCallback):
table_prefix = "liquidations"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), side SYMBOL(side), quantity DOUBLE, price DOUBLE, id STRING, status SYMBOL(type), receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class OpenInterestQuasar(QuasarCallback):
table_prefix = "open_interest"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), open_interest FLOAT, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class OrderInfoQuasar(QuasarCallback):
table_prefix = "order_info"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), id STRING, client_order_id STRING, side SYMBOL(side), status SYMBOL(type), type SYMBOL(type), price DOUBLE, amount DOUBLE, remaining DOUBLE, account STRING, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class TransactionsQuasar(QuasarCallback):
table_prefix = "transactions"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), currency SYMBOL(currency), type SYMBOL(type), status SYMBOL(type), amount DOUBLE, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class BalancesQuasar(QuasarCallback):
table_prefix = "balances"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), currency SYMBOL(currency), balance DOUBLE, reserved DOUBLE, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class FillsQuasar(QuasarCallback):
table_prefix = "fills"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), price DOUBLE, amount DOUBLE, side SYMBOL(side), fee DOUBLE, id STRING, order_id STRING, liquidity DOUBLE, type SYMBOL(type), account STRING, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'


class IndexQuasar(QuasarCallback):
table_prefix = "index"

def _create_query(self):
self.query = f'CREATE TABLE "{self.table}" (exchange SYMBOL(exchange), symbol SYMBOL(symbol), price DOUBLE, receipt_timestamp TIMESTAMP) SHARD_SIZE = {self.shard_size}'
33 changes: 33 additions & 0 deletions examples/demo_quasardb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from cryptofeed import FeedHandler
from cryptofeed.exchanges import *
from cryptofeed.backends.quasardb import *


async def feed_info(data, receipt_timestamp):
print(f'{data} recived at {receipt_timestamp}')


def main():
f = FeedHandler()

# save to database
f.add_feed(Binance(channels=[TICKER], symbols=['BTC-USDT'], callbacks={TICKER: TickerQuasar()}))
f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks={TRADES: TradeQuasar()}))
f.add_feed(Bybit(channels=[CANDLES], symbols=['BTC-USD-PERP'], callbacks={CANDLES: CandlesQuasar()}))
f.add_feed(Bybit(channels=[OPEN_INTEREST], symbols=['BTC-USD-PERP'], callbacks={OPEN_INTEREST: OpenInterestQuasar()}))
f.add_feed(Bybit(channels=[INDEX], symbols=['BTC-USD-PERP'], callbacks={INDEX: IndexQuasar()}))
f.add_feed(Bybit(channels=[LIQUIDATIONS], symbols=['BTC-USD-PERP'], callbacks={LIQUIDATIONS: LiquidationsQuasar()}))

# print to console
f.add_feed(Binance(channels=[TICKER], symbols=['BTC-USDT'], callbacks={TICKER: feed_info}))
f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks={TRADES: feed_info}))
f.add_feed(Bybit(channels=[CANDLES], symbols=['BTC-USD-PERP'], callbacks={CANDLES: feed_info}))
f.add_feed(Bybit(channels=[OPEN_INTEREST], symbols=['BTC-USD-PERP'], callbacks={OPEN_INTEREST: feed_info}))
f.add_feed(Bybit(channels=[INDEX], symbols=['BTC-USD-PERP'], callbacks={INDEX: feed_info}))
f.add_feed(Bybit(channels=[LIQUIDATIONS], symbols=['BTC-USD-PERP'], callbacks={LIQUIDATIONS: feed_info}))

f.run()


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def run_tests(self):
"kafka": ["aiokafka>=0.7.0"],
"mongo": ["motor"],
"postgres": ["asyncpg"],
"quasardb": ["quasardb", "numpy"],
"rabbit": ["aio_pika", "pika"],
"redis": ["hiredis", "redis>=4.5.1"],
"zmq": ["pyzmq"],
Expand Down

0 comments on commit db8d22d

Please sign in to comment.