Skip to content

Commit

Permalink
feat: Reduce testing concurrency in CI and add transaction id field i…
Browse files Browse the repository at this point in the history
…n logging

Reduce testing concurrency by setting maxConcurrency to 3 in the Rust emulator test.
Add a transaction id field in the logging for better tracking and debugging.
  • Loading branch information
pavadeli committed Mar 26, 2024
1 parent b8fa17e commit c40add2
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 71 deletions.
18 changes: 7 additions & 11 deletions crates/emulator-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use itertools::Itertools;
use tokio::sync::mpsc;
use tokio_stream::{once, wrappers::ReceiverStream, StreamExt};
use tonic::{async_trait, codec::CompressionEncoding, Code, Request, Response, Result, Status};
use tracing::{debug, debug_span, instrument, Instrument, Level};
use tracing::{debug, debug_span, field::display, instrument, Instrument, Level, Span};
use utils::error_in_stream;

#[macro_use]
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Firestore for FirestoreEmulator {
/// same order that they were requested.
#[instrument(level = Level::DEBUG, skip_all, fields(
count = request.get_ref().documents.len(),
in_txn = display(is_txn(&request.get_ref().consistency_selector))
txn_id,
), err)]
async fn batch_get_documents(
&self,
Expand Down Expand Up @@ -114,6 +114,9 @@ impl Firestore for FirestoreEmulator {
s => (vec![], s.try_into()?),
};
debug!(?read_consistency);
if let ReadConsistency::Transaction(id) = read_consistency {
Span::current().record("txn_id", display(id));
}

let (tx, rx) = mpsc::channel(16);
tokio::spawn(
Expand Down Expand Up @@ -146,7 +149,7 @@ impl Firestore for FirestoreEmulator {
/// Commits a transaction, while optionally updating documents.
#[instrument(level = Level::DEBUG, skip_all, fields(
count = request.get_ref().writes.len(),
in_txn = !request.get_ref().transaction.is_empty(),
txn_id,
), err)]
async fn commit(&self, request: Request<CommitRequest>) -> Result<Response<CommitResponse>> {
let CommitRequest {
Expand All @@ -161,6 +164,7 @@ impl Firestore for FirestoreEmulator {
perform_writes(&database, writes).await?
} else {
let txn_id = transaction.try_into()?;
Span::current().record("txn_id", display(txn_id));
debug!(?txn_id);
database.commit(writes, txn_id).await?
};
Expand Down Expand Up @@ -584,11 +588,3 @@ async fn perform_writes(
});
Ok((time, write_results))
}

fn is_txn(selector: &Option<batch_get_documents_request::ConsistencySelector>) -> &'static str {
match selector {
Some(batch_get_documents_request::ConsistencySelector::Transaction(_)) => "true",
Some(batch_get_documents_request::ConsistencySelector::NewTransaction(_)) => "new",
Some(batch_get_documents_request::ConsistencySelector::ReadTime(_)) | None => "false",
}
}
36 changes: 23 additions & 13 deletions crates/firestore-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::{
broadcast::{self, Receiver},
RwLock,
};
use tracing::{debug, instrument, Level, Span};
use tracing::{debug, field::display, instrument, Level, Span};

use self::{
collection::Collection,
Expand Down Expand Up @@ -70,22 +70,32 @@ impl FirestoreDatabase {
})
}

#[instrument(level = Level::DEBUG, skip_all, err, fields(in_txn = consistency.is_transaction(), found))]
#[instrument(level = Level::DEBUG, skip_all, err, fields(txn_id, found))]
pub async fn get_doc(
&self,
name: &DocumentRef,
consistency: ReadConsistency,
) -> Result<Option<Arc<StoredDocumentVersion>>> {
debug!(%name);
let version = if let Some(txn) = self.get_txn_for_consistency(consistency).await? {
txn.read_doc(name).await?
} else {
self.get_doc_meta(name)
let version = match consistency {
ReadConsistency::Default => self
.get_doc_meta(name)
.await?
.read()
.await?
.current_version()
.cloned(),
ReadConsistency::ReadTime(time) => self
.get_doc_meta(name)
.await?
.read()
.await?
.version_for_consistency(consistency)?
.cloned()
.version_at_time(time)
.cloned(),
ReadConsistency::Transaction(id) => {
Span::current().record("txn_id", display(id));
self.transactions.get(id).await?.read_doc(name).await?
}
};
Span::current().record("found", version.is_some());
Ok(version)
Expand Down Expand Up @@ -433,13 +443,13 @@ impl FirestoreDatabase {
pub async fn new_txn(&self, transaction_options: TransactionOptions) -> Result<TransactionId> {
use transaction_options::*;
match transaction_options.mode {
None => Ok(self
.transactions
.start_read_only(ReadConsistency::Default)
.await),
None => Ok(self.transactions.start_read_only(None).await),
Some(Mode::ReadOnly(ro)) => Ok(self
.transactions
.start_read_only(ro.consistency_selector.into())
.start_read_only(
ro.consistency_selector
.map(|read_only::ConsistencySelector::ReadTime(read_time)| read_time),
)
.await),
Some(Mode::ReadWrite(rw)) => {
if rw.retry_transaction.is_empty() {
Expand Down
13 changes: 1 addition & 12 deletions crates/firestore-database/src/database/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::{
};
use tracing::{debug, instrument, trace, warn, Level};

use super::{read_consistency::ReadConsistency, reference::DocumentRef};
use super::reference::DocumentRef;
use crate::{error::Result, FirestoreProject, GenericDatabaseError};

pub struct DocumentMeta {
Expand Down Expand Up @@ -125,17 +125,6 @@ impl DocumentContents {
.and_then(DocumentVersion::stored_document)
}

pub fn version_for_consistency(
&self,
consistency: ReadConsistency,
) -> Result<Option<&Arc<StoredDocumentVersion>>> {
Ok(match consistency {
ReadConsistency::Default => self.current_version(),
ReadConsistency::ReadTime(time) => self.version_at_time(time),
ReadConsistency::Transaction(_) => self.current_version(),
})
}

pub fn exists(&self) -> bool {
self.versions
.last()
Expand Down
13 changes: 1 addition & 12 deletions crates/firestore-database/src/database/read_consistency.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use googleapis::google::{
firestore::v1::{
batch_get_documents_request, get_document_request, list_documents_request,
run_aggregation_query_request, run_query_request, transaction_options,
run_aggregation_query_request, run_query_request,
},
protobuf::Timestamp,
};
Expand Down Expand Up @@ -56,14 +56,3 @@ impl_try_from_consistency_selector!(get_document_request);
impl_try_from_consistency_selector!(list_documents_request);
impl_try_from_consistency_selector!(run_query_request);
impl_try_from_consistency_selector!(run_aggregation_query_request);

impl From<Option<transaction_options::read_only::ConsistencySelector>> for ReadConsistency {
fn from(value: Option<transaction_options::read_only::ConsistencySelector>) -> Self {
match value {
Some(transaction_options::read_only::ConsistencySelector::ReadTime(time)) => {
Self::ReadTime(time)
}
None => Self::Default,
}
}
}
50 changes: 27 additions & 23 deletions crates/firestore-database/src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Display,
sync::{
atomic::{self, AtomicUsize},
Arc, Weak,
},
};

use googleapis::google::protobuf::Timestamp;
use tokio::sync::{Mutex, RwLock};
use tracing::{instrument, Level};

Expand All @@ -14,10 +16,7 @@ use super::{
reference::DocumentRef,
FirestoreDatabase,
};
use crate::{
document::StoredDocumentVersion, error::Result, read_consistency::ReadConsistency,
GenericDatabaseError,
};
use crate::{document::StoredDocumentVersion, error::Result, GenericDatabaseError};

#[derive(Debug)]
pub(crate) struct RunningTransactions {
Expand Down Expand Up @@ -49,12 +48,12 @@ impl RunningTransactions {
id
}

pub(crate) async fn start_read_only(&self, consistency: ReadConsistency) -> TransactionId {
pub(crate) async fn start_read_only(&self, read_time: Option<Timestamp>) -> TransactionId {
let id = TransactionId::new();
let txn = Arc::new(Transaction::ReadOnly(ReadOnlyTransaction::new(
id,
Weak::clone(&self.database),
consistency,
read_time,
)));
self.txns.write().await.insert(id, txn);
id
Expand Down Expand Up @@ -112,6 +111,7 @@ impl Transaction {

#[derive(Debug)]
pub(crate) struct ReadWriteTransaction {
#[allow(dead_code)] // For logging
pub(crate) id: TransactionId,
database: Weak<FirestoreDatabase>,
guards: Mutex<HashMap<DocumentRef, Arc<OwnedDocumentContentsReadGuard>>>,
Expand All @@ -130,11 +130,7 @@ impl ReadWriteTransaction {
&self,
name: &DocumentRef,
) -> Result<Option<Arc<StoredDocumentVersion>>> {
Ok(self
.read_guard(name)
.await?
.version_for_consistency(ReadConsistency::Transaction(self.id))?
.cloned())
Ok(self.read_guard(name).await?.current_version().cloned())
}

pub(crate) async fn drop_remaining_guards(&self) {
Expand Down Expand Up @@ -179,36 +175,38 @@ impl ReadWriteTransaction {

#[derive(Debug)]
pub(crate) struct ReadOnlyTransaction {
#[allow(dead_code)] // Only useful in logging
#[allow(dead_code)] // For logging
pub(crate) id: TransactionId,
pub(crate) database: Weak<FirestoreDatabase>,
pub(crate) consistency: ReadConsistency,
pub(crate) read_time: Option<Timestamp>,
}

impl ReadOnlyTransaction {
pub(crate) fn new(
id: TransactionId,
database: Weak<FirestoreDatabase>,
consistency: ReadConsistency,
read_time: Option<Timestamp>,
) -> Self {
Self {
id,
database,
consistency,
read_time,
}
}

async fn read_doc(&self, name: &DocumentRef) -> Result<Option<Arc<StoredDocumentVersion>>> {
Ok(self
let doc = self
.database
.upgrade()
.ok_or_else(|| GenericDatabaseError::aborted("database was dropped"))?
.get_doc_meta(name)
.await?
.read()
.await?
.version_for_consistency(self.consistency)?
.cloned())
.await?;
let lock = doc.read().await?;
let version = match self.read_time {
Some(read_time) => lock.version_at_time(read_time),
None => lock.current_version(),
};
Ok(version.cloned())
}
}

Expand All @@ -225,17 +223,23 @@ impl TransactionId {
/// Check if the given [`TransactionId`] could have been issued by the currently running
/// instance. This doesn't guarantee that the given id is valid, but it prevents collisions with
/// future IDs.
fn check(self: TransactionId) -> Result<()> {
fn check(self) -> Result<()> {
if self.0 < NEXT_TXN_ID.load(atomic::Ordering::Relaxed) {
Ok(())
} else {
Err(GenericDatabaseError::InvalidArgument(format!(
"Invalid transaction ID, {self:?} has not been issued by this instance"
"Invalid transaction ID, {self} has not been issued by this instance"
)))
}
}
}

impl Display for TransactionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl TryFrom<Vec<u8>> for TransactionId {
type Error = GenericDatabaseError;

Expand Down

0 comments on commit c40add2

Please sign in to comment.