Skip to content

Commit

Permalink
Optimize ConnectRecord api openmessaging#47
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jun 22, 2022
1 parent ed258b0 commit 5de17b2
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.openmessaging.connector.api.component.task.sink;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Schema;

import java.util.Objects;

/**
* sink connect record
*/
public class SinkRecord extends ConnectRecord<SinkRecord> {

private final String brokerName;
private final long queueOffset;

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, null, schema, data ,null );
}

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data, KeyValue extensions) {
this(brokerName, queueOffset, topic, queueId, null, schema, data ,extensions );
}

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, timestamp, schema, data ,null );
}

public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
super(topic, queueId, timestamp, schema, data, extensions);
this.brokerName = brokerName;
this.queueOffset = queueOffset;
}

public String brokerName(){
return brokerName;
}

public long queueOffset(){
return queueOffset;
}

/**
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
return newRecord(topic,queueId,schema,data,timestamp, null);
}

/**
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
@Override
public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SinkRecord(brokerName(), queueOffset(), topic, queueId, timestamp, schema, data, extensions);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SinkRecord)) return false;
if (!super.equals(o)) return false;
SinkRecord that = (SinkRecord) o;
return queueOffset == that.queueOffset && Objects.equals(brokerName, that.brokerName);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), brokerName, queueOffset);
}

@Override
public String toString() {
return "SinkRecord{" +
"brokerName='" + brokerName + '\'' +
", queueOffset=" + queueOffset +
"} " + super.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.openmessaging.connector.api.component.task.source;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.RecordPosition;
import io.openmessaging.connector.api.data.Schema;

import java.util.Objects;

/**
* source connect record
*/
public class SourceRecord extends ConnectRecord<SourceRecord> {

private final RecordPosition position;

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, null , null,schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data, KeyValue extensions) {
this(recordPartition, recordOffset, topic, null , null,schema , data, extensions);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, queueId, null, schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Schema schema, Object data, KeyValue extensions) {
this(recordPartition, recordOffset, topic, queueId, null, schema , data, extensions);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, queueId, timestamp, schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
super(topic, queueId, timestamp, schema, data, extensions);
this.position = new RecordPosition(recordPartition, recordOffset);
}

public RecordPosition position() {
return position;
}

/**
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
return newRecord(topic, queueId, schema , data, timestamp, null );
}

/**
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
@Override
public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SourceRecord(position().getPartition(), position().getOffset(), topic, queueId, timestamp, schema, data, extensions);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SourceRecord)) return false;
if (!super.equals(o)) return false;
SourceRecord that = (SourceRecord) o;
return Objects.equals(position, that.position);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), position);
}

@Override
public String toString() {
return "SourceRecord{" +
"position=" + position +
"} " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,16 @@
/**
* SourceDataEntries are generated by SourceTasks and passed to specific message queue to store.
*/
public class ConnectRecord {
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
/**
* Topic of the data entry.
*/
private String topic;

/**
* queueId of the entry
*/
private Integer queueId;

/**
* Timestamp of the data entry.
Expand All @@ -39,28 +48,63 @@ public class ConnectRecord {
*/
private Object data;

/**
* Position of record
*/
private RecordPosition position;

/**
* Extension properties
*/
private KeyValue extensions;

public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
Long timestamp) {
this(recordPartition, recordOffset, timestamp, null, null);
public ConnectRecord(String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
this(topic, queueId, timestamp, schema, data, null);
}

public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
Long timestamp, Schema schema,
Object data) {
this.position = new RecordPosition(recordPartition, recordOffset);
public ConnectRecord(String topic, Integer queueId, Long timestamp, Schema schema, Object data,KeyValue extensions) {
this.topic = topic;
this.queueId = queueId;
this.schema = schema;
this.timestamp = timestamp;
this.data = data;
this.extensions = extensions;
}


/**
* new record
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp);

/**
* new record
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions);


public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public Integer getQueueId() {
return queueId;
}

public void setQueueId(Integer queueId) {
this.queueId = queueId;
}

public Long getTimestamp() {
Expand Down Expand Up @@ -119,34 +163,28 @@ public String getExtension(String key) {
return this.extensions.getString(key);
}

public RecordPosition getPosition() {
return position;
}

public void setPosition(RecordPosition position) {
this.position = position;
}

@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof ConnectRecord))
return false;
ConnectRecord that = (ConnectRecord) o;
return Objects.equals(timestamp, that.timestamp) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions);
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ConnectRecord)) return false;
ConnectRecord record = (ConnectRecord) o;
return topic.equals(record.topic) && queueId.equals(record.queueId) && getTimestamp().equals(record.getTimestamp()) && getSchema().equals(record.getSchema()) && getData().equals(record.getData()) && getExtensions().equals(record.getExtensions());
}

@Override public int hashCode() {
return Objects.hash(timestamp, schema, data, position, extensions);
@Override
public int hashCode() {
return Objects.hash(topic, queueId, getTimestamp(), getSchema(), getData(), getExtensions());
}

@Override public String toString() {
@Override
public String toString() {
return "ConnectRecord{" +
"timestamp=" + timestamp +
", schema=" + schema +
", data=" + data +
", position=" + position +
", extensions=" + extensions +
'}';
"topic='" + topic + '\'' +
", queueId=" + queueId +
", timestamp=" + timestamp +
", schema=" + schema +
", data=" + data +
", extensions=" + extensions +
'}';
}
}

0 comments on commit 5de17b2

Please sign in to comment.