From 8f1c536be208ee455123ff8e8c90f6460c66a8f2 Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Wed, 18 Oct 2023 06:31:43 -0700 Subject: [PATCH 1/5] Added some debugging --- cdk/stack.ts | 2 +- src/adapters/acumar.js | 4 ++++ src/lib/measurement.js | 5 ++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cdk/stack.ts b/cdk/stack.ts index b04c70b4..55cdfdec 100644 --- a/cdk/stack.ts +++ b/cdk/stack.ts @@ -12,7 +12,7 @@ import { copyFileSync, readdirSync } from 'fs'; interface keyable { - [key: string]: string + [key: string]: string } interface StackProps extends cdk.StackProps { diff --git a/src/adapters/acumar.js b/src/adapters/acumar.js index 13c49366..92761331 100644 --- a/src/adapters/acumar.js +++ b/src/adapters/acumar.js @@ -32,6 +32,7 @@ export const name = 'acumar'; export async function fetchData (source, cb) { try { if (source.datetime) { + log.debug(`Fetching data with ${source.datetime}`); const sourceLuxon = DateTime.fromISO(source.datetime); const dateLuxon = sourceLuxon.toFormat('dd/MM/yy'); const hourLuxon = sourceLuxon.toFormat('HH'); @@ -96,6 +97,7 @@ async function getPollutionData(station, dateLuxon, hourLuxon) { let firstDataRow; if (dateLuxon && hourLuxon) { + log.debug(`Checking for data: ${dateLuxon}, ${hourLuxon}`); firstDataRow = $('table') .eq(station.table) .find('tr') @@ -111,6 +113,7 @@ async function getPollutionData(station, dateLuxon, hourLuxon) { }) .first(); } else { + log.warn(`Could not match date and hour: ${dateLuxon}, ${hourLuxon}`); firstDataRow = $('table') .eq(station.table) .find('tr') @@ -140,6 +143,7 @@ async function getPollutionData(station, dateLuxon, hourLuxon) { .trim() ); + results.push({ city: 'Buenos Aires', location: station.station, diff --git a/src/lib/measurement.js b/src/lib/measurement.js index 82537c14..153ad2ff 100644 --- a/src/lib/measurement.js +++ b/src/lib/measurement.js @@ -87,7 +87,10 @@ function createFetchObject (input, source, failures, dryRun) { .catch(ignore); return { - 
fetchStarted: Date.now(), + get fetchStarted () { + log.info(`Started ${source.name}`); + return Date.now(); + }, get fetchEnded () { return fetchEnded; }, From 4a52cac6fd1969fe0b767647244855d2c3f1b522 Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Wed, 18 Oct 2023 15:43:52 -0700 Subject: [PATCH 2/5] A little cleanup --- src/adapters/acumar.js | 61 ------------------------------------------ src/lib/constants.js | 2 +- src/lib/measurement.js | 2 +- src/lib/requests.js | 7 ++--- 4 files changed, 4 insertions(+), 68 deletions(-) diff --git a/src/adapters/acumar.js b/src/adapters/acumar.js index bf9cd0ac..10c31370 100644 --- a/src/adapters/acumar.js +++ b/src/adapters/acumar.js @@ -96,12 +96,7 @@ async function getPollutionData ( const $ = load(response.body); if (dateLuxon && hourLuxon) { -<<<<<<< HEAD - log.debug(`Checking for data: ${dateLuxon}, ${hourLuxon}`); - firstDataRow = $('table') -======= const firstDataRowIndex = $('table') ->>>>>>> b2f175dd5c954b41efe54f5765bd432930a53dd0 .eq(station.table) .find('tr') .get() @@ -114,71 +109,15 @@ async function getPollutionData ( .trim() .replace(' hs.', ''); return dateCell === dateLuxon && hourCell === hourLuxon; -<<<<<<< HEAD - }) - .first(); - } else { - log.warn(`Could not match date and hour: ${dateLuxon}, ${hourLuxon}`); - firstDataRow = $('table') -======= }); const timeRows = $('table') ->>>>>>> b2f175dd5c954b41efe54f5765bd432930a53dd0 .eq(station.table) .find('tr') .slice(firstDataRowIndex, firstDataRowIndex + numRows); -<<<<<<< HEAD - const dateStr = firstDataRow.find('td').eq(0).text().trim(); - const timeStr = firstDataRow - .find('td') - .eq(1) - .text() - .trim() - .replace(' hs.', ''); - const localDate = DateTime.fromFormat( - `${dateStr} ${timeStr}`, - 'dd/MM/yy H', - { zone: 'America/Argentina/Buenos_Aires' } - ); - const utcDate = localDate.toUTC(); - - pollutantParams.forEach((param, index) => { - const value = parseFloat( - firstDataRow - .find('td') - .eq(index + 2) - .text() - .trim() - ); 
- - - results.push({ - city: 'Buenos Aires', - location: station.station, - parameter: param, - value, - unit: param === 'co' ? 'mg/m³' : 'µg/m³', - date: { - local: localDate.toISO({ suppressMilliseconds: true }), - utc: utcDate.toISO({ suppressMilliseconds: true }), - }, - coordinates: station.coordinates, - attribution: [ - { - name: 'ACUMAR', - url: station.url, - }, - ], - averagingPeriod: { - unit: 'hours', - value: 1, - }, -======= timeRows.each((_, row) => { processRow($, row, station, pollutantParams, results); ->>>>>>> b2f175dd5c954b41efe54f5765bd432930a53dd0 }); } else { const recentRows = $('table') diff --git a/src/lib/constants.js b/src/lib/constants.js index e0a39307..098030f0 100644 --- a/src/lib/constants.js +++ b/src/lib/constants.js @@ -2,5 +2,5 @@ 'use strict'; // Default timeout to use for requests module -export const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 5; +export const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 15000; export const REQUEST_RETRIES = 3; diff --git a/src/lib/measurement.js b/src/lib/measurement.js index 7064ae62..5c92792f 100644 --- a/src/lib/measurement.js +++ b/src/lib/measurement.js @@ -52,7 +52,7 @@ async function getStreamFromAdapter (adapter, source) { const fetchData = promisify(adapter.fetchData) const data = await fetchData(source) .catch(err => { - throw new AdapterError(ADAPTER_ERROR, null, err.code) + throw new AdapterError(ADAPTER_ERROR, source, err && err.message && err.message.code) }); const out = DataStream.from(data.measurements); out.name = data.name; diff --git a/src/lib/requests.js b/src/lib/requests.js index 9c95cc28..48e3b0fd 100644 --- a/src/lib/requests.js +++ b/src/lib/requests.js @@ -7,9 +7,6 @@ import { AdapterError, DATA_URL_ERROR } from './errors.js'; const headers = { 'User-Agent': 'OpenAQ' } - - - export default (source, cb) => { let url, timeout, retries; if(typeof(source) === 'object' && source.url) { @@ -28,10 +25,10 @@ export default (source, cb) => { // setup the options 
const requestClient = got.extend({ timeout: { - request: REQUEST_TIMEOUT + request: timeout, }, retry: { - limit: source.retries || 3, + limit: retries, errorCodes: [ 'ETIMEDOUT' ], From a55a77ea58fd400e147f1fa30019eead91a1832b Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Thu, 19 Oct 2023 09:29:10 -0700 Subject: [PATCH 3/5] Removed the inline filtering of sources Instead of using the old pipe use(filter) we are going to rely on filtering the sources ahead of time and passing all desired sources to be run. --- README.md | 21 ++++++++++++- src/deployments.js | 9 +++--- src/fetch.js | 25 ++++++++++++---- src/index.js | 74 ++++++++-------------------------------------- src/lib/env.js | 15 ++++++++++ src/lib/errors.js | 1 - src/scheduler.js | 64 ++++++++++++++++++++++++++------------- 7 files changed, 115 insertions(+), 94 deletions(-) diff --git a/README.md b/README.md index ca7743ed..7580335c 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,25 @@ For a more detailed description of the command line options available, use: `nod ## Deployment Deployment is is being built from the lambda-deployment branch. Any development for openaq-fetch should be branched/merged from/to the lambda-deployment branch until further notice. +Deployments rely on a json object that contains the different deployments. The schedular is then used to loop through that object and post a message that will trigger a lambda to run that deployment. A deployment consists of a set of arguments that are passed to the fetch script to limit the sources that are run. 
+ +You can test the deployments with the following: + +Show all deployments but don't submit and don't run the fetcher +` +node index.js --dryrun --deployments all --nofetch +` +Only the japan deployment but don't run the fetcher +` +node index.js --dryrun --deployments japan --nofetch +` + +Only the japan deployment, don't submit a file but run the fetcher +` +node index.js --dryrun --deployments japan +` + + ## Data Source Criteria This section lists the key criteria for air quality data aggregated @@ -83,7 +102,7 @@ this section. 1. Data must be of one of these pollutant types: PM10, PM2.5, sulfur dioxide (SO2), carbon monoxide (CO), nitrogen dioxide (NO2), ozone (O3), and black carbon (BC). -2. Data must be from an official-level outdoor air quality source, as defined as data produced by a government entity or international organization. We do not, at this stage, include data from low-cost, temporary, and/or indoor sensors.  +2. Data must be from an official-level outdoor air quality source, as defined as data produced by a government entity or international organization. We do not, at this stage, include data from low-cost, temporary, and/or indoor sensors. 3. Data must be ‘raw’ and reported in physical concentrations on their originating site. Data cannot be shared in an 'Air Quality Index' or equivalent (e.g. AQI, PSI, API) format. 
diff --git a/src/deployments.js b/src/deployments.js index bec9408b..debc6610 100644 --- a/src/deployments.js +++ b/src/deployments.js @@ -26,11 +26,12 @@ const deploymentsArray = [ name: 'acumar', adapter: 'acumar', offset: 72 + }, + // Japan is slow and needs to be alone + { + name: 'japan', + source: 'japan-soramame', } -// } -// name: 'japan', -// source: 'japan-soramame' -// } ]; export { deploymentsArray }; diff --git a/src/fetch.js b/src/fetch.js index 64983d96..da8cec85 100644 --- a/src/fetch.js +++ b/src/fetch.js @@ -50,8 +50,6 @@ const runningSources = {}; * Run all the data fetch tasks in parallel, simply logs out results */ export function handler (event, context) { - log.debug(event); - console.log(event); // the event may have passed a source, in which case we need to filter let currentSources; let offset; @@ -60,6 +58,7 @@ export function handler (event, context) { // sure that we dont overwrite another process let suffix = env.suffix || '_na'; if (event && event.Records && event.Records.length) { + log.debug(`Getting sources from event data - ${event.Records && event.Records.length} records`) const messageId = event.Records[0].messageId; if (event.Records.length === 1) { const body = JSON.parse(event.Records[0].body); @@ -77,30 +76,39 @@ export function handler (event, context) { } suffix = `_${suffix}${messageId}`; } else if (event && event.sources) { + log.debug(`Getting sources from event sources - ${event.sources && event.sources.length} sources`) const messageId = 'event'; currentSources = event.sources; offset = event.offset; datetime = event.datetime; suffix = `_${event.suffix || suffix}${messageId}`; } else if (event && event.source) { + log.debug(`Getting source from event source`) currentSources = sourcesArray.filter( (d) => d.name === event.source ); + } else if (event && event.adapter) { + log.debug(`Getting sources from event adapter ${event.adapter}`) currentSources = sourcesArray.filter( (d) => d.adapter === event.adapter ); } else if 
(env.adapter) { + log.debug(`Getting sources from env variable adapter ${env.adapter}`) currentSources = sourcesArray.filter( (d) => d.adapter === env.adapter ); + } else if (env.source) { + log.debug(`Getting source from env variable source ${env.source}`) + currentSources = sourcesArray.filter( + (d) => d.name === env.source + ); } else { + log.debug(`Getting sources from active sources array`) currentSources = sourcesArray; } // and the final file name - env.key = `realtime/${DateTime.now().toFormat( - 'yyyy-MM-dd/X' - )}${suffix}.ndjson`; + env.key = `realtime/${DateTime.now().toFormat('yyyy-MM-dd/X')}${suffix}.ndjson`; if (offset) { env.offset = offset; @@ -110,6 +118,11 @@ export function handler (event, context) { env.datetime = datetime; } + if (env.nofetch) { + log.info(`Skipping fetch for ${currentSources.length} sources and saving to ${env.key}`) + return true; + } + return Promise.race([ handleSigInt(runningSources), handleProcessTimeout(processTimeout, runningSources), @@ -141,7 +154,7 @@ export function handler (event, context) { // filter sources - if env is set then choose only matching source, // otherwise filter out inactive sources. // * inactive sources will be run if called by name in env. 
- .use(chooseSourcesBasedOnEnv, env, runningSources) + //.use(chooseSourcesBasedOnEnv, env, runningSources) // mark sources as started .do(markSourceAs('started', runningSources)) // get measurements object from given source diff --git a/src/index.js b/src/index.js index e2e80096..af8cd1a3 100644 --- a/src/index.js +++ b/src/index.js @@ -1,69 +1,21 @@ import { handler as fetcher } from './fetch.js'; import { handler as scheduler } from './scheduler.js'; import { SQS } from '@aws-sdk/client-sqs'; +import _env from './lib/env.js'; -const event = { - Records: [ - { - messageId: '9a70768a-3a75-4690-b1b8-8742264ff4f4', - receiptHandle: - 'AQEB5sTdPJrZXryC2sRtG+cOo29FNzp/O+REZkHXZANKoaPJ9+f9nhpNIRs/GM4qoM1iWnJP1jANkUFkvfUovJ44GYYY8ja8UU7kGLu0i0Ngw9hPiIWVFNCmvZ2e/XOXKKkJvBbuKloHg0i92GjmUvsNQ/d249hW2RdHY9Y2sDu0giAi5w0USPNMxIeC1ibedxZnSKWpPngroebepIxaUDwBym29tE+L5xOtGhx6HRLR5qOWwHoiMOepecnM3Q6yhzyW6vY/AaL7DXIoXOVFCAtp0VliBVFWk8sct91dTjDbJMAx8/LEMHtKqXVyKG+Zs4zcOUMmTw1XIk50AOLgTIRQJ1XE/yWKU2bvBcPBbpvOwpPFKwYTaHNUN2ZxpLZAUhh7M0U2rZgYO3sfcOBME8grng==', - body: '[{"url":"http://api.erg.ic.ac.uk","adapter":"laqn","name":"London Air Quality Network","country":"GB","description":"Environmental Research Group","sourceURL":"http://api.erg.ic.ac.uk/AirQuality/Information/Documentation/pdf","contacts":["info@openaq.org"],"active":true,"datetime":"2022-04-27 06:03:26", "offset": 4 }]', - attributes: [], - messageAttributes: {}, - md5OfBody: 'e29b76c2f919bc4aa0863332ce0198ee', - eventSource: 'aws:sqs', - eventSourceARN: - 'arn:aws:sqs:us-east-1:470049585876:realtime-fetcher-queue', - awsRegion: 'us-east-1', - }, - ], -}; - -const events = { - Records: [ - { - messageId: '9a70768a-3a75-4690-b1b8-8742264ff4f4', - receiptHandle: - 
'AQEB5sTdPJrZXryC2sRtG+cOo29FNzp/O+REZkHXZANKoaPJ9+f9nhpNIRs/GM4qoM1iWnJP1jANkUFkvfUovJ44GYYY8ja8UU7kGLu0i0Ngw9hPiIWVFNCmvZ2e/XOXKKkJvBbuKloHg0i92GjmUvsNQ/d249hW2RdHY9Y2sDu0giAi5w0USPNMxIeC1ibedxZnSKWpPngroebepIxaUDwBym29tE+L5xOtGhx6HRLR5qOWwHoiMOepecnM3Q6yhzyW6vY/AaL7DXIoXOVFCAtp0VliBVFWk8sct91dTjDbJMAx8/LEMHtKqXVyKG+Zs4zcOUMmTw1XIk50AOLgTIRQJ1XE/yWKU2bvBcPBbpvOwpPFKwYTaHNUN2ZxpLZAUhh7M0U2rZgYO3sfcOBME8grng==', - body: '[{"url":"http://api.erg.ic.ac.uk","adapter":"laqn","name":"London Air Quality Network","country":"GB","description":"Environmental Research Group","sourceURL":"http://api.erg.ic.ac.uk/AirQuality/Information/Documentation/pdf","contacts":["info@openaq.org"],"active":true,"datetime":"2022-04-27 06:03:26", "offset": 4 },{"url": "http://files.airnowtech.org/", "adapter": "airnow-http", "name": "AirNow", "city": "", "country": "", "description": "AirNow adapter using files streamed over http", "sourceURL": "http://www.airnow.gov/", "contacts": [ "info@openaq.org" ], "active": true, "offset": 10}]', - attributes: [], - messageAttributes: {}, - md5OfBody: 'e29b76c2f919bc4aa0863332ce0198ee', - eventSource: 'aws:sqs', - eventSourceARN: - 'arn:aws:sqs:us-east-1:470049585876:realtime-fetcher-queue', - awsRegion: 'us-east-1', - }, - ], -}; - -const getMessage = async () => { - const sqs = new SQS(); - return await sqs - .receiveMessage({ - QueueUrl: process.env.QUEUE_NAME, - }) - .then(({ Messages }) => { - if (!Messages) return []; - try { - // use the messageid and body but update the format - // to fit the way that the lambda will see it - event.Records[0].messageId = Messages[0].MessageId; - event.Records[0].body = Messages[0].Body; - return event; - } catch (err) { - console.debug(`Could not parse body: ${Messages[0].Body}`); - console.error(err); - return []; - } - }); -}; +const env = _env(); // submit a new set (async () => { - //let sqs = await scheduler({}, {}); - //sqs.map(q => fetcher(q, {})); - //const event = await getMessage(); - fetcher(); + if 
(env.deployments) { + console.log(`Testing the scheduler with ${env.deployments}`) + let sqs = await scheduler({}, {}); + let deployments = env.deployments.split(',') + sqs = sqs.filter(s => deployments.includes('all')||deployments.includes(s.name)) + // now for each of these run them through the fetcher + sqs.map(q => fetcher(q, null)); + } else { + console.log(`Testing the fetcher directly`) + fetcher(); + } })(); diff --git a/src/lib/env.js b/src/lib/env.js index 82c40e40..158e404f 100644 --- a/src/lib/env.js +++ b/src/lib/env.js @@ -95,6 +95,17 @@ const _argv = yargs alias: 'e', group: 'Main options:' }) + .options('deployments', { + describe: 'Use the scheduler to pass deployment events to the fetcher. Specify all or a deployment name', + alias: 'D', + group: 'Testing options:' + }) + .options('nofetch', { + boolean: false, + describe: 'Skip the actual fetch process', + alias: 'n', + group: 'Testing options:' + }) .options('datetime', { describe: 'The date/time to query for, if the adapter handles it', alias: 't', @@ -138,6 +149,8 @@ export const readEnvFromLocalFile = (envFile) => { export default () => { let { dryrun, + deployments, + nofetch, debug, source, adapter, @@ -208,6 +221,8 @@ export default () => { doSaveToS3, strict, dryrun, + deployments, + nofetch, debug, source, datetime, diff --git a/src/lib/errors.js b/src/lib/errors.js index 54d11215..3d4f2392 100644 --- a/src/lib/errors.js +++ b/src/lib/errors.js @@ -214,7 +214,6 @@ export async function handleUnresolvedPromises (strict) { export function handleFetchErrors () { return (error) => { - log.error('Handle fetch errors') const cause = error instanceof FetchError ? 
error : error.cause; if (cause instanceof FetchError) { diff --git a/src/scheduler.js b/src/scheduler.js index b48a2ea5..c7f39246 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -1,14 +1,22 @@ import { SQS } from '@aws-sdk/client-sqs'; import { sourcesArray } from './sources/index.js'; import { deploymentsArray } from './deployments.js'; +import _env from './lib/env.js'; +const env = _env(); + +/** + * This is the entry point for the schedular lamdba. The purpose is to create a + * new message (SQS) for each deployment found in the deploymentsArray. + * Each message will then be handled by an invocation of the fetch lambda found in + * fetch.js + */ export async function handler (event, context) { // default to all active sources const sqs = new SQS(); // start by checking for deployments, if we dont have one // we create one for consistancy - if (deploymentsArray.length === 0) { deploymentsArray.push({ name: 'main', @@ -19,39 +27,53 @@ export async function handler (event, context) { } // now filter down to the deployments that need to run right now - return await Promise.all( deploymentsArray .filter((d) => !d.resolution || d.resolution === '1h') .map(async (d) => { + // start by assuming that we are running them all and therefor + // remove the inactive ones let sources = sourcesArray.filter((d) => d.active); - if (d.resolution) { - sources = sources.filter( - (s) => s.resolution === d.resolution - ); - } + // if the deployment has specified a spsecific source than we + // assume you want it event its marked inactive if (d.source) { - // assume you want it event its marked inactive sources = sourcesArray.filter((s) => s.name === d.source); - // only run one adapter + // or if you have want to run run only one adapter + // source and adapter cant be run together } else if (d.adapter) { + // this will only return active sources from the adapter sources = sources.filter((s) => s.adapter === d.adapter); } + // finally, if the deployment has a resolution + // 
use that to filter those sources down + if (d.resolution) { + sources = sources.filter( + (s) => s.resolution === d.resolution + ); + } + try { d.suffix = `${d.name}_`; d.sources = sources; - await sqs - .sendMessage({ - MessageBody: JSON.stringify(d), - QueueUrl: process.env.QUEUE_NAME - }) - .then((res) => { - console.log(res.MessageId); - return res.messageId; - }) - .catch((err) => { - console.log(err); - }); + console.log(`${d.name} with ${d.sources.length} sources`) + let body = JSON.stringify(d) + if (env.dryrun) { + let messageId = 'fake-message-id'; + let event = { name: d.name, Records: [{ body, messageId }] } + return event + } else { + await sqs + .sendMessage({ + MessageBody: body, + QueueUrl: process.env.QUEUE_NAME + }) + .then((res) => { + return res.messageId; + }) + .catch((err) => { + console.log(err); + }); + } } catch (err) { console.error(`Failed to send message: ${err}`); } From f12eb9c53bb4aba82a848b99d181e6da15cc2e6b Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Thu, 19 Oct 2023 09:34:23 -0700 Subject: [PATCH 4/5] Moved a logging line --- src/scheduler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler.js b/src/scheduler.js index c7f39246..608dbc33 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -55,9 +55,9 @@ export async function handler (event, context) { try { d.suffix = `${d.name}_`; d.sources = sources; - console.log(`${d.name} with ${d.sources.length} sources`) let body = JSON.stringify(d) if (env.dryrun) { + console.log(`${d.name} with ${d.sources.length} sources`) let messageId = 'fake-message-id'; let event = { name: d.name, Records: [{ body, messageId }] } return event From 3a018350e89f902455aa4dc698e3459a75d0e5fb Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Thu, 19 Oct 2023 09:40:51 -0700 Subject: [PATCH 5/5] Added the missing active filter --- src/fetch.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fetch.js b/src/fetch.js index da8cec85..812ea6d8 
100644 --- a/src/fetch.js +++ b/src/fetch.js @@ -105,7 +105,7 @@ export function handler (event, context) { ); } else { log.debug(`Getting sources from active sources array`) - currentSources = sourcesArray; + currentSources = sourcesArray.filter(s => s.active); } // and the final file name env.key = `realtime/${DateTime.now().toFormat('yyyy-MM-dd/X')}${suffix}.ndjson`;