import os import psycopg2 from fastapi import FastAPI, HTTPException, Request, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Optional from jose import jwt, JWTError import requests from functools import lru_cache app = FastAPI(title="Dex Demo Backend") # CORS настройки app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Конфигурация из переменных окружения DB_HOST = os.getenv("DB_HOST", "postgres") DB_PORT = os.getenv("DB_PORT", "5440") DB_NAME = os.getenv("DB_NAME", "dexdemo") DB_USER = os.getenv("DB_USER", "dexdemo") DB_PASSWORD = os.getenv("DB_PASSWORD", "dexdemo") DEX_ISSUER = os.getenv("DEX_ISSUER", "https://dex.127.0.0.1.sslip.io/") DEX_JWKS_URL = f"{DEX_ISSUER}keys" # Режим разработки INSECURE_DEV_MODE = os.getenv("INSECURE_DEV_MODE", "false").lower() == "true" if INSECURE_DEV_MODE: INSECURE_DEV_EMAIL = os.getenv("INSECURE_DEV_EMAIL") class Organization(BaseModel): id: int name: str class Role(BaseModel): id: int name: str description: Optional[str] class Link(BaseModel): id: int title: str url: str description: Optional[str] class UserInfo(BaseModel): email: str full_name: str organization: Optional[Organization] roles: List[Role] available_links: List[Link] def get_db_connection(): """Создание подключения к БД""" try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) return conn except Exception as e: raise HTTPException(status_code=500, detail=f"Database connection error: {str(e)}") @lru_cache() def get_jwks(): """Получение JWKS от Dex для валидации JWT""" try: response = requests.get(DEX_JWKS_URL, verify=False, timeout=5) response.raise_for_status() return response.json() except Exception as e: print(f"Error fetching JWKS: {e}") return None def validate_jwt_token(token: str) -> dict: """Валидация JWT токена""" try: # Получаем JWKS jwks = get_jwks() if not jwks: raise HTTPException(status_code=500, detail="Cannot fetch JWKS from Dex") # Декодируем заголовок токена, чтобы получить kid unverified_header = jwt.get_unverified_header(token) kid = unverified_header.get("kid") # Находим нужный ключ key = None for jwk in jwks.get("keys", []): if jwk.get("kid") == kid: key = jwk break if not key: raise HTTPException(status_code=401, detail="Unable to find appropriate key") # Валидируем токен payload = jwt.decode( token, key, algorithms=["RS256"], audience=None, issuer=DEX_ISSUER, options={ "verify_aud": False, # Отключаем проверку audience для демо "verify_at_hash": False } ) return payload except JWTError as e: raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}") except Exception as e: raise HTTPException(status_code=401, detail=f"Token validation error: {str(e)}") def get_user_email(request: Request) -> str: """Извлечение email пользователя из JWT или заголовков""" # В режиме разработки возвращаем заданный email if INSECURE_DEV_MODE: print(f"INSECURE_DEV_MODE: Using email {INSECURE_DEV_EMAIL}") return INSECURE_DEV_EMAIL # Попытка получить токен из заголовка Authorization auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): token = auth_header.split(" ")[1] payload = validate_jwt_token(token) # Email может быть в разных полях email = payload.get("email") # payload.get("name") if email: return email # Или берем из заголока от oauth2-proxy/DexAuthenticator email = request.headers.get("X-Auth-Request-Email") # request.headers.get("X-Auth-Request-User") if not email: raise HTTPException(status_code=401, detail="No authentication information found") return email @app.get("/api/health") def health_check(): """Проверка здоровья сервиса""" return {"status": "ok"} @app.get("/api/user-info", response_model=UserInfo) def get_user_info(email: str = Depends(get_user_email)): """Получение информации о пользователе""" conn = get_db_connection() cursor = conn.cursor() try: # Получаем информацию о пользователе cursor.execute(""" SELECT u.email, u.full_name, u.organization_id, o.name as org_name FROM users u LEFT JOIN organizations o ON u.organization_id = o.id WHERE u.email = %s """, (email,)) user_row = cursor.fetchone() if not user_row: raise HTTPException(status_code=404, detail="User not found in database") user_email, full_name, org_id, org_name = user_row organization = None if org_id and org_name: organization = Organization(id=org_id, name=org_name) # Получаем роли пользователя cursor.execute(""" SELECT r.id, r.name, r.description FROM roles r JOIN user_roles ur ON r.id = ur.role_id JOIN users u ON ur.user_id = u.id WHERE u.email = %s """, (email,)) roles = [ Role(id=row[0], name=row[1], description=row[2]) for row in cursor.fetchall() ] # Получаем доступные ссылки на основе ролей cursor.execute(""" SELECT DISTINCT l.id, l.title, l.url, l.description FROM links l JOIN role_links rl ON l.id = rl.link_id JOIN user_roles ur ON rl.role_id = ur.role_id JOIN users u ON ur.user_id = u.id WHERE u.email = %s ORDER BY l.id """, (email,)) available_links = [ Link(id=row[0], title=row[1], url=row[2], description=row[3]) for row in cursor.fetchall() ] return UserInfo( email=user_email, full_name=full_name, organization=organization, roles=roles, available_links=available_links ) finally: cursor.close() conn.close() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)