initial commit

This commit is contained in:
2025-10-19 22:09:35 +03:00
commit 6d593b4554
114 changed files with 23622 additions and 0 deletions

View File

@ -0,0 +1,198 @@
# Authentication & Security
Learn how to implement secure authentication in your FastAPI application. The boilerplate provides a complete JWT-based authentication system with user management, permissions, and security best practices.
## What You'll Learn
- **[JWT Tokens](jwt-tokens.md)** - Understand access and refresh token management
- **[User Management](user-management.md)** - Handle registration, login, and user profiles
- **[Permissions](permissions.md)** - Implement role-based access control and authorization
## Authentication Overview
The system uses JWT tokens with refresh token rotation for secure, stateless authentication:
```python
# Basic login flow
@router.post("/login", response_model=Token)
async def login_for_access_token(response: Response, form_data: OAuth2PasswordRequestForm):
user = await authenticate_user(form_data.username, form_data.password, db)
access_token = await create_access_token(data={"sub": user["username"]})
refresh_token = await create_refresh_token(data={"sub": user["username"]})
# Set secure HTTP-only cookie for refresh token
response.set_cookie("refresh_token", refresh_token, httponly=True, secure=True)
return {"access_token": access_token, "token_type": "bearer"}
```
## Key Features
### JWT Token System
- **Access tokens**: Short-lived (30 minutes), for API requests
- **Refresh tokens**: Long-lived (7 days), stored in secure cookies
- **Token blacklisting**: Secure logout implementation
- **Automatic expiration**: Built-in token lifecycle management
### User Management
- **Flexible authentication**: Username or email login
- **Secure passwords**: bcrypt hashing with salt
- **Profile management**: Complete user CRUD operations
- **Soft delete**: User deactivation without data loss
### Permission System
- **Superuser privileges**: Administrative access control
- **Resource ownership**: User-specific data access
- **User tiers**: Subscription-based feature access
- **Rate limiting**: Per-user and per-tier API limits
## Authentication Patterns
### Endpoint Protection
```python
# Required authentication
@router.get("/protected")
async def protected_endpoint(current_user: dict = Depends(get_current_user)):
return {"message": f"Hello {current_user['username']}"}
# Optional authentication
@router.get("/public")
async def public_endpoint(user: dict | None = Depends(get_optional_user)):
if user:
return {"premium_content": True}
return {"premium_content": False}
# Superuser only
@router.get("/admin", dependencies=[Depends(get_current_superuser)])
async def admin_endpoint():
return {"admin_data": "sensitive"}
```
### Resource Ownership
```python
@router.patch("/posts/{post_id}")
async def update_post(post_id: int, current_user: dict = Depends(get_current_user)):
post = await crud_posts.get(db=db, id=post_id)
# Check ownership or admin privileges
if post["created_by_user_id"] != current_user["id"] and not current_user["is_superuser"]:
raise ForbiddenException("Cannot update other users' posts")
return await crud_posts.update(db=db, id=post_id, object=updates)
```
## Security Features
### Token Security
- Short-lived access tokens limit exposure
- HTTP-only refresh token cookies prevent XSS
- Token blacklisting enables secure logout
- Configurable token expiration times
### Password Security
- bcrypt hashing with automatic salt generation
- Configurable password complexity requirements
- No plain text passwords stored anywhere
- Rate limiting on authentication endpoints
### API Protection
- CORS policies for cross-origin request control
- Rate limiting prevents brute force attacks
- Input validation prevents injection attacks
- Consistent error messages prevent information disclosure
## Configuration
### JWT Settings
```env
SECRET_KEY="your-super-secret-key-here"
ALGORITHM="HS256"
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
```
### Security Settings
```env
# Cookie security
COOKIE_SECURE=true
COOKIE_SAMESITE="lax"
# Password requirements
PASSWORD_MIN_LENGTH=8
ENABLE_PASSWORD_COMPLEXITY=true
```
## Getting Started
Follow this progressive learning path:
### 1. **[JWT Tokens](jwt-tokens.md)** - Foundation
Understand how JWT tokens work, including access and refresh token management, verification, and blacklisting.
### 2. **[User Management](user-management.md)** - Core Features
Implement user registration, login, profile management, and administrative operations.
### 3. **[Permissions](permissions.md)** - Access Control
Set up role-based access control, resource ownership checking, and tier-based permissions.
## Implementation Examples
### Quick Authentication Setup
```python
# Protect an endpoint
@router.get("/my-data")
async def get_my_data(current_user: dict = Depends(get_current_user)):
return await get_user_specific_data(current_user["id"])
# Check user permissions
def check_tier_access(user: dict, required_tier: str):
if not user.get("tier") or user["tier"]["name"] != required_tier:
raise ForbiddenException(f"Requires {required_tier} tier")
# Custom authentication dependency
async def get_premium_user(current_user: dict = Depends(get_current_user)):
check_tier_access(current_user, "Pro")
return current_user
```
### Frontend Integration
```javascript
// Basic authentication flow
class AuthManager {
async login(username, password) {
const response = await fetch('/api/v1/login', {
method: 'POST',
headers: {'Content-Type': 'application/x-www-form-urlencoded'},
body: new URLSearchParams({username, password})
});
const tokens = await response.json();
localStorage.setItem('access_token', tokens.access_token);
return tokens;
}
async makeAuthenticatedRequest(url, options = {}) {
const token = localStorage.getItem('access_token');
return fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${token}`
}
});
}
}
```
## What's Next
Start building your authentication system:
1. **[JWT Tokens](jwt-tokens.md)** - Learn token creation, verification, and lifecycle management
2. **[User Management](user-management.md)** - Implement registration, login, and profile operations
3. **[Permissions](permissions.md)** - Add authorization patterns and access control
The authentication system provides a secure foundation for your API. Each guide includes practical examples and implementation details for production-ready authentication.

View File

@ -0,0 +1,669 @@
# JWT Tokens
JSON Web Tokens (JWT) form the backbone of modern web authentication. This comprehensive guide explains how the boilerplate implements a secure, stateless authentication system using access and refresh tokens.
## Understanding JWT Authentication
JWT tokens are self-contained, digitally signed packages of information that can be safely transmitted between parties. Unlike traditional session-based authentication that requires server-side storage, JWT tokens are stateless - all the information needed to verify a user's identity is contained within the token itself.
### Why Use JWT?
**Stateless Design**: No need to store session data on the server, making it perfect for distributed systems and microservices.
**Scalability**: Since tokens contain all necessary information, they work seamlessly across multiple servers without shared session storage.
**Security**: Digital signatures ensure tokens can't be tampered with, and expiration times limit exposure if compromised.
**Cross-Domain Support**: Unlike cookies, JWT tokens work across different domains and can be used in mobile applications.
## Token Types
The authentication system uses a **dual-token approach** for maximum security and user experience:
### Access Tokens
Access tokens are short-lived credentials that prove a user's identity for API requests. Think of them as temporary keys that grant access to protected resources.
- **Purpose**: Authenticate API requests and authorize actions
- **Lifetime**: 30 minutes (configurable) - short enough to limit damage if compromised
- **Storage**: Authorization header (`Bearer <token>`) - sent with each API request
- **Usage**: Include in every call to protected endpoints
**Why Short-Lived?** If an access token is stolen (e.g., through XSS), the damage window is limited to 30 minutes before it expires naturally.
### Refresh Tokens
Refresh tokens are longer-lived credentials used solely to generate new access tokens. They provide a balance between security and user convenience.
- **Purpose**: Generate new access tokens without requiring re-login
- **Lifetime**: 7 days (configurable) - long enough for good UX, short enough for security
- **Storage**: Secure HTTP-only cookie - inaccessible to JavaScript, preventing XSS attacks
- **Usage**: Automatically used by the browser when access tokens need refreshing
**Why HTTP-Only Cookies?** This prevents malicious JavaScript from accessing refresh tokens, providing protection against XSS attacks while allowing automatic renewal.
## Token Creation
Understanding how tokens are created helps you customize the authentication system for your specific needs.
### Creating Access Tokens
Access tokens are generated during login and token refresh operations. The process involves encoding user information with an expiration time and signing it with your secret key.
```python
from datetime import timedelta
from app.core.security import create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES
# Basic access token with default expiration
access_token = await create_access_token(data={"sub": username})
# Custom expiration for special cases (e.g., admin sessions)
custom_expires = timedelta(minutes=60)
access_token = await create_access_token(
data={"sub": username},
expires_delta=custom_expires
)
```
**When to Customize Expiration:**
- **High-security environments**: Shorter expiration (15 minutes)
- **Development/testing**: Longer expiration for convenience
- **Admin operations**: Variable expiration based on sensitivity
### Creating Refresh Tokens
Refresh tokens follow the same creation pattern but with longer expiration times. They're typically created only during login.
```python
from app.core.security import create_refresh_token, REFRESH_TOKEN_EXPIRE_DAYS
# Standard refresh token
refresh_token = await create_refresh_token(data={"sub": username})
# Extended refresh token for "remember me" functionality
extended_expires = timedelta(days=30)
refresh_token = await create_refresh_token(
data={"sub": username},
expires_delta=extended_expires
)
```
### Token Structure
JWT tokens consist of three parts separated by dots: `header.payload.signature`. The payload contains the actual user information and metadata.
```python
# Access token payload structure
{
"sub": "username", # Subject (user identifier)
"exp": 1234567890, # Expiration timestamp (Unix)
"token_type": "access", # Distinguishes from refresh tokens
"iat": 1234567890 # Issued at (automatic)
}
# Refresh token payload structure
{
"sub": "username", # Same user identifier
"exp": 1234567890, # Longer expiration time
"token_type": "refresh", # Prevents confusion/misuse
"iat": 1234567890 # Issue timestamp
}
```
**Key Fields Explained:**
- **`sub` (Subject)**: Identifies the user - can be username, email, or user ID
- **`exp` (Expiration)**: Unix timestamp when token becomes invalid
- **`token_type`**: Custom field preventing tokens from being used incorrectly
- **`iat` (Issued At)**: Useful for token rotation and audit trails
## Token Verification
Token verification is a multi-step process that ensures both the token's authenticity and the user's current authorization status.
### Verifying Access Tokens
Every protected endpoint must verify the access token before processing the request. This involves checking the signature, expiration, and blacklist status.
```python
from app.core.security import verify_token, TokenType
# Verify access token in endpoint
token_data = await verify_token(token, TokenType.ACCESS, db)
if token_data:
username = token_data.username_or_email
# Token is valid, proceed with request processing
else:
# Token is invalid, expired, or blacklisted
raise UnauthorizedException("Invalid or expired token")
```
### Verifying Refresh Tokens
Refresh token verification follows the same process but with different validation rules and outcomes.
```python
# Verify refresh token for renewal
token_data = await verify_token(token, TokenType.REFRESH, db)
if token_data:
# Generate new access token
new_access_token = await create_access_token(
data={"sub": token_data.username_or_email}
)
return {"access_token": new_access_token, "token_type": "bearer"}
else:
# Refresh token invalid - user must log in again
raise UnauthorizedException("Invalid refresh token")
```
### Token Verification Process
The verification process includes several security checks to prevent various attack vectors:
```python
async def verify_token(token: str, expected_token_type: TokenType, db: AsyncSession) -> TokenData | None:
# 1. Check blacklist first (prevents use of logged-out tokens)
is_blacklisted = await crud_token_blacklist.exists(db, token=token)
if is_blacklisted:
return None
try:
# 2. Verify signature and decode payload
payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM])
# 3. Extract and validate claims
username_or_email: str | None = payload.get("sub")
token_type: str | None = payload.get("token_type")
# 4. Ensure token type matches expectation
if username_or_email is None or token_type != expected_token_type:
return None
# 5. Return validated data
return TokenData(username_or_email=username_or_email)
except JWTError:
# Token is malformed, expired, or signature invalid
return None
```
**Security Checks Explained:**
1. **Blacklist Check**: Prevents use of tokens from logged-out users
2. **Signature Verification**: Ensures token hasn't been tampered with
3. **Expiration Check**: Automatically handled by JWT library
4. **Type Validation**: Prevents refresh tokens from being used as access tokens
5. **Subject Validation**: Ensures token contains valid user identifier
## Client-Side Authentication Flow
Understanding the complete authentication flow helps frontend developers integrate properly with the API.
### Recommended Client Flow
**1. Login Process**
```javascript
// Send credentials to login endpoint
const response = await fetch('/api/v1/login', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: 'username=user&password=pass',
credentials: 'include' // Important: includes cookies
});
const { access_token, token_type } = await response.json();
// Store access token in memory (not localStorage)
sessionStorage.setItem('access_token', access_token);
```
**2. Making Authenticated Requests**
```javascript
// Include access token in Authorization header
const response = await fetch('/api/v1/protected-endpoint', {
headers: {
'Authorization': `Bearer ${sessionStorage.getItem('access_token')}`
},
credentials: 'include'
});
```
**3. Handling Token Expiration**
```javascript
// Automatic token refresh on 401 errors
async function apiCall(url, options = {}) {
let response = await fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${sessionStorage.getItem('access_token')}`
},
credentials: 'include'
});
// If token expired, try to refresh
if (response.status === 401) {
const refreshResponse = await fetch('/api/v1/refresh', {
method: 'POST',
credentials: 'include' // Sends refresh token cookie
});
if (refreshResponse.ok) {
const { access_token } = await refreshResponse.json();
sessionStorage.setItem('access_token', access_token);
// Retry original request
response = await fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${access_token}`
},
credentials: 'include'
});
} else {
// Refresh failed - redirect to login
window.location.href = '/login';
}
}
return response;
}
```
**4. Logout Process**
```javascript
// Clear tokens and call logout endpoint
await fetch('/api/v1/logout', {
method: 'POST',
credentials: 'include'
});
sessionStorage.removeItem('access_token');
// Refresh token cookie is cleared by server
```
### Cookie Configuration
The refresh token cookie is configured for maximum security:
```python
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True, # Prevents JavaScript access (XSS protection)
secure=True, # HTTPS only in production
samesite="Lax", # CSRF protection with good usability
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
```
**SameSite Options:**
- **`Lax`** (Recommended): Cookies sent on top-level navigation but not cross-site requests
- **`Strict`**: Maximum security but may break some user flows
- **`None`**: Required for cross-origin requests (must use with Secure)
## Token Blacklisting
Token blacklisting solves a fundamental problem with JWT tokens: once issued, they remain valid until expiration, even if the user logs out. Blacklisting provides immediate token revocation.
### Why Blacklisting Matters
Without blacklisting, logged-out users could continue accessing your API until their tokens naturally expire. This creates security risks, especially on shared computers or if tokens are compromised.
### Blacklisting Implementation
The system uses a database table to track invalidated tokens:
```python
# models/token_blacklist.py
class TokenBlacklist(Base):
__tablename__ = "token_blacklist"
id: Mapped[int] = mapped_column(primary_key=True)
token: Mapped[str] = mapped_column(unique=True, index=True) # Full token string
expires_at: Mapped[datetime] = mapped_column() # When to clean up
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
```
**Design Considerations:**
- **Unique constraint**: Prevents duplicate entries
- **Index on token**: Fast lookup during verification
- **Expires_at field**: Enables automatic cleanup of old entries
### Blacklisting Tokens
The system provides functions for both single token and dual token blacklisting:
```python
from app.core.security import blacklist_token, blacklist_tokens
# Single token blacklisting (for specific scenarios)
await blacklist_token(token, db)
# Dual token blacklisting (standard logout)
await blacklist_tokens(access_token, refresh_token, db)
```
### Blacklisting Process
The blacklisting process extracts the expiration time from the token to set an appropriate cleanup schedule:
```python
async def blacklist_token(token: str, db: AsyncSession) -> None:
# 1. Decode token to extract expiration (no verification needed)
payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM])
exp_timestamp = payload.get("exp")
if exp_timestamp is not None:
# 2. Convert Unix timestamp to datetime
expires_at = datetime.fromtimestamp(exp_timestamp)
# 3. Store in blacklist with expiration
await crud_token_blacklist.create(
db,
object=TokenBlacklistCreate(token=token, expires_at=expires_at)
)
```
**Cleanup Strategy**: Blacklisted tokens can be automatically removed from the database after their natural expiration time, preventing unlimited database growth.
## Login Flow Implementation
### Complete Login Endpoint
```python
@router.post("/login", response_model=Token)
async def login_for_access_token(
response: Response,
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: Annotated[AsyncSession, Depends(async_get_db)],
) -> dict[str, str]:
# 1. Authenticate user
user = await authenticate_user(
username_or_email=form_data.username,
password=form_data.password,
db=db
)
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password"
)
# 2. Create access token
access_token = await create_access_token(data={"sub": user["username"]})
# 3. Create refresh token
refresh_token = await create_refresh_token(data={"sub": user["username"]})
# 4. Set refresh token as HTTP-only cookie
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
return {"access_token": access_token, "token_type": "bearer"}
```
### Token Refresh Endpoint
```python
@router.post("/refresh", response_model=Token)
async def refresh_access_token(
response: Response,
db: Annotated[AsyncSession, Depends(async_get_db)],
refresh_token: str = Cookie(None)
) -> dict[str, str]:
if not refresh_token:
raise HTTPException(status_code=401, detail="Refresh token missing")
# 1. Verify refresh token
token_data = await verify_token(refresh_token, TokenType.REFRESH, db)
if not token_data:
raise HTTPException(status_code=401, detail="Invalid refresh token")
# 2. Create new access token
new_access_token = await create_access_token(
data={"sub": token_data.username_or_email}
)
# 3. Optionally create new refresh token (token rotation)
new_refresh_token = await create_refresh_token(
data={"sub": token_data.username_or_email}
)
# 4. Blacklist old refresh token
await blacklist_token(refresh_token, db)
# 5. Set new refresh token cookie
response.set_cookie(
key="refresh_token",
value=new_refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
return {"access_token": new_access_token, "token_type": "bearer"}
```
### Logout Implementation
```python
@router.post("/logout")
async def logout(
response: Response,
db: Annotated[AsyncSession, Depends(async_get_db)],
current_user: dict = Depends(get_current_user),
token: str = Depends(oauth2_scheme),
refresh_token: str = Cookie(None)
) -> dict[str, str]:
# 1. Blacklist access token
await blacklist_token(token, db)
# 2. Blacklist refresh token if present
if refresh_token:
await blacklist_token(refresh_token, db)
# 3. Clear refresh token cookie
response.delete_cookie(
key="refresh_token",
httponly=True,
secure=True,
samesite="strict"
)
return {"message": "Successfully logged out"}
```
## Authentication Dependencies
### get_current_user
```python
async def get_current_user(
db: AsyncSession = Depends(async_get_db),
token: str = Depends(oauth2_scheme)
) -> dict:
# 1. Verify token
token_data = await verify_token(token, TokenType.ACCESS, db)
if not token_data:
raise HTTPException(status_code=401, detail="Invalid token")
# 2. Get user from database
user = await crud_users.get(
db=db,
username=token_data.username_or_email,
schema_to_select=UserRead
)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
```
### get_optional_user
```python
async def get_optional_user(
db: AsyncSession = Depends(async_get_db),
token: str = Depends(optional_oauth2_scheme)
) -> dict | None:
if not token:
return None
try:
return await get_current_user(db=db, token=token)
except HTTPException:
return None
```
### get_current_superuser
```python
async def get_current_superuser(
current_user: dict = Depends(get_current_user)
) -> dict:
if not current_user.get("is_superuser", False):
raise HTTPException(
status_code=403,
detail="Not enough permissions"
)
return current_user
```
## Configuration
### Environment Variables
```bash
# JWT Configuration
SECRET_KEY=your-secret-key-here
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# Security Headers
SECURE_COOKIES=true
CORS_ORIGINS=["http://localhost:3000", "https://yourapp.com"]
```
### Security Configuration
```python
# app/core/config.py
class Settings(BaseSettings):
SECRET_KEY: SecretStr
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Cookie settings
SECURE_COOKIES: bool = True
COOKIE_DOMAIN: str | None = None
COOKIE_SAMESITE: str = "strict"
```
## Security Best Practices
### Token Security
- **Use strong secrets**: Generate cryptographically secure SECRET_KEY
- **Rotate secrets**: Regularly change SECRET_KEY in production
- **Environment separation**: Different secrets for dev/staging/production
- **Secure transmission**: Always use HTTPS in production
### Cookie Security
- **HttpOnly flag**: Prevents JavaScript access to refresh tokens
- **Secure flag**: Ensures cookies only sent over HTTPS
- **SameSite attribute**: Prevents CSRF attacks
- **Domain restrictions**: Set cookie domain appropriately
### Implementation Security
- **Input validation**: Validate all token inputs
- **Rate limiting**: Implement login attempt limits
- **Audit logging**: Log authentication events
- **Token rotation**: Regularly refresh tokens
## Common Patterns
### API Key Authentication
For service-to-service communication:
```python
async def get_api_key_user(
api_key: str = Header(None),
db: AsyncSession = Depends(async_get_db)
) -> dict:
if not api_key:
raise HTTPException(status_code=401, detail="API key required")
# Verify API key
user = await crud_users.get(db=db, api_key=api_key)
if not user:
raise HTTPException(status_code=401, detail="Invalid API key")
return user
```
### Multiple Authentication Methods
```python
async def get_authenticated_user(
db: AsyncSession = Depends(async_get_db),
token: str = Depends(optional_oauth2_scheme),
api_key: str = Header(None)
) -> dict:
# Try JWT token first
if token:
try:
return await get_current_user(db=db, token=token)
except HTTPException:
pass
# Fall back to API key
if api_key:
return await get_api_key_user(api_key=api_key, db=db)
raise HTTPException(status_code=401, detail="Authentication required")
```
## Troubleshooting
### Common Issues
**Token Expired**: Implement automatic refresh using refresh tokens
**Invalid Signature**: Check SECRET_KEY consistency across environments
**Blacklisted Token**: User logged out - redirect to login
**Missing Token**: Ensure Authorization header is properly set
### Debugging Tips
```python
# Enable debug logging
import logging
logging.getLogger("app.core.security").setLevel(logging.DEBUG)
# Test token validation
async def debug_token(token: str, db: AsyncSession):
try:
payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM])
print(f"Token payload: {payload}")
is_blacklisted = await crud_token_blacklist.exists(db, token=token)
print(f"Is blacklisted: {is_blacklisted}")
except JWTError as e:
print(f"JWT Error: {e}")
```
This comprehensive JWT implementation provides secure, scalable authentication for your FastAPI application.

View File

@ -0,0 +1,634 @@
# Permissions and Authorization
Authorization determines what authenticated users can do within your application. While authentication answers "who are you?", authorization answers "what can you do?". This section covers the permission system, access control patterns, and how to implement secure authorization in your endpoints.
## Understanding Authorization
Authorization is a multi-layered security concept that protects resources and operations based on user identity, roles, and contextual information. The boilerplate implements several authorization patterns to handle different security requirements.
### Authorization vs Authentication
**Authentication**: Verifies user identity - confirms the user is who they claim to be
**Authorization**: Determines user permissions - decides what the authenticated user can access
These work together: you must authenticate first (prove identity) before you can authorize (check permissions).
### Authorization Patterns
The system implements several common authorization patterns:
1. **Role-Based Access Control (RBAC)**: Users have roles (superuser, regular user) that determine permissions
2. **Resource Ownership**: Users can only access resources they own
3. **Tiered Access**: Different user tiers have different capabilities and limits
4. **Contextual Authorization**: Permissions based on request context (rate limits, time-based access)
## Core Authorization Patterns
### Superuser Permissions
Superusers have elevated privileges for administrative operations. This pattern is essential for system management but must be carefully controlled.
```python
from app.api.dependencies import get_current_superuser
# Superuser-only endpoint
@router.get("/admin/users/", dependencies=[Depends(get_current_superuser)])
async def get_all_users(
db: AsyncSession = Depends(async_get_db)
) -> list[UserRead]:
# Only superusers can access this endpoint
users = await crud_users.get_multi(
db=db,
schema_to_select=UserRead,
return_as_model=True
)
return users.data
```
**When to Use Superuser Authorization:**
- **User management operations**: Creating, deleting, or modifying other users
- **System configuration**: Changing application settings or configuration
- **Data export/import**: Bulk operations on sensitive data
- **Administrative reporting**: Access to system-wide analytics and logs
**Security Considerations:**
- **Minimal Assignment**: Only assign superuser status when absolutely necessary
- **Regular Audits**: Periodically review who has superuser access
- **Activity Logging**: Log all superuser actions for security monitoring
- **Time-Limited Access**: Consider temporary superuser elevation for specific tasks
### Resource Ownership
Resource ownership ensures users can only access and modify their own data. This is the most common authorization pattern in user-facing applications.
```python
@router.get("/posts/me/")
async def get_my_posts(
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> list[PostRead]:
# Get posts owned by current user
posts = await crud_posts.get_multi(
db=db,
created_by_user_id=current_user["id"],
schema_to_select=PostRead,
return_as_model=True
)
return posts.data
@router.delete("/posts/{post_id}")
async def delete_post(
post_id: int,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> dict[str, str]:
# 1. Get the post
post = await crud_posts.get(db=db, id=post_id)
if not post:
raise NotFoundException("Post not found")
# 2. Check ownership
if post["created_by_user_id"] != current_user["id"]:
raise ForbiddenException("You can only delete your own posts")
# 3. Delete the post
await crud_posts.delete(db=db, id=post_id)
return {"message": "Post deleted"}
```
**Ownership Validation Pattern:**
1. **Retrieve Resource**: Get the resource from the database
2. **Check Ownership**: Compare resource owner with current user
3. **Authorize or Deny**: Allow action if user owns resource, deny otherwise
### User Tiers and Rate Limiting
User tiers provide differentiated access based on subscription levels or user status. This enables business models with different feature sets for different user types.
```python
@router.post("/posts/", response_model=PostRead)
async def create_post(
post: PostCreate,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> PostRead:
# Check rate limits based on user tier
await check_rate_limit(
resource="posts",
user_id=current_user["id"],
tier_id=current_user.get("tier_id"),
db=db
)
# Create post with user association
post_internal = PostCreateInternal(
**post.model_dump(),
created_by_user_id=current_user["id"]
)
created_post = await crud_posts.create(db=db, object=post_internal)
return created_post
```
**Rate Limiting Implementation:**
```python
async def check_rate_limit(
resource: str,
user_id: int,
tier_id: int | None,
db: AsyncSession
) -> None:
# 1. Get user's tier information
if tier_id:
tier = await crud_tiers.get(db=db, id=tier_id)
limit = tier["rate_limit_posts"] if tier else 10 # Default limit
else:
limit = 5 # Free tier limit
# 2. Count recent posts (last 24 hours)
recent_posts = await crud_posts.count(
db=db,
created_by_user_id=user_id,
created_at__gte=datetime.utcnow() - timedelta(hours=24)
)
# 3. Check if limit exceeded
if recent_posts >= limit:
raise RateLimitException(f"Daily {resource} limit exceeded ({limit})")
```
**Tier-Based Authorization Benefits:**
- **Business Model Support**: Different features for different subscription levels
- **Resource Protection**: Prevents abuse by limiting free tier usage
- **Progressive Enhancement**: Encourages upgrades by showing tier benefits
- **Fair Usage**: Ensures equitable resource distribution among users
### Custom Permission Helpers
Custom permission functions provide reusable authorization logic for complex scenarios.
```python
# Permission helper functions
async def can_edit_post(user: dict, post_id: int, db: AsyncSession) -> bool:
"""Check if user can edit a specific post."""
post = await crud_posts.get(db=db, id=post_id)
if not post:
return False
# Superusers can edit any post
if user.get("is_superuser", False):
return True
# Users can edit their own posts
if post["created_by_user_id"] == user["id"]:
return True
return False
async def can_access_admin_panel(user: dict) -> bool:
"""Check if user can access admin panel."""
return user.get("is_superuser", False)
async def has_tier_feature(user: dict, feature: str, db: AsyncSession) -> bool:
"""Check if user's tier includes a specific feature."""
tier_id = user.get("tier_id")
if not tier_id:
return False # Free tier - no premium features
tier = await crud_tiers.get(db=db, id=tier_id)
if not tier:
return False
# Check tier features (example)
return tier.get(f"allows_{feature}", False)
# Usage in endpoints
@router.put("/posts/{post_id}")
async def update_post(
post_id: int,
post_updates: PostUpdate,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> PostRead:
# Use permission helper
if not await can_edit_post(current_user, post_id, db):
raise ForbiddenException("Cannot edit this post")
updated_post = await crud_posts.update(
db=db,
object=post_updates,
id=post_id
)
return updated_post
```
**Permission Helper Benefits:**
- **Reusability**: Same logic used across multiple endpoints
- **Consistency**: Ensures uniform permission checking
- **Maintainability**: Changes to permissions only need updates in one place
- **Testability**: Permission logic can be unit tested separately
## Authorization Dependencies
### Basic Authorization Dependencies
```python
# Required authentication
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(async_get_db)
) -> dict:
"""Get currently authenticated user."""
token_data = await verify_token(token, TokenType.ACCESS, db)
if not token_data:
raise HTTPException(status_code=401, detail="Invalid token")
user = await crud_users.get(db=db, username=token_data.username_or_email)
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
# Optional authentication
async def get_optional_user(
token: str = Depends(optional_oauth2_scheme),
db: AsyncSession = Depends(async_get_db)
) -> dict | None:
"""Get currently authenticated user, or None if not authenticated."""
if not token:
return None
try:
return await get_current_user(token=token, db=db)
except HTTPException:
return None
# Superuser requirement
async def get_current_superuser(
current_user: dict = Depends(get_current_user)
) -> dict:
"""Get current user and ensure they are a superuser."""
if not current_user.get("is_superuser", False):
raise HTTPException(status_code=403, detail="Not enough permissions")
return current_user
```
### Advanced Authorization Dependencies
```python
# Tier-based access control
def require_tier(minimum_tier: str):
"""Factory function for tier-based dependencies."""
async def check_user_tier(
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> dict:
tier_id = current_user.get("tier_id")
if not tier_id:
raise HTTPException(status_code=403, detail="No subscription tier")
tier = await crud_tiers.get(db=db, id=tier_id)
if not tier or tier["name"] != minimum_tier:
raise HTTPException(
status_code=403,
detail=f"Requires {minimum_tier} tier"
)
return current_user
return check_user_tier
# Resource ownership dependency
def require_resource_ownership(resource_type: str):
"""Factory function for resource ownership dependencies."""
async def check_ownership(
resource_id: int,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> dict:
if resource_type == "post":
resource = await crud_posts.get(db=db, id=resource_id)
owner_field = "created_by_user_id"
else:
raise ValueError(f"Unknown resource type: {resource_type}")
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
# Superusers can access any resource
if current_user.get("is_superuser", False):
return current_user
# Check ownership
if resource[owner_field] != current_user["id"]:
raise HTTPException(
status_code=403,
detail="You don't own this resource"
)
return current_user
return check_ownership
# Usage examples
@router.get("/premium-feature", dependencies=[Depends(require_tier("Premium"))])
async def premium_feature():
return {"message": "Premium feature accessed"}
@router.put("/posts/{post_id}")
async def update_post(
post_id: int,
post_update: PostUpdate,
current_user: dict = Depends(require_resource_ownership("post")),
db: AsyncSession = Depends(async_get_db)
) -> PostRead:
# User ownership already verified by dependency
updated_post = await crud_posts.update(db=db, object=post_update, id=post_id)
return updated_post
```
## Security Best Practices
### Principle of Least Privilege
Always grant the minimum permissions necessary for users to complete their tasks.
**Implementation:**
- **Default Deny**: Start with no permissions and explicitly grant what's needed
- **Regular Review**: Periodically audit user permissions and remove unnecessary access
- **Role Segregation**: Separate administrative and user-facing permissions
- **Temporary Elevation**: Use temporary permissions for one-time administrative tasks
### Defense in Depth
Implement multiple layers of authorization checks throughout your application.
**Authorization Layers:**
1. **API Gateway**: Route-level permission checks
2. **Endpoint Dependencies**: FastAPI dependency injection for common patterns
3. **Business Logic**: Method-level permission validation
4. **Database**: Row-level security where applicable
### Input Validation and Sanitization
Always validate and sanitize user input, even from authorized users.
```python
@router.post("/admin/users/{user_id}/tier")
async def update_user_tier(
user_id: int,
tier_update: UserTierUpdate,
current_user: dict = Depends(get_current_superuser),
db: AsyncSession = Depends(async_get_db)
) -> dict[str, str]:
# 1. Validate tier exists
tier = await crud_tiers.get(db=db, id=tier_update.tier_id)
if not tier:
raise NotFoundException("Tier not found")
# 2. Validate user exists
user = await crud_users.get(db=db, id=user_id)
if not user:
raise NotFoundException("User not found")
# 3. Prevent self-demotion (optional business rule)
if user_id == current_user["id"] and tier["name"] == "free":
raise ForbiddenException("Cannot demote yourself to free tier")
# 4. Update user tier
await crud_users.update(
db=db,
object={"tier_id": tier_update.tier_id},
id=user_id
)
return {"message": f"User tier updated to {tier['name']}"}
```
### Audit Logging
Log all significant authorization decisions for security monitoring and compliance.
```python
import logging
security_logger = logging.getLogger("security")
async def log_authorization_event(
user_id: int,
action: str,
resource: str,
result: str,
details: dict = None
):
"""Log authorization events for security auditing."""
security_logger.info(
f"Authorization {result}: User {user_id} attempted {action} on {resource}",
extra={
"user_id": user_id,
"action": action,
"resource": resource,
"result": result,
"details": details or {}
}
)
# Usage in permission checks
async def delete_user_account(user_id: int, current_user: dict, db: AsyncSession):
if current_user["id"] != user_id and not current_user.get("is_superuser"):
await log_authorization_event(
user_id=current_user["id"],
action="delete_account",
resource=f"user:{user_id}",
result="denied",
details={"reason": "insufficient_permissions"}
)
raise ForbiddenException("Cannot delete other users' accounts")
await log_authorization_event(
user_id=current_user["id"],
action="delete_account",
resource=f"user:{user_id}",
result="granted"
)
# Proceed with deletion
await crud_users.delete(db=db, id=user_id)
```
## Common Authorization Patterns
### Multi-Tenant Authorization
For applications serving multiple organizations or tenants:
```python
@router.get("/organizations/{org_id}/users/")
async def get_organization_users(
org_id: int,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> list[UserRead]:
# Check if user belongs to organization
membership = await crud_org_members.get(
db=db,
organization_id=org_id,
user_id=current_user["id"]
)
if not membership:
raise ForbiddenException("Not a member of this organization")
# Check if user has admin role in organization
if membership.role not in ["admin", "owner"]:
raise ForbiddenException("Insufficient organization permissions")
# Get organization users
users = await crud_users.get_multi(
db=db,
organization_id=org_id,
schema_to_select=UserRead,
return_as_model=True
)
return users.data
```
### Time-Based Permissions
For permissions that change based on time or schedule:
```python
from datetime import datetime, time
async def check_business_hours_access(user: dict) -> bool:
"""Check if user can access during business hours only."""
now = datetime.now()
business_start = time(9, 0) # 9 AM
business_end = time(17, 0) # 5 PM
# Superusers can always access
if user.get("is_superuser", False):
return True
# Regular users only during business hours
current_time = now.time()
return business_start <= current_time <= business_end
# Usage in dependency
async def require_business_hours(
current_user: dict = Depends(get_current_user)
) -> dict:
"""Require access during business hours for non-admin users."""
if not await check_business_hours_access(current_user):
raise ForbiddenException("Access only allowed during business hours")
return current_user
@router.post("/business-operation", dependencies=[Depends(require_business_hours)])
async def business_operation():
return {"message": "Business operation completed"}
```
### Role-Based Access Control (RBAC)
For more complex permission systems:
```python
# Role definitions
class Role(str, Enum):
USER = "user"
MODERATOR = "moderator"
ADMIN = "admin"
SUPERUSER = "superuser"
# Permission checking
def has_role(user: dict, required_role: Role) -> bool:
"""Check if user has required role or higher."""
role_hierarchy = {
Role.USER: 0,
Role.MODERATOR: 1,
Role.ADMIN: 2,
Role.SUPERUSER: 3
}
user_role = Role(user.get("role", "user"))
return role_hierarchy[user_role] >= role_hierarchy[required_role]
# Role-based dependency
def require_role(minimum_role: Role):
"""Factory for role-based dependencies."""
async def check_role(current_user: dict = Depends(get_current_user)) -> dict:
if not has_role(current_user, minimum_role):
raise HTTPException(
status_code=403,
detail=f"Requires {minimum_role.value} role or higher"
)
return current_user
return check_role
# Usage
@router.delete("/posts/{post_id}", dependencies=[Depends(require_role(Role.MODERATOR))])
async def moderate_delete_post(post_id: int, db: AsyncSession = Depends(async_get_db)):
await crud_posts.delete(db=db, id=post_id)
return {"message": "Post deleted by moderator"}
```
### Feature Flags and Permissions
For gradual feature rollouts:
```python
async def has_feature_access(user: dict, feature: str, db: AsyncSession) -> bool:
"""Check if user has access to a specific feature."""
# Check feature flags
feature_flag = await crud_feature_flags.get(db=db, name=feature)
if not feature_flag or not feature_flag.enabled:
return False
# Check user tier permissions
if feature_flag.requires_tier:
tier_id = user.get("tier_id")
if not tier_id:
return False
tier = await crud_tiers.get(db=db, id=tier_id)
if not tier or tier["level"] < feature_flag["minimum_tier_level"]:
return False
# Check beta user status
if feature_flag.beta_only:
return user.get("is_beta_user", False)
return True
# Feature flag dependency
def require_feature(feature_name: str):
"""Factory for feature flag dependencies."""
async def check_feature_access(
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db)
) -> dict:
if not await has_feature_access(current_user, feature_name, db):
raise HTTPException(
status_code=403,
detail=f"Access to {feature_name} feature not available"
)
return current_user
return check_feature_access
@router.get("/beta-feature", dependencies=[Depends(require_feature("beta_analytics"))])
async def get_beta_analytics():
return {"analytics": "beta_data"}
```
This comprehensive permissions system provides flexible, secure authorization patterns that can be adapted to your specific application requirements while maintaining security best practices.

View File

@ -0,0 +1,879 @@
# User Management
User management forms the core of any authentication system, handling everything from user registration and login to profile updates and account deletion. This section covers the complete user lifecycle with secure authentication flows and administrative operations.
## Understanding User Lifecycle
The user lifecycle in the boilerplate follows a secure, well-defined process that protects user data while providing a smooth experience. Understanding this flow helps you customize the system for your specific needs.
**Registration → Authentication → Profile Management → Administrative Operations**
Each stage has specific security considerations and business logic that ensure data integrity and user safety.
## User Registration
User registration is the entry point to your application. The process must be secure, user-friendly, and prevent common issues like duplicate accounts or weak passwords.
### Registration Process
The registration endpoint performs several validation steps before creating a user account. This multi-step validation prevents common registration issues and ensures data quality.
```python
# User registration endpoint
@router.post("/user", response_model=UserRead, status_code=201)
async def write_user(
user: UserCreate,
db: AsyncSession
) -> UserRead:
# 1. Check if email exists
email_row = await crud_users.exists(db=db, email=user.email)
if email_row:
raise DuplicateValueException("Email is already registered")
# 2. Check if username exists
username_row = await crud_users.exists(db=db, username=user.username)
if username_row:
raise DuplicateValueException("Username not available")
# 3. Hash password
user_internal_dict = user.model_dump()
user_internal_dict["hashed_password"] = get_password_hash(
password=user_internal_dict["password"]
)
del user_internal_dict["password"]
# 4. Create user
user_internal = UserCreateInternal(**user_internal_dict)
created_user = await crud_users.create(db=db, object=user_internal)
return created_user
```
**Security Steps Explained:**
1. **Email Uniqueness**: Prevents multiple accounts with the same email, which could cause confusion and security issues
2. **Username Uniqueness**: Ensures usernames are unique identifiers within your system
3. **Password Hashing**: Converts plain text passwords into secure hashes before database storage
4. **Data Separation**: Plain text passwords are immediately removed from memory after hashing
### Registration Schema
The registration schema defines what data is required and how it's validated. This ensures consistent data quality and prevents malformed user accounts.
```python
# User registration input
class UserCreate(UserBase):
model_config = ConfigDict(extra="forbid")
password: Annotated[
str,
Field(
pattern=r"^.{8,}|[0-9]+|[A-Z]+|[a-z]+|[^a-zA-Z0-9]+$",
examples=["Str1ngst!"]
)
]
# Internal schema for database storage
class UserCreateInternal(UserBase):
hashed_password: str
```
**Schema Design Principles:**
- **`extra="forbid"`**: Rejects unexpected fields, preventing injection of unauthorized data
- **Password Patterns**: Enforces minimum security requirements for passwords
- **Separation of Concerns**: External schema accepts passwords, internal schema stores hashes
## User Authentication
Authentication verifies user identity using credentials. The process must be secure against common attacks while remaining user-friendly.
### Authentication Process
```python
async def authenticate_user(username_or_email: str, password: str, db: AsyncSession) -> dict | False:
# 1. Get user by email or username
if "@" in username_or_email:
db_user = await crud_users.get(db=db, email=username_or_email, is_deleted=False)
else:
db_user = await crud_users.get(db=db, username=username_or_email, is_deleted=False)
if not db_user:
return False
# 2. Verify password
if not await verify_password(password, db_user["hashed_password"]):
return False
return db_user
```
**Security Considerations:**
- **Flexible Login**: Accepts both username and email for better user experience
- **Soft Delete Check**: `is_deleted=False` prevents deleted users from logging in
- **Consistent Timing**: Both user lookup and password verification take similar time
### Password Security
Password security is critical for protecting user accounts. The system uses industry-standard bcrypt hashing with automatic salt generation.
```python
import bcrypt
async def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against its hash."""
correct_password: bool = bcrypt.checkpw(
plain_password.encode(),
hashed_password.encode()
)
return correct_password
def get_password_hash(password: str) -> str:
"""Generate password hash with salt."""
hashed_password: str = bcrypt.hashpw(
password.encode(),
bcrypt.gensalt()
).decode()
return hashed_password
```
**Why bcrypt?**
- **Adaptive Hashing**: Computationally expensive, making brute force attacks impractical
- **Automatic Salt**: Each password gets a unique salt, preventing rainbow table attacks
- **Future-Proof**: Can increase computational cost as hardware improves
### Login Validation
Client-side validation provides immediate feedback but should never be the only validation layer.
```python
# Password validation pattern
PASSWORD_PATTERN = r"^.{8,}|[0-9]+|[A-Z]+|[a-z]+|[^a-zA-Z0-9]+$"
# Frontend validation (example)
function validatePassword(password) {
const minLength = password.length >= 8;
const hasNumber = /[0-9]/.test(password);
const hasUpper = /[A-Z]/.test(password);
const hasLower = /[a-z]/.test(password);
const hasSpecial = /[^a-zA-Z0-9]/.test(password);
return minLength && hasNumber && hasUpper && hasLower && hasSpecial;
}
```
**Validation Strategy:**
- **Server-Side**: Always validate on the server - client validation can be bypassed
- **Client-Side**: Provides immediate feedback for better user experience
- **Progressive**: Validate as user types to catch issues early
## Profile Management
Profile management allows users to update their information while maintaining security and data integrity.
### Get Current User Profile
Retrieving the current user's profile is a fundamental operation that should be fast and secure.
```python
@router.get("/user/me/", response_model=UserRead)
async def read_users_me(current_user: dict = Depends(get_current_user)) -> dict:
return current_user
# Frontend usage
async function getCurrentUser() {
const token = localStorage.getItem('access_token');
const response = await fetch('/api/v1/user/me/', {
headers: {
'Authorization': `Bearer ${token}`
}
});
if (response.ok) {
return await response.json();
}
throw new Error('Failed to get user profile');
}
```
**Design Decisions:**
- **`/me` Endpoint**: Common pattern that's intuitive for users and developers
- **Current User Dependency**: Automatically handles authentication and user lookup
- **Minimal Data**: Returns only safe, user-relevant information
### Update User Profile
Profile updates require careful validation to prevent unauthorized changes and maintain data integrity.
```python
@router.patch("/user/{username}")
async def patch_user(
values: UserUpdate,
username: str,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db),
) -> dict[str, str]:
# 1. Get user from database
db_user = await crud_users.get(db=db, username=username, schema_to_select=UserRead)
if db_user is None:
raise NotFoundException("User not found")
# 2. Check ownership (users can only update their own profile)
if db_user["username"] != current_user["username"]:
raise ForbiddenException("Cannot update other users")
# 3. Validate unique constraints
if values.username and values.username != db_user["username"]:
existing_username = await crud_users.exists(db=db, username=values.username)
if existing_username:
raise DuplicateValueException("Username not available")
if values.email and values.email != db_user["email"]:
existing_email = await crud_users.exists(db=db, email=values.email)
if existing_email:
raise DuplicateValueException("Email is already registered")
# 4. Update user
await crud_users.update(db=db, object=values, username=username)
return {"message": "User updated"}
```
**Security Measures:**
1. **Ownership Verification**: Users can only update their own profiles
2. **Uniqueness Checks**: Prevents conflicts when changing username/email
3. **Partial Updates**: Only provided fields are updated
4. **Input Validation**: Pydantic schemas validate all input data
## User Deletion
User deletion requires careful consideration of data retention, user rights, and system integrity.
### Self-Deletion
Users should be able to delete their own accounts, but the process should be secure and potentially reversible.
```python
@router.delete("/user/{username}")
async def erase_user(
username: str,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(async_get_db),
token: str = Depends(oauth2_scheme),
) -> dict[str, str]:
# 1. Get user from database
db_user = await crud_users.get(db=db, username=username, schema_to_select=UserRead)
if not db_user:
raise NotFoundException("User not found")
# 2. Check ownership
if username != current_user["username"]:
raise ForbiddenException()
# 3. Soft delete user
await crud_users.delete(db=db, username=username)
# 4. Blacklist current token
await blacklist_token(token=token, db=db)
return {"message": "User deleted"}
```
**Soft Delete Benefits:**
- **Data Recovery**: Users can be restored if needed
- **Audit Trail**: Maintain records for compliance
- **Relationship Integrity**: Related data (posts, comments) remain accessible
- **Gradual Cleanup**: Allow time for data migration or backup
### Admin Deletion (Hard Delete)
Administrators may need to permanently remove users in specific circumstances.
```python
@router.delete("/db_user/{username}", dependencies=[Depends(get_current_superuser)])
async def erase_db_user(
username: str,
db: AsyncSession = Depends(async_get_db),
token: str = Depends(oauth2_scheme),
) -> dict[str, str]:
# 1. Check if user exists
db_user = await crud_users.exists(db=db, username=username)
if not db_user:
raise NotFoundException("User not found")
# 2. Hard delete from database
await crud_users.db_delete(db=db, username=username)
# 3. Blacklist current token
await blacklist_token(token=token, db=db)
return {"message": "User deleted from the database"}
```
**When to Use Hard Delete:**
- **Legal Requirements**: GDPR "right to be forgotten" requests
- **Data Breach Response**: Complete removal of compromised accounts
- **Spam/Abuse**: Permanent removal of malicious accounts
## Administrative Operations
### List All Users
```python
@router.get("/users", response_model=PaginatedListResponse[UserRead])
async def read_users(
db: AsyncSession = Depends(async_get_db),
page: int = 1,
items_per_page: int = 10
) -> dict:
users_data = await crud_users.get_multi(
db=db,
offset=compute_offset(page, items_per_page),
limit=items_per_page,
is_deleted=False,
)
response: dict[str, Any] = paginated_response(
crud_data=users_data,
page=page,
items_per_page=items_per_page
)
return response
```
### Get User by Username
```python
@router.get("/user/{username}", response_model=UserRead)
async def read_user(
username: str,
db: AsyncSession = Depends(async_get_db)
) -> UserRead:
db_user = await crud_users.get(
db=db,
username=username,
is_deleted=False,
schema_to_select=UserRead
)
if db_user is None:
raise NotFoundException("User not found")
return db_user
```
### User with Tier Information
```python
@router.get("/user/{username}/tier")
async def read_user_tier(
username: str,
db: AsyncSession = Depends(async_get_db)
) -> dict | None:
# 1. Get user
db_user = await crud_users.get(db=db, username=username, schema_to_select=UserRead)
if db_user is None:
raise NotFoundException("User not found")
# 2. Return None if no tier assigned
if db_user["tier_id"] is None:
return None
# 3. Get tier information
db_tier = await crud_tiers.get(db=db, id=db_user["tier_id"], schema_to_select=TierRead)
if not db_tier:
raise NotFoundException("Tier not found")
# 4. Combine user and tier data
user_dict = dict(db_user) # Convert to dict if needed
tier_dict = dict(db_tier) # Convert to dict if needed
for key, value in tier_dict.items():
user_dict[f"tier_{key}"] = value
return user_dict
```
## User Tiers and Permissions
### Assign User Tier
```python
@router.patch("/user/{username}/tier", dependencies=[Depends(get_current_superuser)])
async def patch_user_tier(
username: str,
values: UserTierUpdate,
db: AsyncSession = Depends(async_get_db)
) -> dict[str, str]:
# 1. Verify user exists
db_user = await crud_users.get(db=db, username=username, schema_to_select=UserRead)
if db_user is None:
raise NotFoundException("User not found")
# 2. Verify tier exists
tier_exists = await crud_tiers.exists(db=db, id=values.tier_id)
if not tier_exists:
raise NotFoundException("Tier not found")
# 3. Update user tier
await crud_users.update(db=db, object=values, username=username)
return {"message": "User tier updated"}
# Tier update schema
class UserTierUpdate(BaseModel):
tier_id: int
```
### User Rate Limits
```python
@router.get("/user/{username}/rate_limits", dependencies=[Depends(get_current_superuser)])
async def read_user_rate_limits(
username: str,
db: AsyncSession = Depends(async_get_db)
) -> dict[str, Any]:
# 1. Get user
db_user = await crud_users.get(db=db, username=username, schema_to_select=UserRead)
if db_user is None:
raise NotFoundException("User not found")
user_dict = dict(db_user) # Convert to dict if needed
# 2. No tier assigned
if db_user["tier_id"] is None:
user_dict["tier_rate_limits"] = []
return user_dict
# 3. Get tier and rate limits
db_tier = await crud_tiers.get(db=db, id=db_user["tier_id"], schema_to_select=TierRead)
if db_tier is None:
raise NotFoundException("Tier not found")
db_rate_limits = await crud_rate_limits.get_multi(db=db, tier_id=db_tier["id"])
user_dict["tier_rate_limits"] = db_rate_limits["data"]
return user_dict
```
## User Model Structure
### Database Model
```python
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
username: Mapped[str] = mapped_column(String(20), unique=True, index=True)
email: Mapped[str] = mapped_column(String(50), unique=True, index=True)
hashed_password: Mapped[str]
profile_image_url: Mapped[str] = mapped_column(default="https://www.profileimageurl.com")
is_superuser: Mapped[bool] = mapped_column(default=False)
tier_id: Mapped[int | None] = mapped_column(ForeignKey("tier.id"), default=None)
# Timestamps
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime | None] = mapped_column(default=None)
# Soft delete
is_deleted: Mapped[bool] = mapped_column(default=False)
deleted_at: Mapped[datetime | None] = mapped_column(default=None)
# Relationships
tier: Mapped["Tier"] = relationship(back_populates="users")
posts: Mapped[list["Post"]] = relationship(back_populates="created_by_user")
```
### User Schemas
```python
# Base schema with common fields
class UserBase(BaseModel):
name: Annotated[str, Field(min_length=2, max_length=30)]
username: Annotated[str, Field(min_length=2, max_length=20, pattern=r"^[a-z0-9]+$")]
email: Annotated[EmailStr, Field(examples=["user@example.com"])]
# Reading user data (API responses)
class UserRead(BaseModel):
id: int
name: str
username: str
email: str
profile_image_url: str
tier_id: int | None
# Full user data (internal use)
class User(TimestampSchema, UserBase, UUIDSchema, PersistentDeletion):
profile_image_url: str = "https://www.profileimageurl.com"
hashed_password: str
is_superuser: bool = False
tier_id: int | None = None
```
## Common User Operations
### Check User Existence
```python
# By email
email_exists = await crud_users.exists(db=db, email="user@example.com")
# By username
username_exists = await crud_users.exists(db=db, username="johndoe")
# By ID
user_exists = await crud_users.exists(db=db, id=123)
```
### Search Users
```python
# Get active users only
active_users = await crud_users.get_multi(
db=db,
is_deleted=False,
limit=10
)
# Get users by tier
tier_users = await crud_users.get_multi(
db=db,
tier_id=1,
is_deleted=False
)
# Get superusers
superusers = await crud_users.get_multi(
db=db,
is_superuser=True,
is_deleted=False
)
```
### User Statistics
```python
async def get_user_stats(db: AsyncSession) -> dict:
# Total users
total_users = await crud_users.count(db=db, is_deleted=False)
# Active users (logged in recently)
# This would require tracking last_login_at
# Users by tier
tier_stats = {}
tiers = await crud_tiers.get_multi(db=db)
for tier in tiers["data"]:
count = await crud_users.count(db=db, tier_id=tier["id"], is_deleted=False)
tier_stats[tier["name"]] = count
return {
"total_users": total_users,
"tier_distribution": tier_stats
}
```
## Frontend Integration
### Complete User Management Component
```javascript
class UserManager {
constructor(baseUrl = '/api/v1') {
this.baseUrl = baseUrl;
this.token = localStorage.getItem('access_token');
}
async register(userData) {
const response = await fetch(`${this.baseUrl}/user`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(userData)
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail);
}
return await response.json();
}
async login(username, password) {
const response = await fetch(`${this.baseUrl}/login`, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
username: username,
password: password
})
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail);
}
const tokens = await response.json();
localStorage.setItem('access_token', tokens.access_token);
this.token = tokens.access_token;
return tokens;
}
async getProfile() {
const response = await fetch(`${this.baseUrl}/user/me/`, {
headers: {
'Authorization': `Bearer ${this.token}`
}
});
if (!response.ok) {
throw new Error('Failed to get profile');
}
return await response.json();
}
async updateProfile(username, updates) {
const response = await fetch(`${this.baseUrl}/user/${username}`, {
method: 'PATCH',
headers: {
'Authorization': `Bearer ${this.token}`,
'Content-Type': 'application/json'
},
body: JSON.stringify(updates)
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail);
}
return await response.json();
}
async deleteAccount(username) {
const response = await fetch(`${this.baseUrl}/user/${username}`, {
method: 'DELETE',
headers: {
'Authorization': `Bearer ${this.token}`
}
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail);
}
// Clear local storage
localStorage.removeItem('access_token');
this.token = null;
return await response.json();
}
async logout() {
const response = await fetch(`${this.baseUrl}/logout`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.token}`
}
});
// Clear local storage regardless of response
localStorage.removeItem('access_token');
this.token = null;
if (response.ok) {
return await response.json();
}
}
}
// Usage
const userManager = new UserManager();
// Register new user
try {
const user = await userManager.register({
name: "John Doe",
username: "johndoe",
email: "john@example.com",
password: "SecurePass123!"
});
console.log('User registered:', user);
} catch (error) {
console.error('Registration failed:', error.message);
}
// Login
try {
const tokens = await userManager.login('johndoe', 'SecurePass123!');
console.log('Login successful');
// Get profile
const profile = await userManager.getProfile();
console.log('User profile:', profile);
} catch (error) {
console.error('Login failed:', error.message);
}
```
## Security Considerations
### Input Validation
```python
# Server-side validation
class UserCreate(UserBase):
password: Annotated[
str,
Field(
min_length=8,
pattern=r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]",
description="Password must contain uppercase, lowercase, number, and special character"
)
]
```
### Rate Limiting
```python
# Protect registration endpoint
@router.post("/user", dependencies=[Depends(rate_limiter_dependency)])
async def write_user(user: UserCreate, db: AsyncSession):
# Registration logic
pass
# Protect login endpoint
@router.post("/login", dependencies=[Depends(rate_limiter_dependency)])
async def login_for_access_token():
# Login logic
pass
```
### Data Sanitization
```python
def sanitize_user_input(user_data: dict) -> dict:
"""Sanitize user input to prevent XSS and injection."""
import html
sanitized = {}
for key, value in user_data.items():
if isinstance(value, str):
# HTML escape
sanitized[key] = html.escape(value.strip())
else:
sanitized[key] = value
return sanitized
```
## Next Steps
Now that you understand user management:
1. **[Permissions](permissions.md)** - Learn about role-based access control and authorization
2. **[Production Guide](../production.md)** - Implement production-grade security measures
3. **[JWT Tokens](jwt-tokens.md)** - Review token management if needed
User management provides the core functionality for authentication systems. Master these patterns before implementing advanced permission systems.
## Common Authentication Tasks
### Protect New Endpoints
```python
# Add authentication dependency to your router
@router.get("/my-endpoint")
async def my_endpoint(current_user: dict = Depends(get_current_user)):
# Endpoint now requires authentication
return {"user_specific_data": f"Hello {current_user['username']}"}
# Optional authentication for public endpoints
@router.get("/public-endpoint")
async def public_endpoint(user: dict | None = Depends(get_optional_user)):
if user:
return {"message": f"Hello {user['username']}", "premium_features": True}
return {"message": "Hello anonymous user", "premium_features": False}
```
### Complete Authentication Flow
```python
# 1. User registration
user_data = UserCreate(
name="John Doe",
username="johndoe",
email="john@example.com",
password="SecurePassword123!"
)
user = await crud_users.create(db=db, object=user_data)
# 2. User login
form_data = {"username": "johndoe", "password": "SecurePassword123!"}
user = await authenticate_user(form_data["username"], form_data["password"], db)
# 3. Token generation (handled in login endpoint)
access_token = await create_access_token(data={"sub": user["username"]})
refresh_token = await create_refresh_token(data={"sub": user["username"]})
# 4. API access with token
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get("/api/v1/users/me", headers=headers)
# 5. Token refresh when access token expires
response = requests.post("/api/v1/refresh") # Uses refresh token cookie
new_access_token = response.json()["access_token"]
# 6. Secure logout (blacklists both tokens)
await logout_user(access_token=access_token, refresh_token=refresh_token, db=db)
```
### Check User Permissions
```python
def check_user_permission(user: dict, required_tier: str = None):
"""Check if user has required permissions."""
if not user.get("is_active", True):
raise UnauthorizedException("User account is disabled")
if required_tier and user.get("tier", {}).get("name") != required_tier:
raise ForbiddenException(f"Requires {required_tier} tier")
# Usage in endpoint
@router.get("/premium-feature")
async def premium_feature(current_user: dict = Depends(get_current_user)):
check_user_permission(current_user, "Pro")
return {"premium_data": "exclusive_content"}
```
### Custom Authentication Logic
```python
async def get_user_with_posts(current_user: dict = Depends(get_current_user)):
"""Custom dependency that adds user's posts."""
posts = await crud_posts.get_multi(db=db, created_by_user_id=current_user["id"])
current_user["posts"] = posts
return current_user
# Usage
@router.get("/dashboard")
async def get_dashboard(user_with_posts: dict = Depends(get_user_with_posts)):
return {
"user": user_with_posts,
"post_count": len(user_with_posts["posts"])
}
```