Skip to content

Commit

Permalink
Allow config prefix override (#11)
Browse files Browse the repository at this point in the history
* Allow config prefix override

This change will allow to override the default configuration prefix 'KAFKA_MONGO_WATCHER'
with a environment variable called : 'KAFKA_MONGO_WATCHER_PREFIX'

* explain KAFKA_MONGO_WATCHER_PREFIX

* update opentelemetry config name

Co-authored-by: fkarakas <[email protected]>
Co-authored-by: Laurent Dechoux <[email protected]>
Co-authored-by: ldechoux <ghofBud3>
  • Loading branch information
3 people authored Feb 25, 2022
1 parent 6933325 commit 8c48a9f
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 43 deletions.
10 changes: 5 additions & 5 deletions .env.dist
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
KAFKA_MONGO_WATCHER_REPLAY=true
REPLAY=true

KAFKA_MONGO_WATCHER_PRINT_CONFIG=true
KAFKA_MONGO_WATCHER_LOG_CLI_VERBOSE=false
KAFKA_MONGO_WATCHER_LOG_LEVEL=DEBUG
KAFKA_MONGO_WATCHER_PRODUCER_POOL_SIZE=1
PRINT_CONFIG=true
LOG_CLI_VERBOSE=false
LOG_LEVEL=DEBUG
PRODUCER_POOL_SIZE=1
77 changes: 43 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ You can run it using the following example and pass configuration environment va

```bash
$ docker run \
-e 'KAFKA_MONGO_WATCHER_REPLAY=true' \
-e 'REPLAY=true' \
etf1/kafka-mongo-watcher:latest
```

Expand All @@ -62,7 +62,7 @@ In order to run the watcher, type the following command with the desired argumen
You can use flags (as in this example) or environment variables:

```bash
$ ./kafka-mongo-watcher -KAFKA_MONGO_WATCHER_REPLAY=true
$ ./kafka-mongo-watcher -REPLAY=true
...
<info> HTTP server started {"facility":"kafka-mongo-watcher","version":"wip","addr":":8001","file":"/usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s","line":1373}
<info> Connected to mongodb database {"facility":"kafka-mongo-watcher","version":"wip","uri":"mongodb://root:[email protected]:27011,127.0.0.1:27012,127.0.0.1:27013/watcher?replicaSet=replicaset\u0026authSource=admin"}
Expand All @@ -77,148 +77,157 @@ In dev environment you can copy `.env.dist` in `.env` and edit his content in or
You can set/override configuration variables from `.env` file and from `variables environment` and or from cli arguments
(If a variables was configured in multiple sources the last will override the previous one)

#### KAFKA_MONGO_WATCHER_CUSTOM_PIPELINE
Configuration variables with prefix are first loaded and then without prefix. For example if you define `KAFKA_MONGO_WATCHER_MONGODB_URI=xxxx` it will used for the mongo uri, even if `MONGODB_URI=yyyy` is set. This allows some overriding case, sometimes useful inside kubernetes cluster.

#### KAFKA_MONGO_WATCHER_PREFIX
*Type*: string

*Description*: In case you want to specify a filtering pipeline, you can specify it here. It works both wil replay and watch mode.
*Description*: In case you want to specify a different prefix (not `KAFKA_MONGO_WATCHER`) for all configuration environment variables.

*Example value*: `[ { "$match": { "fullDocument.is_active": true } }, { $addFields: { "custom-field": "custom-value" } } ]`
*Example value*: `KAFKA_MONGO_WATCHER_PREFIX=CUSTOM` in this case

**Hint**: You can also use some built-in variables such as `%currentTimestamp%` that will put the current timestamp value right in the aggregation pipeline.
#### CUSTOM_PIPELINE
*Type*: string

*Example value with variables*: `[ { "$match": { "date": { "$gt": { "$date": { "$numberLong": "%currentTimestamp%" } } } } } ]`
*Description*: In case you want to specify a filtering pipeline, you can specify it here. It works both wil replay and watch mode.

#### KAFKA_MONGO_WATCHER_REPLAY
*Example value*: `[ { "$match": { "fullDocument.is_active": true } }, { $addFields: { "custom-field": "custom-value" } } ]`

#### REPLAY
*Type*: bool

*Description*: In case you want to send all collection's documents once (default: false)

#### KAFKA_MONGO_WATCHER_MONGODB_URI
**Hint**: You can also use some built-in variables such as `%currentTimestamp%` that will put the current timestamp value right in the aggregation pipeline.

*Example value with variables*: `[ { "$match": { "date": { "$gt": { "$date": { "$numberLong": "%currentTimestamp%" } } } } } ]`

#### MONGODB_URI
*Type*: string

*Description*: The MongoDB connection string URI (default: mongodb://root:toor@127.0.0.1:27011,...)

#### KAFKA_MONGO_WATCHER_MONGODB_COLLECTION_NAME
#### MONGODB_COLLECTION_NAME
*Type*: string

*Description*: The MongoDB collection you want to watch (default: "items")

#### KAFKA_MONGO_WATCHER_MONGODB_DATABASE_NAME
#### MONGODB_DATABASE_NAME
*Type*: string

*Description*: The MongoDB database name you want to connect to (default: "watcher")

#### KAFKA_MONGO_WATCHER_MONGODB_SERVER_SELECTION_TIMEOUT
#### MONGODB_SERVER_SELECTION_TIMEOUT
*Type*: duration

*Description*: The MongoDB server selection timeout duration (default: 2s)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_BATCH_SIZE
#### MONGODB_OPTION_BATCH_SIZE
*Type*: integer

*Description*: In case you want to enable watch batch size on MongoDB watch (default: 0 / no batch)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_FULL_DOCUMENT
#### MONGODB_OPTION_FULL_DOCUMENT
*Type*: boolean

*Description*: In case you want to retrieve the full document when watching for oplogs (default: true)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_MAX_AWAIT_TIME
#### MONGODB_OPTION_MAX_AWAIT_TIME
*Type*: duration

*Description*: In case you want to set a maximum value awaiting for new oplogs (default: 0 / don't stop)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_RESUME_AFTER
#### MONGODB_OPTION_RESUME_AFTER
*Type*: string

*Description*: In case you want to set a logical starting point for the change stream (example : `{"_data": <hex string>}`)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_START_AT_OPERATION_TIME_I
#### MONGODB_OPTION_START_AT_OPERATION_TIME_I
*Type*: uint32 *(increment value)*

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_START_AT_OPERATION_TIME_T
#### MONGODB_OPTION_START_AT_OPERATION_TIME_T
*Type*: uint32 *(timestamp)*

*Description*: In case you want to set a timestamp for the change stream to only return changes that occurred at or after the given timestamp (default: nil)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_WATCH_MAX_RETRIES
#### MONGODB_OPTION_WATCH_MAX_RETRIES
*Type*: integer

*Description*: The max number of retries when trying to watch a collection (default: 3, set to 0 to disable retry)

#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_WATCH_RETRY_DELAY
#### MONGODB_OPTION_WATCH_RETRY_DELAY
*Type*: duration

*Description*: Sleeping delay between two watch attempts (default: 500ms)

#### KAFKA_MONGO_WATCHER_KAFKA_BOOTSTRAP_SERVERS
#### KAFKA_BOOTSTRAP_SERVERS
*Type*: string

*Description*: Kafka bootstrap servers list (default: "127.0.0.1:9092")

#### KAFKA_MONGO_WATCHER_KAFKA_TOPIC
#### KAFKA_TOPIC
*Type*: string

*Description*: Kafka topic to write into (default: "kafka-mongo-watcher")

#### KAFKA_MONGO_WATCHER_KAFKA_PRODUCE_CHANNEL_SIZE
#### KAFKA_PRODUCE_CHANNEL_SIZE
*Type*: integer

*Description*: The maximum size of the internal channel producer size (default: 10000)

A big value here can increase the heap memory of the application as all the payload that have to be sent to Kafka will be maintained in channel.

#### KAFKA_MONGO_WATCHER_LOG_CLI_VERBOSE
#### LOG_CLI_VERBOSE
*Type*: boolean

*Description*: Used to enable/disable log verbosity (default: true)

#### KAFKA_MONGO_WATCHER_LOG_LEVEL
#### LOG_LEVEL
*Type*: string

*Description*: Used to define first level you want to start display logs (default: "info")

#### KAFKA_MONGO_WATCHER_GRAYLOG_ENDPOINT
#### GRAYLOG_ENDPOINT
*Type*: string

*Description*: In case you want to push logs into a Graylog server, just fill this entry with the endpoint

#### KAFKA_MONGO_WATCHER_HTTP_IDLE_TIMEOUT
#### HTTP_IDLE_TIMEOUT
*Type*: duration

*Description*: A idle timeout for HTTP technical server (default: 90s)

#### KAFKA_MONGO_WATCHER_HTTP_READ_HEADER_TIMEOUT
#### HTTP_READ_HEADER_TIMEOUT
*Type*: duration

*Description*: A read timeout for HTTP technical server (default: 1s)

#### KAFKA_MONGO_WATCHER_HTTP_WRITE_TIMEOUT
#### HTTP_WRITE_TIMEOUT
*Type*: duration

*Description*: A write timeout for HTTP technical server (default: 10s)

#### KAFKA_MONGO_WATCHER_HTTP_TECH_ADDR
#### HTTP_TECH_ADDR
*Type*: string

*Description*: A specified address for HTTP technical server to listen (default: ":8001")

#### KAFKA_MONGO_WATCHER_PRINT_CONFIG
#### PRINT_CONFIG
*Type*: boolean

*Description*: Used to enable/disable the configuration print at startup (default: true)

#### KAFKA_MONGO_WATCHER_PPROF_ENABLED
#### PPROF_ENABLED
*Type*: boolean

*Description*: In case you want to enable Go pprof debugging (default: true). No impact when not used

#### OTEL_COLLECTOR_ENDPOINT
#### OPEN_TELEMETRY_COLLECTOR_ENDPOINT
*Type*: string

*Description*: In case you want to enable OpenTelemetry tracing, fill this with the <host>:<port> of your collector endpoint

#### OTEL_SAMPLE_RATIO
#### OPEN_TELEMETRY_SAMPLE_RATIO
*Type*: float64

*Description*: A fraction between 0 and 1 to enable sampling OpenTelemetry traces
Expand Down
7 changes: 6 additions & 1 deletion cmd/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ import (
signal_subscriber "github.com/gol4ng/signal"
)

const (
var (
configPrefix = "kafka_mongo_watcher"
)

func main() {

if prefixFromEnv := os.Getenv("KAFKA_MONGO_WATCHER_PREFIX"); prefixFromEnv != "" {
configPrefix = prefixFromEnv
}

ctx, cancel := context.WithCancel(context.Background())
cfg := config.NewBase(ctx, configPrefix)

Expand Down
2 changes: 1 addition & 1 deletion cmd/watcher/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var cfg *config.Base

func setupConfig(ctx context.Context) {
if cfg == nil {
os.Setenv("KAFKA_MONGO_WATCHER_PRINT_CONFIG", "false")
os.Setenv("PRINT_CONFIG", "false")

cfg = config.NewBase(ctx, configPrefix)
cfg.LogCliVerbose = false
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type Base struct {
GraylogEndpoint string `config:"GRAYLOG_ENDPOINT"`
Replay bool `config:"REPLAY"`
CustomPipeline string `config:"CUSTOM_PIPELINE"`
OtelCollectorEndpoint string `config:"OTEL_COLLECTOR_ENDPOINT"`
OtelSampleRatio float64 `config:"OTEL_SAMPLE_RATIO"`
OtelCollectorEndpoint string `config:"OPEN_TELEMETRY_COLLECTOR_ENDPOINT"`
OtelSampleRatio float64 `config:"OPEN_TELEMETRY_SAMPLE_RATIO"`
PprofEnabled bool `config:"PPROF_ENABLED"`

HttpServer
Expand Down

0 comments on commit 8c48a9f

Please sign in to comment.