Skip to content

Commit db0f47a

Browse files
committed
Add backend role-based access control
1 parent 13652b5 commit db0f47a

11 files changed

Lines changed: 208 additions & 12 deletions

File tree

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Add user role
2+
3+
Revision ID: b3a8e8a9d5f2
4+
Revises: fe56fa70289e
5+
Create Date: 2026-05-05 19:15:00.000000
6+
7+
"""
8+
import sqlalchemy as sa
9+
from alembic import op
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "b3a8e8a9d5f2"
14+
down_revision = "fe56fa70289e"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
op.add_column(
21+
"user",
22+
sa.Column("role", sa.String(length=20), nullable=False, server_default="member"),
23+
)
24+
op.execute("UPDATE \"user\" SET role = 'admin' WHERE is_superuser = true")
25+
26+
27+
def downgrade():
28+
op.drop_column("user", "role")

backend/app/api/deps.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from app.core import security
1212
from app.core.config import settings
1313
from app.core.db import engine
14-
from app.models import TokenPayload, User
14+
from app.models import TokenPayload, User, UserRole
1515

1616
reusable_oauth2 = OAuth2PasswordBearer(
1717
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
@@ -49,8 +49,26 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User:
4949
CurrentUser = Annotated[User, Depends(get_current_user)]
5050

5151

52+
def current_role(user: User) -> UserRole:
53+
if user.is_superuser:
54+
return UserRole.admin
55+
return UserRole(user.role)
56+
57+
58+
def require_roles(*allowed_roles: UserRole):
59+
def role_dependency(current_user: CurrentUser) -> User:
60+
if current_role(current_user) not in allowed_roles:
61+
raise HTTPException(
62+
status_code=status.HTTP_403_FORBIDDEN,
63+
detail="The user doesn't have enough privileges",
64+
)
65+
return current_user
66+
67+
return role_dependency
68+
69+
5270
def get_current_active_superuser(current_user: CurrentUser) -> User:
53-
if not current_user.is_superuser:
71+
if current_role(current_user) != UserRole.admin:
5472
raise HTTPException(
5573
status_code=403, detail="The user doesn't have enough privileges"
5674
)

backend/app/api/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from fastapi import APIRouter
22

3-
from app.api.routes import items, login, private, users, utils
3+
from app.api.routes import items, login, metrics, private, users, utils
44
from app.core.config import settings
55

66
api_router = APIRouter()
77
api_router.include_router(login.router)
88
api_router.include_router(users.router)
99
api_router.include_router(utils.router)
1010
api_router.include_router(items.router)
11+
api_router.include_router(metrics.router)
1112

1213

1314
if settings.ENVIRONMENT == "local":

backend/app/api/routes/metrics.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from typing import Any
2+
3+
from fastapi import APIRouter, Depends
4+
from sqlmodel import func, select
5+
6+
from app.api.deps import SessionDep, require_roles
7+
from app.models import Item, User, UserRole
8+
9+
router = APIRouter(prefix="/metrics", tags=["metrics"])
10+
11+
12+
@router.get("/", dependencies=[Depends(require_roles(UserRole.admin, UserRole.manager))])
13+
def read_metrics(session: SessionDep) -> dict[str, Any]:
14+
"""
15+
Return simple operational metrics for privileged users.
16+
"""
17+
user_count = session.exec(select(func.count()).select_from(User)).one()
18+
item_count = session.exec(select(func.count()).select_from(Item)).one()
19+
return {
20+
"users": user_count,
21+
"items": item_count,
22+
"summary": "Demo insights visible to admins and managers.",
23+
}

backend/app/api/routes/users.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
from app.api.deps import (
99
CurrentUser,
1010
SessionDep,
11+
current_role,
1112
get_current_active_superuser,
13+
require_roles,
1214
)
1315
from app.core.config import settings
1416
from app.core.security import get_password_hash, verify_password
@@ -20,6 +22,7 @@
2022
UserCreate,
2123
UserPublic,
2224
UserRegister,
25+
UserRole,
2326
UsersPublic,
2427
UserUpdate,
2528
UserUpdateMe,
@@ -31,7 +34,7 @@
3134

3235
@router.get(
3336
"/",
34-
dependencies=[Depends(get_current_active_superuser)],
37+
dependencies=[Depends(require_roles(UserRole.admin, UserRole.manager))],
3538
response_model=UsersPublic,
3639
)
3740
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
@@ -169,7 +172,7 @@ def read_user_by_id(
169172
user = session.get(User, user_id)
170173
if user == current_user:
171174
return user
172-
if not current_user.is_superuser:
175+
if current_role(current_user) not in {UserRole.admin, UserRole.manager}:
173176
raise HTTPException(
174177
status_code=403,
175178
detail="The user doesn't have enough privileges",

backend/app/core/db.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from app import crud
44
from app.core.config import settings
5-
from app.models import User, UserCreate
5+
from app.models import User, UserCreate, UserRole
66

77
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
88

@@ -29,5 +29,26 @@ def init_db(session: Session) -> None:
2929
email=settings.FIRST_SUPERUSER,
3030
password=settings.FIRST_SUPERUSER_PASSWORD,
3131
is_superuser=True,
32+
role=UserRole.admin,
3233
)
3334
user = crud.create_user(session=session, user_create=user_in)
35+
elif user.role != UserRole.admin:
36+
user.role = UserRole.admin
37+
user.is_superuser = True
38+
session.add(user)
39+
session.commit()
40+
41+
seed_users = [
42+
("manager@example.com", "changethis", UserRole.manager, "Demo Manager"),
43+
("member@example.com", "changethis", UserRole.member, "Demo Member"),
44+
]
45+
for email, password, role, full_name in seed_users:
46+
existing_user = session.exec(select(User).where(User.email == email)).first()
47+
if not existing_user:
48+
user_in = UserCreate(
49+
email=email,
50+
password=password,
51+
role=role,
52+
full_name=full_name,
53+
)
54+
crud.create_user(session=session, user_create=user_in)

backend/app/crud.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,18 @@
44
from sqlmodel import Session, select
55

66
from app.core.security import get_password_hash, verify_password
7-
from app.models import Item, ItemCreate, User, UserCreate, UserUpdate
7+
from app.models import Item, ItemCreate, User, UserCreate, UserRole, UserUpdate
88

99

1010
def create_user(*, session: Session, user_create: UserCreate) -> User:
11+
role = UserRole.admin if user_create.is_superuser else user_create.role
1112
db_obj = User.model_validate(
12-
user_create, update={"hashed_password": get_password_hash(user_create.password)}
13+
user_create,
14+
update={
15+
"hashed_password": get_password_hash(user_create.password),
16+
"role": role,
17+
"is_superuser": role == UserRole.admin,
18+
},
1319
)
1420
session.add(db_obj)
1521
session.commit()
@@ -24,6 +30,10 @@ def update_user(*, session: Session, db_user: User, user_in: UserUpdate) -> Any:
2430
password = user_data["password"]
2531
hashed_password = get_password_hash(password)
2632
extra_data["hashed_password"] = hashed_password
33+
if user_data.get("role") == UserRole.admin:
34+
extra_data["is_superuser"] = True
35+
elif user_data.get("role") in {UserRole.manager, UserRole.member}:
36+
extra_data["is_superuser"] = False
2737
db_user.sqlmodel_update(user_data, update=extra_data)
2838
session.add(db_user)
2939
session.commit()

backend/app/models.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,33 @@
11
import uuid
22
from datetime import datetime, timezone
3+
from enum import Enum
34

45
from pydantic import EmailStr
5-
from sqlalchemy import DateTime
6+
from sqlalchemy import Column, DateTime, String
67
from sqlmodel import Field, Relationship, SQLModel
78

89

910
def get_datetime_utc() -> datetime:
1011
return datetime.now(timezone.utc)
1112

1213

14+
class UserRole(str, Enum):
15+
admin = "admin"
16+
manager = "manager"
17+
member = "member"
18+
19+
1320
# Shared properties
1421
class UserBase(SQLModel):
1522
email: EmailStr = Field(unique=True, index=True, max_length=255)
1623
is_active: bool = True
1724
is_superuser: bool = False
25+
role: UserRole = Field(
26+
default=UserRole.member,
27+
sa_column=Column(
28+
String(20), nullable=False, server_default=UserRole.member.value
29+
),
30+
)
1831
full_name: str | None = Field(default=None, max_length=255)
1932

2033

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from fastapi.testclient import TestClient
2+
from sqlmodel import Session
3+
4+
from app.core.config import settings
5+
from app.models import UserRole
6+
from tests.utils.user import create_user_with_role, user_authentication_headers
7+
8+
9+
def test_manager_can_read_metrics(client: TestClient, db: Session) -> None:
10+
manager, password = create_user_with_role(db=db, role=UserRole.manager)
11+
headers = user_authentication_headers(
12+
client=client, email=manager.email, password=password
13+
)
14+
15+
r = client.get(f"{settings.API_V1_STR}/metrics/", headers=headers)
16+
17+
assert r.status_code == 200
18+
assert "users" in r.json()
19+
assert "items" in r.json()
20+
21+
22+
def test_member_cannot_read_metrics(
23+
client: TestClient, normal_user_token_headers: dict[str, str]
24+
) -> None:
25+
r = client.get(f"{settings.API_V1_STR}/metrics/", headers=normal_user_token_headers)
26+
27+
assert r.status_code == 403
28+
assert r.json()["detail"] == "The user doesn't have enough privileges"

backend/tests/api/routes/test_users.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77
from app import crud
88
from app.core.config import settings
99
from app.core.security import verify_password
10-
from app.models import User, UserCreate
11-
from tests.utils.user import create_random_user
10+
from app.models import User, UserCreate, UserRole
11+
from tests.utils.user import (
12+
create_random_user,
13+
create_user_with_role,
14+
user_authentication_headers,
15+
)
1216
from tests.utils.utils import random_email, random_lower_string
1317

1418

@@ -519,3 +523,36 @@ def test_delete_user_without_privileges(
519523
)
520524
assert r.status_code == 403
521525
assert r.json()["detail"] == "The user doesn't have enough privileges"
526+
527+
528+
def test_manager_can_retrieve_users(client: TestClient, db: Session) -> None:
529+
manager, password = create_user_with_role(db=db, role=UserRole.manager)
530+
headers = user_authentication_headers(
531+
client=client, email=manager.email, password=password
532+
)
533+
534+
r = client.get(f"{settings.API_V1_STR}/users/", headers=headers)
535+
536+
assert r.status_code == 200
537+
assert "data" in r.json()
538+
539+
540+
def test_member_cannot_retrieve_users(
541+
client: TestClient, normal_user_token_headers: dict[str, str]
542+
) -> None:
543+
r = client.get(f"{settings.API_V1_STR}/users/", headers=normal_user_token_headers)
544+
545+
assert r.status_code == 403
546+
assert r.json()["detail"] == "The user doesn't have enough privileges"
547+
548+
549+
def test_manager_cannot_create_user(client: TestClient, db: Session) -> None:
550+
manager, password = create_user_with_role(db=db, role=UserRole.manager)
551+
headers = user_authentication_headers(
552+
client=client, email=manager.email, password=password
553+
)
554+
data = {"email": random_email(), "password": random_lower_string()}
555+
556+
r = client.post(f"{settings.API_V1_STR}/users/", headers=headers, json=data)
557+
558+
assert r.status_code == 403

0 commit comments

Comments
 (0)