diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java index cdde4a1..1c6fb2a 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/connector/Connector.java @@ -28,7 +28,7 @@ public void init(ConnectorContext connectorContext) { this.connectorContext = connectorContext; } - protected ConnectorContext getConnectorContext() { + protected ConnectorContext context() { return connectorContext; } diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java index a1a5f60..018ee3f 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnector.java @@ -21,7 +21,9 @@ */ public abstract class SinkConnector extends Connector { - public SinkConnector() { - super(); + + @Override + protected SinkConnectorContext context() { + return (SinkConnectorContext) connectorContext; } } diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java new file mode 100644 index 0000000..ef26c4f --- /dev/null +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkConnectorContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.connector.api.component.task.sink; + + +import io.openmessaging.connector.api.component.connector.ConnectorContext; + +/** + * source connector context + */ +public interface SinkConnectorContext extends ConnectorContext { + +} diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java new file mode 100644 index 0000000..4e7f218 --- /dev/null +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.connector.api.component.task.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Schema; + +import java.util.Objects; + +/** + * sink connect record + */ +public class SinkRecord extends ConnectRecord { + + private final Integer queueId; + private final String brokerName; + private final long queueOffset; + + public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data) { + this(brokerName, queueOffset, topic, queueId, null, null, schema, data ,null ); + } + + public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Schema keySchema, Object key, Schema schema, Object data) { + this(brokerName, queueOffset, topic, queueId, null, keySchema, key, schema, data ,null ); + } + + public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) { + this(brokerName, queueOffset, topic, queueId, null, keySchema, key, schema, data ,extensions ); + } + + public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) { + super(topic, timestamp, keySchema, key, schema, data, extensions); + this.brokerName = brokerName; + this.queueOffset = queueOffset; + this.queueId = queueId; + } + + + public Integer queueId(){ + return queueId; + } + + public String brokerName(){ + return brokerName; + } + + public long queueOffset(){ + return queueOffset; + } + + /** + * new record + * + * @param topic + * @param schema + * @param data + * @param timestamp + * @return + */ + @Override + public SinkRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data) { + return newRecord(topic, timestamp, keySchema, key, schema, data, null); + } + + /** + * new record + * + * @param topic + * @param schema + * @param data + * @param timestamp + * @param extensions + * @return + */ + @Override + public SinkRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) { + return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, keySchema, key, schema, data, extensions); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof SinkRecord)) return false; + if (!super.equals(o)) return false; + SinkRecord that = (SinkRecord) o; + return queueOffset == that.queueOffset && Objects.equals(queueId, that.queueId) && Objects.equals(brokerName, that.brokerName); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), queueId, brokerName, queueOffset); + } + + @Override + public String toString() { + return "SinkRecord{" + + "queueId=" + queueId + + ", brokerName='" + brokerName + '\'' + + ", queueOffset=" + queueOffset + + "} " + super.toString(); + } +} diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java index ea8d8ff..69c5b92 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkTask.java @@ -37,9 +37,9 @@ public void init(SinkTaskContext sinkTaskContext) { /** * Put the records to the sink * - * @param sinkRecords sink records + * @param records sink records */ - public abstract void put(List sinkRecords) throws ConnectException; + public abstract void put(List records) throws ConnectException; /** * Flush the records to the sink diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java index d9e4a35..95a5267 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnector.java @@ -21,4 +21,8 @@ */ public abstract class SourceConnector extends Connector { + @Override + protected SourceConnectorContext context() { + return (SourceConnectorContext) connectorContext; + } } diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java new file mode 100644 index 0000000..9b2dcfe --- /dev/null +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceConnectorContext.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.connector.api.component.task.source; + + +import io.openmessaging.connector.api.component.connector.ConnectorContext; +import io.openmessaging.connector.api.storage.OffsetStorageReader; + +/** + * source connector context + */ +public interface SourceConnectorContext extends ConnectorContext { + + /** + * @return the OffsetStorageReader for this connector. + */ + OffsetStorageReader offsetStorageReader(); +} diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java new file mode 100644 index 0000000..711df08 --- /dev/null +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.connector.api.component.task.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.RecordPosition; +import io.openmessaging.connector.api.data.Schema; + +import java.util.Objects; + +/** + * source connect record + */ +public class SourceRecord extends ConnectRecord { + + private final RecordPosition position; + + public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema schema, Object data) { + this(recordPartition, recordOffset, topic, null, null, null, schema , data, null); + } + + public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema keySchema, Object key, Schema schema, Object data) { + this(recordPartition, recordOffset, topic, keySchema, key, schema , data, null); + } + + public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value, KeyValue extensions) { + this(recordPartition, recordOffset, topic, null, keySchema, key, valueSchema, value, extensions); + } + + public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) { + super(topic, timestamp, keySchema, key, schema, data, extensions); + this.position = new RecordPosition(recordPartition, recordOffset); + } + + public RecordPosition position() { + return position; + } + + /** + * new record + * + * @param topic + * @param schema + * @param data + * @param timestamp + * @return + */ + @Override + public SourceRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data) { + return newRecord(topic, timestamp, keySchema, key, schema , data, null ); + } + + /** + * new record + * + * @param topic + * @param schema + * @param data + * @param timestamp + * @param extensions + * @return + */ + @Override + public SourceRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data, KeyValue extensions) { + return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, keySchema, key, schema, data, extensions); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof SourceRecord)) return false; + if (!super.equals(o)) return false; + SourceRecord that = (SourceRecord) o; + return Objects.equals(position, that.position); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), position); + } + + @Override + public String toString() { + return "SourceRecord{" + + "position=" + position + + "} " + super.toString(); + } +} diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java index 2ea4ce1..8f05e4c 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceTask.java @@ -35,41 +35,25 @@ public void init(SourceTaskContext sourceTaskContext) { /** * Poll this source task for new records. * - * @return connectRecord list + * @return SourceRecord list * @throws InterruptedException task thread interupt exception */ - public abstract List poll() throws InterruptedException; - + public abstract List poll() throws InterruptedException; /** * batch commit * @param records * @param metadata */ - public void commit(final List records, Map metadata) { + public void commit(final List records, Map metadata) throws InterruptedException { } /** * commit record * @param record * @param metadata */ - public void commit(final ConnectRecord record, Map metadata) { - commit(record); - } + public void commit(final SourceRecord record, Map metadata) throws InterruptedException { - /** - *

- * Commit an individual {@link ConnectRecord} when the callback from the producer client is received. - *

- *

- * SourceTasks are not required to implement this functionality;Connect System will record offsets - * automatically. This hook is provided for systems that also need to store offsets internally - * in their own system. - *

- * - * @param record connect record - */ - public void commit(final ConnectRecord record) { } /** diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java index 67f8411..5ad0cb3 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java +++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java @@ -22,7 +22,11 @@ /** * SourceDataEntries are generated by SourceTasks and passed to specific message queue to store. */ -public class ConnectRecord { +public abstract class ConnectRecord> { + /** + * Topic of the data entry. + */ + private String topic; /** * Timestamp of the data entry. @@ -49,36 +53,26 @@ public class ConnectRecord { */ private Object data; - /** - * Position of record - */ - private RecordPosition position; - /** * Extension properties */ private KeyValue extensions; - public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, - Long timestamp) { - this(recordPartition, recordOffset, timestamp, null, null); + public ConnectRecord(String topic, Long timestamp, Schema schema, Object data) { + this(topic, timestamp, schema, data, null); } - public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, - Long timestamp, Schema schema, - Object data) { - this.position = new RecordPosition(recordPartition, recordOffset); - this.schema = schema; - this.timestamp = timestamp; - this.data = data; + public ConnectRecord(String topic, Long timestamp, Schema schema, Object data, KeyValue extensions) { + this(topic, timestamp, null, null, schema, data, extensions); } - public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, - Long timestamp,Schema keySchema, Object key, Schema schema, - Object data) { - this.position = new RecordPosition(recordPartition, recordOffset); - this.timestamp = timestamp; + public ConnectRecord(String topic, Long timestamp,Schema keySchema, Object key, Schema schema, Object data) { + this(topic, timestamp, keySchema, key, schema, data, null); + } + public ConnectRecord(String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data , KeyValue extensions) { + this.topic = topic; + this.timestamp = timestamp; // key this.keySchema = keySchema; this.key = key; @@ -86,9 +80,42 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, // value this.schema = schema; this.data = data; + // extension + this.extensions = extensions; } + + /** + * new record + * @param topic + * @param schema + * @param data + * @param timestamp + * @return + */ + public abstract R newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data); + + /** + * new record + * @param topic + * @param schema + * @param data + * @param timestamp + * @param extensions + * @return + */ + public abstract R newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data, KeyValue extensions); + + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + public Long getTimestamp() { return timestamp; } @@ -137,14 +164,6 @@ public void setExtensions(KeyValue extensions) { this.extensions = extensions; } - public RecordPosition getPosition() { - return position; - } - - public void setPosition(RecordPosition position) { - this.position = position; - } - /** * add extension by KeyValue * @param extensions @@ -187,24 +206,24 @@ public String getExtension(String key) { public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof ConnectRecord)) return false; - ConnectRecord that = (ConnectRecord) o; - return Objects.equals(timestamp, that.timestamp) && Objects.equals(keySchema, that.keySchema) && Objects.equals(key, that.key) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions); + ConnectRecord that = (ConnectRecord) o; + return Objects.equals(topic, that.topic) && Objects.equals(timestamp, that.timestamp) && Objects.equals(keySchema, that.keySchema) && Objects.equals(key, that.key) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(extensions, that.extensions); } @Override public int hashCode() { - return Objects.hash(timestamp, keySchema, key, schema, data, position, extensions); + return Objects.hash(topic, timestamp, keySchema, key, schema, data, extensions); } @Override public String toString() { return "ConnectRecord{" + - "timestamp=" + timestamp + + "topic='" + topic + '\'' + + ", timestamp=" + timestamp + ", keySchema=" + keySchema + ", key=" + key + ", schema=" + schema + ", data=" + data + - ", position=" + position + ", extensions=" + extensions + '}'; }