From 8a5e994875a32a546c20f8696f98b24859459086 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Tue, 23 Jul 2024 18:29:38 -0400 Subject: [PATCH] Fix inngest.send. Add Event Builder --- inngest/src/main/kotlin/com/inngest/Event.kt | 83 +++++++++++++++++-- .../src/main/kotlin/com/inngest/Inngest.kt | 55 +++++++++++- inngest/src/main/kotlin/com/inngest/Step.kt | 29 ++----- 3 files changed, 134 insertions(+), 33 deletions(-) diff --git a/inngest/src/main/kotlin/com/inngest/Event.kt b/inngest/src/main/kotlin/com/inngest/Event.kt index 5a98242f..0ff5a42b 100644 --- a/inngest/src/main/kotlin/com/inngest/Event.kt +++ b/inngest/src/main/kotlin/com/inngest/Event.kt @@ -1,6 +1,9 @@ package com.inngest -data class Event( +/** + * An internal class used for parsing events sent to Inngest functions + */ +internal data class Event( val id: String, val name: String, val data: LinkedHashMap, @@ -9,7 +12,77 @@ data class Event( val v: Any? = null, ) -// data class EventAPIResponse( -// val ids: Array, -// val status: String, -// ) +/** + * Create an event to send to Inngest + */ +data class InngestEvent( + val id: String?, + val name: String, + val data: Any, + val user: Any?, + val ts: Long?, + val v: String? = null, +) + +/** + * Construct a new Inngest Event via builder + */ +class InngestEventBuilder( + var id: String?, + var name: String?, + var data: Any?, + private var user: Any?, + private var ts: Long?, + private var v: String? = null, +) { + fun id(id: String): InngestEventBuilder { + this.id = id + return this + } + + fun name(name: String): InngestEventBuilder { + this.name = name + return this + } + + fun data(data: Any): InngestEventBuilder { + this.data = data + return this + } + + fun ts(ts: Long): InngestEventBuilder { + this.ts = ts + return this + } + + fun v(v: String): InngestEventBuilder { + this.v = v + return this + } + + fun build(): InngestEvent { + if (name == null) { + throw IllegalArgumentException("name is required") + } + if (data == null) { + throw IllegalArgumentException("data is required") + } + return InngestEvent( + id, + name!!, + data!!, + user, + ts, + v, + ) + } +} + +/** + * The response from the Inngest Event API including the ids of any event created + * in the order of which they were included in the request + */ +data class SendEventsResponse( + val ids: List, + val status: Int, +) diff --git a/inngest/src/main/kotlin/com/inngest/Inngest.kt b/inngest/src/main/kotlin/com/inngest/Inngest.kt index 5190faee..f5f9da63 100644 --- a/inngest/src/main/kotlin/com/inngest/Inngest.kt +++ b/inngest/src/main/kotlin/com/inngest/Inngest.kt @@ -2,6 +2,8 @@ package com.inngest import com.beust.klaxon.Klaxon import java.io.IOException +import io.ktor.http.* +import java.net.ConnectException class Inngest @JvmOverloads @@ -36,11 +38,56 @@ class Inngest fun send(events: Array): SendEventsResponse? { val request = httpClient.build("$baseUrl/e/$eventKey", events) - return httpClient.send(request) lambda@{ response -> - // TODO: Handle error case - if (!response.isSuccessful) throw IOException("Unexpected code $response") + try { + return httpClient.send(request) lambda@{ response -> + if (!response.isSuccessful) { + // TODO - Attempt to parse the HTTP response and get error from JSON body to pass here + throw InngestSendEventBadResponseCodeException(response.code) + } - return@lambda Klaxon().parse(response.body!!.charStream()) + val responseBody = response.body!!.charStream() + try { + val sendEventsResponse = Klaxon().parse(responseBody) + if (sendEventsResponse != null) { + return@lambda sendEventsResponse + } + } catch (e: Exception) { + throw InngestSendEventInvalidResponseException(responseBody.toString()) + } + throw InngestSendEventInvalidResponseException(responseBody.toString()) + } + } catch (e: ConnectException) { + throw InngestSendEventConnectException(e.message!!) + } catch (e: Exception) { + throw InngestSendEventException(e.message!!) } } } + +/** + * A generic exception occurred while sending events + */ +open class InngestSendEventException( + message: String, +) : Exception("Failed to send event: $message") + +/** + * A failure occurred establishing a connection to the Inngest Event API + */ +class InngestSendEventConnectException( + message: String, +) : InngestSendEventException(message) + +/** + * The Inngest Event API returned a non-successful HTTP status code + */ +class InngestSendEventBadResponseCodeException( + code: Int, +) : InngestSendEventException("Bad response code: $code") + +/** + * The Inngest Event API returned a response that was not parsable + */ +class InngestSendEventInvalidResponseException( + message: String, +) : InngestSendEventException("Unable to parse response: $message") diff --git a/inngest/src/main/kotlin/com/inngest/Step.kt b/inngest/src/main/kotlin/com/inngest/Step.kt index eeebb9a5..bee9d7ec 100644 --- a/inngest/src/main/kotlin/com/inngest/Step.kt +++ b/inngest/src/main/kotlin/com/inngest/Step.kt @@ -5,26 +5,6 @@ import java.time.Duration typealias MemoizedRecord = HashMap typealias MemoizedState = HashMap -data class InngestEvent( - val name: String, - val data: Any, -) - -data class SendEventsResponse( - val ids: Array, -) { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as SendEventsResponse - - return ids.contentEquals(other.ids) - } - - override fun hashCode(): Int = ids.contentHashCode() -} - class StepInvalidStateTypeException( val id: String, val hashedId: String, @@ -51,7 +31,7 @@ class StepInterruptSleepException( class StepInterruptSendEventException( id: String, hashedId: String, - val eventIds: Array, + val eventIds: List, ) : StepInterruptException(id, hashedId, eventIds) class StepInterruptInvokeException( @@ -231,15 +211,16 @@ class Step( val hashedId = state.getHashFromId(id) try { - val stepState = state.getState>(hashedId, "event_ids") + val stepState = state.getState>(hashedId, "event_ids") if (stepState != null) { - return SendEventsResponse(stepState) + return SendEventsResponse(stepState, 200) } throw Exception("step state expected sendEvent, got something else") } catch (e: StateNotFound) { val response = client.send(events) - throw StepInterruptSendEventException(id, hashedId, response!!.ids) + //throw StepInterruptSendEventException(id, hashedId, response!!.ids) + throw StepInterruptSendEventException(id, hashedId, response.ids) } }