Skip to content

Commit

Permalink
RD-8272 implement json infer and parse (#32)
Browse files Browse the repository at this point in the history
- Added `Json.InferAndParse`. For this entry a new sources package had
to be built, called *raw-sources-in-memory*. It implements the
ByteStreamLocation traits and, instead of reading from a URL like other
Location objects, it reads directly from memory. The URL scheme for it
is `in-memory:`.
- Fixed some warnings
- Removed deprecated sparkUrl field in Locations
  • Loading branch information
alexzerntev authored Jul 12, 2023
1 parent 101d64f commit f59dfdb
Show file tree
Hide file tree
Showing 17 changed files with 378 additions and 32 deletions.
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ lazy val rawSourcesMock = (project in file("raw-sources-mock"))
.dependsOn(rawSourcesApi % "compile->compile;test->test")
.settings(strictBuildSettings)

// In-memory byte-stream source module: reads data directly from memory
// (under the `in-memory:` scheme) instead of fetching from a URL.
// Needed by Json.InferAndParse, which runs inference over an in-program string.
lazy val rawSourcesInMemory = (project in file("raw-sources-in-memory"))
  .dependsOn(rawSourcesApi % "compile->compile;test->test")
  .settings(strictBuildSettings)

lazy val rawSourcesDropbox = (project in file("raw-sources-dropbox"))
.dependsOn(rawSourcesApi % "compile->compile;test->test")
.settings(
Expand Down Expand Up @@ -410,6 +414,7 @@ lazy val rawCompilerCommon = (project in file("raw-compiler-common"))
lazy val rawCompilerRql2 = (project in file("raw-compiler-rql2"))
.dependsOn(
rawCompilerCommon % "compile->compile;test->test",
rawSourcesInMemory % "compile->compile;test->test",
rawInferrerLocal % "test->test",
rawSourcesDropbox % "test->test",
rawSourcesHttp % "test->test",
Expand All @@ -421,7 +426,7 @@ lazy val rawCompilerRql2 = (project in file("raw-compiler-rql2"))
rawSourcesMsSQL % "test->test",
rawSourcesSnowflake % "test->test",
rawSourcesMock % "test->test",
rawSourcesGithub % "test->test"
rawSourcesGithub % "test->test",
)
.settings(
buildSettings // TODO (msb): Promote this to strictBuildSettings and add bail-out annotations as needed,
Expand Down Expand Up @@ -508,7 +513,8 @@ lazy val rawCli = (project in file("raw-cli"))
rawSourcesSqlite,
rawSourcesMsSQL,
rawSourcesSnowflake,
rawSourcesGithub
rawSourcesGithub,
rawSourcesInMemory
)
.settings(
buildSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ raw.compiler.rql2.builtin.SnowflakeInferAndReadEntry
raw.compiler.rql2.builtin.SnowflakeInferAndQueryEntry
raw.compiler.rql2.builtin.CsvInferAndReadEntry
raw.compiler.rql2.builtin.InferAndReadJsonEntry
raw.compiler.rql2.builtin.InferAndParseJsonEntry
raw.compiler.rql2.builtin.InferAndReadXmlEntry
raw.compiler.rql2.builtin.AvgCollectionEntry
raw.compiler.rql2.builtin.FindFirstCollectionEntry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@

package raw.compiler.rql2.builtin

import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
import raw.compiler.base.errors.{BaseError, UnsupportedType}
import raw.compiler.base.source.{AnythingType, BaseNode, Type}
import raw.compiler.common.source._
import raw.compiler.rql2._
import raw.compiler.rql2.source._
import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
import raw.inferrer._
import raw.runtime.interpreter.{LocationValue, StringValue}
import raw.sources.bytestream.in_memory.InMemoryByteStreamLocation
import raw.sources.{LocationBinarySetting, LocationDescription, LocationSettingKey, LocationSettingValue}

class JsonPackage extends PackageExtension {

Expand Down Expand Up @@ -109,8 +112,7 @@ class InferAndReadJsonEntry extends SugarEntryExtension with JsonEntryExtensionH
JsonInputFormatDescriptor(inferredType, sampled, _, _, _)
) = inputFormatDescriptor
) yield {
val preferNulls =
optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.map(getBoolValue).getOrElse(true)
val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
val makeNullables = preferNulls && sampled
val makeTryables = sampled

Expand Down Expand Up @@ -147,7 +149,7 @@ class InferAndReadJsonEntry extends SugarEntryExtension with JsonEntryExtensionH
)
) = inputFormatDescriptor.right.get

val location = locationValueToExp(mandatoryArgs(0))
val location = locationValueToExp(mandatoryArgs.head)
val args = Vector(
Some(FunAppArg(location, None)),
Some(FunAppArg(TypeExp(t), None)),
Expand Down Expand Up @@ -264,6 +266,136 @@ class ReadJsonEntry extends EntryExtension with JsonEntryExtensionHelper {

}

/**
 * `Json.InferAndParse`: reads JSON from an in-program string, detecting its
 * schema by inference instead of requiring an explicit type.
 *
 * This is a sugar entry: `returnType` runs JSON inference to compute the
 * static type, and `desugar` rewrites the call into an equivalent typed
 * `Json.Parse(...)` application. Inference reuses the regular location-based
 * inferrer by wrapping the string argument into an in-memory byte-stream
 * location (see `getInMemoryLocation`).
 */
class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtensionHelper {

  override def packageName: String = "Json"

  override def entryName: String = "InferAndParse"

  // User-facing documentation rendered for Json.InferAndParse.
  override def docs: EntryDoc = EntryDoc(
    "Reads JSON data from a string with schema detection (inference).",
    None,
    params = List(
      ParamDoc(
        "stringData",
        TypeDoc(List("string")),
        description = "The data in string format to infer and read."
      ),
      ParamDoc(
        "sampleSize",
        TypeDoc(List("int")),
        description = """Specifies the number of objects to sample within the data.""",
        info = Some("""If a large `sampleSize` is used, the detection takes more time to complete,
          |but has a higher chance of detecting the correct format.
          |To force the detection to read the full data, set `sampleSize` to -1.""".stripMargin),
        isOptional = true
      ),
      ParamDoc(
        "preferNulls",
        typeDoc = TypeDoc(List("bool")),
        description =
          """If set to true and during inference the system does read the whole data, marks all fields as nullable. Defaults to true.""",
        isOptional = true
      )
    ),
    examples = List(ExampleDoc("""Json.InferAndParse(\"\"\" {"hello" : "world"} \"\"\")"""))
  )

  // Exactly one mandatory parameter: the JSON string itself.
  override def nrMandatoryParams: Int = 1

  override def getMandatoryParam(prevMandatoryArgs: Seq[Arg], idx: Int): Either[String, Param] = {
    assert(idx == 0)
    // ValueParam: the argument must be a compile-time value so inference can
    // run over its contents during type checking.
    Right(ValueParam(Rql2StringType()))
  }

  override def optionalParams: Option[Set[String]] = Some(
    Set(
      "sampleSize",
      "preferNulls"
    )
  )

  override def getOptionalParam(prevMandatoryArgs: Seq[Arg], idn: String): Either[String, Param] = {
    // NOTE(review): non-exhaustive match — presumably the framework only calls
    // this with names declared in optionalParams; confirm, otherwise a
    // MatchError is possible here.
    idn match {
      case "sampleSize" => Right(ValueParam(Rql2IntType()))
      case "preferNulls" => Right(ValueParam(Rql2BoolType()))
    }
  }

  /**
   * Computes the static type of the call by running JSON inference over the
   * string argument, wrapped as an in-memory location. Propagates the
   * inference failure message as a Left.
   */
  override def returnType(
      mandatoryArgs: Seq[Arg],
      optionalArgs: Seq[(String, Arg)],
      varArgs: Seq[Arg]
  )(implicit programContext: ProgramContext): Either[String, Type] = {

    // Only the location wrapper is needed here; the raw string is unused.
    val (locationArg, _) = getInMemoryLocation(mandatoryArgs)

    for (
      inferrerProperties <- getJsonInferrerProperties(Seq(locationArg), optionalArgs);
      inputFormatDescriptor <- programContext.infer(inferrerProperties);
      TextInputStreamFormatDescriptor(
        _,
        _,
        JsonInputFormatDescriptor(inferredType, sampled, _, _, _)
      ) = inputFormatDescriptor
    ) yield {
      // preferNulls defaults to true when the option is absent (forall on an
      // empty Option is true).
      val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
      // Widen to nullable/tryable only when inference sampled the data
      // (did not read it all), since unseen rows may deviate from the sample.
      val makeNullables = preferNulls && sampled
      val makeTryables = sampled

      inferTypeToRql2Type(inferredType, makeNullables, makeTryables) match {
        // Drop type properties from the top-level iterable wrapper.
        case Rql2IterableType(rowType, _) => Rql2IterableType(rowType)
        // Non-iterable results are marked tryable (parsing may fail at runtime).
        case other => addProp(other, Rql2IsTryableTypeProperty())
      }
    }
  }

  /**
   * Rewrites `Json.InferAndParse(s, ...)` into `Json.Parse(s, type t, ...)`,
   * forwarding any time/date/timestamp formats discovered during inference.
   */
  override def desugar(
      t: Type,
      args: Seq[FunAppArg],
      mandatoryArgs: Seq[Arg],
      optionalArgs: Seq[(String, Arg)],
      varArgs: Seq[Arg]
  )(implicit programContext: ProgramContext): Exp = {

    val (locationArg, codeData) = getInMemoryLocation(mandatoryArgs)

    // Inference is re-run here; returnType already succeeded for this call, so
    // presumably it yields a Right again — hence the .right.get below.
    // NOTE(review): this would throw if inference were non-deterministic;
    // mirrors InferAndReadJsonEntry.desugar.
    val inputFormatDescriptor = for (
      inferrerProperties <- getJsonInferrerProperties(Seq(locationArg), optionalArgs);
      inputFormatDescriptor <- programContext.infer(inferrerProperties)
    ) yield {
      inputFormatDescriptor
    }

    val TextInputStreamFormatDescriptor(
      _,
      _,
      JsonInputFormatDescriptor(
        _,
        _,
        timeFormat,
        dateFormat,
        timestampFormat
      )
    ) = inputFormatDescriptor.right.get

    // Arguments for Json.Parse: the original string data, the inferred type,
    // and any detected format strings (omitted when not inferred).
    val args = Vector(
      Some(FunAppArg(StringConst(codeData), None)),
      Some(FunAppArg(TypeExp(t), None)),
      timeFormat.map(s => FunAppArg(StringConst(s), Some("timeFormat"))),
      dateFormat.map(s => FunAppArg(StringConst(s), Some("dateFormat"))),
      timestampFormat.map(s => FunAppArg(StringConst(s), Some("timestampFormat")))
    ).flatten

    FunApp(
      Proj(PackageIdnExp("Json"), "Parse"),
      args
    )

  }

}

class ParseJsonEntry extends EntryExtension with JsonEntryExtensionHelper {

override def packageName: String = "Json"
Expand Down Expand Up @@ -394,7 +526,7 @@ class PrintJsonEntry extends EntryExtension with JsonEntryExtensionHelper {
varArgs: Seq[Arg]
)(implicit programContext: ProgramContext): Either[String, Type] = {
// Here we validate the type of the argument, and always return a string
val data = mandatoryArgs(0)
val data = mandatoryArgs.head
validateJsonType(data.t).right.map(_ => Rql2StringType()).left.map { errors =>
errors
.map { case UnsupportedType(_, t, _) => s"unsupported type ${SourcePrettyPrinter.format(t)}" }
Expand All @@ -412,7 +544,7 @@ trait JsonEntryExtensionHelper extends EntryExtensionHelper {
): Either[String, JsonInferrerProperties] = {
Right(
JsonInferrerProperties(
getLocationValue(mandatoryArgs(0)),
getLocationValue(mandatoryArgs.head),
optionalArgs.collectFirst { case a if a._1 == "sampleSize" => a._2 }.map(getIntValue),
optionalArgs
.collectFirst { case a if a._1 == "encoding" => a._2 }
Expand All @@ -421,6 +553,22 @@ trait JsonEntryExtensionHelper extends EntryExtensionHelper {
)
}

/**
 * Wraps a constant string argument into an in-memory byte-stream location so
 * the location-based inferrer can run over in-program data.
 *
 * @param mandatoryArgs entry arguments; the first must be a ValueArg holding a
 *                      StringValue (enforced by getMandatoryParam declaring
 *                      ValueParam(Rql2StringType())).
 * @return the location wrapped as a ValueArg, paired with the raw string data.
 */
protected def getInMemoryLocation(mandatoryArgs: Seq[Arg]): (ValueArg, String) = {
  import java.nio.charset.StandardCharsets

  // Extract the constant string; any other shape is a framework invariant
  // violation and fails fast with a MatchError, as before.
  val codeData = mandatoryArgs.head match {
    case ValueArg(StringValue(innVal), _) => innVal
  }
  // Expose the string contents under the in-memory location's data key.
  // Encode as UTF-8 explicitly rather than String.getBytes()'s platform
  // default charset, so the bytes seen by the inferrer do not depend on the
  // JVM's file.encoding setting.
  val settings = Map[LocationSettingKey, LocationSettingValue](
    LocationSettingKey(InMemoryByteStreamLocation.codeDataKey) ->
      LocationBinarySetting(codeData.getBytes(StandardCharsets.UTF_8))
  )
  val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
  (ValueArg(LocationValue(locationDescription), Rql2LocationType()), codeData)
}

protected def validateJsonType(t: Type): Either[Seq[UnsupportedType], Type] = t match {
case _: Rql2LocationType | _: Rql2RegexType => Left(Seq(UnsupportedType(t, t, None)))
case Rql2RecordType(atts, props) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package raw.compiler.rql2.tests.builtin
import raw.compiler.RQLInterpolator
import raw.compiler.rql2.tests.{CompilerTestContext, FailAfterNServer}

import java.nio.file.Path

trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {

override def failServices: Seq[FailAfter] = Seq(
Expand Down Expand Up @@ -209,6 +211,8 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
| Json.Parse("[1,2,3]", type t)
|""".stripMargin)(it => it should typeAs("collection(int)"))

test("""Json.InferAndParse("[1,2,3]")""".stripMargin)(it => it should typeAs("collection(int)"))

test("""
|let t = type int
|in
Expand Down Expand Up @@ -276,7 +280,7 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
_ should runErrorAs("path not found")
)

val jsonWithNulls = tempFile(
private val jsonWithNulls = tempFile(
"""[
| {"a": 1, "b": "1", "c": [1, 2, 3]},
| {"a": 1, "b": "1", "c": [1, 2, 3]},
Expand Down Expand Up @@ -338,7 +342,7 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
|]""".stripMargin)
)

val changeTypes = tempFile(
private val changeTypes = tempFile(
"""[
| {"a": 1, "b": "1", "c": [1, 2, 3]},
| {"a": 1, "b": "1", "c": [1, 2, 3]},
Expand Down Expand Up @@ -656,4 +660,21 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
s"""Json.Parse($ttt[{"a": [1,2,3], "c": null}]$ttt, type collection(record(a: list(int), b: undefined, c: undefined)))"""
)

// Infer and Parse
test(
s"""Json.InferAndParse("[1, 2, 3]")"""
)(_ should evaluateTo("[1, 2, 3]"))

test(
s"""let a = Json.InferAndParse(Json.Print({ a:1, b : { c : 2, d : 3 } })) in a.b.c"""
)(_ should evaluateTo("2"))

test(
s"""Json.InferAndParse("1, 2, 3]")"""
)(
_ should runErrorAs(
"Unexpected character (',' (code 44)): Expected space separating root-level values\n at [Source: (InputStreamReader); line: 1, column: 3]"
)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class DropboxPath(

override def rawUri: String = s"dropbox://${cli.name}$path"

override def sparkUri: String = throw new ByteStreamException("spark url not supported for Dropbox")

override def testAccess(): Unit = {
cli.testAccess(path)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class GithubByteStreamLocation(
override val retryStrategy: RetryStrategy
) extends ByteStreamLocation
with StrictLogging {
override def sparkUri: String = http.sparkUri

override protected def doGetInputStream(): InputStream = http.getInputStream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class HttpByteStreamLocation(
new URI(url).normalize().toString
}

override val sparkUri: String = rawUri

override def testAccess(): Unit = {
// We are reading a single byte to ensure the connection is valid.
// By reading a single byte, we are actually doing a connection and authentication
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
raw.sources.bytestream.in_memory.InMemoryByteStreamLocationBuilder
Empty file.
Loading

0 comments on commit f59dfdb

Please sign in to comment.