From 00fb8a91fd71895ab876223002485337659539ca Mon Sep 17 00:00:00 2001 From: David Kleiven Date: Fri, 29 Mar 2024 19:18:52 +0100 Subject: [PATCH] feat: Add external extractor --- pom.xml | 15 ++ src/main/Config.kt | 2 + src/main/ExternalTripleStore.kt | 7 + .../ExternalNetworkResourceHandler.kt | 89 +++++++++ src/main/formItemHandlers/FormItemNames.kt | 1 + .../ExternalNetworkResourceHandlerTest.kt | 176 ++++++++++++++++++ .../testDataFactory/SparqlResultFactory.kt | 53 ++++++ 7 files changed, 343 insertions(+) create mode 100644 src/main/formItemHandlers/ExternalNetworkResourceHandler.kt create mode 100644 src/test/ExternalNetworkResourceHandlerTest.kt diff --git a/pom.xml b/pom.xml index cc3f922..59def67 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,21 @@ io.ktor ktor-client-core-jvm ${ktor.version} + + + io.ktor + ktor-client + ${ktor.version} + + + io.ktor + ktor-client-cio-jvm + ${ktor.version} + + + io.ktor + ktor-client-mock-jvm + ${ktor.version} test diff --git a/src/main/Config.kt b/src/main/Config.kt index 04cc871..adb7963 100644 --- a/src/main/Config.kt +++ b/src/main/Config.kt @@ -10,12 +10,14 @@ private val logger = KotlinLogging.logger {} class Environment { companion object { var namespaceFile: String? = System.getenv("NAMESPACE_FILE") + val cimResource: String? = System.getenv("CIM_RESOURCE") } } class Config { companion object { var namespaces: Map = loadNamespaces(Environment.namespaceFile) + val cimResource: String = Environment.cimResource ?: "CIM16.sparql" } } diff --git a/src/main/ExternalTripleStore.kt b/src/main/ExternalTripleStore.kt index b5e94fe..590472a 100644 --- a/src/main/ExternalTripleStore.kt +++ b/src/main/ExternalTripleStore.kt @@ -46,6 +46,13 @@ fun populateInMemTripleStore(sparqlResult: SparqlResultJson): TripleStoreRDF4J { return store } +fun updateInMemTripleStore( + store: TripleStoreRDF4J, + sparqlResult: SparqlResultJson, +) { + store.update(insertQuery(sparqlResult.result.bindings)) +} + fun insertTriple(result: Map): String? { val graph = result["graph"] val subject = result["s"] diff --git a/src/main/formItemHandlers/ExternalNetworkResourceHandler.kt b/src/main/formItemHandlers/ExternalNetworkResourceHandler.kt new file mode 100644 index 0000000..a1936f8 --- /dev/null +++ b/src/main/formItemHandlers/ExternalNetworkResourceHandler.kt @@ -0,0 +1,89 @@ +package com.github.statnett.loadflowservice.formItemHandlers + +import com.github.statnett.loadflowservice.Config +import com.github.statnett.loadflowservice.SparqlResultJson +import com.github.statnett.loadflowservice.createExtractionQuery +import com.github.statnett.loadflowservice.parseQuery +import com.github.statnett.loadflowservice.updateInMemTripleStore +import com.powsybl.triplestore.api.PropertyBags +import com.powsybl.triplestore.impl.rdf4j.TripleStoreRDF4J +import io.github.oshai.kotlinlogging.KotlinLogging +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.engine.HttpClientEngine +import io.ktor.client.engine.cio.CIO +import io.ktor.client.request.get +import io.ktor.client.request.headers +import io.ktor.http.HttpHeaders +import io.ktor.http.append +import io.ktor.http.content.PartData +import io.ktor.server.util.url +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.Json + +private val logger = KotlinLogging.logger(ExternalNetworkResourceHandler::class.simpleName ?: "") + +class ExternalNetworkResourceHandler( + private val authorizationHeader: String? = null, + private val cimResource: String = Config.cimResource, + httpEngine: HttpClientEngine? = null, +) : FormItemLoadable { + private val client: HttpClient = + HttpClient(httpEngine ?: CIO.create()) { + expectSuccess = true + } + + private val tripleStore = TripleStoreRDF4J() + + override fun formItemHandler(part: PartData.FormItem) { + val name = part.name ?: "" + if (name == FormItemNames.NETWORK) { + val externalResourceUrls = part.value.split(",") + val sparqlResults = collectExternalNetworkData(externalResourceUrls) + sparqlResults.forEach { updateInMemTripleStore(tripleStore, it) } + } + } + + private fun collectExternalNetworkData(urls: List): List { + val parsedQuery = parseQuery(cimResource) + val extractionQuery = createExtractionQuery(parsedQuery) + return runBlocking { makeRequests(urls, extractionQuery) } + } + + private suspend fun makeRequests( + urls: List, + query: String, + ) = coroutineScope { + urls.map { url -> + async(Dispatchers.IO) { + makeSparqlRequest(url, query) + } + }.awaitAll() + } + + private suspend fun makeSparqlRequest( + resourceUrl: String, + query: String, + ): SparqlResultJson { + logger.info { "Requesting data from $resourceUrl" } + val response = + client.get(resourceUrl) { + url { + parameters.append("query", query) + } + headers { + if (authorizationHeader != null) { + append(HttpHeaders.Authorization, authorizationHeader) + } + append(HttpHeaders.Accept, "application/sparql-results+json") + } + } + return Json.decodeFromString(response.body()) + } + + fun tripleStoreQuery(query: String): PropertyBags = tripleStore.query(query) +} diff --git a/src/main/formItemHandlers/FormItemNames.kt b/src/main/formItemHandlers/FormItemNames.kt index 523b858..58a0efd 100644 --- a/src/main/formItemHandlers/FormItemNames.kt +++ b/src/main/formItemHandlers/FormItemNames.kt @@ -7,5 +7,6 @@ class FormItemNames { const val SENSITIVITY_FACTORS = "sensitivity-factors" const val SECURITY_ANALYSIS_PARAMS = "security-analysis-params" const val CONTINGENCIES = "contingencies" + const val NETWORK = "network" } } diff --git a/src/test/ExternalNetworkResourceHandlerTest.kt b/src/test/ExternalNetworkResourceHandlerTest.kt new file mode 100644 index 0000000..1dc72bd --- /dev/null +++ b/src/test/ExternalNetworkResourceHandlerTest.kt @@ -0,0 +1,176 @@ +import com.github.statnett.loadflowservice.formItemHandlers.ExternalNetworkResourceHandler +import com.github.statnett.loadflowservice.formItemHandlers.FormItemNames +import io.kotest.property.arbitrary.take +import io.ktor.client.engine.mock.MockEngine +import io.ktor.client.engine.mock.MockEngineConfig +import io.ktor.client.engine.mock.respond +import io.ktor.client.plugins.ClientRequestException +import io.ktor.client.plugins.ServerResponseException +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpStatusCode +import io.ktor.http.content.PartData +import io.ktor.http.headersOf +import kotlinx.serialization.SerializationException +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import org.junit.jupiter.api.DynamicTest +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestFactory +import testDataFactory.SmallCimModels +import testDataFactory.sparqlResultArb +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +const val UNAVAILABLE_URL = "http://unavailable" +const val AVAILABLE_URL = "http://available" +const val DESERIALIZATION_ERROR = "http://random-json" +const val TOKEN_AUTHORIZATION_REQUIRED = "http://secret-sparql" +const val SECRET_TOKEN = "secret-token" +const val TWO_TERMINALS_URL = "http://two-terminals" +const val TWO_CONNECTIVITY_NODES_URL = "http://two-connectivity-nodes" +const val CIM = "http://cim-prefix" + +fun buildFormItem( + value: String, + name: String, +): PartData.FormItem { + return PartData.FormItem(value, {}, headersOf(HttpHeaders.ContentDisposition, "form-data; name=$name")) +} + +class ExternalNetworkResourceHandlerTest { + private val mockEngineConfig = MockEngineConfig() + + init { + mockEngineConfig.addHandler { request -> + val urlStr = request.url.toString() + if (urlStr.startsWith(UNAVAILABLE_URL)) { + respond( + "Not available", + HttpStatusCode.ServiceUnavailable, + headersOf(HttpHeaders.ContentType, "text/plain"), + ) + } else if (urlStr.startsWith(AVAILABLE_URL)) { + respond( + Json.encodeToString(sparqlResultArb.take(1).first()), + HttpStatusCode.OK, + headersOf(HttpHeaders.ContentType, "application/sparql-results+json"), + ) + } else if (urlStr.startsWith(DESERIALIZATION_ERROR)) { + respond( + """{"field": 1.0}""", + HttpStatusCode.OK, + headersOf(HttpHeaders.ContentType, "application/json"), + ) + } else if (urlStr.startsWith(TOKEN_AUTHORIZATION_REQUIRED)) { + val token = request.headers[HttpHeaders.Authorization] + respond( + Json.encodeToString(sparqlResultArb.take(1).first()), + if (token == SECRET_TOKEN) HttpStatusCode.OK else HttpStatusCode.Unauthorized, + headersOf(HttpHeaders.ContentType, "application/sparql-results+json"), + ) + } else if (urlStr.startsWith(TWO_TERMINALS_URL)) { + respond( + Json.encodeToString(SmallCimModels(CIM).twoTerminalsWithConnectivityNode()), + HttpStatusCode.OK, + headersOf(HttpHeaders.ContentType, "application/sparql-results+json"), + ) + } else if (urlStr.startsWith(TWO_CONNECTIVITY_NODES_URL)) { + respond( + Json.encodeToString(SmallCimModels(CIM).twoConnectivityNodes()), + HttpStatusCode.OK, + headersOf(HttpHeaders.ContentType, "application/sparql-results+json"), + ) + } else { + throw IllegalArgumentException("${request.url} not known") + } + } + } + + @TestFactory + fun `test error on unavailable`() = + listOf( + UNAVAILABLE_URL, + "$UNAVAILABLE_URL,$UNAVAILABLE_URL", + "$AVAILABLE_URL,$UNAVAILABLE_URL", + ).map { urls -> + DynamicTest.dynamicTest(urls) { + val data = buildFormItem(urls, FormItemNames.NETWORK) + assertEquals(FormItemNames.NETWORK, data.name!!) + + val engine = MockEngine(mockEngineConfig) + val handler = ExternalNetworkResourceHandler(httpEngine = engine) + assertFailsWith { handler.formItemHandler(data) } + } + } + + @Test + fun `test serialization error raises`() { + val data = buildFormItem(DESERIALIZATION_ERROR, FormItemNames.NETWORK) + val engine = MockEngine(mockEngineConfig) + assertFailsWith { + ExternalNetworkResourceHandler( + httpEngine = engine, + ).formItemHandler(data) + } + } + + @TestFactory + fun `test available ok with random data`() = + listOf( + AVAILABLE_URL, + "$AVAILABLE_URL,$AVAILABLE_URL", + ).map { urls -> + DynamicTest.dynamicTest(urls) { + val data = buildFormItem(urls, FormItemNames.NETWORK) + val engine = MockEngine(mockEngineConfig) + ExternalNetworkResourceHandler(httpEngine = engine).formItemHandler(data) + } + } + + @TestFactory + fun `test authentication error`() = + listOf( + null, + "wrong-token", + ).map { token -> + DynamicTest.dynamicTest("$token") { + val data = buildFormItem(TOKEN_AUTHORIZATION_REQUIRED, FormItemNames.NETWORK) + val engine = MockEngine(mockEngineConfig) + assertFailsWith { + ExternalNetworkResourceHandler( + httpEngine = engine, + authorizationHeader = token, + ).formItemHandler(data) + } + } + } + + @Test + fun `test authentication ok`() { + val data = buildFormItem(TOKEN_AUTHORIZATION_REQUIRED, FormItemNames.NETWORK) + val engine = MockEngine(mockEngineConfig) + ExternalNetworkResourceHandler(httpEngine = engine, authorizationHeader = SECRET_TOKEN).formItemHandler(data) + } + + @Test + fun `test store populated with small cim model`() { + val data = buildFormItem("$TWO_TERMINALS_URL,$TWO_CONNECTIVITY_NODES_URL", FormItemNames.NETWORK) + val engine = MockEngine(mockEngineConfig) + val handler = ExternalNetworkResourceHandler(httpEngine = engine) + handler.formItemHandler(data) + + val query = + """ + PREFIX cim: <$CIM#> + + SELECT ?name { + ?terminal cim:Terminal.ConnectivityNode/cim:IdentifiedObject.name ?name + } + """.trimIndent() + val result = handler.tripleStoreQuery(query) + assertEquals(2, result.size) + val expect = setOf("Connectivity node 1", "Connectivity node 2") + val got = result.map { item -> item["name"] }.toSet() + assertEquals(expect, got) + } +} diff --git a/src/test/testDataFactory/SparqlResultFactory.kt b/src/test/testDataFactory/SparqlResultFactory.kt index f4d684c..4a405b3 100644 --- a/src/test/testDataFactory/SparqlResultFactory.kt +++ b/src/test/testDataFactory/SparqlResultFactory.kt @@ -84,3 +84,56 @@ val sparqlObjectArb = val chosen = Arb.choice(options).bind() SparqlItem(chosen.first, chosen.second) } + +class SmallCimModels(private val cim: String) { + companion object { + const val CON_NODE1_URI = "urn:uuid:12-af" + const val CON_NODE2_URI = "urn:uuid:13-af" + } + + fun twoTerminalsWithConnectivityNode(): SparqlResultJson { + return SparqlResultJson( + head = SparqlVars(listOf("graph", "s", "p", "o")), + result = + SparqlResult( + listOf( + mapOf( + "graph" to SparqlItem(SparqlTypes.URI, "http://g"), + "s" to SparqlItem(SparqlTypes.URI, "urn:uuid:12-ae"), + "p" to SparqlItem(SparqlTypes.URI, "$cim#Terminal.ConnectivityNode"), + "o" to SparqlItem(SparqlTypes.URI, CON_NODE1_URI), + ), + mapOf( + "graph" to SparqlItem(SparqlTypes.URI, "http://g"), + "s" to SparqlItem(SparqlTypes.URI, "urn:uuid:13-ae"), + "p" to SparqlItem(SparqlTypes.URI, "$cim#Terminal.ConnectivityNode"), + "o" to SparqlItem(SparqlTypes.URI, CON_NODE2_URI), + ), + ), + ), + ) + } + + fun twoConnectivityNodes(): SparqlResultJson { + return SparqlResultJson( + head = SparqlVars(listOf("graph", "s", "p", "o")), + result = + SparqlResult( + listOf( + mapOf( + "graph" to SparqlItem(SparqlTypes.URI, "http://g"), + "s" to SparqlItem(SparqlTypes.URI, CON_NODE1_URI), + "p" to SparqlItem(SparqlTypes.URI, "$cim#IdentifiedObject.name"), + "o" to SparqlItem(SparqlTypes.LITERAL, "Connectivity node 1"), + ), + mapOf( + "graph" to SparqlItem(SparqlTypes.URI, "http://g"), + "s" to SparqlItem(SparqlTypes.URI, CON_NODE2_URI), + "p" to SparqlItem(SparqlTypes.URI, "$cim#IdentifiedObject.name"), + "o" to SparqlItem(SparqlTypes.LITERAL, "Connectivity node 2"), + ), + ), + ), + ) + } +}