diff --git a/build.sbt b/build.sbt
index 748ad6173..dab95d516 100644
--- a/build.sbt
+++ b/build.sbt
@@ -318,6 +318,10 @@ lazy val rawSourcesMock = (project in file("raw-sources-mock"))
   .dependsOn(rawSourcesApi % "compile->compile;test->test")
   .settings(strictBuildSettings)
 
+lazy val rawSourcesInMemory = (project in file("raw-sources-in-memory"))
+  .dependsOn(rawSourcesApi % "compile->compile;test->test")
+  .settings(strictBuildSettings)
+
 lazy val rawSourcesDropbox = (project in file("raw-sources-dropbox"))
   .dependsOn(rawSourcesApi % "compile->compile;test->test")
   .settings(
@@ -410,6 +414,7 @@ lazy val rawCompilerCommon = (project in file("raw-compiler-common"))
 lazy val rawCompilerRql2 = (project in file("raw-compiler-rql2"))
   .dependsOn(
     rawCompilerCommon % "compile->compile;test->test",
+    rawSourcesInMemory % "compile->compile;test->test",
     rawInferrerLocal % "test->test",
     rawSourcesDropbox % "test->test",
     rawSourcesHttp % "test->test",
@@ -421,7 +426,7 @@ lazy val rawCompilerRql2 = (project in file("raw-compiler-rql2"))
     rawSourcesMsSQL % "test->test",
     rawSourcesSnowflake % "test->test",
     rawSourcesMock % "test->test",
-    rawSourcesGithub % "test->test"
+    rawSourcesGithub % "test->test",
   )
   .settings(
     buildSettings // TODO (msb): Promote this to strictBuildSettings and add bail-out annotations as needed,
@@ -508,7 +513,8 @@ lazy val rawCli = (project in file("raw-cli"))
     rawSourcesSqlite,
     rawSourcesMsSQL,
     rawSourcesSnowflake,
-    rawSourcesGithub
+    rawSourcesGithub,
+    rawSourcesInMemory
   )
   .settings(
     buildSettings,
diff --git a/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension b/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension
index 5d986c611..5c84cf540 100644
--- a/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension
+++ b/raw-compiler-rql2/src/main/resources/META-INF/services/raw.compiler.rql2.EntryExtension
@@ -10,6 +10,7 @@ raw.compiler.rql2.builtin.SnowflakeInferAndReadEntry
 raw.compiler.rql2.builtin.SnowflakeInferAndQueryEntry
 raw.compiler.rql2.builtin.CsvInferAndReadEntry
 raw.compiler.rql2.builtin.InferAndReadJsonEntry
+raw.compiler.rql2.builtin.InferAndParseJsonEntry
 raw.compiler.rql2.builtin.InferAndReadXmlEntry
 raw.compiler.rql2.builtin.AvgCollectionEntry
 raw.compiler.rql2.builtin.FindFirstCollectionEntry
diff --git a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala
index 4af923158..a3ba564c1 100644
--- a/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala
+++ b/raw-compiler-rql2/src/main/scala/raw/compiler/rql2/builtin/JsonPackage.scala
@@ -12,13 +12,16 @@
 
 package raw.compiler.rql2.builtin
 
-import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
 import raw.compiler.base.errors.{BaseError, UnsupportedType}
 import raw.compiler.base.source.{AnythingType, BaseNode, Type}
 import raw.compiler.common.source._
 import raw.compiler.rql2._
 import raw.compiler.rql2.source._
+import raw.compiler.{EntryDoc, ExampleDoc, PackageDoc, ParamDoc, ReturnDoc, TypeDoc}
 import raw.inferrer._
+import raw.runtime.interpreter.{LocationValue, StringValue}
+import raw.sources.bytestream.in_memory.InMemoryByteStreamLocation
+import raw.sources.{LocationBinarySetting, LocationDescription, LocationSettingKey, LocationSettingValue}
 
 class JsonPackage extends PackageExtension {
 
@@ -109,8 +112,7 @@ class InferAndReadJsonEntry extends SugarEntryExtension with JsonEntryExtensionH
         JsonInputFormatDescriptor(inferredType, sampled, _, _, _)
       ) = inputFormatDescriptor
     ) yield {
-      val preferNulls =
-        optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.map(getBoolValue).getOrElse(true)
+      val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
       val makeNullables = preferNulls && sampled
       val makeTryables = sampled
 
@@ -147,7 +149,7 @@ class InferAndReadJsonEntry extends SugarEntryExtension with JsonEntryExtensionH
       )
     ) = inputFormatDescriptor.right.get
 
-    val location = locationValueToExp(mandatoryArgs(0))
+    val location = locationValueToExp(mandatoryArgs.head)
     val args = Vector(
       Some(FunAppArg(location, None)),
       Some(FunAppArg(TypeExp(t), None)),
@@ -264,6 +266,136 @@ class ReadJsonEntry extends EntryExtension with JsonEntryExtensionHelper {
 
 }
 
+class InferAndParseJsonEntry extends SugarEntryExtension with JsonEntryExtensionHelper {
+
+  override def packageName: String = "Json"
+
+  override def entryName: String = "InferAndParse"
+
+  override def docs: EntryDoc = EntryDoc(
+    "Reads JSON data from a string with schema detection (inference).",
+    None,
+    params = List(
+      ParamDoc(
+        "stringData",
+        TypeDoc(List("string")),
+        description = "The data in string format to infer and read."
+      ),
+      ParamDoc(
+        "sampleSize",
+        TypeDoc(List("int")),
+        description = """Specifies the number of objects to sample within the data.""",
+        info = Some("""If a large `sampleSize` is used, the detection takes more time to complete,
+          |but has a higher chance of detecting the correct format.
+          |To force the detection to read the full data, set `sampleSize` to -1.""".stripMargin),
+        isOptional = true
+      ),
+      ParamDoc(
+        "preferNulls",
+        typeDoc = TypeDoc(List("bool")),
+        description =
+          """If set to true and during inference the system does not read the whole data, marks all fields as nullable. Defaults to true.""",
+        isOptional = true
+      )
+    ),
+    examples = List(ExampleDoc("""Json.InferAndParse(\"\"\" {"hello" : "world"} \"\"\")"""))
+  )
+
+  override def nrMandatoryParams: Int = 1
+
+  override def getMandatoryParam(prevMandatoryArgs: Seq[Arg], idx: Int): Either[String, Param] = {
+    assert(idx == 0)
+    Right(ValueParam(Rql2StringType()))
+  }
+
+  override def optionalParams: Option[Set[String]] = Some(
+    Set(
+      "sampleSize",
+      "preferNulls"
+    )
+  )
+
+  override def getOptionalParam(prevMandatoryArgs: Seq[Arg], idn: String): Either[String, Param] = {
+    idn match {
+      case "sampleSize" => Right(ValueParam(Rql2IntType()))
+      case "preferNulls" => Right(ValueParam(Rql2BoolType()))
+    }
+  }
+
+  override def returnType(
+      mandatoryArgs: Seq[Arg],
+      optionalArgs: Seq[(String, Arg)],
+      varArgs: Seq[Arg]
+  )(implicit programContext: ProgramContext): Either[String, Type] = {
+
+    val (locationArg, _) = getInMemoryLocation(mandatoryArgs)
+
+    for (
+      inferrerProperties <- getJsonInferrerProperties(Seq(locationArg), optionalArgs);
+      inputFormatDescriptor <- programContext.infer(inferrerProperties);
+      TextInputStreamFormatDescriptor(
+        _,
+        _,
+        JsonInputFormatDescriptor(inferredType, sampled, _, _, _)
+      ) = inputFormatDescriptor
+    ) yield {
+      val preferNulls = optionalArgs.collectFirst { case a if a._1 == "preferNulls" => a._2 }.forall(getBoolValue)
+      val makeNullables = preferNulls && sampled
+      val makeTryables = sampled
+
+      inferTypeToRql2Type(inferredType, makeNullables, makeTryables) match {
+        case Rql2IterableType(rowType, _) => Rql2IterableType(rowType)
+        case other => addProp(other, Rql2IsTryableTypeProperty())
+      }
+    }
+  }
+
+  override def desugar(
+      t: Type,
+      args: Seq[FunAppArg],
+      mandatoryArgs: Seq[Arg],
+      optionalArgs: Seq[(String, Arg)],
+      varArgs: Seq[Arg]
+  )(implicit programContext: ProgramContext): Exp = {
+
+    val (locationArg, codeData) = getInMemoryLocation(mandatoryArgs)
+
+    val inputFormatDescriptor = for (
+      inferrerProperties <- getJsonInferrerProperties(Seq(locationArg), optionalArgs);
+      inputFormatDescriptor <- programContext.infer(inferrerProperties)
+    ) yield {
+      inputFormatDescriptor
+    }
+
+    val TextInputStreamFormatDescriptor(
+      _,
+      _,
+      JsonInputFormatDescriptor(
+        _,
+        _,
+        timeFormat,
+        dateFormat,
+        timestampFormat
+      )
+    ) = inputFormatDescriptor.right.get
+
+    val args = Vector(
+      Some(FunAppArg(StringConst(codeData), None)),
+      Some(FunAppArg(TypeExp(t), None)),
+      timeFormat.map(s => FunAppArg(StringConst(s), Some("timeFormat"))),
+      dateFormat.map(s => FunAppArg(StringConst(s), Some("dateFormat"))),
+      timestampFormat.map(s => FunAppArg(StringConst(s), Some("timestampFormat")))
+    ).flatten
+
+    FunApp(
+      Proj(PackageIdnExp("Json"), "Parse"),
+      args
+    )
+
+  }
+
+}
+
 class ParseJsonEntry extends EntryExtension with JsonEntryExtensionHelper {
 
   override def packageName: String = "Json"
@@ -394,7 +526,7 @@ class PrintJsonEntry extends EntryExtension with JsonEntryExtensionHelper {
     varArgs: Seq[Arg]
   )(implicit programContext: ProgramContext): Either[String, Type] = {
     // Here we validate the type of the argument, and always return a string
-    val data = mandatoryArgs(0)
+    val data = mandatoryArgs.head
     validateJsonType(data.t).right.map(_ => Rql2StringType()).left.map { errors =>
       errors
         .map { case UnsupportedType(_, t, _) => s"unsupported type ${SourcePrettyPrinter.format(t)}" }
@@ -412,7 +544,7 @@ trait JsonEntryExtensionHelper extends EntryExtensionHelper {
   ): Either[String, JsonInferrerProperties] = {
     Right(
       JsonInferrerProperties(
-        getLocationValue(mandatoryArgs(0)),
+        getLocationValue(mandatoryArgs.head),
         optionalArgs.collectFirst { case a if a._1 == "sampleSize" => a._2 }.map(getIntValue),
         optionalArgs
           .collectFirst { case a if a._1 == "encoding" => a._2 }
@@ -421,6 +553,22 @@
     )
   }
 
+  protected def getInMemoryLocation(mandatoryArgs: Seq[Arg]): (ValueArg, String) = {
+    val codeData = mandatoryArgs.head match {
+      case ValueArg(v, _) => v match {
+          case StringValue(innVal) => innVal
+        }
+    }
+    val settings = Map[LocationSettingKey, LocationSettingValue](
+      (
+        LocationSettingKey(InMemoryByteStreamLocation.codeDataKey),
+        LocationBinarySetting(codeData.getBytes())
+      )
+    )
+    val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
+    (ValueArg(LocationValue(locationDescription), Rql2LocationType()), codeData)
+  }
+
   protected def validateJsonType(t: Type): Either[Seq[UnsupportedType], Type] = t match {
     case _: Rql2LocationType | _: Rql2RegexType => Left(Seq(UnsupportedType(t, t, None)))
     case Rql2RecordType(atts, props) =>
diff --git a/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/JsonPackageTest.scala b/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/JsonPackageTest.scala
index c2e35718b..70ee9b64a 100644
--- a/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/JsonPackageTest.scala
+++ b/raw-compiler-rql2/src/test/scala/raw/compiler/rql2/tests/builtin/JsonPackageTest.scala
@@ -15,6 +15,8 @@ package raw.compiler.rql2.tests.builtin
 import raw.compiler.RQLInterpolator
 import raw.compiler.rql2.tests.{CompilerTestContext, FailAfterNServer}
 
+import java.nio.file.Path
+
 trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
 
   override def failServices: Seq[FailAfter] = Seq(
@@ -209,6 +211,8 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
       |  Json.Parse("[1,2,3]", type t)
       |""".stripMargin)(it => it should typeAs("collection(int)"))
 
+  test("""Json.InferAndParse("[1,2,3]")""".stripMargin)(it => it should typeAs("collection(int)"))
+
   test("""
     |let t = type int
     |in
@@ -276,7 +280,7 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
     _ should runErrorAs("path not found")
   )
 
-  val jsonWithNulls = tempFile(
+  private val jsonWithNulls = tempFile(
     """[
       |  {"a": 1, "b": "1", "c": [1, 2, 3]},
      |  {"a": 1, "b": "1", "c": [1, 2, 3]},
@@ -338,7 +342,7 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
       |]""".stripMargin)
   )
 
-  val changeTypes = tempFile(
+  private val changeTypes = tempFile(
     """[
       |  {"a": 1, "b": "1", "c": [1, 2, 3]},
      |  {"a": 1, "b": "1", "c": [1, 2, 3]},
@@ -656,4 +660,21 @@ trait JsonPackageTest extends CompilerTestContext with FailAfterNServer {
     s"""Json.Parse($ttt[{"a": [1,2,3], "c": null}]$ttt, type collection(record(a: list(int), b: undefined, c: undefined)))"""
   )
 
+  // Infer and Parse
+  test(
+    s"""Json.InferAndParse("[1, 2, 3]")"""
+  )(_ should evaluateTo("[1, 2, 3]"))
+
+  test(
+    s"""let a = Json.InferAndParse(Json.Print({ a:1, b : { c : 2, d : 3 } })) in a.b.c"""
+  )(_ should evaluateTo("2"))
+
+  test(
+    s"""Json.InferAndParse("1, 2, 3]")"""
+  )(
+    _ should runErrorAs(
+      "Unexpected character (',' (code 44)): Expected space separating root-level values\n at [Source: (InputStreamReader); line: 1, column: 3]"
+    )
+  )
+
 }
diff --git a/raw-sources-dropbox/src/main/scala/raw/sources/filesystem/dropbox/DropboxPath.scala b/raw-sources-dropbox/src/main/scala/raw/sources/filesystem/dropbox/DropboxPath.scala
index 4da2d930d..852aa4813 100644
--- a/raw-sources-dropbox/src/main/scala/raw/sources/filesystem/dropbox/DropboxPath.scala
+++ b/raw-sources-dropbox/src/main/scala/raw/sources/filesystem/dropbox/DropboxPath.scala
@@ -30,8 +30,6 @@ class DropboxPath(
 
   override def rawUri: String = s"dropbox://${cli.name}$path"
 
-  override def sparkUri: String = throw new ByteStreamException("spark url not supported for Dropbox")
-
   override def testAccess(): Unit = {
     cli.testAccess(path)
   }
diff --git a/raw-sources-github/src/main/scala/raw/sources/bytestream/github/GithubByteStreamLocation.scala b/raw-sources-github/src/main/scala/raw/sources/bytestream/github/GithubByteStreamLocation.scala
index 68a8ed4d6..5bcf81f70 100644
--- a/raw-sources-github/src/main/scala/raw/sources/bytestream/github/GithubByteStreamLocation.scala
+++ b/raw-sources-github/src/main/scala/raw/sources/bytestream/github/GithubByteStreamLocation.scala
@@ -26,7 +26,6 @@ class GithubByteStreamLocation(
     override val retryStrategy: RetryStrategy
 ) extends ByteStreamLocation
     with StrictLogging {
-  override def sparkUri: String = http.sparkUri
 
   override protected def doGetInputStream(): InputStream = http.getInputStream
 
diff --git a/raw-sources-http/src/main/scala/raw/sources/bytestream/http/HttpByteStreamLocation.scala b/raw-sources-http/src/main/scala/raw/sources/bytestream/http/HttpByteStreamLocation.scala
index 6f60fb097..4fee7aeb3 100644
--- a/raw-sources-http/src/main/scala/raw/sources/bytestream/http/HttpByteStreamLocation.scala
+++ b/raw-sources-http/src/main/scala/raw/sources/bytestream/http/HttpByteStreamLocation.scala
@@ -37,8 +37,6 @@ class HttpByteStreamLocation(
     new URI(url).normalize().toString
   }
 
-  override val sparkUri: String = rawUri
-
   override def testAccess(): Unit = {
     // We are reading a single byte to ensure the connection is valid.
     // By reading a single byte, we are actually doing a connection and authentication
diff --git a/raw-sources-in-memory/src/main/resources/META-INF/services/raw.sources.bytestream.ByteStreamLocationBuilder b/raw-sources-in-memory/src/main/resources/META-INF/services/raw.sources.bytestream.ByteStreamLocationBuilder
new file mode 100644
index 000000000..407c4fe54
--- /dev/null
+++ b/raw-sources-in-memory/src/main/resources/META-INF/services/raw.sources.bytestream.ByteStreamLocationBuilder
@@ -0,0 +1 @@
+raw.sources.bytestream.in_memory.InMemoryByteStreamLocationBuilder
\ No newline at end of file
diff --git a/raw-sources-in-memory/src/main/resources/reference.conf b/raw-sources-in-memory/src/main/resources/reference.conf
new file mode 100644
index 000000000..e69de29bb
diff --git a/raw-sources-in-memory/src/main/scala/raw/sources/bytestream/in_memory/InMemoryByteStreamLocation.scala b/raw-sources-in-memory/src/main/scala/raw/sources/bytestream/in_memory/InMemoryByteStreamLocation.scala
new file mode 100644
index 000000000..c1735b97f
--- /dev/null
+++ b/raw-sources-in-memory/src/main/scala/raw/sources/bytestream/in_memory/InMemoryByteStreamLocation.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.bytestream.in_memory
+
+import raw.sources._
+import raw.sources.bytestream.{
+  ByteStreamLocation,
+  DelegatingSeekableInputStream,
+  GenericSkippableInputStream,
+  SeekableInputStream
+}
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.nio.file.Path
+
+object InMemoryByteStreamLocation {
+  val schema = "in-memory"
+  val schemaWithColon = s"$schema:"
+  val codeDataKey = "code-data"
+}
+
+class InMemoryByteStreamLocation(
+    locationDescription: LocationDescription
+) extends ByteStreamLocation {
+
+  override protected def doGetInputStream(): InputStream = {
+    assert(locationDescription.settings.contains(LocationSettingKey(InMemoryByteStreamLocation.codeDataKey)))
+
+    val LocationBinarySetting(v) =
+      locationDescription.settings(LocationSettingKey(InMemoryByteStreamLocation.codeDataKey))
+
+    new ByteArrayInputStream(v.toArray)
+  }
+
+  override protected def doGetSeekableInputStream(): SeekableInputStream = {
+    val genSings = new GenericSkippableInputStream(() => doGetInputStream())
+    new DelegatingSeekableInputStream(genSings) {
+      override def getPos: Long = genSings.getPos
+      override def seek(newPos: Long): Unit = genSings.seek(newPos)
+    }
+  }
+
+  override def getLocalPath(): Path = throw new AssertionError("Calling path on in memory location")
+
+  override def cacheStrategy: CacheStrategy = locationDescription.cacheStrategy
+
+  override def retryStrategy: RetryStrategy = locationDescription.retryStrategy
+
+  override def rawUri: String = InMemoryByteStreamLocation.schemaWithColon
+
+  override def testAccess(): Unit = {}
+}
diff --git a/raw-sources-in-memory/src/main/scala/raw/sources/bytestream/in_memory/InMemoryByteStreamLocationBuilder.scala b/raw-sources-in-memory/src/main/scala/raw/sources/bytestream/in_memory/InMemoryByteStreamLocationBuilder.scala
new file mode 100644
index 000000000..26de551b8
--- /dev/null
+++ b/raw-sources-in-memory/src/main/scala/raw/sources/bytestream/in_memory/InMemoryByteStreamLocationBuilder.scala
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.bytestream.in_memory
+
+import raw.sources.bytestream.{ByteStreamLocation, ByteStreamLocationBuilder}
+import raw.sources.{LocationDescription, LocationException, SourceContext}
+
+class InMemoryByteStreamLocationBuilder extends ByteStreamLocationBuilder {
+
+  override def schemes: Seq[String] = Seq(InMemoryByteStreamLocation.schema)
+  override def build(location: LocationDescription)(implicit sourceContext: SourceContext): ByteStreamLocation = {
+    if (location.url.startsWith(schemes.head)) {
+      new InMemoryByteStreamLocation(location)
+    } else {
+      throw new LocationException(s"not an in-memory location")
+    }
+
+  }
+
+}
diff --git a/raw-sources-in-memory/src/test/scala/raw/sources/bytestream/in_memory/TestInMemoryLocation.scala b/raw-sources-in-memory/src/test/scala/raw/sources/bytestream/in_memory/TestInMemoryLocation.scala
new file mode 100644
index 000000000..81488b6e9
--- /dev/null
+++ b/raw-sources-in-memory/src/test/scala/raw/sources/bytestream/in_memory/TestInMemoryLocation.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.bytestream.in_memory
+
+import raw.RawTestSuite
+import raw.sources._
+
+class TestInMemoryLocation extends RawTestSuite {
+
+  test("in memory new location") { _ =>
+    val settings = Map[LocationSettingKey, LocationSettingValue](
+      (
+        LocationSettingKey(InMemoryByteStreamLocation.codeDataKey),
+        LocationBinarySetting("hello world".getBytes(UTF_8().rawEncoding))
+      )
+    )
+    val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
+    val inMemoryLocation = new InMemoryByteStreamLocation(locationDescription)
+    assert(inMemoryLocation.getInputStream.readAllBytes().sameElements("hello world".getBytes(UTF_8().rawEncoding)))
+    assert(inMemoryLocation.rawUri.startsWith("in-memory"))
+  }
+
+  test("in memory location errors") { _ =>
+    val settings = Map[LocationSettingKey, LocationSettingValue](
+      (LocationSettingKey(InMemoryByteStreamLocation.codeDataKey), LocationStringSetting("hello world"))
+    )
+    val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
+    val inMemoryLocation = new InMemoryByteStreamLocation(locationDescription)
+    assertThrows[AssertionError](inMemoryLocation.getSeekableInputStream)
+    assertThrows[AssertionError](inMemoryLocation.getLocalPath())
+    assertThrows[AssertionError](inMemoryLocation.getInputStream)
+  }
+
+}
diff --git a/raw-sources-in-memory/src/test/scala/raw/sources/bytestream/in_memory/TestInMemoryLocationBuild.scala b/raw-sources-in-memory/src/test/scala/raw/sources/bytestream/in_memory/TestInMemoryLocationBuild.scala
new file mode 100644
index 000000000..d66dcc6d2
--- /dev/null
+++ b/raw-sources-in-memory/src/test/scala/raw/sources/bytestream/in_memory/TestInMemoryLocationBuild.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package raw.sources.bytestream.in_memory
+
+import raw.RawTestSuite
+import raw.sources._
+
+class TestInMemoryLocationBuild extends RawTestSuite {
+
+  test("in memory location builder") { _ =>
+    val settings = Map[LocationSettingKey, LocationSettingValue](
+      (
+        LocationSettingKey(InMemoryByteStreamLocation.codeDataKey),
+        LocationBinarySetting("hello world".getBytes(UTF_8().rawEncoding))
+      )
+    )
+    val locationDescription = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
+    implicit val sourceContext: SourceContext = null
+    val inMemoryLocationBuilt = new InMemoryByteStreamLocationBuilder().build(locationDescription)
+    assert(
+      inMemoryLocationBuilt.getInputStream.readAllBytes().sameElements("hello world".getBytes(UTF_8().rawEncoding))
+    )
+    assert(inMemoryLocationBuilt.rawUri.startsWith("in-memory"))
+  }
+
+  test("in memory location builder scheme") { _ =>
+    val inMemoryLocationBuilder = new InMemoryByteStreamLocationBuilder()
+    assert(inMemoryLocationBuilder.schemes.size == 1)
+    assert(inMemoryLocationBuilder.schemes.head == InMemoryByteStreamLocation.schema)
+  }
+
+  test("wrong schema build error") { _ =>
+    val settings = Map[LocationSettingKey, LocationSettingValue](
+      (
+        LocationSettingKey(InMemoryByteStreamLocation.codeDataKey),
+        LocationBinarySetting("hello world".getBytes(UTF_8().rawEncoding))
+      )
+    )
+    val locationDescription = LocationDescription("not-in-memory", settings)
+    implicit val sourceContext: SourceContext = null
+
+    assertThrows[LocationException](new InMemoryByteStreamLocationBuilder().build(locationDescription))
+  }
+
+}
diff --git a/raw-sources-local/src/main/scala/raw/sources/filesystem/local/LocalPath.scala b/raw-sources-local/src/main/scala/raw/sources/filesystem/local/LocalPath.scala
index da67fddf2..b7349fb3a 100644
--- a/raw-sources-local/src/main/scala/raw/sources/filesystem/local/LocalPath.scala
+++ b/raw-sources-local/src/main/scala/raw/sources/filesystem/local/LocalPath.scala
@@ -27,8 +27,6 @@ class LocalPath(pathName: String, override val cacheStrategy: CacheStrategy, ove
 
   override def rawUri: String = s"file:$pathName"
 
-  override def sparkUri: String = rawUri
-
   protected def path: Path = Paths.get(pathName)
 
   override def testAccess(): Unit = {
diff --git a/raw-sources-mock/src/main/scala/raw/sources/filesystem/mock/MockPath.scala b/raw-sources-mock/src/main/scala/raw/sources/filesystem/mock/MockPath.scala
index 9571efa5f..047a1fd20 100644
--- a/raw-sources-mock/src/main/scala/raw/sources/filesystem/mock/MockPath.scala
+++ b/raw-sources-mock/src/main/scala/raw/sources/filesystem/mock/MockPath.scala
@@ -42,8 +42,6 @@ class MockPath(
 
   override def rawUri: String = s"mock:${delegate.rawUri}"
 
-  override def sparkUri: String = rawUri
-
   override def testAccess(): Unit = {
     doDelay()
     delegate.testAccess()
diff --git a/raw-sources-s3/src/main/scala/raw/sources/filesystem/s3/S3Path.scala b/raw-sources-s3/src/main/scala/raw/sources/filesystem/s3/S3Path.scala
index 07d295116..03408b3fb 100644
--- a/raw-sources-s3/src/main/scala/raw/sources/filesystem/s3/S3Path.scala
+++ b/raw-sources-s3/src/main/scala/raw/sources/filesystem/s3/S3Path.scala
@@ -39,15 +39,6 @@ class S3Path(
     s"s3://${cli.bucketName}$sep$path"
   }
 
-  override val sparkUri: String = {
-    // We use 's3a' for reading S3 from Spark.
-    if (rawUri.startsWith("s3:")) {
-      "s3a:" + rawUri.drop("s3:".length)
-    } else {
-      rawUri
-    }
-  }
-
   override def testAccess(): Unit = {
     cli.testAccess(path)
   }
diff --git a/raw-sources/src/main/scala/raw/sources/bytestream/ByteStreamLocation.scala b/raw-sources/src/main/scala/raw/sources/bytestream/ByteStreamLocation.scala
index 8523d3eb9..886f7b89b 100644
--- a/raw-sources/src/main/scala/raw/sources/bytestream/ByteStreamLocation.scala
+++ b/raw-sources/src/main/scala/raw/sources/bytestream/ByteStreamLocation.scala
@@ -24,10 +24,6 @@ import scala.util.control.NonFatal
 
 trait ByteStreamLocation extends Location {
 
-  // TODO (msb): Does this still belong here? Should be overridden in raw.sources.hadoop/.spark ?
-  // Is it a Spark or Hadoop URL?
-  def sparkUri: String
-
   // This call uses the retry mechanism.
   @throws[RawException]
   final def getInputStream: InputStream = {
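
The new in-memory byte-stream source can also be exercised on its own. The sketch below is assembled only from code introduced in this diff (the raw.sources setting types, LocationDescription, and InMemoryByteStreamLocation); the object name InMemoryLocationSketch is a hypothetical placeholder and not part of the change.

import raw.sources.{LocationBinarySetting, LocationDescription, LocationSettingKey, LocationSettingValue}
import raw.sources.bytestream.in_memory.InMemoryByteStreamLocation

object InMemoryLocationSketch extends App {
  // Wrap a JSON string as an in-memory byte stream, mirroring
  // JsonEntryExtensionHelper.getInMemoryLocation above.
  val json = """[1, 2, 3]"""
  val settings = Map[LocationSettingKey, LocationSettingValue](
    LocationSettingKey(InMemoryByteStreamLocation.codeDataKey) -> LocationBinarySetting(json.getBytes())
  )
  val description = LocationDescription(InMemoryByteStreamLocation.schemaWithColon, settings)
  val location = new InMemoryByteStreamLocation(description)

  // The inferrer and Json.Parse consume the bytes through the ordinary
  // ByteStreamLocation API, so reading them back returns the original string.
  println(new String(location.getInputStream.readAllBytes()))
}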