-
Notifications
You must be signed in to change notification settings - Fork 858
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Scheduled Actions V2] WIP Generator component #6905
base: sched2_core
Are you sure you want to change the base?
Conversation
"go.temporal.io/server/service/worker/scheduler" | ||
scheduler1 "go.temporal.io/server/service/worker/scheduler" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate import.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix
} | ||
|
||
// process time range between last high water mark and system time | ||
t1 := timestamp.TimeValue(generator.LastProcessedTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not this? (out of curiosity)
t1 := timestamp.TimeValue(generator.LastProcessedTime) | |
t1 := generator.LastProcessedTime.AsTime() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, artifact from original implementation I suppose. will fix :)
t1 := timestamp.TimeValue(generator.LastProcessedTime) | ||
t2 := time.Now() | ||
if t2.Before(t1) { | ||
e.Logger.Warn("Time went backwards", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering how this log would be printed. Could be worth baking a logger with some relevant tags.
return common.ValidateTask(node, TransitionBuffer) | ||
} | ||
|
||
var TransitionSleep = hsm.NewTransition( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I typically put the transitions with the state machine. I think we should try and keep that consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do.
TaskTypeSleep = "scheduler.generator.Sleep" | ||
TaskTypeBuffer = "scheduler.generator.Buffer" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why you need two tasks here, why don't you buffer in the sleep task under a write lock? It's less DB writes this way. You should only need the buffer task if you plan to do any IO or other type of side effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why you need two tasks here, why don't you buffer in the sleep task under a write lock? It's less DB writes this way. You should only need the buffer task if you plan to do any IO or other type of side effect.
If that's an option (buffering under a write lock), that works for me, since there shouldn't be any regular contention - posted a question on your other comment, re: ConflictToken
vs write locking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about WAITING
versus EXECUTING
in the context of the Backfiller, as well, where a user's backfill request will trigger an immediate Backfiller state transition and task to start backfilling. For Generator
, if we don't have a separate EXECUTING
task to transition to after a user updates, how would we signal the sleeping Generator
sub-state machine? If re-entering the WAITING
state works, then we can probably also get rid of EXECUTING
on the Backfiller.
return err | ||
} | ||
if refreshedScheduler.ConflictToken != scheduler.ConflictToken { | ||
// schedule was updated while processing, retry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we actually want to retry the task here? I commented on the transition definition too but I think you can do all of this under a write lock since there's no IO involved AFAICT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not - If we were doing this under the write lock on the generator node, is the expectation that we'd also attempt to acquire the generator node for write access during an update request? I've been assuming that env.Access
scopes applied only to the given ref
, do they/can they apply more broadly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lock is held for the entire mutable state record including the whole tree structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! Okay, I'd been thinking that each component could potentially map to its own mutable state. One MS simplifies that. Will update to just fail out the task (I believe ErrStaleReference
is for this purpose?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's start with a clean slate and delete the PoC scheduler component and all of its protos.
if !ok { | ||
return 0, fmt.Errorf("%w: expected state1 to be a Generator instance, got %v", hsm.ErrIncompatibleType, s1) | ||
} | ||
s2, ok := a.(Generator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s2, ok := a.(Generator) | |
s2, ok := b.(Generator) |
// We're reprocessing since the most recent event after an update. Discard actions before | ||
// the update time (which was just set to "now"). This doesn't have to be guarded with | ||
// hasMinVersion because this condition couldn't happen in previous versions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagine this comment should be rewritten somewhat. We don't have versions anymore, at least. Also I'm not sure how updates fit into this execution model, it may not be true that "we just set update time to 'now'"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, yes, will rewrite this.
|
||
// Returns the next time result, or an error if the schedule cannot be compiled. | ||
func (e taskExecutor) getNextTime(s core.Scheduler, after time.Time) (scheduler.GetNextTimeResult, error) { | ||
spec, err := s.CompiledSpec(e.SpecBuilder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious if these can get cached at all. it's not that expensive to construct so probably not worth it, except for backfills where it's doing this in a loop during one transition
oh, I see, the caching is elsewhere. nm
if s1.State() > s2.State() { | ||
return 1, nil | ||
} else if s1.State() < s2.State() { | ||
return -1, nil | ||
} | ||
|
||
return 0, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if s1.State() > s2.State() { | |
return 1, nil | |
} else if s1.State() < s2.State() { | |
return -1, nil | |
} | |
return 0, nil | |
return cmp.Compare(s1.State(), s2.State()), nil |
bufferedStarts = append(bufferedStarts, &schedpb.BufferedStart{ | ||
NominalTime: timestamppb.New(next.Nominal), | ||
ActualTime: timestamppb.New(next.Next), | ||
OverlapPolicy: scheduler.OverlapPolicy(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OverlapPolicy: scheduler.OverlapPolicy(), | |
OverlapPolicy: overlapPolicy, |
?
Based on #6904
What changed?
Why?
How did you test it?