Workers Module

The Workers module in the Autoppia SDK provides the necessary tools for configuring, deploying, and interacting with AI worker agents. It covers the conversion of backend configurations into worker instances, the definition of worker interfaces, and REST API and routing mechanisms for message processing.

Module Components

  • Worker Configuration & Adapter: Converts backend Data Transfer Objects (DTOs) into worker configuration models.

  • Worker Interface & Base Class: Defines the core interface that all AI worker implementations must follow.

  • Worker Router: Provides a mechanism to route messages to worker instances.

  • Worker API: Offers a REST API wrapper (using FastAPI) for interacting with workers.


1. Worker Configuration & Adapter

This section converts backend configuration DTOs into a domain-specific worker configuration model, integrating services such as integrations, vector stores, and language models (LLMs).

File: autoppia_sdk/src/workers/adapter.py

pythonCopiarfrom typing import Optional, Dict, Any
from autoppia_backend_client.models import WorkerConfig as WorkerConfigDTO
from autoppia_sdk.src.integrations.adapter import IntegrationsAdapter
from autoppia_sdk.src.vectorstores.adapter import VectorStoreAdapter
from autoppia_sdk.src.llms.adapter import LLMAdapter
from autoppia_sdk.src.workers.interface import WorkerConfig

class AIWorkerConfigAdapter:
    """Adapter for constructing worker configurations from backend DTOs.
    
    Handles conversion of backend data transfer objects into domain-specific
    configuration models with proper validation and resource management.
    
    Args:
        worker_id: Optional identifier for worker instance tracking
    """
    
    def __init__(self, worker_id: Optional[str] = None) -> None:
        self.worker_id = worker_id
        self.worker_config_dto: Optional[WorkerConfigDTO] = None

    def adapt_integrations(self) -> Dict[str, Any]:
        """Adapt integrations configuration from backend DTO.
        
        Returns:
            Dictionary of initialized integration clients keyed by provider.
            
        Raises:
            ValueError: If required integration configuration is missing.
        """
        if not self.worker_config_dto:
            raise ValueError("Configuration DTO not loaded")
        return IntegrationsAdapter().from_autoppia_backend(self.worker_config_dto)

    def adapt_vector_stores(self) -> Dict[str, Any]:
        """Adapt vector store configuration from backend DTO.
        
        Returns:
            Dictionary of vector store clients keyed by provider.
        """
        if not self.worker_config_dto or not self.worker_config_dto.embedding_database:
            return {}

        vector_store = VectorStoreAdapter(
            self.worker_config_dto.embedding_database
        ).from_backend()
        
        return {self.worker_config_dto.embedding_database.provider: vector_store} if vector_store else {}

    def adapt_llms(self) -> Dict[str, Any]:
        """Adapt LLM configuration from backend DTO.
        
        Returns:
            Dictionary of LLM clients keyed by provider.
        """
        if not self.worker_config_dto or not self.worker_config_dto.user_llm_model:
            return {}

        llm = LLMAdapter(self.worker_config_dto.user_llm_model).from_backend()
        provider = self.worker_config_dto.user_llm_model.llm_model.provider.provider_type
        return {provider: llm} if llm else {}

    def adapt_toolkits(self) -> None:
        """Placeholder for toolkit adaptation (not implemented)."""
        raise NotImplementedError("Toolkit adaptation not yet implemented")

    def from_autoppia_user_backend(self, worker_config_dto: WorkerConfigDTO) -> WorkerConfig:
        """Construct worker configuration from backend DTO with validation.
        
        Args:
            worker_config_dto: Source data transfer object from backend.
            
        Returns:
            WorkerConfig: Initialized worker configuration with integrated services.
            
        Raises:
            ValueError: For missing required fields or invalid configurations.
            RuntimeError: If service initialization fails.
        """
        self.worker_config_dto = worker_config_dto
        
        if not worker_config_dto.name:
            raise ValueError("Missing required field: 'name' (worker identifier)")

        return WorkerConfig(
            integrations=self.adapt_integrations(),
            vectorstores=self.adapt_vector_stores(),
            llms=self.adapt_llms(),
            system_prompt=worker_config_dto.system_prompt.prompt if worker_config_dto.system_prompt else None,
            name=worker_config_dto.name,
            ip=worker_config_dto.ip,
            port=worker_config_dto.port
        )

2. Worker Interface & Base Class

Defines the data model for worker configurations and the basic lifecycle methods that all worker implementations must support.

File: autoppia_sdk/src/workers/interface.py


3. Worker Router

The Worker Router manages the routing and communication with AI worker instances. It handles fetching configuration details (such as IP and port) and routing messages to the appropriate worker endpoint.

File: autoppia_sdk/src/workers/worker_router.py


4. Worker API

The Worker API wraps worker implementations in a REST interface using FastAPI. It provides endpoints for processing messages and health checks and handles worker startup and shutdown events.

File: autoppia_sdk/src/workers/worker_api.py

Last updated