Skip to content

Commit

Permalink
[READY-FOR-REVIEW]: Fix Bebop duplicates bug (#6269)
Browse files Browse the repository at this point in the history
* fix inconsistency

* return model back

* change union_all to union

* remove duplicates
  • Loading branch information
B1boid authored Jul 8, 2024
1 parent 16a1fca commit 34e5ef4
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ WITH
raw_call_data AS (
SELECT
fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order",
ROW_NUMBER() OVER (PARTITION BY call_tx_hash ORDER BY call_block_number) AS row_num
ROW_NUMBER() OVER (PARTITION BY call_tx_hash) AS row_num
FROM (
SELECT
'Single' as fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order"
Expand Down Expand Up @@ -187,7 +187,17 @@ unnested_taker_arrays AS (
element_at(taker_amounts, sequence_number) AS taker_token_amount,
order_index,
sequence_number - 1 AS taker_token_index
FROM (SELECT * FROM raw_bebop_multi_trade UNION ALL SELECT * FROM unnested_aggregate_orders)
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM raw_bebop_multi_trade
UNION
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM unnested_aggregate_orders
)
CROSS JOIN UNNEST(sequence(1, taker_tokens_len)) AS t(sequence_number)
),
bebop_multi_and_aggregate_trades AS (
Expand All @@ -213,6 +223,23 @@ bebop_multi_and_aggregate_trades AS (
cast(array[order_index, taker_token_index, sequence_number - 1] as array<bigint>) as trace_address
FROM unnested_taker_arrays
CROSS JOIN UNNEST(sequence(1, maker_tokens_len)) AS t(sequence_number)
),
all_trades as (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address,
ROW_NUMBER() OVER (PARTITION BY tx_hash, evt_index, trace_address ORDER BY evt_index) AS row_num
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_single_trade
UNION ALL
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_multi_and_aggregate_trades
)
)

SELECT
Expand Down Expand Up @@ -257,9 +284,9 @@ SELECT
tx.to AS tx_to,
t.trace_address,
t.evt_index
FROM (SELECT * FROM bebop_single_trade UNION ALL SELECT * FROM bebop_multi_and_aggregate_trades) t
INNER JOIN
{{ source('arbitrum', 'transactions')}} tx
FROM (select * from all_trades where row_num = 1) t
INNER JOIN
{{ source('arbitrum', 'transactions')}} tx
ON t.tx_hash = tx.hash
{% if not is_incremental() %}
AND tx.block_time >= TIMESTAMP '{{project_start_date}}'
Expand Down
37 changes: 32 additions & 5 deletions dex/models/_projects/bebop/base/bebop_blend_base_trades.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ WITH
raw_call_data AS (
SELECT
fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order",
ROW_NUMBER() OVER (PARTITION BY call_tx_hash ORDER BY call_block_number) AS row_num
ROW_NUMBER() OVER (PARTITION BY call_tx_hash) AS row_num
FROM (
SELECT
'Single' as fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order"
Expand Down Expand Up @@ -187,7 +187,17 @@ unnested_taker_arrays AS (
element_at(taker_amounts, sequence_number) AS taker_token_amount,
order_index,
sequence_number - 1 AS taker_token_index
FROM (SELECT * FROM raw_bebop_multi_trade UNION ALL SELECT * FROM unnested_aggregate_orders)
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM raw_bebop_multi_trade
UNION
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM unnested_aggregate_orders
)
CROSS JOIN UNNEST(sequence(1, taker_tokens_len)) AS t(sequence_number)
),
bebop_multi_and_aggregate_trades AS (
Expand All @@ -213,6 +223,23 @@ bebop_multi_and_aggregate_trades AS (
cast(array[order_index, taker_token_index, sequence_number - 1] as array<bigint>) as trace_address
FROM unnested_taker_arrays
CROSS JOIN UNNEST(sequence(1, maker_tokens_len)) AS t(sequence_number)
),
all_trades as (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address,
ROW_NUMBER() OVER (PARTITION BY tx_hash, evt_index, trace_address ORDER BY evt_index) AS row_num
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_single_trade
UNION ALL
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_multi_and_aggregate_trades
)
)

SELECT
Expand Down Expand Up @@ -257,9 +284,9 @@ SELECT
tx.to AS tx_to,
t.trace_address,
t.evt_index
FROM (SELECT * FROM bebop_single_trade UNION ALL SELECT * FROM bebop_multi_and_aggregate_trades) t
INNER JOIN
{{ source('base', 'transactions')}} tx
FROM (select * from all_trades where row_num = 1) t
INNER JOIN
{{ source('base', 'transactions')}} tx
ON t.tx_hash = tx.hash
{% if not is_incremental() %}
AND tx.block_time >= TIMESTAMP '{{project_start_date}}'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{{ config(
tags = ['prod_exclude'],
schema = 'bebop_blend_ethereum',
alias = 'trades',
partition_by = ['block_month'],
Expand All @@ -15,7 +14,7 @@ WITH
raw_call_data AS (
SELECT
fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order",
ROW_NUMBER() OVER (PARTITION BY call_tx_hash ORDER BY call_block_number) AS row_num
ROW_NUMBER() OVER (PARTITION BY call_tx_hash) AS row_num
FROM (
SELECT
'Single' as fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order"
Expand Down Expand Up @@ -187,7 +186,17 @@ unnested_taker_arrays AS (
element_at(taker_amounts, sequence_number) AS taker_token_amount,
order_index,
sequence_number - 1 AS taker_token_index
FROM (SELECT * FROM raw_bebop_multi_trade UNION ALL SELECT * FROM unnested_aggregate_orders)
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM raw_bebop_multi_trade
UNION
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM unnested_aggregate_orders
)
CROSS JOIN UNNEST(sequence(1, taker_tokens_len)) AS t(sequence_number)
),
bebop_multi_and_aggregate_trades AS (
Expand All @@ -213,6 +222,23 @@ bebop_multi_and_aggregate_trades AS (
cast(array[order_index, taker_token_index, sequence_number - 1] as array<bigint>) as trace_address
FROM unnested_taker_arrays
CROSS JOIN UNNEST(sequence(1, maker_tokens_len)) AS t(sequence_number)
),
all_trades as (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address,
ROW_NUMBER() OVER (PARTITION BY tx_hash, evt_index, trace_address ORDER BY evt_index) AS row_num
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_single_trade
UNION ALL
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_multi_and_aggregate_trades
)
)

SELECT
Expand Down Expand Up @@ -257,9 +283,9 @@ SELECT
tx.to AS tx_to,
t.trace_address,
t.evt_index
FROM (SELECT * FROM bebop_single_trade UNION ALL SELECT * FROM bebop_multi_and_aggregate_trades) t
INNER JOIN
{{ source('ethereum', 'transactions')}} tx
FROM (select * from all_trades where row_num = 1) t
INNER JOIN
{{ source('ethereum', 'transactions')}} tx
ON t.tx_hash = tx.hash
{% if not is_incremental() %}
AND tx.block_time >= TIMESTAMP '{{project_start_date}}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

{% set bebop_models = [
ref('bebop_rfq_ethereum_trades'),
ref('bebop_jam_ethereum_trades')
ref('bebop_jam_ethereum_trades'),
ref('bebop_blend_ethereum_trades')
] %}

SELECT *
Expand Down
37 changes: 32 additions & 5 deletions dex/models/_projects/bebop/polygon/bebop_blend_polygon_trades.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ WITH
raw_call_data AS (
SELECT
fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order",
ROW_NUMBER() OVER (PARTITION BY call_tx_hash ORDER BY call_block_number) AS row_num
ROW_NUMBER() OVER (PARTITION BY call_tx_hash) AS row_num
FROM (
SELECT
'Single' as fun_type, call_success, call_block_time, call_block_number, call_tx_hash, contract_address, "order"
Expand Down Expand Up @@ -187,7 +187,17 @@ unnested_taker_arrays AS (
element_at(taker_amounts, sequence_number) AS taker_token_amount,
order_index,
sequence_number - 1 AS taker_token_index
FROM (SELECT * FROM raw_bebop_multi_trade UNION ALL SELECT * FROM unnested_aggregate_orders)
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM raw_bebop_multi_trade
UNION
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address,
taker_tokens, maker_tokens, taker_amounts, maker_amounts, taker_tokens_len, maker_tokens_len, order_index
FROM unnested_aggregate_orders
)
CROSS JOIN UNNEST(sequence(1, taker_tokens_len)) AS t(sequence_number)
),
bebop_multi_and_aggregate_trades AS (
Expand All @@ -213,6 +223,23 @@ bebop_multi_and_aggregate_trades AS (
cast(array[order_index, taker_token_index, sequence_number - 1] as array<bigint>) as trace_address
FROM unnested_taker_arrays
CROSS JOIN UNNEST(sequence(1, maker_tokens_len)) AS t(sequence_number)
),
all_trades as (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address,
ROW_NUMBER() OVER (PARTITION BY tx_hash, evt_index, trace_address ORDER BY evt_index) AS row_num
FROM (
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_single_trade
UNION ALL
SELECT
block_time, block_number, tx_hash, evt_index, contract_address, taker_address, maker_address, taker_token_address,
maker_token_address, taker_token_amount, maker_token_amount, trade_type, taker_tokens_len, maker_tokens_len, trace_address
FROM bebop_multi_and_aggregate_trades
)
)

SELECT
Expand Down Expand Up @@ -257,9 +284,9 @@ SELECT
tx.to AS tx_to,
t.trace_address,
t.evt_index
FROM (SELECT * FROM bebop_single_trade UNION ALL SELECT * FROM bebop_multi_and_aggregate_trades) t
INNER JOIN
{{ source('polygon', 'transactions')}} tx
FROM (select * from all_trades where row_num = 1) t
INNER JOIN
{{ source('polygon', 'transactions')}} tx
ON t.tx_hash = tx.hash
{% if not is_incremental() %}
AND tx.block_time >= TIMESTAMP '{{project_start_date}}'
Expand Down

0 comments on commit 34e5ef4

Please sign in to comment.