[Massive bug] AddOpFieldForDebeziumRecord does not correctly add __op #25

Open
bulolo opened this issue Jun 24, 2024 · 4 comments

@bulolo

bulolo commented Jun 24, 2024

Full reproduction steps: https://junyao.tech/posts/e4464a42.html

  • MongoDB 6.0, MySQL 8
  • debezium/connect:2.4.2.Final
  • Kafka 2.4.1

In neither case does AddOpFieldForDebeziumRecord add __op:1 or __op:0.

MySQL

        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"

MongoDB

        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"
@bulolo
Author

bulolo commented Jun 25, 2024

[screenshot] In practice, __op is not added.
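Both configurations above list the transforms as "addfield,unwrap". Kafka Connect applies the entries of "transforms" in the order listed, so AddOpFieldForDebeziumRecord sees the raw Debezium envelope, op field included, before the unwrap SMT runs. The following is a simplified model, not the Kafka Connect API: record values are plain Maps, and addOp and unwrap are hypothetical stand-ins, with addOp assumed to write 0 for upserts and 1 for deletes. It only illustrates why the order matters: if unwrap ran first, op would already be gone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simplified model of an SMT chain: values are Maps, transforms are UnaryOperators.
// This is NOT the Kafka Connect API; all names here are hypothetical.
class SmtChainDemo {

    // Stand-in for an "add __op" SMT: for c/u keep "after", for d keep "before",
    // and add __op (assumed 0 = upsert, 1 = delete). Anything else passes through.
    @SuppressWarnings("unchecked")
    static Map<String, Object> addOp(Map<String, Object> value) {
        Object op = value.get("op");
        if (!"c".equals(op) && !"u".equals(op) && !"d".equals(op)) {
            return value;
        }
        boolean delete = "d".equals(op);
        Map<String, Object> row =
                new HashMap<>((Map<String, Object>) value.get(delete ? "before" : "after"));
        row.put("__op", delete ? 1 : 0);
        return row;
    }

    // Stand-in for an unwrap SMT: an envelope becomes its "after" image;
    // a value that is no longer an envelope passes through.
    @SuppressWarnings("unchecked")
    static Map<String, Object> unwrap(Map<String, Object> value) {
        if (value.containsKey("op") && value.get("after") instanceof Map) {
            return new HashMap<>((Map<String, Object>) value.get("after"));
        }
        return value;
    }

    // Applies the transforms in list order, like the "transforms" property.
    static Map<String, Object> applyChain(Map<String, Object> value,
                                          List<UnaryOperator<Map<String, Object>>> chain) {
        Map<String, Object> current = value;
        for (UnaryOperator<Map<String, Object>> t : chain) {
            current = t.apply(current);
        }
        return current;
    }

    static Map<String, Object> sampleUpdate() {
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1004);
        after.put("first_name", "Anne Marie");
        Map<String, Object> envelope = new HashMap<>();
        envelope.put("op", "u");
        envelope.put("after", after);
        return envelope;
    }

    public static void main(String[] args) {
        // "addfield,unwrap": __op is added while "op" is still present.
        List<UnaryOperator<Map<String, Object>>> addThenUnwrap =
                List.of(SmtChainDemo::addOp, SmtChainDemo::unwrap);
        System.out.println(applyChain(sampleUpdate(), addThenUnwrap));
        // "unwrap,addfield": "op" is gone before addOp runs, so no __op appears.
        List<UnaryOperator<Map<String, Object>>> unwrapThenAdd =
                List.of(SmtChainDemo::unwrap, SmtChainDemo::addOp);
        System.out.println(applyChain(sampleUpdate(), unwrapThenAdd));
    }
}
```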

bulolo changed the title from "Deletes cannot be completed" to "AddOpFieldForDebeziumRecord does not correctly add __op" on Jun 26, 2024
bulolo changed the title from "AddOpFieldForDebeziumRecord does not correctly add __op" to "[Massive bug] AddOpFieldForDebeziumRecord does not correctly add __op" on Jun 27, 2024
@Desperado2

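This happens because only the MySQL binlog case is implemented: whether __op gets added depends on whether the change event contains an op field. In Debezium's MySQL event format, the op field identifies whether a change is a create (insert), update, or delete.
The format described in the Debezium documentation: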

{
  "schema": { ... },
  "payload": {
    "before": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "after": { 
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "source": { 
      "version": "3.0.1.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "ts_ms": 1465581029100,
      "ts_us": 1465581029100000,
      "ts_ns": 1465581029100000000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 
    "ts_ms": 1465581029523, 
    "ts_us": 1465581029523758, 
    "ts_ns": 1465581029523758914 
  }
}

For details, see: https://debezium.io/documentation/reference/stable/connectors/mysql.html
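The op field in that envelope takes a small set of codes: Debezium uses c (create), u (update), d (delete), r (read, for rows emitted during a snapshot) and t (truncate). The enum below is a hypothetical helper, not part of Debezium or the StarRocks connector; mapping the codes to StarRocks __op values (0 = upsert, 1 = delete) and treating snapshot reads as upserts are assumptions made for illustration.

```java
import java.util.Optional;

// Hypothetical enum for Debezium "op" codes. The StarRocks __op mapping
// (0 = upsert, 1 = delete) and treating snapshot reads as upserts are assumptions.
enum DebeziumOp {
    CREATE("c"), UPDATE("u"), DELETE("d"), READ("r"), TRUNCATE("t");

    final String code;

    DebeziumOp(String code) {
        this.code = code;
    }

    // Looks up a code such as "u"; unknown or null codes give an empty result.
    static Optional<DebeziumOp> fromCode(String code) {
        for (DebeziumOp op : values()) {
            if (op.code.equals(code)) {
                return Optional.of(op);
            }
        }
        return Optional.empty();
    }

    // StarRocks __op value, or empty when the event is not a single-row change.
    Optional<Integer> starRocksOp() {
        switch (this) {
            case CREATE:
            case UPDATE:
            case READ:
                return Optional.of(0);
            case DELETE:
                return Optional.of(1);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The sample event above carries "op": "u".
        DebeziumOp op = fromCode("u").orElseThrow();
        System.out.println(op + " -> __op=" + op.starRocksOp().orElse(null));
    }
}
```

Under this reading, logic that only checks for c, u and d, as the excerpt that follows does, leaves snapshot (r) events without __op.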

The relevant implementation code:

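    private static final String OP = "op";
    private static final String OP_C = "c";
    private static final String OP_U = "u";
    private static final String OP_D = "d";
    // Not shown in the excerpt; assumed to be the Debezium envelope field names.
    private static final String BEFORE = "before";
    private static final String AFTER = "after";

    @Override
    public R apply(R record) {
        try {
            // Not shown in the excerpt; assumed: the value is cast to a Connect Struct.
            // A String or schemaless Map value makes this cast throw, and the
            // outer catch returns the record unchanged.
            Struct value = (Struct) record.value();
            String op;
            try {
                // Struct.get throws if the schema has no "op" field.
                op = (String) value.get(OP);
            } catch (Exception e) {
                return record;
            }
            if (op.equals(OP_C) || op.equals(OP_U)) {
                // Create/update: build the new value from the "after" image
                // (updateValue is a helper not included in this excerpt).
                Struct newValue = updateValue(value, AFTER);
                return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp());
            } else if (op.equals(OP_D)) {
                // Delete: build the new value from the "before" image.
                Struct newValue = updateValue(value, BEFORE);
                return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp());
            }
        } catch (Exception e) {
            return record;
        }
        // Any other op value (for example "r" for snapshot reads) passes through unchanged.
        return record;
    }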

In other words, __op is added only when the record has an op field whose value is c, u, or d.
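The branches of that excerpt can be replayed outside Kafka Connect to see which inputs make it return the record unchanged. This sketch assumes, as the excerpt suggests but does not show, that the record value is expected to be a Connect Struct. FakeStruct is a hypothetical stand-in whose get() throws on an unknown field name, as Struct.get(String) does, and the 0/1 results assume the SMT writes 0 for upserts and 1 for deletes. A value produced by StringConverter is a String and one produced by JsonConverter with schemas.enable=false is a Map, so neither is a Struct.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.connect.data.Struct: get() throws
// for a field name the "schema" does not contain, as Struct.get(String) does.
final class FakeStruct {
    private final Map<String, Object> fields = new HashMap<>();

    FakeStruct put(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    Object get(String name) {
        if (!fields.containsKey(name)) {
            throw new IllegalArgumentException(name + " is not a valid field name");
        }
        return fields.get(name);
    }
}

class AddOpDecision {
    // Mirrors the branches of the apply() excerpt. Returns the __op value that
    // would be added (assumed 0 = upsert, 1 = delete), or null for "unchanged".
    static Integer decide(Object recordValue) {
        try {
            FakeStruct value = (FakeStruct) recordValue; // throws for String/Map values
            String op;
            try {
                op = (String) value.get("op");
            } catch (Exception e) {
                return null; // no "op" field, or a null (tombstone) value
            }
            if (op.equals("c") || op.equals("u")) {
                return 0;
            } else if (op.equals("d")) {
                return 1;
            }
        } catch (Exception e) {
            return null; // wrong value type, null op, ...
        }
        return null; // any other op, e.g. "r" from a snapshot
    }

    public static void main(String[] args) {
        System.out.println(decide(new FakeStruct().put("op", "u"))); // 0
        System.out.println(decide("{\"op\":\"u\"}"));                // null (String value)
        System.out.println(decide(Map.of("op", "u")));               // null (schemaless Map)
        System.out.println(decide(new FakeStruct().put("op", "r"))); // null (snapshot read)
    }
}
```

In this model only a Struct envelope whose op is c, u or d gets an __op value; string or schemaless values, snapshot reads and tombstones all pass through silently, which could match the symptom reported above.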

Workaround:
You can work around this by adding columns and jsonpaths to the connector configuration. See: https://docs.starrocks.io/zh/docs/loading/Load_to_Primary_Key_tables/#upsert-%E5%92%8C-delete

{
    "name": "topic-connect",
    "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "sink.properties.columns": "_id,name,__deleted,__op = __deleted",
        "sink.properties.jsonpaths": "[\"$._id\", \"$.name\", \"$.__deleted\"]",
        "transforms": "decrypt",
        "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",
        "transforms.decrypt.secret.key": "6253c3d7db9723f1d43b5e447c7ddf0da9e68dc121829daa0a06438b5deba549"
    }
}
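For wide tables, listing every column twice by hand is error-prone. If this workaround is used, the two property values can be generated from a column list. A minimal sketch: StarRocksColumnProps is a hypothetical name, and the output format, including the "__op = __deleted" mapping, is copied from the example config above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that builds the sink.properties.columns and
// sink.properties.jsonpaths values from a column list.
class StarRocksColumnProps {

    // Appends the __deleted column produced by delete.handling.mode=rewrite.
    private static List<String> withDeleted(List<String> columns) {
        List<String> all = new ArrayList<>(columns);
        all.add("__deleted");
        return all;
    }

    // For [_id, name]: _id,name,__deleted,__op = __deleted
    static String columns(List<String> columns) {
        return String.join(",", withDeleted(columns)) + ",__op = __deleted";
    }

    // For [_id, name]: ["$._id", "$.name", "$.__deleted"]
    static String jsonPaths(List<String> columns) {
        return withDeleted(columns).stream()
                .map(c -> "\"$." + c + "\"")
                .collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> cols = List.of("_id", "name");
        System.out.println(columns(cols));
        System.out.println(jsonPaths(cols));
    }
}
```

The returned strings are raw values; when pasted into the JSON config, the double quotes in the jsonpaths value must be escaped as \", as in the example.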

@bulolo
Author

bulolo commented Nov 12, 2024

@Desperado2
https://debezium.io/documentation/reference/stable/transformations/event-flattening.html

Please take a look at this SMT, which unwraps the standard Debezium envelope. StarRocksSinkConnector should support working with this SMT.
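The event-flattening SMT (ExtractNewRecordState, or ExtractNewDocumentState for MongoDB) replaces the envelope with the row itself; with delete.handling.mode=rewrite, delete events keep the before image and a __deleted field marks whether the row was deleted. Below is a simplified model of that behaviour, not Debezium's implementation: values are Maps, __deleted is represented as the strings "true"/"false" (an assumption; check the field type your Debezium version emits), and deriving __op from it mirrors the "__op = __deleted" idea in the workaround. flatten and withOp are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (hypothetical names, not Debezium's code) of event flattening
// with delete.handling.mode=rewrite, plus deriving a StarRocks-style __op.
class FlattenDemo {

    // Keep "before" for deletes and "after" otherwise; mark deletes in __deleted.
    @SuppressWarnings("unchecked")
    static Map<String, Object> flatten(Map<String, Object> envelope) {
        boolean deleted = "d".equals(envelope.get("op"));
        Map<String, Object> row =
                new HashMap<>((Map<String, Object>) envelope.get(deleted ? "before" : "after"));
        row.put("__deleted", String.valueOf(deleted)); // assumed "true"/"false" strings
        return row;
    }

    // __op = 1 for deleted rows, 0 (upsert) otherwise.
    static Map<String, Object> withOp(Map<String, Object> flatRow) {
        Map<String, Object> row = new HashMap<>(flatRow);
        row.put("__op", "true".equals(flatRow.get("__deleted")) ? 1 : 0);
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> before = new HashMap<>();
        before.put("id", 1004);
        Map<String, Object> envelope = new HashMap<>();
        envelope.put("op", "d");
        envelope.put("before", before);
        envelope.put("after", null);
        System.out.println(withOp(flatten(envelope)));
    }
}
```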

Also, doesn't the workaround above mean that if a table has 100 columns, I have to list all 100 of them in both columns and jsonpaths?

@bulolo
Author

bulolo commented Nov 13, 2024

Also, when you reproduced this earlier, standard MySQL binlog events did not work either (for delete operations).
