-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[O2B-1365] Implement GAQ periods views #1808
base: main
Are you sure you want to change the base?
Changes from all commits
03ca36e
2436041
06268f2
d6e7fcb
1658d1d
068d48c
4ee19e7
10be130
a7a7b54
1ee8bab
e152c6e
3fb54a5
e34e920
db84fcc
5f900f8
4199b3f
53f093a
9de457c
dc33aa6
86cb1fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,162 @@ | ||||||
'use strict'; | ||||||
|
||||||
const SELECT_RUNS_FROM_TIMESTAMPS_FOR_GAQ_PERIODS = ` | ||||||
SELECT | ||||||
gaqd.data_pass_id, | ||||||
gaqd.run_number, | ||||||
COALESCE(UNIX_TIMESTAMP(first_tf_timestamp), UNIX_TIMESTAMP(time_start), 0) AS ordering_timestamp, | ||||||
UNIX_TIMESTAMP(COALESCE(first_tf_timestamp, time_start)) AS timestamp | ||||||
FROM global_aggregated_quality_detectors AS gaqd | ||||||
INNER JOIN runs as r | ||||||
ON gaqd.run_number = r.run_number | ||||||
`; | ||||||
|
||||||
const SELECT_RUNS_TO_TIMESTAMPS_FOR_GAQ_PERIODS = ` | ||||||
SELECT | ||||||
gaqd.data_pass_id, | ||||||
gaqd.run_number, | ||||||
UNIX_TIMESTAMP(COALESCE(last_tf_timestamp, time_end, NOW(3))) AS ordering_timestamp, | ||||||
UNIX_TIMESTAMP(COALESCE(last_tf_timestamp, time_end)) AS timestamp | ||||||
FROM global_aggregated_quality_detectors AS gaqd | ||||||
INNER JOIN runs as r | ||||||
ON gaqd.run_number = r.run_number | ||||||
`; | ||||||
|
||||||
const SELECT_QCF_EFFECTIVE_PERIODS_FROM_TIMESTAMPS_FOR_GAQ_PERIODS = ` | ||||||
SELECT | ||||||
gaqd.data_pass_id, | ||||||
gaqd.run_number, | ||||||
COALESCE(UNIX_TIMESTAMP(qcfep.\`from\`), 0) AS ordering_timestamp, | ||||||
UNIX_TIMESTAMP(qcfep.\`from\`) AS timestamp | ||||||
FROM quality_control_flag_effective_periods AS qcfep | ||||||
INNER JOIN quality_control_flags AS qcf ON qcf.id = qcfep.flag_id | ||||||
INNER JOIN data_pass_quality_control_flag AS dpqcf ON dpqcf.quality_control_flag_id = qcf.id | ||||||
-- Only flags of detectors which are defined in global_aggregated_quality_detectors | ||||||
-- should be taken into account for calculation of gaq_effective_periods | ||||||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||||||
ON gaqd.data_pass_id = dpqcf.data_pass_id | ||||||
AND gaqd.run_number = qcf.run_number | ||||||
AND gaqd.detector_id = qcf.detector_id | ||||||
`; | ||||||
|
||||||
const SELECT_QCF_EFFECTIVE_PERIODS_TO_TIMESTAMPS_FOR_GAQ_PERIODS = ` | ||||||
SELECT | ||||||
gaqd.data_pass_id, | ||||||
gaqd.run_number, | ||||||
UNIX_TIMESTAMP(COALESCE(qcfep.\`to\`, NOW(3))) AS ordering_timestamp, | ||||||
UNIX_TIMESTAMP(COALESCE(qcfep.\`to\`)) AS timestamp | ||||||
FROM quality_control_flag_effective_periods AS qcfep | ||||||
INNER JOIN quality_control_flags AS qcf ON qcf.id = qcfep.flag_id | ||||||
INNER JOIN data_pass_quality_control_flag AS dpqcf ON dpqcf.quality_control_flag_id = qcf.id | ||||||
-- Only flags of detectors which are defined in global_aggregated_quality_detectors | ||||||
-- should be taken into account for calculation of gaq_effective_periods | ||||||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||||||
ON gaqd.data_pass_id = dpqcf.data_pass_id | ||||||
AND gaqd.run_number = qcf.run_number | ||||||
AND gaqd.detector_id = qcf.detector_id | ||||||
`; | ||||||
|
||||||
const CREATE_GAQ_PERIODS_TIMESTAMPS_VIEW = ` | ||||||
CREATE OR REPLACE VIEW gaq_periods_timestamps AS | ||||||
SELECT * FROM ( | ||||||
SELECT | ||||||
data_pass_id, | ||||||
run_number, | ||||||
timestamp AS \`from\`, | ||||||
NTH_VALUE(timestamp, 2) OVER ( | ||||||
PARTITION BY data_pass_id, | ||||||
run_number | ||||||
ORDER BY ap.ordering_timestamp | ||||||
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING | ||||||
) AS \`to\`, | ||||||
NTH_VALUE(ordering_timestamp, 2) OVER ( | ||||||
PARTITION BY data_pass_id, | ||||||
run_number | ||||||
ORDER BY ap.ordering_timestamp | ||||||
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING | ||||||
) AS \`to_ordering_timestamp\` | ||||||
FROM ( | ||||||
-- Two selects for runs' timestamps (in case QC flag's eff. period doesn't start at run's start or end at run's end ) | ||||||
( ${SELECT_RUNS_FROM_TIMESTAMPS_FOR_GAQ_PERIODS} ) | ||||||
UNION | ||||||
( ${SELECT_RUNS_TO_TIMESTAMPS_FOR_GAQ_PERIODS} ) | ||||||
UNION | ||||||
-- Two selectes for timestamps of QC flags' effective periods | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Simple typo |
||||||
( ${SELECT_QCF_EFFECTIVE_PERIODS_FROM_TIMESTAMPS_FOR_GAQ_PERIODS} ) | ||||||
UNION | ||||||
( ${SELECT_QCF_EFFECTIVE_PERIODS_TO_TIMESTAMPS_FOR_GAQ_PERIODS} ) | ||||||
|
||||||
ORDER BY ordering_timestamp | ||||||
) AS ap | ||||||
) AS gaq_periods_with_last_nullish_row | ||||||
WHERE gaq_periods_with_last_nullish_row.\`to_ordering_timestamp\` IS NOT NULL | ||||||
`; | ||||||
|
||||||
const DROP_GAQ_PERIODS_TIMESTAMPS_VIEW = 'DROP VIEW gaq_periods_timestamps'; | ||||||
|
||||||
const CREATE_GAQ_PERIODS_VIEW = ` | ||||||
CREATE OR REPLACE VIEW gaq_periods AS | ||||||
SELECT | ||||||
gaq_periods_timestamps.data_pass_id AS dataPassId, | ||||||
gaq_periods_timestamps.run_number AS runNumber, | ||||||
gaq_periods_timestamps.\`from\` AS \`from\`, | ||||||
gaq_periods_timestamps.\`to\` AS \`to\`, | ||||||
|
||||||
IF(COUNT( DISTINCT gaqd.detector_id ) > COUNT( DISTINCT qcfep.flag_id ), | ||||||
null, | ||||||
SUM(qcft.bad) >= 1 | ||||||
) AS bad, | ||||||
IF(COUNT( DISTINCT gaqd.detector_id ) > COUNT( DISTINCT qcfep.flag_id ), | ||||||
null, | ||||||
SUM(IF(qcft.monte_carlo_reproducible, false, qcft.bad)) >= 1 | ||||||
) AS badWhenMcReproducibleAsNotBad, | ||||||
SUM(qcft.bad) = SUM(qcft.monte_carlo_reproducible) AND SUM(qcft.monte_carlo_reproducible) AS mcReproducible, | ||||||
GROUP_CONCAT( DISTINCT qcfv.flag_id ) AS verifiedFlagsList, | ||||||
GROUP_CONCAT( DISTINCT qcfep.flag_id ) AS flagsList | ||||||
|
||||||
FROM gaq_periods_timestamps | ||||||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||||||
ON gaqd.data_pass_id = gaq_periods_timestamps.data_pass_id | ||||||
AND gaqd.run_number = gaq_periods_timestamps.run_number | ||||||
|
||||||
LEFT JOIN ( | ||||||
data_pass_quality_control_flag AS dpqcf | ||||||
INNER JOIN quality_control_flags AS qcf | ||||||
ON dpqcf.quality_control_flag_id = qcf.id | ||||||
INNER JOIN quality_control_flag_types AS qcft | ||||||
ON qcft.id = qcf.flag_type_id | ||||||
INNER JOIN quality_control_flag_effective_periods AS qcfep | ||||||
ON qcf.id = qcfep.flag_id | ||||||
LEFT JOIN quality_control_flag_verifications AS qcfv | ||||||
ON qcfv.flag_id = qcf.id | ||||||
) | ||||||
ON gaq_periods_timestamps.data_pass_id = dpqcf.data_pass_id | ||||||
AND qcf.run_number = gaq_periods_timestamps.run_number | ||||||
AND gaqd.detector_id = qcf.detector_id | ||||||
AND gaq_periods_timestamps.run_number = qcf.run_number | ||||||
AND (qcfep.\`from\` IS NULL OR UNIX_TIMESTAMP(qcfep.\`from\`) <= gaq_periods_timestamps.\`from\`) | ||||||
AND (qcfep.\`to\` IS NULL OR gaq_periods_timestamps.\`to\` <= UNIX_TIMESTAMP(qcfep.\`to\`)) | ||||||
|
||||||
WHERE gaq_periods_timestamps.\`to\` IS NOT null | ||||||
|
||||||
GROUP BY | ||||||
gaq_periods_timestamps.data_pass_id, | ||||||
gaq_periods_timestamps.run_number, | ||||||
gaq_periods_timestamps.\`from\`, | ||||||
gaq_periods_timestamps.\`to\` | ||||||
`; | ||||||
|
||||||
const DROP_GAQ_PERIODS_VIEW = 'DROP VIEW gaq_periods'; | ||||||
|
||||||
/** @type {import('sequelize-cli').Migration} */ | ||||||
module.exports = { | ||||||
up: async (queryInterface) => queryInterface.sequelize.transaction(async (transaction) => { | ||||||
await queryInterface.sequelize.query(CREATE_GAQ_PERIODS_TIMESTAMPS_VIEW, { transaction }); | ||||||
await queryInterface.sequelize.query(CREATE_GAQ_PERIODS_VIEW, { transaction }); | ||||||
}), | ||||||
|
||||||
down: async (queryInterface) => queryInterface.sequelize.transaction(async (transaction) => { | ||||||
await queryInterface.sequelize.query(DROP_GAQ_PERIODS_VIEW, { transaction }); | ||||||
await queryInterface.sequelize.query(DROP_GAQ_PERIODS_TIMESTAMPS_VIEW, { transaction }); | ||||||
}), | ||||||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,51 +15,6 @@ const { Op } = require('sequelize'); | |
const { models: { QcFlag } } = require('..'); | ||
const Repository = require('./Repository'); | ||
|
||
const GAQ_PERIODS_VIEW = ` | ||
SELECT | ||
data_pass_id, | ||
run_number, | ||
timestamp AS \`from\`, | ||
NTH_VALUE(timestamp, 2) OVER ( | ||
PARTITION BY data_pass_id, | ||
run_number | ||
ORDER BY ap.timestamp | ||
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING | ||
) AS \`to\` | ||
FROM ( | ||
( | ||
SELECT gaqd.data_pass_id, | ||
gaqd.run_number, | ||
COALESCE(UNIX_TIMESTAMP(qcfep.\`from\`), 0) AS timestamp | ||
FROM quality_control_flag_effective_periods AS qcfep | ||
INNER JOIN quality_control_flags AS qcf ON qcf.id = qcfep.flag_id | ||
INNER JOIN data_pass_quality_control_flag AS dpqcf ON dpqcf.quality_control_flag_id = qcf.id | ||
-- Only flags of detectors which are defined in global_aggregated_quality_detectors | ||
-- should be taken into account for calculation of gaq_effective_periods | ||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||
ON gaqd.data_pass_id = dpqcf.data_pass_id | ||
AND gaqd.run_number = qcf.run_number | ||
AND gaqd.detector_id = qcf.detector_id | ||
) | ||
UNION | ||
( | ||
SELECT gaqd.data_pass_id, | ||
gaqd.run_number, | ||
UNIX_TIMESTAMP(COALESCE(qcfep.\`to\`, NOW())) AS timestamp | ||
FROM quality_control_flag_effective_periods AS qcfep | ||
INNER JOIN quality_control_flags AS qcf ON qcf.id = qcfep.flag_id | ||
INNER JOIN data_pass_quality_control_flag AS dpqcf ON dpqcf.quality_control_flag_id = qcf.id | ||
-- Only flags of detectors which are defined in global_aggregated_quality_detectors | ||
-- should be taken into account for calculation of gaq_effective_periods | ||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||
ON gaqd.data_pass_id = dpqcf.data_pass_id | ||
AND gaqd.run_number = qcf.run_number | ||
AND gaqd.detector_id = qcf.detector_id | ||
) | ||
ORDER BY timestamp | ||
) AS ap | ||
`; | ||
|
||
/** | ||
* @typedef GaqPeriod | ||
* | ||
|
@@ -101,33 +56,10 @@ class QcFlagRepository extends Repository { | |
*/ | ||
async findGaqPeriods(dataPassId, runNumber) { | ||
const query = ` | ||
SELECT | ||
gaq_periods.data_pass_id AS dataPassId, | ||
gaq_periods.run_number AS runNumber, | ||
IF(gaq_periods.\`from\` = 0, null, gaq_periods.\`from\` * 1000) AS \`from\`, | ||
IF(gaq_periods.\`to\` = UNIX_TIMESTAMP(NOW()), null, gaq_periods.\`to\` * 1000) AS \`to\`, | ||
group_concat(qcf.id) AS contributingFlagIds | ||
|
||
FROM quality_control_flags AS qcf | ||
INNER JOIN quality_control_flag_effective_periods AS qcfep | ||
ON qcf.id = qcfep.flag_id | ||
INNER JOIN data_pass_quality_control_flag AS dpqcf ON dpqcf.quality_control_flag_id = qcf.id | ||
INNER JOIN (${GAQ_PERIODS_VIEW}) AS gaq_periods ON gaq_periods.data_pass_id = dpqcf.data_pass_id | ||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||
ON gaqd.data_pass_id = gaq_periods.data_pass_id | ||
AND gaqd.run_number = gaq_periods.run_number | ||
AND gaqd.detector_id = qcf.detector_id | ||
AND gaq_periods.run_number = qcf.run_number | ||
AND (qcfep.\`from\` IS NULL OR UNIX_TIMESTAMP(qcfep.\`from\`) <= gaq_periods.\`from\`) | ||
AND (qcfep.\`to\` IS NULL OR gaq_periods.\`to\` <= UNIX_TIMESTAMP(qcfep.\`to\`)) | ||
|
||
WHERE gaq_periods.data_pass_id = ${dataPassId} | ||
${runNumber ? `AND gaq_periods.run_number = ${runNumber}` : ''} | ||
|
||
GROUP BY gaq_periods.run_number, | ||
gaq_periods.data_pass_id, | ||
gaq_periods.\`from\`, | ||
gaq_periods.\`to\`; | ||
SELECT * FROM gaq_periods | ||
WHERE IF(gaq_periods.\`to\` = UNIX_TIMESTAMP(NOW(3)), null, gaq_periods.\`to\`) IS NOT NULL | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I am correct, you do not need the if anymore? |
||
AND gaq_periods.dataPassId = ${dataPassId} | ||
${runNumber ? `AND gaq_periods.runNumber = ${runNumber}` : ''} | ||
`; | ||
|
||
const [rows] = await this.model.sequelize.query(query); | ||
|
@@ -136,14 +68,14 @@ class QcFlagRepository extends Repository { | |
runNumber, | ||
from, | ||
to, | ||
contributingFlagIds, | ||
flagsList, | ||
}) => ({ | ||
dataPassId, | ||
runNumber, | ||
from, | ||
to, | ||
contributingFlagIds: contributingFlagIds.split(',').map((id) => parseInt(id, 10)), | ||
})); | ||
from: from * 1000, // Change unix seconds to miliseconds | ||
to: to * 1000, | ||
contributingFlagIds: flagsList ? flagsList.split(',').map((id) => parseInt(id, 10)) : [], | ||
})).filter(({ contributingFlagIds }) => contributingFlagIds.length > 0); | ||
} | ||
|
||
/** | ||
|
@@ -156,92 +88,57 @@ class QcFlagRepository extends Repository { | |
* @return {Promise<RunGaqSubSummary[]>} Resolves with the GAQ sub-summaries | ||
*/ | ||
async getRunGaqSubSummaries(dataPassId, { mcReproducibleAsNotBad = false } = {}) { | ||
const effectivePeriodsWithTypeSubQuery = ` | ||
SELECT | ||
gaq_periods.data_pass_id AS dataPassId, | ||
gaq_periods.run_number AS runNumber, | ||
IF(gaq_periods.\`from\` = 0, null, gaq_periods.\`from\`) AS \`from\`, | ||
IF(gaq_periods.\`to\` = UNIX_TIMESTAMP(NOW()), null, gaq_periods.\`to\`) AS \`to\`, | ||
SUM(IF(qcft.monte_carlo_reproducible AND :mcReproducibleAsNotBad, false, qcft.bad)) >= 1 AS bad, | ||
SUM(qcft.bad) = SUM(qcft.monte_carlo_reproducible) AND SUM(qcft.monte_carlo_reproducible) AS mcReproducible, | ||
GROUP_CONCAT( DISTINCT qcfv.flag_id ) AS verifiedFlagsList, | ||
GROUP_CONCAT( DISTINCT qcf.id ) AS flagsList | ||
|
||
FROM quality_control_flags AS qcf | ||
INNER JOIN quality_control_flag_types AS qcft | ||
ON qcft.id = qcf.flag_type_id | ||
LEFT JOIN quality_control_flag_verifications AS qcfv | ||
ON qcfv.flag_id = qcf.id | ||
INNER JOIN quality_control_flag_effective_periods AS qcfep | ||
ON qcf.id = qcfep.flag_id | ||
INNER JOIN data_pass_quality_control_flag AS dpqcf | ||
ON dpqcf.quality_control_flag_id = qcf.id | ||
INNER JOIN (${GAQ_PERIODS_VIEW}) AS gaq_periods | ||
ON gaq_periods.data_pass_id = dpqcf.data_pass_id | ||
INNER JOIN global_aggregated_quality_detectors AS gaqd | ||
ON gaqd.data_pass_id = gaq_periods.data_pass_id | ||
AND gaqd.run_number = gaq_periods.run_number | ||
AND gaqd.detector_id = qcf.detector_id | ||
AND gaq_periods.run_number = qcf.run_number | ||
AND (qcfep.\`from\` IS NULL OR UNIX_TIMESTAMP(qcfep.\`from\`) <= gaq_periods.\`from\`) | ||
AND (qcfep.\`to\` IS NULL OR gaq_periods.\`to\` <= UNIX_TIMESTAMP(qcfep.\`to\`)) | ||
|
||
GROUP BY | ||
gaq_periods.data_pass_id, | ||
gaq_periods.run_number, | ||
gaq_periods.\`from\`, | ||
gaq_periods.\`to\` | ||
`; | ||
|
||
const query = ` | ||
SELECT | ||
effectivePeriods.runNumber, | ||
effectivePeriods.dataPassId, | ||
effectivePeriods.bad, | ||
SUM(effectivePeriods.mcReproducible) > 0 AS mcReproducible, | ||
GROUP_CONCAT(effectivePeriods.verifiedFlagsList) AS verifiedFlagsList, | ||
GROUP_CONCAT(effectivePeriods.flagsList) AS flagsList, | ||
gaq_periods.runNumber, | ||
gaq_periods.dataPassId, | ||
gaq_periods.bad, | ||
gaq_periods.badWhenMcReproducibleAsNotBad, | ||
SUM(gaq_periods.mcReproducible) > 0 AS mcReproducible, | ||
GROUP_CONCAT(gaq_periods.verifiedFlagsList) AS verifiedFlagsList, | ||
GROUP_CONCAT(gaq_periods.flagsList) AS flagsList, | ||
|
||
IF( | ||
run.time_start IS NULL OR run.time_end IS NULL, | ||
IF( | ||
effectivePeriods.\`from\` IS NULL AND effectivePeriods.\`to\` IS NULL, | ||
gaq_periods.\`from\` IS NULL AND gaq_periods.\`to\` IS NULL, | ||
1, | ||
null | ||
), | ||
SUM( | ||
COALESCE(effectivePeriods.\`to\`, UNIX_TIMESTAMP(run.time_end)) | ||
- COALESCE(effectivePeriods.\`from\`, UNIX_TIMESTAMP(run.time_start)) | ||
COALESCE(gaq_periods.\`to\`, UNIX_TIMESTAMP(run.time_end)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure, but I think here what you should use is |
||
- COALESCE(gaq_periods.\`from\`, UNIX_TIMESTAMP(run.time_start)) | ||
) / ( | ||
UNIX_TIMESTAMP(run.time_end) - UNIX_TIMESTAMP(run.time_start) | ||
) | ||
) AS effectiveRunCoverage | ||
|
||
FROM (${effectivePeriodsWithTypeSubQuery}) AS effectivePeriods | ||
INNER JOIN runs AS run ON run.run_number = effectivePeriods.runNumber | ||
FROM gaq_periods | ||
INNER JOIN runs AS run ON run.run_number = gaq_periods.runNumber | ||
|
||
WHERE effectivePeriods.dataPassId = :dataPassId | ||
WHERE gaq_periods.dataPassId = :dataPassId | ||
|
||
GROUP BY | ||
effectivePeriods.dataPassId, | ||
effectivePeriods.runNumber, | ||
effectivePeriods.bad | ||
gaq_periods.dataPassId, | ||
gaq_periods.runNumber, | ||
gaq_periods.bad | ||
`; | ||
|
||
const [rows] = await this.model.sequelize.query(query, { replacements: { dataPassId, mcReproducibleAsNotBad } }); | ||
const [rows] = await this.model.sequelize.query(query, { replacements: { dataPassId } }); | ||
return rows.map(({ | ||
runNumber, | ||
bad, | ||
badWhenMcReproducibleAsNotBad, | ||
effectiveRunCoverage, | ||
mcReproducible, | ||
flagsList, | ||
verifiedFlagsList, | ||
}) => ({ | ||
runNumber, | ||
bad, | ||
bad: mcReproducibleAsNotBad ? badWhenMcReproducibleAsNotBad : bad, | ||
martinboulais marked this conversation as resolved.
Show resolved
Hide resolved
|
||
effectiveRunCoverage: parseFloat(effectiveRunCoverage) || null, | ||
mcReproducible: Boolean(mcReproducible), | ||
flagsIds: [...new Set(flagsList.split(','))], | ||
flagsIds: flagsList ? [...new Set(flagsList.split(','))] : [], | ||
martinboulais marked this conversation as resolved.
Show resolved
Hide resolved
|
||
verifiedFlagsIds: verifiedFlagsList ? [...new Set(verifiedFlagsList.split(','))] : [], | ||
})); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This kind of function are relatively rarely used (in my experience), it might be useful to put a small comment stating what you are doing here?
Also, maybe you can replace the
NTH_VALUE
andROWS BETWEEN...
by theLAG
function? You might need to reverse the order, not sure