Skip to content

Commit

Permalink
refactor(jstz_node): mirgrate to new_address
Browse files Browse the repository at this point in the history
  • Loading branch information
ryutamago committed Jan 8, 2025
1 parent 94cfb5c commit b6b277b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
12 changes: 6 additions & 6 deletions crates/jstz_node/src/services/logs/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::HashMap, convert::Infallible, sync::Arc, time::Duration};

use axum::response::{sse, Sse};
use futures_util::future;
use jstz_proto::context::account::Address;
use jstz_proto::context::new_account::NewAddress;
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, Sender};
use tokio::time::interval;
Expand All @@ -14,7 +14,7 @@ pub type InfallibleSSeStream = ReceiverStream<Result<sse::Event, Infallible>>;
/// Broadcasts messages to all connected clients through Server-sent Events
/// <https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events>.
pub struct Broadcaster {
clients: Mutex<HashMap<Address, Vec<Sender<InfallibleSseEvent>>>>, // TODO: Use a read-write lock instead?
clients: Mutex<HashMap<NewAddress, Vec<Sender<InfallibleSseEvent>>>>, // TODO: Use a read-write lock instead?
}

// Pings clients every 10 seconds
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Broadcaster {
async fn remove_stale_clients(&self) {
let clients = self.clients.lock().clone();

let mut responsive_clients: HashMap<Address, Vec<Sender<InfallibleSseEvent>>> =
let mut responsive_clients: HashMap<NewAddress, Vec<Sender<InfallibleSseEvent>>> =
HashMap::new();

for (contract_address, senders) in clients {
Expand All @@ -72,7 +72,7 @@ impl Broadcaster {
/// Registers client with broadcaster, returning an SSE response body.
pub async fn new_client(
&self,
contract_address: Address,
function_address: NewAddress,
) -> Sse<InfallibleSSeStream> {
let (tx, rx) = mpsc::channel(10);

Expand All @@ -82,7 +82,7 @@ impl Broadcaster {

self.clients
.lock()
.entry(contract_address)
.entry(function_address)
.or_default()
.push(tx);

Expand All @@ -96,7 +96,7 @@ impl Broadcaster {
}

/// Broadcasts `msg` to all clients.
pub async fn broadcast(&self, contract_address: &Address, msg: &str) {
pub async fn broadcast(&self, contract_address: &NewAddress, msg: &str) {
let clients = self.clients.lock().clone();

if let Some(clients) = clients.get(contract_address) {
Expand Down
6 changes: 3 additions & 3 deletions crates/jstz_node/src/services/logs/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::{anyhow, Result};
use jstz_api::js_log::LogLevel;
use jstz_crypto::public_key_hash::PublicKeyHash;
use jstz_proto::{
context::account::Address, js_logger::LogRecord, request_logger::RequestEvent,
context::new_account::NewAddress, js_logger::LogRecord, request_logger::RequestEvent,
};
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl Db {

pub async fn logs_by_address(
&self,
function_address: Address,
function_address: NewAddress,
limit: usize,
offset: usize,
) -> QueryResponseResult {
Expand All @@ -117,7 +117,7 @@ impl Db {

pub async fn logs_by_address_and_request_id(
&self,
function_address: Address,
function_address: NewAddress,
request_id: String,
) -> QueryResponseResult {
let conn = self.connection().await?;
Expand Down
20 changes: 13 additions & 7 deletions crates/jstz_node/src/services/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ use axum::{
Json,
};
use broadcaster::InfallibleSSeStream;
use jstz_crypto::hash::Hash;
#[cfg(feature = "persistent-logging")]
use jstz_proto::request_logger::{
RequestEvent, REQUEST_END_PREFIX, REQUEST_START_PREFIX,
};
use jstz_proto::{
context::account::Address,
context::new_account::NewAddress,
js_logger::{LogRecord, LOG_PREFIX},
};
use serde::Deserialize;
Expand Down Expand Up @@ -53,14 +52,14 @@ mod persistent_logging {
extract::{Path, Query, State},
Json,
};
use jstz_proto::context::account::Address;
use jstz_proto::context::new_account::NewAddress;

pub async fn persistent_logs(
State(AppState { db, .. }): State<AppState>,
Path(address): Path<String>,
Query(Pagination { limit, offset }): Query<Pagination>,
) -> ServiceResult<Json<Vec<LogRecord>>> {
let address = Address::from_base58(&address)
let address = NewAddress::from_base58(&address)
.map_err(|e| ServiceError::BadRequest(e.to_string()))?;
let result = db.logs_by_address(address, offset, limit).await?;

Expand All @@ -72,7 +71,7 @@ mod persistent_logging {
Path(address): Path<String>,
Path(request_id): Path<String>,
) -> ServiceResult<Json<Vec<LogRecord>>> {
let address = Address::from_base58(&address)
let address = NewAddress::from_base58(&address)
.map_err(|e| ServiceError::BadRequest(e.to_string()))?;

let result = db
Expand Down Expand Up @@ -159,8 +158,10 @@ impl LogsService {
#[cfg(not(feature = "persistent-logging"))]
#[allow(irrefutable_let_patterns)]
if let Line::Js(log) = line {
// TODO: use smart function address once jstz-proto is updated
// https://linear.app/tezos/issue/JSTZ-261/use-newaddress-for-jstz-proto
broadcaster
.broadcast(&log.address, &line_str[LOG_PREFIX.len()..])
.broadcast(&NewAddress::User(log.address.clone()), &line_str[LOG_PREFIX.len()..])
.await;
}
}
Expand Down Expand Up @@ -230,8 +231,13 @@ async fn stream_log(
State(AppState { broadcaster, .. }): State<AppState>,
Path(address): Path<String>,
) -> ServiceResult<Sse<InfallibleSSeStream>> {
let address = Address::from_base58(&address)
let address = NewAddress::from_base58(&address)
.map_err(|e| ServiceError::BadRequest(e.to_string()))?;
// TODO: Add a check to see if the address is smart function
// https://linear.app/tezos/issue/JSTZ-260/add-validation-check-for-address-type
// address
// .check_is_smart_function()
// .map_err(|e| ServiceError::BadRequest(e.to_string()))?;
Ok(broadcaster.new_client(address).await)
}

Expand Down

0 comments on commit b6b277b

Please sign in to comment.