Skip to content

Commit

Permalink
Merge pull request #434 from Inist-CNRS/add-oa-fetch
Browse files Browse the repository at this point in the history
feat: 🎸 add [OAFetch]
  • Loading branch information
touv authored Nov 29, 2024
2 parents 104388f + be91c80 commit c2395ea
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 0 deletions.
31 changes: 31 additions & 0 deletions docs/plugin-conditor.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Sachant qu'on appauvrit (casse, accents, tiret, apostrophe) tous les champs.
* [CORHALFetch](#corhalfetch)
* [getRnsr](#getrnsr)
* [getRnsrInfo](#getrnsrinfo)
* [OAFetch](#oafetch)
* [WOSFetch](#wosfetch)

### affAlign
Expand Down Expand Up @@ -332,6 +333,36 @@ Output:

* `year` **[number](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Number)** Year of the RNSR to use instead of the last one (optional, default `2023`)

### OAFetch

Take `Object` with OpenAlx API parametrs, throw each chunk from the result

Input:

```json
[
{ filter: "authorships.author.id:a5000387389" },
]
```

Script:

````ini
[OAFetch]

Output:

```json
[{...}, {"a": "b"}, {"a": "c" }]
````

#### Parameters

* `timeout` **[Number](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Number)** Timeout in milliseconds (optional, default `1000`)
* `retries` **[Number](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Number)** The maximum amount of times to retry the connection (optional, default `5`)

Returns **[Object](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Object)** 

### WOSFetch

Take `String` as URL, throw each chunk from the result
Expand Down
31 changes: 31 additions & 0 deletions packages/conditor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Sachant qu'on appauvrit (casse, accents, tiret, apostrophe) tous les champs.
* [CORHALFetch](#corhalfetch)
* [getRnsr](#getrnsr)
* [getRnsrInfo](#getrnsrinfo)
* [OAFetch](#oafetch)
* [WOSFetch](#wosfetch)

### affAlign
Expand Down Expand Up @@ -332,6 +333,36 @@ Output:

* `year` **[number](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Number)** Year of the RNSR to use instead of the last one (optional, default `2023`)

### OAFetch

Take `Object` with OpenAlx API parametrs, throw each chunk from the result

Input:

```json
[
{ filter: "authorships.author.id:a5000387389" },
]
```

Script:

````ini
[OAFetch]

Output:

```json
[{...}, {"a": "b"}, {"a": "c" }]
````

#### Parameters

* `timeout` **[Number](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Number)** Timeout in milliseconds (optional, default `1000`)
* `retries` **[Number](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Number)** The maximum amount of times to retry the connection (optional, default `5`)

Returns **[Object](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Object)** 

### WOSFetch

Take `String` as URL, throw each chunk from the result
Expand Down
2 changes: 2 additions & 0 deletions packages/conditor/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import getRnsr from './getRnsr';
import getRnsrInfo from './getRnsrInfo';
import CORHALFetch from './corhal-fetch';
import WOSFetch from './wos-fetch';
import OAFetch from './openalex-fetch';

const funcs = {
affAlign,
Expand All @@ -14,6 +15,7 @@ const funcs = {
getRnsrInfo,
CORHALFetch,
WOSFetch,
OAFetch,
};

export default funcs;
Expand Down
116 changes: 116 additions & 0 deletions packages/conditor/src/openalex-fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/* istanbul ignore file */
import debug from 'debug';
import { URL, URLSearchParams } from 'url';
import AbortController from 'node-abort-controller';
import retry from 'async-retry';
import fetch from 'fetch-with-proxy';
import writeTo from 'stream-write';
import each from 'async-each-series';

const request = (url, parameters) => async () => {
const response = await fetch(url, parameters);
if (!response.ok) {
const err = new Error(response.statusText);
err.body = await response.json();
throw err;
}
return response;
};
const write = (output, notices) => new Promise((resolve, reject) => each(
notices,
(notice, next) => writeTo(output, notice, next),
(err) => (err ? reject(err) : resolve(true)),
));

/**
* Take `Object` with OpenAlx API parametrs, throw each chunk from the result
*
*
* Input:
*
* ```json
* [
* { filter: "authorships.author.id:a5000387389" },
* ]
* ```
*
* Script:
*
* ```ini
* [OAFetch]
*
* Output:
*
* ```json
* [{...}, {"a": "b"}, {"a": "c" }]
* ```
*
* @name OAFetch
* @param {Number} [timeout=1000] Timeout in milliseconds
* @param {Number} [retries=5] The maximum amount of times to retry the connection
* @returns {Object}
*/
export default async function OAFetch(data, feed) {
if (this.isLast()) {
return feed.close();
}
const { ezs } = this;
const url = String(this.getParam('url', 'https://api.openalex.org'));
const retries = Number(this.getParam('retries', 5));
const timeout = Number(this.getParam('timeout')) || 1000;
const queryparams = new URLSearchParams({
'per-page': 20,
...data,
cursor: '*',
});
const stringURL = `${url}/works?${queryparams}`;
const cURL = new URL(stringURL);
const controller = new AbortController();
const parameters = {
timeout,
signal: controller.signal,
method: 'GET',
};
const options = {
retries,
};
const onError = (e) => {
controller.abort();
debug('ezs:warn')(`Break item #${this.getIndex()} [OAFetch]`, ezs.serializeError(e));
return feed.stop(e);
};
const loop = async (stream, arr, afterKeyToken) => {
if (Array.isArray(arr) && arr.length > 0) {
await write(stream, arr);
}
if (afterKeyToken) {
queryparams.set('cursor', afterKeyToken);
const stringURLBis = `${url}/works?${queryparams}`;
const cURLBis = new URL(stringURLBis);
const parametersBis = {
timeout,
signal: controller.signal,
method: 'GET',
};
try {
const responseBis = await retry(request(cURLBis.href, parametersBis), options);
const { results: resultsBis, meta: metaBis } = await responseBis.json();
loop(stream, resultsBis, metaBis?.next_cursor);
} catch (e) {
debug('ezs:error')(`Error with ${stringURLBis}`, ezs.serializeError(e));
stream.end();
}
} else {
stream.end();
}
};
try {
const output = ezs.createStream(ezs.objectMode());
const response = await retry(request(cURL.href, parameters), options);
const { results, meta } = await response.json();
await loop(output, results, meta?.next_cursor);
await feed.flow(output);
} catch (e) {
onError(e);
}
}
27 changes: 27 additions & 0 deletions packages/conditor/test/oa-fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import from from 'from';
import ezs from '../../core/src';
import statements from '../src';

describe('openalexfetch', () => {
test.skip('#1', (done) => {
ezs.use(statements);
const input = [
{
filter: 'authorships.author.id:a5000387389',
'per-page': 20,
},
];
const output = [];
from(input)
.pipe(ezs('OAFetch', { retries: 1, timeout: 50000 }))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
output.push(chunk);
})
.on('end', () => {
expect(output.length).toBe(487);
done();
});
}, 60000);
});

0 comments on commit c2395ea

Please sign in to comment.