"""User authentication dependencies and utilities."""
from typing import Optional
from fastapi import Depends, Header, HTTPException, status
from app.dependencies import get_user_token
import jwt
[docs]
class CurrentUser:
    """Identity of the caller resolved for the current request.

    Attributes:
        email: Email address stored for the caller.
        user_id: Identifier stored for the caller.
        is_authenticated: Whether the caller is considered logged in.
        is_anonymous: Whether the caller is an anonymous session.
        is_service_token: True if authenticated via service token
            (public access).
        display_name: Optional display name.
        name: Optional name.
    """

    def __init__(
        self,
        email: str,
        user_id: str,
        is_authenticated: bool,
        is_anonymous: bool = False,
        is_service_token: bool = False,
        display_name: Optional[str] = None,
        name: Optional[str] = None,
    ) -> None:
        # Identity fields.
        self.email: str = email
        self.user_id: str = user_id

        # Authentication state flags.
        self.is_authenticated: bool = is_authenticated
        self.is_anonymous: bool = is_anonymous
        self.is_service_token: bool = is_service_token

        # Optional human-readable labels.
        self.display_name: Optional[str] = display_name
        self.name: Optional[str] = name

    def __repr__(self) -> str:
        """Return a compact debug string with the email and the state flags."""
        return (
            f"<CurrentUser email={self.email}"
            f" authenticated={self.is_authenticated}"
            f" anonymous={self.is_anonymous}"
            f" service_token={self.is_service_token}>"
        )
[docs]
def _decode_unverified_claims(token: Optional[str]) -> Optional[dict]:
    """Return the claims of *token* WITHOUT verifying its signature.

    Returns None when the token is missing, cannot be decoded, or its
    payload is not a JSON object.
    """
    if not token:
        return None
    try:
        claims = jwt.decode(token, options={"verify_signature": False})
    except Exception:
        # Deliberate best effort: a malformed token must degrade to an
        # anonymous caller instead of failing the request.
        return None
    return claims if isinstance(claims, dict) else None


def _should_prefer_service_token(user_claims: Optional[dict]) -> bool:
    """Return True when a service token should replace the user token.

    That is the case when the user token could not be decoded, carries no
    email, or belongs to a Firebase anonymous session.
    """
    if user_claims is None:
        return True
    if not user_claims.get('email'):
        return True
    firebase_claims = user_claims.get('firebase')
    return (
        isinstance(firebase_claims, dict)
        and firebase_claims.get('sign_in_provider') == 'anonymous'
    )


def _anonymous_user() -> CurrentUser:
    """Build the default unauthenticated, anonymous user."""
    return CurrentUser(
        email="anonymous",
        user_id="anonymous",
        is_authenticated=False,
        is_anonymous=True,
        is_service_token=False,
        display_name=None,
        name=None,
    )


def _user_from_claims(claims: Optional[dict]) -> CurrentUser:
    """Map decoded token claims to a CurrentUser."""
    if claims is None:
        return _anonymous_user()

    token_email = claims.get('email')
    token_sub = claims.get('sub')
    token_name = claims.get('name') or claims.get('display_name') or claims.get('displayName')

    # A Control Plane JWT is recognised by its tenant_id claim and is always
    # treated as an authenticated service token (public access).
    if claims.get('tenant_id'):
        email = token_email or f"service_{claims.get('tenant_slug', 'unknown')}"
        # Prioritize the user's name from the token over the tenant slug.
        display_name = token_name or claims.get('tenant_slug') or email
        return CurrentUser(
            email=email,
            user_id=token_sub or "service",
            is_authenticated=True,
            is_anonymous=False,
            is_service_token=True,
            display_name=display_name,
            name=display_name,
        )

    # Otherwise treat the token as a Firebase token.
    firebase_claims = claims.get('firebase', {})
    if not isinstance(firebase_claims, dict):
        # Malformed 'firebase' claim: stay anonymous/unauthenticated.
        return _anonymous_user()

    user_id = token_sub or "anonymous"

    if firebase_claims.get('sign_in_provider') == 'anonymous' or not token_email:
        # Anonymous Firebase session (or no email). The email suffix needs a
        # string subject; without one keep the plain "anonymous" email.
        if isinstance(token_sub, str):
            email = f"anonymous_{token_sub[:8]}"
        else:
            email = "anonymous"
        return CurrentUser(
            email=email,
            user_id=user_id,
            is_authenticated=False,
            is_anonymous=True,
            is_service_token=False,
            display_name=None,
            name=None,
        )

    # Real authenticated user.
    return CurrentUser(
        email=token_email,
        user_id=user_id,
        is_authenticated=True,
        is_anonymous=False,
        is_service_token=False,
        display_name=token_name,
        name=token_name,
    )


def get_current_user(
    user_token: Optional[str] = Depends(get_user_token),
    service_token: Optional[str] = Header(None, alias="x-kumiho-token")
) -> CurrentUser:
    """
    Dependency that extracts the current user from the request tokens.

    Checks both the Authorization header (user token) and X-Kumiho-Token
    (service token). The user token takes precedence when both are present,
    unless it cannot be decoded, carries no email, or is a Firebase anonymous
    session; in those cases the service token is used instead. If no usable
    token is found, an anonymous, unauthenticated user is returned.

    Tokens are decoded without signature verification.
    NOTE(review): the original code comments state that Firebase token
    validation happens upstream. Confirm that this also covers the
    X-Kumiho-Token header, because a token with a tenant_id claim is
    accepted here as an authenticated service token.

    Args:
        user_token: The user's Firebase token from Authorization header
        service_token: The service token from X-Kumiho-Token header

    Returns:
        CurrentUser object
    """
    # Decode the user token once; the result drives both the precedence
    # decision and, when the user token wins, the user construction.
    user_claims = _decode_unverified_claims(user_token)

    use_service_token = not user_token or bool(
        service_token and _should_prefer_service_token(user_claims)
    )
    if use_service_token:
        claims = _decode_unverified_claims(service_token)
    else:
        claims = user_claims

    return _user_from_claims(claims)
[docs]
def require_authenticated_user(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
    """
    Dependency that only lets authenticated callers through.

    Use it on endpoints that require login.

    Args:
        current_user: The user resolved by the get_current_user dependency

    Returns:
        The same CurrentUser object, whose is_authenticated flag is truthy

    Raises:
        HTTPException: 401 if the user is not authenticated
    """
    # Success path first: authenticated callers pass straight through.
    if current_user.is_authenticated:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
    )