Skip to content

Commit

Permalink
rename schema modules
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 22, 2024
1 parent a33076d commit 432dd42
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 159 deletions.
41 changes: 20 additions & 21 deletions src/extract_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::path::Path;

use anyhow::Result;
use diem_types::account_view::AccountView;
use libra_backwards_compatibility::version_five::state_snapshot_v5::v5_accounts_from_manifest_path;
use libra_backwards_compatibility::version_five::{balance_v5::BalanceResourceV5, state_snapshot_v5::v5_accounts_from_manifest_path};
use libra_storage::read_snapshot::{accounts_from_snapshot_backup, load_snapshot_manifest};
use libra_types::exports::AccountAddress;
use log::{info, warn};

use crate::table_structs::{WarehouseAccount, WarehouseRecord, WarehouseTime};
use crate::schema_account_state::WarehouseAccState;

// uses libra-compatibility to parse the v5 manifest files, and decode v5 format bytecode into current version data structures (v6+);
pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseRecord>> {
pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseAccState>> {
let account_blobs = v5_accounts_from_manifest_path(v5_manifest_path).await?;
dbg!(&account_blobs.len());
let mut warehouse_state = vec![];
Expand All @@ -20,13 +21,17 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
Ok(a) => {
let address_literal = a.to_hex_literal();
let cast_address = AccountAddress::from_hex_literal(&address_literal)?;
let s = WarehouseRecord {
account: WarehouseAccount {
address: cast_address,
},
time: WarehouseTime::default(),
balance: None,
};
let mut s = WarehouseAccState::new(cast_address);

if let Some(r) = acc.get_diem_account_resource().ok() {
s.sequence_num = r.sequence_number();
}

if let Some(b) = acc.get_resource::<BalanceResourceV5>().ok() {
s.balance = Some(b.coin())
}


warehouse_state.push(s);
}
Err(e) => {
Expand All @@ -38,7 +43,7 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
Ok(warehouse_state)
}

pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<WarehouseRecord>> {
pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<WarehouseAccState>> {
let manifest_file = archive_path.join("state.manifest");
assert!(
manifest_file.exists(),
Expand All @@ -49,30 +54,24 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<Warehou

let accs = accounts_from_snapshot_backup(manifest, archive_path).await?;

// TODO: Change to log
println!("SUCCESS: backup loaded. # accounts: {}", &accs.len());
info!("SUCCESS: backup loaded. # accounts: {}", &accs.len());

// TODO: stream this
let mut warehouse_state = vec![];
for el in accs.iter() {
if let Some(address) = el.get_account_address()? {
let s = WarehouseRecord {
account: WarehouseAccount { address },
time: WarehouseTime::default(),
balance: None,
};
let s = WarehouseAccState::new(address);
warehouse_state.push(s);
}
}

// TODO: Change to log
println!(
info!(
"SUCCESS: accounts parsed. # accounts: {}",
&warehouse_state.len()
);

if warehouse_state.len() != accs.len() {
println!("WARN: account count does not match");
warn!("account count does not match");
}

Ok(warehouse_state)
Expand Down
2 changes: 1 addition & 1 deletion src/extract_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::table_structs::{RelationLabel, UserEventTypes, WarehouseEvent, WarehouseTxMaster};
use crate::schema_transaction::{RelationLabel, UserEventTypes, WarehouseEvent, WarehouseTxMaster};
use anyhow::Result;
use chrono::DateTime;
use diem_crypto::HashValue;
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ pub mod load_tx_cypher;
pub mod neo4j_init;
pub mod queue;
pub mod scan;
pub mod table_structs;
pub mod schema_account_state;
pub mod schema_transaction;
pub mod unzip_temp;
pub mod warehouse_cli;

Expand Down
2 changes: 1 addition & 1 deletion src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{fmt::Display, thread, time::Duration};
use crate::{
cypher_templates::{write_batch_tx_string, write_batch_user_create},
queue,
table_structs::WarehouseTxMaster,
schema_transaction::WarehouseTxMaster,
};

/// response for the batch insert tx
Expand Down
33 changes: 33 additions & 0 deletions src/schema_account_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use libra_types::exports::AccountAddress;

#[derive(Debug, Clone)]
/// The basic information for an account
pub struct WarehouseAccState {
pub address: AccountAddress,
pub time: WarehouseTime,
pub sequence_num: u64,
pub balance: Option<u64>,
}

impl WarehouseAccState {
pub fn new(address: AccountAddress) -> Self {
Self {
address,
sequence_num: 0,
time: WarehouseTime::default(),
balance: None,
}
}
pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) {
self.time.timestamp = timestamp;
self.time.version = version;
self.time.epoch = epoch;
}
}
// holds timestamp, chain height, and epoch
#[derive(Debug, Clone, Default)]
pub struct WarehouseTime {
pub timestamp: u64,
pub version: u64,
pub epoch: u64,
}
94 changes: 2 additions & 92 deletions src/table_structs.rs → src/schema_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use diem_crypto::HashValue;
use diem_types::account_config::{DepositEvent, WithdrawEvent};
use libra_backwards_compatibility::sdk::v7_libra_framework_sdk_builder::EntryFunctionCall;
use libra_types::{exports::AccountAddress, move_resource::coin_register_event::CoinRegisterEvent};
use neo4rs::{BoltList, BoltMap, BoltType};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -44,24 +43,9 @@ impl RelationLabel {
}
}

// TODO: deprecate?
#[derive(Debug, Clone)]
pub struct TransferTx {
pub tx_hash: HashValue,
pub to: AccountAddress,
pub amount: u64,
}

// TODO: deprecate?
#[derive(Debug, Clone)]
pub struct MiscTx {
pub tx_hash: HashValue, // primary key
pub data: serde_json::Value,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WarehouseEvent {
pub tx_hash: HashValue, // primary key
pub tx_hash: HashValue,
pub event: UserEventTypes,
pub event_name: String,
pub data: serde_json::Value,
Expand All @@ -86,7 +70,7 @@ pub enum EntryFunctionArgs {

#[derive(Debug, Deserialize, Serialize)]
pub struct WarehouseTxMaster {
pub tx_hash: HashValue, // primary key
pub tx_hash: HashValue,
pub relation_label: RelationLabel,
pub sender: AccountAddress,
pub recipient: Option<AccountAddress>,
Expand Down Expand Up @@ -114,7 +98,6 @@ impl Default for WarehouseTxMaster {
block_datetime: DateTime::<Utc>::from_timestamp_micros(0).unwrap(),
expiration_timestamp: 0,
entry_function: None,
// args: json!(""),
events: vec![],
}
}
Expand Down Expand Up @@ -156,77 +139,4 @@ impl WarehouseTxMaster {
list_literal.pop(); // need to drop last comma ","
format!("[{}]", list_literal)
}

// NOTE: this seems to be memory inefficient.
// also creates a vendor lock-in with neo4rs instead of any open cypher.
// Hence the query templating
pub fn to_boltmap(&self) -> BoltMap {
let mut map = BoltMap::new();
map.put("tx_hash".into(), self.tx_hash.to_string().into());
map.put("sender".into(), self.sender.clone().to_hex_literal().into());
map.put(
"recipient".into(),
self.sender.clone().to_hex_literal().into(),
);

// TODO
// map.put("epoch".into(), self.epoch.into());
// map.put("round".into(), self.round.into());
// map.put("epoch".into(), self.epoch.into());
// map.put("block_timestamp".into(), self.block_timestamp.into());
// map.put(
// "expiration_timestamp".into(),
// self.expiration_timestamp.into(),
// );
map
}
/// how one might implement the bolt types.
pub fn slice_to_bolt_list(txs: &[Self]) -> BoltType {
let mut list = BoltList::new();
for el in txs {
let map = el.to_boltmap();
list.push(BoltType::Map(map));
}
BoltType::List(list)
}
}

#[derive(Debug, Clone)]
/// The basic information for an account
pub struct WarehouseRecord {
pub account: WarehouseAccount,
pub time: WarehouseTime,
pub balance: Option<WarehouseBalance>,
}

impl WarehouseRecord {
pub fn new(address: AccountAddress) -> Self {
Self {
account: WarehouseAccount { address },
time: WarehouseTime::default(),
balance: Some(WarehouseBalance::default()),
}
}
pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) {
self.time.timestamp = timestamp;
self.time.version = version;
self.time.epoch = epoch;
}
}
// holds timestamp, chain height, and epoch
#[derive(Debug, Clone, Default)]
pub struct WarehouseTime {
pub timestamp: u64,
pub version: u64,
pub epoch: u64,
}
#[derive(Debug, Clone)]
pub struct WarehouseAccount {
pub address: AccountAddress,
}

#[derive(Debug, Default, Clone)]
pub struct WarehouseBalance {
// balances in v6+ terms
pub balance: u64,
}
87 changes: 44 additions & 43 deletions tests/test_e2e_tx_neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use libra_forensic_db::cypher_templates::{write_batch_tx_string, write_batch_use
use libra_forensic_db::load::try_load_one_archive;
use libra_forensic_db::load_tx_cypher::tx_batch;
use libra_forensic_db::scan::scan_dir_archive;
use libra_forensic_db::table_structs::WarehouseTxMaster;
use libra_forensic_db::schema_transaction::WarehouseTxMaster;
use libra_forensic_db::{
extract_transactions::extract_current_transactions,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
Expand Down Expand Up @@ -199,45 +199,46 @@ async fn batch_users_create_unit() -> Result<()> {
Ok(())
}

#[ignore] // For reference deprecated in favor of string templates
#[tokio::test]
async fn test_bolt_serialize() -> Result<()> {
let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_localhost_pool(port)
.await
.expect("could not get neo4j connection pool");
maybe_create_indexes(&graph).await?;

// Define a batch of transactions as a vector of HashMaps
let transactions = vec![WarehouseTxMaster::default()];
let bolt_list = WarehouseTxMaster::slice_to_bolt_list(&transactions);

// Build the query and add the transactions as a parameter
let cypher_query = query(
"UNWIND $transactions AS tx
MERGE (from:Account {address: tx.sender})
MERGE (to:Account {address: tx.recipient})
MERGE (from)-[:Tx {tx_hash: tx.tx_hash}]->(to)",
)
.param("transactions", bolt_list); // Pass the batch as a parameter

// Execute the query
graph.run(cypher_query).await?;

// get the sum of all transactions in db
let cypher_query = query(
"MATCH ()-[r:Tx]->()
RETURN count(r) AS total_tx_count",
);

// Execute the query
let mut result = graph.execute(cypher_query).await?;

// Fetch the first row only
let row = result.next().await?.unwrap();
let total_tx_count: i64 = row.get("total_tx_count").unwrap();
assert!(total_tx_count == 1);

Ok(())
}
// NOTE: Left commented for reference. Bolt types deprecated in favor of string templates
// #[ignore]
// #[tokio::test]
// async fn test_bolt_serialize() -> Result<()> {
// let c = start_neo4j_container();
// let port = c.get_host_port_ipv4(7687);
// let graph = get_neo4j_localhost_pool(port)
// .await
// .expect("could not get neo4j connection pool");
// maybe_create_indexes(&graph).await?;

// // Define a batch of transactions as a vector of HashMaps
// let transactions = vec![WarehouseTxMaster::default()];
// let bolt_list = WarehouseTxMaster::slice_to_bolt_list(&transactions);

// // Build the query and add the transactions as a parameter
// let cypher_query = query(
// "UNWIND $transactions AS tx
// MERGE (from:Account {address: tx.sender})
// MERGE (to:Account {address: tx.recipient})
// MERGE (from)-[:Tx {tx_hash: tx.tx_hash}]->(to)",
// )
// .param("transactions", bolt_list); // Pass the batch as a parameter

// // Execute the query
// graph.run(cypher_query).await?;

// // get the sum of all transactions in db
// let cypher_query = query(
// "MATCH ()-[r:Tx]->()
// RETURN count(r) AS total_tx_count",
// );

// // Execute the query
// let mut result = graph.execute(cypher_query).await?;

// // Fetch the first row only
// let row = result.next().await?.unwrap();
// let total_tx_count: i64 = row.get("total_tx_count").unwrap();
// assert!(total_tx_count == 1);

// Ok(())
// }

0 comments on commit 432dd42

Please sign in to comment.