Skip to content

Commit

Permalink
feat: 1st prod ready release
Browse files Browse the repository at this point in the history
  • Loading branch information
argilzar committed Oct 8, 2024
1 parent af46d93 commit 0153b79
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 21 deletions.
6 changes: 1 addition & 5 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,5 @@ LOG_LEVEL=error
# Set this one to true if you want pretty logs. Set to empty (not false) if you want to disable it.
# In production, you should set this to empty (not false)
LOG_PRETTY=true

# Service
SERVICE_PORT=8888



SERVICE_PORT=3000
32 changes: 22 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
# Generic Kafka Forward 2

# Generic Kafka Forward

Forwards events from (flowcore)[https://www.flowcore.io] to Kafka.

### Environment Variables

| Environment Variable | Description | Type | Default Value | Required |
|---------------------------------|-------------------------------------|:--------:|---------------|:--------:|
| LOG_LEVEL | The log level | `string` | `info` | |
| LOG_PRETTY | Whether to log in pretty format | `int` | `0` | |
| SERVICE_PORT | The port the service will listen on | `int` | `3000` | |


# Development
| Environment Variable | Description | Type | Default Value | Required |
|----------------------------------|----------------------------------------|:---------:|-------------------------------------------|:--------:|
| NODE_ENV | The environment the app is running in | `string` | `production` | |
| LOG_LEVEL | The log level | `string` | `info` | |
| LOG_PRETTY | Whether to log in pretty format | `boolean` | `false` | |
| KAFKA_BROKER | Kafka broker URL | `string` | ||
| KAFKA_TOPIC | Kafka topic | `string` | ||
| KAFKA_GROUP_ID | Kafka group ID | `string` | `flowcore-generic-kafka-forwarder` | |
| KAFKA_CLIENT_ID | Kafka client ID | `string` | `flowcore-generic-kafka-forwarder-client` | |
| KAFKA_SSL | Use SSL for Kafka connection | `boolean` | `false` | |
| KAFKA_SASL_MECHANISM | Kafka SASL mechanism | `string` | | |
| KAFKA_USERNAME | Kafka username | `string` | `""` | |
| KAFKA_PASSWORD | Kafka password | `string` | `""` | |
| KAFKAJS_NO_PARTITIONER_WARNING | Disable partitioner warning | `boolean` | `true` | |
| KAFKA_AWS_AUTHORIZATION_IDENTITY | Kafka AWS authorization identity | `string` | `""` | |
| KAFKA_KEY_PATH | Kafka key path | `string` | `key` | |
| KAFKA_IGNORE_EMPTY_KEY | Ignore empty Kafka key | `boolean` | `false` | |
| KAFKA_ADD_FLOWCORE_HEADERS | Add Flowcore headers to Kafka messages | `boolean` | `false` | |
| SERVICE_PORT | The port the service will listen on | `int` | `3000` | |

## App setup

Expand All @@ -24,6 +35,7 @@ cp .env.example .env

# You can create a new api key in flowcore and set the `FLOWCORE_API_KEY` environment variable in the `.env` file.
```

## Building

```bash
Expand Down
Binary file modified bun.lockb
Binary file not shown.
30 changes: 30 additions & 0 deletions services/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"

kafdrop:
image: obsidiandynamics/kafdrop:latest
depends_on:
- kafka
environment:
KAFKA_BROKERCONNECT: kafka:9092
JVM_OPTS: -Xms32M -Xmx64M
ports:
- "9000:9000"
4 changes: 3 additions & 1 deletion src/env/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ const envSchema = z.object({
KAFKAJS_NO_PARTITIONER_WARNING: z.coerce.boolean().default(true),
KAFKA_AWS_AUTHORIZATION_IDENTITY: z.coerce.string().default(""),
KAFKA_KEY_PATH: z.string().default("key"),
KAFKA_IGNORE_EMPTY_KEY: z.coerce.boolean().default(false),
KAFKA_ADD_FLOWCORE_HEADERS: z.coerce.boolean().default(false),

PORT: z.coerce.number().default(3000),
})

const env = envSchema.safeParse(process.env)

if (!env.success) {
console.error("Missing environment variables", env.error.flatten().fieldErrors)
console.error("Missing environment variables.", env.error.flatten().fieldErrors)
process.exit(1)
}

Expand Down
27 changes: 22 additions & 5 deletions src/server/routes/transform/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,30 @@ export const transform = new Elysia({
// const result = Value.Parse(ExamplePayloadDto, payload)
// Extract the key from the payload using the configurable path
const keyPath = env.KAFKA_KEY_PATH
const key = get(event, keyPath)
console.log("transform", event)
const key_value = get(payload, keyPath)

if (!key_value) {
if (env.KAFKA_IGNORE_EMPTY_KEY) {
logger.warn(`No key found in the payload, using path: '${keyPath}'`)
return
}
}

const headers: {
[p: string]: string
} = {}

if (env.KAFKA_ADD_FLOWCORE_HEADERS) {
headers["x-flowcore-event-source"] = `${event.dataCore}/${event.aggregator}/${event.eventType}/${event.eventId}`
headers["x-flowcore-timeBucket"] = event.timeBucket
}

logger.debug(`Sending message to Kafka topic: ${env.KAFKA_TOPIC} with key: ${key_value}`)

// Send message to Kafka
await producer.send({
topic: "your-topic",
messages: [{ key: key, value: JSON.stringify(payload) }],
return await producer.send({
topic: env.KAFKA_TOPIC,
messages: [{ key: key_value, value: JSON.stringify(payload), headers }],
})
},
{
Expand Down

0 comments on commit 0153b79

Please sign in to comment.