diff --git a/README.md b/README.md index ca7743ed..7580335c 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,25 @@ For a more detailed description of the command line options available, use: `nod ## Deployment Deployment is is being built from the lambda-deployment branch. Any development for openaq-fetch should be branched/merged from/to the lambda-deployment branch until further notice. +Deployments rely on a json object that contains the different deployments. The scheduler is then used to loop through that object and post a message that will trigger a lambda to run that deployment. A deployment consists of a set of arguments that are passed to the fetch script to limit the sources that are run. + +You can test the deployments with the following: + +Show all deployments but don't submit and don't run the fetcher +` +node index.js --dryrun --deployments all --nofetch +` +Only the japan deployment but don't run the fetcher +` +node index.js --dryrun --deployments japan --nofetch +` + +Only the japan deployment, don't submit a file but run the fetcher +` +node index.js --dryrun --deployments japan +` + + ## Data Source Criteria This section lists the key criteria for air quality data aggregated @@ -83,7 +102,7 @@ this section. 1. Data must be of one of these pollutant types: PM10, PM2.5, sulfur dioxide (SO2), carbon monoxide (CO), nitrogen dioxide (NO2), ozone (O3), and black carbon (BC). -2. Data must be from an official-level outdoor air quality source, as defined as data produced by a government entity or international organization. We do not, at this stage, include data from low-cost, temporary, and/or indoor sensors.  +2. Data must be from an official-level outdoor air quality source, as defined as data produced by a government entity or international organization. We do not, at this stage, include data from low-cost, temporary, and/or indoor sensors. 3. Data must be ‘raw’ and reported in physical concentrations on their originating site. 
Data cannot be shared in an 'Air Quality Index' or equivalent (e.g. AQI, PSI, API) format. diff --git a/cdk/stack.ts b/cdk/stack.ts index b04c70b4..55cdfdec 100644 --- a/cdk/stack.ts +++ b/cdk/stack.ts @@ -12,7 +12,7 @@ import { copyFileSync, readdirSync } from 'fs'; interface keyable { - [key: string]: string + [key: string]: string } interface StackProps extends cdk.StackProps { diff --git a/src/adapters/acumar.js b/src/adapters/acumar.js index ee787701..10c31370 100644 --- a/src/adapters/acumar.js +++ b/src/adapters/acumar.js @@ -25,10 +25,11 @@ let offset; export const name = 'acumar'; export async function fetchData (source, cb) { - try { - if (source.datetime) { - const dateLuxon = source.datetime.toFormat('dd/MM/yy'); - const hourLuxon = source.datetime.toFormat('HH'); + try { + if (source.datetime) { + log.debug(`Fetching data with ${source.datetime}`); + const dateLuxon = source.datetime.toFormat('dd/MM/yy'); + const hourLuxon = source.datetime.toFormat('HH'); const results = await Promise.all( stations.map((station) => diff --git a/src/adapters/defra.js b/src/adapters/defra.js index c2a87818..e6a3b231 100644 --- a/src/adapters/defra.js +++ b/src/adapters/defra.js @@ -1,29 +1,20 @@ 'use strict'; -import { REQUEST_TIMEOUT } from '../lib/constants.js'; +//import { REQUEST_TIMEOUT } from '../lib/constants.js'; import { DateTime } from 'luxon'; import cheerio from 'cheerio'; -import got from 'got'; - +//import got from 'got'; +import log from '../lib/logger.js'; +import client from '../lib/requests.js'; export const name = 'defra'; -const headers = { 'User-Agent': 'OpenAQ' } - -const gotExtended = got.extend({ - retry: { limit: 3 }, - timeout: { request: REQUEST_TIMEOUT }, - headers: headers - }); - - export function fetchData(source, cb) { - gotExtended(source.url) + client(source, cb) .then((response) => { // Wrap everything in a try/catch in case something goes wrong try { // Format the data const data = formatData(source, response.body); - // Make sure the 
data is valid if (data === undefined) { return cb({ message: 'Failure to parse data.' }); @@ -32,7 +23,7 @@ export function fetchData(source, cb) { } catch (e) { return cb({ message: 'Unknown adapter error.' }); } - }) + }) } let formatData = function (source, data) { diff --git a/src/deployments.js b/src/deployments.js index bec9408b..debc6610 100644 --- a/src/deployments.js +++ b/src/deployments.js @@ -26,11 +26,12 @@ const deploymentsArray = [ name: 'acumar', adapter: 'acumar', offset: 72 + }, + // Japan is slow and needs to be alone + { + name: 'japan', + source: 'japan-soramame', } -// } -// name: 'japan', -// source: 'japan-soramame' -// } ]; export { deploymentsArray }; diff --git a/src/fetch.js b/src/fetch.js index 64983d96..812ea6d8 100644 --- a/src/fetch.js +++ b/src/fetch.js @@ -50,8 +50,6 @@ const runningSources = {}; * Run all the data fetch tasks in parallel, simply logs out results */ export function handler (event, context) { - log.debug(event); - console.log(event); // the event may have passed a source, in which case we need to filter let currentSources; let offset; @@ -60,6 +58,7 @@ export function handler (event, context) { // sure that we dont overwrite another process let suffix = env.suffix || '_na'; if (event && event.Records && event.Records.length) { + log.debug(`Getting sources from event data - ${event.Records && event.Records.length} records`) const messageId = event.Records[0].messageId; if (event.Records.length === 1) { const body = JSON.parse(event.Records[0].body); @@ -77,30 +76,39 @@ export function handler (event, context) { } suffix = `_${suffix}${messageId}`; } else if (event && event.sources) { + log.debug(`Getting sources from event sources - ${event.sources && event.sources.length} sources`) const messageId = 'event'; currentSources = event.sources; offset = event.offset; datetime = event.datetime; suffix = `_${event.suffix || suffix}${messageId}`; } else if (event && event.source) { + log.debug(`Getting source from event 
source`) currentSources = sourcesArray.filter( (d) => d.name === event.source ); + } else if (event && event.adapter) { + log.debug(`Getting sources from event adapter ${event.adapter}`) currentSources = sourcesArray.filter( (d) => d.adapter === event.adapter ); } else if (env.adapter) { + log.debug(`Getting sources from env variable adapter ${env.adapter}`) currentSources = sourcesArray.filter( (d) => d.adapter === env.adapter ); + } else if (env.source) { + log.debug(`Getting source from env variable source ${env.source}`) + currentSources = sourcesArray.filter( + (d) => d.name === env.source + ); } else { - currentSources = sourcesArray; + log.debug(`Getting sources from active sources array`) + currentSources = sourcesArray.filter(s => s.active); } // and the final file name - env.key = `realtime/${DateTime.now().toFormat( - 'yyyy-MM-dd/X' - )}${suffix}.ndjson`; + env.key = `realtime/${DateTime.now().toFormat('yyyy-MM-dd/X')}${suffix}.ndjson`; if (offset) { env.offset = offset; @@ -110,6 +118,11 @@ export function handler (event, context) { env.datetime = datetime; } + if (env.nofetch) { + log.info(`Skipping fetch for ${currentSources.length} sources and saving to ${env.key}`) + return true; + } + return Promise.race([ handleSigInt(runningSources), handleProcessTimeout(processTimeout, runningSources), @@ -141,7 +154,7 @@ export function handler (event, context) { // filter sources - if env is set then choose only matching source, // otherwise filter out inactive sources. // * inactive sources will be run if called by name in env. 
- .use(chooseSourcesBasedOnEnv, env, runningSources) + //.use(chooseSourcesBasedOnEnv, env, runningSources) // mark sources as started .do(markSourceAs('started', runningSources)) // get measurements object from given source diff --git a/src/index.js b/src/index.js index e2e80096..af8cd1a3 100644 --- a/src/index.js +++ b/src/index.js @@ -1,69 +1,21 @@ import { handler as fetcher } from './fetch.js'; import { handler as scheduler } from './scheduler.js'; import { SQS } from '@aws-sdk/client-sqs'; +import _env from './lib/env.js'; -const event = { - Records: [ - { - messageId: '9a70768a-3a75-4690-b1b8-8742264ff4f4', - receiptHandle: - 'AQEB5sTdPJrZXryC2sRtG+cOo29FNzp/O+REZkHXZANKoaPJ9+f9nhpNIRs/GM4qoM1iWnJP1jANkUFkvfUovJ44GYYY8ja8UU7kGLu0i0Ngw9hPiIWVFNCmvZ2e/XOXKKkJvBbuKloHg0i92GjmUvsNQ/d249hW2RdHY9Y2sDu0giAi5w0USPNMxIeC1ibedxZnSKWpPngroebepIxaUDwBym29tE+L5xOtGhx6HRLR5qOWwHoiMOepecnM3Q6yhzyW6vY/AaL7DXIoXOVFCAtp0VliBVFWk8sct91dTjDbJMAx8/LEMHtKqXVyKG+Zs4zcOUMmTw1XIk50AOLgTIRQJ1XE/yWKU2bvBcPBbpvOwpPFKwYTaHNUN2ZxpLZAUhh7M0U2rZgYO3sfcOBME8grng==', - body: '[{"url":"http://api.erg.ic.ac.uk","adapter":"laqn","name":"London Air Quality Network","country":"GB","description":"Environmental Research Group","sourceURL":"http://api.erg.ic.ac.uk/AirQuality/Information/Documentation/pdf","contacts":["info@openaq.org"],"active":true,"datetime":"2022-04-27 06:03:26", "offset": 4 }]', - attributes: [], - messageAttributes: {}, - md5OfBody: 'e29b76c2f919bc4aa0863332ce0198ee', - eventSource: 'aws:sqs', - eventSourceARN: - 'arn:aws:sqs:us-east-1:470049585876:realtime-fetcher-queue', - awsRegion: 'us-east-1', - }, - ], -}; - -const events = { - Records: [ - { - messageId: '9a70768a-3a75-4690-b1b8-8742264ff4f4', - receiptHandle: - 
'AQEB5sTdPJrZXryC2sRtG+cOo29FNzp/O+REZkHXZANKoaPJ9+f9nhpNIRs/GM4qoM1iWnJP1jANkUFkvfUovJ44GYYY8ja8UU7kGLu0i0Ngw9hPiIWVFNCmvZ2e/XOXKKkJvBbuKloHg0i92GjmUvsNQ/d249hW2RdHY9Y2sDu0giAi5w0USPNMxIeC1ibedxZnSKWpPngroebepIxaUDwBym29tE+L5xOtGhx6HRLR5qOWwHoiMOepecnM3Q6yhzyW6vY/AaL7DXIoXOVFCAtp0VliBVFWk8sct91dTjDbJMAx8/LEMHtKqXVyKG+Zs4zcOUMmTw1XIk50AOLgTIRQJ1XE/yWKU2bvBcPBbpvOwpPFKwYTaHNUN2ZxpLZAUhh7M0U2rZgYO3sfcOBME8grng==', - body: '[{"url":"http://api.erg.ic.ac.uk","adapter":"laqn","name":"London Air Quality Network","country":"GB","description":"Environmental Research Group","sourceURL":"http://api.erg.ic.ac.uk/AirQuality/Information/Documentation/pdf","contacts":["info@openaq.org"],"active":true,"datetime":"2022-04-27 06:03:26", "offset": 4 },{"url": "http://files.airnowtech.org/", "adapter": "airnow-http", "name": "AirNow", "city": "", "country": "", "description": "AirNow adapter using files streamed over http", "sourceURL": "http://www.airnow.gov/", "contacts": [ "info@openaq.org" ], "active": true, "offset": 10}]', - attributes: [], - messageAttributes: {}, - md5OfBody: 'e29b76c2f919bc4aa0863332ce0198ee', - eventSource: 'aws:sqs', - eventSourceARN: - 'arn:aws:sqs:us-east-1:470049585876:realtime-fetcher-queue', - awsRegion: 'us-east-1', - }, - ], -}; - -const getMessage = async () => { - const sqs = new SQS(); - return await sqs - .receiveMessage({ - QueueUrl: process.env.QUEUE_NAME, - }) - .then(({ Messages }) => { - if (!Messages) return []; - try { - // use the messageid and body but update the format - // to fit the way that the lambda will see it - event.Records[0].messageId = Messages[0].MessageId; - event.Records[0].body = Messages[0].Body; - return event; - } catch (err) { - console.debug(`Could not parse body: ${Messages[0].Body}`); - console.error(err); - return []; - } - }); -}; +const env = _env(); // submit a new set (async () => { - //let sqs = await scheduler({}, {}); - //sqs.map(q => fetcher(q, {})); - //const event = await getMessage(); - fetcher(); + if 
(env.deployments) { + console.log(`Testing the scheduler with ${env.deployments}`) + let sqs = await scheduler({}, {}); + let deployments = env.deployments.split(',') + sqs = sqs.filter(s => deployments.includes('all')||deployments.includes(s.name)) + // now for each of these run them through the fetcher + sqs.map(q => fetcher(q, null)); + } else { + console.log(`Testing the fetcher directly`) + fetcher(); + } })(); diff --git a/src/lib/constants.js b/src/lib/constants.js index efb290c8..098030f0 100644 --- a/src/lib/constants.js +++ b/src/lib/constants.js @@ -1,4 +1,6 @@ + 'use strict'; // Default timeout to use for requests module -export const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 60000; +export const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 15000; +export const REQUEST_RETRIES = 3; diff --git a/src/lib/env.js b/src/lib/env.js index 82c40e40..158e404f 100644 --- a/src/lib/env.js +++ b/src/lib/env.js @@ -95,6 +95,17 @@ const _argv = yargs alias: 'e', group: 'Main options:' }) + .options('deployments', { + describe: 'Use the scheduler to pass deployment events to the fetcher. 
Specify all or a deployment name', + alias: 'D', + group: 'Testing options:' + }) + .options('nofetch', { + boolean: false, + describe: 'Skip the actual fetch process', + alias: 'n', + group: 'Testing options:' + }) .options('datetime', { describe: 'The date/time to query for, if the adapter handles it', alias: 't', @@ -138,6 +149,8 @@ export const readEnvFromLocalFile = (envFile) => { export default () => { let { dryrun, + deployments, + nofetch, debug, source, adapter, @@ -208,6 +221,8 @@ export default () => { doSaveToS3, strict, dryrun, + deployments, + nofetch, debug, source, datetime, diff --git a/src/lib/logger.js b/src/lib/logger.js index a6940b1d..43af31c4 100644 --- a/src/lib/logger.js +++ b/src/lib/logger.js @@ -4,12 +4,9 @@ */ 'use strict'; -//const winston = require('winston/lib/winston'); import winston from 'winston'; -//require('winston-papertrail/lib/winston-papertrail').Papertrail; // eslint-disable-line no-unused-expressions import * as os from 'os'; -//const os = require('os'); import _env from './env.js'; const { verbose, logLevel, logColor } = _env(); @@ -17,24 +14,11 @@ const { verbose, logLevel, logColor } = _env(); const logger = winston.createLogger({ transports: [ new winston.transports.Console({ - timestamp: () => new Date().toISOString(), + timestamp: true, colorize: logColor, level: verbose ? 
'verbose' : logLevel }) ] }); -// // Add Papertrail logger if we have credentials -// if (process.env.PAPERTRAIL_URL) { -// logger.add(winston.transports.Papertrail, { -// host: process.env.PAPERTRAIL_URL, -// port: process.env.PAPERTRAIL_PORT, -// hostname: process.env.PAPERTRAIL_HOSTNAME, -// colorize: logColor, -// program: os.hostname(), -// level: logLevel -// }); -// } - export default logger; -//module.exports = logger; diff --git a/src/lib/measurement.js b/src/lib/measurement.js index cc2d4ac4..5c92792f 100644 --- a/src/lib/measurement.js +++ b/src/lib/measurement.js @@ -49,10 +49,14 @@ async function getStreamFromAdapter (adapter, source) { adapter.name }"` ); - const data = await promisify(adapter.fetchData)(source); - const out = DataStream.from(data.measurements); - out.name = data.name; - return out; + const fetchData = promisify(adapter.fetchData) + const data = await fetchData(source) + .catch(err => { + throw new AdapterError(ADAPTER_ERROR, source, err && err.message && err.message.code) + }); + const out = DataStream.from(data.measurements); + out.name = data.name; + return out; } log.debug( @@ -287,6 +291,7 @@ export function fetchCorrectedMeasurementsFromSourceStream (stream, env) { if (error) throw error; else error = true; } catch (cause) { + log.error(cause) await input.raise( cause instanceof AdapterError ? 
cause diff --git a/src/lib/requests.js b/src/lib/requests.js new file mode 100644 index 00000000..48e3b0fd --- /dev/null +++ b/src/lib/requests.js @@ -0,0 +1,59 @@ +'use strict'; + +import log from './logger.js'; +import { REQUEST_TIMEOUT, REQUEST_RETRIES } from './constants.js'; +import got from 'got' +import { AdapterError, DATA_URL_ERROR } from './errors.js'; + +const headers = { 'User-Agent': 'OpenAQ' } + +export default (source, cb) => { + let url, timeout, retries; + if(typeof(source) === 'object' && source.url) { + log.debug('Adapter passed along a source object') + url = source.url + retries = source.retries || REQUEST_RETRIES + timeout = source.timeout || REQUEST_TIMEOUT + } else if (typeof(source) === 'string') { // assume source is the url + log.debug('Adapter passed along a url') + url = source + retries = REQUEST_RETRIES + timeout = REQUEST_TIMEOUT + } else { + throw new AdapterError(DATA_URL_ERROR, null, 'No url was passed') + } + // setup the options + const requestClient = got.extend({ + timeout: { + request: timeout, + }, + retry: { + limit: retries, + errorCodes: [ + 'ETIMEDOUT' + ], + }, + headers: headers, + hooks: { + beforeRetry: [ + data => { + log.warn(`Retrying request to ${url}`) + } + ], + } + }); + log.debug(`Requesting response from ${url}`) + // make the request + return requestClient(url) + .then( res => { + // could do some checking here + return res + }) + .catch( err => { + if (cb) { + return cb({ message: err }) + } else { + log.error(`Error caught in got client - ${err.status_code}`) + } + }) +}; diff --git a/src/scheduler.js b/src/scheduler.js index b48a2ea5..608dbc33 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -1,14 +1,22 @@ import { SQS } from '@aws-sdk/client-sqs'; import { sourcesArray } from './sources/index.js'; import { deploymentsArray } from './deployments.js'; +import _env from './lib/env.js'; +const env = _env(); + +/** + * This is the entry point for the schedular lamdba. 
The purpose is to create a + new message (SQS) for each deployment found in the deploymentsArray. + Each message will then be handled by an invocation of the fetch lambda found in + fetch.js + */ export async function handler (event, context) { // default to all active sources const sqs = new SQS(); // start by checking for deployments, if we dont have one // we create one for consistancy - if (deploymentsArray.length === 0) { deploymentsArray.push({ name: 'main', @@ -19,39 +27,53 @@ export async function handler (event, context) { } // now filter down to the deployments that need to run right now - return await Promise.all( deploymentsArray .filter((d) => !d.resolution || d.resolution === '1h') .map(async (d) => { + // start by assuming that we are running them all and therefore + // remove the inactive ones let sources = sourcesArray.filter((d) => d.active); - if (d.resolution) { - sources = sources.filter( - (s) => s.resolution === d.resolution - ); - } + // if the deployment has specified a specific source then we + // assume you want it even if it's marked inactive if (d.source) { - // assume you want it event its marked inactive sources = sourcesArray.filter((s) => s.name === d.source); - // only run one adapter + // or if you want to run only one adapter + // source and adapter can't be run together } else if (d.adapter) { + // this will only return active sources from the adapter sources = sources.filter((s) => s.adapter === d.adapter); } + // finally, if the deployment has a resolution + // use that to filter those sources down + if (d.resolution) { + sources = sources.filter( + (s) => s.resolution === d.resolution + ); + } + try { d.suffix = `${d.name}_`; d.sources = sources; - await sqs - .sendMessage({ - MessageBody: JSON.stringify(d), - QueueUrl: process.env.QUEUE_NAME - }) - .then((res) => { - console.log(res.MessageId); - return res.messageId; - }) - .catch((err) => { - console.log(err); - }); + let body = JSON.stringify(d) + if 
(env.dryrun) { + console.log(`${d.name} with ${d.sources.length} sources`) + let messageId = 'fake-message-id'; + let event = { name: d.name, Records: [{ body, messageId }] } + return event + } else { + await sqs + .sendMessage({ + MessageBody: body, + QueueUrl: process.env.QUEUE_NAME + }) + .then((res) => { + return res.messageId; + }) + .catch((err) => { + console.log(err); + }); + } } catch (err) { console.error(`Failed to send message: ${err}`); }