Skip to content

Commit

Permalink
Merge pull request #4 from allegro/performance_optimization
Browse files Browse the repository at this point in the history
Performance optimization
  • Loading branch information
druminski authored Jun 13, 2016
2 parents 518a738 + e09c926 commit 5ee6482
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 48 deletions.
42 changes: 39 additions & 3 deletions converter/build.gradle
Original file line number Diff line number Diff line change
@@ -1,27 +1,52 @@
buildscript {
repositories {
mavenCentral()
maven {
url 'https://plugins.gradle.org/m2/'
}
}
dependencies {
classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.5.3"
classpath 'io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.5.3'
classpath 'gradle.plugin.me.champeau.gradle:jmh-gradle-plugin:0.3.0'
}
}


plugins {
id 'java'
id 'groovy'
id 'maven'
id 'jacoco'
id 'com.bmuschko.nexus' version '2.3.1'
id 'me.champeau.gradle.jmh' version '0.3.0'
}

apply plugin: 'io.codearte.nexus-staging'
apply plugin: 'idea'

configurations {
jmh
}

jmh {
include = 'tech\\.allegro\\.schema\\.json2avro\\.converter\\..*'
humanOutputFile = null
jmhVersion = '1.12'
iterations = 2
timeOnIteration = '10s'
fork = 1
warmupIterations = 1
warmup = '10s'
failOnError = true
threads = 1
}

dependencies {
compile group: 'org.apache.avro', name: 'avro', version: '1.7.7'

testCompile group: 'org.spockframework', name: 'spock-core', version: versions.spock

jmh 'org.openjdk.jmh:jmh-core:1.12'
jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.12'
}

modifyPom {
Expand Down Expand Up @@ -58,4 +83,15 @@ nexusStaging {

numberOfRetries = 15
delayBetweenRetriesInMillis = 5000
}
}

idea {
module {
scopes.PROVIDED.plus += [ configurations.jmh ]
}
}

// Workaround for duplicated `BenchmarkList` and `CompilerHints` files from META-INF directory in jmh jar.
// Those duplications can prevent from running benchmark tests.
// More info https://github.com/melix/jmh-gradle-plugin/issues/6
tasks.getByName('jmhJar').doFirst() {duplicatesStrategy(DuplicatesStrategy.EXCLUDE)}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package tech.allegro.schema.json2avro.converter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
public class JsonAvroConverterBenchmark {

private JsonAvroConverter converter = new JsonAvroConverter();
private byte[] messageWithNullField;
private byte[] completeMessage;
private Schema schema;

@Setup
public void setup() {
converter = new JsonAvroConverter();
schema = new Schema.Parser().parse(
"{" +
" \"type\" : \"record\"," +
" \"name\" : \"Acme\"," +
" \"fields\" : [" +
" { \"name\" : \"username\", \"type\" : \"string\" }," +
" { \"name\" : \"age\", \"type\" : [\"null\", \"int\"], \"default\": null }]" +
"}");

messageWithNullField = "{ \"username\": \"mike\" }".getBytes();
completeMessage = "{ \"username\": \"mike\", \"age\": 30}".getBytes();
}

@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public GenericData.Record conversionLatencyForMessageWithNotProvidedOptionalField() {
return converter.convertToGenericDataRecord(messageWithNullField, schema);
}

@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public GenericData.Record conversionThroughputForMessageWithNotProvidedOptionalField() {
return converter.convertToGenericDataRecord(messageWithNullField, schema);
}

@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public GenericData.Record conversionLatencyForCompleteMessage() {
return converter.convertToGenericDataRecord(completeMessage, schema);
}

@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public GenericData.Record conversionThroughputForCompleteMessage() {
return converter.convertToGenericDataRecord(completeMessage, schema);
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + JsonAvroConverterBenchmark.class.getSimpleName() + ".*")
.warmupIterations(2)
.measurementIterations(2)
.measurementTime(TimeValue.seconds(20))
.warmupTime(TimeValue.seconds(5))
.forks(1)
.threads(1)
.syncIterations(true)
.build();

new Runner(opt).run();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tech.allegro.schema.json2avro.converter;

import org.apache.avro.AvroTypeException;

import java.util.Deque;
import java.util.stream.StreamSupport;

import static java.util.Spliterator.ORDERED;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collectors.joining;

class AvroTypeExceptions {
static AvroTypeException enumException(Deque<String> fieldPath, String expectedSymbols) {
return new AvroTypeException(new StringBuilder()
.append("Field ")
.append(path(fieldPath))
.append(" is expected to be of enum type and be one of ")
.append(expectedSymbols)
.toString());
}

static AvroTypeException unionException(String fieldName, String expectedTypes, Deque<String> offendingPath) {
return new AvroTypeException(new StringBuilder()
.append("Could not evaluate union, field")
.append(fieldName)
.append("is expected to be one of these: ")
.append(expectedTypes)
.append("If this is a complex type, check if offending field: ")
.append(path(offendingPath))
.append(" adheres to schema.")
.toString());
}

static AvroTypeException typeException(Deque<String> fieldPath, String expectedType) {
return new AvroTypeException(new StringBuilder()
.append("Field ")
.append(path(fieldPath))
.append(" is expected to be type: ")
.append(expectedType)
.toString());
}

private static String path(Deque<String> path) {
return StreamSupport.stream(spliteratorUnknownSize(path.descendingIterator(), ORDERED), false)
.map(Object::toString).collect(joining("."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import java.util.function.Function;

import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.*;
import static tech.allegro.schema.json2avro.converter.AvroTypeExceptions.*;

public class JsonGenericRecordReader {
private ObjectMapper mapper;
private static final Object INCOMPATIBLE = new Object();
private final ObjectMapper mapper;

public JsonGenericRecordReader() {
this.mapper = new ObjectMapper();
this(new ObjectMapper());
}

public JsonGenericRecordReader(ObjectMapper mapper) {
Expand Down Expand Up @@ -54,31 +52,31 @@ private GenericData.Record readRecord(Map<String,Object> json, Schema schema, De
GenericRecordBuilder record = new GenericRecordBuilder(schema);
json.entrySet().forEach(entry ->
ofNullable(schema.getField(entry.getKey()))
.ifPresent(field -> record.set(field, read(field, field.schema(), entry.getValue(), path))));
.ifPresent(field -> record.set(field, read(field, field.schema(), entry.getValue(), path, false))));
return record.build();
}

@SuppressWarnings("unchecked")
private Object read(Schema.Field field, Schema schema, Object value, Deque<String> path) {
private Object read(Schema.Field field, Schema schema, Object value, Deque<String> path, boolean silently) {
boolean pushed = !field.name().equals(path.peek());
if(pushed) {
path.push(field.name());
}
Object result;

switch (schema.getType()) {
case RECORD: result = readRecord(ensureType(value, Map.class, path), schema, path); break;
case ARRAY: result = readArray(field, schema, ensureType(value, List.class, path), path); break;
case MAP: result = readMap(field, schema, ensureType(value, Map.class, path), path); break;
case UNION: result = readUnion(field, schema, value, path); break;
case INT: result = ensureType(value, Number.class, path).intValue(); break;
case LONG: result = ensureType(value, Number.class, path).longValue(); break;
case FLOAT: result = ensureType(value, Number.class, path).floatValue(); break;
case DOUBLE: result = ensureType(value, Number.class, path).doubleValue(); break;
case BOOLEAN: result = ensureType(value, Boolean.class, path); break;
case ENUM: result = ensureEnum(schema, ensureType(value, String.class, path), path); break;
case STRING: result = ensureType(value, String.class, path); break;
case NULL: result = ensureNull(value, path); break;
case RECORD: result = onValidType(value, Map.class, path, silently, map -> readRecord(map, schema, path)); break;
case ARRAY: result = onValidType(value, List.class, path, silently, list -> readArray(field, schema, list, path)); break;
case MAP: result = onValidType(value, Map.class, path, silently, map -> readMap(field, schema, map, path)); break;
case UNION: result = readUnion(field, schema, value, path); break;
case INT: result = onValidNumber(value, path, silently, Number::intValue); break;
case LONG: result = onValidNumber(value, path, silently, Number::longValue); break;
case FLOAT: result = onValidNumber(value, path, silently, Number::floatValue); break;
case DOUBLE: result = onValidNumber(value, path, silently, Number::doubleValue); break;
case BOOLEAN: result = onValidType(value, Boolean.class, path, silently, bool -> bool); break;
case ENUM: result = onValidType(value, String.class, path, silently, string -> ensureEnum(schema, string, path)); break;
case STRING: result = onValidType(value, String.class, path, silently, string -> string); break;
case NULL: result = value == null ? value : INCOMPATIBLE; break;
default: throw new AvroTypeException("Unsupported type: " + field.schema().getType());
}

Expand All @@ -89,57 +87,60 @@ private Object read(Schema.Field field, Schema schema, Object value, Deque<Strin
}

private List<Object> readArray(Schema.Field field, Schema schema, List<Object> items, Deque<String> path) {
return items.stream().map(item -> read(field, schema.getElementType(), item, path)).collect(toList());
return items.stream().map(item -> read(field, schema.getElementType(), item, path, false)).collect(toList());
}

private Map<String, Object> readMap(Schema.Field field, Schema schema, Map<String, Object> map, Deque<String> path) {
return map.entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, entry -> read(field, schema.getValueType(), entry.getValue(), path)));
.collect(toMap(Map.Entry::getKey, entry -> read(field, schema.getValueType(), entry.getValue(), path, false)));
}


private Object readUnion(Schema.Field field, Schema schema, Object value, Deque<String> path) {
List<Schema> types = schema.getTypes();
for (Schema type : types) {
try {
return read(field, type, value, path);
} catch (AvroRuntimeException ex) {
Object nestedValue = read(field, type, value, path, true);
if (nestedValue == INCOMPATIBLE) {
continue;
} else {
return nestedValue;
}
} catch (AvroRuntimeException e) {
// thrown only for union of more complex types like records
continue;
}
}
throw new AvroTypeException(format("Could not evaluate union, field %s is expected to be one of these: %s. " +
"If this is a complex type, check if offending field: %s adheres to schema.",
field.name(), types.stream().map(Schema::getType).map(Object::toString).collect(joining(",")), path(path)));
throw unionException(
field.name(),
types.stream().map(Schema::getType).map(Object::toString).collect(joining(", ")),
path);
}

private Object ensureEnum(Schema schema, Object value, Deque<String> path) {
List<String> symbols = schema.getEnumSymbols();
if(symbols.contains(value)){
return value;
}
String knownSymbols = symbols.stream().map(String::valueOf).collect(Collectors.joining(", "));
throw new AvroTypeException(format("Field %s is expected to be of enum type and be one of %s", path(path), knownSymbols));
throw enumException(path, symbols.stream().map(String::valueOf).collect(joining(", ")));
}

@SuppressWarnings("unchecked")
private <T> T ensureType(Object value, Class<T> type, Deque<String> path) {
if (type.isInstance(value)) {
return (T) value;
}
throw new AvroTypeException(format("Field %s is expected to be of %s type.", path(path), type.getName()));
}
public <T> Object onValidType(Object value, Class<T> type, Deque<String> path, boolean silently, Function<T, Object> function)
throws AvroTypeException {

private Object ensureNull(Object o, Deque<String> path) {
if (o != null) {
throw new AvroTypeException(format("Field %s was expected to be null.", path(path)));
if (type.isInstance(value)) {
return function.apply((T) value);
} else {
if (silently) {
return INCOMPATIBLE;
} else {
throw typeException(path, type.getTypeName());
}
}
return null;
}

private String path(Deque<String> path) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(path.descendingIterator(), Spliterator.ORDERED), false)
.map(Object::toString).collect(joining("."));
public Object onValidNumber(Object value, Deque<String> path, boolean silently, Function<Number, Object> function) {
return onValidType(value, Number.class, path, silently, function);
}
}

0 comments on commit 5ee6482

Please sign in to comment.