Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support asof join #15411

Open
wants to merge 187 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
187 commits
Select commit Hold shift + click to select a range
c60fc15
init
zenus Apr 28, 2024
896a791
inited
zenus May 4, 2024
733fb47
Merge branch 'main' into asof
zenus May 6, 2024
a5ec9ae
Merge branch 'main' into asof
zenus May 7, 2024
fa79c24
Merge branch 'main' into asof
zenus May 7, 2024
ee0b22b
fixed
zenus May 8, 2024
8aefb9a
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 8, 2024
91b9eeb
fixed
zenus May 9, 2024
3d60527
Merge branch 'main' into asof
zenus May 9, 2024
b0f33fc
fixed
zenus May 11, 2024
e737094
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 11, 2024
9837a25
add
zenus May 11, 2024
02223e3
fixed
zenus May 11, 2024
eb70b9e
Merge branch 'main' into asof
zenus May 11, 2024
9a82acc
fixed
zenus May 11, 2024
fa8644b
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 11, 2024
1b943ab
fixed
zenus May 12, 2024
a170bd5
fixed
zenus May 12, 2024
fc14506
fixed
zenus May 12, 2024
465bf53
fixed
zenus May 12, 2024
d5a9216
fixed
zenus May 12, 2024
b88e6dd
fixed
zenus May 12, 2024
68decdb
Merge branch 'main' into asof
zenus May 12, 2024
b3ce0c8
fixed
zenus May 12, 2024
2718b22
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 12, 2024
1c5b1b4
Merge branch 'main' into asof
zenus May 12, 2024
4e3f4a4
fixed
zenus May 12, 2024
78eb155
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 12, 2024
0a7c786
fixed
zenus May 12, 2024
a50139c
fixed
zenus May 12, 2024
aab33e3
fixed
zenus May 12, 2024
3994fd6
fixed
zenus May 12, 2024
9682336
fixed
zenus May 12, 2024
f44f269
Merge branch 'main' into asof
zenus May 12, 2024
1c6954b
Merge branch 'main' into asof
zenus May 12, 2024
637a4b0
Merge branch 'main' into asof
zenus May 13, 2024
26fb31a
Merge branch 'main' into asof
zenus May 14, 2024
e4b2d73
Merge branch 'main' into asof
zenus May 14, 2024
da5ee1c
Merge branch 'main' into asof
zenus May 14, 2024
e66b6bf
Merge branch 'main' into asof
zenus May 14, 2024
845f8bd
Merge branch 'main' into asof
zenus May 14, 2024
cefcbe8
Merge branch 'main' into asof
zenus May 15, 2024
35c00e7
add test
zenus May 16, 2024
aa33e99
Merge branch 'main' into asof
zenus May 16, 2024
33038c4
Merge branch 'main' into asof
zenus May 16, 2024
b39ab3f
Update test_asof_join_miss.test
zenus May 17, 2024
591b663
Merge branch 'main' into asof
zenus May 17, 2024
5c43452
fixed
zenus May 17, 2024
7afb562
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 17, 2024
ada7c12
Merge branch 'main' into asof
zenus May 17, 2024
9eaadb6
fixed
zenus May 17, 2024
4dbde29
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 17, 2024
20d3039
Merge branch 'main' into asof
zenus May 19, 2024
aeb3cc1
Merge branch 'main' into asof
zenus May 20, 2024
8b40b88
Merge branch 'main' into asof
zenus May 21, 2024
b72f118
Merge branch 'main' into asof
zenus May 21, 2024
53075c8
fixed
zenus May 21, 2024
94f4499
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 21, 2024
29544ee
Merge branch 'main' into asof
zenus May 22, 2024
6c5ced6
Merge branch 'main' into asof
zenus May 22, 2024
e83b8d8
fixed
zenus May 22, 2024
5c92ff2
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 22, 2024
7ef3cea
Merge branch 'main' into asof
zenus May 22, 2024
70e98ea
fixed
zenus May 22, 2024
5fa993d
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 22, 2024
5acb66f
fixed
zenus May 22, 2024
7b696c7
fixed
zenus May 22, 2024
dec32a4
fixed
zenus May 22, 2024
8237caa
Merge branch 'main' into asof
zenus May 22, 2024
654d977
Merge branch 'main' into asof
zenus May 23, 2024
5a28e15
fixed
zenus May 23, 2024
3ee5bd2
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 23, 2024
28c21c5
Merge branch 'main' into asof
zenus May 23, 2024
402fcf6
fixed
zenus May 23, 2024
c6172b1
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 23, 2024
b501560
Merge branch 'main' into asof
zenus May 23, 2024
1d7584b
Merge branch 'main' into asof
zenus May 24, 2024
7c246f6
fixed
zenus May 25, 2024
f1ca0c4
fixed
zenus May 25, 2024
8735744
Merge branch 'main' into asof
zenus May 25, 2024
823b918
fixed
zenus May 25, 2024
1ea7be5
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus May 25, 2024
72d2d38
fixed
zenus May 25, 2024
b91b30c
fixed
zenus May 25, 2024
916d3d2
Merge branch 'main' into asof
zenus May 26, 2024
a4da166
fixed1
zenus May 26, 2024
d8bac05
fixed
zenus May 26, 2024
00a7802
fixed2
zenus May 27, 2024
4a15119
fixed2
zenus May 27, 2024
d521e47
fixesd
zenus May 27, 2024
3efd5ff
ffixed
zenus May 27, 2024
42d23de
Merge branch 'main' into asof
zenus May 30, 2024
63606a5
Merge branch 'main' into asof
zenus May 31, 2024
270bd81
Merge branch 'main' into asof
zenus May 31, 2024
4a88c61
Merge branch 'main' into asof
zenus May 31, 2024
7223a3f
fixed
zenus Jun 1, 2024
3a794fd
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 1, 2024
3bbee1b
Merge branch 'main' into asof
zenus Jun 1, 2024
2eb5868
fixed
zenus Jun 1, 2024
efc15bb
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 1, 2024
884096d
fixed
zenus Jun 1, 2024
9016291
fixed
zenus Jun 1, 2024
ad78dc5
fixed
zenus Jun 1, 2024
b1e61af
fixed
zenus Jun 1, 2024
befab81
Merge branch 'main' into asof
zenus Jun 1, 2024
292e4b8
fixed
zenus Jun 1, 2024
e21c049
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 1, 2024
7019936
fixed
zenus Jun 1, 2024
3228177
fixed
zenus Jun 1, 2024
cd808a3
Merge branch 'main' into asof
zenus Jun 1, 2024
7d8b160
fixed
zenus Jun 1, 2024
2cbb8dc
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 1, 2024
c1ed71f
fixed
zenus Jun 1, 2024
b6b64a5
fixed
zenus Jun 1, 2024
64827e1
fixed
zenus Jun 1, 2024
28b1cbf
Merge branch 'main' into asof
zenus Jun 3, 2024
9724ccf
Merge branch 'main' into asof
zenus Jun 4, 2024
d898166
Merge branch 'main' into asof
zenus Jun 4, 2024
7596d73
Merge branch 'main' into asof
zenus Jun 5, 2024
2ffad82
Merge branch 'main' into asof
zenus Jun 6, 2024
2b08301
Merge branch 'main' into asof
zenus Jun 6, 2024
2665f0f
Merge branch 'main' into asof
zenus Jun 8, 2024
500ae2d
add
zenus Jun 12, 2024
baaa45a
Merge branch 'main' into asof
zenus Jun 12, 2024
d6ba9de
fixed
zenus Jun 12, 2024
fc19ce8
Merge branch 'main' into asof
zenus Jun 13, 2024
9c2e066
Merge branch 'main' into asof
zenus Jun 13, 2024
555a972
fixed
zenus Jun 13, 2024
bd0d127
Merge branch 'main' into asof
zenus Jun 13, 2024
30e14ba
fixed
zenus Jun 13, 2024
c8a6f24
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 13, 2024
ab511bf
add
zenus Jun 13, 2024
1036684
fixed
zenus Jun 13, 2024
5ba098e
Merge branch 'main' into asof
zenus Jun 13, 2024
75b1ed3
fixed
zenus Jun 13, 2024
24fe57c
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 13, 2024
ea75c8b
add
zenus Jun 13, 2024
50371f9
fixed
zenus Jun 14, 2024
f30b9e0
Merge branch 'main' into asof
zenus Jun 14, 2024
d7fe74e
add
zenus Jun 15, 2024
d812a8c
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 15, 2024
a14018e
Merge branch 'main' into asof
zenus Jun 15, 2024
3ffb4f6
fixed
zenus Jun 16, 2024
cbc291c
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 16, 2024
d0ce8f8
Merge branch 'main' into asof
zenus Jun 16, 2024
c880d9d
fixed
zenus Jun 25, 2024
8e1e243
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 25, 2024
e49062a
Merge branch 'main' into asof
zenus Jun 25, 2024
497f15d
add fix
zenus Jun 25, 2024
9621adc
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 25, 2024
e627684
Merge branch 'main' into asof
zenus Jun 25, 2024
74a646f
add fix
zenus Jun 25, 2024
306be8e
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 25, 2024
2761620
Merge branch 'main' into asof
zenus Jun 25, 2024
66e8c81
Merge branch 'main' into asof
zenus Jun 29, 2024
5893975
fix
zenus Jun 29, 2024
6d3c43e
Merge branch 'asof' of https://github.com/zenus/databend into asof
zenus Jun 29, 2024
63d3af9
fix
zenus Jun 29, 2024
17f3f21
fix
zenus Jun 29, 2024
999e5eb
fix
zenus Jun 30, 2024
cbd314f
fix
zenus Jul 13, 2024
79e23aa
Merge branch 'main' into asof
zenus Jul 13, 2024
1ad7a48
fix
zenus Jul 13, 2024
cd08c6f
fix
zenus Jul 13, 2024
43f6bd6
fix
zenus Jul 14, 2024
8bec20e
Merge branch 'main' into asof
zenus Jul 14, 2024
8b3045d
Merge branch 'main' into asof
zenus Jul 15, 2024
1fbf2aa
Merge branch 'main' into asof
zenus Jul 18, 2024
c6cf42c
Merge branch 'main' into asof
zenus Jul 20, 2024
858b37b
Merge branch 'main' into asof
zenus Jul 22, 2024
d48d6a1
Merge branch 'main' into asof
zenus Jul 24, 2024
f57a880
Merge branch 'main' into asof
zenus Jul 24, 2024
cdeb024
Merge branch 'main' into asof
zenus Jul 24, 2024
7d0f3fb
Merge branch 'main' into asof
zenus Jul 24, 2024
9b684d9
Merge branch 'main' into asof
zenus Jul 27, 2024
e04f9b6
Update test_asof_join_ints.test
zenus Jul 27, 2024
6671f6d
Merge branch 'main' into asof
zenus Aug 1, 2024
9adbb35
Merge branch 'main' into asof
zenus Aug 4, 2024
3e5ee6a
add fix
zenus Aug 4, 2024
7350cfd
Merge branch 'main' into asof
zenus Aug 12, 2024
cc58475
Merge branch 'main' into asof
zenus Aug 13, 2024
35844e3
Merge branch 'main' into asof
zenus Aug 15, 2024
b6bd9e0
Merge branch 'main' into asof
zenus Aug 16, 2024
c175398
Merge branch 'main' into asof
zenus Aug 16, 2024
cba1112
Merge branch 'main' into asof
zenus Aug 16, 2024
22a8441
Merge branch 'main' into asof
zenus Aug 19, 2024
8f524f6
Merge branch 'main' into asof
zenus Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/query/ast/src/ast/format/syntax/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc<'static> {
JoinOperator::RightAnti => RcDoc::text("RIGHT ANTI JOIN"),
JoinOperator::LeftSemi => RcDoc::text("LEFT SEMI JOIN"),
JoinOperator::RightSemi => RcDoc::text("RIGHT SEMI JOIN"),
JoinOperator::Asof => RcDoc::text("ASOF JOIN"),
JoinOperator::LeftAsof => RcDoc::text("ASOF LEFT JOIN"),
JoinOperator::RightAsof => RcDoc::text("ASOF RIGHT JOIN"),
})
.append(RcDoc::space().append(pretty_table(*join.right)))
.append(match &join.condition {
Expand Down
13 changes: 13 additions & 0 deletions src/query/ast/src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,15 @@ impl Display for TableReference {
JoinOperator::CrossJoin => {
write!(f, " CROSS JOIN")?;
}
JoinOperator::Asof => {
write!(f, " ASOF JOIN")?;
}
JoinOperator::LeftAsof => {
write!(f, " ASOF LEFT JOIN")?;
}
JoinOperator::RightAsof => {
write!(f, " ASOF RIGHT JOIN")?;
}
}
write!(f, " {}", join.right)?;
match &join.condition {
Expand Down Expand Up @@ -934,6 +943,10 @@ pub enum JoinOperator {
RightAnti,
// CrossJoin can only work with `JoinCondition::None`
CrossJoin,
// Asof
Asof,
LeftAsof,
RightAsof,
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/parser/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,9 @@ pub fn join_operator(i: Input) -> IResult<JoinOperator> {
value(JoinOperator::RightOuter, rule! { RIGHT ~ OUTER? }),
value(JoinOperator::FullOuter, rule! { FULL ~ OUTER? }),
value(JoinOperator::CrossJoin, rule! { CROSS }),
value(JoinOperator::LeftAsof, rule! { ASOF ~ LEFT }),
value(JoinOperator::RightAsof, rule! { ASOF ~ RIGHT }),
value(JoinOperator::Asof, rule! { ASOF }),
))(i)
}

Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ pub enum TokenKind {
ARRAY,
#[token("AS", ignore(ascii_case))]
AS,
#[token("ASOF", ignore(ascii_case))]
ASOF,
#[token("AST", ignore(ascii_case))]
AST,
#[token("AT", ignore(ascii_case))]
Expand Down Expand Up @@ -1644,6 +1646,7 @@ impl TokenKind {
| TokenKind::WHEN => true,
| TokenKind::ARRAY
| TokenKind::AS
| TokenKind::ASOF
| TokenKind::BETWEEN
| TokenKind::CREATE
| TokenKind::ATTACH
Expand Down
61 changes: 61 additions & 0 deletions src/query/expression/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ pub use self::string::StringType;
pub use self::timestamp::TimestampType;
pub use self::variant::VariantType;
use crate::property::Domain;
use crate::types::date::DATE_MAX;
use crate::types::date::DATE_MIN;
use crate::types::timestamp::TIMESTAMP_MAX;
use crate::types::timestamp::TIMESTAMP_MIN;
use crate::values::Column;
use crate::values::Scalar;
use crate::ColumnBuilder;
Expand Down Expand Up @@ -174,6 +178,63 @@ impl DataType {
}
}

pub fn infinity(&self) -> Result<Scalar, String> {
match &self {
DataType::Timestamp => Ok(Scalar::Timestamp(TIMESTAMP_MAX)),
DataType::Date => Ok(Scalar::Date(DATE_MAX)),
DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32(
ordered_float::OrderedFloat(f32::INFINITY),
))),
DataType::Number(NumberDataType::Int32) => {
Ok(Scalar::Number(NumberScalar::Int32(i32::MAX)))
}
DataType::Number(NumberDataType::Int16) => {
Ok(Scalar::Number(NumberScalar::Int16(i16::MAX)))
}
DataType::Number(NumberDataType::Int8) => {
Ok(Scalar::Number(NumberScalar::Int8(i8::MAX)))
}
DataType::Number(NumberDataType::Float64) => Ok(Scalar::Number(NumberScalar::Float64(
ordered_float::OrderedFloat(f64::INFINITY),
))),
DataType::Number(NumberDataType::Int64) => {
Ok(Scalar::Number(NumberScalar::Int64(i64::MAX)))
}
_ => Result::Err(format!(
"only support numeric types and time types, but got {:?}",
self
)),
}
}
pub fn ninfinity(&self) -> Result<Scalar, String> {
match &self {
DataType::Timestamp => Ok(Scalar::Timestamp(TIMESTAMP_MIN)),
DataType::Date => Ok(Scalar::Date(DATE_MIN)),
DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32(
ordered_float::OrderedFloat(f32::NEG_INFINITY),
))),
DataType::Number(NumberDataType::Int32) => {
Ok(Scalar::Number(NumberScalar::Int32(i32::MIN)))
}
DataType::Number(NumberDataType::Int16) => {
Ok(Scalar::Number(NumberScalar::Int16(i16::MIN)))
}
DataType::Number(NumberDataType::Int8) => {
Ok(Scalar::Number(NumberScalar::Int8(i8::MIN)))
}
DataType::Number(NumberDataType::Float64) => Ok(Scalar::Number(NumberScalar::Float64(
ordered_float::OrderedFloat(f64::NEG_INFINITY),
))),
DataType::Number(NumberDataType::Int64) => {
Ok(Scalar::Number(NumberScalar::Int64(i64::MIN)))
}
_ => Result::Err(format!(
"only support numeric types and time types, but got {:?}",
self
)),
}
}

pub fn is_unsigned_numeric(&self) -> bool {
match self {
DataType::Number(ty) => ALL_UNSIGNED_INTEGER_TYPES.contains(ty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl HashJoinState {
}
}

pub(crate) fn wrap_true_validity(
pub fn wrap_true_validity(
column: &BlockEntry,
num_rows: usize,
true_validity: &Bitmap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ impl HashJoinProbeState {
pub fn probe(&self, input: DataBlock, probe_state: &mut ProbeState) -> Result<Vec<DataBlock>> {
match self.hash_join_state.hash_join_desc.join_type {
JoinType::Inner
| JoinType::Asof
| JoinType::LeftAsof
| JoinType::RightAsof
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::RightSemi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod transform_hash_join_probe;
mod util;

pub use build_spill::BuildSpillState;
pub use common::wrap_true_validity;
pub use desc::HashJoinDesc;
pub use hash_join_build_state::HashJoinBuildState;
pub use hash_join_probe_state::HashJoinProbeState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::time::Duration;
use std::sync::atomic::Ordering;

use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::bitmap::MutableBitmap;
use databend_common_catalog::table_context::TableContext;
Expand All @@ -28,6 +31,7 @@ use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::Scalar;
use databend_common_expression::ScalarRef;
use databend_common_expression::SortColumnDescription;
use databend_common_expression::Value;
Expand Down Expand Up @@ -174,6 +178,22 @@ impl IEJoinState {

impl RangeJoinState {
pub fn ie_join(&self, task_id: usize) -> Result<Vec<DataBlock>> {
let partition_count = self.partition_count.load(Ordering::SeqCst) as usize;
if task_id < partition_count {
let blocks = self.inner_join(task_id);
self.completed_pair.fetch_add(1, Ordering::SeqCst);
blocks
} else {
if !self.left_match.read().is_empty() {
return Ok(vec![self.fill_left_outer(task_id)?]);
} else if !self.right_match.read().is_empty() {
return Ok(vec![self.fill_right_outer(task_id)?]);
}
Ok(vec![DataBlock::empty()])
}
}

pub fn inner_join(&self, task_id: usize) -> Result<Vec<DataBlock>> {
let block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let tasks = self.tasks.read();
let (left_idx, right_idx) = tasks[task_id];
Expand Down Expand Up @@ -313,6 +333,8 @@ impl RangeJoinState {
let mut right_buffer = Vec::with_capacity(block_size);
let mut off1;
let mut off2 = 0;
let mut left_match = self.left_match.write();
let mut right_match = self.right_match.write();
for (idx, p) in p_array.iter().enumerate() {
if let ScalarRef::Number(NumberScalar::Int64(val)) =
unsafe { l1_index_column.index_unchecked(*p as usize) }
Expand Down Expand Up @@ -365,27 +387,161 @@ impl RangeJoinState {
let left_table = self.left_table.read();
let right_table = self.right_table.read();
let mut indices = Vec::with_capacity(left_buffer.len());
let mut column_builder =
NumberColumnBuilder::with_capacity(&NumberDataType::UInt64, left_buffer.len());
for res in left_buffer.iter() {
indices.push((0u32, *res as u32, 1usize));
if !left_match.is_empty() {
column_builder.push(NumberScalar::UInt64((*res + left_offset) as u64));
}
}
let mut left_result_block =
DataBlock::take_blocks(&left_table[left_idx..left_idx + 1], &indices, indices.len());
indices.clear();
for res in right_buffer.iter() {
indices.push((0u32, *res as u32, 1usize));
if !right_match.is_empty() {
column_builder.push(NumberScalar::UInt64((*res + right_offset) as u64));
}
}
let right_result_block = DataBlock::take_blocks(
&right_table[right_idx..right_idx + 1],
&indices,
indices.len(),
);
// Merge left_result_block and right_result_block
for col in right_result_block.columns() {
left_result_block.add_column(col.clone());
}
if !left_match.is_empty() || !right_match.is_empty() {
left_result_block.add_column(BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Column(Column::Number(column_builder.build())),
));
}
for filter in self.other_conditions.iter() {
left_result_block = filter_block(left_result_block, filter)?;
}
if !left_match.is_empty() || !right_match.is_empty() {
let column = &left_result_block
.columns()
.last()
.unwrap()
.value
.try_downcast::<UInt64Type>()
.unwrap();
if let ValueRef::Column(col) = column.as_ref() {
for val in UInt64Type::iter_column(&col) {
if !left_match.is_empty() {
left_match.set(val as usize, true);
}
if !right_match.is_empty() {
right_match.set(val as usize, true);
}
}
}
left_result_block.pop_columns(1);
}
Ok(left_result_block)
}

pub fn fill_left_outer(&self, task_id: usize) -> Result<DataBlock> {
let partition_count = self.partition_count.load(Ordering::SeqCst) as usize;
let mut completed = self.completed_pair.load(Ordering::SeqCst) as usize;
while completed < partition_count {
std::thread::sleep(Duration::from_millis(10));
completed = self.completed_pair.load(Ordering::SeqCst) as usize;
}

let block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let tasks = self.tasks.read();
let (left_idx, right_idx) = tasks[task_id];
let row_offset = self.row_offset.read();
let (left_offset, _right_offset) = row_offset[task_id];
let left_table = self.left_table.read();
let right_table = self.right_table.read();
let mut indices = Vec::with_capacity(block_size);
let left_match = self.left_match.read();

for (i, state) in left_match
.iter()
.enumerate()
.skip(left_offset)
.take(left_table[left_idx].num_rows())
{
if !(state) {
indices.push((0u32, (i - left_offset) as u32, 1usize));
}
}
if indices.is_empty() {
return Ok(DataBlock::empty());
}
let mut left_result_block =
DataBlock::take_blocks(&left_table[left_idx..left_idx + 1], &indices, indices.len());
let nullable_columns = right_table[right_idx]
.columns()
.iter()
.map(|c| BlockEntry {
value: Value::Scalar(Scalar::Null),
data_type: c.data_type.wrap_nullable(),
})
.collect::<Vec<_>>();
let right_result_block = DataBlock::new(nullable_columns, indices.len());
// Merge left_result_block and right_result_block
for col in right_result_block.columns() {
left_result_block.add_column(col.clone());
}
Ok(left_result_block)
}

pub fn fill_right_outer(&self, task_id: usize) -> Result<DataBlock> {
let partition_count = self.partition_count.load(Ordering::SeqCst) as usize;
let mut completed = self.completed_pair.load(Ordering::SeqCst) as usize;
while completed < partition_count {
std::thread::sleep(Duration::from_millis(10));
completed = self.completed_pair.load(Ordering::SeqCst) as usize;
}

let block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let tasks = self.tasks.read();
let (left_idx, right_idx) = tasks[task_id];
let row_offset = self.row_offset.read();
let (_, right_offset) = row_offset[task_id];
let left_table = self.left_table.read();
let right_table = self.right_table.read();
let mut indices = Vec::with_capacity(block_size);
let right_match = self.right_match.read();

for (i, state) in right_match
.iter()
.enumerate()
.skip(right_offset)
.take(right_table[right_idx].num_rows())
{
if !(state) {
indices.push((0u32, (i - right_offset) as u32, 1usize));
}
}
if indices.is_empty() {
return Ok(DataBlock::empty());
}
let nullable_columns = left_table[left_idx]
.columns()
.iter()
.map(|c| BlockEntry {
value: Value::Scalar(Scalar::Null),
data_type: c.data_type.wrap_nullable(),
})
.collect::<Vec<_>>();
let mut left_result_block = DataBlock::new(nullable_columns, indices.len());
let right_result_block = DataBlock::take_blocks(
&right_table[right_idx..right_idx + 1],
&indices,
indices.len(),
);
// Merge left_result_block and right_result_block
for col in right_result_block.columns() {
left_result_block.add_column(col.clone());
}
Ok(left_result_block)
}
}
Loading
Loading