Skip to content

Commit

Permalink
RD-9261 implement csv infer and read (#43)
Browse files Browse the repository at this point in the history
- Implemented Csv.InferAndRead
- Added encoding to Json.InferAndRead
  • Loading branch information
alexzerntev authored Jul 14, 2023
1 parent b78f20c commit 5d79a82
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ raw.compiler.rql2.builtin.SQLServerInferAndQueryEntry
raw.compiler.rql2.builtin.SnowflakeInferAndReadEntry
raw.compiler.rql2.builtin.SnowflakeInferAndQueryEntry
raw.compiler.rql2.builtin.CsvInferAndReadEntry
raw.compiler.rql2.builtin.CsvInferAndParseEntry
raw.compiler.rql2.builtin.InferAndReadJsonEntry
raw.compiler.rql2.builtin.InferAndParseJsonEntry
raw.compiler.rql2.builtin.InferAndReadXmlEntry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import raw.compiler.base.errors.{BaseError, UnsupportedType}
import raw.compiler.base.source.{AnythingType, BaseNode, Type}
import raw.compiler.common.source._
import raw.compiler.rql2._
import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
import raw.compiler.rql2.source._
import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
import raw.inferrer._

class CsvPackage extends PackageExtension {
Expand Down Expand Up @@ -186,8 +186,7 @@ class CsvInferAndReadEntry extends SugarEntryExtension with CsvEntryExtensionHel
)
) = inputFormatDescriptor
) yield {
val preferNulls =
optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.map(getBoolValue).getOrElse(true)
val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
val makeNullables = sampled && preferNulls
val makeTryables = sampled

Expand Down Expand Up @@ -234,7 +233,7 @@ class CsvInferAndReadEntry extends SugarEntryExtension with CsvEntryExtensionHel
)
) = r.right.get

val location = locationValueToExp(mandatoryArgs(0))
val location = locationValueToExp(mandatoryArgs.head)
val l0Args: Vector[FunAppArg] = Vector(
Some(FunAppArg(location, None)),
Some(FunAppArg(TypeExp(t), None)),
Expand Down Expand Up @@ -403,6 +402,236 @@ class CsvReadEntry extends EntryExtension with CsvEntryExtensionHelper {

}

/**
 * `Csv.InferAndParse`: sugar entry that runs CSV inference over an in-memory string and
 * desugars to a call to `Csv.Parse` carrying the inferred type and the detected CSV settings
 * (delimiter, skip, escape, quote, nulls, nans and time/date/timestamp formats).
 */
class CsvInferAndParseEntry extends SugarEntryExtension with CsvEntryExtensionHelper {

  override def packageName: String = "Csv"

  override def entryName: String = "InferAndParse"

  override def docs: EntryDoc = EntryDoc(
    "Parses a CSV string using schema detection (inference).",
    params = List(
      ParamDoc(
        "stringData",
        typeDoc = TypeDoc(List("string")),
        description = "The data in string format to infer and parse."
      ),
      ParamDoc(
        "sampleSize",
        typeDoc = TypeDoc(List("int")),
        description = "Specifies the number of rows to sample within the data.",
        info = Some("""If a large `sampleSize` is used, the detection takes more time to complete,
          |but has a higher chance of detecting the correct format.
          |To force the detection to read the full data, set `sampleSize` to -1.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "encoding",
        typeDoc = TypeDoc(List("string")),
        description = """Specifies the encoding of the data.""",
        info = Some("""If the encoding is not specified it is determined automatically.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "hasHeader",
        typeDoc = TypeDoc(List("bool")),
        description = """Specifies whether the data has a header or not, e.g. `true` or `false`.""",
        info = Some("""If not specified, the inference tries to detect the presence of a header.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "delimiters",
        typeDoc = TypeDoc(List("list")),
        description = """Specifies a candidate list of delimiters, e.g. `[",", "|"]`.""",
        info = Some("""If not specified, the inference tries to detect the delimiter.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "nulls",
        typeDoc = TypeDoc(List("list")),
        description = """Specifies a candidate list of strings to interpret as null, e.g. `["NA"]`.""",
        info = Some("""If not specified, the inference does not detect nulls.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "nans",
        typeDoc = TypeDoc(List("list")),
        description = """Specifies a candidate list of strings to interpret as NaN, e.g. `["nan"]`.""",
        info = Some("""If not specified, the inference does not detect NaNs.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "skip",
        typeDoc = TypeDoc(List("int")),
        description = "Number of rows to skip from the beginning of the data. Defaults to 0.",
        isOptional = true
      ),
      ParamDoc(
        "escape",
        typeDoc = TypeDoc(List("string")),
        description = """The escape character to use when parsing the CSV data, e.g. `"\\"`.""",
        isOptional = true
      ),
      ParamDoc(
        "quotes",
        // The parameter is declared as a list of (nullable) strings in getOptionalParam.
        typeDoc = TypeDoc(List("list")),
        description = """Specifies a candidate list of quote characters to interpret as quotes, e.g. `["\""]`.""",
        info = Some("""If not specified, the inference tries to detect the quote char.
          |If set to `null` then no quote character is used.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "preferNulls",
        typeDoc = TypeDoc(List("bool")),
        // Matches returnType: fields are made nullable only when the data was sampled.
        description =
          """If set to true and during inference the system does not read the whole data, marks all fields as nullable. Defaults to true.""",
        isOptional = true
      )
    ),
    examples = List(ExampleDoc("""Csv.InferAndParse(\"\"\"value1;value2\"\"\")""")),
    ret = Some(ReturnDoc("A collection with the data parsed from the CSV string.", Some(TypeDoc(List("collection")))))
  )

  override def nrMandatoryParams: Int = 1

  // The single mandatory parameter is the CSV payload itself, passed as a string value.
  override def getMandatoryParam(prevMandatoryArgs: Seq[Arg], idx: Int): Either[String, Param] = {
    assert(idx == 0)
    Right(ValueParam(Rql2StringType()))
  }

  override def optionalParams: Option[Set[String]] = Some(
    Set(
      "sampleSize",
      "encoding",
      "hasHeader",
      "delimiters",
      "nulls",
      "nans",
      "skip",
      "escape",
      "quotes",
      "preferNulls"
    )
  )

  // Maps each optional parameter name to its expected value type. Names outside
  // `optionalParams` yield a Left instead of an uninformative MatchError.
  override def getOptionalParam(prevMandatoryArgs: Seq[Arg], idn: String): Either[String, Param] = {
    idn match {
      case "sampleSize" => Right(ValueParam(Rql2IntType()))
      case "encoding" => Right(ValueParam(Rql2StringType()))
      case "hasHeader" => Right(ValueParam(Rql2BoolType()))
      case "delimiters" => Right(ValueParam(Rql2ListType(Rql2StringType())))
      case "nulls" => Right(ValueParam(Rql2ListType(Rql2StringType())))
      case "nans" => Right(ValueParam(Rql2ListType(Rql2StringType())))
      case "skip" => Right(ValueParam(Rql2IntType()))
      case "escape" => Right(ValueParam(Rql2StringType()))
      case "quotes" => Right(ValueParam(Rql2ListType(Rql2StringType(Set(Rql2IsNullableTypeProperty())))))
      case "preferNulls" => Right(ValueParam(Rql2BoolType()))
      case unknown => Left(s"unknown optional parameter: $unknown")
    }
  }

  // Runs the inferrer over the in-memory data and converts the inferred record attributes
  // into an RQL2 iterable-of-records type. Inference failures are returned as Left.
  override def returnType(
      mandatoryArgs: Seq[Arg],
      optionalArgs: Seq[(String, Arg)],
      varArgs: Seq[Arg]
  )(implicit programContext: ProgramContext): Either[String, Type] = {

    // The string argument is wrapped into an in-memory location so the location-based
    // inferrer helper can be reused.
    val (locationArg, _) = InMemoryLocationValueBuilder.build(mandatoryArgs)

    for (
      inferrerProperties <- getCsvInferrerProperties(Seq(locationArg), optionalArgs);
      inputFormatDescriptor <- programContext.infer(inferrerProperties);
      TextInputStreamFormatDescriptor(
        _,
        _,
        CsvInputFormatDescriptor(
          SourceCollectionType(SourceRecordType(atts, _), _),
          _,
          _,
          _,
          _,
          _,
          _,
          _,
          _,
          sampled,
          _,
          _,
          _
        )
      ) = inputFormatDescriptor
    ) yield {
      // `forall` on the Option gives the default of true when `preferNulls` is not passed.
      val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
      val makeNullables = sampled && preferNulls
      val makeTryables = sampled

      val convertedAtts = atts.map(x => Rql2AttrType(x.idn, inferTypeToRql2Type(x.tipe, makeNullables, makeTryables)))

      // Here we are making the record not nullable and not tryable.
      Rql2IterableType(Rql2RecordType(convertedAtts))

    }
  }

  // Rewrites the call into `Csv.Parse(<data>, type <t>, delimiter = ..., skip = ..., ...)`
  // using the settings detected by the inferrer.
  override def desugar(
      t: Type,
      args: Seq[FunAppArg],
      mandatoryArgs: Seq[Arg],
      optionalArgs: Seq[(String, Arg)],
      varArgs: Seq[Arg]
  )(implicit programContext: ProgramContext): Exp = {

    val (locationArg, codeData) = InMemoryLocationValueBuilder.build(mandatoryArgs)

    val inferred = for (
      inferrerProperties <- getCsvInferrerProperties(Seq(locationArg), optionalArgs);
      inputFormatDescriptor <- programContext.infer(inferrerProperties)
    ) yield {
      inputFormatDescriptor
    }

    // Fail with the inferrer's own message rather than the opaque error of `.right.get`.
    val descriptor = inferred match {
      case Right(d) => d
      case Left(error) => throw new IllegalStateException(s"CSV inference failed during desugaring: $error")
    }

    val TextInputStreamFormatDescriptor(
      _,
      _,
      CsvInputFormatDescriptor(
        _,
        _,
        sep,
        nulls,
        _,
        nans,
        skip,
        escapeChar,
        quoteChar,
        _,
        timeFormat,
        dateFormat,
        timestampFormat
      )
    ) = descriptor

    // Optional settings that the inferrer did not detect are dropped by `flatten`.
    val l0Args: Vector[FunAppArg] = Vector(
      Some(FunAppArg(StringConst(codeData), None)),
      Some(FunAppArg(TypeExp(t), None)),
      Some(FunAppArg(StringConst(sep.toString), Some("delimiter"))),
      Some(FunAppArg(IntConst(skip.toString), Some("skip"))),
      escapeChar.map(s => FunAppArg(StringConst(s.toString), Some("escape"))),
      quoteChar.map(s => FunAppArg(StringConst(s.toString), Some("quote"))),
      Some(FunAppArg(ListPackageBuilder.Build(nulls.map(StringConst): _*), Some("nulls"))),
      Some(FunAppArg(ListPackageBuilder.Build(nans.map(StringConst): _*), Some("nans"))),
      timeFormat.map(s => FunAppArg(StringConst(s), Some("timeFormat"))),
      dateFormat.map(s => FunAppArg(StringConst(s), Some("dateFormat"))),
      timestampFormat.map(s => FunAppArg(StringConst(s), Some("timestampFormat")))
    ).flatten

    FunApp(
      Proj(PackageIdnExp("Csv"), "Parse"),
      l0Args
    )

  }

}

class CsvParseEntry extends EntryExtension with CsvEntryExtensionHelper {

override def packageName: String = "Csv"
Expand Down Expand Up @@ -555,7 +784,7 @@ trait CsvEntryExtensionHelper extends EntryExtensionHelper {
): Either[String, CsvInferrerProperties] = {
Right(
CsvInferrerProperties(
getLocationValue(mandatoryArgs(0)),
getLocationValue(mandatoryArgs.head),
optionalArgs.collectFirst { case a if a._1 == "sampleSize" => a._2 }.map(getIntValue),
optionalArgs
.collectFirst { case a if a._1 == "encoding" => a._2 }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 RAW Labs S.A.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0, included in the file
* licenses/APL.txt.
*/

package raw.compiler.rql2.builtin

import raw.compiler.rql2.{Arg, ValueArg}
import raw.sources.bytestream.in_memory.InMemoryByteStreamLocation
import raw.sources.{LocationSettingKey, LocationSettingValue}
import raw.runtime.interpreter.{LocationValue, StringValue}
import raw.sources.{LocationBinarySetting, LocationDescription}
import raw.compiler.rql2.source.Rql2LocationType

/**
 * Builds an in-memory location value out of a string argument, so string data can be handed
 * to code that expects a location argument.
 */
object InMemoryLocationValueBuilder {

  import java.nio.charset.StandardCharsets

  /**
   * Wraps the first mandatory argument into an in-memory location.
   *
   * @param mandatoryArgs the call's mandatory arguments; the first one must be a `ValueArg`
   *                      holding a `StringValue` (anything else fails with a `MatchError`,
   *                      and an empty sequence fails on `head`)
   * @return the location argument whose settings carry the string's bytes, paired with the
   *         original string
   */
  def build(mandatoryArgs: Seq[Arg]): (ValueArg, String) = {
    val codeData = mandatoryArgs.head match {
      case ValueArg(StringValue(data), _) => data
    }
    // Encode with an explicit charset: the no-arg `getBytes()` uses the JVM's platform
    // default, which would make the stored bytes differ from host to host.
    val settings = Map[LocationSettingKey, LocationSettingValue](
      (
        LocationSettingKey(InMemoryByteStreamLocation.codeDataKey),
        LocationBinarySetting(codeData.getBytes(StandardCharsets.UTF_8))
      )
    )
    val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
    (ValueArg(LocationValue(locationDescription), Rql2LocationType()), codeData)
  }
}
Loading

0 comments on commit 5d79a82

Please sign in to comment.