Skip to content

Commit

Permalink
Update configure for shorting metadata/topic discovery time
Browse files Browse the repository at this point in the history
  • Loading branch information
zyuan-paytm committed Jan 31, 2020
1 parent af9da35 commit a119c61
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
10 changes: 9 additions & 1 deletion src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;

/**
* One-stop shop for Secor configuration options.
Expand Down Expand Up @@ -221,6 +225,10 @@ public String getFetchMinBytes() {
return getString("kafka.fetch.min.bytes");
}

public String getMetaDataRefreshInterval() {
return getString("kafka.metadata.max.age.ms", "90000");
}

public String getFetchMaxBytes() {
return getString("kafka.fetch.max.bytes");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void init(SecorConfig config) throws UnknownHostException {
optionalConfig(config.getSslProvider(), conf -> props.put("ssl.provider", conf));
optionalConfig(config.getSslTruststoreType(), conf -> props.put("ssl.truststore.type", conf));
optionalConfig(config.getNewConsumerPartitionAssignmentStrategyClass(), conf -> props.put("partition.assignment.strategy", conf));
optionalConfig(config.getMetaDataRefreshInterval(), conf -> props.put("metadata.max.age.ms", conf));

mZookeeperConnector = new ZookeeperConnector(config);
mRecordsBatch = new ArrayDeque<>();
Expand All @@ -143,7 +144,6 @@ private void optionalConfig(String maybeConf, Consumer<String> configConsumer) {
@Override
public void subscribe(RebalanceHandler handler, SecorConfig config) {
ConsumerRebalanceListener reBalanceListener = new SecorConsumerRebalanceListener(mKafkaConsumer, mZookeeperConnector, getSkipZookeeperOffsetSeek(config), config.getNewConsumerAutoOffsetReset(), handler);
;

String[] subscribeList = config.getKafkaTopicList();
if (Strings.isNullOrEmpty(subscribeList[0])) {
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,21 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU
final long size = mFileRegistry.getSize(topicPartition);
final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition);
LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
shouldUpload = forceUpload ||
size >= mConfig.getMaxFileSizeBytes() ||
modificationAgeSec >= mConfig.getMaxFileAgeSeconds() ||
isRequiredToUploadAtTime(topicPartition);

boolean fileSizeTrigger = size >= mConfig.getMaxFileSizeBytes();
boolean fileAgeTrigger = modificationAgeSec >= mConfig.getMaxFileAgeSeconds();
boolean uploadTimeTrigger = isRequiredToUploadAtTime(topicPartition);
shouldUpload = forceUpload || fileAgeTrigger
|| fileSizeTrigger
|| uploadTimeTrigger;

if (shouldUpload) {
String reason = forceUpload ? "forceUpload"
: fileAgeTrigger ? String.format("fileAgeSec %s is larger than config value %s", modificationAgeSec, mConfig.getMaxFileAgeSeconds())
: fileSizeTrigger ? String.format("fileSizeBytes %s is larger than config value %s", size, mConfig.getMaxFileSizeBytes())
: String.format("requiredToUploadAtMinute %s", mConfig.getUploadMinuteMark());
LOG.info("UploadFile with topic partition [{}] flag set because [" + reason + "]", topicPartition);
}
}
if (shouldUpload) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
Expand Down

0 comments on commit a119c61

Please sign in to comment.