diff --git a/KafkaClient.cs b/KafkaClient.cs index 4b60d8c..3f3a01d 100644 --- a/KafkaClient.cs +++ b/KafkaClient.cs @@ -53,6 +53,13 @@ public void Connect() _consumer = null; } + if (string.IsNullOrEmpty(Settings.Instance.BrokerUrl)) + { + // no broker URL configured = nothing to connect to! + _logger.LogError("Broker URL not configured. Cannot connect to broker!"); + return; + } + // create Kafka client var config = new ProducerConfig { BootstrapServers = Settings.Instance.BrokerUrl + ":" + Settings.Instance.BrokerPort, diff --git a/MQTTClient.cs b/MQTTClient.cs index 3f798c5..ee04ac1 100644 --- a/MQTTClient.cs +++ b/MQTTClient.cs @@ -38,6 +38,27 @@ public MQTTClient(ILoggerFactory loggerFactory, ICommandProcessor commandProcess _uAApplication = uAApplication; } + public class MqttClientCertificatesProvider : IMqttClientCertificatesProvider + { + private readonly IUAApplication _uAApplication; + + public MqttClientCertificatesProvider(IUAApplication uAApplication) + { + _uAApplication = uAApplication; + } + + X509CertificateCollection IMqttClientCertificatesProvider.GetCertificates() + { + X509Certificate2 appCert = _uAApplication.UAApplicationInstance.ApplicationConfiguration.SecurityConfiguration.ApplicationCertificate.Certificate; + if (appCert == null) + { + throw new Exception($"Cannot access OPC UA application certificate!"); + } + + return new X509CertificateCollection() { appCert }; + } + } + public void Connect() { try @@ -52,6 +73,13 @@ public void Connect() Diagnostics.Singleton.Info.ConnectedToBroker = false; } + if (string.IsNullOrEmpty(Settings.Instance.BrokerUrl)) + { + // no broker URL configured = nothing to connect to! + _logger.LogError("Broker URL not configured. Cannot connect to broker!"); + return; + } + // create MQTT password string password = Settings.Instance.BrokerPassword; if (Settings.Instance.CreateBrokerSASToken) @@ -73,7 +101,7 @@ public void Connect() MqttClientOptionsBuilder clientOptions = new MqttClientOptionsBuilder() .WithTcpServer(Settings.Instance.BrokerUrl, (int?)Settings.Instance.BrokerPort) .WithClientId(Settings.Instance.PublisherName) - .WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = Settings.Instance.UseTLS }) + .WithTlsOptions(new MqttClientTlsOptions { UseTls = Settings.Instance.UseTLS }) .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) .WithTimeout(TimeSpan.FromSeconds(10)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(100)) @@ -83,9 +111,9 @@ public void Connect() if (Settings.Instance.BrokerPort == 443) { clientOptions = new MqttClientOptionsBuilder() - .WithWebSocketServer(Settings.Instance.BrokerUrl) + .WithWebSocketServer( o => o.WithUri(Settings.Instance.BrokerUrl)) .WithClientId(Settings.Instance.PublisherName) - .WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = Settings.Instance.UseTLS }) + .WithTlsOptions(new MqttClientTlsOptions { UseTls = Settings.Instance.UseTLS }) .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) .WithTimeout(TimeSpan.FromSeconds(10)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(100)) @@ -95,21 +123,15 @@ public void Connect() if (Settings.Instance.UseCertAuth) { - X509Certificate2 appCert = _uAApplication.UAApplicationInstance.ApplicationConfiguration.SecurityConfiguration.ApplicationCertificate.Certificate; - if (appCert == null) - { - throw new Exception($"Cannot access OPC UA application certificate!"); - } - clientOptions = new MqttClientOptionsBuilder() .WithTcpServer(Settings.Instance.BrokerUrl) .WithClientId(Settings.Instance.PublisherName) - .WithTls(new MqttClientOptionsBuilderTlsParameters + .WithTlsOptions(new MqttClientTlsOptions { UseTls = true, AllowUntrustedCertificates = true, IgnoreCertificateChainErrors = true, - Certificates = new[] { appCert } + ClientCertificatesProvider = new MqttClientCertificatesProvider(_uAApplication) }) .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500) .WithTimeout(TimeSpan.FromSeconds(10)) @@ -148,17 +170,20 @@ public void Connect() throw new Exception($"Connection to MQTT broker failed. Status: {connectResult.ResultCode}; status: {status}"); } - MqttClientSubscribeResult subscribeResult = _client.SubscribeAsync( + if (!string.IsNullOrEmpty(Settings.Instance.BrokerCommandTopic)) + { + MqttClientSubscribeResult subscribeResult = _client.SubscribeAsync( new MqttTopicFilter { Topic = Settings.Instance.BrokerCommandTopic, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce - }).GetAwaiter().GetResult(); - - // make sure subscriptions were successful - if (subscribeResult.Items.Count != 1 || subscribeResult.Items.ElementAt(0).ResultCode != MqttClientSubscribeResultCode.GrantedQoS0) - { - throw new ApplicationException("Failed to subscribe"); + }).GetAwaiter().GetResult(); + + // make sure subscriptions were successful + if (subscribeResult.Items.Count != 1 || subscribeResult.Items.ElementAt(0).ResultCode != MqttClientSubscribeResultCode.GrantedQoS0) + { + throw new ApplicationException("Failed to subscribe"); + } } Diagnostics.Singleton.Info.ConnectedToBroker = true; diff --git a/Settings.cs b/Settings.cs index 2148cc4..1d43c2e 100644 --- a/Settings.cs +++ b/Settings.cs @@ -127,7 +127,7 @@ public async void Save() public uint BrokerMessageSize { get; set; } = HubMessageSizeMax; - public bool CreateBrokerSASToken { get; set; } = true; + public bool CreateBrokerSASToken { get; set; } = false; public bool UseTLS { get; set; } = true; diff --git a/UA-CloudPublisher.csproj b/UA-CloudPublisher.csproj index 3eb3c1b..559aa2a 100644 --- a/UA-CloudPublisher.csproj +++ b/UA-CloudPublisher.csproj @@ -45,16 +45,16 @@ - - - + + + - - - - - - + + + + + +