Skip to content

Commit

Permalink
Java CDK: Destinations emit shorter error message on failed stream st…
Browse files Browse the repository at this point in the history
…atus; allow setting internal message on transient trace messages (#41959)
  • Loading branch information
edgao authored Jul 15, 2024
1 parent 8539527 commit aa3426e
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 12 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.4 | 2024-07-15 | [\#41959](https://github.com/airbytehq/airbyte/pull/41959) | Allow setting `internal_message` in Config/TransientErrorException. Destinations: shorten error message for INCOMPLETE stream status. |
| 0.41.3 | 2024-07-15 | [\#41680](https://github.com/airbytehq/airbyte/pull/41680) | Fix: CompletableFutures.allOf now handles empty list and `Throwable` |
| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.cdk.integrations.base

import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.exceptions.TransientErrorException
import io.airbyte.commons.stream.AirbyteStreamStatusHolder
import io.airbyte.protocol.models.v0.*
import java.time.Instant
Expand All @@ -16,12 +18,30 @@ object AirbyteTraceMessageUtility {

@JvmStatic
fun emitConfigErrorTrace(e: Throwable, displayMessage: String?) {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
if (e is ConfigErrorException) {
emitErrorTrace(
e,
displayMessage,
AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR,
e.internalMessage,
)
} else {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
}
}

@JvmStatic
fun emitTransientErrorTrace(e: Throwable, displayMessage: String?) {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
if (e is TransientErrorException) {
emitErrorTrace(
e,
displayMessage,
AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR,
e.internalMessage,
)
} else {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
}
}

fun emitCustomErrorTrace(displayMessage: String?, internalMessage: String?) {
Expand Down Expand Up @@ -70,9 +90,10 @@ object AirbyteTraceMessageUtility {
fun emitErrorTrace(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
failureType: AirbyteErrorTraceMessage.FailureType,
internalMessage: String = "",
) {
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType))
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType, internalMessage))
}

@JvmStatic
Expand All @@ -99,15 +120,22 @@ object AirbyteTraceMessageUtility {
fun makeErrorTraceAirbyteMessage(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
failureType: AirbyteErrorTraceMessage.FailureType,
internalMessage: String = "",
): AirbyteMessage {
val actualInternalMessage: String =
if (internalMessage.isEmpty()) {
e.toString()
} else {
internalMessage + "\n" + e.toString()
}
return makeAirbyteMessageFromTraceMessage(
makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ERROR)
.withError(
AirbyteErrorTraceMessage()
.withFailureType(failureType)
.withMessage(displayMessage)
.withInternalMessage(e.toString())
.withInternalMessage(actualInternalMessage)
.withStackTrace(ExceptionUtils.getStackTrace(e))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,17 @@ constructor(
// In this case, it would be misleading to mark the sync as successful, because e.g. we
// maybe didn't commit a truncate.
if (unsuccessfulStreams.isNotEmpty()) {
val unsuccessfulStreamsString =
unsuccessfulStreams.joinToString(", ") { "${it.namespace}.${it.name}" }
val internalMessageString =
"Some streams either received an INCOMPLETE stream status, or did not receive a stream status at all: $unsuccessfulStreamsString"
logger.info { internalMessageString }
// Throw as a "transient" error. This will tell platform to retry the sync,
// but won't trigger any alerting.
throw TransientErrorException(
"Some streams were unsuccessful due to a source error: $unsuccessfulStreams"
displayMessage =
"Some streams were unsuccessful due to a source error. See logs for details.",
internalMessage = internalMessageString,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.3
version=0.41.4
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ package io.airbyte.commons.exceptions
* exception is caught and emits an AirbyteTraceMessage.
*/
class ConfigErrorException : RuntimeException {
val internalMessage: String
val displayMessage: String

constructor(displayMessage: String) : super(displayMessage) {
constructor(displayMessage: String, internalMessage: String = "") : super(displayMessage) {
this.displayMessage = displayMessage
this.internalMessage = internalMessage
}

constructor(displayMessage: String, exception: Throwable?) : super(displayMessage, exception) {
constructor(
displayMessage: String,
exception: Throwable?,
internalMessage: String = ""
) : super(displayMessage, exception) {
this.displayMessage = displayMessage
this.internalMessage = internalMessage
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@ package io.airbyte.commons.exceptions
* an AirbyteTraceMessage.
*/
class TransientErrorException : RuntimeException {
val internalMessage: String

constructor(displayMessage: String) : super(displayMessage)
constructor(displayMessage: String, internalMessage: String = "") : super(displayMessage) {
this.internalMessage = internalMessage
}

constructor(displayMessage: String, exception: Throwable?) : super(displayMessage, exception)
constructor(
displayMessage: String,
exception: Throwable?,
internalMessage: String = "",
) : super(displayMessage, exception) {
this.internalMessage = internalMessage
}
}

0 comments on commit aa3426e

Please sign in to comment.