Skip to content

Commit

Permalink
Do not skip bad datapoints (#416)
Browse files Browse the repository at this point in the history
* Do not skip bad datapoints

* Bump version
  • Loading branch information
einarmo authored May 8, 2024
1 parent 5696958 commit 4ca72b5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 35 deletions.
8 changes: 4 additions & 4 deletions Cognite.Extensions/TimeSeries/DataPointExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public static DataPointInsertionRequest ToInsertRequest(this IDictionary<Identit
}
else
{
item.ExternalId = kvp.Key.ExternalId.ToString();
item.ExternalId = kvp.Key.ExternalId;
}
if (!kvp.Value.Any())
{
continue;
}
var stringPoints = kvp.Value
.Where(dp => dp.IsString && dp.Status.IsGood)
.Where(dp => dp.IsString)
.Select(dp => new StringDatapoint
{
Timestamp = dp.Timestamp,
Expand All @@ -68,7 +68,7 @@ public static DataPointInsertionRequest ToInsertRequest(this IDictionary<Identit
}
});
var numericPoints = kvp.Value
.Where(dp => !dp.IsString && dp.Status.IsGood)
.Where(dp => !dp.IsString)
.Select(dp => new NumericDatapoint
{
Timestamp = dp.Timestamp,
Expand Down Expand Up @@ -284,7 +284,7 @@ private static async Task<CogniteResult<DataPointInsertError>> InsertDataPointsH
try
{
bool useGzip = false;
int count = points.Sum(kvp => kvp.Value.Count());
int count = request.Items.Sum(r => r.NumericDatapoints?.Datapoints?.Count ?? 0 + r.StringDatapoints?.Datapoints?.Count ?? 0);
if (gzipCountLimit >= 0 && count >= gzipCountLimit)
{
useGzip = true;
Expand Down
15 changes: 13 additions & 2 deletions Cognite.Extensions/TimeSeries/DataPointSanitation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public static (IDictionary<Identity, IEnumerable<Datapoint>>, IEnumerable<Cognit

foreach (var kvp in points)
{
if (!kvp.Value.Any()) continue;

var isString = kvp.Value.First().IsString;

var id = kvp.Key;
var dps = kvp.Value;

Expand All @@ -117,6 +121,13 @@ public static (IDictionary<Identity, IEnumerable<Datapoint>>, IEnumerable<Cognit

foreach (var dp in dps)
{
if (dp.IsString != isString)
{
badDps.Add((ResourceType.DataPointValue, dp));
CdfMetrics.DatapointsSkipped.Inc();
continue;
}

var cleanDp = dp;
if (mode == SanitationMode.Clean)
{
Expand All @@ -142,7 +153,7 @@ public static (IDictionary<Identity, IEnumerable<Datapoint>>, IEnumerable<Cognit
{
CdfMetrics.DatapointTimeseriesSkipped.Inc();
}
if (badDps.Any())
if (badDps.Count > 0)
{
badDpGroups.AddRange(badDps
.GroupBy(pair => pair.type)
Expand All @@ -152,7 +163,7 @@ public static (IDictionary<Identity, IEnumerable<Datapoint>>, IEnumerable<Cognit

IEnumerable<CogniteError<DataPointInsertError>> errors;

if (badDpGroups.Any())
if (badDpGroups.Count > 0)
{
errors = badDpGroups
.GroupBy(group => group.type)
Expand Down
65 changes: 37 additions & 28 deletions ExtractorUtils.Test/unit/DatapointsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using Xunit;
using Xunit.Abstractions;
using TimeRange = Cognite.Extractor.Common.TimeRange;
using StatusCode = Cognite.Extensions.StatusCode;

namespace ExtractorUtils.Test.Unit
{
Expand Down Expand Up @@ -67,12 +68,13 @@ public async Task TestInsertDataPoints()
services.AddConfig<BaseConfig>(path, 2);
services.AddTestLogging(_output);
services.AddCogniteClient("testApp");
using (var provider = services.BuildServiceProvider()) {
using (var provider = services.BuildServiceProvider())
{
var cogniteDestination = provider.GetRequiredService<CogniteDestination>();

double[] doublePoints = { 0.0, 1.1, 2.2, double.NaN, 3.3, 4.4, double.NaN, 5.5, double.NegativeInfinity };
string[] stringPoints = { "0", null, "1", new string('!', CogniteUtils.StringLengthMax), new string('2', CogniteUtils.StringLengthMax + 1), "3"};
string[] stringPoints = { "0", null, "1", new string('!', CogniteUtils.StringLengthMax), new string('2', CogniteUtils.StringLengthMax + 1), "3" };

var datapoints = new Dictionary<Identity, IEnumerable<Datapoint>>() {
{ new Identity("A"), new Datapoint[] { new Datapoint(DateTime.UtcNow, "1"), new Datapoint(DateTime.UtcNow, "2") }},
{ new Identity(1), doublePoints.Select(d => new Datapoint(DateTime.UtcNow, d))},
Expand Down Expand Up @@ -157,9 +159,10 @@ public async Task TestDeleteDataPoints()
services.AddConfig<BaseConfig>(path, 2);
services.AddTestLogging(_output);
services.AddCogniteClient("testApp");
using (var provider = services.BuildServiceProvider()) {
using (var provider = services.BuildServiceProvider())
{
var cogniteDestination = provider.GetRequiredService<CogniteDestination>();

var ranges = new Dictionary<Identity, IEnumerable<TimeRange>>() {
{ new Identity("A"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow),
new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(4), DateTime.UtcNow - TimeSpan.FromDays(2))}},
Expand All @@ -169,28 +172,28 @@ public async Task TestDeleteDataPoints()
var errors = await cogniteDestination.DeleteDataPointsIgnoreErrorsAsync(
ranges,
CancellationToken.None);

mockHttpMessageHandler.Protected()
.Verify<Task<HttpResponseMessage>>(
"SendAsync",
"SendAsync",
Times.Exactly(1), // 1 delete
ItExpr.IsAny<HttpRequestMessage>(),
ItExpr.IsAny<CancellationToken>());

Assert.Empty(errors.IdsDeleteNotConfirmed);
Assert.Empty(errors.IdsNotFound);

ranges.Add(new Identity("missing-C"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow)});
ranges.Add(new Identity("D"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow)});
ranges.Add(new Identity("nc-E"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow)});
ranges.Add(new Identity("missing-F"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow)});
ranges.Add(new Identity("G"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow)});
ranges.Add(new Identity("nc-H"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow)});
ranges.Add(new Identity("missing-C"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow) });
ranges.Add(new Identity("D"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow) });
ranges.Add(new Identity("nc-E"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow) });
ranges.Add(new Identity("missing-F"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow) });
ranges.Add(new Identity("G"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow) });
ranges.Add(new Identity("nc-H"), new TimeRange[] { new TimeRange(DateTime.UtcNow - TimeSpan.FromDays(2), DateTime.UtcNow) });

errors = await cogniteDestination.DeleteDataPointsIgnoreErrorsAsync(
ranges,
CancellationToken.None);

Assert.Contains(new Identity("missing-C"), errors.IdsNotFound);
Assert.Contains(new Identity("missing-F"), errors.IdsNotFound);
}
Expand Down Expand Up @@ -262,7 +265,8 @@ public async Task TestUploadQueue()
}))
{
queue.AddStateStorage(stateMap, stateStore, "test-states");
var enqueueTask = Task.Run(async () => {
var enqueueTask = Task.Run(async () =>
{
while (index < 13)
{
var dps = uploadGenerator(index);
Expand Down Expand Up @@ -292,7 +296,8 @@ public async Task TestUploadQueue()
}))
{
queue.AddStateStorage(stateMap, stateStore, "test-states");
var enqueueTask = Task.Run(async () => {
var enqueueTask = Task.Run(async () =>
{
while (index < 23)
{
var dps = uploadGenerator(index);
Expand Down Expand Up @@ -398,11 +403,11 @@ public async Task TestUploadQueueBuffer()
System.IO.File.Delete(path);
}


#region mock
private Dictionary<string, List<Datapoint>> _createdDataPoints = new Dictionary<string, List<Datapoint>>();

private async Task<HttpResponseMessage> mockInsertDataPointsAsync(HttpRequestMessage message , CancellationToken token) {
private async Task<HttpResponseMessage> mockInsertDataPointsAsync(HttpRequestMessage message, CancellationToken token)
{
var uri = message.RequestUri.ToString();

var responseBody = "{ }";
Expand Down Expand Up @@ -439,7 +444,7 @@ private async Task<HttpResponseMessage> mockInsertDataPointsAsync(HttpRequestMes
.Select(i => i.DatapointTypeCase == DataPointInsertionItem.DatapointTypeOneofCase.NumericDatapoints ?
i.NumericDatapoints.Datapoints.Count : i.StringDatapoints.Datapoints.Count)
.Sum() <= 4); // data-points chunk size




Expand Down Expand Up @@ -469,7 +474,8 @@ private async Task<HttpResponseMessage> mockInsertDataPointsAsync(HttpRequestMes
{
mismatchedResponse.error.message = "Expected string value for datapoint";
}
else {
else
{
mismatchedResponse.error.message = "Expected numeric value for datapoint";
}
break;
Expand All @@ -480,11 +486,12 @@ private async Task<HttpResponseMessage> mockInsertDataPointsAsync(HttpRequestMes
statusCode = HttpStatusCode.BadRequest;
responseBody = JsonConvert.SerializeObject(mismatchedResponse);
}
else if (missingResponse.error.missing.Count > 0) {
else if (missingResponse.error.missing.Count > 0)
{
statusCode = HttpStatusCode.BadRequest;
responseBody = JsonConvert.SerializeObject(missingResponse);
}
else
else
{
foreach (var item in data.Items)
{
Expand Down Expand Up @@ -514,14 +521,15 @@ private async Task<HttpResponseMessage> mockInsertDataPointsAsync(HttpRequestMes
var response = new HttpResponseMessage
{
StatusCode = statusCode,
Content = new StringContent(responseBody)
Content = new StringContent(responseBody)
};
response.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
response.Headers.Add("x-request-id", "1");
return response;
}

private static async Task<HttpResponseMessage> mockDeleteDataPointsAsync(HttpRequestMessage message , CancellationToken token) {
private static async Task<HttpResponseMessage> mockDeleteDataPointsAsync(HttpRequestMessage message, CancellationToken token)
{
var uri = message.RequestUri.ToString();
HttpContent responseBody = new StringContent("{ }");
var statusCode = HttpStatusCode.OK;
Expand All @@ -530,7 +538,8 @@ private static async Task<HttpResponseMessage> mockDeleteDataPointsAsync(HttpReq
var ids = JsonConvert.DeserializeObject<dynamic>(content);
IEnumerable<dynamic> items = ids.items;

if (uri.Contains($"{_project}/timeseries/data/list")) {
if (uri.Contains($"{_project}/timeseries/data/list"))
{
Assert.True(items.Count() <= 2);
DataPointListResponse dpList = new DataPointListResponse();
foreach (var item in items)
Expand All @@ -541,7 +550,7 @@ private static async Task<HttpResponseMessage> mockDeleteDataPointsAsync(HttpReq
dp.NumericDatapoints = new NumericDatapoints();
dpList.Items.Add(dp);
}
using(MemoryStream stream = new MemoryStream())
using (MemoryStream stream = new MemoryStream())
{
dpList.WriteTo(stream);
responseBody = new ByteArrayContent(stream.ToArray());
Expand Down Expand Up @@ -579,7 +588,7 @@ private static async Task<HttpResponseMessage> mockDeleteDataPointsAsync(HttpReq
var response = new HttpResponseMessage
{
StatusCode = statusCode,
Content = responseBody
Content = responseBody
};
response.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
response.Headers.Add("x-request-id", "1");
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.23.1
1.23.2

0 comments on commit 4ca72b5

Please sign in to comment.