diff --git a/Lib/test/test_winapi.py b/Lib/test/test_winapi.py index 846398f89d..99b5a0dfd1 100644 --- a/Lib/test/test_winapi.py +++ b/Lib/test/test_winapi.py @@ -77,34 +77,22 @@ def _events_waitany_test(self, n): evts[i] = old_evt - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_few_events_waitall(self): self._events_waitall_test(16) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_many_events_waitall(self): self._events_waitall_test(256) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_max_events_waitall(self): self._events_waitall_test(MAXIMUM_BATCHED_WAIT_OBJECTS) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_few_events_waitany(self): self._events_waitany_test(16) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_many_events_waitany(self): self._events_waitany_test(256) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_max_events_waitany(self): self._events_waitany_test(MAXIMUM_BATCHED_WAIT_OBJECTS) @@ -140,8 +128,6 @@ def test_getshortpathname(self): # Should contain "PROGRA~" but we can't predict the number self.assertIsNotNone(re.match(r".\:\\PROGRA~\d", actual.upper()), actual) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_namedpipe(self): pipe_name = rf"\\.\pipe\LOCAL\{os_helper.TESTFN}" diff --git a/crates/vm/src/stdlib/winapi.rs b/crates/vm/src/stdlib/winapi.rs index 44f56f0112..5cfb62fad6 100644 --- a/crates/vm/src/stdlib/winapi.rs +++ b/crates/vm/src/stdlib/winapi.rs @@ -572,7 +572,7 @@ mod _winapi { LCMapStringEx as WinLCMapStringEx, }; - // Reject unsupported flags (same as CPython) + // Reject unsupported flags if flags & (LCMAP_SORTHANDLE | LCMAP_HASH | LCMAP_BYTEREV | LCMAP_SORTKEY) != 0 { return Err(vm.new_value_error("unsupported flags")); } @@ -725,4 +725,441 @@ mod _winapi { use windows_sys::Win32::Storage::FileSystem::GetLongPathNameW; get_path_name_impl(&path, GetLongPathNameW, vm) } + + /// WaitNamedPipe - Wait for an instance of a named pipe to become available. + #[pyfunction] + fn WaitNamedPipe(name: PyStrRef, timeout: u32, vm: &VirtualMachine) -> PyResult<()> { + use windows_sys::Win32::System::Pipes::WaitNamedPipeW; + + let name_wide = name.as_wtf8().to_wide_with_nul(); + + let success = unsafe { WaitNamedPipeW(name_wide.as_ptr(), timeout) }; + + if success == 0 { + return Err(vm.new_last_os_error()); + } + + Ok(()) + } + + /// PeekNamedPipe - Peek at data in a named pipe without removing it. + #[pyfunction] + fn PeekNamedPipe( + handle: WinHandle, + size: OptionalArg, + vm: &VirtualMachine, + ) -> PyResult { + use windows_sys::Win32::System::Pipes::PeekNamedPipe as WinPeekNamedPipe; + + let size = size.unwrap_or(0); + + if size < 0 { + return Err(vm.new_value_error("negative size".to_string())); + } + + let mut navail: u32 = 0; + let mut nleft: u32 = 0; + + if size > 0 { + let mut buf = vec![0u8; size as usize]; + let mut nread: u32 = 0; + + let ret = unsafe { + WinPeekNamedPipe( + handle.0, + buf.as_mut_ptr() as *mut _, + size as u32, + &mut nread, + &mut navail, + &mut nleft, + ) + }; + + if ret == 0 { + return Err(vm.new_last_os_error()); + } + + buf.truncate(nread as usize); + let bytes: PyObjectRef = vm.ctx.new_bytes(buf).into(); + Ok(vm + .ctx + .new_tuple(vec![ + bytes, + vm.ctx.new_int(navail).into(), + vm.ctx.new_int(nleft).into(), + ]) + .into()) + } else { + let ret = unsafe { + WinPeekNamedPipe(handle.0, null_mut(), 0, null_mut(), &mut navail, &mut nleft) + }; + + if ret == 0 { + return Err(vm.new_last_os_error()); + } + + Ok(vm + .ctx + .new_tuple(vec![ + vm.ctx.new_int(navail).into(), + vm.ctx.new_int(nleft).into(), + ]) + .into()) + } + } + + /// CreateEventW - Create or open a named or unnamed event object. + #[pyfunction] + fn CreateEventW( + security_attributes: isize, // Always NULL (0) + manual_reset: bool, + initial_state: bool, + name: Option, + vm: &VirtualMachine, + ) -> PyResult { + use windows_sys::Win32::System::Threading::CreateEventW as WinCreateEventW; + + let _ = security_attributes; // Ignored, always NULL + + let name_wide = name.map(|n| n.as_wtf8().to_wide_with_nul()); + let name_ptr = name_wide.as_ref().map_or(null(), |n| n.as_ptr()); + + let handle = unsafe { + WinCreateEventW( + null(), + i32::from(manual_reset), + i32::from(initial_state), + name_ptr, + ) + }; + + if handle.is_null() { + return Err(vm.new_last_os_error()); + } + + Ok(WinHandle(handle)) + } + + /// SetEvent - Set the specified event object to the signaled state. + #[pyfunction] + fn SetEvent(event: WinHandle, vm: &VirtualMachine) -> PyResult<()> { + use windows_sys::Win32::System::Threading::SetEvent as WinSetEvent; + + let ret = unsafe { WinSetEvent(event.0) }; + + if ret == 0 { + return Err(vm.new_last_os_error()); + } + + Ok(()) + } + + /// WriteFile - Write data to a file or I/O device. + #[pyfunction] + fn WriteFile( + handle: WinHandle, + buffer: crate::function::ArgBytesLike, + use_overlapped: OptionalArg, + vm: &VirtualMachine, + ) -> PyResult<(u32, u32)> { + use windows_sys::Win32::Storage::FileSystem::WriteFile as WinWriteFile; + + let use_overlapped = use_overlapped.unwrap_or(false); + + if use_overlapped { + return Err(vm.new_not_implemented_error( + "overlapped WriteFile is not yet implemented in _winapi".to_string(), + )); + } + + let buf = buffer.borrow_buf(); + let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; + let mut written: u32 = 0; + + let ret = unsafe { + WinWriteFile( + handle.0, + buf.as_ptr() as *const _, + len, + &mut written, + null_mut(), + ) + }; + + let err = if ret == 0 { + unsafe { windows_sys::Win32::Foundation::GetLastError() } + } else { + 0 + }; + + if ret == 0 { + return Err(vm.new_last_os_error()); + } + + Ok((written, err)) + } + + const MAXIMUM_WAIT_OBJECTS: usize = 64; + + /// BatchedWaitForMultipleObjects - Wait for multiple handles, supporting more than 64. + #[pyfunction] + fn BatchedWaitForMultipleObjects( + handle_seq: PyObjectRef, + wait_all: bool, + milliseconds: OptionalArg, + vm: &VirtualMachine, + ) -> PyResult { + use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + use windows_sys::Win32::Foundation::{CloseHandle, WAIT_FAILED, WAIT_OBJECT_0}; + use windows_sys::Win32::System::SystemInformation::GetTickCount64; + use windows_sys::Win32::System::Threading::{ + CreateEventW as WinCreateEventW, CreateThread, GetExitCodeThread, + INFINITE as WIN_INFINITE, ResumeThread, SetEvent as WinSetEvent, TerminateThread, + WaitForMultipleObjects, + }; + + let milliseconds = milliseconds.unwrap_or(WIN_INFINITE); + + // Get handles from sequence + let seq = ArgSequence::::try_from_object(vm, handle_seq)?; + let handles: Vec = seq.into_vec(); + let nhandles = handles.len(); + + if nhandles == 0 { + return if wait_all { + Ok(vm.ctx.none()) + } else { + Ok(vm.ctx.new_list(vec![]).into()) + }; + } + + let max_total_objects = (MAXIMUM_WAIT_OBJECTS - 1) * (MAXIMUM_WAIT_OBJECTS - 1); + if nhandles > max_total_objects { + return Err(vm.new_value_error(format!( + "need at most {} handles, got a sequence of length {}", + max_total_objects, nhandles + ))); + } + + // Create batches of handles + let batch_size = MAXIMUM_WAIT_OBJECTS - 1; // Leave room for cancel_event + let mut batches: Vec> = Vec::new(); + let mut i = 0; + while i < nhandles { + let end = std::cmp::min(i + batch_size, nhandles); + batches.push(handles[i..end].to_vec()); + i = end; + } + + if wait_all { + // For wait_all, we wait sequentially for each batch + let deadline = if milliseconds != WIN_INFINITE { + Some(unsafe { GetTickCount64() } + milliseconds as u64) + } else { + None + }; + + for batch in &batches { + let timeout = if let Some(deadline) = deadline { + let now = unsafe { GetTickCount64() }; + if now >= deadline { + return Err( + vm.new_exception_empty(vm.ctx.exceptions.timeout_error.to_owned()) + ); + } + (deadline - now) as u32 + } else { + WIN_INFINITE + }; + + let batch_handles: Vec<_> = batch.iter().map(|&h| h as _).collect(); + let result = unsafe { + WaitForMultipleObjects( + batch_handles.len() as u32, + batch_handles.as_ptr(), + 1, // wait_all = TRUE + timeout, + ) + }; + + if result == WAIT_FAILED { + return Err(vm.new_last_os_error()); + } + if result == windows_sys::Win32::Foundation::WAIT_TIMEOUT { + return Err(vm.new_exception_empty(vm.ctx.exceptions.timeout_error.to_owned())); + } + } + + Ok(vm.ctx.none()) + } else { + // For wait_any, we use threads to wait on each batch in parallel + let cancel_event = unsafe { WinCreateEventW(null(), 1, 0, null()) }; // Manual reset, not signaled + if cancel_event.is_null() { + return Err(vm.new_last_os_error()); + } + + struct BatchData { + handles: Vec, + cancel_event: isize, + handle_base: usize, + result: AtomicU32, + thread: std::cell::UnsafeCell, + } + + unsafe impl Send for BatchData {} + unsafe impl Sync for BatchData {} + + let batch_data: Vec> = batches + .iter() + .enumerate() + .map(|(idx, batch)| { + let base = idx * batch_size; + let mut handles_with_cancel = batch.clone(); + handles_with_cancel.push(cancel_event as isize); + Arc::new(BatchData { + handles: handles_with_cancel, + cancel_event: cancel_event as isize, + handle_base: base, + result: AtomicU32::new(WAIT_FAILED), + thread: std::cell::UnsafeCell::new(0), + }) + }) + .collect(); + + // Thread function + extern "system" fn batch_wait_thread(param: *mut std::ffi::c_void) -> u32 { + let data = unsafe { &*(param as *const BatchData) }; + let handles: Vec<_> = data.handles.iter().map(|&h| h as _).collect(); + let result = unsafe { + WaitForMultipleObjects( + handles.len() as u32, + handles.as_ptr(), + 0, // wait_any + WIN_INFINITE, + ) + }; + data.result.store(result, Ordering::SeqCst); + + if result == WAIT_FAILED { + let err = unsafe { windows_sys::Win32::Foundation::GetLastError() }; + unsafe { WinSetEvent(data.cancel_event as _) }; + return err; + } else if result >= windows_sys::Win32::Foundation::WAIT_ABANDONED_0 + && result + < windows_sys::Win32::Foundation::WAIT_ABANDONED_0 + + MAXIMUM_WAIT_OBJECTS as u32 + { + data.result.store(WAIT_FAILED, Ordering::SeqCst); + unsafe { WinSetEvent(data.cancel_event as _) }; + return windows_sys::Win32::Foundation::ERROR_ABANDONED_WAIT_0; + } + 0 + } + + // Create threads + let mut thread_handles: Vec = Vec::new(); + for data in &batch_data { + let thread = unsafe { + CreateThread( + null(), + 1, // Smallest stack + Some(batch_wait_thread), + Arc::as_ptr(data) as *const _ as *mut _, + 4, // CREATE_SUSPENDED + null_mut(), + ) + }; + if thread.is_null() { + // Cleanup on error + for h in &thread_handles { + unsafe { TerminateThread(*h as _, 0) }; + unsafe { CloseHandle(*h as _) }; + } + unsafe { CloseHandle(cancel_event) }; + return Err(vm.new_last_os_error()); + } + unsafe { *data.thread.get() = thread as isize }; + thread_handles.push(thread as isize); + } + + // Resume all threads + for &thread in &thread_handles { + unsafe { ResumeThread(thread as _) }; + } + + // Wait for any thread to complete + let thread_handles_raw: Vec<_> = thread_handles.iter().map(|&h| h as _).collect(); + let result = unsafe { + WaitForMultipleObjects( + thread_handles_raw.len() as u32, + thread_handles_raw.as_ptr(), + 0, // wait_any + milliseconds, + ) + }; + + let err = if result == WAIT_FAILED { + Some(unsafe { windows_sys::Win32::Foundation::GetLastError() }) + } else if result == windows_sys::Win32::Foundation::WAIT_TIMEOUT { + Some(windows_sys::Win32::Foundation::WAIT_TIMEOUT) + } else { + None + }; + + // Signal cancel event to stop other threads + unsafe { WinSetEvent(cancel_event) }; + + // Wait for all threads to finish + unsafe { + WaitForMultipleObjects( + thread_handles_raw.len() as u32, + thread_handles_raw.as_ptr(), + 1, // wait_all + WIN_INFINITE, + ) + }; + + // Check for errors from threads + let mut thread_err = err; + for data in &batch_data { + if thread_err.is_none() && data.result.load(Ordering::SeqCst) == WAIT_FAILED { + let mut exit_code: u32 = 0; + let thread = unsafe { *data.thread.get() }; + if unsafe { GetExitCodeThread(thread as _, &mut exit_code) } == 0 { + thread_err = + Some(unsafe { windows_sys::Win32::Foundation::GetLastError() }); + } else if exit_code != 0 { + thread_err = Some(exit_code); + } + } + let thread = unsafe { *data.thread.get() }; + unsafe { CloseHandle(thread as _) }; + } + + unsafe { CloseHandle(cancel_event) }; + + // Return result + if let Some(e) = thread_err { + if e == windows_sys::Win32::Foundation::WAIT_TIMEOUT { + return Err(vm.new_exception_empty(vm.ctx.exceptions.timeout_error.to_owned())); + } + return Err(vm.new_os_error(e as i32)); + } + + // Collect triggered indices + let mut triggered_indices: Vec = Vec::new(); + for data in &batch_data { + let result = data.result.load(Ordering::SeqCst); + let triggered = result as i32 - WAIT_OBJECT_0 as i32; + // Check if it's a valid handle index (not the cancel_event which is last) + if triggered >= 0 && (triggered as usize) < data.handles.len() - 1 { + let index = data.handle_base + triggered as usize; + triggered_indices.push(vm.ctx.new_int(index).into()); + } + } + + Ok(vm.ctx.new_list(triggered_indices).into()) + } + } }