diff --git a/Lib/test/test_fnmatch.py b/Lib/test/test_fnmatch.py index b977b8f8eb..10ed496d4e 100644 --- a/Lib/test/test_fnmatch.py +++ b/Lib/test/test_fnmatch.py @@ -71,7 +71,6 @@ def test_fnmatchcase(self): check('usr/bin', 'usr\\bin', False, fnmatchcase) check('usr\\bin', 'usr\\bin', True, fnmatchcase) - @unittest.expectedFailureIfWindows('TODO: RUSTPYTHON') def test_bytes(self): self.check_match(b'test', b'te*') self.check_match(b'test\xff', b'te*\xff') diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py index 86c34a43f1..1efe8bfa0e 100644 --- a/Lib/test/test_ntpath.py +++ b/Lib/test/test_ntpath.py @@ -130,8 +130,6 @@ def test_splitdrive(self): tester('ntpath.splitdrive("//?/UNC/server/share/dir")', ("//?/UNC/server/share", "/dir")) - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_splitdrive_invalid_paths(self): splitdrive = ntpath.splitdrive self.assertEqual(splitdrive('\\\\ser\x00ver\\sha\x00re\\di\x00r'), @@ -238,8 +236,6 @@ def test_splitroot(self): tester('ntpath.splitroot(" :/foo")', (" :", "/", "foo")) tester('ntpath.splitroot("/:/foo")', ("", "/", ":/foo")) - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_splitroot_invalid_paths(self): splitroot = ntpath.splitroot self.assertEqual(splitroot('\\\\ser\x00ver\\sha\x00re\\di\x00r'), @@ -268,8 +264,6 @@ def test_split(self): tester('ntpath.split("c:/")', ('c:/', '')) tester('ntpath.split("//conky/mountpoint/")', ('//conky/mountpoint/', '')) - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_split_invalid_paths(self): split = ntpath.split self.assertEqual(split('c:\\fo\x00o\\ba\x00r'), @@ -392,8 +386,6 @@ def test_join(self): tester("ntpath.join('D:a', './c:b')", 'D:a\\.\\c:b') tester("ntpath.join('D:/a', './c:b')", 'D:\\a\\.\\c:b') - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_normcase(self): normcase = ntpath.normcase self.assertEqual(normcase(''), '') @@ -409,8 +401,6 @@ def test_normcase(self): self.assertEqual(normcase('\u03a9\u2126'.encode()), expected.encode()) - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_normcase_invalid_paths(self): normcase = ntpath.normcase self.assertEqual(normcase('abc\x00def'), 'abc\x00def') @@ -468,8 +458,6 @@ def test_normpath(self): tester("ntpath.normpath('\\\\')", '\\\\') tester("ntpath.normpath('//?/UNC/server/share/..')", '\\\\?\\UNC\\server\\share\\') - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_normpath_invalid_paths(self): normpath = ntpath.normpath self.assertEqual(normpath('fo\x00o'), 'fo\x00o') @@ -1130,8 +1118,6 @@ def test_abspath(self): drive, _ = ntpath.splitdrive(cwd_dir) tester('ntpath.abspath("/abc/")', drive + "\\abc") - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_abspath_invalid_paths(self): abspath = ntpath.abspath if sys.platform == 'win32': @@ -1438,8 +1424,6 @@ def test_isfile_anonymous_pipe(self): os.close(pr) os.close(pw) - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") @unittest.skipIf(sys.platform != 'win32', "windows only") def test_isfile_named_pipe(self): import _winapi diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py index 3614fc40ba..0dc0211ead 100644 --- a/Lib/test/test_posixpath.py +++ b/Lib/test/test_posixpath.py @@ -189,8 +189,6 @@ def test_dirname(self): self.assertEqual(posixpath.dirname(b"////foo"), b"////") self.assertEqual(posixpath.dirname(b"//foo//bar"), b"//foo") - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") def test_islink(self): self.assertIs(posixpath.islink(TESTFN + "1"), False) self.assertIs(posixpath.lexists(TESTFN + "2"), False) @@ -236,8 +234,6 @@ def test_ismount_invalid_paths(self): self.assertIs(posixpath.ismount('/\x00'), False) self.assertIs(posixpath.ismount(b'/\x00'), False) - # TODO: RUSTPYTHON - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") @os_helper.skip_unless_symlink def test_ismount_symlinks(self): # Symlinks are never mountpoints. diff --git a/crates/vm/src/stdlib/nt.rs b/crates/vm/src/stdlib/nt.rs index f1a5a71af9..0b8b2ec637 100644 --- a/crates/vm/src/stdlib/nt.rs +++ b/crates/vm/src/stdlib/nt.rs @@ -570,6 +570,151 @@ pub(crate) mod module { Ok(path.mode.process_path(buffer.to_os_string(), vm)) } + /// Implements CPython's _Py_skiproot logic for Windows paths + /// Returns (drive_size, root_size) where: + /// - drive_size: length of the drive/UNC portion + /// - root_size: length of the root separator (0 or 1) + fn skiproot(path: &[u16]) -> (usize, usize) { + let len = path.len(); + if len == 0 { + return (0, 0); + } + + const SEP: u16 = b'\\' as u16; + const ALTSEP: u16 = b'/' as u16; + const COLON: u16 = b':' as u16; + + let is_sep = |c: u16| c == SEP || c == ALTSEP; + let get = |i: usize| path.get(i).copied().unwrap_or(0); + + if is_sep(get(0)) { + if is_sep(get(1)) { + // UNC or device path: \\server\share or \\?\device + // Check for \\?\UNC\server\share + let idx = if len >= 8 + && get(2) == b'?' as u16 + && is_sep(get(3)) + && (get(4) == b'U' as u16 || get(4) == b'u' as u16) + && (get(5) == b'N' as u16 || get(5) == b'n' as u16) + && (get(6) == b'C' as u16 || get(6) == b'c' as u16) + && is_sep(get(7)) + { + 8 + } else { + 2 + }; + + // Find the end of server name + let mut i = idx; + while i < len && !is_sep(get(i)) { + i += 1; + } + + if i >= len { + // No share part: \\server + return (i, 0); + } + + // Skip separator and find end of share name + i += 1; + while i < len && !is_sep(get(i)) { + i += 1; + } + + // drive = \\server\share, root = \ (if present) + if i >= len { (i, 0) } else { (i, 1) } + } else { + // Relative path with root: \Windows + (0, 1) + } + } else if len >= 2 && get(1) == COLON { + // Drive letter path + if len >= 3 && is_sep(get(2)) { + // Absolute: X:\Windows + (2, 1) + } else { + // Relative with drive: X:Windows + (2, 0) + } + } else { + // Relative path: Windows + (0, 0) + } + } + + #[pyfunction] + fn _path_splitroot_ex(path: crate::PyObjectRef, vm: &VirtualMachine) -> PyResult { + use crate::builtins::{PyBytes, PyStr}; + use rustpython_common::wtf8::Wtf8Buf; + + // Handle path-like objects via os.fspath, but without null check (nonstrict=True in CPython) + let path = if let Some(fspath) = vm.get_method(path.clone(), identifier!(vm, __fspath__)) { + fspath?.call((), vm)? + } else { + path + }; + + // Convert to wide string, validating UTF-8 for bytes input + let (wide, is_bytes): (Vec, bool) = if let Some(s) = path.downcast_ref::() { + // Use encode_wide which handles WTF-8 (including surrogates) + let wide: Vec = s.as_wtf8().encode_wide().collect(); + (wide, false) + } else if let Some(b) = path.downcast_ref::() { + // On Windows, bytes must be valid UTF-8 - this raises UnicodeDecodeError if not + let s = std::str::from_utf8(b.as_bytes()).map_err(|e| { + vm.new_exception_msg( + vm.ctx.exceptions.unicode_decode_error.to_owned(), + format!( + "'utf-8' codec can't decode byte {:#x} in position {}: invalid start byte", + b.as_bytes().get(e.valid_up_to()).copied().unwrap_or(0), + e.valid_up_to() + ), + ) + })?; + let wide: Vec = s.encode_utf16().collect(); + (wide, true) + } else { + return Err(vm.new_type_error(format!( + "expected str or bytes, not {}", + path.class().name() + ))); + }; + + // Normalize slashes for parsing + let normalized: Vec = wide + .iter() + .map(|&c| if c == b'/' as u16 { b'\\' as u16 } else { c }) + .collect(); + + let (drv_size, root_size) = skiproot(&normalized); + + // Return as bytes if input was bytes, preserving the original content + if is_bytes { + // Convert UTF-16 back to UTF-8 for bytes output + let drv = String::from_utf16(&wide[..drv_size]) + .map_err(|e| vm.new_unicode_decode_error(e.to_string()))?; + let root = String::from_utf16(&wide[drv_size..drv_size + root_size]) + .map_err(|e| vm.new_unicode_decode_error(e.to_string()))?; + let tail = String::from_utf16(&wide[drv_size + root_size..]) + .map_err(|e| vm.new_unicode_decode_error(e.to_string()))?; + Ok(vm.ctx.new_tuple(vec![ + vm.ctx.new_bytes(drv.into_bytes()).into(), + vm.ctx.new_bytes(root.into_bytes()).into(), + vm.ctx.new_bytes(tail.into_bytes()).into(), + ])) + } else { + // For str output, use WTF-8 to handle surrogates + let drv = Wtf8Buf::from_wide(&wide[..drv_size]); + let root = Wtf8Buf::from_wide(&wide[drv_size..drv_size + root_size]); + let tail = Wtf8Buf::from_wide(&wide[drv_size + root_size..]); + Ok(vm.ctx.new_tuple(vec![ + vm.ctx.new_str(drv).into(), + vm.ctx.new_str(root).into(), + vm.ctx.new_str(tail).into(), + ])) + } + } + #[pyfunction] fn _path_splitroot(path: OsPath, vm: &VirtualMachine) -> PyResult<(String, String)> { let orig: Vec<_> = path.path.to_wide(); diff --git a/crates/vm/src/stdlib/winapi.rs b/crates/vm/src/stdlib/winapi.rs index 62505b2b74..956d96d543 100644 --- a/crates/vm/src/stdlib/winapi.rs +++ b/crates/vm/src/stdlib/winapi.rs @@ -540,4 +540,133 @@ mod _winapi { windows_sys::Win32::System::Threading::ReleaseMutex(handle as _) }) } + + // LOCALE_NAME_INVARIANT is an empty string in Windows API + #[pyattr] + const LOCALE_NAME_INVARIANT: &str = ""; + + /// LCMapStringEx - Map a string to another string using locale-specific rules + /// This is used by ntpath.normcase() for proper Windows case conversion + #[pyfunction] + fn LCMapStringEx( + locale: PyStrRef, + flags: u32, + src: PyStrRef, + vm: &VirtualMachine, + ) -> PyResult { + use rustpython_common::wtf8::Wtf8Buf; + use windows_sys::Win32::Globalization::{ + LCMAP_BYTEREV, LCMAP_HASH, LCMAP_SORTHANDLE, LCMAP_SORTKEY, + LCMapStringEx as WinLCMapStringEx, + }; + + // Reject unsupported flags (same as CPython) + if flags & (LCMAP_SORTHANDLE | LCMAP_HASH | LCMAP_BYTEREV | LCMAP_SORTKEY) != 0 { + return Err(vm.new_value_error("unsupported flags")); + } + + // Use encode_wide() which properly handles WTF-8 (including surrogates) + let locale_wide: Vec = locale + .as_wtf8() + .encode_wide() + .chain(std::iter::once(0)) + .collect(); + let src_wide: Vec = src.as_wtf8().encode_wide().collect(); + + if src_wide.len() > i32::MAX as usize { + return Err(vm.new_overflow_error("input string is too long".to_string())); + } + + // First call to get required buffer size + let dest_size = unsafe { + WinLCMapStringEx( + locale_wide.as_ptr(), + flags, + src_wide.as_ptr(), + src_wide.len() as i32, + null_mut(), + 0, + null(), + null(), + 0, + ) + }; + + if dest_size <= 0 { + return Err(vm.new_last_os_error()); + } + + // Second call to perform the mapping + let mut dest = vec![0u16; dest_size as usize]; + let nmapped = unsafe { + WinLCMapStringEx( + locale_wide.as_ptr(), + flags, + src_wide.as_ptr(), + src_wide.len() as i32, + dest.as_mut_ptr(), + dest_size, + null(), + null(), + 0, + ) + }; + + if nmapped <= 0 { + return Err(vm.new_last_os_error()); + } + + dest.truncate(nmapped as usize); + + // Convert UTF-16 back to WTF-8 (handles surrogates properly) + let result = Wtf8Buf::from_wide(&dest); + Ok(vm.ctx.new_str(result)) + } + + #[derive(FromArgs)] + struct CreateNamedPipeArgs { + #[pyarg(positional)] + name: PyStrRef, + #[pyarg(positional)] + open_mode: u32, + #[pyarg(positional)] + pipe_mode: u32, + #[pyarg(positional)] + max_instances: u32, + #[pyarg(positional)] + out_buffer_size: u32, + #[pyarg(positional)] + in_buffer_size: u32, + #[pyarg(positional)] + default_timeout: u32, + #[pyarg(positional)] + _security_attributes: PyObjectRef, // Ignored, can be None + } + + /// CreateNamedPipe - Create a named pipe + #[pyfunction] + fn CreateNamedPipe(args: CreateNamedPipeArgs, vm: &VirtualMachine) -> PyResult { + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + + let name_wide = args.name.as_str().to_wide_with_nul(); + + let handle = unsafe { + CreateNamedPipeW( + name_wide.as_ptr(), + args.open_mode, + args.pipe_mode, + args.max_instances, + args.out_buffer_size, + args.in_buffer_size, + args.default_timeout, + null(), // security_attributes - NULL for now + ) + }; + + if handle == INVALID_HANDLE_VALUE { + return Err(vm.new_last_os_error()); + } + + Ok(WinHandle(handle)) + } }