-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CORE-20926: Merging forward updates from release/os/5.2 to release/os/5.3 - 2024-11-28 #6405
CORE-20926: Merging forward updates from release/os/5.2 to release/os/5.3 - 2024-11-28 #6405
Conversation
When lost membership requests were investigated it seems to be a problem that occurs during rebalance in the state and event pattern. After the rebalance the commits that were not successful don't get re-processed and are instead skipped over. As part of handling this, this adds a reset of the event offset position so we try processing again from the correct place after a RebalanceInProgressException.
Change CommitFailedException classification from fatal to transient. This is required as the kafka connection tests exposed that we mark CommitFailedException as fatal and therefore do not retry. A CommitFailedException means we should abort the transaction but by classifying it as fatal we will bubble this up and we will not retry on any level. A CommitFailedException is fatal at the transaction level but not at the worker level so this should be changed.
This issue appeared while running kafka connection tests which kill the kafka broker. When the connection to the kafka broker was lost, ERROR level logs related to CryptoOpsClientImpl appeared several times. This is because there was a failure in executing net.corda.data.crypto.wire.ops.rpc.queries.SupportedSchemesRpcQuery. It has been verified how this is handled in the rest worker: KeyRestResource calls listSchemes and fails at cryptoOpsClient.getSupportedSchemes. The exception handling for listSchemes will return an InternalServerException. This extends HttpApiException and the response code will be 500. As we return an error to the rest client and the system can recover, the log level for the CryptoOpsClient failing the operation should be WARN level instead.
…xception (#6299) This change ensures that the exception `CordaMessageAPIProducerRequiresReset` is correctly handled, by re-raising it to the context which owns the `CordaProducer` and resetting the producer safely. Unit tests have also been added to verify this behaviour.
…lNodeInfoProcessor (#6296) This issue appeared while running kafka connection tests which kill the kafka broker. In a flow worker compacted subscription we got a org.apache.kafka.common.errors.TimeoutException and handled it as fatal. This is because we have no error handling in our KafkaAdmin class for the exceptions that can be thrown by KafkaFuture.get() . The KafkaAdmin client is called from the VirtualNodeInfoProcessor which also does not catch this. This PR adds retry logic to the KafkaAdmin for getTopics and in the case of exhausting these retries, error handling in VirtualNodeInfoProcessor to handle this better and prevent this exception bubbling further.
…xceptionUtils (#6311) This is required so that the exception is correctly handled in the Kafka message patterns.
…cerRequiresReset (#6303) The `ProducerFencedException` in these cases is likely due to an invalid epoch being used in an operation as a result of a timeout on the broker side. The correct action in this case is to reset the producer.
… in status ERROR (#6365) Numerous fixes were added to the 5.2 branch to stop Lifecyle coordinators from traversing from ERROR to UP/DOWN, however 2 use cases were missed. The lifecycle registry is the source of truth about the health of a worker and is inspected directly when the /status and /health REST endpoints are called. It is essential these report correctly when called by the k8s liveness probe to allow for unhealthy workers to be restarted. 1.) when a stop a event is received, the registry is directly updated from within the Lifecyle processor with a new status. This is eagerly blocked here when the current status is ERROR. Separately, within the registry itself any status update from ERROR is also blocked as a last resort. 2.) when a close event is received, the coordinator is removed from the registry. This could mask a worker in ERROR state by removing it from the registry. This is also blocked at the registry level. 3.) Additionally, calling start() on a coordinator in ERROR state would reset it to DOWN. This is also blocked eagerly at the LifeCycleProccessor Level.
…lient errors (#6385) The current mediator messaging pattern in Corda can encounter an retry loop when transient errors are received from other Corda workers. This retry loop blocks flow topic partitions from progressing and it has been observed that the corda cluster affected can become permanently unstable due to the effects of consumer lag. This pattern is used by the flow worker to perform synchronous HTTP calls to various workers, including verification, token, crypto, uniqueness, and persistence workers. To address this issue, a separate Kafka topic is dedicated to handling retries. This will allow the primary ingestion topics to continue processing unaffected flows, while introducing finite retry logic for flows impacted by transient errors. Additionally AVRO version is bumped to fix a vulnerability
The FlowRestResourceImpl uses a transactional producer to send flow start events to the flow mapper. This means if the start event payload requires chunking, the producer will do so and send the event. The client will receive notification that the flow was received with a status of START_REQUESTED. The flow mapper uses an async producer as it is more performant. It will therefore be unable to pass the message to the flow engine and it will throw a fatal exception due to trying to chunk a message with an async producer. This will cause the flow mappers topic partition to become blocked as the fatal exception will result in a retry.
This PR ensures the flow result does not exceed the max allowed message size for payloads sent to the message bus. The flow result object is passed to the message bus via the FlowStatus object. If the message size is too large this will result in a fatal exception in the mediator and the flows topic partition will become blocked as the pattern will retry message after the worker restarts.
…too large (#6392) Currently there is no validation to stop the user from sending a message which exceeds the max allowed message size. It has been observed that this will cause a fatal exception in the mediator message pattern. Subsequently the flow partition with the affected flow will become blocked and unable to progress.
…re not affected by (#6402)
Upgraded following guidance from https://github.com/Kong/unirest-java/blob/main/UPGRADE_GUIDE.md Apache client no longer packaged. Use kotlin HttpClient instead. Gson no longer packaged with unirest. Add it where required. Before v4 the Unirest client uses the GSon Objectmapper by default. In places where we did not override it to use the jackson mapper, i have added the necessary gson dependency. Note: we do set the override to use the jackson mapper in the rest:rest-client modules RemoteClient
Please remember to 'Merge' all forward merges and do not 'Squash and Merge' |
…se/os/5.2-release/os/5.3-2024-11-28-397 # Conflicts: # components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt # gradle.properties # gradle/libs.versions.toml # libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/AuthenticationScheme.kt # libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/BasicAuthenticationScheme.kt # libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/BearerTokenAuthenticationScheme.kt # libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/NoopAuthenticationScheme.kt # libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/connect/remote/RemoteClient.kt # libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/AbstractWebsocketTest.kt # libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/RestServerDurableStreamsRequestsTest.kt # libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/RestServerOpenApiTest.kt # libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/RestServerRequestsTest.kt # testing/e2e-test-utilities/src/main/kotlin/net/corda/e2etest/utilities/UnirestHttpsClient.kt # tools/corda-runtime-gradle-plugin/src/integrationTest/kotlin/net/corda/gradle/plugin/queries/QueriesTasksTest.kt # tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/ProjectUtils.kt # tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/cordalifecycle/EnvironmentSetupHelper.kt # tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/cordalifecycle/EnvironmentSetupTasks.kt # tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/cordapp/DeployCpiHelper.kt # tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/network/VNodeHelper.kt # tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/queries/QueryTasksImpl.kt
Quality Gate passedIssues Measures |
Jenkins build for PR 6405 build 3 Build Successful: |
This PR was created by the merge bot.
Files with conflicts to resolve manually:
components/flow/flow-rest-resource-service-impl/src/main/kotlin/net/corda/flow/rest/impl/v1/FlowRestResourceImpl.kt
gradle.properties
gradle/libs.versions.toml
libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/AuthenticationScheme.kt
libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/BasicAuthenticationScheme.kt
libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/BearerTokenAuthenticationScheme.kt
libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/auth/scheme/NoopAuthenticationScheme.kt
libs/rest/rest-client/src/main/kotlin/net/corda/rest/client/connect/remote/RemoteClient.kt
libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/AbstractWebsocketTest.kt
libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/RestServerDurableStreamsRequestsTest.kt
libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/RestServerOpenApiTest.kt
libs/rest/rest-server-impl/src/integrationTest/kotlin/net/corda/rest/server/impl/RestServerRequestsTest.kt
testing/e2e-test-utilities/src/main/kotlin/net/corda/e2etest/utilities/UnirestHttpsClient.kt
tools/corda-runtime-gradle-plugin/src/integrationTest/kotlin/net/corda/gradle/plugin/queries/QueriesTasksTest.kt
tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/ProjectUtils.kt
tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/cordalifecycle/EnvironmentSetupHelper.kt
tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/cordalifecycle/EnvironmentSetupTasks.kt
tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/cordapp/DeployCpiHelper.kt
tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/network/VNodeHelper.kt
tools/corda-runtime-gradle-plugin/src/main/kotlin/net/corda/gradle/plugin/queries/QueryTasksImpl.kt
Includes: