Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate DurableTask.ServiceBus Table and Blob store SDK from WindowsAzure.Storage to Azure.Data.Tables and Azure.Storage.Blobs #1112

Merged
merged 13 commits into from
Jun 25, 2024
Merged
8 changes: 5 additions & 3 deletions src/DurableTask.ServiceBus/DurableTask.ServiceBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<!-- Version Info -->
<PropertyGroup>
<MajorVersion>3</MajorVersion>
<MajorVersion>4</MajorVersion>
<MinorVersion>0</MinorVersion>
<PatchVersion>0</PatchVersion>

Expand All @@ -23,6 +23,8 @@
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net462'">
<PackageReference Include="Azure.Data.Tables" Version="12.8.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageReference Include="ImpromptuInterface" version="6.2.2" />
<PackageReference Include="Microsoft.Azure.KeyVault.Core" version="1.0.0" />
<PackageReference Include="Microsoft.Data.Edm" version="5.8.5" />
Expand All @@ -31,13 +33,13 @@
<PackageReference Include="Microsoft.WindowsAzure.ConfigurationManager" version="3.2.1" />
<PackageReference Include="System.Spatial" version="5.8.5" />
<PackageReference Include="WindowsAzure.ServiceBus" version="4.1.3" />
<PackageReference Include="WindowsAzure.Storage" version="7.0.0" />
<Reference Include="System.Transactions" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Azure.Data.Tables" Version="12.8.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.0" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
</ItemGroup>

<ItemGroup>
Expand Down
13 changes: 7 additions & 6 deletions src/DurableTask.ServiceBus/Tracking/AzureStorageBlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ namespace DurableTask.ServiceBus.Tracking
using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Core;
using DurableTask.Core;
using DurableTask.Core.Tracking;
using Microsoft.WindowsAzure.Storage;

/// <summary>
/// Azure blob storage to allow save and load large blobs, such as message and session, as a stream using Azure blob container.
Expand All @@ -41,13 +41,14 @@ public AzureStorageBlobStore(string hubName, string connectionString)
}

/// <summary>
/// Creates a new AzureStorageBlobStore using the supplied hub name and cloud storage account
/// Creates a new AzureStorageBlobStore using the supplied hub name, endpoint and token credential
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming - does this mean that only managed identity (i.e token credentials, right?) would be supported now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To preserve previous functionality, we are maintaining support for connection strings. The breaking change is to introduce support for token credentials and removing support for the "cloud storage account" as that class no longer exists in the new Azure SDK.

/// </summary>
/// <param name="hubName">The hub name for this store</param>
/// <param name="cloudStorageAccount">Azure Cloud Storage Account</param>
public AzureStorageBlobStore(string hubName, CloudStorageAccount cloudStorageAccount)
/// <param name="hubName">The hub name</param>
/// <param name="endpoint">Uri Endpoint for the blob store</param>
/// <param name="credential">Token Credentials required for accessing the blob store</param>
public AzureStorageBlobStore(string hubName, Uri endpoint, TokenCredential credential)
{
this.blobClient = new BlobStorageClient(hubName, cloudStorageAccount);
this.blobClient = new BlobStorageClient(hubName, endpoint, credential);
}

/// <summary>
Expand Down
232 changes: 120 additions & 112 deletions src/DurableTask.ServiceBus/Tracking/AzureTableClient.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace DurableTask.ServiceBus.Tracking
{
using System;
using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Azure;
using Azure.Data.Tables;

/// <summary>
/// Abstract class for composite entities for Azure table
Expand All @@ -41,40 +41,13 @@ public abstract class AzureTableCompositeTableEntity : ITableEntity
/// <summary>
/// Gets or sets the row timestamp
/// </summary>
public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.Now;
public DateTimeOffset? Timestamp { get; set; } = DateTimeOffset.Now;

/// <summary>
/// Gets or sets the entity etag
/// </summary>
public string ETag { get; set; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropping these (and other public fields) is technically a breaking change. Just jotting this down for down, still unsure what replaces this (or if functionality was dropped)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being replaced with an ETag class provided by Azure Core, and a required part of the new Azure SDK. While unfortunate this is a breaking change, this should not change functionality in a substantial manner.


/// <summary>
/// Read an entity properties based on the supplied dictionary or entity properties
/// </summary>
/// <param name="properties">Dictionary of properties to read for the entity</param>
/// <param name="operationContext">The operation context</param>
public abstract void ReadEntity(IDictionary<string, EntityProperty> properties, OperationContext operationContext);

/// <summary>
/// Write an entity to a dictionary of entity properties
/// </summary>
/// <param name="operationContext">The operation context</param>
public abstract IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext);
public ETag ETag { get; set; }

internal abstract IEnumerable<ITableEntity> BuildDenormalizedEntities();

/// <summary>
///
/// </summary>
protected T GetValue<T>(string key, IDictionary<string, EntityProperty> properties,
Func<EntityProperty, T> extract)
{
if (!properties.TryGetValue(key, out EntityProperty ep))
{
return default(T);
}

return extract(ep);
}
}
}
77 changes: 26 additions & 51 deletions src/DurableTask.ServiceBus/Tracking/AzureTableInstanceStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ namespace DurableTask.ServiceBus.Tracking
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using DurableTask.Core;
using DurableTask.Core.Common;
using DurableTask.Core.Serializing;
using DurableTask.Core.Tracking;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

/// <summary>
/// Azure Table Instance store provider to allow storage and lookup for orchestration state event history with query support
Expand Down Expand Up @@ -61,23 +62,29 @@ public AzureTableInstanceStore(string hubName, string tableConnectionString)
}

/// <summary>
/// Creates a new AzureTableInstanceStore using the supplied hub name and cloud storage account
/// Creates a new AzureTableInstanceStore using the supplied hub name, Uri endpoint, and Token Credential
/// </summary>
/// <param name="hubName">The hub name for this instance store</param>
/// <param name="cloudStorageAccount">Cloud Storage Account</param>
public AzureTableInstanceStore(string hubName, CloudStorageAccount cloudStorageAccount)
/// <param name="endpoint">Uri Endpoint for the instance store</param>
/// <param name="credential">Token Credentials required for accessing the instance store</param>
public AzureTableInstanceStore(string hubName, Uri endpoint, TokenCredential credential)
{
if (cloudStorageAccount == null)
if (endpoint == null)
{
throw new ArgumentException("Invalid Cloud Storage Account", nameof(cloudStorageAccount));
throw new ArgumentException("Invalid Uri Endpoint", nameof(endpoint));
}

if (credential == null)
{
throw new ArgumentException("Invalid Token Credential", nameof(credential));
}

if (string.IsNullOrWhiteSpace(hubName))
{
throw new ArgumentException("Invalid hub name", nameof(hubName));
}

this.tableClient = new AzureTableClient(hubName, cloudStorageAccount);
this.tableClient = new AzureTableClient(hubName, endpoint, credential);

// Workaround an issue with Storage that throws exceptions for any date < 1600 so DateTime.Min cannot be used
DateTimeUtils.SetMinDateTimeForStorageEmulator();
Expand Down Expand Up @@ -287,23 +294,14 @@ public Task<OrchestrationStateQuerySegment> QueryOrchestrationStatesSegmentedAsy
public async Task<OrchestrationStateQuerySegment> QueryOrchestrationStatesSegmentedAsync(
OrchestrationStateQuery stateQuery, string continuationToken, int count)
{
TableContinuationToken tokenObj = null;

if (continuationToken != null)
{
tokenObj = DeserializeTableContinuationToken(continuationToken);
}

TableQuerySegment<AzureTableOrchestrationStateEntity> results =
await this.tableClient.QueryOrchestrationStatesSegmentedAsync(stateQuery, tokenObj, count)
Page<AzureTableOrchestrationStateEntity> results =
await this.tableClient.QueryOrchestrationStatesSegmentedAsync(stateQuery, continuationToken, count)
.ConfigureAwait(false);

return new OrchestrationStateQuerySegment
{
Results = results.Results.Select(s => s.State),
ContinuationToken = results.ContinuationToken == null
? null
: SerializeTableContinuationToken(results.ContinuationToken)
Results = results.Values.Select(s => s.State),
ContinuationToken = results.ContinuationToken
};
}

Expand All @@ -315,12 +313,12 @@ await this.tableClient.QueryOrchestrationStatesSegmentedAsync(stateQuery, tokenO
/// <returns>The number of history events purged.</returns>
public async Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
TableContinuationToken continuationToken = null;
string continuationToken = null;

var purgeCount = 0;
do
{
TableQuerySegment<AzureTableOrchestrationStateEntity> resultSegment =
Page<AzureTableOrchestrationStateEntity> resultSegment =
(await this.tableClient.QueryOrchestrationStatesSegmentedAsync(
new OrchestrationStateQuery()
.AddTimeRangeFilter(DateTimeUtils.MinDateTime, thresholdDateTimeUtc, timeRangeFilterType),
Expand All @@ -329,23 +327,23 @@ public async Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDa

continuationToken = resultSegment.ContinuationToken;

if (resultSegment.Results != null)
if (resultSegment.Values != null)
{
await PurgeOrchestrationHistorySegmentAsync(resultSegment).ConfigureAwait(false);
purgeCount += resultSegment.Results.Count;
purgeCount += resultSegment.Values.Count;
}
} while (continuationToken != null);

return purgeCount;
}

async Task PurgeOrchestrationHistorySegmentAsync(
TableQuerySegment<AzureTableOrchestrationStateEntity> orchestrationStateEntitySegment)
Page<AzureTableOrchestrationStateEntity> orchestrationStateEntitySegment)
{
var stateEntitiesToDelete = new List<AzureTableOrchestrationStateEntity>(orchestrationStateEntitySegment.Results);
var stateEntitiesToDelete = new List<AzureTableOrchestrationStateEntity>(orchestrationStateEntitySegment.Values);

var historyEntitiesToDelete = new ConcurrentBag<IEnumerable<AzureTableOrchestrationHistoryEventEntity>>();
await Task.WhenAll(orchestrationStateEntitySegment.Results.Select(
await Task.WhenAll(orchestrationStateEntitySegment.Values.Select(
entity => Task.Run(async () =>
{
IEnumerable<AzureTableOrchestrationHistoryEventEntity> historyEntities =
Expand Down Expand Up @@ -463,28 +461,5 @@ OrchestrationWorkItemInstanceEntity TableHistoryEntityToWorkItemEvent(AzureTable
HistoryEvent = entity.HistoryEvent
};
}

string SerializeTableContinuationToken(TableContinuationToken continuationToken)
{
if (continuationToken == null)
{
throw new ArgumentNullException(nameof(continuationToken));
}

string serializedToken = JsonDataConverter.Default.Serialize(continuationToken);
return Convert.ToBase64String(Encoding.Unicode.GetBytes(serializedToken));
}

TableContinuationToken DeserializeTableContinuationToken(string serializedContinuationToken)
{
if (string.IsNullOrWhiteSpace(serializedContinuationToken))
{
throw new ArgumentException("Invalid serializedContinuationToken");
}

byte[] tokenBytes = Convert.FromBase64String(serializedContinuationToken);

return JsonDataConverter.Default.Deserialize<TableContinuationToken>(Encoding.Unicode.GetString(tokenBytes));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ namespace DurableTask.ServiceBus.Tracking
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Azure.Data.Tables;
using DurableTask.Core.History;
using DurableTask.Core.Serializing;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

/// <summary>
/// History Tracking entity for orchestration history events
/// </summary>
public class AzureTableOrchestrationHistoryEventEntity : AzureTableCompositeTableEntity
internal class AzureTableOrchestrationHistoryEventEntity : AzureTableCompositeTableEntity
{
private static readonly JsonSerializerSettings WriteJsonSettings = new JsonSerializerSettings
{
Expand Down Expand Up @@ -93,6 +93,7 @@ public AzureTableOrchestrationHistoryEventEntity(
/// <summary>
/// Gets or set the history event detail for the tracking entity
/// </summary>
[IgnoreDataMember] // see HistoryEventJson
public HistoryEvent HistoryEvent { get; set; }

internal override IEnumerable<ITableEntity> BuildDenormalizedEntities()
Expand All @@ -110,55 +111,6 @@ internal override IEnumerable<ITableEntity> BuildDenormalizedEntities()
return new[] { entity };
}

/// <summary>
/// Write an entity to a dictionary of entity properties
/// </summary>
/// <param name="operationContext">The operation context</param>
public override IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext)
{
string serializedHistoryEvent = JsonConvert.SerializeObject(HistoryEvent, WriteJsonSettings);

// replace with a generic event with the truncated history so at least we have some record
// note that this makes the history stored in the instance store unreplayable. so any replay logic
// that we build will have to especially check for this event and flag the orchestration as unplayable if it sees this event
if (!string.IsNullOrWhiteSpace(serializedHistoryEvent) &&
serializedHistoryEvent.Length > ServiceBusConstants.MaxStringLengthForAzureTableColumn)
{
serializedHistoryEvent = JsonConvert.SerializeObject(new GenericEvent(HistoryEvent.EventId,
serializedHistoryEvent.Substring(0, ServiceBusConstants.MaxStringLengthForAzureTableColumn) + " ....(truncated)..]"),
WriteJsonSettings);
}

var returnValues = new Dictionary<string, EntityProperty>();
returnValues.Add("InstanceId", new EntityProperty(InstanceId));
returnValues.Add("ExecutionId", new EntityProperty(ExecutionId));
returnValues.Add("TaskTimeStamp", new EntityProperty(TaskTimeStamp));
returnValues.Add("SequenceNumber", new EntityProperty(SequenceNumber));
returnValues.Add("HistoryEvent", new EntityProperty(serializedHistoryEvent));

return returnValues;
}

/// <summary>
/// Read an entity properties based on the supplied dictionary or entity properties
/// </summary>
/// <param name="properties">Dictionary of properties to read for the entity</param>
/// <param name="operationContext">The operation context</param>
public override void ReadEntity(IDictionary<string, EntityProperty> properties,
OperationContext operationContext)
{
InstanceId = GetValue("InstanceId", properties, property => property.StringValue);
ExecutionId = GetValue("ExecutionId", properties, property => property.StringValue);
SequenceNumber = GetValue("SequenceNumber", properties, property => property.Int32Value).GetValueOrDefault();
TaskTimeStamp =
GetValue("TaskTimeStamp", properties, property => property.DateTimeOffsetValue)
.GetValueOrDefault()
.DateTime;

string serializedHistoryEvent = GetValue("HistoryEvent", properties, property => property.StringValue);
HistoryEvent = JsonConvert.DeserializeObject<HistoryEvent>(serializedHistoryEvent, ReadJsonSettings);
}

/// <summary>
/// Returns a string that represents the current object.
/// </summary>
Expand All @@ -169,5 +121,27 @@ public override string ToString()
{
return $"Instance Id: {InstanceId} Execution Id: {ExecutionId} Seq: {SequenceNumber.ToString()} Time: {TaskTimeStamp} HistoryEvent: {HistoryEvent.EventType.ToString()}";
}

#region AzureStorageHelpers
// This public accessor is only used to safely interact with the Azure Table Storage SDK, which reads and writes using reflection.
// This is an artifact of updating from and SDK that did not use reflection.
[DataMember(Name = "HistoryEvent")]
public string HistoryEventJson
{
get
{
var serializedHistoryEvent = JsonConvert.SerializeObject(HistoryEvent, WriteJsonSettings);
if (!string.IsNullOrWhiteSpace(serializedHistoryEvent) &&
serializedHistoryEvent.Length > ServiceBusConstants.MaxStringLengthForAzureTableColumn)
{
serializedHistoryEvent = JsonConvert.SerializeObject(new GenericEvent(HistoryEvent.EventId,
serializedHistoryEvent.Substring(0, ServiceBusConstants.MaxStringLengthForAzureTableColumn) + " ....(truncated)..]"),
WriteJsonSettings);
}
return serializedHistoryEvent;
}
set => HistoryEvent = JsonConvert.DeserializeObject<HistoryEvent>(value, ReadJsonSettings);
}
#endregion
}
}
Loading
Loading