Skip to content

Commit

Permalink
enable batch nullify
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeytimoshin committed Nov 27, 2024
1 parent 01c3936 commit c9952ce
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 57 deletions.
7 changes: 6 additions & 1 deletion forester-utils/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub trait Indexer<R: RpcConnection>: Sync + Send + Debug {
}

fn get_leaf_indices_tx_hashes(
&self,
&mut self,
_merkle_tree_pubkey: Pubkey,
_zkp_batch_size: usize,
) -> Vec<(u32, [u8; 32], [u8; 32])> {
Expand All @@ -173,6 +173,11 @@ pub trait Indexer<R: RpcConnection>: Sync + Send + Debug {
) {
unimplemented!()
}

/// Test-only hook: mirror a single on-chain nullification into the indexer's
/// local state (remove the pending input-leaf entry and overwrite the leaf at
/// `_index` with `_nullifier`). The default implementation panics; concrete
/// test indexers are expected to override it.
async fn update_test_indexer_in_nullification(
    &mut self,
    _merkle_tree_pubkey: Pubkey,
    _nullifier: &[u8; 32],
    _index: usize,
) {
    unimplemented!()
}

}

#[derive(Debug, Clone)]
Expand Down
62 changes: 35 additions & 27 deletions forester/src/batched_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use solana_sdk::signer::Signer;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use account_compression::batch::BatchState;

pub struct BatchedOperations<R: RpcConnection, I: Indexer<R>> {
pub rpc_pool: Arc<SolanaRpcPool<R>>,
Expand Down Expand Up @@ -97,9 +98,6 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
&mut rpc,
self.merkle_tree,
self.output_queue,
// batch_index as usize,
// merkle_tree_next_index,
// batch_size
)
.await;

Expand All @@ -111,10 +109,8 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
info!("Performing batch nullify operation");
let mut rpc = self.rpc_pool.get_connection().await?;

// Get nullify instruction data
let instruction_data = self.get_batched_nullify_ix_data().await?;

// Create and send batch nullify instruction
let instruction = create_batch_nullify_instruction(
self.authority.pubkey(),
self.derivation,
Expand All @@ -123,7 +119,6 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
instruction_data.try_to_vec()?,
);

debug!("Sending batch nullify transaction");
let result = rpc
.create_and_send_transaction_with_event::<BatchNullifyEvent>(
&[instruction],
Expand All @@ -144,16 +139,15 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
)
};

// Update test indexer
self.indexer
.lock()
.await
.update_test_indexer_after_nullification(
&mut rpc,
self.merkle_tree,
batch_index as usize,
)
.await;
// self.indexer
// .lock()
// .await
// .update_test_indexer_after_nullification(
// &mut rpc,
// self.merkle_tree,
// batch_index as usize,
// )
// .await;

info!("Batch nullify completed successfully: {:?}", result);
Ok(batch_size as usize)
Expand Down Expand Up @@ -281,7 +275,7 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
)
} else {
error!(
"Failed to get proof from server: {:?}",
"create_append_batch_ix_data: failed to get proof from server: {:?}",
response.text().await
);
return Err(ForesterError::Custom(
Expand All @@ -299,8 +293,7 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
async fn get_batched_nullify_ix_data(&self) -> Result<InstructionDataBatchNullifyInputs> {
let mut rpc = self.rpc_pool.get_connection().await.unwrap();

// Get merkle tree data
let (zkp_batch_size, _batch_index, old_root, old_root_index, leaves_hashchain) = {
let (zkp_batch_size, batch_index, old_root, old_root_index, leaves_hashchain) = {
let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap();
let merkle_tree =
ZeroCopyBatchedMerkleTreeAccount::from_bytes_mut(account.data.as_mut_slice())
Expand All @@ -317,20 +310,27 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
(zkp_size, batch_idx, root, root_idx, hashchain)
};

// Get proofs and leaf data
let leaf_indices_tx_hashes = self
.indexer
.lock()
.await
.get_leaf_indices_tx_hashes(self.merkle_tree, zkp_batch_size as usize);

let mut leaves = Vec::new();
let mut nullifiers = Vec::new();
let mut tx_hashes = Vec::new();
let mut old_leaves = Vec::new();
let mut path_indices = Vec::new();
let mut merkle_proofs = Vec::new();

let batch = {
let mut merkle_tree_account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap();
let merkle_tree = ZeroCopyBatchedMerkleTreeAccount::from_bytes_mut(
merkle_tree_account.data.as_mut_slice(),
)
.unwrap();
merkle_tree.batches[batch_index].clone()
};

for (index, leaf, tx_hash) in leaf_indices_tx_hashes.iter() {
path_indices.push(*index);
leaves.push(*leaf);
Expand All @@ -343,14 +343,18 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
old_leaves.push(proof.leaf);
merkle_proofs.push(proof.proof);

let index_bytes = (*index as usize).to_be_bytes();
let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap();

tx_hashes.push(*tx_hash);
nullifiers.push(nullifier);

if batch.get_state() == BatchState::Inserted
|| batch.get_state() == BatchState::ReadyToUpdateTree {
let index_bytes = index.to_be_bytes();
use light_hasher::Hasher;
let leaf = *leaf;
let nullifier = Poseidon::hashv(&[&leaf, &index_bytes, tx_hash]).unwrap();
self.indexer.lock().await.update_test_indexer_in_nullification(self.merkle_tree, &nullifier, *index as usize).await;
}
}

// Generate proof
let inputs = get_batch_update_inputs::<26>(
old_root,
tx_hashes,
Expand Down Expand Up @@ -384,6 +388,10 @@ impl<R: RpcConnection, I: Indexer<R>> BatchedOperations<R, I> {
c: proof_c,
}
} else {
error!(
"get_batched_nullify_ix_data: failed to get proof from server: {:?}",
response.text().await
);
return Err(ForesterError::Custom(
"Failed to get proof from server".into(),
));
Expand Down Expand Up @@ -419,7 +427,7 @@ pub async fn process_batched_operations<R: RpcConnection, I: Indexer<R>>(
};

let processed_appends_count = ops.perform_batch_append().await?;
// let processed_nullifications_count = ops.perform_batch_nullify().await?;
let processed_nullifications_count = ops.perform_batch_nullify().await?;

Ok(processed_appends_count)
}
36 changes: 19 additions & 17 deletions test-programs/registry-test/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,6 @@ async fn test_custom_forester_batched() {
for i in 0..merkle_tree.get_account().queue.batch_size {
println!("\ntx {}", i);


println!("compressing");
e2e_env
.compress_sol_deterministic(&unregistered_forester_keypair, 1_000_000, None)
Expand Down Expand Up @@ -681,12 +680,15 @@ async fn test_custom_forester_batched() {
tree_params.input_queue_batch_size / tree_params.output_queue_zkp_batch_size;
for i in 0..num_output_zkp_batches {
// Simulate concurrency since instruction data has been created before
let instruction_data = if i == 0 {
instruction_data.clone()
} else {
None
};
println!("unregistered forester: {}", unregistered_forester_keypair.pubkey());
// let instruction_data = if i == 0 {
// instruction_data.clone()
// } else {
// None
// };
println!(
"unregistered forester: {}",
unregistered_forester_keypair.pubkey()
);
println!("env.forester: {}", env.forester.pubkey());
perform_batch_append(
&mut rpc,
Expand All @@ -700,16 +702,16 @@ async fn test_custom_forester_batched() {
.unwrap();
// We only spent half of the output queue
// if i < num_output_zkp_batches / 2 {
// perform_batch_nullify(
// &mut rpc,
// &mut state_merkle_tree_bundle,
// &env.forester,
// 0,
// false,
// None,
// )
// .await
// .unwrap();
perform_batch_nullify(
&mut rpc,
&mut state_merkle_tree_bundle,
&env.forester,
0,
false,
None,
)
.await
.unwrap();
// }
}
}
Expand Down
42 changes: 30 additions & 12 deletions test-utils/src/indexer/test_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,8 @@ impl<R: RpcConnection + Send + Sync + 'static> Indexer<R> for TestIndexer<R> {
.find(|x| x.accounts.merkle_tree == merkle_tree_pubkey)
.unwrap();

let mut leaves_len = bundle.merkle_tree.leaves().len();

while leaves_len <= index {
while bundle.merkle_tree.leaves().len() <= index {
bundle.merkle_tree.append(&[0u8; 32]).unwrap();
leaves_len = bundle.merkle_tree.leaves().len();
}

for _ in 0..leaves_len {
println!("leaf[{}]: {:?}", index, bundle.merkle_tree.get_leaf(index));
}

let leaf = match bundle.merkle_tree.get_leaf(index) {
Expand All @@ -607,31 +600,41 @@ impl<R: RpcConnection + Send + Sync + 'static> Indexer<R> for TestIndexer<R> {
bundle.merkle_tree.get_leaf(index).unwrap()
}
};

let proof = bundle
.merkle_tree
.get_proof_of_leaf(index, true)
.unwrap()
.to_vec();
println!("leaf: {:?}", leaf);
ProofOfLeaf { leaf, proof }
}

/// Returns the next `zkp_batch_size` pending nullifications for the tree at
/// `merkle_tree_pubkey` as `(leaf index, leaf, tx hash)` tuples, and applies
/// each nullification to the local test merkle tree as a side effect.
///
/// Panics if no bundle matches `merkle_tree_pubkey` or if fewer than
/// `zkp_batch_size` input-leaf entries are queued (slice indexing).
fn get_leaf_indices_tx_hashes(
    &mut self,
    merkle_tree_pubkey: Pubkey,
    zkp_batch_size: usize,
) -> Vec<(u32, [u8; 32], [u8; 32])> {
    // `mut` binding is unnecessary: `find` on `iter_mut` already yields `&mut`.
    let state_merkle_tree_bundle = self
        .state_merkle_trees
        .iter_mut()
        .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey)
        .unwrap();

    let leaf_indices_tx_hashes =
        state_merkle_tree_bundle.input_leaf_indices[..zkp_batch_size].to_vec();

    for (index, leaf, tx_hash) in leaf_indices_tx_hashes.iter() {
        // Pad the local tree with zero leaves so the slot at `index` exists
        // before it is overwritten (handles nullifying a not-yet-appended leaf).
        while state_merkle_tree_bundle.merkle_tree.get_next_index() < (index + 2) as usize {
            state_merkle_tree_bundle.merkle_tree.append(&[0u8; 32]).unwrap();
        }
        // NOTE(review): hashes 4-byte big-endian u32 index bytes here, while
        // some forester-side call sites hash `(index as usize).to_be_bytes()`
        // (8 bytes) — confirm both sides derive the same nullifier.
        let index_bytes = index.to_be_bytes();
        let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap();
        state_merkle_tree_bundle
            .merkle_tree
            .update(&nullifier, *index as usize)
            .unwrap();
    }

    leaf_indices_tx_hashes
}

Expand Down Expand Up @@ -681,7 +684,12 @@ impl<R: RpcConnection + Send + Sync + 'static> Indexer<R> for TestIndexer<R> {
(max_num_zkp_updates, num_inserted_zkps, zkp_batch_size)
};

if num_inserted_zkps == 0 {
return;
}

let leaves = state_merkle_tree_bundle.output_queue_elements.to_vec();

let start = (num_inserted_zkps as usize - 1) * zkp_batch_size as usize;
let end = start + zkp_batch_size as usize;
let batch_update_leaves = leaves[start..end].to_vec();
Expand Down Expand Up @@ -730,6 +738,16 @@ impl<R: RpcConnection + Send + Sync + 'static> Indexer<R> for TestIndexer<R> {
}
}

/// Mirrors one on-chain nullification into the test indexer: pops the oldest
/// queued input-leaf entry and writes `nullifier` over the leaf at `index`
/// in the local merkle tree.
///
/// Panics if no bundle matches `merkle_tree_pubkey`, if the input-leaf queue
/// is empty, or if the tree update fails.
async fn update_test_indexer_in_nullification(
    &mut self,
    merkle_tree_pubkey: Pubkey,
    nullifier: &[u8; 32],
    index: usize,
) {
    // `mut` binding is unnecessary: `find` on `iter_mut` already yields `&mut`.
    let state_merkle_tree_bundle = self
        .state_merkle_trees
        .iter_mut()
        .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey)
        .unwrap();
    // O(n) front-pop; acceptable for test-sized queues. Assumes entries are
    // consumed strictly in FIFO order — the caller must nullify in queue order.
    state_merkle_tree_bundle.input_leaf_indices.remove(0);
    // `nullifier` is already a reference; no extra borrow needed.
    state_merkle_tree_bundle
        .merkle_tree
        .update(nullifier, index)
        .unwrap();
}

async fn update_test_indexer_after_nullification(
&mut self,
rpc: &mut R,
Expand Down
3 changes: 3 additions & 0 deletions test-utils/src/test_batch_forester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ pub async fn get_batched_nullify_ix_data<Rpc: RpcConnection>(
let leaf = bundle.merkle_tree.get_leaf(index).unwrap();
old_leaves.push(leaf);
}

println!("old_leaf = {:?}", old_leaves.last().unwrap());
println!("leaf = {:?}", leaf);
// Handle case that we nullify a leaf which has not been inserted yet.
while bundle.merkle_tree.get_next_index() < index + 2 {
bundle.merkle_tree.append(&[0u8; 32]).unwrap();
Expand Down

0 comments on commit c9952ce

Please sign in to comment.