From 44afdd3a9878489c9847d5249bbbe68bdda5039c Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Otoya Date: Tue, 27 Aug 2019 11:30:20 +0200 Subject: [PATCH 1/6] feat: lazy limit --- .../main/java/zipkin2/storage/kafka/KafkaSpanStore.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index 9b5184ca..8bdb7e8c 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Set; import org.apache.kafka.streams.KafkaStreams; @@ -199,7 +200,8 @@ static class GetTracesCall extends KafkaStreamsStoreCall>> { KeyValueIterator> spanIds = traceIdsByTsStore.range(from, to); spanIds.forEachRemaining(keyValue -> { for (String traceId : keyValue.value) { - if (!traceIds.contains(traceId) && traces.size() < queryRequest.limit()) { + if (!traceIds.contains(traceId)) { + //if (!traceIds.contains(traceId) && traces.size() < queryRequest.limit()) { List spans = tracesStore.get(traceId); if (spans != null && queryRequest.test(spans)) { // apply filters traceIds.add(traceId); // adding to check if we have already add it later @@ -208,8 +210,10 @@ static class GetTracesCall extends KafkaStreamsStoreCall>> { } } }); + traces.sort(Comparator.comparing(o -> o.get(0).timestamp())); + // TODO check if we should reverse LOG.debug("Traces found from query {}: {}", queryRequest, traces.size()); - return traces; + return traces.subList(0, queryRequest.limit()); } @Override From 5d8545b5ab2508b967a88fdaa352615d8fbbd5a2 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 27 Aug 2019 13:09:58 +0200 Subject: [PATCH 2/6] fix: validate limit is bigger than results --- .../zipkin2/storage/kafka/KafkaSpanStore.java | 4 +-- .../zipkin2/storage/kafka/KafkaStorageIT.java | 26 ++++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index 8bdb7e8c..7e973e1d 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -211,9 +211,9 @@ static class GetTracesCall extends KafkaStreamsStoreCall>> { } }); traces.sort(Comparator.comparing(o -> o.get(0).timestamp())); - // TODO check if we should reverse + Collections.reverse(traces); // return most recent traces LOG.debug("Traces found from query {}: {}", queryRequest, traces.size()); - return traces.subList(0, queryRequest.limit()); + return traces.subList(0, queryRequest.limit() >= traces.size() ? traceIds.size() : queryRequest.limit()); } @Override diff --git a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java index fa4726e6..0d0d472e 100644 --- a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java +++ b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java @@ -158,9 +158,14 @@ class KafkaStorageIT { .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build()) .timestamp(TODAY * 1000).duration(2) .build(); + Span other = Span.newBuilder().traceId("c").id("c").name("op_c").kind(Span.Kind.SERVER) + .localEndpoint(Endpoint.newBuilder().serviceName("svc_c").build()) + .timestamp(TODAY * 1000 + 10).duration(8) + .build(); List spans = Arrays.asList(parent, child); // When: been published tracesProducer.send(new ProducerRecord<>(storage.spansTopicName, parent.traceId(), spans)); + tracesProducer.send(new ProducerRecord<>(storage.spansTopicName, other.traceId(), Collections.singletonList(other))); tracesProducer.flush(); // Then: stored IntegrationTestUtils.waitUntilMinRecordsReceived( @@ -189,6 +194,25 @@ class KafkaStorageIT { return traces.size() == 1 && traces.get(0).size() == 2; // Trace is found and has two spans }); + await().atMost(30, TimeUnit.SECONDS) + .until(() -> { + List> traces = new ArrayList<>(); + try { + traces = + spanStore.getTraces(QueryRequest.newBuilder() + .endTs(TODAY + 1) + .lookback(Duration.ofMinutes(1).toMillis()) + .limit(1) + .build()) + .execute(); + } catch (InvalidStateStoreException e) { // ignoring state issues + System.err.println(e.getMessage()); + } catch (Exception e) { + e.printStackTrace(); + } + return traces.size() == 1 + && traces.get(0).size() == 1; // last trace is returned first + }); await().atMost(5, TimeUnit.SECONDS) .until(() -> { List services = new ArrayList<>(); @@ -199,7 +223,7 @@ class KafkaStorageIT { } catch (Exception e) { e.printStackTrace(); } - return services.size() == 2; + return services.size() == 3; }); // There are two service names await().atMost(5, TimeUnit.SECONDS) .until(() -> { From 96e91902db9d2c70895127cdf55d32faa848dc75 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 27 Aug 2019 13:36:21 +0200 Subject: [PATCH 3/6] fix: close iterators --- .../zipkin2/storage/kafka/KafkaSpanStore.java | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index 7e973e1d..b0bba84d 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -112,10 +112,12 @@ static class GetServiceNamesCall extends KafkaStreamsStoreCall> { @Override public List query() { List serviceNames = new ArrayList<>(); - serviceStore.all().forEachRemaining(keyValue -> { - // double check service names are unique - if (!serviceNames.contains(keyValue.value)) serviceNames.add(keyValue.value); - }); + try (KeyValueIterator all = serviceStore.all()) { + all.forEachRemaining(keyValue -> { + // double check service names are unique + if (!serviceNames.contains(keyValue.value)) serviceNames.add(keyValue.value); + }); + } // comply with Zipkin API as service names are required to be ordered lexicographically Collections.sort(serviceNames); return serviceNames; @@ -179,46 +181,47 @@ static class GetRemoteServiceNamesCall extends KafkaStreamsStoreCall>> { final ReadOnlyKeyValueStore> tracesStore; final ReadOnlyKeyValueStore> traceIdsByTsStore; - final QueryRequest queryRequest; + final QueryRequest request; GetTracesCall( ReadOnlyKeyValueStore> tracesStore, ReadOnlyKeyValueStore> traceIdsByTsStore, - QueryRequest queryRequest) { + QueryRequest request) { this.tracesStore = tracesStore; this.traceIdsByTsStore = traceIdsByTsStore; - this.queryRequest = queryRequest; + this.request = request; } @Override public List> query() { List> traces = new ArrayList<>(); List traceIds = new ArrayList<>(); // milliseconds to microseconds - long from = (queryRequest.endTs() - queryRequest.lookback()) * 1000; - long to = queryRequest.endTs() * 1000; + long from = (request.endTs() - request.lookback()) * 1000; + long to = request.endTs() * 1000; // first index - KeyValueIterator> spanIds = traceIdsByTsStore.range(from, to); - spanIds.forEachRemaining(keyValue -> { - for (String traceId : keyValue.value) { - if (!traceIds.contains(traceId)) { - //if (!traceIds.contains(traceId) && traces.size() < queryRequest.limit()) { - List spans = tracesStore.get(traceId); - if (spans != null && queryRequest.test(spans)) { // apply filters - traceIds.add(traceId); // adding to check if we have already add it later - traces.add(spans); + try (KeyValueIterator> spanIds = traceIdsByTsStore.range(from, to)) { + spanIds.forEachRemaining(keyValue -> { + for (String traceId : keyValue.value) { + if (!traceIds.contains(traceId)) { + List spans = tracesStore.get(traceId); + if (spans != null && request.test(spans)) { // apply filters + traceIds.add(traceId); // adding to check if we have already add it later + traces.add(spans); + } } } - } - }); + }); + } traces.sort(Comparator.comparing(o -> o.get(0).timestamp())); Collections.reverse(traces); // return most recent traces - LOG.debug("Traces found from query {}: {}", queryRequest, traces.size()); - return traces.subList(0, queryRequest.limit() >= traces.size() ? traceIds.size() : queryRequest.limit()); + LOG.debug("Traces found from query {}: {}", request, traces.size()); + return traces.subList(0, + request.limit() >= traces.size() ? traceIds.size() : request.limit()); } @Override public Call>> clone() { - return new GetTracesCall(tracesStore, traceIdsByTsStore, queryRequest); + return new GetTracesCall(tracesStore, traceIdsByTsStore, request); } } @@ -226,9 +229,7 @@ static class GetTraceCall extends KafkaStreamsStoreCall> { final ReadOnlyKeyValueStore> traceStore; final String traceId; - GetTraceCall( - ReadOnlyKeyValueStore> traceStore, - String traceId) { + GetTraceCall(ReadOnlyKeyValueStore> traceStore, String traceId) { this.traceStore = traceStore; this.traceId = traceId; } From be37cefd746ba45e3712507f53e7f651441b3dad Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 27 Aug 2019 14:17:32 +0200 Subject: [PATCH 4/6] fix: simplify sort --- .../src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index b0bba84d..8a048d19 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -212,8 +212,7 @@ static class GetTracesCall extends KafkaStreamsStoreCall>> { } }); } - traces.sort(Comparator.comparing(o -> o.get(0).timestamp())); - Collections.reverse(traces); // return most recent traces + traces.sort(Comparator.>comparingLong(o -> o.get(0).timestampAsLong()).reversed()); LOG.debug("Traces found from query {}: {}", request, traces.size()); return traces.subList(0, request.limit() >= traces.size() ? traceIds.size() : request.limit()); From 40f6879c8617c28231583eb1da1bbca99e31419b Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 27 Aug 2019 15:16:24 +0200 Subject: [PATCH 5/6] feat: bucket based query from earliest to oldest --- Makefile | 2 +- .../zipkin2/storage/kafka/KafkaSpanStore.java | 40 ++++++++++++++----- .../zipkin2/storage/kafka/KafkaStorageIT.java | 6 +-- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/Makefile b/Makefile index 1bebc776..aca3aa2f 100644 --- a/Makefile +++ b/Makefile @@ -54,7 +54,7 @@ build: license-header ${MAVEN} clean install -DskipTests .PHONY: test -test: build +test: ${MAVEN} test verify .PHONY: zipkin-local diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index 8a048d19..f2c37592 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -198,19 +198,39 @@ static class GetTracesCall extends KafkaStreamsStoreCall>> { // milliseconds to microseconds long from = (request.endTs() - request.lookback()) * 1000; long to = request.endTs() * 1000; - // first index - try (KeyValueIterator> spanIds = traceIdsByTsStore.range(from, to)) { - spanIds.forEachRemaining(keyValue -> { - for (String traceId : keyValue.value) { - if (!traceIds.contains(traceId)) { - List spans = tracesStore.get(traceId); - if (spans != null && request.test(spans)) { // apply filters - traceIds.add(traceId); // adding to check if we have already add it later - traces.add(spans); + long checkpoint = to - (60 * 1000 * 1000); // 1 min before upper bound + if (checkpoint <= from) { // do one run + try (KeyValueIterator> spanIds = traceIdsByTsStore.range(from, to)) { + spanIds.forEachRemaining(keyValue -> { + for (String traceId : keyValue.value) { + if (!traceIds.contains(traceId)) { + List spans = tracesStore.get(traceId); + if (spans != null && request.test(spans)) { // apply filters + traceIds.add(traceId); // adding to check if we have already add it later + traces.add(spans); + } } } + }); + } + } else { + while (checkpoint > from && traces.size() < request.limit()) { + try (KeyValueIterator> spanIds = traceIdsByTsStore.range(checkpoint, to)) { + spanIds.forEachRemaining(keyValue -> { + for (String traceId : keyValue.value) { + if (!traceIds.contains(traceId)) { + List spans = tracesStore.get(traceId); + if (spans != null && request.test(spans)) { // apply filters + traceIds.add(traceId); // adding to check if we have already add it later + traces.add(spans); + } + } + } + }); } - }); + to = checkpoint; + checkpoint = checkpoint - (60 * 1000 * 1000); // 1 min before more + } } traces.sort(Comparator.>comparingLong(o -> o.get(0).timestampAsLong()).reversed()); LOG.debug("Traces found from query {}: {}", request, traces.size()); diff --git a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java index 0d0d472e..dfd78de3 100644 --- a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java +++ b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java @@ -174,7 +174,7 @@ class KafkaStorageIT { SpanStore spanStore = storage.spanStore(); ServiceAndSpanNames serviceAndSpanNames = storage.serviceAndSpanNames(); // Then: services names are searchable - await().atMost(30, TimeUnit.SECONDS) + await().atMost(10, TimeUnit.SECONDS) .until(() -> { List> traces = new ArrayList<>(); try { @@ -194,14 +194,14 @@ class KafkaStorageIT { return traces.size() == 1 && traces.get(0).size() == 2; // Trace is found and has two spans }); - await().atMost(30, TimeUnit.SECONDS) + await().atMost(10, TimeUnit.SECONDS) .until(() -> { List> traces = new ArrayList<>(); try { traces = spanStore.getTraces(QueryRequest.newBuilder() .endTs(TODAY + 1) - .lookback(Duration.ofMinutes(1).toMillis()) + .lookback(Duration.ofMinutes(2).toMillis()) .limit(1) .build()) .execute(); From 9c28f5c9feb85c081f208665358de9bbee57a4db Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 27 Aug 2019 15:33:15 +0200 Subject: [PATCH 6/6] feat: bucket based query from earliest to oldest --- .../src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java | 2 +- .../src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index f2c37592..3f50207a 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -198,7 +198,7 @@ static class GetTracesCall extends KafkaStreamsStoreCall>> { // milliseconds to microseconds long from = (request.endTs() - request.lookback()) * 1000; long to = request.endTs() * 1000; - long checkpoint = to - (60 * 1000 * 1000); // 1 min before upper bound + long checkpoint = to - (30 * 1000 * 1000); // 30 sec before upper bound if (checkpoint <= from) { // do one run try (KeyValueIterator> spanIds = traceIdsByTsStore.range(from, to)) { spanIds.forEachRemaining(keyValue -> { diff --git a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java index dfd78de3..a9526396 100644 --- a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java +++ b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java @@ -181,7 +181,7 @@ class KafkaStorageIT { traces = spanStore.getTraces(QueryRequest.newBuilder() .endTs(TODAY + 1) - .lookback(Duration.ofMinutes(1).toMillis()) + .lookback(Duration.ofSeconds(30).toMillis()) .serviceName("svc_a") .limit(10) .build()) @@ -201,7 +201,7 @@ class KafkaStorageIT { traces = spanStore.getTraces(QueryRequest.newBuilder() .endTs(TODAY + 1) - .lookback(Duration.ofMinutes(2).toMillis()) + .lookback(Duration.ofMinutes(1).toMillis()) .limit(1) .build()) .execute();