Skip to content

Commit

Permalink
Sequencer API is serving requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Zaikin committed Jan 4, 2024
1 parent befd288 commit 0ed24d9
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 111 deletions.
277 changes: 226 additions & 51 deletions Cargo.lock

Large diffs are not rendered by default.

39 changes: 26 additions & 13 deletions crates/pre-block/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::collections::HashMap;
use std::{collections::BTreeSet, num::NonZeroUsize};

use narwhal_test_utils::{latest_protocol_version, CommitteeFixture};
use narwhal_types::{CertificateDigest, CertificateV2, Header, VoteAPI};
use narwhal_types::{CertificateDigest, CertificateV2, Header, VoteAPI, HeaderV2Builder};
use narwhal_utils::protocol_config::ProtocolConfig;

use crate::conversion::convert_batch;
use crate::{Batch, Certificate, Digest, PreBlock, PreBlockStore, PublicKey};
use crate::{Certificate, Digest, Batch, PreBlock, PreBlockStore, PublicKey};

pub const COMMITTEE_SIZE: usize = 4;

Expand Down Expand Up @@ -117,11 +116,30 @@ impl NarwhalFixture {
}

fn round(&mut self, num_txs: u32) -> (Vec<Vec<Batch>>, Vec<Certificate>) {
let (round, headers, batches) =
self.fixture
.headers_round(self.round, &self.parents, &self.config, num_txs);
let mut headers: Vec<Header> = Vec::new();
let mut batches: Vec<Vec<Batch>> = Vec::new();

for authority in self.fixture.authorities() {
let txs: Batch = (0..num_txs)
.into_iter()
.map(|i| i.to_be_bytes().to_vec())
.collect();

let builder = HeaderV2Builder::default();
let header = builder
.author(authority.id())
.round(self.round)
.epoch(0)
.parents(self.parents.clone())
.with_payload_batch(narwhal_types::Batch::new(txs.clone(), &self.config), 0, 0)
.build()
.unwrap();

headers.push(header.into());
batches.push(vec![txs]);
}

self.round = round;
self.round += 1;
self.parents = headers
.iter()
.map(|header| CertificateDigest::new(header.digest().0))
Expand All @@ -132,12 +150,7 @@ impl NarwhalFixture {
.map(|header| self.certify(header))
.collect();

let mut res_batches: Vec<Vec<Batch>> = Vec::new();
for batch_list in batches {
res_batches.push(batch_list.into_iter().map(convert_batch).collect())
}

(res_batches, certificates)
(batches, certificates)
}

fn leader(&self) -> Certificate {
Expand Down
4 changes: 2 additions & 2 deletions crates/pre-block/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ pub fn validate_certificate_chain(

match store.get_certificate_index(parent) {
Some(prev_index) if prev_index + 1 != index => {
anyhow::bail!("Parent certificate is not from a preceding sub dag")
anyhow::bail!("Parent certificate is not from a preceding sub dag {}", hex::encode(parent))
}
None => {
anyhow::bail!("Parent certificate cannot be not found");
anyhow::bail!("Parent certificate cannot be not found {}", hex::encode(parent));
}
_ => (),
}
Expand Down
2 changes: 1 addition & 1 deletion docker/kernel/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ deploy_rollup() {

octez-client --endpoint "$endpoint" originate smart rollup "rollup" from operator of kind wasm_2_0_0 of type $rollup_type with kernel "$kernel" --burn-cap 999 --force | tee originate.out
operator_address=$(octez-client --endpoint "$endpoint" show address "operator" 2>&1 | grep Hash | grep -oE "tz.*")
octez-smart-rollup-node --base-dir "$client_dir" init operator config for "rollup" with operators "$operator_address" --data-dir "$rollup_dir"
octez-smart-rollup-node --base-dir "$client_dir" init operator config for "rollup" with operators "$operator_address" --data-dir "$rollup_dir" --force
}

generate_key() {
Expand Down
2 changes: 2 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub fn apply_pre_block<Host: Runtime>(host: &mut Host, pre_block: PreBlock) {

fn process_external_message<Host: Runtime>(host: &mut Host, contents: &[u8], level: u32) {
let pre_block: PreBlock = bcs::from_bytes(contents).expect("Failed to parse consensus output");
host.write_debug(&format!("Incoming pre-block #{}", pre_block.index()));

let epoch = 0; // (level % LEVELS_PER_EPOCH) as u64;
let authorities = read_authorities(host, epoch);
Expand All @@ -43,6 +44,7 @@ fn process_external_message<Host: Runtime>(host: &mut Host, contents: &[u8], lev
match pre_block.verify(&config, &store) {
Ok(()) => {
pre_block.commit(&mut store);
host.write_debug(&format!("Handled pre-block #{}", pre_block.index()));
}
Err(err) => {
host.write_debug(&format!("Skipping pre-block: {}", err));
Expand Down
3 changes: 2 additions & 1 deletion sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ edition = "2021"
version.workspace = true

[dependencies]
reqwest.workspace = true
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
fastcrypto.workspace = true
clap.workspace = true
Expand All @@ -25,6 +25,7 @@ hex = "*"
tide = "0.16"
env_logger = "0.10.0"
log = "0.4"
surf = "2.3.2"

narwhal-types.workspace = true
narwhal-crypto.workspace = true
Expand Down
16 changes: 1 addition & 15 deletions sequencer/src/da_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use log::info;
use pre_block::fixture::NarwhalFixture;
use pre_block::{Certificate, CertificateHeader, PreBlock};
use pre_block::PreBlock;
use serde::Serialize;
use std::{sync::mpsc, time::Duration};
use tezos_data_encoding::enc::BinWriter;
Expand Down Expand Up @@ -47,19 +47,6 @@ pub fn batch_encode_to<T: Serialize>(
Ok(())
}

fn dummy_pre_block(index: u64) -> PreBlock {
PreBlock {
index,
leader: Certificate {
header: CertificateHeader::default(),
signature: vec![],
signers: vec![],
},
certificates: vec![],
batches: vec![],
}
}

pub async fn fetch_pre_blocks(
prev_index: u64,
pre_blocks_tx: mpsc::Sender<PreBlock>,
Expand All @@ -71,7 +58,6 @@ pub async fn fetch_pre_blocks(
// while let Some(pre_block) = stream.next().await {

loop {
// let pre_block = dummy_pre_block(index);
let pre_block = fixture.next_pre_block(1);
if pre_block.index() == index {
info!("[DA fetch] received pre-block #{}", index);
Expand Down
21 changes: 19 additions & 2 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,33 @@ async fn broadcast_transaction(mut req: tide::Request<State>) -> tide::Result<St
Ok(hex::encode(tx_digest))
}

async fn get_block_by_level(req: tide::Request<State>) -> tide::Result<tide::Body> {
async fn get_block_by_level(req: tide::Request<State>) -> tide::Result<String> {
let level: u32 = req.param("level")?.parse().unwrap_or(0);
let block = req.state().rollup_client.get_block_by_level(level).await?;

if let Some(txs) = block {
let res: Vec<String> = txs.iter().map(|tx_digest| hex::encode(tx_digest)).collect();
tide::Body::from_json(&res)
Ok(serde_json::to_string(&res)?)
} else {
Err(tide::Error::new(404, anyhow::anyhow!("Block not found")))
}
}

async fn get_head(req: tide::Request<State>) -> tide::Result<String> {
let head = req.state().rollup_client.get_head().await?;
Ok(head.to_string())
}

async fn get_authorities(req: tide::Request<State>) -> tide::Result<String> {
let epoch: u64 = req.param("epoch")?.parse().unwrap_or(0);
let authorities = req.state().rollup_client.get_authorities(epoch).await?;
let res: Vec<String> = authorities
.into_iter()
.map(|a| hex::encode(a))
.collect();
Ok(serde_json::to_string(&res)?)
}

async fn run_api_server(
rpc_addr: String,
rpc_port: u16,
Expand All @@ -71,6 +86,8 @@ async fn run_api_server(
let mut app = tide::with_state(State::new(rollup_node_url, worker_node_url));
app.at("/broadcast").post(broadcast_transaction);
app.at("/blocks/:level").get(get_block_by_level);
app.at("/authorities/:epoch").get(get_authorities);
app.at("/head").get(get_head);
app.listen(rpc_host).await?;
Ok(())
}
Expand Down
78 changes: 52 additions & 26 deletions sequencer/src/rollup_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,65 +37,78 @@ pub enum DurableStorageResponse {
#[derive(Clone, Debug)]
pub struct RollupClient {
pub base_url: String,
client: reqwest::Client,
client: surf::Client,
}

impl RollupClient {
pub fn new(base_url: String) -> Self {
Self {
base_url,
client: reqwest::Client::new(),
client: surf::Client::new(),
}
}

pub async fn get_rollup_address(&self) -> anyhow::Result<SmartRollupAddress> {
let res = self
let mut res = self
.client
.get(format!("{}/global/smart_rollup_address", self.base_url))
.send()
.await?;
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

if res.status() == 200 {
let value: String = res.json().await?;
let value: String = res
.body_json()
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(SmartRollupAddress::from_b58check(&value)?)
} else {
Err(anyhow::anyhow!(
"Get rollup address: response status {0}",
res.status().as_u16()
"Get rollup address: response status {}",
res.status()
))
}
}

pub async fn get_inbox_level(&self) -> anyhow::Result<u32> {
let res = self
let mut res = self
.client
.get(format!("{}/global/block/head/level", self.base_url))
.send()
.await?;
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

if res.status() == 200 {
let value: u32 = res.json().await?;
let value: u32 = res
.body_json()
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(value)
} else {
Err(anyhow::anyhow!(
"Get inbox level: response status {0}",
res.status().as_u16()
"Get inbox level: response status {}",
res.status()
))
}
}

pub async fn store_get(&self, key: String) -> anyhow::Result<Option<Vec<u8>>> {
let res = self
let mut res = self
.client
.get(format!(
"{}/global/block/head/durable/wasm_2_0_0/value?key={}",
self.base_url, key
))
.send()
.await?;
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

if res.status() == 200 || res.status() == 500 {
let content: Option<DurableStorageResponse> = res.json().await?;
let content: Option<DurableStorageResponse> = res
.body_json()
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

match content {
Some(DurableStorageResponse::Value(value)) => {
let payload = hex::decode(value)?;
Expand All @@ -110,42 +123,55 @@ impl RollupClient {
}
} else {
Err(anyhow::anyhow!(
"Store get: response status {0}",
res.status().as_u16()
"Store get: response status {}",
res.status()
))
}
}

pub async fn inject_batch(&self, batch: DaBatch) -> anyhow::Result<()> {
let messages: Vec<String> = batch.into_iter().map(|msg| hex::encode(msg)).collect();

let res = self
let mut res = self
.client
.post(format!("{}/local/batcher/injection", self.base_url))
.json(&messages)
.send()
.await?;
.body_json(&messages)
.map_err(|e| anyhow::anyhow!("{}", e))?
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;

if res.status() == 200 {
Ok(())
} else {
Err(anyhow::anyhow!(
"Inject batch: response status {0} - {1}",
res.status().as_u16(),
res.text().await?,
"Inject batch: response status {} - {}",
res.status(),
res.body_string().await.unwrap()
))
}
}

pub async fn get_block_by_level(&self, level: u32) -> anyhow::Result<Option<Vec<[u8; 32]>>> {
pub async fn get_block_by_level(&self, level: u32) -> anyhow::Result<Option<Vec<Vec<u8>>>> {
let res = self.store_get(format!("/blocks/{level}")).await?;
if let Some(bytes) = res {
Ok(bcs::from_bytes(&bytes)?)
Ok(Some(bcs::from_bytes(&bytes)?))
} else {
Ok(None)
}
}

pub async fn get_head(&self) -> anyhow::Result<u32> {
let res = self.store_get("/head".into()).await?;
if let Some(bytes) = res {
let index = u32::from_be_bytes(bytes.try_into().map_err(|b| {
anyhow::anyhow!("Failed to parse head: {}", hex::encode(b))
})?);
Ok(index + 1)
} else {
Ok(0)
}
}

pub async fn get_next_index(&self) -> anyhow::Result<u64> {
let res = self.store_get("/index".into()).await?;
if let Some(bytes) = res {
Expand Down

0 comments on commit 0ed24d9

Please sign in to comment.