You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
405 lines
15 KiB
405 lines
15 KiB
3 years ago
|
import hashlib
|
||
|
import typing as t
|
||
|
import warnings
|
||
|
from collections.abc import MutableMapping
|
||
|
from datetime import datetime
|
||
|
|
||
|
from itsdangerous import BadSignature
|
||
|
from itsdangerous import URLSafeTimedSerializer
|
||
|
from werkzeug.datastructures import CallbackDict
|
||
|
|
||
|
from .helpers import is_ip
|
||
|
from .json.tag import TaggedJSONSerializer
|
||
|
|
||
|
if t.TYPE_CHECKING:
|
||
|
import typing_extensions as te
|
||
|
from .app import Flask
|
||
|
from .wrappers import Request, Response
|
||
|
|
||
|
|
||
|
class SessionMixin(MutableMapping):
|
||
|
"""Expands a basic dictionary with session attributes."""
|
||
|
|
||
|
@property
|
||
|
def permanent(self) -> bool:
|
||
|
"""This reflects the ``'_permanent'`` key in the dict."""
|
||
|
return self.get("_permanent", False)
|
||
|
|
||
|
@permanent.setter
|
||
|
def permanent(self, value: bool) -> None:
|
||
|
self["_permanent"] = bool(value)
|
||
|
|
||
|
#: Some implementations can detect whether a session is newly
|
||
|
#: created, but that is not guaranteed. Use with caution. The mixin
|
||
|
# default is hard-coded ``False``.
|
||
|
new = False
|
||
|
|
||
|
#: Some implementations can detect changes to the session and set
|
||
|
#: this when that happens. The mixin default is hard coded to
|
||
|
#: ``True``.
|
||
|
modified = True
|
||
|
|
||
|
#: Some implementations can detect when session data is read or
|
||
|
#: written and set this when that happens. The mixin default is hard
|
||
|
#: coded to ``True``.
|
||
|
accessed = True
|
||
|
|
||
|
|
||
|
class SecureCookieSession(CallbackDict, SessionMixin):
|
||
|
"""Base class for sessions based on signed cookies.
|
||
|
|
||
|
This session backend will set the :attr:`modified` and
|
||
|
:attr:`accessed` attributes. It cannot reliably track whether a
|
||
|
session is new (vs. empty), so :attr:`new` remains hard coded to
|
||
|
``False``.
|
||
|
"""
|
||
|
|
||
|
#: When data is changed, this is set to ``True``. Only the session
|
||
|
#: dictionary itself is tracked; if the session contains mutable
|
||
|
#: data (for example a nested dict) then this must be set to
|
||
|
#: ``True`` manually when modifying that data. The session cookie
|
||
|
#: will only be written to the response if this is ``True``.
|
||
|
modified = False
|
||
|
|
||
|
#: When data is read or written, this is set to ``True``. Used by
|
||
|
# :class:`.SecureCookieSessionInterface` to add a ``Vary: Cookie``
|
||
|
#: header, which allows caching proxies to cache different pages for
|
||
|
#: different users.
|
||
|
accessed = False
|
||
|
|
||
|
def __init__(self, initial: t.Any = None) -> None:
|
||
|
def on_update(self) -> None:
|
||
|
self.modified = True
|
||
|
self.accessed = True
|
||
|
|
||
|
super().__init__(initial, on_update)
|
||
|
|
||
|
def __getitem__(self, key: str) -> t.Any:
|
||
|
self.accessed = True
|
||
|
return super().__getitem__(key)
|
||
|
|
||
|
def get(self, key: str, default: t.Any = None) -> t.Any:
|
||
|
self.accessed = True
|
||
|
return super().get(key, default)
|
||
|
|
||
|
def setdefault(self, key: str, default: t.Any = None) -> t.Any:
|
||
|
self.accessed = True
|
||
|
return super().setdefault(key, default)
|
||
|
|
||
|
|
||
|
class NullSession(SecureCookieSession):
|
||
|
"""Class used to generate nicer error messages if sessions are not
|
||
|
available. Will still allow read-only access to the empty session
|
||
|
but fail on setting.
|
||
|
"""
|
||
|
|
||
|
def _fail(self, *args: t.Any, **kwargs: t.Any) -> "te.NoReturn":
|
||
|
raise RuntimeError(
|
||
|
"The session is unavailable because no secret "
|
||
|
"key was set. Set the secret_key on the "
|
||
|
"application to something unique and secret."
|
||
|
)
|
||
|
|
||
|
__setitem__ = __delitem__ = clear = pop = popitem = update = setdefault = _fail # type: ignore # noqa: B950
|
||
|
del _fail
|
||
|
|
||
|
|
||
|
class SessionInterface:
|
||
|
"""The basic interface you have to implement in order to replace the
|
||
|
default session interface which uses werkzeug's securecookie
|
||
|
implementation. The only methods you have to implement are
|
||
|
:meth:`open_session` and :meth:`save_session`, the others have
|
||
|
useful defaults which you don't need to change.
|
||
|
|
||
|
The session object returned by the :meth:`open_session` method has to
|
||
|
provide a dictionary like interface plus the properties and methods
|
||
|
from the :class:`SessionMixin`. We recommend just subclassing a dict
|
||
|
and adding that mixin::
|
||
|
|
||
|
class Session(dict, SessionMixin):
|
||
|
pass
|
||
|
|
||
|
If :meth:`open_session` returns ``None`` Flask will call into
|
||
|
:meth:`make_null_session` to create a session that acts as replacement
|
||
|
if the session support cannot work because some requirement is not
|
||
|
fulfilled. The default :class:`NullSession` class that is created
|
||
|
will complain that the secret key was not set.
|
||
|
|
||
|
To replace the session interface on an application all you have to do
|
||
|
is to assign :attr:`flask.Flask.session_interface`::
|
||
|
|
||
|
app = Flask(__name__)
|
||
|
app.session_interface = MySessionInterface()
|
||
|
|
||
|
.. versionadded:: 0.8
|
||
|
"""
|
||
|
|
||
|
#: :meth:`make_null_session` will look here for the class that should
|
||
|
#: be created when a null session is requested. Likewise the
|
||
|
#: :meth:`is_null_session` method will perform a typecheck against
|
||
|
#: this type.
|
||
|
null_session_class = NullSession
|
||
|
|
||
|
#: A flag that indicates if the session interface is pickle based.
|
||
|
#: This can be used by Flask extensions to make a decision in regards
|
||
|
#: to how to deal with the session object.
|
||
|
#:
|
||
|
#: .. versionadded:: 0.10
|
||
|
pickle_based = False
|
||
|
|
||
|
def make_null_session(self, app: "Flask") -> NullSession:
|
||
|
"""Creates a null session which acts as a replacement object if the
|
||
|
real session support could not be loaded due to a configuration
|
||
|
error. This mainly aids the user experience because the job of the
|
||
|
null session is to still support lookup without complaining but
|
||
|
modifications are answered with a helpful error message of what
|
||
|
failed.
|
||
|
|
||
|
This creates an instance of :attr:`null_session_class` by default.
|
||
|
"""
|
||
|
return self.null_session_class()
|
||
|
|
||
|
def is_null_session(self, obj: object) -> bool:
|
||
|
"""Checks if a given object is a null session. Null sessions are
|
||
|
not asked to be saved.
|
||
|
|
||
|
This checks if the object is an instance of :attr:`null_session_class`
|
||
|
by default.
|
||
|
"""
|
||
|
return isinstance(obj, self.null_session_class)
|
||
|
|
||
|
def get_cookie_name(self, app: "Flask") -> str:
|
||
|
"""Returns the name of the session cookie.
|
||
|
|
||
|
Uses ``app.session_cookie_name`` which is set to ``SESSION_COOKIE_NAME``
|
||
|
"""
|
||
|
return app.session_cookie_name
|
||
|
|
||
|
def get_cookie_domain(self, app: "Flask") -> t.Optional[str]:
|
||
|
"""Returns the domain that should be set for the session cookie.
|
||
|
|
||
|
Uses ``SESSION_COOKIE_DOMAIN`` if it is configured, otherwise
|
||
|
falls back to detecting the domain based on ``SERVER_NAME``.
|
||
|
|
||
|
Once detected (or if not set at all), ``SESSION_COOKIE_DOMAIN`` is
|
||
|
updated to avoid re-running the logic.
|
||
|
"""
|
||
|
|
||
|
rv = app.config["SESSION_COOKIE_DOMAIN"]
|
||
|
|
||
|
# set explicitly, or cached from SERVER_NAME detection
|
||
|
# if False, return None
|
||
|
if rv is not None:
|
||
|
return rv if rv else None
|
||
|
|
||
|
rv = app.config["SERVER_NAME"]
|
||
|
|
||
|
# server name not set, cache False to return none next time
|
||
|
if not rv:
|
||
|
app.config["SESSION_COOKIE_DOMAIN"] = False
|
||
|
return None
|
||
|
|
||
|
# chop off the port which is usually not supported by browsers
|
||
|
# remove any leading '.' since we'll add that later
|
||
|
rv = rv.rsplit(":", 1)[0].lstrip(".")
|
||
|
|
||
|
if "." not in rv:
|
||
|
# Chrome doesn't allow names without a '.'. This should only
|
||
|
# come up with localhost. Hack around this by not setting
|
||
|
# the name, and show a warning.
|
||
|
warnings.warn(
|
||
|
f"{rv!r} is not a valid cookie domain, it must contain"
|
||
|
" a '.'. Add an entry to your hosts file, for example"
|
||
|
f" '{rv}.localdomain', and use that instead."
|
||
|
)
|
||
|
app.config["SESSION_COOKIE_DOMAIN"] = False
|
||
|
return None
|
||
|
|
||
|
ip = is_ip(rv)
|
||
|
|
||
|
if ip:
|
||
|
warnings.warn(
|
||
|
"The session cookie domain is an IP address. This may not work"
|
||
|
" as intended in some browsers. Add an entry to your hosts"
|
||
|
' file, for example "localhost.localdomain", and use that'
|
||
|
" instead."
|
||
|
)
|
||
|
|
||
|
# if this is not an ip and app is mounted at the root, allow subdomain
|
||
|
# matching by adding a '.' prefix
|
||
|
if self.get_cookie_path(app) == "/" and not ip:
|
||
|
rv = f".{rv}"
|
||
|
|
||
|
app.config["SESSION_COOKIE_DOMAIN"] = rv
|
||
|
return rv
|
||
|
|
||
|
def get_cookie_path(self, app: "Flask") -> str:
|
||
|
"""Returns the path for which the cookie should be valid. The
|
||
|
default implementation uses the value from the ``SESSION_COOKIE_PATH``
|
||
|
config var if it's set, and falls back to ``APPLICATION_ROOT`` or
|
||
|
uses ``/`` if it's ``None``.
|
||
|
"""
|
||
|
return app.config["SESSION_COOKIE_PATH"] or app.config["APPLICATION_ROOT"]
|
||
|
|
||
|
def get_cookie_httponly(self, app: "Flask") -> bool:
|
||
|
"""Returns True if the session cookie should be httponly. This
|
||
|
currently just returns the value of the ``SESSION_COOKIE_HTTPONLY``
|
||
|
config var.
|
||
|
"""
|
||
|
return app.config["SESSION_COOKIE_HTTPONLY"]
|
||
|
|
||
|
def get_cookie_secure(self, app: "Flask") -> bool:
|
||
|
"""Returns True if the cookie should be secure. This currently
|
||
|
just returns the value of the ``SESSION_COOKIE_SECURE`` setting.
|
||
|
"""
|
||
|
return app.config["SESSION_COOKIE_SECURE"]
|
||
|
|
||
|
def get_cookie_samesite(self, app: "Flask") -> str:
|
||
|
"""Return ``'Strict'`` or ``'Lax'`` if the cookie should use the
|
||
|
``SameSite`` attribute. This currently just returns the value of
|
||
|
the :data:`SESSION_COOKIE_SAMESITE` setting.
|
||
|
"""
|
||
|
return app.config["SESSION_COOKIE_SAMESITE"]
|
||
|
|
||
|
def get_expiration_time(
|
||
|
self, app: "Flask", session: SessionMixin
|
||
|
) -> t.Optional[datetime]:
|
||
|
"""A helper method that returns an expiration date for the session
|
||
|
or ``None`` if the session is linked to the browser session. The
|
||
|
default implementation returns now + the permanent session
|
||
|
lifetime configured on the application.
|
||
|
"""
|
||
|
if session.permanent:
|
||
|
return datetime.utcnow() + app.permanent_session_lifetime
|
||
|
return None
|
||
|
|
||
|
def should_set_cookie(self, app: "Flask", session: SessionMixin) -> bool:
|
||
|
"""Used by session backends to determine if a ``Set-Cookie`` header
|
||
|
should be set for this session cookie for this response. If the session
|
||
|
has been modified, the cookie is set. If the session is permanent and
|
||
|
the ``SESSION_REFRESH_EACH_REQUEST`` config is true, the cookie is
|
||
|
always set.
|
||
|
|
||
|
This check is usually skipped if the session was deleted.
|
||
|
|
||
|
.. versionadded:: 0.11
|
||
|
"""
|
||
|
|
||
|
return session.modified or (
|
||
|
session.permanent and app.config["SESSION_REFRESH_EACH_REQUEST"]
|
||
|
)
|
||
|
|
||
|
def open_session(
|
||
|
self, app: "Flask", request: "Request"
|
||
|
) -> t.Optional[SessionMixin]:
|
||
|
"""This method has to be implemented and must either return ``None``
|
||
|
in case the loading failed because of a configuration error or an
|
||
|
instance of a session object which implements a dictionary like
|
||
|
interface + the methods and attributes on :class:`SessionMixin`.
|
||
|
"""
|
||
|
raise NotImplementedError()
|
||
|
|
||
|
def save_session(
|
||
|
self, app: "Flask", session: SessionMixin, response: "Response"
|
||
|
) -> None:
|
||
|
"""This is called for actual sessions returned by :meth:`open_session`
|
||
|
at the end of the request. This is still called during a request
|
||
|
context so if you absolutely need access to the request you can do
|
||
|
that.
|
||
|
"""
|
||
|
raise NotImplementedError()
|
||
|
|
||
|
|
||
|
session_json_serializer = TaggedJSONSerializer()
|
||
|
|
||
|
|
||
|
class SecureCookieSessionInterface(SessionInterface):
|
||
|
"""The default session interface that stores sessions in signed cookies
|
||
|
through the :mod:`itsdangerous` module.
|
||
|
"""
|
||
|
|
||
|
#: the salt that should be applied on top of the secret key for the
|
||
|
#: signing of cookie based sessions.
|
||
|
salt = "cookie-session"
|
||
|
#: the hash function to use for the signature. The default is sha1
|
||
|
digest_method = staticmethod(hashlib.sha1)
|
||
|
#: the name of the itsdangerous supported key derivation. The default
|
||
|
#: is hmac.
|
||
|
key_derivation = "hmac"
|
||
|
#: A python serializer for the payload. The default is a compact
|
||
|
#: JSON derived serializer with support for some extra Python types
|
||
|
#: such as datetime objects or tuples.
|
||
|
serializer = session_json_serializer
|
||
|
session_class = SecureCookieSession
|
||
|
|
||
|
def get_signing_serializer(
|
||
|
self, app: "Flask"
|
||
|
) -> t.Optional[URLSafeTimedSerializer]:
|
||
|
if not app.secret_key:
|
||
|
return None
|
||
|
signer_kwargs = dict(
|
||
|
key_derivation=self.key_derivation, digest_method=self.digest_method
|
||
|
)
|
||
|
return URLSafeTimedSerializer(
|
||
|
app.secret_key,
|
||
|
salt=self.salt,
|
||
|
serializer=self.serializer,
|
||
|
signer_kwargs=signer_kwargs,
|
||
|
)
|
||
|
|
||
|
def open_session(
|
||
|
self, app: "Flask", request: "Request"
|
||
|
) -> t.Optional[SecureCookieSession]:
|
||
|
s = self.get_signing_serializer(app)
|
||
|
if s is None:
|
||
|
return None
|
||
|
val = request.cookies.get(self.get_cookie_name(app))
|
||
|
if not val:
|
||
|
return self.session_class()
|
||
|
max_age = int(app.permanent_session_lifetime.total_seconds())
|
||
|
try:
|
||
|
data = s.loads(val, max_age=max_age)
|
||
|
return self.session_class(data)
|
||
|
except BadSignature:
|
||
|
return self.session_class()
|
||
|
|
||
|
def save_session(
|
||
|
self, app: "Flask", session: SessionMixin, response: "Response"
|
||
|
) -> None:
|
||
|
name = self.get_cookie_name(app)
|
||
|
domain = self.get_cookie_domain(app)
|
||
|
path = self.get_cookie_path(app)
|
||
|
secure = self.get_cookie_secure(app)
|
||
|
samesite = self.get_cookie_samesite(app)
|
||
|
|
||
|
# If the session is modified to be empty, remove the cookie.
|
||
|
# If the session is empty, return without setting the cookie.
|
||
|
if not session:
|
||
|
if session.modified:
|
||
|
response.delete_cookie(
|
||
|
name, domain=domain, path=path, secure=secure, samesite=samesite
|
||
|
)
|
||
|
|
||
|
return
|
||
|
|
||
|
# Add a "Vary: Cookie" header if the session was accessed at all.
|
||
|
if session.accessed:
|
||
|
response.vary.add("Cookie")
|
||
|
|
||
|
if not self.should_set_cookie(app, session):
|
||
|
return
|
||
|
|
||
|
httponly = self.get_cookie_httponly(app)
|
||
|
expires = self.get_expiration_time(app, session)
|
||
|
val = self.get_signing_serializer(app).dumps(dict(session)) # type: ignore
|
||
|
response.set_cookie(
|
||
|
name,
|
||
|
val, # type: ignore
|
||
|
expires=expires,
|
||
|
httponly=httponly,
|
||
|
domain=domain,
|
||
|
path=path,
|
||
|
secure=secure,
|
||
|
samesite=samesite,
|
||
|
)
|