Skip to content

Commit

Permalink
Rewrite orchestrator example using the effects API
Browse files Browse the repository at this point in the history
This allows to both test the new API and provide an example
on how to work with this API

Change-type: patch
  • Loading branch information
pipex committed Sep 26, 2023
1 parent 85281f6 commit 66960f7
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 92 deletions.
5 changes: 0 additions & 5 deletions .eslintrc.json

This file was deleted.

2 changes: 1 addition & 1 deletion lib/effects/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './effect';
export * from './helpers';
export { pipe, flow } from './pipe';
export { pipe, flow, sequence } from './pipe';
export * from './do';
2 changes: 2 additions & 0 deletions lib/effects/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ export function flow(fa: (...a: any[]) => any, ...fns: Array<Fn<any, any>>) {
);
}

export const sequence = flow;

/**
* Chain functions from left to right and return the result applied to
* the initial argument.
Expand Down
7 changes: 0 additions & 7 deletions tests/orchestrator/tasks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,6 @@ describe('orchestrator/tasks', () => {
// The parent tag should be removed
await expect(docker.getImage('alpine:latest').inspect()).to.be.rejected;

// TODO: it should always(?) be true that the task condition should match
// before the test but not after, perhaps this should be a test helper
expect(
doFetch.condition(s),
'condition should no longer hold after the test',
).to.be.false;

// If we run the task again, it should not pull the image again
// i.e. the image should have the same id as before
s = await doFetch({
Expand Down
169 changes: 90 additions & 79 deletions tests/orchestrator/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as Docker from 'dockerode';
import * as tar from 'tar-stream';

import { Disposer, Initializer, Task } from 'mahler';
import { Effect, bind, map, IO, set, sequence } from 'mahler/effects';
import { console } from '~/test-utils';
import { App, Device, Service, ServiceStatus } from './state';
import { getContainerName, getImageName, getRegistryAndName } from './utils';
Expand Down Expand Up @@ -32,89 +33,99 @@ const docker = new Docker();
export const fetch = Task.of({
op: 'create',
path: '/apps/:appUuid/releases/:releaseUuid/services/:serviceName',
// Only pull the image if it's not already present
condition: (device: Device, ctx) =>
!device.images.some((img) => img.name === getImageName(ctx)),
// The effect of this task is to add the image to the device
effect: (device, ctx) => {
const { digest } = getRegistryAndName(ctx.target.image);
return {
...device,
images: [
...device.images,
{
name: getImageName(ctx),
...(digest && { contentHash: digest }),
effect: sequence(
(device: Device, ctx) => Effect.of({ device, ctx }),
// Set "assigns" a variable to the shared context, this allows the result
// to be used by subsequent functions on the sequence
set('imageName', ({ ctx }) => getImageName(ctx)),
set('imageParts', ({ ctx }) => getRegistryAndName(ctx.target.image)),
bind('image', ({ ctx, device, imageParts, imageName }) =>
// Bind assigns the result of the operation to an effect. Here the effect is provided
// by the call to `IO`. We need to provide a sync and an async side to this call
IO(
// This is the async behavior for the task, it will only be executed
// at runtime
async () => {
const { registry, digest } = imageParts;

const pack = tar.pack(); // pack is a stream

// we use a dockerfile to add image metadata
pack.entry(
{ name: 'Dockerfile' },
[
`FROM ${ctx.target.image}`,
`LABEL io.balena.image="${ctx.target.image}"`,
...(digest ? [`LABEL io.balena.content-hash="${digest}"`] : []),
].join('\n'),
);

pack.finalize();

await new Promise((resolve, reject) =>
docker
.buildImage(pack, {
t: imageName,

// Add authentication to the registry if a key
// has been provided
...(registry &&
device.keys[registry] && {
authconfig: {
username: `d_${device.uuid}`,
password: device.keys[registry],
serverAddress: registry,
},
}),
} as Docker.ImageBuildOptions)
.then((stream) => {
stream.on('data', (b) => console.debug(b.toString()));
stream.on('error', reject);
stream.on('close', reject);
stream.on('end', resolve);
})
.catch(reject),
);

// Get the image using the name
const dockerImage = await docker.getImage(imageName).inspect();

// try to delete the parent image
await docker
.getImage(ctx.target.image)
.remove()
.catch((e) =>
console.warn(
`could not remove image tag '${ctx.target.image}'`,
e,
),
);

// This returns the actual image that will be used
return {
name: imageName,
imageId: dockerImage.Id,
...(digest && { contentHash: digest }),
};
},
],
};
},
action: async (device, ctx) => {
const { registry, digest } = getRegistryAndName(ctx.target.image);

const imageName = getImageName(ctx);
const pack = tar.pack(); // pack is a stream

// we use a dockerfile to add image metadata
pack.entry(
{ name: 'Dockerfile' },
[
`FROM ${ctx.target.image}`,
`LABEL io.balena.image="${ctx.target.image}"`,
...(digest ? [`LABEL io.balena.content-hash="${digest}"`] : []),
].join('\n'),
);

pack.finalize();

await new Promise((resolve, reject) =>
docker
.buildImage(pack, {
t: imageName,

// Add authentication to the registry if a key
// has been provided
...(registry &&
device.keys[registry] && {
authconfig: {
username: `d_${device.uuid}`,
password: device.keys[registry],
serverAddress: registry,
},
}),
} as Docker.ImageBuildOptions)
.then((stream) => {
stream.on('data', (b) => console.debug(b.toString()));
stream.on('error', reject);
stream.on('close', reject);
stream.on('end', resolve);
})
.catch(reject),
);

// Get the image using the name
const dockerImage = await docker.getImage(imageName).inspect();

// try to delete the parent image
await docker
.getImage(ctx.target.image)
.remove()
.catch((e) =>
console.warn(`could not remove image tag '${ctx.target.image}'`, e),
);

return {
...device,
images: [
...device.images,
{
name: imageName,
imageId: dockerImage.Id,
...(digest && { contentHash: digest }),
// This is the sync behavior for the task, it will be executed during planning
() => {
const { digest } = imageParts;
// This returns a "mocked" version of the image that will be used by the planner
return {
name: imageName,
...(digest && { contentHash: digest }),
};
},
],
};
},
),
),
map(({ device, image }) => ({
...device,
images: [...device.images, image],
})),
),
description: (ctx) =>
`pull image '${ctx.target.image}' for service '${ctx.serviceName}' of app '${ctx.appUuid}'`,
});
Expand Down
3 changes: 3 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
"mahler/planner": [
"lib/planner/index.ts"
],
"mahler/effects": [
"lib/effects/index.ts"
],
"~/test-utils": [
"tests/index.ts"
]
Expand Down

0 comments on commit 66960f7

Please sign in to comment.