From b35820f13fb6520815056c7560be4d5e8fbf81ea Mon Sep 17 00:00:00 2001 From: Chris K Wensel Date: Wed, 19 Jul 2023 16:49:23 -0700 Subject: [PATCH] add an expression evaluator for literal insertion, fix input manifest support --- README.md | 2 + tessellate-main/build.gradle.kts | 2 + .../java/io/clusterless/tessellate/Main.java | 3 +- .../tessellate/factory/ManifestReader.java | 76 ++++++-------- .../tessellate/factory/TapFactories.java | 14 ++- .../tessellate/factory/TapFactory.java | 4 + .../tessellate/factory/hdfs/FSFactory.java | 99 +++++++++++-------- .../factory/hdfs/JSONFSFactory.java | 3 +- .../factory/hdfs/LinesFSFactory.java | 5 +- .../factory/hdfs/ParquetFactory.java | 18 +++- .../factory/hdfs/TextFSFactory.java | 6 +- .../factory/local/LocalDirectoryFactory.java | 5 - .../factory/local/tap/PrefixedDirTap.java | 10 ++ .../clusterless/tessellate/model/CopyOp.java | 2 +- .../clusterless/tessellate/model/Dataset.java | 2 + .../tessellate/model/EvalInsertOp.java | 52 ++++++++++ .../tessellate/model/HasManifest.java | 15 --- .../tessellate/model/InsertOp.java | 12 ++- .../tessellate/model/Partition.java | 4 +- .../io/clusterless/tessellate/model/Sink.java | 2 +- .../clusterless/tessellate/model/Source.java | 25 +++-- .../tessellate/model/Transform.java | 4 +- .../tessellate/options/InputOptions.java | 17 ++++ .../tessellate/options/OutputOptions.java | 22 +++++ .../options/PipelineOptionsMerge.java | 9 ++ .../tessellate/pipeline/Pipeline.java | 53 ++++++++-- .../tessellate/pipeline/Transforms.java | 1 + .../tessellate/util/LiteralResolver.java | 35 +++++++ .../io/clusterless/tessellate/util/URIs.java | 30 +++++- .../tessellate/util/LiteralResolverTest.java | 20 ++++ .../clusterless/tessellate/util/URIsTest.java | 25 +++++ 31 files changed, 435 insertions(+), 142 deletions(-) create mode 100644 tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java delete mode 100644 tessellate-main/src/main/java/io/clusterless/tessellate/model/HasManifest.java create mode 100644 tessellate-main/src/main/java/io/clusterless/tessellate/util/LiteralResolver.java create mode 100644 tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java create mode 100644 tessellate-main/src/test/java/io/clusterless/tessellate/util/URIsTest.java diff --git a/README.md b/README.md index 79e9019..7d1b6b4 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,8 @@ Usage: - insert - insert a literal value into a field - `value=>intoField|type` +- eval - evaluate an expression locally and insert into a field (relies on [MVEL](http://mvel.documentnode.com)) + - `expression!>intoField|type` - coerce - transform a field to a new type - `field|newType` - copy - copy a field value to a new field diff --git a/tessellate-main/build.gradle.kts b/tessellate-main/build.gradle.kts index 3d50ba1..6ecfcf9 100644 --- a/tessellate-main/build.gradle.kts +++ b/tessellate-main/build.gradle.kts @@ -85,6 +85,8 @@ dependencies { implementation("org.apache.hadoop:hadoop-common:$hadoop3Version") implementation("org.apache.hadoop:hadoop-aws:$hadoop3Version") + implementation("org.mvel:mvel2:2.5.0.Final") + // required by hadoop in java 9+ implementation("javax.xml.bind:jaxb-api:2.3.0") diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java b/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java index 6ecfd40..95c7338 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/Main.java @@ -29,7 +29,8 @@ @CommandLine.Command( name = "tess", mixinStandardHelpOptions = true, - version = "1.0-wip" + version = "1.0-wip", + sortOptions = false ) public class Main implements Callable { enum Show { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/ManifestReader.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/ManifestReader.java index dadd31b..99bfc8d 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/ManifestReader.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/ManifestReader.java @@ -9,56 +9,56 @@ package io.clusterless.tessellate.factory; import cascading.flow.local.LocalFlowProcess; -import cascading.nested.json.hadoop3.JSONTextLine; -import cascading.tap.SinkMode; -import cascading.tap.hadoop.Hfs; -import cascading.tap.local.hadoop.LocalHfsAdaptor; +import cascading.tap.Tap; import cascading.tuple.TupleEntry; import cascading.tuple.TupleEntryIterator; import com.fasterxml.jackson.databind.JsonNode; -import io.clusterless.tessellate.model.Dataset; +import io.clusterless.tessellate.model.Field; +import io.clusterless.tessellate.model.Schema; +import io.clusterless.tessellate.model.Sink; import io.clusterless.tessellate.model.Source; +import io.clusterless.tessellate.options.PipelineOptions; +import io.clusterless.tessellate.util.Format; import io.clusterless.tessellate.util.JSONUtil; import io.clusterless.tessellate.util.URIs; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; public class ManifestReader { private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class); public static final int SHOW_DUPLICATES = 20; - public static ManifestReader from(Dataset dataset) { - if (!(dataset instanceof Source)) { - return new ManifestReader(dataset.uris()); - } + public static ManifestReader from(Sink sink) { + return new ManifestReader(sink.uris()); + } - return new ManifestReader(((Source) dataset)); + public static ManifestReader from(Source source) { + return new ManifestReader(source); } private final URI manifestURI; private final List uris; - private final int numPartitions; private List manifestUris; public ManifestReader(Source source) { this.manifestURI = source.manifest(); this.uris = clean(source.uris()); - this.numPartitions = source.partitions().size(); } public ManifestReader(List uris) { this.uris = clean(uris); this.manifestURI = null; - this.numPartitions = 0; } - public List uris(Properties conf) throws IOException { + public List uris(PipelineOptions pipelineOptions) throws IOException { if (manifestURI == null) { return uris; } @@ -69,7 +69,7 @@ public List uris(Properties conf) throws IOException { JsonNode node = null; - try (TupleEntryIterator entryIterator = openForRead(conf, manifestURI)) { + try (TupleEntryIterator entryIterator = openForRead(pipelineOptions, manifestURI)) { while (entryIterator.hasNext()) { TupleEntry next = entryIterator.next(); node = (JsonNode) next.getObject(0); @@ -93,32 +93,6 @@ public List uris(Properties conf) throws IOException { return manifestUris; } - public boolean urisFromManifest() { - return manifestUris != null; - } - - public URI findCommonRoot(Properties conf) throws IOException { - List uris = uris(conf); - - // uri is likely a directory or single file, let the Hfs tap handle it - if (!urisFromManifest() && uris.size() == 1) { - return uris.get(0); - } - - Set roots = uris.stream() - .map(u -> URIs.trim(u, numPartitions + 1)) - .map(Objects::toString) - .collect(Collectors.toSet()); - - String commonPrefix = StringUtils.getCommonPrefix(roots.toArray(new String[0])); - - if (commonPrefix.isEmpty()) { - throw new IllegalArgumentException("to many unique roots, got: " + roots); - } - - return URI.create(commonPrefix); - } - protected List clean(List uris) { List distinct = uris.stream() .map(URIs::copyWithoutQuery) @@ -147,7 +121,19 @@ protected List clean(List uris) { return uris; } - private static TupleEntryIterator openForRead(Properties conf, URI uri) throws IOException { - return new LocalHfsAdaptor(new Hfs(new JSONTextLine(), uri.toString(), SinkMode.KEEP)).openForRead(new LocalFlowProcess(conf)); + private TupleEntryIterator openForRead(PipelineOptions pipelineOptions, URI uri) throws IOException { + Source source = Source.builder() + .withSchema(Schema.builder() + .withFormat(Format.json) + .withDeclared(List.of(new Field("json|json"))) + .build()) + .withInputs(List.of(uri)) + .build(); + + SourceFactory sourceFactory = TapFactories.findSourceFactory(pipelineOptions, source); + + Tap sourceTap = sourceFactory.getSource(pipelineOptions, source); + + return sourceTap.openForRead(new LocalFlowProcess(new Properties())); } } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactories.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactories.java index 0a1d771..4008fd1 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactories.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactories.java @@ -15,12 +15,14 @@ import io.clusterless.tessellate.factory.local.LocalDirectoryFactory; import io.clusterless.tessellate.model.Sink; import io.clusterless.tessellate.model.Source; +import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Compression; import io.clusterless.tessellate.util.Format; import io.clusterless.tessellate.util.Protocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.URI; import java.util.*; import java.util.stream.Collectors; @@ -60,7 +62,17 @@ public class TapFactories { } } - public static SourceFactory findSourceFactory(Source sourceModel) { + public static SourceFactory findSourceFactory(PipelineOptions pipelineOptions, Source sourceModel) throws IOException { + if (sourceModel.manifest() != null) { + LOG.info("reading manifest: {}", sourceModel.manifest()); + + ManifestReader manifestReader = ManifestReader.from(sourceModel); + + List uris = manifestReader.uris(pipelineOptions); + + sourceModel.uris().addAll(uris); + } + List inputUris = sourceModel.uris(); Format format = sourceModel.schema().format(); Compression compression = sourceModel.schema().compression(); diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactory.java index 5d3ee46..8fe4e01 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactory.java @@ -12,9 +12,13 @@ import io.clusterless.tessellate.util.Format; import io.clusterless.tessellate.util.Protocol; +import java.util.Properties; import java.util.Set; public interface TapFactory { + default void applyGlobalProperties(Properties properties) { + } + Set getProtocols(); Set getFormats(); diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/FSFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/FSFactory.java index 4cea9c0..1bcf282 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/FSFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/FSFactory.java @@ -19,7 +19,6 @@ import cascading.tap.partition.Partition; import cascading.tuple.Fields; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import io.clusterless.tessellate.factory.ManifestReader; import io.clusterless.tessellate.factory.ManifestWriter; import io.clusterless.tessellate.factory.Observed; import io.clusterless.tessellate.factory.hdfs.fs.ObserveLocalFileSystem; @@ -30,6 +29,7 @@ import io.clusterless.tessellate.options.AWSOptions; import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Property; +import io.clusterless.tessellate.util.URIs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -67,22 +67,25 @@ public int openWritesThreshold() { Fields declaredFields = declaredFields(dataset, currentFields); - Properties local = initLocalProperties(pipelineOptions, dataset, declaredFields); + Properties local = getProperties(pipelineOptions, dataset, declaredFields); - ManifestReader manifestReader = ManifestReader.from(dataset); + List uris = dataset.uris(); - List uris = manifestReader.uris(local); + URI commonRoot = uris.get(0); - URI commonRoot = manifestReader.findCommonRoot(local); + // uri is likely a directory or single file, let the Hfs tap handle it + if (!isSink && dataset.manifest() != null) { + commonRoot = URIs.findCommonPrefix(uris, dataset.partitions().size()); + } - LOG.info("{}: handing uris: {}, with common: {}", logPrefix(isSink), uris.size(), commonRoot); + LOG.info("{}: handling uris: {}, with common: {}", logPrefix(isSink), uris.size(), commonRoot); - Scheme scheme = createScheme(pipelineOptions, dataset, declaredFields); + Scheme scheme = createScheme(dataset, declaredFields); Tap tap; if (isSink) { - tap = createSinkTap(local, scheme, commonRoot, uris, ((Sink) dataset).manifest()); + tap = createSinkTap(local, scheme, commonRoot, uris, dataset.manifest()); } else { tap = createSourceTap(local, scheme, commonRoot, uris); } @@ -113,8 +116,33 @@ private static String logPrefix(boolean isSink) { return isSink ? "writing" : "reading"; } - @NotNull - protected Properties initLocalProperties(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { + @Override + public void applyGlobalProperties(Properties properties) { + properties.setProperty("mapred.output.direct." + S3AFileSystem.class.getSimpleName(), "true"); + properties.setProperty("mapreduce.input.fileinputformat.input.dir.recursive", "true"); + + // intercept all writes against the s3a: and file: filesystem + properties.setProperty("mapred.output.direct." + ObserveS3AFileSystem.class.getSimpleName(), "true"); + properties.setProperty("fs.s3a.impl", ObserveS3AFileSystem.class.getName()); + properties.setProperty("fs.s3.impl", ObserveS3AFileSystem.class.getName()); + properties.setProperty("mapred.output.direct." + ObserveLocalFileSystem.class.getSimpleName(), "true"); + properties.setProperty("fs.file.impl", ObserveLocalFileSystem.class.getName()); + + properties.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, getAWSCredentialProviders()); + } + + protected Properties getProperties(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { + Properties local = new Properties(); + + // covers case when reading manifest + applyGlobalProperties(local); + + local = applyAWSProperties(pipelineOptions, local, isSink(dataset)); + + return applySinkProperties(dataset, declaredFields, local); + } + + protected Properties applySinkProperties(Dataset dataset, Fields declaredFields, Properties local) { String prefix = PART_NAME_DEFAULT; // hdfs always treat paths as directories, so we need to provide a prefix for the part files @@ -122,42 +150,31 @@ protected Properties initLocalProperties(PipelineOptions pipelineOptions, Datase prefix = getPartFileName((Sink) dataset, declaredFields); } - Properties local = new Properties(); - - local.setProperty("mapred.output.direct." + S3AFileSystem.class.getSimpleName(), "true"); local.setProperty("cascading.tapcollector.partname", String.format("%%s%%s%s-%%05d-%%05d", prefix)); - local.setProperty("mapreduce.input.fileinputformat.input.dir.recursive", "true"); - - // intercept all writes against the s3a: and file: filesystem - local.setProperty("mapred.output.direct." + ObserveS3AFileSystem.class.getSimpleName(), "true"); - local.setProperty("fs.s3a.impl", ObserveS3AFileSystem.class.getName()); - local.setProperty("fs.s3.impl", ObserveS3AFileSystem.class.getName()); - local.setProperty("mapred.output.direct." + ObserveLocalFileSystem.class.getSimpleName(), "true"); - local.setProperty("fs.file.impl", ObserveLocalFileSystem.class.getName()); - return applyAWSProperties(pipelineOptions, dataset, local); + return local; } - protected abstract Scheme createScheme(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields); + protected abstract Scheme createScheme(Dataset dataset, Fields declaredFields); - protected Properties applyAWSProperties(PipelineOptions pipelineOptions, Dataset dataset, Properties local) { - AWSOptions overrideAWSOptions = isSink(dataset) ? pipelineOptions.outputOptions() : pipelineOptions.inputOptions(); + protected Properties applyAWSProperties(PipelineOptions pipelineOptions, Properties properties, boolean isSink) { + AWSOptions overrideAWSOptions = isSink ? pipelineOptions.outputOptions() : pipelineOptions.inputOptions(); List awsOptions = List.of(overrideAWSOptions, pipelineOptions); Optional hasAssumedRoleARN = awsOptions.stream() .filter(AWSOptions::hasAWSAssumedRoleARN) .findFirst(); - hasAssumedRoleARN.ifPresent(o -> local.setProperty(Constants.ASSUMED_ROLE_ARN, o.awsAssumedRoleARN())); + hasAssumedRoleARN.ifPresent(o -> properties.setProperty(Constants.ASSUMED_ROLE_ARN, o.awsAssumedRoleARN())); - Property.setIfNotNullFromSystem(local, Constants.ASSUMED_ROLE_ARN); + Property.setIfNotNullFromSystem(properties, Constants.ASSUMED_ROLE_ARN); - if (local.containsKey(Constants.ASSUMED_ROLE_ARN)) { - local.setProperty(Constants.ASSUMED_ROLE_SESSION_NAME, "role-session-" + System.currentTimeMillis()); - local.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.class.getName()); - local.setProperty(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER, getAWSCredentialProviders()); + if (properties.containsKey(Constants.ASSUMED_ROLE_ARN)) { + properties.setProperty(Constants.ASSUMED_ROLE_SESSION_NAME, "role-session-" + System.currentTimeMillis()); + properties.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.class.getName()); + properties.setProperty(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER, getAWSCredentialProviders()); } else { - local.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, getAWSCredentialProviders()); + properties.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, getAWSCredentialProviders()); } Optional hasAWSEndpoint = awsOptions.stream() @@ -165,14 +182,14 @@ protected Properties applyAWSProperties(PipelineOptions pipelineOptions, Dataset .map(AWSOptions::awsEndpoint) .findFirst(); - Property.setIfNotNullFromEnvThenSystem(local, "AWS_S3_ENDPOINT", Constants.ENDPOINT, hasAWSEndpoint.orElse(null)); - Property.setIfNotNullFromEnvThenSystem(local, "AWS_ACCESS_KEY_ID", Constants.ACCESS_KEY); - Property.setIfNotNullFromEnvThenSystem(local, "AWS_SECRET_ACCESS_KEY", Constants.SECRET_KEY); - Property.setIfNotNullFromSystem(local, Constants.SESSION_TOKEN); - Property.setIfNotNullFromSystem(local, Constants.PROXY_HOST); - Property.setIfNotNullFromSystem(local, Constants.PROXY_PORT); + Property.setIfNotNullFromEnvThenSystem(properties, "AWS_S3_ENDPOINT", Constants.ENDPOINT, hasAWSEndpoint.orElse(null)); + Property.setIfNotNullFromEnvThenSystem(properties, "AWS_ACCESS_KEY_ID", Constants.ACCESS_KEY); + Property.setIfNotNullFromEnvThenSystem(properties, "AWS_SECRET_ACCESS_KEY", Constants.SECRET_KEY); + Property.setIfNotNullFromSystem(properties, Constants.SESSION_TOKEN); + Property.setIfNotNullFromSystem(properties, Constants.PROXY_HOST); + Property.setIfNotNullFromSystem(properties, Constants.PROXY_PORT); - return local; + return properties; } @NotNull @@ -234,6 +251,8 @@ private String getAWSCredentialProviders() { list.addFirst(DefaultAWSCredentialsProviderChain.class); - return list.stream().map(Class::getName).collect(Collectors.joining(",")); + return list.stream() + .map(Class::getName) + .collect(Collectors.joining(",")); } } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/JSONFSFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/JSONFSFactory.java index 64d2ac7..67b5926 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/JSONFSFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/JSONFSFactory.java @@ -14,7 +14,6 @@ import cascading.tuple.Fields; import io.clusterless.tessellate.factory.TapFactory; import io.clusterless.tessellate.model.Dataset; -import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Compression; import io.clusterless.tessellate.util.Format; import io.clusterless.tessellate.util.JSONUtil; @@ -30,7 +29,7 @@ public Set getFormats() { } @Override - protected Scheme createScheme(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { + protected Scheme createScheme(Dataset dataset, Fields declaredFields) { if (dataset.schema().compression() == Compression.none) { return new JSONTextLine(JSONUtil.DATA_MAPPER, declaredFields); } else { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/LinesFSFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/LinesFSFactory.java index 9fcc84f..ec45a66 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/LinesFSFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/LinesFSFactory.java @@ -13,7 +13,6 @@ import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Compression; import io.clusterless.tessellate.util.Protocol; -import org.jetbrains.annotations.NotNull; import java.util.Properties; import java.util.Set; @@ -30,8 +29,8 @@ public Set getCompressions() { } @Override - protected @NotNull Properties initLocalProperties(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { - Properties properties = super.initLocalProperties(pipelineOptions, dataset, declaredFields); + public Properties getProperties(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { + Properties properties = super.getProperties(pipelineOptions, dataset, declaredFields); switch (dataset.schema().compression()) { case none: diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/ParquetFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/ParquetFactory.java index 485dac2..953d921 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/ParquetFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/ParquetFactory.java @@ -13,7 +13,6 @@ import cascading.tuple.Fields; import io.clusterless.tessellate.factory.TapFactory; import io.clusterless.tessellate.model.Dataset; -import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Compression; import io.clusterless.tessellate.util.Format; import io.clusterless.tessellate.util.JSONUtil; @@ -38,11 +37,18 @@ public Set getFormats() { @Override public Set getCompressions() { - return Set.of(Compression.none, Compression.gzip, Compression.snappy, Compression.lzo); + // lzo needs a proprietary library + return Set.of( + Compression.none, + Compression.gzip, + Compression.snappy, + Compression.brotli, + Compression.lz4 + ); } @Override - protected Scheme createScheme(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { + protected Scheme createScheme(Dataset dataset, Fields declaredFields) { CompressionCodecName compressionCodecName = compressionCodecName(dataset); return new TypedParquetScheme(declaredFields, compressionCodecName) @@ -60,6 +66,12 @@ private static CompressionCodecName compressionCodecName(Dataset dataset) { case snappy: compressionCodecName = CompressionCodecName.SNAPPY; break; + case brotli: + compressionCodecName = CompressionCodecName.BROTLI; + break; + case lz4: + compressionCodecName = CompressionCodecName.LZ4; + break; case lzo: compressionCodecName = CompressionCodecName.LZO; break; diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/TextFSFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/TextFSFactory.java index dcd7360..4b59352 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/TextFSFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/hdfs/TextFSFactory.java @@ -8,6 +8,7 @@ package io.clusterless.tessellate.factory.hdfs; +import cascading.nested.json.hadoop3.JSONTextLine; import cascading.scheme.Scheme; import cascading.scheme.hadoop.TextDelimited; import cascading.scheme.hadoop.TextLine; @@ -15,7 +16,6 @@ import io.clusterless.tessellate.factory.TapFactory; import io.clusterless.tessellate.model.Dataset; import io.clusterless.tessellate.model.Schema; -import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Compression; import io.clusterless.tessellate.util.Format; @@ -30,7 +30,7 @@ public Set getFormats() { } @Override - protected Scheme createScheme(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) { + protected Scheme createScheme(Dataset dataset, Fields declaredFields) { Schema schema = dataset.schema(); TextLine.Compress compress = schema.compression() == Compression.none ? TextLine.Compress.DISABLE : TextLine.Compress.ENABLE; @@ -42,6 +42,8 @@ protected Scheme createScheme(PipelineOptions pipelineOptions, Dataset dataset, return new TextDelimited(declaredFields, compress, schema.embedsSchema(), ",", "\""); case tsv: return new TextDelimited(declaredFields, compress, schema.embedsSchema(), "\t", "\""); + case json: + return new JSONTextLine(declaredFields, compress); } } } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/LocalDirectoryFactory.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/LocalDirectoryFactory.java index e67ec4e..a44d38f 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/LocalDirectoryFactory.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/LocalDirectoryFactory.java @@ -23,7 +23,6 @@ import io.clusterless.tessellate.factory.TapFactory; import io.clusterless.tessellate.factory.local.tap.PrefixedDirTap; import io.clusterless.tessellate.model.Dataset; -import io.clusterless.tessellate.model.HasManifest; import io.clusterless.tessellate.model.Schema; import io.clusterless.tessellate.model.Sink; import io.clusterless.tessellate.options.PipelineOptions; @@ -74,10 +73,6 @@ public int openWritesThreshold() { Fields declaredFields = declaredFields(dataset, currentFields); - if (dataset instanceof HasManifest && ((HasManifest) dataset).manifest() != null) { - throw new IllegalStateException("manifests not supported for local sinks/sources"); - } - List uris = dataset.uris(); if (uris.size() > 1) { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/tap/PrefixedDirTap.java b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/tap/PrefixedDirTap.java index caf60d1..40a3043 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/tap/PrefixedDirTap.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/factory/local/tap/PrefixedDirTap.java @@ -18,6 +18,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.nio.file.PathMatcher; import java.util.Properties; public class PrefixedDirTap extends DirTap { @@ -46,6 +47,15 @@ protected String getOutputFileBasename() { return prefix; } + @Override + protected PathMatcher getPathMatcher() { + // Hadoop FS writes _SUCCESS and .crc files, so we need to ignore them + return path -> { + String string = path.getFileName().toString(); + return string.charAt(0) != '_' && string.charAt(0) != '.'; + }; + } + protected TapWith create(Scheme scheme, Path path, SinkMode sinkMode) { try { return new PrefixedDirTap(scheme, path, sinkMode, prefix); diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/CopyOp.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/CopyOp.java index e020cd8..0af15a5 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/CopyOp.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/CopyOp.java @@ -14,7 +14,7 @@ /** *
- * ts->ymd|DateTime|yyyyMMdd
+ * ts+>ymd|DateTime|yyyyMMdd
  * 
*/ public class CopyOp extends Translate implements TransformOp { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Dataset.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Dataset.java index 320c996..359021d 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Dataset.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Dataset.java @@ -12,6 +12,8 @@ import java.util.List; public interface Dataset { + URI manifest(); + Schema schema(); List uris(); diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java new file mode 100644 index 0000000..7239a47 --- /dev/null +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/EvalInsertOp.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package io.clusterless.tessellate.model; + +import cascading.tuple.type.CoercibleType; +import io.clusterless.tessellate.pipeline.Transforms; +import io.clusterless.tessellate.util.LiteralResolver; +import org.jetbrains.annotations.NotNull; + +import java.lang.reflect.Type; +import java.util.Map; + +public class EvalInsertOp extends InsertOp { + public EvalInsertOp(String declaration) { + super(declaration); + } + + @NotNull + protected String translate() { + return "!>"; + } + + public Object evaluate(Map context) { + Class resolvedType = getResolvedType(); + + return LiteralResolver.resolve(value(), context, resolvedType); + } + + protected Class getResolvedType() { + Type type = field().fields().getType(0); + Class resolvedType; + if (type instanceof Class) { + resolvedType = (Class) type; + } else if (type instanceof CoercibleType) { + resolvedType = ((CoercibleType) type).getCanonicalType(); + } else { + throw new IllegalArgumentException("invalid type: " + type); + } + return resolvedType; + } + + @Override + public Transforms transform() { + return Transforms.eval; + } +} diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/HasManifest.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/HasManifest.java deleted file mode 100644 index d56e371..0000000 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/HasManifest.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -package io.clusterless.tessellate.model; - -import java.net.URI; - -public interface HasManifest { - URI manifest(); -} diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/InsertOp.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/InsertOp.java index 74c655b..cefdc5e 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/InsertOp.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/InsertOp.java @@ -11,10 +11,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import io.clusterless.tessellate.pipeline.Transforms; +import org.jetbrains.annotations.NotNull; /** *
- * value=field|type
+ * value=>field|type
  * 
*/ public class InsertOp implements TransformOp, Model { @@ -27,16 +28,21 @@ public class InsertOp implements TransformOp, Model { @JsonCreator public InsertOp(String declaration) { this.declaration = declaration; - String[] split = declaration.split("=>"); + String[] split = declaration.split(translate()); if (split.length != 2) { - throw new IllegalArgumentException("invalid insert declaration, expects 'value=field`, got: " + declaration); + throw new IllegalArgumentException("invalid " + transform().name() + " declaration, expects 'value" + translate() + "field`, got: " + declaration); } this.field = new Field(split[1]); this.value = split[0]; } + @NotNull + protected String translate() { + return "=>"; + } + public String declaration() { return declaration; } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Partition.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Partition.java index 73b1e24..8cd9657 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Partition.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Partition.java @@ -12,7 +12,7 @@ import org.jetbrains.annotations.NotNull; /** - * ts->ymd|DateTime|yyyyMMdd + * ts+>ymd|DateTime|yyyyMMdd */ public class Partition extends Translate { @JsonCreator @@ -23,7 +23,7 @@ public Partition(String partition) { @Override @NotNull protected String translate() { - return "\\+>"; + return "[+]>"; } @Override diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Sink.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Sink.java index c851214..8744e0b 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Sink.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Sink.java @@ -12,7 +12,7 @@ import java.util.ArrayList; import java.util.List; -public class Sink implements Dataset, Model, HasManifest { +public class Sink implements Dataset, Model { private URI manifest; private String manifestLot; private URI output; diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Source.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Source.java index 59864c0..96a07a7 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Source.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Source.java @@ -12,8 +12,9 @@ import java.util.ArrayList; import java.util.List; -public class Source implements Dataset, Model, HasManifest { +public class Source implements Dataset, Model { private URI manifest; + private String manifestLot; private List inputs = new ArrayList<>(); private Schema schema = new Schema(); private List partitions = new ArrayList<>(); @@ -29,6 +30,10 @@ public URI manifest() { return manifest; } + public String manifestLot() { + return manifestLot; + } + public List inputs() { return inputs; } @@ -59,13 +64,13 @@ public LineOptions lines() { return lines; } - public static final class Builder { private URI manifest; + private String manifestLot; private List inputs = new ArrayList<>(); private Schema schema = new Schema(); private List partitions = new ArrayList<>(); - private boolean namedPartitions; + private boolean namedPartitions = true; private LineOptions lines = new LineOptions(); private List select = new ArrayList<>(); @@ -81,6 +86,11 @@ public Builder withManifest(URI manifest) { return this; } + public Builder withManifestLot(String manifestLot) { + this.manifestLot = manifestLot; + return this; + } + public Builder withInputs(List inputs) { this.inputs = inputs; return this; @@ -113,12 +123,13 @@ public Builder withSelect(List select) { public Source build() { Source source = new Source(); - source.schema = this.schema; - source.partitions = this.partitions; - source.inputs = this.inputs; + source.manifestLot = this.manifestLot; + source.manifest = this.manifest; source.lines = this.lines; + source.inputs = this.inputs; + source.schema = this.schema; source.select = this.select; - source.manifest = this.manifest; + source.partitions = this.partitions; source.namedPartitions = this.namedPartitions; return source; } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Transform.java b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Transform.java index d100f32..2c3882b 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/model/Transform.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/model/Transform.java @@ -48,8 +48,10 @@ public void addTransform(String transform) { transforms.add(new RenameOp(transform)); } else if (transform.contains("+>")) { transforms.add(new CopyOp(transform)); - } else if (transform.contains("=")) { + } else if (transform.contains("=>")) { transforms.add(new InsertOp(transform)); + } else if (transform.contains("!>")) { + transforms.add(new EvalInsertOp(transform)); } else { transforms.add(new CoerceOp(transform)); } diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java b/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java index db1e4c8..f6a9fe5 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/options/InputOptions.java @@ -17,6 +17,10 @@ public class InputOptions implements AWSOptions { @CommandLine.Option(names = {"-i", "--input"}, description = "input uris") private List inputs = new LinkedList<>(); + @CommandLine.Option(names = {"-m", "--input-manifest"}, description = "input manifest uri") + private URI inputManifest; + @CommandLine.Option(names = {"--input-lot"}, description = "input lot") + private String inputLot; @CommandLine.Option(names = {"--input-aws-endpoint"}, description = "aws endpoint") protected String awsEndpoint; @CommandLine.Option(names = {"--input-aws-region"}, description = "aws region") @@ -33,6 +37,19 @@ public List inputs() { return inputs; } + public URI inputManifest() { + return inputManifest; + } + + public String inputLot() { + return inputLot; + } + + public InputOptions setInputManifest(URI inputManifest) { + this.inputManifest = inputManifest; + return this; + } + @Override public String awsEndpoint() { return awsEndpoint; diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java b/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java index 1d78384..2408bdd 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java @@ -15,6 +15,10 @@ public class OutputOptions implements AWSOptions { @CommandLine.Option(names = {"-o", "--output"}, description = "output uris") private URI output; + @CommandLine.Option(names = {"-t", "--output-manifest"}, description = "output manifest uri template") + private String outputManifest; + @CommandLine.Option(names = {"-l", "--output-lot"}, description = "output lot") + private String outputLot; @CommandLine.Option(names = {"--output-aws-endpoint"}, description = "aws endpoint") protected String awsEndpoint; @CommandLine.Option(names = {"--output-aws-region"}, description = "aws region") @@ -31,6 +35,24 @@ public OutputOptions setOutput(URI output) { return this; } + public String outputManifest() { + return outputManifest; + } + + public OutputOptions setOutputManifest(String outputManifest) { + this.outputManifest = outputManifest; + return this; + } + + public String outputLot() { + return outputLot; + } + + public OutputOptions setOutputLot(String outputLot) { + this.outputLot = outputLot; + return this; + } + @Override public String awsEndpoint() { return awsEndpoint; diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java b/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java index 1e47c1f..4bffb66 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java @@ -40,6 +40,7 @@ public class PipelineOptionsMerge { private static BuildSpec buildSpec = new BuildSpec() .putInto("inputs", "/source/inputs") .putInto("inputManifest", "/source/manifest") + .putInto("inputManifestLot", "/source/manifestLot") .putInto("output", "/sink/output") .putInto("outputManifest", "/sink/manifest") .putInto("outputManifestLot", "/sink/manifestLot"); @@ -57,7 +58,11 @@ public class PipelineOptionsMerge { static { argumentLookups.put("inputs", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputs())); + argumentLookups.put("inputManifest", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputManifest())); + argumentLookups.put("inputManifestLot", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputLot())); argumentLookups.put("output", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().output())); + argumentLookups.put("outputManifest", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputManifest())); + argumentLookups.put("outputManifestLot", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputLot())); } PipelineOptions pipelineOptions; @@ -104,6 +109,10 @@ public PipelineDef merge() { } private JsonNode resolve(Path path, JsonNode jsonNode) { + if (jsonNode.isNull()) { + return jsonNode; + } + URI uri = JSONUtil.treeToValueSafe(jsonNode, URI.class); if (!(uri.getScheme() == null || uri.getScheme().equals("file"))) { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java index a677f6b..bf7e4b4 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java @@ -31,11 +31,14 @@ import io.clusterless.tessellate.model.*; import io.clusterless.tessellate.options.PipelineOptions; import io.clusterless.tessellate.util.Format; +import io.clusterless.tessellate.util.LiteralResolver; import io.clusterless.tessellate.util.Models; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,6 +49,7 @@ public class Pipeline { private final PipelineOptions pipelineOptions; private final PipelineDef pipelineDef; private Flow flow; + private Properties commonProperties = new Properties(); private LocalFlowProcess localFlowProcess; private final AtomicBoolean running = new AtomicBoolean(false); @@ -65,7 +69,7 @@ public PipelineDef pipelineDef() { public LocalFlowProcess flowProcess() { if (localFlowProcess == null) { - localFlowProcess = new LocalFlowProcess(); + localFlowProcess = new LocalFlowProcess(commonProperties); } return localFlowProcess; } @@ -83,7 +87,10 @@ public boolean isRunning() { } public void build() throws IOException { - SourceFactory sourceFactory = TapFactories.findSourceFactory(pipelineDef.source()); + SourceFactory sourceFactory = TapFactories.findSourceFactory(pipelineOptions, pipelineDef.source()); + + sourceFactory.applyGlobalProperties(commonProperties); + Tap sourceTap = sourceFactory.getSource(pipelineOptions, pipelineDef.source()); if (pipelineDef.source().schema().embedsSchema() || pipelineDef.source().schema().format().alwaysEmbedsSchema()) { @@ -92,28 +99,40 @@ public void build() throws IOException { // get source fields here so that any partition fields will be captured Fields currentFields = sourceTap.getSourceFields(); + logCurrentFields(currentFields); Pipe pipe = new Pipe("head"); Schema sourceSchema = pipelineDef.source().schema(); if (sourceSchema.format() == Format.regex) { Fields declaredFields = Models.fieldAsFields(sourceSchema.declared(), String.class, Fields.ALL); - pipe = new Each(pipe, new Fields("line"), new RegexParser(declaredFields, sourceSchema.pattern()), Fields.RESULTS); + pipe = new Each(pipe, new Fields("line"), new RegexParser(declaredFields, sourceSchema.pattern()), Fields.SWAP); LOG.info("parsing lines with regex: {}", sourceSchema.pattern()); - currentFields = declaredFields; + currentFields = currentFields.subtract(new Fields("line")).append(declaredFields); + logCurrentFields(currentFields); } // todo: group like transforms together if there are no interdependencies for (TransformOp transformOp : pipelineDef.transform().transformOps()) { switch (transformOp.transform()) { + case eval: + EvalInsertOp evalOp = (EvalInsertOp) transformOp; + Fields evalFields = evalOp.field().fields(); + Object eval = evalOp.evaluate(getContext()); + LOG.info("transform eval: fields: {}, value: {}", evalFields, eval); + pipe = new Each(pipe, new Insert(evalFields, eval), Fields.ALL); + currentFields = currentFields.append(evalFields); + logCurrentFields(currentFields); + break; case insert: InsertOp insertOp = (InsertOp) transformOp; Fields insertFields = insertOp.field().fields(); - String value = insertOp.value() == null || insertOp.value().isEmpty() ? null : insertOp.value(); + String value = insertOp.value() == null || ((String) insertOp.value()).isEmpty() ? null : insertOp.value(); Object literal = Coercions.coerce(value, insertFields.getType(0)); LOG.info("transform insert: fields: {}, value: {}", insertFields, literal); pipe = new Each(pipe, new Insert(insertFields, literal), Fields.ALL); currentFields = currentFields.append(insertFields); + logCurrentFields(currentFields); break; case coerce: CoerceOp coerceOp = (CoerceOp) transformOp; @@ -121,6 +140,7 @@ public void build() throws IOException { LOG.info("transform coerce: fields: {}", coerceFields); pipe = new Coerce(pipe, coerceFields); currentFields = currentFields.rename(coerceFields, coerceFields); // change the type information + logCurrentFields(currentFields); break; case copy: CopyOp copyOp = (CopyOp) transformOp; @@ -129,6 +149,7 @@ public void build() throws IOException { LOG.info("transform copy: from: {}, to: {}", copyFromFields, copyToFields); pipe = new Copy(pipe, copyFromFields, copyToFields); currentFields = currentFields.append(copyToFields); + logCurrentFields(currentFields); break; case rename: RenameOp renameOp = (RenameOp) transformOp; @@ -137,6 +158,7 @@ public void build() throws IOException { LOG.info("transform rename: from: {}, to: {}", renameFromFields, renameToFields); pipe = new Rename(pipe, renameFromFields, renameToFields); currentFields = currentFields.rename(renameFromFields, renameToFields); + logCurrentFields(currentFields); break; case discard: DiscardOp discardOp = (DiscardOp) transformOp; @@ -144,6 +166,7 @@ public void build() throws IOException { LOG.info("transform discard: fields: {}", discardFields); pipe = new Discard(pipe, discardFields); currentFields = currentFields.subtract(discardFields); + logCurrentFields(currentFields); break; } } @@ -156,6 +179,8 @@ public void build() throws IOException { if (partition.from().isPresent()) { pipe = new Copy(pipe, partition.from().get().fields(), partition.to().fields()); partitionFields = partitionFields.append(partition.to().fields()); + } else if (currentFields.contains(partition.to().fields())) { + partitionFields = partitionFields.append(partition.to().fields()); } else { pipe = new Coerce(pipe, partition.to().fields()); // change the type information @@ -164,7 +189,7 @@ public void build() throws IOException { } } - LOG.info("coercing into partitions fields: {}", partitionFields); + LOG.info("sink partitions fields: {}", partitionFields); // watch the progress on the console if (pipelineOptions().debug()) { @@ -175,15 +200,29 @@ public void build() throws IOException { SinkFactory sinkFactory = TapFactories.findSinkFactory(pipelineDef.sink()); + sinkFactory.applyGlobalProperties(commonProperties); + Tap sinkTap = sinkFactory.getSink(pipelineOptions, pipelineDef.sink(), currentFields); - flow = new LocalFlowConnector().connect(flowDef() + flow = new LocalFlowConnector(commonProperties).connect(flowDef() .setName("pipeline") .addSource(pipe, sourceTap) .addSink(pipe, sinkTap) .addTail(pipe)); } + @NotNull + protected Map getContext() { + Map context = LiteralResolver.context(); + context.put("source", pipelineDef.source()); + context.put("sink", pipelineDef.sink()); + return context; + } + + private static void logCurrentFields(Fields currentFields) { + LOG.info("current fields: {}", currentFields); + } + public Integer run() throws IOException { if (flow == null) { diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java index e885159..1862499 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transforms.java @@ -10,6 +10,7 @@ public enum Transforms { insert, + eval, coerce, copy, rename, diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/util/LiteralResolver.java b/tessellate-main/src/main/java/io/clusterless/tessellate/util/LiteralResolver.java new file mode 100644 index 0000000..c4c09fd --- /dev/null +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/util/LiteralResolver.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package io.clusterless.tessellate.util; + +import org.mvel2.MVEL; + +import java.util.HashMap; +import java.util.Map; + +public class LiteralResolver { + private static Map context = new HashMap<>(); + + static { + context.put("env", System.getenv()); + context.put("sys", System.getProperties()); + } + + public static Map context() { + return new HashMap<>(context); + } + + public static T resolve(String expression, Class type) { + return resolve(expression, context, type); + } + + public static T resolve(String expression, Map context, Class type) { + return MVEL.eval(expression, context, type); + } +} diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/util/URIs.java b/tessellate-main/src/main/java/io/clusterless/tessellate/util/URIs.java index 0cea842..c164034 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/util/URIs.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/util/URIs.java @@ -8,15 +8,19 @@ package io.clusterless.tessellate.util; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; + import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Paths; +import java.util.List; +import java.util.Objects; +import java.util.Set; import java.util.StringJoiner; -import java.util.regex.Pattern; +import java.util.stream.Collectors; public class URIs { - private static Pattern pattern = Pattern.compile("(\\{\\{(?!\\{)(.*)}}(?!}))"); - public static URI copyWithoutQuery(URI uri) { try { return new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, null); @@ -78,6 +82,10 @@ public static URI trim(URI uri, int trim) { String[] split = path.substring(1).split("/"); + if (split.length == trim) { + return copyWithPath(uri, "/"); + } + StringJoiner joiner = new StringJoiner("/", "/", "/"); for (int i = 0; i < split.length - trim; i++) { @@ -98,4 +106,20 @@ public static URI makeAbsolute(URI uri) { return Paths.get(uri.getPath()).toAbsolutePath().toUri(); } + + @NotNull + public static URI findCommonPrefix(List uris, int numPartitions) { + Set roots = uris.stream() + .map(u -> trim(u, numPartitions)) + .map(Objects::toString) + .collect(Collectors.toSet()); + + String commonPrefix = StringUtils.getCommonPrefix(roots.toArray(new String[0])); + + if (commonPrefix.isEmpty()) { + throw new IllegalArgumentException("to many unique roots, got: " + roots); + } + + return URI.create(commonPrefix); + } } diff --git a/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java b/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java new file mode 100644 index 0000000..f774cc5 --- /dev/null +++ b/tessellate-main/src/test/java/io/clusterless/tessellate/util/LiteralResolverTest.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package io.clusterless.tessellate.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class LiteralResolverTest { + @Test + void resolveEnv() { + Assertions.assertEquals(System.getenv("USER"), LiteralResolver.resolve("env.USER", String.class)); + Assertions.assertEquals(System.getProperty("user.name"), LiteralResolver.resolve("sys['user.name']", String.class)); + } +} diff --git a/tessellate-main/src/test/java/io/clusterless/tessellate/util/URIsTest.java b/tessellate-main/src/test/java/io/clusterless/tessellate/util/URIsTest.java new file mode 100644 index 0000000..b5bfe04 --- /dev/null +++ b/tessellate-main/src/test/java/io/clusterless/tessellate/util/URIsTest.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package io.clusterless.tessellate.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.net.URI; + +public class URIsTest { + @Test + void trim() { + Assertions.assertEquals(URI.create("s3://bucket/"), URIs.trim(URI.create("s3://bucket/path/"), 1)); + Assertions.assertEquals(URI.create("s3://bucket/path1/"), URIs.trim(URI.create("s3://bucket/path1/path2/"), 1)); + Assertions.assertEquals(URI.create("s3://bucket/path1/"), URIs.trim(URI.create("s3://bucket/path1/path2"), 1)); + Assertions.assertEquals(URI.create("s3://bucket/"), URIs.trim(URI.create("s3://bucket/path1/path2/"), 2)); + Assertions.assertEquals(URI.create("s3://bucket/"), URIs.trim(URI.create("s3://bucket/path1/path2"), 2)); + } +}