Skip to content

Commit

Permalink
Merge pull request coasys#483 from coasys/notifications-ui
Browse files Browse the repository at this point in the history
Notification Prolog helper predicates and OS notification push
  • Loading branch information
fayeed authored Jun 12, 2024
2 parents 2cd0f5b + edb1717 commit f77a71d
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 42 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,20 @@ This project _loosely_ adheres to [Semantic Versioning](https://semver.org/spec/

## unreleased

### Fixed
- Prolog engine gets respawned when a query caused an error to clean the machine state [PR#483](https://github.com/coasys/ad4m/pull/483)
- Catch panics in Scryer and handle as error instead of killing the engine thread which caused `channel closed` error in future queries [PR#483](https://github.com/coasys/ad4m/pull/483)

### Added
- Prolog predicates needed in new Flux mention notification trigger:
- agent_did/1
- remove_html_tags/2
- string_includes/2
- literal_from_url/3
- json_property/3
[PR#483](https://github.com/coasys/ad4m/pull/483)
- Triggered notifications are handled through operating system notifications [PR#483](https://github.com/coasys/ad4m/pull/483)

- App notifications implemented. ADAM apps can register Prolog queries with the executor which will be checked on every perspective change. If the change adds a new match, it will trigger the publishing of a notifications via subscriptions in client interface [PR#475](https://github.com/coasys/ad4m/pull/475), as well as calling a web hook if given [PR#482](https://github.com/coasys/ad4m/pull/482)
- Support ADAM executor hosting service alpha [PR#474](https://github.com/coasys/ad4m/pull/474)
- Complete instructions in README [PR#473](https://github.com/coasys/ad4m/pull/473)
Expand Down
87 changes: 56 additions & 31 deletions rust-executor/src/perspectives/perspective_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use serde_json::Value;
use scryer_prolog::machine::parsed_results::{QueryMatch, QueryResolution};
use tokio::{join, time};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use ad4m_client::literal::Literal;
use chrono::DateTime;
use deno_core::anyhow::anyhow;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub struct PerspectiveInstance {
prolog_needs_rebuild: Arc<Mutex<bool>>,
is_teardown: Arc<Mutex<bool>>,
sdna_change_mutex: Arc<Mutex<()>>,
prolog_update_mutex: Arc<Mutex<()>>,
prolog_update_mutex: Arc<RwLock<()>>,
link_language: Arc<Mutex<Option<Language>>>,
}

Expand All @@ -149,7 +149,7 @@ impl PerspectiveInstance {
prolog_needs_rebuild: Arc::new(Mutex::new(true)),
is_teardown: Arc::new(Mutex::new(false)),
sdna_change_mutex: Arc::new(Mutex::new(())),
prolog_update_mutex: Arc::new(Mutex::new(())),
prolog_update_mutex: Arc::new(RwLock::new(())),
link_language: Arc::new(Mutex::new(None)),
}
}
Expand Down Expand Up @@ -792,15 +792,29 @@ impl PerspectiveInstance {
}

async fn ensure_prolog_engine(&self) -> Result<(), AnyError> {
let mut maybe_prolog_engine = self.prolog_engine.lock().await;
if maybe_prolog_engine.is_none() {
let has_prolog_engine = {
self.prolog_engine.lock().await.is_some()
};

let mut rebuild_flag = self.prolog_needs_rebuild.lock().await;

if !has_prolog_engine || *rebuild_flag == true {
let _update_lock = self.prolog_update_mutex.write().await;
let mut maybe_prolog_engine = self.prolog_engine.lock().await;
if *rebuild_flag == true && maybe_prolog_engine.is_some() {
let old_engine = maybe_prolog_engine.as_ref().unwrap();
let _ = old_engine.drop();
*rebuild_flag = false;
}

let mut engine = PrologEngine::new();
engine.spawn().await.map_err(|e| anyhow!("Failed to spawn Prolog engine: {}", e))?;
let all_links = self.get_links(&LinkQuery::default()).await?;
let facts = init_engine_facts(all_links, self.persisted.lock().await.neighbourhood.as_ref().map(|n| n.author.clone())).await?;
engine.load_module_string("facts".to_string(), facts).await?;
*maybe_prolog_engine = Some(engine);
}

Ok(())
}

Expand All @@ -809,6 +823,7 @@ impl PerspectiveInstance {
pub async fn prolog_query(&self, query: String) -> Result<QueryResolution, AnyError> {
self.ensure_prolog_engine().await?;

let _read_lock = self.prolog_update_mutex.read().await;
let prolog_engine_mutex = self.prolog_engine.lock().await;
let prolog_engine_option_ref = prolog_engine_mutex.as_ref();
let prolog_engine = prolog_engine_option_ref.as_ref().expect("Must be some since we initialized the engine above");
Expand All @@ -819,10 +834,18 @@ impl PerspectiveInstance {
query
};

prolog_engine
let result = prolog_engine
.run_query(query)
.await?
.map_err(|e| anyhow!(e))
.await?;

match result {
Err(e) => {
let mut flag = self.prolog_needs_rebuild.lock().await;
*flag = true;
Err(anyhow!(e))
}
Ok(resolution) => Ok(resolution)
}
}

fn spawn_prolog_facts_update(&self, before: BTreeMap<Notification, Vec<QueryMatch>>, diff: DecoratedPerspectiveDiff) {
Expand Down Expand Up @@ -894,33 +917,35 @@ impl PerspectiveInstance {

async fn publish_notification_matches(uuid: String, match_map: BTreeMap<Notification, Vec<QueryMatch>>) {
for (notification, matches) in match_map {
let payload = TriggeredNotification {
notification: notification.clone(),
perspective_id: uuid.clone(),
trigger_match: prolog_resolution_to_string(QueryResolution::Matches(matches))
};
if (matches.len() > 0) {
let payload = TriggeredNotification {
notification: notification.clone(),
perspective_id: uuid.clone(),
trigger_match: prolog_resolution_to_string(QueryResolution::Matches(matches))
};

let message = serde_json::to_string(&payload).unwrap();
let message = serde_json::to_string(&payload).unwrap();

if let Ok(_) = url::Url::parse(&notification.webhook_url) {
log::info!("Notification webhook - posting to {:?}", notification.webhook_url);
let client = reqwest::Client::new();
let res = client.post(&notification.webhook_url)
.bearer_auth(&notification.webhook_auth)
.header("Content-Type", "application/json")
.body(message.clone())
.send()
.await;
log::info!("Notification webhook response: {:?}", res);
}

if let Ok(_) = url::Url::parse(&notification.webhook_url) {
log::info!("Notification webhook - posting to {:?}", notification.webhook_url);
let client = reqwest::Client::new();
let res = client.post(&notification.webhook_url)
.bearer_auth(&notification.webhook_auth)
.header("Content-Type", "application/json")
.body(message.clone())
.send()
get_global_pubsub()
.await
.publish(
&RUNTIME_NOTIFICATION_TRIGGERED_TOPIC,
&message,
)
.await;
log::info!("Notification webhook response: {:?}", res);
}

get_global_pubsub()
.await
.publish(
&RUNTIME_NOTIFICATION_TRIGGERED_TOPIC,
&message,
)
.await;
}
}

Expand Down
154 changes: 154 additions & 0 deletions rust-executor/src/perspectives/sdna.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ pub async fn init_engine_facts(all_links: Vec<DecoratedLinkExpression>, neighbou
lines.push(":- discontiguous(p3_class_color/2).".to_string());
lines.push(":- discontiguous(p3_instance_color/3).".to_string());

// library modules
lines.push(":- use_module(library(lists)).".to_string());
lines.push(":- use_module(library(dcgs)).".to_string());
lines.push(":- use_module(library(charsio)).".to_string());
lines.push(":- use_module(library(format)).".to_string());
lines.push(":- use_module(library(assoc)).".to_string());
lines.push(":- use_module(library(dif)).".to_string());

let lib = r#"
:- discontiguous(paginate/4).
Expand Down Expand Up @@ -171,6 +177,154 @@ takeN(Rest, NextN, PageRest).

lines.extend(lib.split('\n').map(|s| s.to_string()));

let literal_html_string_predicates = r#"
% Main predicate to remove HTML tags
remove_html_tags(Input, Output) :-
phrase(strip_html(Output), Input).
% DCG rule to strip HTML tags
strip_html([]) --> [].
strip_html(Result) -->
"<", !, skip_tag, strip_html(Result).
strip_html([Char|Result]) -->
[Char],
strip_html(Result).
% DCG rule to skip HTML tags
skip_tag --> ">", !.
skip_tag --> [_], skip_tag.
% Main predicate to check if Substring is included in String
string_includes(String, Substring) :-
phrase((..., string(Substring), ...), String).
% DCG rule for any sequence of characters
... --> [].
... --> [_], ... .
% DCG rule for matching a specific string
string([]) --> [].
string([C|Cs]) --> [C], string(Cs).
literal_from_url(Url, Decoded, Scheme) :-
phrase(parse_url(Scheme, Encoded), Url),
phrase(url_decode(Decoded), Encoded).
% DCG rule to parse the URL
parse_url(Scheme, Encoded) -->
"literal://", scheme(Scheme), ":", string(Encoded).
scheme(string) --> "string".
scheme(number) --> "number".
scheme(json) --> "json".
url_decode([]) --> [].
url_decode([H|T]) --> url_decode_char(H), url_decode(T).
url_decode_char(' ') --> "%20".
url_decode_char('!') --> "%21".
url_decode_char('"') --> "%22".
url_decode_char('#') --> "%23".
url_decode_char('$') --> "%24".
url_decode_char('%') --> "%25".
url_decode_char('&') --> "%26".
url_decode_char('\'') --> "%27".
url_decode_char('(') --> "%28".
url_decode_char(')') --> "%29".
url_decode_char('*') --> "%2A".
url_decode_char('+') --> "%2B".
url_decode_char(',') --> "%2C".
url_decode_char('/') --> "%2F".
url_decode_char(':') --> "%3A".
url_decode_char(';') --> "%3B".
url_decode_char('=') --> "%3D".
url_decode_char('?') --> "%3F".
url_decode_char('@') --> "%40".
url_decode_char('[') --> "%5B".
url_decode_char(']') --> "%5D".
url_decode_char('{') --> "%7B".
url_decode_char('}') --> "%7D".
url_decode_char('<') --> "%3C".
url_decode_char('>') --> "%3E".
url_decode_char('\\') --> "%5C".
url_decode_char('^') --> "%5E".
url_decode_char('_') --> "%5F".
url_decode_char('|') --> "%7C".
url_decode_char('~') --> "%7E".
url_decode_char('`') --> "%60".
url_decode_char('-') --> "%2D".
url_decode_char('.') --> "%2E".
url_decode_char(Char) --> [Char], { \+ member(Char, "%") }.
"#;

lines.extend(literal_html_string_predicates.split('\n').map(|s| s.to_string()));

let json_parser = r#"
% Main predicate to parse JSON and extract a property
json_property(JsonString, Property, Value) :-
phrase(json_dict(Dict), JsonString),
get_assoc(Property, Dict, Value).
% DCG rules to parse JSON
json_dict(Dict) -->
ws, "{", ws, key_value_pairs(Pairs), ws, "}", ws,
{ list_to_assoc(Pairs, Dict) }.
key_value_pairs([Key-Value|Pairs]) -->
ws, json_string(Key), ws, ":", ws, json_value(Value), ws, ("," -> key_value_pairs(Pairs) ; {Pairs=[]}).
json_value(Value) --> json_dict(Value).
json_value(Value) --> json_array(Value).
json_value(Value) --> json_string(Value).
json_value(Value) --> json_number(Value).
json_array([Value|Values]) -->
"[", ws, json_value(Value), ws, ("," -> json_value_list(Values) ; {Values=[]}), ws, "]".
json_value_list([Value|Values]) --> json_value(Value), ws, ("," -> json_value_list(Values) ; {Values=[]}).
json_string(String) -->
"\"", json_string_chars(String), "\"".
json_string_chars([]) --> [].
json_string_chars([C|Cs]) --> json_string_char(C), json_string_chars(Cs).
json_string_char(C) --> [C], { dif(C, '"'), dif(C, '\\') }.
json_string_char('"') --> ['\\', '"'].
json_string_char('\\') --> ['\\', '\\'].
json_string_char('/') --> ['\\', '/'].
json_string_char('\b') --> ['\\', 'b'].
json_string_char('\f') --> ['\\', 'f'].
json_string_char('\n') --> ['\\', 'n'].
json_string_char('\r') --> ['\\', 'r'].
json_string_char('\t') --> ['\\', 't'].
json_number(Number) -->
number_sequence(Chars),
{ atom_chars(Atom, Chars),
atom_number(Atom, Number) }.
string_chars([]) --> [].
string_chars([C|Cs]) --> [C], { dif(C, '"') }, string_chars(Cs).
% Simplified number_sequence to handle both integer and fractional parts
number_sequence([D|Ds]) --> digit(D), number_sequence_rest(Ds).
number_sequence_rest([D|Ds]) --> digit(D), number_sequence_rest(Ds).
number_sequence_rest([]) --> [].
digit(D) --> [D], { member(D, "0123456789.") }.
ws --> ws_char, ws.
ws --> [].
ws_char --> [C], { C = ' ' ; C = '\t' ; C = '\n' ; C = '\r' }.
"#;

lines.extend(json_parser.split('\n').map(|s| s.to_string()));

lines.push(format!("agent_did(\"{}\").", agent::did()));

let mut author_agents = vec![agent::did()];
if let Some(neughbourhood_author) = neighbourhood_author {
author_agents.push(neughbourhood_author);
Expand Down
21 changes: 14 additions & 7 deletions rust-executor/src/perspectives/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ pub fn prolog_value_to_json_string(value: Value) -> String {
Value::Atom(a) => format!("{}", a.as_str()),
Value::String(s) =>
if let Err(_e) = serde_json::from_str::<serde_json::Value>(s.as_str()) {
//treat as string literal
//escape double quotes
format!("\"{}\"", s
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("\r", "\\r"))
//try unescaping an escape json string
let wrapped_s = format!("\"{}\"",s);
if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(wrapped_s.as_str()) {
json_value.to_string()
} else {
//treat as string literal
//escape double quotes
format!("\"{}\"", s
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("\r", "\\r"))
}

} else {
//return valid json string
s
Expand Down
Loading

0 comments on commit f77a71d

Please sign in to comment.