fix binary serialization with nats consumer #158

Merged: 1 commit, Feb 25, 2025

@mcambria mcambria commented Feb 21, 2025

Howdy,
The NATS JetStream consumer crashes when a binary serialization format such as Avro is used, because a logging statement tries to force the record value into a string and throws when the value is a byte array. This PR removes that unnecessary string conversion.

The relevant error log looks like this:

{"timestamp":"2025-02-11T18:30:51.99Z","sequence":227,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'io.debezium.DebeziumException: Unexpected data type '[B'', error = 'io.debezium.DebeziumException: Unexpected data type '[B''","threadName":"pool-7-thread-1","threadId":26,"mdc":{},"ndc":"","hostName":"debezium","processName":"io.debezium.server.Main","processId":1,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"Unexpected data type '[B'","frames":[{"class":"io.debezium.server.BaseChangeConsumer","method":"getString","line":84},{"class":"io.debezium.server.nats.jetstream.NatsJetStreamChangeConsumer","method":"handleBatch","line":184},{"class":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1167},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1148},{"class":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
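
The '[B' in the message above is the JVM's runtime class name for byte[], which is what a binary converter such as Avro produces for the record value. The following is a minimal, self-contained sketch of that failure mode, not the actual Debezium source: the class name, the getString stand-in, and the describe helper are all hypothetical, and IllegalStateException stands in for DebeziumException. The describe helper only illustrates one log-safe alternative; the PR itself simply removes the string conversion.

```java
public class BinaryValueLoggingSketch {

    // Hypothetical stand-in for a string-only accessor: it returns String
    // values unchanged and rejects every other type, including byte[].
    static String getString(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return (String) value;
        }
        // For a byte[] value, getClass().getName() is "[B".
        throw new IllegalStateException(
                "Unexpected data type '" + value.getClass().getName() + "'");
    }

    // Hypothetical log-safe description: never throws, and reports only the
    // length of a binary payload instead of trying to render its contents.
    static String describe(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof byte[]) {
            return "byte[" + ((byte[]) value).length + "]";
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        Object jsonValue = "{\"id\":1}";            // what a JSON format yields
        Object avroValue = new byte[] {0, 1, 2, 3}; // what a binary format yields

        System.out.println(getString(jsonValue));   // String passes through

        try {
            getString(avroValue);                   // binary value is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        System.out.println(describe(avroValue));    // safe to use in a log line
    }
}
```

Under these assumptions, a string-only accessor called purely for logging turns an otherwise valid binary record into a fatal error, which is why dropping the conversion is enough to fix the crash.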

Here's a Docker Compose file that reproduces the issue:

services:
  postgres:
    image: postgres:latest
    container_name: postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    ports:
      - "5432:5432"
    networks:
      - debezium-net
    volumes:
      - pgdata:/var/lib/postgresql/data

  debezium:
    image: debezium/server:3.0.0.Final
    container_name: debezium-server
    environment:
      - DEBEZIUM_SINK_TYPE=nats-jetstream
      - DEBEZIUM_SINK_NATS_JETSTREAM_URL=nats://nats:4222
      - DEBEZIUM_SINK_NATS_JETSTREAM_CREATE_STREAM=true
      - DEBEZIUM_SINK_NATS_JETSTREAM_STREAM=DebeziumStream
      - DEBEZIUM_SINK_NATS_JETSTREAM_SUBJECTS=dbz,dbz.>
      - DEBEZIUM_SOURCE_CONNECTOR_CLASS=io.debezium.connector.postgresql.PostgresConnector
      - DEBEZIUM_SOURCE_OFFSET_STORAGE_FILE_FILENAME=data/offsets.dat
      - DEBEZIUM_SOURCE_OFFSET_FLUSH_INTERVAL_MS=0
      - DEBEZIUM_SOURCE_DATABASE_HOSTNAME=postgres
      - DEBEZIUM_SOURCE_DATABASE_PORT=5432
      - DEBEZIUM_SOURCE_DATABASE_USER=postgres
      - DEBEZIUM_SOURCE_DATABASE_PASSWORD=postgres
      - DEBEZIUM_SOURCE_DATABASE_DBNAME=postgres
      - DEBEZIUM_SOURCE_PLUGIN_NAME=pgoutput
      - DEBEZIUM_SOURCE_TOPIC_PREFIX=dbz
      - DEBEZIUM_SOURCE_SCHEMA_INCLUDE_LIST=public
      - DEBEZIUM_FORMAT_VALUE=avro
      - DEBEZIUM_FORMAT_VALUE_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2
      - DEBEZIUM_FORMAT_VALUE_APICURIO_REGISTRY_AUTO-REGISTER=true
      - DEBEZIUM_FORMAT_VALUE_APICURIO_REGISTRY_FIND-LATEST=true
      - ENABLE_APICURIO_CONVERTERS=true
    ports:
      - "8083:8083"
    depends_on:
      - postgres
      - nats
    networks:
      - debezium-net
  
  apicurio-registry:
    container_name: apicurio
    networks:
      - debezium-net
    image: apicurio/apicurio-registry-mem:latest-release
    ports:
      - "8080:8080"

  nats:
    image: nats:latest
    container_name: nats
    ports:
      - "4222:4222"
    networks:
      - debezium-net
    command: --js -sd /data

networks:
  debezium-net:
    driver: bridge

volumes:
  pgdata:

@jpechane
Contributor

@mcambria Hi, thanks for the PR! Could you please create a Jira issue describing the bug and change the commit message to DBZ-xxxx fix binary serialization with nats consumer, where DBZ-xxxx is the Jira identifier for the issue?
Thanks a lot

@mcambria mcambria force-pushed the support_binary_format_with_nats branch from a4884c4 to ae16e02 Compare February 24, 2025 17:24
@mcambria
Contributor Author

Submitted DBZ-8734 and updated the commit message!

@jpechane jpechane merged commit 8e3cb71 into debezium:main Feb 25, 2025
3 checks passed
@jpechane
Contributor

@mcambria Applied, thanks!
