- Kafka - de 0 a PRO - Práctica Guiada
-
Docker Instalado: Para facilitar la práctica y el manejo montaremos nuestro propio "cluster" de Kafka en contenedores docker.
-
JDK 11+ Instalado
-
Maven Instalado
Nota: Para la instalación de SDKs mi recomendación es usar SDKman
El repositorio estará organizado en carpetas, por temática (API), dentro de la cual encontraréis una con el ejercicio propuesto a resolver y otra con el ejercicio resuelto.
El proyecto contiene bash script, código Python y código Java organizado en dos módulos Maven, para compilar el proyecto situarse en la carpeta root y ejecutar:
mvn clean install
Abre la carpeta 1.Environment y ejecuta:
docker-compose -f zk-simple-kafka-multiple.yml up -d
En este apartado veremos como setear algunas de las propiedades basicas de Kafka.
Para ver el listado de todas las configuraciones posibles:
Utilizaremos el comando kafka-configs que nos da la instalación de kafka para comprobar el estado de algunos settings básicos de nuestro clúster, para ello deberemos ejecutar dicho comando dentro de cualquiera de nuestros broker.
Por tanto lo primero que necesitaremos será habilitar una consola interactiva dentro del contenedor de uno de nuestros broker para lo que ejecutamos:
docker exec -it kafka-broker-1 /bin/bash
Una vez dentro ejecutaremos el comando kafka-configs para listar la configuración de brokers activa en este momento:
kafka-configs --bootstrap-server kafka1:19092 --entity-type brokers --describe --all
1. Utiliza el comando **kafka-configs** para setear la propiedad _message.max.bytes_ a _512_ en el broker 1
2. Utiliza el comando **kafka-configs** para comprobar el efecto de tu acción.
3. Utiliza el comando **kafka-configs** para setear la propiedad _message.max.bytes_ a _512_ en todos los brokers
4. Revierte la propiedad al valor por defecto para todos los broker.
5. ¿Qué pasa si usa configuración solo existe en un broker e intentamos borrarla de todos a la vez?, ¿Testealo con los scripts anteriores?
Utilizaremos el comando kafka-topics para crear y administrar topics dentro de nuestro cluster:
Para monitorizar lo que está pasando en nuestro cluster, abriremos el log de cada broker en una consola aparte ejecutando:
docker logs -f kafka-broker-<id>
Dentro del contenedor (recuerda docker exec...) de cual quiera de nuestros broker ejecutaremos:
kafka-topics --bootstrap-server kafka1:19092 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Vamos a modificar el numero de particiones y replicas de nuestro topic y observemos lo que pasa:
Para el número de particiones:
kafka-topics --bootstrap-server kafka1:19092 --alter --topic my-topic --partitions 2
El incremento de réplicas más "tricky", necesitaremos reasignar la réplica de cada partición a mano (algo a evitar tanto como sea posible).
Primero necesitamos saber cual es la configuración actual del topic:
kafka-topics --bootstrap-server kafka1:19092 --topic my-topic --describe
También necesitaremos un fichero JSON que describa esta reasignación, increase-replication-factor.json:
{
"version": 1,
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [
1,
3,
2
]
},
{
"topic": "my-topic",
"partition": 1,
"replicas": [
2,
3,
1
]
}
]
}
Para crear el archivo dentro de nuestro broker podemos usar el comando:
cat << EOF > increase-replication-factor.json
Por último ejecutaremos el comando:
kafka-reassign-partitions --bootstrap-server kafka1:19092 --reassignment-json-file increase-replication-factor.json --execute
1. Crea un topic con 1 particion, factor de replica 1, y que sincronice tras 5 mensajes
2. Cambia el número de particiones a 3 y reasigna la replicación de manera óptima.
3. Cambia la configuración de sincronizacón para que esta se haga tras cada mensaje.
4. Experimenta matando y levantando brokers, ¿Crees que tu asignación del factor de replica fue adecuada?
Primero crea un topic console-example con 3 particiones y factor de réplica 3.
Produciremos varios mensajes en el topic mediante el comando kafka-console-producer y observaremos el comportamiento:
El mensaje a producir será uno simple que solo contendrá un id como body:
1,{"id": "1"}
kafka-console-producer --bootstrap-server kafka1:19092 --topic console-example --property "parse.key=true" --property "key.separator=,"
Ahora crearemos un consumidor de consola:
kafka-console-consumer --bootstrap-server kafka1:19092 --topic console-example --property print.key=true --from-beginning
¿Qué pasa cuando este arranca?:
Solución
¡El consumidor consume todos los mensajes!.
¿Que pasara si añadimos otro consumidor?
Solución
¡Tenemos dos consumidores consumiendo exactamente los mismos mensajes!.
Ahora introduciremos dos consumidores formando un grupo de consumo:
kafka-console-consumer --bootstrap-server kafka1:19092 --topic console-example --property print.key=true --from-beginning --consumer-property group.id=console-group-1
Observad el rebalanceo y particionado que se produce mediante la partition key elegida. ¿Qué casos de uso encontramos para esta funcionalidad?.
Necesitamos crear un productor para que un operador desde consola introduzca las lecturas de n (15??) medidores de temperatura de una sala.
Cada lectura la recibira un dispositivo distinto simulado a través de un consumidor de consola independentiente, queremos que cada consumidor solo reciba la medición correspondiente a su medidor teniendo en cuenta que es muy importante preservar el orden de las mediciones tomadas.
Nota: Si eliges Python como lenguaje, necesitarás instalar el módulo kafka
pip install kafka-python
Observemos la configuración de la clase SimpleProducer. ¿Qué pasa en nuestro cluster si la ejecutamos "directamente"?
Usa el comando kafka-topics para ver que ha pasado con nuestro simple-topic
Es momento ahora de crear nuestro primer consumidor. ¿Sabrías decir que pasará cuando arranquemos nuestro SimpleConsumer1?
¿Que pasará si arrancamos SimpleConsumer2 y SimpleConsumer3?
Solución
Estarán preparados para consumir, pero no consumirán nada, ya que todos los mensajes han sido consumidos por en el arranque anterior y nuestros nuevos procesos pertenecen al mismo grupo de consumo.
¿Y si corremos de nuevo SimpleProducer?, ¿Habrá algún cambio en la manera de consumir?
Solución
Los nuevos mensajes empezarán a ser consumidos por el proceso perteneciente al grupo que tenga la partición a la que corresponda el mensaje asignada.
Es tiempo ahora de volver a nuestro medidor de temperatura. Esta vez simularemos cada dispositivo en una clase productora equivalente.
De nuevo necesitamos que los eventos de cada medidor sean atendidos por un solo consumidor y en el orden establecido.
En la carpeta tx del punto 3.1 encontraremos un ejemplo de un productor consumidor transaccional, es decir que asegura una semantica Exactly Once!.
Hay que tener en cuenta que esta semántica solo esta garantizada out of the box en operaciones dentro del cluster es decir cuando consumimos un topic para producir en otro.
De ese modo debemos asegurar la idempotencia de los mensajes que producimos, asegurar la entrega de los mismos (minimo de ACKS de cada réplica), además en la parte consumidora tendremos que asegurar la transacción en la producción a nuestro topic de salida (recordamos que produciremos en otro topic) poniendo el isolation level en READ_COMMITED, es decir solo consumiremos aquellos mensajes que vengan marcados como comiteados por el productor.
Para los casos en los que no escribamos en otro topic tendra que ser la lógica de consumo la que asegure la transaccionalidad en su parte.
¡De este modo como veremos más adelante las APIs construidas por encima de producer consumer asegura esta semántica by the face!.
En este ejemplo TxMessageProducer produce dos textos, marcados con un id, es decir nos aseguramos la idempotencia en la producción (ver comentarios en las clases de configuración).
Más tarde TxWordCount consumirá los textos separando y contando las palabras de cada mensaje para producir idempotentemente la cuenta de cada palabra en un topic de salida. Para ello:
- Iniciaremos un productor en modo transaccional antes de empezar a consumir.
- En cada poll iniciaremos una nueva transacción
- Ejecutaremos nuestra lógica de consumo para luego mandar todos los commit de los offset consumidos en este poll con el consumer group asegurandonos de ese modo que tanto productor como consumidor han marcado el mensaje como procesado.
Toda la documentación oficial del API de Streams aquí
Especial atención a los conceptos basicos
También digno de mencionar como todas las semánticas de entrega están soportadas por infraestructura mediante la propiedad de configuración processing.guarantee
Para el primer ejemplo buscaremos los básicos de Stream KTable y KStream, la mejor explicación grafica la podemos encontrar aquí
Podemos ver el stream como una foto en un momento dado del estado de un topic/partición, esta foto se está siendo constantemente actualizada (si procesamos en tiempo real) o bien en micro batches en una ventana de tiempo, como vemos en los gráficos de la docu oficial cada momento del stream representa un mensaje en la historia del topic. Por contra en la tabla podremos de un solo momento (en un solo offset) obtener la información agregada del estado de nuestro topic.
En nuestro primer ejemplo KafkaStreamsWordCount vemos como el simple concepto de Ktable simplifica y hace mucho más eficiente nuestro código.
Si nos fijamos en nuestro ejemplo KafkaStreams mediante un sencillo método (función lambda) aggregate podemos ir agregando, valga la redundancia, información que va llegando a nuestro topic.
Podemos simplificar más aún estos calculus gracias a la abstracción KGroupedStream (ejemplos KafkaStreamGroupedKey, KafkaStreamsAggregate) que conseguiremos aplicando *groupBy a nuestro stream, sobre la que podemos aplicar funciones reduce, aggregate, flatmap, etc.
¿Qué diferencia vemos entre nuestros dos ejemplos?
Pista
¿Alguna diferencia en como suma?, ¿Qué pasa si dejamos algún tiempo sin consumir?
Solución
Efectivamente el windowedBy descartará por defecto todos los mensajes que salgan de nuestra ventana de tiempo. Por tanto vemos como en el ejemplo de aggregate sumara todas las entradas de las keys producidas dentro de la ventana. Mientras que en la GroupedKey sumara todo lo que entre en el topic.
Más info sobre el "windowing" aquí
También podemos "cruzar los rayos" para ello usaremos la sintaxis de join de las que nos provee el DSL de Streams. Esto nos permitirá agregar información de dos o más topics en cualquier abstracción del dsl de streams.
Información detalla y amigable de todas las posibilidades de Join aquí
Otra cosa a tener en cuenta es que alguna de las operaciones sobre los streams son stateful, esto quiere decir que los procesadores guardaran información en un storage intermedio (normalmente disco local) del estado de las task ejecutadas, de modo que puedan recuperar y proseguir las operaciones donde las dejaron en caso de tener que reiniciarse.
Además esto nos provee de la interesante posibilidad de hacer queries interactivas sobre un stream, funcionalidad sobre la que se construye KSQL.
En nuestros ejemplos de Movies utilizaremos joins de streams sirviéndonos tanto de KTables como de GlobalKTables.
Utilizaremos estos ejemplos para ver como de una manera sencilla podemos implementar nuestros propios serializadores y serdes, que no es más que la abstracción que agrupa en una sola clase el serializador y deserializdor. Para ello solo tendremos que exteneder e implementar algunos métodos, para dar la logica de mapeo desde el tipo de entrada a nuestro tipo de salida. Puedes ver un ejemplo en el paquete movies.serializers, y un ejemplo genérico de serialización POJO <-> JSON en el paquete streamutils
Connect es una herramienta que nos permite ingestar desde y hacia sistemas de persistencia externos (incluidos topics de kafka) usando workers (maquinas tanto en modo stand alone como distribuido) donde tendremos instalado el core de Connect (normalmente una instalación común de kafka nos valdría) usando para ello una serie de plugins (connectors).
Como cualquier otra API construida "on top" of producer/consumer utiliza la propia infraestructura de Kafka para asegurarnos la persistencia, semánticas de entrega (es capaz de asegurar semanticas exactly once, dependiendo del conector).
Solo necesitaremos arrancarlo pasándole una configuración, podemos ver un ejemplo tanto en la config de nuestro contenedor connect como en el fichero de ejemplo dentro de la carpeta 5.KafkaConnect/plaintext.
Cosas a tener en cuenta en esta configuración:
- Los Serializadores son los usados por defecto en caso de no especificar ninguno en el connector
- Configuramos varios topics internos que connect usara para mantener el estado de sus tareas
- Configuramos la ruta donde connect en arranque, es decir no admite cambios en caliente leera los plugins disponibles para nuestro worker.
- Group Id que nos permite federar workers
Mas info sobre como levantar connect e instalar plugin aquí
Además connect nos provee de un API Rest para poder interactuar de manera amigable.
Además existe un hub donde podremos buscar y descargar los connectors oficiales y no oficiales que necesitemos.
En la carpeta 5.KafkaConnect/plaintext podemos ver un ejemplo de conector que procesa un archivo de texto plano volcándolo en un topic.
Para ello solo tenemos que mediante el api rest de Connect que arrancaremos un plain text connector:
curl -d @"connect-file-source.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
como vemos en el json de configuracion que le pasamos:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "/home/appuser/plain-txt.txt",
"topic": "file.content"
}
}
Lo único que tenemos que decirle es que arrancaremos un nuevo proceso con:
- La clase FileStreamSource (connector integrado en la instalación por defecto de Kafka)
- El número maximo de task (en este caso task de stream en paralelo) que queremos
- Y la ruta donde estará el fichero a procesar.
OJO ruta **dentro del contenedor de connect, por lo que tendremos que usar el comando (dentro de la carpeta del ejemplo):
docker cp plain-txt.txt connect:/home/appuser
¡Después de correr el proceso inspecciona el topic file.content para ver que ha pasado!
Para el ejemplo de Mongo usaremos un replica set de mongo (no he conseguido que funcione con un stand alone) que ya hemos levantado en nuestro compose.
Antes de emepzar el ejemplo necesitaremos poner nuestra BBDD en orden.
Para ello interactuaremos directamente con el contenedor (a falta de un cliente de mongo, si lo tenéis sentíos libres de usarlo)
docker-compose exec mongo bash
mongosh --username admin --password admin
db.createCollection( "movies", {
validator: { $jsonSchema: {
bsonType: "object",
required: [ "id" ],
properties: {
id: {
bsonType: "string",
description: "id"
},
title: {
bsonType : "string",
description: "title"
},
releaseYear: {
bsonType : "string",
description: "release year"
}
}
} }
} )
Para el ejercicio usaremos el mismo modelo que ya usamos en movies.
Hacemos el post para crear el nuevo conector:
curl -d @"connect-mongo-source.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Echemosle un ojo a la configuración:
{
"name": "mongo-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": 1,
"database": "test",
"collection": "movies",
"connection.uri": "mongodb://mongo:27017",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"topic.prefix": "",
"poll.max.batch.size": "1000",
"poll.await.time.ms": "5000",
"pipeline":"[]",
"batch.size": 0,
"change.stream.full.document": "updateLookup"
}
}
Esta vez le estamos pasando:
- El nombre del conector y el número maximo de task
- Los serializadores (converters) que usaremos en el proceso, sobreescribiendo los que pasamos por defecto en arranque.
- Configuración específica del connector
- La clase que usaremos, como podemos ver esta clase no pertenece a los paquetes base de kafka, por hemos tenido que copiar el jar que la contiene, que descargamos del hub, en un volumen al que el contendeor de connect tenga acceso, ojo este jar debe estar ahi antes de arrancar.
Insertaremos algunos documentos y veremos como se comporta....
puedes decirme....
¿En que topic esta escribiendo?
Solución
como vemos en la docu este conector creara un nuevo topic test.movies (db . collecion ) al que agregaria un prefijo si asi lo configuramos,
fijate en el describe del topic...
Solución
El converter a string elimina todos los metadatos de Mongo (que vienen por cabeceras) por tanto obtendremos un string limpio con el esquema json de nuestra coleccion
¿Qué pasa si inserto nuevos documentos en la colección?
Solución
el proceso esta escuchando a todos los updates de nuestra colección por tanto procesara como mensaje esta/s nueva/s entrada.
Solución
Si no hemos borrado los topics internos del conector el proceso debería volver arrancarse y volver a empezar el proceso en el punto que lo dejo.
KSQL es el API con el nivel de abstracción más alto de todas los que provee Kafka construido por encima del API de stream nos provee una sintaxis tipo SQL de manera que mediante estas sentencias que todos podemos conocer podremos trabajar con KStreams, KTables y topics en general.
Como veíamos antes se aprovecharemos la naturaleza stateful de los Streams para poder realizar estas operaciones.
De modo similar a connect KSQL es un servidor (standalone o distribuido) que hará de proxy con nuestro cluster.
Hay 3 maneras de comunicarnos con nuestro KSQL Server:
- KSQL Client: Máquina Cliente, que nos hara las veces de cliente SQL (como un sql plus o similar), conectaremos con el servidor a través de https.
- REST API, Podemos enviar nuestras queries al servidor a traves de su api REST
- JAVA Client (BETA) : También nos provee de un cliente Java con el que podemos interactuar con el servidor de KSQL haciendo uso de su API REST.
La Documentación de Referencia la podemos encontrarla aquí.
Como ejemplo usaremos el ejercicio de productor/consumidor simple contenido en la carpeta: 3.1.JavaConsumerProducerAPI/src/main/java/org/ogomez/nontx
.
Para la ejecución del mismo usaremos el plugin maven exec
.
- Ve a la carpeta raiz de proyecto kafka-exercises y ejecuta:
mvn clean install
Con esto habremos conseguido compilar nuestro proyecto correctamente. 2. Ve a la carpeta raiz de los ejercicio JAVA:
cd 3.1.JavaConsumerProducerAPI
- Ejecuta en una consola aparte cada una de las aplicaciones:
mvn exec:java -Dexec.mainClass="org.ogomez.nontx.SimpleConsumer"
mvn exec:java -Dexec.mainClass="org.ogomez.nontx.SimpleConsumer2"
mvn exec:java -Dexec.mainClass="org.ogomez.nontx.SimpleConsumer3"
mvn exec:java -Dexec.mainClass="org.ogomez.nontx.SimpleProducer"