domain.protocols.authorization_protocol
src.domain.protocols.authorization_protocol
Authorization protocol (port) for RBAC access control.
This protocol defines the contract for authorization systems. Infrastructure adapters implement this protocol to provide concrete authorization (Casbin, custom RBAC, etc.).
Following hexagonal architecture:
- Domain defines the PORT (this protocol)
- Infrastructure provides ADAPTERS (CasbinAdapter)
- Application layer uses the protocol (doesn't know about Casbin)
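The port/adapter split above can be illustrated with a minimal sketch. `AuthorizationPort`, `AllowListAdapter`, and `can_write_accounts` are hypothetical names invented for this example. The port is cut down to a single method and is not the project's actual `AuthorizationProtocol` definition.

```python
import asyncio
from typing import Protocol
from uuid import UUID, uuid4


class AuthorizationPort(Protocol):
    """Hypothetical, trimmed-down port with only one method of the real protocol."""

    async def check_permission(self, user_id: UUID, resource: str, action: str) -> bool: ...


class AllowListAdapter:
    """Hypothetical adapter. It satisfies the port structurally, without inheriting from it."""

    def __init__(self, grants: set[tuple[UUID, str, str]]) -> None:
        self._grants = grants

    async def check_permission(self, user_id: UUID, resource: str, action: str) -> bool:
        # Allowed only if the exact (user, resource, action) triple was granted.
        return (user_id, resource, action) in self._grants


async def can_write_accounts(authz: AuthorizationPort, user_id: UUID) -> bool:
    """Application-layer code: depends on the port, never on a concrete adapter."""
    return await authz.check_permission(user_id, "accounts", "write")


if __name__ == "__main__":
    alice = uuid4()
    adapter = AllowListAdapter({(alice, "accounts", "write")})
    print(asyncio.run(can_write_accounts(adapter, alice)))
```

Because `typing.Protocol` uses structural typing, a different adapter can replace this one without any change to `can_write_accounts`.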
Reference
- docs/architecture/authorization-architecture.md
Usage

    from src.domain.protocols import AuthorizationProtocol

    # Dependency injection via container
    authz: AuthorizationProtocol = Depends(get_authorization)

    # Check permission
    allowed = await authz.check_permission(
        user_id=user.id,
        resource="accounts",
        action="write",
    )

    # Get user roles
    roles = await authz.get_roles_for_user(user.id)

    # Assign role
    await authz.assign_role(user.id, "admin", assigned_by=admin.id)

Classes
AuthorizationProtocol
Bases: Protocol
Protocol for authorization systems.
Provides RBAC (Role-Based Access Control) operations including permission checks, role management, and role queries.
Implementations
- CasbinAdapter: Production (Casbin + PostgreSQL)
- InMemoryAuthorizationAdapter: Testing
Security
- All permission checks are audited
- Role changes emit domain events
- Cache invalidation on role changes
Error Handling
Permission checks return bool and fail closed: when a check cannot be completed, the result is False (denied). Role operations return bool (True on success, False on failure). Exceptions are logged and not propagated to the caller.
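The fail-closed behaviour described above could look roughly like the following sketch. `fail_closed`, `broken_backend`, and `healthy_backend` are hypothetical helpers written for illustration. How the real adapters wrap their backend calls is not shown in this page.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger(__name__)


async def fail_closed(check: Callable[[], Awaitable[bool]]) -> bool:
    """Run a permission check; any exception is logged and treated as a denial."""
    try:
        return bool(await check())
    except Exception:
        # Log with traceback, then deny instead of letting the error bubble up.
        logger.exception("authorization check failed; denying access")
        return False


async def broken_backend() -> bool:
    # Stand-in for a policy store that is unreachable.
    raise ConnectionError("policy store unavailable")


async def healthy_backend() -> bool:
    # Stand-in for a policy store that grants the request.
    return True


if __name__ == "__main__":
    print(asyncio.run(fail_closed(broken_backend)))
    print(asyncio.run(fail_closed(healthy_backend)))
```

The design choice is that an outage of the policy store can only reduce access, never widen it.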
Source code in src/domain/protocols/authorization_protocol.py
Functions
check_permission (async)
Check if user has permission for resource/action.
Core authorization method. Checks Casbin policies including role inheritance. Results are cached for performance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_id | UUID | User's UUID. | required |
| resource | str | Resource name (accounts, transactions, users, etc.). | required |
| action | str | Action name (read, write). | required |
Returns:
| Type | Description |
|---|---|
| bool | True if allowed, False if denied. |
Side Effects
- Audits the authorization check (ACCESS_GRANTED/ACCESS_DENIED)
- Caches result in Redis (5 min TTL)
Example

    allowed = await authz.check_permission(
        user_id=user.id,
        resource="accounts",
        action="write",
    )
    if not allowed:
        raise HTTPException(403, "Permission denied")

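The caching side effect can be sketched with an in-memory stand-in. This is not the Redis-backed cache the adapter uses. `PermissionCache` and its methods are hypothetical, and only the 5-minute TTL comes from the description above.

```python
import time
from typing import Callable, Optional
from uuid import UUID, uuid4

CACHE_TTL_SECONDS = 300.0  # 5 minutes, the TTL stated above

Key = tuple[UUID, str, str]


class PermissionCache:
    """In-memory stand-in for the Redis cache (hypothetical helper)."""

    def __init__(
        self,
        ttl: float = CACHE_TTL_SECONDS,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: dict[Key, tuple[bool, float]] = {}

    def get(self, user_id: UUID, resource: str, action: str) -> Optional[bool]:
        """Return the cached decision, or None on a miss or an expired entry."""
        key = (user_id, resource, action)
        entry = self._entries.get(key)
        if entry is None:
            return None
        allowed, expires_at = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return allowed

    def set(self, user_id: UUID, resource: str, action: str, allowed: bool) -> None:
        """Store a decision together with its expiry time."""
        self._entries[(user_id, resource, action)] = (allowed, self._clock() + self._ttl)

    def invalidate_user(self, user_id: UUID) -> None:
        """Drop every cached decision for one user, e.g. after a role change."""
        for key in [k for k in self._entries if k[0] == user_id]:
            del self._entries[key]
```

A `get` that returns `None` means the caller has to evaluate the policy and then call `set`. Both allowed and denied decisions are cached in this sketch.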
get_roles_for_user (async)
Get all roles assigned to user (including inherited).
Returns the role hierarchy. For example, if user has 'admin' role, returns ['admin', 'user', 'readonly'] due to inheritance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_id | UUID | User's UUID. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | List of role names. Empty if user has no roles. |
Example

    roles = await authz.get_roles_for_user(user.id)
    # ['admin', 'user', 'readonly'] for admin user

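As an illustration of how inherited roles might be resolved, the sketch below expands a user's direct roles through a static hierarchy. `ROLE_PARENTS` and `expand_roles` are hypothetical. The production adapter delegates inheritance to Casbin, so this only mirrors the documented result.

```python
# Assumed hierarchy from the description: admin inherits user, user inherits readonly.
ROLE_PARENTS: dict[str, list[str]] = {
    "admin": ["user"],
    "user": ["readonly"],
    "readonly": [],
}


def expand_roles(direct_roles: list[str]) -> list[str]:
    """Return the direct roles plus everything they inherit, in discovery order."""
    seen: list[str] = []
    stack = list(reversed(direct_roles))
    while stack:
        role = stack.pop()
        if role in seen:
            continue  # already expanded; also guards against cycles
        seen.append(role)
        # Push inherited roles so they are visited right after the role itself.
        stack.extend(reversed(ROLE_PARENTS.get(role, [])))
    return seen


if __name__ == "__main__":
    print(expand_roles(["admin"]))
```

With this helper, a `has_role`-style check reduces to a membership test on the expanded list.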
has_role (async)
Check if user has specific role (including inherited).
Considers role hierarchy. Admin inherits user, user inherits readonly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_id | UUID | User's UUID. | required |
| role | str | Role name to check (admin, user, readonly). | required |
Returns:
| Type | Description |
|---|---|
| bool | True if user has role, False otherwise. |
Example

    # Admin has all roles due to inheritance
    await authz.has_role(admin_user.id, "readonly")  # True
    await authz.has_role(admin_user.id, "admin")  # True

assign_role (async)
Assign role to user.
Adds user to role. Emits domain events and invalidates permission cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_id | UUID | User's UUID to assign role to. | required |
| role | str | Role name to assign (admin, user, readonly). | required |
| assigned_by | UUID | UUID of user performing the assignment (for audit). | required |
Returns:
| Type | Description |
|---|---|
| bool | True if role assigned, False if user already had role. |
Side Effects
- Emits RoleAssignmentAttempted event (before)
- Emits RoleAssignmentSucceeded/Failed event (after)
- Invalidates user's permission cache
- Audits the role assignment
Example

    success = await authz.assign_role(
        user_id=target_user.id,
        role="admin",
        assigned_by=admin.id,
    )

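The event sequence around an assignment can be sketched with an in-memory store. `RoleEvent` and `InMemoryRoleStore` are hypothetical and are not the project's `InMemoryAuthorizationAdapter`. Emitting a Failed event when the user already has the role is an assumption, and cache invalidation and auditing are left out.

```python
import asyncio
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass(frozen=True)
class RoleEvent:
    """Hypothetical event record; the real domain events carry more detail."""

    name: str
    user_id: UUID
    role: str
    actor: UUID


@dataclass
class InMemoryRoleStore:
    """Hypothetical store that records events in a list instead of publishing them."""

    roles: dict[UUID, set[str]] = field(default_factory=dict)
    events: list[RoleEvent] = field(default_factory=list)

    async def assign_role(self, user_id: UUID, role: str, assigned_by: UUID) -> bool:
        # "Attempted" is recorded before any state changes.
        self.events.append(RoleEvent("RoleAssignmentAttempted", user_id, role, assigned_by))
        user_roles = self.roles.setdefault(user_id, set())
        if role in user_roles:
            # Assumption: an already-held role counts as a failed assignment.
            self.events.append(RoleEvent("RoleAssignmentFailed", user_id, role, assigned_by))
            return False
        user_roles.add(role)
        self.events.append(RoleEvent("RoleAssignmentSucceeded", user_id, role, assigned_by))
        return True


if __name__ == "__main__":
    store = InMemoryRoleStore()
    target, admin = uuid4(), uuid4()
    print(asyncio.run(store.assign_role(target, "admin", assigned_by=admin)))
    print([event.name for event in store.events])
```

Recording the attempt first means the audit trail shows the request even when the assignment is rejected.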
revoke_role (async)
Revoke role from user.
Removes user from role. Emits domain events and invalidates cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_id | UUID | User's UUID to revoke role from. | required |
| role | str | Role name to revoke (admin, user, readonly). | required |
| revoked_by | UUID | UUID of user performing the revocation (for audit). | required |
| reason | str \| None | Optional reason for revocation (for audit trail). | None |
Returns:
| Type | Description |
|---|---|
| bool | True if role revoked, False if user didn't have role. |
Side Effects
- Emits RoleRevocationAttempted event (before)
- Emits RoleRevocationSucceeded/Failed event (after)
- Invalidates user's permission cache
- Audits the role revocation
- May revoke sessions if admin role removed
Example

    success = await authz.revoke_role(
        user_id=target_user.id,
        role="admin",
        revoked_by=admin.id,
        reason="User left admin team",
    )

get_permissions_for_role (async)
Get all permissions for a role.
Returns direct permissions (not inherited). Useful for admin UI.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| role | str | Role name (admin, user, readonly). | required |
Returns:
| Type | Description |
|---|---|
| list[tuple[str, str]] | List of (resource, action) tuples. |
Example

    perms = await authz.get_permissions_for_role("user")

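The difference between direct and inherited permissions is shown in the sketch below. The policy data, `ROLE_PARENT`, and both helper functions are made up for illustration. The real permissions live in the Casbin policy store, and only the role names come from this page.

```python
from typing import Optional

# Hypothetical policy data: each role lists only the permissions granted to it directly.
DIRECT_PERMISSIONS: dict[str, list[tuple[str, str]]] = {
    "readonly": [("accounts", "read")],
    "user": [("accounts", "write")],
    "admin": [("users", "write")],
}

# Assumed single-parent hierarchy: admin inherits user, user inherits readonly.
ROLE_PARENT: dict[str, str] = {"admin": "user", "user": "readonly"}


def direct_permissions(role: str) -> list[tuple[str, str]]:
    """What a get_permissions_for_role-style call would return: no inheritance."""
    return list(DIRECT_PERMISSIONS.get(role, []))


def effective_permissions(role: str) -> list[tuple[str, str]]:
    """Direct permissions plus those of every ancestor role, without duplicates."""
    perms: list[tuple[str, str]] = []
    current: Optional[str] = role
    while current is not None:
        for perm in DIRECT_PERMISSIONS.get(current, []):
            if perm not in perms:
                perms.append(perm)
        current = ROLE_PARENT.get(current)  # None once the top of the chain is reached
    return perms


if __name__ == "__main__":
    print(direct_permissions("user"))
    print(effective_permissions("user"))
```

An admin UI that edits a single role would typically want the direct list, since inherited entries belong to the parent role.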