From 093822bd0f0c66d1729542f4f4f2821449daa99d Mon Sep 17 00:00:00 2001 From: Einar Date: Tue, 30 Jan 2024 12:00:36 +0100 Subject: [PATCH] Add full support for data point status codes (#371) --- Cognite.Extensions/Cognite.Extensions.csproj | 2 +- Cognite.Extensions/CogniteUtils.cs | 44 +- .../TimeSeries/Alpha/DataPointExtensions.cs | 688 ++++++++++++++++++ .../TimeSeries/Alpha/StatusCodeHelpers.cs | 686 +++++++++++++++++ ExtractorUtils.Test/unit/StatusCodeTests.cs | 31 + ExtractorUtils/Cognite/CogniteDestination.cs | 90 ++- version | 2 +- 7 files changed, 1523 insertions(+), 20 deletions(-) create mode 100644 Cognite.Extensions/TimeSeries/Alpha/DataPointExtensions.cs create mode 100644 Cognite.Extensions/TimeSeries/Alpha/StatusCodeHelpers.cs create mode 100644 ExtractorUtils.Test/unit/StatusCodeTests.cs diff --git a/Cognite.Extensions/Cognite.Extensions.csproj b/Cognite.Extensions/Cognite.Extensions.csproj index 100f28cb..24e3a9fa 100644 --- a/Cognite.Extensions/Cognite.Extensions.csproj +++ b/Cognite.Extensions/Cognite.Extensions.csproj @@ -28,7 +28,7 @@ - + diff --git a/Cognite.Extensions/CogniteUtils.cs b/Cognite.Extensions/CogniteUtils.cs index f22b0a89..a4dc36da 100644 --- a/Cognite.Extensions/CogniteUtils.cs +++ b/Cognite.Extensions/CogniteUtils.cs @@ -14,6 +14,8 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; +using Com.Cognite.V1.Timeseries.Proto.Alpha; +using Cognite.Extensions.Alpha; namespace Cognite.Extensions { @@ -26,12 +28,12 @@ public static class CogniteUtils /// Cognite min double value /// public const double NumericValueMin = -1e+100; - + /// /// Cognite max double value /// public const double NumericValueMax = 1e+100; - + /// /// Cognite max string length /// @@ -256,7 +258,7 @@ public static IDictionary> ReadDatapoints(Strea total++; dps.Add(dp); } - + if (!ret.TryGetValue(id, out var datapoints)) { ret[id] = dps; @@ -570,7 +572,7 @@ public static IAsyncPolicy GetRetryPolicy(ILogger? logger, retry => TimeSpan.FromMilliseconds(Math.Min(125 * Math.Pow(2, Math.Min(retry - 1, numRetries)), delay)), GetRetryHandler(logger)); } - + } /// /// Get a polly timeout policy with a timeout set to milliseconds @@ -611,7 +613,8 @@ public class Datapoint private readonly long _timestamp; private readonly double? _numericValue; private readonly string? _stringValue; - + private readonly StatusCode _statusCode; + /// /// Timestamp in Unix time milliseconds /// @@ -632,16 +635,24 @@ public class Datapoint /// public bool IsString => _numericValue == null; + /// + /// Datapoint status code. + /// + public StatusCode Status => _statusCode; + /// /// Creates a numeric data point /// /// Timestamp /// double value - public Datapoint(DateTime timestamp, double numericValue) + /// ALPHA: set the data point status code. + /// This is only used if the alpha datapoints endpoint is used. + public Datapoint(DateTime timestamp, double numericValue, StatusCode? statusCode = null) { _timestamp = timestamp.ToUnixTimeMilliseconds(); _numericValue = numericValue; _stringValue = null; + _statusCode = statusCode ?? new StatusCode(0); } /// @@ -654,17 +665,21 @@ public Datapoint(DateTime timestamp, string? stringValue) _timestamp = timestamp.ToUnixTimeMilliseconds(); _numericValue = null; _stringValue = stringValue; + _statusCode = new StatusCode(0); } /// /// Creates a numeric data point /// /// Timestamp /// double value - public Datapoint(long timestamp, double numericValue) + /// ALPHA: set the data point status code. 
+ /// This is only used if the alpha datapoints endpoint is used. + public Datapoint(long timestamp, double numericValue, StatusCode? statusCode = null) { _timestamp = timestamp; _numericValue = numericValue; _stringValue = null; + _statusCode = statusCode ?? new StatusCode(0); } /// @@ -677,6 +692,7 @@ public Datapoint(long timestamp, string? stringValue) _timestamp = timestamp; _numericValue = null; _stringValue = stringValue; + _statusCode = new StatusCode(0); } /// /// Convert datapoint into an array of bytes on the form @@ -685,7 +701,7 @@ public Datapoint(long timestamp, string? stringValue) /// public byte[] ToStorableBytes() { - ushort size = sizeof(long) + sizeof(bool); + ushort size = sizeof(long) + sizeof(bool) + sizeof(ulong); byte[] valBytes; @@ -705,6 +721,8 @@ public byte[] ToStorableBytes() pos += sizeof(long); Buffer.BlockCopy(BitConverter.GetBytes(IsString), 0, bytes, pos, sizeof(bool)); pos += sizeof(bool); + Buffer.BlockCopy(BitConverter.GetBytes(_statusCode.Code), 0, bytes, pos, sizeof(ulong)); + pos += sizeof(ulong); Buffer.BlockCopy(valBytes, 0, bytes, pos, valBytes.Length); @@ -720,12 +738,14 @@ public byte[] ToStorableBytes() { throw new ArgumentNullException(nameof(stream)); } - var baseBytes = new byte[sizeof(long) + sizeof(bool)]; - int read = stream.Read(baseBytes, 0, sizeof(long) + sizeof(bool)); - if (read < sizeof(long) + sizeof(bool)) return null; + var readLength = sizeof(long) + sizeof(bool) + sizeof(ulong); + var baseBytes = new byte[readLength]; + int read = stream.Read(baseBytes, 0, readLength); + if (read < readLength) return null; var timestamp = BitConverter.ToInt64(baseBytes, 0); var isString = BitConverter.ToBoolean(baseBytes, sizeof(long)); + var statusCode = BitConverter.ToUInt64(baseBytes, sizeof(long) + sizeof(bool)); if (isString) { @@ -737,7 +757,7 @@ public byte[] ToStorableBytes() var valueBytes = new byte[sizeof(double)]; if (stream.Read(valueBytes, 0, sizeof(double)) < sizeof(double)) return null; double value = BitConverter.ToDouble(valueBytes, 0); - return new Datapoint(timestamp, value); + return new Datapoint(timestamp, value, new StatusCode(statusCode)); } } } diff --git a/Cognite.Extensions/TimeSeries/Alpha/DataPointExtensions.cs b/Cognite.Extensions/TimeSeries/Alpha/DataPointExtensions.cs new file mode 100644 index 00000000..0d6c3171 --- /dev/null +++ b/Cognite.Extensions/TimeSeries/Alpha/DataPointExtensions.cs @@ -0,0 +1,688 @@ +using Cognite.Extractor.Common; +using CogniteSdk; +using CogniteSdk.Resources; +using Com.Cognite.V1.Timeseries.Proto.Alpha; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Prometheus; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Data.Common; +using System.IO.Compression; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using TimeRange = Cognite.Extractor.Common.TimeRange; + +namespace Cognite.Extensions.Alpha +{ + /// + /// Extensions to datapoints + /// + public static class DataPointExtensions + { + private const int _maxNumOfVerifyRequests = 10; + private static ILogger _logger = new NullLogger(); + + internal static void SetLogger(ILogger logger) + { + _logger = logger; + } + + /// + /// Create a protobuf insertion request from dictionary + /// + /// Datapoints to insert + /// Converted request + public static DataPointInsertionRequest ToInsertRequest(this IDictionary> dps) + { + if (dps == null) throw new ArgumentNullException(nameof(dps)); + var 
request = new DataPointInsertionRequest(); + var dataPointCount = 0; + foreach (var kvp in dps) + { + var item = new DataPointInsertionItem(); + if (kvp.Key.Id.HasValue) + { + item.Id = kvp.Key.Id.Value; + } + else + { + item.ExternalId = kvp.Key.ExternalId.ToString(); + } + if (!kvp.Value.Any()) + { + continue; + } + var stringPoints = kvp.Value + .Where(dp => dp.StringValue != null) + .Select(dp => new StringDatapoint + { + Timestamp = dp.Timestamp, + Value = dp.StringValue + }); + var numericPoints = kvp.Value + .Where(dp => dp.NumericValue.HasValue) + .Select(dp => new NumericDatapoint + { + Timestamp = dp.Timestamp, + Value = dp.NumericValue!.Value, + Status = new Status + { + Code = (long)dp.Status.Code + } + }); + if (stringPoints.Any()) + { + var stringData = new StringDatapoints(); + stringData.Datapoints.AddRange(stringPoints); + if (stringData.Datapoints.Count > 0) + { + item.StringDatapoints = stringData; + request.Items.Add(item); + dataPointCount += stringData.Datapoints.Count; + } + } + else + { + var doubleData = new NumericDatapoints(); + doubleData.Datapoints.AddRange(numericPoints); + if (doubleData.Datapoints.Count > 0) + { + item.NumericDatapoints = doubleData; + request.Items.Add(item); + dataPointCount += doubleData.Datapoints.Count; + } + } + } + return request; + } + + /// + /// Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, + /// and can optionally handle errors. If any timeseries missing from the result and inserted by externalId, + /// they are created before the points are inserted again. + /// + /// Cognite client + /// Datapoints to insert + /// Maximum number of timeseries per chunk + /// Maximum number of datapoints per timeseries + /// Maximum number of parallel request + /// Maximum number of timeseries to retrieve per request + /// Maximum number of parallel requests to retrieve timeseries + /// Number of datapoints total before using gzip compression. + /// How to sanitize datapoints + /// How to handle retries + /// Optional replacement for NaN double values + /// Optional data set id + /// Cancellation token + /// Results with a list of errors. If TimeSeriesResult is null, no timeseries were attempted created. + public static async Task<(CogniteResult DataPointResult, CogniteResult? TimeSeriesResult)> InsertAsyncCreateMissing( + Client client, + IDictionary> points, + int keyChunkSize, + int valueChunkSize, + int throttleSize, + int timeseriesChunkSize, + int timeseriesThrottleSize, + int gzipCountLimit, + SanitationMode sanitationMode, + RetryMode retryMode, + double? nanReplacement, + long? dataSetId, + CancellationToken token) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (points == null) throw new ArgumentNullException(nameof(points)); + + var result = await InsertAsync(client, points, keyChunkSize, valueChunkSize, throttleSize, + timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, sanitationMode, + RetryMode.OnError, nanReplacement, token).ConfigureAwait(false); + + if (result.Errors?.Any(err => err.Type == ErrorType.FatalFailure) ?? false) return (result, null); + + var missingIds = new HashSet((result.Errors ?? Enumerable.Empty()) + .Where(err => err.Type == ErrorType.ItemMissing) + .SelectMany(err => err.Values ?? 
Enumerable.Empty()) + .Where(idt => idt.ExternalId != null)); + + if (!missingIds.Any()) return (result, null); + + _logger.LogInformation("Creating {Count} missing timeseries", missingIds.Count); + + var toCreate = new List(); + foreach (var id in missingIds) + { + var dp = points[id].FirstOrDefault(); + if (dp == null) continue; + + bool isString = dp.NumericValue == null; + + toCreate.Add(new TimeSeriesCreate + { + ExternalId = id.ExternalId, + IsString = isString, + DataSetId = dataSetId + }); + } + + var tsResult = await client.TimeSeries.EnsureTimeSeriesExistsAsync( + toCreate, + timeseriesChunkSize, + timeseriesThrottleSize, + retryMode, + sanitationMode, + token).ConfigureAwait(false); + + if (tsResult.Errors?.Any(err => err.Type != ErrorType.ItemExists) ?? false) return (result, tsResult); + + var pointsToInsert = points.Where(kvp => missingIds.Contains(kvp.Key)).ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + + var result2 = await InsertAsync(client, pointsToInsert, keyChunkSize, valueChunkSize, throttleSize, + timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, sanitationMode, + RetryMode.OnError, nanReplacement, token).ConfigureAwait(false); + + return (result.Merge(result2), tsResult); + } + + + /// + /// Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, + /// and can optionally handle errors. + /// + /// Cognite client + /// Datapoints to insert + /// Maximum number of timeseries per chunk + /// Maximum number of datapoints per timeseries + /// Maximum number of parallel request + /// Maximum number of timeseries to retrieve per request + /// Maximum number of parallel requests to retrieve timeseries + /// Number of datapoints total before using gzip compression. + /// How to sanitize datapoints + /// How to handle retries + /// Optional replacement for NaN double values + /// Cancellation token + /// Result with a list of errors + public static async Task> InsertAsync( + Client client, + IDictionary> points, + int keyChunkSize, + int valueChunkSize, + int throttleSize, + int timeseriesChunkSize, + int timeseriesThrottleSize, + int gzipCountLimit, + SanitationMode sanitationMode, + RetryMode retryMode, + double? nanReplacement, + CancellationToken token) + { + IEnumerable> errors; + (points, errors) = Sanitation.CleanDataPointsRequest(points, sanitationMode, nanReplacement); + + var chunks = points + .Select(p => (p.Key, p.Value)) + .ChunkBy(valueChunkSize, keyChunkSize) + .Select(chunk => chunk.ToDictionary(pair => pair.Key, pair => pair.Values)) + .ToList(); + + int size = chunks.Count + (errors.Any() ? 1 : 0); + var results = new CogniteResult[size]; + + if (errors.Any()) + { + results[size - 1] = new CogniteResult(errors); + if (size == 1) return results[size - 1]; + } + if (size == 0) return new CogniteResult(null); + + _logger.LogDebug("Inserting timeseries datapoints. Number of timeseries: {Number}. 
Number of chunks: {Chunks}", points.Count, chunks.Count); + var generators = chunks + .Select>, Func>( + (chunk, idx) => async () => + { + var result = await + InsertDataPointsHandleErrors(client, chunk, timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, retryMode, token) + .ConfigureAwait(false); + results[idx] = result; + }); + + int taskNum = 0; + await generators.RunThrottled( + throttleSize, + (_) => + { + if (chunks.Count > 1) + _logger.LogDebug("{MethodName} completed {NumDone}/{TotalNum} tasks", + nameof(InsertAsync), ++taskNum, chunks.Count); + }, + token).ConfigureAwait(false); + + return CogniteResult.Merge(results); + } + + private static async Task> InsertDataPointsHandleErrors( + Client client, + IDictionary> points, + int timeseriesChunkSize, + int timeseriesThrottleSize, + int gzipCountLimit, + RetryMode retryMode, + CancellationToken token) + { + var errors = new List>(); + while (points != null && points.Any() && !token.IsCancellationRequested) + { + var request = points.ToInsertRequest(); + try + { + bool useGzip = false; + int count = points.Sum(kvp => kvp.Value.Count()); + if (gzipCountLimit >= 0 && count >= gzipCountLimit) + { + useGzip = true; + } + + if (useGzip) + { + using (CdfMetrics.Datapoints.WithLabels("create")) + { + await client.Alpha.DataPoints.CreateAsync(request, CompressionLevel.Fastest, token).ConfigureAwait(false); + } + } + else + { + using (CdfMetrics.Datapoints.WithLabels("create")) + { + await client.Alpha.DataPoints.CreateAsync(request, token).ConfigureAwait(false); + } + } + + CdfMetrics.NumberDatapoints.Inc(count); + + _logger.LogDebug("Created {cnt} datapoints for {ts} timeseries in CDF", count, points.Count); + return new CogniteResult(errors); + } + catch (Exception ex) + { + _logger.LogDebug("Failed to create datapoints for {seq} timeseries: {msg}", points.Count, ex.Message); + var error = ResultHandlers.ParseException(ex, RequestType.CreateDatapoints); + + if (error.Type == ErrorType.FatalFailure + && (retryMode == RetryMode.OnFatal + || retryMode == RetryMode.OnFatalKeepDuplicates)) + { + await Task.Delay(1000, token).ConfigureAwait(false); + } + else if (retryMode == RetryMode.None) + { + errors.Add(error); + break; + } + else + { + if (!error.Complete) + { + (error, points) = await ResultHandlers + .VerifyDatapointsFromCDF(client.TimeSeries, error, + points, timeseriesChunkSize, timeseriesThrottleSize, token) + .ConfigureAwait(false); + errors.Add(error); + } + else + { + errors.Add(error); + } + points = ResultHandlers.CleanFromError(error, points); + } + } + } + + return new CogniteResult(errors); + } + + /// + /// Deletes ranges of data points in CDF. The parameter contains the first (inclusive) + /// and last (inclusive) timestamps for the range. After the delete request is sent to CDF, attempt to confirm that + /// the data points were deleted by querying the time range. 
Deletes in CDF are eventually consistent, failing to + /// confirm the deletion does not mean that the operation failed in CDF + /// + /// Cognite datapoints resource + /// Ranges to delete + /// Chunk size for delete operations + /// Chunk size for list operations + /// Throttle size for delete operations + /// Throttle size for list operations + /// Cancelation token + /// A object with any missing ids or ids with unconfirmed deletes + public static async Task DeleteIgnoreErrorsAsync( + this AlphaDataPointsResource dataPoints, + IDictionary> ranges, + int deleteChunkSize, + int listChunkSize, + int deleteThrottleSize, + int listThrottleSize, + CancellationToken token) + { + if (ranges == null) + { + throw new ArgumentNullException(nameof(ranges)); + } + var toDelete = new List(); + foreach (var kvp in ranges) + { + _logger.LogTrace("Deleting data points from time series {Name}. Ranges: {Ranges}", + kvp.Key.ToString(), string.Join(", ", kvp.Value.Select(v => v.ToString()))); + toDelete.AddRange(kvp.Value.Select(r => + new IdentityWithRange + { + ExternalId = kvp.Key.ExternalId, + Id = kvp.Key.Id, + InclusiveBegin = r.First.ToUnixTimeMilliseconds(), + ExclusiveEnd = r.Last.ToUnixTimeMilliseconds() + 1 // exclusive + }) + ); + } + + var chunks = toDelete + .ChunkBy(deleteChunkSize) + .ToList(); // Maximum number of items in the /timeseries/data/delete endpoint. + + var missing = new HashSet(); + var mutex = new object(); + + var generators = chunks + .Select, Func>( + c => async () => + { + var errors = await DeleteDataPointsIgnoreErrorsChunk(dataPoints, c, token).ConfigureAwait(false); + lock (mutex) + { + missing.UnionWith(errors); + } + }); + + var taskNum = 0; + await generators.RunThrottled( + deleteThrottleSize, + (_) => + { + if (chunks.Count > 1) + _logger.LogDebug("{MethodName} completed {NumDone}/{TotalNum} tasks", + nameof(DeleteIgnoreErrorsAsync), ++taskNum, chunks.Count); + }, + token).ConfigureAwait(false); + + return new DeleteError(missing, Enumerable.Empty()); + } + + private static async Task> DeleteDataPointsIgnoreErrorsChunk( + AlphaDataPointsResource dataPoints, + IEnumerable chunks, + CancellationToken token) + { + var missing = new HashSet(); + if (!chunks.Any()) return missing; + + var deleteQuery = new DataPointsDelete() + { + Items = chunks + }; + try + { + using (CdfMetrics.Datapoints.WithLabels("delete").NewTimer()) + { + await dataPoints.DeleteAsync(deleteQuery, token).ConfigureAwait(false); + } + } + catch (ResponseException e) when (e.Code == 400 && e.Missing != null && e.Missing.Any()) + { + CogniteUtils.ExtractMissingFromResponseException(missing, e); + var remaining = chunks.Where(i => !missing.Contains(i.Id.HasValue ? new Identity(i.Id.Value) : new Identity(i.ExternalId))); + var errors = await DeleteDataPointsIgnoreErrorsChunk(dataPoints, remaining, token).ConfigureAwait(false); + missing.UnionWith(errors); + } + return missing; + } + + /// + /// Get the last timestamp for each time series given in before each given timestamp. + /// Ignores timeseries not in CDF. The return dictionary contains only ids that exist in CDF. + /// Note that end limits closer to actual endpoints in CDF is considerably faster. + /// + /// DataPointsResource to use + /// ExternalIds and last timestamp. Let last timestamp be DateTime.MaxValue to use default ("now"). 
+ /// Number of timeseries per request + /// Maximum number of parallel requests + /// Cancellation token + /// Dictionary from externalId to last timestamp, only contains existing timeseries + public static async Task> GetLatestTimestamps( + this AlphaDataPointsResource dataPoints, + IEnumerable<(Identity id, DateTime before)> ids, + int chunkSize, + int throttleSize, + CancellationToken token) + { + var ret = new ConcurrentDictionary(); + var idSet = new HashSet(ids.Select(id => id.id)); + + var chunks = ids + .Select((pair) => + { + var id = pair.id; + IdentityWithBefore idt = id.ExternalId == null ? IdentityWithBefore.Create(id.Id!.Value) : IdentityWithBefore.Create(id.ExternalId); + if (pair.before != DateTime.MaxValue) + { + idt.Before = pair.before.ToUnixTimeMilliseconds().ToString(); + } + return idt; + }) + .ChunkBy(chunkSize) + .ToList(); + + var generators = chunks.Select, Func>( + chunk => async () => + { + IEnumerable> dps; + using (CdfMetrics.Datapoints.WithLabels("latest").NewTimer()) + { + dps = await dataPoints.LatestAsync( + new DataPointsLatestQuery + { + IgnoreUnknownIds = true, + Items = chunk + }, token).ConfigureAwait(false); + } + + foreach (var dp in dps) + { + if (dp.DataPoints.Any()) + { + Identity id; + if (dp.ExternalId != null) + { + id = new Identity(dp.ExternalId); + if (!idSet.Contains(id)) + { + id = new Identity(dp.Id); + } + } + else + { + id = new Identity(dp.Id); + } + ret[id] = CogniteTime.FromUnixTimeMilliseconds(dp.DataPoints.First().Timestamp); + } + } + }); + int numTasks = 0; + await generators + .RunThrottled(throttleSize, (_) => + _logger.LogDebug("Last timestamp from CDF: {num}/{total}", ++numTasks, chunks.Count), token) + .ConfigureAwait(false); + return ret; + } + + /// + /// Get the first timestamp for each time series given in after each given timestamp. + /// Ignores timeseries not in CDF. The return dictionary contains only ids that exist in CDF. + /// + /// DataPointsResource to use + /// ExternalIds and last timestamp. Let last timestamp be Epoch to use default. 
+ /// Number of timeseries per request + /// Maximum number of parallel requests + /// Cancellation token + /// Dictionary from externalId to first timestamp, only contains existing timeseries + public static async Task> GetEarliestTimestamps( + this AlphaDataPointsResource dataPoints, + IEnumerable<(Identity id, DateTime after)> ids, + int chunkSize, + int throttleSize, + CancellationToken token) + { + var ret = new ConcurrentDictionary(); + var idSet = new HashSet(ids.Select(id => id.id)); + + var chunks = ids + .Select((pair) => + { + var query = new DataPointsQueryItem(); + if (pair.id.Id.HasValue) + { + query.Id = pair.id.Id.Value; + } + else + { + query.ExternalId = pair.id.ExternalId; + } + if (pair.after > CogniteTime.DateTimeEpoch) + { + query.Start = pair.after.ToUnixTimeMilliseconds().ToString(); + } + return query; + }) + .ChunkBy(chunkSize) + .ToList(); + + var generators = chunks.Select, Func>( + chunk => async () => + { + DataPointListResponse dps; + using (CdfMetrics.Datapoints.WithLabels("first").NewTimer()) + { + dps = await dataPoints.ListAsync( + new DataPointsQuery + { + IgnoreUnknownIds = true, + Items = chunk, + Limit = 1 + }, token).ConfigureAwait(false); + } + + foreach (var dp in dps.Items) + { + Identity id; + if (dp.ExternalId != null) + { + id = new Identity(dp.ExternalId); + if (!idSet.Contains(id)) + { + id = new Identity(dp.Id); + } + } + else + { + id = new Identity(dp.Id); + } + if (dp.DatapointTypeCase == DataPointListItem.DatapointTypeOneofCase.NumericDatapoints + && dp.NumericDatapoints.Datapoints.Any()) + { + ret[id] = CogniteTime.FromUnixTimeMilliseconds(dp.NumericDatapoints.Datapoints.First().Timestamp); + } + else if (dp.DatapointTypeCase == DataPointListItem.DatapointTypeOneofCase.StringDatapoints + && dp.StringDatapoints.Datapoints.Any()) + { + ret[id] = CogniteTime.FromUnixTimeMilliseconds(dp.StringDatapoints.Datapoints.First().Timestamp); + } + } + }); + int numTasks = 0; + await generators + .RunThrottled(throttleSize, (_) => + _logger.LogDebug("First timestamp from CDF: {num}/{total}", ++numTasks, chunks.Count), token) + .ConfigureAwait(false); + return ret; + } + /// + /// Fetches the range of datapoints present in CDF. Limited by given ranges for each id. + /// Note that end limits closer to actual endpoints in CDF is considerably faster. + /// + /// DataPointsResource to use + /// ExternalIds and start/end of region to look for datapoints. + /// Use TimeRange.Complete for first after epoch, and last before now. + /// Number of timeseries to read for each earliest request + /// Number of timeseries to read for each latest request + /// Max number of parallel requests + /// If true, fetch latest timestamps + /// If true, fetch earliest timestamps + /// Cancellation token + /// + public static async Task> GetExtractedRanges( + this AlphaDataPointsResource dataPoints, + IEnumerable<(Identity id, TimeRange limit)> ids, + int chunkSizeEarliest, + int chunkSizeLatest, + int throttleSize, + bool latest, + bool earliest, + CancellationToken token) + { + if (ids == null) + { + throw new ArgumentNullException(nameof(ids)); + } + _logger.LogDebug("Getting extracted ranges for {num} timeseries", ids.Count()); + + if (latest && earliest) throttleSize = Math.Max(1, throttleSize / 2); + + var ranges = ids.ToDictionary(pair => pair.id, pair => TimeRange.Empty); + var tasks = new List>>(); + if (latest) + { + tasks.Add(dataPoints.GetLatestTimestamps(ids.Select(pair => (pair.id, pair.limit?.Last ?? 
DateTime.MaxValue)), + chunkSizeLatest, throttleSize, token)); + } + if (earliest) + { + tasks.Add(dataPoints.GetEarliestTimestamps(ids.Select(pair => (pair.id, pair.limit?.First ?? CogniteTime.DateTimeEpoch)), + chunkSizeEarliest, throttleSize, token)); + } + var results = await Task.WhenAll(tasks).ConfigureAwait(false); + if (latest) + { + var latestResult = results[0]; + foreach (var id in ids) + { + if (latestResult.TryGetValue(id.id, out DateTime ts)) + { + ranges[id.id] = new TimeRange(CogniteTime.DateTimeEpoch, ts); + } + } + } + if (earliest) + { + var earliestResult = results[latest ? 1 : 0]; + foreach (var id in ids) + { + if (earliestResult.TryGetValue(id.id, out DateTime ts)) + { + ranges[id.id] = new TimeRange(ts, ranges[id.id].Last); + } + } + } + return ranges; + } + } +} diff --git a/Cognite.Extensions/TimeSeries/Alpha/StatusCodeHelpers.cs b/Cognite.Extensions/TimeSeries/Alpha/StatusCodeHelpers.cs new file mode 100644 index 00000000..68d8ba83 --- /dev/null +++ b/Cognite.Extensions/TimeSeries/Alpha/StatusCodeHelpers.cs @@ -0,0 +1,686 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Cognite.Extensions.Alpha +{ + // NOTE: This entire section is largely taken from the time series backend, + // it more or less replicates the behavior of the API when it comes to parsing and handling status codes. + // This is necessary for sanitation. + + /// + /// Helper methods for dealing with status codes. + /// + internal static class StatusCodeHelpers + { + private static HashSet _supportedFlags = new HashSet { + "Low", "High", "Constant", + "StructureChanged", "SemanticsChanged", "Overflow", + "MultipleValues", "ExtraData", "Partial", "Interpolated", "Calculated", + }; + + /// + /// Attempt to parse a status code symbol into a numerical status code. + /// + /// Status code symbol + /// Parsed value, or null if parsing failed + /// A string containing the error that caused parsing to fail + /// + public static string? TryParseStatusCodeSymbol(string symbol, out StatusCode? value) + { + if (symbol == null) throw new ArgumentNullException(nameof(symbol)); + + value = null; + + var parts = symbol.Split(' ', ','); + if (!Enum.TryParse(parts[0].Replace("Good_", "Good") + .Replace("Bad_", "Bad") + .Replace("Uncertain_", "Uncertain"), out StatusCodeCategory category)) + { + return "Unknown severity or sub code"; + } + + foreach (var p in parts.Skip(1).Where((p) => !string.IsNullOrWhiteSpace(p))) + { + if (!_supportedFlags.Contains(p)) + { + return $"Unsupported flag {p}"; + } + } + + var code = (ulong)category; + + if (parts.Contains("StructureChanged")) + { + code |= 1 << 15; + } + if (parts.Contains("SemanticsChanged")) + { + code |= 1 << 14; + } + + var limitRes = TryParseLimit(parts, out var limitValue); + if (limitRes != null) return limitRes; + + var historianRes = TryParseHistorianBits(parts, out var historianValue); + if (historianRes != null) return historianRes; + + code |= limitValue!.Value | historianValue!.Value; + + value = new StatusCode(code); + + return null; + } + + private static string? TryParseLimit(string[] parts, out ulong? 
value)
+        {
+            value = 0;
+            int limitArguments = 0;
+            if (parts.Contains("Low"))
+            {
+                value = 0x100L;
+                limitArguments++;
+            }
+            if (parts.Contains("High"))
+            {
+                value = 0x200L;
+                limitArguments++;
+            }
+            if (parts.Contains("Constant"))
+            {
+                value = 0x300L;
+                limitArguments++;
+            }
+
+            if (limitArguments > 1)
+            {
+                return "More than one status code limit";
+            }
+
+            if (value > 0) value |= 1 << 10;
+
+            return null;
+        }
+
+        private static string? TryParseHistorianBits(string[] parts, out ulong? value)
+        {
+            value = 0;
+            if (parts.Contains("Partial")) value |= 0b100;
+            if (parts.Contains("MultipleValues")) value |= 0b1_0000;
+            if (parts.Contains("ExtraData")) value |= 0b1000;
+            if (parts.Contains("Overflow")) value |= 1 << 7;
+
+            var isInterpolated = parts.Contains("Interpolated");
+            var isCalculated = parts.Contains("Calculated");
+            if (isInterpolated && isCalculated)
+            {
+                return "Calculated and Interpolated flags are mutually exclusive";
+            }
+
+            if (isCalculated) value |= 1;
+            if (isInterpolated) value |= 0b10;
+
+            if (value > 0) value |= 1 << 10;
+
+            return null;
+        }
+
+        internal const ulong CATEGORY_MASK = 0b1111_1111_1111_1111_0000_0000_0000_0000L;
+        internal const ulong INFO_BITS_MASK = 0b0000_0000_0000_0000_0000_0011_1111_1111L;
+    }
+
+    /// <summary>
+    /// Exception thrown by trying to create an invalid status code.
+    /// </summary>
+    public class InvalidStatusCodeException : Exception
+    {
+        /// <inheritdoc />
+        public InvalidStatusCodeException(string message) : base(message)
+        {
+        }
+
+        /// <inheritdoc />
+        public InvalidStatusCodeException(string message, Exception innerException) : base(message, innerException)
+        {
+        }
+
+        /// <inheritdoc />
+        public InvalidStatusCodeException()
+        {
+        }
+    }
+
+    /// <summary>
+    /// Representation of a status code, deconstructed.
+    /// </summary>
+    public struct StatusCode
+    {
+        /// <summary>
+        /// Status code
+        /// </summary>
+        public ulong Code { get; }
+
+        internal StatusCode(ulong code)
+        {
+            Code = code;
+        }
+
+        /// <summary>
+        /// Create a status code from a category.
+        /// </summary>
+        /// <param name="category">Category to create from</param>
+        /// <returns>Status code</returns>
+        public static StatusCode FromCategory(StatusCodeCategory category)
+        {
+            return new StatusCode((ulong)category);
+        }
+
+
+        /// <summary>
+        /// Try to parse a status code from a string.
+        /// </summary>
+        /// <param name="symbol">Status code symbol to parse</param>
+        /// <param name="result">Status code, if parsing succeeded, else null</param>
+        /// <returns>Error if parsing failed</returns>
+        public static string? TryParse(string symbol, out StatusCode? result)
+        {
+            return StatusCodeHelpers.TryParseStatusCodeSymbol(symbol, out result);
+        }
+
+        /// <summary>
+        /// Parse a status code from a string, throw an exception if it fails.
+        /// </summary>
+        /// <param name="symbol">Status code symbol to parse</param>
+        /// <returns>Parsed status code</returns>
+        /// <exception cref="InvalidStatusCodeException"></exception>
+        public static StatusCode Parse(string symbol)
+        {
+            var res = TryParse(symbol, out var result);
+
+            if (res != null) throw new InvalidStatusCodeException(res);
+
+            return result!.Value;
+        }
+
+        /// <summary>
+        /// Try to create a status code from a raw numeric code.
+        /// </summary>
+        /// <param name="code">Numeric status code</param>
+        /// <param name="result">Status code, if creation succeeded, else null</param>
+        /// <returns>Error if creation failed</returns>
+        public static string? TryCreate(ulong code, out StatusCode? result)
+        {
+            if (code == 0)
+            {
+                result = new StatusCode(code);
+                return null;
+            }
+
+            result = null;
+
+            if ((code >> 30) == 0b11) return "Unsupported severity: 0b11";
+            if ((code & (0b11 << 28)) != 0) return "Bits 28 and 29 are reserved";
+            if (!Enum.IsDefined(typeof(StatusCodeCategory), code & StatusCodeHelpers.CATEGORY_MASK))
+            {
+                return "Unknown category";
+            }
+
+            if ((code & (0b111 << 11)) != 0) return "Bits 11, 12, and 13 are reserved";
+            if ((code & (0b11 << 5)) != 0) return "Bits 5 and 6 are reserved";
+
+            var infoBits = StatusCodeHelpers.INFO_BITS_MASK & code;
+            if ((code & (1 << 10)) != 0)
+            {
+                if ((code & 0b11) == 0b11)
+                {
+                    return "Calculated and Interpolated flags are mutually exclusive";
+                }
+
+                if (infoBits == 0)
+                {
+                    return "When info type is 01, info bits must not be 0";
+                }
+            }
+            else if (infoBits != 0)
+            {
+                return "When info type is 00, all info bits must be 0";
+            }
+
+            result = new StatusCode(code);
+
+            return null;
+        }
+
+        /// <summary>
+        /// Parse a status code from a numerical code, throw an exception if it fails.
+        /// </summary>
+        /// <param name="code">Status code to parse</param>
+        /// <returns>Parsed status code</returns>
+        /// <exception cref="InvalidStatusCodeException"></exception>
+        public static StatusCode Create(ulong code)
+        {
+            var res = TryCreate(code, out var result);
+
+            if (res != null) throw new InvalidStatusCodeException(res);
+
+            return result!.Value;
+        }
+
+        /// <inheritdoc />
+        public override readonly bool Equals(object? obj)
+        {
+            return obj is StatusCode code && code.Code == Code;
+        }
+
+        /// <inheritdoc />
+        public override readonly int GetHashCode()
+        {
+            return Code.GetHashCode();
+        }
+
+        /// <inheritdoc />
+        public override readonly string ToString()
+        {
+            if (Code == 0) return "Good";
+            var builder = new StringBuilder();
+
+            builder.Append(Category);
+            if (StructureChanged) builder.Append(", StructureChanged");
+            if (SemanticsChanged) builder.Append(", SemanticsChanged");
+            if (Limit != Limit.None) builder.AppendFormat(", {0}", Limit);
+            if (IsOverflow) builder.Append(", Overflow");
+            if (IsMultiValue) builder.Append(", MultipleValues");
+            if (HasExtraData) builder.Append(", ExtraData");
+            if (IsPartial) builder.Append(", Partial");
+            if (ValueType != ValueType.Raw) builder.AppendFormat(", {0}", ValueType);
+
+            return builder.ToString();
+        }
+
+        /// <summary>
+        /// Severity of the status code.
+        /// </summary>
+        public readonly Severity Severity => (Severity)((Code >> 30) & 0b11);
+
+        /// <summary>
+        /// Structure changed flag.
+        /// </summary>
+        public readonly bool StructureChanged => (Code & (1 << 15)) != 0;
+        /// <summary>
+        /// Semantics changed flag.
+        /// </summary>
+        public readonly bool SemanticsChanged => (Code & (1 << 14)) != 0;
+
+        /// <summary>
+        /// Whether this is a data value info type.
+        /// </summary>
+        public readonly bool IsDataValueInfoType => (Code & (1 << 10)) != 0;
+
+        /// <summary>
+        /// Whether the value is bounded by some limit.
+        /// </summary>
+        public readonly Limit Limit => (Limit)((Code >> 8) & 0b11);
+
+        /// <summary>
+        /// Status code category.
+        /// </summary>
+        public readonly StatusCodeCategory Category => (StatusCodeCategory)(Code & StatusCodeHelpers.CATEGORY_MASK);
+
+        /// <summary>
+        /// Whether the value is overflowed.
+        /// </summary>
+        public readonly bool IsOverflow => (Code & (1 << 7)) != 0;
+
+        /// <summary>
+        /// Multi value flag.
+        /// </summary>
+        public readonly bool IsMultiValue => (Code & (1 << 4)) != 0;
+        /// <summary>
+        /// Has extra data flag.
+        /// </summary>
+        public readonly bool HasExtraData => (Code & (1 << 3)) != 0;
+        /// <summary>
+        /// Is partial flag.
+        /// </summary>
+        public readonly bool IsPartial => (Code & (1 << 2)) != 0;
+
+        /// <summary>
+        /// Type of value origin.
+        /// </summary>
+        public readonly ValueType ValueType => (ValueType)(Code & 0b11);
+    }
+
+    /// <summary>
+    /// Base status type: good, bad, or uncertain.
+ /// + public enum Severity + { + /// + /// Status code is good. + /// + Good = 0, + /// + /// Status code is uncertain. + /// + Uncertain = 1, + /// + /// Status code is bad. + /// + Bad = 2 + } + + /// + /// Enum for status code value source. + /// + public enum ValueType + { + /// + /// Value is calculated. + /// + Calculated = 1, + /// + /// Value is interpolated. + /// + Interpolated = 2, + /// + /// Value is a raw value. + /// + Raw = 0, + } + + /// + /// Enum for status code limit types + /// + public enum Limit + { + /// + /// Value is constant + /// + Constant = 0b11, + /// + /// Value is at high limit. + /// + High = 0b10, + /// + /// Value is at low limit. + /// + Low = 0b01, + /// + /// Value is not at a limit. + /// + None = 0b00 + } + + + /// + /// Enum of all status code categories. + /// + public enum StatusCodeCategory : ulong + { +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + Good = 0x00000000, + GoodCallAgain = 0x00A90000, + GoodCascade = 0x04090000, + GoodCascadeInitializationAcknowledged = 0x04010000, + GoodCascadeInitializationRequest = 0x04020000, + GoodCascadeNotInvited = 0x04030000, + GoodCascadeNotSelected = 0x04040000, + GoodClamped = 0x00300000, + GoodCommunicationEvent = 0x00A70000, + GoodCompletesAsynchronously = 0x002E0000, + GoodDataIgnored = 0x00D90000, + GoodDependentValueChanged = 0x00E00000, + GoodEdited = 0x00DC0000, + GoodEdited_DependentValueChanged = 0x01160000, + GoodEdited_DominantValueChanged = 0x01170000, + GoodEdited_DominantValueChanged_DependentValueChanged = 0x01180000, + GoodEntryInserted = 0x00A20000, + GoodEntryReplaced = 0x00A30000, + GoodFaultStateActive = 0x04070000, + GoodInitiateFaultState = 0x04080000, + GoodLocalOverride = 0x00960000, + GoodMoreData = 0x00A60000, + GoodNoData = 0x00A50000, + GoodNonCriticalTimeout = 0x00AA0000, + GoodOverload = 0x002F0000, + GoodPostActionFailed = 0x00DD0000, + GoodResultsMayBeIncomplete = 0x00BA0000, + GoodRetransmissionQueueNotSupported = 0x00DF0000, + GoodShutdownEvent = 0x00A80000, + GoodSubscriptionTransferred = 0x002D0000, + Uncertain = 0x40000000, + UncertainConfigurationError = 0x420F0000, + UncertainDataSubNormal = 0x40A40000, + UncertainDependentValueChanged = 0x40E20000, + UncertainDominantValueChanged = 0x40DE0000, + UncertainEngineeringUnitsExceeded = 0x40940000, + UncertainInitialValue = 0x40920000, + UncertainLastUsableValue = 0x40900000, + UncertainNoCommunicationLastUsableValue = 0x408F0000, + UncertainNotAllNodesAvailable = 0x40C00000, + UncertainReferenceNotDeleted = 0x40BC0000, + UncertainReferenceOutOfServer = 0x406C0000, + UncertainSensorCalibration = 0x420A0000, + UncertainSensorNotAccurate = 0x40930000, + UncertainSimulatedValue = 0x42090000, + UncertainSubNormal = 0x40950000, + UncertainSubstituteValue = 0x40910000, + UncertainTransducerInManual = 0x42080000, + Bad = 0x80000000, + BadAggregateConfigurationRejected = 0x80DA0000, + BadAggregateInvalidInputs = 0x80D60000, + BadAggregateListMismatch = 0x80D40000, + BadAggregateNotSupported = 0x80D50000, + BadAlreadyExists = 0x81150000, + BadApplicationSignatureInvalid = 0x80580000, + BadArgumentsMissing = 0x80760000, + BadAttributeIdInvalid = 0x80350000, + BadBoundNotFound = 0x80D70000, + BadBoundNotSupported = 0x80D80000, + BadBrowseDirectionInvalid = 0x804D0000, + BadBrowseNameDuplicated = 0x80610000, + BadBrowseNameInvalid = 0x80600000, + BadCertificateChainIncomplete = 0x810D0000, + BadCertificateHostNameInvalid = 0x80160000, + BadCertificateInvalid = 0x80120000, + 
BadCertificateIssuerRevocationUnknown = 0x801C0000, + BadCertificateIssuerRevoked = 0x801E0000, + BadCertificateIssuerTimeInvalid = 0x80150000, + BadCertificateIssuerUseNotAllowed = 0x80190000, + BadCertificatePolicyCheckFailed = 0x81140000, + BadCertificateRevocationUnknown = 0x801B0000, + BadCertificateRevoked = 0x801D0000, + BadCertificateTimeInvalid = 0x80140000, + BadCertificateUntrusted = 0x801A0000, + BadCertificateUriInvalid = 0x80170000, + BadCertificateUseNotAllowed = 0x80180000, + BadCommunicationError = 0x80050000, + BadConditionAlreadyDisabled = 0x80980000, + BadConditionAlreadyEnabled = 0x80CC0000, + BadConditionAlreadyShelved = 0x80D10000, + BadConditionBranchAlreadyAcked = 0x80CF0000, + BadConditionBranchAlreadyConfirmed = 0x80D00000, + BadConditionDisabled = 0x80990000, + BadConditionNotShelved = 0x80D20000, + BadConfigurationError = 0x80890000, + BadConnectionClosed = 0x80AE0000, + BadConnectionRejected = 0x80AC0000, + BadContentFilterInvalid = 0x80480000, + BadContinuationPointInvalid = 0x804A0000, + BadDataEncodingInvalid = 0x80380000, + BadDataEncodingUnsupported = 0x80390000, + BadDataLost = 0x809D0000, + BadDataTypeIdUnknown = 0x80110000, + BadDataUnavailable = 0x809E0000, + BadDeadbandFilterInvalid = 0x808E0000, + BadDecodingError = 0x80070000, + BadDependentValueChanged = 0x80E30000, + BadDeviceFailure = 0x808B0000, + BadDialogNotActive = 0x80CD0000, + BadDialogResponseInvalid = 0x80CE0000, + BadDisconnect = 0x80AD0000, + BadDiscoveryUrlMissing = 0x80510000, + BadDominantValueChanged = 0x80E10000, + BadDuplicateReferenceNotAllowed = 0x80660000, + BadEdited_OutOfRange = 0x81190000, + BadEdited_OutOfRange_DominantValueChanged = 0x811C0000, + BadEdited_OutOfRange_DominantValueChanged_DependentValueChanged = 0x811E0000, + BadEncodingError = 0x80060000, + BadEncodingLimitsExceeded = 0x80080000, + BadEndOfStream = 0x80B00000, + BadEntryExists = 0x809F0000, + BadEventFilterInvalid = 0x80470000, + BadEventIdUnknown = 0x809A0000, + BadEventNotAcknowledgeable = 0x80BB0000, + BadExpectedStreamToBlock = 0x80B40000, + BadFilterElementInvalid = 0x80C40000, + BadFilterLiteralInvalid = 0x80C50000, + BadFilterNotAllowed = 0x80450000, + BadFilterOperandCountMismatch = 0x80C30000, + BadFilterOperandInvalid = 0x80490000, + BadFilterOperatorInvalid = 0x80C10000, + BadFilterOperatorUnsupported = 0x80C20000, + BadHistoryOperationInvalid = 0x80710000, + BadHistoryOperationUnsupported = 0x80720000, + BadIdentityChangeNotSupported = 0x80C60000, + BadIdentityTokenInvalid = 0x80200000, + BadIdentityTokenRejected = 0x80210000, + BadIndexRangeInvalid = 0x80360000, + BadIndexRangeNoData = 0x80370000, + BadInitialValue_OutOfRange = 0x811A0000, + BadInsufficientClientProfile = 0x807C0000, + BadInternalError = 0x80020000, + BadInvalidArgument = 0x80AB0000, + BadInvalidSelfReference = 0x80670000, + BadInvalidState = 0x80AF0000, + BadInvalidTimestamp = 0x80230000, + BadInvalidTimestampArgument = 0x80BD0000, + BadLicenseExpired = 0x810E0000, + BadLicenseLimitsExceeded = 0x810F0000, + BadLicenseNotAvailable = 0x81100000, + BadMaxAgeInvalid = 0x80700000, + BadMaxConnectionsReached = 0x80B70000, + BadMessageNotAvailable = 0x807B0000, + BadMethodInvalid = 0x80750000, + BadMonitoredItemFilterInvalid = 0x80430000, + BadMonitoredItemFilterUnsupported = 0x80440000, + BadMonitoredItemIdInvalid = 0x80420000, + BadMonitoringModeInvalid = 0x80410000, + BadNoCommunication = 0x80310000, + BadNoContinuationPoints = 0x804B0000, + BadNoData = 0x809B0000, + BadNoDataAvailable = 0x80B10000, + BadNoDeleteRights = 
0x80690000, + BadNoEntryExists = 0x80A00000, + BadNoMatch = 0x806F0000, + BadNoSubscription = 0x80790000, + BadNoValidCertificates = 0x80590000, + BadNodeAttributesInvalid = 0x80620000, + BadNodeClassInvalid = 0x805F0000, + BadNodeIdExists = 0x805E0000, + BadNodeIdInvalid = 0x80330000, + BadNodeIdRejected = 0x805D0000, + BadNodeIdUnknown = 0x80340000, + BadNodeNotInView = 0x804E0000, + BadNonceInvalid = 0x80240000, + BadNotConnected = 0x808A0000, + BadNotExecutable = 0x81110000, + BadNotFound = 0x803E0000, + BadNotImplemented = 0x80400000, + BadNotReadable = 0x803A0000, + BadNotSupported = 0x803D0000, + BadNotTypeDefinition = 0x80C80000, + BadNotWritable = 0x803B0000, + BadNothingToDo = 0x800F0000, + BadNumericOverflow = 0x81120000, + BadObjectDeleted = 0x803F0000, + BadOperationAbandoned = 0x80B30000, + BadOutOfMemory = 0x80030000, + BadOutOfRange = 0x803C0000, + BadOutOfRange_DominantValueChanged = 0x811B0000, + BadOutOfRange_DominantValueChanged_DependentValueChanged = 0x811D0000, + BadOutOfService = 0x808D0000, + BadParentNodeIdInvalid = 0x805B0000, + BadProtocolVersionUnsupported = 0x80BE0000, + BadQueryTooComplex = 0x806E0000, + BadReferenceLocalOnly = 0x80680000, + BadReferenceNotAllowed = 0x805C0000, + BadReferenceTypeIdInvalid = 0x804C0000, + BadRefreshInProgress = 0x80970000, + BadRequestCancelledByClient = 0x802C0000, + BadRequestCancelledByRequest = 0x805A0000, + BadRequestHeaderInvalid = 0x802A0000, + BadRequestInterrupted = 0x80840000, + BadRequestNotAllowed = 0x80E40000, + BadRequestNotComplete = 0x81130000, + BadRequestTimeout = 0x80850000, + BadRequestTooLarge = 0x80B80000, + BadRequestTypeInvalid = 0x80530000, + BadResourceUnavailable = 0x80040000, + BadResponseTooLarge = 0x80B90000, + BadSecureChannelClosed = 0x80860000, + BadSecureChannelIdInvalid = 0x80220000, + BadSecureChannelTokenUnknown = 0x80870000, + BadSecurityChecksFailed = 0x80130000, + BadSecurityModeInsufficient = 0x80E60000, + BadSecurityModeRejected = 0x80540000, + BadSecurityPolicyRejected = 0x80550000, + BadSempahoreFileMissing = 0x80520000, + BadSensorFailure = 0x808C0000, + BadSequenceNumberInvalid = 0x80880000, + BadSequenceNumberUnknown = 0x807A0000, + BadServerHalted = 0x800E0000, + BadServerIndexInvalid = 0x806A0000, + BadServerNameMissing = 0x80500000, + BadServerNotConnected = 0x800D0000, + BadServerUriInvalid = 0x804F0000, + BadServiceUnsupported = 0x800B0000, + BadSessionClosed = 0x80260000, + BadSessionIdInvalid = 0x80250000, + BadSessionNotActivated = 0x80270000, + BadShelvingTimeOutOfRange = 0x80D30000, + BadShutdown = 0x800C0000, + BadSourceNodeIdInvalid = 0x80640000, + BadStateNotActive = 0x80BF0000, + BadStructureMissing = 0x80460000, + BadSubscriptionIdInvalid = 0x80280000, + BadSyntaxError = 0x80B60000, + BadTargetNodeIdInvalid = 0x80650000, + BadTcpEndpointUrlInvalid = 0x80830000, + BadTcpInternalError = 0x80820000, + BadTcpMessageTooLarge = 0x80800000, + BadTcpMessageTypeInvalid = 0x807E0000, + BadTcpNotEnoughResources = 0x80810000, + BadTcpSecureChannelUnknown = 0x807F0000, + BadTcpServerTooBusy = 0x807D0000, + BadTicketInvalid = 0x81200000, + BadTicketRequired = 0x811F0000, + BadTimeout = 0x800A0000, + BadTimestampNotSupported = 0x80A10000, + BadTimestampsToReturnInvalid = 0x802B0000, + BadTooManyArguments = 0x80E50000, + BadTooManyMatches = 0x806D0000, + BadTooManyMonitoredItems = 0x80DB0000, + BadTooManyOperations = 0x80100000, + BadTooManyPublishRequests = 0x80780000, + BadTooManySessions = 0x80560000, + BadTooManySubscriptions = 0x80770000, + BadTypeDefinitionInvalid = 
0x80630000, + BadTypeMismatch = 0x80740000, + BadUnexpectedError = 0x80010000, + BadUnknownResponse = 0x80090000, + BadUserAccessDenied = 0x801F0000, + BadUserSignatureInvalid = 0x80570000, + BadViewIdUnknown = 0x806B0000, + BadViewParameterMismatch = 0x80CA0000, + BadViewTimestampInvalid = 0x80C90000, + BadViewVersionInvalid = 0x80CB0000, + BadWaitingForInitialData = 0x80320000, + BadWaitingForResponse = 0x80B20000, + BadWouldBlock = 0x80B50000, + BadWriteNotSupported = 0x80730000 +#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member + } +} + diff --git a/ExtractorUtils.Test/unit/StatusCodeTests.cs b/ExtractorUtils.Test/unit/StatusCodeTests.cs new file mode 100644 index 00000000..b266967a --- /dev/null +++ b/ExtractorUtils.Test/unit/StatusCodeTests.cs @@ -0,0 +1,31 @@ +using Cognite.Extensions.Alpha; +using Xunit; + +namespace ExtractorUtils.Test.Unit +{ + public class StatusCodeTests + { + [Fact] + public void TestFromCategory() + { + // Just check that this doesn't fail. + Assert.Equal("Good", StatusCode.FromCategory(StatusCodeCategory.Good).ToString()); + Assert.Equal("BadBrowseDirectionInvalid", StatusCode.FromCategory(StatusCodeCategory.BadBrowseDirectionInvalid).ToString()); + Assert.Equal("UncertainDataSubNormal", StatusCode.FromCategory(StatusCodeCategory.UncertainDataSubNormal).ToString()); + } + + [Fact] + public void TestParse() + { + Assert.Equal("Good, StructureChanged, Calculated", StatusCode.Parse("Good, StructureChanged, Calculated").ToString()); + Assert.Equal("UncertainSensorCalibration, Overflow, ExtraData", + StatusCode.Parse("UncertainSensorCalibration, Overflow, ExtraData").ToString()); + + Assert.Equal("Good", StatusCode.Create(0).ToString()); + + Assert.Throws(() => StatusCode.Create(12345)); + Assert.Throws(() => StatusCode.Parse("Bad, Whoop")); + } + + } +} \ No newline at end of file diff --git a/ExtractorUtils/Cognite/CogniteDestination.cs b/ExtractorUtils/Cognite/CogniteDestination.cs index 2083a80a..0eb3bb02 100644 --- a/ExtractorUtils/Cognite/CogniteDestination.cs +++ b/ExtractorUtils/Cognite/CogniteDestination.cs @@ -395,7 +395,7 @@ public async Task> InsertDataPointsAsync( { if (points == null || !points.Any()) return new CogniteResult(null); - _logger.LogDebug("Uploading {Number} data points to CDF for {NumberTs} time series", + _logger.LogDebug("Uploading {Number} data points to CDF for {NumberTs} time series", points.Values.Select(dp => dp.Count()).Sum(), points.Keys.Count); return await DataPointExtensions.InsertAsync( @@ -413,6 +413,46 @@ public async Task> InsertDataPointsAsync( token).ConfigureAwait(false); } + /// + /// ALPHA: Insert the provided data points into CDF. The data points are chunked + /// according to and trimmed according to the + /// CDF limits. + /// The dictionary keys are time series identities (Id or ExternalId) and the values are numeric or string data points + /// + /// On error, the offending timeseries/datapoints can optionally be removed. + /// + /// This version includes alpha support for status codes. + /// + /// Data points + /// + /// + /// Cancellation token + public async Task> AlphaInsertDataPointsAsync( + IDictionary>? 
points, + SanitationMode sanitationMode, + RetryMode retryMode, + CancellationToken token) + { + if (points == null || !points.Any()) return new CogniteResult(null); + + _logger.LogDebug("Uploading {Number} data points to CDF for {NumberTs} time series", + points.Values.Select(dp => dp.Count()).Sum(), + points.Keys.Count); + return await Extensions.Alpha.DataPointExtensions.InsertAsync( + _client, + points, + _config.CdfChunking.DataPointTimeSeries, + _config.CdfChunking.DataPoints, + _config.CdfThrottling.DataPoints, + _config.CdfChunking.TimeSeries, + _config.CdfThrottling.TimeSeries, + _config.CdfChunking.DataPointsGzipLimit, + sanitationMode, + retryMode, + _config.NanReplacement, + token).ConfigureAwait(false); + } + /// /// Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, /// and can optionally handle errors. If any timeseries missing from the result and inserted by externalId, @@ -449,6 +489,44 @@ public async Task> InsertDataPointsAsync( token).ConfigureAwait(false); } + /// + /// ALPHA: Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, + /// and can optionally handle errors. If any timeseries missing from the result and inserted by externalId, + /// they are created before the points are inserted again. + /// + /// This version includes alpha support for status codes. + /// + /// Datapoints to insert + /// How to sanitize datapoints + /// How to handle retries + /// Optional data set id + /// Cancellation token + /// Results with a list of errors. If TimeSeriesResult is null, no timeseries were attempted created. + public async Task<(CogniteResult DataPointResult, CogniteResult? TimeSeriesResult)> AlphaInsertDataPointsCreateMissingAsync( + IDictionary>? points, + SanitationMode sanitationMode, + RetryMode retryMode, + long? dataSetId, + CancellationToken token) + { + if (points == null || !points.Any()) return (new CogniteResult(null), null); + + return await Extensions.Alpha.DataPointExtensions.InsertAsyncCreateMissing( + _client, + points, + _config.CdfChunking.DataPointTimeSeries, + _config.CdfChunking.DataPoints, + _config.CdfThrottling.DataPoints, + _config.CdfChunking.TimeSeries, + _config.CdfThrottling.TimeSeries, + _config.CdfChunking.DataPointsGzipLimit, + sanitationMode, + retryMode, + _config.NanReplacement, + dataSetId, + token).ConfigureAwait(false); + } + /// /// Deletes ranges of data points in CDF. The parameter contains the first (inclusive) /// and last (inclusive) timestamps for the range. 
After the delete request is sent to CDF, attempt to confirm that
@@ -466,7 +544,7 @@ public async Task DeleteDataPointsIgnoreErrorsAsync(
             {
                 return new DeleteError(new List<Identity>(), new List<Identity>());
             }
-            _logger.LogDebug("Deleting data points in CDF for {NumberTs} time series", 
+            _logger.LogDebug("Deleting data points in CDF for {NumberTs} time series",
                 ranges.Keys.Count);
             var errors = await _client.DataPoints.DeleteIgnoreErrorsAsync(
                 ranges,
@@ -475,7 +553,7 @@
                 _config.CdfThrottling.DataPoints,
                 _config.CdfThrottling.DataPoints,
                 token).ConfigureAwait(false);
-            _logger.LogDebug("During deletion, {NumMissing} ids where not found and {NumNotConfirmed} range deletions could not be confirmed", 
+            _logger.LogDebug("During deletion, {NumMissing} ids were not found and {NumNotConfirmed} range deletions could not be confirmed",
                 errors.IdsNotFound.Count(), errors.IdsDeleteNotConfirmed.Count());
             return errors;
         }
@@ -533,8 +611,8 @@ await _client.Raw.InsertRowsAsync(
         /// <typeparam name="T">DTO type</typeparam>
         /// <returns>Task</returns>
         public async Task InsertRawRowsAsync<T>(
-            string database, 
-            string table, 
+            string database,
+            string table,
            IDictionary<string, T>? rows,
            CancellationToken token)
        {
@@ -542,7 +620,7 @@ public async Task InsertRawRowsAsync<T>(
             {
                 return;
             }
-            _logger.LogDebug("Uploading {Number} rows to CDF Raw. Database: {Db}. Table: {Table}", 
+            _logger.LogDebug("Uploading {Number} rows to CDF Raw. Database: {Db}. Table: {Table}",
                 rows.Count, database, table);
diff --git a/version b/version
index b57fc722..815d5ca0 100644
--- a/version
+++ b/version
@@ -1  1 @@
-1.18.2
+1.19.0
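---

Usage sketch (editor's addition, not part of the patch): the snippet below illustrates how the status-code support introduced above might be consumed from an extractor. It assumes an already configured CogniteDestination named `destination` (hypothetical) and runs inside an async method. Datapoint, StatusCode, StatusCodeCategory, RetryMode and AlphaInsertDataPointsCreateMissingAsync are all introduced or used by this patch; SanitationMode.Clean is an existing sanitation option in the library.

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using Cognite.Extensions;
    using Cognite.Extensions.Alpha;
    using CogniteSdk;

    // Data points carrying OPC UA-style status codes.
    // Omitting the status code defaults to Good (0).
    var points = new Dictionary<Identity, IEnumerable<Datapoint>>
    {
        [new Identity("example-ts")] = new[]
        {
            new Datapoint(DateTime.UtcNow, 1.0),
            // Symbolic form, resolved by the parser in StatusCodeHelpers.cs.
            new Datapoint(DateTime.UtcNow, 2.0, StatusCode.Parse("UncertainSensorCalibration, Overflow")),
            // Category form.
            new Datapoint(DateTime.UtcNow, 3.0, StatusCode.FromCategory(StatusCodeCategory.BadSensorFailure)),
        },
    };

    // Insert through the alpha endpoint; time series missing in CDF are created first.
    var (dpResult, tsResult) = await destination.AlphaInsertDataPointsCreateMissingAsync(
        points, SanitationMode.Clean, RetryMode.OnError, dataSetId: null, CancellationToken.None);

    // With RetryMode.OnError, failures are reported in the result rather than thrown.
    if (dpResult.Errors != null)
    {
        foreach (var err in dpResult.Errors)
        {
            Console.WriteLine(err.Type);
        }
    }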