Enhancement/checking deployments and testing (#1064)
* Added some debugging

* A little cleanup

* Removed the inline filtering of sources

  Instead of using the old pipe use(filter) we are going to rely on
  filtering the sources ahead of time and passing all desired sources
  to be run.

* Moved a logging line

* Added the missing active filter
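
The move from inline filtering to selecting sources ahead of time can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code from this commit: `selectSources` and the sample `sources` array are hypothetical, and only the by-name, by-adapter, and default active-only branches are shown.

```javascript
// Hypothetical sample data; the real project loads its sources elsewhere.
const sources = [
  { name: 'London Air Quality Network', adapter: 'laqn', active: true },
  { name: 'AirNow', adapter: 'airnow-http', active: true },
  { name: 'retired-source', adapter: 'laqn', active: false }
];

// Choose the sources to run before the pipeline starts, rather than
// filtering them inside the pipeline.
function selectSources (allSources, event = {}) {
  if (event.source) {
    // A source requested by name is run even if it is inactive.
    return allSources.filter((s) => s.name === event.source);
  }
  if (event.adapter) {
    return allSources.filter((s) => s.adapter === event.adapter);
  }
  // With no explicit selection, only active sources are run.
  return allSources.filter((s) => s.active);
}

console.log(selectSources(sources).map((s) => s.name));
console.log(selectSources(sources, { adapter: 'laqn' }).map((s) => s.name));
```

Because the selection happens once, up front, the pipeline itself can assume every source it receives is meant to be run.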
caparker authored Oct 24, 2023
1 parent b2f175d commit 5c09182
Showing 13 changed files with 200 additions and 136 deletions.
21 changes: 20 additions & 1 deletion README.md
@@ -72,6 +72,25 @@ For a more detailed description of the command line options available, use: `nod
## Deployment
Deployment is being built from the lambda-deployment branch. Any development for openaq-fetch should be branched from and merged to the lambda-deployment branch until further notice.

Deployments rely on a JSON object that contains the different deployments. The scheduler loops through that object and posts a message for each deployment, which triggers a lambda to run it. A deployment consists of a set of arguments that are passed to the fetch script to limit the sources that are run.

You can test the deployments with the following commands.

Show all deployments, but don't submit and don't run the fetcher:
`
node index.js --dryrun --deployments all --nofetch
`
Run only the japan deployment, but don't run the fetcher:
`
node index.js --dryrun --deployments japan --nofetch
`

Run only the japan deployment and run the fetcher, but don't submit a file:
`
node index.js --dryrun --deployments japan
`
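
As a rough sketch of how the `--deployments` value narrows what gets run: the test runner splits the value on commas and keeps the matching deployments, or all of them when the list contains `all`. The `pickDeployments` helper and the sample `messages` below are hypothetical names used only for illustration; the real scheduler output may carry additional fields.

```javascript
// Hypothetical scheduler output: one message per deployment.
const messages = [
  { name: 'acumar', adapter: 'acumar', offset: 72 },
  { name: 'japan', source: 'japan-soramame' }
];

// Keep the deployments named in a comma-separated list, or every
// deployment when the list contains 'all'.
function pickDeployments (allMessages, requested) {
  const names = requested.split(',');
  return allMessages.filter(
    (m) => names.includes('all') || names.includes(m.name)
  );
}

console.log(pickDeployments(messages, 'japan').map((m) => m.name));
console.log(pickDeployments(messages, 'all').map((m) => m.name));
```

Each selected message would then be handed to the fetcher as its event, which is what lets a single deployment be exercised in isolation.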


## Data Source Criteria

This section lists the key criteria for air quality data aggregated
@@ -83,7 +102,7 @@ this section.

1. Data must be of one of these pollutant types: PM10, PM2.5, sulfur dioxide (SO2), carbon monoxide (CO), nitrogen dioxide (NO2), ozone (O3), and black carbon (BC).

2. Data must be from an official-level outdoor air quality source, as defined as data produced by a government entity or international organization. We do not, at this stage, include data from low-cost, temporary, and/or indoor sensors. 
2. Data must be from an official-level outdoor air quality source, as defined as data produced by a government entity or international organization. We do not, at this stage, include data from low-cost, temporary, and/or indoor sensors.

3. Data must be ‘raw’ and reported in physical concentrations on their originating site. Data cannot be shared in an 'Air Quality Index' or equivalent (e.g. AQI, PSI, API) format.

2 changes: 1 addition & 1 deletion cdk/stack.ts
@@ -12,7 +12,7 @@ import { copyFileSync, readdirSync } from 'fs';


interface keyable {
[key: string]: string
[key: string]: string
}

interface StackProps extends cdk.StackProps {
9 changes: 5 additions & 4 deletions src/adapters/acumar.js
@@ -25,10 +25,11 @@ let offset;
export const name = 'acumar';

export async function fetchData (source, cb) {
try {
if (source.datetime) {
const dateLuxon = source.datetime.toFormat('dd/MM/yy');
const hourLuxon = source.datetime.toFormat('HH');
try {
if (source.datetime) {
log.debug(`Fetching data with ${source.datetime}`);
const dateLuxon = source.datetime.toFormat('dd/MM/yy');
const hourLuxon = source.datetime.toFormat('HH');

const results = await Promise.all(
stations.map((station) =>
21 changes: 6 additions & 15 deletions src/adapters/defra.js
@@ -1,29 +1,20 @@
'use strict';

import { REQUEST_TIMEOUT } from '../lib/constants.js';
//import { REQUEST_TIMEOUT } from '../lib/constants.js';
import { DateTime } from 'luxon';
import cheerio from 'cheerio';
import got from 'got';

//import got from 'got';
import log from '../lib/logger.js';
import client from '../lib/requests.js';
export const name = 'defra';

const headers = { 'User-Agent': 'OpenAQ' }

const gotExtended = got.extend({
retry: { limit: 3 },
timeout: { request: REQUEST_TIMEOUT },
headers: headers
});


export function fetchData(source, cb) {
gotExtended(source.url)
client(source, cb)
.then((response) => {
// Wrap everything in a try/catch in case something goes wrong
try {
// Format the data
const data = formatData(source, response.body);

// Make sure the data is valid
if (data === undefined) {
return cb({ message: 'Failure to parse data.' });
@@ -32,7 +23,7 @@ export function fetchData(source, cb) {
} catch (e) {
return cb({ message: 'Unknown adapter error.' });
}
})
})
}

let formatData = function (source, data) {
9 changes: 5 additions & 4 deletions src/deployments.js
@@ -26,11 +26,12 @@ const deploymentsArray = [
name: 'acumar',
adapter: 'acumar',
offset: 72
},
// Japan is slow and needs to be alone
{
name: 'japan',
source: 'japan-soramame',
}
// }
// name: 'japan',
// source: 'japan-soramame'
// }
];

export { deploymentsArray };
27 changes: 20 additions & 7 deletions src/fetch.js
@@ -50,8 +50,6 @@ const runningSources = {};
* Run all the data fetch tasks in parallel, simply logs out results
*/
export function handler (event, context) {
log.debug(event);
console.log(event);
// the event may have passed a source, in which case we need to filter
let currentSources;
let offset;
@@ -60,6 +58,7 @@ export function handler (event, context) {
// sure that we dont overwrite another process
let suffix = env.suffix || '_na';
if (event && event.Records && event.Records.length) {
log.debug(`Getting sources from event data - ${event.Records && event.Records.length} records`)
const messageId = event.Records[0].messageId;
if (event.Records.length === 1) {
const body = JSON.parse(event.Records[0].body);
@@ -77,30 +76,39 @@ export function handler (event, context) {
}
suffix = `_${suffix}${messageId}`;
} else if (event && event.sources) {
log.debug(`Getting sources from event sources - ${event.sources && event.sources.length} sources`)
const messageId = 'event';
currentSources = event.sources;
offset = event.offset;
datetime = event.datetime;
suffix = `_${event.suffix || suffix}${messageId}`;
} else if (event && event.source) {
log.debug(`Getting source from event source`)
currentSources = sourcesArray.filter(
(d) => d.name === event.source
);

} else if (event && event.adapter) {
log.debug(`Getting sources from event adapter ${event.adapter}`)
currentSources = sourcesArray.filter(
(d) => d.adapter === event.adapter
);
} else if (env.adapter) {
log.debug(`Getting sources from env variable adapter ${env.adapter}`)
currentSources = sourcesArray.filter(
(d) => d.adapter === env.adapter
);
} else if (env.source) {
log.debug(`Getting source from env variable source ${env.source}`)
currentSources = sourcesArray.filter(
(d) => d.name === env.source
);
} else {
currentSources = sourcesArray;
log.debug(`Getting sources from active sources array`)
currentSources = sourcesArray.filter(s => s.active);
}
// and the final file name
env.key = `realtime/${DateTime.now().toFormat(
'yyyy-MM-dd/X'
)}${suffix}.ndjson`;
env.key = `realtime/${DateTime.now().toFormat('yyyy-MM-dd/X')}${suffix}.ndjson`;

if (offset) {
env.offset = offset;
@@ -110,6 +118,11 @@ export function handler (event, context) {
env.datetime = datetime;
}

if (env.nofetch) {
log.info(`Skipping fetch for ${currentSources.length} sources and saving to ${env.key}`)
return true;
}

return Promise.race([
handleSigInt(runningSources),
handleProcessTimeout(processTimeout, runningSources),
@@ -141,7 +154,7 @@ export function handler (event, context) {
// filter sources - if env is set then choose only matching source,
// otherwise filter out inactive sources.
// * inactive sources will be run if called by name in env.
.use(chooseSourcesBasedOnEnv, env, runningSources)
//.use(chooseSourcesBasedOnEnv, env, runningSources)
// mark sources as started
.do(markSourceAs('started', runningSources))
// get measurements object from given source
74 changes: 13 additions & 61 deletions src/index.js
@@ -1,69 +1,21 @@
import { handler as fetcher } from './fetch.js';
import { handler as scheduler } from './scheduler.js';
import { SQS } from '@aws-sdk/client-sqs';
import _env from './lib/env.js';

const event = {
Records: [
{
messageId: '9a70768a-3a75-4690-b1b8-8742264ff4f4',
receiptHandle:
'AQEB5sTdPJrZXryC2sRtG+cOo29FNzp/O+REZkHXZANKoaPJ9+f9nhpNIRs/GM4qoM1iWnJP1jANkUFkvfUovJ44GYYY8ja8UU7kGLu0i0Ngw9hPiIWVFNCmvZ2e/XOXKKkJvBbuKloHg0i92GjmUvsNQ/d249hW2RdHY9Y2sDu0giAi5w0USPNMxIeC1ibedxZnSKWpPngroebepIxaUDwBym29tE+L5xOtGhx6HRLR5qOWwHoiMOepecnM3Q6yhzyW6vY/AaL7DXIoXOVFCAtp0VliBVFWk8sct91dTjDbJMAx8/LEMHtKqXVyKG+Zs4zcOUMmTw1XIk50AOLgTIRQJ1XE/yWKU2bvBcPBbpvOwpPFKwYTaHNUN2ZxpLZAUhh7M0U2rZgYO3sfcOBME8grng==',
body: '[{"url":"http://api.erg.ic.ac.uk","adapter":"laqn","name":"London Air Quality Network","country":"GB","description":"Environmental Research Group","sourceURL":"http://api.erg.ic.ac.uk/AirQuality/Information/Documentation/pdf","contacts":["[email protected]"],"active":true,"datetime":"2022-04-27 06:03:26", "offset": 4 }]',
attributes: [],
messageAttributes: {},
md5OfBody: 'e29b76c2f919bc4aa0863332ce0198ee',
eventSource: 'aws:sqs',
eventSourceARN:
'arn:aws:sqs:us-east-1:470049585876:realtime-fetcher-queue',
awsRegion: 'us-east-1',
},
],
};

const events = {
Records: [
{
messageId: '9a70768a-3a75-4690-b1b8-8742264ff4f4',
receiptHandle:
'AQEB5sTdPJrZXryC2sRtG+cOo29FNzp/O+REZkHXZANKoaPJ9+f9nhpNIRs/GM4qoM1iWnJP1jANkUFkvfUovJ44GYYY8ja8UU7kGLu0i0Ngw9hPiIWVFNCmvZ2e/XOXKKkJvBbuKloHg0i92GjmUvsNQ/d249hW2RdHY9Y2sDu0giAi5w0USPNMxIeC1ibedxZnSKWpPngroebepIxaUDwBym29tE+L5xOtGhx6HRLR5qOWwHoiMOepecnM3Q6yhzyW6vY/AaL7DXIoXOVFCAtp0VliBVFWk8sct91dTjDbJMAx8/LEMHtKqXVyKG+Zs4zcOUMmTw1XIk50AOLgTIRQJ1XE/yWKU2bvBcPBbpvOwpPFKwYTaHNUN2ZxpLZAUhh7M0U2rZgYO3sfcOBME8grng==',
body: '[{"url":"http://api.erg.ic.ac.uk","adapter":"laqn","name":"London Air Quality Network","country":"GB","description":"Environmental Research Group","sourceURL":"http://api.erg.ic.ac.uk/AirQuality/Information/Documentation/pdf","contacts":["[email protected]"],"active":true,"datetime":"2022-04-27 06:03:26", "offset": 4 },{"url": "http://files.airnowtech.org/", "adapter": "airnow-http", "name": "AirNow", "city": "", "country": "", "description": "AirNow adapter using files streamed over http", "sourceURL": "http://www.airnow.gov/", "contacts": [ "[email protected]" ], "active": true, "offset": 10}]',
attributes: [],
messageAttributes: {},
md5OfBody: 'e29b76c2f919bc4aa0863332ce0198ee',
eventSource: 'aws:sqs',
eventSourceARN:
'arn:aws:sqs:us-east-1:470049585876:realtime-fetcher-queue',
awsRegion: 'us-east-1',
},
],
};

const getMessage = async () => {
const sqs = new SQS();
return await sqs
.receiveMessage({
QueueUrl: process.env.QUEUE_NAME,
})
.then(({ Messages }) => {
if (!Messages) return [];
try {
// use the messageid and body but update the format
// to fit the way that the lambda will see it
event.Records[0].messageId = Messages[0].MessageId;
event.Records[0].body = Messages[0].Body;
return event;
} catch (err) {
console.debug(`Could not parse body: ${Messages[0].Body}`);
console.error(err);
return [];
}
});
};
const env = _env();

// submit a new set
(async () => {
//let sqs = await scheduler({}, {});
//sqs.map(q => fetcher(q, {}));
//const event = await getMessage();
fetcher();
if (env.deployments) {
console.log(`Testing the scheduler with ${env.deployments}`)
let sqs = await scheduler({}, {});
let deployments = env.deployments.split(',')
sqs = sqs.filter(s => deployments.includes('all')||deployments.includes(s.name))
// now for each of these run them through the fetcher
sqs.map(q => fetcher(q, null));
} else {
console.log(`Testing the fetcher directly`)
fetcher();
}
})();
4 changes: 3 additions & 1 deletion src/lib/constants.js
@@ -1,4 +1,6 @@

'use strict';

// Default timeout to use for requests module
export const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 60000;
export const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 15000;
export const REQUEST_RETRIES = 3;
15 changes: 15 additions & 0 deletions src/lib/env.js
@@ -95,6 +95,17 @@ const _argv = yargs
alias: 'e',
group: 'Main options:'
})
.options('deployments', {
describe: 'Use the scheduler to pass deployment events to the fetcher. Specify all or a deployment name',
alias: 'D',
group: 'Testing options:'
})
.options('nofetch', {
boolean: false,
describe: 'Skip the actual fetch process',
alias: 'n',
group: 'Testing options:'
})
.options('datetime', {
describe: 'The date/time to query for, if the adapter handles it',
alias: 't',
@@ -138,6 +149,8 @@ export const readEnvFromLocalFile = (envFile) => {
export default () => {
let {
dryrun,
deployments,
nofetch,
debug,
source,
adapter,
@@ -208,6 +221,8 @@ export default () => {
doSaveToS3,
strict,
dryrun,
deployments,
nofetch,
debug,
source,
datetime,
18 changes: 1 addition & 17 deletions src/lib/logger.js
@@ -4,37 +4,21 @@
*/
'use strict';

//const winston = require('winston/lib/winston');
import winston from 'winston';

//require('winston-papertrail/lib/winston-papertrail').Papertrail; // eslint-disable-line no-unused-expressions
import * as os from 'os';
//const os = require('os');

import _env from './env.js';
const { verbose, logLevel, logColor } = _env();

const logger = winston.createLogger({
transports: [
new winston.transports.Console({
timestamp: () => new Date().toISOString(),
timestamp: true,
colorize: logColor,
level: verbose ? 'verbose' : logLevel
})
]
});

// // Add Papertrail logger if we have credentials
// if (process.env.PAPERTRAIL_URL) {
// logger.add(winston.transports.Papertrail, {
// host: process.env.PAPERTRAIL_URL,
// port: process.env.PAPERTRAIL_PORT,
// hostname: process.env.PAPERTRAIL_HOSTNAME,
// colorize: logColor,
// program: os.hostname(),
// level: logLevel
// });
// }

export default logger;
//module.exports = logger;
13 changes: 9 additions & 4 deletions src/lib/measurement.js
@@ -49,10 +49,14 @@ async function getStreamFromAdapter (adapter, source) {
adapter.name
}"`
);
const data = await promisify(adapter.fetchData)(source);
const out = DataStream.from(data.measurements);
out.name = data.name;
return out;
const fetchData = promisify(adapter.fetchData)
const data = await fetchData(source)
.catch(err => {
throw new AdapterError(ADAPTER_ERROR, source, err && err.message && err.message.code)
});
const out = DataStream.from(data.measurements);
out.name = data.name;
return out;
}

log.debug(
@@ -287,6 +291,7 @@ export function fetchCorrectedMeasurementsFromSourceStream (stream, env) {
if (error) throw error;
else error = true;
} catch (cause) {
log.error(cause)
await input.raise(
cause instanceof AdapterError
? cause