Skip to content

Commit

Permalink
Upgrade to rabbitmq-client 6.X (#32)
Browse files Browse the repository at this point in the history
* Upgrade to rabbitmq-client 6.X

* Fjern _

* Fjern _ og bruk this i stedet
  • Loading branch information
zapodot authored Nov 10, 2020
1 parent 62f87d0 commit 65fa864
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 91 deletions.
1 change: 1 addition & 0 deletions KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Ks.Fiks.Maskinporten.Client;
using Moq;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace KS.Fiks.IO.Client.Tests.Amqp
{
Expand Down
2 changes: 1 addition & 1 deletion KS.Fiks.IO.Client.Tests/KS.Fiks.IO.Client.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PackageReference Include="KS.Fiks.QA" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Moq" Version="4.14.7" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="5.2.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
Expand Down
22 changes: 11 additions & 11 deletions KS.Fiks.IO.Client/Amqp/AmqpConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ namespace KS.Fiks.IO.Client.Amqp
{
internal class AmqpConsumerFactory : IAmqpConsumerFactory
{
private readonly IFileWriter _fileWriter;
private readonly IFileWriter fileWriter;

private readonly IAsicDecrypter _decrypter;
private readonly IAsicDecrypter decrypter;

private readonly ISendHandler _sendHandler;
private readonly ISendHandler sendHandler;

private readonly IDokumentlagerHandler _dokumentlagerHandler;
private readonly IDokumentlagerHandler dokumentlagerHandler;

private readonly Guid _accountId;
private readonly Guid accountId;

public AmqpConsumerFactory(ISendHandler sendHandler, IDokumentlagerHandler dokumentlagerHandler, KontoConfiguration kontoConfiguration)
{
_dokumentlagerHandler = dokumentlagerHandler;
_fileWriter = new FileWriter();
_decrypter = new AsicDecrypter(DecryptionService.Create(kontoConfiguration.PrivatNokkel));
_sendHandler = sendHandler;
_accountId = kontoConfiguration.KontoId;
this.dokumentlagerHandler = dokumentlagerHandler;
this.fileWriter = new FileWriter();
this.decrypter = new AsicDecrypter(DecryptionService.Create(kontoConfiguration.PrivatNokkel));
this.sendHandler = sendHandler;
this.accountId = kontoConfiguration.KontoId;
}

public IAmqpReceiveConsumer CreateReceiveConsumer(IModel channel)
{
return new AmqpReceiveConsumer(channel, _dokumentlagerHandler, _fileWriter, _decrypter, _sendHandler, _accountId);
return new AmqpReceiveConsumer(channel, this.dokumentlagerHandler, this.fileWriter, this.decrypter, this.sendHandler, this.accountId);
}
}
}
52 changes: 26 additions & 26 deletions KS.Fiks.IO.Client/Amqp/AmqpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ internal class AmqpHandler : IAmqpHandler
{
private const string QueuePrefix = "fiksio.konto.";

private readonly IModel _channel;
private readonly IModel channel;

private readonly IAmqpConsumerFactory _amqpConsumerFactory;
private readonly IAmqpConsumerFactory amqpConsumerFactory;

private readonly IConnectionFactory _connectionFactory;
private readonly IConnectionFactory connectionFactory;

private readonly KontoConfiguration _kontoConfiguration;
private readonly KontoConfiguration kontoConfiguration;

private readonly IMaskinportenClient _maskinportenClient;
private readonly IMaskinportenClient maskinportenClient;

private readonly SslOption _sslOption;
private readonly SslOption sslOption;

private IAmqpReceiveConsumer _receiveConsumer;
private IAmqpReceiveConsumer receiveConsumer;

internal AmqpHandler(
IMaskinportenClient maskinportenClient,
Expand All @@ -39,29 +39,29 @@ internal AmqpHandler(
IConnectionFactory connectionFactory = null,
IAmqpConsumerFactory consumerFactory = null)
{
_sslOption = amqpConfiguration.SslOption ?? new SslOption();
_maskinportenClient = maskinportenClient;
_kontoConfiguration = kontoConfiguration;
_connectionFactory = connectionFactory ?? new ConnectionFactory();
this.sslOption = amqpConfiguration.SslOption ?? new SslOption();
this.maskinportenClient = maskinportenClient;
this.kontoConfiguration = kontoConfiguration;
this.connectionFactory = connectionFactory ?? new ConnectionFactory();
SetupConnectionFactory(integrasjonConfiguration);
_channel = ConnectToChannel(amqpConfiguration);
_amqpConsumerFactory = consumerFactory ?? new AmqpConsumerFactory(sendHandler, dokumentlagerHandler, _kontoConfiguration);
this.channel = ConnectToChannel(amqpConfiguration);
this.amqpConsumerFactory = consumerFactory ?? new AmqpConsumerFactory(sendHandler, dokumentlagerHandler, this.kontoConfiguration);
}

public void AddMessageReceivedHandler(
EventHandler<MottattMeldingArgs> receivedEvent,
EventHandler<ConsumerEventArgs> cancelledEvent)
{
if (_receiveConsumer == null)
if (this.receiveConsumer == null)
{
_receiveConsumer = _amqpConsumerFactory.CreateReceiveConsumer(_channel);
this.receiveConsumer = this.amqpConsumerFactory.CreateReceiveConsumer(this.channel);
}

_receiveConsumer.Received += receivedEvent;
this.receiveConsumer.Received += receivedEvent;

_receiveConsumer.ConsumerCancelled += cancelledEvent;
this.receiveConsumer.ConsumerCancelled += cancelledEvent;

_channel.BasicConsume(_receiveConsumer, GetQueueName());
this.channel.BasicConsume(this.receiveConsumer, GetQueueName());
}

public void Dispose()
Expand All @@ -74,7 +74,7 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_channel?.Dispose();
this.channel?.Dispose();
}
}

Expand All @@ -95,22 +95,22 @@ private IConnection CreateConnection(AmqpConfiguration configuration)
{
try
{
var endpoint = new AmqpTcpEndpoint(configuration.Host, configuration.Port, _sslOption);
return _connectionFactory.CreateConnection(new List<AmqpTcpEndpoint> {endpoint});
var endpoint = new AmqpTcpEndpoint(configuration.Host, configuration.Port, this.sslOption);
return this.connectionFactory.CreateConnection(new List<AmqpTcpEndpoint> {endpoint});
}
catch (Exception ex)
{
throw new FiksIOAmqpConnectionFailedException($"Unable to create connection. Host: {configuration.Host}; Port: {configuration.Port}; UserName:{_connectionFactory.UserName}; SslOption.Enabled: {_sslOption?.Enabled};SslOption.ServerName: {_sslOption?.ServerName}", ex);
throw new FiksIOAmqpConnectionFailedException($"Unable to create connection. Host: {configuration.Host}; Port: {configuration.Port}; UserName:{this.connectionFactory.UserName}; SslOption.Enabled: {this.sslOption?.Enabled};SslOption.ServerName: {this.sslOption?.ServerName}", ex);
}
}

private void SetupConnectionFactory(IntegrasjonConfiguration integrasjonConfiguration)
{
try
{
var maskinportenToken = _maskinportenClient.GetAccessToken(integrasjonConfiguration.Scope).Result;
_connectionFactory.UserName = integrasjonConfiguration.IntegrasjonId.ToString();
_connectionFactory.Password = $"{integrasjonConfiguration.IntegrasjonPassord} {maskinportenToken.Token}";
var maskinportenToken = this.maskinportenClient.GetAccessToken(integrasjonConfiguration.Scope).Result;
this.connectionFactory.UserName = integrasjonConfiguration.IntegrasjonId.ToString();
this.connectionFactory.Password = $"{integrasjonConfiguration.IntegrasjonPassord} {maskinportenToken.Token}";
}
catch (AggregateException ex)
{
Expand All @@ -120,7 +120,7 @@ private void SetupConnectionFactory(IntegrasjonConfiguration integrasjonConfigur

private string GetQueueName()
{
return $"{QueuePrefix}{_kontoConfiguration.KontoId}";
return $"{QueuePrefix}{this.kontoConfiguration.KontoId}";
}
}
}
87 changes: 46 additions & 41 deletions KS.Fiks.IO.Client/Amqp/AmqpReceiveConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ internal class AmqpReceiveConsumer : DefaultBasicConsumer, IAmqpReceiveConsumer
{
private const string DokumentlagerHeaderName = "dokumentlager-id";

private readonly IAsicDecrypter _decrypter;
private readonly Guid accountId;

private readonly IFileWriter _fileWriter;
private readonly IAsicDecrypter decrypter;

private readonly ISendHandler _sendHandler;
private readonly IDokumentlagerHandler dokumentlagerHandler;

private readonly IDokumentlagerHandler _dokumentlagerHandler;
private readonly IFileWriter fileWriter;

private readonly Guid _accountId;
private readonly ISendHandler sendHandler;

public AmqpReceiveConsumer(
IModel model,
Expand All @@ -35,29 +35,11 @@ public AmqpReceiveConsumer(
Guid accountId)
: base(model)
{
_dokumentlagerHandler = dokumentlagerHandler;
_fileWriter = fileWriter;
_decrypter = decrypter;
_sendHandler = sendHandler;
_accountId = accountId;
}

private static bool IsDataInDokumentlager(IBasicProperties properties)
{
return ReceivedMessageParser.GetGuidFromHeader(properties.Headers, DokumentlagerHeaderName) != null;
}

private static Guid GetDokumentlagerId(IBasicProperties properties)
{
try
{
return ReceivedMessageParser.RequireGuidFromHeader(properties.Headers, DokumentlagerHeaderName);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw;
}
this.dokumentlagerHandler = dokumentlagerHandler;
this.fileWriter = fileWriter;
this.decrypter = decrypter;
this.sendHandler = sendHandler;
this.accountId = accountId;
}

public event EventHandler<MottattMeldingArgs> Received;
Expand All @@ -69,7 +51,7 @@ public override void HandleBasicDeliver(
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
ReadOnlyMemory<byte> body)
{
base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

Expand All @@ -87,7 +69,7 @@ public override void HandleBasicDeliver(
new MottattMeldingArgs(
receivedMessage,
new SvarSender(
this._sendHandler,
this.sendHandler,
receivedMessage,
new AmqpAcknowledgeManager(
() => Model.BasicAck(deliveryTag, false),
Expand All @@ -101,13 +83,41 @@ public override void HandleBasicDeliver(
}
}

private MottattMelding ParseMessage(IBasicProperties properties, byte[] body)
private static bool IsDataInDokumentlager(IBasicProperties properties)
{
return ReceivedMessageParser.GetGuidFromHeader(properties.Headers, DokumentlagerHeaderName) != null;
}

private static Guid GetDokumentlagerId(IBasicProperties properties)
{
try
{
return ReceivedMessageParser.RequireGuidFromHeader(properties.Headers, DokumentlagerHeaderName);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw;
}
}

private static bool HasPayload(IBasicProperties properties, ReadOnlyMemory<byte> body)
{
var metadata = ReceivedMessageParser.Parse(_accountId, properties);
return new MottattMelding(HasPayload(properties, body), metadata, GetDataProvider(properties, body), this._decrypter, this._fileWriter);
return IsDataInDokumentlager(properties) || body.Length > 0;
}

private Func<Task<Stream>> GetDataProvider(IBasicProperties properties, byte[] body)
private MottattMelding ParseMessage(IBasicProperties properties, ReadOnlyMemory<byte> body)
{
var metadata = ReceivedMessageParser.Parse(this.accountId, properties);
return new MottattMelding(
HasPayload(properties, body),
metadata,
GetDataProvider(properties, body),
this.decrypter,
this.fileWriter);
}

private Func<Task<Stream>> GetDataProvider(IBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (!HasPayload(properties, body))
{
Expand All @@ -116,15 +126,10 @@ private Func<Task<Stream>> GetDataProvider(IBasicProperties properties, byte[] b

if (IsDataInDokumentlager(properties))
{
return async () => await _dokumentlagerHandler.Download(GetDokumentlagerId(properties));
return async () => await this.dokumentlagerHandler.Download(GetDokumentlagerId(properties));
}

return async () => await Task.FromResult(new MemoryStream(body));
}

private bool HasPayload(IBasicProperties properties, byte[] body)
{
return IsDataInDokumentlager(properties) || body.Length > 0;
return async () => await Task.FromResult(new MemoryStream(body.ToArray()));
}
}
}
2 changes: 1 addition & 1 deletion KS.Fiks.IO.Client/KS.Fiks.IO.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<PackageReference Include="KS.Fiks.Maskinporten.Client" Version="1.0.5" />
<PackageReference Include="KS.Fiks.QA" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="5.2.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand Down
22 changes: 11 additions & 11 deletions KS.Fiks.IO.Client/Utility/ReceivedMessageParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,10 @@ internal static MottattMeldingMetadata Parse(
mottakerKontoId: receiverAccountId,
avsenderKontoId: RequireGuidFromHeader(headers, SenderAccountIdHeaderName),
svarPaMelding: GetGuidFromHeader(headers, RelatedMessageIdHeaderName),
ttl: ParseTimeSpan(properties.Expiration, TtlHeaderName),
ttl: ParseTimeSpan(properties.Expiration),
headere: ExtractEgendefinerteHeadere(headers));
}

private static Dictionary<string, string> ExtractEgendefinerteHeadere(IDictionary<string, object> headers)
{
return headers
.Where(h => h.Key.StartsWith(EgendefinertHeaderPrefix))
.ToDictionary(
h => h.Key.Substring(EgendefinertHeaderPrefix.Length),
h => System.Text.Encoding.UTF8.GetString((byte[])h.Value));
}

internal static Guid RequireGuidFromHeader(IDictionary<string, object> header, string headerName)
{
var headerAsString = RequireStringFromHeader(header, headerName);
Expand All @@ -70,6 +61,15 @@ internal static Guid RequireGuidFromHeader(IDictionary<string, object> header, s
}
}

private static Dictionary<string, string> ExtractEgendefinerteHeadere(IDictionary<string, object> headers)
{
return headers
.Where(h => h.Key.StartsWith(EgendefinertHeaderPrefix))
.ToDictionary(
h => h.Key.Substring(EgendefinertHeaderPrefix.Length),
h => System.Text.Encoding.UTF8.GetString((byte[])h.Value));
}

private static string RequireStringFromHeader(IDictionary<string, object> header, string headerName)
{
if (!header.ContainsKey(headerName))
Expand Down Expand Up @@ -100,7 +100,7 @@ private static Guid ParseGuid(string guidAsString, string headerName)
}
}

private static TimeSpan ParseTimeSpan(string longAsString, string headerName)
private static TimeSpan ParseTimeSpan(string longAsString)
{
return long.TryParse(longAsString, NumberStyles.Any, CultureInfo.InvariantCulture, out var timeAsLong) ? TimeSpan.FromMilliseconds(timeAsLong) : TimeSpan.MaxValue;
}
Expand Down

0 comments on commit 65fa864

Please sign in to comment.