1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::ptr;

use cpu::CpuId;
use spin::Mutex;
use sync_preemption::PreemptionSafeMutex;

use crate::TaskRef;

/// List of all the schedulers on the system.
///
/// This is primarily used for spawning tasks, either to find the least busy CPU
/// or spawn a task pinned to a particular CPU.
///
/// The outer mutex does not need to be preemption-safe, because it is never
/// accessed from `schedule`. In fact, ideally it would be a blocking mutex, but
/// that leads to circular dependencies.
///
/// Each entry pairs a CPU's ID with the scheduler currently installed for it
/// (see `set_policy`). Entries are kept in sync with the per-CPU `SCHEDULER`.
static SCHEDULERS: Mutex<Vec<(CpuId, Arc<ConcurrentScheduler>)>> = Mutex::new(Vec::new());

/// A reference to the current CPU's scheduler.
///
/// This isn't strictly necessary, but it greatly improves performance, as it
/// avoids having to lock the system-wide list of schedulers.
#[cls::cpu_local]
static SCHEDULER: Option<Arc<ConcurrentScheduler>> = None;

/// A scheduler trait object behind a preemption-safe lock, so that it can be
/// accessed from within `schedule` without being preempted mid-operation.
type ConcurrentScheduler = PreemptionSafeMutex<dyn Scheduler>;

/// Yields the current CPU by selecting a new `Task` to run next,
/// and then switches to that new `Task`.
///
/// The new "next" `Task` to run will be selected by the currently-active
/// scheduler policy.
///
/// Preemption will be disabled while this function runs,
/// but interrupts are not disabled because it is not necessary.
///
/// ## Return
/// * `true` if a new task was selected and switched to.
/// * `false` if no new task was selected, meaning the current task will
///   continue running.
#[doc(alias("yield"))]
pub fn schedule() -> bool {
    let preemption_guard = preemption::hold_preemption();
    // If preemption was not previously enabled (before we disabled it above),
    // then we shouldn't perform a task switch here.
    if !preemption_guard.preemption_was_enabled() {
        // trace!("Note: preemption was disabled on CPU {}, skipping scheduler.", cpu::current_cpu());
        return false;
    }

    let cpu_id = preemption_guard.cpu_id();

    // Ask this CPU's scheduler policy for the next task to run.
    // `update_guarded` proves (via the preemption guard) that we cannot be
    // migrated to another CPU while accessing the CPU-local scheduler.
    // The `unwrap` assumes `set_policy` has installed a scheduler for this CPU.
    let next_task = SCHEDULER.update_guarded(
        |scheduler| scheduler.as_ref().unwrap().lock().next(),
        &preemption_guard,
    );

    // Perform the actual context switch; the preemption guard is consumed and
    // a (possibly different) guard is handed back after the switch completes.
    let (did_switch, recovered_preemption_guard) =
        super::task_switch(next_task, cpu_id, preemption_guard);

    // log::trace!("AFTER TASK_SWITCH CALL (CPU {}) new current: {:?}, interrupts are {}", cpu_id, super::get_my_current_task(), irq_safety::interrupts_enabled());

    // Explicitly drop the guard here to re-enable preemption before returning.
    drop(recovered_preemption_guard);
    did_switch
}

/// Sets the scheduler policy for the given CPU.
///
/// Registers the new scheduler in the system-wide `SCHEDULERS` list and
/// installs it as this CPU's `SCHEDULER`. If a scheduler was already installed
/// for this CPU, all of its queued tasks are migrated into the new scheduler.
pub fn set_policy<T>(cpu_id: CpuId, scheduler: T)
where
    T: Scheduler,
{
    let mutex = PreemptionSafeMutex::new(scheduler);
    let scheduler = Arc::new(mutex);

    // Lock the system-wide list for the whole swap so other CPUs never observe
    // a state where `SCHEDULER` and `SCHEDULERS` disagree.
    let mut locked = SCHEDULERS.lock();
    SCHEDULER.update(|current_scheduler| {
        if let Some(old_scheduler) = current_scheduler {
            let mut old_scheduler_index = None;
            for (i, (cpu, scheduler)) in locked.iter().enumerate() {
                if *cpu == cpu_id {
                    // The scheduler registered for this CPU must be the very
                    // same object as the one installed in `SCHEDULER`.
                    // NOTE: this must be `Arc::ptr_eq` (compares the shared
                    // allocation); `ptr::eq(old_scheduler, scheduler)` would
                    // compare the addresses of the two distinct `Arc` handles
                    // themselves and thus always fail.
                    debug_assert!(Arc::ptr_eq(old_scheduler, scheduler));
                    old_scheduler_index = Some(i);
                    break;
                }
            }

            if let Some(old_scheduler_index) = old_scheduler_index {
                // Order of remaining entries doesn't matter, so `swap_remove`
                // avoids shifting the tail.
                locked.swap_remove(old_scheduler_index);
            } else {
                log::error!("BUG: current scheduler not found in `SCHEDULERS`");
            }

            // Migrate every task queued on the old scheduler into the new one.
            let mut new_scheduler = scheduler.lock();
            for task in old_scheduler.lock().drain() {
                new_scheduler.add(task);
            }
        }

        locked.push((cpu_id, scheduler.clone() as _));
        *current_scheduler = Some(scheduler as _);
    });
}

/// Adds the given task to the least busy run queue.
///
/// Ties in busyness are broken in favor of the earliest-registered CPU.
///
/// # Panics
/// Panics if no schedulers have been registered via [`set_policy`].
pub fn add_task(task: TaskRef) {
    let locked = SCHEDULERS.lock();

    // `min_by_key` returns the first minimal element, matching the manual
    // linear scan this replaces. Unlike the previous strict-`<` scan seeded
    // with `usize::MAX`, it also correctly selects a scheduler when every
    // scheduler reports a busyness of `usize::MAX`.
    let least_busy = locked
        .iter()
        .min_by_key(|(_, scheduler)| scheduler.lock().busyness());

    least_busy
        .expect("BUG: `add_task` called before any scheduler was registered")
        .1
        .lock()
        .add(task);
}

/// Adds the given task to the specified CPU's run queue.
///
/// Silently does nothing if no scheduler is registered for `cpu_id`.
pub fn add_task_to(cpu_id: CpuId, task: TaskRef) {
    let locked = SCHEDULERS.lock();
    if let Some((_, scheduler)) = locked.iter().find(|(cpu, _)| *cpu == cpu_id) {
        scheduler.lock().add(task);
    }
}

/// Adds the given task to the current CPU's run queue.
///
/// Panics if no scheduler has been installed for this CPU.
pub fn add_task_to_current(task: TaskRef) {
    SCHEDULER.update(|scheduler| {
        let scheduler = scheduler.as_ref().unwrap();
        scheduler.lock().add(task);
    })
}

/// Removes the given task from all run queues.
///
/// Returns `true` if the task was found and removed.
pub fn remove_task(task: &TaskRef) -> bool {
    // A task will only ever be on one run queue, and `any` short-circuits at
    // the first scheduler that reports a successful removal.
    SCHEDULERS
        .lock()
        .iter()
        .any(|(_, scheduler)| scheduler.lock().remove(task))
}

/// Removes the given task from the specified CPU's run queue.
///
/// Returns `false` if no scheduler is registered for `cpu_id`, or if the task
/// was not on that CPU's run queue.
pub fn remove_task_from(task: &TaskRef, cpu_id: CpuId) -> bool {
    SCHEDULERS
        .lock()
        .iter()
        .find(|(cpu, _)| *cpu == cpu_id)
        .map_or(false, |(_, scheduler)| scheduler.lock().remove(task))
}

/// Removes the given task from the current CPU's run queue.
///
/// Returns `true` if the task was found and removed.
/// Panics if no scheduler has been installed for this CPU.
pub fn remove_task_from_current(task: &TaskRef) -> bool {
    SCHEDULER.update(|scheduler| {
        let scheduler = scheduler.as_ref().unwrap();
        scheduler.lock().remove(task)
    })
}

/// A task scheduler.
pub trait Scheduler: Send + Sync + 'static {
    /// Returns the next task to run.
    ///
    /// Note that this returns a `TaskRef` unconditionally, so implementations
    /// are presumably expected to always have some runnable task available
    /// (e.g., an idle task) — TODO confirm against implementors.
    fn next(&mut self) -> TaskRef;

    /// Adds a task to the run queue.
    fn add(&mut self, task: TaskRef);

    /// Returns a measure of how busy the scheduler is, with higher values
    /// representing a busier scheduler.
    fn busyness(&self) -> usize;

    /// Removes a task from the run queue.
    ///
    /// Returns `true` if the task was present and removed.
    fn remove(&mut self, task: &TaskRef) -> bool;

    /// Returns a reference to this scheduler as a priority scheduler, if it is one.
    fn as_priority_scheduler(&mut self) -> Option<&mut dyn PriorityScheduler>;

    /// Clears the scheduler's runqueue, returning an iterator over all contained tasks.
    fn drain(&mut self) -> Box<dyn Iterator<Item = TaskRef> + '_>;

    /// Returns a cloned list of contained tasks being scheduled by this scheduler.
    ///
    /// The list should be considered out-of-date as soon as it is called,
    /// but can be useful as a heuristic or for debugging.
    fn tasks(&self) -> Vec<TaskRef>;
}

/// A task scheduler that supports some notion of priority.
pub trait PriorityScheduler {
    /// Sets the priority of the given task.
    ///
    /// Returns `true` if the task was found on this scheduler's run queue
    /// and its priority was updated.
    fn set_priority(&mut self, task: &TaskRef, priority: u8) -> bool;

    /// Gets the priority of the given task.
    ///
    /// Returns `None` if the task is not on this scheduler's run queue.
    fn priority(&mut self, task: &TaskRef) -> Option<u8>;
}

/// Returns the priority of the given task.
///
/// Returns `None` if the task is not on a priority run queue.
pub fn priority(task: &TaskRef) -> Option<u8> {
    // `find_map` stops at the first scheduler that both supports priorities
    // and actually contains the task, just like an explicit loop with an
    // early return would.
    SCHEDULERS.lock().iter().find_map(|(_, scheduler)| {
        scheduler
            .lock()
            .as_priority_scheduler()
            .and_then(|priority_scheduler| priority_scheduler.priority(task))
    })
}

/// Sets the priority of the given task.
///
/// Returns `true` if the task was found on a priority run queue and its
/// priority was updated; `false` otherwise (including when the task is not
/// on any priority run queue).
pub fn set_priority(task: &TaskRef, priority: u8) -> bool {
    for (_, scheduler) in SCHEDULERS.lock().iter() {
        // Skip schedulers that don't support priorities; for those that do,
        // `set_priority` reports whether this scheduler contained the task.
        let updated = scheduler
            .lock()
            .as_priority_scheduler()
            .map(|priority_scheduler| priority_scheduler.set_priority(task, priority))
            .unwrap_or(false);
        if updated {
            return true;
        }
    }
    false
}

/// Returns the busyness of the scheduler on the given CPU,
/// in which higher values indicate a busier scheduler.
///
/// Returns `None` if no scheduler is registered for `cpu_id`.
pub fn busyness(cpu_id: CpuId) -> Option<usize> {
    SCHEDULERS
        .lock()
        .iter()
        .find(|(cpu, _)| *cpu == cpu_id)
        .map(|(_, scheduler)| scheduler.lock().busyness())
}

/// Modifies the given task's priority to be the maximum of its priority
/// and the current task's priority.
///
/// Returns a guard which reverts the change when dropped.
pub fn inherit_priority(task: &TaskRef) -> PriorityInheritanceGuard<'_> {
    let current_priority = super::with_current_task(priority).unwrap();
    let other_priority = priority(task);

    // Only boost `task` when both tasks are on priority run queues and the
    // current task's priority is strictly higher. Previously this condition
    // was written out twice (once for the side effect, once to build the
    // guard) using unstable let-chains; evaluating it once in a single
    // `match` keeps the boost and its revert in sync by construction.
    let inner = match (current_priority, other_priority) {
        (Some(current_priority), Some(other_priority))
            if current_priority > other_priority =>
        {
            set_priority(task, current_priority);
            // Record the original priority so the guard can restore it on drop.
            Some((task, other_priority))
        }
        _ => None,
    };

    PriorityInheritanceGuard { inner }
}

/// A guard that lowers a task's priority back to its previous value when dropped.
pub struct PriorityInheritanceGuard<'a> {
    // The boosted task and the priority to restore it to,
    // or `None` if `inherit_priority` made no change.
    inner: Option<(&'a TaskRef, u8)>,
}
impl<'a> Drop for PriorityInheritanceGuard<'a> {
    fn drop(&mut self) {
        // Revert the temporary priority boost applied by `inherit_priority`.
        if let Some((task, priority)) = self.inner {
            set_priority(task, priority);
        }
    }
}

/// Returns the list of tasks running on each CPU.
///
/// To avoid race conditions with migrating tasks, this function takes a lock
/// over all system schedulers. This is incredibly disruptive and should be
/// avoided at all costs.
pub fn tasks() -> Vec<(CpuId, Vec<TaskRef>)> {
    // Clone the scheduler list so the `SCHEDULERS` mutex itself is released
    // before we start acquiring the individual scheduler locks.
    let schedulers = SCHEDULERS.lock().clone();
    let locked = schedulers
        .iter()
        .map(|(cpu, scheduler)| (cpu, scheduler.lock()))
        // We eagerly evaluate so that all schedulers are actually locked.
        .collect::<Vec<_>>();
    // With every scheduler lock held simultaneously, the per-CPU task lists
    // form a consistent snapshot: no task can migrate between run queues here.
    let result = locked
        .iter()
        .map(|(cpu, locked_scheduler)| (**cpu, locked_scheduler.tasks()))
        .collect();
    // Release all scheduler locks only after the full snapshot is built.
    drop(locked);
    result
}