From d77500adef58fa3faec619f694776ef24d357a0c Mon Sep 17 00:00:00 2001 From: Rueger Date: Fri, 19 Mar 2021 13:00:24 +0100 Subject: [PATCH 1/5] fixing three tests in the service module --- .../com/ubirch/controllers/EventLogControllerSpec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/event-log-service/src/test/scala/com/ubirch/controllers/EventLogControllerSpec.scala b/event-log-service/src/test/scala/com/ubirch/controllers/EventLogControllerSpec.scala index d874557c..cf641ab6 100644 --- a/event-log-service/src/test/scala/com/ubirch/controllers/EventLogControllerSpec.scala +++ b/event-log-service/src/test/scala/com/ubirch/controllers/EventLogControllerSpec.scala @@ -2,10 +2,10 @@ package com.ubirch.controllers import com.github.nosan.embedded.cassandra.cql.CqlScript import com.ubirch.TestBase -import com.ubirch.models.{ EventLogGenericResponse, Values } +import com.ubirch.models.{EventLogGenericResponse, Values} import com.ubirch.service.ExtServiceBinder import com.ubirch.services.ServiceBinder -import com.ubirch.util.{ EventLogJsonSupport, InjectorHelper } +import com.ubirch.util.{EventLogJsonSupport, InjectorHelper} import io.prometheus.client.CollectorRegistry class EventLogControllerSpec extends TestBase { @@ -188,6 +188,7 @@ class EventLogControllerSpec extends TestBase { | milli int, | event_time timestamp, | nonce text, + | status text, | PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli) |) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC); |""".stripMargin, From c5eacef43846c089eadb21e055d98c82bbfff84a Mon Sep 17 00:00:00 2001 From: Rueger Date: Sun, 21 Mar 2021 16:22:18 +0100 Subject: [PATCH 2/5] reverting revert changing back formatting configurations adding sometimes explicit DurationInt import, as otherwise Intellij complains --- .../scala/com/ubirch/chainer/process/DefaultExecutors.scala | 2 +- .../src/test/scala/com/ubirch/verification/LookupSpec.scala | 2 +- pom.xml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/event-log-chainer/src/main/scala/com/ubirch/chainer/process/DefaultExecutors.scala b/event-log-chainer/src/main/scala/com/ubirch/chainer/process/DefaultExecutors.scala index 8abd6983..0ecfe7f9 100644 --- a/event-log-chainer/src/main/scala/com/ubirch/chainer/process/DefaultExecutors.scala +++ b/event-log-chainer/src/main/scala/com/ubirch/chainer/process/DefaultExecutors.scala @@ -18,7 +18,7 @@ import com.ubirch.util._ import javax.inject._ import org.apache.kafka.clients.consumer.ConsumerRecord -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import scala.concurrent.{ ExecutionContext, Future } import scala.language.postfixOps import scala.util.{ Failure, Success, Try } diff --git a/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala b/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala index 2234a599..b646aa4e 100644 --- a/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala +++ b/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala @@ -19,7 +19,7 @@ import javax.inject._ import org.json4s.jackson.JsonMethods.parse import org.json4s.{ JNull, JValue } -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import scala.concurrent.{ ExecutionContext, Future } import scala.language.postfixOps diff --git a/pom.xml b/pom.xml index e53c0ab4..f4dc277e 100644 --- a/pom.xml +++ b/pom.xml @@ -171,8 +171,8 @@ format - Prevent - Prevent + Force + Force true Force true From 9c45881ae92c54c0f4f79625a5f555dab67a3fd8 Mon Sep 17 00:00:00 2001 From: Rueger Date: Sun, 21 Mar 2021 17:59:09 +0100 Subject: [PATCH 3/5] Revert "Revert "Merge pull request #64 from ubirch/UNG-1600/add-en-and-disable-functionality"" This reverts commit 36b90188e41e5fb507143d955058b50e5f0b7421. --- .../src/main/resources/db/keyspace.cql | 5 +- .../db/migrations/v8_Adding_status.cql | 6 ++ .../scala/com/ubirch/models/EventLogDAO.scala | 20 +++- .../scala/com/ubirch/models/EventLogRow.scala | 12 ++- .../main/scala/com/ubirch/models/Values.scala | 2 + .../com/ubirch/process/LoggerExecutor.scala | 9 +- .../test/scala/com/ubirch/EventLogSpec.scala | 93 +++++++++++++++++- .../com/ubirch/models/EventDAOLogSpec.scala | 96 +++++++++++++++++-- .../com/ubirch/process/ExecutorSpec.scala | 2 +- .../verification/controllers/DefaultApi.scala | 7 +- .../ubirch/verification/services/Finder.scala | 13 ++- pom.xml | 4 +- 12 files changed, 240 insertions(+), 29 deletions(-) create mode 100644 event-log-core/src/main/resources/db/migrations/v8_Adding_status.cql diff --git a/event-log-core/src/main/resources/db/keyspace.cql b/event-log-core/src/main/resources/db/keyspace.cql index 1b5a3302..c2fc4dad 100644 --- a/event-log-core/src/main/resources/db/keyspace.cql +++ b/event-log-core/src/main/resources/db/keyspace.cql @@ -21,5 +21,6 @@ create table if not exists events ( milli int, event_time timestamp, nonce text, - PRIMARY KEY ((id, category), year, month, day, hour) -) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC); + status text, + PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli) +) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC); diff --git a/event-log-core/src/main/resources/db/migrations/v8_Adding_status.cql b/event-log-core/src/main/resources/db/migrations/v8_Adding_status.cql new file mode 100644 index 00000000..aefb40b1 --- /dev/null +++ b/event-log-core/src/main/resources/db/migrations/v8_Adding_status.cql @@ -0,0 +1,6 @@ +/* Cassandra migration for keyspace event_log. + Version 8 - 2021-03-18T08:29:29.223675+00:00 + + Adding status */ + +ALTER TABLE events ADD status text; diff --git a/event-log-core/src/main/scala/com/ubirch/models/EventLogDAO.scala b/event-log-core/src/main/scala/com/ubirch/models/EventLogDAO.scala index 08894d57..f762d994 100644 --- a/event-log-core/src/main/scala/com/ubirch/models/EventLogDAO.scala +++ b/event-log-core/src/main/scala/com/ubirch/models/EventLogDAO.scala @@ -19,7 +19,7 @@ trait EventLogQueries extends TablePointer[EventLogRow] with CustomEncodings[Eve def selectAllQ: db.Quoted[db.EntityQuery[EventLogRow]] = quote(query[EventLogRow]) - def byIdAndCatQ(id: String, category: String) = quote { + def byIdAndCatQ(id: String, category: String): Quoted[EntityQuery[EventLogRow]] = quote { query[EventLogRow].filter(x => x.id == lift(id) && x.category == lift(category)).map(x => x) } @@ -109,4 +109,22 @@ class EventsDAO @Inject() (val events: Events, val lookups: Lookups)(implicit va def deleteFromEventLog(eventLog: EventLog): Future[Int] = { events.delete(EventLogRow.fromEventLog(eventLog)).map(_ => 1) } + + def updateFromEventLog(eventLog: EventLog): Future[Int] = { + events.byIdAndCat(eventLog.id, Values.UPP_CATEGORY).flatMap { rows => + Future.sequence(rows.map { + row => + eventLog.category match { + case Values.UPP_ENABLE_CATEGORY => + val updated = row.copy(status = Some(Values.UPP_STATUS_ENABLED)) + events.insert(updated) + case _ => + val updated = row.copy(status = Some(Values.UPP_STATUS_DISABLED)) + events.insert(updated) + } + + }).map(_ => 1) + } + } + } diff --git a/event-log-core/src/main/scala/com/ubirch/models/EventLogRow.scala b/event-log-core/src/main/scala/com/ubirch/models/EventLogRow.scala index d9496660..4e78592e 100644 --- a/event-log-core/src/main/scala/com/ubirch/models/EventLogRow.scala +++ b/event-log-core/src/main/scala/com/ubirch/models/EventLogRow.scala @@ -1,16 +1,16 @@ package com.ubirch.models -import java.util.Date - import org.json4s.JValue +import java.util.Date + /** * Concrete type for the EventLogBase whose type T is JValue * * @param id String that identifies the EventLog. It can be a hash or a UUID or anything unique * @param customerId Represents an id for a customer id. * @param serviceClass Represents the name from where the log comes. - * E.G: The name of the class. + * E.G: The name of the class. * @param category Represents the category for the event. This is useful for * adding layers of description to the event. * @param event Represents the event that is to be recorded. @@ -21,6 +21,9 @@ import org.json4s.JValue * to support its creation from the eventTime. * @param signature Represents the signature for the event log. * @param nonce Represents a value that can be used to calculate the hash of the event. + * @param status Represents the status of the EventlogRow. (At the moment only used for UPP categories. + * If it has become disabled, it shouldn't be possible to verify the UPP). It's value might + * be either NONE, Some(ENABLED) or Some(DISABLED). None is equivalent to Some(ENABLED). */ case class EventLogRow( @@ -32,7 +35,8 @@ case class EventLogRow( eventTime: Date, eventTimeInfo: TimeInfo, signature: String, - nonce: String + nonce: String, + status: Option[String] = None ) object EventLogRow { diff --git a/event-log-core/src/main/scala/com/ubirch/models/Values.scala b/event-log-core/src/main/scala/com/ubirch/models/Values.scala index b3b0a8c0..526b4211 100644 --- a/event-log-core/src/main/scala/com/ubirch/models/Values.scala +++ b/event-log-core/src/main/scala/com/ubirch/models/Values.scala @@ -10,6 +10,8 @@ object Values { val UPP_DISABLE_CATEGORY = "UPP_DISABLE" val UPP_ENABLE_CATEGORY = "UPP_ENABLE" val UPP_DELETE_CATEGORY = "UPP_DELETE" + val UPP_STATUS_DISABLED = "DISABLED" + val UPP_STATUS_ENABLED = "ENABLED" val PUB_KEY_CATEGORY = "PUB_KEY" val CHAIN_CATEGORY = "CHAIN" val DEVICE_CATEGORY = "DEVICE" diff --git a/event-log-core/src/main/scala/com/ubirch/process/LoggerExecutor.scala b/event-log-core/src/main/scala/com/ubirch/process/LoggerExecutor.scala index 51e4f780..9e3e6188 100644 --- a/event-log-core/src/main/scala/com/ubirch/process/LoggerExecutor.scala +++ b/event-log-core/src/main/scala/com/ubirch/process/LoggerExecutor.scala @@ -83,12 +83,9 @@ class LoggerExecutor @Inject() ( private def processEventLog(eventLog: EventLog): Future[Int] = { eventLog.category match { - case Values.UPP_ENABLE_CATEGORY => - logger.warn(s"it has not been implemented yet. category: ${eventLog.category}") - Future.successful(0) - case Values.UPP_DISABLE_CATEGORY => - logger.warn(s"it has not been implemented yet. category: ${eventLog.category}") - Future.successful(0) + case Values.UPP_ENABLE_CATEGORY | Values.UPP_DISABLE_CATEGORY => + logger.info(s"updating eventlog with id: ${eventLog.id} to status: ${eventLog.category}") + events.updateFromEventLog(eventLog) case Values.UPP_DELETE_CATEGORY => logger.info(s"delete event log. id: ${eventLog.id}") // the target UPP that is deleted should have an UPP category diff --git a/event-log-core/src/test/scala/com/ubirch/EventLogSpec.scala b/event-log-core/src/test/scala/com/ubirch/EventLogSpec.scala index 85e1f52f..60856864 100644 --- a/event-log-core/src/test/scala/com/ubirch/EventLogSpec.scala +++ b/event-log-core/src/test/scala/com/ubirch/EventLogSpec.scala @@ -270,6 +270,94 @@ class EventLogSpec extends TestBase with EmbeddedCassandra with LazyLogging { assert(res1.isEmpty) } + } + + "consume messages and disable UPP in cassandra" in { + + implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort) + + val InjectorHelper = new InjectorHelperImpl("localhost:" + kafkaConfig.kafkaPort) + + val config = InjectorHelper.get[Config] + + val entities = (0 to 5).map(_ => Entities.Events.eventExample(UUIDHelper.randomUUID, Values.UPP_CATEGORY) + .sign(config) + .withCustomerId(UUIDHelper.randomUUID)).toList + + val eventsDAO = InjectorHelper.get[EventsDAO] + + entities.map(el => await(eventsDAO.insertFromEventLogWithoutLookups(el), 2 seconds)) + val all = await(eventsDAO.events.selectAll, 2 seconds) + + assert(all.size == entities.size) + + withRunningKafka { + + //Publish updates + val topic = "com.ubirch.eventlog" + val entitiesAsString = entities.map(_.copy(category = Values.UPP_DISABLE_CATEGORY)).map(_.toJson) + entitiesAsString.foreach { entityAsString => + publishStringMessageToKafka(topic, entityAsString) + } + + //Consumer + val consumer = InjectorHelper.get[StringConsumer] + consumer.startPolling() + Thread.sleep(10000) + + //Read Events + val events = InjectorHelper.get[Events] + + def res: List[EventLogRow] = await(events.selectAll, 2 seconds) + + assert(res.size == entities.size) + res.map(row => assert(row.status.contains(Values.UPP_STATUS_DISABLED))) + } + + } + + "consume messages and enable UPP in cassandra" in { + + implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort) + + val InjectorHelper = new InjectorHelperImpl("localhost:" + kafkaConfig.kafkaPort) + + val config = InjectorHelper.get[Config] + + val entities = (0 to 5).map(_ => Entities.Events.eventExample(UUIDHelper.randomUUID, Values.UPP_CATEGORY) + .sign(config) + .withCustomerId(UUIDHelper.randomUUID)).toList + + val eventsDAO = InjectorHelper.get[EventsDAO] + + entities.map(el => await(eventsDAO.insertFromEventLogWithoutLookups(el), 2 seconds)) + val all = await(eventsDAO.events.selectAll, 2 seconds) + + assert(all.size == entities.size) + + withRunningKafka { + + //Publish updates + val topic = "com.ubirch.eventlog" + val entitiesAsString = entities.map(_.copy(category = Values.UPP_ENABLE_CATEGORY)).map(_.toJson) + entitiesAsString.foreach { entityAsString => + publishStringMessageToKafka(topic, entityAsString) + } + + //Consumer + val consumer = InjectorHelper.get[StringConsumer] + consumer.startPolling() + Thread.sleep(10000) + + //Read Events + val events = InjectorHelper.get[Events] + + def res: List[EventLogRow] = await(events.selectAll, 2 seconds) + + //Verify + assert(res.size == entities.size) + res.map(row => assert(row.status.contains(Values.UPP_STATUS_ENABLED))) + } } @@ -616,8 +704,9 @@ class EventLogSpec extends TestBase with EmbeddedCassandra with LazyLogging { | milli int, | event_time timestamp, | nonce text, - | PRIMARY KEY ((id, category), year, month, day, hour) - |) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC); + | status text, + | PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli) + |) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC); """.stripMargin, "drop table if exists lookups;", """ diff --git a/event-log-core/src/test/scala/com/ubirch/models/EventDAOLogSpec.scala b/event-log-core/src/test/scala/com/ubirch/models/EventDAOLogSpec.scala index de88171c..903e6683 100644 --- a/event-log-core/src/test/scala/com/ubirch/models/EventDAOLogSpec.scala +++ b/event-log-core/src/test/scala/com/ubirch/models/EventDAOLogSpec.scala @@ -1,7 +1,5 @@ package com.ubirch.models -import java.util.Date - import com.github.nosan.embedded.cassandra.cql.CqlScript import com.typesafe.scalalogging.LazyLogging import com.ubirch.util.{ InjectorHelper, UUIDHelper } @@ -10,7 +8,8 @@ import io.prometheus.client.CollectorRegistry import org.json4s.JValue import org.json4s.jackson.JsonMethods.parse -import scala.concurrent.duration._ +import java.util.Date +import scala.concurrent.duration.DurationInt import scala.language.postfixOps class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging { @@ -102,6 +101,90 @@ class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging { } + "eventLogRowEnable" in { + val data: JValue = parse(""" { "id" : [1, 2, 3, 4] } """) + + def date = new Date() + + val key = UUIDHelper.timeBasedUUID.toString + val value = UUIDHelper.timeBasedUUID.toString + val category = Values.UPP_CATEGORY + val lookupName = "lookupname" + + val event = { + EventLog(data) + .withNewId(key) + .withCustomerId("my customer id") + .withCategory(category) + .withServiceClass("my service class") + .withEventTime(date) + .withSignature("my signature") + .withNonce("my nonce") + .withHeaders("hola" -> "Hello", "hey" -> "como estas") + .addLookupKeys(LookupKey(lookupName, category, key.asKey, Seq(value.asValue))) + } + + val eventsDAO = InjectorHelper.get[EventsDAO] + + await(eventsDAO.insertFromEventLog(event), 2 seconds) + + val foundEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds) + + val eventRow = EventLogRow.fromEventLog(event) + assert(foundEvent.isDefined) + assert(eventRow == foundEvent.get) + + await(eventsDAO.events.insert(foundEvent.head.copy(status = Some(Values.UPP_STATUS_ENABLED))), 2 seconds) + val updatedEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds) + + assert(updatedEvent.nonEmpty) + assert(updatedEvent.head.status.contains(Values.UPP_STATUS_ENABLED)) + + assert(updatedEvent.contains(eventRow.copy(status = Some(Values.UPP_STATUS_ENABLED)))) + } + + "eventLogRowDisable" in { + val data: JValue = parse(""" { "id" : [1, 2, 3, 4] } """) + + def date = new Date() + + val key = UUIDHelper.timeBasedUUID.toString + val value = UUIDHelper.timeBasedUUID.toString + val category = Values.UPP_CATEGORY + val lookupName = "lookupname" + + val event = { + EventLog(data) + .withNewId(key) + .withCustomerId("my customer id") + .withCategory(category) + .withServiceClass("my service class") + .withEventTime(date) + .withSignature("my signature") + .withNonce("my nonce") + .withHeaders("hola" -> "Hello", "hey" -> "como estas") + .addLookupKeys(LookupKey(lookupName, category, key.asKey, Seq(value.asValue))) + } + + val eventsDAO = InjectorHelper.get[EventsDAO] + + await(eventsDAO.insertFromEventLog(event), 2 seconds) + + val foundEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds) + + val eventRow = EventLogRow.fromEventLog(event) + assert(foundEvent.isDefined) + assert(eventRow == foundEvent.get) + + await(eventsDAO.events.insert(foundEvent.head.copy(status = Some(Values.UPP_STATUS_DISABLED))), 2 seconds) + val updatedEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds) + + assert(updatedEvent.nonEmpty) + assert(updatedEvent.head.status.contains(Values.UPP_STATUS_DISABLED)) + + assert(updatedEvent.contains(eventRow.copy(status = Some(Values.UPP_STATUS_DISABLED)))) + } + "eventLogRowByLookupValueAndCategory" in { val data: JValue = parse(""" { "id" : [1, 2, 3, 4] } """) @@ -350,7 +433,7 @@ class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging { events.map(e => await(eventsDAO.deleteFromEventLog(e), 2 seconds)) val allAfterDeleted = await(eventsDAO.events.selectAll, 2 seconds) - assert(allAfterDeleted.size == 0) + assert(allAfterDeleted.isEmpty) } } @@ -385,8 +468,9 @@ class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging { | milli int, | event_time timestamp, | nonce text, - | PRIMARY KEY ((id, category), year, month, day, hour) - |) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC); + | status text, + | PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli) + |) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC); """.stripMargin, "drop table if exists lookups;", """ diff --git a/event-log-core/src/test/scala/com/ubirch/process/ExecutorSpec.scala b/event-log-core/src/test/scala/com/ubirch/process/ExecutorSpec.scala index 4099bc7f..c2d806e6 100644 --- a/event-log-core/src/test/scala/com/ubirch/process/ExecutorSpec.scala +++ b/event-log-core/src/test/scala/com/ubirch/process/ExecutorSpec.scala @@ -14,7 +14,7 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import scala.concurrent.{ Future, Promise } import scala.language.{ implicitConversions, postfixOps } diff --git a/event-log-verification-service/src/main/scala/com/ubirch/verification/controllers/DefaultApi.scala b/event-log-verification-service/src/main/scala/com/ubirch/verification/controllers/DefaultApi.scala index 0af372f5..2c482e0d 100644 --- a/event-log-verification-service/src/main/scala/com/ubirch/verification/controllers/DefaultApi.scala +++ b/event-log-verification-service/src/main/scala/com/ubirch/verification/controllers/DefaultApi.scala @@ -1,8 +1,5 @@ package com.ubirch.verification.controllers -import java.io.{ ByteArrayOutputStream, IOException } -import java.nio.charset.StandardCharsets - import com.google.inject.Inject import com.typesafe.scalalogging.StrictLogging import com.ubirch.niomon.cache.RedisCache @@ -16,11 +13,13 @@ import com.ubirch.verification.services.kafka.AcctEventPublishing import com.ubirch.verification.services.{ KeyServiceBasedVerifier, TokenVerification } import com.ubirch.verification.util.{ HashHelper, LookupJsonSupport } import io.udash.rest.raw.JsonValue -import javax.inject.{ Named, Singleton } import org.json4s.JsonAST.JNull import org.msgpack.core.MessagePack import org.redisson.api.RMapCache +import java.io.{ ByteArrayOutputStream, IOException } +import java.nio.charset.StandardCharsets +import javax.inject.{ Named, Singleton } import scala.concurrent.{ ExecutionContext, Future } @Singleton diff --git a/event-log-verification-service/src/main/scala/com/ubirch/verification/services/Finder.scala b/event-log-verification-service/src/main/scala/com/ubirch/verification/services/Finder.scala index 4d3d717f..190ba71f 100644 --- a/event-log-verification-service/src/main/scala/com/ubirch/verification/services/Finder.scala +++ b/event-log-verification-service/src/main/scala/com/ubirch/verification/services/Finder.scala @@ -95,7 +95,18 @@ class DefaultFinder @Inject() (cassandraFinder: CassandraFinder, gremlinFinder: extends Finder with LazyLogging { - def findEventLog(value: String, category: String): Future[Option[EventLogRow]] = cassandraFinder.findEventLog(value, category) + /** + * This finder method filters the result of the cassandra query depending on the status of the eventLogRow. + * If it is disabled, a verification shouldn't succeed. + */ + def findEventLog(value: String, category: String): Future[Option[EventLogRow]] = { + cassandraFinder + .findEventLog(value, category) + .map { + case Some(row) if !row.status.contains(Values.UPP_STATUS_DISABLED) => Some(row) + case _ => None + } + } def findByPayload(value: String): Future[Option[EventLogRow]] = findEventLog(value, Values.UPP_CATEGORY) diff --git a/pom.xml b/pom.xml index f4dc277e..e53c0ab4 100644 --- a/pom.xml +++ b/pom.xml @@ -171,8 +171,8 @@ format - Force - Force + Prevent + Prevent true Force true From a5358cd1f1d225409f92f8aee172421c1107a95a Mon Sep 17 00:00:00 2001 From: Rueger Date: Mon, 22 Mar 2021 09:22:43 +0100 Subject: [PATCH 4/5] fixing formatting rule to state of before adding status to different tests making DurationInt import explicit --- .../scala/com/ubirch/lookup/process/DefaultExecutors.scala | 2 +- .../src/test/scala/com/ubirch/lookup/LookupSpec.scala | 7 ++++--- .../test/scala/com/ubirch/verification/LookupSpec.scala | 5 +++-- pom.xml | 4 ++-- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/process/DefaultExecutors.scala b/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/process/DefaultExecutors.scala index ddf0b4c7..5657bd67 100644 --- a/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/process/DefaultExecutors.scala +++ b/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/process/DefaultExecutors.scala @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord import org.json4s.JsonAST.JNull -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import scala.concurrent.{ ExecutionContext, Future } import scala.language.postfixOps diff --git a/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala b/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala index 43c6ceef..a6234364 100644 --- a/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala +++ b/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer import org.json4s.JValue import org.json4s.jackson.JsonMethods.parse -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import scala.concurrent.{ ExecutionContext, Future } import scala.language.postfixOps @@ -179,7 +179,7 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { val insertEventSql: String = s""" - |INSERT INTO events (id, customer_id, service_class, category, event, event_time, year, month, day, hour, minute, second, milli, signature, nonce) + |INSERT INTO events (id, customer_id, service_class, category, event, event_time, year, month, day, hour, minute, second, milli, signature, nonce, status) | VALUES ('c29tZSBieXRlcyEAAQIDnw==', 'customer_id', 'service_class', '${Values.UPP_CATEGORY}', '{ | "hint":0, | "payload":"c29tZSBieXRlcyEAAQIDnw==", @@ -188,7 +188,7 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { | "uuid":"8e78b5ca-6597-11e8-8185-c83ea7000e4d", | "version":34 |}', '2019-01-29T17:00:28.333Z', 2019, 5, 2, 19, 439, 16, 0, '0681D35827B17104A2AACCE5A08C4CD1BC8A5EF5EFF4A471D15976693CC0D6D67392F1CACAE63565D6E521D2325A998CDE00A2FEF5B65D0707F4158000EB6D05', - |'34376336396166392D336533382D343665652D393063332D616265313364383335353266'); + |'34376336396166392D336533382D343665652D393063332D616265313364383335353266', null); """.stripMargin val insertLookupSql: String = @@ -925,6 +925,7 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { | milli int, | event_time timestamp, | nonce text, + | status text, | PRIMARY KEY ((id, category), year, month, day, hour) |) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC); """.stripMargin, diff --git a/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala b/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala index b646aa4e..cd625768 100644 --- a/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala +++ b/event-log-verification-service/src/test/scala/com/ubirch/verification/LookupSpec.scala @@ -178,7 +178,7 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { val insertEventSql: String = s""" - |INSERT INTO events (id, customer_id, service_class, category, event, event_time, year, month, day, hour, minute, second, milli, signature, nonce) + |INSERT INTO events (id, customer_id, service_class, category, event, event_time, year, month, day, hour, minute, second, milli, signature, nonce, status) | VALUES ('c29tZSBieXRlcyEAAQIDnw==', 'customer_id', 'service_class', '${Values.UPP_CATEGORY}', '{ | "hint":0, | "payload":"c29tZSBieXRlcyEAAQIDnw==", @@ -187,7 +187,7 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { | "uuid":"8e78b5ca-6597-11e8-8185-c83ea7000e4d", | "version":34 |}', '2019-01-29T17:00:28.333Z', 2019, 5, 2, 19, 439, 16, 0, '0681D35827B17104A2AACCE5A08C4CD1BC8A5EF5EFF4A471D15976693CC0D6D67392F1CACAE63565D6E521D2325A998CDE00A2FEF5B65D0707F4158000EB6D05', - |'34376336396166392D336533382D343665652D393063332D616265313364383335353266'); + |'34376336396166392D336533382D343665652D393063332D616265313364383335353266', null); """.stripMargin val data: String = @@ -559,6 +559,7 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { | milli int, | event_time timestamp, | nonce text, + | status text, | PRIMARY KEY ((id, category), year, month, day, hour) |) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC); """.stripMargin, diff --git a/pom.xml b/pom.xml index e53c0ab4..f4dc277e 100644 --- a/pom.xml +++ b/pom.xml @@ -171,8 +171,8 @@ format - Prevent - Prevent + Force + Force true Force true From ce7582396055e2fb691cad183ac750104eb1e5bc Mon Sep 17 00:00:00 2001 From: Rueger Date: Mon, 22 Mar 2021 10:15:59 +0100 Subject: [PATCH 5/5] adding filter to default finder to only let through enabled EventLogRows adding tests, to check it works correctly --- .../com/ubirch/lookup/models/Finder.scala | 17 +- .../scala/com/ubirch/lookup/LookupSpec.scala | 226 ++++++++++++++++-- 2 files changed, 217 insertions(+), 26 deletions(-) diff --git a/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/models/Finder.scala b/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/models/Finder.scala index 564f6a93..11d9074b 100644 --- a/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/models/Finder.scala +++ b/event-log-kafka-lookup/src/main/scala/com/ubirch/lookup/models/Finder.scala @@ -1,11 +1,11 @@ package com.ubirch.lookup.models import com.typesafe.scalalogging.LazyLogging -import com.ubirch.models.{ EventLogRow, Values } -import javax.inject._ +import com.ubirch.models.{EventLogRow, Values} -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success } +import javax.inject._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} trait Finder extends LazyLogging { @@ -92,8 +92,15 @@ class DefaultFinder @Inject() (cassandraFinder: CassandraFinder, gremlinFinder: extends Finder with LazyLogging { + /** + * Method filters all EventLogRows that are disabled, as those shouldn't become verified anymore. + * If the status is None or Some(Values.UPP_STATUS_ENABLED) the UPPs are verifiable. + */ def findEventLog(value: String, category: String): Future[Option[EventLogRow]] = - cassandraFinder.findEventLog(value, category) + cassandraFinder.findEventLog(value, category).map { + case Some(row) if !row.status.contains(Values.UPP_STATUS_DISABLED) => Some(row) + case _ => None + } def findByPayload(value: String): Future[Option[EventLogRow]] = findEventLog(value, Values.UPP_CATEGORY) diff --git a/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala b/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala index a6234364..48923b11 100644 --- a/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala +++ b/event-log-kafka-lookup/src/test/scala/com/ubirch/lookup/LookupSpec.scala @@ -1,34 +1,33 @@ package com.ubirch.lookup -import java.util.UUID - import com.github.nosan.embedded.cassandra.cql.CqlScript import com.google.inject.Module import com.google.inject.binder.ScopedBindingBuilder -import com.typesafe.config.{ Config, ConfigValueFactory } +import com.typesafe.config.{Config, ConfigValueFactory} import com.typesafe.scalalogging.LazyLogging import com.ubirch.kafka.consumer.StringConsumer import com.ubirch.kafka.util.PortGiver import com.ubirch.lookup.models._ import com.ubirch.lookup.process.LookupExecutor -import com.ubirch.lookup.services.{ DefaultTestingGremlinConnector, Gremlin, LookupServiceBinder } +import com.ubirch.lookup.services.{DefaultTestingGremlinConnector, Gremlin, LookupServiceBinder} import com.ubirch.lookup.util.LookupJsonSupport import com.ubirch.models._ import com.ubirch.protocol.ProtocolMessage import com.ubirch.services.config.ConfigProvider import com.ubirch.util._ import io.prometheus.client.CollectorRegistry -import javax.inject._ import net.manub.embeddedkafka.EmbeddedKafkaConfig import org.apache.kafka.common.serialization.StringSerializer import org.json4s.JValue import org.json4s.jackson.JsonMethods.parse +import java.util.UUID +import javax.inject._ import scala.concurrent.duration.DurationInt -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps -class FakeEmptyFinder @Inject() (cassandraFinder: CassandraFinder)(implicit val ec: ExecutionContext) extends Finder { +class FakeEmptyFinder @Inject()(cassandraFinder: CassandraFinder)(implicit val ec: ExecutionContext) extends Finder { def findEventLog(value: String, category: String): Future[Option[EventLogRow]] = cassandraFinder.findEventLog(value, category) @@ -43,7 +42,11 @@ class FakeEmptyFinder @Inject() (cassandraFinder: CassandraFinder)(implicit val class FakeFoundFinder @Inject() (cassandraFinder: CassandraFinder)(implicit val ec: ExecutionContext) extends Finder { - def findEventLog(value: String, category: String): Future[Option[EventLogRow]] = cassandraFinder.findEventLog(value, category) + def findEventLog(value: String, category: String): Future[Option[EventLogRow]] = + cassandraFinder.findEventLog(value, category).map { + case Some(row) if !row.status.contains(Values.UPP_STATUS_DISABLED) => Some(row) + case _ => None + } def findByPayload(value: String): Future[Option[EventLogRow]] = findEventLog(value, Values.UPP_CATEGORY) @@ -191,6 +194,20 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { |'34376336396166392D336533382D343665652D393063332D616265313364383335353266', null); """.stripMargin + def insertEventSqlWithStatus(status: String): String = + s""" + |INSERT INTO events (id, customer_id, service_class, category, event, event_time, year, month, day, hour, minute, second, milli, signature, nonce, status) + | VALUES ('c29tZSBieXRlcyEAAQIDnw==', 'customer_id', 'service_class', '${Values.UPP_CATEGORY}', '{ + | "hint":0, + | "payload":"c29tZSBieXRlcyEAAQIDnw==", + | "signature":"5aTelLQBerVT/vJiL2qjZCxWxqlfwT/BaID0zUVy7LyUC9nUdb02//aCiZ7xH1HglDqZ0Qqb7GyzF4jtBxfSBg==", + | "signed":"lRKwjni1ymWXEeiBhcg+pwAOTQCwc29tZSBieXRlcyEAAQIDnw==", + | "uuid":"8e78b5ca-6597-11e8-8185-c83ea7000e4d", + | "version":34 + |}', '2019-01-29T17:00:28.333Z', 2019, 5, 2, 19, 439, 16, 0, '0681D35827B17104A2AACCE5A08C4CD1BC8A5EF5EFF4A471D15976693CC0D6D67392F1CACAE63565D6E521D2325A998CDE00A2FEF5B65D0707F4158000EB6D05', + |'34376336396166392D336533382D343665652D393063332D616265313364383335353266', '$status'); + """.stripMargin + val insertLookupSql: String = s""" |INSERT INTO lookups (name, category, key, value) VALUES ('${Signature.value}', '${Values.UPP_CATEGORY}', 'c29tZSBieXRlcyEAAQIDnw==', '5aTelLQBerVT/vJiL2qjZCxWxqlfwT/BaID0zUVy7LyUC9nUdb02//aCiZ7xH1HglDqZ0Qqb7GyzF4jtBxfSBg=='); @@ -301,6 +318,186 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { } + "consume and process successfully when Disabled" in { + + cassandra.executeScripts( + CqlScript.statements( + insertEventSqlWithStatus(Values.UPP_STATUS_DISABLED) + ) + ) + + implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort) + + val bootstrapServers = "localhost:" + kafkaConfig.kafkaPort + + val modules: List[Module] = List { + new LookupServiceBinder { + + override def gremlin: ScopedBindingBuilder = bind(classOf[Gremlin]).to(classOf[DefaultTestingGremlinConnector]) + + override def finder: ScopedBindingBuilder = bind(classOf[Finder]).to(classOf[FakeFoundFinder]) + + override def config: ScopedBindingBuilder = bind(classOf[Config]).toProvider(new ConfigProvider { + override def conf: Config = { + super.conf + .withValue( + "eventLog.kafkaConsumer.bootstrapServers", + ConfigValueFactory.fromAnyRef(bootstrapServers) + ) + .withValue( + "eventLog.kafkaProducer.bootstrapServers", + ConfigValueFactory.fromAnyRef(bootstrapServers) + ) + } + }) + } + } + + val injector = new InjectorHelper(modules) {} + + withRunningKafka { + + val messageEnvelopeTopic = "com.ubirch.eventlog.lookup_request" + val eventLogTopic = "com.ubirch.eventlog.lookup_response" + + val key = UUIDHelper.randomUUID.toString + val value = "c29tZSBieXRlcyEAAQIDnw==" + val queryType = Payload + val queryDepth = ShortestPath + val responseForm = AnchorsWithPath + val blockchainInfo = Normal + + val pr = ProducerRecordHelper.toRecord( + messageEnvelopeTopic, + key, + value, + Map( + QueryType.HEADER -> queryType.value, + QueryDepth.HEADER -> queryDepth.value, + ResponseForm.HEADER -> responseForm.value, + BlockchainInfo.HEADER -> blockchainInfo.value + ) + ) + publishToKafka(pr) + + //Consumer + val consumer = injector.get[StringConsumer] + consumer.setTopics(Set(messageEnvelopeTopic)) + + consumer.startPolling() + //Consumer + + Thread.sleep(5000) + + val readMessage = consumeFirstStringMessageFrom(eventLogTopic) + val expected = s"""{"success":true,"message":"Nothing Found","data":{"success":true,"key":"$key","query_type":"payload","message":"Nothing Found","event":null,"anchors":null}}""" + + assert(readMessage == expected) + + } + + } + + "consume and process successfully when Enabled" in { + + cassandra.executeScripts( + CqlScript.statements( + insertEventSqlWithStatus(Values.UPP_STATUS_ENABLED) + ) + ) + + implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort) + + val bootstrapServers = "localhost:" + kafkaConfig.kafkaPort + + val modules: List[Module] = List { + new LookupServiceBinder { + + override def gremlin: ScopedBindingBuilder = bind(classOf[Gremlin]).to(classOf[DefaultTestingGremlinConnector]) + + override def finder: ScopedBindingBuilder = bind(classOf[Finder]).to(classOf[FakeFoundFinder]) + + override def config: ScopedBindingBuilder = bind(classOf[Config]).toProvider(new ConfigProvider { + override def conf: Config = { + super.conf + .withValue( + "eventLog.kafkaConsumer.bootstrapServers", + ConfigValueFactory.fromAnyRef(bootstrapServers) + ) + .withValue( + "eventLog.kafkaProducer.bootstrapServers", + ConfigValueFactory.fromAnyRef(bootstrapServers) + ) + } + }) + } + } + + val injector = new InjectorHelper(modules) {} + + withRunningKafka { + + val messageEnvelopeTopic = "com.ubirch.eventlog.lookup_request" + val eventLogTopic = "com.ubirch.eventlog.lookup_response" + + val key = UUIDHelper.randomUUID.toString + val value = "c29tZSBieXRlcyEAAQIDnw==" + val queryType = Payload + val queryDepth = ShortestPath + val responseForm = AnchorsWithPath + val blockchainInfo = Normal + + val pr = ProducerRecordHelper.toRecord( + messageEnvelopeTopic, + key, + value, + Map( + QueryType.HEADER -> queryType.value, + QueryDepth.HEADER -> queryDepth.value, + ResponseForm.HEADER -> responseForm.value, + BlockchainInfo.HEADER -> blockchainInfo.value + ) + ) + publishToKafka(pr) + + //Consumer + val consumer = injector.get[StringConsumer] + consumer.setTopics(Set(messageEnvelopeTopic)) + + consumer.startPolling() + //Consumer + + Thread.sleep(5000) + + val readMessage = consumeFirstStringMessageFrom(eventLogTopic) + + val data = + """ + |{ + | "hint":0, + | "payload":"c29tZSBieXRlcyEAAQIDnw==", + | "signature":"5aTelLQBerVT/vJiL2qjZCxWxqlfwT/BaID0zUVy7LyUC9nUdb02//aCiZ7xH1HglDqZ0Qqb7GyzF4jtBxfSBg==", + | "signed":"lRKwjni1ymWXEeiBhcg+pwAOTQCwc29tZSBieXRlcyEAAQIDnw==", + | "uuid":"8e78b5ca-6597-11e8-8185-c83ea7000e4d", + | "version":34 + |} + """.stripMargin + + val expectedLookup = LookupResult.Found( + key = key, + queryType = queryType, + event = LookupJsonSupport.getJValue(data), + anchors = LookupExecutor.shortestPathAsJValue(FakeFoundFinder.simplePath, FakeFoundFinder.blockchains) + ) + val expectedLookupJValue = LookupJsonSupport.ToJson[LookupResult](expectedLookup).get + val expectedGenericResponse = JValueGenericResponse.Success("Query Successfully Processed", expectedLookupJValue) + val expectedGenericResponseAsJson = LookupJsonSupport.ToJson[JValueGenericResponse](expectedGenericResponse).toString + + assert(readMessage == expectedGenericResponseAsJson) + + } + } + "consume and process successfully when NotFound" in { implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort) @@ -479,19 +676,6 @@ class LookupSpec extends TestBase with EmbeddedCassandra with LazyLogging { val readMessage = consumeFirstStringMessageFrom(eventLogTopic) - val data = LookupJsonSupport.getJValue( - """ - |{ - | "hint":0, - | "payload":"c29tZSBieXRlcyEAAQIDnw==", - | "signature":"5aTelLQBerVT/vJiL2qjZCxWxqlfwT/BaID0zUVy7LyUC9nUdb02//aCiZ7xH1HglDqZ0Qqb7GyzF4jtBxfSBg==", - | "signed":"lRKwjni1ymWXEeiBhcg+pwAOTQCwc29tZSBieXRlcyEAAQIDnw==", - | "uuid":"8e78b5ca-6597-11e8-8185-c83ea7000e4d", - | "version":34 - |} - """.stripMargin - ) - assert(readMessage == s"""{"success":true,"message":"Nothing Found","data":{"success":true,"key":"$key","query_type":"signature","message":"Nothing Found","event":null,"anchors":null}}""") }