Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fresh results #34

Merged
merged 6 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ build: license-header
${MAVEN} clean install -DskipTests

.PHONY: test
test: build
test:
${MAVEN} test verify

.PHONY: zipkin-local
Expand Down
74 changes: 49 additions & 25 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
Expand Down Expand Up @@ -111,10 +112,12 @@ static class GetServiceNamesCall extends KafkaStreamsStoreCall<List<String>> {

@Override public List<String> query() {
List<String> serviceNames = new ArrayList<>();
serviceStore.all().forEachRemaining(keyValue -> {
// double check service names are unique
if (!serviceNames.contains(keyValue.value)) serviceNames.add(keyValue.value);
});
try (KeyValueIterator<String, String> all = serviceStore.all()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sheesh do we allow dupes? sounds like we should tighten up that javadoc

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeap, that surprise me on one of my deployments where I started to get duplicate service names. Will create an issue to double check this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#35

all.forEachRemaining(keyValue -> {
// double check service names are unique
if (!serviceNames.contains(keyValue.value)) serviceNames.add(keyValue.value);
});
}
// comply with Zipkin API as service names are required to be ordered lexicographically
Collections.sort(serviceNames);
return serviceNames;
Expand Down Expand Up @@ -178,53 +181,74 @@ static class GetRemoteServiceNamesCall extends KafkaStreamsStoreCall<List<String
static class GetTracesCall extends KafkaStreamsStoreCall<List<List<Span>>> {
final ReadOnlyKeyValueStore<String, List<Span>> tracesStore;
final ReadOnlyKeyValueStore<Long, Set<String>> traceIdsByTsStore;
final QueryRequest queryRequest;
final QueryRequest request;

GetTracesCall(
ReadOnlyKeyValueStore<String, List<Span>> tracesStore,
ReadOnlyKeyValueStore<Long, Set<String>> traceIdsByTsStore,
QueryRequest queryRequest) {
QueryRequest request) {
this.tracesStore = tracesStore;
this.traceIdsByTsStore = traceIdsByTsStore;
this.queryRequest = queryRequest;
this.request = request;
}

@Override public List<List<Span>> query() {
List<List<Span>> traces = new ArrayList<>();
List<String> traceIds = new ArrayList<>();
// milliseconds to microseconds
long from = (queryRequest.endTs() - queryRequest.lookback()) * 1000;
long to = queryRequest.endTs() * 1000;
// first index
KeyValueIterator<Long, Set<String>> spanIds = traceIdsByTsStore.range(from, to);
spanIds.forEachRemaining(keyValue -> {
for (String traceId : keyValue.value) {
if (!traceIds.contains(traceId) && traces.size() < queryRequest.limit()) {
List<Span> spans = tracesStore.get(traceId);
if (spans != null && queryRequest.test(spans)) { // apply filters
traceIds.add(traceId); // adding to check if we have already add it later
traces.add(spans);
long from = (request.endTs() - request.lookback()) * 1000;
long to = request.endTs() * 1000;
long checkpoint = to - (30 * 1000 * 1000); // 30 sec before upper bound
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this checkpoint represent the bucket range

if (checkpoint <= from) { // do one run
try (KeyValueIterator<Long, Set<String>> spanIds = traceIdsByTsStore.range(from, to)) {
spanIds.forEachRemaining(keyValue -> {
for (String traceId : keyValue.value) {
if (!traceIds.contains(traceId)) {
List<Span> spans = tracesStore.get(traceId);
if (spans != null && request.test(spans)) { // apply filters
traceIds.add(traceId); // adding to check if we have already add it later
traces.add(spans);
}
}
}
});
}
} else {
while (checkpoint > from && traces.size() < request.limit()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @llinder @michaelsembwever this seems familiar though in cassandra I think we try to do async lazy chained calls.. not sure if the traceIdsByTsStore.range here is a remote call or not.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All stores are local tho

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless we'd support openzipkin/zipkin#2784 where calls could span multiple instances

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue moved to #36

try (KeyValueIterator<Long, Set<String>> spanIds = traceIdsByTsStore.range(checkpoint, to)) {
spanIds.forEachRemaining(keyValue -> {
for (String traceId : keyValue.value) {
if (!traceIds.contains(traceId)) {
List<Span> spans = tracesStore.get(traceId);
if (spans != null && request.test(spans)) { // apply filters
traceIds.add(traceId); // adding to check if we have already add it later
traces.add(spans);
}
}
}
});
}
to = checkpoint;
checkpoint = checkpoint - (60 * 1000 * 1000); // 1 min before more
}
});
LOG.debug("Traces found from query {}: {}", queryRequest, traces.size());
return traces;
}
traces.sort(Comparator.<List<Span>>comparingLong(o -> o.get(0).timestampAsLong()).reversed());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resulting traces list should have values close to limit, so sorting should not be expensive.

LOG.debug("Traces found from query {}: {}", request, traces.size());
return traces.subList(0,
request.limit() >= traces.size() ? traceIds.size() : request.limit());
}

@Override
public Call<List<List<Span>>> clone() {
return new GetTracesCall(tracesStore, traceIdsByTsStore, queryRequest);
return new GetTracesCall(tracesStore, traceIdsByTsStore, request);
}
}

static class GetTraceCall extends KafkaStreamsStoreCall<List<Span>> {
final ReadOnlyKeyValueStore<String, List<Span>> traceStore;
final String traceId;

GetTraceCall(
ReadOnlyKeyValueStore<String, List<Span>> traceStore,
String traceId) {
GetTraceCall(ReadOnlyKeyValueStore<String, List<Span>> traceStore, String traceId) {
this.traceStore = traceStore;
this.traceId = traceId;
}
Expand Down
30 changes: 27 additions & 3 deletions storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,14 @@ class KafkaStorageIT {
.localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
.timestamp(TODAY * 1000).duration(2)
.build();
Span other = Span.newBuilder().traceId("c").id("c").name("op_c").kind(Span.Kind.SERVER)
.localEndpoint(Endpoint.newBuilder().serviceName("svc_c").build())
.timestamp(TODAY * 1000 + 10).duration(8)
.build();
List<Span> spans = Arrays.asList(parent, child);
// When: been published
tracesProducer.send(new ProducerRecord<>(storage.spansTopicName, parent.traceId(), spans));
tracesProducer.send(new ProducerRecord<>(storage.spansTopicName, other.traceId(), Collections.singletonList(other)));
tracesProducer.flush();
// Then: stored
IntegrationTestUtils.waitUntilMinRecordsReceived(
Expand All @@ -169,14 +174,14 @@ class KafkaStorageIT {
SpanStore spanStore = storage.spanStore();
ServiceAndSpanNames serviceAndSpanNames = storage.serviceAndSpanNames();
// Then: services names are searchable
await().atMost(30, TimeUnit.SECONDS)
await().atMost(10, TimeUnit.SECONDS)
.until(() -> {
List<List<Span>> traces = new ArrayList<>();
try {
traces =
spanStore.getTraces(QueryRequest.newBuilder()
.endTs(TODAY + 1)
.lookback(Duration.ofMinutes(1).toMillis())
.lookback(Duration.ofSeconds(30).toMillis())
.serviceName("svc_a")
.limit(10)
.build())
Expand All @@ -189,6 +194,25 @@ class KafkaStorageIT {
return traces.size() == 1
&& traces.get(0).size() == 2; // Trace is found and has two spans
});
await().atMost(10, TimeUnit.SECONDS)
.until(() -> {
List<List<Span>> traces = new ArrayList<>();
try {
traces =
spanStore.getTraces(QueryRequest.newBuilder()
.endTs(TODAY + 1)
.lookback(Duration.ofMinutes(1).toMillis())
.limit(1)
.build())
.execute();
} catch (InvalidStateStoreException e) { // ignoring state issues
System.err.println(e.getMessage());
} catch (Exception e) {
e.printStackTrace();
}
return traces.size() == 1
&& traces.get(0).size() == 1; // last trace is returned first
});
await().atMost(5, TimeUnit.SECONDS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side question.. why all the waits? I know in some cases you can't get a callback of storage quorum met. in such case you could add a sleep to a command that wraps storage to declutter the IT's main code..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, probably copy-pasta heh. I think only first one is needed, will clean up.

.until(() -> {
List<String> services = new ArrayList<>();
Expand All @@ -199,7 +223,7 @@ class KafkaStorageIT {
} catch (Exception e) {
e.printStackTrace();
}
return services.size() == 2;
return services.size() == 3;
}); // There are two service names
await().atMost(5, TimeUnit.SECONDS)
.until(() -> {
Expand Down