diff --git a/backend-rust/.sqlx/query-1098f561527a09b752d50168e35c4bf7d7759696e9bf167a010d8d81d4a0f676.json b/backend-rust/.sqlx/query-1098f561527a09b752d50168e35c4bf7d7759696e9bf167a010d8d81d4a0f676.json
new file mode 100644
index 00000000..b024904c
--- /dev/null
+++ b/backend-rust/.sqlx/query-1098f561527a09b752d50168e35c4bf7d7759696e9bf167a010d8d81d4a0f676.json
@@ -0,0 +1,20 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\nSELECT\n hash\nFROM blocks\nWHERE finalization_time IS NOT NULL\nORDER BY height DESC\nLIMIT 1\n",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "hash",
+        "type_info": "Bpchar"
+      }
+    ],
+    "parameters": {
+      "Left": []
+    },
+    "nullable": [
+      false
+    ]
+  },
+  "hash": "1098f561527a09b752d50168e35c4bf7d7759696e9bf167a010d8d81d4a0f676"
+}
diff --git a/backend-rust/.sqlx/query-1c8291787209470cd9bf025bc9641eda3d128a04340dc9eaa8fb688a39a71fbe.json b/backend-rust/.sqlx/query-1c8291787209470cd9bf025bc9641eda3d128a04340dc9eaa8fb688a39a71fbe.json
deleted file mode 100644
index 4c0b3021..00000000
--- a/backend-rust/.sqlx/query-1c8291787209470cd9bf025bc9641eda3d128a04340dc9eaa8fb688a39a71fbe.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\nWITH finalizer\n AS (SELECT height FROM blocks WHERE hash = $1)\nUPDATE blocks b\n SET finalization_time = EXTRACT(\"MILLISECONDS\" FROM $3 - b.slot_time),\n finalized_by = finalizer.height\nFROM finalizer\nWHERE $2 <= b.height AND b.height < finalizer.height\nRETURNING finalizer.height",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "height",
-        "type_info": "Int8"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Bpchar",
-        "Int8",
-        "Timestamp"
-      ]
-    },
-    "nullable": [
-      false
-    ]
-  },
-  "hash": "1c8291787209470cd9bf025bc9641eda3d128a04340dc9eaa8fb688a39a71fbe"
-}
diff --git a/backend-rust/.sqlx/query-3ff270d4a647c8837eeb174541cd3b629221191b75441b53d1f1d2d949a4250b.json b/backend-rust/.sqlx/query-3ff270d4a647c8837eeb174541cd3b629221191b75441b53d1f1d2d949a4250b.json
new file mode 100644
index 00000000..81017710
--- /dev/null
+++ b/backend-rust/.sqlx/query-3ff270d4a647c8837eeb174541cd3b629221191b75441b53d1f1d2d949a4250b.json
@@ -0,0 +1,20 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\nSELECT\n slot_time\nFROM blocks\nORDER BY height DESC\nLIMIT 1\n",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "slot_time",
+        "type_info": "Timestamp"
+      }
+    ],
+    "parameters": {
+      "Left": []
+    },
+    "nullable": [
+      false
+    ]
+  },
+  "hash": "3ff270d4a647c8837eeb174541cd3b629221191b75441b53d1f1d2d949a4250b"
+}
diff --git a/backend-rust/.sqlx/query-7239974919e11d23c08750f1a9f78e1737a302b81df505d07ec57d1e3c10d554.json b/backend-rust/.sqlx/query-7239974919e11d23c08750f1a9f78e1737a302b81df505d07ec57d1e3c10d554.json
new file mode 100644
index 00000000..0b30a75e
--- /dev/null
+++ b/backend-rust/.sqlx/query-7239974919e11d23c08750f1a9f78e1737a302b81df505d07ec57d1e3c10d554.json
@@ -0,0 +1,20 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "INSERT INTO blocks\n (height, hash, slot_time, block_time, baker_id, total_amount, total_staked)\nSELECT * FROM UNNEST(\n $1::BIGINT[],\n $2::TEXT[],\n $3::TIMESTAMP[],\n $4::BIGINT[],\n $5::BIGINT[],\n $6::BIGINT[],\n $7::BIGINT[]\n);",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8Array",
+        "TextArray",
+        "TimestampArray",
+        "Int8Array",
+        "Int8Array",
+        "Int8Array",
+        "Int8Array"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "7239974919e11d23c08750f1a9f78e1737a302b81df505d07ec57d1e3c10d554"
+}
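The UNNEST-based insert cached above is what lets the indexer write a whole batch of blocks in a single statement: each array parameter supplies one column, and PostgreSQL zips the arrays element-wise into rows. A minimal runtime-checked sketch of the same pattern (assuming an existing `PgPool` and, for brevity, a simplified `blocks(height BIGINT, hash TEXT)` table rather than the full schema):

```rust
use sqlx::PgPool;

/// Sketch of an UNNEST-based batch insert. The real table has more columns; the
/// two-column shape here is only for illustration.
async fn insert_blocks_batch(
    pool: &PgPool,
    heights: Vec<i64>,
    hashes: Vec<String>,
) -> Result<(), sqlx::Error> {
    // Each bound array becomes one column of the UNNEST call; PostgreSQL zips the arrays
    // element-wise into rows, so the whole batch needs only one round trip.
    sqlx::query("INSERT INTO blocks (height, hash) SELECT * FROM UNNEST($1::BIGINT[], $2::TEXT[])")
        .bind(&heights)
        .bind(&hashes)
        .execute(pool)
        .await?;
    Ok(())
}
```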
diff --git a/backend-rust/.sqlx/query-c5dc978a5e0838b2f97696c0ef178d250ea046851b2446c61d88ad2b1c97849c.json b/backend-rust/.sqlx/query-c5dc978a5e0838b2f97696c0ef178d250ea046851b2446c61d88ad2b1c97849c.json
deleted file mode 100644
index a4282e2a..00000000
--- a/backend-rust/.sqlx/query-c5dc978a5e0838b2f97696c0ef178d250ea046851b2446c61d88ad2b1c97849c.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "INSERT INTO blocks (height, hash, slot_time, block_time, baker_id, total_amount, total_staked)\nVALUES ($1, $2, $3,\n (SELECT EXTRACT(\"MILLISECONDS\" FROM $3 - b.slot_time) FROM blocks b WHERE b.height=($1 - 1::bigint)),\n $4, $5, $6);",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Int8",
-        "Bpchar",
-        "Timestamp",
-        "Int8",
-        "Int8",
-        "Int8"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "c5dc978a5e0838b2f97696c0ef178d250ea046851b2446c61d88ad2b1c97849c"
-}
diff --git a/backend-rust/.sqlx/query-c97c1cb3271378d4411f58645746f895551d46196c7343e4142a127dff9ad191.json b/backend-rust/.sqlx/query-c97c1cb3271378d4411f58645746f895551d46196c7343e4142a127dff9ad191.json
new file mode 100644
index 00000000..08828671
--- /dev/null
+++ b/backend-rust/.sqlx/query-c97c1cb3271378d4411f58645746f895551d46196c7343e4142a127dff9ad191.json
@@ -0,0 +1,16 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\nUPDATE blocks\n SET finalization_time = EXTRACT(\"MILLISECONDS\" FROM finalizer.slot_time - blocks.slot_time),\n finalized_by = finalizer.height\nFROM UNNEST($1::BIGINT[], $2::TEXT[], $3::TIMESTAMP[]) AS finalizer(height, finalized, slot_time)\nJOIN blocks last ON finalizer.finalized = last.hash\nWHERE blocks.finalization_time IS NULL AND blocks.height <= last.height\n",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8Array",
+        "TextArray",
+        "TimestampArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "c97c1cb3271378d4411f58645746f895551d46196c7343e4142a127dff9ad191"
+}
diff --git a/backend-rust/.sqlx/query-e80b5d3738ba93255f5f2b564983513fb0b1f40abc01a4ba48b6c861925cdc03.json b/backend-rust/.sqlx/query-e80b5d3738ba93255f5f2b564983513fb0b1f40abc01a4ba48b6c861925cdc03.json
deleted file mode 100644
index f54a4ed4..00000000
--- a/backend-rust/.sqlx/query-e80b5d3738ba93255f5f2b564983513fb0b1f40abc01a4ba48b6c861925cdc03.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\nSELECT\n height as last_finalized_height,\n hash as last_finalized_hash\nFROM blocks\nWHERE finalization_time IS NULL\nORDER BY height ASC\nLIMIT 1\n",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "last_finalized_height",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 1,
-        "name": "last_finalized_hash",
-        "type_info": "Bpchar"
-      }
-    ],
-    "parameters": {
-      "Left": []
-    },
-    "nullable": [
-      false,
-      false
-    ]
-  },
-  "hash": "e80b5d3738ba93255f5f2b564983513fb0b1f40abc01a4ba48b6c861925cdc03"
-}
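The cached query above is the bulk finalization update: `UNNEST` turns three parallel arrays into a derived table `finalizer(height, finalized, slot_time)`, the join on `finalizer.finalized = last.hash` resolves the block each finalizer points at, and every earlier block still missing a finalization time is updated in one pass. A sketch of issuing it with runtime-checked sqlx (assuming an existing `PgPool`, the `blocks` schema from the migration below, and sqlx's `chrono` feature):

```rust
use chrono::NaiveDateTime;
use sqlx::PgPool;

/// Sketch only: marks all not-yet-finalized blocks up to each finalized hash and returns
/// how many rows were updated.
async fn mark_finalized(
    pool: &PgPool,
    finalizer_heights: Vec<i64>,
    finalized_hashes: Vec<String>,
    finalizer_slot_times: Vec<NaiveDateTime>,
) -> Result<u64, sqlx::Error> {
    let result = sqlx::query(
        r#"
UPDATE blocks
    SET finalization_time = EXTRACT("MILLISECONDS" FROM finalizer.slot_time - blocks.slot_time),
        finalized_by = finalizer.height
FROM UNNEST($1::BIGINT[], $2::TEXT[], $3::TIMESTAMP[]) AS finalizer(height, finalized, slot_time)
JOIN blocks last ON finalizer.finalized = last.hash
WHERE blocks.finalization_time IS NULL AND blocks.height <= last.height
"#,
    )
    .bind(&finalizer_heights)
    .bind(&finalized_hashes)
    .bind(&finalizer_slot_times)
    .execute(pool)
    .await?;
    // rows_affected reports how many blocks were newly marked as finalized.
    Ok(result.rows_affected())
}
```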
"db_name": "PostgreSQL", - "query": "INSERT INTO blocks (height, hash, slot_time, block_time, total_amount, total_staked) VALUES ($1, $2, $3, 0, $4, $5);", + "query": "INSERT INTO blocks (height, hash, slot_time, block_time, finalization_time, total_amount, total_staked) VALUES ($1, $2, $3, 0, 0, $4, $5);", "describe": { "columns": [], "parameters": { @@ -14,5 +14,5 @@ }, "nullable": [] }, - "hash": "4bf3a743a28e01f648bfa45be8e699f563b1d097925615b2a85dcf5852ad0635" + "hash": "fc560cd7aff38b6379b1321c21ee26fda687d61c7895fe97e2c090f5b621502f" } diff --git a/backend-rust/migrations/0001_initialize.up.sql b/backend-rust/migrations/0001_initialize.up.sql index ee960470..d9430b70 100644 --- a/backend-rust/migrations/0001_initialize.up.sql +++ b/backend-rust/migrations/0001_initialize.up.sql @@ -86,11 +86,12 @@ CREATE TABLE blocks( block_time INTEGER NOT NULL, - -- Milliseconds between the slot_time of this block and the block above causing this block to be finalized. + -- Milliseconds between the slot_time of this block and the first block above where this was + -- recorded as finalized. -- This is NULL until the indexer have processed the block marking this a finalized. finalization_time INTEGER, - -- Block causing this block to become finalized. + -- Block where this block was first recorded as finalized. -- This is NULL until the indexer have processed the block marking this a finalized. finalized_by BIGINT diff --git a/backend-rust/src/indexer.rs b/backend-rust/src/indexer.rs index ae8689e6..580828bf 100644 --- a/backend-rust/src/indexer.rs +++ b/backend-rust/src/indexer.rs @@ -33,7 +33,6 @@ use prometheus_client::{ registry::Registry, }; use sqlx::PgPool; -use std::borrow::Borrow; use tokio::{time::Instant, try_join}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -157,11 +156,17 @@ SELECT height FROM blocks ORDER BY height DESC LIMIT 1 let (sender, receiver) = tokio::sync::mpsc::channel(self.config.max_processing_batch); let receiver = tokio_stream::wrappers::ReceiverStream::from(receiver) .ready_chunks(self.config.max_processing_batch); - let traverse_future = traverse_config.traverse(self.block_pre_processor, sender); - let process_future = processor_config.process_event_stream(self.block_processor, receiver); + let traverse_future = + tokio::spawn(traverse_config.traverse(self.block_pre_processor, sender)); + let process_future = + tokio::spawn(processor_config.process_event_stream(self.block_processor, receiver)); info!("Indexing from block height {}", self.start_height); - let (result, ()) = futures::join!(traverse_future, process_future); - Ok(result?) + // Wait for both processes to exit, in case one of them results in an error, + // wait for the other which then eventually will stop gracefully as either end + // of their channel will get dropped. + let (traverse_result, process_result) = futures::join!(traverse_future, process_future); + process_result?; + Ok(traverse_result??) 
@@ -419,15 +424,13 @@ impl BlockProcessor {
         max_successive_failures: u32,
         registry: &mut Registry,
     ) -> anyhow::Result<Self> {
-        let starting_context = sqlx::query_as!(
-            BlockProcessingContext,
+        let last_finalized_block = sqlx::query!(
             r#"
 SELECT
-    height as last_finalized_height,
-    hash as last_finalized_hash
+    hash
 FROM blocks
-WHERE finalization_time IS NULL
-ORDER BY height ASC
+WHERE finalization_time IS NOT NULL
+ORDER BY height DESC
 LIMIT 1
 "#
         )
@@ -435,6 +438,24 @@ LIMIT 1
         .await
         .context("Failed to query data for save context")?;
 
+        let last_block = sqlx::query!(
+            r#"
+SELECT
+    slot_time
+FROM blocks
+ORDER BY height DESC
+LIMIT 1
+"#
+        )
+        .fetch_one(&pool)
+        .await
+        .context("Failed to query data for save context")?;
+
+        let starting_context = BlockProcessingContext {
+            last_finalized_hash:  last_finalized_block.hash,
+            last_block_slot_time: last_block.slot_time,
+        };
+
         let blocks_processed = Counter::default();
         registry.register(
             "blocks_processed",
@@ -486,22 +507,24 @@ impl ProcessEvent for BlockProcessor {
         let start_time = Instant::now();
         let mut out = format!("Processed {} blocks:", batch.len());
         let mut tx = self.pool.begin().await.context("Failed to create SQL transaction")?;
-        let mut override_context = None;
-        for data in batch {
-            let context = override_context.as_ref().unwrap_or(self.current_context.borrow());
-            let new_context = data.save(context, &mut tx).await.context("Failed saving block")?;
-            if new_context.is_some() {
-                override_context = new_context;
+        // Clone the context to avoid mutating the current context until we are certain
+        // that nothing fails.
+        let mut new_context = self.current_context.clone();
+        PreparedBlock::batch_save(batch, &mut new_context, &mut tx).await?;
+        for block in batch {
+            for item in block.prepared_block_items.iter() {
+                item.save(&mut tx).await?;
             }
-            out.push_str(format!("\n- {}:{}", data.height, data.hash).as_str())
+            out.push_str(format!("\n- {}:{}", block.height, block.hash).as_str())
         }
         tx.commit().await.context("Failed to commit SQL transaction")?;
 
-        if let Some(context) = override_context {
-            self.current_context = context;
-        }
+        // Update metrics.
         let duration = start_time.elapsed();
-        self.processing_duration_seconds.observe(duration.as_secs_f64());
+        self.processing_duration_seconds.observe(duration.as_secs_f64() / batch.len() as f64);
         self.blocks_processed.inc_by(u64::try_from(batch.len())?);
+        // Update the current context now that we are certain nothing failed during
+        // processing.
+        self.current_context = new_context;
         Ok(out)
     }
@@ -525,13 +548,14 @@
     }
 }
 
+#[derive(Clone)]
 struct BlockProcessingContext {
-    /// The last finalized block height according to the latest indexed block.
-    /// This is needed in order to compute the finalization time of blocks.
-    last_finalized_height: i64,
     /// The last finalized block hash according to the latest indexed block.
-    /// This is needed in order to compute the finalization time of blocks.
-    last_finalized_hash:   String,
+    /// This is used when computing the finalization time.
+    last_finalized_hash:  String,
+    /// The slot time of the last processed block.
+    /// This is used when computing the block time.
+    last_block_slot_time: NaiveDateTime,
 }
 
 /// Raw block information fetched from a Concordium Node.
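The reworked `process` above follows a save-point pattern: mutate a clone of the processing context while the batch is applied inside the transaction, and only overwrite `self.current_context` once the commit has succeeded, so a failed batch can simply be retried. A minimal sketch of the same idea with simplified, illustrative types (not the real `BlockProcessingContext`):

```rust
#[derive(Clone)]
struct Context {
    last_hash: String,
}

struct Processor {
    current: Context,
}

impl Processor {
    fn process_batch(&mut self, batch: &[String]) -> Result<(), String> {
        // Work on a copy; `self.current` stays valid if anything below fails.
        let mut new_context = self.current.clone();
        for hash in batch {
            if hash.is_empty() {
                return Err("invalid block hash".into());
            }
            new_context.last_hash = hash.clone();
        }
        // Commit point: only now does the processor observe the new state.
        self.current = new_context;
        Ok(())
    }
}

fn main() {
    let mut processor = Processor { current: Context { last_hash: "genesis".into() } };
    // A failing batch leaves `current` at "genesis"; a good batch advances it.
    assert!(processor.process_batch(&["".into()]).is_err());
    assert_eq!(processor.current.last_hash, "genesis");
    processor.process_batch(&["abc".into(), "def".into()]).unwrap();
    assert_eq!(processor.current.last_hash, "def");
}
```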
@@ -571,7 +595,7 @@ async fn save_genesis_data(endpoint: v2::Endpoint, pool: &PgPool) -> anyhow::Res
     let total_amount =
         i64::try_from(genesis_tokenomics.common_reward_data().total_amount.micro_ccd())?;
     sqlx::query!(
-        r#"INSERT INTO blocks (height, hash, slot_time, block_time, total_amount, total_staked) VALUES ($1, $2, $3, 0, $4, $5);"#,
+        r#"INSERT INTO blocks (height, hash, slot_time, block_time, finalization_time, total_amount, total_staked) VALUES ($1, $2, $3, 0, 0, $4, $5);"#,
         0,
         block_hash,
         slot_time,
@@ -693,60 +717,95 @@ impl PreparedBlock {
         })
     }
 
-    async fn save(
-        &self,
-        context: &BlockProcessingContext,
+    async fn batch_save(
+        batch: &[Self],
+        context: &mut BlockProcessingContext,
         tx: &mut sqlx::Transaction<'static, sqlx::Postgres>,
-    ) -> anyhow::Result<Option<BlockProcessingContext>> {
+    ) -> anyhow::Result<()> {
+        let mut heights = Vec::with_capacity(batch.len());
+        let mut hashes = Vec::with_capacity(batch.len());
+        let mut slot_times = Vec::with_capacity(batch.len());
+        let mut baker_ids = Vec::with_capacity(batch.len());
+        let mut total_amounts = Vec::with_capacity(batch.len());
+        let mut total_staked = Vec::with_capacity(batch.len());
+        let mut block_times = Vec::with_capacity(batch.len());
+
+        let mut finalizers = Vec::with_capacity(batch.len());
+        let mut last_finalizeds = Vec::with_capacity(batch.len());
+        let mut finalizers_slot_time = Vec::with_capacity(batch.len());
+
+        for block in batch {
+            heights.push(block.height);
+            hashes.push(block.hash.clone());
+            slot_times.push(block.slot_time);
+            baker_ids.push(block.baker_id);
+            total_amounts.push(block.total_amount);
+            total_staked.push(block.total_staked);
+            block_times.push(
+                block
+                    .slot_time
+                    .signed_duration_since(context.last_block_slot_time)
+                    .num_milliseconds(),
+            );
+            context.last_block_slot_time = block.slot_time;
+
+            // Check whether this block records a new finalized block.
+            // If so, note it down so we can mark the blocks since last time as finalized
+            // by this block.
+            if block.block_last_finalized != context.last_finalized_hash {
+                finalizers.push(block.height);
+                finalizers_slot_time.push(block.slot_time);
+                last_finalizeds.push(block.block_last_finalized.clone());
+
+                context.last_finalized_hash = block.block_last_finalized.clone();
+            }
+        }
+
         sqlx::query!(
-            r#"INSERT INTO blocks (height, hash, slot_time, block_time, baker_id, total_amount, total_staked)
-VALUES ($1, $2, $3,
-    (SELECT EXTRACT("MILLISECONDS" FROM $3 - b.slot_time) FROM blocks b WHERE b.height=($1 - 1::bigint)),
-    $4, $5, $6);"#,
-            self.height,
-            self.hash,
-            self.slot_time,
-            self.baker_id,
-            self.total_amount,
-            self.total_staked
+            r#"INSERT INTO blocks
+    (height, hash, slot_time, block_time, baker_id, total_amount, total_staked)
+SELECT * FROM UNNEST(
+    $1::BIGINT[],
+    $2::TEXT[],
+    $3::TIMESTAMP[],
+    $4::BIGINT[],
+    $5::BIGINT[],
+    $6::BIGINT[],
+    $7::BIGINT[]
+);"#,
+            &heights,
+            &hashes,
+            &slot_times,
+            &block_times,
+            &baker_ids as &[Option<i64>],
+            &total_amounts,
+            &total_staked
         )
         .execute(tx.as_mut())
-        .await?;
+        .await?;
 
-        // Check if this block knows of a new finalized block.
-        // If so, mark the blocks since last time as finalized by this block.
-        let new_context = if self.block_last_finalized != context.last_finalized_hash {
-            let last_height = context.last_finalized_height;
-
-            let rec = sqlx::query!(
-                r#"
-WITH finalizer
-    AS (SELECT height FROM blocks WHERE hash = $1)
-UPDATE blocks b
-    SET finalization_time = EXTRACT("MILLISECONDS" FROM $3 - b.slot_time),
+        // With all blocks in the batch inserted, we update the blocks for which we can now
+        // compute the finalization time. Using the list of finalizer blocks (those whose
+        // last finalized block differs from their predecessor's), we update every block
+        // below a finalizer that has no finalization time yet, setting it to the
+        // difference between the slot_time of that block and the slot_time of the
+        // finalizer block.
+        let rec = sqlx::query!(
+            r#"
+UPDATE blocks
+    SET finalization_time = EXTRACT("MILLISECONDS" FROM finalizer.slot_time - blocks.slot_time),
         finalized_by = finalizer.height
-FROM finalizer
-WHERE $2 <= b.height AND b.height < finalizer.height
-RETURNING finalizer.height"#,
-                self.block_last_finalized,
-                last_height,
-                self.slot_time
-            )
-            .fetch_one(tx.as_mut())
-            .await
-            .context("Failed updating finalization_time")?;
-            let new_context = BlockProcessingContext {
-                last_finalized_hash:   self.block_last_finalized.clone(),
-                last_finalized_height: rec.height,
-            };
-            Some(new_context)
-        } else {
-            None
-        };
-        for item in self.prepared_block_items.iter() {
-            item.save(tx).await?;
-        }
-        Ok(new_context)
+FROM UNNEST($1::BIGINT[], $2::TEXT[], $3::TIMESTAMP[]) AS finalizer(height, finalized, slot_time)
+JOIN blocks last ON finalizer.finalized = last.hash
+WHERE blocks.finalization_time IS NULL AND blocks.height <= last.height
+"#,
+            &finalizers,
+            &last_finalizeds,
+            &finalizers_slot_time
+        )
+        .execute(tx.as_mut())
+        .await?;
+        Ok(())
     }
 }
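`batch_save` above does two pieces of bookkeeping per block before touching the database: the block time is the slot-time delta to the previous block, and a block is noted as a finalizer whenever its last-finalized hash differs from the running value. A simplified, self-contained sketch of that loop (illustrative types only, not the real `PreparedBlock`):

```rust
use chrono::NaiveDateTime;

struct Block {
    height: i64,
    slot_time: NaiveDateTime,
    last_finalized: String,
}

/// Returns the block_time column (milliseconds since the previous block) and the heights of
/// the blocks in the batch that act as finalizers.
fn collect_columns(
    batch: &[Block],
    last_slot_time: &mut NaiveDateTime,
    last_finalized_hash: &mut String,
) -> (Vec<i64>, Vec<i64>) {
    let mut block_times = Vec::with_capacity(batch.len());
    let mut finalizer_heights = Vec::new();
    for block in batch {
        // Milliseconds since the previous block's slot time.
        block_times
            .push(block.slot_time.signed_duration_since(*last_slot_time).num_milliseconds());
        *last_slot_time = block.slot_time;
        // A new last-finalized hash means this block finalizes earlier blocks.
        if block.last_finalized != *last_finalized_hash {
            finalizer_heights.push(block.height);
            *last_finalized_hash = block.last_finalized.clone();
        }
    }
    (block_times, finalizer_heights)
}
```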