Skip to content

Commit

Permalink
MINOR: fix warnings in Kafka Streams state store tests (#17855)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mjsax authored Nov 26, 2024
1 parent 98d47f4 commit f5d7123
Show file tree
Hide file tree
Showing 22 changed files with 871 additions and 930 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void shouldMaterializeCount(final EmitStrategy.StrategyType inputType) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
final SessionStore<String, Long> store = driver.getSessionStore("count-store");
final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toListAndCloseIterator(store.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
Expand Down Expand Up @@ -255,7 +255,7 @@ public void shouldMaterializeReduced(final EmitStrategy.StrategyType inputType)
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2"));

if (!emitFinal) {
assertThat(
Expand Down Expand Up @@ -288,7 +288,7 @@ public void shouldMaterializeAggregated(final EmitStrategy.StrategyType inputTyp
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void shouldMaterializeCount() {
{
final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), 1L),
Expand All @@ -223,7 +223,7 @@ public void shouldMaterializeCount() {
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
driver.getTimestampedWindowStore("count-store");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make(1L, 100L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make(2L, 150L)),
Expand All @@ -248,7 +248,7 @@ public void shouldMaterializeReduced() {
{
final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "1"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "1+2"),
Expand All @@ -262,7 +262,7 @@ public void shouldMaterializeReduced() {
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
driver.getTimestampedWindowStore("reduced");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("1", 100L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("1+2", 150L)),
Expand All @@ -289,7 +289,7 @@ public void shouldMaterializeAggregated() {
{
final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "0+1"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "0+1+2"),
Expand All @@ -303,7 +303,7 @@ public void shouldMaterializeAggregated() {
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
driver.getTimestampedWindowStore("aggregated");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("0+1", 100L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+1+2", 150L)),
Expand Down Expand Up @@ -410,7 +410,7 @@ public void shouldDropWindowsOutsideOfRetention() {
{
final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(10000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(10000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(900, 1000)), "0+4"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(1900, 2000)), "0+5"))));
Expand All @@ -419,7 +419,7 @@ public void shouldDropWindowsOutsideOfRetention() {
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
driver.getTimestampedWindowStore("aggregated");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(2000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(2000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(900, 1000)), ValueAndTimestamp.make("0+4", 1000L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(1900, 2000)), ValueAndTimestamp.make("0+5", 2000L)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void shouldMaterializeCount(final StrategyType inputType, final boolean i
{
final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

if (withCache) {
// with cache returns all records (expired from underneath as well) as part of
Expand All @@ -266,7 +266,7 @@ public void shouldMaterializeCount(final StrategyType inputType, final boolean i
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
driver.getTimestampedWindowStore("count-store");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

// the same values and logic described above applies here as well.
if (withCache) {
Expand Down Expand Up @@ -305,7 +305,7 @@ public void shouldMaterializeReduced(final StrategyType inputType, final boolean
{
final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

if (withCache) {
// with cache returns all records (expired from underneath as well) as part of
Expand All @@ -325,7 +325,7 @@ public void shouldMaterializeReduced(final StrategyType inputType, final boolean
{
final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("reduced");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

// same logic/data as explained above.
if (withCache) {
Expand Down Expand Up @@ -358,7 +358,7 @@ public void shouldMaterializeAggregated(final StrategyType inputType, final bool
{
final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

if (withCache) {
// with cache returns all records (expired from underneath as well) as part of
Expand All @@ -379,7 +379,7 @@ public void shouldMaterializeAggregated(final StrategyType inputType, final bool
{
final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("aggregated");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
StreamsTestUtils.toListAndCloseIterator(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
if (withCache) {
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
Expand Down
Loading

0 comments on commit f5d7123

Please sign in to comment.