-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add client request size metric channel #2023
Changes from 7 commits
6dc9052
0582992
28dcb36
4084d82
638a1a2
880b94c
1d4f94c
bf43b43
66ba6bd
fe26204
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
type: feature | ||
feature: | ||
description: Add a request size metric channel, which records the size of payloads | ||
written by the client. | ||
links: | ||
- https://github.com/palantir/dialogue/pull/2023 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.dialogue.core; | ||
|
||
import com.codahale.metrics.Histogram; | ||
import com.google.common.base.Suppliers; | ||
import com.google.common.io.CountingOutputStream; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.palantir.conjure.java.client.config.ClientConfiguration; | ||
import com.palantir.dialogue.Endpoint; | ||
import com.palantir.dialogue.EndpointChannel; | ||
import com.palantir.dialogue.Request; | ||
import com.palantir.dialogue.RequestBody; | ||
import com.palantir.dialogue.Response; | ||
import com.palantir.logsafe.logger.SafeLogger; | ||
import com.palantir.logsafe.logger.SafeLoggerFactory; | ||
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; | ||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.util.Optional; | ||
import java.util.function.Supplier; | ||
|
||
final class RequestSizeMetricsChannel implements EndpointChannel { | ||
private static final SafeLogger log = SafeLoggerFactory.get(RequestSizeMetricsChannel.class); | ||
// MIN_REPORTED_REQUEST_SIZE filters recording small requests to reduce metric cardinality | ||
private static final long MIN_REPORTED_REQUEST_SIZE = 1 << 20; | ||
private final EndpointChannel delegate; | ||
private final Supplier<Histogram> retryableRequestSize; | ||
private final Supplier<Histogram> nonretryableRequestSize; | ||
|
||
static EndpointChannel create(Config cf, EndpointChannel channel, Endpoint endpoint) { | ||
ClientConfiguration clientConf = cf.clientConf(); | ||
return new RequestSizeMetricsChannel(channel, cf.channelName(), endpoint, clientConf.taggedMetricRegistry()); | ||
} | ||
|
||
RequestSizeMetricsChannel( | ||
EndpointChannel delegate, String channelName, Endpoint endpoint, TaggedMetricRegistry registry) { | ||
this.delegate = delegate; | ||
DialogueClientMetrics dialogueClientMetrics = DialogueClientMetrics.of(registry); | ||
this.retryableRequestSize = Suppliers.memoize(() -> dialogueClientMetrics | ||
.requestsSize() | ||
.channelName(channelName) | ||
.serviceName(endpoint.serviceName()) | ||
.endpoint(endpoint.endpointName()) | ||
.retryable("true") | ||
.build()); | ||
this.nonretryableRequestSize = Suppliers.memoize(() -> dialogueClientMetrics | ||
.requestsSize() | ||
.channelName(channelName) | ||
.serviceName(endpoint.serviceName()) | ||
.endpoint(endpoint.endpointName()) | ||
.retryable("false") | ||
.build()); | ||
} | ||
|
||
@Override | ||
public ListenableFuture<Response> execute(Request request) { | ||
Request augmentedRequest = wrap(request); | ||
return delegate.execute(augmentedRequest); | ||
} | ||
|
||
private Request wrap(Request request) { | ||
Optional<RequestBody> body = request.body(); | ||
if (body.isEmpty()) { | ||
// No need to record empty bodies | ||
return request; | ||
} | ||
Comment on lines
+80
to
+83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently this will record metrics for both requests which can and cannot be retried, without distinction. Should we tag metrics based on whether or not retires are allowed? (we could hold two histograms and pass the correct one to |
||
Supplier<Histogram> requestSizeHistogram = | ||
body.get().repeatable() ? this.retryableRequestSize : this.nonretryableRequestSize; | ||
|
||
return Request.builder() | ||
.from(request) | ||
.body(new RequestSizeRecordingRequestBody(body.get(), requestSizeHistogram)) | ||
.build(); | ||
} | ||
|
||
private static class RequestSizeRecordingRequestBody implements RequestBody { | ||
private final RequestBody delegate; | ||
private final Supplier<Histogram> size; | ||
|
||
RequestSizeRecordingRequestBody(RequestBody requestBody, Supplier<Histogram> size) { | ||
this.delegate = requestBody; | ||
this.size = size; | ||
} | ||
|
||
@Override | ||
public void writeTo(OutputStream output) throws IOException { | ||
CountingOutputStream countingOut = new CountingOutputStream(output); | ||
delegate.writeTo(countingOut); | ||
if (countingOut.getCount() > MIN_REPORTED_REQUEST_SIZE) { | ||
size.get().update(countingOut.getCount()); | ||
} | ||
} | ||
|
||
@Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to override and pass through the |
||
public String contentType() { | ||
return delegate.contentType(); | ||
} | ||
|
||
@Override | ||
public boolean repeatable() { | ||
return delegate.repeatable(); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
delegate.close(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.dialogue.core; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
import com.codahale.metrics.Snapshot; | ||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.palantir.conjure.java.client.config.ClientConfiguration; | ||
import com.palantir.dialogue.EndpointChannel; | ||
import com.palantir.dialogue.Request; | ||
import com.palantir.dialogue.RequestBody; | ||
import com.palantir.dialogue.Response; | ||
import com.palantir.dialogue.TestConfigurations; | ||
import com.palantir.dialogue.TestEndpoint; | ||
import com.palantir.dialogue.TestResponse; | ||
import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; | ||
import com.palantir.tritium.metrics.registry.MetricName; | ||
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.OptionalLong; | ||
import java.util.concurrent.ExecutionException; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public class RequestSizeMetricsChannelTest { | ||
@Mock | ||
DialogueChannelFactory factory; | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Test | ||
public void records_request_size_metrics() throws ExecutionException, InterruptedException { | ||
TaggedMetricRegistry registry = DefaultTaggedMetricRegistry.getDefault(); | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
int recordableRequestSize = 2 << 20; | ||
byte[] expected = "a".repeat(recordableRequestSize).getBytes(StandardCharsets.UTF_8); | ||
Request request = Request.builder() | ||
.body(new RequestBody() { | ||
@Override | ||
public void writeTo(OutputStream output) throws IOException { | ||
output.write(expected); | ||
} | ||
|
||
@Override | ||
public String contentType() { | ||
return "text/plain"; | ||
} | ||
|
||
@Override | ||
public boolean repeatable() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public OptionalLong contentLength() { | ||
return OptionalLong.of(expected.length); | ||
} | ||
|
||
@Override | ||
public void close() {} | ||
}) | ||
.build(); | ||
|
||
EndpointChannel channel = RequestSizeMetricsChannel.create( | ||
ImmutableConfig.builder() | ||
.channelName("channelName") | ||
.channelFactory(factory) | ||
.rawConfig(ClientConfiguration.builder() | ||
.from(TestConfigurations.create("https://foo:10001")) | ||
.taggedMetricRegistry(registry) | ||
.build()) | ||
.build(), | ||
r -> { | ||
try { | ||
RequestBody body = r.body().get(); | ||
body.writeTo(baos); | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
body.close(); | ||
} catch (IOException e) { | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
throw new RuntimeException(e); | ||
} | ||
return Futures.immediateFuture(new TestResponse().code(200)); | ||
}, | ||
TestEndpoint.GET); | ||
ListenableFuture<Response> response = channel.execute(request); | ||
|
||
assertThat(response.get().code()).isEqualTo(200); | ||
Snapshot snapshot = DialogueClientMetrics.of(registry) | ||
.requestsSize() | ||
.channelName("channelName") | ||
.serviceName("service") | ||
.endpoint("endpoint") | ||
.retryable("true") | ||
.build() | ||
.getSnapshot(); | ||
assertThat(snapshot.size()).isEqualTo(1); | ||
assertThat(snapshot.get99thPercentile()).isEqualTo(expected.length); | ||
} | ||
|
||
@Test | ||
public void small_request_not_recorded() throws ExecutionException, InterruptedException { | ||
TaggedMetricRegistry registry = DefaultTaggedMetricRegistry.getDefault(); | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
byte[] expected = "test request body".getBytes(StandardCharsets.UTF_8); | ||
Request request = Request.builder() | ||
.body(new RequestBody() { | ||
@Override | ||
public void writeTo(OutputStream output) throws IOException { | ||
output.write(expected); | ||
} | ||
|
||
@Override | ||
public String contentType() { | ||
return "text/plain"; | ||
} | ||
|
||
@Override | ||
public boolean repeatable() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public OptionalLong contentLength() { | ||
return OptionalLong.of(expected.length); | ||
} | ||
|
||
@Override | ||
public void close() {} | ||
}) | ||
.build(); | ||
|
||
EndpointChannel channel = RequestSizeMetricsChannel.create( | ||
ImmutableConfig.builder() | ||
.channelName("smallRequestChannelName") | ||
.channelFactory(factory) | ||
.rawConfig(ClientConfiguration.builder() | ||
.from(TestConfigurations.create("https://foo:10001")) | ||
.taggedMetricRegistry(registry) | ||
.build()) | ||
.build(), | ||
r -> { | ||
try { | ||
RequestBody body = r.body().get(); | ||
body.writeTo(baos); | ||
body.close(); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
andybradshaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return Futures.immediateFuture(new TestResponse().code(200)); | ||
}, | ||
TestEndpoint.GET); | ||
ListenableFuture<Response> response = channel.execute(request); | ||
|
||
assertThat(response.get().code()).isEqualTo(200); | ||
MetricName metricName = DialogueClientMetrics.of(registry) | ||
.requestsSize() | ||
.channelName("smallRequestChannelName") | ||
.serviceName("service") | ||
.endpoint("endpoint") | ||
.retryable("true") | ||
.buildMetricName(); | ||
assertThat(registry.remove(metricName)).isEmpty(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to wire this up from
DialogueChannel
so that it's part of the chain of channels used to send requestsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep! wasn't sure if we'd want to do that separately or not, and also wasn't sure where in the chain to add this... maybe alongside the timing endpoint channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, the timing endpoint channel is in a conditional to protect against an edge case where clients may arbitrarily build new
Endpoint
objects (if they're not using standard dialogue generated bindings). I don't want to miss out on that data, so perhaps we can avoid fanout by setting a relatively high threshold for reporting data, something like 1mb+.We'll need to restructure a bit of the code to avoid creating a histogram until we've seen a large request for the first time (which is impossible-ish on GET endpoints, and many PUTs/POSTs will never reach it) by creating a memoized supplier for the histogram rather than passing the histogram directly.
Setting a threshold improves the probability of capturing maximums as well, since we use a sampling reservoir, we only hold ~1024 samples at a given time (with heavy recency bias), so reducing small and uninteresting samples will increase the probability we capture outliers.