refactor: A new dockerfile for the connector, restructuring of the codebase, sorting of the imports #55

Merged · 4 commits · Aug 8, 2023
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/FEATURE-REQUEST.yml
@@ -8,8 +8,8 @@ body:
attributes:
value: |
**Thanks :heart: for taking the time to fill out this feature request report!**
We kindly ask that you search to see if an issue [already exists](https://github.com/ibm-messaging/kafka-connect-mq-source/issues?q=is%3Aissue+sort%3Acreated-desc+) for your feature.
We are also happy to accept contributions from our users. For more details see [here](https://github.com/ibm-messaging/kafka-connect-mq-source/blob/master/CONTRIBUTING.md).
We kindly ask that you search to see if an issue [already exists](https://github.com/ibm-messaging/kafka-connect-jdbc-sink/issues?q=is%3Aissue+sort%3Acreated-desc+) for your feature.
We are also happy to accept contributions from our users. For more details see [here](https://github.com/ibm-messaging/kafka-connect-jdbc-sink/blob/main/CONTRIBUTING.md).
- type: textarea
attributes:
label: Description
2 changes: 1 addition & 1 deletion .github/workflows/checkstyle-and-tests.yml
@@ -3,7 +3,7 @@ name: Checkstyle and Tests
on:
pull_request:
branches:
- 'master'
- 'main'
types: [opened, synchronize, reopened]

jobs:
2 changes: 1 addition & 1 deletion .github/workflows/github-build-release.yml
@@ -8,7 +8,7 @@ name: Build and create release
jobs:
build:
name: Build and upload release binary
#if: github.event.base_ref == 'refs/heads/master' # only run if on master branch
#if: github.event.base_ref == 'refs/heads/main' # only run if on main branch
runs-on: ubuntu-latest
steps:
- name: Checkout code
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@

<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-jdbc-sink</artifactId>
<version>0.0.3</version>
<version>1.0.0</version>
<name>kafka-connect-jdbc-sink</name>
<organization>
<name>IBM Corporation</name>
JDBCSinkConfig.java
@@ -1,6 +1,6 @@
/*
*
* Copyright 2020 IBM Corporation
* Copyright 2020, 2023 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,11 +18,11 @@

package com.ibm.eventstreams.connect.jdbcsink;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class JDBCSinkConfig extends AbstractConfig {

private static final String CONFIG_CONNECTION_GROUP = "Connection";
@@ -49,7 +49,7 @@ public class JDBCSinkConfig extends AbstractConfig {

public static final String CONFIG_NAME_INSERT_MODE_DATABASELEVEL = "insert.mode.databaselevel";
private static final String CONFIG_DOCUMENTATION_INSERT_MODE_DATABASELEVEL = "The insertion mode to use (ex: insert, upsert, or update).";
private static final String CONFIG_DISPLAY_INSERT_MODE_DATABASELEVEL = "Insert mode database level";
private static final String CONFIG_DISPLAY_INSERT_MODE_DATABASELEVEL = "Insert mode database level";

public static ConfigDef config() {
ConfigDef config = new ConfigDef();
JDBCSinkConnector.java
@@ -1,6 +1,6 @@
/*
*
* Copyright 2020 IBM Corporation
* Copyright 2020, 2023 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,18 +18,18 @@

package com.ibm.eventstreams.connect.jdbcsink;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class JDBCSinkConnector extends SinkConnector {
// TODO: check with ibm-messaging about externalizing snapshot version
public static String VERSION = "0.0.3-SNAPSHOT";
public static String VERSION = "1.0.0-SNAPSHOT";

private Map<String, String> props;

@@ -38,24 +38,28 @@ public class JDBCSinkConnector extends SinkConnector {
*
* @return the version, formatted as a String
*/
@Override public String version() {
@Override
public String version() {
return VERSION;
}

/**
* Start this Connector. This method will only be called on a clean Connector, i.e. it has
* either just been instantiated and initialized or {@link #stop()} has been invoked.
* Start this Connector. This method will only be called on a clean Connector,
* i.e. it has either just been instantiated and initialized or {@link #stop()}
* has been invoked.
*
* @param props configuration settings
*/
@Override public void start(Map<String, String> props) {
@Override
public void start(Map<String, String> props) {
this.props = props;
}

/**
* Returns the Task implementation for this Connector.
*/
@Override public Class<? extends Task> taskClass() {
@Override
public Class<? extends Task> taskClass() {
return JDBCSinkTask.class;
}

@@ -66,33 +70,39 @@ public class JDBCSinkConnector extends SinkConnector {
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.nCopies(maxTasks, props);
}

/**
* Stop this connector.
*/
@Override public void stop() {
@Override
public void stop() {

}

/**
* Define the configuration for the connector.
*
* @return The ConfigDef for this connector.
*/
@Override public ConfigDef config() {
@Override
public ConfigDef config() {
return JDBCSinkConfig.config();
}

/**
* Provides a default validation implementation which returns a list of allowed configurations
* together with configuration errors and recommended values for each configuration.
* Provides a default validation implementation which returns a list of allowed
* configurations together with configuration errors and recommended values for
* each configuration.
*
* @param connectorConfigs connector configuration values
* @return list of allowed configurations
*/
@Override public Config validate(Map<String, String> connectorConfigs) {
@Override
public Config validate(Map<String, String> connectorConfigs) {
return super.validate(connectorConfigs);
}

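As an aside (not part of this PR's changes): the taskConfigs implementation above distributes work by handing every task an identical copy of the connector properties via Collections.nCopies. A minimal sketch of that behaviour, with an empty placeholder properties map:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.ibm.eventstreams.connect.jdbcsink.JDBCSinkConnector;

public class TaskConfigSketch {
    public static void main(String[] args) {
        JDBCSinkConnector connector = new JDBCSinkConnector();

        // start() only stores the properties; no parsing or validation happens here.
        Map<String, String> props = new HashMap<>();
        connector.start(props);

        // Collections.nCopies(maxTasks, props) means every task receives the same
        // configuration map, so asking for three tasks yields three identical configs.
        List<Map<String, String>> taskConfigs = connector.taskConfigs(3);
        System.out.println(taskConfigs.size()); // prints 3
    }
}
```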
JDBCSinkTask.java
@@ -1,6 +1,6 @@
/*
*
* Copyright 2020 IBM Corporation
* Copyright 2020, 2023 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,75 +18,87 @@

package com.ibm.eventstreams.connect.jdbcsink;

import com.ibm.eventstreams.connect.jdbcsink.database.DatabaseFactory;
import com.ibm.eventstreams.connect.jdbcsink.database.IDatabase;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import com.ibm.eventstreams.connect.jdbcsink.database.DatabaseFactory;
import com.ibm.eventstreams.connect.jdbcsink.database.IDatabase;

public class JDBCSinkTask extends SinkTask {
private static final Logger logger = LoggerFactory.getLogger(JDBCSinkTask.class);
private static final String classname = JDBCSinkTask.class.getName();

// TODO: needs to be generic and incorporate other database types
// needs an interface
// needs an interface
private JDBCSinkConfig config;

public IDatabase database;

int remainingRetries; // init max retries via config.maxRetries ...

/**
* Start the Task. This should handle any configuration parsing and one-time setup of the task.
* Start the Task. This should handle any configuration parsing and one-time
* setup of the task.
*
* @param props initial configuration
*/
@Override public void start(Map<String, String> props) {
@Override
public void start(Map<String, String> props) {
logger.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), classname, props);
this.config = new JDBCSinkConfig(props);

DatabaseFactory databaseFactory = new DatabaseFactory();
DatabaseFactory databaseFactory = getDatabaseFactory();
try {
this.database = databaseFactory.makeDatabase(this.config);
} catch (Exception e) {
logger.error("Failed to build the database {} ", e);
e.printStackTrace();
throw e;
throw new ConnectException(e);
}

logger.trace("[{}] Exit {}.start", Thread.currentThread().getId(), classname);
}

protected DatabaseFactory getDatabaseFactory() {
DatabaseFactory databaseFactory = new DatabaseFactory();
return databaseFactory;
}

/**
* Put the records in the sink.
*
* If this operation fails, the SinkTask may throw a {@link org.apache.kafka.connect.errors.RetriableException} to
* indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
* be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
* batch will be retried.
* If this operation fails, the SinkTask may throw a
* {@link org.apache.kafka.connect.errors.RetriableException} to indicate that
* the framework should attempt to retry the same call again. Other exceptions
* will cause the task to be stopped immediately.
* {@link SinkTaskContext#timeout(long)} can be used to set the maximum time
* before the batch will be retried.
*
* @param records the set of records to send
*/
@Override public void put(Collection<SinkRecord> records) {
@Override
public void put(Collection<SinkRecord> records) {
logger.trace("[{}] Entry {}.put", Thread.currentThread().getId(), classname);
if (records.isEmpty()) {
return;
}

final SinkRecord first = records.iterator().next();
final int recordsCount = records.size();
logger.info("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the database...",
recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()
);
recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset());

final String tableName = config.getString(JDBCSinkConfig.CONFIG_NAME_TABLE_NAME_FORMAT);

@@ -96,28 +108,37 @@ public class JDBCSinkTask extends SinkTask {
this.database.getWriter().insert(tableName, records);
logger.info(String.format("%d RECORDS PROCESSED", records.size()));
Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis(); //in millis
logger.info(String.format("Processed '%d' records", records.size() ));
long timeElapsed = Duration.between(start, finish).toMillis(); // in millis
logger.info(String.format("Processed '%d' records", records.size()));
logger.info(String.format("Total Execution time: %d", timeElapsed));
} catch (SQLException error) {
logger.error("Write of {} records failed, remainingRetries={}", recordsCount, remainingRetries, error);
// TODO: throw exception to cancel execution or retry?
throw new ConnectException(error);
} catch (final RuntimeException e) {
logger.error("Unexpected runtime exception: ", e);
throw e;
}

logger.trace("[{}] Exit {}.put", Thread.currentThread().getId(), classname);
}

@Override public void stop() {
@Override
public void stop() {
}

@Override public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
// Not necessary
}

/**
* Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
* Get the version of this task. Usually this should be the same as the
* corresponding {@link Connector} class's version.
*
* @return the version, formatted as a String
*/
@Override public String version() {
@Override
public String version() {
return getClass().getPackage().getImplementationVersion();
}
}
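One practical consequence of the new protected getDatabaseFactory() seam and the ConnectException wrapping in start(): a unit test can swap in a failing factory without touching a real database. A minimal sketch, assuming JUnit 5 and Mockito are on the test classpath, that DatabaseFactory#makeDatabase is mockable, and that the connection properties required by JDBCSinkConfig are filled in where indicated:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;

import com.ibm.eventstreams.connect.jdbcsink.JDBCSinkTask;
import com.ibm.eventstreams.connect.jdbcsink.database.DatabaseFactory;

public class JDBCSinkTaskStartTest {

    @Test
    public void startWrapsFactoryFailureInConnectException() throws Exception {
        // A factory that always fails, standing in for a broken database connection.
        DatabaseFactory failingFactory = mock(DatabaseFactory.class);
        when(failingFactory.makeDatabase(any())).thenThrow(new RuntimeException("boom"));

        // Override the protected seam introduced in this PR to return the mock.
        JDBCSinkTask task = new JDBCSinkTask() {
            @Override
            protected DatabaseFactory getDatabaseFactory() {
                return failingFactory;
            }
        };

        Map<String, String> props = new HashMap<>();
        // Placeholder: add the connection properties JDBCSinkConfig requires
        // (keys omitted here; see the config class in this PR).

        // start() now rethrows any factory failure as a ConnectException
        // instead of letting the raw exception escape.
        assertThrows(ConnectException.class, () -> task.start(props));
    }
}
```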