Source code for app.auth_utils

"""User authentication dependencies and utilities."""
from typing import Optional
from fastapi import Depends, Header, HTTPException, status
from app.dependencies import get_user_token
import jwt


class CurrentUser:
    """Identity record for the caller of the current request.

    Attributes:
        email: Email address (or a placeholder) identifying the caller.
        user_id: Identifier of the caller.
        is_authenticated: Whether the caller is considered logged in.
        is_anonymous: Whether the caller is an anonymous session.
        is_service_token: True if authenticated via service token
            (public access).
        display_name: Optional human-readable name.
        name: Optional name of the caller.
    """

    def __init__(
        self,
        email: str,
        user_id: str,
        is_authenticated: bool,
        is_anonymous: bool = False,
        is_service_token: bool = False,
        display_name: Optional[str] = None,
        name: Optional[str] = None,
    ):
        # Identity fields.
        self.user_id = user_id
        self.email = email
        self.name = name
        self.display_name = display_name
        # Authentication state flags.
        self.is_authenticated = is_authenticated
        self.is_anonymous = is_anonymous
        self.is_service_token = is_service_token

    def __repr__(self):
        """Return a short debug string with the email and the three flags."""
        shown = (
            ("email", self.email),
            ("authenticated", self.is_authenticated),
            ("anonymous", self.is_anonymous),
            ("service_token", self.is_service_token),
        )
        summary = " ".join(f"{label}={value}" for label, value in shown)
        return f"<CurrentUser {summary}>"
def _decode_unverified_claims(token: str) -> Optional[dict]:
    """Return the claims of ``token`` without verifying its signature, or None if it cannot be decoded."""
    try:
        # Decode without signature verification (per the original code,
        # Firebase token validation happens upstream).
        claims = jwt.decode(token, options={"verify_signature": False})
    except Exception:
        # Deliberate best-effort: an undecodable token means "no identity",
        # never a request failure.
        return None
    return claims if isinstance(claims, dict) else None


def _prefers_service_token(user_claims: dict) -> bool:
    """Return True when the user token is a Firebase anonymous session or carries no email."""
    firebase_claims = user_claims.get("firebase", {})
    if not isinstance(firebase_claims, dict):
        # Malformed "firebase" claim: keep the user token, as before.
        return False
    is_firebase_anonymous = firebase_claims.get("sign_in_provider") == "anonymous"
    return is_firebase_anonymous or not user_claims.get("email")


def _anonymous_user() -> CurrentUser:
    """Return the default unauthenticated, anonymous user."""
    return CurrentUser(
        email="anonymous",
        user_id="anonymous",
        is_authenticated=False,
        is_anonymous=True,
        is_service_token=False,
        display_name=None,
        name=None,
    )


def _user_from_claims(claims: dict) -> CurrentUser:
    """Build a CurrentUser from already-decoded token claims."""
    token_email = claims.get("email")
    token_sub = claims.get("sub")
    token_name = (
        claims.get("name")
        or claims.get("display_name")
        or claims.get("displayName")
    )

    # Control Plane JWT (has tenant_id claim): always an authenticated
    # service token.
    # NOTE(review): claims are read without signature verification, so this
    # branch grants authenticated status on the strength of an unverified
    # claim. Confirm that the X-Kumiho-Token header is verified upstream.
    if claims.get("tenant_id"):
        email = token_email or f"service_{claims.get('tenant_slug', 'unknown')}"
        # Prioritize the user's name from the token over the tenant slug.
        display_name = token_name or claims.get("tenant_slug") or email
        return CurrentUser(
            email=email,
            user_id=token_sub or "service",
            is_authenticated=True,
            is_anonymous=False,
            is_service_token=True,  # public access
            display_name=display_name,
            name=display_name,
        )

    # Otherwise treat it as a Firebase token.
    firebase_claims = claims.get("firebase", {})
    if not isinstance(firebase_claims, dict):
        # A malformed "firebase" claim is handled like an undecodable token.
        return _anonymous_user()

    user_id = token_sub or "anonymous"
    is_firebase_anonymous = firebase_claims.get("sign_in_provider") == "anonymous"

    if is_firebase_anonymous or not token_email:
        # Anonymous Firebase session. Only derive the email suffix from a
        # real string subject; a missing or non-string "sub" keeps the
        # plain "anonymous" placeholder.
        if isinstance(token_sub, str) and token_sub:
            email = f"anonymous_{token_sub[:8]}"
        else:
            email = "anonymous"
        return CurrentUser(
            email=email,
            user_id=user_id,
            is_authenticated=False,
            is_anonymous=True,
            is_service_token=False,
            display_name=None,
            name=None,
        )

    # Real authenticated user.
    return CurrentUser(
        email=token_email,
        user_id=user_id,
        is_authenticated=True,
        is_anonymous=False,
        is_service_token=False,
        display_name=token_name,
        name=token_name,
    )


def get_current_user(
    user_token: Optional[str] = Depends(get_user_token),
    service_token: Optional[str] = Header(None, alias="x-kumiho-token"),
) -> CurrentUser:
    """
    Dependency that extracts the current user from the request tokens.

    Checks both the Authorization header (user token) and X-Kumiho-Token
    (service token). The user token takes precedence when both are present,
    unless it decodes to a Firebase anonymous session or has no email, in
    which case the service token is used instead. A user token that cannot
    be decoded still takes precedence, and the caller is then treated as
    anonymous.

    Token signatures are NOT verified here; claims are only read.

    Args:
        user_token: The user's Firebase token from the Authorization header
        service_token: The service token from the X-Kumiho-Token header

    Returns:
        CurrentUser object. Falls back to an unauthenticated, anonymous
        user when no token is supplied or the chosen token cannot be decoded.
    """
    token = user_token or service_token
    if not token:
        return _anonymous_user()

    # Decode the preferred token once and reuse the claims for both the
    # precedence check and the identity extraction.
    claims = _decode_unverified_claims(token)

    if user_token and service_token:
        if claims is not None and _prefers_service_token(claims):
            claims = _decode_unverified_claims(service_token)

    if claims is None:
        # If token decode fails, remain anonymous/unauthenticated.
        return _anonymous_user()

    return _user_from_claims(claims)
def require_authenticated_user(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
    """
    Dependency that only lets authenticated callers through.

    Attach this to endpoints that require login.

    Args:
        current_user: The user resolved by the get_current_user dependency

    Returns:
        The same CurrentUser object, whose is_authenticated flag is truthy

    Raises:
        HTTPException: 401 if the user is not authenticated
    """
    # Success path first; anything else is rejected below.
    if current_user.is_authenticated:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
    )