varia.website/venv/lib/python3.11/site-packages/watchfiles/run.py
2024-11-19 14:01:39 +01:00

439 lines
15 KiB
Python

import contextlib
import json
import logging
import os
import re
import shlex
import signal
import subprocess
import sys
from importlib import import_module
from multiprocessing import get_context
from multiprocessing.context import SpawnProcess
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
import anyio
from .filters import DefaultFilter
from .main import Change, FileChange, awatch, watch
if TYPE_CHECKING:
from typing import Literal
# Public API of this module.
__all__ = (
    'run_process',
    'arun_process',
    'detect_target_type',
    'import_string',
)
# Logs under the 'watchfiles.main' logger name rather than this module's own __name__.
logger = logging.getLogger('watchfiles.main')
def run_process(
    *paths: Union[Path, str],
    target: Union[str, Callable[..., Any]],
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    target_type: "Literal['function', 'command', 'auto']" = 'auto',
    callback: Optional[Callable[[Set[FileChange]], None]] = None,
    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
    grace_period: float = 0,
    debounce: int = 1_600,
    step: int = 50,
    debug: Optional[bool] = None,
    sigint_timeout: int = 5,
    sigkill_timeout: int = 1,
    recursive: bool = True,
    ignore_permission_denied: bool = False,
) -> int:
    """
    Run a process and restart it upon file changes.

    `run_process` can work in two ways:

    * Using `multiprocessing.Process` † to run a python function
    * Or, using `subprocess.Popen` to run a command

    !!! note

        **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve
        code reload/import.

    Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function
    exits cleanly upon `Ctrl+C`.

    Args:
        *paths: matches the same argument of [`watch`][watchfiles.watch]
        target: function or command to run
        args: arguments to pass to `target`, only used if `target` is a function
        kwargs: keyword arguments to pass to `target`, only used if `target` is a function
        target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case
            [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type.
        callback: function to call on each reload, the function should accept a set of changes as the sole argument
        watch_filter: matches the same argument of [`watch`][watchfiles.watch]
        grace_period: number of seconds after the process is started before watching for changes
        debounce: matches the same argument of [`watch`][watchfiles.watch]
        step: matches the same argument of [`watch`][watchfiles.watch]
        debug: matches the same argument of [`watch`][watchfiles.watch]
        sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill;
            used both when restarting on changes and when stopping the process on exit
        sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception
            (the exception is only raised for command targets); used on restart and on exit
        recursive: matches the same argument of [`watch`][watchfiles.watch]
        ignore_permission_denied: matches the same argument of [`watch`][watchfiles.watch]

    Returns:
        number of times the function was reloaded.

    ```py title="Example of run_process running a function"
    from watchfiles import run_process

    def callback(changes):
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback)
    ```

    As well as using a `callback` function, changes can be accessed from within the target function,
    using the `WATCHFILES_CHANGES` environment variable.

    ```py title="Example of run_process accessing changes"
    import json
    import os

    from watchfiles import run_process

    def foobar(a, b, c):
        # changes will be an empty list "[]" the first time the function is called
        changes = os.getenv('WATCHFILES_CHANGES')
        changes = json.loads(changes)
        print('foobar called due to changes:', changes)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2, 3))
    ```

    Again with the target as `command`, `WATCHFILES_CHANGES` can be used
    to access changes.

    ```bash title="example.sh"
    echo "changes: ${WATCHFILES_CHANGES}"
    ```

    ```py title="Example of run_process running a command"
    from watchfiles import run_process

    if __name__ == '__main__':
        run_process('.', target='./example.sh')
    ```
    """
    if target_type == 'auto':
        target_type = detect_target_type(target)

    logger.debug('running "%s" as %s', target, target_type)
    catch_sigterm()
    process = start_process(target, target_type, args, kwargs)
    reloads = 0

    try:
        # The grace-period sleep lives inside the try so that an interrupt (Ctrl+C, or SIGTERM
        # converted to KeyboardInterrupt by catch_sigterm) during it still stops the child.
        if grace_period:
            logger.debug('sleeping for %s seconds before watching for changes', grace_period)
            sleep(grace_period)

        for changes in watch(
            *paths,
            watch_filter=watch_filter,
            debounce=debounce,
            step=step,
            debug=debug,
            raise_interrupt=False,
            recursive=recursive,
            ignore_permission_denied=ignore_permission_denied,
        ):
            if callback is not None:
                callback(changes)
            process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
            process = start_process(target, target_type, args, kwargs, changes)
            reloads += 1
    finally:
        # Honour the caller's timeouts on shutdown as well as on reload.
        process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
    return reloads
async def arun_process(
    *paths: Union[Path, str],
    target: Union[str, Callable[..., Any]],
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    target_type: "Literal['function', 'command', 'auto']" = 'auto',
    callback: Optional[Callable[[Set[FileChange]], Any]] = None,
    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
    grace_period: float = 0,
    debounce: int = 1_600,
    step: int = 50,
    debug: Optional[bool] = None,
    recursive: bool = True,
    ignore_permission_denied: bool = False,
    sigint_timeout: int = 5,
    sigkill_timeout: int = 1,
) -> int:
    """
    Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except
    `callback` which can be a coroutine.

    Starting and stopping the process and watching for changes is done in a separate thread.

    As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt`
    cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below.

    The running process is always stopped when `arun_process` exits, including when it exits
    because of an exception or cancellation.

    ```py title="Example of arun_process usage"
    import asyncio
    from watchfiles import arun_process

    async def callback(changes):
        await asyncio.sleep(0.1)
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    async def main():
        await arun_process('.', target=foobar, args=(1, 2), callback=callback)

    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('stopped via KeyboardInterrupt')
    ```
    """
    import functools
    import inspect

    if target_type == 'auto':
        target_type = detect_target_type(target)

    logger.debug('running "%s" as %s', target, target_type)
    catch_sigterm()
    process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs)
    reloads = 0

    try:
        if grace_period:
            logger.debug('sleeping for %s seconds before watching for changes', grace_period)
            await anyio.sleep(grace_period)

        async for changes in awatch(
            *paths,
            watch_filter=watch_filter,
            debounce=debounce,
            step=step,
            debug=debug,
            recursive=recursive,
            ignore_permission_denied=ignore_permission_denied,
        ):
            if callback is not None:
                r = callback(changes)
                if inspect.isawaitable(r):
                    await r

            # to_thread.run_sync only forwards positional args, so bind the timeouts with partial.
            await anyio.to_thread.run_sync(
                functools.partial(process.stop, sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
            )
            process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs, changes)
            reloads += 1
    finally:
        # Shield the cleanup so it still completes when we are unwinding because the task was
        # cancelled; otherwise the child process would be left running.
        with anyio.CancelScope(shield=True):
            await anyio.to_thread.run_sync(
                functools.partial(process.stop, sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
            )
    return reloads
# Function targets are started with the 'spawn' start method rather than forking, so the child
# process imports modules afresh instead of inheriting the parent's already-imported modules.
spawn_context = get_context(method='spawn')
def split_cmd(cmd: str) -> List[str]:
    """
    Split a command line into an argument list for `subprocess.Popen`.

    POSIX shell-style rules are used everywhere except Windows, where `shlex` runs in non-POSIX mode.
    """
    import platform

    is_windows = platform.uname().system.lower() == 'windows'
    return shlex.split(cmd, posix=not is_windows)
def start_process(
    target: Union[str, Callable[..., Any]],
    target_type: "Literal['function', 'command']",
    args: Tuple[Any, ...],
    kwargs: Optional[Dict[str, Any]],
    changes: Optional[Set[FileChange]] = None,
) -> 'CombinedProcess':
    """
    Start `target` and return it wrapped in a `CombinedProcess`.

    Before starting, `WATCHFILES_CHANGES` is set in this process's environment to a JSON list of
    `[change, path]` pairs (`'[]'` when `changes` is None).

    * `target_type == 'function'`: the target runs in a `spawn_context` process. A string target
      is treated as a dotted path and is imported and called inside the child via `run_function`.
    * any other `target_type`: `target` must be a string. It is split with `split_cmd` and run with
      `subprocess.Popen`, and `args`/`kwargs` are ignored (a warning is logged if any were given).
    """
    os.environ['WATCHFILES_CHANGES'] = (
        '[]' if changes is None else json.dumps([[change.raw_str(), path] for change, path in changes])
    )

    process: 'Union[SpawnProcess, subprocess.Popen[bytes]]'
    if target_type != 'function':
        if args or kwargs:
            logger.warning('ignoring args and kwargs for "command" target')
        assert isinstance(target, str), 'target must be a string to run as a command'
        process = subprocess.Popen(split_cmd(target))
    else:
        call_kwargs = kwargs or {}
        if isinstance(target, str):
            # Dotted path: pass everything positionally to run_function, which resolves it in the child.
            entry_point = run_function
            call_args = (target, get_tty_path(), args, call_kwargs)
            call_kwargs = {}
        else:
            entry_point = target
            call_args = args
        process = spawn_context.Process(target=entry_point, args=call_args, kwargs=call_kwargs)
        process.start()

    return CombinedProcess(process)
def detect_target_type(target: Union[str, Callable[..., Any]]) -> "Literal['function', 'command']":
    """
    Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process]
    and indirectly the CLI to determine the target type when `target_type` is `auto`.

    Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`.

    The following logic is employed:

    * If `target` is not a string, it is assumed to be a function
    * If `target` ends with `.py` or `.sh`, it is assumed to be a command
    * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\\.[a-zA-Z0-9_]+)+`

    If this logic does not work for you, specify the target type explicitly using the `target_type` function argument
    or `--target-type` command line argument.

    Args:
        target: The target value

    Returns:
        either `'function'` or `'command'`
    """
    if not isinstance(target, str):
        return 'function'
    if target.endswith(('.py', '.sh')):
        return 'command'
    # Something like "package.module.func" (at least one dot, only word characters between dots).
    looks_like_dotted_path = re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+', target) is not None
    return 'function' if looks_like_dotted_path else 'command'
class CombinedProcess:
    """
    Wraps either a `multiprocessing` spawn process (function targets) or a `subprocess.Popen`
    (command targets) so both can be stopped and inspected through one interface.
    """

    def __init__(self, p: 'Union[SpawnProcess, subprocess.Popen[bytes]]'):
        self._p = p
        assert self.pid is not None, 'process not yet spawned'

    def stop(self, sigint_timeout: int = 5, sigkill_timeout: int = 1) -> None:
        """
        Stop the process and clear `WATCHFILES_CHANGES` from this process's environment.

        Sends SIGINT and waits up to `sigint_timeout` seconds. If the process is still running,
        escalates to SIGKILL and waits up to `sigkill_timeout` seconds. For `Popen` targets that
        final wait raises `subprocess.TimeoutExpired` if the process still has not exited.
        """
        os.environ.pop('WATCHFILES_CHANGES', None)
        if self.is_alive():
            logger.debug('stopping process...')
            os.kill(self.pid, signal.SIGINT)

            try:
                self.join(sigint_timeout)
            except subprocess.TimeoutExpired:
                # Popen.wait raises on timeout (Process.join does not); swallow it so the
                # exitcode check below is reached and SIGKILL can be sent.
                logger.warning('SIGINT timed out after %r seconds', sigint_timeout)

            if self.exitcode is None:
                logger.warning('process has not terminated, sending SIGKILL')
                # signal.SIGKILL does not exist on Windows; there os.kill with any signal other than
                # the CTRL_*_EVENT values terminates the process, so SIGTERM is an equivalent fallback.
                os.kill(self.pid, getattr(signal, 'SIGKILL', signal.SIGTERM))
                self.join(sigkill_timeout)
            else:
                logger.debug('process stopped')
        else:
            logger.warning('process already dead, exit code: %d', self.exitcode)

    def is_alive(self) -> bool:
        """Return True while the wrapped process is still running."""
        if isinstance(self._p, SpawnProcess):
            return self._p.is_alive()
        else:
            return self._p.poll() is None

    @property
    def pid(self) -> int:
        """OS process id of the wrapped process."""
        # we check the process has always been spawned when CombinedProcess is initialised
        return self._p.pid  # type: ignore[return-value]

    def join(self, timeout: int) -> None:
        """
        Wait up to `timeout` seconds for the process to exit.

        `Popen` targets raise `subprocess.TimeoutExpired` on timeout; spawn processes just return.
        """
        if isinstance(self._p, SpawnProcess):
            self._p.join(timeout)
        else:
            self._p.wait(timeout)

    @property
    def exitcode(self) -> Optional[int]:
        """Exit code of the process, or None if it has not exited (or has not been polled yet)."""
        if isinstance(self._p, SpawnProcess):
            return self._p.exitcode
        else:
            return self._p.returncode
def run_function(function: str, tty_path: Optional[str], args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None:
    """
    Import the callable named by the dotted path `function` and call it with `args`/`kwargs`,
    with stdin pointed at `tty_path` (via `set_tty`) for the duration of the call.

    `start_process` uses this as the child-process entry point for string function targets.
    """
    with set_tty(tty_path):
        import_string(function)(*args, **kwargs)
def import_string(dotted_path: str) -> Any:
    """
    Import a dotted module path and return the attribute named by its last component
    (approach borrowed from Django).

    Surrounding spaces are stripped first. Raises `ImportError` if the path has no dot or the
    module lacks the attribute. Errors from importing the module itself propagate unchanged.
    """
    cleaned_path = dotted_path.strip(' ')
    try:
        module_path, attr_name = cleaned_path.rsplit('.', 1)
    except ValueError as exc:
        raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from exc

    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f'Module "{module_path}" does not define a "{attr_name}" attribute') from exc
def get_tty_path() -> Optional[str]:  # pragma: no cover
    """
    Return the path to the current TTY, if any.

    Falls back to `'/dev/tty'` when stdin has no usable file descriptor (including a closed stdin),
    and returns None when `os.ttyname` is unavailable (Windows) or `sys.stdin` is None.

    Virtually impossible to test in pytest, hence no cover.
    """
    try:
        return os.ttyname(sys.stdin.fileno())
    except (OSError, ValueError):
        # fileno() always fails with pytest (io.UnsupportedOperation, an OSError), and os.ttyname
        # raises OSError for non-tty fds. A closed stdin raises a plain ValueError from fileno(),
        # which previously escaped and crashed start_process.
        return '/dev/tty'
    except AttributeError:
        # on Windows (no os.ttyname), or sys.stdin is None. No idea of a better solution
        return None
@contextlib.contextmanager
def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]:
    """
    Point `sys.stdin` at the file `tty_path` for the duration of the `with` block.

    If `tty_path` is falsy, or opening it raises `OSError`, the block runs with stdin unchanged.
    The original `sys.stdin` is restored on exit, and exceptions raised inside the block
    propagate unchanged.
    """
    if not tty_path:
        # currently on windows tty_path is None and there's nothing we can do here
        yield
        return

    try:
        tty = open(tty_path)  # pragma: no cover
    except OSError:
        # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchfiles/issues/40
        yield
        return

    # Only open() is guarded by the except above. If the yield were inside that try, an OSError
    # raised by the caller's code would be caught and the generator would yield a second time,
    # producing "RuntimeError: generator didn't stop after throw()" instead of the real error.
    original_stdin = sys.stdin
    with tty:
        sys.stdin = tty
        try:
            yield
        finally:
            sys.stdin = original_stdin
def raise_keyboard_interrupt(signum: int, _frame: Any) -> None:  # pragma: no cover
    """Signal handler: log which signal arrived, then raise KeyboardInterrupt."""
    received = signal.Signals(signum)
    logger.warning('received signal %s, raising KeyboardInterrupt', received)
    raise KeyboardInterrupt
def catch_sigterm() -> None:
    """
    Install `raise_keyboard_interrupt` as this process's SIGTERM handler.

    SIGTERM (e.g. from `docker compose stop`) then surfaces as KeyboardInterrupt, so watchfiles
    shuts down through its normal interrupt path and stops the running process. Without this,
    watchfiles itself would be killed while the process it started kept running.
    """
    current_pid = os.getpid()
    logger.debug('registering handler for SIGTERM on watchfiles process %d', current_pid)
    signal.signal(signal.SIGTERM, raise_keyboard_interrupt)