From ba6b9dcd4a6c4dab221b999a2b54073deb451d79 Mon Sep 17 00:00:00 2001 From: ajay-kharat Date: Wed, 27 Nov 2024 10:11:33 +0530 Subject: [PATCH] Prepare 1.54 development release iteration upgraded_kafka_client_version_to_3.9.0 upgraded_kafka_client_version_to_2.8.2 undo commented code removed zkclient Upgraded kafka version to 3.7.1 upgraded to 3.9.0 updated version to 1.54-SNAPSHOT Prepare 1.54 development release iteration --- build.gradle | 7 +- tempto-examples/build.gradle | 1 + tempto-kafka/build.gradle | 3 +- .../table/kafka/KafkaTableManager.java | 92 ++++++++----------- 4 files changed, 41 insertions(+), 62 deletions(-) diff --git a/build.gradle b/build.gradle index 75c23571..1e56e48d 100644 --- a/build.gradle +++ b/build.gradle @@ -57,8 +57,7 @@ ext.versions = [ cassandra : '3.4.0', commons_cli : '1.3.1', thrift : '0.9.3', - kafka : '0.11.0.2', - zkclient : '0.10' + kafka : '3.9.0' ] ext.libraries = [ @@ -102,8 +101,6 @@ ext.libraries = [ commons_cli : "commons-cli:commons-cli:${versions.commons_cli}", thrift : "org.apache.thrift:libthrift:${versions.thrift}", kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}", - kafka : "org.apache.kafka:kafka_2.12:${versions.kafka}", - zkclient : "com.101tec:zkclient:${versions.zkclient}" ] ext.tempto_core = project(':tempto-core') @@ -113,7 +110,7 @@ ext.tempto_runner = project(':tempto-runner') ext.tempto_ldap = project(':tempto-ldap') ext.tempto_kafka = project(':tempto-kafka') ext.expected_result_generator = project(':expected-result-generator') -ext.tempto_version = '1.53' +ext.tempto_version = '1.54-SNAPSHOT' ext.tempto_group = "io.prestodb.tempto" ext.isReleaseVersion = !tempto_version.endsWith("SNAPSHOT") diff --git a/tempto-examples/build.gradle b/tempto-examples/build.gradle index 87384a97..6c236080 100644 --- a/tempto-examples/build.gradle +++ b/tempto-examples/build.gradle @@ -41,6 +41,7 @@ jar { shadowJar { version = '' + zip64 = true } build.dependsOn.add([shadowJar]) diff --git a/tempto-kafka/build.gradle b/tempto-kafka/build.gradle index c465aa72..4815ebe8 100644 --- a/tempto-kafka/build.gradle +++ b/tempto-kafka/build.gradle @@ -16,8 +16,7 @@ apply plugin: 'java' dependencies { compile tempto_core - compile libraries.kafka - compile libraries.zkclient + compile libraries.kafka_clients } // project information diff --git a/tempto-kafka/src/main/java/io/prestodb/tempto/fulfillment/table/kafka/KafkaTableManager.java b/tempto-kafka/src/main/java/io/prestodb/tempto/fulfillment/table/kafka/KafkaTableManager.java index d062f411..8fc42e8e 100644 --- a/tempto-kafka/src/main/java/io/prestodb/tempto/fulfillment/table/kafka/KafkaTableManager.java +++ b/tempto-kafka/src/main/java/io/prestodb/tempto/fulfillment/table/kafka/KafkaTableManager.java @@ -24,47 +24,47 @@ import io.prestodb.tempto.internal.fulfillment.table.TableName; import io.prestodb.tempto.query.QueryExecutor; import io.prestodb.tempto.query.QueryResult; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; +import java.util.Collections; import java.util.Iterator; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static org.slf4j.LoggerFactory.getLogger; + @TableManager.Descriptor(tableDefinitionClass = KafkaTableDefinition.class, type = "KAFKA") @Singleton public class KafkaTableManager implements TableManager { + private static final Logger LOGGER = getLogger(KafkaTableManager.class); private final String databaseName; private final QueryExecutor prestoQueryExecutor; private final String brokerHost; private final Integer brokerPort; private final String prestoKafkaCatalog; - private final String zookeeperHost; - private final Integer zookeeperPort; @Inject public KafkaTableManager( @Named("databaseName") String databaseName, @Named("broker.host") String brokerHost, @Named("broker.port") int brokerPort, - @Named("zookeeper.host") String zookeeperHost, - @Named("zookeeper.port") int zookeeperPort, @Named("presto_database_name") String prestoDatabaseName, @Named("presto_kafka_catalog") String prestoKafkaCatalog, Injector injector) @@ -72,8 +72,6 @@ public KafkaTableManager( this.databaseName = requireNonNull(databaseName, "databaseName is null"); this.brokerHost = requireNonNull(brokerHost, "brokerHost is null"); this.brokerPort = brokerPort; - this.zookeeperHost = requireNonNull(zookeeperHost, "zookeeperHost is null"); - this.zookeeperPort = zookeeperPort; requireNonNull(injector, "injector is null"); requireNonNull(prestoDatabaseName, "prestoDatabaseName is null"); this.prestoQueryExecutor = injector.getInstance(Key.get(QueryExecutor.class, Names.named(prestoDatabaseName))); @@ -95,6 +93,18 @@ public TableInstance createImmutable(KafkaTableDefinition return new KafkaTableInstance(createdTableName, tableDefinition); } + private void withKafkaAdminClient(Consumer routine) { + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort); + try (AdminClient adminClient = AdminClient.create(properties)) { + routine.accept(adminClient); + } catch (Exception e) { + LOGGER.error("An error occurred while performing an operation with Kafka AdminClient on broker " + + brokerHost + ":" + brokerPort); + throw new RuntimeException("An error occurred while performing an operation with Kafka AdminClient", e); + } + } + private void verifyTableExistsInPresto(String schema, String name) { String sql = format("select count(1) from %s.information_schema.tables where table_schema='%s' and table_name='%s'", prestoKafkaCatalog, schema, name); @@ -104,34 +114,25 @@ private void verifyTableExistsInPresto(String schema, String name) } } - private void deleteTopic(String topic) - { - withZookeeper(zkUtils -> { - if (AdminUtils.topicExists(zkUtils, topic)) { - AdminUtils.deleteTopic(zkUtils, topic); - - for (int checkTry = 0; checkTry < 5; ++checkTry) { - if (!AdminUtils.topicExists(zkUtils, topic)) { - return; - } - try { - Thread.sleep(1_000); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("could not delete topic " + topic); - } - } - throw new RuntimeException("could not delete topic " + topic); + private void deleteTopic(String topic) { + withKafkaAdminClient(adminClient -> { + try { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topic)); + deleteTopicsResult.all().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException("Failed to delete topic " + topic, e); } }); } - private void createTopic(String topic, int partitionsCount, int replicationLevel) - { - withZookeeper(zkUtils -> { - Properties topicConfiguration = new Properties(); - AdminUtils.createTopic(zkUtils, topic, partitionsCount, replicationLevel, topicConfiguration, RackAwareMode.Disabled$.MODULE$); + private void createTopic(String topic, int partitionsCount, int replicationLevel) { + withKafkaAdminClient(adminClient -> { + NewTopic newTopic = new NewTopic(topic, partitionsCount, (short) replicationLevel); + try { + adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException("Failed to create topic " + topic, e); + } }); } @@ -164,25 +165,6 @@ private void insertDataIntoTopic(String topic, KafkaDataSource dataSource) } } - private void withZookeeper(Consumer routine) - { - int sessionTimeOutInMs = 15_000; - int connectionTimeOutInMs = 10_000; - String zookeeperHosts = zookeeperHost + ":" + zookeeperPort; - - ZkClient zkClient = new ZkClient(zookeeperHosts, - sessionTimeOutInMs, - connectionTimeOutInMs, - ZKStringSerializer$.MODULE$); - try { - ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); - routine.accept(zkUtils); - } - finally { - zkClient.close(); - } - } - @Override public TableInstance createMutable(KafkaTableDefinition tableDefinition, MutableTableRequirement.State state, TableHandle tableHandle) {