Varia's website https://varia.zone
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

149 lines
4.3 KiB

from __future__ import annotations
from collections.abc import Callable, Mapping
from io import SEEK_SET, UnsupportedOperation
from os import PathLike
from pathlib import Path
from typing import Any, BinaryIO, cast
from .. import (
BrokenResourceError,
ClosedResourceError,
EndOfStream,
TypedAttributeSet,
to_thread,
typed_attribute,
)
from ..abc import ByteReceiveStream, ByteSendStream
class FileStreamAttribute(TypedAttributeSet):
#: the open file descriptor
file: BinaryIO = typed_attribute()
#: the path of the file on the file system, if available (file must be a real file)
path: Path = typed_attribute()
#: the file number, if available (file must be a real file or a TTY)
fileno: int = typed_attribute()
class _BaseFileStream:
def __init__(self, file: BinaryIO):
self._file = file
async def aclose(self) -> None:
await to_thread.run_sync(self._file.close)
@property
def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
attributes: dict[Any, Callable[[], Any]] = {
FileStreamAttribute.file: lambda: self._file,
}
if hasattr(self._file, "name"):
attributes[FileStreamAttribute.path] = lambda: Path(self._file.name)
try:
self._file.fileno()
except UnsupportedOperation:
pass
else:
attributes[FileStreamAttribute.fileno] = lambda: self._file.fileno()
return attributes
class FileReadStream(_BaseFileStream, ByteReceiveStream):
"""
A byte stream that reads from a file in the file system.
:param file: a file that has been opened for reading in binary mode
.. versionadded:: 3.0
"""
@classmethod
async def from_path(cls, path: str | PathLike[str]) -> FileReadStream:
"""
Create a file read stream by opening the given file.
:param path: path of the file to read from
"""
file = await to_thread.run_sync(Path(path).open, "rb")
return cls(cast(BinaryIO, file))
async def receive(self, max_bytes: int = 65536) -> bytes:
try:
data = await to_thread.run_sync(self._file.read, max_bytes)
except ValueError:
raise ClosedResourceError from None
except OSError as exc:
raise BrokenResourceError from exc
if data:
return data
else:
raise EndOfStream
async def seek(self, position: int, whence: int = SEEK_SET) -> int:
"""
Seek the file to the given position.
.. seealso:: :meth:`io.IOBase.seek`
.. note:: Not all file descriptors are seekable.
:param position: position to seek the file to
:param whence: controls how ``position`` is interpreted
:return: the new absolute position
:raises OSError: if the file is not seekable
"""
return await to_thread.run_sync(self._file.seek, position, whence)
async def tell(self) -> int:
"""
Return the current stream position.
.. note:: Not all file descriptors are seekable.
:return: the current absolute position
:raises OSError: if the file is not seekable
"""
return await to_thread.run_sync(self._file.tell)
class FileWriteStream(_BaseFileStream, ByteSendStream):
"""
A byte stream that writes to a file in the file system.
:param file: a file that has been opened for writing in binary mode
.. versionadded:: 3.0
"""
@classmethod
async def from_path(
cls, path: str | PathLike[str], append: bool = False
) -> FileWriteStream:
"""
Create a file write stream by opening the given file for writing.
:param path: path of the file to write to
:param append: if ``True``, open the file for appending; if ``False``, any
existing file at the given path will be truncated
"""
mode = "ab" if append else "wb"
file = await to_thread.run_sync(Path(path).open, mode)
return cls(cast(BinaryIO, file))
async def send(self, item: bytes) -> None:
try:
await to_thread.run_sync(self._file.write, item)
except ValueError:
raise ClosedResourceError from None
except OSError as exc:
raise BrokenResourceError from exc