Skip to content

Commit

Permalink
Merge pull request #57 from skunkteam/paco/fix-clear-emulator-bug
Browse files Browse the repository at this point in the history
Improve clearEmulator behavior
  • Loading branch information
pavadeli authored Mar 22, 2024
2 parents 7462154 + fbd16d9 commit 58140ff
Showing 1 changed file with 46 additions and 2 deletions.
48 changes: 46 additions & 2 deletions crates/firestore-database/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ impl Listener {
Some(database_event) = self.database_events.next() => {
match database_event {
// Database was dropped, reconnect to automatically create a new Database.
(db_name, None) => self.ensure_subscribed_to(&*self.project.database(&db_name).await),
(db_name, None) => {
let database = &*self.project.database(&db_name).await;
self.database_events.insert(
db_name,
StreamNotifyClose::new(BroadcastStream::new(database.subscribe())),
);
self.reset(database).await?;
},
// Received an event from the database.
(_, Some(Ok(event))) => self.process_event(&event).await?,
// Buffer overflow, we weren't able to keep up with the amount of events of one of the databases.
Expand Down Expand Up @@ -205,7 +212,24 @@ impl Listener {
self.send_complete(update_time).await
}

#[instrument(level = Level::TRACE, skip_all, err)]
async fn reset(&mut self, database: &FirestoreDatabase) -> Result<()> {
// We rely on the fact that this function will complete before any other events are
// processed. That's why we know for sure that the output stream is not used for
// something else until we respond with our NO_CHANGE msg. That msg means that everything is
// up to date until that point and this is (for now) the easiest way to make sure
// that is actually the case. This is probably okay, but if it becomes a hotspot we
// might look into optimizing later.
let Some(target) = &mut self.target else {
return Ok(());
};

let time = Timestamp::now();
let msgs = target.reset(database, time).await?;
self.send_all(msgs).await?;
self.send_complete(time).await?;
Ok(())
}

async fn set_document(
&mut self,
database: &FirestoreDatabase,
Expand Down Expand Up @@ -383,6 +407,17 @@ impl ListenerTarget {
ListenerTarget::QueryTarget(target) => target.process_event(database, event).await,
}
}

async fn reset(
&mut self,
database: &FirestoreDatabase,
time: Timestamp,
) -> Result<Vec<ResponseType>> {
match self {
ListenerTarget::DocumentTarget(target) => Ok(target.reset(time)),
ListenerTarget::QueryTarget(target) => target.reset(database, time).await,
}
}
}

struct DocumentTarget {
Expand Down Expand Up @@ -421,6 +456,15 @@ impl DocumentTarget {
};
Ok(vec![msg])
}

// TODO: Reuse code of initial setup of this target.
fn reset(&mut self, time: Timestamp) -> Vec<ResponseType> {
vec![ResponseType::DocumentDelete(DocumentDelete {
document: self.name.to_string(),
removed_target_ids: vec![TARGET_ID],
read_time: Some(time),
})]
}
}

struct QueryTarget {
Expand Down

0 comments on commit 58140ff

Please sign in to comment.