Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PM-17562] Initial POC of Distributed Events #5323

Merged
merged 15 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dev/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ IDP_SP_ACS_URL=http://localhost:51822/saml2/yourOrgIdHere/Acs
# Optional reverse proxy configuration
# Should match server listen ports in reverse-proxy.conf
API_PROXY_PORT=4100
IDENTITY_PROXY_PORT=33756
IDENTITY_PROXY_PORT=33756

# Optional RabbitMQ configuration
RABBITMQ_DEFAULT_USER=bitwarden
RABBITMQ_DEFAULT_PASS=SET_A_PASSWORD_HERE_123
15 changes: 15 additions & 0 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ services:
profiles:
- idp

rabbitmq:
withinfocus marked this conversation as resolved.
Show resolved Hide resolved
image: rabbitmq:management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
volumes:
- rabbitmq_data:/var/lib/rabbitmq_data
profiles:
- rabbitmq

reverse-proxy:
image: nginx:alpine
container_name: reverse-proxy
Expand All @@ -99,3 +113,4 @@ volumes:
mssql_dev_data:
postgres_dev_data:
mysql_dev_data:
rabbitmq_data:
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
๏ปฟusing System.Net.Http.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging;

namespace Bit.Core.Services;

public class RabbitMqEventHttpPostListener : RabbitMqEventListenerBase
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly string _httpPostUrl;
private readonly string _queueName;

protected override string QueueName => _queueName;

Check warning on line 14 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs#L14

Added line #L14 was not covered by tests

public const string HttpClientName = "EventHttpPostListenerHttpClient";

public RabbitMqEventHttpPostListener(
IHttpClientFactory httpClientFactory,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_httpClientFactory = httpClientFactory;
_httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl;
_queueName = globalSettings.EventLogging.RabbitMq.HttpPostQueueName;
}

Check warning on line 27 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs#L22-L27

Added lines #L22 - L27 were not covered by tests

protected override async Task HandleMessageAsync(EventMessage eventMessage)
withinfocus marked this conversation as resolved.
Show resolved Hide resolved
{
using var httpClient = _httpClientFactory.CreateClient(HttpClientName);
var content = JsonContent.Create(eventMessage);
var response = await httpClient.PostAsync(_httpPostUrl, content);
response.EnsureSuccessStatusCode();
}

Check warning on line 35 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs#L30-L35

Added lines #L30 - L35 were not covered by tests
brant-livefront marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
๏ปฟusing System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Bit.Core.Services;

public abstract class RabbitMqEventListenerBase : BackgroundService
{
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqEventListenerBase> _logger;

protected abstract string QueueName { get; }

protected RabbitMqEventListenerBase(
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_logger = logger;
}

Check warning on line 33 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L21-L33

Added lines #L21 - L33 were not covered by tests

public override async Task StartAsync(CancellationToken cancellationToken)
{
_connection = await _factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

Check warning on line 38 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L36-L38

Added lines #L36 - L38 were not covered by tests

await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
await _channel.QueueDeclareAsync(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: QueueName,
exchange: _exchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken);
}

Check warning on line 52 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L40-L52

Added lines #L40 - L52 were not covered by tests

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, eventArgs) =>
{
try
{
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span);
await HandleMessageAsync(eventMessage);
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while processing the message");
}
};

Check warning on line 68 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L55-L68

Added lines #L55 - L68 were not covered by tests

await _channel.BasicConsumeAsync(QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);

Check warning on line 70 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L70

Added line #L70 was not covered by tests

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1_000, stoppingToken);
}
}

Check warning on line 76 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L73-L76

Added lines #L73 - L76 were not covered by tests

public override async Task StopAsync(CancellationToken cancellationToken)
{
await _channel.CloseAsync();
await _connection.CloseAsync();
await base.StopAsync(cancellationToken);
}

Check warning on line 83 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L79-L83

Added lines #L79 - L83 were not covered by tests

public override void Dispose()
{
_channel.Dispose();
_connection.Dispose();
base.Dispose();
}

Check warning on line 90 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs#L86-L90

Added lines #L86 - L90 were not covered by tests

protected abstract Task HandleMessageAsync(EventMessage eventMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
๏ปฟusing Bit.Core.Models.Data;
using Bit.Core.Repositories;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging;

namespace Bit.Core.Services;

public class RabbitMqEventRepositoryListener : RabbitMqEventListenerBase
{
private readonly IEventWriteService _eventWriteService;
private readonly string _queueName;

protected override string QueueName => _queueName;

Check warning on line 13 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs#L13

Added line #L13 was not covered by tests

public RabbitMqEventRepositoryListener(
IEventRepository eventRepository,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_eventWriteService = new RepositoryEventWriteService(eventRepository);
withinfocus marked this conversation as resolved.
Show resolved Hide resolved
_queueName = globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName;
}

Check warning on line 23 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs#L19-L23

Added lines #L19 - L23 were not covered by tests

protected override Task HandleMessageAsync(EventMessage eventMessage)
{
return _eventWriteService.CreateAsync(eventMessage);
}

Check warning on line 28 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs#L26-L28

Added lines #L26 - L28 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
๏ปฟusing System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using RabbitMQ.Client;

namespace Bit.Core.Services;
public class RabbitMqEventWriteService : IEventWriteService
{
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _exchangeName;

public RabbitMqEventWriteService(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;

Check warning on line 21 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L13-L21

Added lines #L13 - L21 were not covered by tests

_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
withinfocus marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 24 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L23-L24

Added lines #L23 - L24 were not covered by tests

public async Task CreateAsync(IEvent e)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();

Check warning on line 29 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L27-L29

Added lines #L27 - L29 were not covered by tests

await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);

Check warning on line 31 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L31

Added line #L31 was not covered by tests

var body = JsonSerializer.SerializeToUtf8Bytes(e);

Check warning on line 33 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L33

Added line #L33 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

โ“ Is JSON our best option for serialization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, at least to start. Were you thinking about something like Protobuf?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps ... just don't know best practices with Rabbit.


await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}

Check warning on line 36 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L35-L36

Added lines #L35 - L36 were not covered by tests

public async Task CreateManyAsync(IEnumerable<IEvent> events)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);

Check warning on line 42 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L39-L42

Added lines #L39 - L42 were not covered by tests

foreach (var e in events)
{
var body = JsonSerializer.SerializeToUtf8Bytes(e);

Check warning on line 46 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L45-L46

Added lines #L45 - L46 were not covered by tests

await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}
}

Check warning on line 50 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L48-L50

Added lines #L48 - L50 were not covered by tests

private async Task<IConnection> CreateConnectionAsync()
{
return await _factory.CreateConnectionAsync();
}

Check warning on line 55 in src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs#L53-L55

Added lines #L53 - L55 were not covered by tests
}
5 changes: 3 additions & 2 deletions src/Core/Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<PackageReference Include="DnsClient" Version="1.8.0" />
<PackageReference Include="Fido2.AspNet" Version="3.0.1" />
<PackageReference Include="Handlebars.Net" Version="2.1.6" />
<PackageReference Include="MailKit" Version="4.9.0" />
<PackageReference Include="MailKit" Version="4.9.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.10" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageReference Include="Microsoft.Azure.NotificationHubs" Version="4.2.0" />
Expand Down Expand Up @@ -70,12 +70,13 @@
<PackageReference Include="Quartz" Version="3.13.1" />
<PackageReference Include="Quartz.Extensions.Hosting" Version="3.13.1" />
<PackageReference Include="Quartz.Extensions.DependencyInjection" Version="3.13.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<Protobuf Include="Billing\Pricing\Protos\password-manager.proto" GrpcServices="Client" />
</ItemGroup>

<ItemGroup>
<Folder Include="Resources\" />
<Folder Include="Properties\" />
Expand Down
39 changes: 39 additions & 0 deletions src/Core/Settings/GlobalSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
public virtual SqlSettings PostgreSql { get; set; } = new SqlSettings();
public virtual SqlSettings MySql { get; set; } = new SqlSettings();
public virtual SqlSettings Sqlite { get; set; } = new SqlSettings() { ConnectionString = "Data Source=:memory:" };
public virtual EventLoggingSettings EventLogging { get; set; } = new EventLoggingSettings();
public virtual MailSettings Mail { get; set; } = new MailSettings();
public virtual IConnectionStringSettings Storage { get; set; } = new ConnectionStringSettings();
public virtual ConnectionStringSettings Events { get; set; } = new ConnectionStringSettings();
Expand Down Expand Up @@ -256,6 +257,44 @@
}
}

public class EventLoggingSettings
{
public RabbitMqSettings RabbitMq { get; set; }

public class RabbitMqSettings
{
private string _hostName;
private string _username;
private string _password;
private string _exchangeName;

public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue";
public virtual string HttpPostQueueName { get; set; } = "events-httpPost-queue";
public virtual string HttpPostUrl { get; set; }

public string HostName
{
get => _hostName;

Check warning on line 277 in src/Core/Settings/GlobalSettings.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Settings/GlobalSettings.cs#L277

Added line #L277 was not covered by tests
set => _hostName = value.Trim('"');
}
public string Username
{
get => _username;

Check warning on line 282 in src/Core/Settings/GlobalSettings.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Settings/GlobalSettings.cs#L282

Added line #L282 was not covered by tests
set => _username = value.Trim('"');
}
public string Password
{
get => _password;

Check warning on line 287 in src/Core/Settings/GlobalSettings.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Settings/GlobalSettings.cs#L287

Added line #L287 was not covered by tests
set => _password = value.Trim('"');
}
public string ExchangeName
{
get => _exchangeName;

Check warning on line 292 in src/Core/Settings/GlobalSettings.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Settings/GlobalSettings.cs#L292

Added line #L292 was not covered by tests
set => _exchangeName = value.Trim('"');
}
}
}

public class ConnectionStringSettings : IConnectionStringSettings
{
private string _connectionString;
Expand Down
15 changes: 15 additions & 0 deletions src/Events/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@
{
services.AddHostedService<Core.HostedServices.ApplicationCacheHostedService>();
}

// Optional RabbitMQ Listeners
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddHostedService<Core.Services.RabbitMqEventRepositoryListener>();

Check warning on line 92 in src/Events/Startup.cs

View check run for this annotation

Codecov / codecov/patch

src/Events/Startup.cs#L88-L92

Added lines #L88 - L92 were not covered by tests

if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HttpPostUrl))
{
services.AddHttpClient(RabbitMqEventHttpPostListener.HttpClientName);
services.AddHostedService<Core.Services.RabbitMqEventHttpPostListener>();
}
}

Check warning on line 99 in src/Events/Startup.cs

View check run for this annotation

Codecov / codecov/patch

src/Events/Startup.cs#L95-L99

Added lines #L95 - L99 were not covered by tests
}

public void Configure(
Expand Down
12 changes: 11 additions & 1 deletion src/SharedWeb/Utilities/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,17 @@
}
else if (globalSettings.SelfHosted)
{
services.AddSingleton<IEventWriteService, RepositoryEventWriteService>();
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddSingleton<IEventWriteService, RabbitMqEventWriteService>();
}

Check warning on line 334 in src/SharedWeb/Utilities/ServiceCollectionExtensions.cs

View check run for this annotation

Codecov / codecov/patch

src/SharedWeb/Utilities/ServiceCollectionExtensions.cs#L329-L334

Added lines #L329 - L334 were not covered by tests
else
{
services.AddSingleton<IEventWriteService, RepositoryEventWriteService>();
}

Check warning on line 338 in src/SharedWeb/Utilities/ServiceCollectionExtensions.cs

View check run for this annotation

Codecov / codecov/patch

src/SharedWeb/Utilities/ServiceCollectionExtensions.cs#L336-L338

Added lines #L336 - L338 were not covered by tests
}
else
{
Expand Down
Loading