Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
fix(indexer): correctly handle adding history events
Browse files Browse the repository at this point in the history
  • Loading branch information
stoically committed Feb 16, 2020
1 parent 75c4dda commit 770674e
Showing 1 changed file with 145 additions and 73 deletions.
218 changes: 145 additions & 73 deletions native/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};

use serde_json::{json, Value};
use seshat::{
Expand Down Expand Up @@ -35,7 +35,8 @@ pub(crate) fn handle_message(radical: &mut Radical, message: Value) -> Result<Va
let res = match indexer {
None => match method {
MessageMethod::InitEventIndex => {
let indexer = Indexer::new(event_store, &message)?;
let config = Indexer::message_to_config(&message);
let indexer = Indexer::new(event_store, config)?;
radical.indexer.insert(event_store.to_owned(), indexer);
json!(null)
}
Expand All @@ -46,12 +47,22 @@ pub(crate) fn handle_message(radical: &mut Radical, message: Value) -> Result<Va
MessageMethod::LoadCheckpoints => indexer.load_checkpoints()?,
MessageMethod::IsEventIndexEmpty => indexer.is_event_index_empty()?,
MessageMethod::CommitLiveEvents => indexer.commit_live_events()?,
MessageMethod::AddEventToIndex => indexer.add_event_to_index(message)?,
MessageMethod::AddCrawlerCheckpoint => indexer.add_crawler_checkpoint(message)?,
MessageMethod::AddHistoricEvents => indexer.add_history_events(message)?,
MessageMethod::RemoveCrawlerCheckpoint => indexer.remove_crawler_checkpoint(message)?,
MessageMethod::SearchEventIndex => indexer.search_event_index(message)?,
MessageMethod::LoadFileEvents => indexer.load_file_events(message)?,
MessageMethod::AddEventToIndex => {
indexer.add_event_to_index(get!(message, "content"))?
}
MessageMethod::AddCrawlerCheckpoint => {
indexer.add_history_events(get!(message, "content"))?
}
MessageMethod::AddHistoricEvents => {
indexer.add_history_events(get!(message, "content"))?
}
MessageMethod::RemoveCrawlerCheckpoint => {
indexer.remove_crawler_checkpoint(get!(message, "content"))?
}
MessageMethod::SearchEventIndex => {
indexer.search_event_index(get!(message, "content"))?
}
MessageMethod::LoadFileEvents => indexer.load_file_events(get!(message, "content"))?,
MessageMethod::GetStats => indexer.get_stats()?,
MessageMethod::CloseEventIndex => {
radical.indexer.remove(event_store);
Expand All @@ -76,7 +87,23 @@ pub(crate) struct Indexer {
}

impl Indexer {
pub fn new(event_store: &str, message: &Value) -> Result<Indexer, Error> {
/// Creates an `Indexer` for the named event store.
///
/// The storage directory is resolved with `Indexer::event_store_path`,
/// created on disk (including any missing parent directories), and then
/// opened through `Indexer::new_in_path` using `config`.
///
/// # Errors
///
/// Returns an error if the path cannot be resolved, the directory cannot be
/// created, or `new_in_path` fails to open the index.
pub fn new(event_store: &str, config: Config) -> Result<Indexer, Error> {
    let path = Indexer::event_store_path(event_store)?;
    std::fs::create_dir_all(&path)?;

    // `new_in_path` already yields `Result<Indexer, Error>`; return it as-is
    // instead of unwrapping with `?` only to wrap it in `Ok` again.
    Indexer::new_in_path(path, config)
}

/// Opens the index located at `path` with the given `config`.
///
/// This function performs no directory creation of its own; `path` is handed
/// straight to `Database::new_with_config`. A connection is obtained from the
/// freshly opened database and stored alongside it in the returned `Indexer`.
///
/// # Errors
///
/// Returns an error if the database cannot be opened or if a connection
/// cannot be obtained from it.
pub fn new_in_path(path: PathBuf, config: Config) -> Result<Indexer, Error> {
    let db = Database::new_with_config(path, &config)?;
    let conn = db.get_connection()?;

    Ok(Indexer {
        database: db,
        connection: conn,
    })
}

fn message_to_config(message: &Value) -> Config {
let mut config = Config::new();
config = config.set_passphrase(
message["passphrase"]
Expand All @@ -88,15 +115,7 @@ impl Indexer {
config = config.set_language(&language);
}

let path = Indexer::event_store_path(event_store)?;
std::fs::create_dir_all(&path)?;

let database = Database::new_with_config(path, &config)?;
let connection = database.get_connection()?;
Ok(Indexer {
database,
connection,
})
config
}

fn event_store_path(event_store: &str) -> Result<PathBuf, Error> {
Expand Down Expand Up @@ -147,34 +166,26 @@ impl Indexer {
Ok(json!(true))
}

fn add_event_to_index(&mut self, message: Value) -> Result<Value, Error> {
let message_content = get!(message, "content");
let event_json = get!(message_content, "ev");
let profile_json = get!(message_content, "profile");
/// Hands a single event and its sender profile to the database.
///
/// `message` is the request payload; its `"ev"` and `"profile"` entries are
/// extracted with `get!`, parsed by `parse_event`, and passed to
/// `Database::add_event`. Whatever `add_event` returns is not inspected here.
///
/// Returns JSON `null` on success.
///
/// # Errors
///
/// Returns an error if `parse_event` rejects the event or profile.
fn add_event_to_index(&mut self, message: &Value) -> Result<Value, Error> {
    let ev = get!(message, "ev");
    let sender_profile = get!(message, "profile");

    let (event, profile) = parse_event(&ev, &sender_profile)?;
    self.database.add_event(event, profile);

    Ok(json!(null))
}

fn add_crawler_checkpoint(&mut self, message: Value) -> Result<Value, Error> {
let message_content = get!(message, "content");
let new_checkpoint: Option<CrawlerCheckpoint> =
Some(parse_checkpoint(get!(message_content, "checkpoint"))?);
let old_checkpoint: Option<CrawlerCheckpoint> = None;
let events: Vec<(Event, Profile)> = Vec::new();
self.database
.add_historic_events(events, new_checkpoint, old_checkpoint)
.recv()??;
Ok(json!(null))
}
fn add_history_events(&mut self, message: &Value) -> Result<Value, Error> {
let new_checkpoint: Option<CrawlerCheckpoint> = match message.get("checkpoint") {
Some(checkpoint) => Some(parse_checkpoint(checkpoint)?),
None => None,
};
let old_checkpoint: Option<CrawlerCheckpoint> = match message.get("oldCheckpoint") {
Some(checkpoint) => Some(parse_checkpoint(checkpoint)?),
None => None,
};

fn add_history_events(&mut self, message: Value) -> Result<Value, Error> {
let message_content = get!(message, "content");
let new_checkpoint: Option<CrawlerCheckpoint> =
Some(parse_checkpoint(get!(message_content, "checkpoint"))?);
let old_checkpoint: Option<CrawlerCheckpoint> = None;
let mut events: Vec<(Event, Profile)> = Vec::new();
let events_json = message_content["events"].as_array();
let events_json = message["events"].as_array();
match events_json {
Some(events_json) => {
for event in events_json {
Expand All @@ -184,27 +195,19 @@ impl Indexer {
}
None => (),
};

self.database
.add_historic_events(events, new_checkpoint, old_checkpoint)
.recv()??;
Ok(json!(null))
}

fn remove_crawler_checkpoint(&mut self, message: Value) -> Result<Value, Error> {
let message_content = get!(message, "content");
let new_checkpoint: Option<CrawlerCheckpoint> = None;
let old_checkpoint: Option<CrawlerCheckpoint> =
Some(parse_checkpoint(get!(message_content, "checkpoint"))?);
let events: Vec<(Event, Profile)> = Vec::new();
self.database
.add_historic_events(events, new_checkpoint, old_checkpoint)
.recv()??;
Ok(json!(null))
/// Removes a crawler checkpoint.
///
/// The `"checkpoint"` entry of `message` (extracted with `get!`) is forwarded
/// to `add_history_events` under the `"oldCheckpoint"` key, with no
/// `"checkpoint"` and no `"events"` entries in the forwarded request.
///
/// Returns whatever `add_history_events` returns.
///
/// # Errors
///
/// Propagates any error from `add_history_events`.
fn remove_crawler_checkpoint(&mut self, message: &Value) -> Result<Value, Error> {
    let request = json!({ "oldCheckpoint": get!(message, "checkpoint") });

    // The delegate already returns `Result<Value, Error>`, so there is no
    // need to unwrap it with `?` and re-wrap it in `Ok`.
    self.add_history_events(&request)
}

fn search_event_index(&mut self, message: Value) -> Result<Value, Error> {
let message_content = get!(message, "content");
let search_config = get!(message_content, "searchConfig");
fn search_event_index(&mut self, message: &Value) -> Result<Value, Error> {
let search_config = get!(message, "searchConfig");
let (term, config) = parse_search_object(&search_config)?;
let searcher = self.database.get_searcher();
let search_results = searcher.search(&term, &config)?;
Expand Down Expand Up @@ -240,8 +243,8 @@ impl Indexer {
}))
}

fn load_file_events(&mut self, message: Value) -> Result<Value, Error> {
let args = get!(get!(message, "content"), "args");
fn load_file_events(&mut self, message: &Value) -> Result<Value, Error> {
let args = get!(message, "args");
let room_id = as_str!(args, "roomId");
let mut config = LoadConfig::new(room_id);
let limit = as_i64!(args, "limit");
Expand Down Expand Up @@ -455,6 +458,7 @@ fn method_to_enum(method: &String) -> MessageMethod {
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;

fn event_room_message_text() -> Value {
json!({
Expand All @@ -477,35 +481,40 @@ mod tests {

fn checkpoint() -> Value {
json!({
"checkpoint": {
"roomId": "!FDVbSkWZSIcwvBFMdt:localhost",
"token": "123",
"fullCrawl": false,
"direction": "b"
}
"roomId": "!FDVbSkWZSIcwvBFMdt:localhost",
"token": "123",
"direction": "b"
})
}

/// Test fixture: a crawler checkpoint for the same room and direction as
/// `checkpoint()`, but with token "456" instead of "123".
fn checkpoint2() -> Value {
json!({
"roomId": "!FDVbSkWZSIcwvBFMdt:localhost",
"token": "456",
"direction": "b"
})
}

fn setup() {
use tempfile::tempdir;
let tmpdir = tempdir().unwrap();
std::env::set_var("HOME", tmpdir.path());
/// Test helper: opens an `Indexer` directly inside `tmpdir` with a fixed
/// passphrase, panicking if the index cannot be opened.
fn indexer(tmpdir: &Path) -> Indexer {
    let config = Config::new().set_passphrase("TEST_PASS");
    Indexer::new_in_path(tmpdir.to_path_buf(), config).expect("indexer")
}

#[test]
fn crawler_checkpoints() {
setup();
let mut indexer = Indexer::new("test_passphrase", &json!({})).expect("indexer");
let tmpdir = tempdir().expect("tempdir");
let mut indexer = indexer(tmpdir.path());
let checkpoint = checkpoint();

indexer
.add_crawler_checkpoint(json!({
"content": checkpoint.clone()
.add_history_events(&json!({
"checkpoint": checkpoint.clone()
}))
.expect("add_crawler_checkpoint");
indexer
.remove_crawler_checkpoint(json!({
"content": checkpoint.clone()
.remove_crawler_checkpoint(&json!({
"checkpoint": checkpoint.clone()
}))
.expect("remove_crawler_checkpoint");

Expand All @@ -514,9 +523,68 @@ mod tests {
assert_eq!(count, 0);
}

/// Adding history events with both a new and an old checkpoint must leave
/// exactly one checkpoint stored.
#[test]
fn initial_crawl() {
    let tmpdir = tempdir().expect("tempdir");
    let mut indexer = indexer(tmpdir.path());
    let first_checkpoint = checkpoint();
    let profile = Profile::new("Alice", "");

    // Step 1: store only the first checkpoint, without any events.
    let seed = json!({
        "checkpoint": first_checkpoint.clone()
    });
    indexer
        .add_history_events(&seed)
        .expect("add_crawler_checkpoint");

    // Step 2: add one event, supplying the second checkpoint as the new one
    // and the first checkpoint as the old one.
    let batch = json!({
        "checkpoint": checkpoint2(),
        "events": [
            {
                "event": event_room_message_text(),
                "profile": profile
            }
        ],
        "oldCheckpoint": first_checkpoint.clone()
    });
    indexer
        .add_history_events(&batch)
        .expect("add_history_events");

    let loaded = indexer.load_checkpoints().expect("load_checkpoints");
    let stored = loaded.as_array().expect("load_checkpoints.as_array");
    assert_eq!(stored.len(), 1);
}

/// A single event added and then committed must be reported by `get_stats`
/// as an `eventCount` of 1.
#[test]
fn add_event() {
    let tmpdir = tempdir().expect("tempdir");
    let mut indexer = indexer(tmpdir.path());
    let profile = Profile::new("Alice", "");

    let request = json!({
        "ev": event_room_message_text(),
        "profile": profile
    });
    indexer
        .add_event_to_index(&request)
        .expect("add_event_to_index");

    // The test commits explicitly before reading the stats.
    indexer.commit_live_events().expect("commit_live_events");

    let stats = indexer.get_stats().expect("get_stats");
    let event_count = stats["eventCount"].as_i64().expect("eventCount");
    assert_eq!(event_count, 1);
}

#[test]
fn json_messages() {
setup();
let tmpdir = tempdir().expect("tempdir");
// make sure that we have only one test that modifies the environment
// since tests run in parallel
std::env::set_var("HOME", tmpdir.path().to_str().expect("tmpdir path"));
use std::collections::HashMap;
let mut radical = Radical {
indexer: HashMap::new(),
Expand Down Expand Up @@ -555,7 +623,9 @@ mod tests {
&mut radical,
json!({
"method": "addCrawlerCheckpoint",
"content": checkpoint
"content": {
"checkpoint": checkpoint
}
}),
)
.expect("addCrawlerCheckpoint");
Expand All @@ -564,7 +634,9 @@ mod tests {
&mut radical,
json!({
"method": "removeCrawlerCheckpoint",
"content": checkpoint
"content": {
"checkpoint": checkpoint
}
}),
)
.expect("removeCrawlerCheckpoint");
Expand Down

0 comments on commit 770674e

Please sign in to comment.