Skip to content

Commit

Permalink
optimization: 节点重试支持配置变量渲染豁免 (#120)
Browse files Browse the repository at this point in the history
* optimization: 节点重试支持配置变量渲染豁免

* minor: unit test fix
  • Loading branch information
normal-wls authored Nov 14, 2022
1 parent 8f65eb5 commit a7c81c7
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 30 deletions.
2 changes: 1 addition & 1 deletion bamboo_engine/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
specific language governing permissions and limitations under the License.
"""

__version__ = "2.5.2"
__version__ = "2.5.3"
6 changes: 5 additions & 1 deletion bamboo_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def retry_node(self, node_id: str, data: Optional[dict] = None):
:type data: Optional[dict], optional
"""
node = self.runtime.get_node(node_id)
escape_render_keys = data.pop("_escape_render_keys", []) if data else []

if not node.can_retry:
raise InvalidOperationError("can not retry node({}) with can_retry({})".format(node_id, node.can_retry))
Expand All @@ -263,7 +264,10 @@ def retry_node(self, node_id: str, data: Optional[dict] = None):
if data is not None:
self.runtime.set_data_inputs(
node_id,
{k: DataInput(need_render=True, value=v) for k, v in data.items()},
{
k: DataInput(need_render=False if k in escape_render_keys else True, value=v)
for k, v in data.items()
},
)

self._add_history(node_id, state)
Expand Down
58 changes: 31 additions & 27 deletions bamboo_engine/handlers/service_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
from bamboo_engine.context import Context
from bamboo_engine.eri.models.interrupt import ScheduleInterruptPoint
from bamboo_engine.interrupt import ExecuteKeyPoint, ScheduleKeyPoint
from bamboo_engine.metrics import ENGINE_SCHEDULE_FAILED_COUNT, ENGINE_EXECUTE_FAILED_COUNT, \
ENGINE_EXECUTE_EXCEPTION_COUNT, ENGINE_SCHEDULE_EXCEPTION_COUNT
from bamboo_engine.metrics import (
ENGINE_SCHEDULE_FAILED_COUNT,
ENGINE_EXECUTE_FAILED_COUNT,
ENGINE_EXECUTE_EXCEPTION_COUNT,
ENGINE_SCHEDULE_EXCEPTION_COUNT,
)
from bamboo_engine.template import Template
from bamboo_engine.eri import (
ProcessInfo,
Expand Down Expand Up @@ -53,12 +57,12 @@ class ServiceActivityHandler(NodeHandler):
"""

def execute(
self,
process_info: ProcessInfo,
loop: int,
inner_loop: int,
version: str,
recover_point: Optional[ExecuteInterruptPoint] = None,
self,
process_info: ProcessInfo,
loop: int,
inner_loop: int,
version: str,
recover_point: Optional[ExecuteInterruptPoint] = None,
) -> ExecuteResult:
"""
节点的 execute 处理逻辑
Expand All @@ -72,7 +76,7 @@ def execute(
"""

with metrics.observe(
metrics.ENGINE_NODE_EXECUTE_PRE_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
metrics.ENGINE_NODE_EXECUTE_PRE_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
):
top_pipeline_id = process_info.top_pipeline_id
root_pipeline_id = process_info.root_pipeline_id
Expand Down Expand Up @@ -236,7 +240,7 @@ def execute(
ENGINE_EXECUTE_FAILED_COUNT.labels(type=node_type, hostname=self._hostname).inc()

with metrics.observe(
metrics.ENGINE_NODE_EXECUTE_POST_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
metrics.ENGINE_NODE_EXECUTE_POST_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
):
serialize_ouputs, ouputs_serializer = self.runtime.serialize_execution_data(service_data.outputs)
self.interrupter.check_and_set(
Expand Down Expand Up @@ -343,14 +347,14 @@ def execute(
)

def _finish_schedule(
self,
process_info: ProcessInfo,
schedule: Schedule,
data_outputs: dict,
execution_data: ExecutionData,
error_ignored: bool,
root_pipeline_inputs: dict,
recover_point: Optional[ScheduleInterruptPoint] = None,
self,
process_info: ProcessInfo,
schedule: Schedule,
data_outputs: dict,
execution_data: ExecutionData,
error_ignored: bool,
root_pipeline_inputs: dict,
recover_point: Optional[ScheduleInterruptPoint] = None,
) -> ScheduleResult:
self.runtime.set_state(
node_id=self.node.id,
Expand All @@ -376,13 +380,13 @@ def _finish_schedule(
)

def schedule(
self,
process_info: ProcessInfo,
loop: int,
inner_loop: int,
schedule: Schedule,
callback_data: Optional[CallbackData] = None,
recover_point: Optional[ScheduleInterruptPoint] = None,
self,
process_info: ProcessInfo,
loop: int,
inner_loop: int,
schedule: Schedule,
callback_data: Optional[CallbackData] = None,
recover_point: Optional[ScheduleInterruptPoint] = None,
) -> ScheduleResult:
"""
节点的 schedule 处理逻辑
Expand All @@ -397,7 +401,7 @@ def schedule(
:rtype: ScheduleResult
"""
with metrics.observe(
metrics.ENGINE_NODE_SCHEDULE_PRE_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
metrics.ENGINE_NODE_SCHEDULE_PRE_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
):
# data prepare
top_pipeline_id = process_info.top_pipeline_id
Expand Down Expand Up @@ -459,7 +463,7 @@ def schedule(
ENGINE_SCHEDULE_FAILED_COUNT.labels(type=node_type, hostname=self._hostname).inc()

with metrics.observe(
metrics.ENGINE_NODE_SCHEDULE_POST_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
metrics.ENGINE_NODE_SCHEDULE_POST_PROCESS_DURATION, type=self.node.type.value, hostname=self._hostname
):
serialize_ouputs, ouputs_serializer = self.runtime.serialize_execution_data(service_data.outputs)
self.interrupter.check_and_set(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bamboo-engine"
version = "2.5.2"
version = "2.5.3"
description = "Bamboo-engine is a general-purpose workflow engine"
authors = ["homholueng <[email protected]>"]
license = "MIT"
Expand Down

0 comments on commit a7c81c7

Please sign in to comment.