Varia's website
https://varia.zone
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
439 lines
15 KiB
439 lines
15 KiB
2 weeks ago
|
import contextlib
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import re
|
||
|
import shlex
|
||
|
import signal
|
||
|
import subprocess
|
||
|
import sys
|
||
|
from importlib import import_module
|
||
|
from multiprocessing import get_context
|
||
|
from multiprocessing.context import SpawnProcess
|
||
|
from pathlib import Path
|
||
|
from time import sleep
|
||
|
from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
|
||
|
|
||
|
import anyio
|
||
|
|
||
|
from .filters import DefaultFilter
|
||
|
from .main import Change, FileChange, awatch, watch
|
||
|
|
||
|
if TYPE_CHECKING:
|
||
|
from typing import Literal
|
||
|
|
||
|
# Names exported by a star-import of this module.
__all__ = 'run_process', 'arun_process', 'detect_target_type', 'import_string'
|
||
|
# NOTE(review): the logger is named 'watchfiles.main' rather than after this module - presumably
# deliberate so that all log records share one logger; confirm before changing.
logger = logging.getLogger('watchfiles.main')
|
||
|
|
||
|
|
||
|
def run_process(
    *paths: Union[Path, str],
    target: Union[str, Callable[..., Any]],
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    target_type: "Literal['function', 'command', 'auto']" = 'auto',
    callback: Optional[Callable[[Set[FileChange]], None]] = None,
    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
    grace_period: float = 0,
    debounce: int = 1_600,
    step: int = 50,
    debug: Optional[bool] = None,
    sigint_timeout: int = 5,
    sigkill_timeout: int = 1,
    recursive: bool = True,
    ignore_permission_denied: bool = False,
) -> int:
    """
    Run a process and restart it upon file changes.

    `run_process` can work in two ways:

    * Using `multiprocessing.Process` † to run a python function
    * Or, using `subprocess.Popen` to run a command

    !!! note

        **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve
        code reload/import.

    Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function
    exits cleanly upon `Ctrl+C`.

    Whatever way the function exits (normal return or exception), the currently running process is stopped
    using the `sigint_timeout` and `sigkill_timeout` provided.

    Args:
        *paths: matches the same argument of [`watch`][watchfiles.watch]
        target: function or command to run
        args: arguments to pass to `target`, only used if `target` is a function
        kwargs: keyword arguments to pass to `target`, only used if `target` is a function
        target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case
            [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type.
        callback: function to call on each reload, the function should accept a set of changes as the sole argument
        watch_filter: matches the same argument of [`watch`][watchfiles.watch]
        grace_period: number of seconds after the process is started before watching for changes
        debounce: matches the same argument of [`watch`][watchfiles.watch]
        step: matches the same argument of [`watch`][watchfiles.watch]
        debug: matches the same argument of [`watch`][watchfiles.watch]
        sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill
        sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception
        recursive: matches the same argument of [`watch`][watchfiles.watch]
        ignore_permission_denied: matches the same argument of [`watch`][watchfiles.watch]

    Returns:
        number of times the function was reloaded.

    ```py title="Example of run_process running a function"
    from watchfiles import run_process

    def callback(changes):
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback)
    ```

    As well as using a `callback` function, changes can be accessed from within the target function,
    using the `WATCHFILES_CHANGES` environment variable.

    ```py title="Example of run_process accessing changes"
    import json
    import os

    from watchfiles import run_process

    def foobar(a, b, c):
        # changes will be an empty list "[]" the first time the function is called
        changes = os.getenv('WATCHFILES_CHANGES')
        changes = json.loads(changes)
        print('foobar called due to changes:', changes)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2, 3))
    ```

    Again with the target as `command`, `WATCHFILES_CHANGES` can be used
    to access changes.

    ```bash title="example.sh"
    echo "changes: ${WATCHFILES_CHANGES}"
    ```

    ```py title="Example of run_process running a command"
    from watchfiles import run_process

    if __name__ == '__main__':
        run_process('.', target='./example.sh')
    ```
    """
    if target_type == 'auto':
        target_type = detect_target_type(target)

    logger.debug('running "%s" as %s', target, target_type)
    catch_sigterm()
    process = start_process(target, target_type, args, kwargs)
    reloads = 0

    try:
        # The grace period is inside the `try` so that an interrupt raised while sleeping
        # (e.g. SIGTERM converted to KeyboardInterrupt) still stops the child process.
        if grace_period:
            logger.debug('sleeping for %s seconds before watching for changes', grace_period)
            sleep(grace_period)

        for changes in watch(
            *paths,
            watch_filter=watch_filter,
            debounce=debounce,
            step=step,
            debug=debug,
            raise_interrupt=False,
            recursive=recursive,
            ignore_permission_denied=ignore_permission_denied,
        ):
            if callback is not None:
                callback(changes)
            process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
            process = start_process(target, target_type, args, kwargs, changes)
            reloads += 1
    finally:
        # Honour the caller's timeouts on the final shutdown too, not just on reloads.
        process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
    return reloads
|
||
|
|
||
|
|
||
|
async def arun_process(
    *paths: Union[Path, str],
    target: Union[str, Callable[..., Any]],
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    target_type: "Literal['function', 'command', 'auto']" = 'auto',
    callback: Optional[Callable[[Set[FileChange]], Any]] = None,
    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
    grace_period: float = 0,
    debounce: int = 1_600,
    step: int = 50,
    debug: Optional[bool] = None,
    recursive: bool = True,
    ignore_permission_denied: bool = False,
) -> int:
    """
    Async equivalent of [`run_process`][watchfiles.run_process]. The arguments accepted here have the same
    meaning as in `run_process`, except `callback` which may also be a coroutine function.

    Starting and stopping the process is delegated to a worker thread via `anyio.to_thread.run_sync`;
    the process is always stopped with the default timeouts of `CombinedProcess.stop`.

    Changes are watched with [`awatch`][watchfiles.awatch]. Unlike `run_process`, `KeyboardInterrupt`
    cannot be caught and suppressed inside `awatch`, so callers need to handle it themselves, see below.

    ```py title="Example of arun_process usage"
    import asyncio
    from watchfiles import arun_process

    async def callback(changes):
        await asyncio.sleep(0.1)
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    async def main():
        await arun_process('.', target=foobar, args=(1, 2), callback=callback)

    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('stopped via KeyboardInterrupt')
    ```
    """
    import inspect

    if target_type == 'auto':
        target_type = detect_target_type(target)

    logger.debug('running "%s" as %s', target, target_type)
    catch_sigterm()

    run_in_thread = anyio.to_thread.run_sync
    process = await run_in_thread(start_process, target, target_type, args, kwargs)
    reload_count = 0

    if grace_period:
        logger.debug('sleeping for %s seconds before watching for changes', grace_period)
        await anyio.sleep(grace_period)

    change_stream = awatch(
        *paths,
        watch_filter=watch_filter,
        debounce=debounce,
        step=step,
        debug=debug,
        recursive=recursive,
        ignore_permission_denied=ignore_permission_denied,
    )
    async for changes in change_stream:
        if callback is not None:
            # the callback may be sync or async; only await when it handed back an awaitable
            outcome = callback(changes)
            if inspect.isawaitable(outcome):
                await outcome

        await run_in_thread(process.stop)
        process = await run_in_thread(start_process, target, target_type, args, kwargs, changes)
        reload_count += 1

    await run_in_thread(process.stop)
    return reload_count
|
||
|
|
||
|
|
||
|
# Use spawn context to make sure code run in subprocess
|
||
|
# does not reuse imported modules in main process/context
|
||
|
spawn_context = get_context('spawn')  # start_process creates every 'function' target process from this
|
||
|
|
||
|
|
||
|
def split_cmd(cmd: str) -> List[str]:
    """
    Split a command line string into a list of arguments using `shlex.split`.

    POSIX splitting rules are used everywhere except when the platform reports itself as Windows.
    """
    import platform

    on_windows = platform.uname().system.lower() == 'windows'
    return shlex.split(cmd, posix=not on_windows)
|
||
|
|
||
|
|
||
|
def start_process(
    target: Union[str, Callable[..., Any]],
    target_type: "Literal['function', 'command']",
    args: Tuple[Any, ...],
    kwargs: Optional[Dict[str, Any]],
    changes: Optional[Set[FileChange]] = None,
) -> 'CombinedProcess':
    """
    Start `target` as a new process and return it wrapped in a `CombinedProcess`.

    Before the process is created, the `WATCHFILES_CHANGES` environment variable is set to a JSON list of
    `[change, path]` pairs built from `changes` (or `'[]'` when `changes` is `None`).

    Args:
        target: function, dotted path to a function, or command to run
        target_type: `'function'` to run via the spawn multiprocessing context, anything else runs
            `target` as a command with `subprocess.Popen`
        args: positional arguments for a function target, ignored (with a warning) for commands
        kwargs: keyword arguments for a function target, ignored (with a warning) for commands
        changes: the changes which triggered this (re)start, if any

    Returns:
        the started process.
    """
    if changes is None:
        serialised_changes = '[]'
    else:
        serialised_changes = json.dumps([[c.raw_str(), p] for c, p in changes])
    os.environ['WATCHFILES_CHANGES'] = serialised_changes

    process: 'Union[SpawnProcess, subprocess.Popen[bytes]]'

    if target_type != 'function':
        if args or kwargs:
            logger.warning('ignoring args and kwargs for "command" target')

        assert isinstance(target, str), 'target must be a string to run as a command'
        process = subprocess.Popen(split_cmd(target))
        return CombinedProcess(process)

    kwargs = kwargs or {}
    if isinstance(target, str):
        # a dotted path is imported inside the child process by run_function, so the original
        # args/kwargs travel as positional arguments of run_function itself
        args = target, get_tty_path(), args, kwargs
        kwargs = {}
        entry_point = run_function
    else:
        entry_point = target

    process = spawn_context.Process(target=entry_point, args=args, kwargs=kwargs)
    process.start()
    return CombinedProcess(process)
|
||
|
|
||
|
|
||
|
def detect_target_type(target: Union[str, Callable[..., Any]]) -> "Literal['function', 'command']":
    """
    Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process]
    and indirectly the CLI to determine the target type when `target_type` is `auto`.

    Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`.

    The rules are applied in order:

    * A `target` which is not a string is assumed to be a function
    * A `target` ending with `.py` or `.sh` is assumed to be a command
    * Any other `target` is assumed to be a function if the whole string matches the regex
      `[a-zA-Z0-9_]+(\\.[a-zA-Z0-9_]+)+`, and a command if not

    If this logic does not work for you, specify the target type explicitly using the `target_type` function argument
    or `--target-type` command line argument.

    Args:
        target: The target value

    Returns:
        either `'function'` or `'command'`
    """
    if not isinstance(target, str):
        return 'function'

    if target.endswith(('.py', '.sh')):
        return 'command'

    is_dotted_path = re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+', target) is not None
    return 'function' if is_dotted_path else 'command'
|
||
|
|
||
|
|
||
|
class CombinedProcess:
    """
    Thin wrapper giving a `SpawnProcess` and a `subprocess.Popen` the same small interface:
    `stop`, `is_alive`, `pid`, `join` and `exitcode`.
    """

    def __init__(self, p: 'Union[SpawnProcess, subprocess.Popen[bytes]]'):
        self._p = p
        assert self.pid is not None, 'process not yet spawned'

    def stop(self, sigint_timeout: int = 5, sigkill_timeout: int = 1) -> None:
        """
        Stop the process: send SIGINT, wait up to `sigint_timeout` seconds, then if it has still not
        exited send SIGKILL and wait up to `sigkill_timeout` seconds.

        The `WATCHFILES_CHANGES` environment variable is removed whether or not the process was alive.
        """
        os.environ.pop('WATCHFILES_CHANGES', None)

        if not self.is_alive():
            logger.warning('process already dead, exit code: %d', self.exitcode)
            return

        logger.debug('stopping process...')
        os.kill(self.pid, signal.SIGINT)

        try:
            self.join(sigint_timeout)
        except subprocess.TimeoutExpired:
            # Swallow the timeout so the exit code check below is reached and SIGKILL can be sent.
            logger.warning('SIGINT timed out after %r seconds', sigint_timeout)

        if self.exitcode is not None:
            logger.debug('process stopped')
            return

        logger.warning('process has not terminated, sending SIGKILL')
        os.kill(self.pid, signal.SIGKILL)
        self.join(sigkill_timeout)

    def is_alive(self) -> bool:
        """Whether the wrapped process is still running."""
        proc = self._p
        if isinstance(proc, SpawnProcess):
            return proc.is_alive()
        return proc.poll() is None

    @property
    def pid(self) -> int:
        """Process id of the wrapped process."""
        # __init__ asserts the pid is set, i.e. that the process has already been spawned
        return self._p.pid  # type: ignore[return-value]

    def join(self, timeout: int) -> None:
        """Wait up to `timeout` seconds for the process to exit (`join` or `wait` depending on its type)."""
        proc = self._p
        if isinstance(proc, SpawnProcess):
            proc.join(timeout)
        else:
            proc.wait(timeout)

    @property
    def exitcode(self) -> Optional[int]:
        """The wrapped process's `exitcode` (`SpawnProcess`) or `returncode` (`Popen`)."""
        proc = self._p
        if isinstance(proc, SpawnProcess):
            return proc.exitcode
        return proc.returncode
|
||
|
|
||
|
|
||
|
def run_function(function: str, tty_path: Optional[str], args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None:
    """
    Import the function at the dotted path `function` and call it with `args` and `kwargs`,
    all within the `set_tty(tty_path)` context.
    """
    with set_tty(tty_path):
        imported = import_string(function)
        imported(*args, **kwargs)
|
||
|
|
||
|
|
||
|
def import_string(dotted_path: str) -> Any:
    """
    Stolen approximately from django. Import the module named by everything before the last dot of
    `dotted_path` and return the attribute named by what follows it.

    Surrounding spaces are stripped from `dotted_path` first. `ImportError` is raised if the path contains
    no dot, if the module cannot be imported, or if the module has no such attribute.
    """
    cleaned_path = dotted_path.strip(' ')
    try:
        module_path, class_name = cleaned_path.rsplit('.', 1)
    except ValueError as e:
        # no dot at all: rsplit returned a single item and unpacking failed
        raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e

    module = import_module(module_path)
    try:
        attribute = getattr(module, class_name)
    except AttributeError as e:
        raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute') from e
    return attribute
|
||
|
|
||
|
|
||
|
def get_tty_path() -> Optional[str]:  # pragma: no cover
    """
    Return the path to the current TTY, if any.

    Falls back to `'/dev/tty'` when looking up the name for stdin fails with an `OSError`,
    and to `None` when an `AttributeError` is raised.

    Virtually impossible to test in pytest, hence no cover.
    """
    try:
        tty_name = os.ttyname(sys.stdin.fileno())
    except AttributeError:
        # on Windows. No idea of a better solution
        return None
    except OSError:
        # fileno() always fails with pytest
        return '/dev/tty'
    else:
        return tty_name
|
||
|
|
||
|
|
||
|
@contextlib.contextmanager
def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]:
    """
    Context manager which points `sys.stdin` at the TTY found at `tty_path` for the duration of the block.

    If `tty_path` is falsy or the TTY cannot be opened, the block runs with `sys.stdin` untouched.
    On exit the previous `sys.stdin` is restored and the TTY file is closed.

    Only failures to *open* the TTY are suppressed; exceptions raised by the wrapped block, including
    `OSError`s, propagate unchanged.

    Args:
        tty_path: path to the TTY to use as stdin, or `None`
    """
    if not tty_path:
        # currently on windows tty_path is None and there's nothing we can do here
        yield
        return

    try:
        tty = open(tty_path)
    except OSError:
        # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchfiles/issues/40
        yield
        return

    # The yield must live outside the `except OSError` above: were an OSError from the wrapped block
    # caught there, the generator would yield a second time and contextlib would raise
    # RuntimeError("generator didn't stop after throw()"), masking the real error.
    previous_stdin = sys.stdin
    with tty:  # pragma: no cover
        sys.stdin = tty
        try:
            yield
        finally:
            # don't leave sys.stdin pointing at a file which is about to be closed
            sys.stdin = previous_stdin
|
||
|
|
||
|
|
||
|
def raise_keyboard_interrupt(signum: int, _frame: Any) -> None:  # pragma: no cover
    """
    Signal handler which logs the received signal and raises `KeyboardInterrupt`.
    """
    received = signal.Signals(signum)
    logger.warning('received signal %s, raising KeyboardInterrupt', received)
    raise KeyboardInterrupt
|
||
|
|
||
|
|
||
|
def catch_sigterm() -> None:
    """
    Install a SIGTERM handler which raises `KeyboardInterrupt`, so watchfiles stops quickly
    on `docker compose stop` and in other cases where SIGTERM is sent.

    Without this the watchfiles process will be killed while a running process will continue uninterrupted.
    """
    current_pid = os.getpid()
    logger.debug('registering handler for SIGTERM on watchfiles process %d', current_pid)
    signal.signal(signal.SIGTERM, raise_keyboard_interrupt)
|