Skip to content

Commit

Permalink
upgraded_kafka_client_version_to_3.9.0
Browse files Browse the repository at this point in the history
upgraded_kafka_client_version_to_2.8.2

undo commented code

removed zkclient

Upgraded kafka version to 3.7.1

upgraded to 3.9.0

updated version to 1.54-SNAPSHOT

Prepare 1.54 development release iteration
  • Loading branch information
adkharat committed Dec 18, 2024
1 parent 95ca2c5 commit 74cb554
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 62 deletions.
7 changes: 2 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ ext.versions = [
cassandra : '3.4.0',
commons_cli : '1.3.1',
thrift : '0.9.3',
kafka : '0.11.0.2',
zkclient : '0.10'
kafka : '3.9.0'
]

ext.libraries = [
Expand Down Expand Up @@ -102,8 +101,6 @@ ext.libraries = [
commons_cli : "commons-cli:commons-cli:${versions.commons_cli}",
thrift : "org.apache.thrift:libthrift:${versions.thrift}",
kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}",
kafka : "org.apache.kafka:kafka_2.12:${versions.kafka}",
zkclient : "com.101tec:zkclient:${versions.zkclient}"
]

ext.tempto_core = project(':tempto-core')
Expand All @@ -113,7 +110,7 @@ ext.tempto_runner = project(':tempto-runner')
ext.tempto_ldap = project(':tempto-ldap')
ext.tempto_kafka = project(':tempto-kafka')
ext.expected_result_generator = project(':expected-result-generator')
ext.tempto_version = '1.53'
ext.tempto_version = '1.54-SNAPSHOT'
ext.tempto_group = "io.prestodb.tempto"
ext.isReleaseVersion = !tempto_version.endsWith("SNAPSHOT")

Expand Down
1 change: 1 addition & 0 deletions tempto-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jar {

shadowJar {
version = ''
zip64 = true
}

build.dependsOn.add([shadowJar])
Expand Down
3 changes: 1 addition & 2 deletions tempto-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ apply plugin: 'java'

dependencies {
compile tempto_core
compile libraries.kafka
compile libraries.zkclient
compile libraries.kafka_clients
}

// project information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,56 +24,54 @@
import io.prestodb.tempto.internal.fulfillment.table.TableName;
import io.prestodb.tempto.query.QueryExecutor;
import io.prestodb.tempto.query.QueryResult;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.slf4j.LoggerFactory.getLogger;


@TableManager.Descriptor(tableDefinitionClass = KafkaTableDefinition.class, type = "KAFKA")
@Singleton
public class KafkaTableManager
implements TableManager<KafkaTableDefinition>
{
private static final Logger LOGGER = getLogger(KafkaTableManager.class);
private final String databaseName;
private final QueryExecutor prestoQueryExecutor;
private final String brokerHost;
private final Integer brokerPort;
private final String prestoKafkaCatalog;
private final String zookeeperHost;
private final Integer zookeeperPort;

@Inject
public KafkaTableManager(
@Named("databaseName") String databaseName,
@Named("broker.host") String brokerHost,
@Named("broker.port") int brokerPort,
@Named("zookeeper.host") String zookeeperHost,
@Named("zookeeper.port") int zookeeperPort,
@Named("presto_database_name") String prestoDatabaseName,
@Named("presto_kafka_catalog") String prestoKafkaCatalog,
Injector injector)
{
this.databaseName = requireNonNull(databaseName, "databaseName is null");
this.brokerHost = requireNonNull(brokerHost, "brokerHost is null");
this.brokerPort = brokerPort;
this.zookeeperHost = requireNonNull(zookeeperHost, "zookeeperHost is null");
this.zookeeperPort = zookeeperPort;
requireNonNull(injector, "injector is null");
requireNonNull(prestoDatabaseName, "prestoDatabaseName is null");
this.prestoQueryExecutor = injector.getInstance(Key.get(QueryExecutor.class, Names.named(prestoDatabaseName)));
Expand All @@ -95,6 +93,18 @@ public TableInstance<KafkaTableDefinition> createImmutable(KafkaTableDefinition
return new KafkaTableInstance(createdTableName, tableDefinition);
}

private void withKafkaAdminClient(Consumer<AdminClient> routine) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort);
try (AdminClient adminClient = AdminClient.create(properties)) {
routine.accept(adminClient);
} catch (Exception e) {
LOGGER.error("An error occurred while performing an operation with Kafka AdminClient on broker "
+ brokerHost + ":" + brokerPort);
throw new RuntimeException("An error occurred while performing an operation with Kafka AdminClient", e);
}
}

private void verifyTableExistsInPresto(String schema, String name)
{
String sql = format("select count(1) from %s.information_schema.tables where table_schema='%s' and table_name='%s'", prestoKafkaCatalog, schema, name);
Expand All @@ -104,34 +114,25 @@ private void verifyTableExistsInPresto(String schema, String name)
}
}

private void deleteTopic(String topic)
{
withZookeeper(zkUtils -> {
if (AdminUtils.topicExists(zkUtils, topic)) {
AdminUtils.deleteTopic(zkUtils, topic);

for (int checkTry = 0; checkTry < 5; ++checkTry) {
if (!AdminUtils.topicExists(zkUtils, topic)) {
return;
}
try {
Thread.sleep(1_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("could not delete topic " + topic);
}
}
throw new RuntimeException("could not delete topic " + topic);
private void deleteTopic(String topic) {
withKafkaAdminClient(adminClient -> {
try {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topic));
deleteTopicsResult.all().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to delete topic " + topic, e);
}
});
}

private void createTopic(String topic, int partitionsCount, int replicationLevel)
{
withZookeeper(zkUtils -> {
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topic, partitionsCount, replicationLevel, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
private void createTopic(String topic, int partitionsCount, int replicationLevel) {
withKafkaAdminClient(adminClient -> {
NewTopic newTopic = new NewTopic(topic, partitionsCount, (short) replicationLevel);
try {
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to create topic " + topic, e);
}
});
}

Expand Down Expand Up @@ -164,25 +165,6 @@ private void insertDataIntoTopic(String topic, KafkaDataSource dataSource)
}
}

private void withZookeeper(Consumer<ZkUtils> routine)
{
int sessionTimeOutInMs = 15_000;
int connectionTimeOutInMs = 10_000;
String zookeeperHosts = zookeeperHost + ":" + zookeeperPort;

ZkClient zkClient = new ZkClient(zookeeperHosts,
sessionTimeOutInMs,
connectionTimeOutInMs,
ZKStringSerializer$.MODULE$);
try {
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
routine.accept(zkUtils);
}
finally {
zkClient.close();
}
}

@Override
public TableInstance<KafkaTableDefinition> createMutable(KafkaTableDefinition tableDefinition, MutableTableRequirement.State state, TableHandle tableHandle)
{
Expand Down

0 comments on commit 74cb554

Please sign in to comment.