Skip to content

Commit

Permalink
Updated Nugets and added error message for empty broker URL instead o…
Browse files Browse the repository at this point in the history
…f failing with an exception. Switch off generating SAS token by default.
  • Loading branch information
barnstee committed Oct 19, 2023
1 parent a4f2b06 commit c8aa616
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 28 deletions.
7 changes: 7 additions & 0 deletions KafkaClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public void Connect()
_consumer = null;
}

if (string.IsNullOrEmpty(Settings.Instance.BrokerUrl))
{
// no broker URL configured = nothing to connect to!
_logger.LogError("Broker URL not configured. Cannot connect to broker!");
return;
}

// create Kafka client
var config = new ProducerConfig {
BootstrapServers = Settings.Instance.BrokerUrl + ":" + Settings.Instance.BrokerPort,
Expand Down
61 changes: 43 additions & 18 deletions MQTTClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,27 @@ public MQTTClient(ILoggerFactory loggerFactory, ICommandProcessor commandProcess
_uAApplication = uAApplication;
}

public class MqttClientCertificatesProvider : IMqttClientCertificatesProvider
{
private readonly IUAApplication _uAApplication;

public MqttClientCertificatesProvider(IUAApplication uAApplication)
{
_uAApplication = uAApplication;
}

X509CertificateCollection IMqttClientCertificatesProvider.GetCertificates()
{
X509Certificate2 appCert = _uAApplication.UAApplicationInstance.ApplicationConfiguration.SecurityConfiguration.ApplicationCertificate.Certificate;
if (appCert == null)
{
throw new Exception($"Cannot access OPC UA application certificate!");
}

return new X509CertificateCollection() { appCert };
}
}

public void Connect()
{
try
Expand All @@ -52,6 +73,13 @@ public void Connect()
Diagnostics.Singleton.Info.ConnectedToBroker = false;
}

if (string.IsNullOrEmpty(Settings.Instance.BrokerUrl))
{
// no broker URL configured = nothing to connect to!
_logger.LogError("Broker URL not configured. Cannot connect to broker!");
return;
}

// create MQTT password
string password = Settings.Instance.BrokerPassword;
if (Settings.Instance.CreateBrokerSASToken)
Expand All @@ -73,7 +101,7 @@ public void Connect()
MqttClientOptionsBuilder clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(Settings.Instance.BrokerUrl, (int?)Settings.Instance.BrokerPort)
.WithClientId(Settings.Instance.PublisherName)
.WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = Settings.Instance.UseTLS })
.WithTlsOptions(new MqttClientTlsOptions { UseTls = Settings.Instance.UseTLS })
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithTimeout(TimeSpan.FromSeconds(10))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
Expand All @@ -83,9 +111,9 @@ public void Connect()
if (Settings.Instance.BrokerPort == 443)
{
clientOptions = new MqttClientOptionsBuilder()
.WithWebSocketServer(Settings.Instance.BrokerUrl)
.WithWebSocketServer( o => o.WithUri(Settings.Instance.BrokerUrl))
.WithClientId(Settings.Instance.PublisherName)
.WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = Settings.Instance.UseTLS })
.WithTlsOptions(new MqttClientTlsOptions { UseTls = Settings.Instance.UseTLS })
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithTimeout(TimeSpan.FromSeconds(10))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
Expand All @@ -95,21 +123,15 @@ public void Connect()

if (Settings.Instance.UseCertAuth)
{
X509Certificate2 appCert = _uAApplication.UAApplicationInstance.ApplicationConfiguration.SecurityConfiguration.ApplicationCertificate.Certificate;
if (appCert == null)
{
throw new Exception($"Cannot access OPC UA application certificate!");
}

clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(Settings.Instance.BrokerUrl)
.WithClientId(Settings.Instance.PublisherName)
.WithTls(new MqttClientOptionsBuilderTlsParameters
.WithTlsOptions(new MqttClientTlsOptions
{
UseTls = true,
AllowUntrustedCertificates = true,
IgnoreCertificateChainErrors = true,
Certificates = new[] { appCert }
ClientCertificatesProvider = new MqttClientCertificatesProvider(_uAApplication)
})
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithTimeout(TimeSpan.FromSeconds(10))
Expand Down Expand Up @@ -148,17 +170,20 @@ public void Connect()
throw new Exception($"Connection to MQTT broker failed. Status: {connectResult.ResultCode}; status: {status}");
}

MqttClientSubscribeResult subscribeResult = _client.SubscribeAsync(
if (!string.IsNullOrEmpty(Settings.Instance.BrokerCommandTopic))
{
MqttClientSubscribeResult subscribeResult = _client.SubscribeAsync(
new MqttTopicFilter
{
Topic = Settings.Instance.BrokerCommandTopic,
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
}).GetAwaiter().GetResult();

// make sure subscriptions were successful
if (subscribeResult.Items.Count != 1 || subscribeResult.Items.ElementAt(0).ResultCode != MqttClientSubscribeResultCode.GrantedQoS0)
{
throw new ApplicationException("Failed to subscribe");
}).GetAwaiter().GetResult();

// make sure subscriptions were successful
if (subscribeResult.Items.Count != 1 || subscribeResult.Items.ElementAt(0).ResultCode != MqttClientSubscribeResultCode.GrantedQoS0)
{
throw new ApplicationException("Failed to subscribe");
}
}

Diagnostics.Singleton.Info.ConnectedToBroker = true;
Expand Down
2 changes: 1 addition & 1 deletion Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public async void Save()

public uint BrokerMessageSize { get; set; } = HubMessageSizeMax;

public bool CreateBrokerSASToken { get; set; } = true;
public bool CreateBrokerSASToken { get; set; } = false;

public bool UseTLS { get; set; } = true;

Expand Down
18 changes: 9 additions & 9 deletions UA-CloudPublisher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@

<ItemGroup>
<PackageReference Include="Azure.AI.OpenAI" Version="1.0.0-beta.5" />
<PackageReference Include="Azure.Identity" Version="1.9.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.17.0" />
<PackageReference Include="Azure.Storage.Files.DataLake" Version="12.15.0" />
<PackageReference Include="Azure.Identity" Version="1.10.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.18.0" />
<PackageReference Include="Azure.Storage.Files.DataLake" Version="12.16.0" />
<PackageReference Include="Confluent.Kafka" Version="2.2.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
<PackageReference Include="MQTTnet" Version="4.2.1.781" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.4.371.96" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.4.371.96" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.4.371.96" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Core" Version="1.4.371.96" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.19.5" />
<PackageReference Include="MQTTnet" Version="4.3.1.873" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.4.372.56" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.4.372.56" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.4.372.56" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Core" Version="1.4.372.56" />
<PackageReference Include="Serilog.Extensions.Logging.File" Version="3.0.0" />
<PackageReference Include="System.Collections.Concurrent" Version="4.3.0" />
</ItemGroup>
Expand Down

0 comments on commit c8aa616

Please sign in to comment.