Skip to content

Commit

Permalink
fix(builder): add embedding model (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
andylau-55 authored Nov 1, 2024
1 parent 3f863f8 commit 0e687f7
Show file tree
Hide file tree
Showing 19 changed files with 177 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public List<SubGraphRecord> call() throws Exception {
log.info("LLMNlExtractProcessor invoke Chunks: {}", names);
List<Object> result =
(List<Object>) operatorFactory.invoke(config.getOperatorConfig(), record);
log.info("LLMNlExtractProcessor invoke result: {}", JSON.toJSONString(result));
List<SubGraphRecord> records =
JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {});
node.addTraceLog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
node.addTraceLog("Start vectorizer processor...");
List<BaseRecord> results = new ArrayList<>();
SubGraphRecord subGraph = new SubGraphRecord(Lists.newArrayList(), Lists.newArrayList());
SubGraphRecord outputs = new SubGraphRecord(Lists.newArrayList(), Lists.newArrayList());

for (BaseRecord record : inputs) {
SubGraphRecord spgRecord = (SubGraphRecord) record;
outputs.getResultNodes().addAll(spgRecord.getResultNodes());
outputs.getResultEdges().addAll(spgRecord.getResultEdges());
Map map = mapper.convertValue(spgRecord, Map.class);
node.addTraceLog(
"invoke vectorizer processor operator:%s", config.getOperatorConfig().getClassName());
Expand All @@ -69,7 +72,7 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
}
results.add(subGraph);
node.addTraceLog("post vectorizer complete...");
node.setOutputs(subGraph);
node.setOutputs(outputs);
node.setStatus(StatusEnum.FINISH);
return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.record.LPGPropertyRecord;
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.record.VertexRecord;
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.schema.EdgeTypeName;
import com.antgroup.openspg.reasoner.runner.local.impl.LocalRunnerThreadPool;
import com.antgroup.openspg.server.common.model.project.Project;
import com.google.common.collect.Lists;
import java.util.ArrayList;
Expand All @@ -39,16 +38,23 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {

private static final int NUM_THREADS = 10;

private ExecuteNode node;
private Neo4jStoreClient client;
private Project project;
private static final String DOT = ".";
ExecutorService nodeExecutor;
ExecutorService edgeExecutor;

public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) {
super(id, name, config);
Expand All @@ -63,6 +69,22 @@ public void doInit(BuilderContext context) throws BuilderException {
}
client = new Neo4jStoreClient(context.getGraphStoreUrl());
project = JSON.parseObject(context.getProject(), Project.class);
nodeExecutor =
new ThreadPoolExecutor(
NUM_THREADS,
NUM_THREADS,
2 * 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
edgeExecutor =
new ThreadPoolExecutor(
NUM_THREADS,
NUM_THREADS,
2 * 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
Expand Down Expand Up @@ -90,8 +112,6 @@ private void batchWriteToNeo4j(List<BaseRecord> records) {
public void writeToNeo4j(SubGraphRecord subGraphRecord) {
subGraphRecord.getResultNodes().forEach(node -> convertProperties(node.getProperties()));
subGraphRecord.getResultEdges().forEach(edge -> convertProperties(edge.getProperties()));

ExecutorService nodeExecutor = LocalRunnerThreadPool.getThreadPoolExecutor(null);
try {
node.addTraceLog("Start Writer Nodes processor...");
List<Future<Void>> nodeFutures =
Expand All @@ -102,8 +122,6 @@ public void writeToNeo4j(SubGraphRecord subGraphRecord) {
Thread.currentThread().interrupt();
throw new RuntimeException("Error during node upsert", e);
}

ExecutorService edgeExecutor = LocalRunnerThreadPool.getThreadPoolExecutor(null);
try {
node.addTraceLog("Start Writer Edges processor...");
List<Future<Void>> edgeFutures =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;

Expand All @@ -32,7 +34,14 @@ public static Driver getNeo4jDriver(String uri, String user, String password) {
if (instanceMap.get(uniqueKey) == null) {
Driver driver;
try {
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
Config config =
Config.builder()
.withMaxConnectionPoolSize(200)
.withMaxConnectionLifetime(2, TimeUnit.HOURS)
.withMaxTransactionRetryTime(300, TimeUnit.SECONDS)
.withConnectionAcquisitionTimeout(300, TimeUnit.SECONDS)
.build();
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password), config);
driver.verifyConnectivity();
} catch (Exception e) {
throw new RuntimeException("init Neo4j Client failed :" + uri, e);
Expand Down
1 change: 1 addition & 0 deletions dev/release/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version: "3.7"
services:
server:
restart: always
Expand Down
4 changes: 1 addition & 3 deletions dev/release/mysql/buildx-release-mysql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
# or implied.

docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:0.0.3 \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:latest \
-t openspg/openspg-mysql:0.0.3 \
-t openspg/openspg-mysql:0.5 \
-t openspg/openspg-mysql:latest \
.
1 change: 0 additions & 1 deletion dev/release/mysql/sql/initdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ CREATE TABLE `kg_ontology_ext` (
) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = 'schema的扩展属性';

INSERT INTO kg_biz_domain (`id`,`gmt_create`,`gmt_modified`,`name`,`status`,`description`,`global_config`) VALUES(1,'2023-09-01 00:00:00','2023-09-01 00:00:00','defaultTenant','VALID','',null);
INSERT INTO kg_project_info (`id`,`name`,`description`,`status`,`gmt_create`,`gmt_modified`,`namespace`,`biz_domain_id`) VALUES(1,'defaultProject','defaultProject','VALID','2023-09-01 00:00:00','2023-09-01 00:00:00','DEFAULT',1);

INSERT INTO kg_ontology_entity (`id`,`original_id`,`name`,`name_zh`,`entity_category`,`layer`,`description`,`description_zh`,`status`,`with_index`,`scope`,`version`,`version_status`,`gmt_create`,`gmt_modified`,`transformer_id`,`operator_config`,`config`,`unique_name`) VALUES(1,1,'Thing','事物','ADVANCED','EXTENSION','Base class for all schema types, all of which inherit the type either directly or indirectly','所有schema类型的基类,它们都直接或者间接继承该类型','1','TRUE','PUBLIC',44,'ONLINE','2023-09-01 00:00:00','2023-09-01 00:00:00',0,null,null,'Thing');
INSERT INTO kg_ontology_entity (`id`,`original_id`,`name`,`name_zh`,`entity_category`,`layer`,`description`,`description_zh`,`status`,`with_index`,`scope`,`version`,`version_status`,`gmt_create`,`gmt_modified`,`transformer_id`,`operator_config`,`config`,`unique_name`) VALUES(2,2,'Text','文本','BASIC','CORE','文本','基本数据类型-文本','1','TRUE','PUBLIC',0,'ONLINE','2023-09-01 00:00:00','2023-09-01 00:00:00',0,null,'{"constrains":[{"id":"REQUIRE","name":"Required","nameZh":"值非空","value":null},{"id":"UNIQUE","name":"Unique","nameZh":"值唯一","value":null},{"id":"ENUM","name":"Enum","nameZh":"枚举","value":null},{"id":"MULTIVALUE","name":"Multi value","nameZh":"多值","value":null},{"id":"REGULAR","name":"Regular match","nameZh":"正则匹配","value":null}]}','Text');
Expand Down
7 changes: 2 additions & 5 deletions dev/release/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ RUN if [ "${TARGETPLATFORM}" = "linux/amd64" ]; then \
RUN python3 -m venv /openspg_venv && \
. /openspg_venv/bin/activate && \
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-$(dpkg --print-architecture) && \
pip3 install openspg-kag==0.0.3.20241022.2 -i https://artifacts.antgroup-inc.cn/artifact/repositories/simple-dev/ && \
pip3 install openspg-kag==0.5.1 && \
pip3 install pemja==0.4.0 && \
pip3 install -U "http://alps-common.oss-cn-hangzhou-zmf.aliyuncs.com/nscommon/shiji/nscommon-0.0.1.tar.gz" &&\
echo "if (tty -s); then \n . /openspg_venv/bin/activate \nfi" >> ~/.bashrc

ADD openspg/dev/release/python/lib/builder*.jar /openspg_venv/lib/python3.8/site-packages/knext/builder/lib
ADD openspg/dev/release/python/lib/reasoner*.jar /openspg_venv/lib/python3.8/site-packages/knext/reasoner/lib

COPY openspg/ /openspg
#RUN git clone --depth=1 https://github.com/OpenSPG/openspg.git
RUN git clone --depth=1 https://github.com/OpenSPG/KAG.git
18 changes: 16 additions & 2 deletions dev/release/python/build-release-python-aliyun.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# or implied.

IMAGE="spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python"
VERSION="kag1"
VERSION="0.5"
cd ../../../../
docker build -f openspg/dev/release/python/Dockerfile --platform linux/arm64/v8 --push \
-t ${IMAGE}:${VERSION}-arm64 \
Expand All @@ -31,4 +31,18 @@ docker manifest create \
${IMAGE}:${VERSION}-amd64 \
${IMAGE}:${VERSION}-arm64

docker manifest push ${IMAGE}:${VERSION}
docker manifest push ${IMAGE}:${VERSION}

if docker manifest inspect ${IMAGE}:${LATEST} &> /dev/null; then
echo "Manifest already exists, removing it..."
docker manifest rm ${IMAGE}:${LATEST}
else
echo "Manifest does not exist, proceeding with creation and push."
fi

docker manifest create \
${IMAGE}:${LATEST} \
${IMAGE}:${VERSION}-amd64 \
${IMAGE}:${VERSION}-arm64

docker manifest push ${IMAGE}:${LATEST}
4 changes: 1 addition & 3 deletions dev/release/python/build-release-python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

# for amd64
docker build -f Dockerfile --platform linux/amd64 --push \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:0.0.3 \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:latest \
-t openspg/openspg-python:0.0.3 \
-t openspg/openspg-python:0.5 \
-t openspg/openspg-python:latest \
.
4 changes: 1 addition & 3 deletions dev/release/server/buildx-release-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
# or implied.

docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:0.0.3 \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:latest \
-t openspg/openspg-server:0.0.3 \
-t openspg/openspg-server:0.5 \
-t openspg/openspg-server:latest \
.
1 change: 1 addition & 0 deletions dev/test/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version: "3.7"
services:
mysql:
restart: always
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -169,23 +169,6 @@ public void testSubStr() {
Assert.assertEquals(rst2, "岁");
}

/**
 * Exercises the "split_part" UDF resolved with (String, String, Integer) argument types.
 *
 * <p>The assertions below pin the following behaviour: index 0 yields the piece before the first
 * delimiter, index -1 yields the last piece, a non-ASCII delimiter works the same way, and an
 * index past the number of pieces (5 here) yields an empty string.
 */
@Test
public void testSplitPart() {
UdfMng mng = UdfMngFactory.getUdfMng();
IUdfMeta udfMeta =
mng.getUdfMeta(
"split_part",
Lists.newArrayList(KTString$.MODULE$, KTString$.MODULE$, KTInteger$.MODULE$));
// Index 0 selects the first piece.
Object rst1 = udfMeta.invoke("Hello,World!", ",", 0);
Assert.assertEquals("Hello", rst1);
// Index -1 selects the last piece.
Object rst2 = udfMeta.invoke("Hello,Ni,Hao", ",", -1);
Assert.assertEquals("Hao", rst2);
// Non-ASCII delimiter.
Object rst3 = udfMeta.invoke("A省B市C村XXX", "村", 0);
Assert.assertEquals("A省B市C", rst3);
// Index beyond the available pieces yields an empty string.
Object rst4 = udfMeta.invoke("A省B市C村XXX", "村", 5);
Assert.assertEquals("", rst4);
}

@Test
public void testCast() {
UdfMng mng = UdfMngFactory.getUdfMng();
Expand Down
Loading

0 comments on commit 0e687f7

Please sign in to comment.