publish dionysus.consume_batch
Azdaroth committed Mar 15, 2024
1 parent 5449a87 commit 836b1ca
Showing 5 changed files with 112 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,4 +1,5 @@
## [Unreleased]
- Publish consumed messages batch as `dionysus.consume_batch` event.

## [0.4.0]
- Allow `Dionysus::Producer::Genesis::StreamJob`/`Dionysus::Producer::Genesis::Streamer` to take more options and perform filtering by extra conditions. This is useful if you only need to stream some of the records.
10 changes: 7 additions & 3 deletions README.md
@@ -194,7 +194,7 @@ Dionysus::Producer.responders_for(Record).each do |responder|
message = [].tap do |current_message|
current_message << ["record_created", created_records.to_a, {}]
if canceled_records.any?
current_message << ["record_destroyed", canceled_records.map { |record| RecordPrometheusDTO.new(record) }, { serialize: false }]
current_message << ["record_destroyed", canceled_records.map { |record| RecordDionysusDTO.new(record) }, { serialize: false }]
end
end

@@ -1137,7 +1137,7 @@ Dionysus::Consumer.configure do |config|
config.resolve_synced_data_hash_proc = ->(record) { record.synced_data_model.synced_data_hash } # optional, defaults to ->(record) { record.public_send(Dionysus::Consumer.configuration.synced_data_attribute).to_h }
config.sidekiq_queue = :default # optional, defaults to `:dionysus`
config.message_filter = FilterIgnoringLargeMessageToAvoidOutofMemoryErrors.new(error_handler: Sentry) # DEPRECATED - not required, defaults to Dionysus::Utils::DefaultMessageFilter, which doesn't ignore any messages. It can be useful when you want to ignore some messages, e.g. some very large ones that would cause an OOM error. Check the implementation of `Dionysus::Utils::DefaultMessageFilter` for more details to understand what kind of arguments are available to set the condition. `error_handler` needs to implement a Sentry-like interface. Kept for backwards compatibility, please use `message_filters` instead.
config.message_filters = [FilterIgnoringLargeMessageToAvoidOutofMemoryErrors.new(error_handler: Sentry)] # not required, defaults to [Dionysus::Utils::DefaultMessageFilter], which doesn't ignore any messages. It can be useful when you want to ignore some messages, e.g. some very large ones that would cause an OOM error. Check the implementation of `Dionysus::Utils::DefaultMessageFilter` for more details to understand what kind of arguments are available to set the condition. `error_handler` needs to implement a Sentry-like interface.

# if you ever need to provide mapping:

@@ -1154,7 +1154,9 @@ end

Check the publisher section for reference about instrumentation and the event bus. The only difference is in which methods are instrumented and which events are published.

For the event bus, you may expect the `dionysus.consume` event. It contains the following attributes:
For the event bus, you may expect two events:

1. `dionysus.consume` event. It contains the following attributes:
- `topic_name`, e.g. "v3_inbox", "v3_rentals"
- `model_name`, e.g. "Conversation", "Rental"
- `event_name`, e.g. "rental_created", "conversation_updated", "message_destroyed"
@@ -1168,6 +1170,8 @@ For the event bus, you may expect the `dionysus.consume` event. It contains the
}
```

2. `dionysus.consume_batch` event. It's essentially an array of the event payloads published under `dionysus.consume`, representing an entire batch of messages processed by a Karafka consumer.

The event bus is the recommended way to react to consumed events if you want to avoid putting that logic into ActiveRecord callbacks.
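
Below is a minimal sketch of a custom event bus handling both events. It is not taken from the gem's documentation: the only contract the publisher code in this commit relies on is a `#publish(event_name, payload)` method (see `batch_events_publisher.rb` further down), and the `SimpleDionysusEventBus` class, its handler bodies, and the `config.event_bus` assignment are illustrative assumptions.

```ruby
class SimpleDionysusEventBus
  # Dionysus calls `publish(event_name, payload)` on whatever object is
  # configured as the event bus, so a plain Ruby object is enough.
  def publish(event_name, payload = {})
    case event_name
    when "dionysus.consume"
      # `payload` is a single hash with :topic_name, :model_name, :event_name,
      # :transformed_data and :local_changes
      puts "consumed #{payload[:event_name]} from #{payload[:topic_name]}"
    when "dionysus.consume_batch"
      # `payload` is an array of the hashes above, one per message in the batch
      puts "consumed a batch of #{payload.size} events"
    end
  end
end

# Assumes the configuration exposes an `event_bus` writer, mirroring the
# `config.event_bus.publish(...)` calls in lib/dionysus/consumer/batch_events_publisher.rb.
Dionysus::Consumer.configure do |config|
  config.event_bus = SimpleDionysusEventBus.new
end
```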


12 changes: 9 additions & 3 deletions lib/dionysus/consumer/batch_events_publisher.rb
@@ -11,8 +11,9 @@ def initialize(config, topic)

def publish(processed_events)
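# Map each DionysusEvent to a plain hash, publish the whole batch once as
# `dionysus.consume_batch`, then publish every hash individually as `dionysus.consume`.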
processed_events
.map(&method(:to_event_data))
.each(&method(:publish_via_event_bus))
.map { |dionysus_event| to_event_data(dionysus_event) }
.then { |mapped_events| publish_events_batch_via_event_bus(mapped_events) }
.then { |mapped_events| mapped_events.each { |event_data| publish_single_event_via_event_bus(event_data) } }
end

private
@@ -27,7 +28,12 @@ def to_event_data(dionysus_event)
}
end

def publish_via_event_bus(event_data)
def publish_events_batch_via_event_bus(events_batch_data)
config.event_bus.publish("dionysus.consume_batch", events_batch_data)
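# Return the mapped batch unchanged so the `.then` chain in #publish can
# continue with the per-event `dionysus.consume` publishing.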
events_batch_data
end

def publish_single_event_via_event_bus(event_data)
config.event_bus.publish("dionysus.consume", event_data)
end
end
98 changes: 94 additions & 4 deletions spec/dionysus/consumer/karafka_consumer_generator_spec.rb
@@ -3317,6 +3317,96 @@ def publish(event_name, payload = {})
end
let(:expected_events) do
[
[
"dionysus.consume_batch",
[
{
topic_name: "v8_rentals",
event_name: "rental_updated",
model_name: "Rental",
transformed_data: [
attributes: {
"synced_tax_id" => 201,
"synced_booking_ids" => [101, 102],
"synced_account_id" => 300,
"synced_id" => 1,
"name" => "Villa Saganaki",
"non_relationship" => [
{ "id" => 1 }
]
},
has_many: [
[
"bookings",
[
{
attributes: {
"synced_id" => 101,
"start_at" => 1
},
has_many: [],
has_one: []
},
{
attributes: {
"synced_id" => 102,
"start_at" => 22
},
has_many: [],
has_one: []
}
]
]
],
has_one: [
[
"tax",
{
attributes: {
"synced_id" => 201,
"name" => "VAT"
},
has_many: [],
has_one: []
}
],
[
"account",
{
attributes:
{
"synced_id" => 300,
"name" => "#WhateverItTakes"
},
has_many: [],
has_one: []
}
]
]
],
local_changes: {
["Rental", 1] => { "name" => ["old name", "Villa Saganaki"] },
["bookings", 101] => { "start_at" => [nil, 1] },
["bookings", 102] => { "start_at" => [2, 22] },
["tax", 201] => { "name" => [nil, "VAT"] },
["account", 300] => { "name" => ["Discipline Equals Freedom", "#WhateverItTakes"] }
}
},
{
topic_name: "v8_rentals",
event_name: "rental_destroyed",
model_name: "Rental",
transformed_data: [
attributes: {
"synced_id" => 10_101
},
has_many: [],
has_one: []
],
local_changes: {}
}
]
],
[
"dionysus.consume",
{
@@ -3887,10 +3977,10 @@ def publish(event_name, payload = {})
consume
end

it "publishes events correctly in the end" do
it "publishes events correctly" do
expect do
consume
end.to change { event_bus.events.count }.from(0).to(3)
end.to change { event_bus.events.count }.from(0).to(4)
end
end

@@ -4068,10 +4158,10 @@ def publish(event_name, payload = {})
expect(consumer).to have_received(:process_batch).exactly(1)
end

it "publishes events correctly in the end" do
it "publishes events correctly" do
expect do
consume
end.to change { event_bus.events.count }.from(0).to(3)
end.to change { event_bus.events.count }.from(0).to(4)
end
end
end
2 changes: 1 addition & 1 deletion spec/integration_spec.rb
@@ -157,7 +157,7 @@ def with_connection
end.to change { DBForKarafkaConsumerTest.rentals.count }.from(0)
.and change {
event_bus.messages.count
}.from(0).to(1) # only for rental_created, tombstone is not considered here
}.from(0).to(2) # rental_created only (consume + consume_batch); tombstone is not considered here
.and change { transaction_provider.counter }.from(0).to(2)
end
end
