Add detailed logging and adjust phase handling
Enhanced the logging in the `add_new_tree` function to improve traceability, and added checks so that new trees are only processed immediately when the current epoch is in its active phase. Adjusted test configuration and timing to stabilize test execution under photon.
sergeytimoshin committed Sep 11, 2024
1 parent c00a7af commit 902ae3d
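
The gist of the new phase handling, as a minimal stand-alone sketch (the helper name and its parameters are hypothetical, for illustration only; the real check lives inside `add_new_tree` in `forester/src/epoch_manager.rs` and uses the phases returned by `get_epoch_phases`): a newly discovered tree is scheduled into the current epoch only when the current slot falls inside the epoch's active phase, and is otherwise deferred to the next active phase.

// Hypothetical sketch of the active-phase gate added in this commit.
// `active_start` / `active_end` model `phases.active.start` / `phases.active.end`.
fn should_process_immediately(current_slot: u64, active_start: u64, active_end: u64) -> bool {
    // Half-open interval, matching `current_slot >= phases.active.start && current_slot < phases.active.end`.
    current_slot >= active_start && current_slot < active_end
}

fn main() {
    // Example: the active phase covers slots [1_000, 2_000).
    assert!(should_process_immediately(1_500, 1_000, 2_000)); // inject into the current epoch
    assert!(!should_process_immediately(2_000, 1_000, 2_000)); // defer to the next active phase
}
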
Showing 3 changed files with 57 additions and 53 deletions.
103 changes: 53 additions & 50 deletions forester/src/epoch_manager.rs
@@ -192,55 +192,58 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
         }
         Ok(())
     }

     async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> {
         info!("Adding new tree: {:?}", new_tree);
         let mut trees = self.trees.lock().await;
         trees.push(new_tree);
         drop(trees);

+        info!("New tree added to the list of trees");
+
         let (current_slot, current_epoch) = self.get_current_slot_and_epoch().await?;
+        let phases = get_epoch_phases(&self.protocol_config, current_epoch);

-        // Check if we're currently processing an epoch
-        if let Some(processing_flag) = self.processing_epochs.get(&current_epoch) {
-            if processing_flag.load(Ordering::SeqCst) {
-                let phases = get_epoch_phases(&self.protocol_config, current_epoch);
-                let self_clone = Arc::new(self.clone());
-
-                if current_slot >= phases.active.start && current_slot < phases.active.end {
-                    if let Ok(mut epoch_info) = self.recover_registration_info(current_epoch).await
-                    {
-                        let tree_schedule = TreeForesterSchedule::new_with_schedule(
-                            &new_tree,
-                            current_slot,
-                            &epoch_info.forester_epoch_pda,
-                            &epoch_info.epoch_pda,
-                        );
-                        epoch_info.trees.push(tree_schedule.clone());
-                        tokio::spawn(async move {
-                            if let Err(e) = self_clone
-                                .process_queue(
-                                    &epoch_info.epoch,
-                                    &epoch_info.forester_epoch_pda,
-                                    tree_schedule,
-                                )
-                                .await
-                            {
-                                error!("Error processing queue for new tree: {:?}", e);
-                            }
-                        });
-
-                        info!(
-                            "Injected new tree into current epoch {}: {:?}",
-                            current_epoch, new_tree
-                        );
-                    } else {
-                        warn!("Failed to retrieve current epoch info for injecting new tree");
-                    }
-                } else {
-                    info!("New tree will be included in the next epoch");
-                }
-            }
-        }
+        // Check if we're currently in the active phase
+        if current_slot >= phases.active.start && current_slot < phases.active.end {
+            info!("Currently in active phase. Attempting to process the new tree immediately.");
+            info!("Recovering registration info...");
+            if let Ok(mut epoch_info) = self.recover_registration_info(current_epoch).await {
+                info!("Recovered registration info for current epoch");
+                let tree_schedule = TreeForesterSchedule::new_with_schedule(
+                    &new_tree,
+                    current_slot,
+                    &epoch_info.forester_epoch_pda,
+                    &epoch_info.epoch_pda,
+                );
+                epoch_info.trees.push(tree_schedule.clone());
+
+                info!("Spawning task to process new tree in current epoch");
+                let self_clone = Arc::new(self.clone());
+                tokio::spawn(async move {
+                    if let Err(e) = self_clone
+                        .process_queue(
+                            &epoch_info.epoch,
+                            &epoch_info.forester_epoch_pda,
+                            tree_schedule,
+                        )
+                        .await
+                    {
+                        error!("Error processing queue for new tree: {:?}", e);
+                    } else {
+                        info!("Successfully processed new tree in current epoch");
+                    }
+                });
+
+                info!(
+                    "Injected new tree into current epoch {}: {:?}",
+                    current_epoch, new_tree
+                );
+            } else {
+                warn!("Failed to retrieve current epoch info for processing new tree");
+            }
+        } else {
+            info!("Not in active phase. New tree will be processed in the next active phase");
+        }

         Ok(())
@@ -282,7 +285,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
&self.slot_tracker,
next_phases.registration.start,
)
.await
{
error!("Error waiting for next registration phase: {:?}", e);
continue;
@@ -495,7 +498,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
&self.protocol_config,
&self.config.payer_keypair,
)
.await
{
Ok(Some(epoch)) => {
debug!("Registered epoch: {:?}", epoch);
@@ -630,7 +633,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
&self.config.payer_keypair.pubkey(),
epoch_info.epoch.epoch,
)
.0;
let existing_registration = rpc
.get_anchor_account::<ForesterEpochPda>(&forester_epoch_pda_pubkey)
.await?;
@@ -647,7 +650,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
&self.config.payer_keypair.pubkey(),
&[&self.config.payer_keypair],
)
.await?;
}
}

@@ -670,7 +673,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
level = "debug",
skip(self, epoch_info),
fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch
))]
async fn perform_active_work(&self, epoch_info: &ForesterEpochInfo) -> Result<()> {
info!("Performing active work");

@@ -787,7 +790,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
&self.slot_tracker,
forester_slot.start_solana_slot,
)
.await?;

let light_slot_timeout = {
let slot_length_u32 = u32::try_from(epoch_pda.protocol_config.slot_length)
@@ -922,7 +925,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
&self.config.payer_keypair.pubkey(),
epoch_info.epoch.epoch,
)
.0;
if let Some(forester_epoch_pda) = rpc
.get_anchor_account::<ForesterEpochPda>(&forester_epoch_pda_pubkey)
.await?
@@ -955,9 +958,9 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
Err(e) => {
if let RpcError::ClientError(client_error) = &e {
if let Some(TransactionError::InstructionError(
_,
InstructionError::Custom(error_code),
)) = client_error.get_transaction_error()
{
let reported_work_code = RegistryError::ForesterAlreadyReportedWork as u32;
let not_in_report_work_phase_code =
@@ -1003,7 +1006,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
self.indexer.clone(),
tree_account,
)
.await
}
TreeType::State => {
rollover_state_merkle_tree(
@@ -1012,7 +1015,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
self.indexer.clone(),
tree_account,
)
.await
}
};

@@ -1088,7 +1091,7 @@ pub async fn run_service<R: RpcConnection, I: Indexer<R>>(
slot_tracker.clone(),
new_tree_sender.clone(),
)
.await
{
Ok(epoch_manager) => {
let epoch_manager: Arc<EpochManager<R, I>> = Arc::new(epoch_manager);
2 changes: 1 addition & 1 deletion forester/src/telemetry.rs
@@ -15,7 +15,7 @@ pub fn setup_telemetry() {

let file_env_filter = EnvFilter::new("info,forester=debug");
let stdout_env_filter =
-EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
+EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info,forester=debug"));

let stdout_layer = fmt::Layer::new()
.with_writer(std::io::stdout)
5 changes: 3 additions & 2 deletions forester/tests/photon_tests.rs
@@ -87,14 +87,13 @@ async fn test_multiple_state_trees_with_photon() {
}
}

-#[ignore = "Test fails possibly because of photon"]
#[tokio::test(flavor = "multi_thread", worker_threads = 32)]
async fn test_multiple_address_trees_with_photon() {
init(Some(LightValidatorConfig {
enable_indexer: true,
enable_prover: true,
enable_forester: false,
-wait_time: 10,
+wait_time: 20,
..LightValidatorConfig::default()
}))
.await;
@@ -136,6 +135,8 @@

for i in 0..10 {
let address_tree_accounts = env.create_address_tree(Some(95)).await;
+tokio::time::sleep(Duration::from_secs(2)).await;
+
info!("address_tree_accounts {:?}", address_tree_accounts);
info!(
"address_tree_accounts.merkle_tree {:?}",
