Skip to content

Commit

Permalink
Always have conn as the first argument (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas authored Sep 26, 2023
1 parent 108e467 commit b5cd33e
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 110 deletions.
13 changes: 8 additions & 5 deletions xmtp/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,14 @@ where
#[cfg(test)]
AccountStrategy::ExternalAccount(a) => a,
};
store.insert_or_ignore_user(StoredUser {
user_address: account.addr(),
created_at: now(),
last_refreshed: 0,
})?;
store.insert_or_ignore_user(
&mut store.conn()?,
StoredUser {
user_address: account.addr(),
created_at: now(),
last_refreshed: 0,
},
)?;

Ok(Client::new(api_client, self.network, account, store))
}
Expand Down
57 changes: 28 additions & 29 deletions xmtp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ where
&self,
user_address: &str,
) -> Result<(), ClientError> {
let user = self.store.get_user(user_address)?;
let user = self.store.get_user(&mut self.store.conn()?, user_address)?;
if user.is_none() || user.unwrap().last_refreshed < now() - INSTALLATION_REFRESH_INTERVAL_NS
{
self.refresh_user_installations(user_address).await?;
Expand All @@ -200,14 +200,15 @@ where

let self_install_id = key_fingerprint(&self.account.identity_keys().curve25519);
let contacts = self.get_contacts(user_address).await?;
let conn = &mut self.store.conn()?;
debug!(
"Fetched contacts for address {}: {:?}",
user_address, contacts
);

let installation_map = self
.store
.get_installations(&mut self.store.conn()?, user_address)?
.get_installations(conn, user_address)?
.into_iter()
.map(|v| (v.installation_id.clone(), v))
.collect::<HashMap<_, _>>();
Expand All @@ -223,37 +224,35 @@ where
user_address, new_installs
);

self.store
.conn()?
.transaction(|transaction_manager| -> Result<(), ClientError> {
self.store.insert_or_ignore_user_with_conn(
conn.transaction(|transaction_manager| -> Result<(), ClientError> {
self.store.insert_or_ignore_user(
transaction_manager,
StoredUser {
user_address: user_address.to_string(),
created_at: now(),
last_refreshed: refresh_timestamp,
},
)?;
for install in new_installs {
info!("Saving Install {}", install.installation_id);
let session = self.create_uninitialized_session(&install.get_contact()?)?;

self.store
.insert_or_ignore_install(transaction_manager, install)?;
self.store.insert_or_ignore_session(
transaction_manager,
StoredUser {
user_address: user_address.to_string(),
created_at: now(),
last_refreshed: refresh_timestamp,
},
StoredSession::try_from(&session)?,
)?;
for install in new_installs {
info!("Saving Install {}", install.installation_id);
let session = self.create_uninitialized_session(&install.get_contact()?)?;

self.store
.insert_or_ignore_install(install, transaction_manager)?;
self.store.insert_or_ignore_session(
StoredSession::try_from(&session)?,
transaction_manager,
)?;
}
}

self.store.update_user_refresh_timestamp(
transaction_manager,
user_address,
refresh_timestamp,
)?;
self.store.update_user_refresh_timestamp(
transaction_manager,
user_address,
refresh_timestamp,
)?;

Ok(())
})?;
Ok(())
})?;

Ok(())
}
Expand Down
21 changes: 16 additions & 5 deletions xmtp/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ where
) -> Result<(), ConversationError> {
let peer_address = peer_addr_from_convo_id(convo_id, &client.wallet_address())?;
let created_at = now();
client.store.insert_or_ignore_user_with_conn(
client.store.insert_or_ignore_user(
conn,
StoredUser {
user_address: peer_address.clone(),
Expand All @@ -124,7 +124,7 @@ where
},
)?;

client.store.insert_or_ignore_conversation_with_conn(
client.store.insert_or_ignore_conversation(
conn,
StoredConversation {
peer_address,
Expand Down Expand Up @@ -203,11 +203,19 @@ mod tests {
let client = gen_test_client().await;
let peer_address = "0x000";
let convo_id = format!(":{}:{}", peer_address, client.wallet_address());
assert!(client.store.get_conversation(&convo_id).unwrap().is_none());
assert!(client
.store
.get_conversation(&mut client.store.conn().unwrap(), &convo_id)
.unwrap()
.is_none());

let conversation = gen_test_conversation(&client, peer_address).await;
assert!(conversation.peer_address() == peer_address);
assert!(client.store.get_conversation(&convo_id).unwrap().is_some());
assert!(client
.store
.get_conversation(&mut client.store.conn().unwrap(), &convo_id)
.unwrap()
.is_some());
}

#[tokio::test]
Expand All @@ -216,7 +224,10 @@ mod tests {
let conversation = gen_test_conversation(&client, "0x000").await;
conversation.send_text("Hello, world!").await.unwrap();

let message = &client.store.get_unprocessed_messages().unwrap()[0];
let message = &client
.store
.get_unprocessed_messages(&mut client.store.conn().unwrap())
.unwrap()[0];
let content = EncodedContent::decode(&message.content[..]).unwrap();
assert!(TextCodec::decode(content).unwrap() == "Hello, world!");
}
Expand Down
29 changes: 17 additions & 12 deletions xmtp/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use xmtp_proto::xmtp::{
};

use crate::{
conversation::{peer_addr_from_convo_id, ConversationError, Conversation},
conversation::{peer_addr_from_convo_id, Conversation, ConversationError},
message::DecodedInboundMessage,
session::SessionManager,
storage::{
Expand Down Expand Up @@ -72,9 +72,10 @@ impl<A: XmtpApiClient> Conversations<A> {
pub fn save_inbound_messages(client: &Client<A>) -> Result<(), ConversationError> {
let inbound_topic = build_installation_message_topic(&client.installation_id());

client
.store
.lock_refresh_job(RefreshJobKind::Message, |conn, job| {
client.store.lock_refresh_job(
&mut client.store.conn()?,
RefreshJobKind::Message,
|conn, job| {
log::debug!(
"Refresh messages start time: {}",
Conversations::<A>::get_start_time(&job).unsigned_abs()
Expand All @@ -94,7 +95,8 @@ impl<A: XmtpApiClient> Conversations<A> {
}

Ok(())
})?;
},
)?;

Ok(())
}
Expand Down Expand Up @@ -138,7 +140,7 @@ impl<A: XmtpApiClient> Conversations<A> {

let existing_sessions = client
.store
.get_latest_sessions_for_installation(&payload.sender_installation_id, conn)?;
.get_latest_sessions_for_installation(conn, &payload.sender_installation_id)?;

// Attempt to decrypt with existing sessions
for raw_session in existing_sessions {
Expand Down Expand Up @@ -294,12 +296,12 @@ impl<A: XmtpApiClient> Conversations<A> {
|transaction| -> Result<(), ConversationError> {
let my_sessions = client
.store
.get_latest_sessions(&client.wallet_address(), transaction)?;
.get_latest_sessions(transaction, &client.wallet_address())?;
let their_user_addr =
peer_addr_from_convo_id(&message.convo_id, &client.wallet_address())?;
let their_sessions = client
.store
.get_latest_sessions(&their_user_addr, transaction)?;
.get_latest_sessions(transaction, &their_user_addr)?;
if their_sessions.is_empty() {
return Err(ConversationError::NoSessions(their_user_addr));
}
Expand All @@ -321,11 +323,11 @@ impl<A: XmtpApiClient> Conversations<A> {
}

client.store.commit_outbound_payloads_for_message(
transaction,
message.id,
MessageState::LocallyCommitted,
outbound_payloads,
updated_sessions,
transaction,
)?;
Ok(())
},
Expand All @@ -339,7 +341,9 @@ impl<A: XmtpApiClient> Conversations<A> {
client
.refresh_user_installations_if_stale(&client.wallet_address())
.await?;
let mut messages = client.store.get_unprocessed_messages()?;
let mut messages = client
.store
.get_unprocessed_messages(&mut client.store.conn()?)?;
log::debug!("Processing {} messages", messages.len());
messages.sort_by(|a, b| a.created_at.cmp(&b.created_at));
for message in messages {
Expand All @@ -359,6 +363,7 @@ impl<A: XmtpApiClient> Conversations<A> {

pub async fn publish_outbound_payloads(client: &Client<A>) -> Result<(), ConversationError> {
let unsent_payloads = client.store.fetch_and_lock_outbound_payloads(
&mut client.store.conn()?,
OutboundPayloadState::Pending,
Duration::from_secs(60).as_nanos() as i64,
)?;
Expand Down Expand Up @@ -387,6 +392,7 @@ impl<A: XmtpApiClient> Conversations<A> {
.map(|payload| payload.created_at_ns)
.collect();
client.store.update_and_unlock_outbound_payloads(
&mut client.store.conn()?,
payload_ids,
OutboundPayloadState::ServerAcknowledged,
)?;
Expand Down Expand Up @@ -418,8 +424,7 @@ mod tests {
let alice_client = gen_test_client().await;
let bob_client = gen_test_client().await;
let conversation =
Conversation::new(&alice_client, bob_client.wallet_address().to_string())
.unwrap();
Conversation::new(&alice_client, bob_client.wallet_address().to_string()).unwrap();
assert_eq!(conversation.peer_address(), bob_client.wallet_address());
}

Expand Down
Loading

0 comments on commit b5cd33e

Please sign in to comment.