Skip to content

Commit

Permalink
Fix inngest.send. Add Event Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
djfarrelly authored and albertchae committed Sep 9, 2024
1 parent a67b0b6 commit 8a5e994
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 33 deletions.
83 changes: 78 additions & 5 deletions inngest/src/main/kotlin/com/inngest/Event.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.inngest

data class Event(
/**
* An internal class used for parsing events sent to Inngest functions
*/
internal data class Event(
val id: String,
val name: String,
val data: LinkedHashMap<String, Any>,
Expand All @@ -9,7 +12,77 @@ data class Event(
val v: Any? = null,
)

// data class EventAPIResponse(
// val ids: Array<String>,
// val status: String,
// )
/**
* Create an event to send to Inngest
*/
data class InngestEvent(
val id: String?,
val name: String,
val data: Any,
val user: Any?,
val ts: Long?,
val v: String? = null,
)

/**
* Construct a new Inngest Event via builder
*/
class InngestEventBuilder(
var id: String?,
var name: String?,
var data: Any?,
private var user: Any?,
private var ts: Long?,
private var v: String? = null,
) {
fun id(id: String): InngestEventBuilder {
this.id = id
return this
}

fun name(name: String): InngestEventBuilder {
this.name = name
return this
}

fun data(data: Any): InngestEventBuilder {
this.data = data
return this
}

fun ts(ts: Long): InngestEventBuilder {
this.ts = ts
return this
}

fun v(v: String): InngestEventBuilder {
this.v = v
return this
}

fun build(): InngestEvent {
if (name == null) {
throw IllegalArgumentException("name is required")
}
if (data == null) {
throw IllegalArgumentException("data is required")
}
return InngestEvent(
id,
name!!,
data!!,
user,
ts,
v,
)
}
}

/**
* The response from the Inngest Event API including the ids of any event created
* in the order of which they were included in the request
*/
data class SendEventsResponse(
val ids: List<String>,
val status: Int,
)
55 changes: 51 additions & 4 deletions inngest/src/main/kotlin/com/inngest/Inngest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.inngest

import com.beust.klaxon.Klaxon
import java.io.IOException
import io.ktor.http.*
import java.net.ConnectException

class Inngest
@JvmOverloads
Expand Down Expand Up @@ -36,11 +38,56 @@ class Inngest
fun send(events: Array<InngestEvent>): SendEventsResponse? {
val request = httpClient.build("$baseUrl/e/$eventKey", events)

return httpClient.send(request) lambda@{ response ->
// TODO: Handle error case
if (!response.isSuccessful) throw IOException("Unexpected code $response")
try {
return httpClient.send(request) lambda@{ response ->
if (!response.isSuccessful) {
// TODO - Attempt to parse the HTTP response and get error from JSON body to pass here
throw InngestSendEventBadResponseCodeException(response.code)
}

return@lambda Klaxon().parse<SendEventsResponse>(response.body!!.charStream())
val responseBody = response.body!!.charStream()
try {
val sendEventsResponse = Klaxon().parse<EventAPIResponse>(responseBody)
if (sendEventsResponse != null) {
return@lambda sendEventsResponse
}
} catch (e: Exception) {
throw InngestSendEventInvalidResponseException(responseBody.toString())
}
throw InngestSendEventInvalidResponseException(responseBody.toString())
}
} catch (e: ConnectException) {
throw InngestSendEventConnectException(e.message!!)
} catch (e: Exception) {
throw InngestSendEventException(e.message!!)
}
}
}

/**
* A generic exception occurred while sending events
*/
open class InngestSendEventException(
message: String,
) : Exception("Failed to send event: $message")

/**
* A failure occurred establishing a connection to the Inngest Event API
*/
class InngestSendEventConnectException(
message: String,
) : InngestSendEventException(message)

/**
* The Inngest Event API returned a non-successful HTTP status code
*/
class InngestSendEventBadResponseCodeException(
code: Int,
) : InngestSendEventException("Bad response code: $code")

/**
* The Inngest Event API returned a response that was not parsable
*/
class InngestSendEventInvalidResponseException(
message: String,
) : InngestSendEventException("Unable to parse response: $message")
29 changes: 5 additions & 24 deletions inngest/src/main/kotlin/com/inngest/Step.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,6 @@ import java.time.Duration
typealias MemoizedRecord = HashMap<String, Any>
typealias MemoizedState = HashMap<String, MemoizedRecord>

data class InngestEvent(
val name: String,
val data: Any,
)

data class SendEventsResponse(
val ids: Array<String>,
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as SendEventsResponse

return ids.contentEquals(other.ids)
}

override fun hashCode(): Int = ids.contentHashCode()
}

class StepInvalidStateTypeException(
val id: String,
val hashedId: String,
Expand All @@ -51,7 +31,7 @@ class StepInterruptSleepException(
class StepInterruptSendEventException(
id: String,
hashedId: String,
val eventIds: Array<String>,
val eventIds: List<String>,
) : StepInterruptException(id, hashedId, eventIds)

class StepInterruptInvokeException(
Expand Down Expand Up @@ -231,15 +211,16 @@ class Step(
val hashedId = state.getHashFromId(id)

try {
val stepState = state.getState<Array<String>>(hashedId, "event_ids")
val stepState = state.getState<List<String>>(hashedId, "event_ids")

if (stepState != null) {
return SendEventsResponse(stepState)
return SendEventsResponse(stepState, 200)
}
throw Exception("step state expected sendEvent, got something else")
} catch (e: StateNotFound) {
val response = client.send(events)
throw StepInterruptSendEventException(id, hashedId, response!!.ids)
//throw StepInterruptSendEventException(id, hashedId, response!!.ids)
throw StepInterruptSendEventException(id, hashedId, response.ids)
}
}

Expand Down

0 comments on commit 8a5e994

Please sign in to comment.