Skip to content

Commit

Permalink
fix: fix delivering traceId
Browse files Browse the repository at this point in the history
  • Loading branch information
Qinyouzeng committed May 8, 2024
1 parent d688900 commit dfc0741
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ namespace Masa.Scheduler.Contracts.Server.Infrastructure.Utils;

public static class HttpUtils
{
public static HttpMethod ConvertHttpMethod(Contracts.Server.Infrastructure.Enums.HttpMethods methods)
private static ActivitySource _source = new ActivitySource("Masa.Scheduler.Background");

public static HttpMethod ConvertHttpMethod(HttpMethods methods)
{
switch (methods)
{
case Contracts.Server.Infrastructure.Enums.HttpMethods.GET:
case HttpMethods.GET:
return HttpMethod.Get;
case Contracts.Server.Infrastructure.Enums.HttpMethods.POST:
case HttpMethods.POST:
return HttpMethod.Post;
case Contracts.Server.Infrastructure.Enums.HttpMethods.HEAD:
case HttpMethods.HEAD:
return HttpMethod.Head;
case Contracts.Server.Infrastructure.Enums.HttpMethods.PUT:
case HttpMethods.PUT:
return HttpMethod.Put;
case Contracts.Server.Infrastructure.Enums.HttpMethods.DELETE:
case HttpMethods.DELETE:
return HttpMethod.Delete;
default:
throw new UserFriendlyException($"Cannot convert method: {methods}");
Expand All @@ -32,11 +34,17 @@ public static void AddHttpHeader(HttpClient client, List<KeyValuePair<string, st
}
}

public static Uri GetRequestUrl(string requestUrl, List<KeyValuePair<string, string>> httpParameters)
public static Activity? SetTraceParent(string? traceId, string? spanId)
{
var builder = new UriBuilder(requestUrl);
return _source.StartActivity("Background Task", ActivityKind.Consumer, $"00-{traceId}-{spanId}-01");
}

builder.Query = string.Join("&", httpParameters.Select(p => $"{p.Key}={p.Value}"));
public static Uri GetRequestUrl(string requestUrl, List<KeyValuePair<string, string>> httpParameters)
{
var builder = new UriBuilder(requestUrl)
{
Query = string.Join("&", httpParameters.Select(p => $"{p.Key}={p.Value}"))
};

return builder.Uri;
}
Expand All @@ -50,11 +58,11 @@ public static Uri GetRequestUrl(string requestUrl, List<KeyValuePair<string, str

var contentType = "text/plain";

if ((content.StartsWith("{") && content.EndsWith("}")) || content.StartsWith("[") && content.EndsWith("]"))
if ((content.StartsWith('{') && content.EndsWith('}')) || content.StartsWith('[') && content.EndsWith(']'))
{
contentType = "application/json";
}
else if (content.StartsWith("<") && content.EndsWith(">"))
else if (content.StartsWith('<') && content.EndsWith('>'))
{
contentType = "application/xml";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public override async Task OnManagerStartAsync()
Logger.LogError(ex, "OnManagerStartAsync");
}
}

public async Task ProcessTaskRun()
{
try
Expand Down Expand Up @@ -141,8 +141,7 @@ public async Task ProcessTaskRun()
public Task StartTaskAsync(SchedulerWorkerManagerData data, Guid taskId, SchedulerJobDto job, DateTimeOffset excuteTime, string? traceId = null, string? spanId = null)
{
var cts = new CancellationTokenSource();
var internalCts = new CancellationTokenSource();

var internalCts = new CancellationTokenSource();
data.TaskCancellationTokenSources.TryAdd(taskId, cts);

data.InternalCancellationTokenSources.TryAdd(taskId, internalCts);
Expand Down Expand Up @@ -182,11 +181,12 @@ public Task StartTaskAsync(SchedulerWorkerManagerData data, Guid taskId, Schedul

_ = Task.Run(async () =>
{
var activity = HttpUtils.SetTraceParent(traceId, spanId);
var managerData = ServiceProvider.GetRequiredService<SchedulerWorkerManagerData>();

try
{
_schedulerLogger.LogInformation($"Task run", WriterTypes.Worker, taskId, job.Id);

var runStatus = await taskHandler.RunTask(taskId, job, excuteTime, traceId, spanId, internalCts.Token);

if (!job.IsAsync || (job.IsAsync && runStatus != TaskRunStatus.Success))
Expand All @@ -201,6 +201,7 @@ public Task StartTaskAsync(SchedulerWorkerManagerData data, Guid taskId, Schedul
}
finally
{
activity?.Dispose();
cts?.Dispose();
internalCts?.Dispose();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the Apache License. See LICENSE.txt in the project root for license information.

using OpenTelemetry.Instrumentation.AspNetCore;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace Microsoft.Extensions.DependencyInjection;

public static class OpenTelemetrySchedulerExtensions
{
public static IServiceCollection AddSchedulerObservable(this IServiceCollection services,
ILoggingBuilder loggingBuilder,
Func<MasaObservableOptions> optionsConfigure,
Func<string>? otlpUrlConfigure = null)
{
var options = optionsConfigure();
var otlpUrl = otlpUrlConfigure?.Invoke() ?? string.Empty;

ArgumentNullException.ThrowIfNull(options);

Uri? uri = null;
if (!string.IsNullOrEmpty(otlpUrl) && !Uri.TryCreate(otlpUrl, UriKind.Absolute, out uri))
throw new UriFormatException($"{nameof(otlpUrl)}:{otlpUrl} is invalid url");

var resources = ResourceBuilder.CreateDefault().AddMasaService(options);
loggingBuilder.AddMasaOpenTelemetry(builder =>
{
builder.SetResourceBuilder(resources);
builder.AddOtlpExporter(otlp => otlp.Endpoint = uri);
});

services.AddMasaMetrics(builder =>
{
builder.SetResourceBuilder(resources);
builder.AddOtlpExporter(options =>
{
options.Endpoint = uri;
});
});

services.AddMasaTracing(
builder =>
{
builder.SetResourceBuilder(resources);
builder.AddOtlpExporter(options => options.Endpoint = uri);
builder.AddSource("Masa.Scheduler.Background");
},
builder =>
{
builder.AspNetCoreInstrumentationOptions.AppendDefaultFilter(builder, false);
builder.AspNetCoreInstrumentationOptions.AppendSchedulerFilter(builder);
});

return services;
}

public static void AppendSchedulerFilter(this Action<AspNetCoreInstrumentationOptions> options,
OpenTelemetryInstrumentationOptions openTelemetryInstrumentationOptions)
{
options += opt =>
{
opt.Filter = (HttpContext httpContext) =>
{
var url = httpContext.Request.Path.Value ?? "/";
return !(url == "/" || url.Contains("/heartbeat"));
};
};
openTelemetryInstrumentationOptions.AspNetCoreInstrumentationOptions += options;
}
}

This file was deleted.

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion src/Services/Masa.Scheduler.Services.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
var identityServerUrl = masaStackConfig.GetSsoDomain();
builder.Services.AddDaprClient();

builder.Services.AddObservable(builder.Logging, () =>
builder.Services.AddSchedulerObservable(builder.Logging, () =>
{
return new MasaObservableOptions
{
Expand Down

0 comments on commit dfc0741

Please sign in to comment.