From 4735490dc438605c52b0ab8284131fb3b7aff7e2 Mon Sep 17 00:00:00 2001 From: Matt Magoffin Date: Wed, 18 Oct 2023 14:23:37 +1300 Subject: [PATCH] NET-352: fix SQL generation bug in week-aggregate combining query. --- .../datum/v2/dao/jdbc/sql/SelectDatum.java | 24 ++- .../dao/jdbc/sql/test/SelectDatumTests.java | 23 +++ ...m-weekly-virtual-nodesAndSources-dates.sql | 148 ++++++++++++++++++ 3 files changed, 190 insertions(+), 5 deletions(-) create mode 100644 solarnet/datum/src/test/resources/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/select-datum-weekly-virtual-nodesAndSources-dates.sql diff --git a/solarnet/datum/src/main/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/SelectDatum.java b/solarnet/datum/src/main/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/SelectDatum.java index 4a630aa26..3aab57230 100644 --- a/solarnet/datum/src/main/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/SelectDatum.java +++ b/solarnet/datum/src/main/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/SelectDatum.java @@ -161,10 +161,18 @@ private void sqlSelect(StringBuilder buf) { buf.append("SELECT "); if ( combine != null ) { buf.append("s.vstream_id AS stream_id,\n"); - buf.append(" s.obj_rank,\n"); - buf.append(" s.source_rank,\n"); - buf.append(" s.names_i,\n"); - buf.append(" s.names_a,\n"); + if ( aggregation == Aggregation.Week ) { + buf.append(" solarcommon.first(s.obj_rank) AS obj_rank,\n"); + buf.append(" solarcommon.first(s.source_rank) AS source_rank,\n"); + buf.append(" solarcommon.first(s.names_i) AS names_i,\n"); + buf.append(" solarcommon.first(s.names_a) AS names_a,\n"); + + } else { + buf.append(" s.obj_rank,\n"); + buf.append(" s.source_rank,\n"); + buf.append(" s.names_i,\n"); + buf.append(" s.names_a,\n"); + } } else { buf.append(" datum.stream_id,\n"); } @@ -361,8 +369,14 @@ private void sqlCore(StringBuilder buf, boolean ordered) { sqlFrom(buf); sqlWhere(buf); if ( aggregation == Aggregation.Week ) { + buf.append("GROUP BY "); + if ( combine != null ) { + buf.append("s.vstream_id"); + } else { + buf.append("datum.stream_id"); + } buf.append( - "GROUP BY datum.stream_id, date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone\n"); + ", date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone\n"); } if ( combine != null ) { if ( isMinuteAggregation() ) { diff --git a/solarnet/datum/src/test/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/SelectDatumTests.java b/solarnet/datum/src/test/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/SelectDatumTests.java index bcf920a5f..292be9307 100644 --- a/solarnet/datum/src/test/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/SelectDatumTests.java +++ b/solarnet/datum/src/test/java/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/SelectDatumTests.java @@ -833,6 +833,29 @@ public void sql_find_daily_vids_nodeAndSources_absoluteDates_sortTimeNodeSource_ TestSqlResources.class, SQL_COMMENT)); } + @Test + public void sql_find_weekly_vids_nodeAndSources_absoluteDates() { + // GIVEN + BasicDatumCriteria filter = new BasicDatumCriteria(); + filter.setAggregation(Aggregation.Week); + filter.setNodeIds(new Long[] { 1L, 2L, 3L }); + filter.setSourceIds(new String[] { "a", "b", "c" }); + filter.setStartDate(Instant.now().truncatedTo(ChronoUnit.DAYS)); + filter.setEndDate(filter.getStartDate().plusSeconds(TimeUnit.DAYS.toSeconds(7))); + filter.setCombiningType(CombiningType.Sum); + filter.setObjectIdMaps(new String[] { "100:1,2,3" }); + filter.setSourceIdMaps(new String[] { "V:a,b,c" }); + + // WHEN + String sql = new SelectDatum(filter).getSql(); + + // THEN + log.debug("Generated SQL:\n{}", sql); + assertThat("SQL matches", sql, + equalToTextResource("select-datum-weekly-virtual-nodesAndSources-dates.sql", + TestSqlResources.class, SQL_COMMENT)); + } + @Test public void sql_find_seasonal_hod_nodesAndSources_absoluteDates() { // GIVEN diff --git a/solarnet/datum/src/test/resources/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/select-datum-weekly-virtual-nodesAndSources-dates.sql b/solarnet/datum/src/test/resources/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/select-datum-weekly-virtual-nodesAndSources-dates.sql new file mode 100644 index 000000000..f6d82013f --- /dev/null +++ b/solarnet/datum/src/test/resources/net/solarnetwork/central/datum/v2/dao/jdbc/sql/test/select-datum-weekly-virtual-nodesAndSources-dates.sql @@ -0,0 +1,148 @@ +WITH rs AS ( + SELECT s.stream_id + , CASE + WHEN array_position(?, s.node_id) IS NOT NULL THEN ? + ELSE s.node_id + END AS node_id + , COALESCE(array_position(?, s.node_id), 0) AS obj_rank + , CASE + WHEN array_position(?, s.source_id::TEXT) IS NOT NULL THEN ? + ELSE s.source_id + END AS source_id + , COALESCE(array_position(?, s.source_id::TEXT), 0) AS source_rank + , s.names_i + , s.names_a, COALESCE(l.time_zone, 'UTC') AS time_zone + FROM solardatm.da_datm_meta s + LEFT OUTER JOIN solarnet.sn_node n ON n.node_id = s.node_id + LEFT OUTER JOIN solarnet.sn_loc l ON l.id = n.loc_id + WHERE s.node_id = ANY(?) + AND s.source_id ~ ANY(ARRAY(SELECT solarcommon.ant_pattern_to_regexp(unnest(?)))) +) +, s AS ( + SELECT solardatm.virutal_stream_id(node_id, source_id) AS vstream_id + , * + FROM rs +) +, vs AS ( + SELECT DISTINCT ON (vstream_id) vstream_id, node_id, source_id + FROM s +) +, d AS ( + SELECT s.vstream_id AS stream_id, + solarcommon.first(s.obj_rank) AS obj_rank, + solarcommon.first(s.source_rank) AS source_rank, + solarcommon.first(s.names_i) AS names_i, + solarcommon.first(s.names_a) AS names_a, + date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone AS ts, + (solardatm.rollup_agg_data( + (datum.data_i, datum.data_a, datum.data_s, datum.data_t, datum.stat_i, datum.read_a)::solardatm.agg_data + ORDER BY datum.ts_start)).* + FROM s + INNER JOIN solardatm.agg_datm_daily datum ON datum.stream_id = s.stream_id + WHERE datum.ts_start >= ? + AND datum.ts_start < ? + GROUP BY s.vstream_id, date_trunc('week', datum.ts_start AT TIME ZONE s.time_zone) AT TIME ZONE s.time_zone +) +-- calculate instantaneous values per date + property NAME (to support joining different streams with different index orders) +-- ordered by object/source ranking defined by query metadata; assume names are unique per stream +, wi AS ( + SELECT + d.stream_id + , d.ts + , p.val + , rank() OVER slot as prank + , d.names_i[p.idx] AS pname + , d.stat_i[p.idx][1] AS cnt + , SUM(d.stat_i[p.idx][1]) OVER slot AS tot_cnt + FROM d + INNER JOIN unnest(d.data_i) WITH ORDINALITY AS p(val, idx) ON TRUE + WHERE p.val IS NOT NULL + WINDOW slot AS (PARTITION BY d.stream_id, d.ts, d.names_i[p.idx] ORDER BY d.obj_rank, d.source_rank RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) + ORDER BY d.stream_id, d.ts, d.names_i[p.idx], d.obj_rank, d.source_rank +) +-- calculate instantaneous statistics +, di AS ( + SELECT + stream_id + , ts + , pname + , to_char(SUM(val), 'FM999999999999999999990.999999999')::NUMERIC AS val + , SUM(cnt) AS cnt + FROM wi + GROUP BY stream_id, ts, pname + ORDER BY stream_id, ts, pname +) +-- join property data back into arrays; no stat_i for virtual stream +, di_ary AS ( + SELECT + stream_id + , ts + , array_agg(val ORDER BY pname) AS data_i + , array_agg(pname ORDER BY pname) AS names_i + FROM di + GROUP BY stream_id, ts + ORDER BY stream_id, ts +) +-- calculate accumulating values per date + property NAME (to support joining different streams with different index orders) +-- ordered by object/source ranking defined by query metadata; assume names are unique per stream +, wa AS ( + SELECT + d.stream_id + , d.ts + , p.val + , rank() OVER slot as prank + , d.names_a[p.idx] AS pname + , d.read_a[p.idx][1] AS rdiff + , first_value(d.read_a[p.idx][2]) OVER slot AS rstart + , last_value(d.read_a[p.idx][3]) OVER slot AS rend + FROM d + INNER JOIN unnest(d.data_a) WITH ORDINALITY AS p(val, idx) ON TRUE + WHERE p.val IS NOT NULL + WINDOW slot AS (PARTITION BY d.stream_id, d.ts, d.names_a[p.idx] ORDER BY d.obj_rank, d.source_rank RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) + ORDER BY d.stream_id, d.ts, d.names_a[p.idx], d.obj_rank, d.source_rank +) +-- calculate accumulating statistics +, da AS ( + SELECT + stream_id + , ts + , pname + , to_char(SUM(val), 'FM999999999999999999990.999999999')::NUMERIC AS val + , to_char(SUM(rdiff), 'FM999999999999999999990.999999999')::NUMERIC AS rdiff + FROM wa + GROUP BY stream_id, ts, pname + ORDER BY stream_id, ts, pname +) +-- join property data back into arrays; only read_a.diff for virtual stream +, da_ary AS ( + SELECT + stream_id + , ts + , array_agg(val ORDER BY pname) AS data_a + , array_agg( + ARRAY[rdiff, NULL, NULL] ORDER BY pname + ) AS read_a + , array_agg(pname ORDER BY pname) AS names_a + FROM da + GROUP BY stream_id, ts + ORDER BY stream_id, ts +) +, datum AS ( + SELECT + COALESCE(di_ary.stream_id, da_ary.stream_id) AS stream_id + , COALESCE(di_ary.ts, da_ary.ts) AS ts + , di_ary.data_i + , da_ary.data_a + , NULL::BIGINT[] AS data_s + , NULL::TEXT[] AS data_t + , NULL::BIGINT[][] AS stat_i + , da_ary.read_a + , di_ary.names_i + , da_ary.names_a + FROM di_ary + FULL OUTER JOIN da_ary ON da_ary.stream_id = di_ary.stream_id AND da_ary.ts = di_ary.ts +) +SELECT datum.*, vs.node_id, vs.source_id +FROM datum +INNER JOIN vs ON vs.vstream_id = datum.stream_id +ORDER BY datum.stream_id, ts \ No newline at end of file