Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick/ung 1600 #67

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.ubirch.util._
import javax.inject._
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.duration._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
import scala.language.postfixOps
import scala.util.{ Failure, Success, Try }
Expand Down
5 changes: 3 additions & 2 deletions event-log-core/src/main/resources/db/keyspace.cql
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ create table if not exists events (
milli int,
event_time timestamp,
nonce text,
PRIMARY KEY ((id, category), year, month, day, hour)
) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC);
status text,
PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/* Cassandra migration for keyspace event_log.
   Version 8 - 2021-03-18T08:29:29.223675+00:00

   Adds an optional "status" text column to the events table.
   Pre-existing rows keep a null status; per the application's
   EventLogRow docs, an absent status is treated as ENABLED. */

ALTER TABLE events ADD status text;
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ trait EventLogQueries extends TablePointer[EventLogRow] with CustomEncodings[Eve

// Quoted Quill query selecting every EventLogRow in the events table.
// Kept as a quotation so Quill can compile it at the call site.
def selectAllQ: db.Quoted[db.EntityQuery[EventLogRow]] = quote(query[EventLogRow])

def byIdAndCatQ(id: String, category: String) = quote {
// Quoted Quill query: all rows whose id AND category match the given values.
// (id, category) form the table's composite partition key, so this targets a
// single partition.
// NOTE(review): the trailing `.map(x => x)` looks redundant — confirm the
// Quill macro actually needs it before removing.
def byIdAndCatQ(id: String, category: String): Quoted[EntityQuery[EventLogRow]] = quote {
query[EventLogRow].filter(x => x.id == lift(id) && x.category == lift(category)).map(x => x)
}

Expand Down Expand Up @@ -109,4 +109,22 @@ class EventsDAO @Inject() (val events: Events, val lookups: Lookups)(implicit va
/** Deletes the stored row corresponding to the given event log.
  *
  * @param eventLog the event log whose row representation is removed
  * @return a future that resolves to 1 once the delete round-trip completes
  */
def deleteFromEventLog(eventLog: EventLog): Future[Int] = {
  val row = EventLogRow.fromEventLog(eventLog)
  events.delete(row).map(_ => 1)
}

/** Rewrites the status of every stored UPP row matching this event log's id.
  *
  * Fetches all rows with the event log's id under the UPP category and
  * re-inserts each one with status ENABLED when the incoming category is
  * UPP_ENABLE, or DISABLED for any other category (in practice UPP_DISABLE,
  * see LoggerExecutor.processEventLog).
  *
  * @param eventLog carries the target id and the enable/disable category
  * @return a future resolving to 1 once all rows have been written
  *         (also 1 when no rows matched — callers only get a completion signal)
  */
def updateFromEventLog(eventLog: EventLog): Future[Int] = {
  // The target status depends only on the incoming category, so resolve it
  // once instead of re-matching (and duplicating the insert) per row.
  val status =
    if (eventLog.category == Values.UPP_ENABLE_CATEGORY) Values.UPP_STATUS_ENABLED
    else Values.UPP_STATUS_DISABLED
  events.byIdAndCat(eventLog.id, Values.UPP_CATEGORY).flatMap { rows =>
    Future
      .sequence(rows.map(row => events.insert(row.copy(status = Some(status)))))
      .map(_ => 1)
  }
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.ubirch.models

import java.util.Date

import org.json4s.JValue

import java.util.Date

/**
* Concrete type for the EventLogBase whose type T is JValue
*
* @param id String that identifies the EventLog. It can be a hash or a UUID or anything unique
* @param customerId Represents an id for a customer id.
* @param serviceClass Represents the name from where the log comes.
* E.G: The name of the class.
* E.G: The name of the class.
* @param category Represents the category for the event. This is useful for
* adding layers of description to the event.
* @param event Represents the event that is to be recorded.
Expand All @@ -21,6 +21,9 @@ import org.json4s.JValue
* to support its creation from the eventTime.
* @param signature Represents the signature for the event log.
* @param nonce Represents a value that can be used to calculate the hash of the event.
* @param status Represents the status of the EventLogRow. (At the moment it is only used for UPP categories:
* once a UPP has been disabled, it should no longer be possible to verify it.) Its value may
* be either None, Some(ENABLED) or Some(DISABLED). None is equivalent to Some(ENABLED).
*/

case class EventLogRow(
Expand All @@ -32,7 +35,8 @@ case class EventLogRow(
eventTime: Date,
eventTimeInfo: TimeInfo,
signature: String,
nonce: String
nonce: String,
status: Option[String] = None
)

object EventLogRow {
Expand Down
2 changes: 2 additions & 0 deletions event-log-core/src/main/scala/com/ubirch/models/Values.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ object Values {
val UPP_DISABLE_CATEGORY = "UPP_DISABLE"
val UPP_ENABLE_CATEGORY = "UPP_ENABLE"
val UPP_DELETE_CATEGORY = "UPP_DELETE"
val UPP_STATUS_DISABLED = "DISABLED"
val UPP_STATUS_ENABLED = "ENABLED"
val PUB_KEY_CATEGORY = "PUB_KEY"
val CHAIN_CATEGORY = "CHAIN"
val DEVICE_CATEGORY = "DEVICE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,9 @@ class LoggerExecutor @Inject() (

private def processEventLog(eventLog: EventLog): Future[Int] = {
eventLog.category match {
case Values.UPP_ENABLE_CATEGORY =>
logger.warn(s"it has not been implemented yet. category: ${eventLog.category}")
Future.successful(0)
case Values.UPP_DISABLE_CATEGORY =>
logger.warn(s"it has not been implemented yet. category: ${eventLog.category}")
Future.successful(0)
case Values.UPP_ENABLE_CATEGORY | Values.UPP_DISABLE_CATEGORY =>
logger.info(s"updating eventlog with id: ${eventLog.id} to status: ${eventLog.category}")
events.updateFromEventLog(eventLog)
case Values.UPP_DELETE_CATEGORY =>
logger.info(s"delete event log. id: ${eventLog.id}")
// the target UPP that is deleted should have an UPP category
Expand Down
93 changes: 91 additions & 2 deletions event-log-core/src/test/scala/com/ubirch/EventLogSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,94 @@ class EventLogSpec extends TestBase with EmbeddedCassandra with LazyLogging {
assert(res1.isEmpty)

}
}

"consume messages and disable UPP in cassandra" in {

implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort)

val InjectorHelper = new InjectorHelperImpl("localhost:" + kafkaConfig.kafkaPort)

val config = InjectorHelper.get[Config]

val entities = (0 to 5).map(_ => Entities.Events.eventExample(UUIDHelper.randomUUID, Values.UPP_CATEGORY)
.sign(config)
.withCustomerId(UUIDHelper.randomUUID)).toList

val eventsDAO = InjectorHelper.get[EventsDAO]

entities.map(el => await(eventsDAO.insertFromEventLogWithoutLookups(el), 2 seconds))
val all = await(eventsDAO.events.selectAll, 2 seconds)

assert(all.size == entities.size)

withRunningKafka {

//Publish updates
val topic = "com.ubirch.eventlog"
val entitiesAsString = entities.map(_.copy(category = Values.UPP_DISABLE_CATEGORY)).map(_.toJson)
entitiesAsString.foreach { entityAsString =>
publishStringMessageToKafka(topic, entityAsString)
}

//Consumer
val consumer = InjectorHelper.get[StringConsumer]
consumer.startPolling()
Thread.sleep(10000)

//Read Events
val events = InjectorHelper.get[Events]

def res: List[EventLogRow] = await(events.selectAll, 2 seconds)

assert(res.size == entities.size)
res.map(row => assert(row.status.contains(Values.UPP_STATUS_DISABLED)))
}

}

"consume messages and enable UPP in cassandra" in {

implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = PortGiver.giveMeKafkaPort, zooKeeperPort = PortGiver.giveMeZookeeperPort)

val InjectorHelper = new InjectorHelperImpl("localhost:" + kafkaConfig.kafkaPort)

val config = InjectorHelper.get[Config]

val entities = (0 to 5).map(_ => Entities.Events.eventExample(UUIDHelper.randomUUID, Values.UPP_CATEGORY)
.sign(config)
.withCustomerId(UUIDHelper.randomUUID)).toList

val eventsDAO = InjectorHelper.get[EventsDAO]

entities.map(el => await(eventsDAO.insertFromEventLogWithoutLookups(el), 2 seconds))
val all = await(eventsDAO.events.selectAll, 2 seconds)

assert(all.size == entities.size)

withRunningKafka {

//Publish updates
val topic = "com.ubirch.eventlog"
val entitiesAsString = entities.map(_.copy(category = Values.UPP_ENABLE_CATEGORY)).map(_.toJson)
entitiesAsString.foreach { entityAsString =>
publishStringMessageToKafka(topic, entityAsString)
}

//Consumer
val consumer = InjectorHelper.get[StringConsumer]
consumer.startPolling()
Thread.sleep(10000)

//Read Events
val events = InjectorHelper.get[Events]

def res: List[EventLogRow] = await(events.selectAll, 2 seconds)

//Verify
assert(res.size == entities.size)
res.map(row => assert(row.status.contains(Values.UPP_STATUS_ENABLED)))
}

}

Expand Down Expand Up @@ -616,8 +704,9 @@ class EventLogSpec extends TestBase with EmbeddedCassandra with LazyLogging {
| milli int,
| event_time timestamp,
| nonce text,
| PRIMARY KEY ((id, category), year, month, day, hour)
|) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC);
| status text,
| PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli)
|) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC);
""".stripMargin,
"drop table if exists lookups;",
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.ubirch.models

import java.util.Date

import com.github.nosan.embedded.cassandra.cql.CqlScript
import com.typesafe.scalalogging.LazyLogging
import com.ubirch.util.{ InjectorHelper, UUIDHelper }
Expand All @@ -10,7 +8,8 @@ import io.prometheus.client.CollectorRegistry
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.parse

import scala.concurrent.duration._
import java.util.Date
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging {
Expand Down Expand Up @@ -102,6 +101,90 @@ class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging {

}

"eventLogRowEnable" in {
val data: JValue = parse(""" { "id" : [1, 2, 3, 4] } """)

def date = new Date()

val key = UUIDHelper.timeBasedUUID.toString
val value = UUIDHelper.timeBasedUUID.toString
val category = Values.UPP_CATEGORY
val lookupName = "lookupname"

val event = {
EventLog(data)
.withNewId(key)
.withCustomerId("my customer id")
.withCategory(category)
.withServiceClass("my service class")
.withEventTime(date)
.withSignature("my signature")
.withNonce("my nonce")
.withHeaders("hola" -> "Hello", "hey" -> "como estas")
.addLookupKeys(LookupKey(lookupName, category, key.asKey, Seq(value.asValue)))
}

val eventsDAO = InjectorHelper.get[EventsDAO]

await(eventsDAO.insertFromEventLog(event), 2 seconds)

val foundEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds)

val eventRow = EventLogRow.fromEventLog(event)
assert(foundEvent.isDefined)
assert(eventRow == foundEvent.get)

await(eventsDAO.events.insert(foundEvent.head.copy(status = Some(Values.UPP_STATUS_ENABLED))), 2 seconds)
val updatedEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds)

assert(updatedEvent.nonEmpty)
assert(updatedEvent.head.status.contains(Values.UPP_STATUS_ENABLED))

assert(updatedEvent.contains(eventRow.copy(status = Some(Values.UPP_STATUS_ENABLED))))
}

"eventLogRowDisable" in {
val data: JValue = parse(""" { "id" : [1, 2, 3, 4] } """)

def date = new Date()

val key = UUIDHelper.timeBasedUUID.toString
val value = UUIDHelper.timeBasedUUID.toString
val category = Values.UPP_CATEGORY
val lookupName = "lookupname"

val event = {
EventLog(data)
.withNewId(key)
.withCustomerId("my customer id")
.withCategory(category)
.withServiceClass("my service class")
.withEventTime(date)
.withSignature("my signature")
.withNonce("my nonce")
.withHeaders("hola" -> "Hello", "hey" -> "como estas")
.addLookupKeys(LookupKey(lookupName, category, key.asKey, Seq(value.asValue)))
}

val eventsDAO = InjectorHelper.get[EventsDAO]

await(eventsDAO.insertFromEventLog(event), 2 seconds)

val foundEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds)

val eventRow = EventLogRow.fromEventLog(event)
assert(foundEvent.isDefined)
assert(eventRow == foundEvent.get)

await(eventsDAO.events.insert(foundEvent.head.copy(status = Some(Values.UPP_STATUS_DISABLED))), 2 seconds)
val updatedEvent = await(eventsDAO.eventLogRowByLookupRowInfo(value, lookupName, category), 2 seconds)

assert(updatedEvent.nonEmpty)
assert(updatedEvent.head.status.contains(Values.UPP_STATUS_DISABLED))

assert(updatedEvent.contains(eventRow.copy(status = Some(Values.UPP_STATUS_DISABLED))))
}

"eventLogRowByLookupValueAndCategory" in {
val data: JValue = parse(""" { "id" : [1, 2, 3, 4] } """)

Expand Down Expand Up @@ -350,7 +433,7 @@ class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging {
events.map(e => await(eventsDAO.deleteFromEventLog(e), 2 seconds))

val allAfterDeleted = await(eventsDAO.events.selectAll, 2 seconds)
assert(allAfterDeleted.size == 0)
assert(allAfterDeleted.isEmpty)
}

}
Expand Down Expand Up @@ -385,8 +468,9 @@ class EventDAOLogSpec extends TestBase with EmbeddedCassandra with LazyLogging {
| milli int,
| event_time timestamp,
| nonce text,
| PRIMARY KEY ((id, category), year, month, day, hour)
|) WITH CLUSTERING ORDER BY (year desc, month DESC, day DESC);
| status text,
| PRIMARY KEY ((id, category), year, month, day, hour, minute, second, milli)
|) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC, minute DESC, second DESC, milli DESC);
""".stripMargin,
"drop table if exists lookups;",
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar

import scala.concurrent.duration._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Future, Promise }
import scala.language.{ implicitConversions, postfixOps }

Expand Down
Loading