From f6d1c026d440e9de93fd912afdfadece72c31c50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josep=20Batall=C3=A9?= Date: Fri, 19 Aug 2022 18:01:30 +0200 Subject: [PATCH] core: refactor wallet processor. Setting logging config --- backend/wallet_processor/entities/__init__.py | 11 +- backend/wallet_processor/entities/broker.py | 76 ++++-- backend/wallet_processor/entities/crypto.py | 234 ++++++++---------- .../handlers/wallet_processor.py | 44 ++-- backend/wallet_processor/utils/__init__.py | 70 +++--- .../wallet_processor/utils/crypto_prices.py | 2 +- backend/worker.py | 1 - config/__init__.py | 48 ++++ 8 files changed, 279 insertions(+), 207 deletions(-) diff --git a/backend/wallet_processor/entities/__init__.py b/backend/wallet_processor/entities/__init__.py index 620c406..81e1a10 100644 --- a/backend/wallet_processor/entities/__init__.py +++ b/backend/wallet_processor/entities/__init__.py @@ -21,10 +21,19 @@ def __init__(self): def trade(self, queue, order): raise NotImplementedError + def preprocess(self, orders): + pass + def clean(self): pass - def get_orders(self, user_id): + def get_accounts(self, user_id): + pass + + def get_orders(self, accounts): + pass + + def get_transactions(self, accounts): pass def create_closed_orders(self): diff --git a/backend/wallet_processor/entities/broker.py b/backend/wallet_processor/entities/broker.py index d498d05..572577f 100644 --- a/backend/wallet_processor/entities/broker.py +++ b/backend/wallet_processor/entities/broker.py @@ -9,37 +9,54 @@ class BrokerProcessor(AbstractEntity): def __init__(self): super(BrokerProcessor, self).__init__() + @staticmethod + def preprocess(orders): + return orders + @staticmethod def clean(transactions): ProxyOrder.query.filter(ProxyOrder.transaction_id.in_([t.id for t in transactions])).delete() ClosedOrder.query.filter(ClosedOrder.sell_transaction_id.in_([t.id for t in transactions])).delete() @staticmethod - def get_orders(user_id): - accounts = Account.query.with_entities(Account.id).filter(Account.user_id == user_id, - Account.entity.has(type=Entity.Type.BROKER)).all() - transactions = StockTransaction.query.filter( + def get_accounts(user_id): + return Account.query.with_entities(Account.id).filter(Account.user_id == user_id, + Account.entity.has(type=Entity.Type.BROKER)).all() + + @staticmethod + def get_orders(accounts): + """ + Get all stock orders of user user_id in ASC order + """ + return StockTransaction.query.filter( StockTransaction.account_id.in_([a.id for a in accounts])).order_by( StockTransaction.value_date.asc(), StockTransaction.type.asc()).all() - return transactions + + @staticmethod + def get_transactions(accounts): + return [] def trade(self, queue, order): + """ + Create queue and closed orders checking buys and sells + """ sell_items = None fees = order.fee + order.exchange_fee if order.shares == 0: return None if order.type == StockTransaction.Type.BUY: - order = Transaction(order.id, Transaction.Type.BUY, order.value_date, order.ticker, order.shares, order.price, fees, order.currency_rate) + order = Transaction(Transaction.Type.BUY, order, order.ticker, order.shares, order.price, fees) queue.buy(order) - # ticker_transactions[t.ticker]["open"].append(OpenOrder(t)) else: - order = Transaction(order.id, Transaction.Type.SELL, order.value_date, order.ticker, order.shares, order.price, fees, order.currency_rate) + order = Transaction(Transaction.Type.SELL, order, order.ticker, order.shares, order.price, fees) sell_items = queue.sell(order) - # ticker_transactions[t.ticker]["closed"].extend(sell_items) return sell_items def create_closed_orders(self, orders, tracked_orders): + """ + Insert closed orders to database + """ self._logger.info("Cleaning closed/wallet/proxy orders") self.clean(orders) @@ -64,10 +81,14 @@ def create_closed_orders(self, orders, tracked_orders): ).save() def calc_wallet(self, user_id, orders, queue, tracked_orders): - # calculating wallet + """ + Calculate current open orders + """ to_insert = [] for ticker, partial_orders in queue.queues.items(): - self._logger.debug(f"Start processing ticker {ticker.ticker} - {ticker}") + if ticker.ticker == 'CRSR': + print("CHECL") + # self._logger.debug(f"Processing ticker {ticker.ticker} - {ticker}") if not ticker: self._logger.error("Ticker not found!") continue @@ -76,6 +97,7 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): avg_price = 0 fees = 0 total_cost = 0 + total_cost_eur = 0 open_orders = [] for order in partial_orders: @@ -83,6 +105,7 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): shares += order.amount avg_price = order.price total_cost = shares * avg_price + total_cost_eur += shares * avg_price * order.trade.currency_rate fees = order.fee open_orders.append(OpenOrder(transaction_id=order.trade.transaction_id, shares=shares)) continue @@ -90,7 +113,8 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): avg_price = (shares * avg_price + order.amount * order.price) / (shares + order.amount) # TODO: calc average fee? shares += order.amount - total_cost += order.price * order.amount # * partial_order.trade.currency_rate + total_cost += order.price * order.amount + total_cost_eur += order.price * order.amount * order.trade.currency_rate fees += order.fee open_orders.append(OpenOrder(transaction_id=order.trade.transaction_id, shares=order.amount)) @@ -103,16 +127,17 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): current_benefits = 0 current_benefits_eur = 0 total_sell = 0 + total_sell_eur = 0 for order in [w for w in tracked_orders if w.sell_trade.ticker == ticker]: current_benefits += order.benefits current_benefits_eur += order.benefits_in_eur - total_sell += order.sell_trade.price * order.amount # * order.sell_trade.currency_rate + total_sell += order.sell_trade.price * order.amount + total_sell_eur += order.sell_trade.price * order.amount * order.sell_trade.currency_rate fees += order.sell_trade.fees - self._logger.debug(f"Benefits: {current_benefits}. Fees: {fees}") + # self._logger.debug(f"Benefits: {current_benefits}. Fees: {fees}") current_benefits += fees - - break_even = (total_cost - total_sell) / shares + break_even = (total_cost_eur - current_benefits_eur) / shares w = Wallet( ticker_id=ticker.id, @@ -121,8 +146,9 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): price=avg_price, # in $ cost=total_cost, # in base currency without fees benefits=current_benefits_eur, # in €, in front we should sum fees - break_even=break_even if break_even > 0 else 0, # in base_currency - fees=fees) + break_even=break_even if break_even > 0 else 0, # in €, in front we apply the current fx_rate + fees=fees # in € + ) w.open_orders.extend(open_orders) to_insert.append(w) @@ -133,5 +159,15 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): self._logger.info("Wallet calculation done") @staticmethod - def calc_balance_with_orders(self): - return {} + def calc_balance_with_orders(orders): + balance = {} + for order in orders: + if order.ticker.ticker not in balance: + balance[order.ticker.ticker] = 0 + + if order.type == StockTransaction.Type.BUY: + balance[order.ticker.ticker] += order.shares + else: + balance[order.ticker.ticker] -= order.shares + + return balance diff --git a/backend/wallet_processor/entities/crypto.py b/backend/wallet_processor/entities/crypto.py index 9f43b1f..c720059 100644 --- a/backend/wallet_processor/entities/crypto.py +++ b/backend/wallet_processor/entities/crypto.py @@ -1,7 +1,7 @@ -from datetime import datetime from wallet_processor.entities import AbstractEntity from models.system import Account, Entity -from models.crypto import ExchangeOrder, ExchangeTransaction, ExchangeWallet, ExchangeClosedOrder, ExchangeProxyOrder +from models.crypto import ExchangeOrder, ExchangeTransaction, ExchangeWallet, ExchangeClosedOrder, ExchangeProxyOrder, \ + ExchangeOpenOrder from wallet_processor.utils import BalanceQueue, Transaction from wallet_processor.utils.crypto_prices import get_price @@ -11,137 +11,125 @@ class CryptoProcessor(AbstractEntity): def __init__(self): super(CryptoProcessor, self).__init__() - def preprocess(self): - pass - # self.check_transactions(orders) - # - # orders_orders = [o for o in orders if not isinstance(o, TransactionOrder)] - # iota_orders = [o for o in orders_orders if 'IOT' in o.pair] - # iota_trans = [o for o in orders if isinstance(o, TransactionOrder) and 'IOT' in o.currency] - # w = 0 - # for o in sorted(iota_orders + iota_trans, key=lambda x: x['time']): - # time = datetime.fromtimestamp(o.time) - # if isinstance(o, TransactionOrder): - # vol = o.amount - # if o.type == OrderType.WITHDRAWAL: - # vol = -o.amount - # if isinstance(o, Order): - # vol = o.vol - # if o.type == OrderType.SELL: - # vol = -o.vol - # print(f"{time},{o.type},{vol},{o.exchange}") + @staticmethod + def preprocess(orders): + for order in orders: + if isinstance(order, ExchangeTransaction): + order.currency = order.currency.upper().replace("IOT", "IOTA").replace("IOTAA", "IOTA").\ + replace("XRB", "NANO").replace("XBT", "BTC") + else: + order.pair = order.pair.upper().replace("IOT", "IOTA").replace("IOTAA", "IOTA").\ + replace("XRB", "NANO").replace("XBT", "BTC") + return orders @staticmethod - def clean(transactions): - ExchangeProxyOrder.query.filter(ExchangeProxyOrder.transaction_id.in_([t.id for t in transactions])).delete() - ExchangeClosedOrder.query.filter(ExchangeClosedOrder.sell_transaction_id.in_([t.id for t in transactions])).delete() + def clean(orders): + ExchangeProxyOrder.query.filter(ExchangeProxyOrder.order_id.in_([t.id for t in orders])).delete() + ExchangeClosedOrder.query.filter(ExchangeClosedOrder.sell_order_id.in_([t.id for t in orders])).delete() @staticmethod - def get_orders(user_id): - accounts = Account.query.with_entities(Account.id).filter(Account.user_id == user_id, - Account.entity.has(type=Entity.Type.EXCHANGE)).all() - transactions = ExchangeOrder.query.filter( + def get_accounts(user_id): + return Account.query.with_entities(Account.id).filter(Account.user_id == user_id, + Account.entity.has(type=Entity.Type.EXCHANGE)).all() + + @staticmethod + def get_orders(accounts): + orders = ExchangeOrder.query.filter( ExchangeOrder.account_id.in_([a.id for a in accounts])).order_by( ExchangeOrder.value_date.asc(), ExchangeOrder.type.asc()).all() + return orders + + @staticmethod + def get_transactions(accounts): + transactions = ExchangeTransaction.query.filter( + ExchangeTransaction.account_id.in_([a.id for a in accounts])).order_by( + ExchangeTransaction.value_date.asc(), ExchangeTransaction.type.asc()).all() return transactions def trade(self, queue, order): + """ + This function calculates the trade operations. Creates the queue and returns the sell orders + """ value_date = order.value_date if isinstance(order, ExchangeTransaction): - order.currency = order.currency.upper() - order.currency = order.currency.replace("IOT", "IOTA").replace("IOTAA", "IOTA").replace("XRB", "NANO") - if order.type == ExchangeTransaction.Type.DEPOSIT and order.currency == 'EUR': - print(f"{value_date} - Deposit {order.amount}{order.currency}. Target: {order.exchange} " - f"Current: {queue.current_amount(order.currency)}") + if order.type == ExchangeTransaction.Type.DEPOSIT: # and order.currency == 'EUR': queue.deposit(order) - elif order.type == ExchangeTransaction.Type.WITHDRAWAL and order.currency == 'EUR': - print(f"{value_date} - Withdrawal {order.amount}{order.currency}. Source: {order.exchange} " - f"Current: {queue.current_amount(order.currency)}") + elif order.type == ExchangeTransaction.Type.WITHDRAWAL: # and order.currency == 'EUR': queue.withdrawal(order) - return - # order.vol = round(order.amount, 4) - order.pair = order.pair.upper() - order.pair = order.pair.replace("IOT", "IOTA").replace("IOTAA", "IOTA").replace("XRB", "NANO") - order.cost = order.amount * order.price - # order.amount = float("{:0.4f}".format(order.amount)) - # order.price = float("{:0.8f}".format(order.price)) + self._logger.debug( + f"{value_date} - {order.get_type()} - {order.amount}{order.currency} to/from {order.account.entity}. " + f"Current: {queue.current_amount(order.currency)}{order.currency}") + return sell_info = None + order.cost = round(order.amount * order.price, 8) source = order.pair.split("/")[0] target = order.pair.split("/")[1] if order.type == ExchangeOrder.Type.BUY: - self._logger.info(f"{value_date} - Buy {source} from {target}. Amount: {order.amount}{source}. " - f"Price: {order.price} Cost: {order.cost}{target}. {order.account_id} " - f"Current EUR: {queue.current_amount('EUR')}") + self._logger.debug(f"{value_date} - Buying {order.amount}{source}@{order.price} from {target}. " + f"Cost: {order.cost}{target}. {order.account.entity}. " + f"Current EUR: {queue.current_amount('EUR')}") else: - self._logger.info(f"{value_date} - Sell {source} to {target}. Selling: {order.amount}{source}. " - f"Price: {order.price} Won: {order.cost}{target}. {order.account_id}. " - f"Current amount: {queue.current_amount(source)} " - f"Current EUR: {queue.current_amount('EUR')}") + self._logger.debug(f"{value_date} - Selling {order.amount}{source}@{order.price} to {target}. " + f"Won: {order.cost}{target}. {order.account.entity}. " + f"Current amount: {queue.current_amount(source)} " + f"Current EUR: {queue.current_amount('EUR')}") if order.type == ExchangeOrder.Type.BUY and target == 'EUR': - buy_transaction = Transaction(order.id, Transaction.Type.BUY, order.value_date, source, order.amount, order.price) + buy_transaction = Transaction(Transaction.Type.BUY, order, source, order.amount, order.price) order.cost = round(order.cost, 4) - sell_transaction = Transaction(order.id, Transaction.Type.SELL, order.value_date, target, order.cost, order.price, - order.fee) + sell_transaction = Transaction(Transaction.Type.SELL, order, target, order.cost, order.price, order.fee) queue.buy(buy_transaction) - sell_info = queue.sell(sell_transaction) + queue.sell(sell_transaction) + self._logger.info(f"Current EUR: {queue.current_amount('EUR')}") return None - sell_transaction = None - buy_transaction = None if order.type == ExchangeOrder.Type.BUY: # target can be other coins target_price = get_price(target, "EUR", order.value_date) source_price = get_price(source, "EUR", order.value_date) - sell_transaction = Transaction(order.id, Transaction.Type.SELL, order.value_date, target, order.cost, - target_price, order.fee) - buy_transaction = Transaction(order.id, Transaction.Type.BUY, order.value_date, source, order.amount, source_price) + sell_transaction = Transaction(Transaction.Type.SELL, order, target, order.cost, target_price, order.fee) + buy_transaction = Transaction(Transaction.Type.BUY, order, source, order.amount, source_price) elif order.type == ExchangeOrder.Type.SELL and target == 'EUR': order.cost = round(order.cost, 4) - sell_transaction = Transaction(order.id, Transaction.Type.SELL, order.value_date, source, order.amount, order.price) - buy_transaction = Transaction(order.id, Transaction.Type.BUY, order.value_date, target, order.cost, order.price, - order.fee) + sell_transaction = Transaction(Transaction.Type.SELL, order, source, order.amount, order.price) + buy_transaction = Transaction(Transaction.Type.BUY, order, target, order.cost, order.price, order.fee) elif order.type == ExchangeOrder.Type.SELL: target_price = get_price(target, "EUR", order.value_date) source_price = get_price(source, "EUR", order.value_date) - sell_transaction = Transaction(order.id, Transaction.Type.SELL, order.value_date, source, order.amount, source_price) - buy_transaction = Transaction(order.id, Transaction.Type.BUY, order.value_date, target, order.cost, target_price, - order.fee) + sell_transaction = Transaction(Transaction.Type.SELL, order, source, order.amount, source_price) + buy_transaction = Transaction(Transaction.Type.BUY, order, target, order.cost, target_price, order.fee) else: - self._logger.warning("HEEY, something happened here!") - - if not sell_transaction or not buy_transaction: - self._logger.debug("Not sell and buy transactions!") + self._logger.debug("Not sell and buy transactions, something happened here!") return - if order.type == ExchangeOrder.Type.SELL: - sell_info = queue.sell(sell_transaction) - queue.buy(buy_transaction) - else: - sell_info = queue.sell(sell_transaction) - queue.buy(buy_transaction) + # if order.type == ExchangeOrder.Type.SELL: + sell_info = queue.sell(sell_transaction) + queue.buy(buy_transaction) + # else: + # sell_info = queue.sell(sell_transaction) + # queue.buy(buy_transaction) + self._logger.info(f"Current EUR: {queue.current_amount('EUR')}") if sell_info: return sell_info return None - def calc_wallet(self, user_id, orders, queue, tracked_orders): - self._logger.info("Cleaning closed orders") + def create_closed_orders(self, orders, tracked_orders): + self._logger.info("Cleaning closed/wallet/proxy orders") self.clean(orders) self._logger.info("Generating new crypto wallet") - order_benefits = [] - benefits_year = {} for sell_order in tracked_orders: - closed_order = ExchangeClosedOrder(sell_transaction_id=sell_order.sell_trade.transaction_id) + closed_order = ExchangeClosedOrder(sell_order_id=sell_order.sell_trade.transaction_id) closed_order.save() for buy_order in sell_order.buy_items: if not buy_order.trade: - self._logger.warning("Some strange case!") + # self._logger.warning(f"Sell order without buy. Probably the buy order is made in other exchange! \ + # {buy_order.amount}{sell_order.sell_trade.ticker} at {sell_order.sell_trade.time}") continue if sell_order.amount <= buy_order.amount: @@ -150,75 +138,80 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): try: partial_fee = buy_order.fee / (sell_order.amount / buy_order.amount) except: - print("Amount is 0!") + self._logger.warning("Amount is 0!") partial_fee = 0 ExchangeProxyOrder( closed_order_id=closed_order.id, - transaction_id=buy_order.trade.transaction_id, - shares=buy_order.amount, + order_id=buy_order.trade.transaction_id, + amount=buy_order.amount, partial_fee=partial_fee ).save() - close = { - "amount": sell_order.amount, - "price": sell_order.sell_trade.price, - "time": sell_order.sell_trade.time, - } - - # calculating wallet + def calc_wallet(self, user_id, orders, queue, tracked_orders): + """ + Calculate current open orders + """ + # calculating wallet, using Wallet and OpenOrders to_insert = [] for ticker, partial_orders in queue.queues.items(): self._logger.debug(f"Start processing ticker {ticker}") - shares = 0 + amount = 0 avg_price = 0 fees = 0 total_cost = 0 open_orders = [] for order in partial_orders: - if shares == 0: - shares += order.amount + if not order.trade: + self._logger.error("Order without trade!") + continue + if amount == 0: + amount += order.amount avg_price = order.price - total_cost = shares * avg_price + total_cost = amount * avg_price fees = order.fee + open_orders.append(ExchangeOpenOrder(order_id=order.trade.transaction_id, amount=order.amount)) continue - avg_price = (shares * avg_price + order.amount * order.price) / (shares + order.amount) + avg_price = (amount * avg_price + order.amount * order.price) / (amount + order.amount) # TODO: calc average fee? - shares += order.amount + amount += order.amount total_cost += order.price * order.amount # * partial_order.trade.currency_rate fees += order.fee + open_orders.append(ExchangeOpenOrder(order_id=order.trade.transaction_id, amount=order.amount)) # Calculate current benefits taking into account sells - current_benefits = 0 + # current_benefits = 0 current_benefits_eur = 0 total_sell = 0 for order in [w for w in tracked_orders if w.sell_trade.ticker == ticker]: - current_benefits += order.benefits + # current_benefits += order.benefits current_benefits_eur += order.benefits_in_eur total_sell += order.sell_trade.price * order.amount * order.sell_trade.currency_rate fees += order.sell_trade.fees # current_benefits += 0 # fees?? - if shares == 0: - self._logger.info(f"Shares is 0 for ticker {ticker}!") + if amount == 0: + self._logger.info(f"Amount is 0 for ticker {ticker}!") continue - break_even = (total_cost - total_sell) / shares + break_even = (total_cost - total_sell) / amount if not ticker: self._logger.info("Ticker not found!") w = ExchangeWallet( currency=ticker, user_id=user_id, - amount=shares, + amount=amount, price=avg_price, # in $ cost=total_cost, # in base currency without fees benefits=current_benefits_eur, # in €, in front we should sum fees break_even=break_even if break_even > 0 else 0, # in base_currency fees=fees) - + w.open_orders.extend(open_orders) to_insert.append(w) + ExchangeOpenOrder.query.filter(ExchangeOpenOrder.order_id.in_([t.id for t in orders])).delete() + ExchangeWallet.query.delete() ExchangeWallet.bulk_object(to_insert) self._logger.info("Wallet calculation done") @@ -226,7 +219,7 @@ def calc_wallet(self, user_id, orders, queue, tracked_orders): @staticmethod def calc_balance_with_orders(orders): balance = {} - for order in orders: + for order in sorted(orders, key=lambda x: x.value_date): if isinstance(order, ExchangeTransaction): if order.currency not in balance: balance[order.currency] = 0 @@ -236,45 +229,20 @@ def calc_balance_with_orders(orders): else: balance[order.currency] -= order.amount continue - order.pair = order.pair.replace("XBT", "BTC").replace("IOT", "IOTA").replace("IOTAA", "IOTA") - # logger.info(order.vol) source = order.pair.split("/")[0] target = order.pair.split("/")[1] - # order.vol = float("{:0.8f}".format(order.amount)) - # order.cost = float("{:0.8f}".format(order.cost)) - if source not in balance: balance[source] = 0 if target not in balance: balance[target] = 0 + if order.type == ExchangeOrder.Type.BUY: balance[source] += order.amount - balance[target] -= order.cost + balance[target] -= order.cost + order.fee else: balance[source] -= order.amount - balance[target] += order.cost + balance[target] += order.cost - order.fee return balance - - def check_transactions(self, orders): - transaction_orders = [o for o in orders if isinstance(o, ExchangeTransaction)] - queue = {} - pair_deposit_with = {} - for order in transaction_orders: - time = datetime.fromtimestamp(order.time) - order.currency = order.currency.upper() - if order.currency not in queue: - queue[order.currency] = 0 - - if order.type == ExchangeTransaction.Type.DEPOSIT: - self._logger.info(f"{time} - Deposit {order.amount}{order.currency}. Target: {order.exchange}") - queue[order.currency] += order.amount - # queue.deposit(order) - else: - self._logger.info(f"{time} - Withdrawal {order.amount}{order.currency}. Source: {order.exchange}") - queue[order.currency] -= order.amount - # queue.withdrawal(order) - - self._logger.info("Done") diff --git a/backend/wallet_processor/handlers/wallet_processor.py b/backend/wallet_processor/handlers/wallet_processor.py index 351f67e..e9df2ff 100644 --- a/backend/wallet_processor/handlers/wallet_processor.py +++ b/backend/wallet_processor/handlers/wallet_processor.py @@ -38,9 +38,14 @@ def process(self, data): return logger.info("Processor initialized. Retrieving orders") - orders = self.processor.get_orders(user_id) + accounts = self.processor.get_accounts(user_id) + orders = self.processor.get_orders(accounts) + transactions = self.processor.get_transactions(accounts) + orders = sorted(orders + transactions, key=lambda x: x.value_date) - logger.info(f"Found {len(orders)} orders. Start calculation") + orders = self.processor.preprocess(orders) + + logger.info(f"Found: {len(orders)} orders and {len(transactions)} transactions. Starting wallet calculation..") tracked_orders = [] queue = BalanceQueue() @@ -49,41 +54,52 @@ def process(self, data): if sell_order: tracked_orders.append(sell_order) - logger.info("") - self.processor.create_closed_orders(orders, tracked_orders) - logger.info("Calculation done. Generating wallet...") - self.processor.calc_wallet(user_id, orders, queue, tracked_orders) - try: self.validate_wallet(queue, orders) except Exception as e: logger.error(f"Error validating wallet calculations: {e}") + logger.exception(e) + + logger.info("Calculation done. Generating wallet...") + self.processor.create_closed_orders(orders, tracked_orders) + self.processor.calc_wallet(user_id, orders, queue, tracked_orders) + logger.info("Done") def validate_wallet(self, queue, orders): """ - We can try to check that the Wallet (Current balance) calculated is equal from whem we sum the operations + We can try to check that the Wallet (Current balance) calculated is equal to the sum of all operations """ wallet_balance = self.calc_balance_with_queue(queue) - wallet_balance = {k: v for k, v in wallet_balance.items() if v != 0.0} + wallet_balance = {k: round(v, 8) for k, v in wallet_balance.items() if v > 0.000000001} + wallet_balance = collections.OrderedDict(sorted(wallet_balance.items())) order_balance = self.processor.calc_balance_with_orders(orders) - order_balance = {k: v for k, v in order_balance.items() if v != 0.0} - - wallet_balance = collections.OrderedDict(sorted(wallet_balance.items())) + order_balance = {k: round(v, 8) for k, v in order_balance.items() if v > 0.000000001} order_balance = collections.OrderedDict(sorted(order_balance.items())) # check diff diffs = set(wallet_balance.items()) ^ set(order_balance.items()) - logger.info(f"Diffs: {diffs}") + diffs_dict = {} + for d in diffs: + if d[0] not in diffs_dict: + diffs_dict[d[0]] = [] + diffs_dict[d[0]].append(d[1]) + + if diffs_dict: + logger.warning(f"Found following diffs between orders and wallet calculation: {diffs_dict}") @staticmethod def calc_balance_with_queue(queue): logger.info("Calc balance using queue") balance = {} for k, v in queue.queues.items(): - balance[k] = 0 + if not isinstance(k, str): + k = k.ticker + + if k not in balance: + balance[k] = 0 for q in v: balance[k] += q.amount diff --git a/backend/wallet_processor/utils/__init__.py b/backend/wallet_processor/utils/__init__.py index 15cf09a..ee90406 100644 --- a/backend/wallet_processor/utils/__init__.py +++ b/backend/wallet_processor/utils/__init__.py @@ -1,5 +1,8 @@ from collections import deque, defaultdict from functools import reduce +import logging + +logger = logging.getLogger("utils.process") class Transaction: @@ -10,15 +13,16 @@ class Type: BUY = 0 SELL = 1 - def __init__(self, transaction_id, type, time, ticker, amount, price, fees=None, currency_rate=None): - self.transaction_id = transaction_id + def __init__(self, type, order, ticker, amount, price, fees=None): + self.order_type = order.__class__.__name__ + self.transaction_id = order.id self.type = type - self.time = time + self.time = order.value_date self.ticker = ticker self.amount = amount self.price = price - self.fees = round(fees, 4) if fees else 0 - self.currency_rate = round(currency_rate, 4) if currency_rate else 1 + self.fees = round(fees, 4) if fees else 0.0 + self.currency_rate = round(order.currency_rate, 4) if hasattr(order, 'currency_rate') else 1 @property def cost(self): @@ -29,7 +33,7 @@ def cost_base_currency(self): return round(self.amount * self.price * self.currency_rate - self.fees, 4) def __str__(self): - return f"Ticker: {self.ticker}-{self.amount}@{self.price}" + return f"Ticker: {self.ticker}-{self.amount}@{self.price}-Fees:{self.fees}" class PartialOrder: @@ -77,10 +81,10 @@ class SellInfo: def __init__(self, sell_trade, buy_items): self.sell_trade = sell_trade # the trade representing the sale if not buy_items: - print("CHECK HERE") + logger.debug("No buy items found!") for b in buy_items: if not isinstance(b, PartialOrder): - print("Error") + logger.error("No PartialOrder found!") self.buy_items = buy_items # list of buys from the past associated with the sale @property @@ -115,7 +119,7 @@ def cost_buy_in_eur(self): Cost when buying (in tax currency). Summarize cost of all buy items """ - return reduce(lambda a, b: a + b.cost * b.trade.currency_rate, self.buy_items, 0.0) + return reduce(lambda a, b: a + b.cost * b.trade.currency_rate or 1, self.buy_items, 0.0) @property def benefits(self): @@ -134,7 +138,7 @@ def benefits_in_eur(self): # return self.cost_sell - self.cost_buy # return self.cost_sell * self.sell_trade.currency_rate - self.cost_buy * self.sell_trade.currency_rate - return self.cost_sell * self.sell_trade.currency_rate - self.cost_buy_in_eur + return self.cost_sell * (self.sell_trade.currency_rate or 1) - self.cost_buy_in_eur class BalanceQueue: @@ -144,7 +148,7 @@ def __init__(self): self.withdrawals = defaultdict() def current_amount(self, ticker): - return sum([o.amount for o in self.queues[ticker]]) + return round(sum([o.amount for o in self.queues[ticker]]), 8) @staticmethod def _is_empty(queue): @@ -163,11 +167,6 @@ def _put_back(queue, item): def _put(queue, item): queue.append(item) - def buy(self, order): - amount = order.amount # - order.fee - self._put(self.queues[order.ticker], PartialOrder(amount, order)) - # self._put(queue, PartialOrder(amount, order)) - def deposit(self, order): self._put(self.queues[order.currency], PartialOrder(order.amount, None)) return @@ -185,11 +184,11 @@ def deposit(self, order): except: return if len(self.withdrawals[order.currency]) == 1: - print("Assign withdrawal to diposit?") + logger.info("Assign withdrawal to diposit?") if not wth_order: - print(f"{order.currency} - Unable to match withdrawal order with deposit order. Deposit amount: {order.amount}") + logger.info(f"{order.currency} - Unable to match withdrawal order with deposit order. Deposit amount: {order.amount}") for o in self.withdrawals[order.currency]: - print(f"Withdrawal amount: {o.amount} Fee: {o.fee}") + logger.info(f"Withdrawal amount: {o.amount} Fee: {o.fee}") self._put(self.queues[order.currency], PartialOrder(order.amount, None)) return # remove from dict @@ -204,17 +203,10 @@ def withdrawal(self, order): # self.withdrawals[order.currency].append(order) # return - if order.currency == 'XBT' or (order.currency == 'XRP' and order.exchange == 'Bittrex'): - print("CHeck XRP withdrawal") - - if (order.currency == 'IOTA'): - print("CHeck XRP sell") - while remaining_sell_amount > 0: if self._is_empty(queue): # no bought items left but sell is not fully covered # items_bought.append(PartialOrder(remaining_sell_amount, None)) - print("ALERT - NO BOUGHT ITEM LEFT for withdrawal! Pair: {}".format(order.currency)) - print(f"Current amount of {order.currency} is {self.current_amount(order.currency)}") + logger.warning(f"ALERT - NO BOUGHT ITEM LEFT for withdrawal! Pair: {order.currency}. Current amount is {self.current_amount(order.currency)}") break item = self._pop(queue) @@ -227,32 +219,36 @@ def withdrawal(self, order): # remaining_sell_amount -= item.amount remaining_sell_amount = float("{:0.8f}".format(remaining_sell_amount - item.amount)) else: - print("CHeck here!?") + logger.error("Unhandled condition. Please check!") + + def buy(self, order): + if order.order_type == 'StockTransaction': + amount = order.amount + else: + amount = order.amount - order.fees + self._put(self.queues[order.ticker], PartialOrder(amount, order)) def sell(self, order): remaining_sell_amount = order.amount items_bought = [] queue = self.queues[order.ticker] - - if (order.ticker == 'XBT'): - print("CHeck XRP sell") + fees = order.fees if order.order_type != 'StockTransaction' else 0 while remaining_sell_amount > 0: if self._is_empty(queue): # no bought items left but sell is not fully covered items_bought.append(PartialOrder(remaining_sell_amount, None)) - print("ALERT - NO BOUGHT ITEM LEFT!! Pair: {}".format(order.ticker)) - print(f"Current amount of {order.ticker} is {self.current_amount(order.ticker)}") + logger.warning(f"ALERT - NO BOUGHT ITEM LEFT!! Pair: {order.ticker} Current amount is {self.current_amount(order.ticker)}") break item = self._pop(queue) if item.trade and item.trade.ticker != order.ticker: - print("Different items: {} vs {}".format(item.trade.ticker, order.ticker)) + logger.warning("Different items: {} vs {}".format(item.trade.ticker, order.ticker)) if remaining_sell_amount < item.amount: # sell amount is entirely covered by bought items items_bought.append(PartialOrder(remaining_sell_amount, item.trade)) - #item.amount = float("{:0.8f}".format(item.amount - remaining_sell_amount - order.fee)) - item.amount = float("{:0.8f}".format(item.amount - remaining_sell_amount)) + # item.amount = float("{:0.8f}".format(item.amount - remaining_sell_amount - order.fee)) + item.amount = float("{:0.8f}".format(item.amount - remaining_sell_amount - fees)) self._put_back(queue, item) break elif remaining_sell_amount >= item.amount: # bought item is fully consumed by sell @@ -260,7 +256,7 @@ def sell(self, order): # remaining_sell_amount -= item.amount remaining_sell_amount = float("{:0.8f}".format(remaining_sell_amount - item.amount)) else: - print("CHeck here!?") + logger.error("Unhandled condition. Please check!") return SellInfo(order, items_bought) diff --git a/backend/wallet_processor/utils/crypto_prices.py b/backend/wallet_processor/utils/crypto_prices.py index 20c1fc1..4212eb7 100644 --- a/backend/wallet_processor/utils/crypto_prices.py +++ b/backend/wallet_processor/utils/crypto_prices.py @@ -101,7 +101,7 @@ def get_price(source, target, timestamp): r = requests.get(url.format(source, target, timestamp)) data = r.json() if r.status_code != 200: - print("errir") + print("error") try: data.get('Data').get('Data')[1].get('close') except: diff --git a/backend/worker.py b/backend/worker.py index ccd58d7..85cf97a 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -5,7 +5,6 @@ from kombu import Exchange, Queue, Connection logger = logging.getLogger("worker") -logging.basicConfig(level=logging.DEBUG, format='%(asctime)-15s %(levelname)s:%(name)s:%(message)s') class HandlerType: diff --git a/config/__init__.py b/config/__init__.py index 7b879ee..bec45bf 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,6 +1,8 @@ import os import importlib import importlib.util +import logging +import logging.config ENVIRONMENT_VARIABLE = "BACKEND_SETTINGS" @@ -45,6 +47,52 @@ def __init__(self, settings_module='config.local'): else: setattr(self, setting, setting_value) + self.LOG_LEVEL = logging.DEBUG + self.std_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s' + self.enabled_handlers = ['default'] + logging.config.dictConfig({ + 'version': 1, + 'disable_existing_loggers': False, + 'filters': { + }, + 'formatters': { + 'standard': { + 'format': self.std_format + } + }, + 'handlers': { + 'default': { + 'level': self.LOG_LEVEL, + 'formatter': 'standard', + 'class': 'logging.StreamHandler', + 'filters': [] + }, + }, + 'loggers': { + '': { + 'handlers': self.enabled_handlers, + 'level': self.LOG_LEVEL, + 'propagate': True + }, + 'urllib3': { + 'handlers': self.enabled_handlers, + 'level': logging.WARNING + }, + 'requests': { # disable requests library logging + 'handlers': self.enabled_handlers, + 'level': logging.WARNING + }, + 'sqlalchemy': { + 'handlers': self.enabled_handlers, + 'level': logging.ERROR + }, + 'sqlalchemy.engine': { + 'handlers': self.enabled_handlers, + 'level': logging.CRITICAL + }, + } + }) + env = os.environ.setdefault(ENVIRONMENT_VARIABLE, "config.local")