-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial commit with high-level design for refactoring DagManager #3756
base: master
Are you sure you want to change the base?
initial commit with high-level design for refactoring DagManager #3756
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good boiler plate code, need more java docs, some pseudocode, and perhaps diagrams to really clarify what the new interactions you are suggesting are. I don't want to go too much into implementation but need a few more clues as to what you're imagining.
/** | ||
* Responsible to performing the actual work for a given {@link DagTask}. | ||
* It processes the {@link DagTask} by first initializing its state, performing actions | ||
* like updating {@link DagStateStore} and finally submiting an event to the executor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's differentiate between contacting the executor to carry out an action and submitting status events
|
||
|
||
/** | ||
* Responsible to performing the actual work for a given {@link DagTask}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"performing work and notifying other components/modules of it's imminent or completed actions"
abstract protected S initialize() throws MaybeRetryableException; | ||
abstract protected R act(S state) throws MaybeRetryableException; | ||
abstract protected void sendNotification(R result) throws MaybeRetryableException; | ||
|
||
final void process() { | ||
throw new UnsupportedOperationException(" Process unsupported"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you elaborate a little on how S, R, and these functions are used with pseudocode in process? Who is the user of DagProc
? Is it sufficient just to call process
with the DagTask
as input or do they need to initialize
, then act
, etc...? Does process
do all of the above instead? Does it handle retries? Can you add some java docs to these methods.
public interface DagProcFactory extends DagTaskVisitor<DagProc> { | ||
DagProc meet(LaunchDagTask ldt); | ||
DagProc meet(KillDagTask kdt); | ||
DagProc meet(ResumeDagTask rdt); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add the SLA type actions as well from above
- enforceFlowCompletionDeadline
- void enforceJobStartDeadline
|
||
/** | ||
* Defines an individual task or job in a Dag. | ||
* It carries the state information required by {@link DagProc} to for its processing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra "to"
public void launchFlow() { | ||
|
||
} | ||
|
||
@Override | ||
public void resumeFlow() { | ||
|
||
} | ||
|
||
@Override | ||
public void killFlow() { | ||
|
||
} | ||
|
||
@Override | ||
public void enforceFlowCompletionDeadline() { | ||
|
||
} | ||
|
||
@Override | ||
public void enforceJobStartDeadline() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who calls these functions? Why are there no parameters? I initially thought DagManagement
from above will be the one to obtain these objects one by one from DagTaskStream
but perhaps DagManager
is the user of DagTaskStream
and utilizes the DagManagement
interface? It would be good to include in these classes how they fit into the other classes (who is a caller of what)? A diagram might actually be super helpful at clarifying for any reader who interacts with whom in what capacity.
public interface DagTaskVisitor<T> { | ||
T meet(LaunchDagTask launchDagTask); | ||
T meet(KillDagTask killDagTask); | ||
T meet(ResumeDagTask resumeDagTask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing remaining SLA tasks
|
||
|
||
/** | ||
* An implmentation of {@link DagProc} for killing {@link DagTask}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this only for API call to kill or from any other pt? Good to clarify in java doc
|
||
|
||
/** | ||
* An implmentation of {@link DagProc} for launching {@link DagTask}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newly added launches or every time task needs to be launched on executor?
private DagTaskStream dagTaskStream; | ||
private DagProc dagProc; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need some clarification on how these are used (see questions above). Does run
below keep getting called continuously? Once the DagTask
is returned do we call the DagTaskStream
methods to launch/kill....
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
The idea is make
DagManager
less bulky and break it into submodules which will share it responsibilities. This is one step towards achieving the statelessness inDagManager
when we want to operate in a multi-active (multiple DagManager leaders) modeUsing this PR as a reference only. Will create subsequent PRs with actual implementation.
JIRA
Description
Tests
Commits