The DFIR `join` operator uses `symmetric_hash_join_into_iter`, which (a) dynamically chooses which side is the "outer loop" based on collection size, and (b) uses a `HashMap` to determine the order of the "inner loop". This leads to non-determinism for even simple `cross_product` operators in Hydro:
```rust
use hydro_lang::*;
use dfir_rs::futures::StreamExt;

tokio_test::block_on(test_util::stream_transform_test(|process| {
    let tick = process.tick();
    let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    stream1.cross_product(stream2)
}, |mut stream| async move {
    // should be ('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2), ('b', 3), ('c', 1), ('c', 2), ('c', 3)
    for w in vec![('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)] {
        assert_eq!(stream.next().await.unwrap(), w);
    }
}));
```
The fix shouldn't be that hard, so a temporary workaround may not be necessary. We already have `dfir_rs/src/compiled/pull/cross_join.rs`; we just aren't wiring it up in `dfir_lang/src/graph/ops/cross_join.rs` and `dfir_lang/src/graph/ops/cross_join_multiset.rs` -- instead we are reusing the logic from `dfir_lang/src/graph/ops/join.rs`, I think to avoid copying a lot of persistence boilerplate.
`cross_product` will still be non-deterministic for unbounded streams, though, which is why I think we should do the conservative thing for now until we figure out what the semantics should be.
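To make the non-determinism concrete: a cross join is deterministic when the left side is always the outer loop, independent of collection sizes. Below is a minimal standalone sketch of that idea (hypothetical `cross_join` helper, not the actual `dfir_rs` implementation); the symmetric hash join instead picks the outer side by size and iterates a `HashMap`, so its output order can vary run to run.

```rust
/// Deterministic cross join sketch: the left input is always the
/// outer loop, so the output is in fixed row-major order.
fn cross_join<L: Clone, R: Clone>(left: &[L], right: &[R]) -> Vec<(L, R)> {
    let mut out = Vec::with_capacity(left.len() * right.len());
    for l in left {
        for r in right {
            out.push((l.clone(), r.clone()));
        }
    }
    out
}

fn main() {
    let pairs = cross_join(&['a', 'b', 'c'], &[1, 2, 3]);
    // Row-major order, every run: ('a', 1), ('a', 2), ('a', 3), ('b', 1), ...
    assert_eq!(pairs[0], ('a', 1));
    assert_eq!(pairs[1], ('a', 2));
    assert_eq!(pairs[8], ('c', 3));
    println!("{:?}", pairs);
}
```

Note that this only fixes order for bounded inputs materialized up front; with unbounded streams, output order still depends on arrival interleaving, which is the open semantics question above.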