From d60060978d1d8a39ccc5b3df1dbc0c5d2e9b2338 Mon Sep 17 00:00:00 2001 From: qinyouzeng Date: Thu, 16 Nov 2023 09:26:58 +0800 Subject: [PATCH] fix: fix query in and not in error; fix init table sql; fix query time using UTC time --- .../Extensions/IDbConnectionExtensitions.cs | 165 ++++++++------- .../Extensions/ServiceExtensitions.cs | 197 +++++++++--------- .../Model/MASAStackClickhouseConnection.cs | 27 +-- .../_Imports.cs | 1 + .../Common.cs | 17 +- .../Extensions/ServiceExtensitionsTests.cs | 26 --- .../LogServiceTests.cs | 14 +- .../TraceServiceTests.cs | 9 +- 8 files changed, 224 insertions(+), 232 deletions(-) delete mode 100644 src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Extensions/ServiceExtensitionsTests.cs diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs index 1fbc1e292..17b00b122 100644 --- a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs @@ -7,33 +7,52 @@ internal static class IDbConnectionExtensitions { public static PaginatedListBase QueryTrace(this IDbConnection connection, BaseRequestDto query) { - var sql = AppendWhere(query); + var (where, parameters, ors) = AppendWhere(query); var orderBy = AppendOrderBy(query, false); - var total = Convert.ToInt64(ExecuteScalar(connection, $"select count(1) from {MasaStackClickhouseConnection.TraceTable} where {sql.where}", sql.parameters?.ToArray())); + var countSql = CombineOrs($"select count() as `total` from {MasaStackClickhouseConnection.TraceTable} where {where}", ors); + var total = Convert.ToInt64(ExecuteScalar(connection, $"select sum(`total`) from {countSql}", parameters?.ToArray())); var start = (query.Page - 1) * query.PageSize; var result = new PaginatedListBase() { Total = total, Result = new() }; if (total > 0 && start - total < 0) { - result.Result = Query(connection, $"select * from(select ServiceName,Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanKind,Duration,SpanName,toJSONString(SpanAttributes) as Spans,toJSONString(ResourceAttributes) as Resources from {MasaStackClickhouseConnection.TraceTable} where {sql.where} {orderBy}) as t limit {start},{query.PageSize}", sql.parameters?.ToArray(), ConvertTraceDto); + var querySql = CombineOrs($"select ServiceName,Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanKind,Duration,SpanName,Spans,Resources from {MasaStackClickhouseConnection.TraceTable} where {where}", ors,orderBy); + result.Result = Query(connection, $"select * from {querySql} as t limit {start},{query.PageSize}", parameters?.ToArray(), ConvertTraceDto); } return result; } public static PaginatedListBase QueryLog(this IDbConnection connection, BaseRequestDto query) { - var sql = AppendWhere(query, false); + var (where, parameters, ors) = AppendWhere(query, false); var orderBy = AppendOrderBy(query, true); - var total = Convert.ToInt64(ExecuteScalar(connection, $"select count(1) from {MasaStackClickhouseConnection.LogTable} where {sql.where}", sql.parameters?.ToArray())); + var countSql = CombineOrs($"select count() as `total` from {MasaStackClickhouseConnection.LogTable} where {where}", ors); + var total = Convert.ToInt64(ExecuteScalar(connection, $"select sum(`total`) from {countSql}", parameters?.ToArray())); var start = (query.Page - 1) * query.PageSize; var result = new PaginatedListBase() { Total = total, Result = new() }; + if (total > 0 && start - total < 0) { - result.Result = Query(connection, $"select * from(select Timestamp,TraceId,SpanId,TraceFlags,SeverityText,SeverityNumber,ServiceName,Body,toJSONString(ResourceAttributes) as Resources,toJSONString(LogAttributes) as Logs from {MasaStackClickhouseConnection.LogTable} where {sql.where} {orderBy}) as t limit {start},{query.PageSize}", sql.parameters?.ToArray(), ConvertLogDto); + var querySql = CombineOrs($"select Timestamp,TraceId,SpanId,TraceFlags,SeverityText,SeverityNumber,ServiceName,Body,Resources,Logs from {MasaStackClickhouseConnection.LogTable} where {where}", ors, orderBy); + result.Result = Query(connection, $"select * from {querySql} as t limit {start},{query.PageSize}", parameters?.ToArray(), ConvertLogDto); } return result; } + private static string CombineOrs(string sql, IEnumerable ors, string? orderBy = null) + { + if (ors == null || !ors.Any()) + return $"({sql} {orderBy})"; + + var text = new StringBuilder(); + foreach (var or in ors) + { + text.AppendLine($" union all {sql}{or} {orderBy}"); + } + text.Remove(0, 11).Insert(0, '(').Append(')'); + return text.ToString(); + } + public static List GetMapping(this IDbConnection dbConnection, bool isLog) { var type = isLog ? "log" : "trace"; @@ -52,16 +71,16 @@ public static List GetMapping(this IDbConnection dbConnectio public static List GetTraceByTraceId(this IDbConnection connection, string traceId) { string where = $"TraceId=@TraceId"; - return Query(connection, $"select * from (select Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanKind,Duration,SpanName,toJSONString(SpanAttributes) as Spans,toJSONString(ResourceAttributes) as Resources from {MasaStackClickhouseConnection.TraceTable} where {where}) as t limit 1000", new IDataParameter[] { new ClickHouseParameter { ParameterName = "TraceId", Value = traceId } }, ConvertTraceDto); + return Query(connection, $"select * from (select Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanKind,Duration,SpanName,Spans,Resources from {MasaStackClickhouseConnection.TraceTable} where {where}) as t limit 1000", new IDataParameter[] { new ClickHouseParameter { ParameterName = "TraceId", Value = traceId } }, ConvertTraceDto); } public static string AppendOrderBy(BaseRequestDto query, bool isLog) { - var str = query.Sort?.IsDesc ?? false ? " desc" : ""; + var str = query.Sort?.IsDesc ?? true ? " desc" : ""; return $" order by Timestamp{str}"; } - public static (string where, List @parameters) AppendWhere(BaseRequestDto query, bool isTrace = true) + public static (string where, List @parameters, List ors) AppendWhere(BaseRequestDto query, bool isTrace = true) { var sql = new StringBuilder(); var @paramerters = new List(); @@ -81,23 +100,15 @@ public static (string where, List @parameters) AppendWhere(BaseR } if (!string.IsNullOrEmpty(query.Instance)) { - sql.Append(" and ResourceAttributes['service.instance.id']=@ServiceInstanceId"); + sql.Append(" and `Resource.service.instance.id`=@ServiceInstanceId"); @paramerters.Add(new ClickHouseParameter() { ParameterName = "ServiceInstanceId", Value = query.Instance }); } - if (!string.IsNullOrEmpty(query.Endpoint)) + if (isTrace && !string.IsNullOrEmpty(query.Endpoint)) { - if (isTrace) - { - sql.Append(" and SpanKind=@SpanKind and SpanAttributes['http.target']=@HttpTarget"); - @paramerters.Add(new ClickHouseParameter() { ParameterName = "SpanKind", Value = "SPAN_KIND_SERVER" }); - } - else - { - sql.Append(" and mapContains(LogAttributes, 'Host') and LogAttributes['RequestPath']=@HttpTarget"); - } + sql.Append(" and `Attributes.http.target`=@HttpTarget"); @paramerters.Add(new ClickHouseParameter() { ParameterName = "HttpTarget", Value = query.Instance }); } - AppendKeyword(query.Keyword, sql, paramerters, isTrace); + var ors = AppendKeyword(query.Keyword, paramerters, isTrace); AppendConditions(query.Conditions, paramerters, sql, isTrace); if (!string.IsNullOrEmpty(query.RawQuery)) @@ -105,39 +116,40 @@ public static (string where, List @parameters) AppendWhere(BaseR if (sql.Length > 0) sql.Remove(0, 4); - return (sql.ToString(), @paramerters); + return (sql.ToString(), @paramerters, ors); } - private static void AppendKeyword(string keyword, StringBuilder sql, List @paramerters, bool isTrace = true) + private static List AppendKeyword(string keyword, List @paramerters, bool isTrace = true) { + var sqls = new List(); if (string.IsNullOrEmpty(keyword)) - return; + return sqls; //status_code if (int.TryParse(keyword, out var num) && num != 0 && num - 1000 < 0 && isTrace) { - sql.Append(" and (SpanAttributes['http.status_code']=@HttpStatusCode or SpanAttributes['http.request_content_body'] like @Keyword)"); + sqls.Add(" and `Attributes.http.status_code`=@HttpStatusCode"); + sqls.Add(" and `Attributes.http.request_content_body` like @Keyword"); paramerters.Add(new ClickHouseParameter() { ParameterName = "HttpStatusCode", Value = num }); paramerters.Add(new ClickHouseParameter() { ParameterName = "Keyword", Value = $"%{keyword}%" }); - return; + return sqls; } - if (isTrace) { - sql.Append(@" and (SpanAttributes['http.request_content_body'] like @Keyword - or SpanAttributes['http.url'] like @Keyword - or SpanAttributes['http.response_content_body'] like @Keyword - or mapContains(SpanAttributes, 'exception.message') and SpanAttributes['exception.message'] like @Keyword)"); + sqls.Add(" and `Attributes.http.request_content_body` like @Keyword"); + sqls.Add(" and `Attributes.http.response_content_body` like @Keyword"); + sqls.Add(" and `Attributes.exception.message` like @Keyword"); } else { - sql.Append(@" and (Body like @Keyword - or LogAttributes['HttpTarget'] like @Keyword - or LogAttributes['http.response_content_body'] like @Keyword - or mapContains(LogAttributes, 'exception.message') and LogAttributes['exception.message'] like @Keyword)"); + if (keyword.Equals("error", StringComparison.CurrentCultureIgnoreCase)) + sqls.Add(" and SeverityText='Error'"); + sqls.Add(" and Body like @Keyword"); + sqls.Add(" and `Attributes.exception.message` like @Keyword"); } paramerters.Add(new ClickHouseParameter() { ParameterName = "Keyword", Value = $"%{keyword}%" }); + return sqls; } private static void AppendConditions(IEnumerable? conditions, List @paramerters, StringBuilder sql, bool isTrace = true) @@ -230,7 +242,10 @@ private static void AppendField(FieldConditionDto item, List @pa private static void ParseWhere(StringBuilder sql, object value, List @paramerters, string fieldName, string paramName, string compare) { DbType dbType = value is DateTime ? DbType.DateTime2 : DbType.AnsiString; - sql.Append($" and {fieldName} {compare} @{paramName}"); + if (value is IEnumerable) + sql.Append($" and {fieldName} {compare} (@{paramName})"); + else + sql.Append($" and {fieldName} {compare} @{paramName}"); @paramerters.Add(new ClickHouseParameter { ParameterName = $"{paramName}", Value = value, DbType = dbType }); } @@ -308,6 +323,8 @@ public static MappingResponseDto ConvertToMapping(IDataReader reader) public static TraceResponseDto ConvertTraceDto(IDataReader reader) { var startTime = Convert.ToDateTime(reader["Timestamp"]); + //var localTime = startTime.ToUniversalTime(); + //startTime = startTime.Add(startTime - localTime); long ns = Convert.ToInt64(reader["Duration"]); string resource = reader["Resources"].ToString()!, spans = reader["Spans"].ToString()!; var result = new TraceResponseDto @@ -338,7 +355,7 @@ public static LogResponseDto ConvertLogDto(IDataReader reader) SeverityText = reader["SeverityText"].ToString()!, TraceFlags = Convert.ToInt32(reader["TraceFlags"]), SpanId = reader["SpanId"].ToString()!, - Timestamp = Convert.ToDateTime(reader["Timestamp"]), + Timestamp = Convert.ToDateTime(reader["Timestamp"]).ToLocalTime(), }; if (!string.IsNullOrEmpty(resource)) result.Resource = JsonSerializer.Deserialize>(resource)!; @@ -353,21 +370,9 @@ public static object AggregationQuery(this IDbConnection dbConnection, SimpleAgg var append = new StringBuilder(); var appendWhere = new StringBuilder(); var name = GetName(requestDto.Name, isLog); - if (name.StartsWith("ResourceAttributes[", StringComparison.CurrentCultureIgnoreCase)) - { - var filed = requestDto.Name["resource.".Length..]; - appendWhere.Append($" mapContains(ResourceAttributes,'{filed}') and "); - } - else if (requestDto.Name.StartsWith("attributes.", StringComparison.CurrentCultureIgnoreCase)) - { - var filed = requestDto.Name["attributes.".Length..]; - appendWhere.Append($" mapContains({(isLog ? "Log" : "Span")}Attributes,'{filed}') and "); - } - AppendAggtype(requestDto, sql, append, name, out var isScalar); - sql.AppendFormat(" from {0} ", isLog ? MasaStackClickhouseConnection.LogTable : MasaStackClickhouseConnection.TraceTable); - var (where, @paremeters) = AppendWhere(requestDto, !isLog); + var (where, @paremeters, _) = AppendWhere(requestDto, !isLog); sql.Append($" where {appendWhere} {where}"); sql.Append(append); var paramArray = @paremeters?.ToArray()!; @@ -433,7 +438,7 @@ private static void AppendAggtype(SimpleAggregateRequestDto requestDto, StringBu break; case AggregateTypes.GroupBy: sql.Append($"{name} as a,Count({name}) as b"); - append.Append($" Group By a order by a"); + append.Append($" and a<>'' Group By a order by b desc"); break; case AggregateTypes.DateHistogram: sql.Append($"toStartOfInterval({name}, INTERVAL {ConvertInterval(requestDto.Interval)} minute ) as `time`,count() as `count`"); @@ -444,30 +449,50 @@ private static void AppendAggtype(SimpleAggregateRequestDto requestDto, StringBu private static string GetName(string name, bool isLog) { - if (name.Equals("resource.service.name", StringComparison.CurrentCultureIgnoreCase)) - { - return "ServiceName"; - } - else if (name.Equals("@timestamp", StringComparison.CurrentCultureIgnoreCase)) - { + if (name.Equals("@timestamp", StringComparison.CurrentCultureIgnoreCase)) return "Timestamp"; - } - else if (name.StartsWith("resource.", StringComparison.CurrentCultureIgnoreCase)) - { - return $"ResourceAttributes['{name[("resource.".Length)..]}']"; - } - else if (name.StartsWith("attributes.", StringComparison.CurrentCultureIgnoreCase)) - { - return $"{(isLog ? "Log" : "Span")}Attributes['{name[("attributes.".Length)..]}']"; - } - else if (!isLog && name.Equals("kind", StringComparison.InvariantCultureIgnoreCase)) - { + if (!isLog && name.Equals("kind", StringComparison.InvariantCultureIgnoreCase)) return "SpanKind"; - } + + if (name.StartsWith("resource.", StringComparison.CurrentCultureIgnoreCase)) + return GetResourceName(name); + + if (name.StartsWith("attributes.", StringComparison.CurrentCultureIgnoreCase)) + return GetAttributeName(name, isLog); + return name; } + private static string GetResourceName(string name) + { + var field = name[("resource.".Length)..]; + if (field.Equals("service.name", StringComparison.CurrentCultureIgnoreCase)) + return "ServiceName"; + + if (field.Equals("service.namespace", StringComparison.CurrentCultureIgnoreCase) || field.Equals("service.instance.id", StringComparison.CurrentCultureIgnoreCase)) + return $"Resource.{field}"; + + return $"ResourceAttributesValues[indexOf(ResourceAttributesKeys,'{field}')]"; + } + + private static string GetAttributeName(string name, bool isLog) + { + var pre = isLog ? "Log" : "Span"; + var field = name[("attributes.".Length)..]; + if (isLog && (field.Equals("exception.message", StringComparison.CurrentCultureIgnoreCase))) + return $"Attributes.{field}"; + + if (!isLog && (field.Equals("http.status_code", StringComparison.CurrentCultureIgnoreCase) + || field.Equals("http.request_content_body", StringComparison.CurrentCultureIgnoreCase) + || field.Equals("http.response_content_body", StringComparison.CurrentCultureIgnoreCase) + || field.Equals("exception.message", StringComparison.CurrentCultureIgnoreCase)) + ) + return $"Attributes.{field}"; + + return $"{pre}AttributesValues[indexOf({pre}AttributesKeys,'{field}')]"; + } + public static int ConvertInterval(string s) { var unit = Regex.Replace(s, @"\d+", "", RegexOptions.IgnoreCase, TimeSpan.FromSeconds(5)); @@ -502,7 +527,7 @@ public static int ConvertInterval(string s) public static string GetMaxDelayTraceId(this IDbConnection dbConnection, BaseRequestDto requestDto) { - var (where, parameters) = AppendWhere(requestDto); + var (where, parameters, _) = AppendWhere(requestDto); var text = $"select * from( TraceId from {MasaStackClickhouseConnection.TraceTable} where {where} order by Duration desc) as t limit 1"; return dbConnection.ExecuteScalar(text, parameters?.ToArray())?.ToString()!; } diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs index 65fc5d223..8c21d2a23 100644 --- a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs @@ -7,18 +7,9 @@ public static class ServiceExtensitions { internal static ILogger? Logger { get; private set; } - public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string? logTable = null, string? traceTable = null) + public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string logTable, string traceTable, string? logSourceTable = null, string? traceSourceTable = null) { - services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, traceTable)) - .AddScoped() - .AddScoped(); - Init(services, false); - return services; - } - - public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string logTable, string traceTable, string logSourceTable, string traceSourceTable) - { - services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, logSourceTable, traceTable, traceSourceTable)) + services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, traceTable, logSourceTable, traceSourceTable)) .AddScoped() .AddScoped(); Init(services); @@ -41,140 +32,117 @@ private static void InitTable(MasaStackClickhouseConnection connection) var database = connection.ConnectionSettings?.Database; database ??= new ClickHouseConnectionSettings(connection.ConnectionString).Database; - if (Convert.ToInt32(connection.ExecuteScalar($"select * from system.tables where database ='{database}' and name in ['{MasaStackClickhouseConnection.TraceTable}','{MasaStackClickhouseConnection.LogTable}']")) > 0) + if (Convert.ToInt32(connection.ExecuteScalar($"select count() from system.tables where database ='{database}' and name in ['{MasaStackClickhouseConnection.TraceTable.Split('.')[1]}','{MasaStackClickhouseConnection.LogTable.Split('.')[1]}']")) > 0) return; - var createTableSqls = new string[]{ @$"CREATE TABLE {MasaStackClickhouseConnection.LogTable} -( + var createTableSqls = new string[]{ + @$"CREATE TABLE {MasaStackClickhouseConnection.LogTable} +( `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), - `TraceId` String CODEC(ZSTD(1)), - `SpanId` String CODEC(ZSTD(1)), - `TraceFlags` UInt32 CODEC(ZSTD(1)), - `SeverityText` LowCardinality(String) CODEC(ZSTD(1)), - `SeverityNumber` Int32 CODEC(ZSTD(1)), - `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), - `Body` String CODEC(ZSTD(1)), - `ResourceSchemaUrl` String CODEC(ZSTD(1)), - - `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), - + `Resources` String CODEC(ZSTD(1)), `ScopeSchemaUrl` String CODEC(ZSTD(1)), - `ScopeName` String CODEC(ZSTD(1)), - `ScopeVersion` String CODEC(ZSTD(1)), - - `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), - - `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), - - INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, - - INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1 + `Scopes` String CODEC(ZSTD(1)), + `Logs` String CODEC(ZSTD(1)), + + `Resource.service.namespace` String CODEC(ZSTD(1)), + `Resource.service.version` String CODEC(ZSTD(1)), + `Resource.service.instance.id` String CODEC(ZSTD(1)), + + `Attributes.TaskId` String CODEC(ZSTD(1)), + `Attributes.exception.message` String CODEC(ZSTD(1)), + + ResourceAttributesKeys Array(String) CODEC(ZSTD(1)), + ResourceAttributesValues Array(String) CODEC(ZSTD(1)), + LogAttributesKeys Array(String) CODEC(ZSTD(1)), + LogAttributesValues Array(String) CODEC(ZSTD(1)), + + INDEX idx_log_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_servicename ServiceName TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_serviceinstanceid `Resource.service.instance.id` TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_severitytext SeverityText TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_taskid `Attributes.TaskId` TYPE bloom_filter(0.001) GRANULARITY 1, + + INDEX idx_string_body Body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1, + INDEX idx_string_exceptionmessage Attributes.exception.message TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1 ) ENGINE = MergeTree -PARTITION BY toYYYYMM(Timestamp) +PARTITION BY toDate(Timestamp) ORDER BY ( -Timestamp, -ServiceName, -SeverityText, -TraceId, -SpanId -) + Timestamp, + `Resource.service.namespace`, + ServiceName + ) TTL toDateTime(Timestamp) + toIntervalDay(30) SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; ", @$"CREATE TABLE {MasaStackClickhouseConnection.TraceTable} ( - `Timestamp` DateTime64(9) CODEC(Delta(8), - ZSTD(1)), - + `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), `TraceId` String CODEC(ZSTD(1)), - `SpanId` String CODEC(ZSTD(1)), - `ParentSpanId` String CODEC(ZSTD(1)), - `TraceState` String CODEC(ZSTD(1)), - `SpanName` LowCardinality(String) CODEC(ZSTD(1)), - `SpanKind` LowCardinality(String) CODEC(ZSTD(1)), - `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), - - `ResourceAttributes` Map(LowCardinality(String), - String) CODEC(ZSTD(1)), - + `Resources` String CODEC(ZSTD(1)), `ScopeName` String CODEC(ZSTD(1)), - `ScopeVersion` String CODEC(ZSTD(1)), - - `SpanAttributes` Map(LowCardinality(String), - String) CODEC(ZSTD(1)), - + `Spans` String CODEC(ZSTD(1)), `Duration` Int64 CODEC(ZSTD(1)), - `StatusCode` LowCardinality(String) CODEC(ZSTD(1)), - `StatusMessage` String CODEC(ZSTD(1)), - `Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)), - `Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)), - - `Events.Attributes` Array(Map(LowCardinality(String), - String)) CODEC(ZSTD(1)), - + `Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), `Links.TraceId` Array(String) CODEC(ZSTD(1)), - `Links.SpanId` Array(String) CODEC(ZSTD(1)), - `Links.TraceState` Array(String) CODEC(ZSTD(1)), - - `Links.Attributes` Array(Map(LowCardinality(String), - String)) CODEC(ZSTD(1)), + `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + + `Resource.service.namespace` String CODEC(ZSTD(1)), + `Resource.service.version` String CODEC(ZSTD(1)), + `Resource.service.instance.id` String CODEC(ZSTD(1)), + + `Attributes.http.status_code` String CODEC(ZSTD(1)), + `Attributes.http.response_content_body` String CODEC(ZSTD(1)), + `Attributes.http.request_content_body` String CODEC(ZSTD(1)), + `Attributes.http.target` String CODEC(ZSTD(1)), + `Attributes.exception.message` String CODEC(ZSTD(1)), + + `ResourceAttributesKeys` Array(String) CODEC(ZSTD(1)), + `ResourceAttributesValues` Array(String) CODEC(ZSTD(1)), + `SpanAttributesKeys` Array(String) CODEC(ZSTD(1)), + `SpanAttributesValues` Array(String) CODEC(ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, - - INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - - INDEX idx_duration Duration TYPE minmax GRANULARITY 1 + INDEX idx_trace_servicename ServiceName TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_servicenamespace Resource.service.namespace TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_serviceinstanceid Resource.service.instance.id TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_statuscode Attributes.http.status_code TYPE bloom_filter(0.001) GRANULARITY 1, + + INDEX idx_string_requestbody Attributes.http.request_content_body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1, + INDEX idx_string_responsebody Attributes.http.response_content_body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1, + INDEX idx_string_exceptionmessage Attributes.exception.message TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1 ) ENGINE = MergeTree -PARTITION BY toYYYYMM(Timestamp) +PARTITION BY toDate(Timestamp) ORDER BY ( -Timestamp, -ServiceName, - TraceId + Timestamp, + Resource.service.namespace, + ServiceName ) TTL toDateTime(Timestamp) + toIntervalDay(30) SETTINGS index_granularity = 8192, @@ -182,11 +150,38 @@ TTL toDateTime(Timestamp) + toIntervalDay(30) ", $@"CREATE MATERIALIZED VIEW {MasaStackClickhouseConnection.LogTable}_v TO {MasaStackClickhouseConnection.LogTable} AS -SELECT * FROM {MasaStackClickhouseConnection.LogSourceTable}; +SELECT +Timestamp,TraceId,SpanId,TraceFlags,SeverityText,SeverityNumber,ServiceName,Body,ResourceSchemaUrl,toJSONString(ResourceAttributes) as Resources, +ScopeSchemaUrl,ScopeName,ScopeVersion,toJSONString(ScopeAttributes) as Scopes,toJSONString(LogAttributes) as Logs, +ResourceAttributes['service.namespace'] as `Resource.service.namespace`,ResourceAttributes['service.version'] as `Resource.service.version`, +ResourceAttributes['service.instance.id'] as `Resource.service.instance.id`, +LogAttributes['TaskId'] as `Attributes.TaskId`,LogAttributes['exception.message'] as `Attributes.exception.message`, +mapKeys(ResourceAttributes) as ResourceAttributesKeys,mapValues(ResourceAttributes) as ResourceAttributesValues, +mapKeys(LogAttributes) as LogAttributesKeys,mapValues(LogAttributes) as LogAttributesValues +FROM {MasaStackClickhouseConnection.LogSourceTable}; ", $@"CREATE MATERIALIZED VIEW {MasaStackClickhouseConnection.TraceTable}_v TO {MasaStackClickhouseConnection.TraceTable} AS -SELECT * FROM {MasaStackClickhouseConnection.TraceSourceTable}; +SELECT + Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanName,SpanKind,ServiceName,toJSONString(ResourceAttributes) AS Resources, + ScopeName,ScopeVersion,toJSONString(SpanAttributes) AS Spans, + Duration,StatusCode,StatusMessage,Events.Timestamp,Events.Name,Events.Attributes, + Links.TraceId,Links.SpanId,Links.TraceState,Links.Attributes, + + ResourceAttributes['service.namespace'] as `Resource.service.namespace`,ResourceAttributes['service.version'] as `Resource.service.version`, + ResourceAttributes['service.instance.id'] as `Resource.service.instance.id`, + + SpanAttributes['http.status_code'] as `Attributes.http.status_code`, + SpanAttributes['http.response_content_body'] as `Attributes.http.response_content_body`, + SpanAttributes['http.request_content_body'] as `Attributes.http.request_content_body`, + SpanAttributes['http.target'] as `Attributes.http.target`, + SpanAttributes['exception.message'] as `Attributes.exception.message`, + + mapKeys(ResourceAttributes) AS ResourceAttributesKeys, + mapValues(ResourceAttributes) AS ResourceAttributesValues, + mapKeys(SpanAttributes) AS SpanAttributesKeys, + mapValues(SpanAttributes) AS SpanAttributesValues +FROM {MasaStackClickhouseConnection.TraceSourceTable}; " }; foreach (var sql in createTableSqls) diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs index 4b09f82e6..3a56c084b 100644 --- a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs @@ -15,34 +15,15 @@ internal class MasaStackClickhouseConnection : ClickHouseConnection public static string MappingTable { get; private set; } - public MasaStackClickhouseConnection(string connection, string? logTable = null, string? traceTable = null) - { - ArgumentNullException.ThrowIfNull(connection); - ConnectionString = connection; - logTable ??= "otel_logs"; - traceTable ??= "otel_traces"; - if (!string.IsNullOrEmpty(ConnectionSettings.Database)) - { - LogTable = $"{ConnectionSettings.Database}.{logTable}"; - TraceTable = $"{ConnectionSettings.Database}.{traceTable}"; - MappingTable = $"{ConnectionSettings.Database}.otel_mapping"; - } - else - { - LogTable = logTable; - TraceTable = traceTable; - MappingTable = "otel_mapping"; - } - } - - public MasaStackClickhouseConnection(string connection, string logTable, string logSourceTable, string traceTable, string traceSourceTable) + public MasaStackClickhouseConnection(string connection, string logTable, string traceTable, string? logSourceTable = null, string? traceSourceTable = null) { ArgumentNullException.ThrowIfNull(connection); ArgumentNullException.ThrowIfNull(logTable); - ArgumentNullException.ThrowIfNull(logSourceTable); ArgumentNullException.ThrowIfNull(traceTable); - ArgumentNullException.ThrowIfNull(traceSourceTable); ConnectionString = connection; + logSourceTable ??= "otel_logs"; + traceSourceTable ??= "otel_traces"; + if (!string.IsNullOrEmpty(ConnectionSettings.Database)) { LogTable = $"{ConnectionSettings.Database}.{logTable}"; diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs index fa2b94e53..dd0275a5d 100644 --- a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs @@ -11,6 +11,7 @@ global using Masa.Utils.Models; global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Logging; +global using System.Collections; global using System.Data; global using System.Data.Common; global using System.Text; diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs index 3432b4123..09fee8546 100644 --- a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs @@ -5,7 +5,7 @@ namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; internal class Common { - public static void InitTableData(bool isLog) + public static void InitTable(bool isLog) { var name = isLog ? "log" : "trace"; using var connection = new ClickHouseConnection(Consts.ConnectionString); @@ -26,4 +26,19 @@ public static void InitTableData(bool isLog) cmd.ExecuteNonQuery(); } } + + public static void InitTableData(bool isLog) + { + var name = isLog ? "log" : "trace"; + using var connection = new ClickHouseConnection(Consts.ConnectionString); + connection.Open(); + using var cmd = connection.CreateCommand(); + var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Data/otel_{name}_data.txt"); + using (var dataReader = new StreamReader(path)) + { + var sql = dataReader.ReadToEnd(); + cmd.CommandText = sql; + cmd.ExecuteNonQuery(); + } + } } diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Extensions/ServiceExtensitionsTests.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Extensions/ServiceExtensitionsTests.cs deleted file mode 100644 index 82ecc5b6e..000000000 --- a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Extensions/ServiceExtensitionsTests.cs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (c) MASA Stack All rights reserved. -// Licensed under the MIT License. See LICENSE.txt in the project root for license information. - -namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests.Extensions; - -[TestClass] -public class ServiceExtensitionsTests -{ - [TestMethod] - public void AddMASAStackClickhouseTest() - { - var service = new ServiceCollection(); - service.AddMASAStackClickhouse(Consts.ConnectionString); - var logService = service.BuildServiceProvider().GetRequiredService(); - Assert.IsNotNull(logService); - } - - [TestMethod] - public void AddMASAStackClickhouseTestCustomTable() - { - var service = new ServiceCollection(); - service.AddMASAStackClickhouse(Consts.ConnectionString,"otel_log","custom_log","otel_trace","custom_trace"); - var logService = service.BuildServiceProvider().GetRequiredService(); - Assert.IsNotNull(logService); - } -} diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs index 2ca6a35af..188235712 100644 --- a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs @@ -7,27 +7,30 @@ namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; public class LogServiceTests { private static ILogService logService; + private readonly DateTime startTime= DateTime.Parse("2023-11-02 09:00:00"); [ClassInitialize] public static void Initialized(TestContext testContext) { - Common.InitTableData(true); + Common.InitTable(true); + Common.InitTable(false); var services = new ServiceCollection(); services.AddLogging(builder => builder.AddConsole()); - services.AddMASAStackClickhouse(Consts.ConnectionString); + services.AddMASAStackClickhouse(Consts.ConnectionString, "custom_log", "custom_trace"); + Common.InitTableData(true); logService = services.BuildServiceProvider().GetRequiredService(); } [TestMethod] public async Task QueryListTest() - { - var startTime = DateTime.Parse("2023-11-02 09:00:00"); + { var query = new BaseRequestDto { Page = 1, PageSize = 10, Start = startTime, End = startTime.AddHours(1), + Keyword="Kafka", Conditions = new List { new FieldConditionDto{ Name="Resource.service.name", @@ -64,8 +67,7 @@ public async Task MappingTest() [TestMethod] public async Task AggTest() - { - var startTime = DateTime.Parse("2023-11-02 09:00:00"); + { var request = new SimpleAggregateRequestDto { Name = "Resource.service.name", diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs index 78711e968..85d1b2e53 100644 --- a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs @@ -7,21 +7,21 @@ namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; public class TraceServiceTests { private static ITraceService traceService; + private readonly DateTime startTime = DateTime.Parse("2023-11-02 09:00:00"); [ClassInitialize] public static void Initialized(TestContext testContext) { - Common.InitTableData(false); var services = new ServiceCollection(); services.AddLogging(builder=>builder.AddConsole()); - services.AddMASAStackClickhouse(Consts.ConnectionString); + services.AddMASAStackClickhouse(Consts.ConnectionString,"custom_log", "custom_trace"); + Common.InitTableData(false); traceService = services.BuildServiceProvider().GetRequiredService(); } [TestMethod] public async Task QueryListTest() - { - var startTime = DateTime.Parse("2023-11-02 09:00:00"); + { var query = new BaseRequestDto { Page = 1, @@ -43,7 +43,6 @@ public async Task TraceIdTest() [TestMethod] public async Task AggTest() { - var startTime = DateTime.Parse("2023-11-02 09:00:00"); var request = new SimpleAggregateRequestDto { Name = "Resource.service.name",