Skip to content

Commit

Permalink
customizable batch size fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ariana-flipp committed May 14, 2024
1 parent 51b5093 commit d84820e
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 42 deletions.
2 changes: 2 additions & 0 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def self.configure_producer_or_consumer(kafka_config)
Deimos.config.consumers.bulk_import_id_generator,
save_associations_first: kafka_config.save_associations_first
)
else
klass.config[:max_batch_size] = kafka_config.max_batch_size || Deimos.config.producers.max_batch_size
end
end
end
Expand Down
9 changes: 1 addition & 8 deletions lib/deimos/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ class << self
def config
@config ||= {
encode_key: true,
namespace: Deimos.config.producers.schema_namespace,
max_batch_size: Deimos.config.producers.max_batch_size
namespace: Deimos.config.producers.schema_namespace
}
end

Expand All @@ -90,12 +89,6 @@ def partition_key(_payload)
nil
end

# @param size [Integer] Override the default batch size for publishing.
# @return [void]
def max_batch_size(size)
config[:max_batch_size] = size
end

# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
Expand Down
34 changes: 0 additions & 34 deletions spec/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,6 @@ def self.partition_key(payload)
key_config none: true
end
stub_const('MyNoTopicProducer', producer_class)

producer_class = Class.new(Deimos::Producer) do
schema 'MySchema'
namespace 'com.my-namespace'
topic 'my-topic'
key_config field: 'test_id'
max_batch_size 1
end
stub_const('MySmallBatchProducer', producer_class)
end

it 'should fail on invalid message with error handler' do
Expand Down Expand Up @@ -615,30 +606,5 @@ def self.partition_key(payload)
end
end

describe "max_batch_size" do
it 'should use top-level default value if max_batch_size is not defined by the producer' do
expect(MyProducer.config[:max_batch_size]).to eq(500)
end

it 'should call produce_batch multiple times when max_batch_size < records size' do
Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'foo',
key: 'foo')
Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'bar',
key: 'bar')
expect(described_class).to receive(:produce_batch).twice

MySmallBatchProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123 },
{ 'test_id' => 'bar', 'some_int' => 124 }]
)
end
end

end
end

0 comments on commit d84820e

Please sign in to comment.