Skip to content

Commit

Permalink
feature: pipeline新增get_batch_data接口实现 (#123)
Browse files Browse the repository at this point in the history
* feature: pipeline新增get_batch_data接口实现

* minor: update poetry.lock file
  • Loading branch information
normal-wls authored Nov 23, 2022
1 parent 28adec9 commit 3783c1a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
20 changes: 19 additions & 1 deletion runtime/bamboo-pipeline/pipeline/eri/imp/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""

import json
from typing import Dict
from typing import Dict, List

from bamboo_engine import metrics, exceptions
from bamboo_engine.eri import Data, DataInput, ExecutionData, CallbackData
Expand Down Expand Up @@ -47,6 +47,24 @@ def get_data(self, node_id: str) -> Data:
outputs=json.loads(data_model.outputs),
)

def get_batch_data(self, node_ids: List[str]) -> Dict[str, Data]:
"""
批量获取节点数据对象
:param node_ids: 节点 ID 列表
:type node_ids: List[str]
:return: 节点数据对象字典 node_id: Data
:rtype: dict
"""
data_models = DBData.objects.filter(node_id__in=node_ids)
data = {}
for data_model in data_models:
data[data_model.node_id] = Data(
inputs=self._get_data_inputs(codec.data_json_loads(data_model.inputs)),
outputs=json.loads(data_model.outputs),
)
return data

@metrics.setup_histogram(metrics.ENGINE_RUNTIME_DATA_INPUTS_READ_TIME)
def get_data_inputs(self, node_id: str) -> Dict[str, DataInput]:
"""
Expand Down
11 changes: 11 additions & 0 deletions runtime/bamboo-pipeline/pipeline/tests/eri/imp/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ def test_get_data(self):
self.assertEqual(data.inputs["b"].value, 2)
self.assertTrue(data.inputs["b"].value)

def test_get_batch_data(self):
batch_data = self.mixin.get_batch_data([self.node_id])
self.assertTrue(isinstance(batch_data, dict))
self.assertTrue(isinstance(batch_data[self.node_id], Data))
data = batch_data[self.node_id]
self.assertEqual(data.outputs, self.data_outputs)
self.assertEqual(data.inputs["a"].value, 1)
self.assertTrue(data.inputs["a"].value)
self.assertEqual(data.inputs["b"].value, 2)
self.assertTrue(data.inputs["b"].value)

def test_get_data_inputs(self):
inputs = self.mixin.get_data_inputs(self.node_id)
self.assertEqual(inputs["a"].value, 1)
Expand Down
4 changes: 2 additions & 2 deletions runtime/bamboo-pipeline/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions runtime/bamboo-pipeline/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bamboo-pipeline"
version = "3.23.1"
version = "3.24.0"
description = "runtime for bamboo-engine base on Django and Celery"
authors = ["homholueng <[email protected]>"]
license = "MIT"
Expand All @@ -16,7 +16,7 @@ requests = "^2.22.0"
django-celery-beat = "^2.1.0"
Mako = "^1.1.4"
pytz = "2019.3"
bamboo-engine = "2.5.3"
bamboo-engine = "2.6.0"
jsonschema = "^2.5.1"
ujson = "4.1.*"
pyparsing = "^2.2.0"
Expand Down

0 comments on commit 3783c1a

Please sign in to comment.