Skip to content

Commit

Permalink
Added support for persisting config files and certs in Microsoft OneL…
Browse files Browse the repository at this point in the history
…ake and general code hardening. also improved launch settings.
  • Loading branch information
barnstee committed Jul 29, 2023
1 parent a6361bb commit a4f2b06
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 66 deletions.
11 changes: 11 additions & 0 deletions AzureFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public AzureFileStorage(ILoggerFactory logger)

public async Task<string> FindFileAsync(string path, string name, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(path) || string.IsNullOrEmpty(name))
{
return null;
}

try
{
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING")))
Expand All @@ -36,6 +41,8 @@ public async Task<string> FindFileAsync(string path, string name, CancellationTo
BlobContainerClient container = new BlobContainerClient(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING"), _blobContainerName);
await container.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

Diagnostics.Singleton.Info.ConnectedToCloudStorage = true;

var resultSegment = container.GetBlobsAsync();
await foreach (BlobItem blobItem in resultSegment.ConfigureAwait(false))
{
Expand Down Expand Up @@ -70,6 +77,8 @@ public async Task<string> StoreFileAsync(string path, byte[] content, Cancellati
BlobContainerClient container = new BlobContainerClient(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING"), _blobContainerName);
await container.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

Diagnostics.Singleton.Info.ConnectedToCloudStorage = true;

// Get a reference to the blob
BlobClient blob = container.GetBlobClient(path);

Expand Down Expand Up @@ -114,6 +123,8 @@ public async Task<byte[]> LoadFileAsync(string name, CancellationToken cancellat
BlobContainerClient container = new BlobContainerClient(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING"), _blobContainerName);
await container.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

Diagnostics.Singleton.Info.ConnectedToCloudStorage = true;

var resultSegment = container.GetBlobsAsync();
await foreach (BlobItem blobItem in resultSegment.ConfigureAwait(false))
{
Expand Down
5 changes: 4 additions & 1 deletion Controllers/HomeController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
namespace Opc.Ua.Cloud.Publisher.Controllers
{
using Microsoft.AspNetCore.Mvc;
using Opc.Ua.Cloud.Publisher.Models;

public class HomeController : Controller
{
public static string AuthenticationCode { get; set; } = "Not applicable";

public IActionResult Index()
{
return View();
return View("Index", new HomeModel() { AuthCode = AuthenticationCode });
}

[HttpGet]
Expand Down
7 changes: 5 additions & 2 deletions Diagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private void Clear()
{
Info.PublisherStartTime = DateTime.UtcNow;
Info.ConnectedToBroker = false;
Info.ConnectedToCloudStorage = false;
Info.NumberOfOpcSessionsConnected = 0;
Info.NumberOfOpcSubscriptionsConnected = 0;
Info.NumberOfOpcMonitoredItemsMonitored = 0;
Expand All @@ -70,14 +71,14 @@ private void Clear()

public async Task RunAsync(CancellationToken cancellationToken = default)
{
Clear();

if ( Settings.Instance.DiagnosticsLoggingInterval == 0)
{
// diagnostics are disabled
return;
}

Clear();

uint ticks = 0;
while (true)
{
Expand All @@ -97,6 +98,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default)

_hubClient.AddOrUpdateTableEntry("Publisher Start Time", Info.PublisherStartTime.ToString());
_hubClient.AddOrUpdateTableEntry("Connected to broker(s)", Info.ConnectedToBroker.ToString());
_hubClient.AddOrUpdateTableEntry("Connected to cloud storage/OneLake", Info.ConnectedToCloudStorage.ToString());
_hubClient.AddOrUpdateTableEntry("OPC UA sessions", Info.NumberOfOpcSessionsConnected.ToString());
_hubClient.AddOrUpdateTableEntry("OPC UA subscriptions", Info.NumberOfOpcSubscriptionsConnected.ToString());
_hubClient.AddOrUpdateTableEntry("OPC UA monitored items", Info.NumberOfOpcMonitoredItemsMonitored.ToString());
Expand Down Expand Up @@ -129,6 +131,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default)
if (ticks % 10 == 0)
{
DiagnosticsSend("ConnectedToBroker", new DataValue(Info.ConnectedToBroker));
DiagnosticsSend("ConnectedToCloudStorage", new DataValue(Info.ConnectedToCloudStorage));
DiagnosticsSend("NumOpcSessions", new DataValue(Info.NumberOfOpcSessionsConnected));
DiagnosticsSend("NumOpcSubscriptions", new DataValue(Info.NumberOfOpcSubscriptionsConnected));
DiagnosticsSend("NumOpcMonitoredItems", new DataValue(Info.NumberOfOpcMonitoredItemsMonitored));
Expand Down
5 changes: 5 additions & 0 deletions LocalFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ public LocalFileStorage(ILoggerFactory logger)

public Task<string> FindFileAsync(string path, string name, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(path) || string.IsNullOrEmpty(name))
{
return null;
}

try
{
foreach (string filePath in Directory.GetFiles(path))
Expand Down
21 changes: 11 additions & 10 deletions MessageProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,6 @@ public MessageProcessor(
_logger = loggerFactory.CreateLogger("MessageProcessor");
_encoder = encoder;
_sink = sink;

if (Settings.Instance.MetadataSendInterval != 0)
{
_metadataTimer = new Timer(SendMetadataOnTimer, null, (int)Settings.Instance.MetadataSendInterval * 1000, (int)Settings.Instance.MetadataSendInterval * 1000);
}

if (Settings.Instance.SendUAStatus)
{
_statusTimer = new Timer(SendStatusOnTimer, null, (int)Settings.Instance.DiagnosticsLoggingInterval * 1000, (int)Settings.Instance.DiagnosticsLoggingInterval * 1000);
}
}

public void ClearMetadataMessageCache()
Expand Down Expand Up @@ -106,6 +96,7 @@ public void Run(CancellationToken cancellationToken = default)
}

Init();

_isRunning = true;

while (true)
Expand Down Expand Up @@ -213,6 +204,16 @@ private void Init()

// init our send time
_nextSendTime = DateTime.UtcNow + TimeSpan.FromSeconds(Settings.Instance.DefaultSendIntervalSeconds);

if (Settings.Instance.MetadataSendInterval != 0)
{
_metadataTimer = new Timer(SendMetadataOnTimer, null, (int)Settings.Instance.MetadataSendInterval * 1000, (int)Settings.Instance.MetadataSendInterval * 1000);
}

if (Settings.Instance.SendUAStatus)
{
_statusTimer = new Timer(SendStatusOnTimer, null, (int)Settings.Instance.DiagnosticsLoggingInterval * 1000, (int)Settings.Instance.DiagnosticsLoggingInterval * 1000);
}
}

private void BatchMessage(string jsonMessage)
Expand Down
3 changes: 3 additions & 0 deletions Models/DiagnosticsModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public class DiagnosticsModel
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
public bool ConnectedToBroker { get; set; } = false;

[JsonProperty(NullValueHandling = NullValueHandling.Include)]
public bool ConnectedToCloudStorage { get; set; } = false;

[JsonProperty(NullValueHandling = NullValueHandling.Include)]
public int NumberOfOpcSessionsConnected { get; set; } = 0;

Expand Down
7 changes: 7 additions & 0 deletions Models/HomeModel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Opc.Ua.Cloud.Publisher.Models
{
public class HomeModel
{
public string AuthCode { get; set; } = string.Empty;
}
}
191 changes: 191 additions & 0 deletions OneLakeFileStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@

namespace Opc.Ua.Cloud.Publisher
{
using Azure.Identity;
using Azure.Storage.Files.DataLake;
using Azure.Storage.Files.DataLake.Models;
using Microsoft.Extensions.Logging;
using Opc.Ua.Cloud.Publisher.Controllers;
using Opc.Ua.Cloud.Publisher.Interfaces;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class OneLakeFileStorage : IFileStorage
{
private readonly ILogger _logger;

private string _blobContainerName = "uacloudpublisher";

private DeviceCodeCredential _credential;

private DataLakeServiceClient _dataLakeServiceClient;

private DataLakeFileSystemClient _fileSystemClient;

private object _lock = new object();

private Task MyDeviceCodeCallback(DeviceCodeInfo info, CancellationToken cancellation)
{
_logger.LogInformation(info.Message);

HomeController.AuthenticationCode = info.UserCode;

return Task.CompletedTask;
}

public OneLakeFileStorage(ILoggerFactory logger)
{
_logger = logger.CreateLogger("OneLakeFileStorage");

if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("STORAGE_CONTAINER_NAME")))
{
_blobContainerName = Environment.GetEnvironmentVariable("STORAGE_CONTAINER_NAME");
}

DeviceCodeCredentialOptions options = new()
{
DeviceCodeCallback = MyDeviceCodeCallback
};
_credential = new(options);
}

public Task<string> FindFileAsync(string path, string name, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(path) || string.IsNullOrEmpty(name))
{
return null;
}

try
{
lock (_lock)
{
VerifyOneLakeConnectivity();

if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING")))
{
string[] connectionStringParts = Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING").Split("/");

string dirName = connectionStringParts[4] + "/Files/" + _blobContainerName + path;
foreach (var fspath in _fileSystemClient.GetPaths(dirName))
{
if (fspath.Name.Contains(dirName + "/" + name))
{
return Task.FromResult(fspath.Name);
}
}
}
}

return null;
}
catch (Exception ex)
{
_logger.LogError(ex.Message);
return null;
}
}

public Task<string> StoreFileAsync(string path, byte[] content, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(path) || (content == null) || (content.Length == 0))
{
return null;
}

try
{
lock (_lock)
{
VerifyOneLakeConnectivity();

if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING")))
{
string[] connectionStringParts = Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING").Split("/");

string filePath = connectionStringParts[4] + "/Files/" + _blobContainerName + path;
DataLakeFileClient client = _fileSystemClient.GetFileClient(filePath);
client.Upload(new MemoryStream(content), true);

return Task.FromResult(path);
}
}

return null;
}
catch (Exception ex)
{
_logger.LogError(ex.Message);
return null;
}
}

public Task<byte[]> LoadFileAsync(string name, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(name))
{
return null;
}

try
{
lock (_lock)
{
VerifyOneLakeConnectivity();

if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING")))
{
string[] connectionStringParts = Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING").Split("/");

DataLakeFileClient client = _fileSystemClient.GetFileClient(name);
Azure.Response<FileDownloadInfo> response = client.Read();
MemoryStream content = new();
response.Value.Content.CopyTo(content);
return Task.FromResult(content.ToArray());
}
}

return null;
}
catch (Exception ex)
{
_logger.LogError(ex.Message);
return null;
}
}

private void VerifyOneLakeConnectivity()
{
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING")))
{
string[] connectionStringParts = Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING").Split("/");

_dataLakeServiceClient = new DataLakeServiceClient(new Uri("https://" + connectionStringParts[2]), _credential);
_fileSystemClient = _dataLakeServiceClient.GetFileSystemClient(connectionStringParts[3]);

// make sure our directory exists
string dirName = connectionStringParts[4] + "/Files/" + _blobContainerName;
string authNotification = "Not required - OneLake access authenticated!";
bool found = false;
foreach (var fspath in _fileSystemClient.GetPaths(connectionStringParts[4] + "/Files"))
{
if (fspath.Name == dirName)
{
found = true;
HomeController.AuthenticationCode = authNotification;
Diagnostics.Singleton.Info.ConnectedToCloudStorage = true;
}
}

if (!found)
{
_fileSystemClient.CreateDirectory(connectionStringParts[4] + "/Files/" + _blobContainerName);
HomeController.AuthenticationCode = authNotification;
Diagnostics.Singleton.Info.ConnectedToCloudStorage = true;
}
}
}
}
}
7 changes: 5 additions & 2 deletions Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
"useSSL": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"USE_KAFKA": "TRUE",
"USE_KAFKA": "1",
"STORAGE_TYPE": "",
"STORAGE_CONTAINER_NAME": "uacloudpublisher",
"STORAGE_CONNECTION_STRING": "DefaultEndpointsProtocol=https;AccountName=[yourstorageaccountname];AccountKey=[key];EndpointSuffix=core.windows.net"
"STORAGE_CONNECTION_STRING": "DefaultEndpointsProtocol=https;AccountName=[yourstorageaccountname];AccountKey=[key];EndpointSuffix=core.windows.net",
"AZURE_OPENAI_API_ENDPOINT": "https://[yourinstancename].openai.azure.com/",
"AZURE_OPENAI_API_KEY": "",
"AZURE_OPENAI_API_DEPLOYMENT_NAME": ""
}
}
}
Expand Down
Loading

0 comments on commit a4f2b06

Please sign in to comment.