1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use crate::concurrency::VClock;
11use crate::shims::files::{
12 EvalContextExt as _, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
13};
14use crate::shims::unix::UnixFileDescription;
15use crate::shims::unix::linux_like::epoll::{EpollReadyEvents, EvalContextExt as _};
16use crate::*;
17
/// Upper bound, in bytes, on the amount of unread data one endpoint's buffer may hold.
/// A writer that finds the peer's buffer at this size either gets `WouldBlock` or is blocked.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
22
/// The kind of endpoint an `AnonSocket` represents. It selects the descriptor's reported
/// name and the access-mode flag returned by `get_flags`.
#[derive(Debug, PartialEq)]
enum AnonSocketType {
    /// One end of a `socketpair`; reported as `O_RDWR`.
    Socketpair,
    /// The read end of a pipe; reported as `O_RDONLY`.
    PipeRead,
    /// The write end of a pipe; reported as `O_WRONLY`.
    PipeWrite,
}
32
/// One endpoint of an anonymous socket pair or of a pipe.
#[derive(Debug)]
struct AnonSocket {
    /// Data waiting to be read from this endpoint; the peer's writes are appended here.
    /// `None` for the write end of a pipe, which cannot be read from.
    readbuf: Option<RefCell<Buffer>>,
    /// Weak reference to the other endpoint. Set exactly once, right after both endpoints
    /// have been created; it fails to upgrade once the peer has been closed.
    peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
    /// Set by the peer's `close` if the peer still had unread data in its own buffer.
    /// Reported as `epollerr` once the peer is gone.
    peer_lost_data: Cell<bool>,
    /// Threads blocked in a read on this endpoint; they are woken by a write from the peer.
    blocked_read_tid: RefCell<Vec<ThreadId>>,
    /// Threads blocked in a write on this endpoint; they are woken by a read on the peer.
    blocked_write_tid: RefCell<Vec<ThreadId>>,
    /// Whether reads/writes that cannot proceed fail with `WouldBlock` instead of blocking.
    is_nonblock: Cell<bool>,
    /// What kind of endpoint this is (socketpair end, pipe read end or pipe write end).
    fd_type: AnonSocketType,
}
58
/// The byte queue of one endpoint together with the clock used to order writers and readers.
#[derive(Debug)]
struct Buffer {
    /// Bytes written by the peer that have not been read yet.
    buf: VecDeque<u8>,
    /// Joined with the writer's clock on every write and acquired by the reader on every
    /// read that returns data.
    clock: VClock,
}
64
65impl Buffer {
66 fn new() -> Self {
67 Buffer { buf: VecDeque::new(), clock: VClock::default() }
68 }
69}
70
71impl AnonSocket {
72 fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
73 self.peer_fd.get().unwrap()
74 }
75}
76
77impl FileDescription for AnonSocket {
78 fn name(&self) -> &'static str {
79 match self.fd_type {
80 AnonSocketType::Socketpair => "socketpair",
81 AnonSocketType::PipeRead | AnonSocketType::PipeWrite => "pipe",
82 }
83 }
84
85 fn close<'tcx>(
86 self,
87 _communicate_allowed: bool,
88 ecx: &mut MiriInterpCx<'tcx>,
89 ) -> InterpResult<'tcx, io::Result<()>> {
90 if let Some(peer_fd) = self.peer_fd().upgrade() {
91 if let Some(readbuf) = &self.readbuf {
94 if !readbuf.borrow().buf.is_empty() {
95 peer_fd.peer_lost_data.set(true);
96 }
97 }
98 ecx.check_and_update_readiness(peer_fd)?;
100 }
101 interp_ok(Ok(()))
102 }
103
104 fn read<'tcx>(
105 self: FileDescriptionRef<Self>,
106 _communicate_allowed: bool,
107 ptr: Pointer,
108 len: usize,
109 ecx: &mut MiriInterpCx<'tcx>,
110 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
111 ) -> InterpResult<'tcx> {
112 anonsocket_read(self, ptr, len, ecx, finish)
113 }
114
115 fn write<'tcx>(
116 self: FileDescriptionRef<Self>,
117 _communicate_allowed: bool,
118 ptr: Pointer,
119 len: usize,
120 ecx: &mut MiriInterpCx<'tcx>,
121 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
122 ) -> InterpResult<'tcx> {
123 anonsocket_write(self, ptr, len, ecx, finish)
124 }
125
126 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
127 self
128 }
129
130 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
131 let mut flags = 0;
132
133 match self.fd_type {
138 AnonSocketType::Socketpair => {
139 flags |= ecx.eval_libc_i32("O_RDWR");
140 }
141 AnonSocketType::PipeRead => {
142 flags |= ecx.eval_libc_i32("O_RDONLY");
143 }
144 AnonSocketType::PipeWrite => {
145 flags |= ecx.eval_libc_i32("O_WRONLY");
146 }
147 }
148
149 if self.is_nonblock.get() {
151 flags |= ecx.eval_libc_i32("O_NONBLOCK");
152 }
153
154 interp_ok(Scalar::from_i32(flags))
155 }
156
157 fn set_flags<'tcx>(
158 &self,
159 mut flag: i32,
160 ecx: &mut MiriInterpCx<'tcx>,
161 ) -> InterpResult<'tcx, Scalar> {
162 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
165 let o_rdonly = ecx.eval_libc_i32("O_RDONLY");
166 let o_wronly = ecx.eval_libc_i32("O_WRONLY");
167 let o_rdwr = ecx.eval_libc_i32("O_RDWR");
168
169 if flag & o_nonblock == o_nonblock {
171 self.is_nonblock.set(true);
172 flag &= !o_nonblock;
173 } else {
174 self.is_nonblock.set(false);
175 }
176
177 flag &= !(o_rdonly | o_wronly | o_rdwr);
179
180 if flag != 0 {
182 throw_unsup_format!(
183 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
184 )
185 }
186
187 interp_ok(Scalar::from_i32(0))
188 }
189}
190
191fn anonsocket_write<'tcx>(
193 self_ref: FileDescriptionRef<AnonSocket>,
194 ptr: Pointer,
195 len: usize,
196 ecx: &mut MiriInterpCx<'tcx>,
197 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
198) -> InterpResult<'tcx> {
199 if len == 0 {
202 return finish.call(ecx, Ok(0));
203 }
204
205 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
207 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
210 };
211
212 let Some(writebuf) = &peer_fd.readbuf else {
213 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
215 };
216
217 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
219 if available_space == 0 {
220 if self_ref.is_nonblock.get() {
221 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
223 } else {
224 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
225 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
228 ecx.block_thread(
229 BlockReason::UnnamedSocket,
230 None,
231 callback!(
232 @capture<'tcx> {
233 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
234 ptr: Pointer,
235 len: usize,
236 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
237 }
238 |this, unblock: UnblockKind| {
239 assert_eq!(unblock, UnblockKind::Ready);
240 let self_ref = weak_self_ref.upgrade().unwrap();
243 anonsocket_write(self_ref, ptr, len, this, finish)
244 }
245 ),
246 );
247 }
248 } else {
249 let mut writebuf = writebuf.borrow_mut();
251 ecx.release_clock(|clock| {
253 writebuf.clock.join(clock);
254 });
255 let write_size = len.min(available_space);
257 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
258 assert_eq!(actual_write_size, write_size);
259
260 drop(writebuf);
262
263 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
265 for thread_id in waiting_threads {
267 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
268 }
269 ecx.check_and_update_readiness(peer_fd)?;
272
273 return finish.call(ecx, Ok(write_size));
274 }
275 interp_ok(())
276}
277
278fn anonsocket_read<'tcx>(
280 self_ref: FileDescriptionRef<AnonSocket>,
281 ptr: Pointer,
282 len: usize,
283 ecx: &mut MiriInterpCx<'tcx>,
284 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
285) -> InterpResult<'tcx> {
286 if len == 0 {
288 return finish.call(ecx, Ok(0));
289 }
290
291 let Some(readbuf) = &self_ref.readbuf else {
292 throw_unsup_format!("reading from the write end of a pipe")
295 };
296
297 if readbuf.borrow_mut().buf.is_empty() {
298 if self_ref.peer_fd().upgrade().is_none() {
299 return finish.call(ecx, Ok(0));
302 } else if self_ref.is_nonblock.get() {
303 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
309 } else {
310 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
311 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
314 ecx.block_thread(
315 BlockReason::UnnamedSocket,
316 None,
317 callback!(
318 @capture<'tcx> {
319 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
320 ptr: Pointer,
321 len: usize,
322 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
323 }
324 |this, unblock: UnblockKind| {
325 assert_eq!(unblock, UnblockKind::Ready);
326 let self_ref = weak_self_ref.upgrade().unwrap();
329 anonsocket_read(self_ref, ptr, len, this, finish)
330 }
331 ),
332 );
333 }
334 } else {
335 let mut readbuf = readbuf.borrow_mut();
337 ecx.acquire_clock(&readbuf.clock);
341
342 let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
345
346 drop(readbuf);
348
349 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
357 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
359 for thread_id in waiting_threads {
361 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
362 }
363 ecx.check_and_update_readiness(peer_fd)?;
365 };
366
367 return finish.call(ecx, Ok(read_size));
368 }
369 interp_ok(())
370}
371
372impl UnixFileDescription for AnonSocket {
373 fn get_epoll_ready_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadyEvents> {
374 let mut epoll_ready_events = EpollReadyEvents::new();
378
379 if let Some(readbuf) = &self.readbuf {
381 if !readbuf.borrow().buf.is_empty() {
382 epoll_ready_events.epollin = true;
383 }
384 } else {
385 epoll_ready_events.epollin = true;
387 }
388
389 if let Some(peer_fd) = self.peer_fd().upgrade() {
391 if let Some(writebuf) = &peer_fd.readbuf {
392 let data_size = writebuf.borrow().buf.len();
393 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
394 if available_space != 0 {
395 epoll_ready_events.epollout = true;
396 }
397 } else {
398 epoll_ready_events.epollout = true;
400 }
401 } else {
402 epoll_ready_events.epollrdhup = true;
405 epoll_ready_events.epollhup = true;
406 epoll_ready_events.epollin = true;
410 epoll_ready_events.epollout = true;
411 if self.peer_lost_data.get() {
413 epoll_ready_events.epollerr = true;
414 }
415 }
416 interp_ok(epoll_ready_events)
417 }
418}
419
420impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
421pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
422 fn socketpair(
425 &mut self,
426 domain: &OpTy<'tcx>,
427 type_: &OpTy<'tcx>,
428 protocol: &OpTy<'tcx>,
429 sv: &OpTy<'tcx>,
430 ) -> InterpResult<'tcx, Scalar> {
431 let this = self.eval_context_mut();
432
433 let domain = this.read_scalar(domain)?.to_i32()?;
434 let mut flags = this.read_scalar(type_)?.to_i32()?;
435 let protocol = this.read_scalar(protocol)?.to_i32()?;
436 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
438
439 let mut is_sock_nonblock = false;
440
441 if this.tcx.sess.target.os == "linux" {
444 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
446 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
447 if flags & sock_nonblock == sock_nonblock {
448 is_sock_nonblock = true;
449 flags &= !sock_nonblock;
450 }
451 if flags & sock_cloexec == sock_cloexec {
452 flags &= !sock_cloexec;
453 }
454 }
455
456 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
460 throw_unsup_format!(
461 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
462 and AF_LOCAL are allowed",
463 domain
464 );
465 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
466 throw_unsup_format!(
467 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
468 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
469 flags
470 );
471 } else if protocol != 0 {
472 throw_unsup_format!(
473 "socketpair: socket protocol {protocol} is unsupported, \
474 only 0 is allowed",
475 );
476 }
477
478 let fds = &mut this.machine.fds;
480 let fd0 = fds.new_ref(AnonSocket {
481 readbuf: Some(RefCell::new(Buffer::new())),
482 peer_fd: OnceCell::new(),
483 peer_lost_data: Cell::new(false),
484 blocked_read_tid: RefCell::new(Vec::new()),
485 blocked_write_tid: RefCell::new(Vec::new()),
486 is_nonblock: Cell::new(is_sock_nonblock),
487 fd_type: AnonSocketType::Socketpair,
488 });
489 let fd1 = fds.new_ref(AnonSocket {
490 readbuf: Some(RefCell::new(Buffer::new())),
491 peer_fd: OnceCell::new(),
492 peer_lost_data: Cell::new(false),
493 blocked_read_tid: RefCell::new(Vec::new()),
494 blocked_write_tid: RefCell::new(Vec::new()),
495 is_nonblock: Cell::new(is_sock_nonblock),
496 fd_type: AnonSocketType::Socketpair,
497 });
498
499 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
501 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
502
503 let sv0 = fds.insert(fd0);
505 let sv1 = fds.insert(fd1);
506
507 let sv0 = Scalar::from_int(sv0, sv.layout.size);
509 let sv1 = Scalar::from_int(sv1, sv.layout.size);
510 this.write_scalar(sv0, &sv)?;
511 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
512
513 interp_ok(Scalar::from_i32(0))
514 }
515
516 fn pipe2(
517 &mut self,
518 pipefd: &OpTy<'tcx>,
519 flags: Option<&OpTy<'tcx>>,
520 ) -> InterpResult<'tcx, Scalar> {
521 let this = self.eval_context_mut();
522
523 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
524 let mut flags = match flags {
525 Some(flags) => this.read_scalar(flags)?.to_i32()?,
526 None => 0,
527 };
528
529 let cloexec = this.eval_libc_i32("O_CLOEXEC");
530 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
531
532 let mut is_nonblock = false;
535 if flags & o_nonblock == o_nonblock {
536 is_nonblock = true;
537 flags &= !o_nonblock;
538 }
539 if flags & cloexec == cloexec {
541 flags &= !cloexec;
542 }
543 if flags != 0 {
544 throw_unsup_format!("unsupported flags in `pipe2`");
545 }
546
547 let fds = &mut this.machine.fds;
550 let fd0 = fds.new_ref(AnonSocket {
551 readbuf: Some(RefCell::new(Buffer::new())),
552 peer_fd: OnceCell::new(),
553 peer_lost_data: Cell::new(false),
554 blocked_read_tid: RefCell::new(Vec::new()),
555 blocked_write_tid: RefCell::new(Vec::new()),
556 is_nonblock: Cell::new(is_nonblock),
557 fd_type: AnonSocketType::PipeRead,
558 });
559 let fd1 = fds.new_ref(AnonSocket {
560 readbuf: None,
561 peer_fd: OnceCell::new(),
562 peer_lost_data: Cell::new(false),
563 blocked_read_tid: RefCell::new(Vec::new()),
564 blocked_write_tid: RefCell::new(Vec::new()),
565 is_nonblock: Cell::new(is_nonblock),
566 fd_type: AnonSocketType::PipeWrite,
567 });
568
569 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
571 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
572
573 let pipefd0 = fds.insert(fd0);
575 let pipefd1 = fds.insert(fd1);
576
577 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
579 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
580 this.write_scalar(pipefd0, &pipefd)?;
581 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
582
583 interp_ok(Scalar::from_i32(0))
584 }
585}