Skip to content

Commit

Permalink
Make a start with #2
Browse files Browse the repository at this point in the history
  • Loading branch information
michielbdejong committed Sep 20, 2024
1 parent 078fddc commit 27a1d39
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 2 deletions.
88 changes: 88 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"author": "Michiel de Jong <[email protected]>",
"license": "Apache-2.0",
"dependencies": {
"redis": "^4.7.0",
"tslib": "~2.6"
},
"volta": {
Expand Down
43 changes: 43 additions & 0 deletions src/feeder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

import { readFileSync } from 'fs';
import cluster from 'node:cluster';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();
const TESTNET_CSV = '../strategy-pit/__tests__/fixtures/testnet-sarafu.csv';

if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);

// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork({ CHUNK: i });
}
cluster.on('online', worker => {
console.info(`worker process ${worker.process.pid} is online`)
})
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} exited ${code} ${signal}`);
});
} else {
let cumm = 0;
const mod = parseInt(process.env.CHUNK);
console.log(`Worker ${process.pid} started for chunk ${mod}`);
console.log("Feeding the server...");
const data = readFileSync(TESTNET_CSV, 'utf8')
const lines = data.split('\n').map(line => {
const [ from, to, weight ] = line.split(' ')
return { from, to, weight }
}).filter(line => line.from !== 'from' && line.from !== '');
for (let lineNo = mod; lineNo < lines.length; lineNo += numCPUs) {
// console.log(process.pid, mod, lineNo, cumm);
const result = await fetch('http://localhost:8000', {
method: 'POST',
body: JSON.stringify(lines[lineNo])
});
cumm += parseFloat(await result.text());
}
console.log(`Worker ${process.pid} done ${cumm}`);
process.exit();
}
11 changes: 9 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ if (cluster.isPrimary) {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer((req, res) => {
res.writeHead(200);
res.end(`hello ${req.url}\n`);
let body = '';
req.on('data', chunk => {
body += chunk;
});
req.on('end', async () => {
const obj = JSON.parse(body);
res.writeHead(200);
res.end(`${obj.weight}\n`);
});
}).listen(8000);

console.log(`Worker ${process.pid} started`);
Expand Down
12 changes: 12 additions & 0 deletions src/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { createClient } from 'redis';

const client = createClient();

client.on('error', err => console.log('Redis Client Error', err));

await client.connect();

await client.set('key', 'value');
const value = await client.get('key');
console.log(value);
await client.quit();

0 comments on commit 27a1d39

Please sign in to comment.