import sys
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import redis.asyncio as redis
import structlog
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from python_api.api.routes.api_keys import router as api_keys_router
from python_api.api.routes.auth import router as auth_router
from python_api.api.routes.health import router as health_router
from python_api.api.routes.market import router as market_router
from python_api.api.routes.price import router as price_router
from python_api.api.routes.profile import router as profile_router
from python_api.api.routes.users import router as users_router
from python_api.db.session import dispose_engine, get_engine
from python_api.grpc.server import serve_grpc
from python_api.middleware.rate_limit import RateLimitMiddleware
from python_api.middleware.request_id import RequestIdMiddleware
from python_api.utils.config import KNOWN_INSECURE_SECRETS, get_settings
from python_api.utils.constants import API_V1_PREFIX
from python_api.utils.errors import AppError, app_error_handler, global_exception_handler
from python_api.utils.logger import configure_logging
from python_api.utils.telemetry import setup_telemetry, shutdown_telemetry
configure_logging()
logger = structlog.get_logger()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
settings = get_settings()
get_engine()
logger.info("PostgreSQL engine initialized")
pool = redis.ConnectionPool.from_url(
settings.redis_url,
max_connections=settings.redis_pool_size,
encoding="utf-8",
decode_responses=True,
)
redis_client = redis.Redis(connection_pool=pool)
app.state.redis = redis_client
app.state.redis_pool = pool
logger.info("Redis initialized", pool_size=settings.redis_pool_size)
grpc_server = None
grpc_redis = None
grpc_pool = None
if settings.grpc_enabled:
grpc_pool = redis.ConnectionPool.from_url(
settings.redis_url,
max_connections=settings.redis_pool_size,
encoding="utf-8",
decode_responses=True,
)
grpc_redis = redis.Redis(connection_pool=grpc_pool)
try:
grpc_server = await serve_grpc(grpc_redis, settings.grpc_port)
logger.info("gRPC server started", port=settings.grpc_port)
except Exception as e:
logger.error("Failed to start gRPC server", error=str(e))
logger.info(
"Application startup",
app_name=settings.app_name,
version=settings.version,
host=settings.api_host,
port=settings.api_port,
)
yield
shutdown_telemetry()
if grpc_server:
await grpc_server.stop(5)
logger.info("gRPC server stopped")
if grpc_redis:
await grpc_redis.aclose()
if grpc_pool:
await grpc_pool.disconnect()
logger.info("gRPC Redis closed")
await redis_client.aclose()
await pool.disconnect()
logger.info("Redis closed")
await dispose_engine()
logger.info("PostgreSQL engine disposed")
logger.info("Application shutdown")
def create_app() -> FastAPI:
settings = get_settings()
if not settings.debug:
is_insecure = (
settings.jwt_secret_key in KNOWN_INSECURE_SECRETS or len(settings.jwt_secret_key) < 32
)
if is_insecure:
logger.error(
"Insecure JWT_SECRET_KEY detected!",
detail="Default simple values are not allowed in production. "
"Please set a unique JWT_SECRET_KEY with at least 32 characters in your environment or config/python-api.env file.",
)
sys.exit(1)
app = FastAPI(
title=settings.app_name,
description="Python API service for fetching and processing market data.",
version=settings.version,
contact={
"name": "API Support",
"email": "support@example.com",
},
license_info={
"name": "MIT",
"url": "https://opensource.org/licenses/MIT",
},
lifespan=lifespan,
openapi_url=f"{API_V1_PREFIX}/openapi.json",
docs_url=f"{API_V1_PREFIX}/docs",
redoc_url=f"{API_V1_PREFIX}/redoc",
)
setup_telemetry(app)
app.add_exception_handler(AppError, app_error_handler) # type: ignore[arg-type]
app.add_exception_handler(Exception, global_exception_handler)
app.add_middleware(RequestIdMiddleware)
app.add_middleware(
RateLimitMiddleware,
enable_rate_limit=settings.rate_limit_enabled,
requests_per_minute=settings.rate_limit_requests_per_minute,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PATCH", "DELETE", "OPTIONS"],
allow_headers=["Authorization", "Content-Type"],
)
app.include_router(auth_router, prefix=f"{API_V1_PREFIX}/auth", tags=["Authentication"])
app.include_router(profile_router, prefix=f"{API_V1_PREFIX}/profile", tags=["Profile"])
app.include_router(users_router, prefix=f"{API_V1_PREFIX}/users", tags=["Users"])
app.include_router(api_keys_router, prefix=f"{API_V1_PREFIX}/api-keys", tags=["API Keys"])
app.include_router(market_router, prefix=f"{API_V1_PREFIX}/market", tags=["Market"])
app.include_router(price_router, prefix=API_V1_PREFIX, tags=["Prices"])
app.include_router(health_router, prefix="/health", tags=["Health"])
return app
app = create_app()
@app.get(
"/",
responses={200: {"description": "Root endpoint"}},
)
async def root() -> dict[str, str]:
logger.info("Root endpoint called")
return {"message": "Hello from Python API"}