From 323e37560f98f2a3ea61413432579fdce28ca1e8 Mon Sep 17 00:00:00 2001 From: Dmytro Vitiuk Date: Fri, 29 Apr 2022 23:46:16 +0300 Subject: [PATCH] Initial commit for GCS Batch Source plugin metadata feature. --- .../plugin/batch/source/FTPBatchSource.java | 12 ++ .../plugin/batch/ETLMapReduceTestRun.java | 2 +- .../avro/AvroToStructuredTransformer.java | 5 +- .../avro/input/AvroInputFormatProvider.java | 107 ++++++------- .../input/PathTrackingAvroInputFormat.java | 32 +++- .../blob/input/BlobInputFormatProvider.java | 127 ++++++++++----- .../input/PathTrackingBlobInputFormat.java | 114 ++++++++------ .../io/cdap/plugin/format/FileFormat.java | 17 +- .../io/cdap/plugin/format/MetadataField.java | 52 +++++++ .../plugin/format/MetadataRecordReader.java | 72 +++++++++ .../format/input/PathTrackingConfig.java | 32 ++++ .../format/input/PathTrackingInputFormat.java | 45 +++++- .../PathTrackingInputFormatProvider.java | 58 ++++++- .../format/plugin/AbstractFileSource.java | 51 ++++++ .../plugin/AbstractFileSourceConfig.java | 28 ++++ .../format/plugin/FileSourceProperties.java | 8 + .../PathTrackingDelimitedInputFormat.java | 83 ++++++---- .../input/PathTrackingJsonInputFormat.java | 109 ++++++++----- .../input/ParquetInputFormatProvider.java | 121 +++++++++------ .../input/PathTrackingParquetInputFormat.java | 43 +++++- .../input/PathTrackingTextInputFormat.java | 20 ++- .../text/input/TextInputFormatProvider.java | 145 ++++++++++-------- 22 files changed, 951 insertions(+), 332 deletions(-) create mode 100644 format-common/src/main/java/io/cdap/plugin/format/MetadataField.java create mode 100644 format-common/src/main/java/io/cdap/plugin/format/MetadataRecordReader.java diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FTPBatchSource.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FTPBatchSource.java index 4de738b1c..3b0ce6ad4 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/FTPBatchSource.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/FTPBatchSource.java @@ -209,6 +209,18 @@ public String getPathField() { return null; } + @Nullable + @Override + public String getLengthField() { + return null; + } + + @Nullable + @Override + public String getModificationTimeField() { + return null; + } + @Override public boolean useFilenameAsPath() { return false; diff --git a/core-plugins/src/test/java/io/cdap/plugin/batch/ETLMapReduceTestRun.java b/core-plugins/src/test/java/io/cdap/plugin/batch/ETLMapReduceTestRun.java index a6c6d5b88..11aeb7a70 100644 --- a/core-plugins/src/test/java/io/cdap/plugin/batch/ETLMapReduceTestRun.java +++ b/core-plugins/src/test/java/io/cdap/plugin/batch/ETLMapReduceTestRun.java @@ -52,7 +52,7 @@ * Tests for ETLBatch. 
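 * The default text schema below is requested with no path, length, or modification time field
 * (all three arguments null), matching the new three-argument getDefaultSchema signature.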
*/ public class ETLMapReduceTestRun extends ETLBatchTestBase { - private static final Schema TEXT_SCHEMA = TextInputFormatProvider.getDefaultSchema(null); + private static final Schema TEXT_SCHEMA = TextInputFormatProvider.getDefaultSchema(null, null, null); @Test public void testInvalidTransformConfigFailsToDeploy() { diff --git a/format-avro/src/main/java/io/cdap/plugin/format/avro/AvroToStructuredTransformer.java b/format-avro/src/main/java/io/cdap/plugin/format/avro/AvroToStructuredTransformer.java index 8db5f6e81..916dea094 100644 --- a/format-avro/src/main/java/io/cdap/plugin/format/avro/AvroToStructuredTransformer.java +++ b/format-avro/src/main/java/io/cdap/plugin/format/avro/AvroToStructuredTransformer.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeParseException; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -51,11 +52,11 @@ public StructuredRecord transform(GenericRecord genericRecord, Schema structured } public StructuredRecord.Builder transform(GenericRecord genericRecord, Schema structuredSchema, - @Nullable String skipField) throws IOException { + @Nullable List skipFields) throws IOException { StructuredRecord.Builder builder = StructuredRecord.builder(structuredSchema); for (Schema.Field field : structuredSchema.getFields()) { String fieldName = field.getName(); - if (!fieldName.equals(skipField)) { + if (!skipFields.contains(fieldName)) { builder.set(fieldName, convertField(genericRecord.get(fieldName), field)); } } diff --git a/format-avro/src/main/java/io/cdap/plugin/format/avro/input/AvroInputFormatProvider.java b/format-avro/src/main/java/io/cdap/plugin/format/avro/input/AvroInputFormatProvider.java index 8f5051b59..34ae919d0 100644 --- a/format-avro/src/main/java/io/cdap/plugin/format/avro/input/AvroInputFormatProvider.java +++ b/format-avro/src/main/java/io/cdap/plugin/format/avro/input/AvroInputFormatProvider.java @@ -84,66 +84,61 @@ public static class Conf extends PathTrackingConfig { @Description(NAME_SCHEMA) public String schema; - } - - @Nullable - @Override - public Schema getSchema(FormatContext context) { - if (conf.containsMacro("schema")) { - return super.getSchema(context); - } - if (!Strings.isNullOrEmpty(conf.schema)) { - return super.getSchema(context); - } - String filePath = conf.getProperties().getProperties().getOrDefault("path", null); - if (filePath == null) { - return super.getSchema(context); - } - try { - return getDefaultSchema(context); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e); - } - } - - /** - * Extract schema from file - * - * @param context {@link FormatContext} - * @return {@link Schema} - * @throws IOException raised when error occurs during schema extraction - */ - public Schema getDefaultSchema(@Nullable FormatContext context) throws IOException { - String filePath = conf.getProperties().getProperties().getOrDefault("path", null); - SeekableInput seekableInput = null; - FileReader dataFileReader = null; - try { - Job job = JobUtils.createInstance(); - Configuration hconf = job.getConfiguration(); - // set entries here, before FileSystem is used - for (Map.Entry entry : conf.getFileSystemProperties().entrySet()) { - hconf.set(entry.getKey(), entry.getValue()); + @Nullable + @Override + public Schema getSchema() { + if (containsMacro("schema")) { + return super.getSchema(); } - Path file = conf.getFilePathForSchemaGeneration(filePath, ".+\\.avro", hconf, job); - 
DatumReader<GenericRecord> dataReader = new GenericDatumReader<>();
-      seekableInput = new FsInput(file, hconf);
-      dataFileReader = DataFileReader.openReader(seekableInput, dataReader);
-      GenericRecord firstRecord;
-      if (!dataFileReader.hasNext()) {
-        return null;
+      if (!Strings.isNullOrEmpty(schema)) {
+        return super.getSchema();
       }
-      firstRecord = dataFileReader.next();
-      return new AvroToStructuredTransformer().convertSchema(firstRecord.getSchema());
-    } catch (IOException e) {
-      context.getFailureCollector().addFailure("Schema parse error", e.getMessage());
-    } finally {
-      if (dataFileReader != null) {
-        dataFileReader.close();
+      String filePath = getProperties().getProperties().getOrDefault("path", null);
+      if (filePath == null) {
+        return super.getSchema();
       }
-      if (seekableInput != null) {
-        seekableInput.close();
+      try {
+        return getDefaultSchema();
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Extract schema from file
+     *
+     * @return {@link Schema}
+     * @throws IOException raised when an error occurs during schema extraction
+     */
+    public Schema getDefaultSchema() throws IOException {
+      String filePath = getProperties().getProperties().getOrDefault("path", null);
+      SeekableInput seekableInput = null;
+      FileReader<GenericRecord> dataFileReader = null;
+      try {
+        Job job = JobUtils.createInstance();
+        Configuration hconf = job.getConfiguration();
+        // set entries here, before FileSystem is used
+        for (Map.Entry<String, String> entry : getFileSystemProperties().entrySet()) {
+          hconf.set(entry.getKey(), entry.getValue());
+        }
+        Path file = getFilePathForSchemaGeneration(filePath, ".+\\.avro", hconf, job);
+        DatumReader<GenericRecord> dataReader = new GenericDatumReader<>();
+        seekableInput = new FsInput(file, hconf);
+        dataFileReader = DataFileReader.openReader(seekableInput, dataReader);
+        GenericRecord firstRecord;
+        if (!dataFileReader.hasNext()) {
+          return null;
+        }
+        firstRecord = dataFileReader.next();
+        return new AvroToStructuredTransformer().convertSchema(firstRecord.getSchema());
+      } finally {
+        if (dataFileReader != null) {
+          dataFileReader.close();
+        }
+        if (seekableInput != null) {
+          seekableInput.close();
+        }
       }
     }
-    return null;
   }
 }
diff --git a/format-avro/src/main/java/io/cdap/plugin/format/avro/input/PathTrackingAvroInputFormat.java b/format-avro/src/main/java/io/cdap/plugin/format/avro/input/PathTrackingAvroInputFormat.java
index 8d5a9de3c..1f7d1846c 100644
--- a/format-avro/src/main/java/io/cdap/plugin/format/avro/input/PathTrackingAvroInputFormat.java
+++ b/format-avro/src/main/java/io/cdap/plugin/format/avro/input/PathTrackingAvroInputFormat.java
@@ -18,6 +18,7 @@
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.format.MetadataField;
 import io.cdap.plugin.format.avro.AvroToStructuredTransformer;
 import io.cdap.plugin.format.input.PathTrackingInputFormat;
 import org.apache.avro.generic.GenericRecord;
@@ -31,7 +32,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;

 /**
@@ -46,7 +49,17 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
     RecordReader<AvroKey<GenericRecord>, NullWritable> delegate = (new AvroKeyInputFormat<GenericRecord>())
       .createRecordReader(split, context);
-    return new AvroRecordReader(delegate, schema, pathField);
+    return new AvroRecordReader(delegate, schema, pathField, null);
+  }
+
+  @Override
+  protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
+    FileSplit split, TaskAttemptContext context,
+    @Nullable String pathField, Map<String, MetadataField> metadataFields,
+    @Nullable Schema schema) throws IOException, InterruptedException {
+
+    RecordReader<AvroKey<GenericRecord>, NullWritable> delegate = (new AvroKeyInputFormat<GenericRecord>())
+      .createRecordReader(split, context);
+    return new AvroRecordReader(delegate, schema, pathField, metadataFields);
   }

   /**
@@ -56,13 +69,15 @@ static class AvroRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
     private final RecordReader<AvroKey<GenericRecord>, NullWritable> delegate;
     private final AvroToStructuredTransformer recordTransformer;
     private final String pathField;
+    private final Map<String, MetadataField> metadataFields;
     private Schema schema;

     AvroRecordReader(RecordReader<AvroKey<GenericRecord>, NullWritable> delegate, @Nullable Schema schema,
-                     @Nullable String pathField) {
+                     @Nullable String pathField, @Nullable Map<String, MetadataField> metadataFields) {
       this.delegate = delegate;
       this.schema = schema;
       this.pathField = pathField;
+      this.metadataFields = metadataFields == null ? Collections.emptyMap() : metadataFields;
       this.recordTransformer = new AvroToStructuredTransformer();
     }

@@ -87,18 +102,25 @@ public StructuredRecord.Builder getCurrentValue() throws IOException, Interrupte
       // if schema is null, but we're still able to read, that means the file contains the schema information
       // set the schema based on the schema of the record
       if (schema == null) {
-        if (pathField == null) {
+        if (pathField == null && metadataFields.isEmpty()) {
           schema = Schema.parseJson(genericRecord.getSchema().toString());
         } else {
           // if there is a path field, add the path as a field in the schema
           Schema schemaWithoutPath = Schema.parseJson(genericRecord.getSchema().toString());
           List<Schema.Field> fields = new ArrayList<>(schemaWithoutPath.getFields().size() + 1);
           fields.addAll(schemaWithoutPath.getFields());
-          fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+          if (pathField != null) {
+            fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+          }
+          for (String fieldName : metadataFields.keySet()) {
+            fields.add(Schema.Field.of(fieldName, Schema.of(metadataFields.get(fieldName).getSchemaType())));
+          }
           schema = Schema.recordOf(schemaWithoutPath.getRecordName(), fields);
         }
       }
-      return recordTransformer.transform(genericRecord, schema, pathField);
+      List<String> fieldsToExclude = new ArrayList<>(metadataFields.keySet());
+      if (pathField != null) {
+        fieldsToExclude.add(pathField);
+      }
+      return recordTransformer.transform(genericRecord, schema, fieldsToExclude);
     }

     @Override
diff --git a/format-blob/src/main/java/io/cdap/plugin/format/blob/input/BlobInputFormatProvider.java b/format-blob/src/main/java/io/cdap/plugin/format/blob/input/BlobInputFormatProvider.java
index f0d92560a..15baad984 100644
--- a/format-blob/src/main/java/io/cdap/plugin/format/blob/input/BlobInputFormatProvider.java
+++ b/format-blob/src/main/java/io/cdap/plugin/format/blob/input/BlobInputFormatProvider.java
@@ -27,10 +27,16 @@
 import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
 import io.cdap.plugin.format.input.PathTrackingConfig;
 import io.cdap.plugin.format.input.PathTrackingInputFormatProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;

 /**
  * Reads the entire contents of a File into a single record
 */
@@ -39,6 +45,8 @@
 @Name(BlobInputFormatProvider.NAME)
 @Description(BlobInputFormatProvider.DESC)
 public class BlobInputFormatProvider extends PathTrackingInputFormatProvider<BlobInputFormatProvider.BlobConfig> {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobInputFormatProvider.class);
+
static final String NAME = "blob"; static final String DESC = "Plugin for reading files in blob format."; public static final PluginClass PLUGIN_CLASS = @@ -61,33 +69,50 @@ public void validate() { } Schema schema = conf.getSchema(); - String pathField = conf.getPathField(); - Schema.Field bodyField = schema.getField("body"); + + List fieldsToCheck = new ArrayList<>(); + fieldsToCheck.add(BlobConfig.NAME_BODY); + fieldsToCheck.add(conf.getPathField()); + try { + fieldsToCheck.add(conf.getLengthField()); + fieldsToCheck.add(conf.getModificationTimeField()); + } catch (NoSuchMethodError e) { + LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin."); + } + Schema.Field bodyField = schema.getField(BlobConfig.NAME_BODY); if (bodyField == null) { - throw new IllegalArgumentException("The schema for the 'blob' format must have a field named 'body'"); + throw new IllegalArgumentException( + String.format("The schema for the 'blob' format must have a field named '%s'", BlobConfig.NAME_BODY)); } Schema bodySchema = bodyField.getSchema(); Schema.Type bodyType = bodySchema.isNullable() ? bodySchema.getNonNullable().getType() : bodySchema.getType(); if (bodyType != Schema.Type.BYTES) { - throw new IllegalArgumentException(String.format("The 'body' field must be of type 'bytes', but found '%s'", - bodyType.name().toLowerCase())); + throw new IllegalArgumentException(String.format("The '%s' field must be of type 'bytes', but found '%s'", + BlobConfig.NAME_BODY, bodyType.name().toLowerCase())); } // blob must contain 'body' as type 'bytes'. // it can optionally contain a path field of type 'string' - int numExpectedFields = pathField == null ? 1 : 2; + // it can optionally contain a length field of type 'long' + // it can optionally contain a modificationTime field of type 'long' + + List expectedFieldsList = fieldsToCheck + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + int numExpectedFields = expectedFieldsList.size(); int numFields = schema.getFields().size(); if (numFields > numExpectedFields) { - int numExtra = numFields - numExpectedFields; - if (pathField == null) { - throw new IllegalArgumentException( - String.format("The schema for the 'blob' format must only contain the 'body' field, " - + "but found %d other field%s.", numFields - 1, numExtra > 1 ? "s" : "")); - } else { - throw new IllegalArgumentException( - String.format("The schema for the 'blob' format must only contain the 'body' field and the '%s' field, " - + "but found %d other field%s.", pathField, numFields - 2, numExtra > 1 ? "s" : "")); - } + String expectedFields = expectedFieldsList.stream().map(Object::toString) + .collect(Collectors.joining("', '", "'", "'")); + + int numExtraFields = numFields - numExpectedFields; + throw new IllegalArgumentException( + String.format("The schema for the 'blob' format must only contain the %s field%s, " + + "but found %d other field%s", + expectedFields, numExpectedFields > 1 ? "s" : "", numExtraFields, + numExtraFields > 1 ? 
"s" : "")); } } @@ -107,41 +132,54 @@ public void validate(FormatContext context) { throw collector.getOrThrowException(); } - String pathField = conf.getPathField(); + List fieldsToCheck = new ArrayList<>(); + fieldsToCheck.add(BlobConfig.NAME_BODY); + fieldsToCheck.add(conf.getPathField()); + try { + fieldsToCheck.add(conf.getLengthField()); + fieldsToCheck.add(conf.getModificationTimeField()); + } catch (NoSuchMethodError e) { + LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin."); + } Schema.Field bodyField = schema.getField(BlobConfig.NAME_BODY); if (bodyField == null) { - collector.addFailure("The schema for the 'blob' format must have a field named 'body' of type 'bytes'.", null) + collector.addFailure( + String.format("The schema for the 'blob' format must have a field named '%s' of type 'bytes'.", + BlobConfig.NAME_BODY), null) .withOutputSchemaField(BlobConfig.NAME_SCHEMA); } else { Schema bodySchema = bodyField.getSchema(); Schema nonNullableSchema = bodySchema.isNullable() ? bodySchema.getNonNullable() : bodySchema; if (nonNullableSchema.getType() != Schema.Type.BYTES) { - collector.addFailure( - String.format("Field 'body' is of unexpected type '%s'.", nonNullableSchema.getDisplayName()), + collector.addFailure(String.format("Field '%s' is of unexpected type '%s'.", + BlobConfig.NAME_BODY, nonNullableSchema.getDisplayName()), "Change type to 'bytes'.").withOutputSchemaField(BlobConfig.NAME_BODY); } } // blob must contain 'body' as type 'bytes'. // it can optionally contain a path field of type 'string' - int numExpectedFields = pathField == null ? 1 : 2; + // it can optionally contain a length field of type 'long' + List expectedFieldsList = fieldsToCheck + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + int numExpectedFields = expectedFieldsList.size(); int numFields = schema.getFields().size(); if (numFields > numExpectedFields) { for (Schema.Field field : schema.getFields()) { - if (pathField == null) { - if (!field.getName().equals(BlobConfig.NAME_BODY)) { - collector.addFailure("The schema for the 'blob' format must only contain the 'body' field.", - String.format("Remove additional field '%s'.", field.getName())) - .withOutputSchemaField(field.getName()); - } - } else { - if (!field.getName().equals(BlobConfig.NAME_BODY) && !field.getName().equals(pathField)) { - collector.addFailure( - String.format("The schema for the 'blob' format must only contain the 'body' field and '%s' field.", - pathField), String.format("Remove additional field '%s'.", field.getName())) - .withOutputSchemaField(field.getName()); - } + String expectedFields = expectedFieldsList.stream().map(Object::toString) + .collect(Collectors.joining(", ", "'", "'")); + + if (expectedFieldsList.contains(field.getName())) { + continue; } + + collector.addFailure( + String.format("The schema for the 'blob' format must only contain the '%s' field%s.", + expectedFields, expectedFields.length() > 1 ? "s" : ""), + String.format("Remove additional field '%s'.", field.getName())).withOutputSchemaField(field.getName()); } } } @@ -163,7 +201,17 @@ public Schema getSchema() { return null; } if (Strings.isNullOrEmpty(schema)) { - return getDefaultSchema(); + String lengthFieldResolved = null; + String modificationTimeFieldResolved = null; + + // this is required for back compatibility with File-based sources (File, FTP...) 
+      try {
+        lengthFieldResolved = lengthField;
+        modificationTimeFieldResolved = modificationTimeField;
+      } catch (NoSuchFieldError e) {
+        LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin.");
+      }
+      return getDefaultSchema(pathField, lengthFieldResolved, modificationTimeFieldResolved);
     }
     try {
       return Schema.parseJson(schema);
@@ -172,12 +220,19 @@
   }

-  private Schema getDefaultSchema() {
+  private Schema getDefaultSchema(@Nullable String pathField, @Nullable String lengthField,
+                                  @Nullable String modificationTimeField) {
     List<Schema.Field> fields = new ArrayList<>();
     fields.add(Schema.Field.of(NAME_BODY, Schema.of(Schema.Type.BYTES)));
     if (pathField != null && !pathField.isEmpty()) {
       fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
     }
+    if (lengthField != null && !lengthField.isEmpty()) {
+      fields.add(Schema.Field.of(lengthField, Schema.of(Schema.Type.LONG)));
+    }
+    if (modificationTimeField != null && !modificationTimeField.isEmpty()) {
+      fields.add(Schema.Field.of(modificationTimeField, Schema.of(Schema.Type.LONG)));
+    }
     return Schema.recordOf("blob", fields);
   }
 }
diff --git a/format-blob/src/main/java/io/cdap/plugin/format/blob/input/PathTrackingBlobInputFormat.java b/format-blob/src/main/java/io/cdap/plugin/format/blob/input/PathTrackingBlobInputFormat.java
index 636d98f8b..6bb5c957b 100644
--- a/format-blob/src/main/java/io/cdap/plugin/format/blob/input/PathTrackingBlobInputFormat.java
+++ b/format-blob/src/main/java/io/cdap/plugin/format/blob/input/PathTrackingBlobInputFormat.java
@@ -20,6 +20,7 @@
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.plugin.common.batch.JobUtils;
+import io.cdap.plugin.format.MetadataField;
 import io.cdap.plugin.format.input.PathTrackingInputFormat;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,7 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;

 /**
@@ -60,56 +62,82 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
     if (split.getLength() > Integer.MAX_VALUE) {
       throw new IllegalArgumentException("Blob format cannot be used with files larger than 2GB");
     }
-    return new RecordReader<NullWritable, StructuredRecord.Builder>() {
-      boolean hasNext;
-      byte[] val;
-
-      @Override
-      public void initialize(InputSplit split, TaskAttemptContext context) {
-        hasNext = true;
-        val = null;
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException {
-        if (!hasNext) {
-          return false;
-        }
-        hasNext = false;
-        if (split.getLength() == 0) {
-          return false;
-        }
-
-        Path path = split.getPath();
-        FileSystem fs = path.getFileSystem(context.getConfiguration());
-        try (FSDataInputStream input = fs.open(path)) {
-          val = new byte[(int) split.getLength()];
-          ByteStreams.readFully(input, val);
-        }
-        return true;
-      }
-
-      @Override
-      public NullWritable getCurrentKey() {
-        return NullWritable.get();
-      }
-
-      @Override
-      public StructuredRecord.Builder getCurrentValue() {
-        String fieldName = schema.getFields().iterator().next().getName();
-        return StructuredRecord.builder(schema).set(fieldName, val);
-      }
-
-      @Override
-      public float getProgress() {
-        return 0.0f;
-      }
-
-      @Override
-      public void close() {
-        // no-op
-      }
-    };
+    return new BlobRecordReader(split, context, schema);
+  }
+
+  @Override
+  protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(FileSplit split,
+                                                                                    TaskAttemptContext context,
+                                                                                    @Nullable String pathField,
+                                                                                    Map<String, MetadataField>
+                                                                                      metadataFields,
+                                                                                    @Nullable Schema schema) {
+    if (split.getLength() > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("Blob format cannot be used with files larger than 2GB");
+    }
+    return new BlobRecordReader(split, context, schema);
+  }
+
+  private class BlobRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
+    private final FileSplit split;
+    private final TaskAttemptContext context;
+    private final Schema schema;
+
+    boolean hasNext;
+    byte[] val;
+
+    private BlobRecordReader(FileSplit split, TaskAttemptContext context, Schema schema) {
+      this.split = split;
+      this.context = context;
+      this.schema = schema;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      hasNext = true;
+      val = null;
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      if (!hasNext) {
+        return false;
+      }
+      hasNext = false;
+      if (split.getLength() == 0) {
+        return false;
+      }
+
+      Path path = split.getPath();
+      FileSystem fs = path.getFileSystem(context.getConfiguration());
+      try (FSDataInputStream input = fs.open(path)) {
+        val = new byte[(int) split.getLength()];
+        ByteStreams.readFully(input, val);
+      }
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public StructuredRecord.Builder getCurrentValue() {
+      String fieldName = schema.getFields().iterator().next().getName();
+      return StructuredRecord.builder(schema).set(fieldName, val);
+    }
+
+    @Override
+    public float getProgress() {
+      return 0.0f;
+    }
+
+    @Override
+    public void close() {
+      // no-op
+    }
   }
 }
diff --git a/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java b/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java
index b291d34e8..cf19e5e4f 100644
--- a/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java
+++ b/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java
@@ -60,10 +60,13 @@ public boolean canWrite() {
    * not require a specific schema. Should only be called for formats that can read.
    *
    * @param pathField the field of the file path, if it exists.
+   * @param lengthField the field of the file length, if it exists.
+   * @param modificationTimeField the field of the file modification time, if it exists.
    * @return the schema required by the format, if it exists
    */
   @Nullable
-  public Schema getSchema(@Nullable String pathField) {
+  public Schema getSchema(@Nullable String pathField, @Nullable String lengthField,
+                          @Nullable String modificationTimeField) {
     // TODO: move into the plugin formats once it is possible to instantiate them in the get schema methods.
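    // Illustrative example (not part of the patch): for the text format,
    //   getSchema("file", "size", "mtime")
    // extends the format's base fields with file (string), size (long), and mtime (long).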
List fields = new ArrayList<>(3); switch (this) { @@ -73,12 +76,24 @@ public Schema getSchema(@Nullable String pathField) { if (pathField != null) { fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING))); } + if (lengthField != null) { + fields.add(Schema.Field.of(lengthField, Schema.of(Schema.Type.LONG))); + } + if (modificationTimeField != null) { + fields.add(Schema.Field.of(modificationTimeField, Schema.of(Schema.Type.LONG))); + } return Schema.recordOf("text", fields); case BLOB: fields.add(Schema.Field.of("body", Schema.of(Schema.Type.BYTES))); if (pathField != null) { fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING))); } + if (lengthField != null) { + fields.add(Schema.Field.of(lengthField, Schema.of(Schema.Type.LONG))); + } + if (modificationTimeField != null) { + fields.add(Schema.Field.of(modificationTimeField, Schema.of(Schema.Type.LONG))); + } return Schema.recordOf("text", fields); default: return null; diff --git a/format-common/src/main/java/io/cdap/plugin/format/MetadataField.java b/format-common/src/main/java/io/cdap/plugin/format/MetadataField.java new file mode 100644 index 000000000..7ca1ed71d --- /dev/null +++ b/format-common/src/main/java/io/cdap/plugin/format/MetadataField.java @@ -0,0 +1,52 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format; + +import io.cdap.cdap.api.data.schema.Schema; + +/** + * Collects all metadata fields available to retrieve. + */ +public enum MetadataField { + FILE_LENGTH("path.tracking.length.field", Schema.Type.LONG), + FILE_MODIFICATION_TIME("path.tracking.mod.field", Schema.Type.LONG); + + private final String confName; + private final Schema.Type schemaType; + + public String getConfName() { + return confName; + } + + public Schema.Type getSchemaType() { + return schemaType; + } + + MetadataField(String confName, Schema.Type schemaType) { + this.confName = confName; + this.schemaType = schemaType; + } + + public static MetadataField getMetadataField(String fieldName) { + for (MetadataField metadataField : MetadataField.values()) { + if (metadataField.name().equals(fieldName)) { + return metadataField; + } + } + throw new IllegalArgumentException("Invalid metadata field name: " + fieldName); + } +} diff --git a/format-common/src/main/java/io/cdap/plugin/format/MetadataRecordReader.java b/format-common/src/main/java/io/cdap/plugin/format/MetadataRecordReader.java new file mode 100644 index 000000000..54852bd36 --- /dev/null +++ b/format-common/src/main/java/io/cdap/plugin/format/MetadataRecordReader.java @@ -0,0 +1,72 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.format;

import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * This class is a wrapper for file metadata (file length and modification time).
 * @param <K> the type of the keys produced by this reader
 * @param <V> the type of the values produced by this reader
 */
public abstract class MetadataRecordReader<K, V> extends RecordReader<K, V> {
  private long length;
  private long modificationTime;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    Path file = ((FileSplit) split).getPath();
    final FileSystem fs = file.getFileSystem(context.getConfiguration());
    final FileStatus fileStatus = fs.getFileStatus(file);

    length = fileStatus.getLen();
    modificationTime = fileStatus.getModificationTime();
  }

  public long getLength() {
    return length;
  }

  public long getModificationTime() {
    return modificationTime;
  }

  public void populateMetadata(String fieldName, MetadataField metadataField,
                               StructuredRecord.Builder recordBuilder) {
    Object result;
    switch (metadataField) {
      case FILE_LENGTH:
        result = getLength();
        break;
      case FILE_MODIFICATION_TIME:
        result = getModificationTime();
        break;
      default:
        throw new IllegalArgumentException("Unable to populate metadata field: " + fieldName);
    }
    recordBuilder.set(fieldName, result);
  }
}
diff --git a/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingConfig.java b/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingConfig.java
index 300d9ee1b..d0da12be0 100644
--- a/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingConfig.java
+++ b/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingConfig.java
@@ -51,6 +51,14 @@ public class PathTrackingConfig extends PluginConfig {
     "Output field to place the path of the file that the record was read from. " +
     "If not specified, the file path will not be included in output records. " +
     "If specified, the field must exist in the schema and be of type string.";
+  private static final String LENGTH_FIELD_DESC =
+    "Output field to place the length of the file that the record was read from. " +
+    "If not specified, the file length will not be included in output records. " +
+    "If specified, the field must exist in the schema and be of type long.";
+  private static final String MODIFICATION_TIME_FIELD_DESC =
+    "Output field to place the modification time of the file that the record was read from. " +
+    "If not specified, the file modification time will not be included in output records. " +
+    "If specified, the field must exist in the schema and be of type long.";
   private static final String FILENAME_ONLY_DESC =
     "Whether to only use the filename instead of the URI of the file path when a path field is given. 
" + "The default value is false."; @@ -64,6 +72,10 @@ public class PathTrackingConfig extends PluginConfig { fields.put("schema", new PluginPropertyField("schema", SCHEMA_DESC, "string", false, true)); fields.put("pathField", new PluginPropertyField("pathField", PATH_FIELD_DESC, "string", false, true)); + fields.put("lengthField", + new PluginPropertyField("lengthField", LENGTH_FIELD_DESC, "string", false, true)); + fields.put("modificationTimeField", + new PluginPropertyField("modificationTimeField", MODIFICATION_TIME_FIELD_DESC, "string", false, true)); fields.put("filenameOnly", new PluginPropertyField("filenameOnly", FILENAME_ONLY_DESC, "boolean", false, true)); FIELDS = Collections.unmodifiableMap(fields); @@ -79,6 +91,16 @@ public class PathTrackingConfig extends PluginConfig { @Description(PATH_FIELD_DESC) protected String pathField; + @Macro + @Nullable + @Description(LENGTH_FIELD_DESC) + protected String lengthField; + + @Macro + @Nullable + @Description(MODIFICATION_TIME_FIELD_DESC) + protected String modificationTimeField; + @Macro @Nullable @Description(FILENAME_ONLY_DESC) @@ -89,6 +111,16 @@ public String getPathField() { return pathField; } + @Nullable + public String getLengthField() { + return lengthField; + } + + @Nullable + public String getModificationTimeField() { + return modificationTimeField; + } + public boolean useFilenameOnly() { return filenameOnly == null ? false : filenameOnly; } diff --git a/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormat.java b/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormat.java index cda859a74..c537a9bc7 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormat.java +++ b/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormat.java @@ -18,6 +18,8 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.format.MetadataField; +import io.cdap.plugin.format.MetadataRecordReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -30,6 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import javax.annotation.Nullable; /** @@ -64,14 +68,15 @@ public RecordReader createRecordReader(InputSpli FileSplit fileSplit = (FileSplit) split; Configuration hConf = context.getConfiguration(); String pathField = hConf.get(PATH_FIELD); + Map metadataFields = extractMetadataFields(hConf); boolean userFilenameOnly = hConf.getBoolean(FILENAME_ONLY, false); String path = userFilenameOnly ? fileSplit.getPath().getName() : fileSplit.getPath().toUri().toString(); String schema = hConf.get(SCHEMA); Schema parsedSchema = schema == null ? 
null : Schema.parseJson(schema);

-    RecordReader<NullWritable, StructuredRecord.Builder> delegate = createRecordReader(fileSplit, context,
-                                                                                       pathField, parsedSchema);
-    return new TrackingRecordReader(delegate, pathField, path);
+    RecordReader<NullWritable, StructuredRecord.Builder> delegate = createRecordReader(fileSplit, context, pathField,
+                                                                                       metadataFields, parsedSchema);
+    return new TrackingRecordReader(delegate, pathField, metadataFields, path);
   }

   public RecordReader<LongWritable, Text> getDefaultRecordReaderDelegate(InputSplit split,
@@ -88,27 +93,50 @@ public RecordReader<LongWritable, Text> getDefaultRecordReaderDelegate(InputSpli
     return delegate;
   }

+  /**
+   * Returns the mapping from output field name to the {@link MetadataField} that should populate it,
+   * as configured in the Hadoop configuration.
+   *
+   * @param hConf the Hadoop configuration holding the metadata field settings
+   * @return mapping from field name to {@link MetadataField}
+   */
+  private Map<String, MetadataField> extractMetadataFields(Configuration hConf) {
+    Map<String, MetadataField> metadataFields = new HashMap<>();
+    for (MetadataField metadataField : MetadataField.values()) {
+      String configValue = hConf.get(metadataField.getConfName());
+      if (configValue != null) {
+        metadataFields.put(configValue, metadataField);
+      }
+    }
+    return metadataFields;
+  }
+
+  protected abstract RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
+    FileSplit split, TaskAttemptContext context, @Nullable String pathField, @Nullable Schema schema)
+    throws IOException, InterruptedException;
+
   protected abstract RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
-    FileSplit split, TaskAttemptContext context,
-    @Nullable String pathField, @Nullable Schema schema) throws IOException, InterruptedException;
+    FileSplit split, TaskAttemptContext context, @Nullable String pathField,
+    Map<String, MetadataField> metadataFields, @Nullable Schema schema) throws IOException, InterruptedException;

   /**
    * Supports adding a field to each record that contains the path of the file the record was read from.
    */
-  static class TrackingRecordReader extends RecordReader<NullWritable, StructuredRecord> {
+  static class TrackingRecordReader extends MetadataRecordReader<NullWritable, StructuredRecord> {
     private final RecordReader<NullWritable, StructuredRecord.Builder> delegate;
+    private final Map<String, MetadataField> metadataFields;
     private final String pathField;
     private final String path;

     TrackingRecordReader(RecordReader<NullWritable, StructuredRecord.Builder> delegate,
-                         @Nullable String pathField, String path) {
+                         @Nullable String pathField, Map<String, MetadataField> metadataFields, String path) {
       this.delegate = delegate;
       this.pathField = pathField;
+      this.metadataFields = metadataFields;
       this.path = path;
     }

     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      super.initialize(split, context);
       delegate.initialize(split, context);
     }

@@ -123,6 +151,9 @@ public StructuredRecord getCurrentValue() throws IOException, InterruptedExcepti
       if (pathField != null) {
         recordBuilder.set(pathField, path);
       }
+      for (String fieldName : metadataFields.keySet()) {
+        populateMetadata(fieldName, metadataFields.get(fieldName), recordBuilder);
+      }
       return recordBuilder.build();
     }

diff --git a/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormatProvider.java b/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormatProvider.java
index 1fd05c66a..217845bf2 100644
--- a/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormatProvider.java
+++ b/format-common/src/main/java/io/cdap/plugin/format/input/PathTrackingInputFormatProvider.java
@@ -20,8 +20,14 @@
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.validation.FormatContext;
 import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
+import io.cdap.plugin.format.MetadataField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import 
java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -31,6 +37,7 @@ * @param type of plugin config */ public abstract class PathTrackingInputFormatProvider implements ValidatingInputFormat { + private static final Logger LOG = LoggerFactory.getLogger(PathTrackingInputFormatProvider.class); private static final String NAME_SCHEMA = "schema"; protected T conf; @@ -45,6 +52,12 @@ public Map getInputFormatConfiguration() { properties.put(PathTrackingInputFormat.PATH_FIELD, conf.getPathField()); properties.put(PathTrackingInputFormat.FILENAME_ONLY, String.valueOf(conf.useFilenameOnly())); } + if (conf.getLengthField() != null) { + properties.put(MetadataField.FILE_LENGTH.getConfName(), conf.getLengthField()); + } + if (conf.getModificationTimeField() != null) { + properties.put(MetadataField.FILE_MODIFICATION_TIME.getConfName(), conf.getModificationTimeField()); + } if (conf.getSchema() != null) { properties.put(NAME_SCHEMA, conf.getSchema().toString()); } @@ -72,13 +85,56 @@ public void validate(FormatContext context) { public Schema getSchema(FormatContext context) { FailureCollector collector = context.getFailureCollector(); try { - return conf.getSchema(); + Schema formatSchema = conf.getSchema(); + if (formatSchema == null) { + return null; + } + String lengthFieldResolved = null; + String modificationTimeFieldResolved = null; + + // this is required for back compatibility with File-based sources (File, FTP...) + try { + lengthFieldResolved = conf.lengthField; + modificationTimeFieldResolved = conf.modificationTimeField; + } catch (NoSuchFieldError e) { + LOG.warn("A modern PathTrackingConfig is used with old plugin."); + } + + List fields = new ArrayList<>(formatSchema.getFields()); + + extendSchemaWithMetadataField(fields, conf.pathField, Schema.Type.STRING); + extendSchemaWithMetadataField(fields, lengthFieldResolved, Schema.Type.LONG); + extendSchemaWithMetadataField(fields, modificationTimeFieldResolved, Schema.Type.LONG); + + return Schema.recordOf("record", fields); } catch (Exception e) { collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace()); } throw collector.getOrThrowException(); } + private void extendSchemaWithMetadataField(List fields, String fieldName, Schema.Type type) { + if (fieldName != null && !fieldName.isEmpty()) { + boolean addField = true; + + Iterator i = fields.iterator(); + while (i.hasNext()) { + Schema.Field field = i.next(); + if (field.getName().equals(fieldName)) { + if (field.getSchema().equals(Schema.of(type))) { + addField = false; + } else { + i.remove(); + } + } + } + + if (addField) { + fields.add(Schema.Field.of(fieldName, Schema.of(type))); + } + } + } + /** * Add any format specific properties required by the InputFormat. 
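   * For example (a hypothetical sketch, assuming a format with a 'delimiter' config property),
   * an implementation could do: properties.put("delimiter", conf.getDelimiter());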
* diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java index 9f6d13f14..554d79cf3 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java @@ -130,6 +130,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { } validatePathField(collector, schema); + validateLengthField(collector, schema); + validateModificationTimeField(collector, schema); pipelineConfigurer.getStageConfigurer().setOutputSchema(schema); } @@ -156,6 +158,8 @@ public void prepareRun(BatchSourceContext context) throws Exception { FormatContext formatContext = new FormatContext(collector, context.getInputSchema()); validateInputFormatProvider(formatContext, fileFormat, validatingInputFormat); validatePathField(collector, validatingInputFormat.getSchema(formatContext)); + validateLengthField(collector, validatingInputFormat.getSchema(formatContext)); + validateModificationTimeField(collector, validatingInputFormat.getSchema(formatContext)); collector.getOrThrowException(); Job job = JobUtils.createInstance(); @@ -269,6 +273,53 @@ private void validatePathField(FailureCollector collector, Schema schema) { } } + private void validateLengthField(FailureCollector collector, Schema schema) { + String lengthField = config.getLengthField(); + if (lengthField != null && schema != null) { + Schema.Field schemaLengthField = schema.getField(lengthField); + if (schemaLengthField == null) { + collector.addFailure(String.format("Length field '%s' must exist in input schema.", lengthField), null) + .withConfigProperty(AbstractFileSourceConfig.LENGTH_FIELD); + throw collector.getOrThrowException(); + } + Schema lengthFieldSchema = schemaLengthField.getSchema(); + Schema nonNullableSchema = lengthFieldSchema.isNullable() ? lengthFieldSchema.getNonNullable() : + lengthFieldSchema; + + if (nonNullableSchema.getType() != Schema.Type.LONG) { + collector.addFailure(String.format("Length field '%s' is of unsupported type '%s'.", lengthField, + nonNullableSchema.getDisplayName()), + "It must be of type 'long'.") + .withConfigProperty(AbstractFileSourceConfig.LENGTH_FIELD) + .withOutputSchemaField(schemaLengthField.getName()); + } + } + } + + private void validateModificationTimeField(FailureCollector collector, Schema schema) { + String modificationTimeField = config.getModificationTimeField(); + if (modificationTimeField != null && schema != null) { + Schema.Field schemaModificationTimeField = schema.getField(modificationTimeField); + if (schemaModificationTimeField == null) { + collector.addFailure(String.format("Modification time field '%s' must exist in input schema.", + modificationTimeField), null) + .withConfigProperty(AbstractFileSourceConfig.MODIFICATION_TIME_FIELD); + throw collector.getOrThrowException(); + } + Schema modificationTimeFieldSchema = schemaModificationTimeField.getSchema(); + Schema nonNullableSchema = modificationTimeFieldSchema.isNullable() ? 
+ modificationTimeFieldSchema.getNonNullable() : modificationTimeFieldSchema; + + if (nonNullableSchema.getType() != Schema.Type.LONG) { + collector.addFailure(String.format("Modification time field '%s' is of unsupported type '%s'.", + modificationTimeField, nonNullableSchema.getDisplayName()), + "It must be of type 'long'.") + .withConfigProperty(AbstractFileSourceConfig.MODIFICATION_TIME_FIELD) + .withOutputSchemaField(schemaModificationTimeField.getName()); + } + } + } + /** * Determines if the schema should be auto detected. This method must return false if any of the plugin properties * needed to determine the schema is a macro or not present. Otherwise this method should return true. diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSourceConfig.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSourceConfig.java index 75a3095d6..67af0b764 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSourceConfig.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSourceConfig.java @@ -84,6 +84,22 @@ public abstract class AbstractFileSourceConfig extends PluginConfig implements F + "If specified, the field must exist in the output schema as a string.") private String pathField; + @Name(LENGTH_FIELD) + @Macro + @Nullable + @Description("Output field to place the length of the file that the record was read from. " + + "If not specified, the file length will not be included in output records. " + + "If specified, the field must exist in the output schema as a long.") + private String lengthField; + + @Name(MODIFICATION_TIME_FIELD) + @Macro + @Nullable + @Description("Output field to place the modification time of the file that the record was read from. " + + "If not specified, the file modification time will not be included in output records. " + + "If specified, the field must exist in the output schema as a long.") + private String modificationTimeField; + @Macro @Nullable @Description("Whether to only use the filename instead of the URI of the file path when a path field is given. " @@ -214,6 +230,18 @@ public String getPathField() { return pathField; } + @Nullable + @Override + public String getLengthField() { + return lengthField; + } + + @Override + @Nullable + public String getModificationTimeField() { + return modificationTimeField; + } + @Override public boolean useFilenameAsPath() { return filenameOnly; diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/FileSourceProperties.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/FileSourceProperties.java index 682219ffd..1a6696419 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/FileSourceProperties.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/FileSourceProperties.java @@ -33,6 +33,8 @@ */ public interface FileSourceProperties { String PATH_FIELD = "pathField"; + String LENGTH_FIELD = "lengthField"; + String MODIFICATION_TIME_FIELD = "modificationTimeField"; /** * Validates the properties. @@ -122,6 +124,12 @@ default FileFormat getFormat() { @Nullable String getPathField(); + @Nullable + String getLengthField(); + + @Nullable + String getModificationTimeField(); + /** * Whether to only use the filename rather than the entire URI of the file path, * if {@link #getPathField()} is present. 
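For illustration, a minimal sketch of how a concrete source config can surface the two new
properties (a hypothetical class; only the metadata-related members are shown, and every name
outside the FileSourceProperties contract itself is assumed):

  public class SampleSourceConfig extends PluginConfig implements FileSourceProperties {

    @Name(LENGTH_FIELD)
    @Macro
    @Nullable
    @Description("Output field to place the length of the file that the record was read from.")
    private String lengthField;

    @Name(MODIFICATION_TIME_FIELD)
    @Macro
    @Nullable
    @Description("Output field to place the modification time of the file that the record was read from.")
    private String modificationTimeField;

    @Nullable
    @Override
    public String getLengthField() {
      // returning null, as FTPBatchSource does above, disables the length column entirely
      return lengthField;
    }

    @Nullable
    @Override
    public String getModificationTimeField() {
      return modificationTimeField;
    }

    // the remaining FileSourceProperties methods are omitted for brevity
  }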
diff --git a/format-delimited/src/main/java/io/cdap/plugin/format/delimited/input/PathTrackingDelimitedInputFormat.java b/format-delimited/src/main/java/io/cdap/plugin/format/delimited/input/PathTrackingDelimitedInputFormat.java index 65d68a068..125c119db 100644 --- a/format-delimited/src/main/java/io/cdap/plugin/format/delimited/input/PathTrackingDelimitedInputFormat.java +++ b/format-delimited/src/main/java/io/cdap/plugin/format/delimited/input/PathTrackingDelimitedInputFormat.java @@ -22,6 +22,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.common.SchemaValidator; +import io.cdap.plugin.format.MetadataField; import io.cdap.plugin.format.input.PathTrackingInputFormat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; import java.util.Iterator; +import java.util.Map; import javax.annotation.Nullable; /** @@ -50,38 +52,66 @@ protected RecordReader createRecordReade @Nullable String pathField, @Nullable Schema schema) { + return createRecordReader(split, context, pathField, null, schema); + } + + @Override + protected RecordReader createRecordReader(FileSplit split, + TaskAttemptContext context, + @Nullable String pathField, + Map metadataFields, + @Nullable Schema schema) { + RecordReader delegate = getDefaultRecordReaderDelegate(split, context); String delimiter = context.getConfiguration().get(DELIMITER); boolean skipHeader = context.getConfiguration().getBoolean(SKIP_HEADER, false); boolean enableQuotesValue = context.getConfiguration().getBoolean(ENABLE_QUOTES_VALUE, false); - return new RecordReader() { - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - delegate.initialize(split, context); - } + return new DelimitedRecordReader(delegate, schema, delimiter, skipHeader, enableQuotesValue); + } - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (delegate.nextKeyValue()) { - // skip to next if the current record is header - if (skipHeader && delegate.getCurrentKey().get() == 0L) { - return delegate.nextKeyValue(); - } - return true; + private class DelimitedRecordReader extends RecordReader { + private final RecordReader delegate; + private final Schema schema; + private final String delimiter; + private final boolean skipHeader; + private final boolean enableQuotesValue; + + private DelimitedRecordReader(RecordReader delegate, Schema schema, String delimiter, + boolean skipHeader, boolean enableQuotesValue) { + this.delegate = delegate; + this.schema = schema; + this.delimiter = delimiter; + this.skipHeader = skipHeader; + this.enableQuotesValue = enableQuotesValue; + } + + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + delegate.initialize(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (delegate.nextKeyValue()) { + // skip to next if the current record is header + if (skipHeader && delegate.getCurrentKey().get() == 0L) { + return delegate.nextKeyValue(); } - return false; + return true; } + return false; + } - @Override - public NullWritable getCurrentKey() { - return NullWritable.get(); - } + @Override + public NullWritable getCurrentKey() { + return NullWritable.get(); + } - @Override - public 
StructuredRecord.Builder getCurrentValue() throws IOException, InterruptedException { - String delimitedString = delegate.getCurrentValue().toString(); + @Override + public StructuredRecord.Builder getCurrentValue() throws IOException, InterruptedException { + String delimitedString = delegate.getCurrentValue().toString(); StructuredRecord.Builder builder = StructuredRecord.builder(schema); Iterator fields = schema.getFields().iterator(); @@ -144,10 +174,9 @@ public float getProgress() throws IOException, InterruptedException { return delegate.getProgress(); } - @Override - public void close() throws IOException { - delegate.close(); - } - }; + @Override + public void close() throws IOException { + delegate.close(); + } } } diff --git a/format-json/src/main/java/io/cdap/plugin/format/json/input/PathTrackingJsonInputFormat.java b/format-json/src/main/java/io/cdap/plugin/format/json/input/PathTrackingJsonInputFormat.java index 279ba99a8..efca181cb 100644 --- a/format-json/src/main/java/io/cdap/plugin/format/json/input/PathTrackingJsonInputFormat.java +++ b/format-json/src/main/java/io/cdap/plugin/format/json/input/PathTrackingJsonInputFormat.java @@ -20,6 +20,7 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.format.StructuredRecordStringConverter; import io.cdap.plugin.common.SchemaValidator; +import io.cdap.plugin.format.MetadataField; import io.cdap.plugin.format.input.PathTrackingInputFormat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -31,7 +32,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; /** @@ -40,17 +43,17 @@ public class PathTrackingJsonInputFormat extends PathTrackingInputFormat { - private Schema getModifiedSchema(Schema schema, @Nullable String pathField) { - // if the path field is set, it might not be nullable - // if it's not nullable, decoding a string into a StructuredRecord will fail because a non-nullable + private Schema getModifiedSchema(Schema schema, List metadataFields) { + // if the metadata fields (path, length, and modification time fields) are set, they might not be nullable + // if they are not nullable, decoding a value into a StructuredRecord will fail because a non-nullable // field will have a null value. 
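    // (for example, a non-nullable 'path' string field has no value at the moment the JSON body is decoded)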
- // so in these cases, a modified schema is used where the path field is nullable - if (pathField == null) { + // so in these cases, a modified schema is used where the metadata fields are nullable + if (metadataFields.isEmpty()) { return schema; } List fieldCopies = new ArrayList<>(schema.getFields().size()); for (Schema.Field field : schema.getFields()) { - if (field.getName().equals(pathField) && !field.getSchema().isNullable()) { + if (metadataFields.contains(field.getName()) && !field.getSchema().isNullable()) { fieldCopies.add(Schema.Field.of(field.getName(), Schema.nullableOf(field.getSchema()))); } else { fieldCopies.add(field); @@ -66,47 +69,73 @@ protected RecordReader createRecordReade @Nullable String pathField, @Nullable Schema schema) { RecordReader delegate = getDefaultRecordReaderDelegate(split, context); - Schema modifiedSchema = getModifiedSchema(schema, pathField); + Schema modifiedSchema = getModifiedSchema(schema, Arrays.asList(pathField)); - return new RecordReader() { + return new JsonRecordReader(delegate, modifiedSchema); + } - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - delegate.initialize(split, context); - } - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return delegate.nextKeyValue(); - } + @Override + protected RecordReader createRecordReader(FileSplit split, + TaskAttemptContext context, + @Nullable String pathField, + Map + metadataFields, + @Nullable Schema schema) { + RecordReader delegate = getDefaultRecordReaderDelegate(split, context); - @Override - public NullWritable getCurrentKey() { - return NullWritable.get(); - } + List toNullableFields = new ArrayList<>(metadataFields.keySet()); + toNullableFields.add(pathField); + Schema modifiedSchema = getModifiedSchema(schema, toNullableFields); - @Override - public StructuredRecord.Builder getCurrentValue() throws IOException, InterruptedException { - String json = delegate.getCurrentValue().toString(); - StructuredRecord record = StructuredRecordStringConverter.fromJsonString(json, modifiedSchema); - StructuredRecord.Builder builder = StructuredRecord.builder(schema); - for (Schema.Field field : schema.getFields()) { - Object value = record.get(field.getName()); - SchemaValidator.validateDateTimeField(field.getSchema(), field.getName(), value); - builder.set(field.getName(), value); - } - return builder; - } + return new JsonRecordReader(delegate, modifiedSchema); + } - @Override - public float getProgress() throws IOException, InterruptedException { - return delegate.getProgress(); - } + private class JsonRecordReader extends RecordReader { + private final RecordReader delegate; + private final Schema modifiedSchema; - @Override - public void close() throws IOException { - delegate.close(); + private JsonRecordReader(RecordReader delegate, Schema modifiedSchema) { + this.delegate = delegate; + this.modifiedSchema = modifiedSchema; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + delegate.initialize(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return delegate.nextKeyValue(); + } + + @Override + public NullWritable getCurrentKey() { + return NullWritable.get(); + } + + @Override + public StructuredRecord.Builder getCurrentValue() throws IOException, InterruptedException { + String json = delegate.getCurrentValue().toString(); + 
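+      // The JSON body is decoded against modifiedSchema, in which the path/length/modification time
+      // fields are nullable; TrackingRecordReader fills those fields in after this builder is returned.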
+      StructuredRecord record = StructuredRecordStringConverter.fromJsonString(json, modifiedSchema);
+      StructuredRecord.Builder builder = StructuredRecord.builder(modifiedSchema);
+      for (Schema.Field field : modifiedSchema.getFields()) {
+        Object value = record.get(field.getName());
+        SchemaValidator.validateDateTimeField(field.getSchema(), field.getName(), value);
+        builder.set(field.getName(), value);
       }
-    };
+      return builder;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return delegate.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
   }
 }
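For reviewers following the JSON hunks above: a minimal, self-contained sketch (not part of the
patch) of the schema relaxation that getModifiedSchema performs. The "event" record and the
"file"/"size" field names are invented for illustration; only the Schema API calls mirror the patch.

    import io.cdap.cdap.api.data.schema.Schema;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ModifiedSchemaSketch {

      // same idea as PathTrackingJsonInputFormat#getModifiedSchema: metadata columns are
      // absent from the raw JSON text, so they must be nullable while a line is decoded
      static Schema relax(Schema schema, List<String> metadataFields) {
        List<Schema.Field> copies = new ArrayList<>(schema.getFields().size());
        for (Schema.Field field : schema.getFields()) {
          // only the metadata fields are wrapped; data fields keep their declared schema
          if (metadataFields.contains(field.getName()) && !field.getSchema().isNullable()) {
            copies.add(Schema.Field.of(field.getName(), Schema.nullableOf(field.getSchema())));
          } else {
            copies.add(field);
          }
        }
        return Schema.recordOf(schema.getRecordName(), copies);
      }

      public static void main(String[] args) {
        Schema declared = Schema.recordOf("event",
          Schema.Field.of("body", Schema.of(Schema.Type.STRING)),
          Schema.Field.of("file", Schema.of(Schema.Type.STRING)),   // path field
          Schema.Field.of("size", Schema.of(Schema.Type.LONG)));    // length field
        // 'file' and 'size' are filled in by the reader after decoding,
        // so both get wrapped in a nullable union first
        System.out.println(relax(declared, Arrays.asList("file", "size")));
      }
    }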
diff --git a/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/ParquetInputFormatProvider.java b/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/ParquetInputFormatProvider.java
index b57249b7d..900a4edf4 100644
--- a/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/ParquetInputFormatProvider.java
+++ b/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/ParquetInputFormatProvider.java
@@ -23,7 +23,6 @@
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.plugin.PluginClass;
-import io.cdap.cdap.etl.api.validation.FormatContext;
 import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
 import io.cdap.plugin.common.batch.JobUtils;
 import io.cdap.plugin.format.input.PathTrackingConfig;
@@ -34,8 +33,12 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 
@@ -46,6 +49,7 @@
 @Name(ParquetInputFormatProvider.NAME)
 @Description(ParquetInputFormatProvider.DESC)
 public class ParquetInputFormatProvider extends PathTrackingInputFormatProvider<ParquetInputFormatProvider.Conf> {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormatProvider.class);
   static final String NAME = "parquet";
   static final String DESC = "Plugin for reading files in parquet format.";
   public static final PluginClass PLUGIN_CLASS =
@@ -69,50 +73,6 @@ protected void addFormatProperties(Map<String, String> properties) {
     }
   }
 
-  @Nullable
-  @Override
-  public Schema getSchema(FormatContext context) {
-    if (conf.containsMacro(PathTrackingConfig.NAME_SCHEMA) || !Strings.isNullOrEmpty(conf.schema)) {
-      return super.getSchema(context);
-    }
-    try {
-      return getDefaultSchema(context);
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Extract schema from file
-   *
-   * @param context {@link FormatContext}
-   * @return {@link Schema}
-   * @throws IOException raised when error occurs during schema extraction
-   */
-  public Schema getDefaultSchema(FormatContext context) throws IOException {
-    String filePath = conf.getProperties().getProperties().getOrDefault("path", null);
-    ParquetReader<GenericData.Record> reader = null;
-    try {
-      Job job = JobUtils.createInstance();
-      Configuration hconf = job.getConfiguration();
-      // set entries here, before FileSystem is used
-      for (Map.Entry<String, String> entry : conf.getFileSystemProperties().entrySet()) {
-        hconf.set(entry.getKey(), entry.getValue());
-      }
-      final Path file = conf.getFilePathForSchemaGeneration(filePath, ".+\\.parquet", hconf, job);
-      reader = AvroParquetReader.builder(file).build();
-      GenericData.Record record = (GenericData.Record) reader.read();
-      return Schema.parseJson(record.getSchema().toString());
-    } catch (IOException e) {
-      context.getFailureCollector().addFailure("Schema error", e.getMessage());
-    } finally {
-      if (reader != null) {
-        reader.close();
-      }
-    }
-    return null;
-  }
-
   /**
    * Common config for Parquet format
    */
@@ -122,5 +82,76 @@ public static class Conf extends PathTrackingConfig {
     @Nullable
     @Description(NAME_SCHEMA)
     public String schema;
+
+    @Override
+    public Schema getSchema() {
+      if (containsMacro(NAME_SCHEMA)) {
+        return null;
+      }
+      if (Strings.isNullOrEmpty(schema)) {
+        try {
+          String lengthFieldResolved = null;
+          String modificationTimeFieldResolved = null;
+
+          // this is required for back compatibility with File-based sources (File, FTP...)
+          try {
+            lengthFieldResolved = lengthField;
+            modificationTimeFieldResolved = modificationTimeField;
+          } catch (NoSuchFieldError e) {
+            LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin.");
+          }
+          return getDefaultSchema(pathField, lengthFieldResolved, modificationTimeFieldResolved);
+        } catch (Exception e) {
+          throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
+        }
+      }
+      try {
+        return Schema.parseJson(schema);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Extract schema from file
+     *
+     * @return {@link Schema}
+     * @throws IOException raised when error occurs during schema extraction
+     */
+    public Schema getDefaultSchema(@Nullable String pathField, @Nullable String lengthField,
+                                   @Nullable String modificationTimeField) throws IOException {
+      String filePath = getProperties().getProperties().getOrDefault("path", null);
+      ParquetReader<GenericData.Record> reader = null;
+      try {
+        Job job = JobUtils.createInstance();
+        Configuration hconf = job.getConfiguration();
+        // set entries here, before FileSystem is used
+        for (Map.Entry<String, String> entry : getFileSystemProperties().entrySet()) {
+          hconf.set(entry.getKey(), entry.getValue());
+        }
+        final Path file = getFilePathForSchemaGeneration(filePath, ".+\\.parquet", hconf, job);
+        reader = AvroParquetReader.builder(file).build();
+        GenericData.Record record = (GenericData.Record) reader.read();
+
+        Schema schema = Schema.parseJson(record.getSchema().toString());
+        List<Schema.Field> fields = new ArrayList<>(schema.getFields());
+
+        if (pathField != null && !pathField.isEmpty()) {
+          fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+        }
+        if (lengthField != null && !lengthField.isEmpty()) {
+          fields.add(Schema.Field.of(lengthField, Schema.of(Schema.Type.LONG)));
+        }
+        if (modificationTimeField != null && !modificationTimeField.isEmpty()) {
+          fields.add(Schema.Field.of(modificationTimeField, Schema.of(Schema.Type.LONG)));
+        }
+
+        return Schema.recordOf("record", fields);
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
   }
 }
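The NoSuchFieldError guard in Conf.getSchema() above is a binary-compatibility trick: lengthField
and modificationTimeField are new fields on the shared PathTrackingConfig, and when this format
jar runs against an older copy of that class (for example one bundled with a pre-metadata File or
FTP plugin), the JVM raises NoSuchFieldError, a LinkageError, at the access site. A self-contained
sketch of the pattern follows; the error is thrown by hand, since a single file cannot reproduce a
genuinely mismatched classpath.

    public class LinkageGuardSketch {

      // stands in for reading PathTrackingConfig.lengthField when an old jar is on the classpath
      static String readLengthField(boolean oldJarOnClasspath) {
        if (oldJarOnClasspath) {
          throw new NoSuchFieldError("lengthField"); // what the JVM would raise lazily
        }
        return "fileLength";
      }

      static String resolveOrNull(boolean oldJarOnClasspath) {
        try {
          return readLengthField(oldJarOnClasspath);
        } catch (NoSuchFieldError e) {
          return null; // degrade gracefully: treat the property as never set
        }
      }

      public static void main(String[] args) {
        System.out.println(resolveOrNull(false)); // fileLength
        System.out.println(resolveOrNull(true));  // null
      }
    }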
diff --git a/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/PathTrackingParquetInputFormat.java b/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/PathTrackingParquetInputFormat.java
index ee7b7b894..5b43fe46f 100644
--- a/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/PathTrackingParquetInputFormat.java
+++ b/format-parquet/src/main/java/io/cdap/plugin/format/parquet/input/PathTrackingParquetInputFormat.java
@@ -18,6 +18,8 @@
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.format.MetadataField;
+import io.cdap.plugin.format.MetadataRecordReader;
 import io.cdap.plugin.format.avro.AvroToStructuredTransformer;
 import io.cdap.plugin.format.input.PathTrackingInputFormat;
 import org.apache.avro.generic.GenericRecord;
@@ -30,7 +32,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
@@ -46,28 +50,46 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
     throws IOException, InterruptedException {
     RecordReader<Void, GenericRecord> delegate = (new AvroParquetInputFormat<GenericRecord>())
       .createRecordReader(split, context);
-    return new ParquetRecordReader(delegate, schema, pathField);
+    return new ParquetRecordReader(delegate, schema, pathField, null);
+  }
+
+  @Override
+  protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(FileSplit split,
+                                                                 TaskAttemptContext context,
+                                                                 @Nullable String pathField,
+                                                                 Map<String, MetadataField> metadataFields,
+                                                                 @Nullable Schema schema)
+    throws IOException, InterruptedException {
+    RecordReader<Void, GenericRecord> delegate = (new AvroParquetInputFormat<GenericRecord>())
+      .createRecordReader(split, context);
+    return new ParquetRecordReader(delegate, schema, pathField, metadataFields);
   }
 
   /**
    * Transforms GenericRecords into StructuredRecord.
    */
-  static class ParquetRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
+  static class ParquetRecordReader extends MetadataRecordReader<NullWritable, StructuredRecord.Builder> {
     private final RecordReader<Void, GenericRecord> delegate;
     private final AvroToStructuredTransformer recordTransformer;
     private final String pathField;
+    private final Map<String, MetadataField> metadataFields;
     private Schema schema;
 
     ParquetRecordReader(RecordReader<Void, GenericRecord> delegate, @Nullable Schema schema,
-                        @Nullable String pathField) {
+                        @Nullable String pathField, @Nullable Map<String, MetadataField> metadataFields) {
       this.delegate = delegate;
       this.schema = schema;
       this.pathField = pathField;
+      this.metadataFields = metadataFields == null ? Collections.emptyMap() : metadataFields;
       this.recordTransformer = new AvroToStructuredTransformer();
     }
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      super.initialize(split, context);
       delegate.initialize(split, context);
     }
 
@@ -87,18 +109,27 @@ public StructuredRecord.Builder getCurrentValue() throws IOException, Interrupte
       // if schema is null, but we're still able to read, that means the file contains the schema information
       // set the schema based on the schema of the record
       if (schema == null) {
-        if (pathField == null) {
+        if (pathField == null && metadataFields.isEmpty()) {
           schema = Schema.parseJson(genericRecord.getSchema().toString());
         } else {
-          // if there is a path field, add the path as a field in the schema
+          // if there is a path field or metadata fields, add them as fields in the schema
           Schema schemaWithoutPath = Schema.parseJson(genericRecord.getSchema().toString());
           List<Schema.Field> fields = new ArrayList<>(schemaWithoutPath.getFields().size() + 1);
           fields.addAll(schemaWithoutPath.getFields());
-          fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+          if (pathField != null) {
+            fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+          }
+          if (!metadataFields.isEmpty()) {
+            for (Map.Entry<String, MetadataField> entry : metadataFields.entrySet()) {
+              fields.add(Schema.Field.of(entry.getKey(), Schema.of(entry.getValue().getSchemaType())));
+            }
+          }
           schema = Schema.recordOf(schemaWithoutPath.getRecordName(), fields);
         }
       }
-      return recordTransformer.transform(genericRecord, schema, pathField);
+      List<String> fieldsToExclude = new ArrayList<>(metadataFields.keySet());
+      fieldsToExclude.add(pathField);
+      return recordTransformer.transform(genericRecord, schema, fieldsToExclude);
     }
 
     @Override
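The Map<String, MetadataField> plumbing above maps an output column name to the kind of file
metadata it carries. MetadataField itself is added in format-common and its body is not shown in
this excerpt, so the enum below is a hypothetical stand-in that only matches the getSchemaType()
call sites; the withMetadata helper mirrors how the readers extend a file schema.

    import io.cdap.cdap.api.data.schema.Schema;

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class MetadataSchemaSketch {

      enum MetadataFieldSketch {           // hypothetical stand-in for MetadataField
        FILE_LENGTH(Schema.Type.LONG),
        FILE_MODIFICATION_TIME(Schema.Type.LONG);

        private final Schema.Type schemaType;

        MetadataFieldSketch(Schema.Type schemaType) {
          this.schemaType = schemaType;
        }

        Schema.Type getSchemaType() {
          return schemaType;
        }
      }

      // append one column per requested metadata field, as the record readers do
      static Schema withMetadata(Schema fileSchema, Map<String, MetadataFieldSketch> metadataFields) {
        List<Schema.Field> fields = new ArrayList<>(fileSchema.getFields());
        for (Map.Entry<String, MetadataFieldSketch> e : metadataFields.entrySet()) {
          fields.add(Schema.Field.of(e.getKey(), Schema.of(e.getValue().getSchemaType())));
        }
        return Schema.recordOf(fileSchema.getRecordName(), fields);
      }

      public static void main(String[] args) {
        Schema fileSchema = Schema.recordOf("record",
          Schema.Field.of("body", Schema.of(Schema.Type.STRING)));
        Map<String, MetadataFieldSketch> meta = new LinkedHashMap<>();
        meta.put("size", MetadataFieldSketch.FILE_LENGTH);          // column names are illustrative
        meta.put("mtime", MetadataFieldSketch.FILE_MODIFICATION_TIME);
        System.out.println(withMetadata(fileSchema, meta));
      }
    }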
diff --git a/format-text/src/main/java/io/cdap/plugin/format/text/input/PathTrackingTextInputFormat.java b/format-text/src/main/java/io/cdap/plugin/format/text/input/PathTrackingTextInputFormat.java
index 0e020d7bb..20d2caca2 100644
--- a/format-text/src/main/java/io/cdap/plugin/format/text/input/PathTrackingTextInputFormat.java
+++ b/format-text/src/main/java/io/cdap/plugin/format/text/input/PathTrackingTextInputFormat.java
@@ -18,6 +18,8 @@
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.format.MetadataField;
+import io.cdap.plugin.format.MetadataRecordReader;
 import io.cdap.plugin.format.input.PathTrackingInputFormat;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -26,9 +28,9 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 import java.io.IOException;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
@@ -57,10 +59,23 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
     return new TextRecordReader(delegate, schema, emittedHeader, header, skipHeader);
   }
 
+  @Override
+  protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(FileSplit split,
+                                                                 TaskAttemptContext context,
+                                                                 @Nullable String pathField,
+                                                                 Map<String, MetadataField> metadataFields,
+                                                                 Schema schema) {
+    RecordReader<LongWritable, Text> delegate = getDefaultRecordReaderDelegate(split, context);
+    String header = context.getConfiguration().get(CombineTextInputFormat.HEADER);
+    boolean skipHeader = context.getConfiguration().getBoolean(CombineTextInputFormat.SKIP_HEADER, false);
+    return new TextRecordReader(delegate, schema, emittedHeader, header, skipHeader);
+  }
+
   /**
    * Text record reader
    */
-  static class TextRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
+  static class TextRecordReader extends MetadataRecordReader<NullWritable, StructuredRecord.Builder> {
     private final RecordReader<LongWritable, Text> delegate;
     private final Schema schema;
     private final String header;
@@ -80,6 +95,7 @@ static class TextRecordReader extends RecordReader<NullWritable, StructuredRecor
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      super.initialize(split, context);
       delegate.initialize(split, context);
     }
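The new text createRecordReader above pulls header handling out of the job Configuration. Below
is a sketch of that handoff; the key strings are assumptions standing in for the real
CombineTextInputFormat.HEADER and SKIP_HEADER constants, whose values are not shown in this excerpt.

    import org.apache.hadoop.conf.Configuration;

    public class HeaderConfigSketch {
      // assumed placeholders for the keys defined on CombineTextInputFormat
      static final String HEADER = "combine.text.input.header";
      static final String SKIP_HEADER = "combine.text.input.skip.header";

      public static void main(String[] args) {
        // driver side: record the header line and whether readers should skip it
        Configuration conf = new Configuration();
        conf.set(HEADER, "id,name,amount");
        conf.setBoolean(SKIP_HEADER, true);

        // task side: what createRecordReader(...) reads back before building the reader
        String header = conf.get(HEADER);
        boolean skipHeader = conf.getBoolean(SKIP_HEADER, false);
        System.out.println(header + " / skip=" + skipHeader);
      }
    }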
diff --git a/format-text/src/main/java/io/cdap/plugin/format/text/input/TextInputFormatProvider.java b/format-text/src/main/java/io/cdap/plugin/format/text/input/TextInputFormatProvider.java
--- a/format-text/src/main/java/io/cdap/plugin/format/text/input/TextInputFormatProvider.java
+++ b/format-text/src/main/java/io/cdap/plugin/format/text/input/TextInputFormatProvider.java
 public class TextInputFormatProvider extends PathTrackingInputFormatProvider<TextInputFormatProvider.TextConfig> {
+  private static final Logger LOG = LoggerFactory.getLogger(TextInputFormatProvider.class);
+
   static final String NAME = "text";
   static final String DESC = "Plugin for reading files in text format.";
   public static final PluginClass PLUGIN_CLASS =
@@ -69,13 +76,24 @@ protected void validate() {
       return;
     }
 
-    String pathField = conf.getPathField();
+    List<String> fieldsToCheck = new ArrayList<>();
+    fieldsToCheck.add(TextConfig.NAME_BODY);
+    fieldsToCheck.add(TextConfig.NAME_OFFSET);
+    fieldsToCheck.add(conf.getPathField());
+    try {
+      fieldsToCheck.add(conf.getLengthField());
+      fieldsToCheck.add(conf.getModificationTimeField());
+    } catch (NoSuchMethodError e) {
+      LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin.");
+    }
     Schema schema = conf.getSchema();
 
     // text must contain 'body' as type 'string'.
     // it can optionally contain a 'offset' field of type 'long'
     // it can optionally contain a path field of type 'string'
-    Schema.Field offsetField = schema.getField("offset");
+    // it can optionally contain a length field of type 'long'
+    // it can optionally contain a modificationTime field of type 'long'
+    Schema.Field offsetField = schema.getField(TextConfig.NAME_OFFSET);
     if (offsetField != null) {
       Schema offsetSchema = offsetField.getSchema();
       Schema.Type offsetType = offsetSchema.isNullable() ? offsetSchema.getNonNullable().getType() :
@@ -86,45 +104,35 @@ protected void validate() {
       }
     }
 
-    Schema.Field bodyField = schema.getField("body");
+    Schema.Field bodyField = schema.getField(TextConfig.NAME_BODY);
     if (bodyField == null) {
-      throw new IllegalArgumentException("The schema for the 'text' format must have a field named 'body'");
+      throw new IllegalArgumentException(
+        String.format("The schema for the 'text' format must have a field named '%s'", TextConfig.NAME_BODY));
     }
     Schema bodySchema = bodyField.getSchema();
     Schema.Type bodyType = bodySchema.isNullable() ? bodySchema.getNonNullable().getType() : bodySchema.getType();
     if (bodyType != Schema.Type.STRING) {
-      throw new IllegalArgumentException(String.format("The 'body' field must be of type 'string', but found '%s'",
-                                                       bodyType.name().toLowerCase()));
+      throw new IllegalArgumentException(String.format("The '%s' field must be of type 'string', but found '%s'",
+                                                       TextConfig.NAME_BODY, bodyType.name().toLowerCase()));
     }
 
-    // fields should be body (required), offset (optional), [pathfield] (optional)
-    boolean expectOffset = schema.getField("offset") != null;
-    boolean expectPath = pathField != null;
-    int numExpectedFields = 1;
-    if (expectOffset) {
-      numExpectedFields++;
-    }
-    if (expectPath) {
-      numExpectedFields++;
-    }
-    int maxExpectedFields = pathField == null ? 2 : 3;
+    // fields should be body (required), offset (optional), [pathfield, length, modificationTime] (optional)
+    List<String> expectedFieldsList = fieldsToCheck
+      .stream()
+      .filter(Objects::nonNull)
+      .collect(Collectors.toList());
+
+    int numExpectedFields = expectedFieldsList.size();
     int numFields = schema.getFields().size();
     if (numFields > numExpectedFields) {
-      String expectedFields;
-      if (expectOffset && expectPath) {
-        expectedFields = String.format("'offset', 'body', and '%s' fields", pathField);
-      } else if (expectOffset) {
-        expectedFields = "'offset' and 'body' fields";
-      } else if (expectPath) {
-        expectedFields = String.format("'body' and '%s' fields", pathField);
-      } else {
-        expectedFields = "'body' field";
-      }
+      String expectedFields = expectedFieldsList.stream()
+        .collect(Collectors.joining("', '", "'", "'"));
 
-      int numExtraFields = numFields - maxExpectedFields;
+      int numExtraFields = numFields - numExpectedFields;
       throw new IllegalArgumentException(
-        String.format("The schema for the 'text' format must only contain the %s, but found %d other field%s",
-                      expectedFields, numExtraFields, numExtraFields > 1 ? "s" : ""));
+        String.format("The schema for the 'text' format must only contain the %s field%s, but found %d other field%s",
+                      expectedFields, numExpectedFields > 1 ? "s" : "", numExtraFields,
+                      numExtraFields > 1 ? "s" : ""));
     }
   }
 
@@ -144,10 +152,21 @@ public void validate(FormatContext context) {
       throw collector.getOrThrowException();
     }
 
-    String pathField = conf.getPathField();
+    List<String> fieldsToCheck = new ArrayList<>();
+    fieldsToCheck.add(TextConfig.NAME_BODY);
+    fieldsToCheck.add(TextConfig.NAME_OFFSET);
+    fieldsToCheck.add(conf.getPathField());
+    try {
+      fieldsToCheck.add(conf.getLengthField());
+      fieldsToCheck.add(conf.getModificationTimeField());
+    } catch (NoSuchMethodError e) {
+      LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin.");
+    }
 
     // text must contain 'body' as type 'string'.
     // it can optionally contain a 'offset' field of type 'long'
     // it can optionally contain a path field of type 'string'
+    // it can optionally contain a length field of type 'long'
+    // it can optionally contain a modificationTime field of type 'long'
     Schema.Field offsetField = schema.getField(TextConfig.NAME_OFFSET);
     if (offsetField != null) {
       Schema offsetSchema = offsetField.getSchema();
@@ -162,63 +181,59 @@ public void validate(FormatContext context) {
 
     Schema.Field bodyField = schema.getField(TextConfig.NAME_BODY);
     if (bodyField == null) {
-      collector.addFailure("The schema for the 'text' format must have a field named 'body'.", null)
+      collector.addFailure(
+        String.format("The schema for the 'text' format must have a field named '%s'.", TextConfig.NAME_BODY), null)
         .withConfigProperty(TextConfig.NAME_SCHEMA);
     } else {
       Schema bodySchema = bodyField.getSchema();
       bodySchema = bodySchema.isNullable() ? bodySchema.getNonNullable() : bodySchema;
       Schema.Type bodyType = bodySchema.getType();
       if (bodyType != Schema.Type.STRING) {
-        collector.addFailure(
-          String.format("The 'body' field is of unexpected type '%s'.'", bodySchema.getDisplayName()),
+        collector.addFailure(String.format("The '%s' field is of unexpected type '%s'.",
+                                           TextConfig.NAME_BODY, bodySchema.getDisplayName()),
           "Change type to 'string'.").withOutputSchemaField(TextConfig.NAME_BODY);
       }
     }
 
-    // fields should be body (required), offset (optional), [pathfield] (optional)
-    boolean expectOffset = schema.getField(TextConfig.NAME_OFFSET) != null;
-    boolean expectPath = pathField != null;
-    int numExpectedFields = 1;
-    if (expectOffset) {
-      numExpectedFields++;
-    }
-    if (expectPath) {
-      numExpectedFields++;
-    }
+    // fields should be body (required), offset (optional), [pathfield, length, modificationTime] (optional)
+    List<String> expectedFieldsList = fieldsToCheck
+      .stream()
+      .filter(Objects::nonNull)
+      .collect(Collectors.toList());
+    int numExpectedFields = expectedFieldsList.size();
     int numFields = schema.getFields().size();
     if (numFields > numExpectedFields) {
+      String expectedFields = expectedFieldsList.stream()
+        .collect(Collectors.joining("', '", "'", "'"));
       for (Schema.Field field : schema.getFields()) {
-        String expectedFields;
-        if (expectOffset && expectPath) {
-          expectedFields = String.format("'offset', 'body', and '%s' fields", pathField);
-        } else if (expectOffset) {
-          expectedFields = "'offset' and 'body' fields";
-        } else if (expectPath) {
-          expectedFields = String.format("'body' and '%s' fields", pathField);
-        } else {
-          expectedFields = "'body' field";
-        }
-
-        if (field.getName().equals(TextConfig.NAME_BODY) || (expectPath && field.getName().equals(pathField))
-          || field.getName().equals(TextConfig.NAME_OFFSET)) {
+        if (expectedFieldsList.contains(field.getName())) {
           continue;
         }
         collector.addFailure(
-          String.format("The schema for the 'text' format must only contain the '%s'.", expectedFields),
+          String.format("The schema for the 'text' format must only contain the %s field%s.",
+                        expectedFields, expectedFieldsList.size() > 1 ? "s" : ""),
           String.format("Remove additional field '%s'.", field.getName())).withOutputSchemaField(field.getName());
       }
     }
   }
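To sanity-check the corrected quoting and pluralization in the validate(FormatContext) hunk above,
here is a standalone run of the same message construction; the field names and the extra-field
count are illustrative.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class ExpectedFieldsMessageSketch {
      public static void main(String[] args) {
        // body and offset are always candidates; the path field is set, length/modTime are not
        List<String> fieldsToCheck = Arrays.asList("body", "offset", "file", null, null);

        List<String> expected = fieldsToCheck.stream()
          .filter(Objects::nonNull)
          .collect(Collectors.toList());

        // produces 'body', 'offset', 'file' with each name individually quoted
        String expectedFields = expected.stream()
          .collect(Collectors.joining("', '", "'", "'"));

        int numExtraFields = 2; // pretend the schema declared two unknown fields
        System.out.println(String.format(
          "The schema for the 'text' format must only contain the %s field%s, but found %d other field%s",
          expectedFields, expected.size() > 1 ? "s" : "", numExtraFields, numExtraFields > 1 ? "s" : ""));
        // -> The schema for the 'text' format must only contain the 'body', 'offset', 'file' fields,
        //    but found 2 other fields
      }
    }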
"s" : ""), String.format("Remove additional field '%s'.", field.getName())).withOutputSchemaField(field.getName()); } } } - public static Schema getDefaultSchema(@Nullable String pathField) { + public static Schema getDefaultSchema(@Nullable String pathField, @Nullable String lengthField, + @Nullable String modificationTimeField) { List fields = new ArrayList<>(); - fields.add(Schema.Field.of("offset", Schema.of(Schema.Type.LONG))); - fields.add(Schema.Field.of("body", Schema.of(Schema.Type.STRING))); + fields.add(Schema.Field.of(TextConfig.NAME_OFFSET, Schema.of(Schema.Type.LONG))); + fields.add(Schema.Field.of(TextConfig.NAME_BODY, Schema.of(Schema.Type.STRING))); if (pathField != null && !pathField.isEmpty()) { fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING))); } + if (lengthField != null && !lengthField.isEmpty()) { + fields.add(Schema.Field.of(lengthField, Schema.of(Schema.Type.LONG))); + } + if (modificationTimeField != null && !modificationTimeField.isEmpty()) { + fields.add(Schema.Field.of(modificationTimeField, Schema.of(Schema.Type.LONG))); + } return Schema.recordOf("textfile", fields); } @@ -266,7 +281,17 @@ public Schema getSchema() { return null; } if (Strings.isNullOrEmpty(schema)) { - return getDefaultSchema(pathField); + String lengthFieldResolved = null; + String modificationTimeFieldResolved = null; + + // this is required for back compatibility with File-based sources (File, FTP...) + try { + lengthFieldResolved = lengthField; + modificationTimeFieldResolved = modificationTimeField; + } catch (NoSuchFieldError e) { + LOG.warn("'Length' and 'Modification Time' properties are not supported by plugin."); + } + return getDefaultSchema(pathField, lengthFieldResolved, modificationTimeFieldResolved); } try { return Schema.parseJson(schema);