Skip to content

Commit

Permalink
Merge branch 'main' into support_arrays_overlap
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinyhZou authored Aug 20, 2024
2 parents abb4e15 + 4c52bdd commit 0039b63
Show file tree
Hide file tree
Showing 18 changed files with 454 additions and 44 deletions.
18 changes: 11 additions & 7 deletions .github/workflows/clickhouse_be_trigger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,21 @@ on:
# Posts the trigger comment "Run Gluten Clickhouse CI" on the pull request.
# NOTE: reconstructed from a diff that interleaved the removed curl/GITHUB_TOKEN
# step with its actions/github-script replacement; this is the post-change job.
jobs:
  add-comment:
    runs-on: ubuntu-latest
    # github-script needs write access to create an issue comment.
    permissions: write-all
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      # Give the Dev PR workflow a head start before commenting.
      - name: Sleep for Dev PR workflow done
        run: |
          sleep 15
      - name: Add comment to PR
        uses: actions/github-script@v7
        with:
          github-token: ${{ secrets.GITHUB_TOKEN }}
          script: |
            await github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.payload.number,
              body: "Run Gluten Clickhouse CI"
            });
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,24 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
}
}

// Regression test: the build (right) side of the join projects the same
// column twice ("select id, id, book"), so the joined plan carries duplicate
// column names. The backend must still produce the same rows as vanilla Spark
// (presumably exercising the duplicate-name handling in the broadcast-join
// build path — confirm against BroadCastJoinBuilder::resetBuildTableBlockName).
test("duplicate column name issue") {
withTable("left_table", "right_table") {
sql("create table left_table(id int, name string) using orc")
sql("create table right_table(id int, book string) using orc")
// right_table has duplicate join keys (two rows per id) on purpose.
sql("insert into left_table values (1,'a'),(2,'b'),(3,'c'),(4,'d')")
sql("insert into right_table values (1,'a'),(1,'b'),(2,'c'),(2,'d')")
compareResultsAgainstVanillaSpark(
"""
|select p1.id, p1.name, p2.book
| from left_table p1 left join
| (select id, id, book
| from right_table where id <= 2) p2
| on p1.id=p2.id
|""".stripMargin,
true,
{ _ => }
)
}
}

}
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240817
CH_COMMIT=ed191291681
CH_BRANCH=rebase_ch/20240820
CH_COMMIT=b5b8245b022
7 changes: 3 additions & 4 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

#pragma once

#include <base/unit.h>
#include <base/types.h>
#include <Interpreters/Context.h>
#include <base/types.h>
#include <base/unit.h>

namespace local_engine
{
Expand Down Expand Up @@ -104,7 +104,7 @@ struct JoinConfig
bool prefer_multi_join_on_clauses = true;
size_t multi_join_on_clauses_build_side_rows_limit = 10000000;

static JoinConfig loadFromContext(DB::ContextPtr context)
static JoinConfig loadFromContext(const DB::ContextPtr & context)
{
JoinConfig config;
config.prefer_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_MULTI_JOIN_ON_CLAUSES, true);
Expand Down Expand Up @@ -198,4 +198,3 @@ struct GlutenJobSchedulerConfig
}
};
}

23 changes: 18 additions & 5 deletions cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,26 @@ jlong callJavaGet(const std::string & id)
/// Returns a copy of `block` whose columns are renamed with the right-table
/// prefix, so build-side names cannot collide with the probe (left) side.
/// Duplicate source names additionally get a running sequence number so every
/// output name is unique. When `only_one` is true only the first column is
/// emitted (the callers' intent for this mode is not visible here — confirm).
/// NOTE: reconstructed from a diff that interleaved the old single-line rename
/// with the new dedup logic; this is the post-change version.
DB::Block resetBuildTableBlockName(Block & block, bool only_one = false)
{
    DB::ColumnsWithTypeAndName new_cols;
    std::set<std::string> names;
    int32_t seq = 0;
    for (const auto & col : block)
    {
        // Add a prefix to avoid column name conflicts with left table.
        std::stringstream new_name;
        // Add a sequence to avoid duplicate names in some rare cases.
        // NOTE(review): generated names are not inserted into `names`, so a
        // pre-existing column literally named e.g. "0_x" could still collide
        // with a generated one — verify this cannot happen upstream.
        if (names.find(col.name) == names.end())
        {
            new_name << BlockUtil::RIHGT_COLUMN_PREFIX << col.name;
            names.insert(col.name);
        }
        else
        {
            new_name << BlockUtil::RIHGT_COLUMN_PREFIX << (seq++) << "_" << col.name;
        }
        new_cols.emplace_back(col.column, col.type, new_name.str());

        if (only_one)
            break;
    }
    return DB::Block(new_cols);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
google::protobuf::StringValue optimization_info;
optimization_info.ParseFromString(join.advanced_extension().optimization().value());
auto join_opt_info = JoinOptimizationInfo::parse(optimization_info.value());
LOG_ERROR(getLogger("JoinRelParser"), "optimizaiton info:{}", optimization_info.value());
LOG_DEBUG(getLogger("JoinRelParser"), "optimization info:{}", optimization_info.value());
auto storage_join = join_opt_info.is_broadcast ? BroadCastJoinBuilder::getJoin(join_opt_info.storage_join_key) : nullptr;
if (storage_join)
{
Expand Down
96 changes: 74 additions & 22 deletions cpp-ch/local-engine/Storages/Parquet/ParquetConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,23 @@ template <typename PhysicalType>
struct ToParquet
{
using T = typename PhysicalType::c_type;
T as(const DB::Field & value, const parquet::ColumnDescriptor &)
T as(const DB::Field & value, const parquet::ColumnDescriptor & s)
{
if constexpr (std::is_same_v<PhysicalType, parquet::Int32Type>)
return static_cast<T>(value.safeGet<Int64>());
if (s.logical_type()->is_decimal())
{
if constexpr (std::is_same_v<PhysicalType, parquet::Int32Type>)
{
const auto v = value.safeGet<DB::DecimalField<DB::Decimal32>>();
return v.getValue().value;
}
if constexpr (std::is_same_v<PhysicalType, parquet::Int64Type>)
{
const auto v = value.safeGet<DB::DecimalField<DB::Decimal64>>();
return v.getValue().value;
}
}
// parquet::BooleanType, parquet::Int64Type, parquet::FloatType, parquet::DoubleType
return value.safeGet<T>(); // FLOAT, DOUBLE, INT64
return value.safeGet<T>(); // FLOAT, DOUBLE, INT64, Int32
}
};

Expand All @@ -57,36 +68,52 @@ struct ToParquet<parquet::ByteArrayType>
}
};

/// Serializes a decimal Field into a big-endian FIXED_LEN_BYTE_ARRAY of
/// `type_length` bytes written into `buf` (caller-owned; must hold at least
/// `type_length` bytes). T is the decimal's raw storage type (Int32/Int64/
/// Int128); the low `type_length` bytes of the value are emitted, byte-swapped
/// (assumes a little-endian host — TODO confirm, same as ConverterDecimal).
template <typename T>
parquet::FixedLenByteArray convertField(const DB::Field & value, uint8_t * buf, size_t type_length)
{
    // A bare assert vanishes in release builds, after which `offset` below
    // would go negative and the memcpy would read out of bounds — fail loudly.
    if (sizeof(T) < type_length)
        throw DB::Exception(
            DB::ErrorCodes::LOGICAL_ERROR,
            "type_length = {} exceeds the decimal storage size {}",
            type_length,
            sizeof(T));

    T val = value.safeGet<DB::DecimalField<DB::Decimal<T>>>().getValue().value;
    // Byte-swap to big-endian, then keep only the low `type_length` bytes.
    std::reverse(reinterpret_cast<char *>(&val), reinterpret_cast<char *>(&val) + sizeof(T));
    const int offset = sizeof(T) - type_length;

    memcpy(buf, reinterpret_cast<char *>(&val) + offset, type_length);
    return parquet::FixedLenByteArray(buf);
}

/// Converts Decimal32/Decimal64/Decimal128 Fields to a FIXED_LEN_BYTE_ARRAY
/// via convertField(). Decimal256 is rejected; the returned FLBA points into
/// the member buffer `buf`, so it is only valid while this converter lives.
/// NOTE: reconstructed from a diff that interleaved the old Decimal128-only
/// body (buf[256]) with this one; this is the post-change version.
template <>
struct ToParquet<parquet::FLBAType>
{
    uint8_t buf[16];
    using T = parquet::FixedLenByteArray;
    T as(const DB::Field & value, const parquet::ColumnDescriptor & descriptor)
    {
        if (value.getType() == DB::Field::Types::Decimal256)
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Field type '{}' is not supported", value.getTypeName());

        // Largest supported decimal storage must fit the scratch buffer.
        static_assert(sizeof(Int128) == sizeof(buf));

        if (descriptor.type_length() > sizeof(buf))
            throw DB::Exception(
                DB::ErrorCodes::LOGICAL_ERROR,
                "descriptor.type_length() = {} , which is > {}, e.g. sizeof(buf)",
                descriptor.type_length(),
                sizeof(buf));

        if (value.getType() == DB::Field::Types::Decimal32)
            return convertField<Int32>(value, buf, descriptor.type_length());
        if (value.getType() == DB::Field::Types::Decimal64)
            return convertField<Int64>(value, buf, descriptor.type_length());

        // Any other field type will fail inside safeGet<> here.
        return convertField<Int128>(value, buf, descriptor.type_length());
    }
};

// Int32 Int64 Float Double
template <typename DType, typename Col>
struct ConverterNumeric
{
using From = typename Col::Container::value_type;
using From = typename Col::ValueType;
using To = typename DType::c_type;

const Col & column;
Expand Down Expand Up @@ -119,6 +146,7 @@ using ConverterInt64 = ConverterNumeric<parquet::Int64Type, DB::ColumnVector<Int
using ConverterInt64_u = ConverterNumeric<parquet::Int64Type, DB::ColumnVector<UInt64>>;

using ConverterDouble = ConverterNumeric<parquet::DoubleType, DB::ColumnVector<Float64>>;
using ConverterFloat = ConverterNumeric<parquet::FloatType, DB::ColumnVector<Float32>>;

struct ConverterString
{
Expand All @@ -141,7 +169,7 @@ struct ConverterString

/// Like ConverterNumberAsFixedString, but converts to big-endian. Because that's the byte order
/// Parquet uses for decimal types and literally nothing else, for some reason.
template <typename T>
template <DB::is_decimal T>
struct ConverterDecimal
{
const parquet::ColumnDescriptor & descriptor;
Expand All @@ -165,7 +193,7 @@ struct ConverterDecimal
data_buf.resize(count * sizeof(T));
ptr_buf.resize(count);
memcpy(data_buf.data(), reinterpret_cast<const char *>(column.getData().data() + offset), count * sizeof(T));
const size_t offset_in_buf = sizeof(Int128) - descriptor.type_length();
const size_t offset_in_buf = sizeof(T) - descriptor.type_length();
;
for (size_t i = 0; i < count; ++i)
{
Expand All @@ -176,6 +204,13 @@ struct ConverterDecimal
}
};

using Decimal128ToFLB = ConverterDecimal<DB::Decimal128>;
using Decimal64ToFLB = ConverterDecimal<DB::Decimal64>;
using Decimal32ToFLB = ConverterDecimal<DB::Decimal32>;

using ConverterDecimal32 = ConverterNumeric<parquet::Int32Type, DB::ColumnDecimal<DB::Decimal32>>;
using ConverterDecimal64 = ConverterNumeric<parquet::Int64Type, DB::ColumnDecimal<DB::Decimal64>>;

class BaseConverter
{
public:
Expand Down Expand Up @@ -239,6 +274,8 @@ std::shared_ptr<ParquetConverter<DType>> ParquetConverter<DType>::Make(const DB:
case TypeIndex::UInt32:
result = std::make_shared<ParquetConverterImpl<parquet::Int32Type, ConverterInt32_u>>(ConverterInt32_u(c));
break;
case TypeIndex::Decimal32:
result = std::make_shared<ParquetConverterImpl<parquet::Int32Type, ConverterDecimal32>>(ConverterDecimal32(c));
default:
break;
}
Expand All @@ -251,13 +288,23 @@ std::shared_ptr<ParquetConverter<DType>> ParquetConverter<DType>::Make(const DB:
case TypeIndex::UInt64:
result = std::make_shared<ParquetConverterImpl<parquet::Int64Type, ConverterInt64_u>>(ConverterInt64_u(c));
break;
case TypeIndex::Decimal64:
result = std::make_shared<ParquetConverterImpl<parquet::Int64Type, ConverterDecimal64>>(ConverterDecimal64(c));
default:
break;
}
break;
case parquet::Type::INT96:
break;
case parquet::Type::FLOAT:
switch (c->getDataType())
{
case TypeIndex::Float32:
result = std::make_shared<ParquetConverterImpl<parquet::FloatType, ConverterFloat>>(ConverterFloat(c));
break;
default:
break;
}
break;
case parquet::Type::DOUBLE:
switch (c->getDataType())
Expand All @@ -283,8 +330,13 @@ std::shared_ptr<ParquetConverter<DType>> ParquetConverter<DType>::Make(const DB:
switch (c->getDataType())
{
case TypeIndex::Decimal128:
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, ConverterDecimal<Decimal128>>>(
ConverterDecimal<Decimal128>(c, desc));
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, Decimal128ToFLB>>(Decimal128ToFLB(c, desc));
break;
case TypeIndex::Decimal64:
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, Decimal64ToFLB>>(Decimal64ToFLB(c, desc));
break;
case TypeIndex::Decimal32:
result = std::make_shared<ParquetConverterImpl<parquet::FLBAType, Decimal32ToFLB>>(Decimal32ToFLB(c, desc));
break;
default:
break;
Expand Down
Loading

0 comments on commit 0039b63

Please sign in to comment.