diff --git a/p8e-api/src/main/kotlin/io/provenance/engine/service/EventService.kt b/p8e-api/src/main/kotlin/io/provenance/engine/service/EventService.kt index 698bb78..9470c20 100644 --- a/p8e-api/src/main/kotlin/io/provenance/engine/service/EventService.kt +++ b/p8e-api/src/main/kotlin/io/provenance/engine/service/EventService.kt @@ -4,12 +4,15 @@ import com.google.protobuf.util.JsonFormat import io.p8e.proto.Events.P8eEvent import io.p8e.proto.Events.P8eEvent.Event import io.p8e.util.ThreadPoolFactory +import io.p8e.util.or import io.p8e.util.orThrow import io.p8e.util.timed import io.provenance.p8e.shared.extension.logger import io.provenance.engine.domain.EventRecord import io.provenance.engine.domain.EventStatus import io.provenance.engine.domain.EventTable +import io.provenance.p8e.shared.domain.EnvelopeRecord +import io.provenance.p8e.shared.util.P8eMDC import org.jetbrains.exposed.sql.SizedIterable import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.and @@ -43,7 +46,7 @@ class EventService() { Event.ENVELOPE_FRAGMENT to listOf(Event.ENVELOPE_CHAINCODE, Event.ENVELOPE_ERROR), Event.ENVELOPE_REQUEST to listOf(Event.ENVELOPE_MAILBOX_OUTBOUND), Event.ENVELOPE_MAILBOX_OUTBOUND to listOf(Event.SCOPE_INDEX, Event.SCOPE_INDEX_FRAGMENT), - Event.ENVELOPE_CHAINCODE to listOf(Event.SCOPE_INDEX, Event.SCOPE_INDEX_FRAGMENT, Event.ENVELOPE_ERROR), + Event.ENVELOPE_CHAINCODE to listOf(Event.SCOPE_INDEX, Event.SCOPE_INDEX_FRAGMENT, Event.ENVELOPE_ERROR, Event.ENVELOPE_CHAINCODE), Event.SCOPE_INDEX to listOf(Event.ENVELOPE_RESPONSE), Event.SCOPE_INDEX_FRAGMENT to listOf(Event.ENVELOPE_RESPONSE), Event.ENVELOPE_RESPONSE to listOf(), // end of the line @@ -64,6 +67,8 @@ class EventService() { val validTransitions = VALID_STATE_TRANSITIONS[it.event] if (validTransitions == null || validTransitions.contains(event.event)) { it.update(event, status, created, created).also(::submitToChannel) + } else { + log.info("Skipping invalid event transition of ${it.event} -> ${event.event}") } } ?: EventRecord.insert(event, envelopeUuid, status, created, created).also(::submitToChannel) @@ -201,6 +206,11 @@ class NotificationHandler(private val event: Event) { fun notification(tableEvent: EventRecord) { thread(pool = computeExecutor(tableEvent.event)) { try { + transaction { EnvelopeRecord.findById(tableEvent.envelopeUuid) }?.let { + P8eMDC.set(it, clear = true) + }.or { + log.warn("Failed to find envelope record for MDC (envelopeUuid: ${tableEvent.envelopeUuid}, eventUuid: ${tableEvent.eventUuid.value})") + } when (tableEvent.status) { EventStatus.CREATED -> { log.info("Received CREATED event: [${JsonFormat.printer().print(tableEvent.payload)}") diff --git a/p8e-api/src/test/kotlin/service/EventServiceTest.kt b/p8e-api/src/test/kotlin/service/EventServiceTest.kt index 9ac9ddd..58882ab 100644 --- a/p8e-api/src/test/kotlin/service/EventServiceTest.kt +++ b/p8e-api/src/test/kotlin/service/EventServiceTest.kt @@ -219,4 +219,24 @@ class EventServiceTest { Assert.assertEquals(Event.ENVELOPE_RESPONSE, updatedRecord.event) Assert.assertEquals("some-other-test-message".toByteString(), updatedRecord.payload.message) } + + @Test + fun `Verify ENVELOPE_CHAINCODE to ENVELOPE_CHAINCODE is a valid transition`() { + // CHAINCODE -> CHAINCODE transition for TransactionStatusService.retryDead case + val event = Events.P8eEvent.newBuilder() + .setEvent(Event.ENVELOPE_CHAINCODE) + .setMessage("some-test-message".toByteString()) + .build() + + transaction { EventRecord.insert(event, envelopeRecord.uuid.value) } + + val event2 = Events.P8eEvent.newBuilder() + .setEvent(Event.ENVELOPE_CHAINCODE) + .setMessage("some-other-test-message".toByteString()) + .build() + val updatedRecord = transaction { eventService.submitEvent(event2, envelopeRecord.uuid.value, EventStatus.CREATED, createdTime) } + + // event should be updated, since you can go from ENVELOPE_CHAINCODE -> ENVELOPE_CHAINCODE + Assert.assertEquals("some-other-test-message".toByteString(), updatedRecord.payload.message) + } }