Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-20926: Merging forward updates from release/os/5.2 to release/os/5.3 - 2024-11-28 #6405

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
609cfbf
CORE-19384: Add reset after RebalanceInProgressException (#6181)
emilybowe Jun 14, 2024
03ef673
CORE-20799: Change CommitFailedException classification (#6295)
emilybowe Jul 17, 2024
44fd98d
change error to warn (#6294)
emilybowe Jul 17, 2024
3b05a6c
CORE-20797 Adding handling for CordaMessageAPIProducerRequiresReset e…
ben-millar Jul 23, 2024
104f19e
CORE-20795: Add exception handling and retry to KafkaAdmin and Virtua…
emilybowe Jul 26, 2024
0b98ca0
CORE-20797 Adding CordaMessageAPIProducerRequiresReset exception to E…
ben-millar Jul 29, 2024
47994ec
CORE-20794 Catching ProducerFencedException and rethrowing as a Produ…
ben-millar Jul 29, 2024
37233f6
CORE-20886 stop the registry from being updated when a coordinator is…
LWogan Oct 29, 2024
2434c4a
CORE-20867 Implement retry topic to handle persistent transient RPC C…
LWogan Nov 14, 2024
57b10aa
CORE-20898: bump netty version for vulnerability (#6395)
LWogan Nov 20, 2024
ec9f852
CORE-20847 validate the flow start event size (#6394)
LWogan Nov 20, 2024
575bd65
CORE-20909 validate flow result cannot exceed max message size (#6393)
LWogan Nov 20, 2024
ef977c5
CORE-20907: stop external message api from sending messages that are …
LWogan Nov 20, 2024
26c8a04
CORE-20918: waiver for detekt low severity issue (#6399)
LWogan Nov 20, 2024
31fb9b2
CORE-20887: update jetty version to one without CVE-2024-8184 (#6400)
LWogan Nov 20, 2024
8850133
CORE-20888: add snyk waiver for javalin jetty vulnerability that we a…
LWogan Nov 21, 2024
e10a302
CORE-20819: unirest snyk issue (#6396)
LWogan Nov 21, 2024
8b5e762
Merge remote-tracking branch 'origin/release/os/5.3' into merge-relea…
LWogan Dec 2, 2024
4d0bdef
CORE-XXXX: detekt
LWogan Dec 2, 2024
9c1aca9
CORE-XXXX: update test
LWogan Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .snyk
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,36 @@ ignore:
OSGi so for now we have to wait for the next one.
expires: 2024-07-31T00:00:00.000Z
created: 2024-04-11T15:11:31.735Z
SNYK-JAVA-ORGJETBRAINSKOTLIN-2393744:
- '*':
reason: >-
Corda5 Shippable artifacts do not make use of detekt-cli, which is
where this dependency originates, this is used at compile / build time
only for static code analysis and not shipped in any of our releasable artifacts.
expires: 2025-11-20T14:30:31.735Z
created: 2024-11-20T14:30:31.735Z
SNYK-JAVA-ORGECLIPSEJETTY-8186141:
- '*':
reason: >-
This project acknowledges the presence of CVE-2024-6763 in the version of Jetty currently used by Javalin.
The vulnerability affects users of Jetty's HttpURI class, which our project does not directly utilize,
nor is it exposed through Javalin in our application context.
The Javalin team has indicated that they do not use HttpURI, and we have verified that our dependency tree presents no indirect
exposure. We will monitor Javalin updates and adopt a release upgrading Jetty to a patched version (≥12.0.12) when feasible.
Given the limited risk, no immediate action is required beyond ongoing dependency monitoring.
Note: there are currently no versions of Javalin released without this issue.
expires: 2025-11-21T14:30:31.735Z
created: 2024-11-21T12:30:31.735Z
SNYK-JAVA-ORGECLIPSEJETTY-8186158:
- '*':
reason: >-
This project acknowledges the presence of CVE-2024-6763 in the version of Jetty currently used by Javalin.
The vulnerability affects users of Jetty's HttpURI class, which our project does not directly utilize,
nor is it exposed through Javalin in our application context.
The Javalin team has indicated that they do not use HttpURI, and we have verified that our dependency tree presents no indirect
exposure. We will monitor Javalin updates and adopt a release upgrading Jetty to a patched version (≥12.0.12) when feasible.
Given the limited risk, no immediate action is required beyond ongoing dependency monitoring.
Note: there are currently no versions of Javalin released without this issue.
expires: 2025-11-21T14:30:31.735Z
created: 2024-11-21T12:30:31.735Z
patch: {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ object FlowRestExceptionConstants {
const val INVALID_ID = "Supplied clientRequestId %s is invalid, it must conform to the pattern %s."
const val CPI_NOT_FOUND = "Failed to find a CPI for ID = %s."
const val FLOW_STATUS_NOT_FOUND = "Failed to find the flow status for holdingId = %s and clientRequestId = %s."
const val MAX_FLOW_START_ARGS_SIZE = "The flow start payload has exceeded the max allowed payload size. Note: max payload size is set" +
" to half the value of maxAllowedMessageSize."

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.corda.flow.rest.impl.v1

import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.data.flow.FlowKey
import net.corda.data.flow.output.FlowStates
Expand Down Expand Up @@ -41,6 +42,7 @@ import net.corda.rest.messagebus.MessageBusUtils.tryWithExceptionHandling
import net.corda.rest.response.ResponseEntity
import net.corda.rest.security.CURRENT_REST_CONTEXT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.configuration.MessagingConfig
import net.corda.tracing.TraceTag
import net.corda.tracing.addTraceContextToRecord
import net.corda.tracing.trace
Expand Down Expand Up @@ -71,6 +73,8 @@ class FlowRestResourceImpl @Activate constructor(
private val permissionValidationService: PermissionValidationService,
@Reference(service = PlatformInfoProvider::class)
private val platformInfoProvider: PlatformInfoProvider,
@Reference(service = CordaAvroSerializationFactory::class)
private val cordaAvroSerializationFactory: CordaAvroSerializationFactory,
) : FlowRestResource, PluggableRestResource<FlowRestResource>, Lifecycle {

private companion object {
Expand All @@ -82,12 +86,15 @@ class FlowRestResourceImpl @Activate constructor(
override val targetInterface: Class<FlowRestResource> = FlowRestResource::class.java
override val protocolVersion get() = platformInfoProvider.localWorkerPlatformVersion

private val serializer = cordaAvroSerializationFactory.createAvroSerializer<Any>()
private var publisher: Publisher? = null
private var fatalErrorOccurred = false
private lateinit var onFatalError: () -> Unit
private lateinit var messagingConfig: SmartConfig

override fun initialise(config: SmartConfig, onFatalError: () -> Unit) {
this.onFatalError = onFatalError
this.messagingConfig = config
publisher?.close()
publisher = publisherFactory.createPublisher(PublisherConfig("FlowRestResource"), config)
}
Expand Down Expand Up @@ -196,6 +203,13 @@ class FlowRestResourceImpl @Activate constructor(
startFlow.requestBody.escapedJson,
flowContextPlatformProperties
)
val startEventSize = serializer.serialize(startEvent)?.size
val maxAllowedMessageSize = getMaxAllowedMessageSize(messagingConfig)
if (startEventSize != null && startEventSize > maxAllowedMessageSize) {
log.warn(FlowRestExceptionConstants.MAX_FLOW_START_ARGS_SIZE, IllegalArgumentException("Flow start event of size " +
"[$startEventSize] exceeds maxAllowedMessageSize [$maxAllowedMessageSize]"))
throw InvalidInputDataException(FlowRestExceptionConstants.FATAL_ERROR)
}
val status = messageFactory.createStartFlowStatus(clientRequestId, vNode, flowClassName)

val records = listOf(
Expand Down Expand Up @@ -328,4 +342,6 @@ class FlowRestResourceImpl @Activate constructor(
private fun getVirtualNode(holdingIdentityShortHash: String): VirtualNodeInfo {
return virtualNodeInfoReadService.getByHoldingIdentityShortHashOrThrow(holdingIdentityShortHash).toAvro()
}

private fun getMaxAllowedMessageSize(messagingConfig: SmartConfig) = messagingConfig.getLong(MessagingConfig.MAX_ALLOWED_MSG_SIZE)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package net.corda.flow.rest.impl.v1

import com.typesafe.config.ConfigValueFactory
import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.crypto.core.SecureHashImpl
import net.corda.data.flow.FlowKey
Expand All @@ -10,6 +13,7 @@ import net.corda.flow.rest.factory.MessageFactory
import net.corda.flow.rest.v1.FlowRestResource
import net.corda.flow.rest.v1.types.request.StartFlowParameters
import net.corda.flow.rest.v1.types.response.FlowStatusResponse
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.SmartConfigImpl
import net.corda.libs.packaging.core.CordappManifest
import net.corda.libs.packaging.core.CpiIdentifier
Expand All @@ -34,6 +38,7 @@ import net.corda.rest.exception.ResourceNotFoundException
import net.corda.rest.exception.ServiceUnavailableException
import net.corda.rest.security.CURRENT_REST_CONTEXT
import net.corda.rest.security.RestAuthContext
import net.corda.schema.configuration.MessagingConfig
import net.corda.test.util.identity.createTestHoldingIdentity
import net.corda.utilities.MDC_CLIENT_ID
import net.corda.virtualnode.OperationalStatus
Expand All @@ -46,6 +51,7 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.atLeastOnce
import org.mockito.kotlin.doThrow
Expand All @@ -64,6 +70,8 @@ class FlowRestResourceImplTest {

private lateinit var flowStatusLookupService: FlowStatusLookupService
private lateinit var virtualNodeInfoReadService: VirtualNodeInfoReadService
private lateinit var cordaAvroSerializationFactory: CordaAvroSerializationFactory
private lateinit var serializer: CordaAvroSerializer<Any>
private lateinit var publisherFactory: PublisherFactory
private lateinit var messageFactory: MessageFactory
private lateinit var cpiInfoReadService: CpiInfoReadService
Expand Down Expand Up @@ -131,6 +139,11 @@ class FlowRestResourceImplTest {
permissionValidationService = mock()
permissionValidator = mock()
fatalErrorFunction = mock()
cordaAvroSerializationFactory = mock()
serializer = mock()

whenever(cordaAvroSerializationFactory.createAvroSerializer<Any>(anyOrNull())).thenReturn(serializer)
whenever(serializer.serialize(anyOrNull())).thenReturn(byteArrayOf(1,2,3))

val cpiMetadata = getMockCPIMeta()
whenever(cpiInfoReadService.get(any())).thenReturn(cpiMetadata)
Expand Down Expand Up @@ -169,16 +182,17 @@ class FlowRestResourceImplTest {
).thenReturn(true)
}

private fun createFlowRestResource(initialise: Boolean = true): FlowRestResource {
private fun createFlowRestResource(initialise: Boolean = true, messagingConfigParam: SmartConfig = messagingConfig): FlowRestResource {
return FlowRestResourceImpl(
virtualNodeInfoReadService,
flowStatusLookupService,
publisherFactory,
messageFactory,
cpiInfoReadService,
permissionValidationService,
mock()
).apply { if (initialise) (initialise(SmartConfigImpl.empty(), fatalErrorFunction)) }
mock(),
cordaAvroSerializationFactory
).apply { if (initialise) (initialise(messagingConfigParam, fatalErrorFunction)) }
}

@Test
Expand Down Expand Up @@ -350,6 +364,16 @@ class FlowRestResourceImplTest {
}
}

@Test
fun `start flow fails with InvalidInputDataException when payload is too large`() {
val flowRestResource = createFlowRestResource(true, SmartConfigImpl.empty().withValue(MessagingConfig.MAX_ALLOWED_MSG_SIZE,
ConfigValueFactory.fromAnyRef(1)))

assertThrows<InvalidInputDataException> {
flowRestResource.startFlow(VALID_SHORT_HASH, StartFlowParameters(clientRequestId, FLOW1, TestJsonObject()))
}
}

@Test
fun `start flow event fails when not initialized`() {
val flowRestResource = createFlowRestResource(false)
Expand Down Expand Up @@ -587,4 +611,7 @@ class FlowRestResourceImplTest {
flowRestResource.startFlow(VALID_SHORT_HASH, StartFlowParameters("", FLOW1, TestJsonObject()))
}
}

private val messagingConfig = SmartConfigImpl.empty().withValue(MessagingConfig.MAX_ALLOWED_MSG_SIZE, ConfigValueFactory.fromAnyRef
(10000000))
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package net.corda.flow.application.messaging

import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.flow.fiber.FlowFiberService
import net.corda.flow.fiber.FlowIORequest
import net.corda.sandbox.type.UsedByFlow
import net.corda.v5.application.messaging.ExternalMessaging
import net.corda.v5.base.annotations.Suspendable
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.v5.serialization.SingletonSerializeAsToken
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
Expand All @@ -16,25 +18,46 @@ import java.util.UUID
@Component(service = [ExternalMessaging::class, UsedByFlow::class], scope = ServiceScope.PROTOTYPE)
class ExternalMessagingImpl(
private val flowFiberService: FlowFiberService,
private val idFactoryFunc: () -> String
private val idFactoryFunc: () -> String,
cordaAvroSerializationFactory: CordaAvroSerializationFactory
) : ExternalMessaging, UsedByFlow, SingletonSerializeAsToken {

private val serializer = cordaAvroSerializationFactory.createAvroSerializer<Any>()

@Activate
constructor(
@Reference(service = FlowFiberService::class)
flowFiberService: FlowFiberService
) : this(flowFiberService, { UUID.randomUUID().toString() })
flowFiberService: FlowFiberService,
@Reference(service = CordaAvroSerializationFactory::class)
cordaAvroSerializationFactory: CordaAvroSerializationFactory
) : this(flowFiberService, { UUID.randomUUID().toString() }, cordaAvroSerializationFactory)

@Suspendable
override fun send(channelName: String, message: String) {
validateSize(message)
send(channelName, idFactoryFunc(), message)
}

private fun validateSize(message: String) {
val bytesSize = serializer.serialize(message)?.size
val maxAllowedMessageSize = maxMessageSize()
if (bytesSize != null && maxAllowedMessageSize < bytesSize) {
throw CordaRuntimeException(
"Cannot send external messaging content as " +
"it exceeds the max message size allowed. Message Size: [$bytesSize], Max Size: [$maxAllowedMessageSize}]"
)
}
}

@Suspendable
override fun send(channelName: String, messageId: String, message: String) {
validateSize(message)

flowFiberService
.getExecutingFiber()
.suspend(FlowIORequest.SendExternalMessage(channelName, messageId, message))
}

private fun maxMessageSize() = flowFiberService.getExecutingFiber().getExecutionContext().flowCheckpoint.maxMessageSize
}

Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package net.corda.flow.external.events.impl

import java.time.Instant
import net.corda.data.flow.event.external.ExternalEvent
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.state.external.ExternalEventState
import net.corda.flow.external.events.factory.ExternalEventFactory
import net.corda.flow.external.events.factory.ExternalEventRecord
import net.corda.messaging.api.records.Record
import java.time.Duration
import java.time.Instant

/**
* [ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s.
Expand Down Expand Up @@ -86,4 +86,16 @@ interface ExternalEventManager {
instant: Instant,
retryWindow: Duration
): Pair<ExternalEventState, Record<*, *>?>

/**
* Get the external event to send for the transient error retry scenario.
* Returns the event as is from the state. No additional checks required.
* @param externalEventState The [ExternalEventState] to get the event from.
* @param instant The current time. Used to set timestamp.
* @return The external event request to resend
* */
fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *>
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ class ExternalEventManagerImpl(
return externalEventState to record
}

override fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *> {
return generateRecord(externalEventState, instant)
}

private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) {
when {
(externalEventState.sendTimestamp + retryWindow) >= instant -> {
Expand Down
Loading