This repository has been archived by the owner on Jun 18, 2024. It is now read-only.

Merge pull request #203 from sched-ext/htejun/cleanups
Cleanups and a small fix
Byte-Lab authored May 17, 2024
2 parents 5a00279 + 6a5cf23 commit c592e1c
Showing 3 changed files with 93 additions and 70 deletions.
119 changes: 51 additions & 68 deletions kernel/sched/ext.c
@@ -921,16 +921,6 @@ static unsigned long __percpu *scx_kick_cpus_pnt_seqs;
*/
static DEFINE_PER_CPU(struct task_struct *, direct_dispatch_task);

/*
* Has the current CPU been onlined from the perspective of scx?
*
* A hotplugged CPU may begin scheduling tasks before the core scheduler will
* call into rq_online_scx(). Track whether we've had a chance to invoke
* ops.cpu_online() so we can skip invoking ops.enqueue() or ops.dispatch() on
* that CPU until the scheduler knows about the hotplug event.
*/
static DEFINE_PER_CPU(bool, scx_cpu_online);

/* dispatch queues */
static struct scx_dispatch_q __cacheline_aligned_in_smp scx_dsq_global;

@@ -952,16 +942,16 @@ struct scx_dsp_buf_ent {
};

static u32 scx_dsp_max_batch;
static struct scx_dsp_buf_ent __percpu *scx_dsp_buf;

struct scx_dsp_ctx {
struct rq *rq;
struct rq_flags *rf;
u32 buf_cursor;
u32 cursor;
u32 nr_tasks;
struct scx_dsp_buf_ent buf[];
};

static DEFINE_PER_CPU(struct scx_dsp_ctx, scx_dsp_ctx);
static struct scx_dsp_ctx __percpu *scx_dsp_ctx;

/* string formatting from BPF */
struct scx_bstr_buf {
@@ -1673,10 +1663,10 @@ static bool task_linked_on_dsq(struct task_struct *p)
return !list_empty(&p->scx.dsq_node.list);
}

static void dispatch_dequeue(struct scx_rq *scx_rq, struct task_struct *p)
static void dispatch_dequeue(struct rq *rq, struct task_struct *p)
{
struct scx_dispatch_q *dsq = p->scx.dsq;
bool is_local = dsq == &scx_rq->local_dsq;
bool is_local = dsq == &rq->scx.local_dsq;

if (!dsq) {
WARN_ON_ONCE(task_linked_on_dsq(p));
@@ -1804,9 +1794,9 @@ static void direct_dispatch(struct task_struct *p, u64 enq_flags)
dispatch_enqueue(dsq, p, enq_flags);
}

static bool test_rq_online(struct rq *rq)
static bool scx_rq_online(struct rq *rq)
{
return per_cpu(scx_cpu_online, cpu_of(rq));
return likely(rq->scx.flags & SCX_RQ_ONLINE);
}

static void do_enqueue_task(struct rq *rq, struct task_struct *p, u64 enq_flags,
@@ -1826,7 +1816,7 @@ static void do_enqueue_task(struct rq *rq, struct task_struct *p, u64 enq_flags,
* offline. We're just trying to on/offline the CPU. Don't bother the
* BPF scheduler.
*/
if (unlikely(!test_rq_online(rq)))
if (!scx_rq_online(rq))
goto local;

if (scx_ops_bypassing()) {
@@ -2011,8 +2001,6 @@ static void ops_dequeue(struct task_struct *p, u64 deq_flags)

static void dequeue_task_scx(struct rq *rq, struct task_struct *p, int deq_flags)
{
struct scx_rq *scx_rq = &rq->scx;

if (!(p->scx.flags & SCX_TASK_QUEUED)) {
WARN_ON_ONCE(task_runnable(p));
return;
@@ -2046,10 +2034,10 @@ static void dequeue_task_scx(struct rq *rq, struct task_struct *p, int deq_flags
p->scx.flags &= ~SCX_TASK_DEQD_FOR_SLEEP;

p->scx.flags &= ~SCX_TASK_QUEUED;
scx_rq->nr_running--;
rq->scx.nr_running--;
sub_nr_running(rq, 1);

dispatch_dequeue(scx_rq, p);
dispatch_dequeue(rq, p);
}

static void yield_task_scx(struct rq *rq)
@@ -2203,17 +2191,15 @@ static void dispatch_to_local_dsq_unlock(struct rq *rq, struct rq_flags *rf,
static void consume_local_task(struct rq *rq, struct scx_dispatch_q *dsq,
struct task_struct *p)
{
struct scx_rq *scx_rq = &rq->scx;

lockdep_assert_held(&dsq->lock); /* released on return */

/* @dsq is locked and @p is on this rq */
WARN_ON_ONCE(p->scx.holding_cpu >= 0);
task_unlink_from_dsq(p, dsq);
list_add_tail(&p->scx.dsq_node.list, &scx_rq->local_dsq.list);
list_add_tail(&p->scx.dsq_node.list, &rq->scx.local_dsq.list);
dsq_mod_nr(dsq, -1);
dsq_mod_nr(&scx_rq->local_dsq, 1);
WRITE_ONCE(p->scx.dsq, &scx_rq->local_dsq);
dsq_mod_nr(&rq->scx.local_dsq, 1);
WRITE_ONCE(p->scx.dsq, &rq->scx.local_dsq);
raw_spin_unlock(&dsq->lock);
}

@@ -2232,7 +2218,7 @@ static bool task_can_run_on_remote_rq(struct task_struct *p, struct rq *rq)
return false;
if (!(p->flags & PF_KTHREAD) && unlikely(!task_cpu_possible(cpu, p)))
return false;
if (unlikely(!test_rq_online(rq)))
if (!scx_rq_online(rq))
return false;
return true;
}
@@ -2387,7 +2373,7 @@ dispatch_to_local_dsq(struct rq *rq, struct rq_flags *rf, u64 dsq_id,
* instead, which should always be safe. As this is an allowed
* behavior, don't trigger an ops error.
*/
if (unlikely(!test_rq_online(dst_rq)))
if (!scx_rq_online(dst_rq))
dst_rq = src_rq;

if (src_rq == dst_rq) {
@@ -2514,31 +2500,30 @@ static void finish_dispatch(struct rq *rq, struct rq_flags *rf,

static void flush_dispatch_buf(struct rq *rq, struct rq_flags *rf)
{
struct scx_dsp_ctx *dspc = this_cpu_ptr(&scx_dsp_ctx);
struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);
u32 u;

for (u = 0; u < dspc->buf_cursor; u++) {
struct scx_dsp_buf_ent *ent = &this_cpu_ptr(scx_dsp_buf)[u];
for (u = 0; u < dspc->cursor; u++) {
struct scx_dsp_buf_ent *ent = &dspc->buf[u];

finish_dispatch(rq, rf, ent->task, ent->qseq, ent->dsq_id,
ent->enq_flags);
}

dspc->nr_tasks += dspc->buf_cursor;
dspc->buf_cursor = 0;
dspc->nr_tasks += dspc->cursor;
dspc->cursor = 0;
}

static int balance_one(struct rq *rq, struct task_struct *prev,
struct rq_flags *rf, bool local)
{
struct scx_rq *scx_rq = &rq->scx;
struct scx_dsp_ctx *dspc = this_cpu_ptr(&scx_dsp_ctx);
struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);
bool prev_on_scx = prev->sched_class == &ext_sched_class;
int nr_loops = SCX_DSP_MAX_LOOPS;
bool has_tasks = false;

lockdep_assert_rq_held(rq);
scx_rq->flags |= SCX_RQ_BALANCING;
rq->scx.flags |= SCX_RQ_BALANCING;

if (static_branch_unlikely(&scx_ops_cpu_preempt) &&
unlikely(rq->scx.cpu_released)) {
@@ -2582,14 +2567,13 @@ static int balance_one(struct rq *rq, struct task_struct *prev,
}

/* if there already are tasks to run, nothing to do */
if (scx_rq->local_dsq.nr)
if (rq->scx.local_dsq.nr)
goto has_tasks;

if (consume_dispatch_q(rq, rf, &scx_dsq_global))
goto has_tasks;

if (!SCX_HAS_OP(dispatch) || scx_ops_bypassing() ||
unlikely(!test_rq_online(rq)))
if (!SCX_HAS_OP(dispatch) || scx_ops_bypassing() || !scx_rq_online(rq))
goto out;

dspc->rq = rq;
@@ -2610,7 +2594,7 @@ static int balance_one(struct rq *rq, struct task_struct *prev,

flush_dispatch_buf(rq, rf);

if (scx_rq->local_dsq.nr)
if (rq->scx.local_dsq.nr)
goto has_tasks;
if (consume_dispatch_q(rq, rf, &scx_dsq_global))
goto has_tasks;
@@ -2635,7 +2619,7 @@ static int balance_one(struct rq *rq, struct task_struct *prev,
has_tasks:
has_tasks = true;
out:
scx_rq->flags &= ~SCX_RQ_BALANCING;
rq->scx.flags &= ~SCX_RQ_BALANCING;
return has_tasks;
}

@@ -2689,7 +2673,7 @@ static void set_next_task_scx(struct rq *rq, struct task_struct *p, bool first)
* dispatched. Call ops_dequeue() to notify the BPF scheduler.
*/
ops_dequeue(p, SCX_DEQ_CORE_SCHED_EXEC);
dispatch_dequeue(&rq->scx, p);
dispatch_dequeue(rq, p);
}

p->se.exec_start = rq_clock_task(rq);
@@ -3228,16 +3212,18 @@ static void handle_hotplug(struct rq *rq, bool online)

static void rq_online_scx(struct rq *rq, enum rq_onoff_reason reason)
{
if (reason == RQ_ONOFF_HOTPLUG)
if (reason == RQ_ONOFF_HOTPLUG) {
handle_hotplug(rq, true);
per_cpu(scx_cpu_online, cpu_of(rq)) = true;
rq->scx.flags |= SCX_RQ_ONLINE;
}
}

static void rq_offline_scx(struct rq *rq, enum rq_onoff_reason reason)
{
per_cpu(scx_cpu_online, cpu_of(rq)) = false;
if (reason == RQ_ONOFF_HOTPLUG)
if (reason == RQ_ONOFF_HOTPLUG) {
rq->scx.flags &= ~SCX_RQ_ONLINE;
handle_hotplug(rq, false);
}
}

#else /* CONFIG_SMP */
@@ -4474,8 +4460,8 @@ static void scx_ops_disable_workfn(struct kthread_work *work)
} while (dsq == ERR_PTR(-EAGAIN));
rhashtable_walk_exit(&rht_iter);

free_percpu(scx_dsp_buf);
scx_dsp_buf = NULL;
free_percpu(scx_dsp_ctx);
scx_dsp_ctx = NULL;
scx_dsp_max_batch = 0;

free_exit_info(scx_exit_info);
@@ -4959,11 +4945,12 @@ static int scx_ops_enable(struct sched_ext_ops *ops)
if (ret)
goto err_disable;

WARN_ON_ONCE(scx_dsp_buf);
WARN_ON_ONCE(scx_dsp_ctx);
scx_dsp_max_batch = ops->dispatch_max_batch ?: SCX_DSP_DFL_MAX_BATCH;
scx_dsp_buf = __alloc_percpu(sizeof(scx_dsp_buf[0]) * scx_dsp_max_batch,
__alignof__(scx_dsp_buf[0]));
if (!scx_dsp_buf) {
scx_dsp_ctx = __alloc_percpu(struct_size_t(struct scx_dsp_ctx, buf,
scx_dsp_max_batch),
__alignof__(struct scx_dsp_ctx));
if (!scx_dsp_ctx) {
ret = -ENOMEM;
goto err_disable;
}
@@ -5843,28 +5830,26 @@ static bool scx_dispatch_preamble(struct task_struct *p, u64 enq_flags)

static void scx_dispatch_commit(struct task_struct *p, u64 dsq_id, u64 enq_flags)
{
struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);
struct task_struct *ddsp_task;
int idx;

ddsp_task = __this_cpu_read(direct_dispatch_task);
if (ddsp_task) {
mark_direct_dispatch(ddsp_task, p, dsq_id, enq_flags);
return;
}

idx = __this_cpu_read(scx_dsp_ctx.buf_cursor);
if (unlikely(idx >= scx_dsp_max_batch)) {
if (unlikely(dspc->cursor >= scx_dsp_max_batch)) {
scx_ops_error("dispatch buffer overflow");
return;
}

this_cpu_ptr(scx_dsp_buf)[idx] = (struct scx_dsp_buf_ent){
dspc->buf[dspc->cursor++] = (struct scx_dsp_buf_ent){
.task = p,
.qseq = atomic_long_read(&p->scx.ops_state) & SCX_OPSS_QSEQ_MASK,
.dsq_id = dsq_id,
.enq_flags = enq_flags,
};
__this_cpu_inc(scx_dsp_ctx.buf_cursor);
}

__bpf_kfunc_start_defs();
@@ -5976,7 +5961,7 @@ __bpf_kfunc u32 scx_bpf_dispatch_nr_slots(void)
if (!scx_kf_allowed(SCX_KF_DISPATCH))
return 0;

return scx_dsp_max_batch - __this_cpu_read(scx_dsp_ctx.buf_cursor);
return scx_dsp_max_batch - __this_cpu_read(scx_dsp_ctx->cursor);
}

/**
@@ -5987,13 +5972,13 @@ __bpf_kfunc u32 scx_bpf_dispatch_nr_slots(void)
*/
__bpf_kfunc void scx_bpf_dispatch_cancel(void)
{
struct scx_dsp_ctx *dspc = this_cpu_ptr(&scx_dsp_ctx);
struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);

if (!scx_kf_allowed(SCX_KF_DISPATCH))
return;

if (dspc->buf_cursor > 0)
dspc->buf_cursor--;
if (dspc->cursor > 0)
dspc->cursor--;
else
scx_ops_error("dispatch buffer underflow");
}
@@ -6015,7 +6000,7 @@ __bpf_kfunc void scx_bpf_dispatch_cancel(void)
*/
__bpf_kfunc bool scx_bpf_consume(u64 dsq_id)
{
struct scx_dsp_ctx *dspc = this_cpu_ptr(&scx_dsp_ctx);
struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);
struct scx_dispatch_q *dsq;

if (!scx_kf_allowed(SCX_KF_DISPATCH))
@@ -6061,7 +6046,7 @@ __bpf_kfunc bool __scx_bpf_consume_task(unsigned long it, struct task_struct *p)
{
struct bpf_iter_scx_dsq_kern *kit = (void *)it;
struct scx_dispatch_q *dsq, *kit_dsq;
struct scx_dsp_ctx *dspc = this_cpu_ptr(&scx_dsp_ctx);
struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);
struct rq *task_rq;
u64 kit_dsq_seq;

@@ -6145,22 +6130,20 @@ __bpf_kfunc u32 scx_bpf_reenqueue_local(void)
{
u32 nr_enqueued, i;
struct rq *rq;
struct scx_rq *scx_rq;

if (!scx_kf_allowed(SCX_KF_CPU_RELEASE))
return 0;

rq = cpu_rq(smp_processor_id());
lockdep_assert_rq_held(rq);
scx_rq = &rq->scx;

/*
* Get the number of tasks on the local DSQ before iterating over it to
* pull off tasks. The enqueue callback below can signal that it wants
* the task to stay on the local DSQ, and we want to prevent the BPF
* scheduler from causing us to loop indefinitely.
*/
nr_enqueued = scx_rq->local_dsq.nr;
nr_enqueued = rq->scx.local_dsq.nr;
for (i = 0; i < nr_enqueued; i++) {
struct task_struct *p;

@@ -6169,7 +6152,7 @@ __bpf_kfunc u32 scx_bpf_reenqueue_local(void)
SCX_OPSS_NONE);
WARN_ON_ONCE(!(p->scx.flags & SCX_TASK_QUEUED));
WARN_ON_ONCE(p->scx.holding_cpu != -1);
dispatch_dequeue(scx_rq, p);
dispatch_dequeue(rq, p);
do_enqueue_task(rq, p, SCX_ENQ_REENQ, -1);
}

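The main refactor above folds the per-CPU dispatch buffer into scx_dsp_ctx as a flexible array member. As a minimal sketch of that pattern (hypothetical foo_* names and simplified behavior, not code from this commit), a per-CPU context with a trailing array can be sized at runtime with struct_size_t() and allocated with __alloc_percpu():

#include <linux/types.h>
#include <linux/errno.h>
#include <linux/bug.h>
#include <linux/percpu.h>
#include <linux/overflow.h>

struct foo_ent {
        u64 payload;
};

struct foo_ctx {
        u32 cursor;                     /* next free slot in buf[] */
        struct foo_ent buf[];           /* flexible array, foo_max_slots entries */
};

static u32 foo_max_slots;
static struct foo_ctx __percpu *foo_ctx;

static int foo_init(u32 nr_slots)
{
        /*
         * struct_size_t() returns sizeof(struct foo_ctx) plus
         * nr_slots * sizeof(buf[0]), saturating on overflow.
         */
        foo_ctx = __alloc_percpu(struct_size_t(struct foo_ctx, buf, nr_slots),
                                 __alignof__(struct foo_ctx));
        if (!foo_ctx)
                return -ENOMEM;
        foo_max_slots = nr_slots;
        return 0;
}

/* Caller must keep preemption disabled so the per-CPU pointer stays stable. */
static void foo_push(u64 payload)
{
        struct foo_ctx *ctx = this_cpu_ptr(foo_ctx);

        if (WARN_ON_ONCE(ctx->cursor >= foo_max_slots))
                return;
        ctx->buf[ctx->cursor++] = (struct foo_ent){ .payload = payload };
}

static void foo_exit(void)
{
        free_percpu(foo_ctx);           /* NULL-safe */
        foo_ctx = NULL;
}

Compared with the previous separate scx_dsp_buf allocation, this keeps the cursor and the entries it indexes in a single per-CPU object, so the hot paths in the diff need only one this_cpu_ptr() lookup.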
10 changes: 8 additions & 2 deletions kernel/sched/sched.h
@@ -721,8 +721,14 @@ struct cfs_rq {
#ifdef CONFIG_SCHED_CLASS_EXT
/* scx_rq->flags, protected by the rq lock */
enum scx_rq_flags {
SCX_RQ_BALANCING = 1 << 0,
SCX_RQ_CAN_STOP_TICK = 1 << 1,
/*
* A hotplugged CPU starts scheduling before rq_online_scx(). Track
* ops.cpu_on/offline() state so that ops.enqueue/dispatch() are called
* only while the BPF scheduler considers the CPU to be online.
*/
SCX_RQ_ONLINE = 1 << 0,
SCX_RQ_BALANCING = 1 << 1,
SCX_RQ_CAN_STOP_TICK = 1 << 2,
};

struct scx_rq {