Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Output Ports of an Operator to Write Storage #3295

Open
wants to merge 64 commits into
base: master
Choose a base branch
from

Conversation

Xiao-zhen-Liu
Copy link
Collaborator

@Xiao-zhen-Liu Xiao-zhen-Liu commented Mar 2, 2025

Currently the Amber engine creates and uses sink operators to write the results of output ports of an operator. This design causes many problems, mainly because it alters the physical plan. Ideally a physical plan should not be changed once it is compiled.

This PR updates the design to use output ports of an operator instead of sink operators to write port results to storage. The changes implemented in this PR include:

  • The logic to add sink operators during compilation and scheduling are not removed in this PR. However, all the execution logic in the sink operator are removed. A sink operator will not do anything during execution. We will remove sink operators in the next PR.
  • GlobalPortIdentity is moved from Region to workflow-core so that it is accessible by all the modules.
  • The compiler does not create storage objects anymore. Instead, it produces a set of GlobalPortIdentity that need view results. This set is passed along to the scheduler as part of WorkflowContext.workflowSettings. In the future, this information will be directly produced by the frontend instead of by the compiler.
  • The scheduler combines the ports that need view result and materialized ports needed by the scheduler as part of a region. Ideally this information should be fixed once a region is created by the SchedulerGenerator. However, as the physical plan still needs to be changed currently (because of additional cache read operators), additional logic is implemented in the ScheduleGenerator to make sure this information is correct for all the regions.
  • The scheduler and resource allocator do not create storage objects directly and only assign storage URIs for each region as part of resourceConfig.
  • When a region is scheduled to execute, during the initialization of a region, these URIs are used to create storage objects.
  • AssignPortRequest is used to indicate whether an output port of a worker needs storage and to pass the storage URI information to a worker. Note this request is used for both input ports and output ports, and this PR only updates output ports. As a result, for input ports, empty storage URIs will be provided in AssignPortRequest. In the future, after we also use input ports to read storage, we will also update and use these storage URIs.
  • Note that since currently operators with dependee inputs belong to multiple regions, and AssignPortRequest is only used once for each worker, I had to implement additional logic in the to make sure all the regions that such an operator belongs to have the proper storage information (specifically, the output port connecting a dependee link belongs to two regions, and both regions need to have storageURI for this port)
  • Inside a worker (both Java and Python), the OutputManager is used to create writer threads for each output port that needs storage. The writing does not block the data processor, but will block the completion status of the operator/port.

TODOs:

  • Completely remove sink op
  • Use GlobalPortIdentity for storage URIs
  • Remove the mention of "result" in the storage layer
  • Let the frontend specify view results on the port level
  • Remove cache read op and use input port for reading storage

…orage

# Conflicts:
#	core/amber/src/main/python/core/architecture/packaging/output_manager.py
#	core/amber/src/main/python/core/runnables/main_loop.py
#	core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
…orage

# Conflicts:
#	core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
#	core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
…n-use-output-port-for-storage

# Conflicts:
#	core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
#	core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
#	core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
#	core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
#	core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
#	core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala
#	core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
#	core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
@Xiao-zhen-Liu Xiao-zhen-Liu self-assigned this Mar 2, 2025
@Xiao-zhen-Liu Xiao-zhen-Liu added engine refactoring Refactor the code labels Mar 2, 2025
Copy link
Collaborator

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general looks good! left code comments.

Copy link
Collaborator

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
engine refactoring Refactor the code
Projects
Status: Review in progress
Development

Successfully merging this pull request may close these issues.

2 participants