Skip to content

Commit

Permalink
refactor(knext): adjust directory structures (#102)
Browse files Browse the repository at this point in the history
Co-authored-by: baifuyu <[email protected]>
  • Loading branch information
J1ers and baifuyu authored Jan 23, 2024
1 parent 2e39a43 commit 9d9f6c4
Show file tree
Hide file tree
Showing 267 changed files with 1,205 additions and 6,879 deletions.
24 changes: 0 additions & 24 deletions python/knext/knext/api/client.py

This file was deleted.

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
from knext import rest
from knext.chain.base import Chain
from knext.builder import rest
from knext.common.base.chain import Chain


class BuilderChain(Chain):
Expand All @@ -23,7 +23,7 @@ def output_types(self):
return None

def invoke(self, **kwargs):
from knext.client.builder import BuilderClient
from knext.builder.client import BuilderClient

client = BuilderClient()
client.execute(self, **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
import os
import sys

from knext import rest
from knext.chain.builder_chain import BuilderChain
from knext.client.base import Client
from knext.client.model.builder_job import BuilderJob, AlterOperationEnum
from knext.builder import rest
from knext.builder.chain import BuilderChain
from knext.common.base.client import Client
from knext.builder.model.builder_job import BuilderJob, AlterOperationEnum
from knext.common.class_register import register_from_package


Expand All @@ -34,31 +34,11 @@ def __init__(self, host_addr: str = None, project_id: int = None):
)
register_from_package(self._builder_job_path, BuilderJob)

def submit(self, job_name: str):
"""Submit an asynchronous builder job to the server by name."""
job = BuilderJob.by_name(job_name)()
builder_chain = BuilderChain.from_chain(job.build())
dag_config = builder_chain.to_rest()

params = {
param: getattr(job, param)
for param in job.__annotations__
if hasattr(job, param) and not param.startswith("_")
}
request = rest.BuilderJobSubmitRequest(
job_name=job.name,
project_id=self._project_id,
pipeline=dag_config,
params=params,
)
return self._rest_client.builder_submit_job_info_post(
builder_job_submit_request=request
)

def execute(self, builder_chain: BuilderChain, **kwargs):
import subprocess
import datetime
from knext import lib
from knext.builder import lib
from knext.common import env

jar_path = os.path.join(lib.__path__[0], lib.LOCAL_BUILDER_JAR)
dag_config = builder_chain.to_rest()
Expand All @@ -80,7 +60,7 @@ def execute(self, builder_chain: BuilderChain, **kwargs):
"--pythonPaths",
";".join(sys.path),
"--schemaUrl",
os.environ.get("KNEXT_HOST_ADDR") or lib.LOCAL_SCHEMA_URL,
os.environ.get("KNEXT_HOST_ADDR") or env.LOCAL_SCHEMA_URL,
"--parallelism",
str(kwargs.get("parallelism", "1")),
"--alterOperation",
Expand All @@ -106,7 +86,3 @@ def execute(self, builder_chain: BuilderChain, **kwargs):
print(json.dumps(" ".join(print_java_cmd))[1:-1].replace("'", '"'))

subprocess.call(java_cmd)

def query(self, job_inst_id: int):
"""Query status of a submitted builder job by job inst id."""
return self._rest_client.builder_query_job_inst_get(job_inst_id=job_inst_id)
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

from knext.component.builder.source_reader import CSVReader
from knext.component.builder.extractor import UserDefinedExtractor, LLMBasedExtractor
from knext.component.builder.mapping import (
from knext.builder.component.source_reader import CSVReader
from knext.builder.component.extractor import UserDefinedExtractor, LLMBasedExtractor
from knext.builder.component.mapping import (
SPGTypeMapping,
RelationMapping,
)
from knext.component.builder.sink_writer import KGWriter
from knext.builder.component.sink_writer import KGWriter


__all__ = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from functools import cmp_to_key
from typing import Union

from knext.component.base import Component
from knext.common.base.component import Component


class ComponentTypeEnum(str, Enum):
Expand Down Expand Up @@ -82,7 +82,7 @@ def downstream_types(self):
@staticmethod
def sort_by_dependency(mappings: list):

from knext.component.builder import SPGTypeMapping
from knext.builder.component import SPGTypeMapping

def comparator(x: SPGTypeMapping, y: SPGTypeMapping):
if x.spg_type_name in y.dependencies:
Expand All @@ -92,7 +92,7 @@ def comparator(x: SPGTypeMapping, y: SPGTypeMapping):
else:
return 0

from knext import rest
from knext.builder import rest

if len(mappings) == 1:
return rest.SpgTypeMappingNodeConfigs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import json
from typing import Dict, List, Sequence

from knext.client.operator import OperatorClient
from knext.common.runnable import Input, Output
from knext.component.builder.base import SPGExtractor
from knext.operator.spg_record import SPGRecord
from knext import rest
from knext.operator.op import PromptOp, ExtractOp
from knext.common.base.client import Client
from knext.common.base.runnable import Input, Output
from knext.builder.component.base import SPGExtractor
from knext.builder.operator.spg_record import SPGRecord
from knext.builder import rest
from knext.builder.operator.op import PromptOp, ExtractOp
from nn4k.invoker import NNInvoker


Expand Down Expand Up @@ -71,9 +71,9 @@ def to_rest(self):
params = dict()
params["model_config"] = json.dumps(self.llm.init_args)
params["prompt_config"] = json.dumps(
[OperatorClient().serialize(op.to_rest()) for op in self.prompt_ops]
[Client.serialize(op.to_rest()) for op in self.prompt_ops]
)
from knext.operator.builtin.online_runner import _BuiltInOnlineExtractor
from knext.builder.operator.builtin.online_runner import _BuiltInOnlineExtractor

extract_op = _BuiltInOnlineExtractor(params)
extract_op.params["max_retry_times"] = str(self.max_retry_times)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,29 @@
from enum import Enum
from typing import Union, Dict, List, Tuple, Sequence, Optional, Any

from knext import rest
from knext.client.model.base import BaseSpgType
from knext.client.schema import SchemaClient
from knext.common.runnable import Input, Output
from knext.builder import rest
from knext.schema.model.base import BaseSpgType
from knext.schema.client import SchemaClient
from knext.common.base.runnable import Input, Output

from knext.common.schema_helper import (
from knext.schema.model.schema_helper import (
SPGTypeName,
PropertyName,
RelationName,
TripletName,
SubPropertyName,
)
from knext.component.builder.base import Mapping
from knext.operator.op import LinkOp, FuseOp, PredictOp
from knext.operator.spg_record import SPGRecord
from knext.builder.component.base import Mapping
from knext.builder.operator.op import LinkOp, FuseOp, PredictOp
from knext.builder.operator.spg_record import SPGRecord


class LinkingStrategyEnum(str, Enum):
IDEquals = "ID_EQUALS"


class FusingStrategyEnum(str, Enum):
NewInstance = "NEW_INSTANCE"
Overwrite = "OVERWRITE"


class PredictingStrategyEnum(str, Enum):
Expand Down Expand Up @@ -80,9 +80,9 @@ class SPGTypeMapping(Mapping):
Examples:
mapping = SPGTypeMapping(
spg_type_name=DEFAULT.App
).add_mapping_field("id", DEFAULT.App.id) \
.add_mapping_field("name", DEFAULT.App.name) \
.add_mapping_field("riskMark", DEFAULT.App.riskMark) \
).add_property_mapping("id", DEFAULT.App.id) \
.add_property_mapping("name", DEFAULT.App.name) \
.add_property_mapping("riskMark", DEFAULT.App.riskMark) \
.add_predicting_field(DEFAULT.App.useCert)
"""

Expand Down Expand Up @@ -375,8 +375,8 @@ def to_rest(self):
fusing_config = rest.OperatorFusingConfig(
operator_config=self.fusing_strategy.to_rest()
)
elif self.fusing_strategy == FusingStrategyEnum.NewInstance:
fusing_config = rest.NewInstanceFusingConfig()
elif self.fusing_strategy == FusingStrategyEnum.Overwrite:
fusing_config = rest.OverwriteFusingConfig()
elif not self.fusing_strategy:
if self.spg_type_name in FuseOp.bind_schemas:
op_name = FuseOp.bind_schemas[self.spg_type_name]
Expand Down Expand Up @@ -440,8 +440,8 @@ class RelationMapping(Mapping):
subject_name=DEFAULT.App,
predicate_name=DEFAULT.App.useCert,
object_name=DEFAULT.Cert,
).add_mapping_field("src_id", "srcId") \
.add_mapping_field("dst_id", "dstId")
).add_property_mapping("src_id", "srcId") \
.add_property_mapping("dst_id", "dstId")
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

from knext import rest
from knext.component.builder.base import SinkWriter
from knext.operator.spg_record import SPGRecord
from knext.builder import rest
from knext.builder.component.base import SinkWriter
from knext.builder.operator.spg_record import SPGRecord


class KGWriter(SinkWriter):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

from pydantic import Field

from knext import rest
from knext.common.runnable import Input, Output
from knext.component.builder.base import SourceReader
from knext.builder import rest
from knext.common.base.runnable import Input, Output
from knext.builder.component.base import SourceReader


class CSVReader(SourceReader):
Expand Down
16 changes: 16 additions & 0 deletions python/knext/knext/builder/lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

LOCAL_BUILDER_JAR = "builder-runner-local-0.0.1-SNAPSHOT-jar-with-dependencies.jar"

LOCAL_GRAPH_STORE_URL = "tugraph://127.0.0.1:9090?graphName=default&timeout=50000&accessId=admin&accessKey=73@TuGraph"

LOCAL_SEARCH_ENGINE_URL = "elasticsearch://127.0.0.1:9200?scheme=http"
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from enum import Enum
from typing import Dict, Type

from knext.chain.builder_chain import BuilderChain
from knext.builder.chain import BuilderChain


class AlterOperationEnum(str, Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

from knext.operator.op import LinkOp, ExtractOp, FuseOp, PromptOp, PredictOp
from knext.builder.operator.op import LinkOp, ExtractOp, FuseOp, PromptOp, PredictOp
from knext.builder.operator.spg_record import SPGRecord
from knext.builder.operator.builtin.auto_prompt import REPrompt, EEPrompt


__all__ = [
Expand All @@ -19,4 +21,7 @@
"FuseOp",
"PromptOp",
"PredictOp",
"SPGRecord",
"REPrompt",
"EEPrompt",
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
from abc import ABC
from typing import Dict, Any, Type

from knext import rest

from knext.operator.invoke_result import InvokeResult
from knext.builder import rest


class BaseOp(ABC):
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from abc import ABC
from typing import List, Dict, Tuple

from knext.client.schema import SchemaClient
from knext.common.schema_helper import SPGTypeName, PropertyName, RelationName
from knext.operator.op import PromptOp
from knext.operator.spg_record import SPGRecord
from knext.schema.client import SchemaClient
from knext.schema.model.schema_helper import SPGTypeName, PropertyName, RelationName
from knext.builder.operator.op import PromptOp
from knext.builder.operator.spg_record import SPGRecord
import uuid


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import json
from typing import Dict, List

from knext.api.operator import ExtractOp
from knext.operator.spg_record import SPGRecord
from knext.builder.operator import ExtractOp
from knext.builder.operator.spg_record import SPGRecord
from nn4k.invoker import NNInvoker


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from typing import List, Dict, Any

import knext.common.cache
from knext.common.schema_helper import SPGTypeName, TripletName
from knext.operator.base import BaseOp
from knext.operator.invoke_result import InvokeResult
from knext.operator.spg_record import SPGRecord
from knext.schema.model.schema_helper import SPGTypeName, TripletName
from knext.builder.operator.base import BaseOp
from knext.builder.operator.invoke_result import InvokeResult
from knext.builder.operator.spg_record import SPGRecord

cache = knext.common.cache.LinkCache(5000, 60)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import pprint
from typing import Dict, Any, List, Tuple
from knext.common.schema_helper import (
from knext.schema.model.schema_helper import (
SPGTypeName,
PropertyName,
RelationName,
Expand Down
Loading

0 comments on commit 9d9f6c4

Please sign in to comment.