diff --git a/src/Contracts/Masa.Scheduler.Contracts.Server/Infrastructure/Utils/HttpUtils.cs b/src/Contracts/Masa.Scheduler.Contracts.Server/Infrastructure/Utils/HttpUtils.cs index b8cf6905..07bcea95 100644 --- a/src/Contracts/Masa.Scheduler.Contracts.Server/Infrastructure/Utils/HttpUtils.cs +++ b/src/Contracts/Masa.Scheduler.Contracts.Server/Infrastructure/Utils/HttpUtils.cs @@ -5,19 +5,21 @@ namespace Masa.Scheduler.Contracts.Server.Infrastructure.Utils; public static class HttpUtils { - public static HttpMethod ConvertHttpMethod(Contracts.Server.Infrastructure.Enums.HttpMethods methods) + private static ActivitySource _source = new ActivitySource("Masa.Scheduler.Background"); + + public static HttpMethod ConvertHttpMethod(HttpMethods methods) { switch (methods) { - case Contracts.Server.Infrastructure.Enums.HttpMethods.GET: + case HttpMethods.GET: return HttpMethod.Get; - case Contracts.Server.Infrastructure.Enums.HttpMethods.POST: + case HttpMethods.POST: return HttpMethod.Post; - case Contracts.Server.Infrastructure.Enums.HttpMethods.HEAD: + case HttpMethods.HEAD: return HttpMethod.Head; - case Contracts.Server.Infrastructure.Enums.HttpMethods.PUT: + case HttpMethods.PUT: return HttpMethod.Put; - case Contracts.Server.Infrastructure.Enums.HttpMethods.DELETE: + case HttpMethods.DELETE: return HttpMethod.Delete; default: throw new UserFriendlyException($"Cannot convert method: {methods}"); @@ -32,11 +34,17 @@ public static void AddHttpHeader(HttpClient client, List> httpParameters) + public static Activity? SetTraceParent(string? traceId, string? spanId) { - var builder = new UriBuilder(requestUrl); + return _source.StartActivity("Background Task", ActivityKind.Consumer, $"00-{traceId}-{spanId}-01"); + } - builder.Query = string.Join("&", httpParameters.Select(p => $"{p.Key}={p.Value}")); + public static Uri GetRequestUrl(string requestUrl, List> httpParameters) + { + var builder = new UriBuilder(requestUrl) + { + Query = string.Join("&", httpParameters.Select(p => $"{p.Key}={p.Value}")) + }; return builder.Uri; } @@ -50,11 +58,11 @@ public static Uri GetRequestUrl(string requestUrl, List")) + else if (content.StartsWith('<') && content.EndsWith('>')) { contentType = "application/xml"; } diff --git a/src/Services/Masa.Scheduler.Services.Worker/Domain/Managers/Workers/SchedulerWorkerManager.cs b/src/Services/Masa.Scheduler.Services.Worker/Domain/Managers/Workers/SchedulerWorkerManager.cs index 6317bf9b..8ecb95c0 100644 --- a/src/Services/Masa.Scheduler.Services.Worker/Domain/Managers/Workers/SchedulerWorkerManager.cs +++ b/src/Services/Masa.Scheduler.Services.Worker/Domain/Managers/Workers/SchedulerWorkerManager.cs @@ -90,7 +90,7 @@ public override async Task OnManagerStartAsync() Logger.LogError(ex, "OnManagerStartAsync"); } } - + public async Task ProcessTaskRun() { try @@ -141,8 +141,7 @@ public async Task ProcessTaskRun() public Task StartTaskAsync(SchedulerWorkerManagerData data, Guid taskId, SchedulerJobDto job, DateTimeOffset excuteTime, string? traceId = null, string? spanId = null) { var cts = new CancellationTokenSource(); - var internalCts = new CancellationTokenSource(); - + var internalCts = new CancellationTokenSource(); data.TaskCancellationTokenSources.TryAdd(taskId, cts); data.InternalCancellationTokenSources.TryAdd(taskId, internalCts); @@ -182,11 +181,12 @@ public Task StartTaskAsync(SchedulerWorkerManagerData data, Guid taskId, Schedul _ = Task.Run(async () => { + var activity = HttpUtils.SetTraceParent(traceId, spanId); var managerData = ServiceProvider.GetRequiredService(); - try { _schedulerLogger.LogInformation($"Task run", WriterTypes.Worker, taskId, job.Id); + var runStatus = await taskHandler.RunTask(taskId, job, excuteTime, traceId, spanId, internalCts.Token); if (!job.IsAsync || (job.IsAsync && runStatus != TaskRunStatus.Success)) @@ -201,6 +201,7 @@ public Task StartTaskAsync(SchedulerWorkerManagerData data, Guid taskId, Schedul } finally { + activity?.Dispose(); cts?.Dispose(); internalCts?.Dispose(); diff --git a/src/Services/Masa.Scheduler.Services.Worker/Extensions/OpenTelemetrySchedulerExtensions.cs b/src/Services/Masa.Scheduler.Services.Worker/Extensions/OpenTelemetrySchedulerExtensions.cs new file mode 100644 index 00000000..71f0e7b1 --- /dev/null +++ b/src/Services/Masa.Scheduler.Services.Worker/Extensions/OpenTelemetrySchedulerExtensions.cs @@ -0,0 +1,73 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the Apache License. See LICENSE.txt in the project root for license information. + +using OpenTelemetry.Instrumentation.AspNetCore; +using OpenTelemetry.Logs; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; + +namespace Microsoft.Extensions.DependencyInjection; + +public static class OpenTelemetrySchedulerExtensions +{ + public static IServiceCollection AddSchedulerObservable(this IServiceCollection services, + ILoggingBuilder loggingBuilder, + Func optionsConfigure, + Func? otlpUrlConfigure = null) + { + var options = optionsConfigure(); + var otlpUrl = otlpUrlConfigure?.Invoke() ?? string.Empty; + + ArgumentNullException.ThrowIfNull(options); + + Uri? uri = null; + if (!string.IsNullOrEmpty(otlpUrl) && !Uri.TryCreate(otlpUrl, UriKind.Absolute, out uri)) + throw new UriFormatException($"{nameof(otlpUrl)}:{otlpUrl} is invalid url"); + + var resources = ResourceBuilder.CreateDefault().AddMasaService(options); + loggingBuilder.AddMasaOpenTelemetry(builder => + { + builder.SetResourceBuilder(resources); + builder.AddOtlpExporter(otlp => otlp.Endpoint = uri); + }); + + services.AddMasaMetrics(builder => + { + builder.SetResourceBuilder(resources); + builder.AddOtlpExporter(options => + { + options.Endpoint = uri; + }); + }); + + services.AddMasaTracing( + builder => + { + builder.SetResourceBuilder(resources); + builder.AddOtlpExporter(options => options.Endpoint = uri); + builder.AddSource("Masa.Scheduler.Background"); + }, + builder => + { + builder.AspNetCoreInstrumentationOptions.AppendDefaultFilter(builder, false); + builder.AspNetCoreInstrumentationOptions.AppendSchedulerFilter(builder); + }); + + return services; + } + + public static void AppendSchedulerFilter(this Action options, + OpenTelemetryInstrumentationOptions openTelemetryInstrumentationOptions) + { + options += opt => + { + opt.Filter = (HttpContext httpContext) => + { + var url = httpContext.Request.Path.Value ?? "/"; + return !(url == "/" || url.Contains("/heartbeat")); + }; + }; + openTelemetryInstrumentationOptions.AspNetCoreInstrumentationOptions += options; + } +} \ No newline at end of file diff --git a/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/HttpClientTracingInterceptorBase.cs b/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/HttpClientTracingInterceptorBase.cs deleted file mode 100644 index b360d01c..00000000 --- a/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/HttpClientTracingInterceptorBase.cs +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (c) MASA Stack All rights reserved. -// Licensed under the Apache License. See LICENSE.txt in the project root for license information. - -namespace Masa.Scheduler.Services.Worker.Extensions.TracingAop; - -public abstract class HttpClientTracingInterceptorBase : IHttpClientTracingInterceptor -{ - private IServiceScope _serviceScope = MasaApp.RootServiceProvider.CreateScope(); - - public IServiceProvider? ServiceProvider { get => _serviceScope.ServiceProvider; } - - public abstract void OnException(Activity activity, Exception exception); - - public abstract void OnHttpRequestMessage(Activity activity, HttpRequestMessage requestMessage); - - public abstract void OnHttpResponseMessage(Activity activity, HttpResponseMessage responseMessage); - -} diff --git a/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/IHttpClientTracingInterceptor.cs b/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/IHttpClientTracingInterceptor.cs deleted file mode 100644 index d82da8e9..00000000 --- a/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/IHttpClientTracingInterceptor.cs +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) MASA Stack All rights reserved. -// Licensed under the Apache License. See LICENSE.txt in the project root for license information. - -namespace Masa.Scheduler.Services.Worker.Extensions.TracingAop; - -public interface IHttpClientTracingInterceptor : ISingletonDependency -{ - public void OnHttpResponseMessage(Activity activity, HttpResponseMessage responseMessage); - - public void OnHttpRequestMessage(Activity activity, HttpRequestMessage requestMessage); - -} diff --git a/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/TaskHttpClientTracingInterceptor.cs b/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/TaskHttpClientTracingInterceptor.cs deleted file mode 100644 index 1960d7f5..00000000 --- a/src/Services/Masa.Scheduler.Services.Worker/Extensions/TracingAop/TaskHttpClientTracingInterceptor.cs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (c) MASA Stack All rights reserved. -// Licensed under the Apache License. See LICENSE.txt in the project root for license information. - -namespace Masa.Scheduler.Services.Worker.Extensions.TracingAop; - -public class TaskHttpClientTracingInterceptor : HttpClientTracingInterceptorBase -{ - public override void OnHttpResponseMessage(Activity activity, HttpResponseMessage responseMessage) - { - - } - - public override void OnHttpRequestMessage(Activity activity, HttpRequestMessage requestMessage) - { - var queryString = requestMessage.RequestUri!.Query.Trim('?'); - if (queryString.Contains("traceId")) - { - var traceIdQuery = queryString.Split("&").FirstOrDefault(e => e.Contains("traceId")); - var spanIdQuery = queryString.Split("&").FirstOrDefault(e => e.Contains("spanId")); - var traceId = traceIdQuery!.Split('=')[1]; - var spanId = spanIdQuery!.Split('=')[1]; - - if (traceId.IsNullOrEmpty() == false && spanId.IsNullOrEmpty() == false) - { - activity.SetParentId(ActivityTraceId.CreateFromString(traceId), ActivitySpanId.CreateFromString(spanId)); - } - } - - } - - public override void OnException(Activity activity, Exception exception) - { - - } -} diff --git a/src/Services/Masa.Scheduler.Services.Worker/Program.cs b/src/Services/Masa.Scheduler.Services.Worker/Program.cs index ca27ac45..f7485c34 100644 --- a/src/Services/Masa.Scheduler.Services.Worker/Program.cs +++ b/src/Services/Masa.Scheduler.Services.Worker/Program.cs @@ -20,7 +20,7 @@ var identityServerUrl = masaStackConfig.GetSsoDomain(); builder.Services.AddDaprClient(); -builder.Services.AddObservable(builder.Logging, () => +builder.Services.AddSchedulerObservable(builder.Logging, () => { return new MasaObservableOptions {