few changes to fix issues #2

Open
wants to merge 8 commits into base: master
5 changes: 4 additions & 1 deletion src/main/config/log4j.prod.properties
@@ -2,9 +2,12 @@

# root logger.
log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG
log4j.logger.com.pinterest.secor.consumer.Consumer=DEBUG
log4j.logger.com.pinterest.secor.common.SingleProcessCounter=DEBUG

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=WARN
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.Target=System.err
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n
6 changes: 1 addition & 5 deletions src/main/java/com/pinterest/secor/common/FileRegistry.java
@@ -27,11 +27,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.*;

/**
* FileRegistry keeps track of local log files currently being appended to and the associated
12 changes: 3 additions & 9 deletions src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java
@@ -24,13 +24,7 @@
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.kafka.common.protocol.Errors;
@@ -234,8 +228,8 @@ public Message getCommittedMessage(TopicPartition topicPartition) throws Excepti
}
return getMessage(topicPartition, committedOffset, consumer);
} catch (MessageDoesNotExistException e) {
// If a MessageDoesNotExistException exception is raised,
// the message at the last committed offset does not exist in Kafka.
// If a MessageDoesNotExistException exception is raised,
// the message at the last committed offset does not exist in Kafka.
// This is usually due to the message being compacted away by the
// Kafka log compaction process.
//
33 changes: 33 additions & 0 deletions src/main/java/com/pinterest/secor/common/OffsetTracker.java
@@ -97,4 +97,37 @@ public long setCommittedOffsetCount(TopicPartition topicPartition, long count) {
mCommittedOffsetCount.put(topicPartition, count);
return trueCommittedOffsetCount;
}


public String toString() {

StringBuilder sb = new StringBuilder();
sb.append("Topic Offset dump: \n");
sb.append("First Seen offset:\n");
dump(mFirstSeendOffset, sb);

sb.append("Last Seen offset:\n");
dump(mLastSeenOffset, sb);

sb.append("Committed offset: \n");
dump(mCommittedOffsetCount, sb);

return sb.toString();

}

private StringBuilder dump(HashMap<TopicPartition, Long> offsetMap, StringBuilder sb) {
offsetMap.forEach((tp, offset) -> {
sb
.append("[")
.append(tp.toString())
.append(", Offset:" + offset)
.append("]")
.append("\n");
});

return sb;
}


}
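For reference, the added toString() renders each offset map as one bracketed entry per topic-partition. With a single tracked partition the dump would look roughly like the following (the offsets are made up, and the exact topic-partition text depends on TopicPartition.toString()):

Topic Offset dump:
First Seen offset:
[my_topic/0, Offset:100]
Last Seen offset:
[my_topic/0, Offset:250]
Committed offset:
[my_topic/0, Offset:240]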
4 changes: 4 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -225,6 +225,10 @@ public String getFetchMinBytes() {
return getString("kafka.fetch.min.bytes");
}

public String getMetaDataRefreshInterval() {
return getString("kafka.metadata.max.age.ms", "90000");
}

public String getFetchMaxBytes() {
return getString("kafka.fetch.max.bytes");
}
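The new getMetaDataRefreshInterval() getter exposes kafka.metadata.max.age.ms with a default of 90000 ms. The wiring into the Kafka consumer is not part of this hunk, so the following is only a hypothetical sketch of how such a value is typically applied:

Properties props = new Properties();
// metadata.max.age.ms controls how often the consumer forces a refresh of topic metadata.
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, mConfig.getMetaDataRefreshInterval());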
(changes in another file)
@@ -68,7 +68,6 @@ public GenericRecord deserialize(String topic, byte[] message) {
GenericRecord record = (GenericRecord) deserializer.deserialize(topic, message);
if (record != null) {
Schema schema = record.getSchema();
schemas.put(topic, schema);
Review comment from @ruccoon (May 14, 2020):
Hi @dorjeetsering, just curious here: do we need to remove this line? It looks like nothing gets updated on the record.

Reply from @dorjeetsering (author):
That's the main change we want. We can also remove the null-check block:
if (record != null) {
    Schema schema = record.getSchema();
}
}
return record;
}
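For context, here is a minimal sketch (based on the discussion above, not code from this diff) of what the simplified deserialize() could look like once the caching line and the null check are gone; getSchema() below then becomes the only place that populates the schemas map, using the registry's latest schema:

public GenericRecord deserialize(String topic, byte[] message) {
    // The per-message writer schema is no longer cached here; getSchema(topic)
    // resolves and caches the topic's latest registry schema instead.
    return (GenericRecord) deserializer.deserialize(topic, message);
}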
@@ -84,9 +83,14 @@ public GenericRecord deserialize(String topic, byte[] message) {
*/
public Schema getSchema(String topic) {
Schema schema = schemas.get(topic);
if(schema != null) {
LOG.info("Found the schema in memory map for topic " + topic);
LOG.info("schema = " + schema.toString(true));
}
if (schema == null) {
try {
SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value");
LOG.info("Pulled schema from schema registry api for topic = " + topic + ", schema id = " + schemaMetadata.getId() + ", schema = " + schemaMetadata.getSchema() + ", schema version = " + schemaMetadata.getVersion());
schema = schemaRegistryClient.getByID(schemaMetadata.getId());
schemas.put(topic, schema);
} catch (IOException e) {
89 changes: 89 additions & 0 deletions src/main/java/com/pinterest/secor/common/SingleProcessCounter.java
@@ -0,0 +1,89 @@
package com.pinterest.secor.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SingleProcessCounter {
private static final Logger LOG = LoggerFactory.getLogger(SingleProcessCounter.class);

private ConcurrentHashMap<TopicPartition, Long> mMessageUploadCounter;

private ConcurrentHashMap<TopicPartition, Long> mMessageLocalCounter;

private static volatile SingleProcessCounter counter = null;

private static Object lock = new Object();

private SingleProcessCounter() {
mMessageLocalCounter = new ConcurrentHashMap<>();
mMessageUploadCounter = new ConcurrentHashMap<>();
}

public static SingleProcessCounter getSingleProcessCounter() {
if (counter != null) return counter;

synchronized (lock) {
if (counter == null)
counter = new SingleProcessCounter();
}
return counter;
}

public void increment(TopicPartition tp, Long delta) {
long bufferValue = mMessageLocalCounter.merge(tp, delta, (v_old, v_delta) -> v_old + v_delta);

if (LOG.isDebugEnabled())
LOG.debug("Topic {} Partition {} local message {}", tp.getTopic(), tp.getPartition(), bufferValue);

}

public void decrement(TopicPartition tp, Long delta) {
long bufferValue = mMessageLocalCounter.merge(tp, delta, (v_old, v_delta) -> v_old - v_delta);

if (LOG.isDebugEnabled())
LOG.debug("Topic {} Partition {} local message {}", tp.getTopic(), tp.getPartition(), bufferValue);
}

public void topicUploaded(TopicPartition tp) {
long counter = getLocalCounter(tp);
mMessageUploadCounter.merge(tp, counter, (v_old, v_delta) -> v_old + v_delta);
decrement(tp, counter);
}

public long getLocalCounter(TopicPartition tp) {
return mMessageLocalCounter.getOrDefault(tp, 0l);
}

public long getTotalCounter(TopicPartition tp) {
return mMessageLocalCounter.values().stream().reduce((a, b) -> a + b).orElse(0l) + mMessageUploadCounter.values().stream().reduce((a, b) -> a + b).orElse(0l);

}

public String toString() {

StringBuilder sb = new StringBuilder();
sb.append("Message completed stats: \n");
toString(mMessageLocalCounter, sb, "Current local Msg written counter: ");
toString(mMessageUploadCounter, sb, "Uploaded Msg counter ");

return sb.toString();
}

private void toString(Map<TopicPartition, Long> map, StringBuilder sb, String msg) {
map.forEach((tp, offset) -> {
sb
.append("[")
.append(tp.toString())
.append("," + msg + offset)
.append("]")
.append("\n");
});
}

public void resetLocalCount(TopicPartition topicPartition) {
mMessageLocalCounter.merge(topicPartition, 0l, (v_old, v_set) -> v_set);
}
}
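A short usage sketch of the counter, mirroring how the Consumer changes below use it (the topic name and partition are made-up values):

SingleProcessCounter spc = SingleProcessCounter.getSingleProcessCounter();
TopicPartition tp = new TopicPartition("my_topic", 0);  // hypothetical topic/partition
spc.increment(tp, 1L);      // one message written to a local file
spc.topicUploaded(tp);      // move the local count into the uploaded bucket
System.out.println(spc);    // per-partition stats via toString()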
18 changes: 15 additions & 3 deletions src/main/java/com/pinterest/secor/consumer/Consumer.java
@@ -22,6 +22,8 @@
import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.SingleProcessCounter;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import com.pinterest.secor.monitoring.MetricCollector;
@@ -75,6 +77,7 @@ public class Consumer extends Thread {
private boolean mUploadOnShutdown;
private volatile boolean mShuttingDown = false;
private static volatile boolean mCallingSystemExit = false;
private static SingleProcessCounter spc = SingleProcessCounter.getSingleProcessCounter();

public Consumer(SecorConfig config) {
mConfig = config;
@@ -156,8 +159,9 @@ public void run() {
// check upload policy every N seconds or 10,000 messages/consumer timeouts
long checkEveryNSeconds = Math.min(10 * 60, mConfig.getMaxFileAgeSeconds() / 2);
long checkMessagesPerSecond = mConfig.getMessagesPerSecond();
long nMessages = 0;
long nMsgPulls = 0;
long lastChecked = System.currentTimeMillis();
long timeStamp = System.currentTimeMillis();
while (true) {
boolean hasMoreMessages = consumeNextMessage();
if (!hasMoreMessages) {
@@ -170,8 +174,14 @@
}

long now = System.currentTimeMillis();

if (nMsgPulls % 1000 == 0 || now - timeStamp > 60 * 1000) {
LOG.debug("nMsgPulls: " + nMsgPulls + " lastChecked: " + lastChecked);
timeStamp = now;
}

if (mDeterministicUploadPolicyTracker != null ||
nMessages++ % checkMessagesPerSecond == 0 ||
nMsgPulls++ % checkMessagesPerSecond == 0 ||
(now - lastChecked) > checkEveryNSeconds * 1000) {
lastChecked = now;
checkUploadPolicy(false);
@@ -191,6 +201,7 @@

protected void checkUploadPolicy(boolean forceUpload) {
try {
LOG.debug("checkUploadPolicy invoked, " + mOffsetTracker.toString() + ", " + spc.toString());
mUploader.applyPolicy(forceUpload);
} catch (Exception e) {
throw new RuntimeException("Failed to apply upload policy", e);
@@ -242,7 +253,8 @@ protected boolean consumeNextMessage() {
if (parsedMessage != null) {
try {
mMessageWriter.write(parsedMessage);

spc.increment(new TopicPartition(rawMessage.getTopic(),
rawMessage.getKafkaPartition()), 1l);
mMetricCollector.metric("consumer.message_size_bytes", rawMessage.getPayload().length, rawMessage.getTopic());
mMetricCollector.increment("consumer.throughput_bytes", rawMessage.getPayload().length, rawMessage.getTopic());
} catch (Exception e) {
(changes in another file)
@@ -127,10 +127,14 @@ public long getLength() throws IOException {

@Override
public void write(KeyValue keyValue) throws IOException {
GenericRecord record = schemaRegistry.deserialize(topic, keyValue.getValue());
LOG.trace("Writing record {}", record);
if (record != null){
writer.write(record);
try {
GenericRecord record = schemaRegistry.deserialize(topic, keyValue.getValue());
LOG.trace("Writing record {}", record);
if (record != null) {
writer.write(record);
}
} catch (Exception ex) {
LOG.error("Failed to write but will continue with next messages. Error message = " + ex.getMessage());
}
}

(changes in another file)
@@ -18,28 +18,6 @@
*/
package com.pinterest.secor.io.impl;

import java.io.IOException;
import java.io.StringWriter;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.pinterest.secor.common.FileRegistry;
@@ -54,6 +32,22 @@
import com.pinterest.secor.util.orc.VectorColumnFiller;
import com.pinterest.secor.util.orc.VectorColumnFiller.JsonConverter;
import com.pinterest.secor.util.orc.schema.ORCSchemaProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.orc.*;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringWriter;
import java.util.List;

/**
* ORC reader/writer implementation
@@ -155,7 +149,7 @@ public JsonORCFileWriter(LogFilePath logFilePath, CompressionCodec codec)
if (schema == null) {
String topic = logFilePath.getTopic();
throw new IllegalArgumentException(
String.format("No schema is provided for topic '%s'", topic));
String.format("No schema is provided for topic '%s'", topic));
}
List<TypeDescription> fieldTypes = schema.getChildren();
converters = new JsonConverter[fieldTypes.size()];
(changes in another file)
@@ -21,12 +21,7 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.tools.LogFilePrinter;
import com.pinterest.secor.util.FileUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

(changes in another file)
@@ -21,12 +21,7 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.tools.LogFileVerifier;
import com.pinterest.secor.util.FileUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

(changes in another file)
@@ -19,12 +19,7 @@
package com.pinterest.secor.main;

import com.pinterest.secor.tools.TestLogMessageProducer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
