From 5d79a82bbb8887771d06ac492fec80de7729ccab Mon Sep 17 00:00:00 2001
From: Alex Zerntev
Date: Fri, 14 Jul 2023 17:03:38 +0300
Subject: [PATCH] RD-9261 implement csv infer and read (#43)

- Implemented Csv.InferAndParse
- Added encoding to Json.InferAndParse
---
 .../services/raw.compiler.rql2.EntryExtension |   1 +
 .../compiler/rql2/builtin/CsvPackage.scala    | 239 +++++++++++++++++-
 .../InMemoryLocationValueBuilder.scala        |  38 +++
 .../compiler/rql2/builtin/JsonPackage.scala   |  34 +--
 .../rql2/tests/builtin/CsvPackageTest.scala   |  28 +-
 5 files changed, 312 insertions(+), 28 deletions(-)
 create mode 100644 raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/InMemoryLocationValueBuilder.scala

diff --git a/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension b/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension
index 5c84cf540..f9d131e6c 100644
--- a/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension
+++ b/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension
@@ -9,6 +9,7 @@ raw.compiler.rql2.builtin.SQLServerInferAndQueryEntry
 raw.compiler.rql2.builtin.SnowflakeInferAndReadEntry
 raw.compiler.rql2.builtin.SnowflakeInferAndQueryEntry
 raw.compiler.rql2.builtin.CsvInferAndReadEntry
+raw.compiler.rql2.builtin.CsvInferAndParseEntry
 raw.compiler.rql2.builtin.InferAndReadJsonEntry
 raw.compiler.rql2.builtin.InferAndParseJsonEntry
 raw.compiler.rql2.builtin.InferAndReadXmlEntry
diff --git a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/CsvPackage.scala b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/CsvPackage.scala
index 135b7310a..152756b9e 100644
--- a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/CsvPackage.scala
+++ b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/CsvPackage.scala
@@ -16,8 +16,8 @@ import raw.compiler.base.errors.{BaseError, UnsupportedType}
 import raw.compiler.base.source.{AnythingType, BaseNode, Type}
 import raw.compiler.common.source._
 import raw.compiler.rql2._
-import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
-import raw.compiler.rql2.source._
+import raw.compiler.rql2.source._
+import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
 import raw.inferrer._
 
 class CsvPackage extends PackageExtension {
@@ -186,8 +186,7 @@ class CsvInferAndReadEntry extends SugarEntryExtension with CsvEntryExtensionHel
         )
       ) = inputFormatDescriptor
     ) yield {
-      val preferNulls =
-        optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.map(getBoolValue).getOrElse(true)
+      val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
       val makeNullables = sampled && preferNulls
       val makeTryables = sampled
 
@@ -234,7 +233,7 @@ class CsvInferAndReadEntry extends SugarEntryExtension with CsvEntryExtensionHel
       )
     ) = r.right.get
 
-    val location = locationValueToExp(mandatoryArgs(0))
+    val location = locationValueToExp(mandatoryArgs.head)
     val l0Args: Vector[FunAppArg] = Vector(
       Some(FunAppArg(location, None)),
      Some(FunAppArg(TypeExp(t), None)),
@@ -403,6 +402,236 @@ class CsvReadEntry extends EntryExtension with CsvEntryExtensionHelper {
 
 }
 
+class CsvInferAndParseEntry extends SugarEntryExtension with CsvEntryExtensionHelper {
+
+  override def packageName: String = "Csv"
+
+  override def entryName: String = "InferAndParse"
+
+  override def docs: EntryDoc = EntryDoc(
+    "Parses a CSV string using schema detection (inference).",
+    params = List(
+      ParamDoc(
+        "stringData",
+        typeDoc = TypeDoc(List("string")),
+        description = "The data in string format to infer and parse."
+      ),
+      ParamDoc(
+        "sampleSize",
+        typeDoc = TypeDoc(List("int")),
+        description = "Specifies the number of rows to sample within the data.",
+        info = Some("""If a large `sampleSize` is used, the detection takes more time to complete,
+          |but has a higher chance of detecting the correct format.
+          |To force the detection to read the full data, set `sampleSize` to -1.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "encoding",
+        typeDoc = TypeDoc(List("string")),
+        description = """Specifies the encoding of the data.""",
+        info = Some("""If the encoding is not specified it is determined automatically.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "hasHeader",
+        typeDoc = TypeDoc(List("bool")),
+        description = """Specifies whether the data has a header or not, e.g. `true` or `false`.""",
+        info = Some("""If not specified, the inference tries to detect the presence of a header.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "delimiters",
+        typeDoc = TypeDoc(List("list")),
+        description = """Specifies a candidate list of delimiters, e.g. `[",", "|"]`.""",
+        info = Some("""If not specified, the inference tries to detect the delimiter.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "nulls",
+        typeDoc = TypeDoc(List("list")),
+        description = """Specifies a candidate list of strings to interpret as null, e.g. `["NA"]`.""",
+        info = Some("""If not specified, the inference does not detect nulls.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "nans",
+        typeDoc = TypeDoc(List("list")),
+        description = """Specifies a candidate list of strings to interpret as NaN, e.g. `["nan"]`.""",
+        info = Some("""If not specified, the inference does not detect NaNs.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "skip",
+        typeDoc = TypeDoc(List("int")),
+        description = "Number of rows to skip from the beginning of the data. Defaults to 0.",
+        isOptional = true
+      ),
+      ParamDoc(
+        "escape",
+        typeDoc = TypeDoc(List("string")),
+        description = """The escape character to use when parsing the CSV data, e.g. `"\\"`.""",
+        isOptional = true
+      ),
+      ParamDoc(
+        "quotes",
+        typeDoc = TypeDoc(List("string")),
+        description = """Specifies a candidate list of quote characters to interpret as quotes, e.g. `["\""]`.""",
+        info = Some("""If not specified, the inference tries to detect the quote char.
+          |If set to `null` then no quote character is used.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "preferNulls",
+        typeDoc = TypeDoc(List("bool")),
+        description =
+          """If set to true and during inference the system does not read the whole data, marks all fields as nullable. Defaults to true.""",
+        isOptional = true
+      )
+    ),
+    examples = List(ExampleDoc("""Csv.InferAndParse(\"\"\"value1;value2\"\"\")""")),
+    ret = Some(ReturnDoc("A collection with the data parsed from the CSV string.", Some(TypeDoc(List("collection")))))
+  )
+
+  override def nrMandatoryParams: Int = 1
+
+  override def getMandatoryParam(prevMandatoryArgs: Seq[Arg], idx: Int): Either[String, Param] = {
+    assert(idx == 0)
+    Right(ValueParam(Rql2StringType()))
+  }
+
+  override def optionalParams: Option[Set[String]] = Some(
+    Set(
+      "sampleSize",
+      "encoding",
+      "hasHeader",
+      "delimiters",
+      "nulls",
+      "nans",
+      "skip",
+      "escape",
+      "quotes",
+      "preferNulls"
+    )
+  )
+
+  override def getOptionalParam(prevMandatoryArgs: Seq[Arg], idn: String): Either[String, Param] = {
+    idn match {
+      case "sampleSize" => Right(ValueParam(Rql2IntType()))
+      case "encoding" => Right(ValueParam(Rql2StringType()))
+      case "hasHeader" => Right(ValueParam(Rql2BoolType()))
+      case "delimiters" => Right(ValueParam(Rql2ListType(Rql2StringType())))
+      case "nulls" => Right(ValueParam(Rql2ListType(Rql2StringType())))
+      case "nans" => Right(ValueParam(Rql2ListType(Rql2StringType())))
+      case "skip" => Right(ValueParam(Rql2IntType()))
+      case "escape" => Right(ValueParam(Rql2StringType()))
+      case "quotes" => Right(ValueParam(Rql2ListType(Rql2StringType(Set(Rql2IsNullableTypeProperty())))))
+      case "preferNulls" => Right(ValueParam(Rql2BoolType()))
+    }
+  }
+
+  override def returnType(
+      mandatoryArgs: Seq[Arg],
+      optionalArgs: Seq[(String, Arg)],
+      varArgs: Seq[Arg]
+  )(implicit programContext: ProgramContext): Either[String, Type] = {
+
+    val (locationArg, _) = InMemoryLocationValueBuilder.build(mandatoryArgs)
+
+    for (
+      inferrerProperties <- getCsvInferrerProperties(Seq(locationArg), optionalArgs);
+      inputFormatDescriptor <- programContext.infer(inferrerProperties);
+      TextInputStreamFormatDescriptor(
+        _,
+        _,
+        CsvInputFormatDescriptor(
+          SourceCollectionType(SourceRecordType(atts, _), _),
+          _,
+          _,
+          _,
+          _,
+          _,
+          _,
+          _,
+          _,
+          sampled,
+          _,
+          _,
+          _
+        )
+      ) = inputFormatDescriptor
+    ) yield {
+      val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
+      val makeNullables = sampled && preferNulls
+      val makeTryables = sampled
+
+      val convertedAtts = atts.map(x => Rql2AttrType(x.idn, inferTypeToRql2Type(x.tipe, makeNullables, makeTryables)))
+
+      // Here we are making the record not nullable and not tryable.
+      Rql2IterableType(Rql2RecordType(convertedAtts))
+
+    }
+  }
+
+  override def desugar(
+      t: Type,
+      args: Seq[FunAppArg],
+      mandatoryArgs: Seq[Arg],
+      optionalArgs: Seq[(String, Arg)],
+      varArgs: Seq[Arg]
+  )(implicit programContext: ProgramContext): Exp = {
+
+    val (locationArg, codeData) = InMemoryLocationValueBuilder.build(mandatoryArgs)
+
+    val r = for (
+      inferrerProperties <- getCsvInferrerProperties(Seq(locationArg), optionalArgs);
+      inputFormatDescriptor <- programContext.infer(inferrerProperties)
+    ) yield {
+      inputFormatDescriptor
+    }
+
+    val TextInputStreamFormatDescriptor(
+      _,
+      _,
+      CsvInputFormatDescriptor(
+        _,
+        _,
+        sep,
+        nulls,
+        _,
+        nans,
+        skip,
+        escapeChar,
+        quoteChar,
+        _,
+        timeFormat,
+        dateFormat,
+        timestampFormat
+      )
+    ) = r.right.get
+
+    val l0Args: Vector[FunAppArg] = Vector(
+      Some(FunAppArg(StringConst(codeData), None)),
+      Some(FunAppArg(TypeExp(t), None)),
+      Some(FunAppArg(StringConst(sep.toString), Some("delimiter"))),
+      Some(FunAppArg(IntConst(skip.toString), Some("skip"))),
+      escapeChar.map(s => FunAppArg(StringConst(s.toString), Some("escape"))),
+      quoteChar.map(s => FunAppArg(StringConst(s.toString), Some("quote"))),
+      Some(FunAppArg(ListPackageBuilder.Build(nulls.map(StringConst): _*), Some("nulls"))),
+      Some(FunAppArg(ListPackageBuilder.Build(nans.map(StringConst): _*), Some("nans"))),
+      timeFormat.map(s => FunAppArg(StringConst(s), Some("timeFormat"))),
+      dateFormat.map(s => FunAppArg(StringConst(s), Some("dateFormat"))),
+      timestampFormat.map(s => FunAppArg(StringConst(s), Some("timestampFormat")))
+    ).flatten
+
+    FunApp(
+      Proj(PackageIdnExp("Csv"), "Parse"),
+      l0Args
+    )
+
+  }
+
+}
+
 class CsvParseEntry extends EntryExtension with CsvEntryExtensionHelper {
 
   override def packageName: String = "Csv"
 
   override def entryName: String = "Parse"
 
@@ -555,7 +784,7 @@ trait CsvEntryExtensionHelper extends EntryExtensionHelper {
   ): Either[String, CsvInferrerProperties] = {
     Right(
       CsvInferrerProperties(
-        getLocationValue(mandatoryArgs(0)),
+        getLocationValue(mandatoryArgs.head),
         optionalArgs.collectFirst { case a if a._1 == "sampleSize" => a._2 }.map(getIntValue),
         optionalArgs
           .collectFirst { case a if a._1 == "encoding" => a._2 }
diff --git a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/InMemoryLocationValueBuilder.scala b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/InMemoryLocationValueBuilder.scala
new file mode 100644
index 000000000..e4581a1ea
--- /dev/null
+++ b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/InMemoryLocationValueBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */ + +package raw.compiler.rql2.builtin + +import raw.compiler.rql2.{Arg, ValueArg} +import raw.sources.bytestream.in_memory.InMemoryByteStreamLocation +import raw.sources.{LocationSettingKey, LocationSettingValue} +import raw.runtime.interpreter.{LocationValue, StringValue} +import raw.sources.{LocationBinarySetting, LocationDescription} +import raw.compiler.rql2.source.Rql2LocationType + +object InMemoryLocationValueBuilder { + def build(mandatoryArgs: Seq[Arg]): (ValueArg, String) = { + val codeData = mandatoryArgs.head match { + case ValueArg(v, _) => v match { + case StringValue(innVal) => innVal + } + } + val settings = Map[LocationSettingKey, LocationSettingValue]( + ( + LocationSettingKey(InMemoryByteStreamLocation.codeDataKey), + LocationBinarySetting(codeData.getBytes()) + ) + ) + val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings) + (ValueArg(LocationValue(locationDescription), Rql2LocationType()), codeData) + } +} diff --git a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala index a3ba564c1..0af0023ab 100644 --- a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala +++ b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala @@ -19,9 +19,6 @@ import raw.compiler.rql2._ import raw.compiler.rql2.source._ import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc} import raw.inferrer._ -import raw.runtime.interpreter.{LocationValue, StringValue} -import raw.sources.bytestream.in_memory.InMemoryByteStreamLocation -import raw.sources.{LocationBinarySetting, LocationDescription, LocationSettingKey, LocationSettingValue} class JsonPackage extends PackageExtension { @@ -279,7 +276,7 @@ class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtension ParamDoc( "stringData", TypeDoc(List("string")), - description = "The data in string format to infer and read." + description = "The data in string format to infer and parsed." 
       ),
       ParamDoc(
         "sampleSize",
@@ -290,6 +287,13 @@ class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtension
         |To force the detection to read the full data, set `sampleSize` to -1.""".stripMargin),
         isOptional = true
       ),
+      ParamDoc(
+        "encoding",
+        typeDoc = TypeDoc(List("string")),
+        description = """Specifies the encoding of the data.""",
+        info = Some("""If the encoding is not specified it is determined automatically.""".stripMargin),
+        isOptional = true
+      ),
       ParamDoc(
         "preferNulls",
         typeDoc = TypeDoc(List("bool")),
@@ -311,6 +315,7 @@ class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtension
   override def optionalParams: Option[Set[String]] = Some(
     Set(
       "sampleSize",
+      "encoding",
       "preferNulls"
     )
   )
@@ -318,6 +323,7 @@ class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtension
   override def getOptionalParam(prevMandatoryArgs: Seq[Arg], idn: String): Either[String, Param] = {
     idn match {
       case "sampleSize" => Right(ValueParam(Rql2IntType()))
+      case "encoding" => Right(ValueParam(Rql2StringType()))
       case "preferNulls" => Right(ValueParam(Rql2BoolType()))
     }
   }
@@ -328,7 +334,7 @@ class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtension
     varArgs: Seq[Arg]
   )(implicit programContext: ProgramContext): Either[String, Type] = {
 
-    val (locationArg, _) = getInMemoryLocation(mandatoryArgs)
+    val (locationArg, _) = InMemoryLocationValueBuilder.build(mandatoryArgs)
 
     for (
       inferrerProperties <- getJsonInferrerProperties(Seq(locationArg), optionalArgs);
@@ -358,7 +364,7 @@ class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtension
     varArgs: Seq[Arg]
   )(implicit programContext: ProgramContext): Exp = {
 
-    val (locationArg, codeData) = getInMemoryLocation(mandatoryArgs)
+    val (locationArg, codeData) = InMemoryLocationValueBuilder.build(mandatoryArgs)
 
     val inputFormatDescriptor = for (
       inferrerProperties <- getJsonInferrerProperties(Seq(locationArg), optionalArgs);
@@ -553,22 +559,6 @@ trait JsonEntryExtensionHelper extends EntryExtensionHelper {
     )
   }
 
-  protected def getInMemoryLocation(mandatoryArgs: Seq[Arg]): (ValueArg, String) = {
-    val codeData = mandatoryArgs.head match {
-      case ValueArg(v, _) => v match {
-          case StringValue(innVal) => innVal
-        }
-    }
-    val settings = Map[LocationSettingKey, LocationSettingValue](
-      (
-        LocationSettingKey(InMemoryByteStreamLocation.codeDataKey),
-        LocationBinarySetting(codeData.getBytes())
-      )
-    )
-    val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
-    (ValueArg(LocationValue(locationDescription), Rql2LocationType()), codeData)
-  }
-
   protected def validateJsonType(t: Type): Either[Seq[UnsupportedType], Type] = t match {
     case _: Rql2LocationType | _: Rql2RegexType => Left(Seq(UnsupportedType(t, t, None)))
     case Rql2RecordType(atts, props) =>
diff --git a/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/CsvPackageTest.scala b/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/CsvPackageTest.scala
index d79768fe9..34be8c87d 100644
--- a/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/CsvPackageTest.scala
+++ b/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/CsvPackageTest.scala
@@ -17,7 +17,7 @@ import raw.compiler.rql2.tests.{CompilerTestContext, FailAfterNServer}
 
 trait CsvPackageTest extends CompilerTestContext with FailAfterNServer {
 
-  val data = tempFile("""a|b|c
+  private val data = tempFile("""a|b|c
     |1|10|100
     |2|20|200
     |3|30|300""".stripMargin)
@@ -479,4 +479,30 @@ trait CsvPackageTest extends CompilerTestContext with FailAfterNServer {
     it should evaluateTo("""[{a: 1, b: "tralala"}, {a: null, b: "ploum"}, {a: 3, b: "ploum"}, {a: 4, b: null}]""")
   )
 
+  // Infer and Parse
+  val ttt = "\"\"\""
+  test(
+    s"""Csv.InferAndParse("1,2,3")"""
+  )(_ should evaluateTo("""[{_1: 1, _2: 2, _3: 3}]"""))
+
+  test(
+    s"""Csv.InferAndParse("1, 2, hello")"""
+  )(_ should evaluateTo("""[{_1: 1, _2: 2, _3: "hello"}]"""))
+
+  test(
+    s"""Csv.InferAndParse("1, 2,")"""
+  )(_ should evaluateTo("""[{_1: 1, _2: 2, _3: null}]"""))
+
+  test(
+    s"""Csv.InferAndParse("1;2;")"""
+  )(_ should evaluateTo("""[{_1: 1, _2: 2, _3: null}]"""))
+
+  test(
+    s"""Csv.InferAndParse(${ttt}1;2\n3;hello$ttt, delimiters=[";","\\n"])""".stripMargin
+  )(_ should evaluateTo("""[{_1: 1, _2: "2"}, {_1: 3, _2: "hello"}]"""))
+
+  test(
+    s"""Csv.InferAndParse(${ttt}1;2\n3;hello;5;;;;;;;$ttt, delimiters=[";","\\n"])""".stripMargin
+  )(_ should evaluateTo("""[]"""))
+
 }
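
--
Illustrative only, not part of the patch: a minimal sketch of follow-up tests for the new entries,
assuming the same CompilerTestContext test/evaluateTo harness used in CsvPackageTest above. The
expected outputs and the "utf-8" encoding name are assumptions, not verified results.

  // Hypothetical: exercise the `delimiters` hint of Csv.InferAndParse on a single row.
  test(
    s"""Csv.InferAndParse("1|2|3", delimiters=["|"])"""
  )(_ should evaluateTo("""[{_1: 1, _2: 2, _3: 3}]"""))

  // Hypothetical: exercise the `encoding` option this patch adds to Json.InferAndParse.
  test(
    s"""Json.InferAndParse("[1, 2, 3]", encoding="utf-8")"""
  )(_ should evaluateTo("""[1, 2, 3]"""))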