Merge pull request #5 from lAmeR1/fix_initial_run
Fix initial run
lAmeR1 authored May 24, 2023
2 parents 6548139 + 7860a04, commit 88f3d39
Showing 3 changed files with 70 additions and 88 deletions.
6 changes: 2 additions & 4 deletions BlocksProcessor.py
@@ -123,8 +123,7 @@ async def __add_tx_to_queue(self, block_hash, block):
                                            script_public_key_address=out["verboseData"][
                                                "scriptPublicKeyAddress"],
                                            script_public_key_type=out["verboseData"][
-                                               "scriptPublicKeyType"],
-                                           block_time=int(transaction["verboseData"]["blockTime"])))
+                                               "scriptPublicKeyType"]))
                 # Add transactions input
                 for index, tx_in in enumerate(transaction.get("inputs", [])):
                     self.txs_input.append(TransactionInput(transaction_id=transaction["verboseData"]["transactionId"],
@@ -134,8 +133,7 @@ async def __add_tx_to_queue(self, block_hash, block):
                                            previous_outpoint_index=int(tx_in["previousOutpoint"].get(
                                                "index", 0)),
                                            signature_script=tx_in["signatureScript"],
-                                           sig_op_count=tx_in["sigOpCount"],
-                                           block_time=int(transaction["verboseData"]["blockTime"])))
+                                           sig_op_count=tx_in["sigOpCount"]))
             else:
                 # If the block is already in the queue, merge the block_hashes.
                 self.txs[tx_id].block_hash = list(set(self.txs[tx_id].block_hash + [block_hash]))
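
Note on this change: output and input rows no longer carry a denormalized block_time (the columns are dropped from models/Transaction.py at the bottom of this commit), so any consumer that previously read block_time from these tables now has to join transactions, as the rewritten queries in TxAddrMappingUpdater.py below do. A minimal sketch of such a lookup, using only table and column names that appear elsewhere in this diff (the tx_id value is a hypothetical placeholder):

from dbsession import session_maker

# Sketch: resolve an output's block time through the transactions table,
# since transactions_outputs no longer stores block_time itself.
tx_id = "deadbeef"  # hypothetical transaction id
with session_maker() as s:
    block_time = s.execute(
        "SELECT transactions.block_time FROM transactions_outputs "
        "JOIN transactions ON transactions.transaction_id = transactions_outputs.transaction_id "
        "WHERE transactions_outputs.transaction_id = :txid AND transactions_outputs.index = :idx",
        {"txid": tx_id, "idx": 0}).scalar()
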
148 changes: 68 additions & 80 deletions TxAddrMappingUpdater.py
@@ -5,9 +5,10 @@
 from datetime import datetime, timedelta
 
 from dbsession import session_maker
+from helper import KeyValueStore
 from models.TxAddrMapping import TxAddrMapping
 
-LIMIT = 2000
+LIMIT = 1000
 PRECONDITION_RETRIES = 2
 
 _logger = logging.getLogger(__name__)
@@ -16,29 +17,13 @@
 class TxAddrMappingUpdater(object):
     def __init__(self):
         self.last_block_time = None
+        self.id_counter_inputs = None
+        self.id_counter_outputs = None
 
     def precondition(self):
-        for i in range(PRECONDITION_RETRIES):
-            with session_maker() as s:
-                res = s.execute("SELECT block_time "
-                                "FROM tx_id_address_mapping "
-                                "ORDER by id "
-                                "DESC "
-                                "LIMIT 1").first()
-
-                # get the last added block time and subtract 1 hour as a safety margin
-                if res:
-                    self.last_block_time = res[0] - 1000 * 60 * 60
-                    break
-                else:
-                    _logger.info('No block time found in database. Fetching from blocks')
-                    res = s.execute("SELECT timestamp FROM blocks ORDER by blue_score ASC LIMIT 1").first()
-                    if res:
-                        self.last_block_time = int(res[0].timestamp() * 1000)
-                        break
-
-            _logger.info(f"Retry {i + 1}. Wait 10s")
-            time.sleep(10)
+        with session_maker() as s:
+            self.id_counter_inputs = int(KeyValueStore.get("last_id_counter_inputs") or 0)
+            self.id_counter_outputs = int(KeyValueStore.get("last_id_counter_outputs") or 0)
 
     @staticmethod
     def minimum_timestamp():
@@ -52,9 +37,28 @@ def loop(self):
         _logger.debug('Start TxAddrMappingUpdater')  # type: TxAddrMapping
 
         while True:
+
+            # upper id bound for this batch: counter + LIMIT, capped at the DB maximum
+            with session_maker() as s:
+                max_in = min(self.id_counter_inputs + LIMIT,
+                             s.execute(
+                                 f"""SELECT id FROM transactions_inputs ORDER by id DESC LIMIT 1""")
+                             .scalar() or 0)
+
+                max_out = min(self.id_counter_outputs + LIMIT,
+                              s.execute(
+                                  f"""SELECT id FROM transactions_outputs ORDER by id DESC LIMIT 1""")
+                              .scalar() or 0)
+
             try:
-                count_outputs, new_last_block_time_outputs = self.update_outputs(self.last_block_time)
-                count_inputs, new_last_block_time_inputs = self.update_inputs(self.last_block_time)
+                count_outputs, new_last_block_time_outputs = self.update_outputs(self.id_counter_outputs,
+                                                                                 max_out)
+                count_inputs, new_last_block_time_inputs = self.update_inputs(self.id_counter_inputs,
+                                                                              max_in)
+                # save the last run's ids in case of restart
+                KeyValueStore.set("last_id_counter_inputs", max_in)
+                KeyValueStore.set("last_id_counter_outputs", max_out)
+
             except Exception:
                 error_cnt += 1
                 if error_cnt <= 3:
@@ -65,36 +69,23 @@ def loop(self):
                 _logger.info(f"Updated {count_inputs} input mappings.")
                 _logger.info(f"Updated {count_outputs} outputs mappings.")
 
-            # initialize with latest known block time
-            new_last_block_time = max(new_last_block_time_outputs or 0,
-                                      new_last_block_time_inputs or 0)
-
-            # fallback if nothing added
-            if not new_last_block_time_outputs and not new_last_block_time_inputs:
-                _logger.debug("No mapping to be added.")
-                new_last_block_time = self.get_last_block_time(self.last_block_time)
-
-            # get minimum timestamp to check next loop
-            # -> block_time is not a consistently increasing value -> need to leave space in the past
-            # -> now() - 1 min
-            min_timestamp = TxAddrMappingUpdater.minimum_timestamp()
+            last_id_counter_inputs = self.id_counter_inputs
+            last_id_counter_outputs = self.id_counter_outputs
 
-            # new last block times are sorted in the database!
-            next_block_time = min(new_last_block_time_outputs or new_last_block_time,
-                                  min_timestamp,
-                                  new_last_block_time_inputs or new_last_block_time)
-
-            if self.last_block_time == next_block_time:
-                # this only happens when kaspad is not synced and the last block (far in the past) is not changing
-                _logger.info('DB is not filled with the current blocks. Wait.')
-                time.sleep(10)
+            # the next start id is the maximum of the last request
+            self.id_counter_inputs = max_in
+            self.id_counter_outputs = max_out
 
-            self.last_block_time = next_block_time
+            _logger.debug(f"Next TX-Input ID: {self.id_counter_inputs}." +
+                          (f" ({datetime.fromtimestamp(new_last_block_time_inputs / 1000).isoformat()})"
+                           if new_last_block_time_inputs else ""))
 
-            _logger.debug(f"Added TxAddrMapping up to: "
-                          f"{datetime.fromtimestamp(self.last_block_time / 1000).isoformat()}")
+            _logger.debug(f"Next TX-Output ID: {self.id_counter_outputs}." +
+                          (f" ({datetime.fromtimestamp(new_last_block_time_outputs / 1000).isoformat()})"
+                           if new_last_block_time_outputs else ""))
 
-            if self.last_block_time == min_timestamp:
+            if last_id_counter_inputs + LIMIT > self.id_counter_inputs and \
+                    last_id_counter_outputs + LIMIT > self.id_counter_outputs:
                 time.sleep(10)

@@ -112,27 +103,29 @@ def get_last_block_time(self, start_block_time):
         except TypeError:
             return start_block_time
 
-    def update_inputs(self, start_block_time: int):
+    def update_inputs(self, min_id: int, max_id: int):
         with session_maker() as s:
-            result = s.execute(f"""INSERT INTO tx_id_address_mapping (transaction_id, address, block_time)
-                (SELECT DISTINCT * FROM
-                    (SELECT tid, transactions_outputs.script_public_key_address, sq.block_time FROM
-                        (SELECT
-                            transactions.transaction_id as tid,
-                            transactions.block_time
-                        FROM transactions
-                        WHERE transactions.block_time >= :blocktime) as sq
-
-                    LEFT JOIN transactions_inputs ON transactions_inputs.transaction_id = sq.tid
-                    LEFT JOIN transactions_outputs ON transactions_outputs.transaction_id = transactions_inputs.previous_outpoint_hash AND transactions_outputs.index = transactions_inputs.previous_outpoint_index
-                    ORDER by sq.block_time ASC
-                    LIMIT {LIMIT}) as masterq
-                WHERE script_public_key_address IS NOT NULL)
-                ON CONFLICT DO NOTHING
-                RETURNING block_time;""", {"blocktime": start_block_time})
+            result = s.execute(f"""INSERT INTO tx_id_address_mapping (transaction_id, address, block_time)
+                SELECT DISTINCT * FROM (
+                    SELECT transactions_inputs.transaction_id,
+                        transactions_outputs.script_public_key_address,
+                        transactions.block_time FROM transactions_inputs
+                    LEFT JOIN transactions_outputs ON
+                        transactions_outputs.transaction_id = transactions_inputs.previous_outpoint_hash AND
+                        transactions_outputs.index = transactions_inputs.previous_outpoint_index
+                    LEFT JOIN transactions ON transactions.transaction_id = transactions_inputs.transaction_id
+                    WHERE transactions_inputs.id > :minId AND transactions_inputs.id <= :maxId
+                        AND transactions_outputs.script_public_key_address IS NOT NULL
+                    ORDER by transactions_inputs.id
+                ) as distinct_query
+                ON CONFLICT DO NOTHING
+                RETURNING block_time;""", {"minId": min_id, "maxId": max_id})
 
             s.commit()

@@ -142,25 +135,20 @@ def update_inputs(self, start_block_time: int):
         except IndexError:
             return 0, None
 
-    def update_outputs(self, start_block_time: int):
+    def update_outputs(self, min_id: int, max_id: int):
         with session_maker() as s:
             result = s.execute(f"""
                 INSERT INTO tx_id_address_mapping (transaction_id, address, block_time)
-                (SELECT
-                    transactions.transaction_id as tid,
-                    transactions_outputs.script_public_key_address,
-                    transactions.block_time
-                FROM transactions
-                LEFT JOIN transactions_outputs ON transactions.transaction_id = transactions_outputs.transaction_id
-                WHERE transactions.block_time >= :blocktime
-                ORDER by transactions.block_time ASC
-                LIMIT {LIMIT})
+                (SELECT sq.*, transactions.block_time FROM (SELECT transaction_id, script_public_key_address
+                    FROM transactions_outputs
+                    WHERE transactions_outputs.id > :minId and transactions_outputs.id <= :maxId
+                    ORDER by transactions_outputs.id DESC) as sq
+                JOIN transactions ON transactions.transaction_id = sq.transaction_id)
                 ON CONFLICT DO NOTHING
-                RETURNING block_time;""", {"blocktime": start_block_time})
+                RETURNING block_time;""", {"minId": min_id, "maxId": max_id})
 
             s.commit()

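
The updater now persists its read cursors through the KeyValueStore helper imported at the top of this file; helper.py itself is not part of this diff. A minimal sketch consistent with the get/set calls above, assuming dbsession exports a declarative Base and that the store is a plain key-value table (the table name vars is a guess):

from sqlalchemy import Column, String

from dbsession import Base, session_maker  # assumes dbsession exports Base


class KeyValueModel(Base):
    __tablename__ = "vars"  # hypothetical table name

    key = Column(String, primary_key=True)
    value = Column(String)


class KeyValueStore:
    @staticmethod
    def get(key: str):
        # return the stored value as a string, or None if the key is absent
        with session_maker() as s:
            row = s.query(KeyValueModel).filter(KeyValueModel.key == key).first()
            return row.value if row else None

    @staticmethod
    def set(key: str, value):
        # insert or update the key; values are persisted as strings
        with session_maker() as s:
            s.merge(KeyValueModel(key=key, value=str(value)))
            s.commit()

Because precondition() falls back to 0 when no cursor is stored, a fresh database simply starts from the first transactions_inputs/transactions_outputs id instead of probing block times, and each loop iteration advances the cursors by at most LIMIT ids.
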
4 changes: 0 additions & 4 deletions models/Transaction.py
@@ -31,11 +31,8 @@ class TransactionOutput(Base):
     script_public_key_address = Column(String)
     script_public_key_type = Column(String)
     accepting_block_hash = Column(String)
-    block_time = Column(BigInteger)  # "1663286480803"
 
 
-Index("block_timeoutputs_idx", TransactionOutput.block_time)
-Index("idx_addr_bt", TransactionOutput.script_public_key_address, TransactionOutput.block_time)
 Index("idx_txouts", TransactionOutput.transaction_id)
 Index("idx_txouts_addr", TransactionOutput.script_public_key_address)
 Index("tx_id_and_index", TransactionOutput.transaction_id, TransactionOutput.index)
@@ -52,7 +49,6 @@ class TransactionInput(Base):
 
     signature_script = Column(String)  # "41c903159094....281a1d26f70b0037d600554e01",
     sig_op_count = Column(Integer)
-    block_time = Column(BigInteger)  # "1663286480803"
 
 
 Index("idx_txin_prev", TransactionInput.previous_outpoint_hash)
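
The changes above only touch the ORM definitions; an existing database keeps the dropped columns and indexes until they are removed by hand. A hedged sketch of the matching cleanup, assuming PostgreSQL and the exact names from the removed lines:

from dbsession import session_maker

# Sketch: drop the denormalized block_time columns and their indexes,
# mirroring the lines removed from models/Transaction.py above.
with session_maker() as s:
    s.execute("DROP INDEX IF EXISTS block_timeoutputs_idx")
    s.execute("DROP INDEX IF EXISTS idx_addr_bt")
    s.execute("ALTER TABLE transactions_outputs DROP COLUMN IF EXISTS block_time")
    s.execute("ALTER TABLE transactions_inputs DROP COLUMN IF EXISTS block_time")
    s.commit()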
