Agent Execution & Runtimes

This document describes the supported agent execution models, runtimes, and protocols in the Meta Agent Platform.

Overview

The platform runs agents in five kinds of environments: Docker containers, HTTP APIs, the A2A/Open Agent Protocol, multi-modal runtimes, and edge devices. Each environment is described below with its execution flow and an example configuration.

Execution Architecture

The platform's execution architecture consists of several key components:

  1. Execution Engine: Manages agent lifecycle and runtime selection
  2. Runtime Adapters: Interface with specific execution environments
  3. Input/Output Processors: Handle data transformation and validation
  4. Observability Layer: Collects metrics, logs, and traces
  5. Security Manager: Enforces security policies and isolation
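
As a rough illustration of how the Execution Engine might hand work to a Runtime Adapter, the sketch below keeps a registry keyed by the `type` field that appears in the agent configurations in this document. `AdapterRegistry`, `register`, and `resolve` are hypothetical names chosen for the example, not platform APIs, and the factories return placeholder strings instead of real adapters.

```python
from typing import Callable, Dict


class AdapterRegistry:
    """Hypothetical registry mapping an agent `type` to an adapter factory."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def register(self, agent_type: str, factory: Callable[[], object]) -> None:
        """Associate an agent type (e.g. "docker") with a factory."""
        self._factories[agent_type] = factory

    def resolve(self, agent_config: dict) -> object:
        """Build the adapter for the `type` declared in an agent configuration."""
        agent_type = agent_config.get("type")
        if agent_type not in self._factories:
            raise ValueError(f"No runtime adapter registered for type: {agent_type!r}")
        return self._factories[agent_type]()


registry = AdapterRegistry()
# Placeholder factories; a real deployment would construct adapter instances here.
registry.register("docker", lambda: "docker-adapter")
registry.register("api", lambda: "api-adapter")

adapter = registry.resolve({"name": "text-classifier", "type": "docker"})
```

Keeping selection in one place means that supporting a new runtime only requires registering another factory; the engine's calling code does not change.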

Agent Execution Architecture

Note: This is a placeholder for an agent execution architecture diagram. The actual diagram should be created and added to the project.

Execution Types

1. Docker Container Agents

  • Isolation: Agents run in Docker containers for security and reproducibility.
  • Lifecycle: Pull image, inject inputs, execute, collect outputs, tear down.
  • Resource Control: CPU, memory, and network limits enforced.
  • Best For: Complex dependencies, custom environments, reproducibility.

Container Execution Flow

sequenceDiagram
    participant Orchestrator
    participant ContainerRuntime
    participant Agent

    Orchestrator->>ContainerRuntime: Pull image
    ContainerRuntime-->>Orchestrator: Image ready
    Orchestrator->>ContainerRuntime: Create container with resource limits
    ContainerRuntime-->>Orchestrator: Container created
    Orchestrator->>ContainerRuntime: Inject inputs (env vars, files, or stdin)
    ContainerRuntime->>Agent: Start execution with inputs
    Agent->>Agent: Process inputs
    Agent-->>ContainerRuntime: Return outputs (stdout, files)
    ContainerRuntime-->>Orchestrator: Collect outputs
    Orchestrator->>ContainerRuntime: Teardown container

Example Docker Agent Configuration

# docker-agent-config.yaml
name: text-classifier
version: 1.0.0
type: docker

container:
  image: registry.example.com/agents/text-classifier:1.0.0
  pull_policy: IfNotPresent
  command: ["python", "/app/run.py"]
  working_dir: /app
  user: agent

resources:
  memory: "512Mi"
  memory_limit: "1Gi"
  cpu: "0.5"
  cpu_limit: "1.0"

input:
  method: env  # env, file, or stdin
  format: json

output:
  method: stdout  # stdout or file
  format: json
  path: /app/output.json  # Only for file output

security:
  read_only_fs: true
  no_new_privileges: true
  drop_capabilities: ["ALL"]
  allow_network: false
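
The `resources` values above use Kubernetes-style quantities such as "512Mi" and "1Gi". The Docker SDK for Python documents `mem_limit` as accepting a byte count or a string with a `b`, `k`, `m`, or `g` suffix, so a conversion step is likely needed before these values reach the container runtime. The following is a minimal sketch of such a conversion; `quantity_to_bytes` and `cpu_to_quota` are hypothetical helper names, and only a small subset of suffixes is handled.

```python
import re

# Multipliers for a subset of Kubernetes-style suffixes (assumption: these
# are the only units that appear in agent configurations).
_UNITS = {
    "": 1,
    "k": 10**3, "M": 10**6, "G": 10**9,
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30,
}


def quantity_to_bytes(value: str) -> int:
    """Convert a quantity such as "512Mi" or "1Gi" to a byte count."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([A-Za-z]*)", value.strip())
    if not match or match.group(2) not in _UNITS:
        raise ValueError(f"Unrecognized quantity: {value!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit])


def cpu_to_quota(cpus: str, period_us: int = 100_000) -> int:
    """Convert a CPU share such as "0.5" to a CFS quota for the given period."""
    return int(float(cpus) * period_us)


memory_limit_bytes = quantity_to_bytes("1Gi")
cpu_quota = cpu_to_quota("0.5")
```

Passing integers to the container runtime avoids any ambiguity about which unit suffixes a particular client library accepts.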

2. API Agents

  • RESTful APIs: Agents exposed as HTTP endpoints.
  • Integration: Platform sends requests, receives responses.
  • Authentication: Supports API keys, OAuth2, custom headers.
  • Best For: SaaS agents, cloud services, scalable APIs.

API Execution Flow

sequenceDiagram
    participant Orchestrator
    participant APIAdapter
    participant ExternalAPI

    Orchestrator->>APIAdapter: Send request with inputs
    APIAdapter->>APIAdapter: Format request (JSON, XML, etc.)
    APIAdapter->>APIAdapter: Add authentication headers
    APIAdapter->>ExternalAPI: HTTP request (GET, POST, etc.)
    ExternalAPI-->>APIAdapter: HTTP response
    APIAdapter->>APIAdapter: Parse response
    APIAdapter->>APIAdapter: Transform to platform format
    APIAdapter-->>Orchestrator: Return formatted outputs
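
The "Parse response" and "Transform to platform format" steps can be pictured as a lookup of named outputs in the decoded response body. The sketch below assumes paths of the simple form `$.a.b.c`; it is not a full JSONPath implementation, and `resolve_path` and `map_response` are hypothetical helpers introduced only for illustration.

```python
from typing import Any, Dict


def resolve_path(document: Dict[str, Any], path: str) -> Any:
    """Resolve a simple `$.a.b.c` path; return None when a key is missing."""
    if not path.startswith("$."):
        raise ValueError(f"Unsupported path: {path!r}")
    current: Any = document
    for key in path[2:].split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def map_response(body: Dict[str, Any], output_mapping: Dict[str, str],
                 error_path: str) -> Dict[str, Any]:
    """Raise if the error path is populated, otherwise extract each output."""
    error = resolve_path(body, error_path)
    if error is not None:
        raise RuntimeError(f"Agent API returned an error: {error}")
    return {name: resolve_path(body, path) for name, path in output_mapping.items()}


# Sample response body invented for the example.
body = {"data": {"translation": "Bonjour", "detected_source_language": "en"}}
mapping = {
    "translated_text": "$.data.translation",
    "detected_language": "$.data.detected_source_language",
}
outputs = map_response(body, mapping, error_path="$.error.message")
```

Checking the error path before extracting outputs keeps a failed call from being reported as a successful run with empty fields.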

Example API Agent Configuration

# api-agent-config.yaml
name: language-translator
version: 1.0.0
type: api

endpoint:
  url: https://api.translation-service.com/v1/translate
  method: POST
  timeout: 30  # seconds
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1.0  # seconds
    max_delay: 10.0  # seconds

authentication:
  type: api_key
  header_name: X-API-Key
  key_reference: ${TRANSLATION_API_KEY}  # Environment variable reference

request:
  content_type: application/json
  body_template: |
    {
      "text": "${input.text}",
      "source_language": "${input.source_lang}",
      "target_language": "${input.target_lang}"
    }

response:
  success_codes: [200, 201]
  error_path: $.error.message
  output_mapping:
    translated_text: $.data.translation
    detected_language: $.data.detected_source_language
    confidence: $.data.confidence
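
To make the `retry` settings concrete, the sketch below computes the wait between attempts and wraps a call in a retry loop. It assumes that "exponential" means the delay doubles after each failed attempt and is capped at `max_delay`; the configuration does not state the growth factor, so treat the factor of 2 as an assumption. `backoff_delays` and `call_with_retry` are hypothetical helpers.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(max_attempts: int = 3, initial_delay: float = 1.0,
                   max_delay: float = 10.0) -> List[float]:
    """Delays between attempts: doubled each time and capped at max_delay."""
    return [min(initial_delay * (2 ** i), max_delay) for i in range(max_attempts - 1)]


def call_with_retry(operation: Callable[[], T], max_attempts: int = 3,
                    initial_delay: float = 1.0, max_delay: float = 10.0,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation, retrying on timeout or connection errors."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(max_attempts, initial_delay, max_delay)
    for attempt in range(max_attempts):
        try:
            return operation()
        except (TimeoutError, ConnectionError):
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            sleep(delays[attempt])
    raise AssertionError("unreachable")


delays = backoff_delays(max_attempts=3, initial_delay=1.0, max_delay=10.0)
```

With the values from the configuration above, a request is tried at most three times, waiting 1 second and then 2 seconds between attempts. The `sleep` parameter is injectable so the loop can be tested without real delays.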

3. A2A/Open Agent Protocol

  • Standardized Protocol: Supports interoperability with external agent frameworks.
  • Features: Runs, threads, memory management, streaming output.
  • Integration: Adapter classes translate between platform and protocol.
  • Reference: Agent Protocol Integration

A2A Protocol Flow

sequenceDiagram
    participant Orchestrator
    participant A2AAdapter
    participant ExternalAgent

    Orchestrator->>A2AAdapter: Create run with inputs
    A2AAdapter->>ExternalAgent: POST /runs
    ExternalAgent-->>A2AAdapter: Return run_id
    loop Until completed
        A2AAdapter->>ExternalAgent: GET /runs/{run_id}
        ExternalAgent-->>A2AAdapter: Return status
    end
    A2AAdapter->>ExternalAgent: GET /runs/{run_id}/steps
    ExternalAgent-->>A2AAdapter: Return step results
    A2AAdapter->>A2AAdapter: Transform to platform format
    A2AAdapter-->>Orchestrator: Return outputs
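
The "Until completed" loop in the diagram can be sketched as a polling function with a deadline. Here `get_run` stands in for the `GET /runs/{run_id}` call, and the status strings in `TERMINAL_STATES` are assumptions; the actual values depend on the protocol version the external agent implements.

```python
import time
from typing import Any, Callable, Dict

# Assumed terminal status values; check the protocol version in use.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def poll_run(get_run: Callable[[str], Dict[str, Any]], run_id: str,
             interval: float = 1.0, timeout: float = 300.0,
             sleep: Callable[[float], None] = time.sleep,
             clock: Callable[[], float] = time.monotonic) -> Dict[str, Any]:
    """Poll a run until it reaches a terminal state or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        run = get_run(run_id)
        if run.get("status") in TERMINAL_STATES:
            return run
        if clock() >= deadline:
            raise TimeoutError(f"Run {run_id} did not finish within {timeout} seconds")
        sleep(interval)


# Simulated status responses in place of a real HTTP client.
responses = iter([{"status": "running"}, {"status": "running"}, {"status": "completed"}])
final = poll_run(lambda run_id: next(responses), "run-123", interval=0.0)
```

The default timeout of 300 seconds mirrors the `timeout` value used in the example A2A configuration in this document. When the agent supports streaming, a streaming connection would normally replace this polling loop.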

Example A2A Agent Configuration

# a2a-agent-config.yaml
name: research-assistant
version: 1.0.0
type: a2a

endpoint:
  url: https://agent-protocol-service.example.com
  timeout: 300  # seconds

authentication:
  type: bearer
  token_reference: ${AGENT_PROTOCOL_TOKEN}

protocol:
  version: "1.0"
  features:
    - streaming
    - thread_management
    - tool_use

capabilities:
  - web_search
  - code_generation
  - data_analysis

streaming: true
max_steps: 15
threads_enabled: true

4. Multi-Modal Runtimes

  • Vision, Audio, Sensor Agents: Specialized runtimes for processing images, audio, sensor data.
  • Frameworks: OpenCV, PyTorch, TensorFlow, Whisper, etc.
  • Fusion: Support for cross-modal orchestration.

Multi-Modal Execution Flow

sequenceDiagram
    participant Orchestrator
    participant ModalityRouter
    participant VisionProcessor
    participant AudioProcessor
    participant TextProcessor
    participant FusionEngine

    Orchestrator->>ModalityRouter: Process multi-modal input
    ModalityRouter->>VisionProcessor: Process image data
    ModalityRouter->>AudioProcessor: Process audio data
    ModalityRouter->>TextProcessor: Process text data
    VisionProcessor-->>FusionEngine: Vision features
    AudioProcessor-->>FusionEngine: Audio features
    TextProcessor-->>FusionEngine: Text features
    FusionEngine->>FusionEngine: Apply fusion strategy
    FusionEngine-->>Orchestrator: Return fused results

Example Multi-Modal Agent Configuration

# multi-modal-agent-config.yaml
name: content-analyzer
version: 1.0.0
type: multi-modal

modalities:
  - type: vision
    processor: image_classifier
    model: efficientnet_b0
    input_format: [jpg, png]
    preprocessing:
      resize: [224, 224]
      normalize: true
  - type: text
    processor: text_analyzer
    model: distilbert-base-uncased
    max_length: 512
  - type: audio
    processor: audio_classifier
    model: yamnet
    sample_rate: 16000

fusion:
  strategy: late_fusion
  weights:
    vision: 0.4
    text: 0.4
    audio: 0.2

resources:
  memory: "2Gi"
  cpu: "1.0"
  gpu: "0.5"
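
One way to read `strategy: late_fusion` is that each modality produces its own per-label scores and the results are combined as a weighted average. The sketch below follows that reading. The renormalization over the modalities that actually produced scores is an assumption added so that a missing input (for example, no audio track) does not shrink every score; `late_fusion` is a hypothetical function name.

```python
from typing import Dict


def late_fusion(scores_by_modality: Dict[str, Dict[str, float]],
                weights: Dict[str, float]) -> Dict[str, float]:
    """Weighted average of per-label scores across the available modalities."""
    present = {m: w for m, w in weights.items() if m in scores_by_modality}
    total = sum(present.values())
    if total <= 0:
        raise ValueError("No weighted modality produced scores")
    fused: Dict[str, float] = {}
    for modality, weight in present.items():
        for label, score in scores_by_modality[modality].items():
            fused[label] = fused.get(label, 0.0) + (weight / total) * score
    return fused


weights = {"vision": 0.4, "text": 0.4, "audio": 0.2}
# Scores invented for the example.
scores = {
    "vision": {"cat": 0.9, "dog": 0.1},
    "text": {"cat": 0.6, "dog": 0.4},
    "audio": {"cat": 0.2, "dog": 0.8},
}
fused = late_fusion(scores, weights)
```

Because fusion happens after each model has run, a modality can be added or removed by changing the weights without retraining the individual processors.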

5. Edge Runtime

  • Lightweight Execution: Optimized for resource-constrained devices.
  • Offline Operation: Local storage, sync when online.
  • Technologies: WebAssembly, SQLite, ONNX Runtime, TFLite.

Edge Execution Flow

sequenceDiagram
    participant EdgeDevice
    participant LocalRuntime
    participant Agent
    participant LocalStorage
    participant SyncManager
    participant Cloud

    EdgeDevice->>LocalRuntime: Trigger agent execution
    LocalRuntime->>Agent: Execute with inputs
    Agent->>LocalStorage: Read/write data
    Agent-->>LocalRuntime: Return results
    LocalRuntime-->>EdgeDevice: Display results

    opt When online
        SyncManager->>LocalStorage: Get pending changes
        SyncManager->>Cloud: Sync data and results
        Cloud-->>SyncManager: Sync acknowledgement
        SyncManager->>LocalStorage: Update sync status
    end

Example Edge Agent Configuration

# edge-agent-config.yaml
name: object-detector-edge
version: 1.0.0
type: edge

runtime: tflite
model:
  path: /models/ssd_mobilenet_v2.tflite
  quantized: true
  input_shape: [1, 300, 300, 3]

resources:
  max_memory: "200MB"
  target_cpu: "0.3"

storage:
  type: sqlite
  path: /data/detections.db
  max_size: "50MB"
  retention_days: 7

sync:
  mode: delta
  interval: 3600  # seconds
  max_batch_size: "1MB"

power:
  low_power_mode: true
  wake_lock: false
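
The `storage` and `sync` settings suggest a pattern in which results are written locally and only unsynced rows are uploaded in size-limited batches. The sketch below shows that pattern with the standard-library `sqlite3` module. The table layout, the `synced` flag, and the `upload` callback are assumptions made for illustration, and "1MB" is interpreted as 1,000,000 bytes.

```python
import sqlite3
from typing import Callable, List, Tuple

Batch = List[Tuple[int, str]]


def init_store(conn: sqlite3.Connection) -> None:
    """Create a hypothetical detections table with a per-row sync flag."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS detections ("
        "id INTEGER PRIMARY KEY, payload TEXT NOT NULL, "
        "synced INTEGER NOT NULL DEFAULT 0)"
    )


def sync_pending(conn: sqlite3.Connection, upload: Callable[[Batch], bool],
                 max_batch_bytes: int = 1_000_000) -> int:
    """Upload one batch of unsynced rows; return the number of rows marked synced."""
    rows = conn.execute(
        "SELECT id, payload FROM detections WHERE synced = 0 ORDER BY id"
    ).fetchall()
    batch: Batch = []
    size = 0
    for row_id, payload in rows:
        payload_size = len(payload.encode("utf-8"))
        # Always accept the first row so one oversized row cannot block syncing.
        if batch and size + payload_size > max_batch_bytes:
            break
        batch.append((row_id, payload))
        size += payload_size
    if not batch or not upload(batch):
        return 0  # Nothing to send, or upload failed: rows stay pending
    conn.executemany("UPDATE detections SET synced = 1 WHERE id = ?",
                     [(row_id,) for row_id, _ in batch])
    conn.commit()
    return len(batch)


conn = sqlite3.connect(":memory:")
init_store(conn)
conn.executemany("INSERT INTO detections (payload) VALUES (?)",
                 [("a" * 40,), ("b" * 40,), ("c" * 40,)])
sent = sync_pending(conn, upload=lambda batch: True, max_batch_bytes=100)
```

Rows are marked as synced only after the upload callback reports success, so a dropped connection leaves them pending for the next sync interval.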

Execution Flow

Agent Execution Flow

Note: This is a placeholder for an agent execution flow diagram. The actual diagram should be created and added to the project.

  1. Input Preparation: Inputs are validated and formatted per agent schema.

    def prepare_input(raw_input, agent_schema):
        # Validate input against schema
        validation_result = validate_against_schema(raw_input, agent_schema)
        if not validation_result.valid:
            raise ValidationError(validation_result.errors)
    
        # Apply default values for missing fields
        input_with_defaults = apply_defaults(raw_input, agent_schema)
    
        # Transform input to agent-specific format
        transformed_input = transform_input(input_with_defaults, agent_schema)
    
        return transformed_input
    

  2. Execution: Agent is invoked in the appropriate runtime.

    def execute_agent(agent_id, prepared_input, runtime_config):
        # Select appropriate runtime
        runtime = get_runtime_for_agent(agent_id, runtime_config)
    
        # Initialize runtime with agent
        runtime.initialize(agent_id)
    
        # Execute agent with prepared input
        execution_id = runtime.execute(prepared_input)
    
        # Wait for completion or timeout
        result = runtime.wait_for_completion(execution_id, timeout=runtime_config.timeout)
    
        return result
    

  3. Output Handling: Results are validated, post-processed, and returned.

    def handle_output(raw_output, agent_schema):
        # Validate output against schema
        validation_result = validate_against_schema(raw_output, agent_schema.output)
        if not validation_result.valid:
            raise OutputValidationError(validation_result.errors)
    
        # Transform output to platform format
        transformed_output = transform_output(raw_output, agent_schema.output)
    
        # Apply any post-processing
        processed_output = apply_post_processing(transformed_output, agent_schema.post_processing)
    
        return processed_output
    

  4. Observability: Logs, metrics, and traces are collected for monitoring.

    def collect_observability_data(execution_id, agent_id, runtime_metrics):
        # Collect logs
        logs = get_logs_for_execution(execution_id)
    
        # Collect metrics
        metrics = {
            'execution_time_ms': runtime_metrics.execution_time,
            'memory_usage_mb': runtime_metrics.memory_usage,
            'cpu_usage_percent': runtime_metrics.cpu_usage
        }
    
        # Create trace
        trace = create_execution_trace(execution_id, agent_id, logs, metrics)
    
        # Store observability data
        store_observability_data(trace)
    
        return trace
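
The helper functions called in step 1 (`validate_against_schema`, `apply_defaults`) are not defined in this document. As a rough, self-contained illustration of input preparation, the sketch below uses a deliberately simple schema format invented for the example: each field maps to a Python type, a `required` flag, and an optional default. Defaults are applied before validation here so that optional fields with defaults never fail the check; a real agent schema is more likely to be JSON Schema.

```python
from typing import Any, Dict, List

# Hypothetical schema format: field -> {"type", "required", optional "default"}
SCHEMA: Dict[str, Dict[str, Any]] = {
    "text": {"type": str, "required": True},
    "target_lang": {"type": str, "required": True},
    "source_lang": {"type": str, "required": False, "default": "auto"},
}


def apply_defaults(raw: Dict[str, Any], schema: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of raw with schema defaults filled in for missing fields."""
    merged = dict(raw)
    for field, spec in schema.items():
        if field not in merged and "default" in spec:
            merged[field] = spec["default"]
    return merged


def validate(data: Dict[str, Any], schema: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable validation errors (empty when valid)."""
    errors: List[str] = []
    for field, spec in schema.items():
        if field not in data:
            if spec.get("required"):
                errors.append(f"missing required field: {field}")
        elif not isinstance(data[field], spec["type"]):
            errors.append(f"{field}: expected {spec['type'].__name__}")
    return errors


prepared = apply_defaults({"text": "hello", "target_lang": "fr"}, SCHEMA)
errors = validate(prepared, SCHEMA)
```

Returning a list of errors instead of raising on the first one lets the caller report every problem with an input in a single response.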
    

Protocols & Adapters

Adapter Pattern

Adapters standardize the interface between the platform and various agent runtimes:

from abc import ABC, abstractmethod

class RuntimeAdapter(ABC):
    """Base adapter interface for all runtime types"""

    @abstractmethod
    def initialize(self, agent_id):
        """Initialize the runtime with the specified agent"""
        pass

    @abstractmethod
    def execute(self, input_data):
        """Execute the agent with the given input"""
        pass

    @abstractmethod
    def wait_for_completion(self, execution_id, timeout=None):
        """Wait for execution to complete and return results"""
        pass

    @abstractmethod
    def get_status(self, execution_id):
        """Get the current status of an execution"""
        pass

    @abstractmethod
    def terminate(self, execution_id):
        """Terminate an execution"""
        pass

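# Example Docker adapter implementation
import docker  # Docker SDK for Python; used for docker.errors below

class DockerRuntimeAdapter(RuntimeAdapter):
    def __init__(self, docker_client, config):
        self.docker = docker_client
        self.config = config
        self.containers = {}

    def initialize(self, agent_id):
        agent_config = get_agent_config(agent_id)
        image = agent_config.container.image
        pull_policy = agent_config.container.pull_policy

        # Pull image according to the configured pull policy
        if pull_policy == 'Always':
            self.docker.images.pull(image)
        elif pull_policy == 'IfNotPresent':
            try:
                self.docker.images.get(image)
            except docker.errors.ImageNotFound:
                self.docker.images.pull(image)

        return True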

    def execute(self, input_data):
        agent_id = input_data.agent_id
        agent_config = get_agent_config(agent_id)

        # Prepare container configuration
        container_config = {
            'image': agent_config.container.image,
            'command': agent_config.container.command,
            'environment': self._prepare_environment(input_data, agent_config),
            'mem_limit': agent_config.resources.memory_limit,
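            # cpu_limit is a string in the YAML config (e.g. "1.0"); convert it
            # before scaling to the default 100,000 microsecond CFS period
            'cpu_quota': int(float(agent_config.resources.cpu_limit) * 100000),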
            'network_disabled': not agent_config.security.allow_network,
            'read_only': agent_config.security.read_only_fs,
            'user': agent_config.container.user
        }

        # Create and start container
        container = self.docker.containers.run(
            **container_config,
            detach=True
        )

        execution_id = f"docker-{container.id[:12]}"
        self.containers[execution_id] = container

        return execution_id

    def wait_for_completion(self, execution_id, timeout=None):
        container = self.containers.get(execution_id)
        if not container:
            raise ValueError(f"Unknown execution ID: {execution_id}")

        try:
            container.wait(timeout=timeout)
            # Read stdout only so that stderr output cannot corrupt the parsed result
            logs = container.logs(stdout=True, stderr=False).decode('utf-8')

            # Parse output based on agent configuration
            agent_id = self._get_agent_id_for_execution(execution_id)
            agent_config = get_agent_config(agent_id)

            if agent_config.output.method == 'stdout':
                return self._parse_output(logs, agent_config)
            elif agent_config.output.method == 'file':
                output_file = agent_config.output.path
                output_content = self._get_file_from_container(container, output_file)
                return self._parse_output(output_content, agent_config)
            else:
                raise ValueError(f"Unsupported output method: {agent_config.output.method}")
        finally:
            # Cleanup container
            self._cleanup_container(execution_id)

    # Other methods implementation...

Security Considerations

Container Security

  • Image Scanning: Scan container images for vulnerabilities before execution.

    # Example using Trivy scanner
    trivy image registry.example.com/agents/text-classifier:1.0.0
    

  • Non-Root Execution: Run containers as non-privileged users.

    # In Dockerfile
    RUN useradd -r -s /bin/false agent
    USER agent
    

  • Resource Limits: Enforce CPU, memory, and storage limits.

    # In Kubernetes deployment
    resources:
      limits:
        cpu: "1"
        memory: "512Mi"
      requests:
        cpu: "0.5"
        memory: "256Mi"
    

API Security

  • Input Validation: Validate all API inputs against schemas.

    import jsonschema

    def validate_api_input(input_data, schema):
        try:
            jsonschema.validate(instance=input_data, schema=schema)
            return True, None
        except jsonschema.exceptions.ValidationError as e:
            return False, str(e)
    

  • Rate Limiting: Implement rate limiting to prevent abuse.

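    # Using Redis for rate limiting (fixed window per API key)
    # Assumes redis_client is an already configured redis.Redis instance
    import time

    def check_rate_limit(api_key, limit=100, period=3600):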
        current = int(time.time())
        key = f"rate_limit:{api_key}:{current // period}"
        count = redis_client.incr(key)
    
        if count == 1:
            redis_client.expire(key, period)
    
        if count > limit:
            return False  # Rate limit exceeded
    
        return True  # Within rate limit
    

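  • Authentication: Authenticate every request with the method configured for the agent (API key or OAuth2 bearer token) and reject requests that use any other scheme.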

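    def verify_api_auth(request, auth_config):
        """Return (is_valid, error_message); error_message is None on success."""
        if auth_config.type == 'api_key':
            api_key = request.headers.get(auth_config.header_name)
            if not api_key:
                return False, "Missing API key"
            if not verify_api_key(api_key):
                return False, "Invalid API key"
            return True, None

        elif auth_config.type == 'oauth2':
            auth_header = request.headers.get('Authorization', '')
            # Strip only a leading "Bearer " prefix, not occurrences inside the token
            token = auth_header[len('Bearer '):] if auth_header.startswith('Bearer ') else ''
            if not token:
                return False, "Missing OAuth token"
            if not verify_oauth_token(token):
                return False, "Invalid OAuth token"
            return True, None

        # Fail closed on authentication types this function does not handle
        return False, f"Unsupported authentication type: {auth_config.type}"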
    

Edge Security

  • Secure Boot: Verify integrity of edge runtime before execution.
  • Encrypted Storage: Encrypt sensitive data stored on edge devices.
  • Secure Updates: Implement signed updates and rollback mechanisms.
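
As a small illustration of the integrity check behind signed updates, the sketch below verifies an update payload before it is applied. It uses HMAC-SHA256 from the Python standard library as a stand-in. Production update signing normally relies on asymmetric signatures so that devices hold only a public key, which is a different technique from the one shown. `sign_update` and `verify_update` are hypothetical helper names.

```python
import hashlib
import hmac


def sign_update(payload: bytes, key: bytes) -> str:
    """Compute a hex-encoded HMAC-SHA256 tag over the update payload."""
    return hmac.new(key, payload, hashlib.sha256).hexdigest()


def verify_update(payload: bytes, signature: str, key: bytes) -> bool:
    """Check the tag using a constant-time comparison."""
    expected = sign_update(payload, key)
    return hmac.compare_digest(expected, signature)


# Example key and payload invented for illustration; never hard-code real keys.
key = b"example-shared-secret"
update = b"runtime-v1.0.1 contents"
signature = sign_update(update, key)

is_valid = verify_update(update, signature, key)
is_tampered_valid = verify_update(update + b"!", signature, key)
```

An update that fails verification should be discarded and the previously installed version kept, which is where the rollback mechanism comes in.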

Troubleshooting

| Issue | Possible Cause | Solution |
|---|---|---|
| Container fails to start | Missing image or invalid configuration | Check that the image exists and the configuration is valid |
| API timeout | Slow external service or network issues | Implement retry with backoff; check service status |
| Protocol version mismatch | Incompatible agent protocol version | Update the adapter or agent to a compatible version |
| Resource exhaustion | Insufficient resource limits | Increase resource limits or optimize the agent |
| Authentication failure | Invalid or expired credentials | Refresh credentials or check the configuration |

Last updated: 2025-04-18