Skip to content

Commit

Permalink
add account state time
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 22, 2024
1 parent e7d283b commit 7f84ea5
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 124 deletions.
48 changes: 48 additions & 0 deletions src/batch_tx_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::fmt::Display;

/// Response for the batch insert tx: a set of counters describing one batch
/// (or, after [`BatchTxReturn::increment`], the running total of many batches).
///
/// Every counter starts at zero, both via [`BatchTxReturn::new`] and via
/// [`Default`].
#[derive(Debug, Clone, Default)]
pub struct BatchTxReturn {
    /// Reported as "unique accounts" in the `Display` output.
    pub unique_accounts: u64,
    /// Reported as "created accounts" in the `Display` output.
    pub created_accounts: u64,
    /// Reported as "modified accounts" in the `Display` output.
    pub modified_accounts: u64,
    /// Reported as "unchanged accounts" in the `Display` output.
    pub unchanged_accounts: u64,
    /// Reported as "transactions created" in the `Display` output.
    pub created_tx: u64,
}

impl Display for BatchTxReturn {
    /// Writes all five counters on a single human-readable line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Total Transactions - unique accounts: {}, created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
            self.unique_accounts,
            self.created_accounts,
            self.modified_accounts,
            self.unchanged_accounts,
            self.created_tx
        )
    }
}

impl BatchTxReturn {
    /// Creates a result with every counter set to zero.
    ///
    /// Equivalent to [`Default::default`]; kept so existing callers of
    /// `BatchTxReturn::new()` continue to work.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds each counter of `new` onto the matching counter of `self`.
    ///
    /// `new` is only read, so the same value can be folded into several totals.
    pub fn increment(&mut self, new: &BatchTxReturn) {
        self.unique_accounts += new.unique_accounts;
        self.created_accounts += new.created_accounts;
        self.modified_accounts += new.modified_accounts;
        self.unchanged_accounts += new.unchanged_accounts;
        self.created_tx += new.created_tx;
    }
}
61 changes: 53 additions & 8 deletions src/extract_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,36 @@ use std::path::Path;
use anyhow::Result;
use diem_types::account_view::AccountView;
use libra_backwards_compatibility::version_five::{
balance_v5::BalanceResourceV5, ol_wallet::SlowWalletResourceV5,
state_snapshot_v5::v5_accounts_from_manifest_path,
balance_v5::BalanceResourceV5,
ol_wallet::SlowWalletResourceV5,
state_snapshot_v5::{v5_accounts_from_manifest_path, v5_read_from_snapshot_manifest},
};
use libra_storage::read_snapshot::{accounts_from_snapshot_backup, load_snapshot_manifest};
use libra_types::exports::AccountAddress;
use libra_types::{
exports::AccountAddress,
move_resource::{libra_coin::LibraCoinStoreResource, wallet::SlowWalletResource},
};
use log::{error, info, warn};

use crate::schema_account_state::WarehouseAccState;
use crate::{
scan::FrameworkVersion,
schema_account_state::{WarehouseAccState, WarehouseTime},
};

// Uses libra-backwards-compatibility to parse the v5 manifest files and decode the v5-format account data into the current version's data structures (v6+).
pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseAccState>> {
// NOTE: this is duplicated with next step.
let manifest_data = v5_read_from_snapshot_manifest(v5_manifest_path)?;
let account_blobs = v5_accounts_from_manifest_path(v5_manifest_path).await?;

// TODO: see below, massively inefficient
let time = WarehouseTime {
framework_version: FrameworkVersion::V5,
timestamp: 0,
version: manifest_data.version,
epoch: 0,
};

info!("account records found: {}", &account_blobs.len());
let mut warehouse_state = vec![];
for el in account_blobs.iter() {
Expand All @@ -26,14 +44,16 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
let cast_address = AccountAddress::from_hex_literal(&address_literal)?;
let mut s = WarehouseAccState::new(cast_address);

if let Some(r) = acc.get_diem_account_resource().ok() {
s.time = time.clone();

if let Ok(r) = acc.get_diem_account_resource() {
s.sequence_num = r.sequence_number();
}

if let Some(b) = acc.get_resource::<BalanceResourceV5>().ok() {
if let Ok(b) = acc.get_resource::<BalanceResourceV5>() {
s.balance = b.coin()
}
if let Some(sw) = acc.get_resource::<SlowWalletResourceV5>().ok() {
if let Ok(sw) = acc.get_resource::<SlowWalletResourceV5>() {
s.slow_wallet_locked = sw.unlocked;
s.slow_wallet_transferred = sw.transferred;
}
Expand All @@ -58,6 +78,15 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<Warehou
);
let manifest = load_snapshot_manifest(&manifest_file)?;

// TODO: this is not memory efficient: the same WarehouseTime is cloned into
// every account record, when the insert query could use a single warehouse time for the entire state.
let time = WarehouseTime {
version: manifest.version,
epoch: manifest.epoch,
framework_version: FrameworkVersion::V7,
timestamp: 0,
};

let accs = accounts_from_snapshot_backup(manifest, archive_path).await?;

info!("SUCCESS: backup loaded. # accounts: {}", &accs.len());
Expand All @@ -66,7 +95,23 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<Warehou
let mut warehouse_state = vec![];
for el in accs.iter() {
if let Some(address) = el.get_account_address()? {
let s = WarehouseAccState::new(address);
let mut s = WarehouseAccState::new(address);

s.time = time.clone();

if let Some(r) = el.get_account_resource()? {
s.sequence_num = r.sequence_number();
}

if let Some(b) = el.get_resource::<LibraCoinStoreResource>()? {
s.balance = b.coin();
}

if let Some(sw) = el.get_resource::<SlowWalletResource>()? {
s.slow_wallet_locked = sw.unlocked;
s.slow_wallet_transferred = sw.transferred;
}

warehouse_state.push(s);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod batch_tx_type;
pub mod cypher_templates;
pub mod enrich_exchange_onboarding;
pub mod enrich_whitepages;
pub mod schema_exchange_orders;
pub mod extract_snapshot;
pub mod extract_transactions;
pub mod load;
Expand All @@ -12,6 +12,7 @@ pub mod neo4j_init;
pub mod queue;
pub mod scan;
pub mod schema_account_state;
pub mod schema_exchange_orders;
pub mod schema_transaction;
pub mod unzip_temp;
pub mod warehouse_cli;
Expand Down
19 changes: 17 additions & 2 deletions src/load.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
batch_tx_type::BatchTxReturn,
extract_snapshot::{extract_current_snapshot, extract_v5_snapshot},
extract_transactions::extract_current_transactions,
load_tx_cypher::{self, BatchTxReturn},
load_account_state::snapshot_batch,
load_tx_cypher,
queue::{self, clear_queue, push_queue_from_archive_map},
scan::{ArchiveMap, ManifestInfo},
};
Expand Down Expand Up @@ -68,7 +71,19 @@ pub async fn try_load_one_archive(
let mut all_results = BatchTxReturn::new();
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
crate::scan::BundleContent::StateSnapshot => todo!(),
crate::scan::BundleContent::StateSnapshot => {
let snaps = match man.version {
crate::scan::FrameworkVersion::Unknown => todo!(),
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
crate::scan::FrameworkVersion::V6 => {
extract_current_snapshot(&man.archive_dir).await?
}
crate::scan::FrameworkVersion::V7 => {
extract_current_snapshot(&man.archive_dir).await?
}
};
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&man.archive_dir).await?;
let batch_res =
Expand Down
75 changes: 69 additions & 6 deletions src/load_account_state.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,72 @@
use log::info;
use neo4rs::Graph;
use std::{thread, time::Duration};

use crate::{batch_tx_type::BatchTxReturn, queue, schema_account_state::WarehouseAccState};
use anyhow::{Context, Result};
use crate::schema_account_state::WarehouseAccState;
use log::{error, info, warn};
use neo4rs::Graph;

// TODO: code duplication
/// Loads a slice of account-state snapshots into the graph database in batches.
///
/// `txs` is split into chunks of at most `batch_size` records. For each chunk
/// (identified by `archive_id` plus the chunk index) the queue is consulted:
/// chunks already marked complete are skipped, chunks unknown to the queue are
/// registered as incomplete, and then the chunk is inserted with
/// `impl_batch_snapshot_insert`. On a successful insert the chunk is marked
/// complete and its counters are added to the returned total.
///
/// A `batch_size` of zero is treated as one record per batch, because
/// `slice::chunks` panics on a zero chunk size.
///
/// # Errors
///
/// Returns an error if `queue::update_task` fails. A failed insert is NOT
/// returned as an error: it is logged, the batch is left incomplete in the
/// queue, and processing continues with the next batch after a pause.
pub async fn snapshot_batch(
    txs: &[WarehouseAccState],
    pool: &Graph,
    batch_size: usize,
    archive_id: &str,
) -> Result<BatchTxReturn> {
    let mut all_results = BatchTxReturn::new();

    // `chunks(0)` panics, so fall back to the smallest valid batch size.
    let batch_size = if batch_size == 0 {
        warn!("batch_size of 0 requested, using 1 instead");
        1
    } else {
        batch_size
    };

    info!("archive: {}", archive_id);

    // Iterate the chunks lazily; there is no need to collect them first.
    for (i, c) in txs.chunks(batch_size).enumerate() {
        info!("batch #{}", i);
        // double checking the status of the loading PER BATCH
        // it could have been updated in the interim
        // since the outer check in ingest_all, just checks
        // all things completed prior to this run
        // check if this is already completed, or should be inserted.
        match queue::is_batch_complete(pool, archive_id, i).await {
            Ok(Some(true)) => {
                info!("...skipping, all batches loaded.");
                // skip this one
                continue;
            }
            Ok(Some(false)) => {
                // keep going
            }
            // Covers both "no such task" and a failed lookup.
            _ => {
                info!("...batch not found in queue, adding to queue.");

                // no task found in db, add to queue
                queue::update_task(pool, archive_id, false, i).await?;
            }
        }
        info!("...loading to db");

        match impl_batch_snapshot_insert(pool, c).await {
            Ok(batch) => {
                all_results.increment(&batch);
                queue::update_task(pool, archive_id, true, i).await?;
                info!("...success");
            }
            Err(e) => {
                let secs = 10;
                error!("skipping batch, could not insert: {:?}", e);
                warn!("waiting {} secs before retrying connection", secs);
                // NOTE(review): `thread::sleep` blocks the executor thread inside an
                // async fn; consider the runtime's async sleep if one is available.
                // The failed batch itself is not retried here, only the next batch runs.
                thread::sleep(Duration::from_secs(secs));
            }
        };
    }

    Ok(all_results)
}

pub async fn impl_batch_snapshot_insert(
pool: &Graph,
batch_snapshots: &[WarehouseAccState],
) -> Result<()> {

) -> Result<BatchTxReturn> {
let list_str = WarehouseAccState::to_cypher_map(batch_snapshots);
let cypher_string = WarehouseAccState::cypher_batch_insert_str(&list_str);

Expand All @@ -28,5 +85,11 @@ pub async fn impl_batch_snapshot_insert(

info!("merged snapshots: {}", merged_snapshots);

Ok(())
Ok(BatchTxReturn {
unique_accounts: 0,
created_accounts: 0,
modified_accounts: 0,
unchanged_accounts: 0,
created_tx: merged_snapshots,
})
}
2 changes: 1 addition & 1 deletion src/load_exchange_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use log::{error, info, warn};
use neo4rs::{query, Graph};

use crate::{
schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
queue,
schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
};

pub async fn swap_batch(
Expand Down
50 changes: 2 additions & 48 deletions src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,15 @@
use anyhow::{Context, Result};
use log::{error, info, warn};
use neo4rs::{query, Graph};
use std::{fmt::Display, thread, time::Duration};
use std::{thread, time::Duration};

use crate::{
batch_tx_type::BatchTxReturn,
cypher_templates::{write_batch_tx_string, write_batch_user_create},
queue,
schema_transaction::WarehouseTxMaster,
};

/// Response for the batch insert tx: a set of counters describing one batch
/// (or, after [`BatchTxReturn::increment`], the running total of many batches).
///
/// Every counter starts at zero, both via [`BatchTxReturn::new`] and via
/// [`Default`].
#[derive(Debug, Clone, Default)]
pub struct BatchTxReturn {
    /// Reported as "unique accounts" in the `Display` output.
    pub unique_accounts: u64,
    /// Reported as "created accounts" in the `Display` output.
    pub created_accounts: u64,
    /// Reported as "modified accounts" in the `Display` output.
    pub modified_accounts: u64,
    /// Reported as "unchanged accounts" in the `Display` output.
    pub unchanged_accounts: u64,
    /// Reported as "transactions created" in the `Display` output.
    pub created_tx: u64,
}

// Fully qualified trait path so this block does not depend on a `use` line.
impl std::fmt::Display for BatchTxReturn {
    /// Writes all five counters on a single human-readable line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Total Transactions - unique accounts: {}, created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
            self.unique_accounts,
            self.created_accounts,
            self.modified_accounts,
            self.unchanged_accounts,
            self.created_tx
        )
    }
}

impl BatchTxReturn {
    /// Creates a result with every counter set to zero.
    ///
    /// Equivalent to [`Default::default`]; kept so existing callers of
    /// `BatchTxReturn::new()` continue to work.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds each counter of `new` onto the matching counter of `self`.
    ///
    /// `new` is only read, so the same value can be folded into several totals.
    pub fn increment(&mut self, new: &BatchTxReturn) {
        self.unique_accounts += new.unique_accounts;
        self.created_accounts += new.created_accounts;
        self.modified_accounts += new.modified_accounts;
        self.unchanged_accounts += new.unchanged_accounts;
        self.created_tx += new.created_tx;
    }
}

// TODO: code duplication with exchange order loading.
pub async fn tx_batch(
txs: &[WarehouseTxMaster],
Expand Down
Loading

0 comments on commit 7f84ea5

Please sign in to comment.