From 0617d672ec353563057261f21524b7ebb7074add Mon Sep 17 00:00:00 2001 From: zineng Date: Mon, 23 Sep 2019 11:23:05 -0400 Subject: [PATCH 1/4] Fixed mismatch between serialization with Avro and Deserialization with KafkaAvro --- .../secor/common/AvroSchemaRegistry.java | 5 +++ .../ConfigurableAvroSchemaRegistry.java | 7 ++++ .../common/SecorSchemaRegistryClient.java | 11 ++++++ .../AvroParquetFileReaderWriterFactory.java | 7 ++-- ...vroParquetFileReaderWriterFactoryTest.java | 39 ++++++++++++++++++- 5 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/pinterest/secor/common/AvroSchemaRegistry.java b/src/main/java/com/pinterest/secor/common/AvroSchemaRegistry.java index 1837a18ea..784f73ba4 100644 --- a/src/main/java/com/pinterest/secor/common/AvroSchemaRegistry.java +++ b/src/main/java/com/pinterest/secor/common/AvroSchemaRegistry.java @@ -20,9 +20,14 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificDatumWriter; + +import java.io.IOException; public interface AvroSchemaRegistry { GenericRecord deserialize(String topic, byte[] payload); Schema getSchema(String topic); + + byte[] serialize(SpecificDatumWriter writer, String topic, GenericRecord record) throws IOException; } diff --git a/src/main/java/com/pinterest/secor/common/ConfigurableAvroSchemaRegistry.java b/src/main/java/com/pinterest/secor/common/ConfigurableAvroSchemaRegistry.java index 170f86400..a520c75a4 100644 --- a/src/main/java/com/pinterest/secor/common/ConfigurableAvroSchemaRegistry.java +++ b/src/main/java/com/pinterest/secor/common/ConfigurableAvroSchemaRegistry.java @@ -19,11 +19,13 @@ package com.pinterest.secor.common; import com.pinterest.secor.parser.AvroMessageParser; +import com.pinterest.secor.util.AvroSerializer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.common.errors.SerializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,4 +69,9 @@ record = readers.get(topic).read(null, decoder); public Schema getSchema(String topic) { return schemas.get(topic); } + + @Override + public byte[] serialize(SpecificDatumWriter writer, String topic, GenericRecord record) throws IOException { + return AvroSerializer.serialize(writer, record); + } } diff --git a/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java b/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java index b1ffc3ce2..7a267d644 100644 --- a/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java +++ b/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java @@ -21,11 +21,14 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificDatumWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -35,6 +38,7 @@ public class SecorSchemaRegistryClient implements AvroSchemaRegistry 
{ private static final Logger LOG = LoggerFactory.getLogger(SecorSchemaRegistryClient.class); protected KafkaAvroDeserializer deserializer; + protected KafkaAvroSerializer serializer; private final static Map schemas = new ConcurrentHashMap<>(); protected SchemaRegistryClient schemaRegistryClient; @@ -53,6 +57,7 @@ public SecorSchemaRegistryClient(SecorConfig config) { //Allows the SchemaRegistryClient to be mocked in unit tests protected void init(SecorConfig config) { deserializer = new KafkaAvroDeserializer(schemaRegistryClient); + serializer = new KafkaAvroSerializer(schemaRegistryClient); } public GenericRecord deserialize(String topic, byte[] message) { @@ -74,4 +79,10 @@ public Schema getSchema(String topic) { } return schema; } + + @Override + public byte[] serialize(SpecificDatumWriter writer, String topic, GenericRecord record) throws IOException { + return serializer.serialize(topic, record); + + } } diff --git a/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java index 90f9ee70c..e4907c0a7 100644 --- a/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java +++ b/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java @@ -26,7 +26,6 @@ import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; import com.pinterest.secor.util.AvroSchemaRegistryFactory; -import com.pinterest.secor.util.AvroSerializer; import com.pinterest.secor.util.ParquetUtil; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -75,10 +74,12 @@ protected class AvroParquetFileReader implements FileReader { private ParquetReader reader; private SpecificDatumWriter writer; private long offset; + String topic; + public AvroParquetFileReader(LogFilePath logFilePath, CompressionCodec codec) throws IOException { Path path = new Path(logFilePath.getLogFilePath()); - String topic = logFilePath.getTopic(); + topic = logFilePath.getTopic(); Schema schema = schemaRegistry.getSchema(topic); reader = AvroParquetReader.builder(path).build(); writer = new SpecificDatumWriter(schema); @@ -89,7 +90,7 @@ public AvroParquetFileReader(LogFilePath logFilePath, CompressionCodec codec) th public KeyValue next() throws IOException { GenericRecord record = reader.read(); if (record != null) { - return new KeyValue(offset++, AvroSerializer.serialize(writer, record)); + return new KeyValue(offset++, schemaRegistry.serialize(writer, topic, record)); } return null; } diff --git a/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java index e3b1cefa8..671b4a391 100644 --- a/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactoryTest.java @@ -30,6 +30,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.kafka.common.errors.SerializationException; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -40,6 +42,7 @@ import java.util.stream.Stream; import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.when; @@ 
-76,8 +79,8 @@ public void testAvroParquetReadWriteRoundTripUsingSchemaRegistry() throws Except SecorSchemaRegistryClient secorSchemaRegistryClient = Mockito.mock(SecorSchemaRegistryClient.class); when(secorSchemaRegistryClient.getSchema(anyString())).thenReturn(schema); - when(secorSchemaRegistryClient.deserialize("test-avro-topic", AvroSerializer.serialize(writer, msg1))).thenReturn(msg1); - when(secorSchemaRegistryClient.deserialize("test-avro-topic", AvroSerializer.serialize(writer, msg2))).thenReturn(msg2); + when(secorSchemaRegistryClient.serialize(any(SpecificDatumWriter.class), any(String.class), any(GenericRecord.class))).thenReturn(AvroSerializer.serialize(writer, msg1), AvroSerializer.serialize(writer, msg2)); + when(secorSchemaRegistryClient.deserialize(any(String.class), any(byte[].class))).thenReturn(msg1, msg2, msg1, msg2); mFactory.schemaRegistry = secorSchemaRegistryClient; when(config.getFileReaderWriterFactory()) .thenReturn(AvroParquetFileReaderWriterFactory.class.getName()); @@ -108,6 +111,38 @@ public void testAvroParquetReadWriteRoundTripWithoutSchemaRegistry() throws Exce testAvroParquetReadWriteRoundTrip(configurableAvroSchemaRegistry); } + + @Ignore + public void testAvroParquetReadWriterRoundTripWithConfluentSchemaRegister() throws Exception { + + Schema schema = SchemaBuilder.record("UnitTestRecord") + .fields() + .name("data").type().stringType().noDefault() + .name("timestamp").type().nullable().longType().noDefault() + .endRecord(); + + GenericRecordBuilder builder = new GenericRecordBuilder(schema); + msg1 = builder.set("data", "foo").set("timestamp", 1467176315L).build(); + writer = new SpecificDatumWriter(schema); + SecorSchemaRegistryClient secorSchemaRegistryClient = new SecorSchemaRegistryClient(config); + + KeyValue kv1 = (new KeyValue(23232, AvroSerializer.serialize(writer, msg1))); + + + Exception e = null; + try { + secorSchemaRegistryClient.deserialize("test-avro-topic", kv1.getValue()); + } catch (SerializationException se) { + se.printStackTrace(); + e = se; + } catch (Exception otherE) { + //ignore + } + + assertEquals(null, e); + + } + private void testAvroParquetReadWriteRoundTrip(AvroSchemaRegistry schemaRegistry) throws Exception { LogFilePath tempLogFilePath = new LogFilePath(Files.createTempDir().toString(), "test-avro-topic", new String[] { "part-1" }, 0, 1, 23232, ".avro"); From af9da35bb545badb3329404a2332f7de5d0cc082 Mon Sep 17 00:00:00 2001 From: zineng Date: Tue, 15 Oct 2019 16:30:36 -0400 Subject: [PATCH 2/4] Update logger for internal use --- src/main/config/log4j.prod.properties | 5 +- .../pinterest/secor/common/OffsetTracker.java | 33 +++++++ .../secor/common/SingleProcessCounter.java | 89 +++++++++++++++++++ .../pinterest/secor/consumer/Consumer.java | 18 +++- .../pinterest/secor/reader/MessageReader.java | 4 +- .../pinterest/secor/uploader/Uploader.java | 17 ++-- src/main/scripts/docker-entrypoint.sh | 3 +- 7 files changed, 156 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/pinterest/secor/common/SingleProcessCounter.java diff --git a/src/main/config/log4j.prod.properties b/src/main/config/log4j.prod.properties index fce4ca6c7..7067a9050 100644 --- a/src/main/config/log4j.prod.properties +++ b/src/main/config/log4j.prod.properties @@ -2,9 +2,12 @@ # root logger. 
log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE +log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG +log4j.logger.com.pinterest.secor.consumer.Consumer=DEBUG +log4j.logger.com.pinterest.secor.common.SingleProcessCounter=DEBUG log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=WARN +log4j.appender.CONSOLE.Threshold=INFO log4j.appender.CONSOLE.Target=System.err log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n diff --git a/src/main/java/com/pinterest/secor/common/OffsetTracker.java b/src/main/java/com/pinterest/secor/common/OffsetTracker.java index 1ec167da3..68cb90fbe 100644 --- a/src/main/java/com/pinterest/secor/common/OffsetTracker.java +++ b/src/main/java/com/pinterest/secor/common/OffsetTracker.java @@ -97,4 +97,37 @@ public long setCommittedOffsetCount(TopicPartition topicPartition, long count) { mCommittedOffsetCount.put(topicPartition, count); return trueCommittedOffsetCount; } + + + public String toString() { + + StringBuilder sb = new StringBuilder(); + sb.append("Topic Offset dump: \n"); + sb.append("First Seen offset:\n"); + dump(mFirstSeendOffset, sb); + + sb.append("Last Seen offset:\n"); + dump(mLastSeenOffset, sb); + + sb.append("Committed offset: \n"); + dump(mCommittedOffsetCount, sb); + + return sb.toString(); + + } + + private StringBuilder dump(HashMap offsetMap, StringBuilder sb) { + offsetMap.forEach((tp, offset) -> { + sb + .append("[") + .append(tp.toString()) + .append(", Offset:" + offset) + .append("]") + .append("\n"); + }); + + return sb; + } + + } diff --git a/src/main/java/com/pinterest/secor/common/SingleProcessCounter.java b/src/main/java/com/pinterest/secor/common/SingleProcessCounter.java new file mode 100644 index 000000000..523666a80 --- /dev/null +++ b/src/main/java/com/pinterest/secor/common/SingleProcessCounter.java @@ -0,0 +1,89 @@ +package com.pinterest.secor.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SingleProcessCounter { + private static final Logger LOG = LoggerFactory.getLogger(SingleProcessCounter.class); + + private ConcurrentHashMap mMessageUploadCounter; + + private ConcurrentHashMap mMessageLocalCounter; + + private static volatile SingleProcessCounter counter = null; + + private static Object lock = new Object(); + + private SingleProcessCounter() { + mMessageLocalCounter = new ConcurrentHashMap<>(); + mMessageUploadCounter = new ConcurrentHashMap<>(); + } + + public static SingleProcessCounter getSingleProcessCounter() { + if (counter != null) return counter; + + synchronized (lock) { + if (counter == null) + counter = new SingleProcessCounter(); + } + return counter; + } + + public void increment(TopicPartition tp, Long delta) { + long bufferValue = mMessageLocalCounter.merge(tp, delta, (v_old, v_delta) -> v_old + v_delta); + + if (LOG.isDebugEnabled()) + LOG.debug("Topic {} Partition {} local message {}", tp.getTopic(), tp.getPartition(), bufferValue); + + } + + public void decrement(TopicPartition tp, Long delta) { + long bufferValue = mMessageLocalCounter.merge(tp, delta, (v_old, v_delta) -> v_old - v_delta); + + if (LOG.isDebugEnabled()) + LOG.debug("Topic {} Partition {} local message {}", tp.getTopic(), tp.getPartition(), bufferValue); + } + + public void topicUploaded(TopicPartition tp) { + long counter = getLocalCounter(tp); + mMessageUploadCounter.merge(tp, counter, 
(v_old, v_delta) -> v_old + v_delta); + decrement(tp, counter); + } + + public long getLocalCounter(TopicPartition tp) { + return mMessageLocalCounter.getOrDefault(tp, 0l); + } + + public long getTotalCounter(TopicPartition tp) { + return mMessageLocalCounter.values().stream().reduce((a, b) -> a + b).orElse(0l) + mMessageUploadCounter.values().stream().reduce((a, b) -> a + b).orElse(0l); + + } + + public String toString() { + + StringBuilder sb = new StringBuilder(); + sb.append("Message completed stats: \n"); + toString(mMessageLocalCounter, sb, "Current local Msg written counter: "); + toString(mMessageUploadCounter, sb, "Uploaded Msg counter "); + + return sb.toString(); + } + + private void toString(Map map, StringBuilder sb, String msg) { + map.forEach((tp, offset) -> { + sb + .append("[") + .append(tp.toString()) + .append("," + msg + offset) + .append("]") + .append("\n"); + }); + } + + public void resetLocalCount(TopicPartition topicPartition) { + mMessageLocalCounter.merge(topicPartition, 0l, (v_old, v_set) -> v_set); + } +} diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java index 74a925227..9fbd472da 100644 --- a/src/main/java/com/pinterest/secor/consumer/Consumer.java +++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java @@ -22,6 +22,8 @@ import com.pinterest.secor.common.FileRegistry; import com.pinterest.secor.common.OffsetTracker; import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.SingleProcessCounter; +import com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.message.Message; import com.pinterest.secor.message.ParsedMessage; import com.pinterest.secor.monitoring.MetricCollector; @@ -75,6 +77,7 @@ public class Consumer extends Thread { private boolean mUploadOnShutdown; private volatile boolean mShuttingDown = false; private static volatile boolean mCallingSystemExit = false; + private static SingleProcessCounter spc = SingleProcessCounter.getSingleProcessCounter(); public Consumer(SecorConfig config) { mConfig = config; @@ -156,8 +159,9 @@ public void run() { // check upload policy every N seconds or 10,000 messages/consumer timeouts long checkEveryNSeconds = Math.min(10 * 60, mConfig.getMaxFileAgeSeconds() / 2); long checkMessagesPerSecond = mConfig.getMessagesPerSecond(); - long nMessages = 0; + long nMsgPulls = 0; long lastChecked = System.currentTimeMillis(); + long timeStamp = System.currentTimeMillis(); while (true) { boolean hasMoreMessages = consumeNextMessage(); if (!hasMoreMessages) { @@ -170,8 +174,14 @@ public void run() { } long now = System.currentTimeMillis(); + + if (nMsgPulls % 1000 == 0 || now - timeStamp > 60 * 1000) { + LOG.info("nMsgPulls: " + nMsgPulls + " lastChecked: " + lastChecked); + timeStamp = now; + } + if (mDeterministicUploadPolicyTracker != null || - nMessages++ % checkMessagesPerSecond == 0 || + nMsgPulls++ % checkMessagesPerSecond == 0 || (now - lastChecked) > checkEveryNSeconds * 1000) { lastChecked = now; checkUploadPolicy(false); @@ -191,6 +201,7 @@ public void run() { protected void checkUploadPolicy(boolean forceUpload) { try { + LOG.info("checkUploadPolicy invoked, " + mOffsetTracker.toString() + ", " + spc.toString()); mUploader.applyPolicy(forceUpload); } catch (Exception e) { throw new RuntimeException("Failed to apply upload policy", e); @@ -242,7 +253,8 @@ protected boolean consumeNextMessage() { if (parsedMessage != null) { try { mMessageWriter.write(parsedMessage); - + spc.increment(new 
TopicPartition(rawMessage.getTopic(), + rawMessage.getKafkaPartition()), 1l); mMetricCollector.metric("consumer.message_size_bytes", rawMessage.getPayload().length, rawMessage.getTopic()); mMetricCollector.increment("consumer.throughput_bytes", rawMessage.getPayload().length, rawMessage.getTopic()); } catch (Exception e) { diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java index ec8549a8a..b45bd0c31 100644 --- a/src/main/java/com/pinterest/secor/reader/MessageReader.java +++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java @@ -109,8 +109,8 @@ public Message read() { exportStats(); } if (message.getOffset() < committedOffsetCount) { - LOG.debug("skipping message {} because its offset precedes committed offset count {}", - message, committedOffsetCount); + LOG.info("skipping message topic {} offset {} because its offset precedes committed offset count {}", + message.getTopic(), message.getOffset(), committedOffsetCount); return null; } return message; diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index 8ec9fa6cb..60bf157cc 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -25,6 +25,7 @@ import com.pinterest.secor.common.OffsetTracker; import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.common.SecorConstants; +import com.pinterest.secor.common.SingleProcessCounter; import com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.common.ZookeeperConnector; import com.pinterest.secor.io.FileReader; @@ -65,6 +66,7 @@ public class Uploader { protected String mTopicFilter; private boolean isOffsetsStorageKafka = false; + private static SingleProcessCounter spc = SingleProcessCounter.getSingleProcessCounter(); /** @@ -142,6 +144,7 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception { mMessageReader.commit(topicPartition, lastSeenOffset + 1); } mMetricCollector.increment("uploader.file_uploads.count", paths.size(), topicPartition.getTopic()); + spc.topicUploaded(topicPartition); } else { LOG.warn("Zookeeper committed offset didn't match for topic {} partition {}: {} vs {}", topicPartition.getTopic(), topicPartition.getTopic(), zookeeperCommittedOffsetCount, @@ -182,6 +185,7 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception { int copiedMessages = 0; // Deleting the writer closes its stream flushing all pending data to the disk. 
mFileRegistry.deleteWriter(srcPath); + long droppedCounter = 0; try { CompressionCodec codec = null; String extension = ""; @@ -195,19 +199,21 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception { if (keyVal.getOffset() >= startOffset) { if (writer == null) { String localPrefix = mConfig.getLocalPath() + '/' + - IdUtil.getLocalMessageDir(); + IdUtil.getLocalMessageDir(); dstPath = new LogFilePath(localPrefix, srcPath.getTopic(), - srcPath.getPartitions(), srcPath.getGeneration(), - srcPath.getKafkaPartition(), startOffset, - extension); + srcPath.getPartitions(), srcPath.getGeneration(), + srcPath.getKafkaPartition(), startOffset, + extension); writer = mFileRegistry.getOrCreateWriter(dstPath, - codec); + codec); } writer.write(keyVal); if (mDeterministicUploadPolicyTracker != null) { mDeterministicUploadPolicyTracker.track(topicPartition, keyVal); } copiedMessages++; + } else { + droppedCounter++; } } } finally { @@ -216,6 +222,7 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception { } } mFileRegistry.deletePath(srcPath); + spc.decrement(topicPartition, droppedCounter); if (dstPath == null) { LOG.info("removed file {}", srcPath.getLogFilePath()); } else { diff --git a/src/main/scripts/docker-entrypoint.sh b/src/main/scripts/docker-entrypoint.sh index 510c70049..2b6c886fc 100644 --- a/src/main/scripts/docker-entrypoint.sh +++ b/src/main/scripts/docker-entrypoint.sh @@ -1,7 +1,6 @@ -#!/bin/sh +#!/bin/bash set -e - SECOR_CONFIG='' if [ -z "$ZOOKEEPER_QUORUM" ]; then From a119c6162c8eafca3605ad1aa8d9b0ef46f182ac Mon Sep 17 00:00:00 2001 From: zineng Date: Fri, 31 Jan 2020 13:15:49 -0500 Subject: [PATCH 3/4] Update config to shorten metadata/topic discovery time --- .../pinterest/secor/common/SecorConfig.java | 10 +++++++++- .../reader/SecorKafkaMessageIterator.java | 2 +- .../pinterest/secor/uploader/Uploader.java | 19 +++++++++++++++---- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 38b264a3b..99c6e1908 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -26,7 +26,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; /** * One-stop shop for Secor configuration options. 
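The SecorConfig and SecorKafkaMessageIterator hunks below pass a kafka.metadata.max.age.ms setting (defaulting to 90000 ms) through to the Kafka consumer. Lowering it from the client default of 300000 ms makes the consumer refresh cluster metadata more often, so newly created topics and partitions are discovered sooner. A minimal sketch of the same property applied to a plain KafkaConsumer — the bootstrap address, group id, and topic name are placeholders, not values from this patch:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MetadataRefreshSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "secor-metadata-sketch");     // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Refresh metadata every 90 s instead of the 5-minute default, mirroring
        // the kafka.metadata.max.age.ms default introduced in this patch.
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "90000");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-avro-topic"));   // placeholder topic
        }
    }
}
```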
@@ -221,6 +225,10 @@ public String getFetchMinBytes() { return getString("kafka.fetch.min.bytes"); } + public String getMetaDataRefreshInterval() { + return getString("kafka.metadata.max.age.ms", "90000"); + } + public String getFetchMaxBytes() { return getString("kafka.fetch.max.bytes"); } diff --git a/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java b/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java index 56a8f2b5d..02a8a8847 100644 --- a/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java +++ b/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java @@ -117,6 +117,7 @@ public void init(SecorConfig config) throws UnknownHostException { optionalConfig(config.getSslProvider(), conf -> props.put("ssl.provider", conf)); optionalConfig(config.getSslTruststoreType(), conf -> props.put("ssl.truststore.type", conf)); optionalConfig(config.getNewConsumerPartitionAssignmentStrategyClass(), conf -> props.put("partition.assignment.strategy", conf)); + optionalConfig(config.getMetaDataRefreshInterval(), conf -> props.put("metadata.max.age.ms", conf)); mZookeeperConnector = new ZookeeperConnector(config); mRecordsBatch = new ArrayDeque<>(); @@ -143,7 +144,6 @@ private void optionalConfig(String maybeConf, Consumer configConsumer) { @Override public void subscribe(RebalanceHandler handler, SecorConfig config) { ConsumerRebalanceListener reBalanceListener = new SecorConsumerRebalanceListener(mKafkaConsumer, mZookeeperConnector, getSkipZookeeperOffsetSeek(config), config.getNewConsumerAutoOffsetReset(), handler); - ; String[] subscribeList = config.getKafkaTopicList(); if (Strings.isNullOrEmpty(subscribeList[0])) { diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index 60bf157cc..185ae4e90 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -273,10 +273,21 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU final long size = mFileRegistry.getSize(topicPartition); final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition); LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec); - shouldUpload = forceUpload || - size >= mConfig.getMaxFileSizeBytes() || - modificationAgeSec >= mConfig.getMaxFileAgeSeconds() || - isRequiredToUploadAtTime(topicPartition); + + boolean fileSizeTrigger = size >= mConfig.getMaxFileSizeBytes(); + boolean fileAgeTrigger = modificationAgeSec >= mConfig.getMaxFileAgeSeconds(); + boolean uploadTimeTrigger = isRequiredToUploadAtTime(topicPartition); + shouldUpload = forceUpload || fileAgeTrigger + || fileSizeTrigger + || uploadTimeTrigger; + + if (shouldUpload) { + String reason = forceUpload ? "forceUpload" + : fileAgeTrigger ? String.format("fileAgeSec %s is larger than config value %s", modificationAgeSec, mConfig.getMaxFileAgeSeconds()) + : fileSizeTrigger ? 
String.format("fileSizeBytes %s is larger than config value %s", size, mConfig.getMaxFileSizeBytes()) + : String.format("requiredToUploadAtMinute %s", mConfig.getUploadMinuteMark()); + LOG.info("UploadFile with topic partition [{}] flag set because [" + reason + "]", topicPartition); + } } if (shouldUpload) { long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition); From 5e8f14d7b4135727e6613e54c65d7516f08d7dda Mon Sep 17 00:00:00 2001 From: Dorjee Tsering Date: Tue, 21 Apr 2020 12:06:14 -0400 Subject: [PATCH 4/4] few changes to fix issues --- .../secor/common/SecorSchemaRegistryClient.java | 6 +++++- .../java/com/pinterest/secor/consumer/Consumer.java | 4 ++-- .../io/impl/AvroParquetFileReaderWriterFactory.java | 12 ++++++++---- .../com/pinterest/secor/writer/MessageWriter.java | 4 ++-- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java b/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java index f70a60498..280da5df9 100644 --- a/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java +++ b/src/main/java/com/pinterest/secor/common/SecorSchemaRegistryClient.java @@ -68,7 +68,6 @@ public GenericRecord deserialize(String topic, byte[] message) { GenericRecord record = (GenericRecord) deserializer.deserialize(topic, message); if (record != null) { Schema schema = record.getSchema(); - schemas.put(topic, schema); } return record; } @@ -84,9 +83,14 @@ public GenericRecord deserialize(String topic, byte[] message) { */ public Schema getSchema(String topic) { Schema schema = schemas.get(topic); + if(schema != null) { + LOG.info("Found the schema in memory map for topic " + topic); + LOG.info("schema = " + schema.toString(true)); + } if (schema == null) { try { SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"); + LOG.info("Pulled schema from schema registry api for topic = " + topic + ", schema id = " + schemaMetadata.getId() + ", schema = " + schemaMetadata.getSchema() + ", schema version = " + schemaMetadata.getVersion()); schema = schemaRegistryClient.getByID(schemaMetadata.getId()); schemas.put(topic, schema); } catch (IOException e) { diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java index 9fbd472da..ddc4413f5 100644 --- a/src/main/java/com/pinterest/secor/consumer/Consumer.java +++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java @@ -176,7 +176,7 @@ public void run() { long now = System.currentTimeMillis(); if (nMsgPulls % 1000 == 0 || now - timeStamp > 60 * 1000) { - LOG.info("nMsgPulls: " + nMsgPulls + " lastChecked: " + lastChecked); + LOG.debug("nMsgPulls: " + nMsgPulls + " lastChecked: " + lastChecked); timeStamp = now; } @@ -201,7 +201,7 @@ public void run() { protected void checkUploadPolicy(boolean forceUpload) { try { - LOG.info("checkUploadPolicy invoked, " + mOffsetTracker.toString() + ", " + spc.toString()); + LOG.debug("checkUploadPolicy invoked, " + mOffsetTracker.toString() + ", " + spc.toString()); mUploader.applyPolicy(forceUpload); } catch (Exception e) { throw new RuntimeException("Failed to apply upload policy", e); diff --git a/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java index 1259bd50d..d1ce2c866 100644 --- 
a/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java +++ b/src/main/java/com/pinterest/secor/io/impl/AvroParquetFileReaderWriterFactory.java @@ -127,10 +127,14 @@ public long getLength() throws IOException { @Override public void write(KeyValue keyValue) throws IOException { - GenericRecord record = schemaRegistry.deserialize(topic, keyValue.getValue()); - LOG.trace("Writing record {}", record); - if (record != null){ - writer.write(record); + try { + GenericRecord record = schemaRegistry.deserialize(topic, keyValue.getValue()); + LOG.trace("Writing record {}", record); + if (record != null) { + writer.write(record); + } + } catch (Exception ex) { + LOG.error("Failed to write but will continue with next messages. Error message = " + ex.getMessage()); } } diff --git a/src/main/java/com/pinterest/secor/writer/MessageWriter.java b/src/main/java/com/pinterest/secor/writer/MessageWriter.java index e5537cd35..4f3c5e8e0 100644 --- a/src/main/java/com/pinterest/secor/writer/MessageWriter.java +++ b/src/main/java/com/pinterest/secor/writer/MessageWriter.java @@ -83,9 +83,9 @@ public void adjustOffset(Message message, boolean isLegacyConsumer) throws IOExc if (isLegacyConsumer && message.getOffset() != lastSeenOffset + 1) { StatsUtil.incr("secor.consumer_rebalance_count." + topicPartition.getTopic()); // There was a rebalancing event since we read the last message. - LOG.info("offset of message {} does not follow sequentially the last seen offset {}. " + + LOG.info("offset of message topic = {}, partition = {}, offset = {}, kafkaf key = {} does not follow sequentially the last seen offset {}. " + "Deleting files in topic {} partition {}", - message, lastSeenOffset, topicPartition.getTopic(), topicPartition.getPartition()); + message.getTopic(), message.getKafkaPartition(), message.getOffset(), message.getKafkaKey(), lastSeenOffset, topicPartition.getTopic(), topicPartition.getPartition()); mFileRegistry.deleteTopicPartition(topicPartition); if (mDeterministicUploadPolicyTracker != null) {
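The serialization fix in PATCH 1/4 works because KafkaAvroDeserializer only understands the Confluent wire format: a magic byte, a 4-byte schema id, and then the Avro-encoded body. The old AvroSerializer.serialize path produced a bare Avro binary payload with no such header, so re-serializing records read back from Parquet yielded bytes the deserializer could not parse. Routing serialization through the registry-backed KafkaAvroSerializer restores the framing. A minimal sketch of that framing, for illustration only — the schema id would normally come from the schema registry rather than being passed in by hand:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public final class ConfluentWireFormatSketch {
    private static final byte MAGIC_BYTE = 0x0;

    // Frame a record the way the Confluent serializer does: magic byte, schema id, Avro body.
    public static byte[] frame(int schemaId, SpecificDatumWriter<GenericRecord> writer,
                               GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);                                      // 1 byte: format marker
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // 4 bytes: registry schema id
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);                              // Avro binary body
        encoder.flush();
        return out.toByteArray();
    }
}
```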
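PATCH 2/4 introduces SingleProcessCounter to track messages written locally versus already uploaded, fed by the Consumer, Uploader, and trim() changes. A small usage sketch using only the methods shown in this patch; the main wrapper and the topic/partition values are illustrative:

```java
import com.pinterest.secor.common.SingleProcessCounter;
import com.pinterest.secor.common.TopicPartition;

public class SingleProcessCounterSketch {
    public static void main(String[] args) {
        SingleProcessCounter counter = SingleProcessCounter.getSingleProcessCounter();
        TopicPartition tp = new TopicPartition("test-avro-topic", 0);

        // Consumer path: one increment per message written to a local file.
        counter.increment(tp, 1L);
        counter.increment(tp, 1L);

        // Uploader path: after a successful upload, move the local count
        // into the uploaded bucket and zero out the local side.
        counter.topicUploaded(tp);

        System.out.println(counter.getLocalCounter(tp)); // 0: everything was uploaded
        System.out.println(counter.getTotalCounter(tp)); // 2: local + uploaded, summed over all partitions
        System.out.println(counter);                     // the per-partition dump logged by checkUploadPolicy
    }
}
```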