Skip to content

Commit

Permalink
Allow building the extractor without a cognite destination (#128)
Browse files Browse the repository at this point in the history
While the utils are really built for pushing to CDF, it is sometimes useful to allow building the extractor without a CogniteDestination, for testing or other purposes. This is a fix that makes it possible to not pass cognite configuration without extractor startup failing.
  • Loading branch information
einarmo authored Oct 12, 2021
1 parent 205c45c commit 65e28f7
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 37 deletions.
2 changes: 2 additions & 0 deletions Cognite.Extensions/CogniteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;

namespace Cognite.Extensions
{
Expand Down Expand Up @@ -545,6 +546,7 @@ public static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy(ILogger logger,
int? maxRetries,
int? maxDelay)
{
logger = logger ?? new NullLogger<Client>();
int numRetries = maxRetries ?? 5;
int delay = maxDelay ?? 5_000;
if (maxDelay < 0) maxDelay = int.MaxValue;
Expand Down
4 changes: 2 additions & 2 deletions ExampleExtractor/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

class MyExtractor : BaseExtractor
{
public MyExtractor(BaseConfig config, CogniteDestination destination, IServiceProvider provider)
: base(config, destination, provider)
public MyExtractor(BaseConfig config, IServiceProvider provider, CogniteDestination destination)
: base(config, provider, destination)
{
}

Expand Down
69 changes: 67 additions & 2 deletions ExtractorUtils.Test/integration/ExtractorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace ExtractorUtils.Test.Integration
{
Expand All @@ -29,7 +30,7 @@ class TestExtractor : BaseExtractor
public string DBName { get; private set; }
public string TableName { get; private set; }
private string _prefix;
public TestExtractor(MyConfig config, CogniteDestination destination, IServiceProvider provider) : base(config, destination, provider)
public TestExtractor(MyConfig config, CogniteDestination destination, IServiceProvider provider) : base(config, provider, destination)
{
_prefix = config.Prefix;
}
Expand Down Expand Up @@ -125,9 +126,30 @@ await Destination.CogniteClient.Events.DeleteAsync(new EventDelete
}
}

class NoCdfExtractor : BaseExtractor
{
public bool Started { get; private set; }
public int Iter { get; private set; }
public NoCdfExtractor(MyConfig config, IServiceProvider provider) : base(config, provider)
{
}

public class ExtractorTest
protected override Task Start()
{
Started = true;
Scheduler.SchedulePeriodicTask(null, TimeSpan.FromMilliseconds(100), token => Iter++);
Assert.Null(Destination);
return Task.CompletedTask;
}
}

public class ExtractorTest : ConsoleWrapper
{
public ExtractorTest(ITestOutputHelper output) : base(output)
{

}

[Fact(Timeout = 30000)]
public async Task TestExtractorRun()
{
Expand Down Expand Up @@ -303,5 +325,48 @@ await tester.Destination.CogniteClient.ExtPipes.DeleteAsync(new ExtPipeDelete
});
}
}

[Fact]
public async Task TestExtractorWithoutCDF()
{
var cfg = new MyConfig();
cfg.GenerateDefaults();

NoCdfExtractor ext = null;

using var source = new CancellationTokenSource();

var task = ExtractorRunner.Run<MyConfig, NoCdfExtractor>(
null,
null,
null,
null,
true,
true,
true,
false,
source.Token,
(dest, extractor) =>
{
Assert.Null(dest.CogniteClient);
ext = extractor;
},
null,
null,
null,
cfg,
false
);

await Task.Delay(500);

Assert.NotNull(ext);
Assert.True(ext.Started);
Assert.True(ext.Iter > 2);

source.Cancel();

await task;
}
}
}
70 changes: 50 additions & 20 deletions ExtractorUtils/BaseExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,15 @@ public abstract class BaseExtractor : IDisposable, IAsyncDisposable
/// <param name="run">Optional extraction run</param>
public BaseExtractor(
BaseConfig config,
CogniteDestination destination,
IServiceProvider provider,
CogniteDestination destination = null,
ExtractionRun run = null)
{
Config = config;
Destination = destination;
if (destination?.CogniteClient != null)
{
Destination = destination;
}
Provider = provider;
Run = run;
_logger = provider.GetService<ILogger<BaseExtractor>>();
Expand All @@ -84,7 +87,10 @@ public BaseExtractor(
/// <returns>Task</returns>
protected virtual async Task TestConfig()
{
await Destination.TestCogniteConfig(Source.Token).ConfigureAwait(false);
if (Destination != null)
{
await Destination.TestCogniteConfig(Source.Token).ConfigureAwait(false);
}
}

/// <summary>
Expand Down Expand Up @@ -163,6 +169,7 @@ protected void CreateRawQueue<T>(
TimeSpan uploadInterval,
Func<QueueUploadResult<(string key, T columns)>, Task> callback)
{
if (Destination == null) throw new InvalidOperationException("Creating queues requires Destination");
string name = $"{dbName}-{tableName}";
if (RawUploadQueues.ContainsKey(($"{name}", typeof(T))))
throw new InvalidOperationException($"Upload queue with type {typeof(T)}" +
Expand Down Expand Up @@ -220,6 +227,7 @@ protected void CreateTimeseriesQueue(
Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task> callback,
string bufferPath = null)
{
if (Destination == null) throw new InvalidOperationException("Creating queues requires Destination");
if (TSUploadQueue != null) throw new InvalidOperationException("Timeseries upload queue already created");
TSUploadQueue = Destination.CreateTimeSeriesUploadQueue(
uploadInterval,
Expand Down Expand Up @@ -261,6 +269,7 @@ protected void CreateEventQueue(
Func<QueueUploadResult<EventCreate>, Task> callback,
string bufferPath = null)
{
if (Destination == null) throw new InvalidOperationException("Creating queues requires Destination");
if (EventUploadQueue != null) throw new InvalidOperationException("Event upload queue already created");
EventUploadQueue = Destination.CreateEventUploadQueue(
uploadInterval,
Expand Down Expand Up @@ -298,21 +307,33 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
try
if (Scheduler != null)
{
// Cannot be allowed to fail here
Scheduler.ExitAllAndWait().Wait();
} catch { }
Scheduler.Dispose();
try
{
// Cannot be allowed to fail here
Scheduler.ExitAllAndWait().Wait();
}
catch { }
Scheduler.Dispose();
Scheduler = null;
}
EventUploadQueue?.Dispose();
EventUploadQueue = null;
TSUploadQueue?.Dispose();
TSUploadQueue = null;
foreach (var queue in RawUploadQueues.Values)
{
queue.Dispose();
}
RawUploadQueues.Clear();
Source.Cancel();
Source.Dispose();

if (Source != null)
{
Source.Cancel();
Source.Dispose();
Source = null;
}
}
}

Expand All @@ -321,24 +342,33 @@ protected virtual void Dispose(bool disposing)
/// </summary>
protected virtual async ValueTask DisposeAsyncCore()
{
try
{
await Scheduler.ExitAllAndWait().ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error terminating scheduler: {msg}", ex.Message);
if (Scheduler != null) {
try
{
await Scheduler.ExitAllAndWait().ConfigureAwait(false);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Error terminating scheduler: {msg}", ex.Message);
}
Scheduler.Dispose();
Scheduler = null;
}
Scheduler.Dispose();
if (EventUploadQueue != null) await EventUploadQueue.DisposeAsync().ConfigureAwait(false);
EventUploadQueue = null;
if (TSUploadQueue != null) await TSUploadQueue.DisposeAsync().ConfigureAwait(false);
TSUploadQueue = null;
foreach (var queue in RawUploadQueues.Values)
{
if (queue != null) await queue.DisposeAsync().ConfigureAwait(false);
}
RawUploadQueues.Clear();
Source.Cancel();
Source.Dispose();
if (Source != null)
{
Source.Cancel();
Source.Dispose();
Source = null;
}
}

/// <summary>
Expand Down
9 changes: 6 additions & 3 deletions ExtractorUtils/Cognite/DestinationUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,22 @@ public static class DestinationUtils
/// to report metrics on the number and duration of API requests</param>
/// <param name="setHttpClient">Default true. If false CogniteSdk Client.Builder is not added to the
/// <see cref="ServiceCollection"/>. If this is false it must be added before this method is called.</param>
/// <param name="required">True to fail if cognite configuration is missing</param>
public static void AddCogniteClient(this IServiceCollection services,
string appId,
string userAgent = null,
bool setLogger = false,
bool setMetrics = false,
bool setHttpClient = true)
bool setHttpClient = true,
bool required = true)
{
if (setHttpClient)
{
services.AddHttpClient<Client.Builder>(c => c.Timeout = Timeout.InfiniteTimeSpan)
.AddPolicyHandler((provider, message) =>
{
var retryConfig = provider.GetService<CogniteConfig>()?.CdfRetries;
return CogniteExtensions.GetRetryPolicy(provider.GetRequiredService<ILogger<Client>>(),
return CogniteExtensions.GetRetryPolicy(provider.GetService<ILogger<Client>>(),
retryConfig?.MaxRetries, retryConfig?.MaxDelay);

})
Expand Down Expand Up @@ -86,9 +88,10 @@ public static void AddCogniteClient(this IServiceCollection services,
services.AddSingleton<IMetrics, CdfMetricCollector>();
services.AddTransient(provider =>
{
var cdfBuilder = provider.GetRequiredService<Client.Builder>();
var conf = provider.GetService<CogniteConfig>();
if ((conf == null || conf.Project?.TrimToNull() == null) && !required) return null;
var auth = provider.GetService<IAuthenticator>();
var cdfBuilder = provider.GetRequiredService<Client.Builder>();
var logger = setLogger ?
provider.GetRequiredService<ILogger<Client>>() : null;
CogniteExtensions.AddExtensionLoggers(provider);
Expand Down
8 changes: 5 additions & 3 deletions ExtractorUtils/Configuration/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public static T AddConfig<T>(this IServiceCollection services,
/// <param name="addStateStore">True to add state store, used if extractor reads history</param>
/// <param name="addLogger">True to add logger</param>
/// <param name="addMetrics">True to add metrics</param>
/// <param name="requireDestination">True to fail if a destination cannot be configured</param>
/// <param name="config">Optional pre-defined config object to use instead of reading from file</param>
/// <exception cref="ConfigurationException">Thrown when the version is not valid,
/// the yaml file is not found or in case of yaml parsing error</exception>
Expand All @@ -68,6 +69,7 @@ public static T AddExtractorDependencies<T>(
bool addStateStore,
bool addLogger = true,
bool addMetrics = true,
bool requireDestination = true,
T config = null) where T : VersionedConfig
{
if (config != null)
Expand All @@ -84,7 +86,7 @@ public static T AddExtractorDependencies<T>(
{
config = services.AddConfig<T>(configPath, acceptedConfigVersions);
}
services.AddCogniteClient(appId, userAgent, addLogger, addMetrics);
services.AddCogniteClient(appId, userAgent, addLogger, addMetrics, true, requireDestination);
if (addStateStore) services.AddStateStore();
if (addLogger) services.AddLogger();
if (addMetrics) services.AddMetrics();
Expand All @@ -103,9 +105,9 @@ public static void AddExtractionRun(this IServiceCollection services, bool setLo
{
var logger = setLogger ?
provider.GetRequiredService<ILogger<ExtractionRun>>() : null;
var destination = provider.GetRequiredService<CogniteDestination>();
var destination = provider.GetService<CogniteDestination>();
var config = provider.GetService<CogniteConfig>();

if (config == null || destination == null) return null;
if (config?.ExtractionPipeline == null || config.ExtractionPipeline.PipelineId == null) return null;
return new ExtractionRun(config.ExtractionPipeline, destination, logger);
});
Expand Down
13 changes: 6 additions & 7 deletions ExtractorUtils/ExtractorRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static class ExtractorRunner
/// <param name="extServices">Optional pre-configured service collection</param>
/// <param name="startupLogger">Optional logger to use before config has been loaded, to report configuration issues</param>
/// <param name="config">Optional pre-existing config object, can be used instead of config path.</param>
/// <param name="requireDestination">Default true, whether to fail if a destination cannot be configured</param>
/// <returns>Task which completes when the extractor has run</returns>
public static async Task Run<TConfig, TExtractor>(
string configPath,
Expand All @@ -56,7 +57,8 @@ public static async Task Run<TConfig, TExtractor>(
Action<TConfig> configCallback = null,
ServiceCollection extServices = null,
ILogger startupLogger = null,
TConfig config = null)
TConfig config = null,
bool requireDestination = true)
where TConfig : VersionedConfig
where TExtractor : BaseExtractor
{
Expand All @@ -80,17 +82,14 @@ void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs eArgs)

if (extServices != null)
{
foreach (var service in extServices)
{
services.Add(service);
}
services.Add(extServices);
}

ConfigurationException exception = null;
try
{
config = services.AddExtractorDependencies<TConfig>(configPath, acceptedConfigVersions,
appId, userAgent, addStateStore, addLogger, addMetrics);
config = services.AddExtractorDependencies(configPath, acceptedConfigVersions,
appId, userAgent, addStateStore, addLogger, addMetrics, requireDestination, config);
configCallback?.Invoke(config);
}
catch (AggregateException ex)
Expand Down

0 comments on commit 65e28f7

Please sign in to comment.