diff --git a/crates/common/src/lock.rs b/crates/common/src/lock.rs index 74679bd9d8f..f1d00627e90 100644 --- a/crates/common/src/lock.rs +++ b/crates/common/src/lock.rs @@ -96,3 +96,18 @@ pub unsafe fn reinit_rwlock_after_fork(rwlock: &PyRwLock) { core::ptr::write_bytes(raw, 0, core::mem::size_of::()); } } + +/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`. +/// +/// `PyThreadMutex` is used by buffered IO objects (`BufferedReader`, +/// `BufferedWriter`, `TextIOWrapper`). If a dead parent thread held one of +/// these locks during `fork()`, the child would deadlock on any IO operation. +/// +/// # Safety +/// +/// Must only be called from the single-threaded child process immediately +/// after `fork()`, before any other thread is created. +#[cfg(unix)] +pub unsafe fn reinit_thread_mutex_after_fork(mutex: &PyThreadMutex) { + unsafe { mutex.raw().reinit_after_fork() } +} diff --git a/crates/common/src/lock/thread_mutex.rs b/crates/common/src/lock/thread_mutex.rs index 2cabf7ea4cd..5b5b89f4eb1 100644 --- a/crates/common/src/lock/thread_mutex.rs +++ b/crates/common/src/lock/thread_mutex.rs @@ -72,6 +72,23 @@ impl RawThreadMutex { } } +impl RawThreadMutex { + /// Reset this mutex to its initial (unlocked, unowned) state after `fork()`. + /// + /// # Safety + /// + /// Must only be called from the single-threaded child process immediately + /// after `fork()`, before any other thread is created. + #[cfg(unix)] + pub unsafe fn reinit_after_fork(&self) { + self.owner.store(0, Ordering::Relaxed); + unsafe { + let mutex_ptr = &self.mutex as *const R as *mut u8; + core::ptr::write_bytes(mutex_ptr, 0, core::mem::size_of::()); + } + } +} + unsafe impl Send for RawThreadMutex {} unsafe impl Sync for RawThreadMutex {} @@ -103,6 +120,11 @@ impl From for ThreadMutex { } } impl ThreadMutex { + /// Access the underlying raw thread mutex. + pub fn raw(&self) -> &RawThreadMutex { + &self.raw + } + pub fn lock(&self) -> Option> { if self.raw.lock() { Some(ThreadMutexGuard { diff --git a/crates/vm/src/stdlib/io.rs b/crates/vm/src/stdlib/io.rs index f75a5dd4014..945042bc9e4 100644 --- a/crates/vm/src/stdlib/io.rs +++ b/crates/vm/src/stdlib/io.rs @@ -2,6 +2,8 @@ * I/O core tools. */ pub(crate) use _io::module_def; +#[cfg(all(unix, feature = "threading"))] +pub(crate) use _io::reinit_std_streams_after_fork; cfg_if::cfg_if! { if #[cfg(any(not(target_arch = "wasm32"), target_os = "wasi"))] { @@ -4985,6 +4987,61 @@ mod _io { } } + /// Reinit per-object IO buffer locks on std streams after `fork()`. + /// + /// # Safety + /// + /// Must only be called from the single-threaded child process immediately + /// after `fork()`, before any other thread is created. + #[cfg(all(unix, feature = "threading"))] + pub unsafe fn reinit_std_streams_after_fork(vm: &VirtualMachine) { + for name in ["stdin", "stdout", "stderr"] { + let Ok(stream) = vm.sys_module.get_attr(name, vm) else { + continue; + }; + reinit_io_locks(&stream); + } + } + + #[cfg(all(unix, feature = "threading"))] + fn reinit_io_locks(obj: &PyObject) { + use crate::common::lock::reinit_thread_mutex_after_fork; + + if let Some(tio) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&tio.data) }; + if let Some(guard) = tio.data.lock() { + if let Some(ref data) = *guard { + if let Some(ref decoder) = data.decoder { + reinit_io_locks(decoder); + } + reinit_io_locks(&data.buffer); + } + } + return; + } + if let Some(nl) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&nl.data) }; + return; + } + if let Some(br) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&br.data) }; + return; + } + if let Some(bw) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&bw.data) }; + return; + } + if let Some(brw) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&brw.data) }; + return; + } + if let Some(brw) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&brw.read.data) }; + unsafe { reinit_thread_mutex_after_fork(&brw.write.data) }; + return; + } + } + pub fn io_open( file: PyObjectRef, mode: Option<&str>, diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index febb3af9b0d..1adfb8cfe5a 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -719,6 +719,14 @@ pub mod module { #[cfg(feature = "threading")] reinit_locks_after_fork(vm); + // Reinit per-object IO buffer locks on std streams. + // BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be + // held by dead parent threads, causing deadlocks on any IO in the child. + #[cfg(feature = "threading")] + unsafe { + crate::stdlib::io::reinit_std_streams_after_fork(vm) + }; + // Phase 2: Reset low-level atomic state (no locks needed). crate::signal::clear_after_fork(); crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();