Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pavadeli committed Mar 26, 2024
1 parent 78efd14 commit b0f9c99
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 64 deletions.
31 changes: 14 additions & 17 deletions crates/firestore-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,12 @@ impl FirestoreDatabase {
consistency: ReadConsistency,
) -> Result<Option<Arc<Transaction>>> {
if let ReadConsistency::Transaction(id) = consistency {
Ok(Some(self.get_txn(id).await?))
Ok(Some(self.transactions.get(id).await?))
} else {
Ok(None)
}
}

pub async fn get_txn(
&self,
id: TransactionId,
) -> Result<Arc<Transaction>, GenericDatabaseError> {
self.transactions.get(id).await
}

/// Get all the collections that reside directly under the given parent. This means that:
/// - the IDs will not contain a `/`
/// - the result will be empty if `parent` is a [`Ref::Collection`].
Expand Down Expand Up @@ -320,12 +313,7 @@ impl FirestoreDatabase {
transaction: TransactionId,
) -> Result<(Timestamp, Vec<WriteResult>)> {
let txn = self.transactions.get(transaction).await?;
let txn = &txn.as_read_write().ok_or_else(|| {
GenericDatabaseError::invalid_argument(format!(
"Transaction {transaction:?} is read-only"
))
})?;
let time = Timestamp::now();
let rw_txn = txn.as_read_write();

let mut write_results = vec![];
let mut updates = HashMap::new();
Expand All @@ -335,11 +323,20 @@ impl FirestoreDatabase {
for write in &writes {
let name = get_doc_name_from_write(write)?;
if let Entry::Vacant(entry) = write_guard_cache.entry(name.clone()) {
let txn = rw_txn.as_ref().ok_or_else(|| {
GenericDatabaseError::invalid_argument(
"Cannot modify entities in a read-only transaction",
)
})?;
entry.insert(txn.take_write_guard(&name).await?);
}
}

txn.drop_remaining_guards().await;
if let Some(txn) = rw_txn {
txn.drop_remaining_guards().await;
}

let time = Timestamp::now();

for write in writes {
let name = get_doc_name_from_write(&write)?;
Expand All @@ -351,7 +348,7 @@ impl FirestoreDatabase {
}
}

self.transactions.stop(transaction).await?;
self.transactions.remove(transaction).await?;

self.send_event(DatabaseEvent {
database: Arc::downgrade(self),
Expand Down Expand Up @@ -457,7 +454,7 @@ impl FirestoreDatabase {
}

pub async fn rollback(&self, transaction: TransactionId) -> Result<()> {
self.transactions.stop(transaction).await?;
self.transactions.remove(transaction).await?;
Ok(())
}

Expand Down
40 changes: 23 additions & 17 deletions crates/firestore-database/src/database/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,31 @@ impl RunningTransactions {
}
}

pub(crate) async fn stop(&self, id: TransactionId) -> Result<()> {
pub(crate) async fn remove(&self, id: TransactionId) -> Result<Arc<Transaction>> {
self.txns.write().await.remove(&id).ok_or_else(|| {
GenericDatabaseError::invalid_argument(format!("invalid transaction ID: {}", id.0))
})?;
Ok(())
})
}
}

#[derive(Debug)]
pub enum Transaction {
pub(crate) enum Transaction {
ReadWrite(ReadWriteTransaction),
ReadOnly(ReadOnlyTransaction),
}

impl Transaction {
pub async fn read_doc(&self, name: &DocumentRef) -> Result<Option<Arc<StoredDocumentVersion>>> {
pub(crate) async fn read_doc(
&self,
name: &DocumentRef,
) -> Result<Option<Arc<StoredDocumentVersion>>> {
match self {
Transaction::ReadWrite(txn) => txn.read_doc(name).await,
Transaction::ReadOnly(txn) => txn.read_doc(name).await,
}
}

pub fn as_read_write(&self) -> Option<&ReadWriteTransaction> {
pub(crate) fn as_read_write(&self) -> Option<&ReadWriteTransaction> {
if let Self::ReadWrite(v) = self {
Some(v)
} else {
Expand All @@ -109,10 +111,10 @@ impl Transaction {
}

#[derive(Debug)]
pub struct ReadWriteTransaction {
pub id: TransactionId,
pub(crate) struct ReadWriteTransaction {
pub(crate) id: TransactionId,
database: Weak<FirestoreDatabase>,
guards: Mutex<HashMap<DocumentRef, Arc<OwnedDocumentContentsReadGuard>>>,
guards: Mutex<HashMap<DocumentRef, Arc<OwnedDocumentContentsReadGuard>>>,
}

impl ReadWriteTransaction {
Expand All @@ -124,19 +126,22 @@ impl ReadWriteTransaction {
}
}

pub async fn read_doc(&self, name: &DocumentRef) -> Result<Option<Arc<StoredDocumentVersion>>> {
pub(crate) async fn read_doc(
&self,
name: &DocumentRef,
) -> Result<Option<Arc<StoredDocumentVersion>>> {
Ok(self
.read_guard(name)
.await?
.version_for_consistency(ReadConsistency::Transaction(self.id))?
.cloned())
}

pub async fn drop_remaining_guards(&self) {
pub(crate) async fn drop_remaining_guards(&self) {
self.guards.lock().await.clear();
}

pub async fn take_write_guard(
pub(crate) async fn take_write_guard(
&self,
name: &DocumentRef,
) -> Result<OwnedDocumentContentsWriteGuard> {
Expand Down Expand Up @@ -173,14 +178,15 @@ impl ReadWriteTransaction {
}

#[derive(Debug)]
pub struct ReadOnlyTransaction {
pub id: TransactionId,
pub database: Weak<FirestoreDatabase>,
pub consistency: ReadConsistency,
pub(crate) struct ReadOnlyTransaction {
#[allow(dead_code)] // Only useful in logging
pub(crate) id: TransactionId,
pub(crate) database: Weak<FirestoreDatabase>,
pub(crate) consistency: ReadConsistency,
}

impl ReadOnlyTransaction {
pub fn new(
pub(crate) fn new(
id: TransactionId,
database: Weak<FirestoreDatabase>,
consistency: ReadConsistency,
Expand Down
57 changes: 27 additions & 30 deletions test-suite/tests/3-basic-transaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,61 @@ import { AsyncLocalStorage } from 'async_hooks';
import { range } from 'lodash';
import { setTimeout as time } from 'timers/promises';
import { fs } from './utils';
import { writeData } from './utils/firestore';

describe('concurrent tests', () => {
// no concurrent tests with the Java Emulator..
if (fs.connection === 'JAVA EMULATOR') {
test.concurrent = test;
}
const concurrent = fs.connection === 'JAVA EMULATOR' ? test : test.concurrent;

test.concurrent('simple txn', async () => {
concurrent('simple txn', async () => {
const [docRef1] = refs();

await fs.firestore.runTransaction(async txn => {
expect(await txn.get(docRef1)).toHaveProperty('exists', false);

txn.set(docRef1, writeData({ foo: 'bar' }));
txn.set(docRef1, fs.writeData({ foo: 'bar' }));

expect(() => txn.get(docRef1)).toThrow('Firestore transactions require all reads to be executed before all writes.');
});
expect(await getData(docRef1.get())).toEqual({ foo: 'bar' });
});

test.concurrent('updating same doc multiple times', async () => {
concurrent('updating same doc multiple times', async () => {
const [docRef1] = refs();

await fs.firestore.runTransaction(async txn => {
expect(await txn.get(docRef1)).toHaveProperty('exists', false);

txn.set(docRef1, writeData({ foo: fs.exported.FieldValue.increment(1) }));
txn.set(docRef1, fs.writeData({ foo: fs.exported.FieldValue.increment(1) }));
txn.update(docRef1, { foo: fs.exported.FieldValue.increment(1) });
});
expect(await getData(docRef1.get())).toEqual({ foo: 2 });
});

test.concurrent('using txn.getAll', async () => {
concurrent('using txn.getAll', async () => {
const [docRef1, docRef2] = refs();

await docRef1.set(writeData({ some: 'data' }));
await docRef1.set(fs.writeData({ some: 'data' }));

await fs.firestore.runTransaction(async txn => {
const [snap1, snap2] = await txn.getAll(docRef1, docRef2);

expect(await getData(snap1)).toEqual({ some: 'data' });
expect(snap2.exists).toBeFalse();

txn.set(docRef2, writeData({ foo: 'bar' }));
txn.set(docRef2, fs.writeData({ foo: 'bar' }));
});
expect(await getData(docRef1.get())).toEqual({ some: 'data' });
expect(await getData(docRef2.get())).toEqual({ foo: 'bar' });
});

test.concurrent('aborting transaction', async () => {
concurrent('aborting transaction', async () => {
const [docRef1] = refs();

await expect(
fs.firestore.runTransaction(async txn => {
expect(await txn.get(docRef1)).toHaveProperty('exists', false);

txn.set(docRef1, writeData({ foo: 'bar' }));
txn.set(docRef1, fs.writeData({ foo: 'bar' }));

throw new Error('I quit!');
}),
Expand All @@ -76,10 +73,10 @@ describe('concurrent tests', () => {
// - 10 ABORTED: Transaction lock timeout
// - inconsistent number of `tries`
fs.notImplementedInJava ||
test.concurrent('retry if document is locked', async () => {
concurrent('retry if document is locked', async () => {
const [docRef1] = refs();

await docRef1.set(writeData({ some: 'data' }));
await docRef1.set(fs.writeData({ some: 'data' }));

await runTxn('outer', [docRef1], async () => {
const { innerTxnCompleted } = await innerTxn('inner', [docRef1]);
Expand All @@ -95,7 +92,7 @@ describe('concurrent tests', () => {
});

fs.notImplementedInJava ||
test.concurrent('lock on non-existing document', async () => {
concurrent('lock on non-existing document', async () => {
const [docRef1] = refs();

await runTxn('outer', [docRef1], async () => {
Expand All @@ -110,7 +107,7 @@ describe('concurrent tests', () => {
});
});

test.concurrent('no lock if getting separate documents', async () => {
concurrent('no lock if getting separate documents', async () => {
const [docRef1, docRef2] = refs();

await runTxn('outer', [docRef1], async () => {
Expand All @@ -122,7 +119,7 @@ describe('concurrent tests', () => {
});

// Note: Very slow on Cloud Firestore!!
test.concurrent(
concurrent(
'chaos',
async () => {
const [docRef1, docRef2] = refs();
Expand All @@ -147,7 +144,7 @@ describe('concurrent tests', () => {
45_000,
);

test.concurrent('only read locked document', async () => {
concurrent('only read locked document', async () => {
const [docRef1, docRef2] = refs();

await docRef1.create(fs.writeData());
Expand All @@ -167,10 +164,10 @@ describe('concurrent tests', () => {
});
});

test.concurrent('regular `set` waits on transaction', async () => {
concurrent('regular `set` waits on transaction', async () => {
const [docRef1] = refs();

await docRef1.set(writeData({ some: 'data' }));
await docRef1.set(fs.writeData({ some: 'data' }));

let setDone = false;
await runTxn('outer', [docRef1], async () => {
Expand All @@ -183,7 +180,7 @@ describe('concurrent tests', () => {
});

describe('tests with synchronized processes', () => {
test.concurrent('reading the same doc from different txns', async () => {
concurrent('reading the same doc from different txns', async () => {
const test = new ConcurrentTest();
// Scenario:
// Process 1 - create doc
Expand Down Expand Up @@ -248,7 +245,7 @@ describe('concurrent tests', () => {
]);
});

test.concurrent('reading the same doc from different txns, try to write in second txn', async () => {
concurrent('reading the same doc from different txns, try to write in second txn', async () => {
const test = new ConcurrentTest();
// Scenario:
// Process 1 - create doc
Expand Down Expand Up @@ -326,7 +323,7 @@ describe('concurrent tests', () => {
]);
});

test.concurrent('reading the same doc from different txns, try to write in both txns', async () => {
concurrent('reading the same doc from different txns, try to write in both txns', async () => {
const test = new ConcurrentTest();
// Scenario:
// Process 1 - create doc
Expand Down Expand Up @@ -411,7 +408,7 @@ describe('concurrent tests', () => {
]);
});

test.concurrent('regular writes also wait until all txns-locks are released', async () => {
concurrent('regular writes also wait until all txns-locks are released', async () => {
const test = new ConcurrentTest();
// Scenario:
// Process 1 - create outside txn (doc 1), completes immediately.
Expand Down Expand Up @@ -538,7 +535,7 @@ describe('concurrent tests', () => {
fs.notImplementedInRust ||
fs.notImplementedInJava ||
fs.notImplementedInCloud ||
test.concurrent('deadlock', async () => {
concurrent('deadlock', async () => {
const test = new ConcurrentTest(40_000);

const [ref1, ref2] = refs();
Expand Down Expand Up @@ -585,7 +582,7 @@ describe('concurrent tests', () => {
});

describe('queries', () => {
test.concurrent('update after reading a query', async () => {
concurrent('update after reading a query', async () => {
const testName = 'update';

const [docRef1, docRef2] = refs();
Expand Down Expand Up @@ -631,7 +628,7 @@ describe('concurrent tests', () => {
]);
});

test.concurrent('delete after reading a query', async () => {
concurrent('delete after reading a query', async () => {
const testName = 'delete';

const [docRef1, docRef2] = refs();
Expand Down Expand Up @@ -676,7 +673,7 @@ describe('concurrent tests', () => {

fs.notImplementedInJava || // Timeout
fs.notImplementedInRust || // Timeout
test.concurrent('create after reading a query', async () => {
concurrent('create after reading a query', async () => {
const testName = 'create';

const [docRef1, docRef2] = refs();
Expand Down Expand Up @@ -769,7 +766,7 @@ describe('concurrent tests', () => {
({ awaitAfterTxn } = (await runAfterGet()) ?? {});

for (const ref of write) {
txn.set(ref, writeData({ [name]: { tries } }), { merge: true });
txn.set(ref, fs.writeData({ [name]: { tries } }), { merge: true });
}
});
await awaitAfterTxn;
Expand Down
Loading

0 comments on commit b0f9c99

Please sign in to comment.