# Agent Security & Compliance

This document outlines security and compliance best practices for developing and running agents on the Meta Agent Platform.
## Overview

Agents can process sensitive data and execute code in diverse environments. Security and compliance are therefore critical to protecting users, organizations, and the platform itself.
## Security Architecture

The platform's security architecture consists of several key components:
- Authentication & Authorization: Controls agent access to resources
- Secure Execution: Isolates agent execution environments
- Data Protection: Encrypts data at rest and in transit
- Monitoring & Auditing: Tracks agent activities and detects anomalies
- Compliance Framework: Enforces regulatory requirements

Note: This is a placeholder for an agent security architecture diagram. The actual diagram should be created and added to the project.
## Security Principles
- Defense in Depth: Multiple layers of security controls
- Zero Trust: Verify every access attempt regardless of source
- Least Privilege: Minimal access rights for each agent
- Secure by Default: Security enabled without explicit configuration
- Privacy by Design: Privacy considerations built into agent development
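
The least-privilege and zero-trust principles above can be sketched as an explicit, deny-by-default permission check. This is a minimal illustration only; the `GRANTS` table and `require_permission` decorator are hypothetical names, not platform APIs:

```python
from functools import wraps

# Hypothetical grant table: an agent not listed here has no rights at all
# (secure by default), and listed agents get only the permissions named.
GRANTS = {
    "text-classifier": {"read:input", "write:result"},
}

def require_permission(agent_id, permission):
    """Zero trust: verify every access attempt, even from known agents."""
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            if permission not in GRANTS.get(agent_id, set()):
                raise PermissionError(f"{agent_id} lacks {permission}")
            return f(*args, **kwargs)
        return wrapped
    return decorator

@require_permission("text-classifier", "read:input")
def read_input():
    return "ok"

@require_permission("text-classifier", "delete:dataset")
def delete_dataset():
    return "deleted"
```

Here `read_input()` succeeds because the grant exists, while `delete_dataset()` raises `PermissionError`: the agent was never granted that right, so the call is denied by default.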
## Security Best Practices

### Input Validation

Strictly validate and sanitize all inputs to prevent injection attacks and unexpected behavior:
```python
import logging
from typing import Any, Dict

import jsonschema

def validate_input(input_data: Dict[str, Any], schema: Dict[str, Any]) -> bool:
    """Validate input data against a JSON schema."""
    try:
        jsonschema.validate(instance=input_data, schema=schema)
        return True
    except jsonschema.exceptions.ValidationError as e:
        logging.error(f"Input validation error: {e}")
        return False

# Example schema for an image classification agent
input_schema = {
    "type": "object",
    "required": ["image"],
    "properties": {
        "image": {
            "type": "string",
            "format": "binary",
            "description": "Image to classify (JPEG or PNG)"
        },
        "top_k": {
            "type": "integer",
            "minimum": 1,
            "maximum": 20,
            "default": 5
        }
    },
    "additionalProperties": False  # Reject unexpected properties
}

# Validate input before processing
def process_request(request_data):
    if not validate_input(request_data, input_schema):
        return {"error": "Invalid input"}
    # Process the validated input
    # ...
```
### Output Filtering

Prevent leakage of sensitive data in outputs:
```python
import json
import re

def filter_sensitive_data(output_data):
    """Filter sensitive data from outputs."""
    # Define patterns for sensitive data (PII, credentials, etc.)
    patterns = [
        r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
        r'\b\d{16}\b',  # Credit card
        r'password["\']?\s*[:=]\s*["\']?[^"\',;\s]+',  # Passwords
        # Add more patterns as needed
    ]
    # Convert output to string for filtering
    output_str = json.dumps(output_data)
    # Replace sensitive data with redacted text
    for pattern in patterns:
        output_str = re.sub(pattern, '[REDACTED]', output_str)
    # Convert back to original format
    return json.loads(output_str)

# Apply filtering before returning results
def get_response(result_data):
    filtered_data = filter_sensitive_data(result_data)
    return filtered_data
```
### Least Privilege

Run agents with minimal permissions and resource access:
```yaml
# Example Docker container configuration with least privilege
container:
  image: registry.example.com/agents/text-classifier:1.0.0
  user: non-root-user
  read_only: true
  capabilities:
    drop: [ALL]  # Drop all capabilities
  resources:
    limits:
      memory: "512Mi"
      cpu: "0.5"
  security_context:
    allow_privilege_escalation: false
    run_as_non_root: true
  network:
    type: restricted
    egress:
      - host: api.example.com
        ports: [443]
```
### Container Security

Use non-root containers, scan images, and restrict network access:
```dockerfile
# Example Dockerfile with security best practices
FROM python:3.9-slim

# Create non-root user
RUN useradd -r -s /bin/false appuser

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY app/ /app/
WORKDIR /app

# Set proper permissions
RUN chown -R appuser:appuser /app

# Switch to non-root user
USER appuser

# Use read-only filesystem where possible
VOLUME ["/app/data"]

# Run with minimal capabilities
ENTRYPOINT ["python", "main.py"]
```
### API Security

Authenticate API calls, rate limit requests, and validate payloads:
```python
import time
from functools import wraps

import redis
from flask import Flask, jsonify, request

app = Flask(__name__)

# Redis client for rate limiting
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def require_api_key(f):
    """Decorator to require API key authentication."""
    @wraps(f)
    def decorated(*args, **kwargs):
        api_key = request.headers.get('X-API-Key')
        # is_valid_api_key: application-specific key lookup (not shown)
        if not api_key or not is_valid_api_key(api_key):
            return jsonify({"error": "Unauthorized"}), 401
        return f(*args, **kwargs)
    return decorated

def rate_limit(limit=100, period=3600):
    """Decorator to apply rate limiting."""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            api_key = request.headers.get('X-API-Key')
            if not api_key:
                return jsonify({"error": "Unauthorized"}), 401
            # Rate limiting key includes API key and time period
            rate_key = f"rate:{api_key}:{int(time.time() // period)}"
            current = redis_client.incr(rate_key)
            # Set expiration on first request
            if current == 1:
                redis_client.expire(rate_key, period)
            # Check if over limit
            if current > limit:
                return jsonify({"error": "Rate limit exceeded"}), 429
            return f(*args, **kwargs)
        return decorated
    return decorator

# Example API endpoint with security measures
@app.route('/api/classify', methods=['POST'])
@require_api_key
@rate_limit(limit=100, period=3600)
def classify_image():
    # Validate input
    if not request.is_json:
        return jsonify({"error": "Invalid content type"}), 400
    data = request.get_json()
    if not validate_input(data, input_schema):
        return jsonify({"error": "Invalid input"}), 400
    # Process request
    result = process_image(data)
    # Filter sensitive data from output
    filtered_result = filter_sensitive_data(result)
    return jsonify(filtered_result)
```
### Edge Security

Encrypt local storage, use secure boot, and protect update mechanisms:
```python
import json
import logging
import os

from cryptography.fernet import Fernet

class SecureLocalStorage:
    """Encrypted local storage for edge agents."""

    def __init__(self, key_path):
        # Load or generate encryption key
        if os.path.exists(key_path):
            with open(key_path, 'rb') as key_file:
                self.key = key_file.read()
        else:
            self.key = Fernet.generate_key()
            with open(key_path, 'wb') as key_file:
                key_file.write(self.key)
        self.cipher = Fernet(self.key)
        os.makedirs("data", exist_ok=True)  # Ensure the data directory exists

    def store(self, data_key, data):
        """Encrypt and store data."""
        serialized = json.dumps(data).encode('utf-8')
        encrypted = self.cipher.encrypt(serialized)
        # Store encrypted data
        with open(f"data/{data_key}.enc", 'wb') as f:
            f.write(encrypted)

    def retrieve(self, data_key):
        """Retrieve and decrypt data."""
        try:
            with open(f"data/{data_key}.enc", 'rb') as f:
                encrypted = f.read()
            decrypted = self.cipher.decrypt(encrypted)
            return json.loads(decrypted.decode('utf-8'))
        except Exception as e:
            logging.error(f"Error retrieving data: {e}")
            return None

# Secure update verification
def verify_update_signature(update_file, signature_file, public_key_file):
    """Verify digital signature of an update package."""
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import padding
    from cryptography.hazmat.primitives.serialization import load_pem_public_key

    # Load the public key
    with open(public_key_file, 'rb') as f:
        public_key = load_pem_public_key(f.read())
    # Load the update file and signature
    with open(update_file, 'rb') as f:
        update_data = f.read()
    with open(signature_file, 'rb') as f:
        signature = f.read()
    # Verify the signature
    try:
        public_key.verify(
            signature,
            update_data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except Exception as e:
        logging.error(f"Signature verification failed: {e}")
        return False
```
### Dependency Management

Use trusted dependencies and scan them for vulnerabilities:
```text
# Example requirements.txt with pinned versions
tensorflow==2.8.0
numpy==1.22.3
pillow==9.0.1
requests==2.27.1
cryptography==36.0.2
```
```bash
#!/bin/bash
# Script to scan dependencies for vulnerabilities

# Install safety tool
pip install safety

# Scan dependencies; fail if vulnerabilities are found
if ! safety check -r requirements.txt --full-report; then
    echo "Security vulnerabilities found in dependencies!"
    exit 1
fi
```
### Secrets Management

Never hardcode secrets; use the platform's secret management:
```python
import logging

from meta_agent_platform import SecretManager

# Initialize the secret manager
secret_manager = SecretManager()

# Retrieve a secret
def get_api_credentials():
    """Get API credentials from the platform secret manager."""
    try:
        api_key = secret_manager.get_secret("external_api_key")
        api_secret = secret_manager.get_secret("external_api_secret")
        return api_key, api_secret
    except Exception as e:
        logging.error(f"Failed to retrieve API credentials: {e}")
        return None, None

# Use the credentials
def call_external_api(data):
    api_key, api_secret = get_api_credentials()
    if not api_key or not api_secret:
        return {"error": "Failed to retrieve API credentials"}
    # Make API call with credentials
    # ...
```
### Logging

Avoid logging sensitive data; use structured, secure logs:
```python
import json
import logging
from datetime import datetime

class SecureLogger:
    """Logger that redacts sensitive information."""

    def __init__(self, log_file, log_level=logging.INFO):
        self.logger = logging.getLogger("secure_logger")
        self.logger.setLevel(log_level)
        # Create file handler
        handler = logging.FileHandler(log_file)
        handler.setLevel(log_level)
        # Create formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        # Add handler to logger
        self.logger.addHandler(handler)
        # Sensitive fields to redact
        self.sensitive_fields = [
            "password", "token", "api_key", "secret", "ssn", "credit_card",
            "account_number", "email", "phone", "address"
        ]

    def _redact_sensitive_data(self, data):
        """Redact sensitive fields from data."""
        if isinstance(data, dict):
            redacted = {}
            for key, value in data.items():
                if any(sensitive in key.lower() for sensitive in self.sensitive_fields):
                    redacted[key] = "[REDACTED]"
                elif isinstance(value, (dict, list)):
                    redacted[key] = self._redact_sensitive_data(value)
                else:
                    redacted[key] = value
            return redacted
        elif isinstance(data, list):
            return [self._redact_sensitive_data(item) for item in data]
        else:
            return data

    def log(self, level, message, data=None):
        """Log a message with optional structured data."""
        if data:
            # Redact sensitive information
            redacted_data = self._redact_sensitive_data(data)
            log_entry = {
                "timestamp": datetime.utcnow().isoformat(),
                "message": message,
                "data": redacted_data
            }
            self.logger.log(level, json.dumps(log_entry))
        else:
            self.logger.log(level, message)

    def info(self, message, data=None):
        self.log(logging.INFO, message, data)

    def warning(self, message, data=None):
        self.log(logging.WARNING, message, data)

    def error(self, message, data=None):
        self.log(logging.ERROR, message, data)

    def critical(self, message, data=None):
        self.log(logging.CRITICAL, message, data)

# Example usage
logger = SecureLogger("agent.log")
logger.info("Processing request", {"user_id": 123, "api_key": "secret-key-value"})
# Logs: {"timestamp": "2025-04-18T12:34:56.789", "message": "Processing request", "data": {"user_id": 123, "api_key": "[REDACTED]"}}
```
### Observability

Instrument agents for monitoring and anomaly detection:
```python
import time
from functools import wraps

from prometheus_client import Counter, Histogram, start_http_server

# Define metrics
REQUEST_COUNT = Counter('agent_requests_total', 'Total number of requests', ['status'])
REQUEST_LATENCY = Histogram('agent_request_latency_seconds', 'Request latency in seconds')
ERROR_COUNT = Counter('agent_errors_total', 'Total number of errors', ['type'])

# Start metrics server
start_http_server(8000)

# Decorator for monitoring function execution
def monitor(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        start_time = time.time()
        try:
            result = f(*args, **kwargs)
            REQUEST_COUNT.labels(status='success').inc()
            return result
        except Exception as e:
            REQUEST_COUNT.labels(status='error').inc()
            ERROR_COUNT.labels(type=type(e).__name__).inc()
            raise
        finally:
            REQUEST_LATENCY.observe(time.time() - start_time)
    return decorated

# Example usage
@monitor
def process_image(data):
    # Process image
    # ...
    return result
```
## Compliance Considerations

### Data Privacy

Respect user privacy and comply with applicable regulations (GDPR, HIPAA, etc.):
```python
from meta_agent_platform import PrivacyManager

class PrivacyCompliantAgent:
    """Agent that implements privacy compliance features."""

    def __init__(self, config):
        self.privacy_manager = PrivacyManager()
        self.data_retention_days = config.get('data_retention_days', 30)
        self.required_consent = config.get('required_consent', [])

    def process_personal_data(self, user_id, data, purpose):
        """Process personal data with privacy controls."""
        # Check if we have consent for this purpose
        if purpose in self.required_consent:
            has_consent = self.privacy_manager.check_consent(user_id, purpose)
            if not has_consent:
                return {"error": f"No consent for {purpose}"}
        # Record data processing activity for compliance reporting
        self.privacy_manager.record_processing_activity(
            user_id=user_id,
            data_categories=["profile", "preferences"],
            purpose=purpose,
            retention_period=f"{self.data_retention_days} days"
        )
        # Apply data minimization
        minimized_data = self.privacy_manager.minimize_data(data, purpose)
        # Process the data
        result = self._process_data(minimized_data)
        # Schedule data deletion after retention period
        self.privacy_manager.schedule_deletion(
            user_id=user_id,
            data_reference=result['data_reference'],
            delete_after_days=self.data_retention_days
        )
        return result

    def handle_data_subject_request(self, user_id, request_type):
        """Handle data subject requests (access, deletion, etc.)."""
        if request_type == "access":
            # Retrieve all data for this user
            user_data = self.privacy_manager.get_user_data(user_id)
            return {"user_data": user_data}
        elif request_type == "delete":
            # Delete all data for this user
            self.privacy_manager.delete_user_data(user_id)
            return {"status": "deleted"}
        elif request_type == "export":
            # Export all data in portable format
            export_data = self.privacy_manager.export_user_data(user_id)
            return {"export_data": export_data}
        return {"error": "Unknown request type"}
```
### Audit Trails

Maintain logs of all agent actions and data access:
```python
import uuid
from datetime import datetime

from meta_agent_platform import AuditLogger

class AuditableAgent:
    """Agent that maintains comprehensive audit trails."""

    def __init__(self, config):
        self.audit_logger = AuditLogger()
        self.agent_id = config.get('agent_id')

    def _create_audit_event(self, event_type, user_id=None, resource_id=None, details=None):
        """Create a standardized audit event."""
        return {
            "event_id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "agent_id": self.agent_id,
            "event_type": event_type,
            "user_id": user_id,
            "resource_id": resource_id,
            "details": details or {},
            "source_ip": self._get_client_ip(),  # Helper that resolves the caller's IP (not shown)
            "success": True
        }

    def log_data_access(self, user_id, data_type, purpose):
        """Log data access event."""
        event = self._create_audit_event(
            event_type="DATA_ACCESS",
            user_id=user_id,
            resource_id=data_type,
            details={
                "purpose": purpose,
                "access_time": datetime.utcnow().isoformat()
            }
        )
        self.audit_logger.log(event)

    def log_data_modification(self, user_id, data_type, action):
        """Log data modification event."""
        event = self._create_audit_event(
            event_type="DATA_MODIFICATION",
            user_id=user_id,
            resource_id=data_type,
            details={
                "action": action,
                "modification_time": datetime.utcnow().isoformat()
            }
        )
        self.audit_logger.log(event)

    def log_authentication(self, user_id, success, failure_reason=None):
        """Log authentication event."""
        event = self._create_audit_event(
            event_type="AUTHENTICATION",
            user_id=user_id,
            details={
                "success": success,
                "failure_reason": failure_reason
            }
        )
        event["success"] = success
        self.audit_logger.log(event)

    def log_api_call(self, user_id, endpoint, method, status_code):
        """Log API call event."""
        event = self._create_audit_event(
            event_type="API_CALL",
            user_id=user_id,
            resource_id=endpoint,
            details={
                "method": method,
                "status_code": status_code
            }
        )
        self.audit_logger.log(event)
```
### Data Residency

Ensure data stays within required jurisdictions:
```python
from meta_agent_platform import DataResidencyManager

class ResidencyAwareAgent:
    """Agent that enforces data residency requirements."""

    def __init__(self, config):
        self.residency_manager = DataResidencyManager()
        self.allowed_regions = config.get('allowed_regions', [])

    def process_data(self, data, user_region):
        """Process data with residency controls."""
        # Check if we can process data in this region
        if user_region not in self.allowed_regions:
            return {
                "error": f"Data processing not allowed in region {user_region}",
                "allowed_regions": self.allowed_regions
            }
        # Select storage location based on region
        storage_location = self.residency_manager.get_storage_for_region(user_region)
        # Process data in the appropriate region
        result = self.residency_manager.process_in_region(
            data=data,
            region=user_region,
            processor=self._process_data
        )
        # Store result in the appropriate region
        storage_reference = self.residency_manager.store_in_region(
            data=result,
            region=user_region,
            storage_location=storage_location
        )
        return {
            "result": result,
            "storage_reference": storage_reference,
            "storage_region": user_region
        }

    def get_data(self, storage_reference, requester_region):
        """Retrieve data with residency controls."""
        # Check if data can be accessed from this region
        can_access, reason = self.residency_manager.can_access_from_region(
            storage_reference=storage_reference,
            requester_region=requester_region
        )
        if not can_access:
            return {"error": reason}
        # Retrieve data
        data = self.residency_manager.retrieve_from_region(
            storage_reference=storage_reference
        )
        return {"data": data}
```
### Consent Management

Obtain and record user consent where required:
```python
from datetime import datetime

from meta_agent_platform import ConsentManager

class ConsentAwareAgent:
    """Agent that manages user consent."""

    def __init__(self, config):
        self.consent_manager = ConsentManager()
        self.required_consents = config.get('required_consents', [])

    def request_consent(self, user_id, purpose, description):
        """Request consent from a user."""
        consent_request = {
            "user_id": user_id,
            "purpose": purpose,
            "description": description,
            "requested_at": datetime.utcnow().isoformat(),
            "version": "1.0"
        }
        # Record the consent request
        request_id = self.consent_manager.create_consent_request(consent_request)
        return {
            "request_id": request_id,
            "consent_request": consent_request
        }

    def record_consent(self, user_id, purpose, granted, request_id=None):
        """Record the user's consent decision."""
        consent_record = {
            "user_id": user_id,
            "purpose": purpose,
            "granted": granted,
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "version": "1.0"
        }
        # Store the consent record
        record_id = self.consent_manager.store_consent(consent_record)
        return {
            "record_id": record_id,
            "consent_record": consent_record
        }

    def check_consent(self, user_id, purpose):
        """Check if the user has given consent for a specific purpose."""
        consent_status = self.consent_manager.get_consent_status(user_id, purpose)
        return {
            "user_id": user_id,
            "purpose": purpose,
            "has_consent": consent_status.get('granted', False),
            "timestamp": consent_status.get('timestamp'),
            "version": consent_status.get('version')
        }

    def withdraw_consent(self, user_id, purpose):
        """Record withdrawal of consent."""
        self.consent_manager.withdraw_consent(user_id, purpose)
        return {
            "user_id": user_id,
            "purpose": purpose,
            "status": "withdrawn",
            "timestamp": datetime.utcnow().isoformat()
        }
```
### Compliance Badges

Agents that pass compliance checks can display badges in the marketplace:
```yaml
# Example compliance metadata for an agent
compliance:
  certifications:
    - name: "GDPR Compliant"
      issuer: "EU Privacy Shield"
      issued_date: "2025-01-15"
      expiration_date: "2026-01-15"
      verification_url: "https://compliance.example.com/verify/gdpr-12345"
    - name: "HIPAA Compliant"
      issuer: "Healthcare Compliance Board"
      issued_date: "2025-02-20"
      expiration_date: "2026-02-20"
      verification_url: "https://compliance.example.com/verify/hipaa-67890"
  data_handling:
    data_types:
      - name: "Personal Information"
        retention_period: "30 days"
        encryption: "AES-256"
      - name: "Health Records"
        retention_period: "7 years"
        encryption: "AES-256"
        special_handling: "HIPAA BAA Required"
  data_residency:
    regions:
      - "US"
      - "EU"
      - "CA"
    restrictions:
      - "No data transfer outside approved regions"
  data_subject_rights:
    supported:
      - "Access"
      - "Rectification"
      - "Erasure"
      - "Portability"
    contact_email: "privacy@example.com"
    response_time: "30 days"
```
### Regulatory Compliance Matrix

| Regulation | Key Requirements | Implementation |
|---|---|---|
| GDPR | Lawful basis for processing; data subject rights; data protection by design; breach notification | Consent management; subject rights API; privacy by design; incident response plan |
| HIPAA | PHI protection; technical safeguards; Business Associate Agreements; audit controls | Encryption; access controls; BAA templates; comprehensive audit logs |
| CCPA/CPRA | Right to know; right to delete; right to opt-out; data categories | Data inventory; deletion workflow; preference center; data mapping |
| SOC 2 | Security; availability; processing integrity; confidentiality; privacy | Security controls; uptime monitoring; data validation; encryption; privacy policies |
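
As a sketch of how such requirements might be enforced at deploy time, the snippet below checks an agent's certification metadata against the regulations a deployment requires. The `certifications` records mirror the compliance metadata example above; `unmet_requirements` is a hypothetical helper, not a platform API:

```python
from datetime import date

# Hypothetical certification records, mirroring the compliance metadata example
certifications = [
    {"name": "GDPR Compliant", "expiration_date": "2026-01-15"},
    {"name": "HIPAA Compliant", "expiration_date": "2026-02-20"},
]

def unmet_requirements(required, certs, today=None):
    """Return the required regulations that lack a current certification."""
    today = today or date.today()
    # A certification counts only while it has not expired
    valid = {
        c["name"].replace(" Compliant", "")
        for c in certs
        if date.fromisoformat(c["expiration_date"]) >= today
    }
    return sorted(set(required) - valid)

missing = unmet_requirements(["GDPR", "HIPAA", "SOC 2"], certifications,
                             today=date(2025, 6, 1))
# missing == ["SOC 2"]
```

A deployment gate could refuse to publish an agent to the marketplace while `missing` is non-empty, and re-run the check on a schedule so expired certifications revoke the badge automatically.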
## Secure Agent Lifecycle

Note: This is a placeholder for a secure agent lifecycle diagram. The actual diagram should be created and added to the project.
### 1. Design

Threat model your agent and plan for security:
```mermaid
flowchart TD
    A[Identify Assets] --> B[Identify Threats]
    B --> C[Assess Vulnerabilities]
    C --> D[Evaluate Risks]
    D --> E[Define Security Controls]
    E --> F[Document Security Requirements]
```
Key Activities:
- Conduct threat modeling sessions
- Identify sensitive data flows
- Define security requirements
- Create a security architecture
- Establish privacy controls

Example Threat Model:
```yaml
# Example threat model for an image classification agent
threat_model:
  assets:
    - name: "User Images"
      sensitivity: "High"
      impact_if_compromised: "High"
    - name: "Classification Model"
      sensitivity: "Medium"
      impact_if_compromised: "Medium"
    - name: "API Credentials"
      sensitivity: "High"
      impact_if_compromised: "High"
  threats:
    - name: "Unauthorized Access"
      likelihood: "Medium"
      impact: "High"
      mitigations:
        - "Strong authentication"
        - "Access control"
        - "Audit logging"
    - name: "Data Leakage"
      likelihood: "Medium"
      impact: "High"
      mitigations:
        - "Encryption at rest and in transit"
        - "Output filtering"
        - "Data minimization"
    - name: "Model Poisoning"
      likelihood: "Low"
      impact: "High"
      mitigations:
        - "Model integrity verification"
        - "Secure update process"
        - "Anomaly detection"
  data_flows:
    - source: "User"
      destination: "Agent API"
      data: "Image, Parameters"
      protections: "TLS, Input Validation"
    - source: "Agent API"
      destination: "Classification Engine"
      data: "Processed Image"
      protections: "Input Sanitization, Containerization"
    - source: "Classification Engine"
      destination: "User"
      data: "Classification Results"
      protections: "Output Filtering, Rate Limiting"
```
### 2. Development

Follow secure coding practices and use code reviews:

Secure Coding Checklist:
- [ ] Input validation for all data sources
- [ ] Output encoding to prevent injection attacks
- [ ] Proper error handling without information leakage
- [ ] Secure dependency management
- [ ] No hardcoded secrets or credentials
- [ ] Proper authentication and authorization
- [ ] Secure logging practices
Code Review Security Checklist:

```markdown
## Security Code Review Checklist

### Input Validation
- [ ] All user inputs are validated
- [ ] Schema validation is used for structured data
- [ ] Input size limits are enforced
- [ ] Input validation occurs on the server side

### Authentication & Authorization
- [ ] Authentication is required for protected resources
- [ ] Authorization checks are performed
- [ ] Secure credential storage
- [ ] No hardcoded credentials

### Data Protection
- [ ] Sensitive data is identified and protected
- [ ] Encryption is used for sensitive data
- [ ] No sensitive data in logs
- [ ] Proper key management

### Error Handling
- [ ] Errors are handled gracefully
- [ ] No sensitive information in error messages
- [ ] Appropriate logging of errors
- [ ] Fail securely (closed by default)

### Dependencies
- [ ] Dependencies are up-to-date
- [ ] No known vulnerabilities in dependencies
- [ ] Minimal dependencies used
- [ ] Dependency versions are pinned
```
### 3. Testing

Perform security testing (static analysis, dynamic analysis, penetration testing):

Security Testing Approach:
```python
# Example security testing workflow
import subprocess

# 1. Static Analysis
def run_static_analysis():
    """Run static code analysis tools."""
    # Run security linters
    subprocess.run(["bandit", "-r", "./src", "-f", "json", "-o", "bandit-results.json"])
    # Run dependency scanner (redirect JSON output to a file;
    # shell-style ">file" inside an argv list does not redirect)
    with open("safety-results.json", "w") as out:
        subprocess.run(["safety", "check", "-r", "requirements.txt", "--json"], stdout=out)
    # Run secrets scanner
    with open("secrets-results.json", "w") as out:
        subprocess.run(["detect-secrets", "scan", "--all-files"], stdout=out)

# 2. Dynamic Analysis
def run_dynamic_analysis():
    """Run dynamic security tests."""
    # Start the agent in test mode
    agent_process = subprocess.Popen(["python", "agent.py", "--test-mode"])
    try:
        # Run API fuzzing
        subprocess.run(["api-fuzzer", "--target", "http://localhost:8000/api",
                        "--report", "fuzzing-results.json"])
        # Run penetration tests
        subprocess.run(["zap-cli", "--zap-path", "/opt/zap", "quick-scan", "--self-contained",
                        "--start-options", "-config api.disablekey=true", "http://localhost:8000"])
    finally:
        # Clean up
        agent_process.terminate()

# 3. Security Regression Tests
def run_security_regression_tests():
    """Run security-focused regression tests."""
    subprocess.run(["pytest", "tests/security/", "-v"])

# Main testing workflow
def security_test_workflow():
    run_static_analysis()
    run_dynamic_analysis()
    run_security_regression_tests()
    # Generate security report
    generate_security_report()
```
Example Security Test Cases:
```python
# Example security test cases
def test_input_validation():
    """Test input validation controls."""
    # Test with valid input
    assert agent.validate_input({"image": "valid.jpg"}) is True
    # Test with invalid input
    assert agent.validate_input({"image": "../../../etc/passwd"}) is False
    assert agent.validate_input({"image": "<script>alert('xss')</script>"}) is False
    # Test with oversized input
    large_input = {"image": "x" * (10 * 1024 * 1024)}  # 10MB
    assert agent.validate_input(large_input) is False

def test_authentication():
    """Test authentication controls."""
    # Test with valid API key
    response = client.get("/api/status", headers={"X-API-Key": "valid-key"})
    assert response.status_code == 200
    # Test with invalid API key
    response = client.get("/api/status", headers={"X-API-Key": "invalid-key"})
    assert response.status_code == 401
    # Test with missing API key
    response = client.get("/api/status")
    assert response.status_code == 401

def test_rate_limiting():
    """Test rate limiting controls."""
    # Send requests up to the limit
    for _ in range(100):
        response = client.get("/api/status", headers={"X-API-Key": "valid-key"})
        assert response.status_code == 200
    # Send one more request (should be rate limited)
    response = client.get("/api/status", headers={"X-API-Key": "valid-key"})
    assert response.status_code == 429
```
### 4. Deployment

Use secure packaging and deployment pipelines:

Secure Deployment Pipeline:
```yaml
# Example CI/CD pipeline with security controls
name: Secure Deployment Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  security_scan:
    name: Security Scanning
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.9
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install bandit safety detect-secrets
      - name: Run static analysis
        run: bandit -r ./src -f json -o bandit-results.json
      - name: Check dependencies
        run: safety check -r requirements.txt --full-report
      - name: Scan for secrets
        run: detect-secrets scan --all-files

  build_and_test:
    name: Build and Test
    needs: security_scan
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.9
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: pytest --cov=./src tests/
      - name: Build container
        run: docker build -t agent:${{ github.sha }} .
      - name: Scan container
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: agent:${{ github.sha }}
          format: 'table'
          exit-code: '1'
          severity: 'CRITICAL,HIGH'

  deploy:
    name: Deploy
    needs: build_and_test
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: |
          # Deploy to staging environment
          echo "Deploying to staging..."
      - name: Run security tests
        run: |
          # Run security tests against staging
          echo "Running security tests..."
      - name: Deploy to production
        if: github.ref == 'refs/heads/main'
        run: |
          # Deploy to production environment
          echo "Deploying to production..."
```
### 5. Operation

Monitor for anomalies and respond to incidents:

Security Monitoring Dashboard:
```mermaid
flowchart TD
    A[Agent Logs] --> B[Log Aggregation]
    C[Metrics] --> B
    D[Audit Events] --> B
    B --> E[Anomaly Detection]
    E --> F{Alert?}
    F -->|Yes| G[Incident Response]
    F -->|No| H[Dashboard]
    G --> I[Remediation]
    I --> J[Post-Incident Review]
```
Incident Response Plan:
```markdown
## Security Incident Response Plan

### 1. Preparation
- Maintain current inventory of agents and data
- Define security incident criteria
- Establish incident response team
- Document contact information
- Create incident response playbooks

### 2. Detection and Analysis
- Monitor security alerts
- Analyze logs and metrics
- Determine incident severity
- Document initial findings

### 3. Containment
- Isolate affected systems
- Block malicious activity
- Preserve evidence
- Implement temporary workarounds

### 4. Eradication
- Remove malicious code or components
- Patch vulnerabilities
- Strengthen security controls
- Verify system integrity

### 5. Recovery
- Restore systems to normal operation
- Validate security controls
- Monitor for additional issues
- Return to normal operations

### 6. Post-Incident Activity
- Conduct post-incident review
- Document lessons learned
- Update security controls
- Improve incident response process
```
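
The detection-and-containment steps of the plan can be sketched as a severity triage that maps alert attributes to a response action. The field names, thresholds, and response strings below are illustrative assumptions, not platform conventions:

```python
# Hypothetical severity triage for the "Detection and Analysis" step:
# derive a severity from alert attributes, then look up the response.
def classify_severity(alert):
    """Classify a security alert; names and thresholds are illustrative."""
    if alert.get("data_exfiltration") or alert.get("credential_compromise"):
        return "critical"
    if alert.get("failed_logins", 0) > 100:
        return "high"
    if alert.get("anomaly_score", 0.0) > 0.8:
        return "medium"
    return "low"

# Containment actions keyed by severity
RESPONSE = {
    "critical": "isolate agent, revoke credentials, page on-call",
    "high": "block source, open incident ticket",
    "medium": "flag for analyst review",
    "low": "log and monitor",
}

alert = {"failed_logins": 250}
action = RESPONSE[classify_severity(alert)]
# action == "block source, open incident ticket"
```

Keeping the mapping explicit like this makes the response playbook auditable and easy to review alongside the incident criteria it encodes.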
### 6. Decommissioning

Securely delete data and revoke credentials:
```python
# Example decommissioning process
def decommission_agent(agent_id):
    """Securely decommission an agent."""
    # 1. Revoke all credentials
    revoke_agent_credentials(agent_id)
    # 2. Notify users of decommissioning
    notify_users_of_decommissioning(agent_id)
    # 3. Export required data for retention
    retention_data = export_data_for_retention(agent_id)
    archive_data_securely(retention_data)
    # 4. Securely delete data
    securely_delete_agent_data(agent_id)
    # 5. Remove from registry
    remove_from_registry(agent_id)
    # 6. Document decommissioning
    document_decommissioning(agent_id)
    return {"status": "decommissioned", "agent_id": agent_id}

def securely_delete_agent_data(agent_id):
    """Securely delete all agent data."""
    # Get all data locations
    data_locations = get_agent_data_locations(agent_id)
    for location in data_locations:
        if location.type == "database":
            # Delete from database with audit trail
            db_client.delete_with_audit(
                table=location.table,
                condition={"agent_id": agent_id},
                reason="Agent decommissioning"
            )
        elif location.type == "file":
            # Securely overwrite and delete files
            secure_delete_file(location.path)
        elif location.type == "object_storage":
            # Delete from object storage
            storage_client.delete_objects(
                bucket=location.bucket,
                prefix=f"agents/{agent_id}/"
            )
    # Verify deletion
    verify_data_deletion(agent_id)
```
## Security Resources

### Security Tools
| Category | Tools |
|---|---|
| Static Analysis | Bandit, SonarQube, Semgrep |
| Dependency Scanning | Safety, OWASP Dependency Check, Snyk |
| Container Security | Trivy, Clair, Anchore |
| Secret Scanning | detect-secrets, GitGuardian, TruffleHog |
| Dynamic Testing | OWASP ZAP, Burp Suite, API Fuzzer |
| Compliance | Compliance Checkers, Policy-as-Code tools |
### Security Checklists

- OWASP API Security Top 10
- OWASP Application Security Verification Standard
- NIST Secure Software Development Framework
- CIS Docker Benchmark
## References

- Security Design
- Component Design: Agent Execution Security
- Marketplace Development Guide
- Edge Security Guide
- Compliance Framework
Last updated: 2025-04-18