Skip to content

Commit

Permalink
STORM-3142: Add JUnit 5 support, migrate a couple of tests in storm-k…
Browse files Browse the repository at this point in the history
…afka-client to check that both JUnit 5 and 4 work. Also fix storm-kafka-client tests so they delete their temporary directories when done testing.
  • Loading branch information
srdo committed Jul 4, 2018
1 parent 26d2f95 commit c14a9b5
Show file tree
Hide file tree
Showing 34 changed files with 144 additions and 246 deletions.
5 changes: 0 additions & 5 deletions examples/storm-loadgen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions examples/storm-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions external/storm-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions external/storm-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
6 changes: 0 additions & 6 deletions external/storm-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>

<!-- Active MQ for testing JMS-->
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions external/storm-kafka-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.storm.testing.TmpPath;

public class KafkaUnit {
private KafkaServer kafkaServer;
private EmbeddedZookeeper zkServer;
private ZkUtils zkUtils;
private KafkaProducer<String, String> producer;
private TmpPath kafkaDir;
private static final String ZK_HOST = "127.0.0.1";
private static final String KAFKA_HOST = "127.0.0.1";
private static final int KAFKA_PORT = 9092;
Expand All @@ -61,10 +63,11 @@ public void setUp() throws IOException {
zkUtils = ZkUtils.apply(zkClient, false);

// setup Broker
kafkaDir = new TmpPath(Files.createTempDirectory("kafka-").toAbsolutePath().toString());
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("log.dirs", kafkaDir.getPath());
brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
KafkaConfig config = new KafkaConfig(brokerProps);
MockTime mock = new MockTime();
Expand All @@ -77,6 +80,7 @@ public void setUp() throws IOException {
public void tearDown() {
closeProducer();
kafkaServer.shutdown();
kafkaDir.close();
zkUtils.close();
zkServer.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
*/
package org.apache.storm.kafka;

import java.io.IOException;
import org.junit.rules.ExternalResource;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;


public class KafkaUnitRule extends ExternalResource {
public class KafkaUnitExtension implements BeforeEachCallback, AfterEachCallback {

private final KafkaUnit kafkaUnit;

public KafkaUnitRule() {
public KafkaUnitExtension() {
this.kafkaUnit = new KafkaUnit();
}

@Override
public void before() throws IOException {
public void beforeEach(ExtensionContext ctx) throws Exception {
kafkaUnit.setUp();
}

@Override
public void after() {
public void afterEach(ExtensionContext ctx) throws Exception {
kafkaUnit.tearDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.apache.storm.kafka;

import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.RecordTranslator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,41 @@

package org.apache.storm.kafka.spout;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitRule;
import org.apache.storm.kafka.KafkaUnitExtension;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@ExtendWith(MockitoExtension.class)
public abstract class KafkaSpoutAbstractTest {

@Rule
public MockitoRule mockito = MockitoJUnit.rule();

@Rule
public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
@RegisterExtension
public KafkaUnitExtension kafkaUnitExtension = new KafkaUnitExtension();

final TopologyContext topologyContext = mock(TopologyContext.class);
final Map<String, Object> conf = new HashMap<>();
Expand All @@ -79,7 +75,7 @@ protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) {
this.commitOffsetPeriodMs = commitOffsetPeriodMs;
}

@Before
@BeforeEach
public void setUp() {
spoutConfig = createSpoutConfig();

Expand All @@ -105,15 +101,15 @@ KafkaConsumer<String, String> createConsumerSpy() {
return spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
}

@After
@AfterEach
public void tearDown() throws Exception {
simulatedTime.close();
}

abstract KafkaSpoutConfig<String, String> createSpoutConfig();

void prepareSpout(int messageCount) throws Exception {
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,24 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class KafkaSpoutConfigTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testBasic() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, conf.getFirstPollOffsetStrategy());
assertEquals(conf.getFirstPollOffsetStrategy(), FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
assertNull(conf.getConsumerGroupId());
assertTrue(conf.getTranslator() instanceof DefaultRecordTranslator);
HashMap<String, Object> expected = new HashMap<>();
Expand All @@ -49,8 +45,8 @@ public void testBasic() {
expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
assertEquals(expected, conf.getKafkaProps());
assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs());
assertEquals(conf.getKafkaProps(), expected);
assertEquals(conf.getMetricsTimeBucketSizeInSecs(), KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS);
}

@Test
Expand All @@ -59,7 +55,7 @@ public void testSetEmitNullTuplesToTrue() {
.setEmitNullTuples(true)
.build();

assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples());
assertTrue(conf.isEmitNullTuples(), "Failed to set emit null tuples to true");
}

@Test
Expand Down Expand Up @@ -88,14 +84,13 @@ public void testMetricsTimeBucketSizeInSecs() {
.setMetricsTimeBucketSizeInSecs(100)
.build();

assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
assertEquals(conf.getMetricsTimeBucketSizeInSecs(), 100);
}

@Test
public void testThrowsIfEnableAutoCommitIsSet() {
expectedException.expect(IllegalStateException.class);
KafkaSpoutConfig.builder("localhost:1234", "topic")
Assertions.assertThrows(IllegalStateException.class, () -> KafkaSpoutConfig.builder("localhost:1234", "topic")
.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.build();
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.apache.storm.kafka.spout;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
Expand All @@ -40,8 +39,6 @@
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

Expand All @@ -54,9 +51,10 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;

import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class KafkaSpoutEmitTest {

Expand All @@ -68,7 +66,7 @@ public class KafkaSpoutEmitTest {
private KafkaConsumer<String, String> consumerMock;
private KafkaSpoutConfig<String, String> spoutConfig;

@Before
@BeforeEach
public void setUp() {
spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@
package org.apache.storm.kafka.spout;


import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.utils.Time;
import org.junit.Test;

import java.util.regex.Pattern;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.regex.Pattern;
import org.apache.storm.kafka.NullRecordTranslator;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.utils.Time;
import org.junit.jupiter.api.Test;

public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {

Expand All @@ -39,7 +37,7 @@ public KafkaSpoutNullTupleTest() {

@Override
KafkaSpoutConfig<String, String> createSpoutConfig() {
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setRecordTranslator(new NullRecordTranslator<>())
Expand Down
Loading

0 comments on commit c14a9b5

Please sign in to comment.