Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user to specify MaxHistoryEvents and constraint history loading to include the latest generation only #1119

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,12 @@ internal LogHelper Logger
/// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true.
/// </summary>
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;

/// <summary>
/// Gets or sets the maximum number of history events for a single orchestration instance.
/// An orchestrator that goes beyond this limit is automatically terminated.
/// By default, this is null, which means there is no limit.
/// </summary>
public int? MaxHistoryEvents { get; set; } = null;
}
}
3 changes: 2 additions & 1 deletion src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
<!-- Version Info -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>17</MinorVersion>
<MinorVersion>18</MinorVersion>
<PatchVersion>3</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(VersionPrefix).0</FileVersion>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
<FileVersion Condition="'$(FileVersionRevision)' != ''">$(VersionPrefix).$(FileVersionRevision)</FileVersion>
<!-- The assembly version is only the major/minor pair, making it easier to do in-place upgrades -->
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
<VersionSuffix>preview.1</VersionSuffix>
</PropertyGroup>

<!-- This version is used as the nuget package version -->
Expand Down
17 changes: 17 additions & 0 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,23 @@ async Task ScheduleOrchestrationStatePrefetch(
batch.OrchestrationExecutionId,
cancellationToken);

int numEvents = history.Events.Count;
bool hasMaximumHistorySize = this.settings.MaxHistoryEvents != null;
if (hasMaximumHistorySize && numEvents > this.settings.MaxHistoryEvents)
{
// number of history events exceeds limit, fail this orchestrator
this.settings.Logger.OrchestrationProcessingFailure(
this.storageAccountName,
this.settings.TaskHubName,
batch.OrchestrationInstanceId,
batch.OrchestrationExecutionId,
$"Orchestration has {numEvents} events, exceeding the limit of {this.settings.MaxHistoryEvents}. Orchestrator will be terminated.");

// Fail the orchestrator by creating Terminate event
ExecutionTerminatedEvent terminateEvent = new ExecutionTerminatedEvent(-1, $"Orchestrator exceeded maximum history size: {this.settings.MaxHistoryEvents}");
history.Events.Add(terminateEvent);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to add this logic to CompleteTaskOrchestrationWorkItemAsync rather than doing it here. That way you can terminate the orchestration without needing to load and execute the excessive history, which is required in this case. There would be some logging benefits as well since the termination happens on the same logical thread as the orchestration execution, rather than on a background thread, which is the case for history loading.

}

batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
Expand Down
35 changes: 25 additions & 10 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
namespace DurableTask.AzureStorage.Tracking
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -226,22 +225,36 @@ public override async Task<bool> ExistsAsync()

return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreContext);
}

#nullable enable
async Task<TableEntitiesResponseInfo<DynamicTableEntity>> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList<string> projectionColumns, CancellationToken cancellationToken = default(CancellationToken))
{
var sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
string filterCondition = TableQuery.GenerateFilterCondition(PartitionKeyProperty, QueryComparisons.Equal, sanitizedInstanceId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're always overwriting this variable value with this change. To make the intent more clear, I suggest we rename this variable to to something like instanceIdFilterCondition and then declare filterCondition below (on line 256) as a new variable.

if (!string.IsNullOrEmpty(expectedExecutionId))

// we need to get the executionID to account for continueAsNew scenarios, where a given instanceID may have multiple histories
// we can obtain this from the sentinel row, which always has the latest executionID
if (string.IsNullOrWhiteSpace(expectedExecutionId))
{
// Filter down to a specific generation.
var rowKeyOrExecutionId = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey),
TableOperators.Or,
TableQuery.GenerateFilterCondition("ExecutionId", QueryComparisons.Equal, expectedExecutionId));
var sentinelRowFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey);
var sentinelRowForInstanceIdFilter = TableQuery.CombineFilters(filterCondition, TableOperators.And, sentinelRowFilter);

filterCondition = TableQuery.CombineFilters(filterCondition, TableOperators.And, rowKeyOrExecutionId);
TableQuery<DynamicTableEntity> sentinelRowQuery = new TableQuery<DynamicTableEntity>().Where(sentinelRowForInstanceIdFilter);
var sentinelRowQueryResponse = await this.HistoryTable.ExecuteQueryAsync(sentinelRowQuery, cancellationToken);

if (sentinelRowQueryResponse.ReturnedEntities != null && sentinelRowQueryResponse.ReturnedEntities.Count > 0)
{
expectedExecutionId = sentinelRowQueryResponse.ReturnedEntities.FirstOrDefault().Properties["ExecutionId"].StringValue;
}
}

// Filter down to a specific generation.
var rowKeyOrExecutionId = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey),
TableOperators.Or,
TableQuery.GenerateFilterCondition("ExecutionId", QueryComparisons.Equal, expectedExecutionId));

filterCondition = TableQuery.CombineFilters(filterCondition, TableOperators.And, rowKeyOrExecutionId);

TableQuery<DynamicTableEntity> query = new TableQuery<DynamicTableEntity>().Where(filterCondition);

if (projectionColumns != null)
Expand All @@ -250,9 +263,11 @@ public override async Task<bool> ExistsAsync()
}

var tableEntitiesResponseInfo = await this.HistoryTable.ExecuteQueryAsync(query, cancellationToken);

return tableEntitiesResponseInfo;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove this extra whitespace before merging. Did not notice it


}
#nullable disable

async Task<IList<DynamicTableEntity>> QueryHistoryAsync(TableQuery<DynamicTableEntity> query, string instanceId, CancellationToken cancellationToken)
{
Expand Down
Loading