Skip to content

Commit

Permalink
feat(examples): add kafka router server to examples
Browse files Browse the repository at this point in the history
  • Loading branch information
EandrewJones committed Aug 2, 2024
1 parent ee2389f commit 3164665
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Dockerfile → example/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ RUN --mount=type=bind,target=./package.json,src=./package.json \
COPY ./src src/
COPY ./example example/

EXPOSE 8000

CMD ["node", "example/server.js"]
EXPOSE 8000
ENTRYPOINT ["node"]
126 changes: 126 additions & 0 deletions example/kafka-rest-router.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const commander = require("commander");
const { program } = require("commander");
const express = require("express");
const bodyParser = require("body-parser");

// Functions.
const myParseInt = (value, _dummyPrevious) => {
// parseInt takes a string and a radix
const parsedValue = parseInt(value, 10);
if (isNaN(parsedValue)) {
throw new commander.InvalidArgumentError("Not a number.");
}
return parsedValue;
};

const configureCors = (req, res, next) => {
res.header("Access-Control-Allow-Origin", "*");
res.header("Access-Control-Allow-Methods", "POST,OPTIONS");
res.header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization, Content-Length, X-Requested-With",
);

// intercept OPTIONS method
if ("OPTIONS" == req.method) {
res.sendStatus(200);
} else {
next();
}
};

const transformAndForward = async (req, res) => {
try {
const body = typeof req.body === "string" ? JSON.parse(req.body) : req.body;

// Sometimes UserALE sends empty buffers or empty objects, these need to be filtered out
const isEmptyArray = Array.isArray(body) && body.length === 0;
const isEmptyObject =
typeof body === "object" &&
body !== null &&
Object.keys(body).length === 0;
if (isEmptyArray || isEmptyObject) return;

if (options.verbose) console.log(JSON.stringify(body, null, 3));

const transformedPayload = {
records: body.map((log) => ({
key: log["sessionId"],
value: log,
})),
};

if (options.verbose)
console.log(JSON.stringify(transformedPayload, null, 3));

const response = await fetch(options.forwardTo, {
method: "POST",
body: JSON.stringify(transformedPayload),
headers: { "Content-Type": "application/json" },
});

const statusCode = response.status;

res.status(statusCode).send(`Forwarded status code: ${statusCode}`);
} catch (error) {
console.error("Error: ", error);
res.status(500).send("Internal service error");
}
};

const gracefulShutdown = () => {
process.exit();
};

// Args
program
.requiredOption("-f, --forward-to <forward-address>", "Forwarding address")
.option("-p, --port <port>", "Port number", myParseInt, 8000)
.option("-v, --verbose", "Enable verbose mode", false)
.parse(process.argv);
const options = program.opts();
console.log(options);

// Configure app
const app = express();

app.set("port", options.port);
app.use(configureCors);
app.use(bodyParser.urlencoded({ extended: true, limit: "100mb" }));
app.use(bodyParser.json({ limit: "100mb" }));
app.use(bodyParser.text());

// Routes
app.post("/", transformAndForward);

// Server
app.listen(app.get("port"), () => {
console.log(
"UserALE Karapace Router started...",
`\n\tPort: ${app.get("port")}`,
`\n\tForwarding address: ${options.forwardTo}`,
`\n\tVerbose: ${options.verbose}`,
);
});

process.on("SIGTERM", () => gracefulShutdown());
process.on("SIGINT", () => gracefulShutdown());
17 changes: 15 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"@typescript/lib-dom": "npm:@types/web@^0.0.144",
"babel-jest": "^29.7.0",
"body-parser": "^1.20.2",
"commander": "^12.1.0",
"cypress": "^13.6.0",
"cz-conventional-changelog": "^3.3.0",
"detect-browser": "^5.3.0",
Expand Down
26 changes: 21 additions & 5 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
"isolatedModules": true,
"incremental": true,
"jsx": "preserve",
"lib": ["dom", "dom.iterable", "esnext"],
"lib": [
"dom",
"dom.iterable",
"esnext"
],
"module": "esnext",
"moduleResolution": "node",
"noEmit": true,
Expand All @@ -22,16 +26,28 @@
"noUnusedLocals": true,
"noUnusedParameters": false,
"paths": {
"@/*": ["./src/*"]
"@/*": [
"./src/*"
]
},
"pretty": true,
"resolveJsonModule": true,
"skipLibCheck": true,
"sourceMap": true,
"strict": true,
"target": "es2015",
"typeRoots": ["node_modules/@types/"],
"typeRoots": [
"node_modules/@types/"
],
},
"include": ["src", "test", "src/types.d.ts"],
"exclude": ["node_modules", "build", "logs", "example"],
"include": [
"src",
"test",
"src/types.d.ts"
],
"exclude": [
"node_modules",
"build",
"logs"
],
}

0 comments on commit 3164665

Please sign in to comment.