Skip to content

Commit

Permalink
Perf work
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Sep 21, 2023
1 parent e924e78 commit 9155e74
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 46 deletions.
2 changes: 1 addition & 1 deletion tests/script_runtime_errors/bad_fold_type/error.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Error:
1 | for event of
2 | case (k,v) => [k, v]
| ^^^^^^ The binary operation `+` is not defined for the type `record` and `array`
| ^^^^^^ Type error: Expected object, found array
3 | into {} end;
31 changes: 31 additions & 0 deletions tremor-cli/tests/bench/for-record-fn/main.troy
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
define flow main
flow
use tremor::{pipelines, connectors};
use bench;

define pipeline main
pipeline
define script runtime
script
use std::record;
record::from_array(for event of
case (k, v) => [k, v]
end)
end;

create script runtime;

select event from in into runtime;
select event from runtime into out;
select event from runtime/err into err;

end;
create pipeline main;
create connector bench from bench::bench;
create connector console from connectors::console;

connect /connector/bench to /pipeline/main;
connect /pipeline/main to /connector/bench;
connect /pipeline/main/err to /connector/console/stderr;
end;
deploy flow main;
5 changes: 5 additions & 0 deletions tremor-cli/tests/bench/for-record-fn/tags.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[
"throughput",
"codec:json",
"focus:for"
]
7 changes: 3 additions & 4 deletions tremor-cli/tests/bench/for-record/main.troy
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ flow
pipeline
define script runtime
script
use std::record;
record::from_array(for event of
case (k, v) => [k, v]
end)
for event of
case (k, v) => {"#{k}": v}
into {} end
end;

create script runtime;
Expand Down
46 changes: 42 additions & 4 deletions tremor-script/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,44 @@ impl<P> From<std::sync::PoisonError<P>> for Error {

impl From<TryTypeError> for Error {
fn from(e: TryTypeError) -> Self {
ErrorKind::TypeError(e.expected, e.got).into()
ErrorKind::TypeError(None, None, e.expected, e.got).into()
}
}

pub(crate) trait AddSpan<O, I>
where
O: BaseExpr + Ranged,
I: BaseExpr + Ranged,
{
type Output;
fn add_span(self, outer: &O, inner: &I) -> Self::Output;
}

impl<O, I> AddSpan<O, I> for Error
where
O: BaseExpr + Ranged,
I: BaseExpr + Ranged,
{
type Output = Self;
fn add_span(self, outer: &O, inner: &I) -> Self {
match self {
Error(ErrorKind::TypeError(_, _, e, g), state) => Error(
ErrorKind::TypeError(Some(outer.extent()), Some(inner.extent()), e, g),
state,
),
_ => self,

Check warning on line 116 in tremor-script/src/errors.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/errors.rs#L116

Added line #L116 was not covered by tests
}
}
}
impl<T, E, O, I> AddSpan<O, I> for std::result::Result<T, E>
where
O: BaseExpr + Ranged,
I: BaseExpr + Ranged,
Error: From<E>,
{
type Output = Result<T>;
fn add_span(self, outer: &O, inner: &I) -> Self::Output {
self.map_err(|e| Error::from(e).add_span(outer, inner))
}
}

Expand Down Expand Up @@ -294,9 +331,10 @@ impl ErrorKind {
| CyclicUse(outer, inner, _)
| InvalidDefinitionalWithParam(outer, inner, _, _, _)
| UpdateKeyMissing(outer, inner, _) => (Some(*outer), Some(*inner)),

TypeError(outer, inner, _, _) => (*outer, *inner),
// Special cases
EmptyScript
| TypeError(_, _)
| AccessError(_)
| CantSetArgsConst
| CantSetGroupConst
Expand Down Expand Up @@ -556,7 +594,7 @@ error_chain! {
description(" Cyclic dependency detected!")
display(" Cyclic dependency detected: {}", uses.join(" -> "))
}
TypeError(expected: ValueType, found: ValueType) {
TypeError(expr: Option<Span>, inner: Option<Span>, expected: ValueType, found: ValueType) {
description("Type error")
display("Type error: Expected {}, found {}", expected, found)
}
Expand Down Expand Up @@ -1384,7 +1422,7 @@ mod test {
.0;
assert_matches!(
r,
ErrorKind::TypeError(ValueType::Object, ValueType::String)
ErrorKind::TypeError(None, None, ValueType::Object, ValueType::String)
);
}
}
144 changes: 135 additions & 9 deletions tremor-script/src/interpreter/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use super::{
use crate::{
ast::BinOpKind,
errors::{
err_need_obj, error_assign_array, error_assign_to_const, error_bad_key_err,
error_invalid_assign_target, error_no_clause_hit, Result,
err_generic, err_need_obj, error_assign_array, error_assign_to_const, error_bad_key_err,
error_invalid_assign_target, error_no_clause_hit, AddSpan, Result,
},
};
use crate::{ast::ComprehensionFoldOp, prelude::*};
Expand All @@ -42,6 +42,10 @@ use std::{
iter,
};

pub(crate) type ComprehensionIter<'event, 'run> =
Box<dyn Iterator<Item = (Value<'event>, Value<'event>)> + 'run>;
pub(crate) type ComprehensionItem<'event, 'run> = (usize, ComprehensionIter<'event, 'run>);

#[derive(Debug)]
/// Continuation context to control program flow
// TODO, can we avoid the static here?
Expand Down Expand Up @@ -226,6 +230,116 @@ impl<'script> Expr<'script> {
}
}

fn comprehension_array<'run, 'event>(
&'run self,
opts: ExecOpts,
env: &'run Env<'run, 'event>,
event: &'run mut Value<'event>,
state: &'run mut Value<'static>,
meta: &'run mut Value<'event>,
local: &'run mut LocalStack<'event>,
expr: &'run Comprehension<'event, Expr>,
items: ComprehensionIter<'event, 'run>,
mut result: Vec<Value<'event>>,
) -> Result<Cont<'run, 'event>> {
let cases = &expr.cases;

'outer: for (k, v) in items {
stry!(set_local_shadow(self, local, expr.key_id, k));
stry!(set_local_shadow(self, local, expr.val_id, v));

for e in cases {
if stry!(test_guard(
self, opts, env, event, state, meta, local, &e.guard
)) {
let es = &e.exprs;
let l = &e.last_expr;
let v = demit!(Self::execute_effectors(
opts, env, event, state, meta, local, es, l,
));
// NOTE: We are creating a new value so we have to clone;
if opts.result_needed {
let v = v.into_owned();
result.push(v);
}
continue 'outer;
}
}
}

Ok(Cont::Cont(Cow::Owned(Value::Array(result))))
}
fn comprehension_record<'run, 'event>(
&'run self,
opts: ExecOpts,
env: &'run Env<'run, 'event>,
event: &'run mut Value<'event>,
state: &'run mut Value<'static>,
meta: &'run mut Value<'event>,
local: &'run mut LocalStack<'event>,
expr: &'run Comprehension<'event, Expr>,
items: ComprehensionIter<'event, 'run>,
mut result: Object<'event>,
) -> Result<Cont<'run, 'event>> {
let cases = &expr.cases;
let no_res = opts.without_result();

'outer: for (k, v) in items {
stry!(set_local_shadow(self, local, expr.key_id, k));
stry!(set_local_shadow(self, local, expr.val_id, v));

for e in cases {
if stry!(test_guard(
self, opts, env, event, state, meta, local, &e.guard
)) {
let l = &e.last_expr;
if let Expr::Imut(ImutExpr::Record(r)) = l {
for effector in &e.exprs {
demit!(effector.run(no_res, env, event, state, meta, local));

Check warning on line 298 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L298

Added line #L298 was not covered by tests
}

if opts.result_needed {
for (k, v) in &r.base {
result.insert(k.clone(), v.clone());
}

Check warning on line 304 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L303-L304

Added lines #L303 - L304 were not covered by tests
for f in &r.fields {
let k = stry!(f.name.run(opts, env, event, state, meta, local));
let v = stry!(f.value.run(opts, env, event, state, meta, local));
result.insert(k.clone(), v.into_owned());
}
} else {
for f in &r.fields {
if opts.result_needed {
let k = stry!(f.name.run(opts, env, event, state, meta, local));
let v =
stry!(f.value.run(opts, env, event, state, meta, local));

Check warning on line 315 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L311-L315

Added lines #L311 - L315 were not covered by tests

result.insert(k.clone(), v.into_owned());

Check warning on line 317 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L317

Added line #L317 was not covered by tests
} else {
stry!(f.value.run(opts, env, event, state, meta, local));

Check warning on line 319 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L319

Added line #L319 was not covered by tests
}
}
}
} else {
let v = demit!(Self::execute_effectors(
opts, env, event, state, meta, local, &e.exprs, l,
));
// NOTE: We are creating a new value so we have to clone;
if opts.result_needed {
let v = v.into_owned();

for (k, v) in v.try_into_object().add_span(expr, l)? {
result.insert(k, v);
}
}

Check warning on line 334 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L332-L334

Added lines #L332 - L334 were not covered by tests
}
continue 'outer;
}

Check warning on line 337 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L337

Added line #L337 was not covered by tests
}
}

Ok(Cont::Cont(Cow::Owned(Value::Object(Box::new(result)))))
}
// TODO: Quite some overlap with `ImutExprInt::comprehension`
fn comprehension<'run, 'event>(
&'run self,
Expand All @@ -237,7 +351,6 @@ impl<'script> Expr<'script> {
local: &'run mut LocalStack<'event>,
expr: &'run Comprehension<'event, Expr>,
) -> Result<Cont<'run, 'event>> {
type Bi<'v, 'r> = (usize, Box<dyn Iterator<Item = (Value<'v>, Value<'v>)> + 'r>);
fn kv<'k, K>((k, v): (K, Value)) -> (Value<'k>, Value)
where
K: 'k,
Expand All @@ -247,12 +360,11 @@ impl<'script> Expr<'script> {
}

let target = &expr.target;
let cases = &expr.cases;
let t = stry!(target.run(opts, env, event, state, meta, local,));

let (l, items): Bi = t.as_object().map_or_else(
let (l, items): ComprehensionItem = t.as_object().map_or_else(
|| {
t.as_array().map_or_else::<Bi, _, _>(
t.as_array().map_or_else::<ComprehensionItem, _, _>(
|| (0, Box::new(iter::empty())),
|t| (t.len(), Box::new(t.clone().into_iter().enumerate().map(kv))),
)
Expand All @@ -270,6 +382,20 @@ impl<'script> Expr<'script> {
} else {
Value::const_null()
};
// short circuite for common cases
if expr.fold == ComprehensionFoldOp(BinOpKind::Add) {
if result.is_array() {
let a = result.into_array().unwrap_or_default();
return self
.comprehension_array(opts, env, event, state, meta, local, expr, items, a);
} else if result.is_object() {
let o = result.into_object().unwrap_or_default();
return self
.comprehension_record(opts, env, event, state, meta, local, expr, items, o);
}
}
let cases = &expr.cases;

'outer: for (k, v) in items {
stry!(set_local_shadow(self, local, expr.key_id, k));
stry!(set_local_shadow(self, local, expr.val_id, v));
Expand All @@ -292,18 +418,18 @@ impl<'script> Expr<'script> {
lhs.push(v);
}

Check warning on line 419 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L416-L419

Added lines #L416 - L419 were not covered by tests
ComprehensionFoldOp(_) => {
return Err("Invalid fold operation".into())
return err_generic(expr, l, &"Invalid fold operation");

Check warning on line 421 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L421

Added line #L421 was not covered by tests
}
}
} else if let Some(lhs) = result.as_object_mut() {
match expr.fold {

Check warning on line 425 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L425

Added line #L425 was not covered by tests
ComprehensionFoldOp(BinOpKind::Add) => {
for (k, v) in v.into_object().ok_or("Invalid fold operation")? {
for (k, v) in v.try_into_object().add_span(expr, l)? {
lhs.insert(k, v);
}

Check warning on line 429 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L427-L429

Added lines #L427 - L429 were not covered by tests
}
ComprehensionFoldOp(_) => {
return Err("Invalid fold operation".into())
return err_generic(expr, l, &"Invalid fold operation");

Check warning on line 432 in tremor-script/src/interpreter/expr.rs

View check run for this annotation

Codecov / codecov/patch

tremor-script/src/interpreter/expr.rs#L432

Added line #L432 was not covered by tests
}
}
} else {
Expand Down
Loading

0 comments on commit 9155e74

Please sign in to comment.