1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18
19use crate::concurrency::GlobalDataRaceHandler;
20use crate::shims::tls;
21use crate::*;
22
23#[derive(Clone, Copy, Debug, PartialEq)]
24enum SchedulingAction {
25 ExecuteStep,
27 ExecuteTimeoutCallback,
29 Sleep(Duration),
31}
32
33#[derive(Clone, Copy, Debug, PartialEq)]
35pub enum TlsAllocAction {
36 Deallocate,
38 Leak,
41}
42
43#[derive(Clone, Copy, Debug, PartialEq)]
45pub enum UnblockKind {
46 Ready,
48 TimedOut,
50}
51
52pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
55
56#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
58pub struct ThreadId(u32);
59
60impl ThreadId {
61 pub fn to_u32(self) -> u32 {
62 self.0
63 }
64
65 pub fn new_unchecked(id: u32) -> Self {
67 Self(id)
68 }
69
70 pub const MAIN_THREAD: ThreadId = ThreadId(0);
71}
72
73impl Idx for ThreadId {
74 fn new(idx: usize) -> Self {
75 ThreadId(u32::try_from(idx).unwrap())
76 }
77
78 fn index(self) -> usize {
79 usize::try_from(self.0).unwrap()
80 }
81}
82
83impl From<ThreadId> for u64 {
84 fn from(t: ThreadId) -> Self {
85 t.0.into()
86 }
87}
88
89#[derive(Debug, Copy, Clone, PartialEq, Eq)]
91pub enum BlockReason {
92 Join(ThreadId),
95 Sleep,
97 Mutex,
99 Condvar,
101 RwLock,
103 Futex,
105 InitOnce,
107 Epoll,
109 Eventfd,
111 UnnamedSocket,
113}
114
115enum ThreadState<'tcx> {
117 Enabled,
119 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
121 Terminated,
124}
125
126impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 match self {
129 Self::Enabled => write!(f, "Enabled"),
130 Self::Blocked { reason, timeout, .. } =>
131 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
132 Self::Terminated => write!(f, "Terminated"),
133 }
134 }
135}
136
137impl<'tcx> ThreadState<'tcx> {
138 fn is_enabled(&self) -> bool {
139 matches!(self, ThreadState::Enabled)
140 }
141
142 fn is_terminated(&self) -> bool {
143 matches!(self, ThreadState::Terminated)
144 }
145
146 fn is_blocked_on(&self, reason: BlockReason) -> bool {
147 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
148 }
149}
150
151#[derive(Debug, Copy, Clone, PartialEq, Eq)]
153enum ThreadJoinStatus {
154 Joinable,
156 Detached,
159 Joined,
161}
162
163pub struct Thread<'tcx> {
165 state: ThreadState<'tcx>,
166
167 thread_name: Option<Vec<u8>>,
169
170 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
172
173 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
178
179 top_user_relevant_frame: Option<usize>,
185
186 join_status: ThreadJoinStatus,
188
189 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
198
199 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
201}
202
203pub type StackEmptyCallback<'tcx> =
204 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
205
206impl<'tcx> Thread<'tcx> {
207 fn thread_name(&self) -> Option<&[u8]> {
209 self.thread_name.as_deref()
210 }
211
212 fn thread_display_name(&self, id: ThreadId) -> String {
214 if let Some(ref thread_name) = self.thread_name {
215 String::from_utf8_lossy(thread_name).into_owned()
216 } else {
217 format!("unnamed-{}", id.index())
218 }
219 }
220
221 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
227 self.stack
228 .iter()
229 .enumerate()
230 .rev()
231 .skip(skip)
232 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
233 }
234
235 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
238 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
239 }
240
241 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
244 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
245 self.top_user_relevant_frame = Some(frame_idx);
246 }
247
248 pub fn top_user_relevant_frame(&self) -> Option<usize> {
251 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
252 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
256 }
257
258 pub fn current_span(&self) -> Span {
259 self.top_user_relevant_frame()
260 .map(|frame_idx| self.stack[frame_idx].current_span())
261 .unwrap_or(rustc_span::DUMMY_SP)
262 }
263}
264
265impl<'tcx> std::fmt::Debug for Thread<'tcx> {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 write!(
268 f,
269 "{}({:?}, {:?})",
270 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
271 self.state,
272 self.join_status
273 )
274 }
275}
276
277impl<'tcx> Thread<'tcx> {
278 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
279 Self {
280 state: ThreadState::Enabled,
281 thread_name: name.map(|name| Vec::from(name.as_bytes())),
282 stack: Vec::new(),
283 top_user_relevant_frame: None,
284 join_status: ThreadJoinStatus::Joinable,
285 unwind_payloads: Vec::new(),
286 last_error: None,
287 on_stack_empty,
288 }
289 }
290}
291
292impl VisitProvenance for Thread<'_> {
293 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
294 let Thread {
295 unwind_payloads: panic_payload,
296 last_error,
297 stack,
298 top_user_relevant_frame: _,
299 state: _,
300 thread_name: _,
301 join_status: _,
302 on_stack_empty: _, } = self;
304
305 for payload in panic_payload {
306 payload.visit_provenance(visit);
307 }
308 last_error.visit_provenance(visit);
309 for frame in stack {
310 frame.visit_provenance(visit)
311 }
312 }
313}
314
315impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
316 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
317 let Frame {
318 return_place,
319 locals,
320 extra,
321 ..
323 } = self;
324
325 return_place.visit_provenance(visit);
327 for local in locals.iter() {
329 match local.as_mplace_or_imm() {
330 None => {}
331 Some(Either::Left((ptr, meta))) => {
332 ptr.visit_provenance(visit);
333 meta.visit_provenance(visit);
334 }
335 Some(Either::Right(imm)) => {
336 imm.visit_provenance(visit);
337 }
338 }
339 }
340
341 extra.visit_provenance(visit);
342 }
343}
344
345#[derive(Debug)]
347enum Timeout {
348 Monotonic(Instant),
349 RealTime(SystemTime),
350}
351
352impl Timeout {
353 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
355 match self {
356 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
357 Timeout::RealTime(time) =>
358 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
359 }
360 }
361
362 fn add_lossy(&self, duration: Duration) -> Self {
364 match self {
365 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
366 Timeout::RealTime(s) => {
367 Timeout::RealTime(
369 s.checked_add(duration)
370 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
371 )
372 }
373 }
374 }
375}
376
377#[derive(Debug, Copy, Clone, PartialEq)]
379pub enum TimeoutClock {
380 Monotonic,
381 RealTime,
382}
383
384#[derive(Debug, Copy, Clone)]
386pub enum TimeoutAnchor {
387 Relative,
388 Absolute,
389}
390
391#[derive(Debug, Copy, Clone)]
393pub struct ThreadNotFound;
394
395#[derive(Debug)]
397pub struct ThreadManager<'tcx> {
398 active_thread: ThreadId,
400 threads: IndexVec<ThreadId, Thread<'tcx>>,
404 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
406 yield_active_thread: bool,
408 fixed_scheduling: bool,
410}
411
412impl VisitProvenance for ThreadManager<'_> {
413 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
414 let ThreadManager {
415 threads,
416 thread_local_allocs,
417 active_thread: _,
418 yield_active_thread: _,
419 fixed_scheduling: _,
420 } = self;
421
422 for thread in threads {
423 thread.visit_provenance(visit);
424 }
425 for ptr in thread_local_allocs.values() {
426 ptr.visit_provenance(visit);
427 }
428 }
429}
430
431impl<'tcx> ThreadManager<'tcx> {
432 pub(crate) fn new(config: &MiriConfig) -> Self {
433 let mut threads = IndexVec::new();
434 threads.push(Thread::new(Some("main"), None));
436 Self {
437 active_thread: ThreadId::MAIN_THREAD,
438 threads,
439 thread_local_allocs: Default::default(),
440 yield_active_thread: false,
441 fixed_scheduling: config.fixed_scheduling,
442 }
443 }
444
445 pub(crate) fn init(
446 ecx: &mut MiriInterpCx<'tcx>,
447 on_main_stack_empty: StackEmptyCallback<'tcx>,
448 ) {
449 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
450 Some(on_main_stack_empty);
451 if ecx.tcx.sess.target.os.as_ref() != "windows" {
452 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
454 ThreadJoinStatus::Detached;
455 }
456 }
457
458 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
459 if let Ok(id) = id.try_into()
460 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
461 {
462 Ok(ThreadId(id))
463 } else {
464 Err(ThreadNotFound)
465 }
466 }
467
468 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
471 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
472 }
473
474 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
479 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
480 }
481
482 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
484 &self.threads[self.active_thread].stack
485 }
486
487 pub fn active_thread_stack_mut(
489 &mut self,
490 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
491 &mut self.threads[self.active_thread].stack
492 }
493
494 pub fn all_stacks(
495 &self,
496 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
497 self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
498 }
499
500 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
502 let new_thread_id = ThreadId::new(self.threads.len());
503 self.threads.push(Thread::new(None, Some(on_stack_empty)));
504 new_thread_id
505 }
506
507 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
509 assert!(id.index() < self.threads.len());
510 info!(
511 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
512 self.get_thread_display_name(id),
513 self.get_thread_display_name(self.active_thread)
514 );
515 std::mem::replace(&mut self.active_thread, id)
516 }
517
518 pub fn active_thread(&self) -> ThreadId {
520 self.active_thread
521 }
522
523 pub fn get_total_thread_count(&self) -> usize {
525 self.threads.len()
526 }
527
528 pub fn get_live_thread_count(&self) -> usize {
531 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
532 }
533
534 fn has_terminated(&self, thread_id: ThreadId) -> bool {
536 self.threads[thread_id].state.is_terminated()
537 }
538
539 fn have_all_terminated(&self) -> bool {
541 self.threads.iter().all(|thread| thread.state.is_terminated())
542 }
543
544 fn enable_thread(&mut self, thread_id: ThreadId) {
546 assert!(self.has_terminated(thread_id));
547 self.threads[thread_id].state = ThreadState::Enabled;
548 }
549
550 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
552 &mut self.threads[self.active_thread]
553 }
554
555 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
557 &self.threads[self.active_thread]
558 }
559
560 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
569 trace!("detaching {:?}", id);
570
571 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
572 self.threads[id].join_status == ThreadJoinStatus::Detached
574 } else {
575 self.threads[id].join_status != ThreadJoinStatus::Joinable
576 };
577 if is_ub {
578 throw_ub_format!("trying to detach thread that was already detached or joined");
579 }
580
581 self.threads[id].join_status = ThreadJoinStatus::Detached;
582 interp_ok(())
583 }
584
585 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
587 self.threads[thread].thread_name = Some(new_thread_name);
588 }
589
590 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
592 self.threads[thread].thread_name()
593 }
594
595 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
596 self.threads[thread].thread_display_name(thread)
597 }
598
599 fn block_thread(
601 &mut self,
602 reason: BlockReason,
603 timeout: Option<Timeout>,
604 callback: DynUnblockCallback<'tcx>,
605 ) {
606 let state = &mut self.threads[self.active_thread].state;
607 assert!(state.is_enabled());
608 *state = ThreadState::Blocked { reason, timeout, callback }
609 }
610
611 fn yield_active_thread(&mut self) {
613 self.yield_active_thread = true;
617 }
618
619 fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
621 self.threads
622 .iter()
623 .filter_map(|t| {
624 match &t.state {
625 ThreadState::Blocked { timeout: Some(timeout), .. } =>
626 Some(timeout.get_wait_time(clock)),
627 _ => None,
628 }
629 })
630 .min()
631 }
632}
633
634impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
635trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
636 #[inline]
638 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
639 let this = self.eval_context_mut();
640 let mut found_callback = None;
641 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
643 match &thread.state {
644 ThreadState::Blocked { timeout: Some(timeout), .. }
645 if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
646 {
647 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
648 let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
649 found_callback = Some((id, callback));
650 break;
652 }
653 _ => {}
654 }
655 }
656 if let Some((thread, callback)) = found_callback {
657 let old_thread = this.machine.threads.set_active_thread_id(thread);
664 callback.call(this, UnblockKind::TimedOut)?;
665 this.machine.threads.set_active_thread_id(old_thread);
666 }
667 interp_ok(())
674 }
675
676 #[inline]
677 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
678 let this = self.eval_context_mut();
679 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
683 let thread_id = this.active_thread();
684 genmc_ctx.handle_thread_stack_empty(thread_id);
685 }
686 let mut callback = this
687 .active_thread_mut()
688 .on_stack_empty
689 .take()
690 .expect("`on_stack_empty` not set up, or already running");
691 let res = callback(this)?;
692 this.active_thread_mut().on_stack_empty = Some(callback);
693 interp_ok(res)
694 }
695
696 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
705 let this = self.eval_context_mut();
706 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
708 let next_thread_id = genmc_ctx.schedule_thread(this)?;
709
710 let thread_manager = &mut this.machine.threads;
711 thread_manager.active_thread = next_thread_id;
712 thread_manager.yield_active_thread = false;
713
714 assert!(thread_manager.threads[thread_manager.active_thread].state.is_enabled());
715 return interp_ok(SchedulingAction::ExecuteStep);
716 }
717
718 let thread_manager = &mut this.machine.threads;
720 let clock = &this.machine.monotonic_clock;
721 let rng = this.machine.rng.get_mut();
722 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
724 && !thread_manager.yield_active_thread
725 {
726 return interp_ok(SchedulingAction::ExecuteStep);
728 }
729 let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
736 if potential_sleep_time == Some(Duration::ZERO) {
737 return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
738 }
739 let mut threads_iter = thread_manager
746 .threads
747 .iter_enumerated()
748 .skip(thread_manager.active_thread.index() + 1)
749 .chain(
750 thread_manager
751 .threads
752 .iter_enumerated()
753 .take(thread_manager.active_thread.index() + 1),
754 )
755 .filter(|(_id, thread)| thread.state.is_enabled());
756 let new_thread = if thread_manager.fixed_scheduling {
758 threads_iter.next()
759 } else {
760 threads_iter.choose(rng)
761 };
762
763 if let Some((id, _thread)) = new_thread {
764 if thread_manager.active_thread != id {
765 info!(
766 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
767 thread_manager.get_thread_display_name(id),
768 thread_manager.get_thread_display_name(thread_manager.active_thread)
769 );
770 thread_manager.active_thread = id;
771 }
772 }
773 thread_manager.yield_active_thread = false;
775
776 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
777 return interp_ok(SchedulingAction::ExecuteStep);
778 }
779 if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
781 unreachable!("all threads terminated without the main thread terminating?!");
782 } else if let Some(sleep_time) = potential_sleep_time {
783 interp_ok(SchedulingAction::Sleep(sleep_time))
787 } else {
788 throw_machine_stop!(TerminationInfo::Deadlock);
789 }
790 }
791}
792
793impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
795pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
796 #[inline]
797 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
798 self.eval_context_ref().machine.threads.thread_id_try_from(id)
799 }
800
801 fn get_or_create_thread_local_alloc(
804 &mut self,
805 def_id: DefId,
806 ) -> InterpResult<'tcx, StrictPointer> {
807 let this = self.eval_context_mut();
808 let tcx = this.tcx;
809 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
810 interp_ok(old_alloc)
813 } else {
814 if tcx.is_foreign_item(def_id) {
818 throw_unsup_format!("foreign thread-local statics are not supported");
819 }
820 let params = this.machine.get_default_alloc_params();
821 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
822 let mut alloc = alloc.inner().adjust_from_tcx(
824 &this.tcx,
825 |bytes, align| {
826 interp_ok(MiriAllocBytes::from_bytes(
827 std::borrow::Cow::Borrowed(bytes),
828 align,
829 params,
830 ))
831 },
832 |ptr| this.global_root_pointer(ptr),
833 )?;
834 alloc.mutability = Mutability::Mut;
836 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
838 this.machine.threads.set_thread_local_alloc(def_id, ptr);
839 interp_ok(ptr)
840 }
841 }
842
843 #[inline]
845 fn start_regular_thread(
846 &mut self,
847 thread: Option<MPlaceTy<'tcx>>,
848 start_routine: Pointer,
849 start_abi: ExternAbi,
850 func_arg: ImmTy<'tcx>,
851 ret_layout: TyAndLayout<'tcx>,
852 ) -> InterpResult<'tcx, ThreadId> {
853 let this = self.eval_context_mut();
854
855 let new_thread_id = this.machine.threads.create_thread({
857 let mut state = tls::TlsDtorsState::default();
858 Box::new(move |m| state.on_stack_empty(m))
859 });
860 let current_span = this.machine.current_span();
861 match &mut this.machine.data_race {
862 GlobalDataRaceHandler::None => {}
863 GlobalDataRaceHandler::Vclocks(data_race) =>
864 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
865 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
866 genmc_ctx.handle_thread_create(&this.machine.threads, new_thread_id)?,
867 }
868 if let Some(thread_info_place) = thread {
871 this.write_scalar(
872 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
873 &thread_info_place,
874 )?;
875 }
876
877 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
880
881 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
883 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
884 }
885
886 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
888
889 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
893
894 this.call_function(
895 instance,
896 start_abi,
897 &[func_arg],
898 Some(&ret_place),
899 ReturnContinuation::Stop { cleanup: true },
900 )?;
901
902 this.machine.threads.set_active_thread_id(old_thread_id);
904
905 interp_ok(new_thread_id)
906 }
907
908 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
913 let this = self.eval_context_mut();
914
915 let thread = this.active_thread_mut();
917 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
918 thread.state = ThreadState::Terminated;
919 match &mut this.machine.data_race {
920 GlobalDataRaceHandler::None => {}
921 GlobalDataRaceHandler::Vclocks(data_race) =>
922 data_race.thread_terminated(&this.machine.threads),
923 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
924 genmc_ctx.handle_thread_finish(&this.machine.threads)?,
925 }
926 let gone_thread = this.active_thread();
928 {
929 let mut free_tls_statics = Vec::new();
930 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
931 if thread != gone_thread {
932 return true;
934 }
935 free_tls_statics.push(alloc_id);
938 false
939 });
940 for ptr in free_tls_statics {
942 match tls_alloc_action {
943 TlsAllocAction::Deallocate =>
944 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
945 TlsAllocAction::Leak =>
946 if let Some(alloc) = ptr.provenance.get_alloc_id() {
947 trace!(
948 "Thread-local static leaked and stored as static root: {:?}",
949 alloc
950 );
951 this.machine.static_roots.push(alloc);
952 },
953 }
954 }
955 }
956 let unblock_reason = BlockReason::Join(gone_thread);
958 let threads = &this.machine.threads.threads;
959 let joining_threads = threads
960 .iter_enumerated()
961 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
962 .map(|(id, _)| id)
963 .collect::<Vec<_>>();
964 for thread in joining_threads {
965 this.unblock_thread(thread, unblock_reason)?;
966 }
967
968 interp_ok(())
969 }
970
971 #[inline]
974 fn block_thread(
975 &mut self,
976 reason: BlockReason,
977 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
978 callback: DynUnblockCallback<'tcx>,
979 ) {
980 let this = self.eval_context_mut();
981 let timeout = timeout.map(|(clock, anchor, duration)| {
982 let anchor = match clock {
983 TimeoutClock::RealTime => {
984 assert!(
985 this.machine.communicate(),
986 "cannot have `RealTime` timeout with isolation enabled!"
987 );
988 Timeout::RealTime(match anchor {
989 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
990 TimeoutAnchor::Relative => SystemTime::now(),
991 })
992 }
993 TimeoutClock::Monotonic =>
994 Timeout::Monotonic(match anchor {
995 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
996 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
997 }),
998 };
999 anchor.add_lossy(duration)
1000 });
1001 this.machine.threads.block_thread(reason, timeout, callback);
1002 }
1003
1004 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1007 let this = self.eval_context_mut();
1008 let old_state =
1009 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1010 let callback = match old_state {
1011 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1012 assert_eq!(
1013 reason, actual_reason,
1014 "unblock_thread: thread was blocked for the wrong reason"
1015 );
1016 callback
1017 }
1018 _ => panic!("unblock_thread: thread was not blocked"),
1019 };
1020 let old_thread = this.machine.threads.set_active_thread_id(thread);
1022 callback.call(this, UnblockKind::Ready)?;
1023 this.machine.threads.set_active_thread_id(old_thread);
1024 interp_ok(())
1025 }
1026
1027 #[inline]
1028 fn detach_thread(
1029 &mut self,
1030 thread_id: ThreadId,
1031 allow_terminated_joined: bool,
1032 ) -> InterpResult<'tcx> {
1033 let this = self.eval_context_mut();
1034 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1035 }
1036
1037 fn join_thread(
1041 &mut self,
1042 joined_thread_id: ThreadId,
1043 success_retval: Scalar,
1044 return_dest: &MPlaceTy<'tcx>,
1045 ) -> InterpResult<'tcx> {
1046 let this = self.eval_context_mut();
1047 let thread_mgr = &mut this.machine.threads;
1048 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1049 throw_ub_format!("trying to join a detached thread");
1051 }
1052
1053 fn after_join<'tcx>(
1054 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1055 joined_thread_id: ThreadId,
1056 success_retval: Scalar,
1057 return_dest: &MPlaceTy<'tcx>,
1058 ) -> InterpResult<'tcx> {
1059 let threads = &this.machine.threads;
1060 match &mut this.machine.data_race {
1061 GlobalDataRaceHandler::None => {}
1062 GlobalDataRaceHandler::Vclocks(data_race) =>
1063 data_race.thread_joined(threads, joined_thread_id),
1064 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1065 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1066 }
1067 this.write_scalar(success_retval, return_dest)?;
1068 interp_ok(())
1069 }
1070
1071 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1074 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1075 trace!(
1076 "{:?} blocked on {:?} when trying to join",
1077 thread_mgr.active_thread, joined_thread_id
1078 );
1079 let dest = return_dest.clone();
1082 thread_mgr.block_thread(
1083 BlockReason::Join(joined_thread_id),
1084 None,
1085 callback!(
1086 @capture<'tcx> {
1087 joined_thread_id: ThreadId,
1088 dest: MPlaceTy<'tcx>,
1089 success_retval: Scalar,
1090 }
1091 |this, unblock: UnblockKind| {
1092 assert_eq!(unblock, UnblockKind::Ready);
1093 after_join(this, joined_thread_id, success_retval, &dest)
1094 }
1095 ),
1096 );
1097 } else {
1098 after_join(this, joined_thread_id, success_retval, return_dest)?;
1100 }
1101 interp_ok(())
1102 }
1103
1104 fn join_thread_exclusive(
1109 &mut self,
1110 joined_thread_id: ThreadId,
1111 success_retval: Scalar,
1112 return_dest: &MPlaceTy<'tcx>,
1113 ) -> InterpResult<'tcx> {
1114 let this = self.eval_context_mut();
1115 let threads = &this.machine.threads.threads;
1116 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1117 throw_ub_format!("trying to join an already joined thread");
1118 }
1119
1120 if joined_thread_id == this.machine.threads.active_thread {
1121 throw_ub_format!("trying to join itself");
1122 }
1123
1124 assert!(
1126 threads
1127 .iter()
1128 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1129 "this thread already has threads waiting for its termination"
1130 );
1131
1132 this.join_thread(joined_thread_id, success_retval, return_dest)
1133 }
1134
1135 #[inline]
1136 fn active_thread(&self) -> ThreadId {
1137 let this = self.eval_context_ref();
1138 this.machine.threads.active_thread()
1139 }
1140
1141 #[inline]
1142 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1143 let this = self.eval_context_mut();
1144 this.machine.threads.active_thread_mut()
1145 }
1146
1147 #[inline]
1148 fn active_thread_ref(&self) -> &Thread<'tcx> {
1149 let this = self.eval_context_ref();
1150 this.machine.threads.active_thread_ref()
1151 }
1152
1153 #[inline]
1154 fn get_total_thread_count(&self) -> usize {
1155 let this = self.eval_context_ref();
1156 this.machine.threads.get_total_thread_count()
1157 }
1158
1159 #[inline]
1160 fn have_all_terminated(&self) -> bool {
1161 let this = self.eval_context_ref();
1162 this.machine.threads.have_all_terminated()
1163 }
1164
1165 #[inline]
1166 fn enable_thread(&mut self, thread_id: ThreadId) {
1167 let this = self.eval_context_mut();
1168 this.machine.threads.enable_thread(thread_id);
1169 }
1170
1171 #[inline]
1172 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1173 let this = self.eval_context_ref();
1174 this.machine.threads.active_thread_stack()
1175 }
1176
1177 #[inline]
1178 fn active_thread_stack_mut<'a>(
1179 &'a mut self,
1180 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1181 let this = self.eval_context_mut();
1182 this.machine.threads.active_thread_stack_mut()
1183 }
1184
1185 #[inline]
1187 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1188 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1189 }
1190
1191 #[inline]
1192 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1193 where
1194 'tcx: 'c,
1195 {
1196 self.eval_context_ref().machine.threads.get_thread_name(thread)
1197 }
1198
1199 #[inline]
1200 fn yield_active_thread(&mut self) {
1201 self.eval_context_mut().machine.threads.yield_active_thread();
1202 }
1203
1204 #[inline]
1205 fn maybe_preempt_active_thread(&mut self) {
1206 use rand::Rng as _;
1207
1208 let this = self.eval_context_mut();
1209 if !this.machine.threads.fixed_scheduling
1210 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1211 {
1212 this.yield_active_thread();
1213 }
1214 }
1215
1216 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1219 let this = self.eval_context_mut();
1220 loop {
1221 if CTRL_C_RECEIVED.load(Relaxed) {
1222 this.machine.handle_abnormal_termination();
1223 throw_machine_stop!(TerminationInfo::Interrupted);
1224 }
1225 match this.schedule()? {
1226 SchedulingAction::ExecuteStep => {
1227 if !this.step()? {
1228 match this.run_on_stack_empty()? {
1230 Poll::Pending => {} Poll::Ready(()) =>
1232 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1233 }
1234 }
1235 }
1236 SchedulingAction::ExecuteTimeoutCallback => {
1237 this.run_timeout_callback()?;
1238 }
1239 SchedulingAction::Sleep(duration) => {
1240 this.machine.monotonic_clock.sleep(duration);
1241 }
1242 }
1243 }
1244 }
1245}