From d414ed266bfc747d03ea9044ea0f662e2b388f1c Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 13 Oct 2023 02:21:48 +0200 Subject: [PATCH 1/3] gh-110205: Fix asyncio ThreadedChildWatcher._join_threads() ThreadedChildWatcher._join_threads() now clears references to completed threads. test_asyncio.utils.TestCase now calls _join_threads() of the watcher, uses SHORT_TIMEOUT to join a thread, and then raises an exception if there are still running threads. Rename also ThreadedChildWatcher threads to add "asyncio-" prefix to the name. --- Lib/asyncio/unix_events.py | 10 +++++++--- Lib/test/test_asyncio/utils.py | 9 ++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 65f0923264d14e..d7362be2535752 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1373,12 +1373,16 @@ def is_active(self): def close(self): self._join_threads() - def _join_threads(self): + def _join_threads(self, timeout=None): """Internal: Join all non-daemon threads""" threads = [thread for thread in list(self._threads.values()) if thread.is_alive() and not thread.daemon] for thread in threads: - thread.join() + thread.join(timeout) + + # Clear references to terminated threads + self.threads = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] def __enter__(self): return self @@ -1397,7 +1401,7 @@ def __del__(self, _warn=warnings.warn): def add_child_handler(self, pid, callback, *args): loop = events.get_running_loop() thread = threading.Thread(target=self._do_waitpid, - name=f"waitpid-{next(self._pid_counter)}", + name=f"asyncio-waitpid-{next(self._pid_counter)}", args=(loop, pid, callback, args), daemon=True) self._threads[pid] = thread diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index e1101bf42eb24e..a952fb2970c38e 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -546,6 +546,7 @@ def close_loop(loop): else: loop._default_executor.shutdown(wait=True) loop.close() + policy = support.maybe_get_event_loop_policy() if policy is not None: try: @@ -557,9 +558,11 @@ def close_loop(loop): pass else: if isinstance(watcher, asyncio.ThreadedChildWatcher): - threads = list(watcher._threads.values()) - for thread in threads: - thread.join() + watcher._join_threads(timeout=support.SHORT_TIMEOUT) + threads = watcher._threads + if threads: + self.fail(f"watcher still has running threads: " + f"{threads}") def set_event_loop(self, loop, *, cleanup=True): if loop is None: From f161daafc1d9be2a96c6486f3924dabfb63eda76 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 13 Oct 2023 11:00:09 +0200 Subject: [PATCH 2/3] Don't create a new list --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index d7362be2535752..dfa03946653c4a 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1381,8 +1381,8 @@ def _join_threads(self, timeout=None): thread.join(timeout) # Clear references to terminated threads - self.threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] + self.threads[:] = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] def __enter__(self): return self From dbbad2d163f835e06514a465d33b3d34c0d42c58 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 13 Oct 2023 16:21:35 +0200 Subject: [PATCH 3/3] Fix typo: self.threads => self._threads --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index dfa03946653c4a..a089087023a629 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1381,8 +1381,8 @@ def _join_threads(self, timeout=None): thread.join(timeout) # Clear references to terminated threads - self.threads[:] = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] + self._threads[:] = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] def __enter__(self): return self