Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

next encoder #187

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: bench

on:
workflow_dispatch:
# TODO remove
pull_request:
schedule:
- cron: "42 9 * * *"

Expand Down Expand Up @@ -47,5 +49,6 @@ jobs:

# - run: mix run bench/cast.exs
- run: mix run bench/encode.exs
- run: mix run bench/insert.exs
- run: mix run bench/stream.exs
# TODO uncomment
# - run: mix run bench/insert.exs
# - run: mix run bench/stream.exs
282 changes: 264 additions & 18 deletions bench/encode.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,272 @@ IO.puts("""
This benchmark measures the performance of encoding rows in RowBinary format.
""")

alias Ch.RowBinary
defmodule ClickhouseEventV2 do
use Ecto.Schema

types = ["UInt64", "String", "Array(UInt8)", "DateTime"]
@primary_key false
schema "events_v2" do
field(:name, Ch, type: "LowCardinality(String)")
field(:site_id, Ch, type: "UInt64")
field(:hostname, :string)
field(:pathname, :string)
field(:user_id, Ch, type: "UInt64")
field(:session_id, Ch, type: "UInt64")
field(:timestamp, :naive_datetime)

rows = fn count ->
Enum.map(1..count, fn i ->
[i, "Golang SQL database driver", [1, 2, 3, 4, 5, 6, 7, 8, 9], DateTime.utc_now()]
end)
field(:"meta.key", {:array, :string})
field(:"meta.value", {:array, :string})

field(:revenue_source_amount, Ch, type: "Nullable(Decimal64(3))")
field(:revenue_source_currency, Ch, type: "FixedString(3)")
field(:revenue_reporting_amount, Ch, type: "Nullable(Decimal64(3))")
field(:revenue_reporting_currency, Ch, type: "FixedString(3)")

# Session attributes
field(:referrer, :string)
field(:referrer_source, :string)
field(:utm_medium, :string)
field(:utm_source, :string)
field(:utm_campaign, :string)
field(:utm_content, :string)
field(:utm_term, :string)

field(:country_code, Ch, type: "FixedString(2)")
field(:subdivision1_code, Ch, type: "LowCardinality(String)")
field(:subdivision2_code, Ch, type: "LowCardinality(String)")
field(:city_geoname_id, Ch, type: "UInt32")

field(:screen_size, Ch, type: "LowCardinality(String)")
field(:operating_system, Ch, type: "LowCardinality(String)")
field(:operating_system_version, Ch, type: "LowCardinality(String)")
field(:browser, Ch, type: "LowCardinality(String)")
field(:browser_version, Ch, type: "LowCardinality(String)")
end
end

Benchee.run(
%{
"RowBinary" => fn rows -> RowBinary.encode_rows(rows, types) end,
"RowBinary stream" => fn rows ->
Stream.chunk_every(rows, 60_000)
|> Stream.each(fn chunk -> RowBinary.encode_rows(chunk, types) end)
|> Stream.run()
defmodule Ecto.Adapters.ClickHouse.Schema do
def remap_type(type, schema, field) do
remap_type(Ecto.Type.type(type), type, schema, field)
end

defp remap_type({:parameterized, {Ch, t}}, _original, _schema, _field), do: t

defp remap_type(t, _original, _schema, _field)
when t in [:string, :date, :uuid, :boolean],
do: t

defp remap_type(dt, _original, _schema, _field)
when dt in [:naive_datetime, :utc_datetime],
do: :datetime

defp remap_type(usec, _original, _schema, _field)
when usec in [:naive_datetime_usec, :utc_datetime_usec],
do: {:datetime64, _precision = 6}

# TODO remove
defp remap_type(t, _original, _schema, _field)
when t in [:binary, :binary_id],
do: :string

# TODO remove
for size <- [8, 16, 32, 64, 128, 256] do
defp remap_type(unquote(:"u#{size}") = u, _original, _schema, _field), do: u
defp remap_type(unquote(:"i#{size}") = i, _original, _schema, _field), do: i
end

defp remap_type({:array = a, t}, original, schema, field),
do: {a, remap_type(t, original, schema, field)}
end

defmodule Bench do
fields = ClickhouseEventV2.__schema__(:fields)

types =
Enum.map(fields, fn field ->
type = ClickhouseEventV2.__schema__(:type, field) || raise "missing type for #{field}"

type
|> Ecto.Type.type()
|> Ecto.Adapters.ClickHouse.Schema.remap_type(ClickhouseEventV2, field)
end)

encoding_types = Ch.RowBinary.encoding_types(types)

def current_encoder(event) do
[Enum.map(unquote(fields), fn field -> Map.fetch!(event, field) end)]
|> Ch.RowBinary._encode_rows(unquote(encoding_types))

# |> IO.iodata_to_binary()
end

def next_encoder(%{
name: name,
site_id: site_id,
hostname: hostname,
pathname: pathname,
user_id: user_id,
session_id: session_id,
timestamp: timestamp,
"meta.key": meta_keys,
"meta.value": meta_values,
revenue_source_amount: revenue_source_amount,
revenue_source_currency: revenue_source_currency,
revenue_reporting_amount: revenue_reporting_amount,
revenue_reporting_currency: revenue_reporting_currency,
referrer: referrer,
referrer_source: referrer_source,
utm_medium: utm_medium,
utm_source: utm_source,
utm_campaign: utm_campaign,
utm_content: utm_content,
utm_term: utm_term,
country_code: country_code,
subdivision1_code: subdivision1_code,
subdivision2_code: subdivision2_code,
city_geoname_id: city_geoname_id,
screen_size: screen_size,
operating_system: operating_system,
operating_system_version: operating_system_version,
browser: browser,
browser_version: browser_version
}) do
site_id = site_id || 0
user_id = user_id || 0
session_id = session_id || 0
city_geoname_id = city_geoname_id || 0
timestamp = if timestamp, do: to_unix(timestamp), else: 0

[
encode_string(name),
<<site_id::64-unsigned>>,
encode_string(hostname),
encode_string(pathname),
<<user_id::64-unsigned, session_id::64-unsigned, timestamp::16-unsigned>>,
encode_string_array(meta_keys),
encode_string_array(meta_values),
encode_nullable(revenue_source_amount),
encode_fixed_string_3(revenue_source_currency),
encode_nullable(revenue_reporting_amount),
encode_fixed_string_3(revenue_reporting_currency),
encode_string(referrer),
encode_string(referrer_source),
encode_string(utm_medium),
encode_string(utm_source),
encode_string(utm_campaign),
encode_string(utm_content),
encode_string(utm_term),
encode_fixed_string_2(country_code),
encode_string(subdivision1_code),
encode_string(subdivision2_code),
<<city_geoname_id::32-unsigned>>,
encode_string(screen_size),
encode_string(operating_system),
encode_string(operating_system_version),
encode_string(browser),
encode_string(browser_version)
]
end

# @compile inline: [encode_varint: 1]
defp encode_varint(i) when i < 128, do: i
defp encode_varint(i), do: encode_varint_cont(i)

defp encode_varint_cont(i) when i < 128, do: <<i>>

import Bitwise

defp encode_varint_cont(i) do
[(i &&& 0b0111_1111) ||| 0b1000_0000 | encode_varint_cont(i >>> 7)]
end

@compile inline: [encode_string: 1]
defp encode_string(str) when is_binary(str) do
[encode_varint(byte_size(str)) | str]
end

defp encode_string(nil), do: 0

@compile inline: [encode_string_array: 1]
def encode_string_array([]), do: 0
# def encode_string_array([e1]), do: [1, encode_varint(byte_size(e1)) | e1]

# def encode_string_array([e1, e2]) do
# [2, encode_varint(byte_size(e1)), e1, encode_varint(byte_size(e2)), e2]
# end

# def encode_string_array([e1, e2, e3]) do
# [
# 2,
# encode_varint(byte_size(e1)),
# e1,
# encode_varint(byte_size(e2)),
# e2,
# encode_varint(byte_size(e3)),
# e3
# ]
# end

def encode_string_array([_ | _] = arr) do
[encode_varint(length(arr)) | encode_strings(arr)]
end

def encode_string_array(nil), do: 0

defp encode_strings([s | rest]), do: [encode_string(s) | encode_strings(rest)]
defp encode_strings([]), do: []

@compile inline: [encode_fixed_string_2: 1]
defp encode_fixed_string_2(str) when byte_size(str) == 2, do: str
defp encode_fixed_string_2(nil), do: <<0, 0>>

@compile inline: [encode_fixed_string_3: 1]
defp encode_fixed_string_3(str) when byte_size(str) == 3, do: str
defp encode_fixed_string_3(nil), do: <<0, 0, 0>>

@compile inline: [encode_nullable: 1]
defp encode_nullable(nil), do: 1

defp to_unix(%{year: year, month: month, day: day, hour: hour, minute: minute, second: second}) do
to_unix(year, month, day, hour, minute, second)
end

@compile inline: [to_unix: 6]

for year <- 2024..2025, month <- 1..12 do
epoch = DateTime.to_unix(DateTime.new!(Date.new!(year, month, 1), Time.new!(0, 0, 0)))

defp to_unix(unquote(year), unquote(month), day, hour, minute, second) do
unquote(epoch) + (day - 1) * 86400 + hour * 3600 + minute * 60 + second
end
},
inputs: %{
"1_000_000 (UInt64, String, Array(UInt8), DateTime) rows" => rows.(1_000_000)
}
)
end

def run do
Benchee.run(
%{
"current encoder" => &Bench.current_encoder/1,
"next encoder" => &Bench.next_encoder/1
},
# profile_after: true,
inputs: %{
# "empty" => %ClickhouseEventV2{},
"pageview" => %ClickhouseEventV2{
name: "pageview",
site_id: 3,
hostname: "stats.copycat.fun",
pathname: "/",
user_id: 18_167_959_095_776_540_841,
session_id: 18_173_968_599_266_389_340,
timestamp: ~N[2024-12-13 12:53:59],
country_code: "TH",
screen_size: "Mobile",
operating_system: "iOS",
operating_system_version: "17.5",
browser: "Safari",
browser_version: "17.5"
}
}
)
end
end

Bench.run()
Loading