Skip to content

Commit

Permalink
fix: array type could be wrapped by angle brackets in athena
Browse files Browse the repository at this point in the history
  • Loading branch information
taozhi8833998 committed Aug 29, 2024
1 parent 9c6a2e4 commit 3a9bf88
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 2 deletions.
13 changes: 12 additions & 1 deletion pegjs/athena.pegjs
Original file line number Diff line number Diff line change
Expand Up @@ -2508,7 +2508,8 @@ COMMA = ','
STAR = '*'
LPAREN = '('
RPAREN = ')'

LANGLE_BRACKET = '<'
RANGLE_BRACKET = '>'
LBRAKE = '['
RBRAKE = ']'

Expand Down Expand Up @@ -2744,6 +2745,16 @@ array_type
},
}
}
/ t:KW_ARRAY __ LANGLE_BRACKET __ a:data_type_list __ RANGLE_BRACKET {
return {
dataType: t,
angle_brackets: true,
expr: {
type: 'expr_list',
value: a.map(d => ({ type: 'datatype', ...d }))
},
}
}
character_string_type
= t:(KW_CHAR / KW_VARCHAR) __ LPAREN __ l:[0-9]+ __ RPAREN {
return { dataType: t, length: parseInt(l.join(''), 10), parentheses: true };
Expand Down
3 changes: 2 additions & 1 deletion src/func.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ function arrayDimensionToSymbol(target) {

function castToSQL(expr) {
const { target, expr: expression, keyword, symbol, as: alias, parentheses: outParentheses } = expr
const { length, dataType, parentheses, quoted, scale, suffix: dataTypeSuffix, expr: targetExpr } = target
const { angle_brackets: angleBrackets, length, dataType, parentheses, quoted, scale, suffix: dataTypeSuffix, expr: targetExpr } = target
let str = targetExpr ? exprToSQL(targetExpr) : ''
if (length != null) str = scale ? `${length}, ${scale}` : length
if (parentheses) str = `(${str})`
if (angleBrackets) str = `<${str}>`
if (dataTypeSuffix && dataTypeSuffix.length) str += ` ${dataTypeSuffix.map(literalToSQL).join(' ')}`
let prefix = exprToSQL(expression)
let symbolChar = '::'
Expand Down
81 changes: 81 additions & 0 deletions test/athena.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,85 @@ describe('athena', () => {
left join counted_tags_map t on od.organization = t.organization and od.date = t.conversation_ctreated_date`
expect(getParsedSql(sql)).to.be.equal("WITH `org_mapping` AS (SELECT * FROM (SELECT trim(lower(`name`)) AS `name`, `organization_name`, ROW_NUMBER() OVER (PARTITION BY trim(lower(`name`))) AS `rn` FROM `bronze_sales_prod`.`drive_organization_mapping`) WHERE `rn` = 1), `orgs` AS (SELECT trim(COALESCE(`om`.`organization_name`, `custom_attributes`.`Account`)) AS `organization`, * FROM `bronze_sales_prod`.`intercom_all_conversations` INNER JOIN `org_mapping` AS `om` ON trim(lower(`custom_attributes`.`Account`)) = trim(lower(`om`.`name`))), `orgs_dates_metrics` AS (SELECT `organization`, CAST(from_unixtime(`created_at`) AS DATE) AS `conversation_ctreated_date`, array_agg(`custom_attributes`) AS `custom_attributes_array`, approx_percentile(CASE WHEN `state` = 'closed' THEN date_diff('minute', from_unixtime(`created_at`), from_unixtime(`updated_at`)) ELSE 0 END, 0.5) AS `median_resolution_time_minutes`, SUM(CASE WHEN `state` IN ('open', 'snoozed') THEN 1 ELSE 0 END) AS `open_conversations_count`, COUNT(*) OVER all_conversations_count FROM `orgs` WHERE `organization` IS NOT NULL GROUP BY 1, 2), `last_year_org_dates` AS (SELECT DISTINCT `om`.`organization_name` AS `organization`, date_add('day', -`sequence`, CURRENT_DATE) AS DATE FROM UNNEST(sequence(1, 365)) AS `t` INNER JOIN `org_mapping` AS `om` ON 1 = 1 WHERE date_add('day', -`sequence`, CURRENT_DATE) >= date_add('year', -1, CURRENT_DATE)), `counted_tags` AS (SELECT `organization`, CAST(from_unixtime(`created_at`) AS DATE) AS `conversation_ctreated_date`, `tag`.`name` AS `tag`, json_extract_scalar(CAST(`source` AS JSON), '$.author.email') AS `author_email`, COUNT(*) AS `count` FROM `orgs` CROSS JOIN UNNEST(`tags`.`tags`) AS t(`tag`) GROUP BY 1, 2, 3, 4), `counted_tags_map` AS (SELECT `organization`, `conversation_ctreated_date`, `author_email`, MAP(ARRAY_AGG(`tag`), ARRAY_AGG(`count`)) AS `tags_counts` FROM `counted_tags` GROUP BY 1, 2, 3) SELECT `od`.`organization` AS `organization_name`, `od`.`date`, 
`t`.`author_email`, coalesce(`t`.`tags_counts`, MAP()) AS `tags_counts`, coalesce(`o`.`custom_attributes_array`, ARRAY[]) AS `custom_attributes_array`, COALESCE(`o`.`median_resolution_time_minutes`, 0) AS `median_resolution_time_minutes`, COALESCE(`o`.`open_conversations_count`, 0) AS `open_conversations_count`, COALESCE(`o`.`overall_conversations_count`, 0) AS `overall_conversations_count`, CAST(CURRENT_TIMESTAMP AS TIMESTAMP(6)) AS `dbt_insert_time` FROM `last_year_org_dates` AS `od` LEFT JOIN `orgs_dates_metrics` AS `o` ON `od`.`organization` = `o`.`organization` AND `od`.`date` = `o`.`conversation_ctreated_date` LEFT JOIN `counted_tags_map` AS `t` ON `od`.`organization` = `t`.`organization` AND `od`.`date` = `t`.`conversation_ctreated_date`")
})
it('should parse with clause', () => {
// Round-trip test for a large multi-CTE Athena query. The key construct under
// test is `CAST(filters_array AS ARRAY<json>)` inside the UNNEST cross join:
// the angle-bracket array type added to the athena grammar in this commit.
// The rest of the query exercises window functions, nested subqueries,
// UNION ALL, and quoted identifiers to guard against regressions.
const sql = `WITH user_logins as (
SELECT user_id, event, dttm, dashboard_id, slice_id
FROM (
SELECT l.user_id, 'login' AS event, l.dttm, CAST(NULL as bigint) AS dashboard_id, CAST(NULL as bigint) AS slice_id,
LAG(l.dttm) OVER (PARTITION BY l.user_id ORDER BY l.dttm) AS previous_dttm
FROM "bronze_prod"."superset_logs" l
WHERE l.action = 'welcome'
)
WHERE previous_dttm IS NULL -- Keep the first record
OR dttm > previous_dttm + INTERVAL '1' HOUR -- Only keep records that are more than 1 hour apart
ORDER BY user_id, dttm
),
user_events as (
SELECT l.user_id, json_extract_scalar(l."json", '$.event_name') AS event, l.dttm,
NULLIF(COALESCE(cast(json_extract_scalar(l."json", '$.source_id') as bigint), l.dashboard_id), 0) AS dashboard_id,
NULLIF(COALESCE(cast(json_extract_scalar(l."json", '$.slice_id') as bigint), cast(json_extract_scalar(l."json", '$.chartId') as bigint), l.slice_id), 0) AS slice_id
FROM "bronze_prod"."superset_logs" l
WHERE json_extract_scalar("json", '$.event_name') IN (
'spa_navigation',
'mount_dashboard',
'export_csv_dashboard_chart',
'chart_download_as_image',
'export_xlsx_dashboard_chart',
'change_dashboard_filter'
)
),
export_dashboard_logs as (
SELECT user_id, event, dttm,
CAST(json_extract_scalar(json_array_element, '$.value') as bigint) AS dashboard_id,
CAST(NULL as bigint) as slice_id
FROM (
SELECT user_id, event, dttm,
json_array_element
FROM (
SELECT l.user_id, 'export_dashboard' AS event, l.dttm,
json_extract(l."json", '$.rison.filters') AS filters_array
FROM
"bronze_prod"."superset_logs" l
WHERE action = 'ReportScheduleRestApi.get_list'
)
CROSS JOIN UNNEST(CAST(filters_array AS ARRAY<json>)) AS t (json_array_element)
WHERE
json_extract_scalar(json_array_element, '$.col') = 'dashboard_id'
)
),
relevant_logs as (
SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id, dttm ORDER BY dashboard_id) as RN
FROM(
SELECT user_id, dttm, event, max(dashboard_id) as dashboard_id, max(slice_id) as slice_id
FROM (
SELECT *
FROM user_logins
UNION ALL
SELECT *
FROM user_events
UNION ALL
SELECT *
FROM export_dashboard_logs
)
GROUP BY user_id, dttm, event
)
),
organizational_domains as (
SELECT lower(split_part(split_part(therapist_mail, '@', 2), '.', 1)) AS organization_domain, max(therapist_organization_name) as organization
from "silver_prod"."eleos_full_therapist_info"
group by 1
)
SELECT l.user_id, l.dttm, l.event, l.dashboard_id, l.slice_id, u.last_name, u.email, o.organization, d.dashboard_title, s.slice_name, 'Client Facing' as superset_instance
FROM relevant_logs l
JOIN "bronze_prod"."superset_ab_user" u ON l.user_id = u.id
LEFT JOIN "bronze_prod"."superset_dashboards" d ON l.dashboard_id = d.id
LEFT JOIN "bronze_prod"."superset_slices" s ON l.slice_id = s.id
LEFT JOIN organizational_domains o ON lower(split_part(split_part(u.email, '@', 2), '.', 1)) = o.organization_domain
WHERE RN = 1 AND lower(u.email) NOT LIKE '%eleos%'
AND lower(u.email) NOT LIKE '%test%'
AND lower(u.username) NOT LIKE '%eleos%'
AND lower(u.username) NOT LIKE '%test%'
AND lower(u.username) NOT LIKE '%admin%'`
// Expected regenerated SQL: identifiers are backtick-quoted, keywords and type
// names are upper-cased (e.g. `ARRAY<JSON>` keeps its angle brackets — the
// behavior this commit adds), SQL comments are dropped, and ASC is made explicit
// in ORDER BY / window clauses. NOTE(review): assumes getParsedSql is the
// parse-then-regenerate helper defined earlier in this spec — confirm.
expect(getParsedSql(sql)).to.be.equal("WITH `user_logins` AS (SELECT `user_id`, `event`, `dttm`, `dashboard_id`, `slice_id` FROM (SELECT `l`.`user_id`, 'login' AS `event`, `l`.`dttm`, CAST(NULL AS BIGINT) AS `dashboard_id`, CAST(NULL AS BIGINT) AS `slice_id`, LAG(`l`.`dttm`) OVER (PARTITION BY `l`.`user_id` ORDER BY `l`.`dttm` ASC) AS `previous_dttm` FROM `bronze_prod`.`superset_logs` AS `l` WHERE `l`.`action` = 'welcome') WHERE `previous_dttm` IS NULL OR `dttm` > `previous_dttm` + INTERVAL '1' HOUR ORDER BY `user_id` ASC, `dttm` ASC), `user_events` AS (SELECT `l`.`user_id`, json_extract_scalar(`l`.`json`, '$.event_name') AS `event`, `l`.`dttm`, NULLIF(COALESCE(CAST(json_extract_scalar(`l`.`json`, '$.source_id') AS BIGINT), `l`.`dashboard_id`), 0) AS `dashboard_id`, NULLIF(COALESCE(CAST(json_extract_scalar(`l`.`json`, '$.slice_id') AS BIGINT), CAST(json_extract_scalar(`l`.`json`, '$.chartId') AS BIGINT), `l`.`slice_id`), 0) AS `slice_id` FROM `bronze_prod`.`superset_logs` AS `l` WHERE json_extract_scalar(\"json\", '$.event_name') IN ('spa_navigation', 'mount_dashboard', 'export_csv_dashboard_chart', 'chart_download_as_image', 'export_xlsx_dashboard_chart', 'change_dashboard_filter')), `export_dashboard_logs` AS (SELECT `user_id`, `event`, `dttm`, CAST(json_extract_scalar(`json_array_element`, '$.value') AS BIGINT) AS `dashboard_id`, CAST(NULL AS BIGINT) AS `slice_id` FROM (SELECT `user_id`, `event`, `dttm`, `json_array_element` FROM (SELECT `l`.`user_id`, 'export_dashboard' AS `event`, `l`.`dttm`, json_extract(`l`.`json`, '$.rison.filters') AS `filters_array` FROM `bronze_prod`.`superset_logs` AS `l` WHERE `action` = 'ReportScheduleRestApi.get_list') CROSS JOIN UNNEST(CAST(`filters_array` AS ARRAY<JSON>)) AS t(`json_array_element`) WHERE json_extract_scalar(`json_array_element`, '$.col') = 'dashboard_id')), `relevant_logs` AS (SELECT *, ROW_NUMBER() OVER (PARTITION BY `user_id`, `dttm` ORDER BY `dashboard_id` ASC) AS `RN` FROM (SELECT `user_id`, `dttm`, `event`, 
MAX(`dashboard_id`) AS `dashboard_id`, MAX(`slice_id`) AS `slice_id` FROM (SELECT * FROM `user_logins` UNION ALL SELECT * FROM `user_events` UNION ALL SELECT * FROM `export_dashboard_logs`) GROUP BY `user_id`, `dttm`, `event`)), `organizational_domains` AS (SELECT lower(split_part(split_part(`therapist_mail`, '@', 2), '.', 1)) AS `organization_domain`, MAX(`therapist_organization_name`) AS `organization` FROM `silver_prod`.`eleos_full_therapist_info` GROUP BY 1) SELECT `l`.`user_id`, `l`.`dttm`, `l`.`event`, `l`.`dashboard_id`, `l`.`slice_id`, `u`.`last_name`, `u`.`email`, `o`.`organization`, `d`.`dashboard_title`, `s`.`slice_name`, 'Client Facing' AS `superset_instance` FROM `relevant_logs` AS `l` INNER JOIN `bronze_prod`.`superset_ab_user` AS `u` ON `l`.`user_id` = `u`.`id` LEFT JOIN `bronze_prod`.`superset_dashboards` AS `d` ON `l`.`dashboard_id` = `d`.`id` LEFT JOIN `bronze_prod`.`superset_slices` AS `s` ON `l`.`slice_id` = `s`.`id` LEFT JOIN `organizational_domains` AS `o` ON lower(split_part(split_part(`u`.`email`, '@', 2), '.', 1)) = `o`.`organization_domain` WHERE `RN` = 1 AND lower(`u`.`email`) NOT LIKE '%eleos%' AND lower(`u`.`email`) NOT LIKE '%test%' AND lower(`u`.`username`) NOT LIKE '%eleos%' AND lower(`u`.`username`) NOT LIKE '%test%' AND lower(`u`.`username`) NOT LIKE '%admin%'")
})
})

0 comments on commit 3a9bf88

Please sign in to comment.