Skip to content

API Reference

Public API of litestar-keycloak. Import from the top-level package:

from litestar_keycloak import (
    KeycloakPlugin,
    KeycloakConfig,
    TokenLocation,
    KeycloakUser,
    TokenPayload,
    require_roles,
    require_client_roles,
    require_scopes,
    MatchStrategy,
)

Plugin and config

litestar_keycloak.plugin.KeycloakPlugin

Bases: InitPluginProtocol

Litestar plugin that integrates Keycloak authentication.

Usage::

from litestar import Litestar
from litestar_keycloak import KeycloakPlugin, KeycloakConfig

app = Litestar(
    route_handlers=[...],
    plugins=[KeycloakPlugin(
        KeycloakConfig(
            server_url="https://keycloak.example.com",
            realm="my-realm",
            client_id="my-app",
        )
    )],
)
Source code in src/litestar_keycloak/plugin.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class KeycloakPlugin(InitPluginProtocol):
    """Litestar plugin that integrates Keycloak authentication.

    Usage::

        from litestar import Litestar
        from litestar_keycloak import KeycloakPlugin, KeycloakConfig

        app = Litestar(
            route_handlers=[...],
            plugins=[KeycloakPlugin(
                KeycloakConfig(
                    server_url="https://keycloak.example.com",
                    realm="my-realm",
                    client_id="my-app",
                )
            )],
        )
    """

    __slots__ = ("_config", "_jwks_cache", "_verifier")

    def __init__(self, config: KeycloakConfig) -> None:
        self._config = config
        self._jwks_cache = JWKSCache(
            jwks_url=config.jwks_url,
            ttl=config.jwks_cache_ttl,
            http_timeout=config.http_timeout,
        )
        self._verifier = TokenVerifier(config, self._jwks_cache)

    def on_app_init(self, app_config: AppConfig) -> AppConfig:
        """Hook called by Litestar during application startup.

        Registers:
        - Authentication middleware (bearer token extraction + validation)
        - Exception handlers (``KeycloakError`` hierarchy -> HTTP responses)
        - DI providers (``current_user``, ``token_payload``, ``raw_token``)
        - OIDC routes (when ``include_routes`` is ``True``)
        - Lifespan handler to warm the JWKS cache on startup
        """
        # -- auth middleware -----------------------------------------------
        middleware_cls = create_auth_middleware(self._config, self._verifier)
        app_config.middleware.insert(0, middleware_cls)

        # -- exception handlers --------------------------------------------
        app_config.exception_handlers = cast(
            "Any",
            {
                **exception_handlers,
                **app_config.exception_handlers,
            },
        )

        # -- DI providers --------------------------------------------------
        app_config.dependencies = {
            **build_dependencies(),
            **app_config.dependencies,
        }

        # -- OIDC routes ---------------------------------------------------
        if self._config.include_routes:
            controller = build_auth_controller(self._config)
            app_config.route_handlers.append(controller)

        # -- JWKS warm-up on startup ---------------------------------------
        app_config.on_startup.append(self._on_startup)

        return app_config

    async def _on_startup(self) -> None:
        """Warm the JWKS cache so the first request doesn't pay fetch latency."""
        await self._jwks_cache.warm()

__slots__ = ('_config', '_jwks_cache', '_verifier') class-attribute instance-attribute

_config = config instance-attribute

_jwks_cache = JWKSCache(jwks_url=(config.jwks_url), ttl=(config.jwks_cache_ttl), http_timeout=(config.http_timeout)) instance-attribute

_verifier = TokenVerifier(config, self._jwks_cache) instance-attribute

__init__(config)

Source code in src/litestar_keycloak/plugin.py
50
51
52
53
54
55
56
57
def __init__(self, config: KeycloakConfig) -> None:
    self._config = config
    self._jwks_cache = JWKSCache(
        jwks_url=config.jwks_url,
        ttl=config.jwks_cache_ttl,
        http_timeout=config.http_timeout,
    )
    self._verifier = TokenVerifier(config, self._jwks_cache)

_on_startup() async

Warm the JWKS cache so the first request doesn't pay fetch latency.

Source code in src/litestar_keycloak/plugin.py
 98
 99
100
async def _on_startup(self) -> None:
    """Warm the JWKS cache so the first request doesn't pay fetch latency."""
    await self._jwks_cache.warm()

on_app_init(app_config)

Hook called by Litestar during application startup.

Registers: - Authentication middleware (bearer token extraction + validation) - Exception handlers (KeycloakError hierarchy -> HTTP responses) - DI providers (current_user, token_payload, raw_token) - OIDC routes (when include_routes is True) - Lifespan handler to warm the JWKS cache on startup

Source code in src/litestar_keycloak/plugin.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def on_app_init(self, app_config: AppConfig) -> AppConfig:
    """Hook called by Litestar during application startup.

    Registers:
    - Authentication middleware (bearer token extraction + validation)
    - Exception handlers (``KeycloakError`` hierarchy -> HTTP responses)
    - DI providers (``current_user``, ``token_payload``, ``raw_token``)
    - OIDC routes (when ``include_routes`` is ``True``)
    - Lifespan handler to warm the JWKS cache on startup
    """
    # -- auth middleware -----------------------------------------------
    middleware_cls = create_auth_middleware(self._config, self._verifier)
    app_config.middleware.insert(0, middleware_cls)

    # -- exception handlers --------------------------------------------
    app_config.exception_handlers = cast(
        "Any",
        {
            **exception_handlers,
            **app_config.exception_handlers,
        },
    )

    # -- DI providers --------------------------------------------------
    app_config.dependencies = {
        **build_dependencies(),
        **app_config.dependencies,
    }

    # -- OIDC routes ---------------------------------------------------
    if self._config.include_routes:
        controller = build_auth_controller(self._config)
        app_config.route_handlers.append(controller)

    # -- JWKS warm-up on startup ---------------------------------------
    app_config.on_startup.append(self._on_startup)

    return app_config

litestar_keycloak.config.KeycloakConfig dataclass

Immutable configuration for the Keycloak plugin.

Only server_url, realm, and client_id are required. Everything else has sensible defaults suitable for a typical confidential-client setup with RS256-signed JWTs.

Example::

KeycloakConfig(
    server_url="https://keycloak.example.com",
    realm="my-realm",
    client_id="my-app",
    client_secret="s3cret",
)
Source code in src/litestar_keycloak/config.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@dataclass(frozen=True, slots=True)
class KeycloakConfig:
    """Immutable configuration for the Keycloak plugin.

    Only ``server_url``, ``realm``, and ``client_id`` are required.
    Everything else has sensible defaults suitable for a typical
    confidential-client setup with RS256-signed JWTs.

    Example::

        KeycloakConfig(
            server_url="https://keycloak.example.com",
            realm="my-realm",
            client_id="my-app",
            client_secret="s3cret",
        )
    """

    # -- connection --------------------------------------------------------
    server_url: str
    """Base Keycloak URL without trailing slash (e.g. ``https://kc.example.com``)."""

    realm: str
    """Keycloak realm name."""

    client_id: str
    """OIDC client identifier registered in the realm."""

    client_secret: str | None = None
    """Client secret for confidential clients.  ``None`` for public clients."""

    # -- token -------------------------------------------------------------
    token_location: TokenLocation = TokenLocation.HEADER
    """Where to read the bearer token from: ``Authorization`` header or a cookie."""

    cookie_name: str = "access_token"
    """Cookie name when ``token_location`` is ``COOKIE``."""

    algorithms: tuple[str, ...] = ("RS256",)
    """Accepted JWT signing algorithms."""

    scopes: tuple[str, ...] = ("openid",)
    """Scopes requested during the authorization code flow."""

    # -- JWKS --------------------------------------------------------------
    jwks_cache_ttl: int = 3600
    """How long (seconds) to cache the JWKS before re-fetching."""

    # -- routes ------------------------------------------------------------
    include_routes: bool = False
    """Mount ``/auth/login``, ``/callback``, ``/logout``, ``/refresh`` routes."""

    auth_prefix: str = "/auth"
    """URL prefix for the optional OIDC route group."""

    redirect_uri: str | None = None
    """OAuth2 redirect URI for the authorization code flow.
    Required when ``include_routes`` is ``True``."""

    # -- advanced ----------------------------------------------------------
    audience: str | None = None
    """Expected ``aud`` claim.  Defaults to ``client_id`` when ``None``."""

    optional_audiences: frozenset[str] = field(default_factory=frozenset)
    """Additional audiences to accept (e.g. service client IDs)."""

    http_timeout: int = 10
    """Timeout in seconds for outgoing HTTP calls to Keycloak."""

    excluded_paths: frozenset[str] = field(default_factory=frozenset)
    """Request paths that skip authentication entirely (e.g. health checks)."""

    # -- derived properties ------------------------------------------------

    @property
    def realm_url(self) -> str:
        """``{server_url}/realms/{realm}`` — base for all OIDC endpoints."""
        return f"{self.server_url.rstrip('/')}/realms/{self.realm}"

    @property
    def issuer(self) -> str:
        """Expected ``iss`` claim value (same as ``realm_url``)."""
        return self.realm_url

    @property
    def discovery_url(self) -> str:
        """OpenID Connect discovery document URL."""
        return f"{self.realm_url}/.well-known/openid-configuration"

    @property
    def jwks_url(self) -> str:
        """JSON Web Key Set endpoint URL."""
        return f"{self.realm_url}/protocol/openid-connect/certs"

    @property
    def authorization_url(self) -> str:
        """Authorization endpoint for the code flow."""
        return f"{self.realm_url}/protocol/openid-connect/auth"

    @property
    def token_url(self) -> str:
        """Token endpoint for code exchange and refresh."""
        return f"{self.realm_url}/protocol/openid-connect/token"

    @property
    def logout_url(self) -> str:
        """End-session endpoint."""
        return f"{self.realm_url}/protocol/openid-connect/logout"

    @property
    def effective_audience(self) -> str:
        """Primary audience for JWT validation (explicit or client_id)."""
        return self.audience if self.audience is not None else self.client_id

    @property
    def accepted_audiences(self) -> frozenset[str]:
        """All audiences accepted for JWT validation (primary + optional_audiences)."""
        return frozenset({self.effective_audience}) | self.optional_audiences

    @property
    def effective_excluded_paths(self) -> frozenset[str]:
        """Paths that skip auth: excluded_paths plus auth routes when include_routes."""
        if not self.include_routes:
            return self.excluded_paths
        auth_paths = {
            f"{self.auth_prefix}/login",
            f"{self.auth_prefix}/callback",
            f"{self.auth_prefix}/logout",
            f"{self.auth_prefix}/refresh",
        }
        return self.excluded_paths | auth_paths

    def __post_init__(self) -> None:
        if self.include_routes and self.redirect_uri is None:
            raise ValueError("redirect_uri is required when include_routes is True")
        if self.jwks_cache_ttl < 0:
            raise ValueError("jwks_cache_ttl must be non-negative")
        if self.http_timeout <= 0:
            raise ValueError("http_timeout must be positive")

accepted_audiences property

All audiences accepted for JWT validation (primary + optional_audiences).

algorithms = ('RS256',) class-attribute instance-attribute

Accepted JWT signing algorithms.

audience = None class-attribute instance-attribute

Expected aud claim. Defaults to client_id when None.

auth_prefix = '/auth' class-attribute instance-attribute

URL prefix for the optional OIDC route group.

authorization_url property

Authorization endpoint for the code flow.

client_id instance-attribute

OIDC client identifier registered in the realm.

client_secret = None class-attribute instance-attribute

Client secret for confidential clients. None for public clients.

cookie_name = 'access_token' class-attribute instance-attribute

Cookie name when token_location is COOKIE.

discovery_url property

OpenID Connect discovery document URL.

effective_audience property

Primary audience for JWT validation (explicit or client_id).

effective_excluded_paths property

Paths that skip auth: excluded_paths plus auth routes when include_routes.

excluded_paths = field(default_factory=frozenset) class-attribute instance-attribute

Request paths that skip authentication entirely (e.g. health checks).

http_timeout = 10 class-attribute instance-attribute

Timeout in seconds for outgoing HTTP calls to Keycloak.

include_routes = False class-attribute instance-attribute

Mount /auth/login, /callback, /logout, /refresh routes.

issuer property

Expected iss claim value (same as realm_url).

jwks_cache_ttl = 3600 class-attribute instance-attribute

How long (seconds) to cache the JWKS before re-fetching.

jwks_url property

JSON Web Key Set endpoint URL.

logout_url property

End-session endpoint.

optional_audiences = field(default_factory=frozenset) class-attribute instance-attribute

Additional audiences to accept (e.g. service client IDs).

realm instance-attribute

Keycloak realm name.

realm_url property

{server_url}/realms/{realm} — base for all OIDC endpoints.

redirect_uri = None class-attribute instance-attribute

OAuth2 redirect URI for the authorization code flow. Required when include_routes is True.

scopes = ('openid',) class-attribute instance-attribute

Scopes requested during the authorization code flow.

server_url instance-attribute

Base Keycloak URL without trailing slash (e.g. https://kc.example.com).

token_location = TokenLocation.HEADER class-attribute instance-attribute

Where to read the bearer token from: Authorization header or a cookie.

token_url property

Token endpoint for code exchange and refresh.

__init__(server_url, realm, client_id, client_secret=None, token_location=TokenLocation.HEADER, cookie_name='access_token', algorithms=('RS256',), scopes=('openid',), jwks_cache_ttl=3600, include_routes=False, auth_prefix='/auth', redirect_uri=None, audience=None, optional_audiences=frozenset(), http_timeout=10, excluded_paths=frozenset())

__post_init__()

Source code in src/litestar_keycloak/config.py
155
156
157
158
159
160
161
def __post_init__(self) -> None:
    if self.include_routes and self.redirect_uri is None:
        raise ValueError("redirect_uri is required when include_routes is True")
    if self.jwks_cache_ttl < 0:
        raise ValueError("jwks_cache_ttl must be non-negative")
    if self.http_timeout <= 0:
        raise ValueError("http_timeout must be positive")

litestar_keycloak.config.TokenLocation

Bases: Enum

Where the plugin looks for the access token on incoming requests.

Source code in src/litestar_keycloak/config.py
16
17
18
19
20
class TokenLocation(enum.Enum):
    """Where the plugin looks for the access token on incoming requests."""

    HEADER = "header"
    COOKIE = "cookie"

COOKIE = 'cookie' class-attribute instance-attribute

HEADER = 'header' class-attribute instance-attribute

Models

litestar_keycloak.models.KeycloakUser dataclass

High-level identity object injected into route handlers.

Wraps TokenPayload and exposes a flattened, ergonomic API so handlers don't need to know about raw JWT claim structures::

@get("/me")
async def me(current_user: KeycloakUser) -> dict:
    return {"name": current_user.name, "roles": current_user.realm_roles}
Source code in src/litestar_keycloak/models.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
@dataclass(frozen=True, slots=True)
class KeycloakUser:
    """High-level identity object injected into route handlers.

    Wraps ``TokenPayload`` and exposes a flattened, ergonomic API so
    handlers don't need to know about raw JWT claim structures::

        @get("/me")
        async def me(current_user: KeycloakUser) -> dict:
            return {"name": current_user.name, "roles": current_user.realm_roles}
    """

    sub: str
    """Keycloak user ID."""

    preferred_username: str | None = None
    email: str | None = None
    email_verified: bool = False
    given_name: str | None = None
    family_name: str | None = None
    name: str | None = None

    realm_roles: frozenset[str] = field(default_factory=frozenset)
    """All realm-level roles."""

    client_roles: dict[str, frozenset[str]] = field(default_factory=dict)
    """Client roles keyed by client ID."""

    scopes: frozenset[str] = field(default_factory=frozenset)
    """Token scopes as a set."""

    raw: TokenPayload | None = None
    """Full token payload for advanced use cases."""

    def has_role(self, role: str) -> bool:
        """Check if the user holds a realm role."""
        return role in self.realm_roles

    def has_client_role(self, client_id: str, role: str) -> bool:
        """Check if the user holds a role for a specific client."""
        return role in self.client_roles.get(client_id, frozenset())

    def has_scope(self, scope: str) -> bool:
        """Check if the token carries a specific scope."""
        return scope in self.scopes

    @classmethod
    def from_token(cls, payload: TokenPayload) -> KeycloakUser:
        """Construct from a validated ``TokenPayload``."""
        client_roles = {
            client_id: frozenset(access.get("roles", []))
            for client_id, access in payload.resource_access.items()
        }
        return cls(
            sub=payload.sub,
            preferred_username=payload.preferred_username,
            email=payload.email,
            email_verified=payload.email_verified,
            given_name=payload.given_name,
            family_name=payload.family_name,
            name=payload.name,
            realm_roles=payload.realm_roles,
            client_roles=client_roles,
            scopes=payload.scopes,
            raw=payload,
        )

client_roles = field(default_factory=dict) class-attribute instance-attribute

Client roles keyed by client ID.

email = None class-attribute instance-attribute

email_verified = False class-attribute instance-attribute

family_name = None class-attribute instance-attribute

given_name = None class-attribute instance-attribute

name = None class-attribute instance-attribute

preferred_username = None class-attribute instance-attribute

raw = None class-attribute instance-attribute

Full token payload for advanced use cases.

realm_roles = field(default_factory=frozenset) class-attribute instance-attribute

All realm-level roles.

scopes = field(default_factory=frozenset) class-attribute instance-attribute

Token scopes as a set.

sub instance-attribute

Keycloak user ID.

__init__(sub, preferred_username=None, email=None, email_verified=False, given_name=None, family_name=None, name=None, realm_roles=frozenset(), client_roles=dict(), scopes=frozenset(), raw=None)

from_token(payload) classmethod

Construct from a validated TokenPayload.

Source code in src/litestar_keycloak/models.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
@classmethod
def from_token(cls, payload: TokenPayload) -> KeycloakUser:
    """Construct from a validated ``TokenPayload``."""
    client_roles = {
        client_id: frozenset(access.get("roles", []))
        for client_id, access in payload.resource_access.items()
    }
    return cls(
        sub=payload.sub,
        preferred_username=payload.preferred_username,
        email=payload.email,
        email_verified=payload.email_verified,
        given_name=payload.given_name,
        family_name=payload.family_name,
        name=payload.name,
        realm_roles=payload.realm_roles,
        client_roles=client_roles,
        scopes=payload.scopes,
        raw=payload,
    )

has_client_role(client_id, role)

Check if the user holds a role for a specific client.

Source code in src/litestar_keycloak/models.py
161
162
163
def has_client_role(self, client_id: str, role: str) -> bool:
    """Check if the user holds a role for a specific client."""
    return role in self.client_roles.get(client_id, frozenset())

has_role(role)

Check if the user holds a realm role.

Source code in src/litestar_keycloak/models.py
157
158
159
def has_role(self, role: str) -> bool:
    """Check if the user holds a realm role."""
    return role in self.realm_roles

has_scope(scope)

Check if the token carries a specific scope.

Source code in src/litestar_keycloak/models.py
165
166
167
def has_scope(self, scope: str) -> bool:
    """Check if the token carries a specific scope."""
    return scope in self.scopes

litestar_keycloak.models.TokenPayload dataclass

Decoded and validated JWT claims.

Mirrors the standard OIDC claims plus Keycloak-specific role structures. Constructed by the token module after successful validation — fields are guaranteed to satisfy all configured checks (issuer, audience, expiry, algorithm) by the time this object exists.

Source code in src/litestar_keycloak/models.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@dataclass(frozen=True, slots=True)
class TokenPayload:
    """Decoded and validated JWT claims.

    Mirrors the standard OIDC claims plus Keycloak-specific role
    structures.  Constructed by the ``token`` module after successful
    validation — fields are guaranteed to satisfy all configured checks
    (issuer, audience, expiry, algorithm) by the time this object exists.
    """

    # -- standard OIDC claims ---------------------------------------------
    sub: str
    """Subject identifier (Keycloak user ID, typically a UUID)."""

    iss: str
    """Issuer URL (``{server_url}/realms/{realm}``)."""

    aud: str | list[str]
    """Audience — single client ID or a list when multiple audiences are present."""

    exp: int
    """Expiration timestamp (Unix epoch seconds)."""

    iat: int
    """Issued-at timestamp (Unix epoch seconds)."""

    # -- optional standard claims ------------------------------------------
    azp: str | None = None
    """Authorized party — the client that requested the token."""

    scope: str = ""
    """Space-delimited scope string (e.g. ``"openid profile email"``)."""

    jti: str | None = None
    """Unique token identifier."""

    typ: str | None = None
    """Token type (typically ``"Bearer"``)."""

    # -- Keycloak-specific claims ------------------------------------------
    preferred_username: str | None = None
    email: str | None = None
    email_verified: bool = False
    given_name: str | None = None
    family_name: str | None = None
    name: str | None = None

    realm_access: dict[str, Any] = field(default_factory=dict)
    """Raw ``realm_access`` claim (e.g. ``{"roles": ["admin", "user"]}``)."""

    resource_access: dict[str, Any] = field(default_factory=dict)
    """Raw ``resource_access`` claim keyed by client ID."""

    # -- extra claims catchall ---------------------------------------------
    extra: dict[str, Any] = field(default_factory=dict)
    """Any remaining claims not explicitly modeled above."""

    # -- convenience -------------------------------------------------------

    @property
    def realm_roles(self) -> frozenset[str]:
        """Realm-level roles extracted from ``realm_access.roles``."""
        return frozenset(self.realm_access.get("roles", []))

    def client_roles(self, client_id: str) -> frozenset[str]:
        """Roles for a specific client from ``resource_access``."""
        access = self.resource_access.get(client_id, {})
        return frozenset(access.get("roles", []))

    @property
    def scopes(self) -> frozenset[str]:
        """Scope string split into a set for O(1) membership tests."""
        return frozenset(self.scope.split()) if self.scope else frozenset()

    @property
    def expires_at(self) -> datetime:
        """``exp`` as a timezone-aware UTC datetime."""
        return datetime.fromtimestamp(self.exp, tz=UTC)

    @property
    def issued_at(self) -> datetime:
        """``iat`` as a timezone-aware UTC datetime."""
        return datetime.fromtimestamp(self.iat, tz=UTC)

    @classmethod
    def from_claims(cls, claims: dict[str, Any]) -> TokenPayload:
        """Build a ``TokenPayload`` from a raw decoded JWT dict.

        Known fields are mapped to explicit attributes; everything else
        lands in ``extra`` so no claim is silently dropped.
        Keycloak may omit ``aud``; fall back to ``azp`` when building.
        """
        # Normalize aud (Keycloak sometimes omits it; use azp as fallback)
        normalized = dict(claims)
        if not normalized.get("aud"):
            normalized["aud"] = normalized.get("azp") or ""
        # Some OIDC providers omit "sub" from access tokens;
        # falling back to preferred_username
        if "sub" not in normalized or normalized.get("sub") is None:
            normalized["sub"] = normalized.get("preferred_username") or ""
        known_fields = {f.name for f in cls.__dataclass_fields__.values()} - {"extra"}
        known = {k: v for k, v in normalized.items() if k in known_fields}
        extra = {k: v for k, v in normalized.items() if k not in known_fields}
        return cls(**known, extra=extra)

aud instance-attribute

Audience — single client ID or a list when multiple audiences are present.

azp = None class-attribute instance-attribute

Authorized party — the client that requested the token.

email = None class-attribute instance-attribute

email_verified = False class-attribute instance-attribute

exp instance-attribute

Expiration timestamp (Unix epoch seconds).

expires_at property

exp as a timezone-aware UTC datetime.

extra = field(default_factory=dict) class-attribute instance-attribute

Any remaining claims not explicitly modeled above.

family_name = None class-attribute instance-attribute

given_name = None class-attribute instance-attribute

iat instance-attribute

Issued-at timestamp (Unix epoch seconds).

iss instance-attribute

Issuer URL ({server_url}/realms/{realm}).

issued_at property

iat as a timezone-aware UTC datetime.

jti = None class-attribute instance-attribute

Unique token identifier.

name = None class-attribute instance-attribute

preferred_username = None class-attribute instance-attribute

realm_access = field(default_factory=dict) class-attribute instance-attribute

Raw realm_access claim (e.g. {"roles": ["admin", "user"]}).

realm_roles property

Realm-level roles extracted from realm_access.roles.

resource_access = field(default_factory=dict) class-attribute instance-attribute

Raw resource_access claim keyed by client ID.

scope = '' class-attribute instance-attribute

Space-delimited scope string (e.g. "openid profile email").

scopes property

Scope string split into a set for O(1) membership tests.

sub instance-attribute

Subject identifier (Keycloak user ID, typically a UUID).

typ = None class-attribute instance-attribute

Token type (typically "Bearer").

__init__(sub, iss, aud, exp, iat, azp=None, scope='', jti=None, typ=None, preferred_username=None, email=None, email_verified=False, given_name=None, family_name=None, name=None, realm_access=dict(), resource_access=dict(), extra=dict())

client_roles(client_id)

Roles for a specific client from resource_access.

Source code in src/litestar_keycloak/models.py
81
82
83
84
def client_roles(self, client_id: str) -> frozenset[str]:
    """Roles for a specific client from ``resource_access``."""
    access = self.resource_access.get(client_id, {})
    return frozenset(access.get("roles", []))

from_claims(claims) classmethod

Build a TokenPayload from a raw decoded JWT dict.

Known fields are mapped to explicit attributes; everything else lands in extra so no claim is silently dropped. Keycloak may omit aud; fall back to azp when building.

Source code in src/litestar_keycloak/models.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@classmethod
def from_claims(cls, claims: dict[str, Any]) -> TokenPayload:
    """Build a ``TokenPayload`` from a raw decoded JWT dict.

    Known fields are mapped to explicit attributes; everything else
    lands in ``extra`` so no claim is silently dropped.
    Keycloak may omit ``aud``; fall back to ``azp`` when building.
    """
    # Normalize aud (Keycloak sometimes omits it; use azp as fallback)
    normalized = dict(claims)
    if not normalized.get("aud"):
        normalized["aud"] = normalized.get("azp") or ""
    # Some OIDC providers omit "sub" from access tokens;
    # falling back to preferred_username
    if "sub" not in normalized or normalized.get("sub") is None:
        normalized["sub"] = normalized.get("preferred_username") or ""
    known_fields = {f.name for f in cls.__dataclass_fields__.values()} - {"extra"}
    known = {k: v for k, v in normalized.items() if k in known_fields}
    extra = {k: v for k, v in normalized.items() if k not in known_fields}
    return cls(**known, extra=extra)

Guards

litestar_keycloak.guards.MatchStrategy

Bases: Enum

How multiple required values are evaluated.

Source code in src/litestar_keycloak/guards.py
25
26
27
28
29
30
31
32
class MatchStrategy(enum.Enum):
    """How multiple required values are evaluated."""

    ALL = "all"
    """User must hold **every** required role/scope (default)."""

    ANY = "any"
    """User must hold **at least one** of the required roles/scopes."""

ALL = 'all' class-attribute instance-attribute

User must hold every required role/scope (default).

ANY = 'any' class-attribute instance-attribute

User must hold at least one of the required roles/scopes.

litestar_keycloak.guards.require_roles(*roles, strategy=MatchStrategy.ALL)

Guard factory that enforces realm-level roles.

Parameters:

Name Type Description Default
*roles str

One or more role names the user must hold.

()
strategy MatchStrategy

ALL (default) requires every role; ANY requires at least one.

ALL

Example::

@get("/admin", guards=[require_roles("admin")])
async def admin_panel() -> dict: ...

@get("/staff", guards=[require_roles(
    "admin", "manager", strategy=MatchStrategy.ANY)])
async def staff_area() -> dict: ...
Source code in src/litestar_keycloak/guards.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def require_roles(
    *roles: str,
    strategy: MatchStrategy = MatchStrategy.ALL,
) -> Callable[..., None]:
    """Guard factory that enforces realm-level roles.

    Args:
        *roles: One or more role names the user must hold.
        strategy: ``ALL`` (default) requires every role;
                  ``ANY`` requires at least one.

    Example::

        @get("/admin", guards=[require_roles("admin")])
        async def admin_panel() -> dict: ...

        @get("/staff", guards=[require_roles(
            "admin", "manager", strategy=MatchStrategy.ANY)])
        async def staff_area() -> dict: ...
    """
    required = frozenset(roles)

    def guard(
        connection: ASGIConnection[Any, Any, Any, Any], _: BaseRouteHandler
    ) -> None:
        user = _resolve_user(connection)
        if strategy is MatchStrategy.ALL:
            satisfied = required <= user.realm_roles
        else:
            satisfied = bool(required & user.realm_roles)

        if not satisfied:
            raise InsufficientRoleError(required=required, actual=user.realm_roles)

    return guard

litestar_keycloak.guards.require_client_roles(client_id, *roles, strategy=MatchStrategy.ALL)

Guard factory that enforces client-level roles.

Parameters:

Name Type Description Default
client_id str

The Keycloak client whose roles are checked.

required
*roles str

One or more role names the user must hold for client_id.

()
strategy MatchStrategy

ALL (default) or ANY.

ALL

Example::

@get("/billing", guards=[require_client_roles("billing-service", "read")])
async def billing() -> dict: ...
Source code in src/litestar_keycloak/guards.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def require_client_roles(
    client_id: str,
    *roles: str,
    strategy: MatchStrategy = MatchStrategy.ALL,
) -> Callable[..., None]:
    """Guard factory that enforces client-level roles.

    Args:
        client_id: The Keycloak client whose roles are checked.
        *roles: One or more role names the user must hold for *client_id*.
        strategy: ``ALL`` (default) or ``ANY``.

    Example::

        @get("/billing", guards=[require_client_roles("billing-service", "read")])
        async def billing() -> dict: ...
    """
    required = frozenset(roles)

    def guard(
        connection: ASGIConnection[Any, Any, Any, Any], _: BaseRouteHandler
    ) -> None:
        user = _resolve_user(connection)
        actual = user.client_roles.get(client_id, frozenset())

        if strategy is MatchStrategy.ALL:
            satisfied = required <= actual
        else:
            satisfied = bool(required & actual)

        if not satisfied:
            raise InsufficientRoleError(required=required, actual=actual)

    return guard

litestar_keycloak.guards.require_scopes(*scopes, strategy=MatchStrategy.ALL)

Guard factory that enforces token scopes.

Parameters:

Name Type Description Default
*scopes str

One or more scope strings the token must carry.

()
strategy MatchStrategy

ALL (default) or ANY.

ALL

Example::

@get("/reports", guards=[require_scopes("reports:read")])
async def reports() -> dict: ...
Source code in src/litestar_keycloak/guards.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def require_scopes(
    *scopes: str,
    strategy: MatchStrategy = MatchStrategy.ALL,
) -> Callable[..., None]:
    """Guard factory that enforces token scopes.

    Args:
        *scopes: One or more scope strings the token must carry.
        strategy: ``ALL`` (default) or ``ANY``.

    Example::

        @get("/reports", guards=[require_scopes("reports:read")])
        async def reports() -> dict: ...
    """
    required = frozenset(scopes)

    def guard(
        connection: ASGIConnection[Any, Any, Any, Any], _: BaseRouteHandler
    ) -> None:
        user = _resolve_user(connection)

        if strategy is MatchStrategy.ALL:
            satisfied = required <= user.scopes
        else:
            satisfied = bool(required & user.scopes)

        if not satisfied:
            raise InsufficientScopeError(required=required, actual=user.scopes)

    return guard