def __init__(self) -> None:
self._pid_counter = itertools.count(0)
- self._threads = {}
+ self._threads: Dict[Any, Any] = {}
- def is_active(self):
+ def is_active(self) -> bool:
return True
- def close(self):
+ def close(self) -> None:
self._join_threads()
- def _join_threads(self):
+ def _join_threads(self) -> None:
"""Internal: Join all non-daemon threads"""
threads = [thread for thread in list(self._threads.values())
if thread.is_alive() and not thread.daemon]
for thread in threads:
thread.join()
- def __enter__(self):
+ def __enter__(self) -> Any:
return self
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
pass
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn: Any = warnings.warn) -> None:
threads = [thread for thread in list(self._threads.values())
if thread.is_alive()]
if threads:
ResourceWarning,
source=self)
- def add_child_handler(self, pid, callback, *args):
+ def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None:
loop = events.get_event_loop()
thread = threading.Thread(target=self._do_waitpid,
name=f'waitpid-{next(self._pid_counter)}',
self._threads[pid] = thread
thread.start()
- def remove_child_handler(self, pid):
+ def remove_child_handler(self, pid: Any) -> bool:
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classe requires it
return True
- def attach_loop(self, loop):
+ def attach_loop(self, loop: Any) -> None:
pass
- def _do_waitpid(self, loop, expected_pid, callback, args):
+ def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None:
assert expected_pid > 0
try: