From c15757ba38755089a8929ba33964578ff6fcfd5e Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Tue, 12 Apr 2022 20:31:27 +0800
Subject: [PATCH 01/22] SinkTask and SourceTask implement the validate method
https://github.com/apache/rocketmq-connect/issues/85
---
.../connector/api/component/task/sink/SinkTask.java | 10 ++++++++++
.../api/component/task/source/SourceTask.java | 10 ++++++++++
2 files changed, 20 insertions(+)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
index 0268397..fda1207 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
@@ -14,6 +14,7 @@
package io.openmessaging.connector.api.component.task.sink;
+import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
@@ -57,4 +58,13 @@ public Map preCommit(Map
Date: Fri, 15 Apr 2022 17:00:53 +0800
Subject: [PATCH 02/22] Adjust the init and start methods of the component
interface
---
.../connector/api/component/Component.java | 8 ++++----
.../api/component/connector/Connector.java | 2 +-
.../api/component/task/sink/SinkTask.java | 2 +-
.../api/component/task/source/SourceTask.java | 17 ++++-------------
4 files changed, 10 insertions(+), 19 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/Component.java b/connector/src/main/java/io/openmessaging/connector/api/component/Component.java
index 0fed332..0349c2f 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/Component.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/Component.java
@@ -28,16 +28,16 @@ public interface Component {
/**
* Init the component
*
- * @param config component config
+ * @param context component context
*/
- void init(KeyValue config);
+ void init(R context);
/**
* Start the component
*
- * @param componentContext component context
+ * @param config component context
*/
- void start(R componentContext);
+ void start(KeyValue config);
/**
* Stop the component.
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
index dd8ac3e..d9272a5 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
@@ -23,7 +23,7 @@ public abstract class Connector implements Component {
protected ConnectorContext connectorContext;
- @Override public void start(ConnectorContext connectorContext) {
+ @Override public void init(ConnectorContext connectorContext) {
this.connectorContext = connectorContext;
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
index fda1207..c26d482 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
@@ -27,7 +27,7 @@ public abstract class SinkTask implements Task {
protected SinkTaskContext sinkTaskContext;
- @Override public void start(SinkTaskContext sinkTaskContext) {
+ @Override public void init(SinkTaskContext sinkTaskContext) {
this.sinkTaskContext = sinkTaskContext;
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
index 8e07abc..5fec1a9 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
@@ -22,8 +22,7 @@
public abstract class SourceTask implements Task {
protected SourceTaskContext sourceTaskContext;
-
- @Override public void start(SourceTaskContext sourceTaskContext) {
+ @Override public void init(SourceTaskContext sourceTaskContext) {
this.sourceTaskContext = sourceTaskContext;
}
@@ -46,7 +45,6 @@ public abstract class SourceTask implements Task {
*
*
* @throws InterruptedException task thread interupt exception
- *
* @param connectRecords connect records
*/
public void commit(final List connectRecords) throws InterruptedException {
@@ -57,25 +55,18 @@ public void commit(final List connectRecords) throws InterruptedE
* If the user wants to use external storage to save the position,user can implement this
* function.
*/
- public void commit() {
- }
+ public void commit() { }
/**
* Get source task context.
- *
* @return source task context
*/
+ @Deprecated
public SourceTaskContext getContext() {
return sourceTaskContext;
}
- /**
- * Should invoke before start the connector.
- * @param config component config
- */
@Override
- public void validate(KeyValue config) {
-
- }
+ public void validate(KeyValue config){}
}
From fa134ee3954b3e130d0052ecfd48aa89ec512180 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Fri, 15 Apr 2022 17:17:37 +0800
Subject: [PATCH 03/22] Set pause and resume to deprecated methods. It feels
like they can be removed
---
.../connector/api/component/task/Task.java | 2 ++
.../api/component/task/sink/SinkTask.java | 16 +++++++++++++++
.../api/component/task/source/SourceTask.java | 20 ++++++++++++++++++-
3 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
index 3254e1c..5c34f59 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
@@ -22,11 +22,13 @@ public interface Task extends Component {
/**
* Pause the task.
*/
+ @Deprecated
void pause();
/**
* Resume the task.
*/
+ @Deprecated
void resume();
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
index c26d482..9fc8f7a 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
@@ -67,4 +67,20 @@ public Map preCommit(Map
Date: Tue, 24 May 2022 21:44:05 +0800
Subject: [PATCH 04/22] Add struct object and optimize schema and schema
builder API #41
---
.../connector/api/data/FieldType.java | 115 +++++++-
.../connector/api/data/Schema.java | 234 ++++++++++++++--
.../connector/api/data/SchemaBuilder.java | 193 ++++++++++++-
.../connector/api/data/Struct.java | 261 ++++++++++++++++++
.../api/errors/SchemaBuilderException.java | 28 ++
5 files changed, 789 insertions(+), 42 deletions(-)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/Struct.java
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/errors/SchemaBuilderException.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java b/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java
index 9eb1009..0ff6467 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java
@@ -14,47 +14,132 @@
package io.openmessaging.connector.api.data;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* Define the field type.
*/
public enum FieldType {
- /** Short Integer */
+
+ /**
+ * Byte Integer
+ */
INT8,
- /** Integer */
+ /**
+ * Short
+ */
+ INT16,
+
+ /**
+ * Integer
+ */
INT32,
- /** Long */
+ /**
+ * Long
+ */
INT64,
- /** BigInteger */
- BIG_INTEGER,
-
- /** Float */
+ /**
+ * Float
+ */
FLOAT32,
- /** Double */
+ /**
+ * Double
+ */
FLOAT64,
- /** Boolean */
+ /**
+ * Boolean
+ */
BOOLEAN,
- /** String */
+ /**
+ * String
+ */
STRING,
- /** Byte */
+ /**
+ * Byte
+ */
BYTES,
- /** List */
+ /**
+ * List
+ */
ARRAY,
- /** Map */
+ /**
+ * Map
+ */
MAP,
- /** Date */
+ /**
+ * Date
+ */
DATETIME,
+
/**
* Structured
*/
- STRUCT
+ STRUCT;
+
+ /**
+ * Maps Schema.Types to a list of Java classes that can be used to represent them.
+ */
+ public static final Map> SCHEMA_TYPE_CLASSES = new EnumMap<>(FieldType.class);
+ /**
+ * Maps the Java classes to the corresponding Schema.Type.
+ */
+ public static final Map, FieldType> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
+
+ static {
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT8, Collections.singletonList((Class) Byte.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT16, Collections.singletonList((Class) Short.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT32, Collections.singletonList((Class) Integer.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT64, Collections.singletonList((Class) Long.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.FLOAT32, Collections.singletonList((Class) Float.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.FLOAT64, Collections.singletonList((Class) Double.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.BOOLEAN, Collections.singletonList((Class) Boolean.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.STRING, Collections.singletonList((Class) String.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.ARRAY, Collections.singletonList((Class) List.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.MAP, Collections.singletonList((Class) Map.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.STRUCT, Collections.singletonList((Class) Struct.class));
+
+ for (Map.Entry> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
+ for (Class> schemaClass : schemaClasses.getValue()) {
+ JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
+ }
+ }
+ }
+
+ /**
+ * Determine whether it is a basic type
+ *
+ * @return
+ */
+ public boolean isPrimitive() {
+ switch (this) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case STRING:
+ case BYTES:
+ return true;
+ }
+ return false;
+ }
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
index 5db07c6..f4174ea 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
@@ -14,7 +14,12 @@
package io.openmessaging.connector.api.data;
+import io.openmessaging.connector.api.errors.ConnectException;
+
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Schema
@@ -25,7 +30,22 @@ public class Schema {
* Name of the schema.
*/
private String name;
-
+ /**
+ * schema version
+ */
+ private Integer version;
+ /**
+ * optional
+ */
+ private boolean optional;
+ /**
+ * default value
+ */
+ private Object defaultValue;
+ /**
+ * Description schema
+ */
+ private String doc;
/**
* Type of the field
*/
@@ -34,13 +54,106 @@ public class Schema {
* Structure of the schema, contains a list of {@link Field}
*/
private List fields;
+ private Map fieldsByName;
- public Schema(String name, FieldType fieldType, List fields) {
- this.name = name;
+ /**
+ * map type
+ */
+ private Schema keySchema;
+ private Schema valueSchema;
+
+ /**
+ * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
+ */
+ public Schema(String name,FieldType fieldType, boolean optional, Object defaultValue, Integer version, String doc, List fields,Schema keySchema, Schema valueSchema) {
this.fieldType = fieldType;
- this.fields = fields;
+ this.optional = optional;
+ this.defaultValue = defaultValue;
+ this.name = name;
+ this.version = version;
+ this.doc = doc;
+ if (this.fieldType == FieldType.STRUCT) {
+ this.fields = fields == null ? Collections.emptyList() : fields;
+ this.fieldsByName = new HashMap<>(this.fields.size());
+ for (Field field : this.fields) {
+ fieldsByName.put(field.getName(), field);
+ }
+ } else {
+ this.fields = null;
+ this.fieldsByName = null;
+ }
+ this.keySchema = keySchema;
+ this.valueSchema = valueSchema;
+ }
+
+
+ /**
+ * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
+ */
+ public Schema(String name,FieldType fieldType, boolean optional, Object defaultValue, Integer version, List fields) {
+ this(name, fieldType, optional, defaultValue, version, null, fields, null, null);
+ }
+
+
+ /**
+ * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
+ */
+ public Schema(String name, FieldType fieldType, List fields) {
+ this(name, fieldType, false, null, null, fields);
+ }
+
+ /**
+ * get field
+ * @param fieldName
+ * @return
+ */
+ public Field getField(String fieldName) {
+ if (fieldsByName.containsKey(fieldName)){
+ return fieldsByName.get(fieldName);
+ }
+ return null;
+ }
+
+ /**
+ * add field
+ * @param field
+ */
+ public void addField(Field field) {
+ this.fields.add(field);
+ this.fieldsByName.put(field.getName(), field);
+ }
+
+ public Schema keySchema() {
+ return keySchema;
+ }
+ public Schema valueSchema() {
+ return valueSchema;
}
+ public boolean isOptional() {
+ return optional;
+ }
+
+ public Object defaultValue(){
+ return defaultValue;
+ }
+
+ /**
+ * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones.
+ * @return the version of this schema
+ */
+ public Integer version(){
+ return version;
+ }
+
+ /**
+ * @return the documentation for this schema
+ */
+ public String doc() {
+ return doc;
+ }
+
+
public String getName() {
return name;
}
@@ -49,6 +162,14 @@ public void setName(String name) {
this.name = name;
}
+ public FieldType getFieldType() {
+ return fieldType;
+ }
+
+ public void setFieldType(FieldType fieldType) {
+ this.fieldType = fieldType;
+ }
+
public List getFields() {
return fields;
}
@@ -57,34 +178,101 @@ public void setFields(List fields) {
this.fields = fields;
}
- public Field getField(String fieldName) {
+ /**
+ * Validate that the value can be used for this schema
+ */
+ public void validateValue(Object value) {
+ validateValue(this, value);
+ }
+ /**
+ * Validate that the value can be used with the schema
+ */
+ public static void validateValue(Schema schema, Object value) {
+ validateValue(null, schema, value);
+ }
+ public static void validateValue(String name, Schema schema, Object value) {
- for (Field field : fields) {
- if (field.getName()
- .equals(fieldName)) {
- return field;
+ // check optional
+ if (value == null) {
+ if (!schema.isOptional()) {
+ throw new ConnectException("Invalid value: null used for required field: \"" + name
+ + "\", schema type: " + schema.getFieldType());
}
+ return;
}
- return null;
- }
- public void addField(Field field) {
- this.fields.add(field);
- }
+ // check field type
+ List expectedClasses = expectedClassesFor(schema);
- public FieldType getFieldType() {
- return fieldType;
+ if (expectedClasses == null) {
+ throw new ConnectException("Invalid Java object for schema type " + schema.getFieldType()
+ + ": " + value.getClass()
+ + " for field: \"" + name + "\"");
+ }
+
+ boolean foundMatch = false;
+ if (expectedClasses.size() == 1) {
+ foundMatch = expectedClasses.get(0).isInstance(value);
+ } else {
+ for (Class> expectedClass : expectedClasses) {
+ if (expectedClass.isInstance(value)) {
+ foundMatch = true;
+ break;
+ }
+ }
+ }
+
+ if (!foundMatch) {
+ throw new ConnectException("Invalid Java object for schema type " + schema.getFieldType()
+ + ": " + value.getClass()
+ + " for field: \"" + name + "\"");
+ }
+
+ switch (schema.getFieldType()) {
+ case STRUCT:
+ Struct struct = (Struct) value;
+ if (!struct.schema().equals(schema)) {
+ throw new ConnectException("Struct schemas do not match.");
+ }
+ struct.validate();
+ break;
+ case ARRAY:
+ List> array = (List>) value;
+ for (Object entry : array) {
+ validateValue(schema.valueSchema(), entry);
+ }
+ break;
+ case MAP:
+ Map, ?> map = (Map, ?>) value;
+ for (Map.Entry, ?> entry : map.entrySet()) {
+ validateValue(schema.keySchema(), entry.getKey());
+ validateValue(schema.valueSchema(), entry.getValue());
+ }
+ break;
+ }
}
- public void setFieldType(FieldType fieldType) {
- this.fieldType = fieldType;
+ /**
+ * expected classes for
+ * @param schema
+ * @return
+ */
+ private static List expectedClassesFor(Schema schema) {
+ return FieldType.SCHEMA_TYPE_CLASSES.get(schema.getFieldType());
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "Schema{" +
- "name='" + name + '\'' +
- ", fieldType=" + fieldType +
- ", fields=" + fields +
- '}';
+ "name='" + name + '\'' +
+ ", version=" + version +
+ ", optional=" + optional +
+ ", defaultValue=" + defaultValue +
+ ", doc='" + doc + '\'' +
+ ", fieldType=" + fieldType +
+ ", fields=" + fields +
+ ", keySchema=" + keySchema +
+ ", valueSchema=" + valueSchema +
+ '}';
}
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
index db353e4..786b6c6 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
@@ -15,24 +15,62 @@
package io.openmessaging.connector.api.data;
import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.SchemaBuilderException;
+
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+/**
+ * schema builder
+ */
public class SchemaBuilder {
+ private static final String TYPE_FIELD = "type";
+ private static final String OPTIONAL_FIELD = "optional";
+ private static final String DEFAULT_FIELD = "default";
+ private static final String NAME_FIELD = "name";
+ private static final String VERSION_FIELD = "version";
+ private static final String DOC_FIELD = "doc";
/**
* Name of the schema.
*/
private String name;
+ /**
+ * version
+ */
+ private Integer version = null;
+ /**
+ * optional
+ */
+ private Boolean optional = null;
+ /**
+ * default value
+ */
+ private Object defaultValue = null;
+ /**
+ * Description schema
+ */
+ private String doc = null;
/**
* Type of the field
*/
private FieldType type;
+
/**
* Structure of the schema, contains a list of {@link Field}
*/
- private List fields;
+ private Map fields;
+
+ /**
+ * map type
+ */
+ private Schema keySchema;
+ private Schema valueSchema;
+
public SchemaBuilder(FieldType type) {
if (null == type) {
@@ -40,10 +78,19 @@ public SchemaBuilder(FieldType type) {
}
this.type = type;
if (type == FieldType.STRUCT) {
- fields = new ArrayList<>();
+ /**
+ * ensure sequencing
+ */
+ fields = new LinkedHashMap<>();
}
}
+
+ public boolean isOptional() {
+ return this.optional == null ? false : optional;
+ }
+
+
/**
* @return a new {@link FieldType#INT8} SchemaBuilder
*/
@@ -51,6 +98,13 @@ public static SchemaBuilder int8() {
return new SchemaBuilder(FieldType.INT8);
}
+ /**
+ * @return a new {@link FieldType#INT16} SchemaBuilder
+ */
+ public static SchemaBuilder int16() {
+ return new SchemaBuilder(FieldType.INT16);
+ }
+
/**
* @return a new {@link FieldType#INT32} SchemaBuilder
*/
@@ -109,24 +163,155 @@ public static SchemaBuilder struct() {
return new SchemaBuilder(FieldType.STRUCT);
}
+
+ // Maps & Arrays
+
+ /**
+ * @param valueSchema the schema for elements of the array
+ */
+ public static SchemaBuilder array(Schema valueSchema) {
+ if (null == valueSchema) {
+ throw new SchemaBuilderException("valueSchema cannot be null.");
+ }
+ SchemaBuilder builder = new SchemaBuilder(FieldType.ARRAY);
+ builder.valueSchema = valueSchema;
+ return builder;
+ }
+
+ /**
+ * @param keySchema the schema for keys in the map
+ * @param valueSchema the schema for values in the map
+ */
+ public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
+ if (null == keySchema) {
+ throw new SchemaBuilderException("keySchema cannot be null.");
+ }
+ if (null == valueSchema) {
+ throw new SchemaBuilderException("valueSchema cannot be null.");
+ }
+ SchemaBuilder builder = new SchemaBuilder(FieldType.MAP);
+ builder.keySchema = keySchema;
+ builder.valueSchema = valueSchema;
+ return builder;
+ }
+
+ /**
+ * Add a field to this struct schemaļ¼ensure sequence
+ */
+ public SchemaBuilder field(String fieldName, Schema fieldSchema) {
+ if (type != FieldType.STRUCT) {
+ throw new SchemaBuilderException("Cannot create fields on type " + type);
+ }
+ if (null == fieldName || fieldName.isEmpty()) {
+ throw new SchemaBuilderException("fieldName cannot be null.");
+ }
+ if (null == fieldSchema) {
+ throw new SchemaBuilderException("fieldSchema for field " + fieldName + " cannot be null.");
+ }
+ int fieldIndex = fields.size();
+ if (fields.containsKey(fieldName)) {
+ throw new SchemaBuilderException("Cannot create field because of field name duplication " + fieldName);
+ }
+ fields.put(fieldName, new Field(fieldIndex, fieldName, fieldSchema));
+ return this;
+ }
+
+
+ /**
+ * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+ */
+ public List fields() {
+ if (type != FieldType.STRUCT) {
+ throw new ConnectException("Cannot list fields on non-struct type");
+ }
+ return new ArrayList<>(fields.values());
+ }
+
+ /**
+ * get field by fieldName
+ *
+ * @param fieldName
+ * @return
+ */
+ public Field field(String fieldName) {
+ if (type != FieldType.STRUCT) {
+ throw new ConnectException("Cannot look up fields on non-struct type");
+ }
+ return fields.get(fieldName);
+ }
+
/**
* Sets the schema name
*
* @param name schema name
- *
* @return the current SchemaBuilder instance
*/
public SchemaBuilder name(String name) {
+ checkCanSet(NAME_FIELD, this.name, name);
this.name = name;
return this;
}
+ /**
+ * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
+ * newer and which is older by their ordering.
+ */
+ public SchemaBuilder version(Integer version) {
+ checkCanSet(VERSION_FIELD, this.version, version);
+ this.version = version;
+ return this;
+ }
+
+ /**
+ * Set the documentation for this schema.
+ */
+ public SchemaBuilder doc(String doc) {
+ checkCanSet(DOC_FIELD, optional, true);
+ this.doc = doc;
+ return this;
+ }
+
+ /**
+ * Set this schema as optional.
+ */
+ public SchemaBuilder optional() {
+ checkCanSet(OPTIONAL_FIELD, optional, true);
+ optional = true;
+ return this;
+ }
+
+ /**
+ * Set the default value for this schema. The value is validated against the schema type, throwing a
+ * {@link SchemaBuilderException} if it does not match.
+ */
+ public SchemaBuilder defaultValue(Object value) {
+ checkCanSet(DEFAULT_FIELD, defaultValue, value);
+ checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
+ defaultValue = value;
+ return this;
+ }
+
+
+ private static void checkCanSet(String fieldName, Object fieldVal, Object val) {
+ if (fieldVal != null && fieldVal != val) {
+ throw new ConnectException("Invalid SchemaBuilder call: " + fieldName + " has already been set.");
+ }
+ }
+
+
+ private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
+ if (val == null) {
+ throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
+ }
+ }
+
/**
* Build the Schema using the current settings
*
* @return the {@link Schema}
*/
public Schema build() {
- return new Schema(name, type, fields);
+ return new Schema(name, type, isOptional(), defaultValue, version, doc,
+ fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema);
}
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java b/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java
new file mode 100644
index 0000000..5710ef5
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java
@@ -0,0 +1,261 @@
+
+package io.openmessaging.connector.api.data;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * struct object model
+ */
+public class Struct {
+
+ private final Schema schema;
+ private final Object[] values;
+
+ /**
+ * construct
+ * @param schema
+ */
+ public Struct(Schema schema) {
+ if (schema.getFieldType() != FieldType.STRUCT) {
+ throw new ConnectException("Not a struct schema: " + schema);
+ }
+ this.schema = schema;
+ this.values = new Object[schema.getFields().size()];
+ }
+
+ /**
+ * get schema
+ * @return
+ */
+ public Schema schema() {
+ return schema;
+ }
+
+ /**
+ * get data by fieldName
+ * @param fieldName
+ * @return
+ */
+ public Object get(String fieldName) {
+ Field field = lookupField(fieldName);
+ return get(field);
+ }
+
+ /**
+ * get data by Field object
+ * @param field
+ * @return
+ */
+ public Object get(Field field) {
+ Object val = values[field.getIndex()];
+ if (val == null && field.getSchema().defaultValue() != null) {
+ val = field.getSchema().defaultValue();
+ }
+ return val;
+ }
+
+
+ public Object getWithoutDefault(String fieldName) {
+ Field field = lookupField(fieldName);
+ return values[field.getIndex()];
+ }
+
+ /**
+ * cast the result to a Byte.
+ * @param fieldName
+ * @return
+ */
+ public Byte getInt8(String fieldName) {
+ return (Byte) getCheckType(fieldName, FieldType.INT8);
+ }
+
+ /**
+ * cast the result to a Integer.
+ * @param fieldName
+ * @return
+ */
+ public Integer getInt32(String fieldName) {
+ return (Integer) getCheckType(fieldName, FieldType.INT32);
+ }
+
+ /**
+ * cast the result to a Long.
+ * @param fieldName
+ * @return
+ */
+ public Long getInt64(String fieldName) {
+ return (Long) getCheckType(fieldName, FieldType.INT64);
+ }
+
+ /**
+ * cast the result to a Float.
+ * @param fieldName
+ * @return
+ */
+ public Float getFloat32(String fieldName) {
+ return (Float) getCheckType(fieldName, FieldType.FLOAT32);
+ }
+
+ /**
+ * cast the result to a Double.
+ * @param fieldName
+ * @return
+ */
+ public Double getFloat64(String fieldName) {
+ return (Double) getCheckType(fieldName, FieldType.FLOAT64);
+ }
+
+ /**
+ * cast the result to a Boolean.
+ * @param fieldName
+ * @return
+ */
+ public Boolean getBoolean(String fieldName) {
+ return (Boolean) getCheckType(fieldName, FieldType.BOOLEAN);
+ }
+
+ /**
+ * cast the result to a String.
+ * @param fieldName
+ * @return
+ */
+ public String getString(String fieldName) {
+ return (String) getCheckType(fieldName, FieldType.STRING);
+ }
+
+ /**
+ * cast the result to a byte[].
+ * @param fieldName
+ * @return
+ */
+ public byte[] getBytes(String fieldName) {
+ Object bytes = getCheckType(fieldName, FieldType.BYTES);
+ if (bytes instanceof ByteBuffer) {
+ return ((ByteBuffer) bytes).array();
+ }
+ return (byte[]) bytes;
+ }
+
+ /**
+ * cast the result to a List.
+ * @param fieldName
+ * @param
+ * @return
+ */
+ public List getArray(String fieldName) {
+ return (List) getCheckType(fieldName, FieldType.ARRAY);
+ }
+
+ /**
+ * cast the result to a Map.
+ */
+ public Map getMap(String fieldName) {
+ return (Map) getCheckType(fieldName, FieldType.MAP);
+ }
+
+ /**
+ * cast the result to a Struct.
+ */
+ public Struct getStruct(String fieldName) {
+ return (Struct) getCheckType(fieldName, FieldType.STRUCT);
+ }
+
+
+ /**
+ * Set value by fieldName
+ * @param fieldName
+ * @param value
+ * @return
+ */
+ public Struct put(String fieldName, Object value) {
+ Field field = lookupField(fieldName);
+ return put(field, value);
+ }
+
+ /**
+ * set data
+ * @param field
+ * @param value
+ * @return
+ */
+ public Struct put(Field field, Object value) {
+ if (null == field) {
+ throw new ConnectException("field cannot be null.");
+ }
+ Schema.validateValue(field.getName(), field.getSchema(), value);
+ values[field.getIndex()] = value;
+ return this;
+ }
+
+
+ /**
+ * Validates that this struct has filled in all the necessary data with valid values
+ */
+ public void validate() {
+ for (Field field : schema.getFields()) {
+ Schema fieldSchema = field.getSchema();
+ Object value = values[field.getIndex()];
+ if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) {
+ continue;
+ }
+ Schema.validateValue(field.getName(), fieldSchema, value);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Struct struct = (Struct) o;
+ return Objects.equals(schema, struct.schema) &&
+ Arrays.deepEquals(values, struct.values);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schema, Arrays.deepHashCode(values));
+ }
+
+ private Field lookupField(String fieldName) {
+ Field field = schema.getField(fieldName);
+ if (field == null) {
+ throw new ConnectException(fieldName + " is not a valid field name");
+ }
+ return field;
+ }
+
+ private Object getCheckType(String fieldName, FieldType type) {
+ Field field = lookupField(fieldName);
+ if (field.getSchema().getFieldType() != type) {
+ throw new ConnectException("Field '" + fieldName + "' is not of type " + type);
+ }
+ return values[field.getIndex()];
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Struct{");
+ boolean first = true;
+ for (int i = 0; i < values.length; i++) {
+ final Object value = values[i];
+ if (value != null) {
+ final Field field = schema.getFields().get(i);
+ if (first) {
+ first = false;
+ } else {
+ sb.append(",");
+ }
+ sb.append(field.getName()).append("=").append(value);
+ }
+ }
+ return sb.append("}").toString();
+ }
+
+}
+
diff --git a/connector/src/main/java/io/openmessaging/connector/api/errors/SchemaBuilderException.java b/connector/src/main/java/io/openmessaging/connector/api/errors/SchemaBuilderException.java
new file mode 100644
index 0000000..5e88168
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/errors/SchemaBuilderException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.errors;
+
+public class SchemaBuilderException extends ConnectException {
+ public SchemaBuilderException(String s) {
+ super(s);
+ }
+
+ public SchemaBuilderException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public SchemaBuilderException(Throwable throwable) {
+ super(throwable);
+ }
+}
From c72659efc30c05c29b98d669e1cd1ab60811ac75 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Tue, 24 May 2022 21:54:08 +0800
Subject: [PATCH 05/22] add offset storage writer #41
---
.../api/storage/OffsetStorageWriter.java | 45 +++++++++++++++++++
1 file changed, 45 insertions(+)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/storage/OffsetStorageWriter.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/storage/OffsetStorageWriter.java b/connector/src/main/java/io/openmessaging/connector/api/storage/OffsetStorageWriter.java
new file mode 100644
index 0000000..f41ef0b
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/storage/OffsetStorageWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.openmessaging.connector.api.storage;
+
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+
+import java.util.Map;
+
+/**
+ * position storage writer
+ */
+public interface OffsetStorageWriter {
+
+ /**
+ * write offset
+ * @param partition
+ * @param position
+ */
+ void writeOffset(RecordPartition partition, RecordOffset position);
+
+
+ /**
+ * write offsets
+ * @param positions
+ */
+ void writeOffset(Map positions);
+}
From e6deeb8148f44e6e4c45fc852f8967b180ba8a34 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Sat, 4 Jun 2022 12:12:08 +0800
Subject: [PATCH 06/22] add getter and setter method #41
---
.../connector/api/data/Schema.java | 62 +++++++++++++------
.../connector/api/data/SchemaBuilder.java | 2 +-
.../connector/api/data/Struct.java | 29 +++++++--
3 files changed, 69 insertions(+), 24 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
index f4174ea..8858715 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
@@ -123,36 +123,62 @@ public void addField(Field field) {
this.fieldsByName.put(field.getName(), field);
}
- public Schema keySchema() {
- return keySchema;
+
+ public Integer getVersion() {
+ return version;
}
- public Schema valueSchema() {
- return valueSchema;
+
+ public void setVersion(Integer version) {
+ this.version = version;
}
public boolean isOptional() {
return optional;
}
- public Object defaultValue(){
+ public void setOptional(boolean optional) {
+ this.optional = optional;
+ }
+
+ public Object getDefaultValue() {
return defaultValue;
}
- /**
- * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones.
- * @return the version of this schema
- */
- public Integer version(){
- return version;
+ public void setDefaultValue(Object defaultValue) {
+ this.defaultValue = defaultValue;
}
- /**
- * @return the documentation for this schema
- */
- public String doc() {
+ public String getDoc() {
return doc;
}
+ public void setDoc(String doc) {
+ this.doc = doc;
+ }
+
+ public Map getFieldsByName() {
+ return fieldsByName;
+ }
+
+ public void setFieldsByName(Map fieldsByName) {
+ this.fieldsByName = fieldsByName;
+ }
+
+ public Schema getKeySchema() {
+ return keySchema;
+ }
+
+ public void setKeySchema(Schema keySchema) {
+ this.keySchema = keySchema;
+ }
+
+ public Schema getValueSchema() {
+ return valueSchema;
+ }
+
+ public void setValueSchema(Schema valueSchema) {
+ this.valueSchema = valueSchema;
+ }
public String getName() {
return name;
@@ -239,14 +265,14 @@ public static void validateValue(String name, Schema schema, Object value) {
case ARRAY:
List> array = (List>) value;
for (Object entry : array) {
- validateValue(schema.valueSchema(), entry);
+ validateValue(schema.getValueSchema(), entry);
}
break;
case MAP:
Map, ?> map = (Map, ?>) value;
for (Map.Entry, ?> entry : map.entrySet()) {
- validateValue(schema.keySchema(), entry.getKey());
- validateValue(schema.valueSchema(), entry.getValue());
+ validateValue(schema.getKeySchema(), entry.getKey());
+ validateValue(schema.getValueSchema(), entry.getValue());
}
break;
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
index 786b6c6..4f0f468 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
@@ -87,7 +87,7 @@ public SchemaBuilder(FieldType type) {
public boolean isOptional() {
- return this.optional == null ? false : optional;
+ return this.optional == null ? true : optional;
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java b/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java
index 5710ef5..a32d275 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Struct.java
@@ -14,8 +14,24 @@
*/
public class Struct {
- private final Schema schema;
- private final Object[] values;
+ private Schema schema;
+ private Object[] values;
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public void setSchema(Schema schema) {
+ this.schema = schema;
+ }
+
+ public Object[] getValues() {
+ return values;
+ }
+
+ public void setValues(Object[] values) {
+ this.values = values;
+ }
/**
* construct
@@ -29,6 +45,9 @@ public Struct(Schema schema) {
this.values = new Object[schema.getFields().size()];
}
+
+
+
/**
* get schema
* @return
@@ -54,8 +73,8 @@ public Object get(String fieldName) {
*/
public Object get(Field field) {
Object val = values[field.getIndex()];
- if (val == null && field.getSchema().defaultValue() != null) {
- val = field.getSchema().defaultValue();
+ if (val == null && field.getSchema().getDefaultValue() != null) {
+ val = field.getSchema().getDefaultValue();
}
return val;
}
@@ -201,7 +220,7 @@ public void validate() {
for (Field field : schema.getFields()) {
Schema fieldSchema = field.getSchema();
Object value = values[field.getIndex()];
- if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) {
+ if (value == null && (fieldSchema.isOptional() || fieldSchema.getDefaultValue() != null)) {
continue;
}
Schema.validateValue(field.getName(), fieldSchema, value);
From aa9f315c846cc878ff8dfa673b9ad40777aa0df8 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Sat, 4 Jun 2022 13:33:51 +0800
Subject: [PATCH 07/22] add SchemaAndValue #41
---
.../connector/api/data/SchemaAndValue.java | 64 +++++++++++++++++++
1 file changed, 64 insertions(+)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/SchemaAndValue.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaAndValue.java b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaAndValue.java
new file mode 100644
index 0000000..8b05436
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaAndValue.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.data;
+
+import java.util.Objects;
+
+/**
+ * schema and value
+ */
+public class SchemaAndValue {
+ private final Schema schema;
+ private final Object value;
+
+ public static final SchemaAndValue NULL = new SchemaAndValue(null, null);
+
+ public SchemaAndValue(Schema schema, Object value) {
+ this.value = value;
+ this.schema = schema;
+ }
+
+ public Schema schema() {
+ return schema;
+ }
+
+ public Object value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SchemaAndValue that = (SchemaAndValue) o;
+ return Objects.equals(schema, that.schema) &&
+ Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schema, value);
+ }
+
+ @Override
+ public String toString() {
+ return "SchemaAndValue{" +
+ "schema=" + schema +
+ ", value=" + value +
+ '}';
+ }
+}
From 23ed62a077ce63cebe7b7561a9ee4eb0621f1e49 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Sat, 4 Jun 2022 17:13:20 +0800
Subject: [PATCH 08/22] add logical type #41
---
.../connector/api/data/FieldType.java | 38 -----
.../connector/api/data/Schema.java | 149 ++++++++++++++----
.../connector/api/data/SchemaBuilder.java | 80 +++++++++-
.../connector/api/data/logical/Date.java | 74 +++++++++
.../connector/api/data/logical/Decimal.java | 82 ++++++++++
.../connector/api/data/logical/Time.java | 81 ++++++++++
.../connector/api/data/logical/Timestamp.java | 59 +++++++
7 files changed, 496 insertions(+), 67 deletions(-)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/logical/Date.java
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/logical/Decimal.java
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/logical/Timestamp.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java b/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java
index 0ff6467..71e169d 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/FieldType.java
@@ -14,14 +14,6 @@
package io.openmessaging.connector.api.data;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Define the field type.
*/
@@ -92,36 +84,6 @@ public enum FieldType {
*/
STRUCT;
- /**
- * Maps Schema.Types to a list of Java classes that can be used to represent them.
- */
- public static final Map> SCHEMA_TYPE_CLASSES = new EnumMap<>(FieldType.class);
- /**
- * Maps the Java classes to the corresponding Schema.Type.
- */
- public static final Map, FieldType> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
-
- static {
- SCHEMA_TYPE_CLASSES.put(FieldType.INT8, Collections.singletonList((Class) Byte.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.INT16, Collections.singletonList((Class) Short.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.INT32, Collections.singletonList((Class) Integer.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.INT64, Collections.singletonList((Class) Long.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.FLOAT32, Collections.singletonList((Class) Float.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.FLOAT64, Collections.singletonList((Class) Double.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.BOOLEAN, Collections.singletonList((Class) Boolean.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.STRING, Collections.singletonList((Class) String.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.ARRAY, Collections.singletonList((Class) List.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.MAP, Collections.singletonList((Class) Map.class));
- SCHEMA_TYPE_CLASSES.put(FieldType.STRUCT, Collections.singletonList((Class) Struct.class));
-
- for (Map.Entry> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
- for (Class> schemaClass : schemaClasses.getValue()) {
- JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
- }
- }
- }
-
/**
* Determine whether it is a basic type
*
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
index 8858715..deac30d 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
@@ -14,18 +14,71 @@
package io.openmessaging.connector.api.data;
+import io.openmessaging.connector.api.data.logical.Date;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
import io.openmessaging.connector.api.errors.ConnectException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Schema
*/
public class Schema {
+ /**
+ * Maps Schema.Types to a list of Java classes that can be used to represent them.
+ */
+ public static final Map> SCHEMA_TYPE_CLASSES = new EnumMap<>(FieldType.class);
+
+ /**
+ * Maps known logical types to a list of Java classes that can be used to represent them.
+ */
+ private static final Map> LOGICAL_TYPE_CLASSES = new HashMap<>();
+
+ /**
+ * Maps the Java classes to the corresponding Schema.Type.
+ */
+ public static final Map, FieldType> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
+
+ static {
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT8, Collections.singletonList((Class) Byte.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT16, Collections.singletonList((Class) Short.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT32, Collections.singletonList((Class) Integer.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.INT64, Collections.singletonList((Class) Long.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.FLOAT32, Collections.singletonList((Class) Float.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.FLOAT64, Collections.singletonList((Class) Double.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.BOOLEAN, Collections.singletonList((Class) Boolean.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.STRING, Collections.singletonList((Class) String.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.ARRAY, Collections.singletonList((Class) List.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.MAP, Collections.singletonList((Class) Map.class));
+ SCHEMA_TYPE_CLASSES.put(FieldType.STRUCT, Collections.singletonList((Class) Struct.class));
+
+ for (Map.Entry> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
+ for (Class> schemaClass : schemaClasses.getValue()) {
+ JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
+ }
+ }
+
+ LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Collections.singletonList((Class) BigDecimal.class));
+ LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Collections.singletonList((Class) java.util.Date.class));
+ LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Collections.singletonList((Class) java.util.Date.class));
+ LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Collections.singletonList((Class) java.util.Date.class));
+
+ }
+
+
/**
* Name of the schema.
*/
@@ -55,7 +108,7 @@ public class Schema {
*/
private List fields;
private Map fieldsByName;
-
+ private Map parameters;
/**
* map type
*/
@@ -65,13 +118,14 @@ public class Schema {
/**
* Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
*/
- public Schema(String name,FieldType fieldType, boolean optional, Object defaultValue, Integer version, String doc, List fields,Schema keySchema, Schema valueSchema) {
+ public Schema(String name,FieldType fieldType, boolean optional, Object defaultValue, Integer version, String doc, List fields,Schema keySchema, Schema valueSchema, Map parameters) {
this.fieldType = fieldType;
this.optional = optional;
this.defaultValue = defaultValue;
this.name = name;
this.version = version;
this.doc = doc;
+ this.parameters = parameters;
if (this.fieldType == FieldType.STRUCT) {
this.fields = fields == null ? Collections.emptyList() : fields;
this.fieldsByName = new HashMap<>(this.fields.size());
@@ -91,7 +145,7 @@ public Schema(String name,FieldType fieldType, boolean optional, Object defaultV
* Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
*/
public Schema(String name,FieldType fieldType, boolean optional, Object defaultValue, Integer version, List fields) {
- this(name, fieldType, optional, defaultValue, version, null, fields, null, null);
+ this(name, fieldType, optional, defaultValue, version, null, fields, null, null,new ConcurrentHashMap<>());
}
@@ -102,28 +156,7 @@ public Schema(String name, FieldType fieldType, List fields) {
this(name, fieldType, false, null, null, fields);
}
- /**
- * get field
- * @param fieldName
- * @return
- */
- public Field getField(String fieldName) {
- if (fieldsByName.containsKey(fieldName)){
- return fieldsByName.get(fieldName);
- }
- return null;
- }
-
- /**
- * add field
- * @param field
- */
- public void addField(Field field) {
- this.fields.add(field);
- this.fieldsByName.put(field.getName(), field);
- }
-
-
+ // start getter and setter
public Integer getVersion() {
return version;
}
@@ -204,6 +237,37 @@ public void setFields(List fields) {
this.fields = fields;
}
+ public Map getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Map parameters) {
+ this.parameters = parameters;
+ }
+
+ // end getter and setter
+
+ /**
+ * get field
+ * @param fieldName
+ * @return
+ */
+ public Field getField(String fieldName) {
+ if (fieldsByName.containsKey(fieldName)){
+ return fieldsByName.get(fieldName);
+ }
+ return null;
+ }
+
+ /**
+ * add field
+ * @param field
+ */
+ public void addField(Field field) {
+ this.fields.add(field);
+ this.fieldsByName.put(field.getName(), field);
+ }
+
/**
* Validate that the value can be used for this schema
*/
@@ -217,7 +281,6 @@ public static void validateValue(Schema schema, Object value) {
validateValue(null, schema, value);
}
public static void validateValue(String name, Schema schema, Object value) {
-
// check optional
if (value == null) {
if (!schema.isOptional()) {
@@ -229,7 +292,6 @@ public static void validateValue(String name, Schema schema, Object value) {
// check field type
List expectedClasses = expectedClassesFor(schema);
-
if (expectedClasses == null) {
throw new ConnectException("Invalid Java object for schema type " + schema.getFieldType()
+ ": " + value.getClass()
@@ -284,9 +346,38 @@ public static void validateValue(String name, Schema schema, Object value) {
* @return
*/
private static List expectedClassesFor(Schema schema) {
- return FieldType.SCHEMA_TYPE_CLASSES.get(schema.getFieldType());
+ List expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.getName());
+ if (expectedClasses == null) {
+ expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.getFieldType(), Collections.emptyList());
+ }
+ return expectedClasses;
+ }
+
+
+ /**
+ * Get the type associated with the given class.
+ * @param klass
+ * @return
+ */
+ public static FieldType schemaType(Class> klass) {
+ synchronized (JAVA_CLASS_SCHEMA_TYPES) {
+ FieldType schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
+ if (Objects.nonNull(schemaType)) {
+ return schemaType;
+ }
+ for (Map.Entry, FieldType> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
+ try {
+ klass.asSubclass(entry.getKey());
+ JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
+ return entry.getValue();
+ } catch (ClassCastException e) {
+ }
+ }
+ }
+ return null;
}
+
@Override
public String toString() {
return "Schema{" +
@@ -297,6 +388,8 @@ public String toString() {
", doc='" + doc + '\'' +
", fieldType=" + fieldType +
", fields=" + fields +
+ ", fieldsByName=" + fieldsByName +
+ ", parameters=" + parameters +
", keySchema=" + keySchema +
", valueSchema=" + valueSchema +
'}';
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
index 4f0f468..1b92a3e 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
@@ -14,6 +14,9 @@
package io.openmessaging.connector.api.data;
+import io.openmessaging.connector.api.data.logical.Date;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Timestamp;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.SchemaBuilderException;
@@ -64,6 +67,7 @@ public class SchemaBuilder {
* Structure of the schema, contains a list of {@link Field}
*/
private Map fields;
+ private Map parameters;
/**
* map type
@@ -195,6 +199,42 @@ public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
return builder;
}
+
+ /**
+ * date
+ * @return
+ */
+ public static SchemaBuilder date() {
+ return Date.builder();
+ }
+
+ /**
+ * time
+ * @return
+ */
+ public static SchemaBuilder time() {
+ return Timestamp.builder();
+ }
+
+ /**
+ * timestamp
+ * @return
+ */
+ public static SchemaBuilder timestamp() {
+ return Timestamp.builder();
+ }
+
+ /**
+ * decimal
+ * @param scale
+ * @return
+ */
+ public static SchemaBuilder decimal(int scale) {
+ return Decimal.builder(scale);
+ }
+
+
+
/**
* Add a field to this struct schemaļ¼ensure sequence
*/
@@ -280,6 +320,44 @@ public SchemaBuilder optional() {
return this;
}
+ public Map parameters() {
+ return parameters == null ? null : Collections.unmodifiableMap(parameters);
+ }
+
+
+ /**
+ * Set a schema parameter
+ * @param propertyName
+ * @param propertyValue
+ * @return
+ */
+ public SchemaBuilder parameter(String propertyName, String propertyValue) {
+ // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types
+ // can print their properties in a consistent order.
+ if (parameters == null) {
+ parameters = new LinkedHashMap<>();
+ }
+ parameters.put(propertyName, propertyValue);
+ return this;
+ }
+
+ /**
+ * Set schema parameters
+ * @param props
+ * @return
+ */
+ public SchemaBuilder parameters(Map props) {
+ // Avoid creating an empty set of properties so we never have an empty map
+ if (props.isEmpty()) {
+ return this;
+ }
+ if (parameters == null) {
+ parameters = new LinkedHashMap<>();
+ }
+ parameters.putAll(props);
+ return this;
+ }
+
/**
* Set the default value for this schema. The value is validated against the schema type, throwing a
* {@link SchemaBuilderException} if it does not match.
@@ -312,6 +390,6 @@ private static void checkNotNull(String fieldName, Object val, String fieldToSet
*/
public Schema build() {
return new Schema(name, type, isOptional(), defaultValue, version, doc,
- fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema);
+ fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema, parameters);
}
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/logical/Date.java b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Date.java
new file mode 100644
index 0000000..0908473
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Date.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.data.logical;
+
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.errors.ConnectException;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * A date representing a calendar day with no time of day or timezone.
+ **/
+public class Date {
+ public static final String LOGICAL_NAME = "io.openmessaging.connector.api.data.logical.Date";
+
+ private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+
+ private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+
+ /**
+ * build schema
+ * @return
+ */
+ public static SchemaBuilder builder() {
+ return SchemaBuilder.int32()
+ .name(LOGICAL_NAME)
+ .version(1);
+ }
+
+ public static final Schema SCHEMA = builder().build();
+
+ /**
+ * Convert a value from its logical format (Date) to it's encoded format.
+ * @param value the logical value
+ * @return the encoded value
+ */
+ public static int fromLogical(Schema schema, java.util.Date value) {
+ if (!(LOGICAL_NAME.equals(schema.getName()))) {
+ throw new ConnectException("Requested conversion of Date object but the schema does not match.");
+ }
+ Calendar calendar = Calendar.getInstance(UTC);
+ calendar.setTime(value);
+ if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || calendar.get(Calendar.MINUTE) != 0 ||
+ calendar.get(Calendar.SECOND) != 0 || calendar.get(Calendar.MILLISECOND) != 0) {
+ throw new ConnectException("Kafka Connect Date type should not have any time fields set to non-zero values.");
+ }
+ long unixMillis = calendar.getTimeInMillis();
+ return (int) (unixMillis / MILLIS_PER_DAY);
+ }
+
+ public static java.util.Date toLogical(Schema schema, int value) {
+ if (!(LOGICAL_NAME.equals(schema.getName()))) {
+ throw new ConnectException("Requested conversion of Date object but the schema does not match.");
+ }
+ return new java.util.Date(value * MILLIS_PER_DAY);
+ }
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/logical/Decimal.java b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Decimal.java
new file mode 100644
index 0000000..2781710
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Decimal.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.data.logical;
+
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.errors.ConnectException;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * decimal
+ */
+public class Decimal {
+ public static final String LOGICAL_NAME = "io.openmessaging.connector.api.data.logical.Decimal";
+ public static final String SCALE_FIELD = "scale";
+
+ /**
+ *schema builder
+ * @param scale
+ * @return
+ */
+ public static SchemaBuilder builder(int scale) {
+ return SchemaBuilder.bytes()
+ .name(LOGICAL_NAME)
+ .parameter(SCALE_FIELD, Integer.toString(scale))
+ .version(1);
+ }
+
+ /**
+ * get schema
+ * @param scale
+ * @return
+ */
+ public static Schema schema(int scale){
+ return builder(scale).build();
+ }
+
+ /**
+ * Convert a value from its logical format (BigDecimal) to it's encoded format.
+ * @param schema
+ * @param value
+ * @return
+ */
+ public static byte[] fromLogical(Schema schema, BigDecimal value) {
+ if (value.scale() != scale(schema)) {
+ throw new ConnectException("BigDecimal has mismatching scale value for given Decimal schema");
+ }
+ return value.unscaledValue().toByteArray();
+ }
+
+ public static BigDecimal toLogical(Schema schema, byte[] value) {
+ return new BigDecimal(new BigInteger(value), scale(schema));
+ }
+
+ private static int scale(Schema schema) {
+ String scaleString = schema.getParameters().get(SCALE_FIELD);
+ if (scaleString == null) {
+ throw new ConnectException("Invalid Decimal schema: scale parameter not found.");
+ }
+ try {
+ return Integer.parseInt(scaleString);
+ } catch (NumberFormatException e) {
+ throw new ConnectException("Invalid scale parameter found in Decimal schema: ", e);
+ }
+ }
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java
new file mode 100644
index 0000000..f52158c
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.data.logical;
+
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.errors.ConnectException;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+
+
+/**
+ * A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a
+ * java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a
+ * point in time during the first day after the Unix epoch. The underlying representation is an integer
+ * representing the number of milliseconds after midnight.
+ */
+public class Time {
+ public static final String LOGICAL_NAME = "io.openmessaging.connector.api.data.logical.Time";
+
+ private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+
+ private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+ /**
+ * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such
+ * as required/optional, default value, and documentation.
+ * @return a SchemaBuilder
+ */
+ private static SchemaBuilder builder() {
+ return SchemaBuilder.int32()
+ .name(LOGICAL_NAME)
+ .version(1);
+ }
+
+ public static final Schema SCHEMA = builder().build();
+
+ /**
+ * Convert a value from its logical format (Time) to it's encoded format.
+ * @param schema
+ * @param value
+ * @return
+ */
+ public static int fromLogical(Schema schema, java.util.Date value) {
+ if (!(LOGICAL_NAME.equals(schema.getName()))) {
+ throw new ConnectException("Requested conversion of Time object but the schema does not match.");
+ }
+ Calendar calendar = Calendar.getInstance(UTC);
+ calendar.setTime(value);
+ long unixMillis = calendar.getTimeInMillis();
+ if (unixMillis < 0 || unixMillis > MILLIS_PER_DAY) {
+ throw new ConnectException("Kafka Connect Time type should not have any date fields set to non-zero values.");
+ }
+ return (int) unixMillis;
+ }
+
+ public static java.util.Date toLogical(Schema schema, int value) {
+ if (!(LOGICAL_NAME.equals(schema.getName()))) {
+ throw new ConnectException("Requested conversion of Date object but the schema does not match.");
+ }
+ if (value < 0 || value > MILLIS_PER_DAY) {
+ throw new ConnectException("Time values must use number of milliseconds greater than 0 and less than 86400000");
+ }
+ return new java.util.Date(value);
+ }
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/logical/Timestamp.java b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Timestamp.java
new file mode 100644
index 0000000..7f00ce6
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Timestamp.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.data.logical;
+
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.errors.ConnectException;
+
+/**
+ * A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
+ * java.util.Date. The underlying representation is a long representing the number of milliseconds since Unix epoch.
+ */
+public class Timestamp {
+ public static final String LOGICAL_NAME = "io.openmessaging.connector.api.data.logical.Timestamp";
+
+
+ /**
+ * build schema
+ * @return
+ */
+ public static SchemaBuilder builder() {
+ return SchemaBuilder.int64()
+ .name(LOGICAL_NAME)
+ .version(1);
+ }
+
+ public static final Schema SCHEMA = builder().build();
+
+ /**
+ * Convert a value from its logical format (Date) to it's encoded format.
+ */
+ public static long fromLogical(Schema schema, java.util.Date value) {
+ if (!(LOGICAL_NAME.equals(schema.getName()))) {
+ throw new ConnectException("Requested conversion of Timestamp object but the schema does not match.");
+ }
+ return value.getTime();
+ }
+
+ public static java.util.Date toLogical(Schema schema, long value) {
+ if (!(LOGICAL_NAME.equals(schema.getName()))) {
+ throw new ConnectException("Requested conversion of Timestamp object but the schema does not match.");
+ }
+ return new java.util.Date(value);
+ }
+}
From 26fe155363e5710e3ba33829a292438965ad742c Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Mon, 6 Jun 2022 16:02:50 +0800
Subject: [PATCH 09/22] Schemabuilder add required method
---
.../openmessaging/connector/api/data/SchemaBuilder.java | 9 +++++++++
.../openmessaging/connector/api/data/logical/Time.java | 2 +-
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
index 1b92a3e..d29738c 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
@@ -320,6 +320,15 @@ public SchemaBuilder optional() {
return this;
}
+ /**
+ * Set this schema as required.
+ */
+ public SchemaBuilder required() {
+ checkCanSet(OPTIONAL_FIELD, optional, false);
+ optional = false;
+ return this;
+ }
+
public Map parameters() {
return parameters == null ? null : Collections.unmodifiableMap(parameters);
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java
index f52158c..6638d8f 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/logical/Time.java
@@ -42,7 +42,7 @@ public class Time {
* as required/optional, default value, and documentation.
* @return a SchemaBuilder
*/
- private static SchemaBuilder builder() {
+ public static SchemaBuilder builder() {
return SchemaBuilder.int32()
.name(LOGICAL_NAME)
.version(1);
From 3e6f1e6a4912fcdfb8c7b356a758e5f04bdcef8c Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Mon, 6 Jun 2022 16:12:20 +0800
Subject: [PATCH 10/22] schema add hashCode and equals method
---
.../io/openmessaging/connector/api/data/Schema.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
index deac30d..3a24eca 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Schema.java
@@ -377,6 +377,18 @@ public static FieldType schemaType(Class> klass) {
return null;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Schema)) return false;
+ Schema schema = (Schema) o;
+ return isOptional() == schema.isOptional() && Objects.equals(getName(), schema.getName()) && Objects.equals(getVersion(), schema.getVersion()) && Objects.equals(getDefaultValue(), schema.getDefaultValue()) && Objects.equals(getDoc(), schema.getDoc()) && getFieldType() == schema.getFieldType() && Objects.equals(getFields(), schema.getFields()) && Objects.equals(getParameters(), schema.getParameters()) && Objects.equals(getKeySchema(), schema.getKeySchema()) && Objects.equals(getValueSchema(), schema.getValueSchema());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getName(), getVersion(), isOptional(), getDefaultValue(), getDoc(), getFieldType(), getFields(), getParameters(), getKeySchema(), getValueSchema());
+ }
@Override
public String toString() {
From 14765cb460a61de6cc801f8a9bd1b94e821ca55f Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Mon, 6 Jun 2022 21:15:29 +0800
Subject: [PATCH 11/22] fixed doc method
---
.../java/io/openmessaging/connector/api/data/SchemaBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
index d29738c..a47fc6c 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/SchemaBuilder.java
@@ -306,7 +306,7 @@ public SchemaBuilder version(Integer version) {
* Set the documentation for this schema.
*/
public SchemaBuilder doc(String doc) {
- checkCanSet(DOC_FIELD, optional, true);
+ checkCanSet(DOC_FIELD, this.doc, doc);
this.doc = doc;
return this;
}
From a6b3c79a2c916ba92f481a876db3232956ffd0f4 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Tue, 7 Jun 2022 12:32:31 +0800
Subject: [PATCH 12/22] Field add equals and hashcode method
---
.../openmessaging/connector/api/data/Field.java | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/Field.java b/connector/src/main/java/io/openmessaging/connector/api/data/Field.java
index 73d53f7..ca8bfe6 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/Field.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/Field.java
@@ -14,6 +14,8 @@
package io.openmessaging.connector.api.data;
+import java.util.Objects;
+
/**
* Filed of the schema.
*/
@@ -62,6 +64,20 @@ public void setSchema(Schema schema) {
this.schema = schema;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Field)) return false;
+ Field field = (Field) o;
+ return getIndex() == field.getIndex() && Objects.equals(getName(), field.getName()) && Objects.equals(getSchema(), field.getSchema());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getIndex(), getName(), getSchema());
+ }
+
@Override public String toString() {
return "Field{" +
"index=" + index +
From 427e009dd0d9a950bc5a20f5d4216b840bb1ad1b Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 8 Jun 2022 10:53:10 +0800
Subject: [PATCH 13/22] optimize api #85
---
.../api/component/connector/Connector.java | 21 ++++++-----
.../connector/api/component/task/Task.java | 16 ++++-----
.../component/task/sink/SinkConnector.java | 3 ++
.../api/component/task/sink/SinkTask.java | 29 ++-------------
.../component/task/sink/SinkTaskContext.java | 3 ++
.../task/source/SourceConnector.java | 3 ++
.../api/component/task/source/SourceTask.java | 35 ++-----------------
7 files changed, 30 insertions(+), 80 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
index d9272a5..d444bf1 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
@@ -23,20 +23,11 @@ public abstract class Connector implements Component {
protected ConnectorContext connectorContext;
- @Override public void init(ConnectorContext connectorContext) {
+ @Override
+ public void init(ConnectorContext connectorContext) {
this.connectorContext = connectorContext;
}
- /**
- * Pause the connector.
- */
- public abstract void pause();
-
- /**
- * Resume the connector.
- */
- public abstract void resume();
-
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
@@ -53,4 +44,12 @@ public abstract class Connector implements Component {
*/
public abstract Class extends Task> taskClass();
+ /**
+ * Should invoke before start the connector.
+ * @param config component config
+ */
+ @Override
+ public void validate(KeyValue config){
+
+ }
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
index 5c34f59..7b83931 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
@@ -14,21 +14,17 @@
package io.openmessaging.connector.api.component.task;
+import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Component;
import io.openmessaging.connector.api.component.ComponentContext;
public interface Task extends Component {
/**
- * Pause the task.
+ * Should invoke before start the connector.
+ *
+ * @param config component config
*/
- @Deprecated
- void pause();
-
- /**
- * Resume the task.
- */
- @Deprecated
- void resume();
-
+ @Override
+ default void validate(KeyValue config) {}
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java
index c1f46af..a1a5f60 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java
@@ -16,6 +16,9 @@
import io.openmessaging.connector.api.component.connector.Connector;
+/**
+ * sink connector
+ */
public abstract class SinkConnector extends Connector {
public SinkConnector() {
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
index 9fc8f7a..9e722dc 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
@@ -14,7 +14,6 @@
package io.openmessaging.connector.api.component.task.sink;
-import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
@@ -23,6 +22,9 @@
import java.util.List;
import java.util.Map;
+/**
+ * sink task
+ */
public abstract class SinkTask implements Task {
protected SinkTaskContext sinkTaskContext;
@@ -58,29 +60,4 @@ public Map preCommit(Map {
protected SourceTaskContext sourceTaskContext;
- @Override public void init(SourceTaskContext sourceTaskContext) {
+ @Override
+ public void init(SourceTaskContext sourceTaskContext) {
this.sourceTaskContext = sourceTaskContext;
}
@@ -56,35 +56,4 @@ public void commit(final List connectRecords) throws InterruptedE
* function.
*/
public void commit() { }
-
- /**
- * Get source task context.
- * @return source task context
- */
- @Deprecated
- public SourceTaskContext getContext() {
- return sourceTaskContext;
- }
-
- @Override
- public void validate(KeyValue config){
-
- }
-
- /**
- * Pause the task.
- */
- @Override
- public void pause() {
-
- }
-
- /**
- * Resume the task.
- */
- @Override
- public void resume() {
-
- }
-
}
From c4b1836e6be105124925fece4d49b516ac6a0b53 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Thu, 9 Jun 2022 19:57:43 +0800
Subject: [PATCH 14/22] Optimize transform api #45
---
.../connector/api/component/Transform.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java b/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java
index d912b25..170e62d 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java
@@ -14,6 +14,7 @@
package io.openmessaging.connector.api.component;
+import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
/**
@@ -22,7 +23,23 @@
* @version OMS 0.1.0
* @since OMS 0.1.0
*/
-public interface Transform extends Component {
+public interface Transform extends AutoCloseable {
+ /**
+ * config
+ * @param config
+ */
+ void config(KeyValue config);
+ /**
+ * transform record
+ * @param record
+ * @return
+ */
R doTransform(R record);
+
+ /**
+ * close
+ */
+ @Override
+ void close();
}
From c76d2fb339a66d39694197be61c5814ca455d4c0 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Sat, 18 Jun 2022 11:52:43 +0800
Subject: [PATCH 15/22] Optimize transform api and add RecordConverter
---
.../connector/api/component/Component.java | 9 +--
.../connector/api/component/Transform.java | 18 ++---
.../api/component/connector/Connector.java | 8 +-
.../connector/api/component/task/Task.java | 9 ++-
.../api/component/task/sink/SinkTask.java | 5 +-
.../api/component/task/source/SourceTask.java | 4 +
.../connector/api/data/RecordConverter.java | 81 +++++++++++++++++++
7 files changed, 111 insertions(+), 23 deletions(-)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/data/RecordConverter.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/Component.java b/connector/src/main/java/io/openmessaging/connector/api/component/Component.java
index 0349c2f..8ee512d 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/Component.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/Component.java
@@ -16,7 +16,7 @@
import io.openmessaging.KeyValue;
-public interface Component {
+public interface Component {
/**
* Should invoke before start the connector.
@@ -25,13 +25,6 @@ public interface Component {
*/
void validate(KeyValue config);
- /**
- * Init the component
- *
- * @param context component context
- */
- void init(R context);
-
/**
* Start the component
*
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java b/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java
index 170e62d..c48cf70 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/Transform.java
@@ -23,23 +23,21 @@
* @version OMS 0.1.0
* @since OMS 0.1.0
*/
-public interface Transform extends AutoCloseable {
+public interface Transform extends Component {
+
/**
- * config
- * @param config
+ * Should invoke before start the connector.
+ *
+ * @param config component config
*/
- void config(KeyValue config);
+ @Override
+ default void validate(KeyValue config){
+ }
/**
* transform record
* @param record
* @return
*/
R doTransform(R record);
-
- /**
- * close
- */
- @Override
- void close();
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
index d444bf1..cdde4a1 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
@@ -16,18 +16,22 @@
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Component;
+import io.openmessaging.connector.api.component.ComponentContext;
import io.openmessaging.connector.api.component.task.Task;
import java.util.List;
-public abstract class Connector implements Component {
+public abstract class Connector implements Component {
protected ConnectorContext connectorContext;
- @Override
public void init(ConnectorContext connectorContext) {
this.connectorContext = connectorContext;
}
+ protected ConnectorContext getConnectorContext() {
+ return connectorContext;
+ }
+
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
index 7b83931..73fbf45 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/Task.java
@@ -18,7 +18,7 @@
import io.openmessaging.connector.api.component.Component;
import io.openmessaging.connector.api.component.ComponentContext;
-public interface Task extends Component {
+public interface Task extends Component{
/**
* Should invoke before start the connector.
@@ -27,4 +27,11 @@ public interface Task extends Component {
*/
@Override
default void validate(KeyValue config) {}
+
+ /**
+ * Init the component
+ *
+ * @param context component context
+ */
+ void init(R context);
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
index 9e722dc..ea8d8ff 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
@@ -23,13 +23,14 @@
import java.util.Map;
/**
- * sink task
+ * The sink task API definition is used to define the logic for data writing
*/
public abstract class SinkTask implements Task {
protected SinkTaskContext sinkTaskContext;
- @Override public void init(SinkTaskContext sinkTaskContext) {
+ @Override
+ public void init(SinkTaskContext sinkTaskContext) {
this.sinkTaskContext = sinkTaskContext;
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
index 4a91049..4d69c56 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
@@ -18,9 +18,13 @@
import io.openmessaging.connector.api.data.ConnectRecord;
import java.util.List;
+/**
+ * The source task API definition is used to define the logic for data pulling
+ */
public abstract class SourceTask implements Task {
protected SourceTaskContext sourceTaskContext;
+
@Override
public void init(SourceTaskContext sourceTaskContext) {
this.sourceTaskContext = sourceTaskContext;
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/RecordConverter.java b/connector/src/main/java/io/openmessaging/connector/api/data/RecordConverter.java
new file mode 100644
index 0000000..6912492
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/RecordConverter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.data;
+
+import io.openmessaging.KeyValue;
+
+import java.util.Map;
+
+/**
+ * The topic parameter is mainly a compatible schema registry
+ * abstract converter
+ */
+public interface RecordConverter {
+
+
+ /**
+ * Config is used for parameter passing in the conversion process
+ * @param configs
+ */
+ default void configure(Map configs) {
+
+ }
+
+ /**
+ * Convert ConnectRecord to byte[]
+ * @param topic the topic associated with the data
+ * @param schema record schema
+ * @param value record value
+ * @return
+ */
+ byte[] fromConnectData(String topic, Schema schema, Object value);
+
+
+ /**
+ * The provided subject and extension may be used in the record as needed.
+ * @param topic the topic associated with the data
+ * @param extensions
+ * @param schema rocketmq connect record schema
+ * @param value rocketmq connect record value
+ * @return
+ */
+ default byte[] fromConnectData(String topic, KeyValue extensions, Schema schema, Object value) {
+ return fromConnectData(topic, schema, value);
+ }
+
+ /**
+ * Convert a byte[] object to a Rocketmq Connect data object.
+ *
+ * @param topic the topic associated with the data
+ * @param value the value to convert
+ * @return an object containing the {@link Schema} and the converted value
+ */
+ SchemaAndValue toConnectData(String topic, byte[] value);
+
+
+ /**
+ * The provided subject and extension may be used in the record as needed.
+ * @param topic the topic associated with the data
+ * @param extensions transform property
+ * @param value
+ * @return
+ */
+ default SchemaAndValue toConnectData(String topic, KeyValue extensions, byte[] value) {
+ return toConnectData(topic, value);
+ }
+
+}
From 5de17b28514284ae6167de0916c30c6828c8e43f Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 22 Jun 2022 18:16:25 +0800
Subject: [PATCH 16/22] Optimize ConnectRecord api #47
---
.../api/component/task/sink/SinkRecord.java | 95 +++++++++++++++
.../component/task/source/SourceRecord.java | 99 ++++++++++++++++
.../connector/api/data/ConnectRecord.java | 112 ++++++++++++------
3 files changed, 269 insertions(+), 37 deletions(-)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
new file mode 100644
index 0000000..375e295
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
@@ -0,0 +1,95 @@
+package io.openmessaging.connector.api.component.task.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Schema;
+
+import java.util.Objects;
+
+/**
+ * sink connect record
+ */
+public class SinkRecord extends ConnectRecord {
+
+ private final String brokerName;
+ private final long queueOffset;
+
+ public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data) {
+ this(brokerName, queueOffset, topic, queueId, null, schema, data ,null );
+ }
+
+ public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data, KeyValue extensions) {
+ this(brokerName, queueOffset, topic, queueId, null, schema, data ,extensions );
+ }
+
+ public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
+ this(brokerName, queueOffset, topic, queueId, timestamp, schema, data ,null );
+ }
+
+ public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
+ super(topic, queueId, timestamp, schema, data, extensions);
+ this.brokerName = brokerName;
+ this.queueOffset = queueOffset;
+ }
+
+ public String brokerName(){
+ return brokerName;
+ }
+
+ public long queueOffset(){
+ return queueOffset;
+ }
+
+ /**
+ * new record
+ *
+ * @param topic
+ * @param queueId
+ * @param schema
+ * @param data
+ * @param timestamp
+ * @return
+ */
+ @Override
+ public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
+ return newRecord(topic,queueId,schema,data,timestamp, null);
+ }
+
+ /**
+ * new record
+ *
+ * @param topic
+ * @param queueId
+ * @param schema
+ * @param data
+ * @param timestamp
+ * @param extensions
+ * @return
+ */
+ @Override
+ public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
+ return new SinkRecord(brokerName(), queueOffset(), topic, queueId, timestamp, schema, data, extensions);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof SinkRecord)) return false;
+ if (!super.equals(o)) return false;
+ SinkRecord that = (SinkRecord) o;
+ return queueOffset == that.queueOffset && Objects.equals(brokerName, that.brokerName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), brokerName, queueOffset);
+ }
+
+ @Override
+ public String toString() {
+ return "SinkRecord{" +
+ "brokerName='" + brokerName + '\'' +
+ ", queueOffset=" + queueOffset +
+ "} " + super.toString();
+ }
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
new file mode 100644
index 0000000..4b4797a
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
@@ -0,0 +1,99 @@
+package io.openmessaging.connector.api.component.task.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+import io.openmessaging.connector.api.data.Schema;
+
+import java.util.Objects;
+
+/**
+ * source connect record
+ */
+public class SourceRecord extends ConnectRecord {
+
+ private final RecordPosition position;
+
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data) {
+ this(recordPartition, recordOffset, topic, null , null,schema , data, null);
+ }
+
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data, KeyValue extensions) {
+ this(recordPartition, recordOffset, topic, null , null,schema , data, extensions);
+ }
+
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Schema schema, Object data) {
+ this(recordPartition, recordOffset, topic, queueId, null, schema , data, null);
+ }
+
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Schema schema, Object data, KeyValue extensions) {
+ this(recordPartition, recordOffset, topic, queueId, null, schema , data, extensions);
+ }
+
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
+ this(recordPartition, recordOffset, topic, queueId, timestamp, schema , data, null);
+ }
+
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
+ super(topic, queueId, timestamp, schema, data, extensions);
+ this.position = new RecordPosition(recordPartition, recordOffset);
+ }
+
+ public RecordPosition position() {
+ return position;
+ }
+
+ /**
+ * new record
+ *
+ * @param topic
+ * @param queueId
+ * @param schema
+ * @param data
+ * @param timestamp
+ * @return
+ */
+ @Override
+ public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
+ return newRecord(topic, queueId, schema , data, timestamp, null );
+ }
+
+ /**
+ * new record
+ *
+ * @param topic
+ * @param queueId
+ * @param schema
+ * @param data
+ * @param timestamp
+ * @param extensions
+ * @return
+ */
+ @Override
+ public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
+ return new SourceRecord(position().getPartition(), position().getOffset(), topic, queueId, timestamp, schema, data, extensions);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof SourceRecord)) return false;
+ if (!super.equals(o)) return false;
+ SourceRecord that = (SourceRecord) o;
+ return Objects.equals(position, that.position);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), position);
+ }
+
+ @Override
+ public String toString() {
+ return "SourceRecord{" +
+ "position=" + position +
+ "} " + super.toString();
+ }
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
index cf79206..f05da50 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
@@ -22,7 +22,16 @@
/**
* SourceDataEntries are generated by SourceTasks and passed to specific message queue to store.
*/
-public class ConnectRecord {
+public abstract class ConnectRecord> {
+ /**
+ * Topic of the data entry.
+ */
+ private String topic;
+
+ /**
+ * queueId of the entry
+ */
+ private Integer queueId;
/**
* Timestamp of the data entry.
@@ -39,28 +48,63 @@ public class ConnectRecord {
*/
private Object data;
- /**
- * Position of record
- */
- private RecordPosition position;
-
/**
* Extension properties
*/
private KeyValue extensions;
- public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
- Long timestamp) {
- this(recordPartition, recordOffset, timestamp, null, null);
+ public ConnectRecord(String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
+ this(topic, queueId, timestamp, schema, data, null);
}
- public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
- Long timestamp, Schema schema,
- Object data) {
- this.position = new RecordPosition(recordPartition, recordOffset);
+ public ConnectRecord(String topic, Integer queueId, Long timestamp, Schema schema, Object data,KeyValue extensions) {
+ this.topic = topic;
+ this.queueId = queueId;
this.schema = schema;
this.timestamp = timestamp;
this.data = data;
+ this.extensions = extensions;
+ }
+
+
+ /**
+ * new record
+ * @param topic
+ * @param queueId
+ * @param schema
+ * @param data
+ * @param timestamp
+ * @return
+ */
+ public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp);
+
+ /**
+ * new record
+ * @param topic
+ * @param queueId
+ * @param schema
+ * @param data
+ * @param timestamp
+ * @param extensions
+ * @return
+ */
+ public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions);
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
}
public Long getTimestamp() {
@@ -119,34 +163,28 @@ public String getExtension(String key) {
return this.extensions.getString(key);
}
- public RecordPosition getPosition() {
- return position;
- }
-
- public void setPosition(RecordPosition position) {
- this.position = position;
- }
-
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (!(o instanceof ConnectRecord))
- return false;
- ConnectRecord that = (ConnectRecord) o;
- return Objects.equals(timestamp, that.timestamp) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions);
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ConnectRecord)) return false;
+ ConnectRecord record = (ConnectRecord) o;
+ return topic.equals(record.topic) && queueId.equals(record.queueId) && getTimestamp().equals(record.getTimestamp()) && getSchema().equals(record.getSchema()) && getData().equals(record.getData()) && getExtensions().equals(record.getExtensions());
}
- @Override public int hashCode() {
- return Objects.hash(timestamp, schema, data, position, extensions);
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, queueId, getTimestamp(), getSchema(), getData(), getExtensions());
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "ConnectRecord{" +
- "timestamp=" + timestamp +
- ", schema=" + schema +
- ", data=" + data +
- ", position=" + position +
- ", extensions=" + extensions +
- '}';
+ "topic='" + topic + '\'' +
+ ", queueId=" + queueId +
+ ", timestamp=" + timestamp +
+ ", schema=" + schema +
+ ", data=" + data +
+ ", extensions=" + extensions +
+ '}';
}
}
From 898863cb317029e34ed9a1680816449ed8a1095c Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 22 Jun 2022 18:24:40 +0800
Subject: [PATCH 17/22] upgrade api to 0.1.4-SNAPSHOT;
---
connector/pom.xml | 2 +-
.../connector/api/component/task/sink/SinkTask.java | 4 ++--
.../connector/api/component/task/source/SourceTask.java | 8 ++++----
pom.xml | 2 +-
4 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/connector/pom.xml b/connector/pom.xml
index d9f79d9..257ccbe 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -15,7 +15,7 @@
io.openmessaging
openmessaging-connect
- 0.1.3-SNAPSHOT
+ 0.1.4-SNAPSHOT
4.0.0
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
index ea8d8ff..69c5b92 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java
@@ -37,9 +37,9 @@ public void init(SinkTaskContext sinkTaskContext) {
/**
* Put the records to the sink
*
- * @param sinkRecords sink records
+ * @param records sink records
*/
- public abstract void put(List sinkRecords) throws ConnectException;
+ public abstract void put(List records) throws ConnectException;
/**
* Flush the records to the sink
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
index 4d69c56..d2f319f 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java
@@ -33,10 +33,10 @@ public void init(SourceTaskContext sourceTaskContext) {
/**
* Poll this source task for new records.
*
- * @return connectRecord list
+ * @return SourceRecord list
* @throws InterruptedException task thread interupt exception
*/
- public abstract List poll() throws InterruptedException;
+ public abstract List poll() throws InterruptedException;
/**
*
@@ -49,9 +49,9 @@ public void init(SourceTaskContext sourceTaskContext) {
*
*
* @throws InterruptedException task thread interupt exception
- * @param connectRecords connect records
+ * @param records connect records
*/
- public void commit(final List connectRecords) throws InterruptedException {
+ public void commit(final List records) throws InterruptedException {
commit();
}
diff --git a/pom.xml b/pom.xml
index 2177f74..cc8793c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
io.openmessaging
openmessaging-connect
- 0.1.3-SNAPSHOT
+ 0.1.4-SNAPSHOT
OpenMessaging Connect
pom
From 6c089586f6ed9885ff595539ee2f2a9372bd57b1 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 22 Jun 2022 18:42:44 +0800
Subject: [PATCH 18/22] optimize connector
---
.../api/component/connector/Connector.java | 2 +-
.../component/task/sink/SinkConnector.java | 6 ++--
.../task/sink/SinkConnectorContext.java | 27 ++++++++++++++++
.../task/source/SourceConnector.java | 4 +++
.../task/source/SourceConnectorContext.java | 32 +++++++++++++++++++
5 files changed, 68 insertions(+), 3 deletions(-)
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java
create mode 100644 connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
index cdde4a1..1c6fb2a 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java
@@ -28,7 +28,7 @@ public void init(ConnectorContext connectorContext) {
this.connectorContext = connectorContext;
}
- protected ConnectorContext getConnectorContext() {
+ protected ConnectorContext context() {
return connectorContext;
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java
index a1a5f60..018ee3f 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java
@@ -21,7 +21,9 @@
*/
public abstract class SinkConnector extends Connector {
- public SinkConnector() {
- super();
+
+ @Override
+ protected SinkConnectorContext context() {
+ return (SinkConnectorContext) connectorContext;
}
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java
new file mode 100644
index 0000000..ef26c4f
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.component.task.sink;
+
+
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+
+/**
+ * source connector context
+ */
+public interface SinkConnectorContext extends ConnectorContext {
+
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java
index d9e4a35..95a5267 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java
@@ -21,4 +21,8 @@
*/
public abstract class SourceConnector extends Connector {
+ @Override
+ protected SourceConnectorContext context() {
+ return (SourceConnectorContext) connectorContext;
+ }
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java
new file mode 100644
index 0000000..9b2dcfe
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.component.task.source;
+
+
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+
+/**
+ * source connector context
+ */
+public interface SourceConnectorContext extends ConnectorContext {
+
+ /**
+ * @return the OffsetStorageReader for this connector.
+ */
+ OffsetStorageReader offsetStorageReader();
+}
From ea7e3c0c5e4c266084c825198afa41111a71fe96 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 13 Jul 2022 10:44:12 +0800
Subject: [PATCH 19/22] [ISSUE#47]Optimize ConnectRecord api #50
---
.../api/component/task/sink/SinkRecord.java | 9 +++++-
.../component/task/source/SourceRecord.java | 22 +++++----------
.../connector/api/data/ConnectRecord.java | 28 +++++--------------
3 files changed, 22 insertions(+), 37 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
index 375e295..b949520 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
@@ -11,6 +11,7 @@
*/
public class SinkRecord extends ConnectRecord {
+ private Integer queueId;
private final String brokerName;
private final long queueOffset;
@@ -27,9 +28,15 @@ public SinkRecord(String brokerName, long queueOffset,String topic, Integer queu
}
public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
- super(topic, queueId, timestamp, schema, data, extensions);
+ super(topic, timestamp, schema, data, extensions);
this.brokerName = brokerName;
this.queueOffset = queueOffset;
+ this.queueId = queueId;
+ }
+
+
+ public Integer queueId(){
+ return queueId;
}
public String brokerName(){
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
index 4b4797a..571fdad 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
@@ -16,28 +16,20 @@ public class SourceRecord extends ConnectRecord {
private final RecordPosition position;
- public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data) {
- this(recordPartition, recordOffset, topic, null , null,schema , data, null);
- }
-
public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data, KeyValue extensions) {
- this(recordPartition, recordOffset, topic, null , null,schema , data, extensions);
- }
-
- public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Schema schema, Object data) {
- this(recordPartition, recordOffset, topic, queueId, null, schema , data, null);
+ this(recordPartition, recordOffset, topic, null,schema , data, extensions);
}
- public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Schema schema, Object data, KeyValue extensions) {
- this(recordPartition, recordOffset, topic, queueId, null, schema , data, extensions);
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema schema, Object data) {
+ this(recordPartition, recordOffset, topic, null, schema , data, null);
}
- public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
- this(recordPartition, recordOffset, topic, queueId, timestamp, schema , data, null);
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Long timestamp, Schema schema, Object data) {
+ this(recordPartition, recordOffset, topic, timestamp, schema , data, null);
}
- public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
- super(topic, queueId, timestamp, schema, data, extensions);
+ public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Long timestamp, Schema schema, Object data, KeyValue extensions) {
+ super(topic, timestamp, schema, data, extensions);
this.position = new RecordPosition(recordPartition, recordOffset);
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
index f05da50..d7d619e 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
@@ -28,11 +28,6 @@ public abstract class ConnectRecord> {
*/
private String topic;
- /**
- * queueId of the entry
- */
- private Integer queueId;
-
/**
* Timestamp of the data entry.
*/
@@ -53,13 +48,12 @@ public abstract class ConnectRecord> {
*/
private KeyValue extensions;
- public ConnectRecord(String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
- this(topic, queueId, timestamp, schema, data, null);
+ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data) {
+ this(topic, timestamp, schema, data, null);
}
- public ConnectRecord(String topic, Integer queueId, Long timestamp, Schema schema, Object data,KeyValue extensions) {
+ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,KeyValue extensions) {
this.topic = topic;
- this.queueId = queueId;
this.schema = schema;
this.timestamp = timestamp;
this.data = data;
@@ -99,14 +93,6 @@ public void setTopic(String topic) {
this.topic = topic;
}
- public Integer getQueueId() {
- return queueId;
- }
-
- public void setQueueId(Integer queueId) {
- this.queueId = queueId;
- }
-
public Long getTimestamp() {
return timestamp;
}
@@ -163,24 +149,24 @@ public String getExtension(String key) {
return this.extensions.getString(key);
}
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ConnectRecord)) return false;
- ConnectRecord record = (ConnectRecord) o;
- return topic.equals(record.topic) && queueId.equals(record.queueId) && getTimestamp().equals(record.getTimestamp()) && getSchema().equals(record.getSchema()) && getData().equals(record.getData()) && getExtensions().equals(record.getExtensions());
+ ConnectRecord> that = (ConnectRecord>) o;
+ return Objects.equals(topic, that.topic) && Objects.equals(timestamp, that.timestamp) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(extensions, that.extensions);
}
@Override
public int hashCode() {
- return Objects.hash(topic, queueId, getTimestamp(), getSchema(), getData(), getExtensions());
+ return Objects.hash(topic, timestamp, schema, data, extensions);
}
@Override
public String toString() {
return "ConnectRecord{" +
"topic='" + topic + '\'' +
- ", queueId=" + queueId +
", timestamp=" + timestamp +
", schema=" + schema +
", data=" + data +
From fabb29edea46868bb4d4d7562f30dafd7d784d78 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 13 Jul 2022 10:53:17 +0800
Subject: [PATCH 20/22] Optimize ConnectRecord remove queueId field #50
---
.../api/component/task/sink/SinkRecord.java | 33 +++++++++++++------
.../component/task/source/SourceRecord.java | 24 ++++++++++----
.../connector/api/data/ConnectRecord.java | 6 ++--
3 files changed, 43 insertions(+), 20 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
index b949520..5319082 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.openmessaging.connector.api.component.task.sink;
import io.openmessaging.KeyValue;
@@ -11,7 +25,7 @@
*/
public class SinkRecord extends ConnectRecord {
- private Integer queueId;
+ private final Integer queueId;
private final String brokerName;
private final long queueOffset;
@@ -51,22 +65,20 @@ public long queueOffset(){
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
- public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
- return newRecord(topic,queueId,schema,data,timestamp, null);
+ public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
+ return newRecord(topic, schema,data,timestamp, null);
}
/**
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
@@ -74,8 +86,8 @@ public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object
* @return
*/
@Override
- public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
- return new SinkRecord(brokerName(), queueOffset(), topic, queueId, timestamp, schema, data, extensions);
+ public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
+ return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, schema, data, extensions);
}
@Override
@@ -84,18 +96,19 @@ public boolean equals(Object o) {
if (!(o instanceof SinkRecord)) return false;
if (!super.equals(o)) return false;
SinkRecord that = (SinkRecord) o;
- return queueOffset == that.queueOffset && Objects.equals(brokerName, that.brokerName);
+ return queueOffset == that.queueOffset && Objects.equals(queueId, that.queueId) && Objects.equals(brokerName, that.brokerName);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), brokerName, queueOffset);
+ return Objects.hash(super.hashCode(), queueId, brokerName, queueOffset);
}
@Override
public String toString() {
return "SinkRecord{" +
- "brokerName='" + brokerName + '\'' +
+ "queueId=" + queueId +
+ ", brokerName='" + brokerName + '\'' +
", queueOffset=" + queueOffset +
"} " + super.toString();
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
index 571fdad..155b22b 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.openmessaging.connector.api.component.task.source;
import io.openmessaging.KeyValue;
@@ -41,22 +55,20 @@ public RecordPosition position() {
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
- public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
- return newRecord(topic, queueId, schema , data, timestamp, null );
+ public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
+ return newRecord(topic, schema , data, timestamp, null );
}
/**
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
@@ -64,8 +76,8 @@ public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Obje
* @return
*/
@Override
- public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
- return new SourceRecord(position().getPartition(), position().getOffset(), topic, queueId, timestamp, schema, data, extensions);
+ public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
+ return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, schema, data, extensions);
}
@Override
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
index d7d619e..c1c2e29 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
@@ -64,25 +64,23 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,Ke
/**
* new record
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
- public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp);
+ public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp);
/**
* new record
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
- public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions);
+ public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions);
public String getTopic() {
From e94593e4d8e9718bcae01e73f4bb47047ab1508b Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Wed, 13 Jul 2022 10:53:17 +0800
Subject: [PATCH 21/22] Optimize ConnectRecord remove queueId field #47
---
.../api/component/task/sink/SinkRecord.java | 33 +++++++++++++------
.../component/task/source/SourceRecord.java | 24 ++++++++++----
.../connector/api/data/ConnectRecord.java | 6 ++--
3 files changed, 43 insertions(+), 20 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
index b949520..5319082 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.openmessaging.connector.api.component.task.sink;
import io.openmessaging.KeyValue;
@@ -11,7 +25,7 @@
*/
public class SinkRecord extends ConnectRecord {
- private Integer queueId;
+ private final Integer queueId;
private final String brokerName;
private final long queueOffset;
@@ -51,22 +65,20 @@ public long queueOffset(){
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
- public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
- return newRecord(topic,queueId,schema,data,timestamp, null);
+ public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
+ return newRecord(topic, schema,data,timestamp, null);
}
/**
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
@@ -74,8 +86,8 @@ public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object
* @return
*/
@Override
- public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
- return new SinkRecord(brokerName(), queueOffset(), topic, queueId, timestamp, schema, data, extensions);
+ public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
+ return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, schema, data, extensions);
}
@Override
@@ -84,18 +96,19 @@ public boolean equals(Object o) {
if (!(o instanceof SinkRecord)) return false;
if (!super.equals(o)) return false;
SinkRecord that = (SinkRecord) o;
- return queueOffset == that.queueOffset && Objects.equals(brokerName, that.brokerName);
+ return queueOffset == that.queueOffset && Objects.equals(queueId, that.queueId) && Objects.equals(brokerName, that.brokerName);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), brokerName, queueOffset);
+ return Objects.hash(super.hashCode(), queueId, brokerName, queueOffset);
}
@Override
public String toString() {
return "SinkRecord{" +
- "brokerName='" + brokerName + '\'' +
+ "queueId=" + queueId +
+ ", brokerName='" + brokerName + '\'' +
", queueOffset=" + queueOffset +
"} " + super.toString();
}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
index 571fdad..155b22b 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.openmessaging.connector.api.component.task.source;
import io.openmessaging.KeyValue;
@@ -41,22 +55,20 @@ public RecordPosition position() {
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
- public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
- return newRecord(topic, queueId, schema , data, timestamp, null );
+ public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
+ return newRecord(topic, schema , data, timestamp, null );
}
/**
* new record
*
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
@@ -64,8 +76,8 @@ public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Obje
* @return
*/
@Override
- public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
- return new SourceRecord(position().getPartition(), position().getOffset(), topic, queueId, timestamp, schema, data, extensions);
+ public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
+ return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, schema, data, extensions);
}
@Override
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
index d7d619e..c1c2e29 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
@@ -64,25 +64,23 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,Ke
/**
* new record
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
- public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp);
+ public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp);
/**
* new record
* @param topic
- * @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
- public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions);
+ public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions);
public String getTopic() {
From c254a7a809164d35dbc78a1091ca8928af79cd39 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Fri, 15 Jul 2022 14:35:43 +0800
Subject: [PATCH 22/22] Connectrecord add a key field to identify the unique
data #53
---
.../connector/api/data/ConnectRecord.java | 105 ++++++++++++++----
1 file changed, 82 insertions(+), 23 deletions(-)
diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
index cf79206..67f8411 100644
--- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
+++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java
@@ -29,6 +29,16 @@ public class ConnectRecord {
*/
private Long timestamp;
+ /**
+ * key schema
+ */
+ private Schema keySchema;
+
+ /**
+ * Payload of the key entry.
+ */
+ private Object key;
+
/**
* Schema of the data entry.
*/
@@ -63,6 +73,22 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
this.data = data;
}
+ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
+ Long timestamp,Schema keySchema, Object key, Schema schema,
+ Object data) {
+ this.position = new RecordPosition(recordPartition, recordOffset);
+ this.timestamp = timestamp;
+
+ // key
+ this.keySchema = keySchema;
+ this.key = key;
+
+ // value
+ this.schema = schema;
+ this.data = data;
+
+ }
+
public Long getTimestamp() {
return timestamp;
}
@@ -71,6 +97,22 @@ public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
+ public Schema getKeySchema() {
+ return keySchema;
+ }
+
+ public void setKeySchema(Schema keySchema) {
+ this.keySchema = keySchema;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public void setKey(Object key) {
+ this.key = key;
+ }
+
public Schema getSchema() {
return schema;
}
@@ -95,6 +137,18 @@ public void setExtensions(KeyValue extensions) {
this.extensions = extensions;
}
+ public RecordPosition getPosition() {
+ return position;
+ }
+
+ public void setPosition(RecordPosition position) {
+ this.position = position;
+ }
+
+ /**
+ * add extension by KeyValue
+ * @param extensions
+ */
public void addExtension(KeyValue extensions) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
@@ -105,6 +159,11 @@ public void addExtension(KeyValue extensions) {
}
}
+ /**
+ * add extension by key and value
+ * @param key
+ * @param value
+ */
public void addExtension(String key, String value) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
@@ -112,6 +171,11 @@ public void addExtension(String key, String value) {
this.extensions.put(key, value);
}
+ /**
+ * get extension value
+ * @param key
+ * @return
+ */
public String getExtension(String key) {
if (this.extensions == null) {
return null;
@@ -119,34 +183,29 @@ public String getExtension(String key) {
return this.extensions.getString(key);
}
- public RecordPosition getPosition() {
- return position;
- }
-
- public void setPosition(RecordPosition position) {
- this.position = position;
- }
-
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (!(o instanceof ConnectRecord))
- return false;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ConnectRecord)) return false;
ConnectRecord that = (ConnectRecord) o;
- return Objects.equals(timestamp, that.timestamp) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions);
+ return Objects.equals(timestamp, that.timestamp) && Objects.equals(keySchema, that.keySchema) && Objects.equals(key, that.key) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions);
}
- @Override public int hashCode() {
- return Objects.hash(timestamp, schema, data, position, extensions);
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp, keySchema, key, schema, data, position, extensions);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "ConnectRecord{" +
- "timestamp=" + timestamp +
- ", schema=" + schema +
- ", data=" + data +
- ", position=" + position +
- ", extensions=" + extensions +
- '}';
+ "timestamp=" + timestamp +
+ ", keySchema=" + keySchema +
+ ", key=" + key +
+ ", schema=" + schema +
+ ", data=" + data +
+ ", position=" + position +
+ ", extensions=" + extensions +
+ '}';
}
}