diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 609a8b35d..2dce87152 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -318,5 +318,12 @@ internal LogHelper Logger /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Gets or sets the maximum number of history events for a single orchestration instance. + /// An orchestrator that goes beyond this limit is automatically terminated. + /// By default, this is null, which means there is no limit. + /// + public int? MaxHistoryEvents { get; set; } = null; } } diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 60aa12404..a3bb1e3b6 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -20,7 +20,7 @@ 1 - 17 + 18 3 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 @@ -28,6 +28,7 @@ $(VersionPrefix).$(FileVersionRevision) $(MajorVersion).$(MinorVersion).0.0 + preview.1 diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 2e2b7282d..b60b3cd46 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -515,6 +515,23 @@ async Task ScheduleOrchestrationStatePrefetch( batch.OrchestrationExecutionId, cancellationToken); + int numEvents = history.Events.Count; + bool hasMaximumHistorySize = this.settings.MaxHistoryEvents != null; + if (hasMaximumHistorySize && numEvents > this.settings.MaxHistoryEvents) + { + // number of history events exceeds limit, fail this orchestrator + this.settings.Logger.OrchestrationProcessingFailure( + this.storageAccountName, + this.settings.TaskHubName, + batch.OrchestrationInstanceId, + batch.OrchestrationExecutionId, + $"Orchestration has {numEvents} events, exceeding the limit of {this.settings.MaxHistoryEvents}. Orchestrator will be terminated."); + + // Fail the orchestrator by creating Terminate event + ExecutionTerminatedEvent terminateEvent = new ExecutionTerminatedEvent(-1, $"Orchestrator exceeded maximum history size: {this.settings.MaxHistoryEvents}"); + history.Events.Add(terminateEvent); + } + batch.OrchestrationState = new OrchestrationRuntimeState(history.Events); batch.ETag = history.ETag; batch.LastCheckpointTime = history.LastCheckpointTime; diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 15469c1b0..a2a4cf06f 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -14,7 +14,6 @@ namespace DurableTask.AzureStorage.Tracking { using System; - using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -226,22 +225,36 @@ public override async Task ExistsAsync() return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreContext); } - +#nullable enable async Task> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList projectionColumns, CancellationToken cancellationToken = default(CancellationToken)) { var sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); string filterCondition = TableQuery.GenerateFilterCondition(PartitionKeyProperty, QueryComparisons.Equal, sanitizedInstanceId); - if (!string.IsNullOrEmpty(expectedExecutionId)) + + // we need to get the executionID to account for continueAsNew scenarios, where a given instanceID may have multiple histories + // we can obtain this from the sentinel row, which always has the latest executionID + if (string.IsNullOrWhiteSpace(expectedExecutionId)) { - // Filter down to a specific generation. - var rowKeyOrExecutionId = TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey), - TableOperators.Or, - TableQuery.GenerateFilterCondition("ExecutionId", QueryComparisons.Equal, expectedExecutionId)); + var sentinelRowFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey); + var sentinelRowForInstanceIdFilter = TableQuery.CombineFilters(filterCondition, TableOperators.And, sentinelRowFilter); - filterCondition = TableQuery.CombineFilters(filterCondition, TableOperators.And, rowKeyOrExecutionId); + TableQuery sentinelRowQuery = new TableQuery().Where(sentinelRowForInstanceIdFilter); + var sentinelRowQueryResponse = await this.HistoryTable.ExecuteQueryAsync(sentinelRowQuery, cancellationToken); + + if (sentinelRowQueryResponse.ReturnedEntities != null && sentinelRowQueryResponse.ReturnedEntities.Count > 0) + { + expectedExecutionId = sentinelRowQueryResponse.ReturnedEntities.FirstOrDefault().Properties["ExecutionId"].StringValue; + } } + // Filter down to a specific generation. + var rowKeyOrExecutionId = TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey), + TableOperators.Or, + TableQuery.GenerateFilterCondition("ExecutionId", QueryComparisons.Equal, expectedExecutionId)); + + filterCondition = TableQuery.CombineFilters(filterCondition, TableOperators.And, rowKeyOrExecutionId); + TableQuery query = new TableQuery().Where(filterCondition); if (projectionColumns != null) @@ -250,9 +263,11 @@ public override async Task ExistsAsync() } var tableEntitiesResponseInfo = await this.HistoryTable.ExecuteQueryAsync(query, cancellationToken); - return tableEntitiesResponseInfo; + + } +#nullable disable async Task> QueryHistoryAsync(TableQuery query, string instanceId, CancellationToken cancellationToken) {