diff --git a/boms/common-expansion/pom.xml b/boms/common-expansion/pom.xml
index 7f18c7fea859..7b69103b3680 100644
--- a/boms/common-expansion/pom.xml
+++ b/boms/common-expansion/pom.xml
@@ -274,6 +274,19 @@
+
+
+ com.github.luben
+ zstd-jni
+ ${version.com.github.luben.zstd-jni}
+
+
+ *
+ *
+
+
+
+
com.fasterxml.jackson.dataformat
jackson-dataformat-yaml
@@ -1371,12 +1384,35 @@
+
+ org.lz4
+ lz4-java
+ ${version.org.lz4.lz4-java}
+
+
+ *
+ *
+
+
+
+
org.wildfly.security.mp
wildfly-elytron-jwt
${version.org.wildfly.security.elytron-mp}
+
+ org.xerial.snappy
+ snappy-java
+ ${version.org.xerial.snappy.snappy-java}
+
+
+ *
+ *
+
+
+
diff --git a/galleon-pack/galleon-shared/pom.xml b/galleon-pack/galleon-shared/pom.xml
index e042d94f7cd3..977172834849 100644
--- a/galleon-pack/galleon-shared/pom.xml
+++ b/galleon-pack/galleon-shared/pom.xml
@@ -40,7 +40,19 @@
com.fasterxml.jackson.jrjackson-jr-objects
-
+
+
+
+ com.github.luben
+ zstd-jni
+
+
+ *
+ *
+
+
+
+
com.google.api.grpcproto-google-common-protos
com.google.protobufprotobuf-java
com.google.protobufprotobuf-java-util
@@ -636,6 +648,17 @@
org.jetbrains.kotlinkotlin-stdlib-jdk8
org.jetbrains.kotlinkotlin-stdlib-jdk7
+
+ org.lz4
+ lz4-java
+
+
+ *
+ *
+
+
+
+
org.wildfly.security.mp
wildfly-elytron-jwt
@@ -647,6 +670,17 @@
+
+ org.xerial.snappy
+ snappy-java
+
+
+ *
+ *
+
+
+
+
${full.maven.groupId}
wildfly-microprofile-config-smallrye
@@ -780,7 +814,6 @@
${full.maven.groupId}
wildfly-microprofile-telemetry-cdi-provider
-
diff --git a/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml b/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml
index ae5f7b739130..97f8491f8e29 100644
--- a/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml
+++ b/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml
@@ -28,6 +28,17 @@
+
+ com.github.luben
+ zstd-jni
+
+
+ The BSD License
+ http://repository.jboss.org/licenses/bsd.txt
+ repo
+
+
+
com.google.api.grpc
proto-google-common-protos
@@ -961,6 +972,17 @@
+
+ org.lz4
+ lz4-java
+
+
+ Apache License 2.0
+ http://www.apache.org/licenses/LICENSE-2.0
+ repo
+
+
+
${full.maven.groupId}
wildfly-microprofile-config-smallrye
@@ -1236,5 +1258,16 @@
+
+ org.xerial.snappy
+ snappy-java
+
+
+ Apache License 2.0
+ http://www.apache.org/licenses/LICENSE-2.0
+ repo
+
+
+
diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/com/github/luben/zstd-jni/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/com/github/luben/zstd-jni/main/module.xml
new file mode 100644
index 000000000000..924ecbb3615b
--- /dev/null
+++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/com/github/luben/zstd-jni/main/module.xml
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml
index 5baeb57c361e..6fb864393ea4 100644
--- a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml
+++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml
@@ -16,9 +16,12 @@
+
+
+
\ No newline at end of file
diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/lz4/lz4-java/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/lz4/lz4-java/main/module.xml
new file mode 100644
index 000000000000..90636e326db9
--- /dev/null
+++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/lz4/lz4-java/main/module.xml
@@ -0,0 +1,33 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/xerial/snappy/snappy-java/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/xerial/snappy/snappy-java/main/module.xml
new file mode 100644
index 000000000000..dd0f89a6ae01
--- /dev/null
+++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/xerial/snappy/snappy-java/main/module.xml
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d6b94e18efa3..3e43af18cd1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -372,6 +372,7 @@
1.8
1.9
1.1
+ 1.5.2-1
2.0.1
2.8.9
32.1.2-jre
@@ -535,6 +536,7 @@
2.1.0
18.0.2
1.11
+ 1.8.0
4.2.0
9.5
1.0.4
@@ -546,6 +548,7 @@
1.0.0.Final
2.20.126
1.0.1
+ 1.1.8.4
${version.org.glassfish.jaxb}
1.6.3
1.2
diff --git a/testsuite/integration/microprofile/pom.xml b/testsuite/integration/microprofile/pom.xml
index 71948e448e25..5db51340f2d7 100644
--- a/testsuite/integration/microprofile/pom.xml
+++ b/testsuite/integration/microprofile/pom.xml
@@ -226,6 +226,24 @@
${version.org.apache.kafka}
test
+
+
+ com.github.luben
+ zstd-jni
+ test
+
+
+
+ org.lz4
+ lz4-java
+ test
+
+
+
+ org.xerial.snappy
+ snappy-java
+ test
+
org.wildfly.core
wildfly-cli
diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/CompressionMessagingBean.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/CompressionMessagingBean.java
new file mode 100644
index 000000000000..b9a6d8507698
--- /dev/null
+++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/CompressionMessagingBean.java
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2023, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.wildfly.test.integration.microprofile.reactive.messaging.kafka.compression;
+
+import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
+import io.smallrye.reactive.messaging.kafka.api.KafkaMetadataUtil;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author Kabir Khan
+ */
+@ApplicationScoped
+public class CompressionMessagingBean {
+ private final CountDownLatch latch = new CountDownLatch(4);
+ private List words = new ArrayList<>();
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ @Channel("to-kafka-gzip")
+ @Inject
+ Emitter gzipEmitter;
+
+ @Channel("to-kafka-snappy")
+ @Inject
+ Emitter snappyEmitter;
+
+ @Channel("to-kafka-lz4")
+ @Inject
+ Emitter lz4Emitter;
+
+ @Channel("to-kafka-zstd")
+ @Inject
+ Emitter zstdEmitter;
+
+ @Incoming("from-kafka")
+ public CompletionStage sink(Message message) {
+ IncomingKafkaRecordMetadata metadata = KafkaMetadataUtil.readIncomingKafkaMetadata(message).get();
+ words.add(message.getPayload());
+ latch.countDown();
+ return message.ack();
+ }
+
+ public List getWords() {
+ return words;
+ }
+
+ public void sendGzip(String...words) {
+ for (String word : words) {
+ gzipEmitter.send(word);
+ }
+ }
+
+ public void sendSnappy(String...words) {
+ for (String word : words) {
+ snappyEmitter.send(word);
+ }
+ }
+
+ public void sendLz4(String...words) {
+ for (String word : words) {
+ lz4Emitter.send(word);
+ }
+ }
+
+ public void sendZstd(String...words) {
+ for (String word : words) {
+ zstdEmitter.send(word);
+ }
+ }
+}
diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/ReactiveMessagingKafkaCompressionTestCase.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/ReactiveMessagingKafkaCompressionTestCase.java
new file mode 100644
index 000000000000..b793badac7ca
--- /dev/null
+++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/ReactiveMessagingKafkaCompressionTestCase.java
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2023, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.wildfly.test.integration.microprofile.reactive.messaging.kafka.compression;
+
+import jakarta.inject.Inject;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.junit.Arquillian;
+import org.jboss.as.arquillian.api.ServerSetup;
+import org.jboss.as.test.shared.CLIServerSetupTask;
+import org.jboss.as.test.shared.PermissionUtils;
+import org.jboss.as.test.shared.TimeoutUtil;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.asset.EmptyAsset;
+import org.jboss.shrinkwrap.api.exporter.ZipExporter;
+import org.jboss.shrinkwrap.api.spec.WebArchive;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.wildfly.test.integration.microprofile.reactive.EnableReactiveExtensionsSetupTask;
+import org.wildfly.test.integration.microprofile.reactive.RunKafkaSetupTask;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.PropertyPermission;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Kabir Khan
+ */
+@RunWith(Arquillian.class)
+@ServerSetup({RunKafkaSetupTask.class, EnableReactiveExtensionsSetupTask.class})
+public class ReactiveMessagingKafkaCompressionTestCase {
+
+ private static final long TIMEOUT = TimeoutUtil.adjust(15000);
+
+ @Inject
+ CompressionMessagingBean bean;
+
+
+ @Deployment
+ public static WebArchive getDeployment() {
+ final WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "reactive-messaging-kafka-tx.war")
+ .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml")
+ .addPackage(ReactiveMessagingKafkaCompressionTestCase.class.getPackage())
+ .addClasses(RunKafkaSetupTask.class, EnableReactiveExtensionsSetupTask.class, CLIServerSetupTask.class)
+ .addAsWebInfResource(ReactiveMessagingKafkaCompressionTestCase.class.getPackage(), "microprofile-config.properties", "classes/META-INF/microprofile-config.properties")
+ .addClass(TimeoutUtil.class)
+ .addAsManifestResource(PermissionUtils.createPermissionsXmlAsset(
+ new PropertyPermission(TimeoutUtil.FACTOR_SYS_PROP, "read")
+ ), "permissions.xml");
+
+ webArchive.as(ZipExporter.class).exportTo(new File("target/test-original.war"), true);
+
+ return webArchive;
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ bean.sendGzip("Hello");
+ bean.sendSnappy("World");
+ bean.sendLz4("of");
+ bean.sendZstd("Reactive");
+
+ boolean wait = bean.getLatch().await(TIMEOUT, TimeUnit.MILLISECONDS);
+ Assert.assertTrue("Timed out", wait);
+ Set expected = new HashSet<>(Arrays.asList("Hello", "World", "of", "Reactive"));
+ Assert.assertEquals(expected.size(), bean.getWords().size());
+ Assert.assertTrue("Expected " + bean.getWords() + " to contain all of " + expected, bean.getWords().containsAll(expected));
+
+ }
+}
diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/microprofile-config.properties b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/microprofile-config.properties
new file mode 100644
index 000000000000..f6a653c13517
--- /dev/null
+++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/microprofile-config.properties
@@ -0,0 +1,54 @@
+#
+# JBoss, Home of Professional Open Source.
+# Copyright 2023, Red Hat, Inc., and individual contributors
+# as indicated by the @author tags. See the copyright.txt file in the
+# distribution for a full listing of individual contributors.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation; either version 2.1 of
+# the License, or (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free
+# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+#
+
+# Configure the gzip producer
+mp.messaging.outgoing.to-kafka-gzip.connector=smallrye-kafka
+mp.messaging.outgoing.to-kafka-gzip.topic=testing
+mp.messaging.outgoing.to-kafka-gzip.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.to-kafka-gzip.compression.type=gzip
+
+# Configure the snappy producer
+mp.messaging.outgoing.to-kafka-snappy.connector=smallrye-kafka
+mp.messaging.outgoing.to-kafka-snappy.topic=testing
+mp.messaging.outgoing.to-kafka-snappy.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.to-kafka-snappy.compression.type=snappy
+
+# Configure the snappy producer
+mp.messaging.outgoing.to-kafka-lz4.connector=smallrye-kafka
+mp.messaging.outgoing.to-kafka-lz4.topic=testing
+mp.messaging.outgoing.to-kafka-lz4.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.to-kafka-lz4.compression.type=lz4
+
+# Configure the zstd producer
+mp.messaging.outgoing.to-kafka-zstd.connector=smallrye-kafka
+mp.messaging.outgoing.to-kafka-zstd.topic=testing
+mp.messaging.outgoing.to-kafka-zstd.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.to-kafka-zstd.compression.type=zstd
+
+# Configure the consumer
+mp.messaging.incoming.from-kafka.connector=smallrye-kafka
+mp.messaging.incoming.from-kafka.topic=testing
+mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# Needed as per https://github.com/smallrye/smallrye-reactive-messaging/issues/845 since the consumer
+# joins after the messages are sent
+mp.messaging.incoming.from-kafka.auto.offset.reset=earliest