
Bug: failed to extract payload schema: cannot resolve field type \"jsonb\" #229

Open · WORMrus opened this issue Dec 12, 2024 · 2 comments
Labels: bug (Something isn't working), triage (Needs to be triaged)


WORMrus commented Dec 12, 2024

Bug description

I am trying to start a pipeline that would read rows of my outbox table into Kafka. The actual table contains about 10 columns, one of which is jsonb, which is what I originally attributed the problem I am experiencing to.

However, during my troubleshooting I created a table that is just a bigint id and a varchar field and tried reading from it, with the same results. I also dropped the Kafka destination and replaced it with a file one; the destination plugin did not report any issues either.

I am certain that the connector reaches the correct table, as I originally copied it from the real one but without any constraints: the connector complained about the missing primary key, and adding it to the test table got me back to the original issue.
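For reference, a minimal table matching this description might look as follows (a sketch only: the table and column names are assumptions, not taken from the actual schema in the report):

```sql
-- Hypothetical reproduction table: just a bigint primary key and a varchar,
-- as described above. Names and the sample row are illustrative assumptions.
CREATE TABLE test_table (
    id      bigint PRIMARY KEY,
    payload varchar NOT NULL
);

INSERT INTO test_table (id, payload) VALUES (1, 'hello');
```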

Conduit is running in Docker, the info endpoint returns:
{ "version": "v0.12.3", "os": "linux", "arch": "amd64" }
The connector is of version v0.10.1

Postgres is "PostgreSQL 14.3, compiled by Visual C++ build 1914, 64-bit"

My pipeline file is as follows:

version: 2.2                    # Parser version

pipelines:                      # A list of pipeline configurations
  - id: Obx               # Pipeline ID [required]
    status: running             # Pipeline status at startup (running or stopped)
    name: outbox        # Pipeline name
    description: desc           # Pipeline description
    connectors:                 # A list of connector configurations
      - id: pg
        name: Outbox
        settings:
          url: postgresql://postgres:1@my_ip:5432/db_name?search_path=schema_name
          tables: test_table
          # the following settings were added during my troubleshooting; they do not seem to change the error
          snapshotMode: never
          sdk.schema.extract.payload.enabled: false
          sdk.schema.context.enabled: false
          sdk.schema.extract.key.enabled: false
        type: source
        plugin: builtin:postgres
      - id: con2
        type: destination
        plugin: builtin:file
        name: my-file-destination
        settings:
          path: ./file2.txt
    dead-letter-queue:          # Dead-letter queue (DLQ) configuration
      plugin: "builtin:file"    # DLQ Connector plugin
      settings:                 # A map of configuration keys and values for the plugin (specific to the chosen plugin)
        path: "./dlq.out"
      window-size: 5            # DLQ nack window size
      window-nack-threshold: 2  # DLQ nack window threshold

Here are the full logs as displayed when starting a brand new container:

2024-12-12T13:07:47+00:00 INF All 0 tables opened in 0s component=badger.DB
2024-12-12T13:07:47+00:00 INF Discard stats nextEmptySlot: 0 component=badger.DB
2024-12-12T13:07:47+00:00 INF Set nextTxnTs to 0 component=badger.DB
2024-12-12T13:07:47+00:00 INF loading processor plugins from directory /app/processors ... component=plugin.processor.standalone.Registry
2024-12-12T13:07:47+00:00 WRN could not read processor plugin directory error="open /app/processors: no such file or directory" component=plugin.processor.standalone.Registry
2024-12-12T13:07:47+00:00 INF standalone processor plugins initialized component=plugin.processor.standalone.Registry count=0 plugin_path=/app/processors
2024-12-12T13:07:47+00:00 INF builtin processor plugins initialized component=plugin.processor.builtin.Registry count=17
2024-12-12T13:07:47+00:00 INF processors initialized component=processor.Service count=0
2024-12-12T13:07:47+00:00 INF connector utilities started address=[::]:41251
2024-12-12T13:07:47+00:00 INF connector utilities started on [::]:41251
2024-12-12T13:07:47+00:00 INF builtin connector plugins initialized component=plugin.connector.builtin.Registry count=6
2024-12-12T13:07:47+00:00 WRN could not read connector plugin directory error="open /app/connectors: no such file or directory" component=plugin.connector.standalone.Registry
2024-12-12T13:07:47+00:00 INF standalone connector plugins initialized component=plugin.connector.standalone.Registry count=0 plugin_path=/app/connectors
2024-12-12T13:07:47+00:00 INF connectors initialized component=connector.Service count=0
2024-12-12T13:07:47+00:00 INF pipelines initialized component=pipeline.Service count=0
2024-12-12T13:07:47+00:00 INF pipeline started component=lifecycle.Service pipeline_id=Obx
2024-12-12T13:07:47+00:00 INF pipeline configs provisioned component=provisioning.Service created=["Obx"] deleted=[] pipelines_path=/app/pipelines
2024-12-12T13:07:47+00:00 INF grpc API started address=[::]:8084
2024-12-12T13:07:47+00:00 INF destination connector plugin successfully started component=connector.Destination connector_id=Obx:con2
2024-12-12T13:07:47+00:00 INF http API started address=[::]:8080
2024-12-12T13:07:47+00:00 INF destination connector plugin successfully started component=connector.Destination connector_id=Obx-dlq
2024-12-12T13:07:47+00:00 INF
2024-12-12T13:07:47+00:00 INF click here to navigate to Conduit UI: http://localhost:8080/ui
2024-12-12T13:07:47+00:00 INF click here to navigate to explore the HTTP API: http://localhost:8080/openapi
2024-12-12T13:07:47+00:00 INF
2024-12-12T13:07:47+00:00 WRN Publication "conduitpub" already exists. component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source
2024-12-12T13:07:47+00:00 WRN replication slot "conduitslot" already exists component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source
2024-12-12T13:07:47+00:00 INF Starting logical replication at 2/DC3C7E78 component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source publication=conduitpub slot=conduitslot
2024-12-12T13:07:47+00:00 INF Logical replication started component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source publication=conduitpub slot=conduitslot
2024-12-12T13:07:47+00:00 INF source connector plugin successfully started component=connector.Source connector_id=Obx:pg
2024-12-12T13:07:47+00:00 INF source does not support batch reads, falling back to single reads component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source
2024-12-12T13:07:47+00:00 ERR replication exited with an error error="handler error: logrepl handler insert: failed to update avro schema: failed to extract payload schema: cannot resolve field type \"jsonb\" " component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source stack=null
2024-12-12T13:07:47+00:00 INF source connector plugin successfully torn down component=connector.Source connector_id=Obx:pg
2024-12-12T13:07:47+00:00 ERR node stopped error="node Obx:pg stopped with error: source stream was stopped unexpectedly: error reading from source: read plugin error: failed to fetch next record: logical replication error: handler error: logrepl handler insert: failed to update avro schema: failed to extract payload schema: cannot resolve field type \"jsonb\" " component=lifecycle.Service node_id=Obx:pg stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743},{"file":"/app/pkg/lifecycle/stream/source.go","func":"github.com/conduitio/conduit/pkg/lifecycle/stream.(*SourceNode).Run","line":146},{"file":"/app/pkg/lifecycle/stream/source.go","func":"github.com/conduitio/conduit/pkg/lifecycle/stream.(*SourceNode).Run.func1","line":90}]
2024-12-12T13:07:47+00:00 ERR node stopped error="node fanout stopped with error: context canceled" component=lifecycle.Service node_id=fanout stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T13:07:47+00:00 INF destination connector plugin successfully torn down component=connector.Destination connector_id=Obx-dlq
2024-12-12T13:07:47+00:00 INF destination connector plugin successfully torn down component=connector.Destination connector_id=Obx:con2
2024-12-12T13:07:47+00:00 ERR node stopped error="node Obx:con2-metrics stopped with error: context canceled" component=lifecycle.Service node_id=Obx:con2-metrics stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T13:07:47+00:00 ERR node stopped error="node fanin stopped with error: context canceled" component=lifecycle.Service node_id=fanin stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T13:07:47+00:00 ERR node stopped error="node Obx:con2-acker stopped with error: context canceled" component=lifecycle.Service node_id=Obx:con2-acker stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T13:07:47+00:00 ERR node stopped error="node Obx:con2 stopped with error: context canceled" component=lifecycle.Service node_id=Obx:con2 stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]

Steps to reproduce

  1. Have Postgres ready as described above (a table with a bigint id and some varchar column, with a primary key)
  2. docker run -it -p 8080:8080 -v path\to\pipe.yaml:/app/pipelines/pipeline.yaml conduit.docker.scarf.sh/conduitio/conduit
  3. Observe the errors mentioned above

Version

Conduit v0.12.3 connector v0.10.1

@WORMrus WORMrus added bug Something isn't working triage Needs to be triaged labels Dec 12, 2024

hariso (Contributor) commented Dec 12, 2024

@WORMrus Thanks for reaching out to us. :)

So, looking at your logs, I see this:

2024-12-12T13:07:47+00:00 WRN Publication "conduitpub" already exists. component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source
2024-12-12T13:07:47+00:00 WRN replication slot "conduitslot" already exists component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source
2024-12-12T13:07:47+00:00 INF Starting logical replication at 2/DC3C7E78 component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source publication=conduitpub slot=conduitslot
...
2024-12-12T13:07:47+00:00 ERR replication exited with an error error="handler error: logrepl handler insert: failed to update avro schema: failed to extract payload schema: cannot resolve field type \"jsonb\" " component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source stack=null

The connector is reading from an existing publication and replication slot, and the logs also mention the jsonb type, which means it is not reading from the table that contains just the bigint and varchar columns.

I'd probably do the following:

  • delete the publication slot (or recreate the Postgres instance)
  • run Conduit again (from scratch, deleting the old container)

A table with just a bigint primary key and a varchar should work; we've been using that setup often (on the Conduit Platform too). The jsonb column is something we'd need to check.
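The cleanup suggested above can be sketched as the following SQL, run against the source database with a role that has replication privileges (the slot and publication names are taken from the logs above; adjust if yours differ):

```sql
-- Drop the stale replication slot and publication left over from the earlier
-- run, so the connector recreates them against the current table set.
-- "conduitslot" and "conduitpub" are the names visible in the logs above.
SELECT pg_drop_replication_slot('conduitslot');
DROP PUBLICATION conduitpub;
```

Note that a slot cannot be dropped while it is actively in use, so stop the Conduit container first.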


WORMrus commented Dec 12, 2024

@hariso Thank you for getting back to me so quickly. Indeed, it seems that the current state was related to the original table and its jsonb column. I did see the warnings but was not sure what to make of them, as I am not familiar with replication slots/publications as a feature of PG.

As advised, I deleted the slot with select pg_drop_replication_slot('conduitslot'), and a new container was able to start. However, it did not capture any changes; it could only create a replica at startup (after I changed the snapshotMode setting back to the default). An additional drop publication conduitpub was needed to make it work for both existing and new data.

With that out of the way, I once again removed the slot and the publication, deleted the container, and added a new jsonb column (not null, with '{}'::jsonb as the default).
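That column change would look roughly like this (the table and column names are assumptions for illustration; only the type, nullability, and default come from the description above):

```sql
-- Hypothetical jsonb column matching the description: NOT NULL, with an
-- empty-object default. Table and column names are assumed.
ALTER TABLE test_table
    ADD COLUMN extra jsonb NOT NULL DEFAULT '{}'::jsonb;
```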

I was able to start a new container. However, it threw an error when I inserted a new row (the json value there was just a {}):

2024-12-12T22:27:18+00:00 INF All 0 tables opened in 0s component=badger.DB
2024-12-12T22:27:18+00:00 INF Discard stats nextEmptySlot: 0 component=badger.DB
2024-12-12T22:27:18+00:00 INF Set nextTxnTs to 0 component=badger.DB
2024-12-12T22:27:18+00:00 INF loading processor plugins from directory /app/processors ... component=plugin.processor.standalone.Registry
2024-12-12T22:27:18+00:00 WRN could not read processor plugin directory error="open /app/processors: no such file or directory" component=plugin.processor.standalone.Registry
2024-12-12T22:27:18+00:00 INF standalone processor plugins initialized component=plugin.processor.standalone.Registry count=0 plugin_path=/app/processors
2024-12-12T22:27:18+00:00 INF builtin processor plugins initialized component=plugin.processor.builtin.Registry count=17
2024-12-12T22:27:18+00:00 INF processors initialized component=processor.Service count=0
2024-12-12T22:27:18+00:00 INF connector utilities started address=[::]:37225
2024-12-12T22:27:18+00:00 INF connector utilities started on [::]:37225
2024-12-12T22:27:18+00:00 INF builtin connector plugins initialized component=plugin.connector.builtin.Registry count=6
2024-12-12T22:27:18+00:00 WRN could not read connector plugin directory error="open /app/connectors: no such file or directory" component=plugin.connector.standalone.Registry
2024-12-12T22:27:18+00:00 INF standalone connector plugins initialized component=plugin.connector.standalone.Registry count=0 plugin_path=/app/connectors
2024-12-12T22:27:18+00:00 INF connectors initialized component=connector.Service count=0
2024-12-12T22:27:18+00:00 INF pipelines initialized component=pipeline.Service count=0
2024-12-12T22:27:18+00:00 INF pipeline started component=lifecycle.Service pipeline_id=Obx
2024-12-12T22:27:18+00:00 INF pipeline configs provisioned component=provisioning.Service created=["Obx"] deleted=[] pipelines_path=/app/pipelines
2024-12-12T22:27:18+00:00 INF grpc API started address=[::]:8084
2024-12-12T22:27:18+00:00 INF destination connector plugin successfully started component=connector.Destination connector_id=Obx:con2
2024-12-12T22:27:18+00:00 INF destination connector plugin successfully started component=connector.Destination connector_id=Obx-dlq
2024-12-12T22:27:18+00:00 INF http API started address=[::]:8080
2024-12-12T22:27:18+00:00 INF
2024-12-12T22:27:18+00:00 INF click here to navigate to Conduit UI: http://localhost:8080/ui
2024-12-12T22:27:18+00:00 INF click here to navigate to explore the HTTP API: http://localhost:8080/openapi
2024-12-12T22:27:18+00:00 INF
2024-12-12T22:27:18+00:00 INF Starting logical replication at 2/DC882DC8 component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source publication=conduitpub slot=conduitslot
2024-12-12T22:27:18+00:00 INF Logical replication started component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source publication=conduitpub slot=conduitslot
2024-12-12T22:27:18+00:00 INF source connector plugin successfully started component=connector.Source connector_id=Obx:pg
2024-12-12T22:27:18+00:00 INF source does not support batch reads, falling back to single reads component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source
2024-12-12T22:27:37+00:00 ERR replication exited with an error error="handler error: logrepl handler insert: failed to update avro schema: failed to extract payload schema: cannot resolve field type \"jsonb\" " component=plugin connector_id=Obx:pg plugin_name=builtin:postgres plugin_type=source stack=null
2024-12-12T22:27:37+00:00 INF source connector plugin successfully torn down component=connector.Source connector_id=Obx:pg
2024-12-12T22:27:37+00:00 ERR node stopped error="node Obx:pg stopped with error: source stream was stopped unexpectedly: error reading from source: read plugin error: failed to fetch next record: logical replication error: handler error: logrepl handler insert: failed to update avro schema: failed to extract payload schema: cannot resolve field type \"jsonb\" " component=lifecycle.Service node_id=Obx:pg stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743},{"file":"/app/pkg/lifecycle/stream/source.go","func":"github.com/conduitio/conduit/pkg/lifecycle/stream.(*SourceNode).Run","line":146},{"file":"/app/pkg/lifecycle/stream/source.go","func":"github.com/conduitio/conduit/pkg/lifecycle/stream.(*SourceNode).Run.func1","line":90}]
2024-12-12T22:27:37+00:00 ERR node stopped error="node fanout stopped with error: context canceled" component=lifecycle.Service node_id=fanout stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T22:27:37+00:00 ERR node stopped error="node Obx:con2-acker stopped with error: context canceled" component=lifecycle.Service node_id=Obx:con2-acker stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T22:27:37+00:00 INF destination connector plugin successfully torn down component=connector.Destination connector_id=Obx-dlq
2024-12-12T22:27:37+00:00 ERR node stopped error="node Obx:con2-metrics stopped with error: context canceled" component=lifecycle.Service node_id=Obx:con2-metrics stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T22:27:37+00:00 INF destination connector plugin successfully torn down component=connector.Destination connector_id=Obx:con2
2024-12-12T22:27:37+00:00 ERR node stopped error="node Obx:pg-metrics stopped with error: context canceled" component=lifecycle.Service node_id=Obx:pg-metrics stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T22:27:37+00:00 ERR node stopped error="node fanin stopped with error: context canceled" component=lifecycle.Service node_id=fanin stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T22:27:37+00:00 ERR node stopped error="node Obx:con2 stopped with error: context canceled" component=lifecycle.Service node_id=Obx:con2 stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743}]
2024-12-12T22:27:37+00:00 INF restarting with backoff attempt=1 component=lifecycle.Service duration=1917.753816 pipeline_id=Obx

So it seems that something is off with jsonb columns.
