From aed193c745accf703309ad1b8cd91a4edca796e1 Mon Sep 17 00:00:00 2001 From: Chris K Wensel Date: Wed, 19 Jul 2023 21:09:52 -0700 Subject: [PATCH] support MVEL templates within the pipeline JSON file also remove the evalInsert transform, as it's redundant and less useful than the template support --- README.md | 70 ++++++++++++++++--- .../java/io/clusterless/tessellate/Main.java | 6 +- .../tessellate/model/EvalInsertOp.java | 52 -------------- .../tessellate/options/InputOptions.java | 2 +- .../tessellate/options/OutputOptions.java | 2 +- .../options/PipelineOptionsMerge.java | 20 +++++- .../tessellate/pipeline/Pipeline.java | 9 --- .../tessellate/pipeline/Transforms.java | 1 - .../clusterless/tessellate/util/JSONUtil.java | 8 +++ .../pipeline/PipelineOptionsMergerTest.java | 3 + .../pipeline/PipelineParseTest.java | 2 +- .../tessellate/util/LiteralResolverTest.java | 1 + .../src/test/resources/config/pipeline.json | 2 +- 13 files changed, 99 insertions(+), 79 deletions(-) delete mode 100644 tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java diff --git a/README.md b/README.md index 7d1b6b4..572d5cb 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,10 @@ This project is under active development and many features are considered alpha. Please do play around with this project in order to provide early feedback, but do expect things to change until we hit 1.0 release. +All final and WIP releases can be found here: + +- https://github.com/ClusterlessHQ/tessellate/releases + ## About A primary activity of any data-engineering effort is to format and organize data for different access patterns. @@ -22,14 +26,62 @@ Tessellate may be used from the command line, but also natively supports the ## Features -### Supported formats +### Pipeline definition + +Tessellate pipelines are defined in JSON files. + +For a copy of a template pipeline JSON file, run: + +```shell +tess --print-pipeline > pipeline.json +``` + +Some command line options are merged at runtime with the pipeline JSON file. Command line options take precedence over +the pipeline JSON file. + +Overriding command line options include + +- `--inputs` +- `--input-manifest` +- `--input-manifest-lot` +- `--output` +- `--output-manifest` +- `--output-manifest-lot` + +In order to embed system properties, environment variables, or other provided intrinsic values, [MVEL +templates](http://mvel.documentnode.com) are supported. + +Current context values supported are: + +- Environment variables +- System properties +- Pipeline source properties +- Pipeline sink properties + +For example: + +- `@{env['USER']}` - resolve an environment variable +- `@{sys['user.name']}` - resolve a system property +- `@{sink.manifestLot}` - resolve a sink property from the pipeline JSON definition + +Used in a transform to embed the current `lot` value into the output: + +```json +{ + "transform": [ + "@{source.manifestLot}=>lot|string" + ] +} +``` + +### Supported data formats - `text/regex` - lines of text parsed by regex - `csv` - with or without headers - `tsv` - with or without headers - [Apache Parquet](https://parquet.apache.org) -The regex support is based on regex groups. Groups are matched by ordinal with the declared fields in the schema. +Regex support is based on regex groups. Groups are matched by ordinal with the declared fields in the schema. Provided named formats include: @@ -49,7 +101,7 @@ Usage: } ``` -### Supported locations/protocols +### Supported data locations/protocols - `file://` - `s3://` @@ -69,8 +121,6 @@ Usage: - insert - insert a literal value into a field - `value=>intoField|type` -- eval - evaluate an expression locally and insert into a field (relies on [MVEL](http://mvel.documentnode.com)) - - `expression!>intoField|type` - coerce - transform a field to a new type - `field|newType` - copy - copy a field value to a new field @@ -102,7 +152,7 @@ Usage: So that the Cascading WIP releases can be retrieved, to `gradle.properties` add: -``` +```properties githubUsername=[your github username] githubPassword=[your github password] ``` @@ -111,10 +161,14 @@ githubPassword=[your github password] ## To Run -> ./tessellate-main/build/install/tess/bin/tess --help +```shell +./tessellate-main/build/install/tess/bin/tess --help +``` To print a project file template: -> tess --print-project +```shell +tess --print-pipeline +``` Documentation coming soon, but see the tests for usage. diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java b/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java index 95c7338..1938b21 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java @@ -48,8 +48,8 @@ enum Show { @CommandLine.Mixin protected PipelineOptions pipelineOptions = new PipelineOptions(); - @CommandLine.Option(names = "--print-project", description = "show project template, will not run pipeline") - protected boolean printProject = false; + @CommandLine.Option(names = "--print-pipeline", description = "show pipeline template, will not run pipeline") + protected boolean printPipeline = false; @CommandLine.Option(names = "--show-source", description = "show protocols, formats, or compression options") protected Show showSource; @@ -137,7 +137,7 @@ public Integer call() throws IOException { PipelineDef pipelineDef = merge.merge(); - if (printProject) { + if (printPipeline) { System.out.println(JSONUtil.writeAsStringSafePretty(pipelineDef)); return 0; } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java deleted file mode 100644 index 7239a47..0000000 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -package io.clusterless.tessellate.model; - -import cascading.tuple.type.CoercibleType; -import io.clusterless.tessellate.pipeline.Transforms; -import io.clusterless.tessellate.util.LiteralResolver; -import org.jetbrains.annotations.NotNull; - -import java.lang.reflect.Type; -import java.util.Map; - -public class EvalInsertOp extends InsertOp { - public EvalInsertOp(String declaration) { - super(declaration); - } - - @NotNull - protected String translate() { - return "!>"; - } - - public Object evaluate(Map context) { - Class resolvedType = getResolvedType(); - - return LiteralResolver.resolve(value(), context, resolvedType); - } - - protected Class getResolvedType() { - Type type = field().fields().getType(0); - Class resolvedType; - if (type instanceof Class) { - resolvedType = (Class) type; - } else if (type instanceof CoercibleType) { - resolvedType = ((CoercibleType) type).getCanonicalType(); - } else { - throw new IllegalArgumentException("invalid type: " + type); - } - return resolvedType; - } - - @Override - public Transforms transform() { - return Transforms.eval; - } -} diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java b/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java index f6a9fe5..984090e 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java @@ -19,7 +19,7 @@ public class InputOptions implements AWSOptions { private List inputs = new LinkedList<>(); @CommandLine.Option(names = {"-m", "--input-manifest"}, description = "input manifest uri") private URI inputManifest; - @CommandLine.Option(names = {"--input-lot"}, description = "input lot") + @CommandLine.Option(names = {"--input-manifest-lot"}, description = "input lot") private String inputLot; @CommandLine.Option(names = {"--input-aws-endpoint"}, description = "aws endpoint") protected String awsEndpoint; diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java b/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java index 2408bdd..475b366 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java @@ -17,7 +17,7 @@ public class OutputOptions implements AWSOptions { private URI output; @CommandLine.Option(names = {"-t", "--output-manifest"}, description = "output manifest uri template") private String outputManifest; - @CommandLine.Option(names = {"-l", "--output-lot"}, description = "output lot") + @CommandLine.Option(names = {"-l", "--output-manifest-lot"}, description = "output lot") private String outputLot; @CommandLine.Option(names = {"--output-aws-endpoint"}, description = "aws endpoint") protected String awsEndpoint; diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java b/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java index 4bffb66..e02b17c 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java @@ -16,6 +16,9 @@ import heretical.pointer.path.NestedPointer; import io.clusterless.tessellate.model.PipelineDef; import io.clusterless.tessellate.util.JSONUtil; +import io.clusterless.tessellate.util.LiteralResolver; +import org.jetbrains.annotations.NotNull; +import org.mvel2.templates.TemplateRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,9 +133,22 @@ public PipelineDef merge(JsonNode pipelineDef) { loadAndMerge(pipelineDef, "/source"); loadAndMerge(pipelineDef, "/sink"); - LOG.info("pipeline: {}", JSONUtil.writeAsStringSafe(pipelineDef)); + String mergedPipelineDef = JSONUtil.writeAsStringSafe(pipelineDef); + Map context = getContext(mergedPipelineDef); + String resolved = TemplateRuntime.eval(mergedPipelineDef, context).toString(); + LOG.info("pipeline: {}", resolved); - return JSONUtil.treeToValueSafe(pipelineDef, PipelineDef.class); + return JSONUtil.stringToValue(resolved, PipelineDef.class); + } + + @NotNull + private static Map getContext(String mergedPipelineDef) { + Map context = LiteralResolver.context(); + Map map = JSONUtil.stringToValue(mergedPipelineDef, Map.class); + + context.put("source", map.get("source")); + context.put("sink", map.get("sink")); + return context; } private void loadAndMerge(JsonNode jsonNode, String target) { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java index bf7e4b4..9431bcb 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java @@ -115,15 +115,6 @@ public void build() throws IOException { // todo: group like transforms together if there are no interdependencies for (TransformOp transformOp : pipelineDef.transform().transformOps()) { switch (transformOp.transform()) { - case eval: - EvalInsertOp evalOp = (EvalInsertOp) transformOp; - Fields evalFields = evalOp.field().fields(); - Object eval = evalOp.evaluate(getContext()); - LOG.info("transform eval: fields: {}, value: {}", evalFields, eval); - pipe = new Each(pipe, new Insert(evalFields, eval), Fields.ALL); - currentFields = currentFields.append(evalFields); - logCurrentFields(currentFields); - break; case insert: InsertOp insertOp = (InsertOp) transformOp; Fields insertFields = insertOp.field().fields(); diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java index 6e76d36..33c5953 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java @@ -14,7 +14,6 @@ public enum Transforms { insert("=>", "^.+[=]>.+$", InsertOp::new), - eval("!>", "^.+[!]>.+$", EvalInsertOp::new), copy("+>", "^.+[+]>.+$", CopyOp::new), rename("->", "^.+[-]>.+$", RenameOp::new), discard("->", "^.+[-]>$", DiscardOp::new), diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/util/JSONUtil.java b/tessellate-main/src/main/java/io/clusterless/tessellate/util/JSONUtil.java index cf9696a..260dbc6 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/util/JSONUtil.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/util/JSONUtil.java @@ -165,6 +165,14 @@ public static JsonNode stringToTree(String value) { } } + public static T stringToValue(String value, Class type) { + try { + return CONFIG_READER.readValue(value, type); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public static T treeToValue(JsonNode n, Class type) throws JsonProcessingException { return CONFIG_READER.treeToValue(n, type); } diff --git a/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineOptionsMergerTest.java b/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineOptionsMergerTest.java index de01756..604b79c 100644 --- a/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineOptionsMergerTest.java +++ b/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineOptionsMergerTest.java @@ -10,6 +10,7 @@ import com.adelean.inject.resources.junit.jupiter.GivenTextResource; import com.adelean.inject.resources.junit.jupiter.TestWithResources; +import io.clusterless.tessellate.model.InsertOp; import io.clusterless.tessellate.model.PipelineDef; import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.options.PipelineOptionsMerge; @@ -39,6 +40,8 @@ void usingOptions(@GivenTextResource("/config/pipeline.json") String pipelineJso assertEquals(inputs, merged.source().inputs()); assertEquals(output, merged.sink().output()); + + assertEquals("1689820455", ((InsertOp) merged.transform().transformOps().get(5)).value()); } @Test diff --git a/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineParseTest.java b/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineParseTest.java index c7ec338..7c71348 100644 --- a/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineParseTest.java +++ b/tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineParseTest.java @@ -55,6 +55,6 @@ void name(@GivenTextResource("config/pipeline.json") String pipelineJson) throws assertInstanceOf(CopyOp.class, transform.transformOps().get(2)); assertInstanceOf(DiscardOp.class, transform.transformOps().get(3)); assertInstanceOf(InsertOp.class, transform.transformOps().get(4)); - assertInstanceOf(EvalInsertOp.class, transform.transformOps().get(5)); + assertInstanceOf(InsertOp.class, transform.transformOps().get(5)); } } diff --git a/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java b/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java index f774cc5..20e3cc6 100644 --- a/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java +++ b/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java @@ -15,6 +15,7 @@ public class LiteralResolverTest { @Test void resolveEnv() { Assertions.assertEquals(System.getenv("USER"), LiteralResolver.resolve("env.USER", String.class)); + Assertions.assertEquals(System.getenv("USER"), LiteralResolver.resolve("env['USER']", String.class)); Assertions.assertEquals(System.getProperty("user.name"), LiteralResolver.resolve("sys['user.name']", String.class)); } } diff --git a/tessellate-main/src/test/resources/config/pipeline.json b/tessellate-main/src/test/resources/config/pipeline.json index db7f26e..a56c243 100644 --- a/tessellate-main/src/test/resources/config/pipeline.json +++ b/tessellate-main/src/test/resources/config/pipeline.json @@ -24,6 +24,6 @@ "three+>@three|DateTime|yyyyMMdd", "four->", "five=>_five", - "1689820455!>six|DateTime|yyyyMMdd" + "@{1689820455}=>six|DateTime|yyyyMMdd" ] }