Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-17768 Topology changes - Use new topics #4931

Merged
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
44a3635
CORE-17429 State metadata support in message processor (#4828)
mbrkic-r3 Oct 12, 2023
e928edb
CORE-16203 Removed coroutine usage from Multi-Source Event Mediator.
mbrkic-r3 Oct 13, 2023
4956ab7
CORE-16203 Replace State and Event pattern with Multi-Source Event Me…
mbrkic-r3 Oct 12, 2023
b5f29d6
CORE-16203 Set linger.ms to 0 for stateAndEvent producer in kafka-mes…
mbrkic-r3 Oct 14, 2023
6023e7c
CORE-17487 5.1 Performance integration - parallel processing (#4873)
mbrkic-r3 Oct 14, 2023
36e96e5
CORE-17661 5.1 Performance integration - Multi-Source Event Mediator …
mbrkic-r3 Oct 14, 2023
48df404
Merge pull request #4872 from corda/mbrkic-r3/feature/51-perf-integra…
driessamyn Oct 14, 2023
580044a
CORE-16203 5.1 Performance integration - FlowMapper using Multi-Sourc…
mbrkic-r3 Oct 15, 2023
1afb5c2
Merge pull request #4876 from corda/mbrkic-r3/feature/51-perf-integra…
Omar-awad Oct 15, 2023
a201644
CORE-17768 Using new topology Kafka topics (flow.start, flow.session,…
mbrkic-r3 Oct 15, 2023
036fff5
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17768/…
mbrkic-r3 Oct 15, 2023
ffa5e83
CORE-17562 Metrics for Multi-Source Event Mediator
mbrkic-r3 Oct 15, 2023
1bb2c92
CORE-17562 Fixed unit test
mbrkic-r3 Oct 15, 2023
b5dab4c
Merge branch 'release/os/5.1' into feature/51-perf-integration
driessamyn Oct 16, 2023
ea06ea4
Update API version to fix build (#4882)
driessamyn Oct 16, 2023
1f0669d
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17562/…
Omar-awad Oct 16, 2023
8542b55
Merge pull request #4879 from corda/mbrkic-r3/CORE-17562/mediator-met…
Omar-awad Oct 16, 2023
47c1f12
CORE-16242 - Synchronous RPC Pattern API Implementation for Crypto Wo…
thiagoviana Oct 16, 2023
ef0afe4
CORE-17822 Apply prefix to topic overrides (#4893)
davidcurrie Oct 17, 2023
0b169e0
CORE-17627: Add metadata to flow mapper states for mapper status (#4886)
JamesHR3 Oct 17, 2023
f6a278a
CORE-17843 Fixed stopping event mediator. Modified to work with even…
mbrkic-r3 Oct 17, 2023
0aff9f5
CORE-17388: Add session timeout metadata to store alongside the check…
JamesHR3 Oct 17, 2023
b88f1cf
CORE-16181 Implementing RPC client, routing external events through R…
ben-millar Oct 17, 2023
e81978f
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
28a4895
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
f59239e
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
19b0879
CORE-16181 Fixed integration test
mbrkic-r3 Oct 17, 2023
5d9a67d
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17768/…
mbrkic-r3 Oct 17, 2023
f45f313
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 17, 2023
6e2f759
Merge remote-tracking branch 'origin/feature/51-perf-integration' int…
mbrkic-r3 Oct 17, 2023
921fbd2
CORE-17768 Optimized imports after merge
mbrkic-r3 Oct 17, 2023
061bf89
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17843/…
mbrkic-r3 Oct 17, 2023
ea96da7
CORE-17626: Add integration test for flow mapper cleanup (#4907)
JamesHR3 Oct 18, 2023
f674a17
CORE-17882: Add a route for flow events (#4915)
JamesHR3 Oct 18, 2023
b30fc1c
Merge remote-tracking branch 'origin/feature/51-perf-integration' int…
mbrkic-r3 Oct 18, 2023
6643999
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17843/FlowMapperS…
mbrkic-r3 Oct 18, 2023
a141940
Merge branch 'release/os/5.1' into feature/51-perf-integration
driessamyn Oct 18, 2023
adac57c
Merge branch 'feature/51-perf-integration' into mbrkic-r3/CORE-17843/…
mbrkic-r3 Oct 18, 2023
91826e0
CORE-17843 Removed duplicate variable in TestStateManagerFactoryImpl …
mbrkic-r3 Oct 18, 2023
eb0945a
CORE-17768 Disabled smoke test "cluster configuration changes are pic…
mbrkic-r3 Oct 18, 2023
05741a6
CORE-17843 Disabled smoke test "cluster configuration changes are pic…
mbrkic-r3 Oct 18, 2023
48e04eb
Revert "CORE-17768 Disabled smoke test "cluster configuration changes…
mbrkic-r3 Oct 18, 2023
d4d30a8
CORE-17843 Minor fixed based on review feedback.
mbrkic-r3 Oct 18, 2023
6c5c5f2
Merge remote-tracking branch 'origin/feature/51-perf-integration' int…
mbrkic-r3 Oct 18, 2023
8ee72f8
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17843/FlowMapperS…
mbrkic-r3 Oct 18, 2023
127cfdd
CORE-17768 Fixed compile errors after merge
mbrkic-r3 Oct 18, 2023
c8b9399
CORE-17768 Kafka configuration for performance test
mbrkic-r3 Oct 18, 2023
165f8b1
CORE-17768 Kafka configuration for performance test
mbrkic-r3 Oct 18, 2023
9cefd49
CORE-17768 Fixed bad merge
mbrkic-r3 Oct 18, 2023
5df64b5
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 19, 2023
04987fd
CORE-17768 Fixed detekt warning
mbrkic-r3 Oct 19, 2023
334f8c8
CORE-17768 Fixed unit test
mbrkic-r3 Oct 19, 2023
5bee79f
CORE-17768 Set API version to 5.1.0.37-alpha-1697718230127
mbrkic-r3 Oct 19, 2023
4c772ad
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 19, 2023
2542989
CORE-17768 Set API version to 5.1.0.37-alpha-1697725680919
mbrkic-r3 Oct 19, 2023
e3cfac1
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 19, 2023
d78533b
CORE-17768 Added logging for debugging purposes
mbrkic-r3 Oct 19, 2023
425b66a
Revert "CORE-17768 Added logging for debugging purposes"
mbrkic-r3 Oct 19, 2023
1b83d91
CORE-17768 Set max.poll.records to 100
mbrkic-r3 Oct 20, 2023
81d9d54
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkic-r3/C…
mbrkic-r3 Oct 20, 2023
56c943e
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 20, 2023
24d2464
CORE-17768 Routing FlowEvents that FlowEngine sends out to itself to …
mbrkic-r3 Oct 20, 2023
3ac5843
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 20, 2023
eb3dcd7
CORE-17768 Fixed failing FlowEventMediatorFactoryImplTest
mbrkic-r3 Oct 20, 2023
c9ac559
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-17768/topoligy-ne…
mbrkic-r3 Oct 20, 2023
2d7c0e7
CORE-17768 Reverted number of mediator threads to default value. Set …
mbrkic-r3 Oct 24, 2023
bafeed3
CORE-17768 Fixed unit test
mbrkic-r3 Oct 24, 2023
a54e269
CORE-17768 Using beta version of API
mbrkic-r3 Oct 24, 2023
1819520
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkicr3/CO…
mbrkic-r3 Oct 25, 2023
05b237d
Merge remote-tracking branch 'origin/release/os/5.1' into mbrkicr3/CO…
mbrkic-r3 Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import net.corda.libs.packaging.core.CpkMetadata
import net.corda.messaging.api.records.Record
import net.corda.osgi.api.Application
import net.corda.osgi.api.Shutdown
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_START
import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG
import net.corda.schema.configuration.FlowConfig.PROCESSING_FLOW_CLEANUP_TIME
import net.corda.schema.configuration.FlowConfig.PROCESSING_MAX_FLOW_SLEEP_DURATION
Expand Down Expand Up @@ -172,11 +172,11 @@ class CordaVNode @Activate constructor(

val rpcStartFlow = createRPCStartFlow(clientId, vnodeInfo.toAvro())
val flowId = generateRandomId()
val record = Record(FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, rpcStartFlow))
val record = Record(FLOW_START, flowId, FlowEvent(flowId, rpcStartFlow))
flowEventProcessorFactory.create(mapOf(FLOW_CONFIG to smartConfig)).apply {
val result = onNext(null, record)
result.responseEvents.singleOrNull { evt ->
evt.topic == FLOW_EVENT_TOPIC
evt.topic == FLOW_START
}?.also { evt ->
@Suppress("unchecked_cast")
onNext(result.updatedState, evt as Record<String, FlowEvent>)
Expand Down
2 changes: 1 addition & 1 deletion charts/corda-lib/templates/_worker.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ metadata:
spec:
type: ClusterIP
selector:
app: {{ $workerName }}
app.kubernetes.io/component: {{ include "corda.workerComponent" $worker }}
ports:
- protocol: TCP
port: {{ include "corda.workerServicePort" . }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_EVENT_TOPIC,
Schemas.Flow.FLOW_SESSION,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand All @@ -269,7 +269,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_EVENT_TOPIC,
Schemas.Flow.FLOW_SESSION,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand All @@ -281,7 +281,7 @@ import kotlin.test.assertTrue
)
).thenReturn(
Record(
Schemas.Flow.FLOW_EVENT_TOPIC,
Schemas.Flow.FLOW_SESSION,
flowExternalEventContexts.get(it).flowId,
FlowEvent()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import net.corda.flow.mapper.impl.executor.SessionEventExecutor
import net.corda.flow.mapper.impl.executor.SessionInitProcessor
import net.corda.flow.mapper.impl.executor.StartFlowExecutor
import net.corda.libs.configuration.SmartConfig
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_START
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
Expand Down Expand Up @@ -66,7 +66,7 @@ class FlowMapperEventExecutorFactoryImpl @Activate constructor(
}
}

is StartFlow -> StartFlowExecutor(eventKey, FLOW_EVENT_TOPIC, flowMapperEventPayload, state)
is StartFlow -> StartFlowExecutor(eventKey, FLOW_START, flowMapperEventPayload, state)
is ExecuteCleanup -> ExecuteCleanupEventExecutor(eventKey)
is ScheduleCleanup -> ScheduleCleanupEventExecutor(eventKey, flowMapperEventPayload, state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class RecordFactoryImpl @Activate constructor(

private fun getSessionEventOutputTopic(sessionEvent: SessionEvent): String {
return when (sessionEvent.messageDirection) {
MessageDirection.INBOUND -> Schemas.Flow.FLOW_EVENT_TOPIC
MessageDirection.INBOUND -> Schemas.Flow.FLOW_SESSION
MessageDirection.OUTBOUND -> {
if (isLocalCluster(sessionEvent)) {
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
Schemas.Flow.FLOW_MAPPER_SESSION_IN
} else {
Schemas.P2P.P2P_OUT_TOPIC
}
Expand All @@ -120,8 +120,8 @@ class RecordFactoryImpl @Activate constructor(
) : Record<*, *> {
val outputTopic = getSessionEventOutputTopic(sourceEvent)
val (newDirection, sessionId) = when (outputTopic) {
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId))
Schemas.Flow.FLOW_EVENT_TOPIC -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId)
Schemas.Flow.FLOW_MAPPER_SESSION_IN -> Pair(MessageDirection.INBOUND, toggleSessionId(sourceEvent.sessionId))
Schemas.Flow.FLOW_SESSION -> Pair(MessageDirection.INBOUND, sourceEvent.sessionId)
else -> Pair(MessageDirection.OUTBOUND, sourceEvent.sessionId)
}
val sequenceNumber = if (newPayload is SessionError) null else sourceEvent.sequenceNum
Expand All @@ -136,14 +136,14 @@ class RecordFactoryImpl @Activate constructor(
sourceEvent.contextSessionProperties
)
return when (outputTopic) {
Schemas.Flow.FLOW_EVENT_TOPIC -> {
Schemas.Flow.FLOW_SESSION -> {
if (flowId == null) {
throw IllegalArgumentException("Flow ID is required to forward an event back to the flow event" +
"topic, but it was not provided.")
}
Record(outputTopic, flowId, FlowEvent(flowId, sessionEvent))
}
Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC -> {
Schemas.Flow.FLOW_MAPPER_SESSION_IN -> {
Record(outputTopic, sessionEvent.sessionId, FlowMapperEvent(sessionEvent))
}
Schemas.P2P.P2P_OUT_TOPIC -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.lang.IllegalArgumentException
import java.nio.ByteBuffer
import java.time.Instant

Expand Down Expand Up @@ -98,7 +97,7 @@ internal class RecordFactoryImplTest {
"my-flow-id"
)
assertThat(record).isNotNull
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
verify(locallyHostedIdentitiesServiceSameCluster).isHostedLocally(bobId.toCorda())
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
Expand Down Expand Up @@ -165,7 +164,7 @@ internal class RecordFactoryImplTest {
flowConfig,
FLOW_ID
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
assertThat(record.key).isEqualTo(FLOW_ID)
assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java)
val sessionOutput = (record.value as FlowEvent).payload as SessionEvent
Expand Down Expand Up @@ -194,7 +193,7 @@ internal class RecordFactoryImplTest {
FLOW_ID
)
assertThat(record).isNotNull
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
assertThat(sessionOutput.sessionId).isEqualTo("$SESSION_ID-INITIATED")
Expand Down Expand Up @@ -249,7 +248,7 @@ internal class RecordFactoryImplTest {
flowConfig,
FLOW_ID
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
assertThat(record.key).isEqualTo(FLOW_ID)
assertThat(record.value!!::class.java).isEqualTo(FlowEvent::class.java)
val sessionOutput = (record.value as FlowEvent).payload as SessionEvent
Expand Down Expand Up @@ -316,7 +315,7 @@ internal class RecordFactoryImplTest {
timestamp,
flowConfig,
)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC)
assertThat(record.topic).isEqualTo(Schemas.Flow.FLOW_MAPPER_SESSION_IN)
assertThat(record.key).isEqualTo("$SESSION_ID-INITIATED")
assertThat(record.value!!::class).isEqualTo(FlowMapperEvent::class)
val sessionOutput = (record.value as FlowMapperEvent).payload as SessionEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SessionInitProcessorTest {
flowId: String
): Record<*, *> {
return if (sourceEvent.messageDirection == MessageDirection.INBOUND) {
Record(Schemas.Flow.FLOW_EVENT_TOPIC, flowId, FlowEvent(flowId, sourceEvent))
Record(Schemas.Flow.FLOW_SESSION, flowId, FlowEvent(flowId, sourceEvent))
} else {
Record(Schemas.P2P.P2P_OUT_TOPIC, "sessionId", "")
}
Expand Down Expand Up @@ -79,7 +79,7 @@ class SessionInitProcessorTest {

Assertions.assertThat(outboundEvents.size).isEqualTo(1)
val outboundEvent = outboundEvents.first()
Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_EVENT_TOPIC)
Assertions.assertThat(outboundEvent.topic).isEqualTo(Schemas.Flow.FLOW_SESSION)
Assertions.assertThat(outboundEvent.key::class).isEqualTo(String::class)
Assertions.assertThat(outboundEvent.value!!::class).isEqualTo(FlowEvent::class)
Assertions.assertThat(payload.sessionId).isEqualTo("sessionId-INITIATED")
Expand Down
Loading