Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Scheduled Actions V2] WIP: top-level Scheduler state machine #6904

Draft
wants to merge 1 commit into
base: sched2_common
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions components/scheduler2/core/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package core
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't create this extra package, just put everything in components/scheduler, it'll help you avoid circular dependencies later and allow you to not expose anything that's not strictly necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason it's separated out is because the substate machines would have a lot of conflicts both between boilerplate (generator.MachineKey/backfiller.MachineKey -> GeneratorMachineKey/BackfillerMachineKey) as well as conflict between states/events (generator.TransitionSleep/executor.TransitionSleep -> TransitionGeneratorSleep/TransitionExecutorSleep). It also helps avoid creating circular dependencies and tight coupling between components.

I can be convinced the other way if it's going to be the standard for HSM projects, to keep all within a single package, but if there is a component that were to be fitting for multiple component packages, I think it'd be this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... will leave it up to you.


import (
"fmt"

enumspb "go.temporal.io/api/enums/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/worker/scheduler"
"google.golang.org/protobuf/proto"
)

type (
// The top-level scheduler state machine is compromised of 3 substate machines:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// The top-level scheduler state machine is compromised of 3 substate machines:
// The top-level scheduler state machine is compromised of 3 sub state machines:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix here/elsewhere

// - Generator: buffers actions according to the schedule specification
// - Executor: executes buffered actions
// - Backfiller: buffers actions according to requested backfills
//
// A running scheduler will always have exactly one of each of the above substate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this space intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, will fix that.

// machines mounted as nodes within the HSM tree. The top-level machine itself
// remains in a singular running state for its lifetime (all work is done within the
// substate machines). The Scheduler state machine is only responsible for creating
// the singleton substate machines.
Scheduler struct {
*schedspb.HsmSchedulerV2State

// Locally-cached state
compiledSpec *scheduler.CompiledSpec
}

// The machine definitions provide serialization/deserialization and type information.
machineDefinition struct{}
)

const (
// Unique identifier for top-level scheduler state machine.
SchedulerMachineType = "scheduler.SchedulerV2"
)

var (
_ hsm.StateMachine[enumsspb.Scheduler2State] = Scheduler{}
_ hsm.StateMachineDefinition = &machineDefinition{}
)

// Registers state machine definitions with the HSM registry. Should be called
// during dependency injection.
func RegisterStateMachines(r *hsm.Registry) error {
if err := r.RegisterMachine(machineDefinition{}); err != nil {
return err
}
// TODO: add other state machines here
return nil
}

// MachineCollection creates a new typed [statemachines.Collection] for operations.
func MachineCollection(tree *hsm.Node) hsm.Collection[Scheduler] {
return hsm.NewCollection[Scheduler](tree, SchedulerMachineType)
}

func (s Scheduler) State() enumsspb.Scheduler2State {
return s.HsmSchedulerV2State.State
}

func (s Scheduler) SetState(state enumsspb.Scheduler2State) {
s.HsmSchedulerV2State.State = state
}

func (s Scheduler) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
return nil, nil
}

func (machineDefinition) Type() string {
return SchedulerMachineType
}

func (machineDefinition) Serialize(state any) ([]byte, error) {
if state, ok := state.(Scheduler); ok {
return proto.Marshal(state.HsmSchedulerV2State)
}
return nil, fmt.Errorf("invalid scheduler state provided: %v", state)
}

func (machineDefinition) Deserialize(body []byte) (any, error) {
state := &schedspb.HsmSchedulerV2State{}
return Scheduler{
HsmSchedulerV2State: state,
compiledSpec: nil,
}, proto.Unmarshal(body, state)
}

// Returns:
//
// 0 when states are equal
// 1 when a is newer than b
// -1 when b is newer than a
func (machineDefinition) CompareState(a any, b any) (int, error) {
s1, ok := a.(Scheduler)
if !ok {
return 0, fmt.Errorf("%w: expected state1 to be a Scheduler instance, got %v", hsm.ErrIncompatibleType, s1)
}
s2, ok := a.(Scheduler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s2, ok := a.(Scheduler)
s2, ok := b.(Scheduler)

if !ok {
return 0, fmt.Errorf("%w: expected state1 to be a Scheduler instance, got %v", hsm.ErrIncompatibleType, s2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return 0, fmt.Errorf("%w: expected state1 to be a Scheduler instance, got %v", hsm.ErrIncompatibleType, s2)
return 0, fmt.Errorf("%w: expected state2 to be a Scheduler instance, got %v", hsm.ErrIncompatibleType, s2)

}

if s1.State() > s2.State() {
return 1, nil
} else if s1.State() < s2.State() {
return -1, nil
}
Comment on lines +107 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is used to determine whether to sync a state machine across clusters in the soon-to-be-deprecated replication stack.
The way you've implemented this will prevent updates to the scheduler from being synced. You may be able to use the conflict token instead but that has some unwanted implications when there's a split brain.

Hopefully state based replication will be ready before the scheduler work and you won't have to implement this at all.
For now, I'd start with an implementation that panics and put a TODO here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I'd start with an implementation that panics and put a TODO here.

Yep, will do.

Comment on lines +107 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use cmp.Compare


return 0, nil
}

// Returns true when the Scheduler should allow scheduled actions to be taken.
//
// When decrement is true, the schedule's state's `RemainingActions` counter
// is decremented and the conflict token is bumped.
func (s Scheduler) CanTakeScheduledAction(decrement bool) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename this to UseScheduledAction instead of having a side effect in a method named "Can...".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 will rename

// If paused, don't do anything
if s.Schedule.State.Paused {
return false
}

// If unlimited actions, allow
if !s.Schedule.State.LimitedActions {
return true
}

// Otherwise check and decrement limit
if s.Schedule.State.RemainingActions > 0 {
if decrement {
s.Schedule.State.RemainingActions--
s.ConflictToken++
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to increment the conflict token here, it can fail a user update request for no good reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, will remove.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was that an update might do: sched.state.remainingActions += 5 and then we do want it conflict/retry for accuracy

}
return true
}

// No actions left
return false
}

func (s Scheduler) CompiledSpec(specBuilder *scheduler.SpecBuilder) (*scheduler.CompiledSpec, error) {
// cache compiled spec
if s.compiledSpec == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the conflict token number to know if your cache may be invalid instead of worrying about manual invalidation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't for manual invalidation, it's to support lazily loading the compiled spec after deserializing the state machine. Ideally, I'd fill this in at Deserialize time, but that would require I have access to the SpecBuilder (which itself caches time zone files from disk).. I suppose I could wire in SpecBuilder onto the machineDefinition, but I think I remember you mentioning that CHASM automates the Deserialize/Serialize bits away from components, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the spec is updated later, don't you need to recompile it?

CHASM automates the Deserialize/Serialize bits away from components

Yes, which means you're right about not wanting to put this in deserialize, but also for the reason I gave above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the spec is updated later, don't you need to recompile it?

Yeah, you do; I see your point, will update.

cspec, err := specBuilder.NewCompiledSpec(s.Schedule.Spec)
if err != nil {
return nil, err
}
s.compiledSpec = cspec

Check failure on line 151 in components/scheduler2/core/scheduler.go

View workflow job for this annotation

GitHub Actions / golangci

modifies-value-receiver: suspicious assignment to a by-value method receiver (revive)
}

return s.compiledSpec, nil
}

func (s Scheduler) JitterSeed() string {
return fmt.Sprintf("%s-%s", s.NamespaceId, s.ScheduleId)
}

func (s Scheduler) Identity() string {
return fmt.Sprintf("temporal-scheduler-%s-%s", s.Namespace, s.ScheduleId)
}

func (s Scheduler) OverlapPolicy() enumspb.ScheduleOverlapPolicy {
policy := s.Schedule.Policies.OverlapPolicy
if policy == enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED {
policy = enumspb.SCHEDULE_OVERLAP_POLICY_SKIP
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the old impl we made a point of eagerly setting these default values in the state so that Describe would return them. that behavior has some pros and cons but we should probably preserve it.

}
return policy
}
Loading