Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 FDML catalog #510

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions aws-s3/src/main/scala/latis/catalog/S3FdmlCatalog.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package latis.catalog

import scala.xml.XML

import blobstore.s3.S3Blob
import blobstore.s3.S3Store
import blobstore.url.Url
import cats.effect.IO
import cats.syntax.all.*
import fs2.Stream
import software.amazon.awssdk.services.s3.S3AsyncClient

import latis.dataset.Dataset
import latis.input.fdml.FdmlReader
import latis.input.fdml.FdmlParser
import latis.ops.OperationRegistry

/** Builds a [[Catalog]] from the FDML files stored in an S3 bucket. */
object S3FdmlCatalog {

  /**
   * Creates a catalog from FDML files in an S3 bucket.
   *
   * @param client S3 client used to list the bucket and fetch its objects
   * @param bucketName bucket name, without the `s3://` scheme
   * @param opReg operation registry passed to the FDML reader
   * @return a catalog of the datasets read from the bucket; the effect
   *         fails if the store cannot be built, the bucket URL cannot be
   *         parsed, or any listed FDML file cannot be fetched or read
   */
  def fromS3Client(
    client: S3AsyncClient,
    bucketName: String,
    opReg: OperationRegistry
  ): IO[Catalog] = {
    // Just keeping the first error out of possibly many.
    val store = S3Store.builder[IO](client).build.leftMap(_.head).liftTo[IO]

    val bucket = Url.parseF[IO](s"s3://$bucketName")

    (store, bucket, opReg.pure[IO])
      .flatMapN(bucketToDatasets)
      .map(Catalog.fromFoldable(_))
  }

  /**
   * Reads every FDML file listed under `bucket` into a dataset.
   *
   * Files are read one after another; the first failure fails the whole
   * effect.
   */
  private def bucketToDatasets(
    store: S3Store[IO],
    bucket: Url.Plain,
    opReg: OperationRegistry
  ): IO[Vector[Dataset]] =
    listFdmlFiles(store, bucket)
      .evalMap(urlToDataset(store, _, opReg))
      .compile
      .toVector

  /** Lists the objects under `url` whose file name ends with ".fdml". */
  private def listFdmlFiles(
    store: S3Store[IO],
    url: Url.Plain
  ): Stream[IO, Url[S3Blob]] =
    store.list(url).filter(_.path.fileName.exists(_.endsWith(".fdml")))

  /**
   * Parses FDML text and reads it into a dataset.
   *
   * XML parsing is wrapped so that malformed XML is returned as a `Left`
   * rather than thrown, as the return type promises.
   */
  private def parseFdml(
    str: String,
    opReg: OperationRegistry
  ): Either[Throwable, Dataset] =
    Either
      .catchNonFatal(XML.loadString(str))
      .flatMap(FdmlParser.parseXml(_))
      .flatMap(FdmlReader.read(_, opReg))

  /** Fetches the object at `url` as text and reads it as FDML. */
  private def urlToDataset(
    store: S3Store[IO],
    url: Url[S3Blob],
    opReg: OperationRegistry
  ): IO[Dataset] =
    store.getContents(url).flatMap(parseFdml(_, opReg).liftTo[IO])

}
25 changes: 25 additions & 0 deletions aws-s3/src/test/resources/data.fdml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Test fixture: S3FdmlCatalogSuite uploads this file to the "nonempty" bucket under the key "data.fdml". -->
<dataset id="data"
title="Test Dataset"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://latis-data.io/schemas/1.0/fdml-with-text-adapter.xsd">

<source uri="data/data.txt"/>

<adapter class="latis.input.TextAdapter"
skipLines="1"/>

<function>
<scalar id="time"
units="days since 2000-01-01"
type="int"
class="latis.time.Time"/>
<tuple>
<scalar id="b" type="int"/>
<scalar id="c" type="double"/>
<scalar id="d" type="string"/>
</tuple>
</function>

</dataset>
24 changes: 24 additions & 0 deletions aws-s3/src/test/resources/data2.fdml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Test fixture: S3FdmlCatalogSuite uploads this file to the "nonempty" bucket under the key "data2.fdml". -->
<dataset id="data2"
title="Test Dataset"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://latis-data.io/schemas/1.0/fdml-with-text-adapter.xsd">

<source uri="data/data2.txt"/>

<adapter class="latis.input.TextAdapter"/>

<function>
<scalar id="time"
units="days since 2000-01-01"
type="int"
class="latis.time.Time"/>
<tuple>
<scalar id="b" type="int"/>
<scalar id="c" type="double"/>
<scalar id="d" type="string" size="1"/>
</tuple>
</function>

</dataset>
17 changes: 17 additions & 0 deletions aws-s3/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>

<!-- Test logging configuration: the root logger is set to "error" and writes to the console via the STDOUT appender. -->
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>

<appender name="STDOUT" class="ConsoleAppender">
<encoder class="PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="error">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
105 changes: 105 additions & 0 deletions aws-s3/src/test/scala/latis/catalog/S3FdmlCatalogSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package latis.catalog

import java.nio.file.Paths

import scala.concurrent.duration.Duration

import cats.effect.IO
import cats.effect.Resource
import com.dimafeng.testcontainers.LocalStackV2Container
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.CreateBucketRequest
import software.amazon.awssdk.services.s3.model.PutObjectRequest
import latis.ops.OperationRegistry

/**
 * Exercises [[S3FdmlCatalog]] against the S3 service of a LocalStack test
 * container that is shared by all tests in this suite.
 */
final class S3FdmlCatalogSuite extends munit.CatsEffectSuite {

  // Starting the test container is slow.
  override val munitIOTimeout: Duration = Duration(1, "minute")

  /** Starts a LocalStack container offering S3 and stops it on release. */
  private def localStack: Resource[IO, LocalStackV2Container] = {
    val started = for {
      container <- IO(LocalStackV2Container(services = List(Service.S3)))
      _         <- IO(container.start())
    } yield container

    Resource.make(started)(container => IO(container.stop()))
  }

  /** Builds an S3 client pointed at the given container; closed on release. */
  private def clientFor(ls: LocalStackV2Container): Resource[IO, S3AsyncClient] = {
    val built = IO {
      S3AsyncClient
        .builder()
        .endpointOverride(ls.endpointOverride(Service.S3))
        .credentialsProvider(ls.staticCredentialsProvider)
        .region(ls.region)
        .build()
    }

    Resource.make(built)(cl => IO(cl.close()))
  }

  /** Creates a bucket with the given name and waits for the request to finish. */
  private def createBucket(cl: S3AsyncClient, name: String): IO[Unit] =
    IO.fromCompletableFuture {
      IO(cl.createBucket(CreateBucketRequest.builder().bucket(name).build()))
    }.void

  /** Uploads a classpath resource to `bucket` under `key`. */
  private def putResource(
    cl: S3AsyncClient,
    bucket: String,
    key: String,
    resource: String
  ): IO[Unit] =
    IO.fromCompletableFuture {
      IO {
        cl.putObject(
          PutObjectRequest.builder().bucket(bucket).key(key).build(),
          Paths.get(getClass().getResource(resource).toURI())
        )
      }
    }.void

  /** Sets up one empty bucket and one bucket holding the two FDML fixtures. */
  private def seedBuckets(cl: S3AsyncClient): IO[Unit] =
    for {
      _ <- createBucket(cl, "empty")
      _ <- createBucket(cl, "nonempty")
      _ <- putResource(cl, "nonempty", "data.fdml", "/data.fdml")
      _ <- putResource(cl, "nonempty", "data2.fdml", "/data2.fdml")
    } yield ()

  /** Counts the datasets in the catalog built from `bucket`. */
  private def datasetCount(client: S3AsyncClient, bucket: String): IO[Long] =
    for {
      catalog <- S3FdmlCatalog.fromS3Client(client, bucket, OperationRegistry.default)
      count   <- catalog.datasets.compile.count
    } yield count

  val s3Client = {
    val client: Resource[IO, S3AsyncClient] =
      localStack
        .flatMap(ls => clientFor(ls))
        .evalTap(cl => seedBuckets(cl))

    ResourceSuiteLocalFixture("s3-client", client)
  }

  override val munitFixtures = List(s3Client)

  test("empty") {
    for {
      client <- IO(s3Client())
      _      <- datasetCount(client, "empty").assertEquals(0L)
    } yield client
  }

  test("nonempty") {
    for {
      client <- IO(s3Client())
      _      <- datasetCount(client, "nonempty").assertEquals(2L)
    } yield client
  }
}
15 changes: 15 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ lazy val `aws-lambda` = project
}
)

// sbt project rooted at the `aws-s3` directory; depends on `core` and
// shares `commonSettings` with the other projects.
lazy val awsS3 = project
.in(file("aws-s3"))
.dependsOn(core)
.settings(commonSettings)
.settings(
name := "latis3-aws-s3",
libraryDependencies ++= Seq(
// NOTE(review): presumably required at test time by the LocalStack
// test container (AWS SDK v1) - confirm before removing.
"com.amazonaws" % "aws-java-sdk-core" % "1.12.746" % Test,
"ch.qos.logback" % "logback-classic" % logbackVersion % Test,
// Compile-scope dependencies: fs2-blobstore S3 store and AWS SDK v2 S3 client.
"com.github.fs2-blobstore" %% "s3" % "0.9.14",
"software.amazon.awssdk" % "s3" % "2.26.5",
// Test-only: LocalStack container used by the test suite.
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % "0.41.4" % Test
)
)

lazy val core = project
.dependsOn(`dap2-parser`)
.dependsOn(macros)
Expand Down
Loading