Skip to content
This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Commit

Permalink
Support to define engine for pipeline (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
haojinIntel authored Nov 21, 2023
1 parent 7e87dc3 commit 3951931
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions RecDP/pyrecdp/LLM/TextPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@


class TextPipeline(BasePipeline):
def __init__(self, pipeline_file=None):
def __init__(self, engine_name='ray', pipeline_file=None):
super().__init__()
self.engine_name = engine_name
if pipeline_file != None:
self.import_from_json(pipeline_file) if pipeline_file.endswith(
'.json') else self.import_from_yaml(pipeline_file)
Expand Down Expand Up @@ -52,7 +53,10 @@ def check_platform(self, executable_sequence):
if op.support_spark:
spark_list.append(str(op))
if is_ray:
return 'ray'
if self.engine_name == 'spark' and is_spark:
return 'spark'
else:
return 'ray'
elif is_spark:
return 'spark'
else:
Expand Down Expand Up @@ -206,8 +210,8 @@ def evaluate(self) -> dict:

class ResumableTextPipeline(TextPipeline):
# Provide a pipeline for large dir. We will handle files one by one and resume when pipeline broken.
def __init__(self, pipeline_file=None):
super().__init__(pipeline_file)
def __init__(self, engine_name='ray', pipeline_file=None):
super().__init__(engine_name, pipeline_file)
# Enabling this option will result in a decrease in execution speed
self.statistics_flag = False

Expand Down

0 comments on commit 3951931

Please sign in to comment.