1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use crate::concurrency::VClock;
11use crate::shims::files::{
12 EvalContextExt as _, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
13};
14use crate::shims::unix::UnixFileDescription;
15use crate::shims::unix::linux_like::epoll::{EpollReadyEvents, EvalContextExt as _};
16use crate::*;
17
/// Upper bound, in bytes, on the amount of unread data a single read buffer may hold.
///
/// `anonsocket_write` and `get_epoll_ready_events` both subtract the current buffer
/// length from this value to determine how much room is left for a writer. The same
/// limit is applied to socketpair ends and to pipes.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
22
/// Distinguishes the kinds of file descriptions that are backed by an `AnonSocket`.
#[derive(Debug, PartialEq)]
enum AnonSocketType {
    /// Either end of a pair created by `socketpair`; both ends own a read buffer.
    Socketpair,
    /// The reading end of a pipe created by `pipe2`; it owns the pipe's only buffer.
    PipeRead,
    /// The writing end of a pipe created by `pipe2`; it has no read buffer of its own.
    PipeWrite,
}
32
/// One end of a connected pair: either side of a socketpair, or one end of a pipe.
#[derive(Debug)]
struct AnonSocket {
    /// Data waiting to be read from this end. Writers on the peer append to this buffer.
    /// `None` for the write end of a pipe, which cannot be read from.
    readbuf: Option<RefCell<Buffer>>,
    /// Weak handle to the other end of the pair, set exactly once right after both ends
    /// have been created. Upgrading fails once the peer is gone, which writers report as
    /// `BrokenPipe` and readers (with an empty buffer) as end-of-file.
    peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
    /// Set to `true` by the peer's `close` when the peer still had unread data in its own
    /// read buffer at that point. Reported as `epollerr` once the peer is gone.
    peer_lost_data: Cell<bool>,
    /// Threads blocked in a read on this end because `readbuf` was empty.
    /// They are woken when the peer writes.
    blocked_read_tid: RefCell<Vec<ThreadId>>,
    /// Threads blocked in a write on this end because the peer's buffer was full.
    /// They are woken when the peer reads.
    blocked_write_tid: RefCell<Vec<ThreadId>>,
    /// Whether operations on this end fail with `WouldBlock` instead of blocking.
    /// Can be toggled at runtime through `set_flags`.
    is_nonblock: Cell<bool>,
    /// Which kind of endpoint this is; fixed at creation.
    fd_type: AnonSocketType,
}
58
/// The byte queue shared between a writer and the reader that owns it.
#[derive(Debug)]
struct Buffer {
    /// Bytes that have been written but not yet read, in FIFO order.
    buf: VecDeque<u8>,
    /// Vector clock joined with the writer's clock on every write and acquired by the
    /// reader on every read.
    clock: VClock,
}
64
65impl Buffer {
66 fn new() -> Self {
67 Buffer { buf: VecDeque::new(), clock: VClock::default() }
68 }
69}
70
71impl AnonSocket {
72 fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
73 self.peer_fd.get().unwrap()
74 }
75}
76
77impl FileDescription for AnonSocket {
78 fn name(&self) -> &'static str {
79 match self.fd_type {
80 AnonSocketType::Socketpair => "socketpair",
81 AnonSocketType::PipeRead | AnonSocketType::PipeWrite => "pipe",
82 }
83 }
84
85 fn close<'tcx>(
86 self,
87 _communicate_allowed: bool,
88 ecx: &mut MiriInterpCx<'tcx>,
89 ) -> InterpResult<'tcx, io::Result<()>> {
90 if let Some(peer_fd) = self.peer_fd().upgrade() {
91 if let Some(readbuf) = &self.readbuf {
94 if !readbuf.borrow().buf.is_empty() {
95 peer_fd.peer_lost_data.set(true);
96 }
97 }
98 ecx.check_and_update_readiness(peer_fd)?;
100 }
101 interp_ok(Ok(()))
102 }
103
104 fn read<'tcx>(
105 self: FileDescriptionRef<Self>,
106 _communicate_allowed: bool,
107 ptr: Pointer,
108 len: usize,
109 ecx: &mut MiriInterpCx<'tcx>,
110 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
111 ) -> InterpResult<'tcx> {
112 anonsocket_read(self, ptr, len, ecx, finish)
113 }
114
115 fn write<'tcx>(
116 self: FileDescriptionRef<Self>,
117 _communicate_allowed: bool,
118 ptr: Pointer,
119 len: usize,
120 ecx: &mut MiriInterpCx<'tcx>,
121 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
122 ) -> InterpResult<'tcx> {
123 anonsocket_write(self, ptr, len, ecx, finish)
124 }
125
126 fn short_fd_operations(&self) -> bool {
127 matches!(self.fd_type, AnonSocketType::Socketpair)
132 }
133
134 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
135 self
136 }
137
138 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
139 let mut flags = 0;
140
141 match self.fd_type {
146 AnonSocketType::Socketpair => {
147 flags |= ecx.eval_libc_i32("O_RDWR");
148 }
149 AnonSocketType::PipeRead => {
150 flags |= ecx.eval_libc_i32("O_RDONLY");
151 }
152 AnonSocketType::PipeWrite => {
153 flags |= ecx.eval_libc_i32("O_WRONLY");
154 }
155 }
156
157 if self.is_nonblock.get() {
159 flags |= ecx.eval_libc_i32("O_NONBLOCK");
160 }
161
162 interp_ok(Scalar::from_i32(flags))
163 }
164
165 fn set_flags<'tcx>(
166 &self,
167 mut flag: i32,
168 ecx: &mut MiriInterpCx<'tcx>,
169 ) -> InterpResult<'tcx, Scalar> {
170 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
173 let o_rdonly = ecx.eval_libc_i32("O_RDONLY");
174 let o_wronly = ecx.eval_libc_i32("O_WRONLY");
175 let o_rdwr = ecx.eval_libc_i32("O_RDWR");
176
177 if flag & o_nonblock == o_nonblock {
179 self.is_nonblock.set(true);
180 flag &= !o_nonblock;
181 } else {
182 self.is_nonblock.set(false);
183 }
184
185 flag &= !(o_rdonly | o_wronly | o_rdwr);
187
188 if flag != 0 {
190 throw_unsup_format!(
191 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
192 )
193 }
194
195 interp_ok(Scalar::from_i32(0))
196 }
197}
198
199fn anonsocket_write<'tcx>(
201 self_ref: FileDescriptionRef<AnonSocket>,
202 ptr: Pointer,
203 len: usize,
204 ecx: &mut MiriInterpCx<'tcx>,
205 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
206) -> InterpResult<'tcx> {
207 if len == 0 {
210 return finish.call(ecx, Ok(0));
211 }
212
213 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
215 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
218 };
219
220 let Some(writebuf) = &peer_fd.readbuf else {
221 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
223 };
224
225 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
227 if available_space == 0 {
228 if self_ref.is_nonblock.get() {
229 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
231 } else {
232 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
233 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
236 ecx.block_thread(
237 BlockReason::UnnamedSocket,
238 None,
239 callback!(
240 @capture<'tcx> {
241 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
242 ptr: Pointer,
243 len: usize,
244 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
245 }
246 |this, unblock: UnblockKind| {
247 assert_eq!(unblock, UnblockKind::Ready);
248 let self_ref = weak_self_ref.upgrade().unwrap();
251 anonsocket_write(self_ref, ptr, len, this, finish)
252 }
253 ),
254 );
255 }
256 } else {
257 let mut writebuf = writebuf.borrow_mut();
259 ecx.release_clock(|clock| {
261 writebuf.clock.join(clock);
262 });
263 let write_size = len.min(available_space);
265 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
266 assert_eq!(actual_write_size, write_size);
267
268 drop(writebuf);
270
271 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
273 for thread_id in waiting_threads {
275 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
276 }
277 ecx.check_and_update_readiness(peer_fd)?;
280
281 return finish.call(ecx, Ok(write_size));
282 }
283 interp_ok(())
284}
285
286fn anonsocket_read<'tcx>(
288 self_ref: FileDescriptionRef<AnonSocket>,
289 ptr: Pointer,
290 len: usize,
291 ecx: &mut MiriInterpCx<'tcx>,
292 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
293) -> InterpResult<'tcx> {
294 if len == 0 {
296 return finish.call(ecx, Ok(0));
297 }
298
299 let Some(readbuf) = &self_ref.readbuf else {
300 throw_unsup_format!("reading from the write end of a pipe")
303 };
304
305 if readbuf.borrow_mut().buf.is_empty() {
306 if self_ref.peer_fd().upgrade().is_none() {
307 return finish.call(ecx, Ok(0));
310 } else if self_ref.is_nonblock.get() {
311 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
317 } else {
318 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
319 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
322 ecx.block_thread(
323 BlockReason::UnnamedSocket,
324 None,
325 callback!(
326 @capture<'tcx> {
327 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
328 ptr: Pointer,
329 len: usize,
330 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
331 }
332 |this, unblock: UnblockKind| {
333 assert_eq!(unblock, UnblockKind::Ready);
334 let self_ref = weak_self_ref.upgrade().unwrap();
337 anonsocket_read(self_ref, ptr, len, this, finish)
338 }
339 ),
340 );
341 }
342 } else {
343 let mut readbuf = readbuf.borrow_mut();
345 ecx.acquire_clock(&readbuf.clock);
349
350 let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
353
354 drop(readbuf);
356
357 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
365 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
367 for thread_id in waiting_threads {
369 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
370 }
371 ecx.check_and_update_readiness(peer_fd)?;
373 };
374
375 return finish.call(ecx, Ok(read_size));
376 }
377 interp_ok(())
378}
379
380impl UnixFileDescription for AnonSocket {
381 fn get_epoll_ready_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadyEvents> {
382 let mut epoll_ready_events = EpollReadyEvents::new();
386
387 if let Some(readbuf) = &self.readbuf {
389 if !readbuf.borrow().buf.is_empty() {
390 epoll_ready_events.epollin = true;
391 }
392 } else {
393 epoll_ready_events.epollin = true;
395 }
396
397 if let Some(peer_fd) = self.peer_fd().upgrade() {
399 if let Some(writebuf) = &peer_fd.readbuf {
400 let data_size = writebuf.borrow().buf.len();
401 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
402 if available_space != 0 {
403 epoll_ready_events.epollout = true;
404 }
405 } else {
406 epoll_ready_events.epollout = true;
408 }
409 } else {
410 epoll_ready_events.epollrdhup = true;
413 epoll_ready_events.epollhup = true;
414 epoll_ready_events.epollin = true;
418 epoll_ready_events.epollout = true;
419 if self.peer_lost_data.get() {
421 epoll_ready_events.epollerr = true;
422 }
423 }
424 interp_ok(epoll_ready_events)
425 }
426}
427
428impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
429pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
430 fn socketpair(
433 &mut self,
434 domain: &OpTy<'tcx>,
435 type_: &OpTy<'tcx>,
436 protocol: &OpTy<'tcx>,
437 sv: &OpTy<'tcx>,
438 ) -> InterpResult<'tcx, Scalar> {
439 let this = self.eval_context_mut();
440
441 let domain = this.read_scalar(domain)?.to_i32()?;
442 let mut flags = this.read_scalar(type_)?.to_i32()?;
443 let protocol = this.read_scalar(protocol)?.to_i32()?;
444 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
446
447 let mut is_sock_nonblock = false;
448
449 if this.tcx.sess.target.os == "linux" {
452 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
454 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
455 if flags & sock_nonblock == sock_nonblock {
456 is_sock_nonblock = true;
457 flags &= !sock_nonblock;
458 }
459 if flags & sock_cloexec == sock_cloexec {
460 flags &= !sock_cloexec;
461 }
462 }
463
464 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
468 throw_unsup_format!(
469 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
470 and AF_LOCAL are allowed",
471 domain
472 );
473 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
474 throw_unsup_format!(
475 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
476 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
477 flags
478 );
479 } else if protocol != 0 {
480 throw_unsup_format!(
481 "socketpair: socket protocol {protocol} is unsupported, \
482 only 0 is allowed",
483 );
484 }
485
486 let fds = &mut this.machine.fds;
488 let fd0 = fds.new_ref(AnonSocket {
489 readbuf: Some(RefCell::new(Buffer::new())),
490 peer_fd: OnceCell::new(),
491 peer_lost_data: Cell::new(false),
492 blocked_read_tid: RefCell::new(Vec::new()),
493 blocked_write_tid: RefCell::new(Vec::new()),
494 is_nonblock: Cell::new(is_sock_nonblock),
495 fd_type: AnonSocketType::Socketpair,
496 });
497 let fd1 = fds.new_ref(AnonSocket {
498 readbuf: Some(RefCell::new(Buffer::new())),
499 peer_fd: OnceCell::new(),
500 peer_lost_data: Cell::new(false),
501 blocked_read_tid: RefCell::new(Vec::new()),
502 blocked_write_tid: RefCell::new(Vec::new()),
503 is_nonblock: Cell::new(is_sock_nonblock),
504 fd_type: AnonSocketType::Socketpair,
505 });
506
507 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
509 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
510
511 let sv0 = fds.insert(fd0);
513 let sv1 = fds.insert(fd1);
514
515 let sv0 = Scalar::from_int(sv0, sv.layout.size);
517 let sv1 = Scalar::from_int(sv1, sv.layout.size);
518 this.write_scalar(sv0, &sv)?;
519 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
520
521 interp_ok(Scalar::from_i32(0))
522 }
523
524 fn pipe2(
525 &mut self,
526 pipefd: &OpTy<'tcx>,
527 flags: Option<&OpTy<'tcx>>,
528 ) -> InterpResult<'tcx, Scalar> {
529 let this = self.eval_context_mut();
530
531 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
532 let mut flags = match flags {
533 Some(flags) => this.read_scalar(flags)?.to_i32()?,
534 None => 0,
535 };
536
537 let cloexec = this.eval_libc_i32("O_CLOEXEC");
538 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
539
540 let mut is_nonblock = false;
543 if flags & o_nonblock == o_nonblock {
544 is_nonblock = true;
545 flags &= !o_nonblock;
546 }
547 if flags & cloexec == cloexec {
549 flags &= !cloexec;
550 }
551 if flags != 0 {
552 throw_unsup_format!("unsupported flags in `pipe2`");
553 }
554
555 let fds = &mut this.machine.fds;
558 let fd0 = fds.new_ref(AnonSocket {
559 readbuf: Some(RefCell::new(Buffer::new())),
560 peer_fd: OnceCell::new(),
561 peer_lost_data: Cell::new(false),
562 blocked_read_tid: RefCell::new(Vec::new()),
563 blocked_write_tid: RefCell::new(Vec::new()),
564 is_nonblock: Cell::new(is_nonblock),
565 fd_type: AnonSocketType::PipeRead,
566 });
567 let fd1 = fds.new_ref(AnonSocket {
568 readbuf: None,
569 peer_fd: OnceCell::new(),
570 peer_lost_data: Cell::new(false),
571 blocked_read_tid: RefCell::new(Vec::new()),
572 blocked_write_tid: RefCell::new(Vec::new()),
573 is_nonblock: Cell::new(is_nonblock),
574 fd_type: AnonSocketType::PipeWrite,
575 });
576
577 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
579 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
580
581 let pipefd0 = fds.insert(fd0);
583 let pipefd1 = fds.insert(fd1);
584
585 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
587 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
588 this.write_scalar(pipefd0, &pipefd)?;
589 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
590
591 interp_ok(Scalar::from_i32(0))
592 }
593}