diff --git a/python/knext/knext/examples/finance/builder/job/data/Indicator.csv b/python/knext/knext/examples/finance/builder/job/data/Indicator.csv deleted file mode 100644 index 7e401fe05..000000000 --- a/python/knext/knext/examples/finance/builder/job/data/Indicator.csv +++ /dev/null @@ -1,14 +0,0 @@ -id -财政收入质量 -财政自给能力 -土地出让收入 -一般公共预算收入 -留抵退税 -税收收入 -税收收入/一般公共预算收入 -一般公共预算支出 -财政自给率 -政府性基金收入 -转移性收入 -综合财力 - diff --git a/python/knext/knext/examples/finance/builder/job/data/document.csv b/python/knext/knext/examples/finance/builder/job/data/document.csv index 8b61f4304..07c072cc5 100644 --- a/python/knext/knext/examples/finance/builder/job/data/document.csv +++ b/python/knext/knext/examples/finance/builder/job/data/document.csv @@ -1,2 +1,3 @@ input -济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合财力明显下滑。济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合财力明显下滑。2022年济南市一般公共预算收入1,000.21亿元,扣除留 抵退税因素后同比增长8%,规模在山东省下辖地市中排名第2位;其中税收收入690.31亿元,税收占比69.02%;一般公共 预算支出1,260.23亿元,财政自给率79.37%。政府性基金收入547.29亿元,同比大幅下降48.38%,主要系土地出让收入 同比由966.74亿元降至453.74亿元;转移性收入285.78亿元(上年同期为233.11亿元);综合财力约1,833.28亿元(上年 同期为2,301.02亿元)。 \ No newline at end of file +济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合财力明显下滑。济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合财力明显下滑。2022年济南市一般公共预算收入1,000.21亿元,扣除留 抵退税因素后同比增长8%,规模在山东省下辖地市中排名第2位;其中税收收入690.31亿元,税收占比69.02%;一般公共 预算支出1,260.23亿元,财政自给率79.37%。政府性基金收入547.29亿元,同比大幅下降48.38%,主要系土地出让收入 同比由966.74亿元降至453.74亿元;转移性收入285.78亿元(上年同期为233.11亿元);综合财力约1,833.28亿元(上年 同期为2,301.02亿元)。 +2022年,全国一般公共预算收入203703亿元,比上年增长0.6%,扣除留抵退税因素后增长9.1%。其中,中央一般公共预算收入94885亿元,比上年增长3.8%,扣除留抵退税因素后增长13.1%;地方一般公共预算本级收入108818亿元,比上年下降2.1%,扣除留抵退税因素后增长5.9%。全国税收收入166614亿元,比上年下降3.5%,扣除留抵退税因素后增长6.6%;非税收入37089亿元,比上年增长24.4%。 \ No newline at end of file diff --git a/python/knext/knext/examples/finance/builder/job/data/indicators.csv b/python/knext/knext/examples/finance/builder/job/data/indicators.csv new file mode 100644 index 000000000..9ff665216 --- /dev/null +++ b/python/knext/knext/examples/finance/builder/job/data/indicators.csv @@ -0,0 +1,153 @@ +id +债券总发行量 +上级补助支出 
+进出口总额 +网络零售额 +土地出让总价 +工业投资 +工业用地出让面积 +一般公共预算收入 +三次产业增加值结构 +一般债务还本支出 +制造业投资 +信贷规模 +一级平台 +专项债务余额 +刚性支出 +土地增值税 +商办用地出让总价 +地方政府债务 +房地产开发投资增速 +国有资源(资产)有偿使用收入 +非标逾期 +商品房销售面积 +商办用地出让面积 +商品房销售额 +城投有息债务 +企业所得税 +第一产业投资 +固定资产投资增速 +住宅销售面积 +债券偿还量 +服务业投资 +第一产业增加值 +工业增加值 +平台有息债务 +出口总额 +卫生健康支出 +民间投资 +交通运输投资 +水利设施投资 +专项债务限额 +存续债券到期规模 +农林水支出 +城乡社区支出 +净融资 +债券兑付规模 +债务限额 +一般公共预算自给率 +增值税留抵退税 +住宅用地出让总价 +金融机构人民币存款余额 +工业生产性投资 +三次产业结构 +本级存续发债城投平台 +就业支出 +第二产业增加值 +土地成交均价 +政府债务还本支出 +本级净融资 +规模以上工业增加值 +有息债务 +非税收入 +本级债券发行量 +房屋施工面积 +一般公共服务支出 +高新技术产业投资 +社会消费品零售总额 +涉宅用地成交均价 +契税 +债务和融资 +平台票据逾期 +住宅用地成交均价 +政府性基金预算收入 +一般债务余额 +经济总量 +科研经费支出 +一般性项目支出 +固定资产投资 +住宅用地出让面积 +存续债券余额 +政府性基金收入 +医疗卫生支出 +城市维护建设税 +生态环境投资 +一般债务限额 +进口总额 +上级补助收入 +基金转移收入 +耕地占用税 +重工业增加值 +住宅投资 +规上工业企业总产值 +转移性收入 +专项债务还本支出 +住宅销售额 +公共设施投资 +债券余额 +财政刚性支出 +债券发行量 +房产税 +本级债券偿还量 +综合财力 +存续发债城投平台 +第三产业增加值 +教育支出 +社会保障支出 +财政支出 +债务率 +本级存续债券余额 +债券净融资规模 +第二产业投资 +GDP +广义债务 +住宅施工面积 +土地出让面积 +所得税 +留抵退税 +城投平台 +土地开发资金收入 +一般公共预算支出 +基础设施投资 +经济 +增值税 +财政自给率 +投资存续债 +债券到期规模 +逾期发生额 +清理入库收入 +规上工业亩均税收 +个人所得税 +涉宅用地成交面积 +科技支出 +存续发债城投有息债务 +城镇土地使用税 +平台发债规模 +第三产业投资 +规模以上高新技术产业增加值 +财政 +其他政府性基金收入 +涉宅用地成交总价 +房地产投资 +三次产业投资结构 +公共安全支出 +旅游总收入 +已发债的投融资主体 +逾期余额 +土地出让收入 +本级有息债务 +涉地税 +综合用地(含住宅)出让总价 +轻工业增加值 +税收收入 +社会保障和就业支出 diff --git a/python/knext/knext/examples/finance/builder/job/indicator.py b/python/knext/knext/examples/finance/builder/job/indicator.py index 44d889a23..656f4c62d 100644 --- a/python/knext/knext/examples/finance/builder/job/indicator.py +++ b/python/knext/knext/examples/finance/builder/job/indicator.py @@ -17,7 +17,7 @@ class Indicator(BuilderJob): def build(self): source = CSVReader( - local_path="./builder/job/data/Indicator.csv", + local_path="./builder/job/data/indicators.csv", columns=["id"], start_row=2, ) diff --git a/python/knext/knext/examples/finance/builder/job/indicator_event.py b/python/knext/knext/examples/finance/builder/job/indicator_event.py new file mode 100644 index 000000000..ffcf97c29 --- /dev/null +++ 
class IndicatorEvent(BuilderJob):
    """Builder job that extracts indicator events from documents into the KG."""

    def build(self):
        """Assemble the reader -> extractor -> mapping -> writer pipeline.

        The job reads the ``input`` column of ``document.csv`` (starting at
        row 2), runs every row through ``EventExtractOp`` and maps the
        extracted fields onto ``Finance.IndicatorEvent`` before writing.

        Returns:
            The component chain produced by the ``>>`` operators.
        """
        reader = CSVReader(
            local_path="builder/job/data/document.csv",
            columns=["input"],
            start_row=2,
        )

        event_op = EventExtractOp(params={"config": "builder/model/openai_infer.json"})
        extractor = UserDefinedExtractor(extract_op=event_op)

        event_type = Finance.IndicatorEvent
        # (extracted field, target property) pairs, registered in this order.
        # The extracted "id" field feeds both the id and the name property.
        property_pairs = (
            ("id", event_type.id),
            ("id", event_type.name),
            ("subject", event_type.subject),
            ("value", event_type.value),
            ("trend", event_type.trend),
            ("date", event_type.date),
        )
        mapping = SPGTypeMapping(spg_type_name=event_type)
        for source_field, target_property in property_pairs:
            mapping = mapping.add_property_mapping(source_field, target_property)

        writer = KGWriter()

        return reader >> extractor >> [mapping] >> writer
a/python/knext/knext/examples/finance/builder/model/openai_infer.json +++ b/python/knext/knext/examples/finance/builder/model/openai_infer.json @@ -1,7 +1,6 @@ { - "invoker_type": "OpenAI", - "openai_api_key": "EMPTY", - "openai_api_base": "http://127.0.0.1:38080/v1", - "openai_model_name": "gpt-3.5-turbo", - "openai_max_tokens": 2000 -} \ No newline at end of file + "nn_name": "gpt-4", + "openai_api_key": "EMPTY", + "openai_api_base": "http://127.0.0.1:38080/v1", + "openai_max_tokens": 2000 +} diff --git a/python/knext/knext/examples/finance/builder/operator/event_extract.py b/python/knext/knext/examples/finance/builder/operator/event_extract.py new file mode 100644 index 000000000..a7b5cb1cb --- /dev/null +++ b/python/knext/knext/examples/finance/builder/operator/event_extract.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +# Copyright 2023 Ant Group CO., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. 
class EventExtractOp(ExtractOp):
    """Extraction operator that derives event records from a row via an LLM."""

    def __init__(self, params: Dict[str, str] = None):
        """Create the invoker and the prompt helper used by this operator.

        Args:
            params: Operator parameters. The ``"config"`` entry is passed to
                ``NNInvoker.from_config``; it is read unconditionally, so
                ``params`` must be provided despite the ``None`` default.
        """
        super().__init__(params)
        config = params["config"]
        self.config = config
        self.invoker = NNInvoker.from_config(config)
        # The prompt class is imported inside the constructor, not at module level.
        from builder.operator.prompts import EventExtractPrompt

        self.prompt_op = EventExtractPrompt()

    def generate(self, input_data):
        """Run remote inference on ``input_data`` and return the first result."""
        responses = self.invoker.remote_inference(input_data)
        return responses[0]

    def invoke(self, record: Dict[str, str]) -> List[SPGRecord]:
        """Build the prompt for ``record``, query the model and parse its reply.

        Args:
            record: Input row handed to ``EventExtractPrompt.build_prompt``.

        Returns:
            The records produced by ``EventExtractPrompt.parse_response``.
        """
        prompt = self.prompt_op.build_prompt(record)
        return self.prompt_op.parse_response(self.generate(prompt))
self.search_client.fuzzy_search_by_property(property, name) - # Reranking the realled records with LLM to get final linking result - data = { - "input": name, - "candidates": [x.properties["name"] for x in recall_records], - } - link_input = self.prompt_op.build_prompt(data) - link_result = self.generate(link_input) - return self.prompt_op.parse_response(link_result) + recall_records = self.search_client.fuzzy_search_by_property( + property, "name", size=3 + ) + if len(recall_records) == 0: + print("no indicators recalled") + tmp = SPGRecord("Finance.Indicator") + tmp.upsert_property("id", property) + tmp.upsert_property("name", name) + return [tmp] + print(f"recalled indicators: {recall_records}") + return recall_records diff --git a/python/knext/knext/examples/finance/builder/operator/prompts.py b/python/knext/knext/examples/finance/builder/operator/prompts.py index 01cd1d08b..abbf9a4f7 100644 --- a/python/knext/knext/examples/finance/builder/operator/prompts.py +++ b/python/knext/knext/examples/finance/builder/operator/prompts.py @@ -10,6 +10,8 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. 
class EventExtractPrompt(PromptOp):
    """Prompt helper for extracting regional indicator events with an LLM.

    ``build_prompt`` serialises the instruction template together with the
    input text; ``parse_response`` converts the model's JSON reply into
    ``Finance.IndicatorEvent`` records.
    """

    template = {
        "input": "",
        "instruction": '你是专门进行事件提取的专家。请从input中抽取出符合schema定义的事件,不存在的事件返回空列表,不存在的论元返回空。请按照JSON字符串的格式回答。输出格式为:{"event":[{"event_type":,"arguments":{"":,},}]}',
        "schema": [
            {
                "arguments": [
                    "date",
                    "location",
                    "indicator",
                    "value",
                    "trend",
                ],
                "event_type": "区域指标事件",
            }
        ],
        "schema description": {
            "date": "类型为时间,年/月/日",
            "location": "类型为文本。指的是经济指标事件的范围,如全国、成都市、上海市等",
            "indicator": "类型为文本,是用于衡量地区经济状况的数据指标,如土地出让收入,一般公共预算收入",
            "value": "类型为数字, 代表指标名的数值,如200亿元",
            "trend": "类型为文本,代表指标名的变化趋势,如增长,下降,大幅降低,请不要包含具体数值",
        },
    }

    def build_prompt(self, variables: Dict[str, str]):
        """Return the template as a JSON string with ``input`` filled in.

        Args:
            variables: Mapping that must contain the text under ``"input"``.

        Returns:
            JSON string of the template with its ``"input"`` entry replaced.

        Raises:
            KeyError: If ``variables`` has no ``"input"`` entry.
        """
        # Only the top-level "input" key changes, so a shallow merge is enough
        # and leaves the shared class-level template untouched.
        return json.dumps({**self.template, "input": variables["input"]})

    @staticmethod
    def _text_argument(arguments: dict, key: str) -> str:
        """Return ``arguments[key]`` as text, mapping missing/``None`` to ``""``."""
        value = arguments.get(key)
        return "" if value is None else str(value)

    def parse_response(self, response: str) -> List[SPGRecord]:
        """Convert the model reply into ``Finance.IndicatorEvent`` records.

        The reply is expected to look like
        ``{"event": [{"arguments": {"indicator": ..., "trend": ..., ...}}]}``.
        Replies that are not valid JSON or do not have this shape yield an
        empty list; events without usable arguments are skipped. Argument
        values that are absent or ``null`` are treated as empty strings.

        Args:
            response: Raw text returned by the model.

        Returns:
            One record per event whose indicator/trend name is non-empty.
        """
        try:
            records = json.loads(response)
        except (ValueError, TypeError) as e:
            # ValueError covers JSONDecodeError; TypeError covers non-text input.
            print(f"failed to load {response}, info: {e}")
            return []

        events = records.get("event") if isinstance(records, dict) else None
        if not isinstance(events, list):
            # Valid JSON but not the requested structure (model output is untrusted).
            print(f"unexpected response structure: {records}")
            return []

        output = []
        for record in events:
            print(f"extracted events: {record}")
            arguments = record.get("arguments") if isinstance(record, dict) else None
            if not isinstance(arguments, dict):
                continue
            indicator = self._text_argument(arguments, "indicator")
            trend = self._text_argument(arguments, "trend")
            name = f"{indicator}{trend}"
            if not name:
                continue
            tmp = SPGRecord("Finance.IndicatorEvent")
            tmp.upsert_property("id", name)
            tmp.upsert_property("name", name)
            # Fall back to the event name when no indicator was extracted.
            tmp.upsert_property("subject", indicator or name)
            tmp.upsert_property("value", self._text_argument(arguments, "value"))
            tmp.upsert_property("date", self._text_argument(arguments, "date"))
            tmp.upsert_property("trend", trend)
            output.append(tmp)
        return output