Skip to content

Commit

Permalink
[PM-17562] Initial POC of Distributed Events (#5323)
Browse files Browse the repository at this point in the history
* Initial POC of Distributed Events

* Apply suggestions from code review

Co-authored-by: Justin Baur <[email protected]>

* Clean up files to support accepted changes. Address PR Feedback

* Removed unneeded using to fix lint warning

* Moved config into a common EventLogging top-level item. Fixed issues from PR review

* Optimized per suggestion from justinbaur

Co-authored-by: Justin Baur <[email protected]>

* Updated to add IAsyncDisposable as suggested in PR review

* Updated with suggestion to use KeyedSingleton for the IEventWriteService

* Changed key case to lowercase

---------

Co-authored-by: Justin Baur <[email protected]>
  • Loading branch information
brant-livefront and justindbaur authored Jan 30, 2025
1 parent 443a147 commit 5efd68c
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 4 deletions.
6 changes: 5 additions & 1 deletion dev/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ IDP_SP_ACS_URL=http://localhost:51822/saml2/yourOrgIdHere/Acs
# Optional reverse proxy configuration
# Should match server listen ports in reverse-proxy.conf
API_PROXY_PORT=4100
IDENTITY_PROXY_PORT=33756
IDENTITY_PROXY_PORT=33756

# Optional RabbitMQ configuration
RABBITMQ_DEFAULT_USER=bitwarden
RABBITMQ_DEFAULT_PASS=SET_A_PASSWORD_HERE_123
15 changes: 15 additions & 0 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ services:
profiles:
- idp

rabbitmq:
image: rabbitmq:management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
volumes:
- rabbitmq_data:/var/lib/rabbitmq_data
profiles:
- rabbitmq

reverse-proxy:
image: nginx:alpine
container_name: reverse-proxy
Expand All @@ -99,3 +113,4 @@ volumes:
mssql_dev_data:
postgres_dev_data:
mysql_dev_data:
rabbitmq_data:
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Net.Http.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging;

namespace Bit.Core.Services;

public class RabbitMqEventHttpPostListener : RabbitMqEventListenerBase
{
private readonly HttpClient _httpClient;
private readonly string _httpPostUrl;
private readonly string _queueName;

protected override string QueueName => _queueName;

public const string HttpClientName = "EventHttpPostListenerHttpClient";

public RabbitMqEventHttpPostListener(
IHttpClientFactory httpClientFactory,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_httpClient = httpClientFactory.CreateClient(HttpClientName);
_httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl;
_queueName = globalSettings.EventLogging.RabbitMq.HttpPostQueueName;
}

protected override async Task HandleMessageAsync(EventMessage eventMessage)
{
var content = JsonContent.Create(eventMessage);
var response = await _httpClient.PostAsync(_httpPostUrl, content);
response.EnsureSuccessStatusCode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Bit.Core.Services;

public abstract class RabbitMqEventListenerBase : BackgroundService
{
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqEventListenerBase> _logger;

protected abstract string QueueName { get; }

protected RabbitMqEventListenerBase(
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_logger = logger;
}

public override async Task StartAsync(CancellationToken cancellationToken)
{
_connection = await _factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
await _channel.QueueDeclareAsync(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: QueueName,
exchange: _exchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, eventArgs) =>
{
try
{
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span);
await HandleMessageAsync(eventMessage);
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while processing the message");
}
};

await _channel.BasicConsumeAsync(QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1_000, stoppingToken);
}
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
await _channel.CloseAsync();
await _connection.CloseAsync();
await base.StopAsync(cancellationToken);
}

public override void Dispose()
{
_channel.Dispose();
_connection.Dispose();
base.Dispose();
}

protected abstract Task HandleMessageAsync(EventMessage eventMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Bit.Core.Services;

public class RabbitMqEventRepositoryListener : RabbitMqEventListenerBase
{
private readonly IEventWriteService _eventWriteService;
private readonly string _queueName;

protected override string QueueName => _queueName;

public RabbitMqEventRepositoryListener(
[FromKeyedServices("persistent")] IEventWriteService eventWriteService,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_eventWriteService = eventWriteService;
_queueName = globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName;
}

protected override Task HandleMessageAsync(EventMessage eventMessage)
{
return _eventWriteService.CreateAsync(eventMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using RabbitMQ.Client;

namespace Bit.Core.Services;
public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _exchangeName;

public RabbitMqEventWriteService(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;

_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}

public async Task CreateAsync(IEvent e)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();

await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);

var body = JsonSerializer.SerializeToUtf8Bytes(e);

await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}

public async Task CreateManyAsync(IEnumerable<IEvent> events)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);

foreach (var e in events)
{
var body = JsonSerializer.SerializeToUtf8Bytes(e);

await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}
}

public async ValueTask DisposeAsync()
{
if (_lazyConnection.IsValueCreated)
{
var connection = await _lazyConnection.Value;
await connection.DisposeAsync();
}
}

private async Task<IConnection> CreateConnectionAsync()
{
return await _factory.CreateConnectionAsync();
}
}
5 changes: 3 additions & 2 deletions src/Core/Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<PackageReference Include="DnsClient" Version="1.8.0" />
<PackageReference Include="Fido2.AspNet" Version="3.0.1" />
<PackageReference Include="Handlebars.Net" Version="2.1.6" />
<PackageReference Include="MailKit" Version="4.9.0" />
<PackageReference Include="MailKit" Version="4.9.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.10" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageReference Include="Microsoft.Azure.NotificationHubs" Version="4.2.0" />
Expand Down Expand Up @@ -70,12 +70,13 @@
<PackageReference Include="Quartz" Version="3.13.1" />
<PackageReference Include="Quartz.Extensions.Hosting" Version="3.13.1" />
<PackageReference Include="Quartz.Extensions.DependencyInjection" Version="3.13.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<Protobuf Include="Billing\Pricing\Protos\password-manager.proto" GrpcServices="Client" />
</ItemGroup>

<ItemGroup>
<Folder Include="Resources\" />
<Folder Include="Properties\" />
Expand Down
39 changes: 39 additions & 0 deletions src/Core/Settings/GlobalSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public virtual string LicenseDirectory
public virtual SqlSettings PostgreSql { get; set; } = new SqlSettings();
public virtual SqlSettings MySql { get; set; } = new SqlSettings();
public virtual SqlSettings Sqlite { get; set; } = new SqlSettings() { ConnectionString = "Data Source=:memory:" };
public virtual EventLoggingSettings EventLogging { get; set; } = new EventLoggingSettings();
public virtual MailSettings Mail { get; set; } = new MailSettings();
public virtual IConnectionStringSettings Storage { get; set; } = new ConnectionStringSettings();
public virtual ConnectionStringSettings Events { get; set; } = new ConnectionStringSettings();
Expand Down Expand Up @@ -256,6 +257,44 @@ public string JobSchedulerConnectionString
}
}

public class EventLoggingSettings
{
public RabbitMqSettings RabbitMq { get; set; }

public class RabbitMqSettings
{
private string _hostName;
private string _username;
private string _password;
private string _exchangeName;

public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue";
public virtual string HttpPostQueueName { get; set; } = "events-httpPost-queue";
public virtual string HttpPostUrl { get; set; }

public string HostName
{
get => _hostName;
set => _hostName = value.Trim('"');
}
public string Username
{
get => _username;
set => _username = value.Trim('"');
}
public string Password
{
get => _password;
set => _password = value.Trim('"');
}
public string ExchangeName
{
get => _exchangeName;
set => _exchangeName = value.Trim('"');
}
}
}

public class ConnectionStringSettings : IConnectionStringSettings
{
private string _connectionString;
Expand Down
16 changes: 16 additions & 0 deletions src/Events/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ public void ConfigureServices(IServiceCollection services)
{
services.AddHostedService<Core.HostedServices.ApplicationCacheHostedService>();
}

// Optional RabbitMQ Listeners
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddHostedService<RabbitMqEventRepositoryListener>();

if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HttpPostUrl))
{
services.AddHttpClient(RabbitMqEventHttpPostListener.HttpClientName);
services.AddHostedService<RabbitMqEventHttpPostListener>();
}
}
}

public void Configure(
Expand Down
12 changes: 11 additions & 1 deletion src/SharedWeb/Utilities/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,17 @@ public static void AddDefaultServices(this IServiceCollection services, GlobalSe
}
else if (globalSettings.SelfHosted)
{
services.AddSingleton<IEventWriteService, RepositoryEventWriteService>();
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddSingleton<IEventWriteService, RabbitMqEventWriteService>();
}
else
{
services.AddSingleton<IEventWriteService, RepositoryEventWriteService>();
}
}
else
{
Expand Down

0 comments on commit 5efd68c

Please sign in to comment.