-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'origin/main' into ac/pm-16812/shortcut-…
…duplicate-group-patch-requests
- Loading branch information
Showing
15 changed files
with
358 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
35 changes: 35 additions & 0 deletions
35
src/Core/AdminConsole/Services/Implementations/RabbitMqEventHttpPostListener.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
using System.Net.Http.Json; | ||
using Bit.Core.Models.Data; | ||
using Bit.Core.Settings; | ||
using Microsoft.Extensions.Logging; | ||
|
||
namespace Bit.Core.Services; | ||
|
||
public class RabbitMqEventHttpPostListener : RabbitMqEventListenerBase | ||
{ | ||
private readonly HttpClient _httpClient; | ||
private readonly string _httpPostUrl; | ||
private readonly string _queueName; | ||
|
||
protected override string QueueName => _queueName; | ||
|
||
public const string HttpClientName = "EventHttpPostListenerHttpClient"; | ||
|
||
public RabbitMqEventHttpPostListener( | ||
IHttpClientFactory httpClientFactory, | ||
ILogger<RabbitMqEventListenerBase> logger, | ||
GlobalSettings globalSettings) | ||
: base(logger, globalSettings) | ||
{ | ||
_httpClient = httpClientFactory.CreateClient(HttpClientName); | ||
_httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl; | ||
_queueName = globalSettings.EventLogging.RabbitMq.HttpPostQueueName; | ||
} | ||
|
||
protected override async Task HandleMessageAsync(EventMessage eventMessage) | ||
{ | ||
var content = JsonContent.Create(eventMessage); | ||
var response = await _httpClient.PostAsync(_httpPostUrl, content); | ||
response.EnsureSuccessStatusCode(); | ||
} | ||
} |
93 changes: 93 additions & 0 deletions
93
src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerBase.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
using System.Text.Json; | ||
using Bit.Core.Models.Data; | ||
using Bit.Core.Settings; | ||
using Microsoft.Extensions.Hosting; | ||
using Microsoft.Extensions.Logging; | ||
using RabbitMQ.Client; | ||
using RabbitMQ.Client.Events; | ||
|
||
namespace Bit.Core.Services; | ||
|
||
public abstract class RabbitMqEventListenerBase : BackgroundService | ||
{ | ||
private IChannel _channel; | ||
private IConnection _connection; | ||
private readonly string _exchangeName; | ||
private readonly ConnectionFactory _factory; | ||
private readonly ILogger<RabbitMqEventListenerBase> _logger; | ||
|
||
protected abstract string QueueName { get; } | ||
|
||
protected RabbitMqEventListenerBase( | ||
ILogger<RabbitMqEventListenerBase> logger, | ||
GlobalSettings globalSettings) | ||
{ | ||
_factory = new ConnectionFactory | ||
{ | ||
HostName = globalSettings.EventLogging.RabbitMq.HostName, | ||
UserName = globalSettings.EventLogging.RabbitMq.Username, | ||
Password = globalSettings.EventLogging.RabbitMq.Password | ||
}; | ||
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName; | ||
_logger = logger; | ||
} | ||
|
||
public override async Task StartAsync(CancellationToken cancellationToken) | ||
{ | ||
_connection = await _factory.CreateConnectionAsync(cancellationToken); | ||
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); | ||
|
||
await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); | ||
await _channel.QueueDeclareAsync(queue: QueueName, | ||
durable: true, | ||
exclusive: false, | ||
autoDelete: false, | ||
arguments: null, | ||
cancellationToken: cancellationToken); | ||
await _channel.QueueBindAsync(queue: QueueName, | ||
exchange: _exchangeName, | ||
routingKey: string.Empty, | ||
cancellationToken: cancellationToken); | ||
await base.StartAsync(cancellationToken); | ||
} | ||
|
||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
{ | ||
var consumer = new AsyncEventingBasicConsumer(_channel); | ||
consumer.ReceivedAsync += async (_, eventArgs) => | ||
{ | ||
try | ||
{ | ||
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span); | ||
await HandleMessageAsync(eventMessage); | ||
} | ||
catch (Exception ex) | ||
{ | ||
_logger.LogError(ex, "An error occurred while processing the message"); | ||
} | ||
}; | ||
|
||
await _channel.BasicConsumeAsync(QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken); | ||
|
||
while (!stoppingToken.IsCancellationRequested) | ||
{ | ||
await Task.Delay(1_000, stoppingToken); | ||
} | ||
} | ||
|
||
public override async Task StopAsync(CancellationToken cancellationToken) | ||
{ | ||
await _channel.CloseAsync(); | ||
await _connection.CloseAsync(); | ||
await base.StopAsync(cancellationToken); | ||
} | ||
|
||
public override void Dispose() | ||
{ | ||
_channel.Dispose(); | ||
_connection.Dispose(); | ||
base.Dispose(); | ||
} | ||
|
||
protected abstract Task HandleMessageAsync(EventMessage eventMessage); | ||
} |
29 changes: 29 additions & 0 deletions
29
src/Core/AdminConsole/Services/Implementations/RabbitMqEventRepositoryListener.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
using Bit.Core.Models.Data; | ||
using Bit.Core.Settings; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Logging; | ||
|
||
namespace Bit.Core.Services; | ||
|
||
public class RabbitMqEventRepositoryListener : RabbitMqEventListenerBase | ||
{ | ||
private readonly IEventWriteService _eventWriteService; | ||
private readonly string _queueName; | ||
|
||
protected override string QueueName => _queueName; | ||
|
||
public RabbitMqEventRepositoryListener( | ||
[FromKeyedServices("persistent")] IEventWriteService eventWriteService, | ||
ILogger<RabbitMqEventListenerBase> logger, | ||
GlobalSettings globalSettings) | ||
: base(logger, globalSettings) | ||
{ | ||
_eventWriteService = eventWriteService; | ||
_queueName = globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName; | ||
} | ||
|
||
protected override Task HandleMessageAsync(EventMessage eventMessage) | ||
{ | ||
return _eventWriteService.CreateAsync(eventMessage); | ||
} | ||
} |
65 changes: 65 additions & 0 deletions
65
src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
using System.Text.Json; | ||
using Bit.Core.Models.Data; | ||
using Bit.Core.Settings; | ||
using RabbitMQ.Client; | ||
|
||
namespace Bit.Core.Services; | ||
public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable | ||
{ | ||
private readonly ConnectionFactory _factory; | ||
private readonly Lazy<Task<IConnection>> _lazyConnection; | ||
private readonly string _exchangeName; | ||
|
||
public RabbitMqEventWriteService(GlobalSettings globalSettings) | ||
{ | ||
_factory = new ConnectionFactory | ||
{ | ||
HostName = globalSettings.EventLogging.RabbitMq.HostName, | ||
UserName = globalSettings.EventLogging.RabbitMq.Username, | ||
Password = globalSettings.EventLogging.RabbitMq.Password | ||
}; | ||
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName; | ||
|
||
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync); | ||
} | ||
|
||
public async Task CreateAsync(IEvent e) | ||
{ | ||
var connection = await _lazyConnection.Value; | ||
using var channel = await connection.CreateChannelAsync(); | ||
|
||
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); | ||
|
||
var body = JsonSerializer.SerializeToUtf8Bytes(e); | ||
|
||
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body); | ||
} | ||
|
||
public async Task CreateManyAsync(IEnumerable<IEvent> events) | ||
{ | ||
var connection = await _lazyConnection.Value; | ||
using var channel = await connection.CreateChannelAsync(); | ||
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); | ||
|
||
foreach (var e in events) | ||
{ | ||
var body = JsonSerializer.SerializeToUtf8Bytes(e); | ||
|
||
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body); | ||
} | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
{ | ||
if (_lazyConnection.IsValueCreated) | ||
{ | ||
var connection = await _lazyConnection.Value; | ||
await connection.DisposeAsync(); | ||
} | ||
} | ||
|
||
private async Task<IConnection> CreateConnectionAsync() | ||
{ | ||
return await _factory.CreateConnectionAsync(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.