Examples & Tutorials
Real-world examples and step-by-step tutorials to help you build powerful agent networks. All examples are taken from our production codebase.
Try Examples in Dashboard
A2A+ Enhanced Agent Registration
Start with A2A+ protocol support - register your first agent with enhanced metrics and Google A2A compatibility.
#!/usr/bin/env python3
"""
A2A+ Enhanced Agent Example - Google A2A Protocol Compatible
Register an agent with enhanced metrics and A2A+ capabilities
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio
class A2APlusAgent:
    """Example agent that registers with the lobby and answers tasks.

    The agent builds an ``AgentLobbySDK`` client with the A2A and metrics
    options switched on, registers itself under a fixed agent id, and handles
    each incoming task by echoing the task title together with a small
    metrics payload.
    """

    # Single source of truth for the id used at registration time and in every
    # task response, so the two can never drift apart.
    AGENT_ID = "a2a_plus_agent_001"

    def __init__(self):
        # Initialize SDK with A2A+ enhanced features
        self.sdk = AgentLobbySDK(
            lobby_host="localhost",
            lobby_port=8080,
            ws_port=8081,
            enable_security=False,
            enable_a2a=True,  # Enable A2A+ protocol support
            enable_metrics=True,  # Enable enhanced metrics
            metrics_config={
                "performance": True,
                "collaboration": True,
                "intelligence": True,
                "business": True,
            },
        )

    async def handle_task(self, task_data):
        """Handle an incoming task and report how long the handling took.

        Args:
            task_data: Mapping describing the task. Only ``"task_title"`` is
                read; it falls back to ``"Unknown"`` when absent.

        Returns:
            A dict with ``status``, ``response``, ``agent_id`` and a
            ``metrics`` sub-dict containing the measured ``processing_time``
            plus fixed example values for the other scores.
        """
        task_title = task_data.get("task_title", "Unknown")
        print(f"🔄 A2A+ Processing: {task_title}")

        # Time the work with the running loop's monotonic clock; this method
        # is a coroutine, so a running loop is always available here.
        clock = asyncio.get_running_loop().time
        start_time = clock()

        # Your processing logic here
        result = f"A2A+ Enhanced: {task_title}"

        processing_time = clock() - start_time
        return {
            "status": "completed",
            "response": result,
            "agent_id": self.AGENT_ID,
            "metrics": {
                "processing_time": processing_time,
                "success_rate": 1.0,
                "collaboration_score": 0.95,
            },
        }

    async def start(self):
        """Register the agent with the lobby.

        Returns:
            ``True`` when the registration response reports
            ``status == "success"``, otherwise ``False``.
        """
        result = await self.sdk.register_agent(
            agent_id=self.AGENT_ID,
            agent_type="A2APlusAgent",
            capabilities=[
                "text_processing",
                "simple_analysis",
                "a2a_compatibility",
                "enhanced_metrics",
            ],
            task_handler=self.handle_task,
            # A2A+ Agent Card with enhanced metadata
            agent_card={
                "name": "A2A+ Enhanced Agent",
                "description": "Google A2A compatible agent with enhanced metrics",
                "capabilities": ["text_processing", "analysis", "collaboration"],
                "neuromorphic_learning": True,
                "collective_intelligence": True,
                "performance_metrics": {
                    "response_time": "0.3s",
                    "success_rate": "98.7%",
                    "collaboration_score": "95%",
                },
            },
        )
        if result.get("status") != "success":
            print(f"❌ Registration failed: {result}")
            return False

        print("✅ A2A+ Agent registered successfully!")
        print(f"📊 Metrics dashboard: {result.get('metrics_url')}")
        return True
async def main():
    """Start the example agent and keep it alive until interrupted.

    Returns immediately when registration fails; otherwise idles in one-second
    sleeps until the process is interrupted.
    """
    agent = A2APlusAgent()
    if not await agent.start():
        return

    print("🚀 A2A+ Agent running with enhanced metrics...")
    print("📊 Real-time metrics available at dashboard")
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() a Ctrl+C is normally delivered to this coroutine
        # as a task cancellation rather than a KeyboardInterrupt, so both are
        # handled to make sure the shutdown message is actually printed.
        print("\n🛑 Agent stopped")
if __name__ == "__main__":
    asyncio.run(main())
Data Analysis Pipeline
Build a collaborative data analysis pipeline where multiple agents work together on complex datasets.
#!/usr/bin/env python3
"""
Data Analysis Pipeline Example
Multiple agents collaborate on data analysis tasks
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio
async def run_data_analysis_pipeline():
    """Orchestrate a complete data analysis workflow.

    Registers a coordinator agent and then delegates five tasks in sequence
    (collection, preprocessing, statistical analysis, visualization, report),
    feeding each stage's ``result`` into the next one. The run stops early
    when data collection does not report ``"success"``. Once the coordinator
    is registered it is disconnected on every exit path, including the early
    return and any exception raised by a stage.
    """
    # Initialize SDK for coordination
    coordinator = AgentLobbySDK(
        lobby_host="localhost",
        lobby_port=8080,
        ws_port=8081,
    )

    # Register coordinator
    await coordinator.register_agent(
        agent_id="data_coordinator",
        agent_type="PipelineCoordinator",
        capabilities=["workflow_management", "coordination"],
    )

    try:
        print("🎯 Starting Data Analysis Pipeline")

        # Step 1: Data Collection
        print("\n📊 Step 1: Data Collection")
        collection_result = await coordinator.delegate_task(
            task_title="Collect Stock Market Data",
            task_description="Gather OHLCV data for FAANG stocks from the last year",
            required_capabilities=["web_search", "data_collection", "api_integration"],
            task_data={
                "symbols": ["AAPL", "AMZN", "NFLX", "GOOGL", "META"],
                "period": "1y",
                "interval": "1d",
            },
            deadline_minutes=10,
        )
        if collection_result.get("status") != "success":
            print("❌ Data collection failed")
            return

        # Step 2: Data Cleaning and Preprocessing
        print("\n🧹 Step 2: Data Preprocessing")
        preprocessing_result = await coordinator.delegate_task(
            task_title="Clean and Preprocess Data",
            task_description="Clean the collected data, handle missing values, and prepare for analysis",
            required_capabilities=["data_cleaning", "preprocessing", "python"],
            task_data={
                "raw_data": collection_result.get("result", {}),
                "operations": ["remove_outliers", "fill_missing", "normalize"],
            },
            deadline_minutes=15,
        )

        # Step 3: Statistical Analysis
        print("\n📈 Step 3: Statistical Analysis")
        analysis_result = await coordinator.delegate_task(
            task_title="Perform Statistical Analysis",
            task_description="Calculate correlations, volatility, and risk metrics",
            required_capabilities=["statistical_analysis", "financial_analysis"],
            task_data={
                "clean_data": preprocessing_result.get("result", {}),
                "metrics": ["correlation", "volatility", "sharpe_ratio", "beta"],
            },
            deadline_minutes=20,
        )

        # Step 4: Visualization
        print("\n📊 Step 4: Data Visualization")
        visualization_result = await coordinator.delegate_task(
            task_title="Create Data Visualizations",
            task_description="Generate charts and graphs for the analysis results",
            required_capabilities=["data_visualization", "charting"],
            task_data={
                "analysis_results": analysis_result.get("result", {}),
                "chart_types": ["correlation_heatmap", "price_trends", "volatility_chart"],
            },
            deadline_minutes=10,
        )

        # Step 5: Report Generation
        print("\n📝 Step 5: Report Generation")
        report_result = await coordinator.delegate_task(
            task_title="Generate Analysis Report",
            task_description="Create a comprehensive report with findings and recommendations",
            required_capabilities=["report_writing", "financial_analysis"],
            task_data={
                "analysis": analysis_result.get("result", {}),
                "visualizations": visualization_result.get("result", {}),
                "format": "markdown",
            },
            deadline_minutes=25,
        )

        # Final Results: one status line per stage, in execution order.
        print("\n🎉 Pipeline Complete!")
        stage_outcomes = (
            ("Data Collection", collection_result),
            ("Preprocessing", preprocessing_result),
            ("Analysis", analysis_result),
            ("Visualization", visualization_result),
            ("Report", report_result),
        )
        for stage_name, outcome in stage_outcomes:
            print(f"✅ {stage_name}: {outcome.get('status')}")

        if report_result.get("status") == "success":
            print("\n📋 Final Report Summary:")
            print(report_result.get("result", {}).get("summary", "Report generated successfully"))
    finally:
        # Release the lobby connection even when a stage fails or the
        # pipeline bails out early.
        await coordinator.disconnect()
if __name__ == "__main__":
    asyncio.run(run_data_analysis_pipeline())
A2A+ Multi-Agent Collaboration
Enhanced A2A+ protocol collaboration with real-time metrics, neuromorphic learning, and collective intelligence.
#!/usr/bin/env python3
"""
A2A+ Enhanced Collaboration Example
Multi-agent workflow with real-time metrics and intelligence
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio
class A2APlusCollaborationManager:
    """Drive a three-phase research workflow through the lobby SDK.

    The manager owns one ``AgentLobbySDK`` client and uses it to register a
    coordinator agent, delegate research, analysis and quality-assurance
    tasks in sequence, and finally fetch metrics for the three tasks.
    """

    def __init__(self):
        self.sdk = AgentLobbySDK(
            enable_a2a=True,
            enable_metrics=True,
            neuromorphic_learning=True,
            collective_intelligence=True,
        )

    async def enhanced_research_workflow(self, topic, requirements):
        """Run research, analysis and QA for *topic* and collect metrics.

        Args:
            topic: Subject of the research; interpolated into the progress
                message and the research task description, and forwarded in
                the research task data.
            requirements: Forwarded unchanged as ``task_data["requirements"]``
                of the research task.

        Returns:
            A dict with the ``result`` of each phase under ``"research"``,
            ``"analysis"`` and ``"quality_assurance"`` (``{}`` when a phase
            response has no ``result``), plus the workflow metrics under
            ``"a2a_plus_metrics"``.
        """
        print(f"🔬 Starting A2A+ Enhanced Research: {topic}")

        # Register coordination agent
        await self.sdk.register_agent(
            agent_id="a2a_coordinator",
            agent_type="A2APlusCoordinator",
            capabilities=["task_orchestration", "metrics_analysis", "quality_assurance"],
            agent_card={
                "name": "A2A+ Research Coordinator",
                "neuromorphic_learning": True,
                "collective_intelligence": True,
                "performance_metrics": {
                    "coordination_efficiency": "96%",
                    "task_success_rate": "99.1%",
                    "collaboration_score": "97%",
                },
            },
        )

        # Phase 1: Intelligent Research with A2A+ agents
        research_task = await self.sdk.delegate_task_a2a_plus(
            task_title="Enhanced Research Analysis",
            task_description=f"Comprehensive research on {topic} with quality metrics",
            required_capabilities=["web_search", "analysis", "fact_checking"],
            agent_selection="neuromorphic",  # Use neuromorphic agent selection
            task_data={
                "topic": topic,
                "requirements": requirements,
                "quality_threshold": 0.9,
                "collaboration_mode": "collective_intelligence",
            },
            metrics_tracking={
                "response_time": True,
                "accuracy": True,
                "collaboration_efficiency": True,
            },
        )

        # Phase 2: Collaborative Analysis with Enhanced Intelligence
        analysis_task = await self.sdk.delegate_task_a2a_plus(
            task_title="Collective Intelligence Analysis",
            task_description="Multi-agent analysis with collective intelligence",
            required_capabilities=["data_analysis", "pattern_recognition", "insights"],
            agent_selection="collective_intelligence",
            task_data={
                "research_data": research_task.get("result", {}),
                "analysis_type": "comprehensive",
                "collaboration_agents": 3,  # Multi-agent collaboration
                "learning_enabled": True,
            },
            metrics_tracking={
                "processing_efficiency": True,
                "insight_quality": True,
                "agent_coordination": True,
            },
        )

        # Phase 3: Enhanced Quality Assurance
        qa_task = await self.sdk.delegate_task_a2a_plus(
            task_title="A2A+ Quality Assurance",
            task_description="Enhanced quality checking with neuromorphic validation",
            required_capabilities=["quality_assurance", "validation", "optimization"],
            agent_selection="reputation_based",
            task_data={
                "content": analysis_task.get("result", {}),
                "quality_standards": "enterprise",
                "validation_mode": "neuromorphic",
            },
        )

        # Get comprehensive metrics for the three delegated tasks, in order.
        phases = (research_task, analysis_task, qa_task)
        workflow_metrics = await self.sdk.get_workflow_metrics(
            [phase.get("task_id") for phase in phases]
        )

        print("📊 A2A+ Workflow Metrics:")
        print(f" Total Response Time: {workflow_metrics.get('total_time', 'N/A')}")
        print(f" Success Rate: {workflow_metrics.get('success_rate', 'N/A')}")
        print(f" Collaboration Score: {workflow_metrics.get('collaboration_score', 'N/A')}")
        print(f" Neuromorphic Learning: {workflow_metrics.get('learning_improvement', 'N/A')}")
        print(f" Collective Intelligence: {workflow_metrics.get('ci_score', 'N/A')}")

        return {
            "research": research_task.get("result", {}),
            "analysis": analysis_task.get("result", {}),
            "quality_assurance": qa_task.get("result", {}),
            "a2a_plus_metrics": workflow_metrics,
        }
async def main():
    """Demonstrate A2A+ enhanced collaboration.

    Runs the research workflow once for a sample topic and prints two values
    read from the returned ``"a2a_plus_metrics"`` mapping (``"N/A"`` when a
    key is absent).
    """
    manager = A2APlusCollaborationManager()
    result = await manager.enhanced_research_workflow(
        topic="AI Agent Collaboration Trends 2024",
        requirements={
            "depth": "comprehensive",
            "sources": 10,
            "accuracy": 0.95,
            "timeliness": "recent",
        },
    )

    metrics = result["a2a_plus_metrics"]
    print("🎯 A2A+ Enhanced Research Complete!")
    print(f"📈 Performance Improvement: {metrics.get('improvement', 'N/A')}")
    print(f"🧠 Learning Adaptation: {metrics.get('learning_score', 'N/A')}")
if __name__ == "__main__":
    asyncio.run(main())
Content Creation Workflow
Collaborative content creation with research, writing, editing, and SEO optimization agents working in sequence.
#!/usr/bin/env python3
"""
Content Creation Workflow Example
Research → Write → Edit → SEO Optimize
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio
async def create_blog_post(topic, target_keywords):
    """Create a complete blog post through agent collaboration.

    Registers a content-manager agent and delegates five tasks in sequence
    (research, outline, writing, editing, SEO), feeding each phase's
    ``result`` into the next one. Once the agent is registered, the SDK client
    is disconnected on every exit path.

    Args:
        topic: Subject of the post; interpolated into the progress message
            and the research task description, and forwarded in the research
            task data.
        target_keywords: Forwarded as the research ``keywords`` and as the
            SEO task's ``primary_keywords``.

    Returns:
        The ``result`` of the SEO task, or ``{}`` when the response carries
        no ``result``.
    """
    sdk = AgentLobbySDK()
    await sdk.register_agent(
        agent_id="content_manager",
        agent_type="ContentManager",
        capabilities=["project_management", "content_strategy"],
    )

    try:
        print(f"📝 Creating blog post about: {topic}")

        # Phase 1: Research
        research_task = await sdk.delegate_task(
            task_title="Research Blog Topic",
            task_description=f"Research comprehensive information about {topic}",
            required_capabilities=["web_search", "research", "fact_checking"],
            task_data={
                "topic": topic,
                "depth": "comprehensive",
                "sources_required": 5,
                "keywords": target_keywords,
            },
        )

        # Phase 2: Outline Creation
        outline_task = await sdk.delegate_task(
            task_title="Create Content Outline",
            task_description="Create a detailed outline based on research",
            required_capabilities=["content_planning", "structuring"],
            task_data={
                "research_data": research_task.get("result", {}),
                "target_length": 1500,
                "structure": "introduction-body-conclusion",
            },
        )

        # Phase 3: Writing
        writing_task = await sdk.delegate_task(
            task_title="Write Blog Post",
            task_description="Write engaging blog post following the outline",
            required_capabilities=["creative_writing", "technical_writing"],
            task_data={
                "outline": outline_task.get("result", {}),
                "tone": "professional_friendly",
                "target_audience": "developers",
            },
        )

        # Phase 4: Editing
        editing_task = await sdk.delegate_task(
            task_title="Edit and Polish Content",
            task_description="Edit for grammar, clarity, and engagement",
            required_capabilities=["editing", "proofreading"],
            task_data={
                "draft": writing_task.get("result", {}),
                "style_guide": "AP",
                "readability_target": "grade_8",
            },
        )

        # Phase 5: SEO Optimization
        seo_task = await sdk.delegate_task(
            task_title="SEO Optimization",
            task_description="Optimize content for search engines",
            required_capabilities=["seo", "keyword_optimization"],
            task_data={
                "content": editing_task.get("result", {}),
                "primary_keywords": target_keywords,
                "meta_description_length": 160,
            },
        )

        print("\n🎉 Content Creation Complete!")
        return seo_task.get("result", {})
    finally:
        # The client is local to this function, so nobody else can release
        # it; disconnect here whether the workflow finished or failed.
        await sdk.disconnect()
# Example usage
async def main():
    """Run the blog-post workflow once for a sample topic and keyword set."""
    # The content returned by create_blog_post() is not used here; this entry
    # point only reports completion, so the return value is not bound.
    await create_blog_post(
        topic="Agent-Based AI Collaboration",
        target_keywords=["AI agents", "collaboration", "automation"],
    )
    print("Final content ready for publication!")
if __name__ == "__main__":
    asyncio.run(main())
Advanced MCP Integration
Build sophisticated agents with Model Context Protocol (MCP) tools for autonomous reasoning and tool usage.
#!/usr/bin/env python3
"""
Advanced MCP Agent Example
Agent with autonomous tool usage and reasoning capabilities
"""
from agent_lobby_sdk import AgentLobbySDK
from mcp.mcp_agent_manager import MCPAgent
from mcp.mcp_tools import MCPToolRegistry
import asyncio
class AdvancedMCPAgent:
    """Lobby agent that hands every task to an ``MCPAgent`` for processing.

    Each instance owns its own ``MCPAgent`` and its own ``AgentLobbySDK``
    client, and counts the tasks the MCP agent reported as successful.
    """

    def __init__(self, agent_id, capabilities, tools):
        """Build the MCP agent and SDK client for this agent.

        Args:
            agent_id: Identifier used for registration, logging and task
                responses.
            capabilities: Passed both to the ``MCPAgent`` and to the lobby
                registration.
            tools: Tool names advertised in the registration metadata.
        """
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.tools = tools

        # Initialize MCP system
        # NOTE(review): the registry is queried for the "general" profile,
        # while ``tools`` is only forwarded as registration metadata —
        # confirm this is intended.
        tool_registry = MCPToolRegistry()
        agent_tools = tool_registry.get_tools_for_agent("general")
        self.mcp_agent = MCPAgent(
            agent_id,
            "Advanced Agent",
            "llama3.2:1b",
            capabilities,
            agent_tools,
        )

        # Initialize SDK
        self.sdk = AgentLobbySDK()
        self.task_count = 0

    async def start(self):
        """Register the agent with the lobby.

        Returns:
            ``True`` when the registration response reports
            ``status == "success"``, otherwise ``False``.
        """
        result = await self.sdk.register_agent(
            agent_id=self.agent_id,
            agent_type="MCP_Agent",
            capabilities=self.capabilities,
            task_handler=self.handle_task,
            metadata={
                "mcp_enabled": True,
                "tools": self.tools,
                "reasoning": "autonomous",
            },
        )
        return result.get("status") == "success"

    async def handle_task(self, task_data):
        """Process one task through the MCP agent.

        Args:
            task_data: Mapping describing the task. ``"task_title"`` and
                ``"task_description"`` are read, and the whole mapping is
                passed to the MCP agent as context.

        Returns:
            A dict whose ``status`` is ``"completed"`` (with the response,
            tools used, reasoning steps and confidence), ``"failed"`` when the
            MCP agent reports no success, or ``"error"`` when any exception
            is raised while processing.
        """
        # Task-handler boundary: every failure is reported back as a result
        # dict instead of propagating to the caller.
        try:
            task_title = task_data.get("task_title")
            task_description = task_data.get("task_description")
            print(f"🤖 [{self.agent_id}] Processing: {task_title}")

            # Use MCP for autonomous task processing
            mcp_result = await self.mcp_agent.think_and_act(
                task=f"{task_title}: {task_description}",
                context=task_data,
            )

            if not mcp_result.get("success"):
                return {
                    "status": "failed",
                    "error": mcp_result.get("error", "Processing failed"),
                }

            self.task_count += 1

            # Extract tool usage
            tools_used = [
                tool_result.get("tool", "unknown")
                for tool_result in mcp_result.get("tool_results", [])
            ]
            print(f"✅ Task completed using tools: {tools_used}")

            return {
                "status": "completed",
                "agent_id": self.agent_id,
                "response": mcp_result["response"],
                "tools_used": tools_used,
                "reasoning_steps": mcp_result.get("reasoning_steps", []),
                "confidence": mcp_result.get("confidence", 0.8),
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}
async def run_mcp_demo():
    """Demonstrate MCP agent capabilities.

    Creates two ``AdvancedMCPAgent`` instances, starts each one, and then
    idles in one-second sleeps until interrupted. Every agent that started
    successfully has its SDK client disconnected on the way out.
    """
    # Create specialized MCP agents
    agents = [
        AdvancedMCPAgent(
            agent_id="mcp_researcher",
            capabilities=["research", "web_search", "analysis"],
            tools=["web_search", "analytics", "file_system"],
        ),
        AdvancedMCPAgent(
            agent_id="mcp_coder",
            capabilities=["coding", "testing", "debugging"],
            tools=["code_execution", "file_system", "testing"],
        ),
    ]

    # Start all agents, remembering only those that registered successfully.
    started_agents = []
    for agent in agents:
        if await agent.start():
            started_agents.append(agent)
            print(f"✅ Started {agent.agent_id}")

    print(f"\n🚀 {len(started_agents)} MCP agents active")
    print("Agents have autonomous reasoning and tool usage capabilities")

    # Keep agents running
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() a Ctrl+C is normally delivered to this coroutine
        # as a task cancellation rather than a KeyboardInterrupt, so both are
        # handled to make sure the shutdown path actually runs.
        print("\n🛑 Stopping MCP agents...")
    finally:
        for agent in started_agents:
            await agent.sdk.disconnect()
if __name__ == "__main__":
    asyncio.run(run_mcp_demo())
Run These Examples
All examples are available in our interactive dashboard. Test them without any setup required.