From 036104a9bda3aa0332a3facec7bfebbe9e6a2eac Mon Sep 17 00:00:00 2001 From: lAmeR <42315864+lAmeR1@users.noreply.github.com> Date: Sat, 15 Apr 2023 23:03:41 +0200 Subject: [PATCH 1/5] removed block_time from inputs and outputs Signed-off-by: lAmeR <42315864+lAmeR1@users.noreply.github.com> --- BlocksProcessor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/BlocksProcessor.py b/BlocksProcessor.py index 72151e7..18faf94 100644 --- a/BlocksProcessor.py +++ b/BlocksProcessor.py @@ -123,8 +123,7 @@ async def __add_tx_to_queue(self, block_hash, block): script_public_key_address=out["verboseData"][ "scriptPublicKeyAddress"], script_public_key_type=out["verboseData"][ - "scriptPublicKeyType"], - block_time=int(transaction["verboseData"]["blockTime"]))) + "scriptPublicKeyType"])) # Add transactions input for index, tx_in in enumerate(transaction.get("inputs", [])): self.txs_input.append(TransactionInput(transaction_id=transaction["verboseData"]["transactionId"], @@ -134,8 +133,7 @@ async def __add_tx_to_queue(self, block_hash, block): previous_outpoint_index=int(tx_in["previousOutpoint"].get( "index", 0)), signature_script=tx_in["signatureScript"], - sig_op_count=tx_in["sigOpCount"], - block_time=int(transaction["verboseData"]["blockTime"]))) + sig_op_count=tx_in["sigOpCount"])) else: # If the block if already in the Queue, merge the block_hashes. self.txs[tx_id].block_hash = list(set(self.txs[tx_id].block_hash + [block_hash])) From af9617ca9a481d77f3b38fd1282b7e72262deed9 Mon Sep 17 00:00:00 2001 From: lAmeR <42315864+lAmeR1@users.noreply.github.com> Date: Sat, 15 Apr 2023 23:03:55 +0200 Subject: [PATCH 2/5] removed block_time from inputs and outputs Signed-off-by: lAmeR <42315864+lAmeR1@users.noreply.github.com> --- models/Transaction.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/models/Transaction.py b/models/Transaction.py index 67ffa08..881d72b 100644 --- a/models/Transaction.py +++ b/models/Transaction.py @@ -31,11 +31,8 @@ class TransactionOutput(Base): script_public_key_address = Column(String) script_public_key_type = Column(String) accepting_block_hash = Column(String) - block_time = Column(BigInteger) # "1663286480803" -Index("block_timeoutputs_idx", TransactionOutput.block_time) -Index("idx_addr_bt", TransactionOutput.script_public_key_address, TransactionOutput.block_time) Index("idx_txouts", TransactionOutput.transaction_id) Index("idx_txouts_addr", TransactionOutput.script_public_key_address) Index("tx_id_and_index", TransactionOutput.transaction_id, TransactionOutput.index) @@ -52,7 +49,6 @@ class TransactionInput(Base): signature_script = Column(String) # "41c903159094....281a1d26f70b0037d600554e01", sig_op_count = Column(Integer) - block_time = Column(BigInteger) # "1663286480803" Index("idx_txin_prev", TransactionInput.previous_outpoint_hash) From 86b775951b8480c970f81bac60e6dee435dd514d Mon Sep 17 00:00:00 2001 From: lAmeR <42315864+lAmeR1@users.noreply.github.com> Date: Sat, 15 Apr 2023 23:04:19 +0200 Subject: [PATCH 3/5] TxAddrMappingUpdater new version Signed-off-by: lAmeR <42315864+lAmeR1@users.noreply.github.com> --- TxAddrMappingUpdater.py | 148 ++++++++++++++++++---------------------- 1 file changed, 68 insertions(+), 80 deletions(-) diff --git a/TxAddrMappingUpdater.py b/TxAddrMappingUpdater.py index bc508f2..71e4b3c 100644 --- a/TxAddrMappingUpdater.py +++ b/TxAddrMappingUpdater.py @@ -5,9 +5,10 @@ from datetime import datetime, timedelta from dbsession import session_maker +from helper import KeyValueStore from models.TxAddrMapping import TxAddrMapping -LIMIT = 2000 +LIMIT = 1000 PRECONDITION_RETRIES = 2 _logger = logging.getLogger(__name__) @@ -16,29 +17,13 @@ class TxAddrMappingUpdater(object): def __init__(self): self.last_block_time = None + self.id_counter_inputs = None + self.id_counter_outputs = None def precondition(self): - for i in range(PRECONDITION_RETRIES): - with session_maker() as s: - res = s.execute("SELECT block_time " - "FROM tx_id_address_mapping " - "ORDER by id " - "DESC " - "LIMIT 1").first() - - # get last added block time and substract 1 hour just for instance - if res: - self.last_block_time = res[0] - 1000 * 60 * 60 - break - else: - _logger.info('No block time found in database. Fetching from blocks') - res = s.execute("SELECT timestamp FROM blocks ORDER by blue_score ASC LIMIT 1").first() - if res: - self.last_block_time = int(res[0].timestamp() * 1000) - break - - _logger.info(f"Retry {i + 1}. Wait 10s") - time.sleep(10) + with session_maker() as s: + self.id_counter_inputs = int(KeyValueStore.get("last_id_counter_inputs")) or 0 + self.id_counter_outputs = int(KeyValueStore.get("last_id_counter_outputs")) or 0 @staticmethod def minimum_timestamp(): @@ -52,9 +37,28 @@ def loop(self): _logger.debug('Start TxAddrMappingUpdater') # type: TxAddrMapping while True: + + # get max id ( either LIMIT or maximum in DB ) + with session_maker() as s: + max_in = min(self.id_counter_inputs + LIMIT, + s.execute( + f"""SELECT id FROM transactions_inputs ORDER by id DESC LIMIT 1""") + .scalar()) + + max_out = min(self.id_counter_outputs + LIMIT, + s.execute( + f"""SELECT id FROM transactions_outputs ORDER by id DESC LIMIT 1""") + .scalar()) + try: - count_outputs, new_last_block_time_outputs = self.update_outputs(self.last_block_time) - count_inputs, new_last_block_time_inputs = self.update_inputs(self.last_block_time) + count_outputs, new_last_block_time_outputs = self.update_outputs(self.id_counter_outputs, + max_out) + count_inputs, new_last_block_time_inputs = self.update_inputs(self.id_counter_inputs, + max_in) + # save last runs ids in case of restart + KeyValueStore.set("last_id_counter_inputs", max_in) + KeyValueStore.set("last_id_counter_outputs", max_out) + except Exception: error_cnt += 1 if error_cnt <= 3: @@ -65,36 +69,23 @@ def loop(self): _logger.info(f"Updated {count_inputs} input mappings.") _logger.info(f"Updated {count_outputs} outputs mappings.") - # initialize with latest known block time - new_last_block_time = max(new_last_block_time_outputs or 0, - new_last_block_time_inputs or 0) - - # fallback if nothing added - if not new_last_block_time_outputs and not new_last_block_time_inputs: - _logger.debug("No mapping to be added.") - new_last_block_time = self.get_last_block_time(self.last_block_time) - - # get minimum timestamp to check next loop - # -> block_time is not an consistent increasing value -> need to leave space in the past - # -> now() - 1 min - min_timestamp = TxAddrMappingUpdater.minimum_timestamp() + last_id_counter_inputs = self.id_counter_inputs + last_id_counter_outputs = self.id_counter_outputs - # new last block times are sorted in the database! - next_block_time = min(new_last_block_time_outputs or new_last_block_time, - min_timestamp, - new_last_block_time_inputs or new_last_block_time) - - if self.last_block_time == next_block_time: - # this only happens, when kaspad is not synced and the last block ( far in the past ) is not changing - _logger.info('DB is not filled with the current blocks. Wait.') - time.sleep(10) + # next start id is the maximum of last request + self.id_counter_inputs = max_in + self.id_counter_outputs = max_out - self.last_block_time = next_block_time + _logger.debug(f"Next TX-Input ID: {self.id_counter_inputs}." + + (f" ({datetime.fromtimestamp(new_last_block_time_inputs / 1000).isoformat()})" + if new_last_block_time_inputs else "")) - _logger.debug(f"Added TxAddrMapping up to: " - f"{datetime.fromtimestamp(self.last_block_time / 1000).isoformat()}") + _logger.debug(f"Next TX-Output ID: {self.id_counter_outputs}." + + (f" ({datetime.fromtimestamp(new_last_block_time_outputs / 1000).isoformat()})" + if new_last_block_time_outputs else "")) - if self.last_block_time == min_timestamp: + if last_id_counter_inputs + LIMIT > self.id_counter_inputs and \ + last_id_counter_outputs + LIMIT > self.id_counter_outputs: time.sleep(10) def get_last_block_time(self, start_block_time): @@ -112,27 +103,29 @@ def get_last_block_time(self, start_block_time): except TypeError: return start_block_time - def update_inputs(self, start_block_time: int): + def update_inputs(self, min_id: int, max_id: int): with session_maker() as s: - result = s.execute(f"""INSERT INTO tx_id_address_mapping (transaction_id, address, block_time) - - (SELECT DISTINCT * FROM - (SELECT tid, transactions_outputs.script_public_key_address, sq.block_time FROM - (SELECT - transactions.transaction_id as tid, - transactions.block_time - - FROM transactions - WHERE transactions.block_time >= :blocktime) as sq - LEFT JOIN transactions_inputs ON transactions_inputs.transaction_id = sq.tid - LEFT JOIN transactions_outputs ON transactions_outputs.transaction_id = transactions_inputs.previous_outpoint_hash AND transactions_outputs.index = transactions_inputs.previous_outpoint_index - ORDER by sq.block_time ASC - LIMIT {LIMIT}) as masterq - WHERE script_public_key_address IS NOT NULL) + result = s.execute(f"""INSERT INTO tx_id_address_mapping (transaction_id, address, block_time) - ON CONFLICT DO NOTHING - RETURNING block_time;""", {"blocktime": start_block_time}) + SELECT DISTINCT * FROM ( + SELECT transactions_inputs.transaction_id, + transactions_outputs.script_public_key_address, + transactions.block_time FROM transactions_inputs + LEFT JOIN transactions_outputs ON + + transactions_outputs.transaction_id = transactions_inputs.previous_outpoint_hash AND + transactions_outputs.index = transactions_inputs.previous_outpoint_index + + LEFT JOIN transactions ON transactions.transaction_id = transactions_inputs.transaction_id + + WHERE transactions_inputs.id > :minId AND transactions_inputs.id <= :maxId + AND transactions_outputs.script_public_key_address IS NOT NULL + ORDER by transactions_inputs.id + ) as distinct_query + + ON CONFLICT DO NOTHING + RETURNING block_time;""", {"minId": min_id, "maxId": max_id}) s.commit() @@ -142,25 +135,20 @@ def update_inputs(self, start_block_time: int): except IndexError: return 0, None - def update_outputs(self, start_block_time: int): + def update_outputs(self, min_id: int, max_id: int): with session_maker() as s: result = s.execute(f""" INSERT INTO tx_id_address_mapping (transaction_id, address, block_time) - (SELECT - transactions.transaction_id as tid, - transactions_outputs.script_public_key_address, - transactions.block_time - - FROM transactions - LEFT JOIN transactions_outputs ON transactions.transaction_id = transactions_outputs.transaction_id - WHERE transactions.block_time >= :blocktime - ORDER by transactions.block_time ASC - LIMIT {LIMIT}) + (SELECT sq.*, transactions.block_time FROM (SELECT transaction_id, script_public_key_address + FROM transactions_outputs + WHERE transactions_outputs.id > :minId and transactions_outputs.id <= :maxId + ORDER by transactions_outputs.id DESC) as sq + JOIN transactions ON transactions.transaction_id = sq.transaction_id) ON CONFLICT DO NOTHING - RETURNING block_time;""", {"blocktime": start_block_time}) + RETURNING block_time;""", {"minId": min_id, "maxId": max_id}) s.commit() From 3460be2744da240cded8a8d86a6ad16126ae75bd Mon Sep 17 00:00:00 2001 From: lAmeR <42315864+lAmeR1@users.noreply.github.com> Date: Fri, 28 Apr 2023 08:44:13 +0200 Subject: [PATCH 4/5] Fix version for TXs without outputs Signed-off-by: lAmeR <42315864+lAmeR1@users.noreply.github.com> --- BlocksProcessor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BlocksProcessor.py b/BlocksProcessor.py index 18faf94..703e4c1 100644 --- a/BlocksProcessor.py +++ b/BlocksProcessor.py @@ -114,7 +114,7 @@ async def __add_tx_to_queue(self, block_hash, block): block_time=int(transaction["verboseData"]["blockTime"])) # Add transactions output - for index, out in enumerate(transaction["outputs"]): + for index, out in enumerate(transaction.get("outputs", [])): self.txs_output.append(TransactionOutput(transaction_id=transaction["verboseData"]["transactionId"], index=index, amount=out["amount"], From 7860a04fe0e8811b7ac197450fa25f0622846f4a Mon Sep 17 00:00:00 2001 From: lAmeR <42315864+lAmeR1@users.noreply.github.com> Date: Wed, 24 May 2023 22:52:18 +0200 Subject: [PATCH 5/5] fixed default values to zero Signed-off-by: lAmeR <42315864+lAmeR1@users.noreply.github.com> --- TxAddrMappingUpdater.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/TxAddrMappingUpdater.py b/TxAddrMappingUpdater.py index 71e4b3c..8cfb805 100644 --- a/TxAddrMappingUpdater.py +++ b/TxAddrMappingUpdater.py @@ -22,8 +22,8 @@ def __init__(self): def precondition(self): with session_maker() as s: - self.id_counter_inputs = int(KeyValueStore.get("last_id_counter_inputs")) or 0 - self.id_counter_outputs = int(KeyValueStore.get("last_id_counter_outputs")) or 0 + self.id_counter_inputs = int(KeyValueStore.get("last_id_counter_inputs") or 0) + self.id_counter_outputs = int(KeyValueStore.get("last_id_counter_outputs") or 0) @staticmethod def minimum_timestamp(): @@ -43,12 +43,12 @@ def loop(self): max_in = min(self.id_counter_inputs + LIMIT, s.execute( f"""SELECT id FROM transactions_inputs ORDER by id DESC LIMIT 1""") - .scalar()) + .scalar() or 0) max_out = min(self.id_counter_outputs + LIMIT, s.execute( f"""SELECT id FROM transactions_outputs ORDER by id DESC LIMIT 1""") - .scalar()) + .scalar() or 0) try: count_outputs, new_last_block_time_outputs = self.update_outputs(self.id_counter_outputs,