Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert disable hostname verification #422

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env groovy
/*
* Copyright [2021 - 2021] Confluent Inc.
*/
common {
slackChannel = '#connect-warn'
nodeLabel = 'docker-debian-jdk8'
downStreamValidate = false
}
46 changes: 31 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.confluent</groupId>
<artifactId>connect-plugins-parent</artifactId>
<version>0.6.8</version>
</parent>

<groupId>com.github.splunk.kafka.connect</groupId>
<artifactId>splunk-kafka-connect</artifactId>
<version>v2.0.5</version>
<version>v2.0.5.1</version>
<name>splunk-kafka-connect</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<junit.version>4.13.2</junit.version>
<junit.jupiter.version>5.3.2</junit.jupiter.version>
<junit.vintage.version>5.3.2</junit.vintage.version>
Expand Down Expand Up @@ -152,6 +156,12 @@
<version>3.7</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.23.4</version>
<scope>test</scope>
</dependency>
</dependencies>

<reporting>
Expand Down Expand Up @@ -202,19 +212,25 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>google_checks.xml</configLocation>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
<!-- https://mvnrepository.com/artifact/org.jacoco/jacoco-maven-plugin -->

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
return new HttpClientBuilder().setDisableSSLCertVerification(config.getDisableSSLCertVerification())
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSocketTimeout(config.getSocketTimeout())
.setConnectionTimeout(config.getConnectionTimeout())
.setConnectionRequestTimeout(config.getConnectionRequestTimeout())
.build();
}

Expand All @@ -286,6 +289,9 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSslContext(context)
.setSocketTimeout(config.getSocketTimeout())
.setConnectionTimeout(config.getConnectionTimeout())
.setConnectionRequestTimeout(config.getConnectionRequestTimeout())
.build();
}
else {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/splunk/hecclient/HecAckPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
}

if (resp.getText() == "Invalid data format") {
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString());
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} batch={}", channel, channel.getIndexer(), batch.getUUID());
batch.commit();
List<EventBatch> committedBatches = new ArrayList<>();
committedBatches.add(batch);
Expand Down Expand Up @@ -316,7 +316,7 @@ private void findAndRemoveTimedoutBatches(Map<Long, EventBatch> batches, List<Ev
}

private void handleAckPollResponse(String resp, HecChannel channel) {
log.debug("ackPollResponse={}, channel={}", resp, channel);
log.debug("Ack response for channel={}", channel);
HecAckPollResponse ackPollResult;
try {
ackPollResult = jsonMapper.readValue(resp, HecAckPollResponse.class);
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public final class HecConfig {
private int ackPollInterval = 10; // in seconds
private int ackPollThreads = 2;
private int socketTimeout = 60; // in seconds
private int connectionTimeout = 60; // in seconds
private int connectionRequestTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in byte
private int backoffThresholdSeconds = 60 * 1000;
private boolean enableChannelTracking = false;
Expand Down Expand Up @@ -62,6 +64,14 @@ public int getSocketTimeout() {
return socketTimeout;
}

public int getConnectionTimeout() {
return connectionTimeout;
}

public int getConnectionRequestTimeout() {
return connectionRequestTimeout;
}

public int getSocketSendBufferSize() {
return socketSendBufferSize;
}
Expand Down Expand Up @@ -119,6 +129,16 @@ public HecConfig setSocketTimeout(int timeout /*seconds*/) {
return this;
}

public HecConfig setConnectionTimeout(int timeout /*seconds*/) {
connectionTimeout = timeout;
return this;
}

public HecConfig setConnectionRequestTimeout(int timeout /*seconds*/) {
connectionRequestTimeout = timeout;
return this;
}

public HecConfig setSocketSendBufferSize(int bufSize /*bytes*/) {
socketSendBufferSize = bufSize;
return this;
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/splunk/hecclient/HttpClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class HttpClientBuilder {
private int maxConnectionPoolSizePerDestination = 4;
private int maxConnectionPoolSize = 4 * 2;
private int socketTimeout = 60; // in seconds
private int connectionRequestTimeout = 60; // in seconds
private int connectionTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
private boolean disableSSLCertVerification = false;
private SSLContext sslContext = null;
Expand All @@ -52,6 +54,16 @@ public HttpClientBuilder setSocketTimeout(int timeout /*seconds*/) {
return this;
}

public HttpClientBuilder setConnectionRequestTimeout(int timeout /*seconds*/) {
this.connectionRequestTimeout = timeout;
return this;
}

public HttpClientBuilder setConnectionTimeout(int timeout /*seconds*/) {
this.connectionTimeout = timeout;
return this;
}

public HttpClientBuilder setSocketSendBufferSize(int bufSize /*bytes*/) {
this.socketSendBufferSize = bufSize;
return this;
Expand All @@ -74,6 +86,9 @@ public CloseableHttpClient build() {
.setSoTimeout(socketTimeout * 1000)
.build();
RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(socketTimeout * 1000)
.setConnectionRequestTimeout(connectionRequestTimeout * 1000)
.setConnectTimeout(connectionTimeout * 1000)
.setCookieSpec(CookieSpecs.STANDARD)
.build();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
logBackPressure();
}

log.error("failed to post events resp={}, status={}", respPayload, status);
log.error("failed to post events status={}", status);
JsonNode jsonNode;
try {
jsonNode = jsonMapper.readTree(respPayload);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/splunk/hecclient/ResponsePoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void add(HecChannel channel, EventBatch batch, String resp) {
return;
}
if (response.getText() == "Invalid data format") {
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString());
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} batch={}", channel, channel.getIndexer(), batch.getUUID());
}
} catch (Exception ex) {
log.error("failed to parse response", resp, ex);
log.error("failed to parse response", ex);
fail(channel, batch, ex);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public KafkaRecordTracker() {
* @param batches the acked event batches
*/
public void removeAckedEventBatches(final List<EventBatch> batches) {
log.debug("received acked event batches={}", batches);
log.debug("received acked event batches={}", batches.size());
/* Loop all *assigned* partitions to find the lowest consecutive
* HEC-commited offsets. A batch could contain events coming from a
* variety of topic/partitions, and scanning those events coulb be
Expand Down
Loading
Loading