Skip to content

Commit 6361733

Browse files
grudloffevryshoooo
andauthored
feat: Add PKCE support (RFC 7636) (#670)
* feat: support PKCE in authorization flow (RFC 7636) * test: add tests for PKCE flow * docs: document PKCE usage in OpenID client * chore: address linting and formatting issues * test: full e2e pkce test * test: skip pkce for 22 and below --------- Co-authored-by: Richard Nemeth <ryshoooo@gmail.com>
1 parent 16411f7 commit 6361733

4 files changed

Lines changed: 198 additions & 2 deletions

File tree

docs/source/modules/openid_client.rst

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,35 @@ Get auth status for a specific resource and scope by token
146146
147147
token = keycloak_openid.token("user", "password")
148148
auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope")
149+
150+
PKCE Authorization Flow Example
151+
----------------------------------------------
152+
153+
.. code-block:: python
154+
155+
from keycloak import KeycloakOpenID
156+
from keycloak.pkce_utils import generate_code_verifier, generate_code_challenge
157+
158+
# Configure client
159+
keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/",
160+
client_id="example_client",
161+
realm_name="example_realm")
162+
163+
# Generate PKCE values
164+
code_verifier = generate_code_verifier()
165+
code_challenge, code_challenge_method = generate_code_challenge(code_verifier)
166+
167+
# Get Code With Oauth Authorization Request (PKCE)
168+
auth_url = keycloak_openid.auth_url(
169+
redirect_uri="your_call_back_url",
170+
scope="email",
171+
state="your_state_info",
172+
code_challenge=code_challenge,
173+
code_challenge_method=code_challenge_method)
174+
175+
# Get Access Token With Code (PKCE)
176+
access_token = keycloak_openid.token(
177+
grant_type='authorization_code',
178+
code='the_code_you_get_from_auth_url_callback',
179+
redirect_uri="your_call_back_url",
180+
code_verifier=code_verifier)

src/keycloak/keycloak_openid.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ def auth_url(
287287
scope: str = "email",
288288
state: str = "",
289289
nonce: str = "",
290+
code_challenge: str | None = None,
291+
code_challenge_method: str | None = None,
290292
) -> str:
291293
"""
292294
Get authorization URL endpoint.
@@ -299,6 +301,10 @@ def auth_url(
299301
:type state: str
300302
:param nonce: Associates a Client session with an ID Token to mitigate replay attacks
301303
:type nonce: str
304+
:param code_challenge: PKCE code challenge
305+
:type code_challenge: str
306+
:param code_challenge_method: PKCE code challenge method
307+
:type code_challenge_method: str
302308
:returns: Authorization URL Full Build
303309
:rtype: str
304310
"""
@@ -310,7 +316,12 @@ def auth_url(
310316
"state": state,
311317
"nonce": nonce,
312318
}
313-
return URL_AUTH.format(**params_path)
319+
url = URL_AUTH.format(**params_path)
320+
if code_challenge:
321+
url += f"&code_challenge={code_challenge}"
322+
if code_challenge_method:
323+
url += f"&code_challenge_method={code_challenge_method}"
324+
return url
314325

315326
def token(
316327
self,
@@ -321,6 +332,7 @@ def token(
321332
redirect_uri: str = "",
322333
totp: int | None = None,
323334
scope: str = "openid",
335+
code_verifier: str | None = None,
324336
**extra: dict,
325337
) -> dict:
326338
"""
@@ -347,6 +359,8 @@ def token(
347359
:type totp: int
348360
:param scope: Scope, defaults to openid
349361
:type scope: str
362+
:param code_verifier: PKCE code verifier
363+
:type code_verifier: str
350364
:param extra: Additional extra arguments
351365
:type extra: dict
352366
:returns: Keycloak token
@@ -362,6 +376,8 @@ def token(
362376
"redirect_uri": redirect_uri,
363377
"scope": scope,
364378
}
379+
if code_verifier:
380+
payload["code_verifier"] = code_verifier
365381
if extra:
366382
payload.update(extra)
367383

@@ -1033,6 +1049,8 @@ async def a_auth_url(
10331049
scope: str = "email",
10341050
state: str = "",
10351051
nonce: str = "",
1052+
code_challenge: str | None = None,
1053+
code_challenge_method: str | None = None,
10361054
) -> str:
10371055
"""
10381056
Get authorization URL endpoint asynchronously.
@@ -1045,6 +1063,10 @@ async def a_auth_url(
10451063
:type state: str
10461064
:param nonce: Associates a Client session with an ID Token to mitigate replay attacks
10471065
:type nonce: str
1066+
:param code_challenge: PKCE code challenge
1067+
:type code_challenge: str
1068+
:param code_challenge_method: PKCE code challenge method
1069+
:type code_challenge_method: str
10481070
:returns: Authorization URL Full Build
10491071
:rtype: str
10501072
"""
@@ -1056,7 +1078,12 @@ async def a_auth_url(
10561078
"state": state,
10571079
"nonce": nonce,
10581080
}
1059-
return URL_AUTH.format(**params_path)
1081+
url = URL_AUTH.format(**params_path)
1082+
if code_challenge:
1083+
url += f"&code_challenge={code_challenge}"
1084+
if code_challenge_method:
1085+
url += f"&code_challenge_method={code_challenge_method}"
1086+
return url
10601087

10611088
async def a_token(
10621089
self,
@@ -1067,6 +1094,7 @@ async def a_token(
10671094
redirect_uri: str = "",
10681095
totp: int | None = None,
10691096
scope: str = "openid",
1097+
code_verifier: str | None = None,
10701098
**extra: dict,
10711099
) -> dict:
10721100
"""
@@ -1093,6 +1121,8 @@ async def a_token(
10931121
:type totp: int
10941122
:param scope: Scope, defaults to openid
10951123
:type scope: str
1124+
:param code_verifier: PKCE code verifier
1125+
:type code_verifier: str
10961126
:param extra: Additional extra arguments
10971127
:type extra: dict
10981128
:returns: Keycloak token
@@ -1108,6 +1138,8 @@ async def a_token(
11081138
"redirect_uri": redirect_uri,
11091139
"scope": scope,
11101140
}
1141+
if code_verifier:
1142+
payload["code_verifier"] = code_verifier
11111143
if extra:
11121144
payload.update(extra)
11131145

src/keycloak/pkce_utils.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#
2+
# The MIT License (MIT)
3+
#
4+
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
5+
#
6+
# Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
# this software and associated documentation files (the "Software"), to deal in
8+
# the Software without restriction, including without limitation the rights to
9+
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
10+
# the Software, and to permit persons to whom the Software is furnished to do so,
11+
# subject to the following conditions:
12+
#
13+
# The above copyright notice and this permission notice shall be included in all
14+
# copies or substantial portions of the Software.
15+
#
16+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
18+
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19+
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
20+
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21+
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22+
"""PKCE utility functions for code verifier and code challenge generation."""
23+
24+
import base64
25+
import hashlib
26+
import os
27+
28+
29+
def generate_code_verifier(length: int = 128) -> str:
30+
"""
31+
Generate a high-entropy cryptographic random string for PKCE code_verifier.
32+
33+
RFC 7636 recommends a length between 43 and 128 characters.
34+
"""
35+
return base64.urlsafe_b64encode(os.urandom(length)).rstrip(b"=").decode("utf-8")[:length]
36+
37+
38+
def generate_code_challenge(code_verifier: str, method: str = "S256") -> tuple[str, str]:
39+
"""
40+
Generate a code_challenge from the code_verifier using the specified method.
41+
42+
Supported methods: "S256" (default), "plain".
43+
Returns (code_challenge, code_challenge_method).
44+
"""
45+
if method == "S256":
46+
code_challenge = (
47+
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest())
48+
.rstrip(b"=")
49+
.decode("utf-8")
50+
)
51+
return code_challenge, "S256"
52+
if method == "plain":
53+
return code_verifier, "plain"
54+
error_msg = f"Unsupported PKCE method: {method}"
55+
raise ValueError(error_msg)

tests/test_pkce_flow.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""Tests for PKCE flow: code verifier and code challenge handling."""
2+
3+
import os
4+
import re
5+
import urllib.parse
6+
7+
import requests
8+
from packaging.version import Version
9+
10+
from keycloak import KeycloakAdmin, KeycloakOpenID
11+
from keycloak.pkce_utils import generate_code_challenge, generate_code_verifier
12+
13+
14+
def test_pkce_auth_url_and_token(env: object, admin: KeycloakAdmin) -> None:
15+
"""Test PKCE flow: auth_url includes code_challenge, token includes code_verifier."""
16+
if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] != "latest" and Version(
17+
os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"],
18+
) <= Version("22"):
19+
return
20+
21+
client_representation = {
22+
"clientId": "pkce-test",
23+
"enabled": True,
24+
"publicClient": True,
25+
"standardFlowEnabled": True,
26+
"directAccessGrantsEnabled": False,
27+
"serviceAccountsEnabled": False,
28+
"implicitFlowEnabled": False,
29+
"redirectUris": ["http://test.test/callback"],
30+
"webOrigins": ["*"],
31+
}
32+
admin.create_client(client_representation)
33+
34+
oid = KeycloakOpenID(
35+
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
36+
realm_name="master",
37+
client_id="pkce-test",
38+
)
39+
code_verifier = generate_code_verifier()
40+
code_challenge, code_challenge_method = generate_code_challenge(code_verifier)
41+
42+
# Build PKCE auth URL
43+
url = oid.auth_url(
44+
redirect_uri="http://test.test/callback",
45+
code_challenge=code_challenge,
46+
code_challenge_method=code_challenge_method,
47+
scope="openid%20email",
48+
)
49+
assert f"code_challenge={code_challenge}" in url
50+
assert f"code_challenge_method={code_challenge_method}" in url
51+
52+
session = requests.Session()
53+
resp = session.get(url, allow_redirects=False)
54+
cookies = resp.cookies.get_dict()
55+
assert resp.status_code == 200
56+
resp_url = re.findall(r"action=\"(.*)\" method", resp.text)[0]
57+
resp = session.post(
58+
resp_url,
59+
data={"username": env.keycloak_admin, "password": env.keycloak_admin_password},
60+
allow_redirects=False,
61+
cookies=cookies,
62+
)
63+
assert resp.status_code == 302, resp.text
64+
resp_code = urllib.parse.parse_qs(resp.headers["Location"])["code"][0]
65+
66+
access_token = oid.token(
67+
grant_type="authorization_code",
68+
code=resp_code,
69+
redirect_uri="http://test.test/callback",
70+
code_verifier=code_verifier,
71+
)
72+
info = oid.userinfo(access_token["access_token"])
73+
assert info["preferred_username"] == env.keycloak_admin
74+
75+
# Cleanup
76+
client_id = admin.get_client_id("pkce-test")
77+
admin.delete_client(client_id)

0 commit comments

Comments
 (0)