Skip to content

Python SDK Guide

This document provides comprehensive guidance for using the Temporal Python SDK in enterprise environments, covering setup, configuration, best practices, and advanced features for building production-ready Temporal applications.

Overview

The Temporal Python SDK enables developers to build workflows and activities using Python, providing a powerful and flexible foundation for orchestrating business processes. This guide focuses on enterprise-specific considerations including performance, reliability, observability, and maintainability.

Installation and Setup

Project Structure

temporal-python-project/
├── pyproject.toml              # Project configuration with uv
├── uv.lock                     # Dependency lock file
├── README.md
├── .gitignore
├── .github/
│   └── workflows/
│       └── ci.yml
├── src/
│   └── temporal_app/
│       ├── __init__.py
│       ├── activities/
│       │   ├── __init__.py
│       │   ├── base.py         # Base activity class
│       │   ├── payment.py      # Payment activities
│       │   ├── inventory.py    # Inventory activities
│       │   └── notification.py # Notification activities
│       ├── workflows/
│       │   ├── __init__.py
│       │   ├── base.py         # Base workflow class
│       │   ├── order_processing.py
│       │   └── user_onboarding.py
│       ├── models/
│       │   ├── __init__.py
│       │   ├── orders.py       # Order-related data models
│       │   └── users.py        # User-related data models
│       ├── workers/
│       │   ├── __init__.py
│       │   ├── main.py         # Main worker entry point
│       │   └── config.py       # Worker configuration
│       ├── clients/
│       │   ├── __init__.py
│       │   ├── temporal_client.py
│       │   └── external_apis.py
│       └── utils/
│           ├── __init__.py
│           ├── logging.py      # Logging configuration
│           ├── metrics.py      # Metrics utilities
│           └── config.py       # Application configuration
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── unit/
│   │   ├── test_activities.py
│   │   └── test_workflows.py
│   ├── integration/
│   │   └── test_workflows_integration.py
│   └── fixtures/
│       └── sample_data.py
├── docker/
│   ├── Dockerfile
│   └── docker-compose.yml
└── k8s/
    ├── deployment.yaml
    ├── service.yaml
    └── configmap.yaml

Dependencies Configuration

# pyproject.toml
[project]
name = "temporal-enterprise-app"
version = "1.0.0"
description = "Enterprise Temporal application"
authors = [
    {name = "DevOps Team", email = "devops@example.com"}
]
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.11"

dependencies = [
    # Temporal SDK
    "temporalio>=1.7.0",

    # Async HTTP client
    "httpx>=0.25.0",

    # Data validation and serialization
    "pydantic>=2.5.0",
    "pydantic-settings>=2.1.0",

    # Database
    "asyncpg>=0.29.0",
    "sqlalchemy[asyncio]>=2.0.0",
    "alembic>=1.13.0",

    # Observability
    "structlog>=23.2.0",
    "prometheus-client>=0.19.0",
    "opentelemetry-api>=1.21.0",
    "opentelemetry-sdk>=1.21.0",
    # The Prometheus exporter is versioned on the 0.xxb0 (contrib-style) line;
    # 0.42b0 is the release paired with opentelemetry-sdk 1.21.0.
    # NOTE(review): ">=1.12.0" appeared to match no published release
    # (only 1.12.0rc1 exists on the 1.x line) - confirm against PyPI.
    "opentelemetry-exporter-prometheus>=0.42b0",
    "opentelemetry-instrumentation-sqlalchemy>=0.42b0",

    # Configuration
    "python-dotenv>=1.0.0",

    # Utilities
    "tenacity>=8.2.0",
    "click>=8.1.0",
    "rich>=13.7.0",
]

[project.optional-dependencies]
dev = [
    # Testing
    "pytest>=7.4.0",
    "pytest-asyncio>=0.21.0",
    "pytest-cov>=4.0.0",
    "pytest-mock>=3.12.0",
    "factory-boy>=3.3.0",

    # Code quality
    "black>=23.11.0",
    "ruff>=0.1.6",
    "mypy>=1.7.0",
    "pre-commit>=3.6.0",

    # Documentation
    "mkdocs>=1.5.0",
    "mkdocs-material>=9.4.0",

    # Development tools
    "ipython>=8.17.0",
    "rich>=13.7.0",
]

[project.scripts]
temporal-worker = "temporal_app.workers.main:main"
temporal-client = "temporal_app.clients.temporal_client:cli"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Tool configurations
[tool.black]
line-length = 88
target-version = ['py311']
include = '\.pyi?$'
extend-exclude = '''
/(
  # directories
  \.eggs
  | \.git
  | \.hg
  | \.mypy_cache
  | \.tox
  | \.venv
  | build
  | dist
)/
'''

[tool.ruff]
target-version = "py311"
line-length = 88
select = [
    "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "UP",  # pyupgrade
    "B",   # flake8-bugbear
    "SIM", # flake8-simplify
    "I",   # isort
    "N",   # pep8-naming
    "C4",  # flake8-comprehensions
    "PTH", # flake8-use-pathlib
]

[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
    "-ra",
    "--strict-markers",
    "--strict-config",
    "--cov=temporal_app",
    "--cov-report=html",
    "--cov-report=term-missing",
]
markers = [
    "slow: marks tests as slow (deselect with '-m \"not slow\"')",
    "integration: marks tests as integration tests",
    "unit: marks tests as unit tests",
]
asyncio_mode = "auto"

Base Classes and Utilities

Base Activity Class

# src/temporal_app/activities/base.py
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, TypeVar
from datetime import datetime

from temporalio import activity
from pydantic import BaseModel, ValidationError
from opentelemetry import trace

from ..utils.metrics import ActivityMetrics
from ..utils.logging import get_structured_logger

T = TypeVar('T', bound=BaseModel)

class BaseActivity(ABC):
    """Base class for all Temporal activities with enterprise features.

    Subclasses implement :meth:`execute`. Callers go through
    :meth:`execute_with_instrumentation`, which wraps that call in a metrics
    timer, an OpenTelemetry span, structured start/finish logs and an initial
    heartbeat.
    """

    def __init__(self) -> None:
        self.logger = get_structured_logger(self.__class__.__name__)
        self.metrics = ActivityMetrics()
        self.tracer = trace.get_tracer(__name__)

    async def execute_with_instrumentation(
        self,
        input_data: BaseModel,
        operation_name: Optional[str] = None
    ) -> Any:
        """Execute activity with comprehensive instrumentation.

        Args:
            input_data: Payload forwarded unchanged to :meth:`execute`.
            operation_name: Name used for the metrics timer and the span;
                defaults to the concrete class name.

        Returns:
            Whatever :meth:`execute` returns.

        Raises:
            ValidationError: Re-raised after being logged and counted under
                the ``validation_error`` metric label.
            Exception: Any other error is logged, counted under its class
                name and re-raised unchanged.
        """

        activity_name = self.__class__.__name__
        operation_name = operation_name or activity_name

        # Start metrics timer
        timer = self.metrics.start_timer(operation_name)

        # Start OpenTelemetry span. The context manager records any exception
        # that escapes it (and marks the span as errored), so the handlers
        # below must not call span.record_exception() again - doing so would
        # attach the same exception to the span twice.
        with self.tracer.start_as_current_span(operation_name) as span:
            try:
                # Add span attributes
                span.set_attribute("activity.name", activity_name)
                span.set_attribute("activity.input_type", type(input_data).__name__)

                # Look the activity context up once and reuse it
                info = activity.info()

                # Log activity start
                self.logger.info(
                    "Activity started",
                    activity_name=activity_name,
                    input_type=type(input_data).__name__,
                    workflow_id=info.workflow_id,
                    activity_id=info.activity_id
                )

                # Send heartbeat
                activity.heartbeat("Activity execution started")

                # Execute the actual activity logic
                result = await self.execute(input_data)

                # Log success
                self.logger.info(
                    "Activity completed successfully",
                    activity_name=activity_name,
                    execution_time=timer.stop()
                )

                # Record success metrics
                self.metrics.record_success(operation_name)
                span.set_attribute("activity.status", "success")

                return result

            except ValidationError as e:
                # Handle validation errors
                self.logger.error(
                    "Activity validation error",
                    activity_name=activity_name,
                    error=str(e),
                    execution_time=timer.stop()
                )

                self.metrics.record_error(operation_name, "validation_error")
                span.set_attribute("activity.status", "validation_error")

                raise

            except Exception as e:
                # Handle all other errors
                self.logger.error(
                    "Activity execution failed",
                    activity_name=activity_name,
                    error=str(e),
                    error_type=type(e).__name__,
                    execution_time=timer.stop()
                )

                self.metrics.record_error(operation_name, type(e).__name__)
                span.set_attribute("activity.status", "error")

                raise

    @abstractmethod
    async def execute(self, input_data: BaseModel) -> Any:
        """Execute the activity logic - to be implemented by subclasses"""
        pass

    async def validate_input(self, input_data: Any, model_class: Type[T]) -> T:
        """Validate and parse input data using Pydantic model.

        Args:
            input_data: An instance of ``model_class`` (returned as-is), a
                dict (expanded into keyword arguments) or any other object
                (passed to ``model_class.model_validate``).
            model_class: The Pydantic model to coerce the input into.

        Returns:
            A validated ``model_class`` instance.

        Raises:
            ValidationError: Logged and re-raised when the input does not
                satisfy the model.
        """
        try:
            if isinstance(input_data, model_class):
                return input_data
            elif isinstance(input_data, dict):
                return model_class(**input_data)
            else:
                return model_class.model_validate(input_data)
        except ValidationError as e:
            self.logger.error(
                "Input validation failed",
                error=str(e),
                input_type=type(input_data).__name__
            )
            raise

    async def call_external_service(
        self,
        service_name: str,
        method: str,
        *args,
        **kwargs
    ) -> Any:
        """Call an external service inside its own span.

        The call itself is delegated to :meth:`_make_external_call`; this
        method adds the span, its attributes and failure logging. No retry is
        performed here.

        Args:
            service_name: Logical service name; used in the span name and
                attributes.
            method: Operation name on that service.
            *args: Forwarded to :meth:`_make_external_call`.
            **kwargs: Forwarded to :meth:`_make_external_call`.

        Returns:
            The value returned by :meth:`_make_external_call`.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """

        # The span context manager records the escaping exception itself, so
        # no explicit span.record_exception() is needed in the handler.
        with self.tracer.start_as_current_span(f"external_call_{service_name}") as span:
            span.set_attribute("service.name", service_name)
            span.set_attribute("service.method", method)

            try:
                result = await self._make_external_call(service_name, method, *args, **kwargs)

                span.set_attribute("external_call.status", "success")
                return result

            except Exception as e:
                span.set_attribute("external_call.status", "error")

                self.logger.error(
                    "External service call failed",
                    service=service_name,
                    method=method,
                    error=str(e)
                )
                raise

    async def _make_external_call(self, service_name: str, method: str, *args, **kwargs) -> Any:
        """Perform the actual external call - must be overridden.

        Raises:
            NotImplementedError: Always, in this base implementation. Failing
                loudly avoids handing ``None`` back to callers that expect a
                response object.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} must implement _make_external_call "
            f"to call {service_name}.{method}"
        )

# Activity decorator with base class integration
def enterprise_activity(name: Optional[str] = None):
    """Class decorator that registers a BaseActivity subclass as an activity.

    The decorated name ends up bound to an async function (not the class);
    the class stays reachable through its ``_original_class`` attribute.
    The activity is registered under ``name`` or, when omitted, the class
    name.
    """
    def decorator(cls: Type[BaseActivity]):
        registered_name = name or cls.__name__

        async def activity_wrapper(*args, **kwargs):
            # The payload is the first positional argument when there is one,
            # otherwise the 'input_data' keyword (None if neither is given).
            if args:
                payload = args[0]
            else:
                payload = kwargs.get('input_data')

            # A fresh activity object is built for every invocation.
            runner = cls()
            return await runner.execute_with_instrumentation(payload)

        registered = activity.defn(name=registered_name)(activity_wrapper)

        # Keep a handle on the wrapped class for introspection
        registered._original_class = cls
        return registered

    return decorator

Base Workflow Class

# src/temporal_app/workflows/base.py
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, TypeVar
from datetime import datetime, timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy
from pydantic import BaseModel

from ..utils.logging import get_structured_logger
from ..utils.metrics import WorkflowMetrics

T = TypeVar('T', bound=BaseModel)

class BaseWorkflow(ABC):
    """Base class for all Temporal workflows with enterprise features.

    Subclasses implement :meth:`execute`; :meth:`execute_with_instrumentation`
    wraps it with start/finish logging. Helper methods run activities with a
    default retry policy and keep track of the step names that have finished.
    """

    def __init__(self):
        self.logger = get_structured_logger(self.__class__.__name__)
        self.metrics = WorkflowMetrics()
        self._execution_started = workflow.now()
        self._steps_completed: set[str] = set()

    async def execute_with_instrumentation(self, input_data: BaseModel) -> Any:
        """Execute workflow with comprehensive instrumentation.

        Args:
            input_data: Payload forwarded unchanged to :meth:`execute`.

        Returns:
            Whatever :meth:`execute` returns.

        Raises:
            Exception: Any error from :meth:`execute` is logged and re-raised.
        """

        workflow_name = self.__class__.__name__

        try:
            info = workflow.info()

            # workflow.logger is a standard-library LoggerAdapter: structured
            # fields must travel in ``extra``; arbitrary keyword arguments
            # would raise TypeError.
            workflow.logger.info(
                "Workflow started",
                extra={
                    "workflow_name": workflow_name,
                    "workflow_id": info.workflow_id,
                    "run_id": info.run_id,
                    "input_type": type(input_data).__name__,
                },
            )

            # Execute the actual workflow logic
            result = await self.execute(input_data)

            # Calculate execution time
            execution_time = workflow.now() - self._execution_started

            # Log workflow completion
            workflow.logger.info(
                "Workflow completed successfully",
                extra={
                    "workflow_name": workflow_name,
                    "execution_time": execution_time.total_seconds(),
                    "steps_completed": len(self._steps_completed),
                },
            )

            return result

        except Exception as e:
            # Calculate execution time for failed workflow
            execution_time = workflow.now() - self._execution_started

            # Log workflow failure
            workflow.logger.error(
                "Workflow execution failed",
                extra={
                    "workflow_name": workflow_name,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "execution_time": execution_time.total_seconds(),
                    "steps_completed": len(self._steps_completed),
                },
            )

            raise

    @abstractmethod
    async def execute(self, input_data: BaseModel) -> Any:
        """Execute the workflow logic - to be implemented by subclasses"""
        pass

    async def execute_activity_step(
        self,
        activity_function: Any,
        input_data: Any,
        step_name: str,
        timeout: timedelta = timedelta(minutes=10),
        retry_policy: Optional[RetryPolicy] = None,
        heartbeat_timeout: Optional[timedelta] = None,
        task_queue: Optional[str] = None
    ) -> Any:
        """Execute an activity with standardized error handling and logging.

        Args:
            activity_function: Activity to run.
            input_data: Single argument passed to the activity.
            step_name: Label used for logging and completion tracking.
            timeout: Start-to-close timeout for the activity.
            retry_policy: Retry policy; when omitted, a policy with three
                attempts and 1s-60s backoff is used.
            heartbeat_timeout: Optional heartbeat timeout.
            task_queue: Optional task queue override.

        Returns:
            The activity result, or ``None`` when ``step_name`` was already
            completed on this instance.

        Raises:
            Exception: Any activity failure is logged and re-raised.
        """

        # The completed-step set starts empty on every new instance, so this
        # guard only catches the same step name being run twice within one
        # execution. Note that the repeat returns None, not the earlier
        # result.
        if step_name in self._steps_completed:
            workflow.logger.debug(f"Step '{step_name}' already completed, skipping")
            return None

        # Default retry policy for enterprise activities
        if retry_policy is None:
            retry_policy = RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=3,
                non_retryable_error_types=[
                    "ValidationError",
                    "AuthenticationError",
                    "AuthorizationError"
                ]
            )

        try:
            workflow.logger.info(f"Starting activity step: {step_name}")

            result = await workflow.execute_activity(
                activity_function,
                input_data,
                start_to_close_timeout=timeout,
                retry_policy=retry_policy,
                heartbeat_timeout=heartbeat_timeout,
                task_queue=task_queue
            )

            # Mark step as completed
            self._steps_completed.add(step_name)

            workflow.logger.info(f"Completed activity step: {step_name}")
            return result

        except Exception as e:
            workflow.logger.error(
                f"Activity step failed: {step_name}",
                extra={
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise

    async def execute_parallel_activities(
        self,
        activities: Dict[str, tuple],  # step_name -> (activity_function, input_data, options)
        gather_exceptions: bool = True
    ) -> Dict[str, Any]:
        """Execute multiple activities in parallel.

        Args:
            activities: Mapping of step name to a
                ``(activity_function, input_data, options)`` tuple, where
                ``options`` is a dict of keyword arguments for the activity
                call (timeouts, retry policy, ...).
            gather_exceptions: When true, a failed activity's exception is
                stored as that step's value instead of being raised. When
                false, the first failure propagates to the caller.

        Returns:
            Mapping of step name to activity result (or exception, see above).
        """

        workflow.logger.info(f"Starting {len(activities)} parallel activities")

        # start_activity schedules each activity immediately and returns an
        # awaitable handle. execute_activity would only hand back a coroutine
        # that does not start until awaited, which made the collect-errors
        # branch below run the activities one after another.
        handles = {
            step_name: workflow.start_activity(activity_func, input_data, **options)
            for step_name, (activity_func, input_data, options) in activities.items()
        }

        if gather_exceptions:
            results: Dict[str, Any] = {}
            for step_name, handle in handles.items():
                try:
                    results[step_name] = await handle
                    self._steps_completed.add(step_name)
                except Exception as e:
                    workflow.logger.error(
                        f"Parallel activity failed: {step_name}",
                        extra={"error": str(e)},
                    )
                    results[step_name] = e
            return results

        # Fail-fast: the first exception propagates out of gather()
        step_names = list(handles)
        results_list = await asyncio.gather(*handles.values())

        # Mark all steps as completed
        self._steps_completed.update(step_names)

        return dict(zip(step_names, results_list))

    async def wait_for_condition_with_timeout(
        self,
        condition: callable,
        timeout: timedelta,
        check_interval: timedelta = timedelta(seconds=1)
    ) -> bool:
        """Wait for a condition with timeout.

        Args:
            condition: Zero-argument callable returning a truthy value once
                the awaited state is reached.
            timeout: Maximum time to wait.
            check_interval: Accepted for backward compatibility and ignored.
                The SDK's ``wait_condition`` decides when to re-evaluate the
                condition, so no per-interval timer is created.

        Returns:
            ``True`` if the condition became true in time, ``False`` on
            timeout.
        """

        try:
            await workflow.wait_condition(condition, timeout=timeout)
        except asyncio.TimeoutError:
            return False
        return True

    def get_execution_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of identifiers, elapsed time and completed steps."""
        execution_time = workflow.now() - self._execution_started
        info = workflow.info()

        return {
            "workflow_name": self.__class__.__name__,
            "workflow_id": info.workflow_id,
            "run_id": info.run_id,
            "execution_time_seconds": execution_time.total_seconds(),
            "steps_completed": len(self._steps_completed),
            "completed_steps": list(self._steps_completed)
        }

# Workflow decorator with base class integration
def enterprise_workflow(name: Optional[str] = None):
    """Decorator for enterprise workflows with built-in instrumentation.

    Wraps a BaseWorkflow subclass in a Temporal workflow class whose run
    method builds the subclass and calls ``execute_with_instrumentation``.
    The workflow is registered under ``name`` or, when omitted, the class
    name. The wrapped class stays reachable as ``_original_class``.
    """
    def decorator(cls: Type[BaseWorkflow]):

        class WorkflowWrapper:
            # Temporal requires exactly one method marked with @workflow.run;
            # without it workflow.defn rejects the class.
            @workflow.run
            async def run(self, *args, **kwargs):
                instance = cls()

                # Assume first argument is the input data
                input_data = args[0] if args else kwargs.get('input_data')

                return await instance.execute_with_instrumentation(input_data)

        # Take on the wrapped class's identity so every decorated workflow is
        # distinguishable and resolvable under the name it is bound to in its
        # module. NOTE(review): the SDK sandbox is understood to re-import
        # workflow classes by module and name - confirm for the SDK version
        # in use.
        WorkflowWrapper.__name__ = cls.__name__
        WorkflowWrapper.__qualname__ = cls.__qualname__
        WorkflowWrapper.__module__ = cls.__module__

        wrapped = workflow.defn(name=name or cls.__name__)(WorkflowWrapper)

        # Store original class for reference
        wrapped._original_class = cls
        return wrapped

    return decorator

Data Models and Validation

Pydantic Models for Type Safety

# src/temporal_app/models/orders.py
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import List, Optional, Dict, Any
from uuid import UUID, uuid4

from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    field_validator,
    model_validator,
    root_validator,
    validator,
)

# String-valued enumeration of the statuses an order can have.
# Members are also `str` instances, so each compares equal to its value.
# `Order.status` defaults to PENDING.
class OrderStatus(str, Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
    REFUNDED = "refunded"

# String-valued enumeration of the payment methods an order may declare.
# Members are also `str` instances, so each compares equal to its value.
class PaymentMethod(str, Enum):
    CREDIT_CARD = "credit_card"
    DEBIT_CARD = "debit_card"
    PAYPAL = "paypal"
    BANK_TRANSFER = "bank_transfer"

class Address(BaseModel):
    """Shipping/billing address model"""
    # Pydantic v2 configuration: `json_schema_extra` replaces the v1
    # `Config.schema_extra` hook for attaching an example to the JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "street": "123 Main St",
                "city": "Anytown",
                "state": "CA",
                "zip_code": "12345",
                "country": "US"
            }
        }
    )

    street: str = Field(..., min_length=1, max_length=255)
    city: str = Field(..., min_length=1, max_length=100)
    state: str = Field(..., min_length=2, max_length=50)
    # Five digits with an optional "-dddd" suffix. Pydantic v2 removed the
    # `regex=` keyword; `pattern=` is its replacement.
    zip_code: str = Field(..., pattern=r'^\d{5}(-\d{4})?$')
    # Exactly two characters; defaults to "US"
    country: str = Field(default="US", min_length=2, max_length=2)

class OrderItem(BaseModel):
    """Individual order item"""
    id: str = Field(..., min_length=1)
    name: str = Field(..., min_length=1, max_length=255)
    sku: str = Field(..., min_length=1, max_length=100)
    quantity: int = Field(..., gt=0, le=1000)
    unit_price: Decimal = Field(..., gt=0, max_digits=10, decimal_places=2)
    # Always overwritten by calculate_total_price; a supplied value is ignored
    total_price: Optional[Decimal] = None

    # Pydantic v2 rejects a bare @root_validator at class-creation time.
    # An "after" model validator runs once all fields have validated, so
    # quantity and unit_price are guaranteed to be present here.
    @model_validator(mode="after")
    def calculate_total_price(self) -> "OrderItem":
        """Set total_price to quantity * unit_price."""
        self.total_price = self.quantity * self.unit_price
        return self

class Order(BaseModel):
    """Complete order model"""
    id: str = Field(default_factory=lambda: str(uuid4()))
    customer_id: str = Field(..., min_length=1)
    # `min_length` is the Pydantic v2 spelling of the deprecated `min_items`
    items: List[OrderItem] = Field(..., min_length=1)
    shipping_address: Address
    # validate_default=True makes set_billing_address run even when the field
    # is omitted (the v2 equivalent of `@validator(..., always=True)`).
    billing_address: Optional[Address] = Field(default=None, validate_default=True)
    payment_method: PaymentMethod
    payment_details: Dict[str, Any] = Field(default_factory=dict)

    # Monetary fields below are always recomputed by calculate_totals
    subtotal: Optional[Decimal] = None
    tax_amount: Optional[Decimal] = None
    shipping_amount: Optional[Decimal] = None
    total_amount: Optional[Decimal] = None

    status: OrderStatus = OrderStatus.PENDING
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: Optional[datetime] = None

    notes: Optional[str] = Field(None, max_length=1000)
    metadata: Dict[str, Any] = Field(default_factory=dict)

    @field_validator('billing_address')
    @classmethod
    def set_billing_address(cls, v, info):
        """Fall back to the shipping address when no billing address is given."""
        if v is None:
            # info.data holds the fields validated so far; shipping_address is
            # declared earlier, so it is present unless it failed validation.
            return info.data.get('shipping_address')
        return v

    # Pydantic v2 rejects a bare @root_validator at class-creation time; an
    # "after" model validator runs once every field has validated.
    @model_validator(mode="after")
    def calculate_totals(self) -> "Order":
        """Derive subtotal, tax, shipping and total from the order items."""
        # Start from Decimal('0') so the sum is always a Decimal
        subtotal = sum(
            (item.total_price or Decimal('0') for item in self.items),
            Decimal('0'),
        )
        self.subtotal = subtotal

        # Calculate tax (example: 8.5%)
        tax_rate = Decimal('0.085')
        tax_amount = subtotal * tax_rate
        self.tax_amount = tax_amount.quantize(Decimal('0.01'))

        # Calculate shipping (example: flat rate, free from a subtotal of 100)
        shipping_amount = Decimal('9.99') if subtotal < 100 else Decimal('0')
        self.shipping_amount = shipping_amount

        # Calculate total from the unrounded tax, then round to cents
        total_amount = subtotal + tax_amount + shipping_amount
        self.total_amount = total_amount.quantize(Decimal('0.01'))

        return self

# Activity input/output models
# Request model carrying the full Order to be validated.
class OrderValidationRequest(BaseModel):
    order: Order

# Result model for order validation: a validity flag, the order id, a list of
# error strings (empty by default) and an optional estimated shipping date.
class OrderValidationResult(BaseModel):
    is_valid: bool
    order_id: str
    validation_errors: List[str] = Field(default_factory=list)
    estimated_shipping_date: Optional[datetime] = None

# Request model for payment processing. All fields are required.
class PaymentProcessingRequest(BaseModel):
    order_id: str
    amount: Decimal
    payment_method: PaymentMethod
    payment_details: Dict[str, Any]
    # presumably used to de-duplicate repeated charge attempts - TODO confirm
    idempotency_key: str

# Result model for payment processing. Only `success` is required;
# `payment_method_verified` defaults to False.
class PaymentProcessingResult(BaseModel):
    success: bool
    transaction_id: Optional[str] = None
    error_message: Optional[str] = None
    payment_method_verified: bool = False

# Request model for shipping: the order id, its items and the destination
# address. `shipping_method` defaults to "standard".
class ShippingRequest(BaseModel):
    order_id: str
    items: List[OrderItem]
    shipping_address: Address
    shipping_method: str = "standard"

# Result model for shipping. Only `success` is required; every other field
# defaults to None.
class ShippingResult(BaseModel):
    success: bool
    tracking_number: Optional[str] = None
    estimated_delivery_date: Optional[datetime] = None
    shipping_cost: Optional[Decimal] = None
    carrier: Optional[str] = None

# Workflow result models
# Workflow-level result: the order id and final status, plus the optional
# per-stage results (validation, payment, shipping).
class OrderProcessingResult(BaseModel):
    order_id: str
    status: OrderStatus
    validation_result: Optional[OrderValidationResult] = None
    payment_result: Optional[PaymentProcessingResult] = None
    shipping_result: Optional[ShippingResult] = None
    processing_duration_seconds: Optional[float] = None
    # Defaults to the naive UTC timestamp at the moment the model is built
    completed_at: datetime = Field(default_factory=datetime.utcnow)

Activity Implementation Examples

Payment Processing Activity

# src/temporal_app/activities/payment.py
import asyncio
from decimal import Decimal
from typing import Optional
import httpx
from temporalio import activity
from temporalio.exceptions import ApplicationError

from .base import BaseActivity, enterprise_activity
from ..models.orders import PaymentProcessingRequest, PaymentProcessingResult, PaymentMethod
from ..utils.config import get_settings

class PaymentProcessor(BaseActivity):
    """Payment processing activity with enterprise features.

    Each instance owns an ``httpx.AsyncClient`` that is closed when
    :meth:`execute` finishes. An instance is therefore meant to serve a
    single execution, which matches the ``enterprise_activity`` decorator
    building a new instance for every invocation.
    """

    def __init__(self):
        super().__init__()
        self.settings = get_settings()
        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
        )

    async def execute(self, input_data: PaymentProcessingRequest) -> PaymentProcessingResult:
        """Process payment with comprehensive error handling.

        Args:
            input_data: Payment request (model instance or compatible dict).

        Returns:
            A PaymentProcessingResult. Processing failures are reported via
            ``success=False`` and ``error_message`` instead of being raised.

        Raises:
            ValidationError: If ``input_data`` does not validate.
        """
        try:
            return await self._run_payment(input_data)
        finally:
            # Release the connection pool; otherwise every activity execution
            # would leave an open client behind.
            await self.client.aclose()

    async def _run_payment(self, input_data: PaymentProcessingRequest) -> PaymentProcessingResult:
        """Validate the request, verify the payment method and charge it."""

        # Validate input
        request = await self.validate_input(input_data, PaymentProcessingRequest)

        # Send heartbeat
        activity.heartbeat("Starting payment processing")

        # Initialised up front so the error path can always report it
        is_verified = False

        try:
            # Verify payment method
            is_verified = await self._verify_payment_method(request)
            if not is_verified:
                return PaymentProcessingResult(
                    success=False,
                    error_message="Payment method verification failed",
                    payment_method_verified=False
                )

            # Process payment based on method
            if request.payment_method == PaymentMethod.CREDIT_CARD:
                result = await self._process_credit_card_payment(request)
            elif request.payment_method == PaymentMethod.PAYPAL:
                result = await self._process_paypal_payment(request)
            else:
                raise ApplicationError(
                    f"Unsupported payment method: {request.payment_method}",
                    type="UnsupportedPaymentMethod"
                )

            # Log successful payment
            self.logger.info(
                "Payment processed successfully",
                order_id=request.order_id,
                transaction_id=result.transaction_id,
                amount=str(request.amount)
            )

            return result

        except Exception as e:
            self.logger.error(
                "Payment processing failed",
                order_id=request.order_id,
                error=str(e),
                payment_method=request.payment_method
            )

            # Return structured error result. This also covers the
            # ApplicationError raised above for unsupported methods.
            return PaymentProcessingResult(
                success=False,
                error_message=str(e),
                payment_method_verified=is_verified
            )

    async def _verify_payment_method(self, request: PaymentProcessingRequest) -> bool:
        """Verify payment method details.

        Returns:
            ``True`` when verification succeeds, ``False`` if it raised.
        """
        activity.heartbeat("Verifying payment method")

        try:
            # Simulate payment method verification
            await asyncio.sleep(0.1)  # Simulate API call

            # Add actual payment method verification logic here
            return True

        except Exception as e:
            self.logger.error(f"Payment method verification failed: {e}")
            return False

    async def _process_credit_card_payment(
        self,
        request: PaymentProcessingRequest
    ) -> PaymentProcessingResult:
        """Process credit card payment.

        Raises:
            ApplicationError: Type ``PaymentProcessingError`` when the call to
                the payment processor, or reading its response, fails.
        """
        activity.heartbeat("Processing credit card payment")

        # Prepare payment request
        payment_data = {
            "amount": str(request.amount),
            "currency": "USD",
            "payment_method": request.payment_details,
            "idempotency_key": request.idempotency_key,
            "metadata": {
                "order_id": request.order_id
            }
        }

        try:
            # Call external payment processor
            response = await self.call_external_service(
                "payment_processor",
                "charge",
                data=payment_data
            )

            if response.get("status") == "succeeded":
                return PaymentProcessingResult(
                    success=True,
                    transaction_id=response.get("id"),
                    payment_method_verified=True
                )
            else:
                return PaymentProcessingResult(
                    success=False,
                    error_message=response.get("error", "Payment failed"),
                    payment_method_verified=True
                )

        except Exception as e:
            # Chain the cause explicitly so the original failure stays visible
            raise ApplicationError(
                f"Credit card payment processing failed: {str(e)}",
                type="PaymentProcessingError"
            ) from e

    async def _process_paypal_payment(
        self,
        request: PaymentProcessingRequest
    ) -> PaymentProcessingResult:
        """Process PayPal payment (simulated).

        Raises:
            ApplicationError: Type ``PaymentProcessingError`` on failure.
        """
        activity.heartbeat("Processing PayPal payment")

        # Implement PayPal-specific payment logic
        try:
            # Simulate PayPal API call
            await asyncio.sleep(0.2)

            return PaymentProcessingResult(
                success=True,
                transaction_id=f"pp_{request.idempotency_key}",
                payment_method_verified=True
            )

        except Exception as e:
            raise ApplicationError(
                f"PayPal payment processing failed: {str(e)}",
                type="PaymentProcessingError"
            ) from e

# Register the activity under the name "process_payment".
# Note: enterprise_activity returns the async wrapper function, so after
# decoration the name ProcessPaymentActivity refers to that function; the
# class itself is kept on its `_original_class` attribute.
@enterprise_activity("process_payment")
class ProcessPaymentActivity(PaymentProcessor):
    pass

Inventory Management Activity

# src/temporal_app/activities/inventory.py
import asyncio
from typing import Dict, List
from temporalio import activity
from temporalio.exceptions import ApplicationError

from .base import BaseActivity, enterprise_activity
from ..models.orders import OrderItem
from ..utils.database import get_database_connection

class InventoryManager(BaseActivity):
    """Inventory management activity.

    Checks stock for every order line and reserves it. The reservation is
    all-or-nothing: if any line cannot be reserved, or an unexpected error
    interrupts the loop, every reservation made so far is released again.
    """

    async def execute(self, order_items: List[OrderItem]) -> Dict[str, bool]:
        """Check and reserve inventory for order items.

        Args:
            order_items: Order lines to reserve; ``sku`` and ``quantity`` are
                read from each item.

        Returns:
            Mapping of SKU to ``True``. It is only returned when every line
            was reserved; an empty order yields an empty mapping.

        Raises:
            ApplicationError: With type ``"InsufficientInventory"`` when at
                least one line could not be reserved. The failing SKUs are
                attached as the error's details payload.
            Exception: Any other error is logged and re-raised unchanged.
        """
        activity.heartbeat("Starting inventory check")

        results: Dict[str, bool] = {}
        # Track the exact lines that were reserved. Keying rollback on SKU
        # alone is wrong when the same SKU appears on several order lines.
        reserved_items: List[OrderItem] = []

        try:
            async with get_database_connection() as db:
                try:
                    for item in order_items:
                        # Check availability
                        available_quantity = await self._check_inventory(db, item.sku)

                        if available_quantity >= item.quantity:
                            # Reserve inventory
                            success = await self._reserve_inventory(
                                db, item.sku, item.quantity
                            )
                            if success:
                                reserved_items.append(item)
                        else:
                            self.logger.warning(
                                "Insufficient inventory",
                                sku=item.sku,
                                requested=item.quantity,
                                available=available_quantity
                            )
                            success = False

                        # A SKU only counts as reserved if every line for it
                        # succeeded; a later success must not hide a failure.
                        results[item.sku] = results.get(item.sku, True) and success

                    failed_items = [sku for sku, ok in results.items() if not ok]
                    if failed_items:
                        # ApplicationError takes details as positional
                        # arguments, not as a `details=` keyword.
                        raise ApplicationError(
                            "Insufficient inventory for some items",
                            {"failed_items": failed_items},
                            type="InsufficientInventory"
                        )
                except Exception:
                    # Single rollback path for both the insufficient-stock
                    # case and unexpected errors raised mid-loop.
                    await self._release_reservations(db, reserved_items)
                    raise

                activity.heartbeat("Inventory reservation completed")
                return results

        except Exception as e:
            self.logger.error(f"Inventory management failed: {e}")
            raise

    async def _check_inventory(self, db, sku: str) -> int:
        """Return the available quantity for ``sku``, or 0 when the query yields nothing."""
        query = "SELECT available_quantity FROM inventory WHERE sku = $1"
        result = await db.fetchval(query, sku)
        return result or 0

    async def _reserve_inventory(self, db, sku: str, quantity: int) -> bool:
        """Move ``quantity`` units of ``sku`` from available to reserved.

        The ``WHERE`` clause re-checks availability so the update does not
        apply when stock dropped since the earlier check.

        Returns:
            ``True`` when the statement status equals ``"UPDATE 1"``.
        """
        query = """
        UPDATE inventory 
        SET available_quantity = available_quantity - $2,
            reserved_quantity = reserved_quantity + $2
        WHERE sku = $1 AND available_quantity >= $2
        """
        result = await db.execute(query, sku, quantity)
        return result == "UPDATE 1"

    async def _release_reservations(self, db, items: List[OrderItem]) -> None:
        """Return the reserved quantity of every item in ``items`` to available stock."""
        query = """
                UPDATE inventory 
                SET available_quantity = available_quantity + $2,
                    reserved_quantity = reserved_quantity - $2
                WHERE sku = $1
                """
        for item in items:
            await db.execute(query, item.sku, item.quantity)

    async def _rollback_reservations(self, db, order_items: List[OrderItem], results: Dict[str, bool]):
        """Rollback successful reservations.

        Releases every item in ``order_items`` whose SKU is marked ``True`` in
        ``results``. Kept with its original signature for existing callers;
        ``execute`` releases the exact reserved lines instead.
        """
        await self._release_reservations(
            db,
            [item for item in order_items if results.get(item.sku, False)]
        )

@enterprise_activity("reserve_inventory")
class ReserveInventoryActivity(InventoryManager):
    """``InventoryManager`` decorated with the "reserve_inventory" activity name; adds no behaviour."""

Workflow Implementation Example

Order Processing Workflow

# src/temporal_app/workflows/order_processing.py
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

from .base import BaseWorkflow, enterprise_workflow
from ..models.orders import (
    Order, OrderProcessingResult, OrderStatus,
    OrderValidationRequest, PaymentProcessingRequest, ShippingRequest
)
from ..activities.payment import ProcessPaymentActivity
from ..activities.inventory import ReserveInventoryActivity

class OrderProcessingWorkflow(BaseWorkflow):
    """Enterprise order processing workflow with comprehensive error handling.

    Steps run in order: validate the order, reserve inventory, process the
    payment, create a shipping label, send the confirmation. Any exception is
    logged, a best-effort failure notification is attempted, and the
    exception is re-raised.
    """

    async def execute(self, order: Order) -> OrderProcessingResult:
        """Execute order processing workflow.

        Args:
            order: The order to process.

        Returns:
            The populated ``OrderProcessingResult`` with status ``CONFIRMED``.

        Raises:
            ApplicationError: With type ``"OrderValidationError"`` when the
                validation result is not valid, or ``"PaymentError"`` when the
                payment result reports no success. Error details carry the
                validation errors or the payment error message.
            Exception: Anything raised by a step is re-raised after failure
                handling.
        """
        result = OrderProcessingResult(
            order_id=order.id,
            status=OrderStatus.PENDING
        )

        try:
            # Step 1: Validate order
            validation_result = await self.execute_activity_step(
                self._validate_order,
                OrderValidationRequest(order=order),
                step_name="validate_order",
                timeout=timedelta(minutes=5)
            )
            result.validation_result = validation_result

            if not validation_result.is_valid:
                result.status = OrderStatus.CANCELLED
                # ApplicationError takes details as positional arguments,
                # not as a `details=` keyword.
                raise ApplicationError(
                    "Order validation failed",
                    {"errors": validation_result.validation_errors},
                    type="OrderValidationError"
                )

            # Step 2: Reserve inventory
            await self.execute_activity_step(
                ReserveInventoryActivity,
                order.items,
                step_name="reserve_inventory",
                timeout=timedelta(minutes=10)
            )

            # Step 3: Process payment
            payment_request = PaymentProcessingRequest(
                order_id=order.id,
                amount=order.total_amount,
                payment_method=order.payment_method,
                payment_details=order.payment_details,
                # Derived from the order id so retries reuse the same key.
                idempotency_key=f"payment_{order.id}"
            )

            payment_result = await self.execute_activity_step(
                ProcessPaymentActivity,
                payment_request,
                step_name="process_payment",
                timeout=timedelta(minutes=15),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=2),
                    maximum_interval=timedelta(minutes=1),
                    maximum_attempts=3,
                    non_retryable_error_types=["InsufficientFunds", "InvalidPaymentMethod"]
                )
            )
            result.payment_result = payment_result

            if not payment_result.success:
                # Payment failed - release inventory
                # NOTE(review): compensation only runs on this branch. If the
                # payment step raises instead of returning success=False, or a
                # later step fails, the reservation is not released here.
                # Confirm whether that is intended.
                await self._compensate_inventory_reservation(order.items)
                result.status = OrderStatus.CANCELLED
                raise ApplicationError(
                    "Payment processing failed",
                    {"error": payment_result.error_message},
                    type="PaymentError"
                )

            # Step 4: Create shipping label
            shipping_request = ShippingRequest(
                order_id=order.id,
                items=order.items,
                shipping_address=order.shipping_address
            )

            shipping_result = await self.execute_activity_step(
                self._create_shipping_label,
                shipping_request,
                step_name="create_shipping",
                timeout=timedelta(minutes=10)
            )
            result.shipping_result = shipping_result

            # Step 5: Send confirmation notifications
            await self.execute_activity_step(
                self._send_order_confirmation,
                {
                    "order_id": order.id,
                    "customer_id": order.customer_id,
                    "payment_result": payment_result,
                    "shipping_result": shipping_result
                },
                step_name="send_confirmation",
                timeout=timedelta(minutes=5)
            )

            # Mark order as confirmed
            result.status = OrderStatus.CONFIRMED

            # Calculate processing duration
            execution_metrics = self.get_execution_metrics()
            result.processing_duration_seconds = execution_metrics["execution_time_seconds"]

            # workflow.logger follows the stdlib logging call signature, so
            # structured fields go through `extra`, not free-form keywords.
            workflow.logger.info(
                "Order processing completed successfully",
                extra={
                    "order_id": order.id,
                    "processing_time": result.processing_duration_seconds
                }
            )

            return result

        except Exception as e:
            # Handle workflow failure
            await self._handle_workflow_failure(order, result, e)
            raise

    async def _validate_order(self, request: OrderValidationRequest):
        """Validate order details"""
        # This would be implemented as a separate activity
        pass

    async def _create_shipping_label(self, request: ShippingRequest):
        """Create shipping label"""
        # This would be implemented as a separate activity
        pass

    async def _send_order_confirmation(self, data: dict):
        """Send order confirmation"""
        # This would be implemented as a separate activity
        pass

    async def _compensate_inventory_reservation(self, items):
        """Release reserved inventory (compensation).

        Best-effort: a failure to release is logged and not propagated, so it
        does not replace the error that triggered the compensation.
        """
        try:
            await workflow.execute_activity(
                self._release_inventory,
                items,
                start_to_close_timeout=timedelta(minutes=5)
            )
        except Exception as e:
            workflow.logger.error(f"Failed to release inventory: {e}")

    async def _release_inventory(self, items):
        """Release inventory activity"""
        # This would be implemented as a separate activity
        pass

    async def _handle_workflow_failure(self, order: Order, result: OrderProcessingResult, error: Exception):
        """Handle workflow failure with proper cleanup.

        Logs the failure and attempts to send a failure notification. A
        notification error is logged and swallowed so the caller can re-raise
        the original ``error``.

        Args:
            order: The order being processed; ``id`` and ``customer_id`` are read.
            result: The partially filled result (currently not read here).
            error: The exception that failed the workflow.
        """
        workflow.logger.error(
            "Order processing workflow failed",
            extra={
                "order_id": order.id,
                "error": str(error),
                "error_type": type(error).__name__
            }
        )

        # Attempt to send failure notification
        try:
            await workflow.execute_activity(
                self._send_failure_notification,
                {
                    "order_id": order.id,
                    "customer_id": order.customer_id,
                    "error": str(error)
                },
                start_to_close_timeout=timedelta(minutes=2)
            )
        except Exception as notification_error:
            workflow.logger.error(f"Failed to send failure notification: {notification_error}")

# Register the workflow under the "order_processing" name
@enterprise_workflow("order_processing")
class OrderProcessingWorkflowRegistered(OrderProcessingWorkflow):
    """Registered form of ``OrderProcessingWorkflow``; adds no behaviour of its own."""

Worker Configuration and Startup

Worker Implementation

# src/temporal_app/workers/main.py
import asyncio
import signal
import sys
from contextlib import asynccontextmanager
from typing import Optional

import click
from temporalio.client import Client
from temporalio.worker import Worker

from ..activities.payment import ProcessPaymentActivity
from ..activities.inventory import ReserveInventoryActivity
from ..workflows.order_processing import OrderProcessingWorkflowRegistered
from ..utils.config import get_settings
from ..utils.logging import setup_logging
from ..utils.metrics import setup_metrics

class TemporalWorker:
    """Enterprise Temporal worker with comprehensive configuration.

    Owns the client connection and the ``Worker`` built from the given
    settings, and exposes ``start``/``stop`` for the process entry point.
    """

    def __init__(self, settings):
        """Store ``settings``; nothing is connected until ``start`` runs."""
        self.settings = settings
        self.client: Optional[Client] = None
        self.worker: Optional[Worker] = None
        self.running = False

    async def start(self):
        """Start the Temporal worker.

        Sets up logging and metrics, connects the client, builds the worker
        and runs it. This coroutine only returns once the worker's ``run``
        call finishes.
        """

        # Setup logging and metrics
        setup_logging(self.settings.log_level)
        setup_metrics()

        if self.settings.temporal_tls_enabled:
            # With TLS requested but no explicit config, fall back to default
            # TLS (True) instead of passing None, which would not enable TLS.
            # NOTE(review): the settings type is a plain dict; confirm it is
            # converted to the SDK's TLS config type before being used here.
            tls = self.settings.temporal_tls_config or True
        else:
            tls = False

        # Connect to Temporal server
        self.client = await Client.connect(
            self.settings.temporal_server_url,
            namespace=self.settings.temporal_namespace,
            tls=tls
        )

        # Create worker
        self.worker = Worker(
            self.client,
            task_queue=self.settings.task_queue,
            workflows=[OrderProcessingWorkflowRegistered],
            activities=[
                ProcessPaymentActivity,
                ReserveInventoryActivity,
            ],
            max_concurrent_activities=self.settings.max_concurrent_activities,
            max_concurrent_workflows=self.settings.max_concurrent_workflows,
            max_concurrent_local_activities=self.settings.max_concurrent_local_activities,
        )

        print(f"Starting Temporal worker on task queue: {self.settings.task_queue}")
        print(f"Connected to: {self.settings.temporal_server_url}")
        print(f"Namespace: {self.settings.temporal_namespace}")

        # Start worker
        self.running = True
        await self.worker.run()

    async def stop(self):
        """Stop the Temporal worker gracefully.

        Shuts the worker down if it was started, then closes the client when
        the client object offers a ``close`` method. Safe to call when
        ``start`` never ran or failed part-way.
        """
        if self.running and self.worker:
            print("Shutting down Temporal worker...")
            # Cleared before awaiting so a second stop() call skips shutdown.
            self.running = False
            await self.worker.shutdown()

        if self.client:
            # NOTE(review): the Temporal client may not expose close();
            # calling it unconditionally would raise AttributeError during
            # shutdown. Confirm against the SDK version in use.
            close = getattr(self.client, "close", None)
            if callable(close):
                await close()

# Global worker instance: assigned inside main()'s run_worker coroutine and
# read by signal_handler; stays None until a worker has been created.
worker_instance: Optional[TemporalWorker] = None

async def create_worker():
    """Build a ``TemporalWorker`` from the application settings.

    The worker is only constructed here; it is not started.
    """
    return TemporalWorker(get_settings())

def signal_handler(signum, frame):
    """Handle shutdown signals.

    When an event loop is running in this thread (the normal case while the
    worker runs), the worker's ``stop()`` coroutine is scheduled on that loop
    and the handler returns, so the process ends once the worker has finished.
    Calling ``run_until_complete`` on the already-running loop would raise
    ``RuntimeError`` instead of shutting down.

    Without a running loop, ``stop()`` is run to completion and the process
    exits with status 0. With no worker instance, the process exits directly.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame (unused).
    """
    print(f"\nReceived signal {signum}. Initiating graceful shutdown...")
    if not worker_instance:
        sys.exit(0)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is not None:
        # Hand the coroutine to the running loop; this also wakes the loop if
        # it is currently blocked waiting for I/O.
        asyncio.run_coroutine_threadsafe(worker_instance.stop(), loop)
        return

    asyncio.run(worker_instance.stop())
    sys.exit(0)

@click.command()
@click.option('--task-queue', default='temporal-product-queue', help='Task queue name')
@click.option('--log-level', default='INFO', help='Logging level')
@click.option('--max-concurrent-activities', default=100, type=int, help='Max concurrent activities')
@click.option('--max-concurrent-workflows', default=100, type=int, help='Max concurrent workflows')
def main(task_queue: str, log_level: str, max_concurrent_activities: int, max_concurrent_workflows: int):
    """Start the Temporal worker"""
    global worker_instance

    # Route both shutdown signals to the same handler
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, signal_handler)

    # CLI values that replace the corresponding settings attributes
    cli_overrides = {
        "task_queue": task_queue,
        "log_level": log_level,
        "max_concurrent_activities": max_concurrent_activities,
        "max_concurrent_workflows": max_concurrent_workflows,
    }

    async def run_worker():
        """Create the worker from the overridden settings and run it until it stops."""
        global worker_instance

        settings = get_settings()
        for option_name, option_value in cli_overrides.items():
            setattr(settings, option_name, option_value)

        worker_instance = TemporalWorker(settings)

        try:
            await worker_instance.start()
        except KeyboardInterrupt:
            print("\nReceived interrupt signal. Shutting down...")
        except Exception as e:
            print(f"Worker error: {e}")
            raise
        finally:
            # Always attempt a graceful stop, whatever ended start()
            if worker_instance:
                await worker_instance.stop()

    # Run the worker
    asyncio.run(run_worker())

if __name__ == "__main__":
    main()

Configuration Management

Settings Configuration

# src/temporal_app/utils/config.py
from functools import lru_cache
from typing import Optional, Dict, Any
from pydantic import BaseSettings, Field

class Settings(BaseSettings):
    """Application settings with environment variable support.

    Each field names the environment variable it is read from via ``env=``.
    Fields declared with ``Field(...)`` have no default; all other fields
    fall back to the default shown. Values are also read from a ``.env``
    file (see ``Config``).
    """

    # Temporal configuration
    temporal_server_url: str = Field(default="localhost:7233", env="TEMPORAL_SERVER_URL")
    temporal_namespace: str = Field(default="default", env="TEMPORAL_NAMESPACE")
    temporal_tls_enabled: bool = Field(default=False, env="TEMPORAL_TLS_ENABLED")
    # No explicit env name is given for the TLS config; it defaults to None.
    temporal_tls_config: Optional[Dict[str, Any]] = None

    # Worker configuration
    task_queue: str = Field(default="temporal-product-queue", env="TASK_QUEUE")
    max_concurrent_activities: int = Field(default=100, env="MAX_CONCURRENT_ACTIVITIES")
    max_concurrent_workflows: int = Field(default=100, env="MAX_CONCURRENT_WORKFLOWS")
    max_concurrent_local_activities: int = Field(default=100, env="MAX_CONCURRENT_LOCAL_ACTIVITIES")

    # Database configuration
    # database_url has no default (declared with `...`).
    database_url: str = Field(..., env="DATABASE_URL")
    database_pool_size: int = Field(default=10, env="DATABASE_POOL_SIZE")
    database_max_overflow: int = Field(default=20, env="DATABASE_MAX_OVERFLOW")

    # External services
    # Both payment processor fields have no default (declared with `...`).
    payment_processor_url: str = Field(..., env="PAYMENT_PROCESSOR_URL")
    payment_processor_api_key: str = Field(..., env="PAYMENT_PROCESSOR_API_KEY")

    # Logging configuration
    log_level: str = Field(default="INFO", env="LOG_LEVEL")
    log_format: str = Field(default="json", env="LOG_FORMAT")

    # Metrics configuration
    metrics_port: int = Field(default=8080, env="METRICS_PORT")
    metrics_enabled: bool = Field(default=True, env="METRICS_ENABLED")

    # Application configuration
    environment: str = Field(default="development", env="ENVIRONMENT")
    debug: bool = Field(default=False, env="DEBUG")

    class Config:
        # Settings are additionally loaded from this dotenv file.
        env_file = ".env"
        env_file_encoding = "utf-8"

@lru_cache()
def get_settings() -> Settings:
    """Return the shared ``Settings`` instance.

    The result is memoised by ``lru_cache``, so ``Settings()`` is built on the
    first call and the same object is returned afterwards.
    """
    settings = Settings()
    return settings

This comprehensive Python SDK guide provides enterprise-ready patterns and practices for building robust Temporal applications with proper error handling, observability, and maintainability.