Skip to content

Commit

Permalink
only allow valid event state transitions (#90)
Browse files Browse the repository at this point in the history
* only allow valid event state transitions

- prevents duplicated events leading to an envelope going from ENVELOPE_RESPONSE back to SCOPE_INDEX for example and the completion event never happening after successful indexing when an affiliate wasn't actively connected

* add state transition entry for ENVELOPE_MAILBOX_OUTBOUND event
  • Loading branch information
piercetrey-figure authored Oct 12, 2021
1 parent d6766f0 commit 033df10
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 35 deletions.
41 changes: 21 additions & 20 deletions p8e-api/src/main/kotlin/io/provenance/engine/domain/Event.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,21 @@ object EventTable : UUIDTable(name = "event", columnName = "uuid") {
open class EventEntityClass : UUIDEntityClass<EventRecord>(EventTable) {
fun findForUpdate(uuid: UUID) = find { EventTable.id eq uuid }.forUpdate().firstOrNull()

fun insertOrUpdate(
fun insert(
event: P8eEvent,
envelopeUuid: UUID,
status: EventStatus = EventStatus.CREATED,
created: OffsetDateTime = OffsetDateTime.now(),
updated: OffsetDateTime = OffsetDateTime.now()
): EventRecord = findByEnvelopeUuidForUpdate(envelopeUuid)?.also {
it.event = event.event
it.payload = event
it.status = status
it.created = created
it.updated = updated
} ?: new(UUID.randomUUID()) {
this.event = event.event
this.payload = event
this.status = status
this.envelopeUuid = envelopeUuid
this.created = created
this.updated = updated
): EventRecord = new(UUID.randomUUID()) {
this.event = event.event
this.payload = event
this.status = status
this.envelopeUuid = envelopeUuid
this.created = created
this.updated = updated
}


fun findByEvent(event: P8eEvent.Event): List<EventRecord> =
find{
(EventTable.event eq event)
}.toList()

fun findForConnectedClients(where: (SqlExpressionBuilder.()-> Op<Boolean>)) = EventTable
.innerJoin(EnvelopeTable)
.join(AffiliateConnectionTable, JoinType.INNER, EnvelopeTable.contractClassname, AffiliateConnectionTable.classname) {
Expand All @@ -77,6 +65,19 @@ class EventRecord(uuid: EntityID<UUID>) : UUIDEntity(uuid) {
var status by EventTable.status
var created by EventTable.created
var updated by EventTable.updated

fun update(
event: P8eEvent,
status: EventStatus = EventStatus.CREATED,
created: OffsetDateTime = OffsetDateTime.now(),
updated: OffsetDateTime = OffsetDateTime.now()
) = apply {
this.event = event.event
this.payload = event
this.status = status
this.created = created
this.updated = updated
}
}

enum class EventStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class EventService() {
val SKIPPABLE_EVENTS = listOf(Event.UNRECOGNIZED, Event.SCOPE_INDEX_FRAGMENT)
val CONNECTED_CLIENT_EVENTS = listOf(Event.ENVELOPE_REQUEST, Event.ENVELOPE_RESPONSE, Event.ENVELOPE_ERROR)
val UNCONNECTED_CLIENT_EVENTS = Event.values().toList().minus(CONNECTED_CLIENT_EVENTS).minus(SKIPPABLE_EVENTS)

val VALID_STATE_TRANSITIONS = mapOf(
Event.ENVELOPE_FRAGMENT to listOf(Event.ENVELOPE_CHAINCODE, Event.ENVELOPE_ERROR),
Event.ENVELOPE_REQUEST to listOf(Event.ENVELOPE_MAILBOX_OUTBOUND),
Event.ENVELOPE_MAILBOX_OUTBOUND to listOf(Event.SCOPE_INDEX, Event.SCOPE_INDEX_FRAGMENT),
Event.ENVELOPE_CHAINCODE to listOf(Event.SCOPE_INDEX, Event.SCOPE_INDEX_FRAGMENT, Event.ENVELOPE_ERROR),
Event.SCOPE_INDEX to listOf(Event.ENVELOPE_RESPONSE),
Event.SCOPE_INDEX_FRAGMENT to listOf(Event.ENVELOPE_RESPONSE),
Event.ENVELOPE_RESPONSE to listOf(), // end of the line
)
}

fun registerCallback(event: Event, callback: EventHandler) {
Expand All @@ -49,7 +59,13 @@ class EventService() {
}

fun submitEvent(event: P8eEvent, envelopeUuid: UUID, status: EventStatus = EventStatus.CREATED, created: OffsetDateTime = OffsetDateTime.now()): EventRecord =
EventRecord.insertOrUpdate(event, envelopeUuid, status, created, created).also(::submitToChannel)
EventRecord.findByEnvelopeUuidForUpdate(envelopeUuid)
?.also {
val validTransitions = VALID_STATE_TRANSITIONS[it.event]
if (validTransitions == null || validTransitions.contains(event.event)) {
it.update(event, status, created, created).also(::submitToChannel)
}
} ?: EventRecord.insert(event, envelopeUuid, status, created, created).also(::submitToChannel)

fun completeInProgressEvent(envelopeUuid: UUID, expectedEventType: Event) = EventRecord.findByEnvelopeUuidForUpdate(envelopeUuid)
?.takeIf { it.event == expectedEventType }
Expand Down
61 changes: 47 additions & 14 deletions p8e-api/src/test/kotlin/service/EventServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ class EventServiceTest {

val key = TestUtils.generateKeyPair()

val eventMock = Events.P8eEvent.newBuilder()
.setEvent(Event.ENVELOPE_REQUEST)
.setMessage(key.public.toHex().toByteString())
.build()

val testData = ContractScope.EnvelopeState.newBuilder()
.setContractClassname("HelloWorldContract")
.build()
Expand Down Expand Up @@ -92,15 +87,6 @@ class EventServiceTest {
status = ContractScope.Envelope.Status.CREATED
scopeUuid = scopeRecord.uuid
}

//Insert data into EventTable
EventTable.insert {
it[payload] = eventMock
it[status] = EventStatus.CREATED
it[event] = eventMock.event
it[envelopeUuid] = envelopeRecord.uuid.value
it[created] = createdTime
}
}

eventService = EventService()
Expand All @@ -111,6 +97,13 @@ class EventServiceTest {
@Test
fun `Test in-progress events are updated to completed`() {
transaction {
val event = Events.P8eEvent.newBuilder()
.setEvent(Event.ENVELOPE_REQUEST)
.setMessage("some-test-message".toByteString())
.build()

EventRecord.insert(event, envelopeRecord.uuid.value)

//Execute
eventService.completeInProgressEvent(envelopeRecord.uuid.value, Event.ENVELOPE_REQUEST)

Expand Down Expand Up @@ -186,4 +179,44 @@ class EventServiceTest {
Assert.assertEquals(EventStatus.CREATED, testErrorEventRecord.status)
Assert.assertEquals(envelopeRecord.uuid.value, testErrorEventRecord.envelopeUuid)
}

@Test
fun `Verify event submission skipped for invalid transition`() {
val event = Events.P8eEvent.newBuilder()
.setEvent(Event.ENVELOPE_RESPONSE)
.setMessage("some-test-message".toByteString())
.build()

transaction { EventRecord.insert(event, envelopeRecord.uuid.value) }

val event2 = Events.P8eEvent.newBuilder()
.setEvent(Event.SCOPE_INDEX)
.setMessage("some-other-test-message".toByteString())
.build()
val updatedRecord = transaction { eventService.submitEvent(event2, envelopeRecord.uuid.value, EventStatus.CREATED, createdTime) }

// event should remain unchanged, since you can't go from ENVELOPE_RESPONSE -> SCOPE_INDEX
Assert.assertEquals(Event.ENVELOPE_RESPONSE, updatedRecord.event)
Assert.assertEquals("some-test-message".toByteString(), updatedRecord.payload.message)
}

@Test
fun `Verify event submission works for a valid transition`() {
val event = Events.P8eEvent.newBuilder()
.setEvent(Event.SCOPE_INDEX)
.setMessage("some-test-message".toByteString())
.build()

transaction { EventRecord.insert(event, envelopeRecord.uuid.value) }

val event2 = Events.P8eEvent.newBuilder()
.setEvent(Event.ENVELOPE_RESPONSE)
.setMessage("some-other-test-message".toByteString())
.build()
val updatedRecord = transaction { eventService.submitEvent(event2, envelopeRecord.uuid.value, EventStatus.CREATED, createdTime) }

// event should be updated, since you can go from SCOPE_INDEX -> ENVELOPE_RESPONSE
Assert.assertEquals(Event.ENVELOPE_RESPONSE, updatedRecord.event)
Assert.assertEquals("some-other-test-message".toByteString(), updatedRecord.payload.message)
}
}

0 comments on commit 033df10

Please sign in to comment.