Skip to content

Testing

This document provides comprehensive testing strategies and implementation guidelines for Temporal.io workflows, activities, and FastAPI integrations in enterprise environments.

Overview

Testing Temporal applications requires specialized approaches due to their distributed, asynchronous nature. This guide covers unit testing, integration testing, load testing, and end-to-end testing strategies using Temporal's testing framework and best practices.

Testing Framework Setup

Dependencies

# requirements-test.txt
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-cov>=4.0.0
pytest-mock>=3.10.0
temporalio[testing]>=1.0.0
httpx>=0.24.0  # For FastAPI testing
factory-boy>=3.2.0  # For test data generation
freezegun>=1.2.0  # For time mocking

Test Configuration

# tests/conftest.py
import asyncio
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from httpx import AsyncClient

from src.temporal_workflows.workflows import OrderProcessingWorkflow
from src.temporal_workflows.activities import PaymentActivity, InventoryActivity
from src.temporal_api.main import app

@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for async tests"""
    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()

@pytest.fixture
async def temporal_env():
    """Create Temporal test environment"""
    async with WorkflowEnvironment() as env:
        yield env

@pytest.fixture
async def temporal_worker(temporal_env):
    """Create Temporal worker for testing"""
    async with Worker(
        temporal_env.client,
        task_queue="test-queue",
        workflows=[OrderProcessingWorkflow],
        activities=[
            PaymentActivity.process_payment,
            InventoryActivity.reserve_inventory,
            InventoryActivity.release_inventory,
        ]
    ):
        yield

@pytest.fixture
async def fastapi_client():
    """Create FastAPI test client"""
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

@pytest.fixture
def sample_order_data():
    """Sample order data for testing"""
    return {
        "id": "order-123",
        "customer_id": "cust-456",
        "items": [
            {
                "id": "item-1",
                "name": "Test Product",
                "sku": "TEST-001",
                "quantity": 2,
                "unit_price": 50.00
            }
        ],
        "shipping_address": {
            "street": "123 Test St",
            "city": "Test City",
            "state": "TS",
            "zip_code": "12345"
        },
        "payment_method": "credit_card",
        "payment_details": {"token": "test_token_123"}
    }

Unit Testing Workflows

Workflow Testing

# tests/test_workflows.py
import pytest
from datetime import timedelta
from temporalio.testing import WorkflowEnvironment
from temporalio.common import RetryPolicy

from src.temporal_workflows.workflows import OrderProcessingWorkflow
from src.temporal_workflows.models import OrderData, OrderStatus

class TestOrderProcessingWorkflow:
    """Test cases for order processing workflow"""

    @pytest.mark.asyncio
    async def test_successful_order_processing(self, temporal_env, sample_order_data):
        """Test successful order processing flow"""

        async with Worker(
            temporal_env.client,
            task_queue="test-queue",
            workflows=[OrderProcessingWorkflow],
            activities=[
                # Mock activities that always succeed
                self.mock_payment_success,
                self.mock_inventory_success,
                self.mock_shipping_success,
            ]
        ):
            # Start workflow
            handle = await temporal_env.client.start_workflow(
                OrderProcessingWorkflow.run,
                OrderData(**sample_order_data),
                id="test-order-123",
                task_queue="test-queue",
                execution_timeout=timedelta(minutes=5)
            )

            # Wait for completion
            result = await handle.result()

            # Assertions
            assert result.status == OrderStatus.COMPLETED
            assert result.id == "order-123"
            assert result.total_amount == 100.00

    @pytest.mark.asyncio
    async def test_payment_failure_rollback(self, temporal_env, sample_order_data):
        """Test rollback when payment fails"""

        async with Worker(
            temporal_env.client,
            task_queue="test-queue",
            workflows=[OrderProcessingWorkflow],
            activities=[
                self.mock_payment_failure,  # Payment fails
                self.mock_inventory_success,
                self.mock_inventory_release,  # Should be called for rollback
            ]
        ):
            handle = await temporal_env.client.start_workflow(
                OrderProcessingWorkflow.run,
                OrderData(**sample_order_data),
                id="test-order-payment-fail",
                task_queue="test-queue"
            )

            result = await handle.result()

            assert result.status == OrderStatus.FAILED
            assert "payment_failed" in result.failure_reason

    @pytest.mark.asyncio
    async def test_workflow_signals(self, temporal_env, sample_order_data):
        """Test workflow signal handling"""

        async with Worker(
            temporal_env.client,
            task_queue="test-queue",
            workflows=[OrderProcessingWorkflow],
            activities=[self.mock_long_running_activity]
        ):
            handle = await temporal_env.client.start_workflow(
                OrderProcessingWorkflow.run,
                OrderData(**sample_order_data),
                id="test-order-signal",
                task_queue="test-queue"
            )

            # Send cancel signal
            await handle.signal(OrderProcessingWorkflow.cancel, {"reason": "customer_request"})

            result = await handle.result()

            assert result.status == OrderStatus.CANCELLED
            assert result.cancellation_reason == "customer_request"

    @pytest.mark.asyncio
    async def test_workflow_queries(self, temporal_env, sample_order_data):
        """Test workflow query handling"""

        async with Worker(
            temporal_env.client,
            task_queue="test-queue",
            workflows=[OrderProcessingWorkflow],
            activities=[self.mock_slow_activity]
        ):
            handle = await temporal_env.client.start_workflow(
                OrderProcessingWorkflow.run,
                OrderData(**sample_order_data),
                id="test-order-query",
                task_queue="test-queue"
            )

            # Query workflow status
            status = await handle.query(OrderProcessingWorkflow.get_status)

            assert status["current_step"] in ["payment", "inventory", "shipping"]
            assert status["progress"] >= 0

    # Mock activity implementations
    async def mock_payment_success(self, payment_data):
        return {"status": "success", "transaction_id": "txn_123"}

    async def mock_payment_failure(self, payment_data):
        raise Exception("Payment processing failed")

    async def mock_inventory_success(self, items):
        return {"status": "reserved", "reservation_id": "res_123"}

    async def mock_inventory_release(self, reservation_id):
        return {"status": "released"}

    async def mock_shipping_success(self, shipping_data):
        return {"status": "scheduled", "tracking_number": "TRK_123"}

    async def mock_long_running_activity(self, data):
        # Simulate long-running activity for signal testing
        await asyncio.sleep(10)
        return {"status": "completed"}

    async def mock_slow_activity(self, data):
        # Simulate activity for query testing
        await asyncio.sleep(2)
        return {"status": "completed"}

Unit Testing Activities

Activity Testing

# tests/test_activities.py
import pytest
from unittest.mock import AsyncMock, patch
from decimal import Decimal

from src.temporal_workflows.activities import PaymentActivity, InventoryActivity

class TestPaymentActivity:
    """Test cases for payment activities"""

    @pytest.mark.asyncio
    async def test_process_payment_success(self):
        """Test successful payment processing"""

        payment_data = {
            "amount": Decimal("100.00"),
            "currency": "USD",
            "payment_method": "credit_card",
            "customer_id": "cust_123",
            "payment_details": {"token": "tok_123"}
        }

        with patch('src.temporal_workflows.activities.PaymentClient') as mock_client:
            mock_client.return_value.process_payment.return_value = {
                "status": "success",
                "transaction_id": "txn_123456",
                "amount_charged": Decimal("100.00")
            }

            result = await PaymentActivity.process_payment(payment_data)

            assert result["status"] == "success"
            assert result["transaction_id"] == "txn_123456"
            assert result["amount_charged"] == Decimal("100.00")

    @pytest.mark.asyncio
    async def test_process_payment_insufficient_funds(self):
        """Test payment failure due to insufficient funds"""

        payment_data = {
            "amount": Decimal("1000.00"),
            "currency": "USD",
            "payment_method": "credit_card",
            "customer_id": "cust_123",
            "payment_details": {"token": "tok_invalid"}
        }

        with patch('src.temporal_workflows.activities.PaymentClient') as mock_client:
            mock_client.return_value.process_payment.side_effect = Exception("Insufficient funds")

            with pytest.raises(Exception) as exc_info:
                await PaymentActivity.process_payment(payment_data)

            assert "Insufficient funds" in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_refund_payment(self):
        """Test payment refund"""

        refund_data = {
            "transaction_id": "txn_123456",
            "amount": Decimal("50.00"),
            "reason": "partial_refund"
        }

        with patch('src.temporal_workflows.activities.PaymentClient') as mock_client:
            mock_client.return_value.refund_payment.return_value = {
                "status": "success",
                "refund_id": "ref_789",
                "amount_refunded": Decimal("50.00")
            }

            result = await PaymentActivity.refund_payment(refund_data)

            assert result["status"] == "success"
            assert result["refund_id"] == "ref_789"

class TestInventoryActivity:
    """Test cases for inventory activities"""

    @pytest.mark.asyncio
    async def test_reserve_inventory_success(self):
        """Test successful inventory reservation"""

        items = [
            {"sku": "TEST-001", "quantity": 2},
            {"sku": "TEST-002", "quantity": 1}
        ]

        with patch('src.temporal_workflows.activities.InventoryClient') as mock_client:
            mock_client.return_value.reserve_items.return_value = {
                "status": "success",
                "reservation_id": "res_123",
                "reserved_items": items
            }

            result = await InventoryActivity.reserve_inventory(items)

            assert result["status"] == "success"
            assert result["reservation_id"] == "res_123"
            assert len(result["reserved_items"]) == 2

    @pytest.mark.asyncio
    async def test_reserve_inventory_insufficient_stock(self):
        """Test inventory reservation with insufficient stock"""

        items = [{"sku": "OUT-OF-STOCK", "quantity": 10}]

        with patch('src.temporal_workflows.activities.InventoryClient') as mock_client:
            mock_client.return_value.reserve_items.side_effect = Exception("Insufficient stock")

            with pytest.raises(Exception) as exc_info:
                await InventoryActivity.reserve_inventory(items)

            assert "Insufficient stock" in str(exc_info.value)

Integration Testing

FastAPI Integration Tests

# tests/test_api_integration.py
import pytest
from unittest.mock import AsyncMock, patch
from httpx import AsyncClient

from src.temporal_api.main import app

class TestOrderAPI:
    """Integration tests for order API endpoints"""

    @pytest.mark.asyncio
    async def test_create_order_success(self, fastapi_client: AsyncClient):
        """Test successful order creation via API"""

        order_request = {
            "customer_id": "cust_123",
            "items": [
                {
                    "id": "item_1",
                    "name": "Test Product",
                    "sku": "TEST-001",
                    "quantity": 2,
                    "unit_price": 29.99
                }
            ],
            "shipping_address": {
                "street": "123 Test St",
                "city": "Test City",
                "state": "TS",
                "zip_code": "12345"
            },
            "payment_method": "credit_card",
            "payment_details": {"token": "tok_123"}
        }

        with patch('src.temporal_api.services.temporal_client.TemporalClientService') as mock_service:
            mock_handle = AsyncMock()
            mock_handle.id = "order-test-123"
            mock_service.return_value.start_workflow.return_value = mock_handle

            response = await fastapi_client.post(
                "/api/v1/orders/",
                json=order_request,
                headers={"Authorization": "Bearer test-token"}
            )

        assert response.status_code == 201
        data = response.json()
        assert data["customer_id"] == "cust_123"
        assert data["status"] == "PENDING"
        assert data["total_amount"] == 59.98
        assert data["workflow_id"] == "order-test-123"

    @pytest.mark.asyncio
    async def test_get_order_status(self, fastapi_client: AsyncClient):
        """Test getting order status via API"""

        with patch('src.temporal_api.services.temporal_client.TemporalClientService') as mock_service:
            mock_service.return_value.get_workflow_status.return_value = {
                "workflow_id": "order-test-123",
                "run_id": "run-456",
                "status": "RUNNING",
                "workflow_type": "order_processing",
                "history_length": 15
            }

            response = await fastapi_client.get(
                "/api/v1/orders/test-123",
                headers={"Authorization": "Bearer test-token"}
            )

        assert response.status_code == 200
        data = response.json()
        assert data["workflow_id"] == "order-test-123"
        assert data["status"] == "RUNNING"

    @pytest.mark.asyncio
    async def test_cancel_order(self, fastapi_client: AsyncClient):
        """Test order cancellation via API"""

        with patch('src.temporal_api.services.temporal_client.TemporalClientService') as mock_service:
            mock_service.return_value.signal_workflow.return_value = None

            response = await fastapi_client.post(
                "/api/v1/orders/test-123/cancel",
                headers={"Authorization": "Bearer test-token"}
            )

        assert response.status_code == 204

    @pytest.mark.asyncio
    async def test_unauthorized_request(self, fastapi_client: AsyncClient):
        """Test API request without authorization"""

        response = await fastapi_client.get("/api/v1/orders/test-123")
        assert response.status_code == 401

Load Testing

Temporal Load Tests

# tests/test_load.py
import asyncio
import pytest
from concurrent.futures import ThreadPoolExecutor
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from src.temporal_workflows.workflows import OrderProcessingWorkflow

class TestWorkflowLoad:
    """Load tests for workflow execution"""

    @pytest.mark.asyncio
    async def test_concurrent_workflow_execution(self, temporal_env):
        """Test concurrent workflow execution under load"""

        async with Worker(
            temporal_env.client,
            task_queue="load-test-queue",
            workflows=[OrderProcessingWorkflow],
            activities=[self.mock_fast_activity]
        ):
            # Start multiple workflows concurrently
            concurrent_workflows = 50
            handles = []

            for i in range(concurrent_workflows):
                handle = await temporal_env.client.start_workflow(
                    OrderProcessingWorkflow.run,
                    {"id": f"load-test-{i}", "items": [{"sku": "TEST", "quantity": 1}]},
                    id=f"load-test-workflow-{i}",
                    task_queue="load-test-queue"
                )
                handles.append(handle)

            # Wait for all workflows to complete
            results = await asyncio.gather(*[handle.result() for handle in handles])

            # Verify all workflows completed successfully
            assert len(results) == concurrent_workflows
            for result in results:
                assert result.status in ["COMPLETED", "FAILED"]  # Allow some failures under load

    async def mock_fast_activity(self, data):
        """Fast mock activity for load testing"""
        await asyncio.sleep(0.1)  # Simulate minimal processing time
        return {"status": "success"}

API Load Tests with Locust

# tests/locustfile.py
from locust import HttpUser, task, between
import json
import random

class OrderAPIUser(HttpUser):
    """Load test user for Order API"""

    wait_time = between(1, 3)

    def on_start(self):
        """Setup test user"""
        self.auth_header = {"Authorization": "Bearer test-token"}

    @task(3)
    def create_order(self):
        """Create new order - most common operation"""
        order_data = {
            "customer_id": f"cust_{random.randint(1000, 9999)}",
            "items": [
                {
                    "id": f"item_{random.randint(1, 100)}",
                    "name": "Test Product",
                    "sku": f"SKU-{random.randint(100, 999)}",
                    "quantity": random.randint(1, 5),
                    "unit_price": round(random.uniform(10, 100), 2)
                }
            ],
            "shipping_address": {
                "street": "123 Load Test St",
                "city": "Test City",
                "state": "TS",
                "zip_code": "12345"
            },
            "payment_method": "credit_card",
            "payment_details": {"token": f"tok_{random.randint(10000, 99999)}"}
        }

        response = self.client.post(
            "/api/v1/orders/",
            json=order_data,
            headers=self.auth_header
        )

        if response.status_code == 201:
            # Store order ID for subsequent operations
            order_data = response.json()
            self.order_id = order_data["id"]

    @task(2)
    def get_order_status(self):
        """Check order status"""
        if hasattr(self, 'order_id'):
            self.client.get(
                f"/api/v1/orders/{self.order_id}",
                headers=self.auth_header
            )

    @task(1)
    def cancel_order(self):
        """Cancel order - less common operation"""
        if hasattr(self, 'order_id'):
            self.client.post(
                f"/api/v1/orders/{self.order_id}/cancel",
                headers=self.auth_header
            )

    @task(1)
    def health_check(self):
        """Health check endpoint"""
        self.client.get("/health")

End-to-End Testing

Complete Workflow Tests

# tests/test_e2e.py
import pytest
import asyncio
from datetime import timedelta
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

class TestEndToEndFlow:
    """End-to-end tests for complete order processing flow"""

    @pytest.mark.asyncio
    async def test_complete_order_lifecycle(self, temporal_env):
        """Test complete order processing from start to finish"""

        # Setup real-like activities (with mocked external services)
        async with Worker(
            temporal_env.client,
            task_queue="e2e-test-queue",
            workflows=[OrderProcessingWorkflow],
            activities=[
                self.realistic_payment_activity,
                self.realistic_inventory_activity,
                self.realistic_shipping_activity,
            ]
        ):
            order_data = {
                "id": "e2e-order-123",
                "customer_id": "e2e-customer-456",
                "items": [
                    {"sku": "E2E-PRODUCT-1", "quantity": 2, "unit_price": 50.00},
                    {"sku": "E2E-PRODUCT-2", "quantity": 1, "unit_price": 25.00}
                ],
                "total_amount": 125.00,
                "shipping_address": {
                    "street": "123 E2E Test St",
                    "city": "E2E City",
                    "state": "E2E",
                    "zip_code": "12345"
                },
                "payment_method": "credit_card",
                "payment_details": {"token": "e2e_test_token"}
            }

            # Start workflow
            handle = await temporal_env.client.start_workflow(
                OrderProcessingWorkflow.run,
                order_data,
                id="e2e-order-workflow-123",
                task_queue="e2e-test-queue",
                execution_timeout=timedelta(minutes=10)
            )

            # Monitor workflow progress
            status_checks = 0
            while status_checks < 10:  # Maximum checks to prevent infinite loop
                try:
                    status = await handle.query(OrderProcessingWorkflow.get_status)
                    print(f"Workflow status: {status}")

                    if status.get("current_step") == "completed":
                        break

                    await asyncio.sleep(1)
                    status_checks += 1
                except:
                    # Workflow might not be ready for queries yet
                    await asyncio.sleep(1)
                    status_checks += 1

            # Get final result
            result = await handle.result()

            # Verify end-to-end flow
            assert result.status == "COMPLETED"
            assert result.payment_transaction_id is not None
            assert result.inventory_reservation_id is not None
            assert result.shipping_tracking_number is not None
            assert result.total_amount == 125.00

    async def realistic_payment_activity(self, payment_data):
        """Realistic payment processing with delays"""
        await asyncio.sleep(2)  # Simulate external API call
        return {
            "status": "success",
            "transaction_id": f"txn_{payment_data['customer_id']}_123",
            "amount_charged": payment_data["amount"]
        }

    async def realistic_inventory_activity(self, items):
        """Realistic inventory processing with delays"""
        await asyncio.sleep(1.5)  # Simulate database operations
        return {
            "status": "reserved",
            "reservation_id": f"res_{len(items)}_456",
            "reserved_items": items
        }

    async def realistic_shipping_activity(self, shipping_data):
        """Realistic shipping processing with delays"""
        await asyncio.sleep(3)  # Simulate shipping partner API
        return {
            "status": "scheduled",
            "tracking_number": f"TRK{shipping_data['zip_code']}789",
            "estimated_delivery": "2024-01-15"
        }

Test Data Management

Test Factories

# tests/factories.py
import factory
from datetime import datetime, timedelta
from decimal import Decimal

class OrderDataFactory(factory.Factory):
    """Factory for generating test order data"""

    class Meta:
        model = dict

    id = factory.Sequence(lambda n: f"order-{n}")
    customer_id = factory.Sequence(lambda n: f"customer-{n}")
    total_amount = factory.LazyFunction(lambda: Decimal(str(factory.Faker('pydecimal', left_digits=3, right_digits=2).generate())))
    created_at = factory.LazyFunction(datetime.utcnow)

    @factory.lazy_attribute
    def items(self):
        return [
            {
                "id": factory.Faker('uuid4').generate(),
                "name": factory.Faker('commerce_product_name').generate(),
                "sku": factory.Faker('bothify', text='SKU-###').generate(),
                "quantity": factory.Faker('random_int', min=1, max=5).generate(),
                "unit_price": float(factory.Faker('pydecimal', left_digits=2, right_digits=2).generate())
            }
            for _ in range(factory.Faker('random_int', min=1, max=3).generate())
        ]

    shipping_address = factory.SubFactory('tests.factories.AddressFactory')
    payment_method = "credit_card"
    payment_details = {"token": factory.Faker('uuid4')}

class AddressFactory(factory.Factory):
    """Factory for generating test addresses"""

    class Meta:
        model = dict

    street = factory.Faker('street_address')
    city = factory.Faker('city')
    state = factory.Faker('state_abbr')
    zip_code = factory.Faker('zipcode')

CI/CD Integration

GitHub Actions Test Configuration

# .github/workflows/test.yml
name: Test Suite

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.9, 3.10, 3.11]

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install -r requirements-test.txt

    - name: Run unit tests
      run: |
        pytest tests/test_workflows.py tests/test_activities.py -v --cov=src --cov-report=xml

    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests

    services:
      temporal:
        image: temporalio/auto-setup:1.20.0
        ports:
          - 7233:7233
        env:
          - DB=postgresql
          - DB_PORT=5432
          - POSTGRES_USER=temporal
          - POSTGRES_PWD=temporal
          - POSTGRES_SEEDS=postgres

      postgres:
        image: postgres:13
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: temporal
          POSTGRES_USER: temporal
          POSTGRES_DB: temporal
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: 3.11

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install -r requirements-test.txt

    - name: Wait for Temporal to be ready
      run: |
        sleep 30
        curl -f http://localhost:7233/ || exit 1

    - name: Run integration tests
      run: |
        pytest tests/test_api_integration.py tests/test_e2e.py -v
      env:
        TEMPORAL_SERVER_URL: localhost:7233
        TEMPORAL_NAMESPACE: default

  load-tests:
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: 3.11

    - name: Install dependencies
      run: |
        pip install locust

    - name: Run load tests
      run: |
        locust -f tests/locustfile.py --headless -u 10 -r 2 -t 60s --host http://localhost:8000

This comprehensive testing guide provides robust strategies for testing Temporal workflows, activities, and FastAPI integrations across unit, integration, load, and end-to-end testing scenarios.