NET-352: fix SQL generation bug in week-aggregate combining query.
msqr committed Oct 18, 2023
1 parent f5a9d63 commit 4735490
Showing 3 changed files with 190 additions and 5 deletions.
@@ -161,10 +161,18 @@ private void sqlSelect(StringBuilder buf) {
 	buf.append("SELECT ");
 	if ( combine != null ) {
 		buf.append("s.vstream_id AS stream_id,\n");
-		buf.append(" s.obj_rank,\n");
-		buf.append(" s.source_rank,\n");
-		buf.append(" s.names_i,\n");
-		buf.append(" s.names_a,\n");
+		if ( aggregation == Aggregation.Week ) {
+			buf.append(" solarcommon.first(s.obj_rank) AS obj_rank,\n");
+			buf.append(" solarcommon.first(s.source_rank) AS source_rank,\n");
+			buf.append(" solarcommon.first(s.names_i) AS names_i,\n");
+			buf.append(" solarcommon.first(s.names_a) AS names_a,\n");
+
+		} else {
+			buf.append(" s.obj_rank,\n");
+			buf.append(" s.source_rank,\n");
+			buf.append(" s.names_i,\n");
+			buf.append(" s.names_a,\n");
+		}
 	} else {
 		buf.append(" datum.stream_id,\n");
 	}
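
For reference, in the combining case the SELECT prefix now differs by aggregation level: the week query groups on the virtual stream ID (see the next hunk), so the per-stream metadata columns must be wrapped in an aggregate to remain valid under that GROUP BY. A sketch of the two prefixes this method emits, assembled from the strings above ("..." stands for the rest of the select list):

    -- Aggregation.Week: metadata aggregated so it is valid under GROUP BY s.vstream_id
    SELECT s.vstream_id AS stream_id,
     solarcommon.first(s.obj_rank) AS obj_rank,
     solarcommon.first(s.source_rank) AS source_rank,
     solarcommon.first(s.names_i) AS names_i,
     solarcommon.first(s.names_a) AS names_a,
     ...

    -- any other aggregation: plain per-stream column references
    SELECT s.vstream_id AS stream_id,
     s.obj_rank,
     s.source_rank,
     s.names_i,
     s.names_a,
     ...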
@@ -361,8 +369,14 @@ private void sqlCore(StringBuilder buf, boolean ordered) {
 	sqlFrom(buf);
 	sqlWhere(buf);
 	if ( aggregation == Aggregation.Week ) {
+		buf.append("GROUP BY ");
+		if ( combine != null ) {
+			buf.append("s.vstream_id");
+		} else {
+			buf.append("datum.stream_id");
+		}
 		buf.append(
-				"GROUP BY datum.stream_id, date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone\n");
+				", date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone\n");
 	}
 	if ( combine != null ) {
 		if ( isMinuteAggregation() ) {
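
The net effect on the generated SQL in the combining week case, before and after this change (the date_trunc expression is unchanged; the mismatch between the selected s.vstream_id and the grouped column is the NET-352 bug):

    -- before: grouped on the physical stream ID
    GROUP BY datum.stream_id, date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone

    -- after, when combine != null: grouped on the virtual stream ID
    GROUP BY s.vstream_id, date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone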
@@ -833,6 +833,29 @@ public void sql_find_daily_vids_nodeAndSources_absoluteDates_sortTimeNodeSource_
 				TestSqlResources.class, SQL_COMMENT));
 	}
 
+	@Test
+	public void sql_find_weekly_vids_nodeAndSources_absoluteDates() {
+		// GIVEN
+		BasicDatumCriteria filter = new BasicDatumCriteria();
+		filter.setAggregation(Aggregation.Week);
+		filter.setNodeIds(new Long[] { 1L, 2L, 3L });
+		filter.setSourceIds(new String[] { "a", "b", "c" });
+		filter.setStartDate(Instant.now().truncatedTo(ChronoUnit.DAYS));
+		filter.setEndDate(filter.getStartDate().plusSeconds(TimeUnit.DAYS.toSeconds(7)));
+		filter.setCombiningType(CombiningType.Sum);
+		filter.setObjectIdMaps(new String[] { "100:1,2,3" });
+		filter.setSourceIdMaps(new String[] { "V:a,b,c" });
+
+		// WHEN
+		String sql = new SelectDatum(filter).getSql();
+
+		// THEN
+		log.debug("Generated SQL:\n{}", sql);
+		assertThat("SQL matches", sql,
+				equalToTextResource("select-datum-weekly-virtual-nodesAndSources-dates.sql",
+						TestSqlResources.class, SQL_COMMENT));
+	}
+
 	@Test
 	public void sql_find_seasonal_hod_nodesAndSources_absoluteDates() {
 		// GIVEN
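
The test pins the generated SQL against the new resource file that follows. Its combining maps direct the physical streams into a single virtual stream: nodes 1, 2 and 3 combine into virtual node 100 ("100:1,2,3"), and sources a, b and c into virtual source V ("V:a,b,c"). In the generated SQL these maps are bound into the array_position(...) CASE expressions of the first CTE; a hypothetical sketch of the node mapping with the test's values in place of the "?" placeholders (the array type is an assumption):

    -- hypothetical bound form of the node-mapping CASE below
    CASE
      WHEN array_position(ARRAY[1,2,3]::BIGINT[], s.node_id) IS NOT NULL THEN 100
      ELSE s.node_id
    END AS node_id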
@@ -0,0 +1,148 @@
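-- resolve streams matching the node/source criteria, applying the virtual
-- node and source ID mappings and ranking each stream by its map position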
WITH rs AS (
SELECT s.stream_id
, CASE
WHEN array_position(?, s.node_id) IS NOT NULL THEN ?
ELSE s.node_id
END AS node_id
, COALESCE(array_position(?, s.node_id), 0) AS obj_rank
, CASE
WHEN array_position(?, s.source_id::TEXT) IS NOT NULL THEN ?
ELSE s.source_id
END AS source_id
, COALESCE(array_position(?, s.source_id::TEXT), 0) AS source_rank
, s.names_i
	, s.names_a
	, COALESCE(l.time_zone, 'UTC') AS time_zone
FROM solardatm.da_datm_meta s
LEFT OUTER JOIN solarnet.sn_node n ON n.node_id = s.node_id
LEFT OUTER JOIN solarnet.sn_loc l ON l.id = n.loc_id
WHERE s.node_id = ANY(?)
AND s.source_id ~ ANY(ARRAY(SELECT solarcommon.ant_pattern_to_regexp(unnest(?))))
)
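-- compute the virtual stream ID for each resolved stream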
, s AS (
SELECT solardatm.virutal_stream_id(node_id, source_id) AS vstream_id
, *
FROM rs
)
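-- one row per virtual stream, with its virtual node and source IDs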
, vs AS (
SELECT DISTINCT ON (vstream_id) vstream_id, node_id, source_id
FROM s
)
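-- roll up the daily aggregates into week rows per virtual stream; grouping on
-- s.vstream_id rather than the physical stream ID is the fix in this commit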
, d AS (
SELECT s.vstream_id AS stream_id,
solarcommon.first(s.obj_rank) AS obj_rank,
solarcommon.first(s.source_rank) AS source_rank,
solarcommon.first(s.names_i) AS names_i,
solarcommon.first(s.names_a) AS names_a,
date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone AS ts,
(solardatm.rollup_agg_data(
(datum.data_i, datum.data_a, datum.data_s, datum.data_t, datum.stat_i, datum.read_a)::solardatm.agg_data
ORDER BY datum.ts_start)).*
FROM s
INNER JOIN solardatm.agg_datm_daily datum ON datum.stream_id = s.stream_id
WHERE datum.ts_start >= ?
AND datum.ts_start < ?
GROUP BY s.vstream_id, date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone
)
-- calculate instantaneous values per date + property NAME (to support joining different streams with different index orders)
-- ordered by object/source ranking defined by query metadata; assume names are unique per stream
, wi AS (
SELECT
d.stream_id
, d.ts
, p.val
, rank() OVER slot as prank
, d.names_i[p.idx] AS pname
, d.stat_i[p.idx][1] AS cnt
, SUM(d.stat_i[p.idx][1]) OVER slot AS tot_cnt
FROM d
INNER JOIN unnest(d.data_i) WITH ORDINALITY AS p(val, idx) ON TRUE
WHERE p.val IS NOT NULL
WINDOW slot AS (PARTITION BY d.stream_id, d.ts, d.names_i[p.idx] ORDER BY d.obj_rank, d.source_rank RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY d.stream_id, d.ts, d.names_i[p.idx], d.obj_rank, d.source_rank
)
-- calculate instantaneous statistics
, di AS (
SELECT
stream_id
, ts
, pname
, to_char(SUM(val), 'FM999999999999999999990.999999999')::NUMERIC AS val
, SUM(cnt) AS cnt
FROM wi
GROUP BY stream_id, ts, pname
ORDER BY stream_id, ts, pname
)
-- join property data back into arrays; no stat_i for virtual stream
, di_ary AS (
SELECT
stream_id
, ts
, array_agg(val ORDER BY pname) AS data_i
, array_agg(pname ORDER BY pname) AS names_i
FROM di
GROUP BY stream_id, ts
ORDER BY stream_id, ts
)
-- calculate accumulating values per date + property NAME (to support joining different streams with different index orders)
-- ordered by object/source ranking defined by query metadata; assume names are unique per stream
, wa AS (
SELECT
d.stream_id
, d.ts
, p.val
, rank() OVER slot as prank
, d.names_a[p.idx] AS pname
, d.read_a[p.idx][1] AS rdiff
, first_value(d.read_a[p.idx][2]) OVER slot AS rstart
, last_value(d.read_a[p.idx][3]) OVER slot AS rend
FROM d
INNER JOIN unnest(d.data_a) WITH ORDINALITY AS p(val, idx) ON TRUE
WHERE p.val IS NOT NULL
WINDOW slot AS (PARTITION BY d.stream_id, d.ts, d.names_a[p.idx] ORDER BY d.obj_rank, d.source_rank RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY d.stream_id, d.ts, d.names_a[p.idx], d.obj_rank, d.source_rank
)
-- calculate accumulating statistics
, da AS (
SELECT
stream_id
, ts
, pname
, to_char(SUM(val), 'FM999999999999999999990.999999999')::NUMERIC AS val
, to_char(SUM(rdiff), 'FM999999999999999999990.999999999')::NUMERIC AS rdiff
FROM wa
GROUP BY stream_id, ts, pname
ORDER BY stream_id, ts, pname
)
-- join property data back into arrays; only read_a.diff for virtual stream
, da_ary AS (
SELECT
stream_id
, ts
, array_agg(val ORDER BY pname) AS data_a
, array_agg(
ARRAY[rdiff, NULL, NULL] ORDER BY pname
) AS read_a
, array_agg(pname ORDER BY pname) AS names_a
FROM da
GROUP BY stream_id, ts
ORDER BY stream_id, ts
)
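-- recombine the instantaneous and accumulating arrays into datum rows;
-- statistics that cannot be merged across streams are returned as NULL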
, datum AS (
SELECT
COALESCE(di_ary.stream_id, da_ary.stream_id) AS stream_id
, COALESCE(di_ary.ts, da_ary.ts) AS ts
, di_ary.data_i
, da_ary.data_a
, NULL::BIGINT[] AS data_s
, NULL::TEXT[] AS data_t
, NULL::BIGINT[][] AS stat_i
, da_ary.read_a
, di_ary.names_i
, da_ary.names_a
FROM di_ary
FULL OUTER JOIN da_ary ON da_ary.stream_id = di_ary.stream_id AND da_ary.ts = di_ary.ts
)
SELECT datum.*, vs.node_id, vs.source_id
FROM datum
INNER JOIN vs ON vs.vstream_id = datum.stream_id
ORDER BY datum.stream_id, ts
