Add support for logical AVRO types. #40

Open
aymkhalil opened this issue Aug 25, 2022 · 1 comment

Comments

@aymkhalil
Collaborator

aymkhalil commented Aug 25, 2022

There are three different ways a Pulsar sink can receive a logical type, depending on how the message was generated. Below, I'll use the C* decimal type (BigDecimal in Java) as an example:

  1. Using the Pulsar API Schema.AVRO(org.apache.pulsar.client.api.MyPojo.class), where MyPojo has a BigDecimal field. The generated output schema is:
{
  "name": "bigDecimal",
  "type": [
    "null",
    {
      "type": "string",
      "java-class": "java.math.BigDecimal"
    }
  ]
}

The Pulsar sink fails with:

Caused by: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
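
In this case the value is an ordinary Avro string, so a decoder can only recover the number by parsing the text, and the schema itself carries no scale; that appears to be what the error above is complaining about. A minimal stdlib-only sketch (StringDecimalExample and decode are hypothetical names, not part of the sink) showing that the scale then comes from each individual value and can differ between records of the same field:

```java
import java.math.BigDecimal;

// Hypothetical helper: decodes a BigDecimal that Avro reflection serialized as a
// "string" annotated with "java-class": "java.math.BigDecimal".
final class StringDecimalExample {

    // The schema has no precision or scale, so both are taken from the text itself.
    static BigDecimal decode(String encoded) {
        if (encoded == null) {
            return null; // the field is a union with "null"
        }
        return new BigDecimal(encoded);
    }

    public static void main(String[] args) {
        BigDecimal a = decode("12.3400");
        BigDecimal b = decode("5");
        // Two records of the same field can end up with different scales.
        System.out.println(a + " scale=" + a.scale()); // 12.3400 scale=4
        System.out.println(b + " scale=" + b.scale()); // 5 scale=0
    }
}
```

Because nothing in the schema fixes the scale, a sink cannot derive a single decimal column definition from this representation alone.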

  2. Using the native Avro APIs (LogicalTypes.decimal(precision, scale).addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES))) to add a decimal logical type to a bytes schema. This matches the decimal logical type defined in the Avro specification. Example schema:
{
  "name": "v2",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 17,
    "scale": 4
  }
}
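
Per the Avro specification, a decimal on bytes stores the unscaled value as big-endian two's-complement bytes, while precision and scale are fixed by the schema. A minimal sketch of that encoding using only java.math, hard-coding the precision 17 and scale 4 from the example schema above (AvroBytesDecimalExample is a hypothetical name; in a real sink, Avro's own Conversions.DecimalConversion would normally do this work):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

// Sketch of the Avro "decimal" logical type on "bytes": the payload holds only the
// unscaled integer; precision and scale come from the schema.
final class AvroBytesDecimalExample {

    static final int PRECISION = 17; // taken from the example schema
    static final int SCALE = 4;      // taken from the example schema

    static byte[] encode(BigDecimal value) {
        // setScale without a rounding mode throws ArithmeticException if digits would be lost.
        BigDecimal scaled = value.setScale(SCALE);
        if (scaled.precision() > PRECISION) {
            throw new ArithmeticException(
                    "precision " + scaled.precision() + " exceeds " + PRECISION);
        }
        // BigInteger.toByteArray() is big-endian two's complement.
        return scaled.unscaledValue().toByteArray();
    }

    static BigDecimal decode(byte[] bytes) {
        // The scale is not in the payload; it must come from the schema.
        return new BigDecimal(new BigInteger(bytes), SCALE);
    }

    public static void main(String[] args) {
        byte[] payload = encode(new BigDecimal("-1.5"));
        System.out.println(Arrays.toString(payload)); // [-59, 104]
        System.out.println(decode(payload));          // -1.5000
    }
}
```

Since the scale is fixed in the schema here, this representation carries everything a fixed-scale decimal mapping needs, unlike the string-based case.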

  3. Using a logical CQL type that comes from a C* CDC source (see #42, "Support Cassandra CDC logical types (new option decodeCDCDataTypes)"), the schema is:
{
  "name": "fieldName",
  "type": [
    "null",
    {
      "type": "record",
      "name": "cql_decimal",
      "namespace": "",
      "fields": [
        {
          "name": "bigint",
          "type": "bytes"
        },
        {
          "name": "scale",
          "type": "int"
        }
      ],
      "logicalType": "cql_decimal"
    }
  ]
}

This causes the sink to fail with:

com.datastax.oss.sink.pulsar.CassandraSinkTask - Error decoding/mapping Pulsar record PulsarSinkRecord{SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persist    topicname], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@20958f19], schema=KeyValueSchema(SEPARATED,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@4d53f4e4,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@a79d566), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$586/0x00000008407be840@2d24432d, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$585/0x00000008407bf440@21dc87f6), value=(key = "org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord@e180683", value = "org.apache.pulsar.client.impl.schema.generic.Generic
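
In the CDC case the scale travels with each value inside the cql_decimal record rather than in the schema. A rough sketch of decoding such a value, with a Map standing in for the Avro GenericRecord the sink would actually receive. CqlDecimalExample is a hypothetical name, and the sketch assumes the bigint field holds big-endian two's-complement bytes as produced by BigInteger.toByteArray(); that assumption should be checked against the CDC source:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;

// Hypothetical sketch: turning a CDC "cql_decimal" value into a BigDecimal.
// A Map stands in for the Avro GenericRecord the sink would receive.
final class CqlDecimalExample {

    static BigDecimal decode(Map<String, Object> cqlDecimal) {
        if (cqlDecimal == null) {
            return null; // the field is a union with "null"
        }
        // Assumption: "bigint" is the unscaled value as big-endian two's-complement bytes.
        byte[] unscaled = (byte[]) cqlDecimal.get("bigint");
        int scale = (Integer) cqlDecimal.get("scale");
        // Unlike the Avro "decimal" type, the scale is carried by every value.
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of(
                "bigint", BigInteger.valueOf(31415).toByteArray(),
                "scale", 4);
        System.out.println(decode(value)); // 3.1415
    }
}
```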
@aymkhalil
Collaborator Author

A similar issue was reported in #29.
