-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use rdf-connect/ldes-client
#45
base: master
Are you sure you want to change the base?
Conversation
Logs are in line with what the ldes-client uses, which makes for a bit neater output.
|
||
const consumerJob = new CronJob(CRON_PATTERN, async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The usage of the Cronjob wasn't entirely clear to me, so I left it out for now.
The ldes-client works by either ingesting the whole feed in one go, however long that may take, or by continuously polling the feed with a given interval.
Adding a cronjob seems like a second way to enable polling of the LDES feed? There may be a good reason to have the cronjob approach that I'm missing, so we can just add it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with leaving this out, the use case is not entirely clear to me either. I think this was to only start syncing at a given time. e.g. to make sure syncs happen outside of busy hours. but there's other ways around that.
app.ts
Outdated
lastVersionOnly: REPLACE_VERSIONS, // Won't emit members if they're known to be older than what is already in the state file | ||
loose: true, // Make this configurable? IPDC needs this to be true | ||
fetch: enhanced_fetch({ | ||
/* In comment are the default values, perhaps we want to make these configurable | ||
concurrent: 10, // Amount of concurrent requests to a single domain | ||
retry: { | ||
codes: [408, 425, 429, 500, 502, 503, 504], // Which faulty HTTP status codes will trigger retry | ||
base: 500, // Seems to be unused in the client code | ||
maxRetries: 5, | ||
}*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments already mention it. but some of these things could be made configurable via environment variables.
export function memberProcessor(): WritableStream<Member> { | ||
const logger = getLoggerFor("member-processor"); | ||
|
||
const processMember = async (member: Member) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now implementing a Web Stream Writable (as opposed to a Node.js Streams WritableStream).
The new client uses the Web Stream API and it made more sense to follow it than to use (the rather confusing) Node.js Streams API (although it would still work).
lib/member-processor.ts
Outdated
/* TODO: hardcoding the predicate is not a good idea, | ||
* either the LDES client should inform us about this metadata | ||
* or we should make this configurable and bypass the client's | ||
* built-in versioning support. */ | ||
quad(variable("s"), DCTERMS.versionOf, baseResourceUri) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before we could configure this value via an environment variable so there was no hardcoding in the service. Ideally the ldes-client will just expose this information via the Member so consumers like us can use it to post-process the data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as discussed let's use the stream info if possible, dcterms:isVersionOf (note that it's isVersionOf and not versionOf !) is a good fallback
@@ -122,85 +118,10 @@ export async function executeDeleteQuery (quads: RDF.Quad[]) { | |||
} | |||
} | |||
|
|||
export async function getLatestTimestamp (baseResource: RDF.NamedNode, treeProperties: TreeProperties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Versioning (and only emitting newer versions) now happens by the underlying library, so we don't need to fetch a member's latest timestamp anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this still allows some performance gains (fewer inserts) in case you don't have a state file but do already have data (for example when upgrading the consumer), but am fine with keeping things simple for now.
export async function executeDeleteInsertQuery ( | ||
quadsToDelete: RDF.Quad[], | ||
quadsToInsert: RDF.Quad[] | ||
) { | ||
await executeDeleteQuery(quadsToDelete); | ||
await executeInsertQuery(quadsToInsert); | ||
} | ||
|
||
export async function fetchState (stream: RDF.NamedNode): Promise<State | undefined> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ldes-client stores its state as a file on disk. Although we could write and read this data to the triplestore, it makes working with the client more cumbersome. I decided to leave it out here, instead users of this service can provide a volume mount to store the state of ldes-client.
|
||
| Environment variable | Reason | Description | | ||
|----------------------|--------|-------------| | ||
| `LDES_DEREFERENCE_MEMBERS` | The underlying library does not make dereferencing optional. | Whether to dereference members, because the collection pages do not contain all information. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As stated, this is always enabled by the library and cannot be disabled as of now, or at least I could not manage to disable it.
This might be an issue for https://github.com/redpencilio/ldes-consumer-manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do wonder how they decide what to dereference and what that means for us. I can see cases where you explicitly don't want this. needs further investigation on what this means in practice
package.json
Outdated
@@ -5,9 +5,9 @@ | |||
"cron": "^2.1.0", | |||
"env-var": "^7.3.0", | |||
"express-http-context": "^1.2.4", | |||
"@greyoda/ldes-client": "0.0.15", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently it's using my fork which lives here: https://github.com/sergiofenoll/ldes-client
At the moment of writing, the fork implements loose
mode for extracting members. This ensures that certain misconfigured LDES feeds can still be parsed.
Other issues with the client are:
- Versioning doesn't work when the state is retrieved from the serialized state file (see: https://matrix.to/#/!HCYaBzWmkpZlPcCyAL:chat.semantic.works/$5uy83w1PnVNWzRNQ4O3CWs5SyY2mmh6u6qwP4GwnHg4?via=chat.semantic.works&via=matrix.org&via=gitter.im)
ldes:versionOfPath
andldes:timestampPath
aren't exposed to consumers
|----------------------|---------|-------------| | ||
| `INGEST_MODE` | `ALL` | How the LDES feed should be ingested. Valid options are `ALL` and `MATERIALIZE`. `ALL` will ingest all versioned members as-is and store them in the triplestore. `MATERIALIZE` will store the [materializations of the members](https://semiceu.github.io/LinkedDataEventStreams/#version-materializations). | | ||
| `REPLACE_VERSIONS` | `false` | Whether to remove old versions of a resource when adding a new version or not. | | ||
| `PERSIST_STATE` | `false` | Whether to persist the state of the LDES client. The state is stored as a file in `/data/$LDES_ENDPOINT_VIEW-state.json`, make sure to mount the data folder to have access to store the state across container rebuilds! | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't the slashes in the feed uri create extra directories?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was too hasty writing this. What the code does is the following:
const url = new URL(LDES_ENDPOINT_VIEW);
stateFilePath = `/data/${url.host}-state.json`;
So it uses the host part, which can't contain slashes.
| `LDES_POLLING_INTERVAL` | `60000` | Number of milliseconds before refetching uncacheable fragments | | ||
| `LDES_REQUESTS_PER_MINUTE` | `0` (unlimited) | How many requests per minutes may be sent to the same host. This is optional, but any passed in value must be a positive number. | | ||
| `LDES_ENDPOINT_HEADERS` | `{}` (no headers will be added) | Extra headers that will be added to the requests sent to the LDES endpoint. Recommended syntax:<pre>environment:<br> LDES_ENDPOINT_HEADERS: ><br> { "HEADER-NAME": "header-value" } # The leading whitespace is important!</pre> | | ||
| `SPARQL_ENDPOINT_HEADER_<key>` | N/A | A header key-value combination which should be send as part of the headers to the SPARQL endpoint. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
guessing this will also need to mvoe to a json structure, similar to LDES_ENDPOINT_HEADERS.
Now that the client exposes the LDES feed's metadata, we will use it when processing the members (in particular, we will use the versionOf predicate).
If the consuming feed doesn't provide the metadata users of the service can still define the expected paths and this service will use them. Note that `REPLACE_VERSIONS` will not work when providing these because the ldes-client library needs the feed to provide the paths itself, we can't tell it what paths to look for. An alternative would be to re-implement versioning in this service and fallback to it if the feed doesn't provide the paths.
It gets REALLY chatty since every request will be logging this and nothing changes.
New version should fix some memory issues in the library
"safe" mode just means that any time fetch throws an exception, the request is retried in a loop until no exception is thrown. This isn't great and we probably want to just handle retries ourselves instead.
The aim of this PR is to replace the usage of https://github.com/TREEcg/event-stream-client/tree/main/packages/actor-init-ldes-client with https://github.com/rdf-connect/ldes-client.
The PR is currently in Draft mode because not all features are 100% supported, in fact the version of ldes-client that will be installed is my fork which contains some small but necessary changes to make some faulty LDES feeds get ingested properly.
I will add remarks/notes about the changes in comments on this PR. If any of the remarks should move somewhere (i.e. to the README, comments in code) or should be further worked out, let me know.