From df692b6cf29803f6be1b1792a624a4ba5db49eed Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud <64446011+thomasbs17@users.noreply.github.com> Date: Sun, 4 Feb 2024 00:10:58 +0000 Subject: [PATCH] Coinbase migration to Advanced Trade (#1005) * Updated Coinbase: from Coinbase Pro to Advanced Trade * updated AUTHORS.md * finalized Coinbase updates * flake8 * bug and typo fix * bug fix * bug fix * Removed seq_no as per doc: Subscribe to the level2 channel to guarantee that messages are delivered and your order book is in sync. * updates based on PR comments * updated CHANGES.md based on PR comment --- AUTHORS.md | 1 + CHANGES.md | 1 + cryptofeed/exchange.py | 6 +- cryptofeed/exchanges/coinbase.py | 395 ++++++++----------------------- 4 files changed, 103 insertions(+), 300 deletions(-) diff --git a/AUTHORS.md b/AUTHORS.md index 0f26272f0..6479f20fe 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -14,3 +14,4 @@ Cryptofeed was originally created by Bryant Moscon, but many others have contrib * [O. Janche](https://github.com/toyan) - * [Bastien Enjalbert](https://github.com/bastienjalbert) - * [Jonggyun Kim](https://github.com/gyunt) - +* [Thomas Bouamoud](https://github.com/thomasbs17) - \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index 1708aceb3..673bbe7a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ### 2.4.1 * Bugfix: Handle empty nextFundingRate in OKX * Bugfix: Handle null next_funding_time and estimated_rate in HuobiSwap funding + * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade ### 2.4.0 (2024-01-07) * Update: Fix tests diff --git a/cryptofeed/exchange.py b/cryptofeed/exchange.py index a698db805..6917fd7a4 100644 --- a/cryptofeed/exchange.py +++ b/cryptofeed/exchange.py @@ -87,7 +87,7 @@ def _symbol_endpoint_prepare(cls, ep: RestEndpoint) -> Union[List[str], str]: return ep.route('instruments') @classmethod - def symbol_mapping(cls, refresh=False) -> Dict: + def symbol_mapping(cls, refresh=False, headers: dict = None) -> Dict: if Symbols.populated(cls.id) and not refresh: return Symbols.get(cls.id)[0] try: @@ -97,10 +97,10 @@ def symbol_mapping(cls, refresh=False) -> Dict: if isinstance(addr, list): for ep in addr: LOG.debug("%s: reading symbol information from %s", cls.id, ep) - data.append(cls.http_sync.read(ep, json=True, uuid=cls.id)) + data.append(cls.http_sync.read(ep, json=True, headers=headers, uuid=cls.id)) else: LOG.debug("%s: reading symbol information from %s", cls.id, addr) - data.append(cls.http_sync.read(addr, json=True, uuid=cls.id)) + data.append(cls.http_sync.read(addr, json=True, headers=headers, uuid=cls.id)) syms, info = cls._parse_symbol_data(data if len(data) > 1 else data[0]) Symbols.set(cls.id, syms, info) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index f3ccf89fa..3cf63882a 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -4,7 +4,9 @@ Please see the LICENSE file for the terms and conditions associated with this software. ''' -import asyncio +import datetime +import hashlib +import hmac import logging import time from decimal import Decimal @@ -13,27 +15,49 @@ from yapic import json +from cryptofeed.config import Config from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint -from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, L3_BOOK, SELL, TICKER, TRADES +from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, SELL, TRADES from cryptofeed.feed import Feed from cryptofeed.symbols import Symbol from cryptofeed.exchanges.mixins.coinbase_rest import CoinbaseRestMixin -from cryptofeed.types import OrderBook, Ticker, Trade - +from cryptofeed.types import OrderBook, Trade LOG = logging.getLogger('feedhandler') +def get_private_parameters(config: Config, chan: str = None, product_ids_str: list = None, + rest_api: bool = False, endpoint: str = None) -> dict: + timestamp = str(int(time.time())) + if rest_api: + base_endpoint = '/api/v3/brokerage/' + endpoint = base_endpoint + endpoint + message = f'{timestamp}GET{endpoint}' + else: + product_ids_str = ",".join(product_ids_str) + message = f"{timestamp}{chan}{product_ids_str}" + signature = hmac.new( + config["coinbase"]["key_secret"].encode("utf-8"), + message.encode("utf-8"), + digestmod=hashlib.sha256, + ).hexdigest() + if rest_api: + return {'CB-ACCESS-KEY': config["coinbase"]["key_id"], 'CB-ACCESS-TIMESTAMP': timestamp, + 'CB-ACCESS-SIGN': signature} + else: + return {'api_key': config["coinbase"]["key_id"], 'timestamp': timestamp, 'signature': signature} + + class Coinbase(Feed, CoinbaseRestMixin): id = COINBASE - websocket_endpoints = [WebsocketEndpoint('wss://ws-feed.pro.coinbase.com', options={'compression': None})] - rest_endpoints = [RestEndpoint('https://api.pro.coinbase.com', routes=Routes('/products', l3book='/products/{}/book?level=3'))] + websocket_endpoints = [WebsocketEndpoint('wss://advanced-trade-ws.coinbase.com', options={'compression': None})] + rest_endpoints = [ + RestEndpoint('https://api.coinbase.com/api/v3/brokerage', routes=Routes('/products', l3book='/product_book?product_id={}'))] + # TODO: implement candles and user channels websocket_channels = { L2_BOOK: 'level2', - L3_BOOK: 'full', - TRADES: 'matches', - TICKER: 'ticker', + TRADES: 'market_trades', } request_limit = 10 @@ -42,120 +66,46 @@ def _parse_symbol_data(cls, data: list) -> Tuple[Dict, Dict]: ret = {} info = defaultdict(dict) - for entry in data: - base, quote = entry['id'].split("-") - sym = Symbol(base, quote) + for entry in data['products']: + sym = Symbol(entry['base_currency_id'], entry['quote_currency_id']) info['tick_size'][sym.normalized] = entry['quote_increment'] info['instrument_type'][sym.normalized] = sym.type - ret[sym.normalized] = entry['id'] + ret[sym.normalized] = entry['product_id'] return ret, info + @classmethod + def symbols(cls, config: dict = None, refresh=False) -> list: + config = Config(config) + if 'coinbase' not in config or 'key_id' not in config['coinbase'] or 'key_secret' not in config['coinbase']: + raise ValueError('You must provide key_id and key_secret in config to retrieve symbols from Coinbase.') + headers = get_private_parameters(config, rest_api=True, endpoint='products') + return list(cls.symbol_mapping(refresh=refresh, headers=headers).keys()) + def __init__(self, callbacks=None, **kwargs): super().__init__(callbacks=callbacks, **kwargs) - # we only keep track of the L3 order book if we have at least one subscribed order-book callback. - # use case: subscribing to the L3 book plus Trade type gives you order_type information (see _received below), - # and we don't need to do the rest of the book-keeping unless we have an active callback - self.keep_l3_book = False - if callbacks and L3_BOOK in callbacks: - self.keep_l3_book = True self.__reset() def __reset(self): - self.order_map = {} - self.order_type_map = {} - self.seq_no = None - # sequence number validation only works when the FULL data stream is enabled - chan = self.std_channel_to_exchange(L3_BOOK) - if chan in self.subscription: - pairs = self.subscription[chan] - self.seq_no = {pair: None for pair in pairs} - self._l3_book = {} self._l2_book = {} - async def _ticker(self, msg: dict, timestamp: float): - ''' - { - 'type': 'ticker', - 'sequence': 5928281084, - 'product_id': 'BTC-USD', - 'price': '8500.01000000', - 'open_24h': '8217.24000000', - 'volume_24h': '4529.1293778', - 'low_24h': '8172.00000000', - 'high_24h': '8600.00000000', - 'volume_30d': '329178.93594133', - 'best_bid': '8500', - 'best_ask': '8500.01' - } - - { - 'type': 'ticker', - 'sequence': 5928281348, - 'product_id': 'BTC-USD', - 'price': '8500.00000000', - 'open_24h': '8217.24000000', - 'volume_24h': '4529.13179472', - 'low_24h': '8172.00000000', - 'high_24h': '8600.00000000', - 'volume_30d': '329178.93835825', - 'best_bid': '8500', - 'best_ask': '8500.01', - 'side': 'sell', - 'time': '2018-05-21T00:30:11.587000Z', - 'trade_id': 43736677, - 'last_size': '0.00241692' - } - ''' - - ts = self.timestamp_normalize(msg['time']) if 'time' in msg else None - await self.callback(TICKER, Ticker(self.id, self.exchange_symbol_to_std_symbol(msg['product_id']), Decimal(msg['best_bid']), Decimal(msg['best_ask']), ts, raw=msg), timestamp) - - async def _book_update(self, msg: dict, timestamp: float): + async def _trade_update(self, msg: dict, timestamp: float): ''' { - 'type': 'match', or last_match 'trade_id': 43736593 - 'maker_order_id': '2663b65f-b74e-4513-909d-975e3910cf22', - 'taker_order_id': 'd058d737-87f1-4763-bbb4-c2ccf2a40bde', - 'side': 'buy', + 'side': 'BUY' or 'SELL', 'size': '0.01235647', 'price': '8506.26000000', 'product_id': 'BTC-USD', - 'sequence': 5928276661, 'time': '2018-05-21T00:26:05.585000Z' } ''' pair = self.exchange_symbol_to_std_symbol(msg['product_id']) ts = self.timestamp_normalize(msg['time']) - - if self.keep_l3_book and 'full' in self.subscription and pair in self.subscription['full']: - delta = {BID: [], ASK: []} - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - size = Decimal(msg['size']) - maker_order_id = msg['maker_order_id'] - - _, new_size = self.order_map[maker_order_id] - new_size -= size - if new_size <= 0: - del self.order_map[maker_order_id] - self.order_type_map.pop(maker_order_id, None) - delta[side].append((maker_order_id, price, 0)) - del self._l3_book[pair].book[side][price][maker_order_id] - if len(self._l3_book[pair].book[side][price]) == 0: - del self._l3_book[pair].book[side][price] - else: - self.order_map[maker_order_id] = (price, new_size) - self._l3_book[pair].book[side][price][maker_order_id] = new_size - delta[side].append((maker_order_id, price, new_size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, timestamp=ts, delta=delta, raw=msg, sequence_number=self.seq_no[pair]) - - order_type = self.order_type_map.get(msg['taker_order_id']) + order_type = 'market' t = Trade( self.id, - self.exchange_symbol_to_std_symbol(msg['product_id']), - SELL if msg['side'] == 'buy' else BUY, + pair, + SELL if msg['side'] == 'SELL' else BUY, Decimal(msg['size']), Decimal(msg['price']), ts, @@ -167,8 +117,10 @@ async def _book_update(self, msg: dict, timestamp: float): async def _pair_level2_snapshot(self, msg: dict, timestamp: float): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - bids = {Decimal(price): Decimal(amount) for price, amount in msg['bids']} - asks = {Decimal(price): Decimal(amount) for price, amount in msg['asks']} + bids = {Decimal(update['price_level']): Decimal(update['new_quantity']) for update in msg['updates'] if + update['side'] == 'bid'} + asks = {Decimal(update['price_level']): Decimal(update['new_quantity']) for update in msg['updates'] if + update['side'] == 'ask'} if pair not in self._l2_book: self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth, bids=bids, asks=asks) else: @@ -177,215 +129,64 @@ async def _pair_level2_snapshot(self, msg: dict, timestamp: float): await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, raw=msg) - async def _pair_level2_update(self, msg: dict, timestamp: float): + async def _pair_level2_update(self, msg: dict, timestamp: float, ts: datetime): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - ts = self.timestamp_normalize(msg['time']) delta = {BID: [], ASK: []} - for side, price, amount in msg['changes']: - side = BID if side == 'buy' else ASK - price = Decimal(price) - amount = Decimal(amount) + for update in msg['updates']: + side = BID if update['side'] == 'bid' else ASK + price = Decimal(update['price_level']) + amount = Decimal(update['new_quantity']) if amount == 0: - del self._l2_book[pair].book[side][price] - delta[side].append((price, 0)) + if price in self._l2_book[pair].book[side]: + del self._l2_book[pair].book[side][price] + delta[side].append((price, 0)) else: self._l2_book[pair].book[side][price] = amount delta[side].append((price, amount)) await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta) - async def _book_snapshot(self, pairs: list): - # Coinbase needs some time to send messages to us - # before we request the snapshot. If we don't sleep - # the snapshot seq no could be much earlier than - # the subsequent messages, causing a seq no mismatch. - await asyncio.sleep(2) - - urls = [self.rest_endpoints[0].route('l3book', self.sandbox).format(pair) for pair in pairs] - - results = [] - for url in urls: - ret = await self.http_conn.read(url) - results.append(ret) - # rate limit - 3 per second - await asyncio.sleep(0.3) - - timestamp = time.time() - for res, pair in zip(results, pairs): - orders = json.loads(res, parse_float=Decimal) - npair = self.exchange_symbol_to_std_symbol(pair) - self._l3_book[npair] = OrderBook(self.id, pair, max_depth=self.max_depth) - self.seq_no[npair] = orders['sequence'] - for side in (BID, ASK): - for price, size, order_id in orders[side + 's']: - price = Decimal(price) - size = Decimal(size) - if price in self._l3_book[npair].book[side]: - self._l3_book[npair].book[side][price][order_id] = size - else: - self._l3_book[npair].book[side][price] = {order_id: size} - self.order_map[order_id] = (price, size) - await self.book_callback(L3_BOOK, self._l3_book[npair], timestamp, raw=orders) - - async def _open(self, msg: dict, timestamp: float): - if not self.keep_l3_book: - return - delta = {BID: [], ASK: []} - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - size = Decimal(msg['remaining_size']) - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - order_id = msg['order_id'] - ts = self.timestamp_normalize(msg['time']) - - if price in self._l3_book[pair].book[side]: - self._l3_book[pair].book[side][price][order_id] = size - else: - self._l3_book[pair].book[side][price] = {order_id: size} - self.order_map[order_id] = (price, size) - - delta[side].append((order_id, price, size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, timestamp=ts, delta=delta, raw=msg, sequence_number=msg['sequence']) - - async def _done(self, msg: dict, timestamp: float): - """ - per Coinbase API Docs: - - A done message will be sent for received orders which are fully filled or canceled due - to self-trade prevention. There will be no open message for such orders. Done messages - for orders which are not on the book should be ignored when maintaining a real-time order book. - """ - if 'price' not in msg: - # market order life cycle: received -> done - self.order_type_map.pop(msg['order_id'], None) - self.order_map.pop(msg['order_id'], None) - return - - order_id = msg['order_id'] - self.order_type_map.pop(order_id, None) - if order_id not in self.order_map: - return - - del self.order_map[order_id] - if self.keep_l3_book: - delta = {BID: [], ASK: []} - - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - ts = self.timestamp_normalize(msg['time']) - - del self._l3_book[pair].book[side][price][order_id] - if len(self._l3_book[pair].book[side][price]) == 0: - del self._l3_book[pair].book[side][price] - delta[side].append((order_id, price, 0)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, delta=delta, timestamp=ts, raw=msg, sequence_number=msg['sequence']) - - async def _received(self, msg: dict, timestamp: float): - """ - per Coinbase docs: - A valid order has been received and is now active. This message is emitted for every single - valid order as soon as the matching engine receives it whether it fills immediately or not. - - This message is the only time we receive the order type (limit vs market) for a given order, - so we keep it in a map by order ID. - """ - order_id = msg["order_id"] - order_type = msg["order_type"] - self.order_type_map[order_id] = order_type - - async def _change(self, msg: dict, timestamp: float): - """ - Like done, these updates can be sent for orders that are not in the book. Per the docs: - - Not all done or change messages will result in changing the order book. These messages will - be sent for received orders which are not yet on the order book. Do not alter - the order book for such messages, otherwise your order book will be incorrect. - - {'price': '16556.88', 'old_size': '0.24076471', 'new_size': '0.04076471', 'order_id': '9675d63e-0432-413d-a3f3-f30d7df39614', 'reason': 'STP', 'type': 'change', 'side': 'buy', 'product_id': 'BTC-USD', 'time': datetime.datetime(2022, 11, 24, 0, 35, 28, 904847, tzinfo=datetime.timezone.utc), 'sequence': 50703787284} - """ - if not self.keep_l3_book: - return - - delta = {BID: [], ASK: []} - - if 'price' not in msg or not msg['price']: - return - - order_id = msg['order_id'] - if order_id not in self.order_map: - return - - ts = self.timestamp_normalize(msg['time']) - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - new_size = Decimal(msg['new_size']) - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - - self._l3_book[pair].book[side][price][order_id] = new_size - self.order_map[order_id] = (price, new_size) - - delta[side].append((order_id, price, new_size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, delta=delta, timestamp=ts, raw=msg, sequence_number=msg['sequence']) - async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float): # PERF perf_start(self.id, 'msg') msg = json.loads(msg, parse_float=Decimal) - if self.seq_no: - if 'product_id' in msg and 'sequence' in msg: - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - if not self.seq_no.get(pair, None): - return - if msg['sequence'] <= self.seq_no[pair]: - return - if msg['sequence'] != self.seq_no[pair] + 1: - LOG.warning("%s: Missing sequence number detected for %s. Received %d, expected %d", self.id, pair, msg['sequence'], self.seq_no[pair] + 1) - LOG.warning("%s: Resetting data for %s", self.id, pair) - self.__reset(symbol=pair) - await self._book_snapshot([pair]) - return - - self.seq_no[pair] = msg['sequence'] - - if 'type' in msg: - if msg['type'] == 'ticker': - await self._ticker(msg, timestamp) - elif msg['type'] == 'match' or msg['type'] == 'last_match': - await self._book_update(msg, timestamp) - elif msg['type'] == 'snapshot': - await self._pair_level2_snapshot(msg, timestamp) - elif msg['type'] == 'l2update': - await self._pair_level2_update(msg, timestamp) - elif msg['type'] == 'open': - await self._open(msg, timestamp) - elif msg['type'] == 'done': - await self._done(msg, timestamp) - elif msg['type'] == 'change': - await self._change(msg, timestamp) - elif msg['type'] == 'received': - await self._received(msg, timestamp) - elif msg['type'] == 'activate': - pass - elif msg['type'] == 'subscriptions': - pass - else: - LOG.warning("%s: Invalid message type %s", self.id, msg) - # PERF perf_end(self.id, 'msg') - # PERF perf_log(self.id, 'msg') + if 'channel' in msg and 'events' in msg: + for event in msg['events']: + if msg['channel'] == 'market_trades': + if event.get('type') == 'update': + for trade in event['trades']: + await self._trade_update(trade, timestamp) + else: + pass # TODO: do we want to implement trades snapshots? + elif msg['channel'] == 'l2_data': + if event.get('type') == 'update': + await self._pair_level2_update(event, timestamp, msg['timestamp']) + elif event.get('type') == 'snapshot': + await self._pair_level2_snapshot(event, timestamp) + elif msg['channel'] == 'subscriptions': + pass + else: + LOG.warning("%s: Invalid message type %s", self.id, msg) + # PERF perf_end(self.id, 'msg') + # PERF perf_log(self.id, 'msg') async def subscribe(self, conn: AsyncConnection): self.__reset() - - for chan in self.subscription: - await conn.write(json.dumps({"type": "subscribe", - "product_ids": self.subscription[chan], - "channels": [chan] - })) - - chan = self.std_channel_to_exchange(L3_BOOK) - if chan in self.subscription: - await self._book_snapshot(self.subscription[chan]) + all_pairs = list() + + async def _subscribe(chan: str, product_ids: list): + params = {"type": "subscribe", + "product_ids": product_ids, + "channel": chan + } + private_params = get_private_parameters(self.config, chan, product_ids) + if private_params: + params = {**params, **private_params} + await conn.write(json.dumps(params)) + + for channel in self.subscription: + all_pairs += self.subscription[channel] + await _subscribe(channel, self.subscription[channel]) + all_pairs = list(dict.fromkeys(all_pairs)) + await _subscribe('heartbeat', all_pairs) + # Implementing heartbeat as per Best Practices doc: https://docs.cloud.coinbase.com/advanced-trade-api/docs/ws-best-practices