Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: Update dependency org.apache.httpcomponents.core5:httpcore5 to v5.3.1 #805

Merged
merged 3 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.zeebe.containers.exporter;

import io.camunda.zeebe.protocol.record.Record;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
Expand All @@ -24,12 +26,21 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.jcip.annotations.ThreadSafe;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.HttpProcessors;
import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
import org.apache.hc.core5.http.impl.routing.RequestRouter;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncFilterChain;
import org.apache.hc.core5.http.nio.AsyncFilterHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apiguardian.api.API;
Expand Down Expand Up @@ -257,11 +268,13 @@ private HttpAsyncServer createServer() {
return AsyncServerBootstrap.bootstrap()
.setIOReactorConfig(config)
.setCanonicalHostName("localhost")
.setExceptionCallback(e -> LOGGER.warn("Error occurred in DebugReceiver server", e))
.setCharCodingConfig(CharCodingConfig.custom().setCharset(StandardCharsets.UTF_8).build())
.setHttpProcessor(HttpProcessors.server("ztc-debug/1.1"))
// need to register the handler on both the primary and possibly Testcontainers' proxy for
// our local server, as otherwise the requests with hosts that do not match will be skipped
.registerVirtual(GenericContainer.INTERNAL_HOST_HOSTNAME, "/records", recordHandler)
.register(GenericContainer.INTERNAL_HOST_HOSTNAME, "/records", recordHandler)
.register("127.0.0.1", "/records", recordHandler)
.register("/records", recordHandler)
.create();
}
Expand Down
84 changes: 49 additions & 35 deletions core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
*/
package io.zeebe.containers.exporter;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hc.core5.http.EntityDetails;
Expand Down Expand Up @@ -58,7 +57,7 @@
* <li>200 - the records were passed through, and the response body will be a singleton map with
* one key, <em>position</em>, the value of which is the highest acknowledged position for the
* partition ID from which the records are coming from
* <li>204 - there were either no records passed, or there is no known acknowledged position yet
* <li>200 - there were either no records passed, or there is no known acknowledged position yet
* for the partition form which the records are coming from
* <li>400 - if there is no request body, or the request body cannot be parsed as a list of
* records
Expand All @@ -69,6 +68,7 @@ final class RecordHandler implements AsyncServerRequestHandler<Message<HttpReque
private static final Logger LOGGER = LoggerFactory.getLogger(RecordHandler.class);
private static final ObjectMapper MAPPER =
new ObjectMapper().registerModule(new ZeebeProtocolModule());
private static final byte[] EMPTY_BODY = "{}".getBytes(StandardCharsets.UTF_8);

private final Consumer<Record<?>> recordConsumer;
private final boolean autoAcknowledge;
Expand Down Expand Up @@ -96,32 +96,27 @@ public void handle(
final HttpContext context)
throws HttpException, IOException {
final byte[] requestBody = requestObject.getBody();
final AsyncResponseProducer responseProducer = handleRequest(requestBody);
responseTrigger.submitResponse(responseProducer, context);
}

private AsyncResponseProducer handleRequest(final byte[] requestBody)
throws JsonProcessingException {
if (requestBody == null || requestBody.length == 0) {
final BasicHttpResponse response =
new BasicHttpResponse(HttpStatus.SC_BAD_REQUEST, "must send a list of records as body");
responseTrigger.submitResponse(new BasicResponseProducer(response), context);
return;
return createErrorResponse(HttpStatus.SC_BAD_REQUEST, "Must send a list of records as body");
}

final List<Record<?>> records;
try {
records = MAPPER.readValue(requestBody, new TypeReference<List<Record<?>>>() {});
} catch (final IOException e) {
final BasicHttpResponse response =
new BasicHttpResponse(
HttpStatus.SC_BAD_REQUEST,
"failed to deserialize records, see receiver logs for more");
responseTrigger.submitResponse(new BasicResponseProducer(response), context);
LOGGER.warn("Failed to deserialize exported records", e);

return;
return createErrorResponse(
HttpStatus.SC_BAD_REQUEST, "Failed to deserialize records, see receiver logs for more");
}

if (records.isEmpty()) {
final BasicHttpResponse response =
new BasicHttpResponse(HttpStatus.SC_NO_CONTENT, "no records given");
responseTrigger.submitResponse(new BasicResponseProducer(response), context);
return;
LOGGER.debug("No records given, will return a successful response regardless");
}

for (final Record<?> record : records) {
Expand All @@ -132,26 +127,45 @@ public void handle(
}
}

final int partitionId = records.get(0).getPartitionId();
final AsyncResponseProducer responseProducer = createSuccessfulResponse(partitionId);
responseTrigger.submitResponse(responseProducer, context);
return createSuccessfulResponse(records);
}

private AsyncResponseProducer createSuccessfulResponse(final int partitionId)
private AsyncResponseProducer createSuccessfulResponse(final List<Record<?>> records)
throws JsonProcessingException {
final Long position = positions.get(partitionId);

if (position == null) {
final HttpResponse response =
new BasicHttpResponse(
HttpStatus.SC_NO_CONTENT, "no acknowledged position for partition " + partitionId);
return new BasicResponseProducer(response);
}

final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
response.setHeader("Content-Type", "application/json");
final byte[] responseBody =
MAPPER.writeValueAsBytes(Collections.singletonMap("position", position));
records.isEmpty()
? EMPTY_BODY
: MAPPER.writeValueAsBytes(
Collections.singletonMap(
"position", positions.get(records.get(0).getPartitionId())));

response.setHeader("Content-Type", "application/json; charset=UTF-8");
return new BasicResponseProducer(response, new BasicAsyncEntityProducer(responseBody));
}

private AsyncResponseProducer createErrorResponse(final int status, final String message)
throws JsonProcessingException {
final ProblemDetail problem = new ProblemDetail(status, message);
final HttpResponse response = new BasicHttpResponse(status);
response.setHeader("Content-Type", "application/json; charset=UTF-8");

final byte[] responseBody = MAPPER.writeValueAsBytes(problem);
return new BasicResponseProducer(response, new BasicAsyncEntityProducer(responseBody));
}

@SuppressWarnings({"FieldCanBeLocal", "unused"})
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
private static final class ProblemDetail {
private final String type = "about:blank";
private final String instance = "/records";

private final int status;
private final String detail;

private ProblemDetail(int status, String detail) {
this.status = status;
this.detail = detail;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.test.broker.protocol.ProtocolFactory;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -107,7 +108,13 @@ void shouldRetryOnNonSuccessfulHttpCode() {
void shouldHandleNoResponseBody() {
// given
final Record<?> record = recordFactory.generateRecord();
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
WireMock.stubFor(
WireMock.post("/records")
.willReturn(
WireMock.aResponse()
.withStatus(200)
.withResponseBody(
Body.fromJsonBytes("{}".getBytes(StandardCharsets.UTF_8)))));
controller.updateLastExportedRecordPosition(10L);

// when
Expand Down Expand Up @@ -141,7 +148,7 @@ void shouldExportRecordAsList() {
// given
final Record<?> record = recordFactory.generateRecord();
final String expectedRequestBody = Json.write(Collections.singletonList(record));
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(200)));

// when
exporter.export(record);
Expand All @@ -158,7 +165,7 @@ void shouldExportRecordsOneAtATime() {
// given
final Record<?> firstRecord = recordFactory.generateRecord();
final Record<?> secondRecord = recordFactory.generateRecord();
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(200)));

// when
exporter.export(firstRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void shouldBindToPortOnStart() throws IOException {
.body(Collections.emptyList())
.post(receiver.recordsEndpoint())
.then()
.statusCode(204);
.statusCode(200);
}
}

Expand All @@ -145,7 +145,7 @@ void shouldBindToRandomPort() {
.body(Collections.emptyList())
.post(receiver.recordsEndpoint())
.then()
.statusCode(204);
.statusCode(200);
}
}

Expand Down Expand Up @@ -209,6 +209,7 @@ void shouldAcknowledgePosition() {
RestAssured.given()
.body(Collections.singletonList(record))
.contentType("application/json")
.accept("application/json")
.post(receiver.recordsEndpoint())
.as(Map.class);
assertThat(response).containsEntry("position", 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -99,6 +100,13 @@ public void export(final Record<?> record) {
Thread.currentThread().interrupt();
LangUtil.rethrowUnchecked(e);
} catch (final Exception e) {
context
.getLogger()
.warn(
"Failed to export record {} of partition {}",
record.getPosition(),
record.getPartitionId(),
e);
LangUtil.rethrowUnchecked(e);
}

Expand All @@ -112,14 +120,15 @@ private void pushRecord(final Record<?> record) throws IOException, InterruptedE
final HttpResponse<byte[]> response = client.send(request, BodyHandlers.ofByteArray());
final int statusCode = response.statusCode();
if (statusCode >= 400) {
final String error =
hasResponseBody(response) ? new String(response.body(), StandardCharsets.UTF_8) : "";
throw new BadRequestException(
String.format(
"Failed to push out record with position %d on partition %d: response code %d",
record.getPosition(), record.getPartitionId(), statusCode));
"Failed to push out record with position %d on partition %d: response code=%d, error='%s'",
record.getPosition(), record.getPartitionId(), statusCode, error));
}

// is there a body to read?
if (statusCode != 204) {
if (hasResponseBody(response)) {
handleAcknowledgment(response.body(), record.getPartitionId());
}

Expand All @@ -129,6 +138,10 @@ private void pushRecord(final Record<?> record) throws IOException, InterruptedE
"Exported record {} to {} (status code: {})", record, config.endpointURI(), statusCode);
}

private boolean hasResponseBody(final HttpResponse<byte[]> response) {
return response.body() != null && response.body().length > 0;
}

private HttpRequest buildRequestForRecord(final Record<?> record) throws JsonProcessingException {
return HttpRequest.newBuilder()
.uri(config.endpointURI())
Expand All @@ -144,9 +157,9 @@ private HttpRequest buildRequestForRecord(final Record<?> record) throws JsonPro
private void handleAcknowledgment(final byte[] responseBody, final int partitionId)
throws IOException {
final RecordsResponse body = MAPPER.readValue(responseBody, RecordsResponse.class);
final long position = body.position;
final Long position = body.position;

if (position > -1) {
if (position != null && position > -1) {
controller.updateLastExportedRecordPosition(position);
context
.getLogger()
Expand Down
7 changes: 6 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ default:
./mvnw clean -T2C {{ mvnArgs }}

test +mvnArgs='':
./mvnw verify -DskipChecks -T1C {{ mvnArgs }}
./mvnw verify -DskipChecks -T1C {{ mvnArgs }}

# use only if you have a pretty beefy machine :) if not, you can always just set a lower forkCount by calling
# `just fast-test -DforkCount=2` (or something like that)
fast-test +mvnArgs='':
./mvnw verify -DskipChecks -T1C -Pparallel-tests -DforkCount=3

@ut +mvnArgs='': (test "-DskipITs" mvnArgs)
@it +mvnArgs='': (test "-DskipUTs" mvnArgs)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<version.docker>3.4.1</version.docker>
<version.duct-tape>1.0.8</version.duct-tape>
<version.feign>13.5</version.feign>
<version.httpcore5>5.2.5</version.httpcore5>
<version.httpcore5>5.3.1</version.httpcore5>
<version.jackson>2.18.2</version.jackson>
<version.jcip>1.0</version.jcip>
<version.junit-jupiter>5.11.4</version.junit-jupiter>
Expand Down
Loading