diff --git a/app/app/application/base.py b/app/app/application/base.py index 0f7bafa..f5197da 100644 --- a/app/app/application/base.py +++ b/app/app/application/base.py @@ -58,7 +58,7 @@ def complete_app_span(self, app_spans): for tag_str in [ "x_request_id_0", "x_request_id_1", "auto_instance_0", "auto_instance_1", "subnet_0", "app_service", - "_tsdb_region_name", "process_kname_0", + "_querier_region", "process_kname_0", "http_proxy_client", "auto_instance_1_node_type", "app_instance", "response_exception", "version", "l7_protocol_str", "auto_instance_0_node_type", diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index 48c0482..d806a41 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -1886,6 +1886,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int, if merge_flow(flows, flow): # 合并单向Flow为会话 continue flow['_index'] = len(flows) # assert '_index' not in flow + flow['_querier_region'] = dict_flows['_querier_region'][ + index] # set _querier_region for multi-region flows.append(flow) # 注意:不要对 flows 再做排序,下面的代码会通过 flows[flow_index] 来反查 flow @@ -2798,7 +2800,9 @@ def _get_flow_dict(flow: DataFrame): "tap_id": flow.get("tap_id", None), "tap": - flow.get("tap", None) + flow.get("tap", None), + "_querier_region": + flow.get("_querier_region", None) } if flow["signal_source"] == L7_FLOW_SIGNAL_SOURCE_EBPF: flow_dict["subnet"] = flow.get("subnet") diff --git a/app/app/data/querier_client.py b/app/app/data/querier_client.py index 62efbe0..2691fc6 100644 --- a/app/app/data/querier_client.py +++ b/app/app/data/querier_client.py @@ -76,7 +76,7 @@ async def exec(self): 'columns'): result_df = self.to_dataframe(result_dict) if self.region is not None: - result_df['_tsdb_region_name'] = self.region + result_df['_querier_region'] = self.region if self.query_id is not None: result_df['query_id'] = self.query_id if status != 200: