Skip to content

FastAPI Integration

This document provides comprehensive guidance for integrating FastAPI with Temporal.io in enterprise environments, creating robust REST APIs that orchestrate workflows and provide business functionality.

Overview

FastAPI serves as the HTTP interface layer for Temporal workflows, providing REST endpoints for workflow initiation, monitoring, and interaction. This integration enables web applications, mobile apps, and other services to interact with Temporal workflows through standard HTTP APIs.

Project Structure

fastapi-temporal-service/
├── src/
│   └── temporal_api/
│       ├── __init__.py
│       ├── main.py              # FastAPI application
│       ├── api/
│       │   ├── __init__.py
│       │   ├── v1/
│       │   │   ├── __init__.py
│       │   │   ├── orders.py    # Order endpoints
│       │   │   ├── workflows.py # Workflow management
│       │   │   └── health.py    # Health checks
│       │   └── dependencies.py  # API dependencies
│       ├── models/
│       │   ├── __init__.py
│       │   ├── api_models.py    # API request/response models
│       │   └── temporal_models.py # Temporal data models
│       ├── services/
│       │   ├── __init__.py
│       │   ├── temporal_client.py
│       │   └── workflow_service.py
│       ├── middleware/
│       │   ├── __init__.py
│       │   ├── auth.py          # Authentication middleware
│       │   ├── logging.py       # Request logging
│       │   └── metrics.py       # Metrics collection
│       └── utils/
│           ├── __init__.py
│           ├── config.py
│           └── exceptions.py

FastAPI Application Setup

Main Application

# src/temporal_api/main.py
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from prometheus_fastapi_instrumentator import Instrumentator

from .api.v1 import orders, workflows, health
from .middleware.auth import AuthMiddleware
from .middleware.logging import LoggingMiddleware
from .services.temporal_client import TemporalClientService
from .utils.config import get_settings
from .utils.exceptions import TemporalAPIException

# Global temporal client service
temporal_service: TemporalClientService = None

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    """Application lifespan manager"""
    global temporal_service

    # Startup
    settings = get_settings()
    temporal_service = TemporalClientService(settings)
    await temporal_service.connect()

    # Add to app state
    app.state.temporal_service = temporal_service

    yield

    # Shutdown
    if temporal_service:
        await temporal_service.disconnect()

# Create FastAPI application
app = FastAPI(
    title="Temporal Enterprise API",
    description="REST API for Temporal workflow orchestration",
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json"
)

# Middleware setup
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=get_settings().allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(AuthMiddleware)
app.add_middleware(LoggingMiddleware)

# Metrics instrumentation
Instrumentator().instrument(app).expose(app, endpoint="/metrics")

# Exception handlers
@app.exception_handler(TemporalAPIException)
async def temporal_api_exception_handler(request: Request, exc: TemporalAPIException):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.error_type,
            "message": exc.message,
            "details": exc.details,
            "request_id": getattr(request.state, "request_id", None)
        }
    )

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    logging.error(f"Unhandled exception: {exc}", exc_info=True)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": "internal_server_error",
            "message": "An internal server error occurred",
            "request_id": getattr(request.state, "request_id", None)
        }
    )

# Include routers
app.include_router(health.router, prefix="/health", tags=["health"])
app.include_router(orders.router, prefix="/api/v1/orders", tags=["orders"])
app.include_router(workflows.router, prefix="/api/v1/workflows", tags=["workflows"])

@app.get("/", include_in_schema=False)
async def root():
    return {"message": "Temporal Enterprise API", "version": "1.0.0"}

API Models

Request/Response Models

# src/temporal_api/models/api_models.py
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import List, Optional, Dict, Any
from uuid import UUID

from pydantic import BaseModel, Field, validator

class OrderStatus(str, Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

class CreateOrderRequest(BaseModel):
    customer_id: str = Field(..., min_length=1)
    items: List[Dict[str, Any]] = Field(..., min_items=1)
    shipping_address: Dict[str, str]
    payment_method: str
    payment_details: Dict[str, Any]

    class Config:
        schema_extra = {
            "example": {
                "customer_id": "cust_123",
                "items": [
                    {
                        "id": "item_1",
                        "name": "Product A",
                        "sku": "SKU-001",
                        "quantity": 2,
                        "unit_price": 29.99
                    }
                ],
                "shipping_address": {
                    "street": "123 Main St",
                    "city": "Anytown",
                    "state": "CA",
                    "zip_code": "12345"
                },
                "payment_method": "credit_card",
                "payment_details": {
                    "token": "tok_123456789"
                }
            }
        }

class OrderResponse(BaseModel):
    id: str
    customer_id: str
    status: OrderStatus
    total_amount: Decimal
    created_at: datetime
    workflow_id: Optional[str] = None

class WorkflowExecutionRequest(BaseModel):
    workflow_type: str
    task_queue: str = "temporal-product-queue"
    input_data: Dict[str, Any]
    workflow_id: Optional[str] = None
    execution_timeout_seconds: Optional[int] = 3600

class WorkflowExecutionResponse(BaseModel):
    workflow_id: str
    run_id: str
    status: str = "RUNNING"
    result: Optional[Dict[str, Any]] = None

class WorkflowStatusResponse(BaseModel):
    workflow_id: str
    run_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    history_length: Optional[int] = None
    execution_time_seconds: Optional[float] = None

class WorkflowSignalRequest(BaseModel):
    signal_name: str
    signal_input: Optional[Dict[str, Any]] = None

class WorkflowQueryRequest(BaseModel):
    query_name: str
    query_args: Optional[Dict[str, Any]] = None

class HealthCheckResponse(BaseModel):
    status: str = "healthy"
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    services: Dict[str, str] = Field(default_factory=dict)
    version: str = "1.0.0"

Temporal Client Service

Client Management

# src/temporal_api/services/temporal_client.py
import asyncio
import logging
from typing import Any, Dict, Optional, Type
from datetime import timedelta

from temporalio.client import Client, WorkflowHandle
from temporalio.common import RetryPolicy
from temporalio.exceptions import WorkflowAlreadyStartedError

from ..utils.config import Settings
from ..utils.exceptions import TemporalAPIException

class TemporalClientService:
    """Service for managing Temporal client connections and operations"""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.client: Optional[Client] = None
        self.logger = logging.getLogger(__name__)

    async def connect(self) -> None:
        """Connect to Temporal server"""
        try:
            self.client = await Client.connect(
                self.settings.temporal_server_url,
                namespace=self.settings.temporal_namespace,
                tls=self.settings.temporal_tls_config if self.settings.temporal_tls_enabled else False
            )
            self.logger.info(f"Connected to Temporal server: {self.settings.temporal_server_url}")
        except Exception as e:
            self.logger.error(f"Failed to connect to Temporal server: {e}")
            raise TemporalAPIException(
                "temporal_connection_error",
                "Failed to connect to Temporal server",
                status_code=503,
                details={"error": str(e)}
            )

    async def disconnect(self) -> None:
        """Disconnect from Temporal server"""
        if self.client:
            await self.client.close()
            self.logger.info("Disconnected from Temporal server")

    async def start_workflow(
        self,
        workflow_type: str,
        workflow_input: Any,
        workflow_id: Optional[str] = None,
        task_queue: str = "temporal-product-queue",
        execution_timeout: Optional[timedelta] = None,
        retry_policy: Optional[RetryPolicy] = None
    ) -> WorkflowHandle:
        """Start a new workflow execution"""

        if not self.client:
            raise TemporalAPIException(
                "temporal_not_connected",
                "Temporal client not connected",
                status_code=503
            )

        try:
            handle = await self.client.start_workflow(
                workflow_type,
                workflow_input,
                id=workflow_id,
                task_queue=task_queue,
                execution_timeout=execution_timeout,
                retry_policy=retry_policy
            )

            self.logger.info(
                f"Started workflow {workflow_type}",
                extra={
                    "workflow_id": handle.id,
                    "run_id": handle.result_run_id,
                    "task_queue": task_queue
                }
            )

            return handle

        except WorkflowAlreadyStartedError:
            # Get existing workflow handle
            handle = self.client.get_workflow_handle(workflow_id)
            self.logger.warning(f"Workflow {workflow_id} already exists, returning existing handle")
            return handle

        except Exception as e:
            self.logger.error(f"Failed to start workflow: {e}")
            raise TemporalAPIException(
                "workflow_start_error",
                f"Failed to start workflow: {str(e)}",
                status_code=400,
                details={"workflow_type": workflow_type, "error": str(e)}
            )

    async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
        """Get workflow execution status"""

        if not self.client:
            raise TemporalAPIException(
                "temporal_not_connected",
                "Temporal client not connected",
                status_code=503
            )

        try:
            handle = self.client.get_workflow_handle(workflow_id)

            # Get workflow description
            description = await handle.describe()

            status_info = {
                "workflow_id": workflow_id,
                "run_id": description.run_id,
                "status": description.status.name,
                "workflow_type": description.workflow_type,
                "task_queue": description.task_queue,
                "start_time": description.start_time.isoformat() if description.start_time else None,
                "close_time": description.close_time.isoformat() if description.close_time else None,
                "execution_time_seconds": None,
                "history_length": description.history_length,
                "result": None
            }

            # Calculate execution time if workflow is closed
            if description.start_time and description.close_time:
                execution_time = description.close_time - description.start_time
                status_info["execution_time_seconds"] = execution_time.total_seconds()

            # Get result if workflow is completed
            if description.status.name in ["COMPLETED", "FAILED", "CANCELED"]:
                try:
                    if description.status.name == "COMPLETED":
                        result = await handle.result()
                        if hasattr(result, 'dict'):
                            status_info["result"] = result.dict()
                        else:
                            status_info["result"] = result
                except Exception as e:
                    self.logger.warning(f"Failed to get workflow result: {e}")

            return status_info

        except Exception as e:
            self.logger.error(f"Failed to get workflow status: {e}")
            raise TemporalAPIException(
                "workflow_status_error",
                f"Failed to get workflow status: {str(e)}",
                status_code=404,
                details={"workflow_id": workflow_id, "error": str(e)}
            )

    async def signal_workflow(
        self,
        workflow_id: str,
        signal_name: str,
        signal_input: Any = None
    ) -> None:
        """Send signal to workflow"""

        if not self.client:
            raise TemporalAPIException(
                "temporal_not_connected",
                "Temporal client not connected",
                status_code=503
            )

        try:
            handle = self.client.get_workflow_handle(workflow_id)
            await handle.signal(signal_name, signal_input)

            self.logger.info(
                f"Sent signal {signal_name} to workflow {workflow_id}",
                extra={"signal_input": signal_input}
            )

        except Exception as e:
            self.logger.error(f"Failed to signal workflow: {e}")
            raise TemporalAPIException(
                "workflow_signal_error",
                f"Failed to signal workflow: {str(e)}",
                status_code=400,
                details={
                    "workflow_id": workflow_id,
                    "signal_name": signal_name,
                    "error": str(e)
                }
            )

    async def query_workflow(
        self,
        workflow_id: str,
        query_name: str,
        query_args: Any = None
    ) -> Any:
        """Query workflow for information"""

        if not self.client:
            raise TemporalAPIException(
                "temporal_not_connected",
                "Temporal client not connected",
                status_code=503
            )

        try:
            handle = self.client.get_workflow_handle(workflow_id)
            result = await handle.query(query_name, query_args)

            self.logger.info(
                f"Queried workflow {workflow_id} with {query_name}",
                extra={"query_args": query_args}
            )

            return result

        except Exception as e:
            self.logger.error(f"Failed to query workflow: {e}")
            raise TemporalAPIException(
                "workflow_query_error",
                f"Failed to query workflow: {str(e)}",
                status_code=400,
                details={
                    "workflow_id": workflow_id,
                    "query_name": query_name,
                    "error": str(e)
                }
            )

API Endpoints

Order Management Endpoints

# src/temporal_api/api/v1/orders.py
from datetime import timedelta
from typing import List
from uuid import uuid4

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import JSONResponse

from ...models.api_models import (
    CreateOrderRequest, OrderResponse, WorkflowStatusResponse
)
from ...services.temporal_client import TemporalClientService
from ...utils.exceptions import TemporalAPIException
from ..dependencies import get_temporal_service, get_current_user

router = APIRouter()

@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(
    order_request: CreateOrderRequest,
    temporal_service: TemporalClientService = Depends(get_temporal_service),
    current_user: dict = Depends(get_current_user)
) -> OrderResponse:
    """Create a new order and start order processing workflow"""

    try:
        # Generate unique order ID
        order_id = str(uuid4())

        # Prepare workflow input
        workflow_input = {
            "id": order_id,
            "customer_id": order_request.customer_id,
            "items": order_request.items,
            "shipping_address": order_request.shipping_address,
            "payment_method": order_request.payment_method,
            "payment_details": order_request.payment_details
        }

        # Start order processing workflow
        handle = await temporal_service.start_workflow(
            workflow_type="order_processing",
            workflow_input=workflow_input,
            workflow_id=f"order-{order_id}",
            task_queue="temporal-product-queue",
            execution_timeout=timedelta(hours=2)
        )

        # Calculate total amount (simplified)
        total_amount = sum(
            item.get("quantity", 0) * item.get("unit_price", 0)
            for item in order_request.items
        )

        return OrderResponse(
            id=order_id,
            customer_id=order_request.customer_id,
            status="PENDING",
            total_amount=total_amount,
            created_at=datetime.utcnow(),
            workflow_id=handle.id
        )

    except TemporalAPIException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create order: {str(e)}"
        )

@router.get("/{order_id}", response_model=WorkflowStatusResponse)
async def get_order_status(
    order_id: str,
    temporal_service: TemporalClientService = Depends(get_temporal_service),
    current_user: dict = Depends(get_current_user)
) -> WorkflowStatusResponse:
    """Get order processing status"""

    try:
        workflow_id = f"order-{order_id}"
        status_info = await temporal_service.get_workflow_status(workflow_id)

        return WorkflowStatusResponse(**status_info)

    except TemporalAPIException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get order status: {str(e)}"
        )

@router.post("/{order_id}/cancel", status_code=status.HTTP_204_NO_CONTENT)
async def cancel_order(
    order_id: str,
    temporal_service: TemporalClientService = Depends(get_temporal_service),
    current_user: dict = Depends(get_current_user)
):
    """Cancel an order by sending cancel signal to workflow"""

    try:
        workflow_id = f"order-{order_id}"
        await temporal_service.signal_workflow(
            workflow_id=workflow_id,
            signal_name="cancel",
            signal_input={"reason": "customer_request"}
        )

        return JSONResponse(
            status_code=status.HTTP_204_NO_CONTENT,
            content=None
        )

    except TemporalAPIException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cancel order: {str(e)}"
        )

Health Check Endpoints

# src/temporal_api/api/v1/health.py
from datetime import datetime
from fastapi import APIRouter, Depends
from ...models.api_models import HealthCheckResponse
from ...services.temporal_client import TemporalClientService
from ..dependencies import get_temporal_service

router = APIRouter()

@router.get("/", response_model=HealthCheckResponse)
async def health_check(
    temporal_service: TemporalClientService = Depends(get_temporal_service)
) -> HealthCheckResponse:
    """Application health check"""

    services = {}

    # Check Temporal connection
    if temporal_service and temporal_service.client:
        try:
            # Simple check - list workflows (limited)
            services["temporal"] = "healthy"
        except Exception:
            services["temporal"] = "unhealthy"
    else:
        services["temporal"] = "disconnected"

    # Determine overall status
    overall_status = "healthy" if all(
        status == "healthy" for status in services.values()
    ) else "unhealthy"

    return HealthCheckResponse(
        status=overall_status,
        timestamp=datetime.utcnow(),
        services=services
    )

@router.get("/ready")
async def readiness_check():
    """Kubernetes readiness probe"""
    return {"status": "ready"}

@router.get("/live")
async def liveness_check():
    """Kubernetes liveness probe"""
    return {"status": "alive"}

Dependencies and Middleware

API Dependencies

# src/temporal_api/api/dependencies.py
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from ..services.temporal_client import TemporalClientService
from ..utils.exceptions import TemporalAPIException

security = HTTPBearer()

def get_temporal_service(request: Request) -> TemporalClientService:
    """Get Temporal client service from app state"""
    temporal_service = getattr(request.app.state, "temporal_service", None)
    if not temporal_service:
        raise TemporalAPIException(
            "service_unavailable",
            "Temporal service not available",
            status_code=503
        )
    return temporal_service

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """Get current authenticated user (simplified)"""
    # In production, validate JWT token and extract user info
    # This is a simplified example
    return {
        "user_id": "user_123",
        "username": "test_user",
        "roles": ["user"]
    }

Authentication Middleware

# src/temporal_api/middleware/auth.py
import logging
from fastapi import Request, HTTPException, status
from starlette.middleware.base import BaseHTTPMiddleware

class AuthMiddleware(BaseHTTPMiddleware):
    """Authentication middleware for API requests"""

    EXCLUDED_PATHS = {"/health", "/metrics", "/docs", "/redoc", "/openapi.json"}

    async def dispatch(self, request: Request, call_next):
        # Skip authentication for excluded paths
        if any(request.url.path.startswith(path) for path in self.EXCLUDED_PATHS):
            return await call_next(request)

        # Check for Authorization header
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Missing or invalid authorization header"
            )

        # In production, validate JWT token here
        token = auth_header.split(" ")[1]

        # Add user context to request
        request.state.user = {"user_id": "user_123", "token": token}

        response = await call_next(request)
        return response

Configuration and Utilities

Exception Handling

# src/temporal_api/utils/exceptions.py
from typing import Optional, Dict, Any

class TemporalAPIException(Exception):
    """Custom exception for Temporal API errors"""

    def __init__(
        self,
        error_type: str,
        message: str,
        status_code: int = 400,
        details: Optional[Dict[str, Any]] = None
    ):
        self.error_type = error_type
        self.message = message
        self.status_code = status_code
        self.details = details or {}
        super().__init__(message)

Docker Configuration

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/

# Set environment variables
ENV PYTHONPATH="/app/src"
ENV PYTHONUNBUFFERED=1

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "temporal_api.main:app", "--host", "0.0.0.0", "--port", "8000"]

This FastAPI integration provides a robust HTTP interface for Temporal workflows with enterprise features including authentication, monitoring, error handling, and comprehensive API documentation.