Skip to content

Commit

Permalink
Feature/producer properties (#222)
Browse files Browse the repository at this point in the history
* Add option to setup producer properties

* Add changelog entry
  • Loading branch information
kandersen82 authored Jun 7, 2024
1 parent 3bac2ba commit a512cc8
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 3 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.3.0] - ?

### Added

- Producer properties can be added when creating a producer

## [3.2.1] - 2024-04-24

### Fixed
Expand Down
5 changes: 5 additions & 0 deletions src/DotPulsar/Abstractions/IProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public interface IProducerBuilder<TMessage>
/// </summary>
IProducerBuilder<TMessage> MaxPendingMessages(uint maxPendingMessages);

/// <summary>
/// Add/Set a property key/value on the producer. This is optional.
/// </summary>
IProducerBuilder<TMessage> ProducerProperty(string key, string value);

/// <summary>
/// Create the producer.
/// </summary>
Expand Down
3 changes: 2 additions & 1 deletion src/DotPulsar/Internal/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ private SubProducer CreateSubProducer(string topic, int partition)
var producerName = _options.ProducerName;
var schema = _options.Schema;
var producerAccessMode = (PulsarApi.ProducerAccessMode) _options.ProducerAccessMode;
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, producerAccessMode, schema.SchemaInfo, _compressorFactory);
var producerProperties = _options.ProducerProperties;
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, producerAccessMode, schema.SchemaInfo, _compressorFactory, producerProperties);
var stateManager = CreateStateManager();
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
Expand Down
11 changes: 10 additions & 1 deletion src/DotPulsar/Internal/ProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class ProducerBuilder<TMessage> : IProducerBuilder<TMessage>
{
private readonly IPulsarClient _pulsarClient;
private readonly ISchema<TMessage> _schema;
private readonly Dictionary<string, string> _producerProperties;
private string? _producerName;
private ProducerAccessMode _producerAccessMode;
private bool _attachTraceInfoToMessages;
Expand All @@ -40,6 +41,7 @@ public ProducerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
_initialSequenceId = ProducerOptions<TMessage>.DefaultInitialSequenceId;
_maxPendingMessages = 500;
_producerAccessMode = ProducerOptions<TMessage>.DefaultProducerAccessMode;
_producerProperties = [];
}

public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool attachTraceInfoToMessages)
Expand Down Expand Up @@ -96,6 +98,12 @@ public IProducerBuilder<TMessage> MaxPendingMessages(uint maxPendingMessages)
return this;
}

public IProducerBuilder<TMessage> ProducerProperty(string key, string value)
{
_producerProperties[key] = value;
return this;
}

public IProducer<TMessage> Create()
{
if (string.IsNullOrEmpty(_topic))
Expand All @@ -112,7 +120,8 @@ public IProducer<TMessage> Create()
InitialSequenceId = _initialSequenceId,
ProducerName = _producerName,
StateChangedHandler = _stateChangedHandler,
MaxPendingMessages = _maxPendingMessages
MaxPendingMessages = _maxPendingMessages,
ProducerProperties = _producerProperties
};

if (_messageRouter is not null)
Expand Down
6 changes: 5 additions & 1 deletion src/DotPulsar/Internal/ProducerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public ProducerChannelFactory(
string? producerName,
ProducerAccessMode producerAccessMode,
SchemaInfo schemaInfo,
ICompressorFactory? compressorFactory)
ICompressorFactory? compressorFactory,
Dictionary<string,string>? properties)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
Expand All @@ -49,6 +50,9 @@ public ProducerChannelFactory(
Topic = topic
};

if (properties is not null)
_commandProducer.Metadatas.AddRange(properties.Select(x => new KeyValue { Key = x.Key, Value = x.Value }));

_compressorFactory = compressorFactory;
_schema = schemaInfo.PulsarSchema;
}
Expand Down
6 changes: 6 additions & 0 deletions src/DotPulsar/ProducerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public ProducerOptions(string topic, ISchema<TMessage> schema)
Topic = topic;
Schema = schema;
MessageRouter = new RoundRobinPartitionRouter();
ProducerProperties = [];
}

/// <summary>
Expand Down Expand Up @@ -99,4 +100,9 @@ public ProducerOptions(string topic, ISchema<TMessage> schema)
/// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
/// </summary>
public uint MaxPendingMessages { get; set; }

/// <summary>
/// Add/Set the producers's properties. This is optional.
/// </summary>
public Dictionary<string, string> ProducerProperties { get; set; }
}

0 comments on commit a512cc8

Please sign in to comment.