Skip to content

Commit

Permalink
Migrate DurableTask.ServiceBus Table and Blob store SDK from WindowsA…
Browse files Browse the repository at this point in the history
…zure.Storage to Azure.Data.Tables and Azure.Storage.Blobs (#1112)
  • Loading branch information
arlynch authored Jun 25, 2024
1 parent d21a482 commit 0af3eca
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 445 deletions.
8 changes: 5 additions & 3 deletions src/DurableTask.ServiceBus/DurableTask.ServiceBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<!-- Version Info -->
<PropertyGroup>
<MajorVersion>3</MajorVersion>
<MajorVersion>4</MajorVersion>
<MinorVersion>0</MinorVersion>
<PatchVersion>0</PatchVersion>

Expand All @@ -23,6 +23,8 @@
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net462'">
<PackageReference Include="Azure.Data.Tables" Version="12.8.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageReference Include="ImpromptuInterface" version="6.2.2" />
<PackageReference Include="Microsoft.Azure.KeyVault.Core" version="1.0.0" />
<PackageReference Include="Microsoft.Data.Edm" version="5.8.5" />
Expand All @@ -31,13 +33,13 @@
<PackageReference Include="Microsoft.WindowsAzure.ConfigurationManager" version="3.2.1" />
<PackageReference Include="System.Spatial" version="5.8.5" />
<PackageReference Include="WindowsAzure.ServiceBus" version="4.1.3" />
<PackageReference Include="WindowsAzure.Storage" version="7.0.0" />
<Reference Include="System.Transactions" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Azure.Data.Tables" Version="12.8.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.0" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
</ItemGroup>

<ItemGroup>
Expand Down
13 changes: 7 additions & 6 deletions src/DurableTask.ServiceBus/Tracking/AzureStorageBlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ namespace DurableTask.ServiceBus.Tracking
using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Core;
using DurableTask.Core;
using DurableTask.Core.Tracking;
using Microsoft.WindowsAzure.Storage;

/// <summary>
/// Azure blob storage to allow save and load large blobs, such as message and session, as a stream using Azure blob container.
Expand All @@ -41,13 +41,14 @@ public AzureStorageBlobStore(string hubName, string connectionString)
}

/// <summary>
/// Creates a new AzureStorageBlobStore using the supplied hub name and cloud storage account
/// Creates a new AzureStorageBlobStore using the supplied hub name, endpoint and token credential
/// </summary>
/// <param name="hubName">The hub name for this store</param>
/// <param name="cloudStorageAccount">Azure Cloud Storage Account</param>
public AzureStorageBlobStore(string hubName, CloudStorageAccount cloudStorageAccount)
/// <param name="hubName">The hub name</param>
/// <param name="endpoint">Uri Endpoint for the blob store</param>
/// <param name="credential">Token Credentials required for accessing the blob store</param>
public AzureStorageBlobStore(string hubName, Uri endpoint, TokenCredential credential)
{
this.blobClient = new BlobStorageClient(hubName, cloudStorageAccount);
this.blobClient = new BlobStorageClient(hubName, endpoint, credential);
}

/// <summary>
Expand Down
232 changes: 120 additions & 112 deletions src/DurableTask.ServiceBus/Tracking/AzureTableClient.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace DurableTask.ServiceBus.Tracking
{
using System;
using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Azure;
using Azure.Data.Tables;

/// <summary>
/// Abstract class for composite entities for Azure table
Expand All @@ -41,40 +41,13 @@ public abstract class AzureTableCompositeTableEntity : ITableEntity
/// <summary>
/// Gets or sets the row timestamp
/// </summary>
public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.Now;
public DateTimeOffset? Timestamp { get; set; } = DateTimeOffset.Now;

/// <summary>
/// Gets or sets the entity etag
/// </summary>
public string ETag { get; set; }

/// <summary>
/// Read an entity properties based on the supplied dictionary or entity properties
/// </summary>
/// <param name="properties">Dictionary of properties to read for the entity</param>
/// <param name="operationContext">The operation context</param>
public abstract void ReadEntity(IDictionary<string, EntityProperty> properties, OperationContext operationContext);

/// <summary>
/// Write an entity to a dictionary of entity properties
/// </summary>
/// <param name="operationContext">The operation context</param>
public abstract IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext);
public ETag ETag { get; set; }

internal abstract IEnumerable<ITableEntity> BuildDenormalizedEntities();

/// <summary>
///
/// </summary>
protected T GetValue<T>(string key, IDictionary<string, EntityProperty> properties,
Func<EntityProperty, T> extract)
{
if (!properties.TryGetValue(key, out EntityProperty ep))
{
return default(T);
}

return extract(ep);
}
}
}
77 changes: 26 additions & 51 deletions src/DurableTask.ServiceBus/Tracking/AzureTableInstanceStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ namespace DurableTask.ServiceBus.Tracking
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using DurableTask.Core;
using DurableTask.Core.Common;
using DurableTask.Core.Serializing;
using DurableTask.Core.Tracking;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

/// <summary>
/// Azure Table Instance store provider to allow storage and lookup for orchestration state event history with query support
Expand Down Expand Up @@ -61,23 +62,29 @@ public AzureTableInstanceStore(string hubName, string tableConnectionString)
}

/// <summary>
/// Creates a new AzureTableInstanceStore using the supplied hub name and cloud storage account
/// Creates a new AzureTableInstanceStore using the supplied hub name, Uri endpoint, and Token Credential
/// </summary>
/// <param name="hubName">The hub name for this instance store</param>
/// <param name="cloudStorageAccount">Cloud Storage Account</param>
public AzureTableInstanceStore(string hubName, CloudStorageAccount cloudStorageAccount)
/// <param name="endpoint">Uri Endpoint for the instance store</param>
/// <param name="credential">Token Credentials required for accessing the instance store</param>
public AzureTableInstanceStore(string hubName, Uri endpoint, TokenCredential credential)
{
if (cloudStorageAccount == null)
if (endpoint == null)
{
throw new ArgumentException("Invalid Cloud Storage Account", nameof(cloudStorageAccount));
throw new ArgumentException("Invalid Uri Endpoint", nameof(endpoint));
}

if (credential == null)
{
throw new ArgumentException("Invalid Token Credential", nameof(credential));
}

if (string.IsNullOrWhiteSpace(hubName))
{
throw new ArgumentException("Invalid hub name", nameof(hubName));
}

this.tableClient = new AzureTableClient(hubName, cloudStorageAccount);
this.tableClient = new AzureTableClient(hubName, endpoint, credential);

// Workaround an issue with Storage that throws exceptions for any date < 1600 so DateTime.Min cannot be used
DateTimeUtils.SetMinDateTimeForStorageEmulator();
Expand Down Expand Up @@ -287,23 +294,14 @@ public Task<OrchestrationStateQuerySegment> QueryOrchestrationStatesSegmentedAsy
public async Task<OrchestrationStateQuerySegment> QueryOrchestrationStatesSegmentedAsync(
OrchestrationStateQuery stateQuery, string continuationToken, int count)
{
TableContinuationToken tokenObj = null;

if (continuationToken != null)
{
tokenObj = DeserializeTableContinuationToken(continuationToken);
}

TableQuerySegment<AzureTableOrchestrationStateEntity> results =
await this.tableClient.QueryOrchestrationStatesSegmentedAsync(stateQuery, tokenObj, count)
Page<AzureTableOrchestrationStateEntity> results =
await this.tableClient.QueryOrchestrationStatesSegmentedAsync(stateQuery, continuationToken, count)
.ConfigureAwait(false);

return new OrchestrationStateQuerySegment
{
Results = results.Results.Select(s => s.State),
ContinuationToken = results.ContinuationToken == null
? null
: SerializeTableContinuationToken(results.ContinuationToken)
Results = results.Values.Select(s => s.State),
ContinuationToken = results.ContinuationToken
};
}

Expand All @@ -315,12 +313,12 @@ await this.tableClient.QueryOrchestrationStatesSegmentedAsync(stateQuery, tokenO
/// <returns>The number of history events purged.</returns>
public async Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
TableContinuationToken continuationToken = null;
string continuationToken = null;

var purgeCount = 0;
do
{
TableQuerySegment<AzureTableOrchestrationStateEntity> resultSegment =
Page<AzureTableOrchestrationStateEntity> resultSegment =
(await this.tableClient.QueryOrchestrationStatesSegmentedAsync(
new OrchestrationStateQuery()
.AddTimeRangeFilter(DateTimeUtils.MinDateTime, thresholdDateTimeUtc, timeRangeFilterType),
Expand All @@ -329,23 +327,23 @@ public async Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDa

continuationToken = resultSegment.ContinuationToken;

if (resultSegment.Results != null)
if (resultSegment.Values != null)
{
await PurgeOrchestrationHistorySegmentAsync(resultSegment).ConfigureAwait(false);
purgeCount += resultSegment.Results.Count;
purgeCount += resultSegment.Values.Count;
}
} while (continuationToken != null);

return purgeCount;
}

async Task PurgeOrchestrationHistorySegmentAsync(
TableQuerySegment<AzureTableOrchestrationStateEntity> orchestrationStateEntitySegment)
Page<AzureTableOrchestrationStateEntity> orchestrationStateEntitySegment)
{
var stateEntitiesToDelete = new List<AzureTableOrchestrationStateEntity>(orchestrationStateEntitySegment.Results);
var stateEntitiesToDelete = new List<AzureTableOrchestrationStateEntity>(orchestrationStateEntitySegment.Values);

var historyEntitiesToDelete = new ConcurrentBag<IEnumerable<AzureTableOrchestrationHistoryEventEntity>>();
await Task.WhenAll(orchestrationStateEntitySegment.Results.Select(
await Task.WhenAll(orchestrationStateEntitySegment.Values.Select(
entity => Task.Run(async () =>
{
IEnumerable<AzureTableOrchestrationHistoryEventEntity> historyEntities =
Expand Down Expand Up @@ -463,28 +461,5 @@ OrchestrationWorkItemInstanceEntity TableHistoryEntityToWorkItemEvent(AzureTable
HistoryEvent = entity.HistoryEvent
};
}

string SerializeTableContinuationToken(TableContinuationToken continuationToken)
{
if (continuationToken == null)
{
throw new ArgumentNullException(nameof(continuationToken));
}

string serializedToken = JsonDataConverter.Default.Serialize(continuationToken);
return Convert.ToBase64String(Encoding.Unicode.GetBytes(serializedToken));
}

TableContinuationToken DeserializeTableContinuationToken(string serializedContinuationToken)
{
if (string.IsNullOrWhiteSpace(serializedContinuationToken))
{
throw new ArgumentException("Invalid serializedContinuationToken");
}

byte[] tokenBytes = Convert.FromBase64String(serializedContinuationToken);

return JsonDataConverter.Default.Deserialize<TableContinuationToken>(Encoding.Unicode.GetString(tokenBytes));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ namespace DurableTask.ServiceBus.Tracking
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Azure.Data.Tables;
using DurableTask.Core.History;
using DurableTask.Core.Serializing;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

/// <summary>
/// History Tracking entity for orchestration history events
/// </summary>
public class AzureTableOrchestrationHistoryEventEntity : AzureTableCompositeTableEntity
internal class AzureTableOrchestrationHistoryEventEntity : AzureTableCompositeTableEntity
{
private static readonly JsonSerializerSettings WriteJsonSettings = new JsonSerializerSettings
{
Expand Down Expand Up @@ -93,6 +93,7 @@ public AzureTableOrchestrationHistoryEventEntity(
/// <summary>
/// Gets or set the history event detail for the tracking entity
/// </summary>
[IgnoreDataMember] // see HistoryEventJson
public HistoryEvent HistoryEvent { get; set; }

internal override IEnumerable<ITableEntity> BuildDenormalizedEntities()
Expand All @@ -110,55 +111,6 @@ internal override IEnumerable<ITableEntity> BuildDenormalizedEntities()
return new[] { entity };
}

/// <summary>
/// Write an entity to a dictionary of entity properties
/// </summary>
/// <param name="operationContext">The operation context</param>
public override IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext)
{
string serializedHistoryEvent = JsonConvert.SerializeObject(HistoryEvent, WriteJsonSettings);

// replace with a generic event with the truncated history so at least we have some record
// note that this makes the history stored in the instance store unreplayable. so any replay logic
// that we build will have to especially check for this event and flag the orchestration as unplayable if it sees this event
if (!string.IsNullOrWhiteSpace(serializedHistoryEvent) &&
serializedHistoryEvent.Length > ServiceBusConstants.MaxStringLengthForAzureTableColumn)
{
serializedHistoryEvent = JsonConvert.SerializeObject(new GenericEvent(HistoryEvent.EventId,
serializedHistoryEvent.Substring(0, ServiceBusConstants.MaxStringLengthForAzureTableColumn) + " ....(truncated)..]"),
WriteJsonSettings);
}

var returnValues = new Dictionary<string, EntityProperty>();
returnValues.Add("InstanceId", new EntityProperty(InstanceId));
returnValues.Add("ExecutionId", new EntityProperty(ExecutionId));
returnValues.Add("TaskTimeStamp", new EntityProperty(TaskTimeStamp));
returnValues.Add("SequenceNumber", new EntityProperty(SequenceNumber));
returnValues.Add("HistoryEvent", new EntityProperty(serializedHistoryEvent));

return returnValues;
}

/// <summary>
/// Read an entity properties based on the supplied dictionary or entity properties
/// </summary>
/// <param name="properties">Dictionary of properties to read for the entity</param>
/// <param name="operationContext">The operation context</param>
public override void ReadEntity(IDictionary<string, EntityProperty> properties,
OperationContext operationContext)
{
InstanceId = GetValue("InstanceId", properties, property => property.StringValue);
ExecutionId = GetValue("ExecutionId", properties, property => property.StringValue);
SequenceNumber = GetValue("SequenceNumber", properties, property => property.Int32Value).GetValueOrDefault();
TaskTimeStamp =
GetValue("TaskTimeStamp", properties, property => property.DateTimeOffsetValue)
.GetValueOrDefault()
.DateTime;

string serializedHistoryEvent = GetValue("HistoryEvent", properties, property => property.StringValue);
HistoryEvent = JsonConvert.DeserializeObject<HistoryEvent>(serializedHistoryEvent, ReadJsonSettings);
}

/// <summary>
/// Returns a string that represents the current object.
/// </summary>
Expand All @@ -169,5 +121,27 @@ public override string ToString()
{
return $"Instance Id: {InstanceId} Execution Id: {ExecutionId} Seq: {SequenceNumber.ToString()} Time: {TaskTimeStamp} HistoryEvent: {HistoryEvent.EventType.ToString()}";
}

#region AzureStorageHelpers
// This public accessor is only used to safely interact with the Azure Table Storage SDK, which reads and writes using reflection.
// This is an artifact of updating from and SDK that did not use reflection.
[DataMember(Name = "HistoryEvent")]
public string HistoryEventJson
{
get
{
var serializedHistoryEvent = JsonConvert.SerializeObject(HistoryEvent, WriteJsonSettings);
if (!string.IsNullOrWhiteSpace(serializedHistoryEvent) &&
serializedHistoryEvent.Length > ServiceBusConstants.MaxStringLengthForAzureTableColumn)
{
serializedHistoryEvent = JsonConvert.SerializeObject(new GenericEvent(HistoryEvent.EventId,
serializedHistoryEvent.Substring(0, ServiceBusConstants.MaxStringLengthForAzureTableColumn) + " ....(truncated)..]"),
WriteJsonSettings);
}
return serializedHistoryEvent;
}
set => HistoryEvent = JsonConvert.DeserializeObject<HistoryEvent>(value, ReadJsonSettings);
}
#endregion
}
}
Loading

0 comments on commit 0af3eca

Please sign in to comment.