Workers Module

The Workers module in the Autoppia SDK provides the tools for configuring, deploying, and interacting with AI worker agents. It covers the conversion of backend configurations into worker configuration models, the interface that worker implementations must follow, a router for sending messages to workers, and a REST API wrapper for message processing.

Module Components

  • Worker Configuration & Adapter: Converts backend Data Transfer Objects (DTOs) into worker configuration models.

  • Worker Interface & Base Class: Defines the core interface that all AI worker implementations must follow.

  • Worker Router: Provides a mechanism to route messages to worker instances.

  • Worker API: Offers a REST API wrapper (using FastAPI) for interacting with workers.


1. Worker Configuration & Adapter

The adapter converts backend configuration DTOs into a domain-specific worker configuration model. While doing so, it builds the clients the worker depends on: integrations, vector stores, and large language models (LLMs).

File: autoppia_sdk/src/workers/adapter.py

from typing import Optional, Dict, Any
from autoppia_backend_client.models import WorkerConfig as WorkerConfigDTO
from autoppia_sdk.src.integrations.adapter import IntegrationsAdapter
from autoppia_sdk.src.vectorstores.adapter import VectorStoreAdapter
from autoppia_sdk.src.llms.adapter import LLMAdapter
from autoppia_sdk.src.workers.interface import WorkerConfig

class AIWorkerConfigAdapter:
    """Adapter for constructing worker configurations from backend DTOs.
    
    Handles conversion of backend data transfer objects into domain-specific
    configuration models with proper validation and resource management.
    
    Args:
        worker_id: Optional identifier for worker instance tracking
    """
    
    def __init__(self, worker_id: Optional[str] = None) -> None:
        self.worker_id = worker_id
        self.worker_config_dto: Optional[WorkerConfigDTO] = None

    def adapt_integrations(self) -> Dict[str, Any]:
        """Adapt integrations configuration from backend DTO.
        
        Returns:
            Dictionary of initialized integration clients keyed by provider.
            
        Raises:
            ValueError: If required integration configuration is missing.
        """
        if not self.worker_config_dto:
            raise ValueError("Configuration DTO not loaded")
        return IntegrationsAdapter().from_autoppia_backend(self.worker_config_dto)

    def adapt_vector_stores(self) -> Dict[str, Any]:
        """Adapt vector store configuration from backend DTO.
        
        Returns:
            Dictionary of vector store clients keyed by provider.
        """
        if not self.worker_config_dto or not self.worker_config_dto.embedding_database:
            return {}

        vector_store = VectorStoreAdapter(
            self.worker_config_dto.embedding_database
        ).from_backend()
        
        return {self.worker_config_dto.embedding_database.provider: vector_store} if vector_store else {}

    def adapt_llms(self) -> Dict[str, Any]:
        """Adapt LLM configuration from backend DTO.
        
        Returns:
            Dictionary of LLM clients keyed by provider.
        """
        if not self.worker_config_dto or not self.worker_config_dto.user_llm_model:
            return {}

        llm = LLMAdapter(self.worker_config_dto.user_llm_model).from_backend()
        provider = self.worker_config_dto.user_llm_model.llm_model.provider.provider_type
        return {provider: llm} if llm else {}

    def adapt_toolkits(self) -> None:
        """Placeholder for toolkit adaptation (not implemented)."""
        raise NotImplementedError("Toolkit adaptation not yet implemented")

    def from_autoppia_user_backend(self, worker_config_dto: WorkerConfigDTO) -> WorkerConfig:
        """Construct worker configuration from backend DTO with validation.
        
        Args:
            worker_config_dto: Source data transfer object from backend.
            
        Returns:
            WorkerConfig: Initialized worker configuration with integrated services.
            
        Raises:
            ValueError: For missing required fields or invalid configurations.
            RuntimeError: If service initialization fails.
        """
        self.worker_config_dto = worker_config_dto
        
        if not worker_config_dto.name:
            raise ValueError("Missing required field: 'name' (worker identifier)")

        return WorkerConfig(
            integrations=self.adapt_integrations(),
            vectorstores=self.adapt_vector_stores(),
            llms=self.adapt_llms(),
            system_prompt=worker_config_dto.system_prompt.prompt if worker_config_dto.system_prompt else None,
            name=worker_config_dto.name,
            ip=worker_config_dto.ip,
            port=worker_config_dto.port
        )
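
As a rough illustration of the "keyed by provider" convention used by `adapt_vector_stores` and `adapt_llms`, the sketch below mirrors that logic with stand-in objects so it runs without the SDK. The `keyed_by_provider` helper, the `SimpleNamespace` DTO, the `"pinecone"` provider name, and the placeholder clients are all hypothetical and are not part of the SDK.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional


def keyed_by_provider(
    section: Optional[Any],
    provider: Callable[[Any], str],
    build_client: Callable[[Any], Optional[Any]],
) -> Dict[str, Any]:
    """Return {provider: client}, or {} when the section or client is missing."""
    if not section:
        return {}
    client = build_client(section)
    return {provider(section): client} if client else {}


# Stand-in DTO: only the attributes read by the adapter above are modelled.
dto = SimpleNamespace(
    name="support-bot",
    embedding_database=SimpleNamespace(provider="pinecone"),  # made-up provider
    user_llm_model=None,  # no LLM configured
)

vectorstores = keyed_by_provider(
    dto.embedding_database,
    provider=lambda section: section.provider,
    build_client=lambda section: f"<client for {section.provider}>",
)
llms = keyed_by_provider(
    dto.user_llm_model,
    provider=lambda section: section.llm_model.provider.provider_type,
    build_client=lambda section: object(),
)

print(vectorstores)  # {'pinecone': '<client for pinecone>'}
print(llms)          # {}
```

A missing section yields an empty dictionary rather than an error, which matches how the adapter treats optional vector store and LLM configuration.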

2. Worker Interface & Base Class

Defines the data model for worker configurations and the basic lifecycle methods that all worker implementations must support.

File: autoppia_sdk/src/workers/interface.py

from abc import ABC, abstractmethod
from autoppia_sdk.src.integrations.interface import IntegrationInterface
from autoppia_sdk.src.llms.interface import LLMServiceInterface
from autoppia_sdk.src.vectorstores.interface import VectorStoreInterface
from dataclasses import dataclass, field
from typing import Dict, Optional, Any

@dataclass
class WorkerConfig:
    """Configuration container for worker instances.
    
    Attributes:
        name: Unique identifier for the worker configuration.
        system_prompt: Base prompt template for the worker.
        ip: IP address of the worker.
        port: Port number of the worker.
        integrations: Dictionary of integration clients keyed by provider.
        llms: Dictionary of LLM services keyed by provider.
        vectorstores: Dictionary of vector stores keyed by provider.
        extra_arguments: Additional provider-specific configuration parameters.
    """
    name: str
    system_prompt: Optional[str] = None
    ip: Optional[str] = None
    port: Optional[int] = None
    integrations: Dict[str, IntegrationInterface] = field(default_factory=dict)
    llms: Dict[str, LLMServiceInterface] = field(default_factory=dict)
    vectorstores: Dict[str, VectorStoreInterface] = field(default_factory=dict)
    extra_arguments: Dict[str, Any] = field(default_factory=dict)

class AIWorker(ABC):
    """Base interface that all marketplace agents must implement.
    
    This abstract class defines the core interface that all AI workers/agents
    must implement to be compatible with the Autoppia SDK. It provides the basic
    lifecycle and interaction methods required for agent operation.
    """

    @abstractmethod
    def start(self) -> None:
        """Initialize the agent and any required resources.
        
        Should be called before any calls to the agent are made.
        """

    @abstractmethod
    def stop(self) -> None:
        """Cleanup and release any resources.
        
        Should be called when the agent is no longer needed.
        """

    @abstractmethod
    def call(self, message: str) -> str:
        """Process a message and return a response.
        
        Args:
            message: The input message/query to be processed by the agent.
            
        Returns:
            str: The agent's response to the input message.
        """
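
To make the lifecycle concrete, here is a minimal sketch of a worker that satisfies the interface. It redeclares `AIWorker` locally so the example runs without the SDK installed. `EchoWorker` and its `running` flag are hypothetical names chosen for illustration, and the guard in `call` is one possible way to enforce the "start before call" rule, not something the SDK is known to do.

```python
from abc import ABC, abstractmethod


class AIWorker(ABC):
    """Local copy of the interface above so the sketch runs without the SDK."""

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def call(self, message: str) -> str: ...


class EchoWorker(AIWorker):
    """Hypothetical worker that echoes messages back; not part of the SDK."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        # A real worker would open connections or load models here.
        self.running = True

    def stop(self) -> None:
        # A real worker would release those resources here.
        self.running = False

    def call(self, message: str) -> str:
        if not self.running:
            raise RuntimeError("call() used before start()")
        return f"echo: {message}"


worker = EchoWorker()
worker.start()
reply = worker.call("hello")
worker.stop()
print(reply)  # echo: hello
```

Because all three methods are abstract, a subclass that omits any of them cannot be instantiated, so an incomplete worker fails when it is constructed rather than when a message arrives.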

3. Worker Router

The Worker Router handles communication with AI worker instances. It fetches a worker's IP address and port from an info endpoint, then sends messages to that worker's /process endpoint and returns the result.

File: autoppia_sdk/src/workers/worker_router.py

import requests
from autoppia_sdk.src.workers.adapter import AIWorkerConfigAdapter

class WorkerRouter:
    """A router class for handling communication with AI workers.

    This class manages the routing and communication with AI worker instances,
    handling configuration retrieval and message processing.
    """

    @classmethod
    def from_id(cls, worker_id: int):
        """Fetches worker IP and port from the info endpoint.
        
        Args:
            worker_id (int): Identifier for the worker.
            
        Returns:
            WorkerRouter: An instance configured with the worker's IP and port.
            
        Raises:
            Exception: If fetching worker info fails.
        """
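        try:
            payload = {
                "SECRET": "ekwklrklkfewf3232nm",
                "id": worker_id
            }
            # The 10-second timeout is an arbitrary choice to avoid hanging forever
            response = requests.get("http://3.251.99.81/info", json=payload, timeout=10)
            response.raise_for_status()  # Surface HTTP errors before parsing the body
            data = response.json()
            ip = data.get("ip")
            port = data.get("port")

            if not ip or not port:
                raise ValueError("Invalid response: missing ip or port")

            return cls(ip, port)
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback
            raise Exception(f"Failed to fetch worker info: {e}") from e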
    
    def __init__(self, ip: str, port: int):
        """Initializes a WorkerRouter instance.
        
        Args:
            ip (str): The IP address of the worker.
            port (int): The port number the worker is listening on.
        """
        self.ip = ip
        self.port = port
    
    def call(self, message: str):
        """Sends a message to the worker for processing.
        
        Args:
            message (str): The message to be processed by the worker.
            
        Returns:
            Any: The processed result from the worker.
            
        Raises:
            Exception: If the worker call fails or returns an error status.
        """
        try:
            url = f"http://{self.ip}:{self.port}/process"
            response = requests.post(url, json={"message": message})
            response.raise_for_status()  # Raise an exception for bad status codes

            return response.json()["result"]
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback
            raise Exception(f"Failed to call worker: {e}") from e
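
The wire format that `WorkerRouter.call` relies on can be sketched without any network access. The two helpers below, `build_process_request` and `parse_process_response`, are hypothetical names used only for illustration. They reproduce the URL, request body, and response field that appear in the code above, and the address and messages are made-up sample values.

```python
import json
from typing import Any, Dict, Tuple


def build_process_request(ip: str, port: int, message: str) -> Tuple[str, bytes]:
    """Return the URL and JSON body that correspond to WorkerRouter.call."""
    url = f"http://{ip}:{port}/process"
    body = json.dumps({"message": message}).encode("utf-8")
    return url, body


def parse_process_response(raw: bytes) -> Any:
    """Extract the "result" field, as WorkerRouter.call does on success."""
    payload: Dict[str, Any] = json.loads(raw)
    return payload["result"]


url, body = build_process_request("127.0.0.1", 8000, "ping")
print(url)   # http://127.0.0.1:8000/process
print(body)  # b'{"message": "ping"}'
print(parse_process_response(b'{"result": "pong"}'))  # pong
```

A response without a "result" key raises `KeyError` here. In the router that error is caught and re-raised as a "Failed to call worker" exception.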

4. Worker API

The Worker API wraps a worker implementation in a REST interface using FastAPI. It exposes a POST /process endpoint for message processing and a GET /health endpoint for health checks, and it starts and stops the worker on the application's startup and shutdown events.

File: autoppia_sdk/src/workers/worker_api.py

from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

class WorkerMessage(BaseModel):
    """
    Pydantic model for worker message requests.
    
    Attributes:
        message (str): The message to be processed by the worker.
    """
    message: str

class WorkerAPI:
    """
    FastAPI wrapper for worker implementations.
    
    This class provides a REST API interface for worker operations including
    health checks and message processing.
    
    Attributes:
        app (FastAPI): The FastAPI application instance.
        worker: The worker instance that handles message processing.
    """
    def __init__(self, worker):
        """
        Initialize the WorkerAPI.
        
        Args:
            worker: Worker instance that will process messages.
        """
        self.app = FastAPI(title="Worker API", description="API Wrapper for the worker")
        self.worker = worker

        # Register routes
        self.setup_routes()

    def setup_routes(self):
        """
        Configure API routes and event handlers.
        
        Sets up the following endpoints:
        - POST /process: Process a message.
        - GET /health: Check worker health.
        
        And event handlers:
        - startup: Starts the worker.
        - shutdown: Stops the worker.
        """
        @self.app.on_event("startup")
        async def startup_event():
            self.worker.start()

        @self.app.on_event("shutdown")
        async def shutdown_event():
            if self.worker:
                self.worker.stop()

        @self.app.post("/process")
        def process_message(message: WorkerMessage):
            """
            Process a message using the worker.
            
            Args:
                message (WorkerMessage): The message to process.
                
            Returns:
                dict: Contains the processing result.
                
            Raises:
                HTTPException: If worker is not initialized or processing fails.
            """
            if not self.worker:
                raise HTTPException(status_code=500, detail="Worker not initialized")

            try:
                result = self.worker.call(message.message)
                return {"result": result}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/health")
        async def health_check():
            """
            Check the health status of the worker.
            
            Returns:
                dict: Contains status and worker type information.
            """
            return {"status": "healthy", "worker_type": self.worker.__class__.__name__}

    def get_app(self) -> FastAPI:
        """
        Get the FastAPI application instance.
        
        Returns:
            FastAPI: The configured FastAPI application.
        """
        return self.app
