236 lines
7.2 KiB
Python
236 lines
7.2 KiB
Python
import os
|
||
import psycopg2
|
||
from fastapi import FastAPI, HTTPException, Request, Depends
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel
|
||
from typing import List, Optional
|
||
from jose import jwt, JWTError
|
||
import requests
|
||
from functools import lru_cache
|
||
|
||
app = FastAPI(title="Dex Demo Backend")
|
||
|
||
# CORS настройки
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# Конфигурация из переменных окружения
|
||
DB_HOST = os.getenv("DB_HOST", "postgres")
|
||
DB_PORT = os.getenv("DB_PORT", "5440")
|
||
DB_NAME = os.getenv("DB_NAME", "dexdemo")
|
||
DB_USER = os.getenv("DB_USER", "dexdemo")
|
||
DB_PASSWORD = os.getenv("DB_PASSWORD", "dexdemo")
|
||
DEX_ISSUER = os.getenv("DEX_ISSUER", "https://dex.127.0.0.1.sslip.io/")
|
||
DEX_JWKS_URL = f"{DEX_ISSUER}keys"
|
||
|
||
# Режим разработки
|
||
INSECURE_DEV_MODE = os.getenv("INSECURE_DEV_MODE", "false").lower() == "true"
|
||
if INSECURE_DEV_MODE:
|
||
INSECURE_DEV_EMAIL = os.getenv("INSECURE_DEV_EMAIL")
|
||
|
||
class Organization(BaseModel):
|
||
id: int
|
||
name: str
|
||
|
||
class Role(BaseModel):
|
||
id: int
|
||
name: str
|
||
description: Optional[str]
|
||
|
||
|
||
class Link(BaseModel):
|
||
id: int
|
||
title: str
|
||
url: str
|
||
description: Optional[str]
|
||
|
||
|
||
class UserInfo(BaseModel):
|
||
email: str
|
||
full_name: str
|
||
organization: Optional[Organization]
|
||
roles: List[Role]
|
||
available_links: List[Link]
|
||
|
||
|
||
def get_db_connection():
|
||
"""Создание подключения к БД"""
|
||
try:
|
||
conn = psycopg2.connect(
|
||
host=DB_HOST,
|
||
port=DB_PORT,
|
||
database=DB_NAME,
|
||
user=DB_USER,
|
||
password=DB_PASSWORD
|
||
)
|
||
return conn
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"Database connection error: {str(e)}")
|
||
|
||
|
||
@lru_cache()
|
||
def get_jwks():
|
||
"""Получение JWKS от Dex для валидации JWT"""
|
||
try:
|
||
response = requests.get(DEX_JWKS_URL, verify=False, timeout=5)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except Exception as e:
|
||
print(f"Error fetching JWKS: {e}")
|
||
return None
|
||
|
||
|
||
def validate_jwt_token(token: str) -> dict:
|
||
"""Валидация JWT токена"""
|
||
try:
|
||
# Получаем JWKS
|
||
jwks = get_jwks()
|
||
if not jwks:
|
||
raise HTTPException(status_code=500, detail="Cannot fetch JWKS from Dex")
|
||
|
||
# Декодируем заголовок токена, чтобы получить kid
|
||
unverified_header = jwt.get_unverified_header(token)
|
||
kid = unverified_header.get("kid")
|
||
|
||
# Находим нужный ключ
|
||
key = None
|
||
for jwk in jwks.get("keys", []):
|
||
if jwk.get("kid") == kid:
|
||
key = jwk
|
||
break
|
||
|
||
if not key:
|
||
raise HTTPException(status_code=401, detail="Unable to find appropriate key")
|
||
|
||
# Валидируем токен
|
||
payload = jwt.decode(
|
||
token,
|
||
key,
|
||
algorithms=["RS256"],
|
||
audience=None,
|
||
issuer=DEX_ISSUER,
|
||
options={
|
||
"verify_aud": False, # Отключаем проверку audience для демо
|
||
"verify_at_hash": False
|
||
}
|
||
)
|
||
|
||
return payload
|
||
except JWTError as e:
|
||
raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}")
|
||
except Exception as e:
|
||
raise HTTPException(status_code=401, detail=f"Token validation error: {str(e)}")
|
||
|
||
|
||
def get_user_email(request: Request) -> str:
|
||
"""Извлечение email пользователя из JWT или заголовков"""
|
||
# В режиме разработки возвращаем заданный email
|
||
if INSECURE_DEV_MODE:
|
||
print(f"INSECURE_DEV_MODE: Using email {INSECURE_DEV_EMAIL}")
|
||
return INSECURE_DEV_EMAIL
|
||
|
||
# Попытка получить токен из заголовка Authorization
|
||
auth_header = request.headers.get("Authorization")
|
||
|
||
if auth_header and auth_header.startswith("Bearer "):
|
||
token = auth_header.split(" ")[1]
|
||
payload = validate_jwt_token(token)
|
||
|
||
# Email может быть в разных полях
|
||
email = payload.get("email") # payload.get("name")
|
||
if email:
|
||
return email
|
||
|
||
# Или берем из заголока от oauth2-proxy/DexAuthenticator
|
||
email = request.headers.get("X-Auth-Request-Email") # request.headers.get("X-Auth-Request-User")
|
||
|
||
if not email:
|
||
raise HTTPException(status_code=401, detail="No authentication information found")
|
||
|
||
return email
|
||
|
||
|
||
@app.get("/api/health")
|
||
def health_check():
|
||
"""Проверка здоровья сервиса"""
|
||
return {"status": "ok"}
|
||
|
||
|
||
@app.get("/api/user-info", response_model=UserInfo)
|
||
def get_user_info(email: str = Depends(get_user_email)):
|
||
"""Получение информации о пользователе"""
|
||
conn = get_db_connection()
|
||
cursor = conn.cursor()
|
||
|
||
try:
|
||
# Получаем информацию о пользователе
|
||
cursor.execute("""
|
||
SELECT u.email, u.full_name, u.organization_id, o.name as org_name
|
||
FROM users u
|
||
LEFT JOIN organizations o ON u.organization_id = o.id
|
||
WHERE u.email = %s
|
||
""", (email,))
|
||
|
||
user_row = cursor.fetchone()
|
||
if not user_row:
|
||
raise HTTPException(status_code=404, detail="User not found in database")
|
||
|
||
user_email, full_name, org_id, org_name = user_row
|
||
|
||
organization = None
|
||
if org_id and org_name:
|
||
organization = Organization(id=org_id, name=org_name)
|
||
|
||
# Получаем роли пользователя
|
||
cursor.execute("""
|
||
SELECT r.id, r.name, r.description
|
||
FROM roles r
|
||
JOIN user_roles ur ON r.id = ur.role_id
|
||
JOIN users u ON ur.user_id = u.id
|
||
WHERE u.email = %s
|
||
""", (email,))
|
||
|
||
roles = [
|
||
Role(id=row[0], name=row[1], description=row[2])
|
||
for row in cursor.fetchall()
|
||
]
|
||
|
||
# Получаем доступные ссылки на основе ролей
|
||
cursor.execute("""
|
||
SELECT DISTINCT l.id, l.title, l.url, l.description
|
||
FROM links l
|
||
JOIN role_links rl ON l.id = rl.link_id
|
||
JOIN user_roles ur ON rl.role_id = ur.role_id
|
||
JOIN users u ON ur.user_id = u.id
|
||
WHERE u.email = %s
|
||
ORDER BY l.id
|
||
""", (email,))
|
||
|
||
available_links = [
|
||
Link(id=row[0], title=row[1], url=row[2], description=row[3])
|
||
for row in cursor.fetchall()
|
||
]
|
||
|
||
return UserInfo(
|
||
email=user_email,
|
||
full_name=full_name,
|
||
organization=organization,
|
||
roles=roles,
|
||
available_links=available_links
|
||
)
|
||
|
||
finally:
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import uvicorn
|
||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||
|