Skip to content

Commit

Permalink
Added --print-output-schema to print the sink schema to stdout as JSON
Browse files Browse the repository at this point in the history
The schema is Athena type-compatible and can be used to create an AWS Glue table.
  • Loading branch information
cwensel committed Aug 9, 2023
1 parent 18c1da8 commit 9b00969
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class PipelineOptions implements AWSOptions {
// Value of the --aws-assumed-role-arn command line option; returned by awsAssumedRoleARN().
@CommandLine.Option(names = {"--aws-assumed-role-arn"}, description = "aws assumed role arn")
protected String assumedRoleARN;

// Picocli mixin carrying the options declared on PrintOptions
// (--print-output-schema and --print-format); returned by printOptions().
@CommandLine.Mixin
protected PrintOptions printOptions = new PrintOptions();

/**
 * Returns the current value of the {@code pipelinePath} field.
 *
 * @return the pipeline path held by these options
 */
public Path pipelinePath() {
    return this.pipelinePath;
}
Expand Down Expand Up @@ -69,4 +72,8 @@ public String aswRegion() {
public String awsAssumedRoleARN() {
    // value bound to the --aws-assumed-role-arn option
    return this.assumedRoleARN;
}

/**
 * Returns the print options mixed into this command.
 *
 * @return the {@link PrintOptions} instance held by these options
 */
public PrintOptions printOptions() {
    return this.printOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package io.clusterless.tessellate.options;

import picocli.CommandLine;

/**
 * Command line options that control printing of the output schema.
 * <p>
 * Both options have defaults: schema printing is off, and the format is
 * {@link PrintFormat#JSON}.
 */
public class PrintOptions {
    /**
     * Set by {@code --print-output-schema}; when true the schema is printed
     * and, per the option description, the process exits after printing.
     */
    @CommandLine.Option(names = "--print-output-schema", description = "print output schema, exits after printing")
    protected boolean printOutputSchema = false;

    /**
     * Set by {@code --print-format}; selects how the schema is rendered.
     */
    @CommandLine.Option(names = "--print-format", description = "print format")
    protected PrintFormat printFormat = PrintFormat.JSON;

    /**
     * @return true if the output schema should be printed
     */
    public boolean printOutputSchema() {
        return this.printOutputSchema;
    }

    /**
     * @return the requested print format, {@link PrintFormat#JSON} unless overridden
     */
    public PrintFormat printFormat() {
        return this.printFormat;
    }

    /**
     * The formats that can be requested via {@code --print-format}.
     */
    public enum PrintFormat {
        JSON,
        SQL
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.clusterless.tessellate.factory.*;
import io.clusterless.tessellate.model.*;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.options.PrintOptions;
import io.clusterless.tessellate.printer.SchemaPrinter;
import io.clusterless.tessellate.util.Format;
import io.clusterless.tessellate.util.Models;
import org.slf4j.Logger;
Expand Down Expand Up @@ -232,7 +234,6 @@ private static void logCurrentFields(Fields currentFields) {
}

public Integer run() throws IOException {

if (state == State.NONE) {
build();
}
Expand All @@ -241,6 +242,15 @@ public Integer run() throws IOException {
throw new IllegalStateException("pipeline is not ready to run");
}

if (pipelineOptions().printOptions().printOutputSchema()) {
Tap tap = flow.getSink();
PrintOptions.PrintFormat printFormat = pipelineOptions().printOptions().printFormat();
SchemaPrinter schemaPrinter = new SchemaPrinter(tap, printFormat);

schemaPrinter.print(System.out);
return 0;
}

running.set(true);

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package io.clusterless.tessellate.printer;

import cascading.tap.AdaptorTap;
import cascading.tap.Tap;
import cascading.tap.partition.BasePartitionTap;
import cascading.tuple.Fields;
import cascading.tuple.coerce.Coercions;
import cascading.tuple.type.CoercibleType;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.clusterless.tessellate.options.PrintOptions;
import io.clusterless.tessellate.type.WrappedCoercibleType;
import io.clusterless.tessellate.util.JSONUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintStream;
import java.lang.reflect.Type;
import java.util.*;
import java.util.function.BiConsumer;

/**
 * Prints the schema of a sink {@link Tap} as a list of columns and a list of partitions,
 * with field types translated through the Athena {@link TypeMap}.
 * <p>
 * Only {@link PrintOptions.PrintFormat#JSON} is rendered by {@link #asString(Schema)};
 * any other format results in an {@link UnsupportedOperationException} when printing.
 */
public class SchemaPrinter {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaPrinter.class);

    /** Translates resolved Java types into Athena type names. */
    private final TypeMap typeMap = new TypeMap(TypeMap.Dialect.athena);

    /**
     * Serializable schema model: ordered column and partition entries, each a map
     * holding a {@link #NAME} and a {@link #TYPE} value.
     */
    public static class Schema {
        public static final String NAME = "name";
        public static final String TYPE = "type";
        public static final String COMMENT = "comment";

        @JsonProperty
        protected List<Map<String, String>> columns = new ArrayList<>();

        @JsonProperty
        protected List<Map<String, String>> partitions = new ArrayList<>();

        /**
         * Adds a column, or updates the type of an existing column with the same name.
         *
         * @param name the column name
         * @param type the column type name
         */
        public void addColumn(String name, String type) {
            addTo(columns, name, type);
        }

        /**
         * Adds a partition, or updates the type of an existing partition with the same name.
         *
         * @param name the partition name
         * @param type the partition type name
         */
        public void addPartition(String name, String type) {
            addTo(partitions, name, type);
        }

        /**
         * Updates the first entry named {@code name} in place, keeping its position and any
         * other keys it holds; appends a new entry when no entry has that name.
         *
         * @param value the list of entries to update
         * @param name  the entry name to look up or add
         * @param type  the type name to store
         */
        protected void addTo(List<Map<String, String>> value, String name, String type) {
            for (Map<String, String> entry : value) {
                if (entry.get(NAME).equals(name)) {
                    entry.put(TYPE, type);
                    return;
                }
            }

            value.add(map(name, type));
        }

        /** Creates a new entry with the name first and the type second. */
        private Map<String, String> map(String name, String type) {
            Map<String, String> map = new LinkedHashMap<>();
            map.put(NAME, name);
            map.put(TYPE, type);
            return map;
        }
    }

    private final PrintOptions.PrintFormat printFormat;
    private final Fields fields;
    private final Fields partitionFields;

    /**
     * Captures the sink fields and partition fields of the given tap.
     * <p>
     * An {@link AdaptorTap} is replaced by its original tap first. For a
     * {@link BasePartitionTap} the fields come from the parent tap and the partition
     * fields from its partition; for any other tap there are no partition fields.
     *
     * @param tap         the sink tap to describe
     * @param printFormat the format to render the schema in
     */
    public SchemaPrinter(Tap tap, PrintOptions.PrintFormat printFormat) {
        this.printFormat = printFormat;

        Tap sink = tap;

        if (sink instanceof AdaptorTap) {
            sink = ((AdaptorTap<?, ?, ?, ?, ?, ?>) sink).getOriginal();
        }

        if (sink instanceof BasePartitionTap) {
            BasePartitionTap<?, ?, ?> partitionTap = (BasePartitionTap<?, ?, ?>) sink;
            this.fields = partitionTap.getParent().getSinkFields();
            this.partitionFields = partitionTap.getPartition().getPartitionFields();
        } else {
            this.fields = sink.getSinkFields();
            this.partitionFields = Fields.NONE;
        }
    }

    /**
     * Builds the schema and writes its rendered form to {@code out} using
     * {@link PrintStream#print(String)}; this method adds no line terminator.
     * <p>
     * Fields that are also partition fields are listed only as partitions.
     *
     * @param out the stream to write the schema to
     * @throws UnsupportedOperationException if the print format is not supported
     */
    public void print(PrintStream out) {
        // todo: update in place by reading an existing schema
        // note the schema object already supports update of a column without changing the order or comments
        Schema schema = new Schema();

        LOG.info("adding columns: {}, excluding: {}", fields.print(), partitionFields.print());
        addFields(fields, partitionFields, schema::addColumn);

        LOG.info("adding partitions: {}", partitionFields.print());
        addFields(partitionFields, Fields.NONE, schema::addPartition);

        out.print(asString(schema));
    }

    /**
     * Renders the schema in the configured print format.
     *
     * @param schema the schema to render
     * @return the rendered schema
     * @throws UnsupportedOperationException for every format other than JSON
     */
    protected String asString(Schema schema) {
        switch (printFormat) {
            case JSON:
                return JSONUtil.writeAsStringSafePretty(schema);
            default:
                throw new UnsupportedOperationException("Unsupported value: " + printFormat);
        }
    }

    /**
     * Passes the name and mapped type name of each field to {@code consumer}, skipping
     * any field contained in {@code exclude}.
     * <p>
     * A field whose type has no entry in the type map is still passed on, with a
     * {@code null} type name, and a warning is logged so the gap is not silent.
     *
     * @param fields   the fields to add
     * @param exclude  the fields to leave out
     * @param consumer receives the field name and the mapped type name
     */
    protected void addFields(Fields fields, Fields exclude, BiConsumer<String, String> consumer) {
        Iterator<Fields> fieldsIterator = fields.fieldsIterator();

        while (fieldsIterator.hasNext()) {
            Fields next = fieldsIterator.next();

            if (exclude.contains(next)) {
                continue;
            }

            String name = next.get(0).toString();
            Type declaredType = next.getType(0);
            String typeName = typeMap.get(resolveType(declaredType));

            if (typeName == null) {
                LOG.warn("no type mapping found for field: {}, type: {}", name, declaredType);
            }

            consumer.accept(name, typeName);
        }
    }

    /**
     * Reduces a declared field type to the key used for the type map lookup.
     * <p>
     * A {@link WrappedCoercibleType} is replaced by the type it wraps, and when that is
     * a {@link Coercions.Coerce}, by its canonical type. A class listed in
     * {@link Coercions#primitives} is then replaced by its mapped class; any other
     * {@link CoercibleType} is replaced by its implementation class.
     */
    private static Type resolveType(Type type) {
        Type resolved = type;

        if (resolved instanceof WrappedCoercibleType) {
            resolved = ((WrappedCoercibleType<?>) resolved).wrappedCoercibleType();

            if (resolved instanceof Coercions.Coerce) {
                resolved = ((Coercions.Coerce<?>) resolved).getCanonicalType();
            }
        }

        if (resolved instanceof Class && Coercions.primitives.containsKey(resolved)) {
            return Coercions.primitives.get(resolved);
        }

        if (resolved instanceof CoercibleType) {
            return resolved.getClass();
        }

        return resolved;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package io.clusterless.tessellate.printer;

import cascading.nested.json.JSONCoercibleType;
import cascading.tuple.type.DateType;
import cascading.tuple.type.InstantType;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Lookup table from Java types to the type names of a target dialect.
 * <p>
 * The table is filled once, in the constructor, for the requested {@link Dialect}.
 */
public class TypeMap {
    /**
     * The supported target dialects.
     */
    public enum Dialect {
        athena
    }

    Map<Type, String> map = new HashMap<>();

    /**
     * Creates a type map populated for the given dialect.
     *
     * @param dialect the dialect whose type names should be registered
     * @throws IllegalStateException if the dialect has no registered mappings
     */
    public TypeMap(Dialect dialect) {
        switch (dialect) {
            case athena:
                registerAthena(map);
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + dialect);
        }
    }

    /**
     * Registers the Athena type names: boxed Java types, plus the JSON coercible type
     * (as string) and the date and instant coercible types (both as timestamp).
     */
    private static void registerAthena(Map<Type, String> types) {
        // text and boolean
        types.put(String.class, "string");
        types.put(Boolean.class, "boolean");

        // integral types, narrowest to widest
        types.put(Byte.class, "tinyint");
        types.put(Short.class, "smallint");
        types.put(Integer.class, "int");
        types.put(Long.class, "bigint");

        // floating point
        types.put(Float.class, "float");
        types.put(Double.class, "double");

        // coercible types, keyed by their implementation class
        types.put(JSONCoercibleType.class, "string");
        types.put(DateType.class, "timestamp");
        types.put(InstantType.class, "timestamp");
    }

    /**
     * Looks up the dialect type name for the given type.
     *
     * @param type the type to translate
     * @return the dialect type name, or {@code null} if the type is not registered
     */
    public String get(Type type) {
        return this.map.get(type);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ protected WrappedCoercibleType(CoercibleType<Canonical> coercibleType, Function<
this.function = function;
}

/**
 * Returns the coercible type this instance was constructed with.
 *
 * @return the wrapped {@link CoercibleType}
 */
public CoercibleType<Canonical> wrappedCoercibleType() {
    return this.coercibleType;
}

@Override
public Class<Canonical> getCanonicalType() {
return coercibleType.getCanonicalType();
Expand Down

0 comments on commit 9b00969

Please sign in to comment.