Skip to content

Commit

Permalink
Use paged batch encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Zaikin committed Jan 9, 2024
1 parent 61e7f08 commit 0d38b95
Show file tree
Hide file tree
Showing 16 changed files with 476 additions and 44 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ target/
bin/

# Tezos wallet
.tezos-client
.tezos-client

# Jupyter checkpoints
.ipynb_checkpoints
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/pre-block/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn validate_certificate_signature(
.iter()
.any(|x| (*x as usize) >= config.authorities.len())
{
anyhow::bail!("Unknown authority");
anyhow::bail!("Unknown authority {:?}", cert.signers);
}

if cert.signers.len() < config.quorum_threshold() {
Expand Down
2 changes: 1 addition & 1 deletion docker/operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ COPY . ./
RUN make build-operator

FROM alpine:3.15 AS rollup
RUN apk --no-cache add binutils gcc gmp libgmpxx hidapi libc-dev libev libffi sudo
RUN apk --no-cache add binutils gcc gmp libgmpxx hidapi libc-dev libev libffi sudo jq
ARG OCTEZ_PROTO
COPY --from=octez /usr/local/bin/octez-smart-rollup-node-${OCTEZ_PROTO} /usr/bin/octez-smart-rollup-node
COPY --from=octez /usr/local/bin/octez-client /usr/bin/octez-client
Expand Down
23 changes: 20 additions & 3 deletions docker/operator/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ rollup_dir="/root/.tezos-smart-rollup-node"
endpoint=$NODE_URI
faucet="https://faucet.$NETWORK.teztnets.xyz"

min_batch_elements=1
max_batch_elements=15
min_batch_size=1
max_batch_size=32607
batcher_config="{ \"batcher\": { \"min_batch_elements\": $min_batch_elements, \"max_batch_elements\": $max_batch_elements, \"min_batch_size\": $min_batch_size, \"max_batch_size\": $max_batch_size }}"

if [ -z "$NODE_URI" ]; then
if [ -z "$NETWORK" ]; then
echo "NETWORK is not set"
exit 1
fi
endpoint="https://rpc.$NETWORK.teztnets.xyz"
#endpoint="https://$NETWORK.ecadinfra.com"
#endpoint="https://rpc.$NETWORK.teztnets.xyz"
endpoint="https://$NETWORK.ecadinfra.com"
#endpoint="https://rpc.tzkt.io/$NETWORK"
fi

Expand All @@ -35,6 +41,11 @@ import_key() {
fi
}

update_config() {
cmd="jq '. += $batcher_config' $rollup_dir/config.json"
echo "$($cmd)" > $rollup_dir/config.json
}

run_node() {
import_key

Expand Down Expand Up @@ -65,7 +76,10 @@ deploy_rollup() {
echo "Found existing rollup config"
if [ "$1" == "--force" ]; then
echo "Overriding with new kernel"
rm -rf "$rollup_dir/*"
rm -rf "$rollup_dir/context"
rm -rf "$rollup_dir/storage"
rm "$rollup_dir/metadata"
rm "$rollup_dir/config.json"
octez-client --endpoint "$endpoint" forget all smart rollups --force
else
exit 0
Expand Down Expand Up @@ -116,6 +130,9 @@ case $command in
send_message)
send_message $@
;;
update_config)
update_config
;;
*)
cat <<EOF
Available commands:
Expand Down
2 changes: 1 addition & 1 deletion docker/operator/local.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ARG OCTEZ_TAG
FROM tezos/tezos:${OCTEZ_TAG} AS octez

FROM alpine:3.15 AS rollup
RUN apk --no-cache add binutils gcc gmp libgmpxx hidapi libc-dev libev libffi sudo
RUN apk --no-cache add binutils gcc gmp libgmpxx hidapi libc-dev libev libffi sudo jq
ARG OCTEZ_PROTO
COPY --from=octez /usr/local/bin/octez-smart-rollup-node-${OCTEZ_PROTO} /usr/bin/octez-smart-rollup-node
COPY --from=octez /usr/local/bin/octez-client /usr/bin/octez-client
Expand Down
2 changes: 2 additions & 0 deletions kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ pre-block = { workspace = true, features = ["conversions"] }
serde_yaml = "0.9.29"
hex = "*"

serde_json.workspace = true
serde.workspace = true
bcs.workspace = true

narwhal-config.workspace = true
pre-block = { workspace = true, features = ["conversions"] }
47 changes: 41 additions & 6 deletions kernel/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//
// SPDX-License-Identifier: MIT

use pre_block::fixture::NarwhalFixture;
use narwhal_config::{Committee, Import};
use pre_block::{fixture::NarwhalFixture, PublicKey};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

Expand All @@ -22,23 +23,57 @@ struct KernelSetup {
pub instructions: Vec<Instruction>,
}

fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let mut output_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
output_path.push("../bin/kernel_config.yaml");

fn mock_setup() -> std::result::Result<KernelSetup, Box<dyn std::error::Error>> {
let fixture = NarwhalFixture::default();

let epoch = 0;
let value = bcs::to_bytes(&fixture.authorities())?;

let kernel_setup = KernelSetup {
let setup = KernelSetup {
instructions: vec![Instruction {
set: Set {
value: hex::encode(&value),
to: format!("/authorities/{}", epoch),
},
}],
};
Ok(setup)
}

fn real_setup() -> std::result::Result<KernelSetup, Box<dyn std::error::Error>> {
let mut committee = Committee::import("../launcher/defaults/committee.json")?;
committee.load();

let authorities: Vec<PublicKey> = committee
.authorities()
.map(|auth| auth.protocol_key_bytes().0.to_vec())
.collect();
let value = bcs::to_bytes(&authorities)?;

let setup = KernelSetup {
instructions: vec![
// Instruction {
// set: Set {
// value: hex::encode(&value),
// to: format!("/authorities/{}", committee.epoch()),
// },
// },
Instruction {
set: Set {
value: hex::encode(&(0u64.to_be_bytes())),
to: format!("/index"),
},
},
],
};
Ok(setup)
}

fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let mut output_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
output_path.push("../bin/kernel_config.yaml");

let kernel_setup = real_setup()?;

let file = std::fs::File::create(output_path).expect("Could not create file");
serde_yaml::to_writer(file, &kernel_setup)?;
Expand Down
94 changes: 75 additions & 19 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,30 @@ mod storage;
#[cfg(test)]
mod tests;

use storage::{read_authorities, read_head, write_authorities, write_block, write_head, Store};
use storage::{
read_authorities, read_chunk, read_head, write_authorities, write_block, write_chunk,
write_head, Store,
};

const LEVELS_PER_EPOCH: u32 = 100;

pub fn apply_pre_block<Host: Runtime>(host: &mut Host, pre_block: PreBlock) {
fn reconstruct_batch(host: &impl Runtime, header: &[u8]) -> anyhow::Result<Vec<u8>> {
if header.len() % 32 != 0 {
anyhow::bail!("DA header size must be multiple of 32 (chunk digest size)");
}

let mut batch: Vec<Vec<u8>> = Vec::with_capacity(header.len() / 32);
for hash in header.chunks(32) {
match read_chunk(host, hash) {
Some(chunk) => batch.push(chunk),
None => anyhow::bail!("Missing chunk {}", hex::encode(hash)),
}
}

Ok(batch.concat())
}

fn apply_pre_block<Host: Runtime>(host: &mut Host, pre_block: PreBlock) {
let mut block: Vec<Vec<u8>> = Vec::new();
for tx in pre_block.into_transactions() {
let tx_hash = digest_256(&tx).unwrap();
Expand All @@ -32,11 +51,23 @@ pub fn apply_pre_block<Host: Runtime>(host: &mut Host, pre_block: PreBlock) {
}

fn process_external_message<Host: Runtime>(host: &mut Host, contents: &[u8], level: u32) {
let pre_block: PreBlock = bcs::from_bytes(contents).expect("Failed to parse consensus output");
let pre_block: PreBlock = match bcs::from_bytes(contents) {
Ok(value) => value,
Err(err) => {
host.write_debug(&format!("Failed to parse pre-block: {}", err));
return;
}
};
host.write_debug(&format!("Incoming pre-block #{}", pre_block.index()));

let epoch = 0; // (level % LEVELS_PER_EPOCH) as u64;
let authorities = read_authorities(host, epoch);
let authorities = match read_authorities(host, epoch) {
Some(value) => value,
None => {
host.write_debug(&format!("Authorities for epoch #{epoch} not initialized"));
return;
}
};
let config = DsnConfig::new(epoch, authorities);

{
Expand All @@ -57,54 +88,79 @@ fn process_external_message<Host: Runtime>(host: &mut Host, contents: &[u8], lev
}

fn process_internal_message<Host: Runtime>(host: &mut Host, contents: &[u8]) {
let config: DsnConfig = bcs::from_bytes(contents).expect("Failed to parse authorities");
let config: DsnConfig = match bcs::from_bytes(contents) {
Ok(value) => value,
Err(err) => {
host.write_debug(&format!("Failed to parse DSN config: {}", err));
return;
}
};
write_authorities(host, config.epoch, &config.authorities);
host.write_debug(&format!(
"Authorities for epoch #{0} initialized",
config.epoch
));
}

pub fn kernel_loop<Host: Runtime>(host: &mut Host) {
host.write_debug(&format!("Kernel loop started"));
let smart_rollup_address = host.reveal_metadata().address();
let mut chunked_message: Vec<u8> = Vec::new();
loop {
match host.read_input().expect("Failed to read inbox") {
Some(message) => {
let bytes = message.as_ref();
match InboxMessage::<MichelsonBytes>::parse(bytes).expect("Failed to parse message")
{
(_, InboxMessage::External(payload)) => {
match InboxMessage::<MichelsonBytes>::parse(bytes).ok() {
Some((_, InboxMessage::External(payload))) => {
match ExternalMessageFrame::parse(payload) {
Ok(ExternalMessageFrame::Targetted { address, contents })
if *address.hash() == smart_rollup_address =>
{
host.write_debug(&format!("Incoming external message"));
match contents {
[0u8, chunk @ ..] => chunked_message.extend_from_slice(chunk),
[1u8, chunk @ ..] => {
chunked_message.extend_from_slice(chunk);
process_external_message(
host,
&chunked_message,
message.level,
);
chunked_message.clear();
[0u8, chunk @ ..] => {
let hash = write_chunk(host, chunk);
host.write_debug(&format!(
"Stashing chunk: {}",
hex::encode(&hash)
));
}
[1u8, header @ ..] => match reconstruct_batch(host, header) {
Ok(contents) => {
process_external_message(
host,
&contents,
message.level,
);
}
Err(err) => {
host.write_debug(&format!("Invalid batch: {}", err));
}
},
_ => panic!("Unexpected message tag"),
}
}
_ => { /* not for us */ }
}
}
(_, InboxMessage::Internal(msg)) => {
Some((_, InboxMessage::Internal(msg))) => {
match msg {
InternalInboxMessage::Transfer(transfer) => {
host.write_debug(&format!(
"Incoming internal message: {}",
hex::encode(&transfer.payload.0)
));
process_internal_message(host, &transfer.payload.0);
}
_ => { /* system messages */ }
}
}
None => { /* not for us */ }
}
}
_ => break,
}
}
host.write_debug(&format!("Kernel loop exited"));
}

tezos_smart_rollup_entrypoint::kernel_entry!(kernel_loop);
Loading

0 comments on commit 0d38b95

Please sign in to comment.