miri/concurrency/
thread.rs

1//! Implements threads.
2
3use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18
19use crate::concurrency::GlobalDataRaceHandler;
20use crate::shims::tls;
21use crate::*;
22
/// The outcome of a scheduling decision, as returned by `schedule`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchedulingAction {
    /// Execute step on the active thread.
    ExecuteStep,
    /// Execute a timeout callback.
    ExecuteTimeoutCallback,
    /// Wait for a bit, until there is a timeout to be called.
    /// The payload is the time left until the earliest pending timeout.
    Sleep(Duration),
}
32
/// What to do with TLS allocations from terminated threads.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TlsAllocAction {
    /// Deallocate backing memory of thread-local statics as usual.
    Deallocate,
    /// Skip deallocating backing memory of thread-local statics and consider all memory reachable
    /// from them as "allowed to leak" (like global `static`s).
    Leak,
}
42
/// The argument type for the "unblock" callback, indicating why the thread got unblocked.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UnblockKind {
    /// Operation completed successfully, thread continues normal execution.
    Ready,
    /// The operation did not complete within its specified duration.
    /// This is what `run_timeout_callback` passes to the callback of a thread whose
    /// timeout has expired.
    TimedOut,
}
51
/// Type alias for unblock callbacks, i.e. machine callbacks invoked when
/// a thread gets unblocked. The callback receives an [`UnblockKind`] describing
/// why the thread was unblocked.
pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
55
/// Identifies a thread by a `u32` value.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(u32);

impl ThreadId {
    /// The id of the main thread, which is always `0`.
    pub const MAIN_THREAD: ThreadId = ThreadId(0);

    /// Wrap a raw `u32` as a thread id. No check is made that such a thread exists.
    pub fn new_unchecked(id: u32) -> Self {
        ThreadId(id)
    }

    /// Extract the raw `u32` behind this id.
    pub fn to_u32(self) -> u32 {
        let ThreadId(raw) = self;
        raw
    }
}
72
73impl Idx for ThreadId {
74    fn new(idx: usize) -> Self {
75        ThreadId(u32::try_from(idx).unwrap())
76    }
77
78    fn index(self) -> usize {
79        usize::try_from(self.0).unwrap()
80    }
81}
82
83impl From<ThreadId> for u64 {
84    fn from(t: ThreadId) -> Self {
85        t.0.into()
86    }
87}
88
/// Keeps track of what the thread is blocked on.
///
/// Stored in `ThreadState::Blocked` and compared by `ThreadState::is_blocked_on`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BlockReason {
    /// The thread tried to join the specified thread and is blocked until that
    /// thread terminates.
    Join(ThreadId),
    /// Waiting for time to pass.
    Sleep,
    /// Blocked on a mutex.
    Mutex,
    /// Blocked on a condition variable.
    Condvar,
    /// Blocked on a reader-writer lock.
    RwLock,
    /// Blocked on a Futex variable.
    Futex,
    /// Blocked on an InitOnce.
    InitOnce,
    /// Blocked on epoll.
    Epoll,
    /// Blocked on eventfd.
    Eventfd,
    /// Blocked on unnamed_socket.
    UnnamedSocket,
}
114
/// The state of a thread.
enum ThreadState<'tcx> {
    /// The thread is enabled and can be executed.
    Enabled,
    /// The thread is blocked on something.
    ///
    /// `reason` records what it is blocked on, `timeout` is the optional moment at which the
    /// thread should be woken up regardless, and `callback` is invoked when the thread gets
    /// unblocked (with `UnblockKind::TimedOut` when the timeout expires).
    Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
    /// The thread has terminated its execution. We do not delete terminated
    /// threads (FIXME: why?).
    Terminated,
}
125
126impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        match self {
129            Self::Enabled => write!(f, "Enabled"),
130            Self::Blocked { reason, timeout, .. } =>
131                f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
132            Self::Terminated => write!(f, "Terminated"),
133        }
134    }
135}
136
137impl<'tcx> ThreadState<'tcx> {
138    fn is_enabled(&self) -> bool {
139        matches!(self, ThreadState::Enabled)
140    }
141
142    fn is_terminated(&self) -> bool {
143        matches!(self, ThreadState::Terminated)
144    }
145
146    fn is_blocked_on(&self, reason: BlockReason) -> bool {
147        matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
148    }
149}
150
/// The join status of a thread.
///
/// New threads start out as `Joinable` (see `Thread::new`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// The thread can be joined.
    Joinable,
    /// A thread is detached if its join handle was destroyed and no other
    /// thread can join it.
    Detached,
    /// The thread was already joined by some thread and cannot be joined again.
    Joined,
}
162
/// A thread.
pub struct Thread<'tcx> {
    /// Whether the thread is enabled, blocked, or terminated.
    state: ThreadState<'tcx>,

    /// Name of the thread, as raw bytes. `None` if no name was set.
    thread_name: Option<Vec<u8>>,

    /// The virtual call stack.
    stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,

    /// The function to call when the stack ran empty, to figure out what to do next.
    /// Conceptually, this is the interpreter implementation of the things that happen 'after' the
    /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime).
    /// (`None` is an error, it means the callback has not been set up yet or is actively running.)
    pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,

    /// The index of the topmost user-relevant frame in `stack`. This field must contain
    /// the value produced by `compute_top_user_relevant_frame(0)`.
    /// `None` means that no frame in `stack` is user-relevant (which includes the case of
    /// an empty stack).
    /// This field is a cache to reduce how often we call that method. The cache is manually
    /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`.
    top_user_relevant_frame: Option<usize>,

    /// The join status.
    join_status: ThreadJoinStatus,

    /// Stack of active unwind payloads for the current thread. Used for storing
    /// the argument of the call to `miri_start_unwind` (the payload) when unwinding.
    /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`.
    ///
    /// In real unwinding, the payload gets passed as an argument to the landing pad,
    /// which then forwards it to 'Resume'. However this argument is implicit in MIR,
    /// so we have to store it out-of-band. When there are multiple active unwinds,
    /// the innermost one is always caught first, so we can store them as a stack.
    pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,

    /// Last OS error location in memory. It is a 32-bit integer.
    pub(crate) last_error: Option<MPlaceTy<'tcx>>,
}
202
/// The callback stored in `Thread::on_stack_empty`, invoked (via `run_on_stack_empty`) with
/// the interpreter context when a thread's stack has run empty.
// NOTE(review): presumably `Poll::Ready(())` means the thread is done and `Poll::Pending`
// means more work was scheduled -- confirm against the callers of `run_on_stack_empty`.
pub type StackEmptyCallback<'tcx> =
    Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
205
206impl<'tcx> Thread<'tcx> {
207    /// Get the name of the current thread if it was set.
208    fn thread_name(&self) -> Option<&[u8]> {
209        self.thread_name.as_deref()
210    }
211
212    /// Get the name of the current thread for display purposes; will include thread ID if not set.
213    fn thread_display_name(&self, id: ThreadId) -> String {
214        if let Some(ref thread_name) = self.thread_name {
215            String::from_utf8_lossy(thread_name).into_owned()
216        } else {
217            format!("unnamed-{}", id.index())
218        }
219    }
220
221    /// Return the top user-relevant frame, if there is one. `skip` indicates how many top frames
222    /// should be skipped.
223    /// Note that the choice to return `None` here when there is no user-relevant frame is part of
224    /// justifying the optimization that only pushes of user-relevant frames require updating the
225    /// `top_user_relevant_frame` field.
226    fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
227        self.stack
228            .iter()
229            .enumerate()
230            .rev()
231            .skip(skip)
232            .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
233    }
234
235    /// Re-compute the top user-relevant frame from scratch. `skip` indicates how many top frames
236    /// should be skipped.
237    pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
238        self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
239    }
240
241    /// Set the top user-relevant frame to the given value. Must be equal to what
242    /// `get_top_user_relevant_frame` would return!
243    pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
244        debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
245        self.top_user_relevant_frame = Some(frame_idx);
246    }
247
248    /// Returns the topmost frame that is considered user-relevant, or the
249    /// top of the stack if there is no such frame, or `None` if the stack is empty.
250    pub fn top_user_relevant_frame(&self) -> Option<usize> {
251        debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
252        // This can be called upon creation of an allocation. We create allocations while setting up
253        // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle
254        // empty stacks.
255        self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
256    }
257
258    pub fn current_span(&self) -> Span {
259        self.top_user_relevant_frame()
260            .map(|frame_idx| self.stack[frame_idx].current_span())
261            .unwrap_or(rustc_span::DUMMY_SP)
262    }
263}
264
265impl<'tcx> std::fmt::Debug for Thread<'tcx> {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        write!(
268            f,
269            "{}({:?}, {:?})",
270            String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
271            self.state,
272            self.join_status
273        )
274    }
275}
276
277impl<'tcx> Thread<'tcx> {
278    fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
279        Self {
280            state: ThreadState::Enabled,
281            thread_name: name.map(|name| Vec::from(name.as_bytes())),
282            stack: Vec::new(),
283            top_user_relevant_frame: None,
284            join_status: ThreadJoinStatus::Joinable,
285            unwind_payloads: Vec::new(),
286            last_error: None,
287            on_stack_empty,
288        }
289    }
290}
291
292impl VisitProvenance for Thread<'_> {
293    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
294        let Thread {
295            unwind_payloads: panic_payload,
296            last_error,
297            stack,
298            top_user_relevant_frame: _,
299            state: _,
300            thread_name: _,
301            join_status: _,
302            on_stack_empty: _, // we assume the closure captures no GC-relevant state
303        } = self;
304
305        for payload in panic_payload {
306            payload.visit_provenance(visit);
307        }
308        last_error.visit_provenance(visit);
309        for frame in stack {
310            frame.visit_provenance(visit)
311        }
312    }
313}
314
315impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
316    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
317        let Frame {
318            return_place,
319            locals,
320            extra,
321            // There are some private fields we cannot access; they contain no tags.
322            ..
323        } = self;
324
325        // Return place.
326        return_place.visit_provenance(visit);
327        // Locals.
328        for local in locals.iter() {
329            match local.as_mplace_or_imm() {
330                None => {}
331                Some(Either::Left((ptr, meta))) => {
332                    ptr.visit_provenance(visit);
333                    meta.visit_provenance(visit);
334                }
335                Some(Either::Right(imm)) => {
336                    imm.visit_provenance(visit);
337                }
338            }
339        }
340
341        extra.visit_provenance(visit);
342    }
343}
344
/// The moment in time when a blocked thread should be woken up.
#[derive(Debug)]
enum Timeout {
    /// A deadline that is compared against the `MonotonicClock` passed to `get_wait_time`.
    Monotonic(Instant),
    /// A deadline that is compared against `SystemTime::now()`.
    RealTime(SystemTime),
}
351
352impl Timeout {
353    /// How long do we have to wait from now until the specified time?
354    fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
355        match self {
356            Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
357            Timeout::RealTime(time) =>
358                time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
359        }
360    }
361
362    /// Will try to add `duration`, but if that overflows it may add less.
363    fn add_lossy(&self, duration: Duration) -> Self {
364        match self {
365            Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
366            Timeout::RealTime(s) => {
367                // If this overflows, try adding just 1h and assume that will not overflow.
368                Timeout::RealTime(
369                    s.checked_add(duration)
370                        .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
371                )
372            }
373        }
374    }
375}
376
/// The clock to use for the timeout you are asking for.
// NOTE(review): the variants presumably select between `Timeout::Monotonic` and
// `Timeout::RealTime`; the conversion is not part of this section -- confirm at the use site.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum TimeoutClock {
    Monotonic,
    RealTime,
}
383
/// Whether the timeout is relative or absolute.
// NOTE(review): presumably `Relative` means "measured from now" and `Absolute` means
// "measured from the clock's epoch" -- confirm where timeouts are constructed.
#[derive(Debug, Copy, Clone)]
pub enum TimeoutAnchor {
    Relative,
    Absolute,
}
390
/// An error signaling that the requested thread doesn't exist.
/// Returned by `ThreadManager::thread_id_try_from` when the given id does not fit in a `u32`
/// or is not smaller than the number of threads.
#[derive(Debug, Copy, Clone)]
pub struct ThreadNotFound;
394
/// A set of threads.
#[derive(Debug)]
pub struct ThreadManager<'tcx> {
    /// Identifier of the currently active thread.
    active_thread: ThreadId,
    /// Threads used in the program.
    ///
    /// Note that this vector also contains terminated threads.
    threads: IndexVec<ThreadId, Thread<'tcx>>,
    /// A mapping from a thread-local static to the thread specific allocation.
    thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
    /// A flag that indicates that we should change the active thread.
    /// It is set by `yield_active_thread` and reset by the scheduler once it has run.
    yield_active_thread: bool,
    /// A flag that indicates that we should do round-robin scheduling of threads; if it is
    /// not set, the next thread is chosen at random among the enabled ones.
    fixed_scheduling: bool,
}
411
412impl VisitProvenance for ThreadManager<'_> {
413    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
414        let ThreadManager {
415            threads,
416            thread_local_allocs,
417            active_thread: _,
418            yield_active_thread: _,
419            fixed_scheduling: _,
420        } = self;
421
422        for thread in threads {
423            thread.visit_provenance(visit);
424        }
425        for ptr in thread_local_allocs.values() {
426            ptr.visit_provenance(visit);
427        }
428    }
429}
430
431impl<'tcx> ThreadManager<'tcx> {
432    pub(crate) fn new(config: &MiriConfig) -> Self {
433        let mut threads = IndexVec::new();
434        // Create the main thread and add it to the list of threads.
435        threads.push(Thread::new(Some("main"), None));
436        Self {
437            active_thread: ThreadId::MAIN_THREAD,
438            threads,
439            thread_local_allocs: Default::default(),
440            yield_active_thread: false,
441            fixed_scheduling: config.fixed_scheduling,
442        }
443    }
444
445    pub(crate) fn init(
446        ecx: &mut MiriInterpCx<'tcx>,
447        on_main_stack_empty: StackEmptyCallback<'tcx>,
448    ) {
449        ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
450            Some(on_main_stack_empty);
451        if ecx.tcx.sess.target.os.as_ref() != "windows" {
452            // The main thread can *not* be joined on except on windows.
453            ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
454                ThreadJoinStatus::Detached;
455        }
456    }
457
458    pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
459        if let Ok(id) = id.try_into()
460            && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
461        {
462            Ok(ThreadId(id))
463        } else {
464            Err(ThreadNotFound)
465        }
466    }
467
    /// Check if we have an allocation for the given thread local static for the
    /// active thread.
    ///
    /// Returns the stored pointer, or `None` if nothing has been registered for
    /// `(def_id, active thread)` yet.
    fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
        self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
    }
473
474    /// Set the pointer for the allocation of the given thread local
475    /// static for the active thread.
476    ///
477    /// Panics if a thread local is initialized twice for the same thread.
478    fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
479        self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
480    }
481
    /// Borrow the stack of the active thread as a slice of frames.
    pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
        &self.threads[self.active_thread].stack
    }
486
    /// Mutably borrow the stack of the active thread. The full `Vec` is handed out, so the
    /// caller can push and pop frames.
    pub fn active_thread_stack_mut(
        &mut self,
    ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
        &mut self.threads[self.active_thread].stack
    }
493
494    pub fn all_stacks(
495        &self,
496    ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
497        self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
498    }
499
500    /// Create a new thread and returns its id.
501    fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
502        let new_thread_id = ThreadId::new(self.threads.len());
503        self.threads.push(Thread::new(None, Some(on_stack_empty)));
504        new_thread_id
505    }
506
507    /// Set an active thread and return the id of the thread that was active before.
508    fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
509        assert!(id.index() < self.threads.len());
510        info!(
511            "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
512            self.get_thread_display_name(id),
513            self.get_thread_display_name(self.active_thread)
514        );
515        std::mem::replace(&mut self.active_thread, id)
516    }
517
    /// Get the id of the currently active thread.
    pub fn active_thread(&self) -> ThreadId {
        self.active_thread
    }
522
    /// Get the total number of threads that were ever spawned by this program.
    /// This includes the main thread and all terminated threads.
    pub fn get_total_thread_count(&self) -> usize {
        self.threads.len()
    }
527
    /// Get the total number of threads that are currently live, i.e., not yet terminated.
    /// (They might be blocked.)
    pub fn get_live_thread_count(&self) -> usize {
        self.threads.iter().filter(|t| !t.state.is_terminated()).count()
    }
533
    /// Has the given thread terminated, i.e., is its state `Terminated`?
    fn has_terminated(&self, thread_id: ThreadId) -> bool {
        self.threads[thread_id].state.is_terminated()
    }
538
    /// Have all threads (including the main thread) terminated?
    fn have_all_terminated(&self) -> bool {
        self.threads.iter().all(|thread| thread.state.is_terminated())
    }
543
    /// Enable the thread for execution. The thread must be terminated.
    ///
    /// Panics (via `assert!`) if the thread is not currently in the `Terminated` state.
    fn enable_thread(&mut self, thread_id: ThreadId) {
        assert!(self.has_terminated(thread_id));
        self.threads[thread_id].state = ThreadState::Enabled;
    }
549
    /// Get a mutable borrow of the currently active thread.
    pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
        &mut self.threads[self.active_thread]
    }
554
    /// Get a shared borrow of the currently active thread.
    pub fn active_thread_ref(&self) -> &Thread<'tcx> {
        &self.threads[self.active_thread]
    }
559
560    /// Mark the thread as detached, which means that no other thread will try
561    /// to join it and the thread is responsible for cleaning up.
562    ///
563    /// `allow_terminated_joined` allows detaching joined threads that have already terminated.
564    /// This matches Windows's behavior for `CloseHandle`.
565    ///
566    /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>:
567    /// > The handle is valid until closed, even after the thread it represents has been terminated.
568    fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
569        trace!("detaching {:?}", id);
570
571        let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
572            // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
573            self.threads[id].join_status == ThreadJoinStatus::Detached
574        } else {
575            self.threads[id].join_status != ThreadJoinStatus::Joinable
576        };
577        if is_ub {
578            throw_ub_format!("trying to detach thread that was already detached or joined");
579        }
580
581        self.threads[id].join_status = ThreadJoinStatus::Detached;
582        interp_ok(())
583    }
584
    /// Set the name of the given thread, replacing any previously set name.
    pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
        self.threads[thread].thread_name = Some(new_thread_name);
    }
589
    /// Get the name of the given thread, or `None` if no name was set for it.
    pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
        self.threads[thread].thread_name()
    }
594
    /// Get the name of the given thread for display purposes: the name that was set for it,
    /// or a placeholder of the form `unnamed-<index>` if it has no name.
    pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
        self.threads[thread].thread_display_name(thread)
    }
598
599    /// Put the thread into the blocked state.
600    fn block_thread(
601        &mut self,
602        reason: BlockReason,
603        timeout: Option<Timeout>,
604        callback: DynUnblockCallback<'tcx>,
605    ) {
606        let state = &mut self.threads[self.active_thread].state;
607        assert!(state.is_enabled());
608        *state = ThreadState::Blocked { reason, timeout, callback }
609    }
610
    /// Change the active thread to some enabled thread.
    ///
    /// This only records the request by setting a flag; the actual switch is performed by
    /// the scheduler the next time it runs.
    fn yield_active_thread(&mut self) {
        // We do not yield immediately, as swapping out the current stack while executing a MIR statement
        // could lead to all sorts of confusion.
        // We should only switch stacks between steps.
        self.yield_active_thread = true;
    }
618
619    /// Get the wait time for the next timeout, or `None` if no timeout is pending.
620    fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
621        self.threads
622            .iter()
623            .filter_map(|t| {
624                match &t.state {
625                    ThreadState::Blocked { timeout: Some(timeout), .. } =>
626                        Some(timeout.get_wait_time(clock)),
627                    _ => None,
628                }
629            })
630            .min()
631    }
632}
633
// Private scheduling helpers. The impl is empty because the trait's methods all come with
// default bodies.
impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
635trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
636    /// Execute a timeout callback on the callback's thread.
637    #[inline]
638    fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
639        let this = self.eval_context_mut();
640        let mut found_callback = None;
641        // Find a blocked thread that has timed out.
642        for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
643            match &thread.state {
644                ThreadState::Blocked { timeout: Some(timeout), .. }
645                    if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
646                {
647                    let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
648                    let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
649                    found_callback = Some((id, callback));
650                    // Run the fallback (after the loop because borrow-checking).
651                    break;
652                }
653                _ => {}
654            }
655        }
656        if let Some((thread, callback)) = found_callback {
657            // This back-and-forth with `set_active_thread` is here because of two
658            // design decisions:
659            // 1. Make the caller and not the callback responsible for changing
660            //    thread.
661            // 2. Make the scheduler the only place that can change the active
662            //    thread.
663            let old_thread = this.machine.threads.set_active_thread_id(thread);
664            callback.call(this, UnblockKind::TimedOut)?;
665            this.machine.threads.set_active_thread_id(old_thread);
666        }
667        // found_callback can remain None if the computer's clock
668        // was shifted after calling the scheduler and before the call
669        // to get_ready_callback (see issue
670        // https://github.com/rust-lang/miri/issues/1763). In this case,
671        // just do nothing, which effectively just returns to the
672        // scheduler.
673        interp_ok(())
674    }
675
676    #[inline]
677    fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
678        let this = self.eval_context_mut();
679        // Inform GenMC that a thread has finished all user code. GenMC needs to know this for scheduling.
680        // FIXME(GenMC): Thread-local destructors *are* user code, so this is odd. Also now that we
681        // support pre-main constructors, it can get called there as well.
682        if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
683            let thread_id = this.active_thread();
684            genmc_ctx.handle_thread_stack_empty(thread_id);
685        }
686        let mut callback = this
687            .active_thread_mut()
688            .on_stack_empty
689            .take()
690            .expect("`on_stack_empty` not set up, or already running");
691        let res = callback(this)?;
692        this.active_thread_mut().on_stack_empty = Some(callback);
693        interp_ok(res)
694    }
695
    /// Decide which action to take next and on which thread.
    ///
    /// The currently implemented scheduling policy is the one that is commonly
    /// used in stateless model checkers such as Loom: run the active thread as
    /// long as we can and switch only when we have to (the active thread was
    /// blocked, terminated, or has explicitly asked to be preempted).
    ///
    /// If GenMC mode is active, the scheduling is instead handled by GenMC.
    ///
    /// Returns `ExecuteStep` once the active thread is an enabled thread,
    /// `ExecuteTimeoutCallback` if the timeout of some blocked thread has already expired,
    /// and `Sleep(d)` if no thread is enabled but the earliest timeout is due in `d`.
    ///
    /// # Errors
    ///
    /// Stops the machine with `TerminationInfo::Deadlock` if no thread is enabled and no
    /// timeout is pending. Errors from the GenMC scheduler are propagated.
    fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
        let this = self.eval_context_mut();
        // In GenMC mode, we let GenMC do the scheduling
        if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
            let next_thread_id = genmc_ctx.schedule_thread(this)?;

            let thread_manager = &mut this.machine.threads;
            thread_manager.active_thread = next_thread_id;
            thread_manager.yield_active_thread = false;

            // GenMC must only ever pick a thread that can actually take a step.
            assert!(thread_manager.threads[thread_manager.active_thread].state.is_enabled());
            return interp_ok(SchedulingAction::ExecuteStep);
        }

        // We are not in GenMC mode, so we control the schedule
        let thread_manager = &mut this.machine.threads;
        let clock = &this.machine.monotonic_clock;
        let rng = this.machine.rng.get_mut();
        // This thread and the program can keep going.
        if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
            && !thread_manager.yield_active_thread
        {
            // The currently active thread is still enabled, just continue with it.
            return interp_ok(SchedulingAction::ExecuteStep);
        }
        // The active thread yielded, blocked, or got terminated. Let's see if there are any
        // timeouts to take care of. We do this *before* running any other thread, to ensure that
        // timeouts "in the past" fire before any other thread can take an action. This ensures
        // that for `pthread_cond_timedwait`, "an error is returned if [...] the absolute time
        // specified by abstime has already been passed at the time of the call".
        // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
        let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
        if potential_sleep_time == Some(Duration::ZERO) {
            return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
        }
        // No callbacks immediately scheduled, pick a regular thread to execute.
        // The active thread blocked or yielded. So we go search for another enabled thread.
        // We build the list of threads by starting with the threads after the current one, followed by
        // the threads before the current one and then the current thread itself (i.e., this iterator acts
        // like `threads.rotate_left(self.active_thread.index() + 1)`). This ensures that if we pick the first
        // eligible thread, we do regular round-robin scheduling, and all threads get a chance to take a step.
        let mut threads_iter = thread_manager
            .threads
            .iter_enumerated()
            .skip(thread_manager.active_thread.index() + 1)
            .chain(
                thread_manager
                    .threads
                    .iter_enumerated()
                    .take(thread_manager.active_thread.index() + 1),
            )
            .filter(|(_id, thread)| thread.state.is_enabled());
        // Pick a new thread, and switch to it: the first candidate with fixed scheduling,
        // a random candidate otherwise.
        let new_thread = if thread_manager.fixed_scheduling {
            threads_iter.next()
        } else {
            threads_iter.choose(rng)
        };

        if let Some((id, _thread)) = new_thread {
            // Only log and switch if the choice differs from the thread that is already active.
            if thread_manager.active_thread != id {
                info!(
                    "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
                    thread_manager.get_thread_display_name(id),
                    thread_manager.get_thread_display_name(thread_manager.active_thread)
                );
                thread_manager.active_thread = id;
            }
        }
        // This completes the `yield`, if any was requested.
        thread_manager.yield_active_thread = false;

        if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
            return interp_ok(SchedulingAction::ExecuteStep);
        }
        // We have not found a thread to execute.
        if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
            unreachable!("all threads terminated without the main thread terminating?!");
        } else if let Some(sleep_time) = potential_sleep_time {
            // All threads are currently blocked, but we have unexecuted
            // timeout_callbacks, which may unblock some of the threads. Hence,
            // sleep until the first callback.
            interp_ok(SchedulingAction::Sleep(sleep_time))
        } else {
            // Every live thread is blocked and nothing will ever wake one up.
            throw_machine_stop!(TerminationInfo::Deadlock);
        }
    }
791}
792
// Public interface to thread management.
// The impl body is intentionally empty: every method of `EvalContextExt` is a provided
// (default) method defined on the trait itself, so implementing the trait is all that is needed.
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
795pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
796    #[inline]
797    fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
798        self.eval_context_ref().machine.threads.thread_id_try_from(id)
799    }
800
801    /// Get a thread-specific allocation id for the given thread-local static.
802    /// If needed, allocate a new one.
803    fn get_or_create_thread_local_alloc(
804        &mut self,
805        def_id: DefId,
806    ) -> InterpResult<'tcx, StrictPointer> {
807        let this = self.eval_context_mut();
808        let tcx = this.tcx;
809        if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
810            // We already have a thread-specific allocation id for this
811            // thread-local static.
812            interp_ok(old_alloc)
813        } else {
814            // We need to allocate a thread-specific allocation id for this
815            // thread-local static.
816            // First, we compute the initial value for this static.
817            if tcx.is_foreign_item(def_id) {
818                throw_unsup_format!("foreign thread-local statics are not supported");
819            }
820            let params = this.machine.get_default_alloc_params();
821            let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
822            // We make a full copy of this allocation.
823            let mut alloc = alloc.inner().adjust_from_tcx(
824                &this.tcx,
825                |bytes, align| {
826                    interp_ok(MiriAllocBytes::from_bytes(
827                        std::borrow::Cow::Borrowed(bytes),
828                        align,
829                        params,
830                    ))
831                },
832                |ptr| this.global_root_pointer(ptr),
833            )?;
834            // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
835            alloc.mutability = Mutability::Mut;
836            // Create a fresh allocation with this content.
837            let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
838            this.machine.threads.set_thread_local_alloc(def_id, ptr);
839            interp_ok(ptr)
840        }
841    }
842
    /// Start a regular (non-main) thread.
    ///
    /// Creates a new thread in the thread manager, notifies the data race handler (if any),
    /// pushes a call to `start_routine` as the new thread's first stack frame, and returns the
    /// new thread's id. The new thread does not start executing here; only its first frame is
    /// set up. On success, the active thread on return is the same as on entry.
    ///
    /// - `thread`: if `Some`, the place that receives the new thread's id, written as an
    ///   unsigned integer of the place's size.
    /// - `start_routine`: pointer to the function the new thread will run.
    /// - `start_abi`: the ABI used to call `start_routine`.
    /// - `func_arg`: the single argument passed to `start_routine`.
    /// - `ret_layout`: layout of the place allocated for `start_routine`'s return value.
    ///
    /// Any error from the steps above is propagated; in that case the active thread is *not*
    /// switched back.
    #[inline]
    fn start_regular_thread(
        &mut self,
        thread: Option<MPlaceTy<'tcx>>,
        start_routine: Pointer,
        start_abi: ExternAbi,
        func_arg: ImmTy<'tcx>,
        ret_layout: TyAndLayout<'tcx>,
    ) -> InterpResult<'tcx, ThreadId> {
        let this = self.eval_context_mut();

        // Create the new thread. The closure handed to `create_thread` forwards to a fresh
        // `tls::TlsDtorsState`, which it owns (and mutates) across invocations.
        let new_thread_id = this.machine.threads.create_thread({
            let mut state = tls::TlsDtorsState::default();
            Box::new(move |m| state.on_stack_empty(m))
        });
        // Fetch the span before mutably borrowing `data_race` below.
        let current_span = this.machine.current_span();
        // Tell whichever data race handler is in use about the new thread.
        match &mut this.machine.data_race {
            GlobalDataRaceHandler::None => {}
            GlobalDataRaceHandler::Vclocks(data_race) =>
                data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
            GlobalDataRaceHandler::Genmc(genmc_ctx) =>
                genmc_ctx.handle_thread_create(&this.machine.threads, new_thread_id)?,
        }
        // Write the new thread's id. We only switch to the new thread afterwards,
        // so that this write is treated as occurring on the current (spawning) thread.
        if let Some(thread_info_place) = thread {
            this.write_scalar(
                Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
                &thread_info_place,
            )?;
        }

        // Finally switch to new thread so that we can push the first stackframe.
        // After this all accesses will be treated as occurring in the new thread.
        let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);

        // The child inherits its parent's cpu affinity.
        if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
            this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
        }

        // Perform the function pointer load in the new thread frame.
        let instance = this.get_ptr_fn(start_routine)?.as_instance()?;

        // Note: the returned value is currently ignored (see the FIXME in
        // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
        // it.
        let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;

        // Push the call to `start_routine` onto the (currently active) new thread's stack.
        this.call_function(
            instance,
            start_abi,
            &[func_arg],
            Some(&ret_place),
            ReturnContinuation::Stop { cleanup: true },
        )?;

        // Restore the old active thread frame.
        this.machine.threads.set_active_thread_id(old_thread_id);

        interp_ok(new_thread_id)
    }
907
908    /// Handles thread termination of the active thread: wakes up threads joining on this one,
909    /// and deals with the thread's thread-local statics according to `tls_alloc_action`.
910    ///
911    /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
912    fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
913        let this = self.eval_context_mut();
914
915        // Mark thread as terminated.
916        let thread = this.active_thread_mut();
917        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
918        thread.state = ThreadState::Terminated;
919        match &mut this.machine.data_race {
920            GlobalDataRaceHandler::None => {}
921            GlobalDataRaceHandler::Vclocks(data_race) =>
922                data_race.thread_terminated(&this.machine.threads),
923            GlobalDataRaceHandler::Genmc(genmc_ctx) =>
924                genmc_ctx.handle_thread_finish(&this.machine.threads)?,
925        }
926        // Deallocate TLS.
927        let gone_thread = this.active_thread();
928        {
929            let mut free_tls_statics = Vec::new();
930            this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
931                if thread != gone_thread {
932                    // A different thread, keep this static around.
933                    return true;
934                }
935                // Delete this static from the map and from memory.
936                // We cannot free directly here as we cannot use `?` in this context.
937                free_tls_statics.push(alloc_id);
938                false
939            });
940            // Now free the TLS statics.
941            for ptr in free_tls_statics {
942                match tls_alloc_action {
943                    TlsAllocAction::Deallocate =>
944                        this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
945                    TlsAllocAction::Leak =>
946                        if let Some(alloc) = ptr.provenance.get_alloc_id() {
947                            trace!(
948                                "Thread-local static leaked and stored as static root: {:?}",
949                                alloc
950                            );
951                            this.machine.static_roots.push(alloc);
952                        },
953                }
954            }
955        }
956        // Unblock joining threads.
957        let unblock_reason = BlockReason::Join(gone_thread);
958        let threads = &this.machine.threads.threads;
959        let joining_threads = threads
960            .iter_enumerated()
961            .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
962            .map(|(id, _)| id)
963            .collect::<Vec<_>>();
964        for thread in joining_threads {
965            this.unblock_thread(thread, unblock_reason)?;
966        }
967
968        interp_ok(())
969    }
970
971    /// Block the current thread, with an optional timeout.
972    /// The callback will be invoked when the thread gets unblocked.
973    #[inline]
974    fn block_thread(
975        &mut self,
976        reason: BlockReason,
977        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
978        callback: DynUnblockCallback<'tcx>,
979    ) {
980        let this = self.eval_context_mut();
981        let timeout = timeout.map(|(clock, anchor, duration)| {
982            let anchor = match clock {
983                TimeoutClock::RealTime => {
984                    assert!(
985                        this.machine.communicate(),
986                        "cannot have `RealTime` timeout with isolation enabled!"
987                    );
988                    Timeout::RealTime(match anchor {
989                        TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
990                        TimeoutAnchor::Relative => SystemTime::now(),
991                    })
992                }
993                TimeoutClock::Monotonic =>
994                    Timeout::Monotonic(match anchor {
995                        TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
996                        TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
997                    }),
998            };
999            anchor.add_lossy(duration)
1000        });
1001        this.machine.threads.block_thread(reason, timeout, callback);
1002    }
1003
1004    /// Put the blocked thread into the enabled state.
1005    /// Sanity-checks that the thread previously was blocked for the right reason.
1006    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1007        let this = self.eval_context_mut();
1008        let old_state =
1009            mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1010        let callback = match old_state {
1011            ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1012                assert_eq!(
1013                    reason, actual_reason,
1014                    "unblock_thread: thread was blocked for the wrong reason"
1015                );
1016                callback
1017            }
1018            _ => panic!("unblock_thread: thread was not blocked"),
1019        };
1020        // The callback must be executed in the previously blocked thread.
1021        let old_thread = this.machine.threads.set_active_thread_id(thread);
1022        callback.call(this, UnblockKind::Ready)?;
1023        this.machine.threads.set_active_thread_id(old_thread);
1024        interp_ok(())
1025    }
1026
1027    #[inline]
1028    fn detach_thread(
1029        &mut self,
1030        thread_id: ThreadId,
1031        allow_terminated_joined: bool,
1032    ) -> InterpResult<'tcx> {
1033        let this = self.eval_context_mut();
1034        this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1035    }
1036
1037    /// Mark that the active thread tries to join the thread with `joined_thread_id`.
1038    ///
1039    /// When the join is successful (immediately, or as soon as the joined thread finishes), `success_retval` will be written to `return_dest`.
1040    fn join_thread(
1041        &mut self,
1042        joined_thread_id: ThreadId,
1043        success_retval: Scalar,
1044        return_dest: &MPlaceTy<'tcx>,
1045    ) -> InterpResult<'tcx> {
1046        let this = self.eval_context_mut();
1047        let thread_mgr = &mut this.machine.threads;
1048        if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1049            // On Windows this corresponds to joining on a closed handle.
1050            throw_ub_format!("trying to join a detached thread");
1051        }
1052
1053        fn after_join<'tcx>(
1054            this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1055            joined_thread_id: ThreadId,
1056            success_retval: Scalar,
1057            return_dest: &MPlaceTy<'tcx>,
1058        ) -> InterpResult<'tcx> {
1059            let threads = &this.machine.threads;
1060            match &mut this.machine.data_race {
1061                GlobalDataRaceHandler::None => {}
1062                GlobalDataRaceHandler::Vclocks(data_race) =>
1063                    data_race.thread_joined(threads, joined_thread_id),
1064                GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1065                    genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1066            }
1067            this.write_scalar(success_retval, return_dest)?;
1068            interp_ok(())
1069        }
1070
1071        // Mark the joined thread as being joined so that we detect if other
1072        // threads try to join it.
1073        thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1074        if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1075            trace!(
1076                "{:?} blocked on {:?} when trying to join",
1077                thread_mgr.active_thread, joined_thread_id
1078            );
1079            // The joined thread is still running, we need to wait for it.
1080            // Once we get unblocked, perform the appropriate synchronization and write the return value.
1081            let dest = return_dest.clone();
1082            thread_mgr.block_thread(
1083                BlockReason::Join(joined_thread_id),
1084                None,
1085                callback!(
1086                    @capture<'tcx> {
1087                        joined_thread_id: ThreadId,
1088                        dest: MPlaceTy<'tcx>,
1089                        success_retval: Scalar,
1090                    }
1091                    |this, unblock: UnblockKind| {
1092                        assert_eq!(unblock, UnblockKind::Ready);
1093                        after_join(this, joined_thread_id, success_retval, &dest)
1094                    }
1095                ),
1096            );
1097        } else {
1098            // The thread has already terminated - establish happens-before and write the return value.
1099            after_join(this, joined_thread_id, success_retval, return_dest)?;
1100        }
1101        interp_ok(())
1102    }
1103
1104    /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
1105    /// If the thread is already joined by another thread, it will throw UB.
1106    ///
1107    /// When the join is successful (immediately, or as soon as the joined thread finishes), `success_retval` will be written to `return_dest`.
1108    fn join_thread_exclusive(
1109        &mut self,
1110        joined_thread_id: ThreadId,
1111        success_retval: Scalar,
1112        return_dest: &MPlaceTy<'tcx>,
1113    ) -> InterpResult<'tcx> {
1114        let this = self.eval_context_mut();
1115        let threads = &this.machine.threads.threads;
1116        if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1117            throw_ub_format!("trying to join an already joined thread");
1118        }
1119
1120        if joined_thread_id == this.machine.threads.active_thread {
1121            throw_ub_format!("trying to join itself");
1122        }
1123
1124        // Sanity check `join_status`.
1125        assert!(
1126            threads
1127                .iter()
1128                .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1129            "this thread already has threads waiting for its termination"
1130        );
1131
1132        this.join_thread(joined_thread_id, success_retval, return_dest)
1133    }
1134
1135    #[inline]
1136    fn active_thread(&self) -> ThreadId {
1137        let this = self.eval_context_ref();
1138        this.machine.threads.active_thread()
1139    }
1140
1141    #[inline]
1142    fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1143        let this = self.eval_context_mut();
1144        this.machine.threads.active_thread_mut()
1145    }
1146
1147    #[inline]
1148    fn active_thread_ref(&self) -> &Thread<'tcx> {
1149        let this = self.eval_context_ref();
1150        this.machine.threads.active_thread_ref()
1151    }
1152
1153    #[inline]
1154    fn get_total_thread_count(&self) -> usize {
1155        let this = self.eval_context_ref();
1156        this.machine.threads.get_total_thread_count()
1157    }
1158
1159    #[inline]
1160    fn have_all_terminated(&self) -> bool {
1161        let this = self.eval_context_ref();
1162        this.machine.threads.have_all_terminated()
1163    }
1164
1165    #[inline]
1166    fn enable_thread(&mut self, thread_id: ThreadId) {
1167        let this = self.eval_context_mut();
1168        this.machine.threads.enable_thread(thread_id);
1169    }
1170
1171    #[inline]
1172    fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1173        let this = self.eval_context_ref();
1174        this.machine.threads.active_thread_stack()
1175    }
1176
1177    #[inline]
1178    fn active_thread_stack_mut<'a>(
1179        &'a mut self,
1180    ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1181        let this = self.eval_context_mut();
1182        this.machine.threads.active_thread_stack_mut()
1183    }
1184
1185    /// Set the name of the current thread. The buffer must not include the null terminator.
1186    #[inline]
1187    fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1188        self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1189    }
1190
1191    #[inline]
1192    fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1193    where
1194        'tcx: 'c,
1195    {
1196        self.eval_context_ref().machine.threads.get_thread_name(thread)
1197    }
1198
1199    #[inline]
1200    fn yield_active_thread(&mut self) {
1201        self.eval_context_mut().machine.threads.yield_active_thread();
1202    }
1203
1204    #[inline]
1205    fn maybe_preempt_active_thread(&mut self) {
1206        use rand::Rng as _;
1207
1208        let this = self.eval_context_mut();
1209        if !this.machine.threads.fixed_scheduling
1210            && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1211        {
1212            this.yield_active_thread();
1213        }
1214    }
1215
1216    /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
1217    /// termination).
1218    fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1219        let this = self.eval_context_mut();
1220        loop {
1221            if CTRL_C_RECEIVED.load(Relaxed) {
1222                this.machine.handle_abnormal_termination();
1223                throw_machine_stop!(TerminationInfo::Interrupted);
1224            }
1225            match this.schedule()? {
1226                SchedulingAction::ExecuteStep => {
1227                    if !this.step()? {
1228                        // See if this thread can do something else.
1229                        match this.run_on_stack_empty()? {
1230                            Poll::Pending => {} // keep going
1231                            Poll::Ready(()) =>
1232                                this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1233                        }
1234                    }
1235                }
1236                SchedulingAction::ExecuteTimeoutCallback => {
1237                    this.run_timeout_callback()?;
1238                }
1239                SchedulingAction::Sleep(duration) => {
1240                    this.machine.monotonic_clock.sleep(duration);
1241                }
1242            }
1243        }
1244    }
1245}