Skip to content

Commit 5b68e1f

Browse files
authored
feat: implement client for revoking consents/offline access, with async and improved testing (#644)
1 parent f553ac0 commit 5b68e1f

3 files changed

Lines changed: 154 additions & 1 deletion

File tree

src/keycloak/keycloak_admin.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1524,6 +1524,30 @@ def user_consents(self, user_id: str) -> list:
15241524
)
15251525
return raise_error_from_response(data_raw, KeycloakGetError)
15261526

1527+
def revoke_consent(self, user_id: str, client_id: str) -> dict | bytes:
1528+
"""
1529+
Revoke consent and offline tokens for particular client from user.
1530+
1531+
:param user_id: User id
1532+
:type user_id: str
1533+
:param client_id: Client id
1534+
:type client_id: str
1535+
1536+
"""
1537+
params_path = {
1538+
"realm-name": self.connection.realm_name,
1539+
"id": user_id,
1540+
"client-id": client_id,
1541+
}
1542+
data_raw = self.connection.raw_delete(
1543+
urls_patterns.URL_ADMIN_USER_CONSENT.format(**params_path),
1544+
)
1545+
return raise_error_from_response(
1546+
data_raw,
1547+
KeycloakDeleteError,
1548+
expected_codes=[HTTP_NO_CONTENT],
1549+
)
1550+
15271551
def get_user_social_logins(self, user_id: str) -> list:
15281552
"""
15291553
Get user social logins.
@@ -6742,7 +6766,7 @@ async def a_user_logout(self, user_id: str) -> bytes:
67426766

67436767
async def a_user_consents(self, user_id: str) -> list:
67446768
"""
6745-
Get consents granted asynchronously by the user.
6769+
Asynchronously get consents granted by the user.
67466770
67476771
UserConsentRepresentation
67486772
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userconsentrepresentation
@@ -6758,6 +6782,30 @@ async def a_user_consents(self, user_id: str) -> list:
67586782
)
67596783
return raise_error_from_response(data_raw, KeycloakGetError)
67606784

6785+
async def a_revoke_consent(self, user_id: str, client_id: str) -> dict | bytes:
6786+
"""
6787+
Asynchronously revoke consent and offline tokens for particular client from user.
6788+
6789+
:param user_id: User id
6790+
:type user_id: str
6791+
:param client_id: Client id
6792+
:type client_id: str
6793+
6794+
"""
6795+
params_path = {
6796+
"realm-name": self.connection.realm_name,
6797+
"id": user_id,
6798+
"client-id": client_id,
6799+
}
6800+
data_raw = await self.connection.a_raw_delete(
6801+
urls_patterns.URL_ADMIN_USER_CONSENT.format(**params_path),
6802+
)
6803+
return raise_error_from_response(
6804+
data_raw,
6805+
KeycloakDeleteError,
6806+
expected_codes=[HTTP_NO_CONTENT],
6807+
)
6808+
67616809
async def a_get_user_social_logins(self, user_id: str) -> list:
67626810
"""
67636811
Get user social logins asynchronously.

src/keycloak/urls_patterns.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
URL_ADMIN_USERS_COUNT = "admin/realms/{realm-name}/users/count"
4747
URL_ADMIN_USER = "admin/realms/{realm-name}/users/{id}"
4848
URL_ADMIN_USER_CONSENTS = "admin/realms/{realm-name}/users/{id}/consents"
49+
URL_ADMIN_USER_CONSENT = URL_ADMIN_USER_CONSENTS + "/{client-id}"
4950
URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-actions-email"
5051
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
5152
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" # noqa: S105

tests/test_keycloak_admin.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
CLIENT_NOT_FOUND_REGEX = '404: b\'{"error":"Client not found".*}\''
3333
CLIENT_SCOPE_NOT_FOUND_REGEX = '404: b\'{"error":"Client scope not found".*}\''
34+
CONSENT_NOT_FOUND_REGEX = '404: b\'{"error":"Consent nor offline token not found".*}\''
3435
COULD_NOT_FIND_ROLE_REGEX = '404: b\'{"error":"Could not find role".*}\''
3536
COULD_NOT_FIND_ROLE_WITH_ID_REGEX = '404: b\'{"error":"Could not find role with id".*}\''
3637
HTTP_404_REGEX = '404: b\'{"error":"HTTP 404 Not Found".*}\''
@@ -3548,6 +3549,57 @@ def test_refresh_token(admin: KeycloakAdmin) -> None:
35483549
admin.connection.refresh_token()
35493550

35503551

3552+
def test_consents(
3553+
admin: KeycloakAdmin, oid_with_credentials: tuple[KeycloakOpenID, str, str]
3554+
) -> None:
3555+
"""
3556+
Test getting and revoking offline access via the consents API.
3557+
3558+
:param admin: Keycloak admin
3559+
:type admin: KeycloakAdmin
3560+
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
3561+
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
3562+
"""
3563+
oid, username, password = oid_with_credentials
3564+
3565+
# Use offline access as ersatz consent
3566+
offline_token = oid.token(username, password, scope="offline_access")
3567+
decoded_access_token = oid.decode_token(token=offline_token["access_token"])
3568+
user_id = decoded_access_token["sub"]
3569+
3570+
# Test get consents/offline access
3571+
res = admin.user_consents(user_id=user_id)
3572+
assert len(res) == 1, res
3573+
assert "additionalGrants" in res[0], res[0]
3574+
assert res[0]["additionalGrants"][0].get("key") == "Offline Token", res[0]
3575+
3576+
# Test get consents fail
3577+
with pytest.raises(KeycloakGetError) as err:
3578+
admin.user_consents(user_id="non-existent-id")
3579+
assert err.match(USER_NOT_FOUND_REGEX)
3580+
3581+
# Test revoke fails
3582+
with pytest.raises(KeycloakDeleteError) as err:
3583+
admin.revoke_consent(user_id="non-existent-id", client_id=oid.client_id)
3584+
assert err.match(USER_NOT_FOUND_REGEX)
3585+
3586+
with pytest.raises(KeycloakDeleteError) as err:
3587+
admin.revoke_consent(user_id=user_id, client_id="non-existent-client")
3588+
assert err.match(CLIENT_NOT_FOUND_REGEX)
3589+
3590+
# Test revoke offline access
3591+
res = admin.revoke_consent(user_id=user_id, client_id=oid.client_id)
3592+
assert res == {}, res
3593+
3594+
res = admin.user_consents(user_id=user_id)
3595+
assert len(res) == 0, res
3596+
3597+
# Test re-revoke fails
3598+
with pytest.raises(KeycloakDeleteError) as err:
3599+
admin.revoke_consent(user_id=user_id, client_id=oid.client_id)
3600+
assert err.match(CONSENT_NOT_FOUND_REGEX)
3601+
3602+
35513603
# async function start
35523604

35533605

@@ -7199,6 +7251,58 @@ async def test_a_refresh_token(admin: KeycloakAdmin) -> None:
71997251
admin.connection.refresh_token()
72007252

72017253

7254+
@pytest.mark.asyncio
7255+
async def test_a_consents(
7256+
admin: KeycloakAdmin, oid_with_credentials: tuple[KeycloakOpenID, str, str]
7257+
) -> None:
7258+
"""
7259+
Test getting and revoking offline access via the consents API.
7260+
7261+
:param admin: Keycloak admin
7262+
:type admin: KeycloakAdmin
7263+
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
7264+
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
7265+
"""
7266+
oid, username, password = oid_with_credentials
7267+
7268+
# Use offline access as ersatz consent
7269+
offline_token = await oid.a_token(username, password, scope="offline_access")
7270+
decoded_access_token = await oid.a_decode_token(token=offline_token["access_token"])
7271+
user_id = decoded_access_token["sub"]
7272+
7273+
# Test get consents/offline access
7274+
res = await admin.a_user_consents(user_id=user_id)
7275+
assert len(res) == 1, res
7276+
assert "additionalGrants" in res[0], res[0]
7277+
assert res[0]["additionalGrants"][0].get("key") == "Offline Token", res[0]
7278+
7279+
# Test get consents fail
7280+
with pytest.raises(KeycloakGetError) as err:
7281+
await admin.a_user_consents(user_id="non-existent-id")
7282+
assert err.match(USER_NOT_FOUND_REGEX)
7283+
7284+
# Test revoke fails
7285+
with pytest.raises(KeycloakDeleteError) as err:
7286+
await admin.a_revoke_consent(user_id="non-existent-id", client_id=oid.client_id)
7287+
assert err.match(USER_NOT_FOUND_REGEX)
7288+
7289+
with pytest.raises(KeycloakDeleteError) as err:
7290+
await admin.a_revoke_consent(user_id=user_id, client_id="non-existent-client")
7291+
assert err.match(CLIENT_NOT_FOUND_REGEX)
7292+
7293+
# Test revoke offline access
7294+
res = await admin.a_revoke_consent(user_id=user_id, client_id=oid.client_id)
7295+
assert res == {}, res
7296+
7297+
res = await admin.a_user_consents(user_id=user_id)
7298+
assert len(res) == 0, res
7299+
7300+
# Test re-revoke fails
7301+
with pytest.raises(KeycloakDeleteError) as err:
7302+
await admin.a_revoke_consent(user_id=user_id, client_id=oid.client_id)
7303+
assert err.match(CONSENT_NOT_FOUND_REGEX)
7304+
7305+
72027306
def test_counter_part() -> None:
72037307
"""Test that each function has its async counter part."""
72047308
admin_methods = [func for func in dir(KeycloakAdmin) if callable(getattr(KeycloakAdmin, func))]

0 commit comments

Comments
 (0)