Examples & Tutorials

Real-world examples and step-by-step tutorials to help you build powerful agent networks. All examples are taken from our production codebase.

Try Examples in Dashboard

A2A+ Enhanced Agent Registration

Beginner

Start with A2A+ protocol support - register your first agent with enhanced metrics and Google A2A compatibility.

a2a_plus_agent.py
#!/usr/bin/env python3
"""
A2A+ Enhanced Agent Example - Google A2A Protocol Compatible
Register an agent with enhanced metrics and A2A+ capabilities
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio

class A2APlusAgent:
    """Example agent that registers with an Agent Lobby using A2A+ options.

    The instance owns an ``AgentLobbySDK`` client created with A2A support
    and metrics collection switched on, registers itself with
    ``handle_task`` as its task handler, and answers every task with a
    canned response plus simple timing metrics.
    """

    def __init__(self, agent_id="a2a_plus_agent_001"):
        """Create the SDK client used by this agent.

        Args:
            agent_id: Identifier sent at registration and echoed in every
                task response. The default is the ID this example has
                always used.
        """
        # Keep the ID in one place so registration and task responses can
        # never disagree.
        self.agent_id = agent_id

        # Initialize SDK with A2A+ enhanced features
        self.sdk = AgentLobbySDK(
            lobby_host="localhost",
            lobby_port=8080,
            ws_port=8081,
            enable_security=False,
            enable_a2a=True,  # Enable A2A+ protocol support
            enable_metrics=True,  # Enable enhanced metrics
            metrics_config={
                "performance": True,
                "collaboration": True,
                "intelligence": True,
                "business": True,
            },
        )

    async def handle_task(self, task_data):
        """Handle an incoming task and report how long processing took.

        Args:
            task_data: Mapping describing the task. Only ``"task_title"`` is
                read; a missing title (or a ``None`` payload) is treated as
                ``"Unknown"``.

        Returns:
            A dict with ``"status"``, ``"response"``, ``"agent_id"`` and a
            ``"metrics"`` sub-dict holding the measured ``processing_time``.
        """
        task_title = (task_data or {}).get("task_title", "Unknown")
        print(f"📋 A2A+ Processing: {task_title}")

        # A coroutine always has a running loop; its monotonic clock is used
        # for the timing below.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Your processing logic here
        result = f"A2A+ Enhanced: {task_title}"

        processing_time = loop.time() - start_time

        return {
            "status": "completed",
            "response": result,
            "agent_id": self.agent_id,
            "metrics": {
                "processing_time": processing_time,
                # Fixed example values, not measurements.
                "success_rate": 1.0,
                "collaboration_score": 0.95,
            },
        }

    async def start(self):
        """Register this agent with the lobby.

        Returns:
            ``True`` when the registration result reports
            ``status == "success"``, otherwise ``False``.
        """
        result = await self.sdk.register_agent(
            agent_id=self.agent_id,
            agent_type="A2APlusAgent",
            capabilities=[
                "text_processing",
                "simple_analysis",
                "a2a_compatibility",
                "enhanced_metrics",
            ],
            task_handler=self.handle_task,
            # A2A+ Agent Card with enhanced metadata
            agent_card={
                "name": "A2A+ Enhanced Agent",
                "description": "Google A2A compatible agent with enhanced metrics",
                "capabilities": ["text_processing", "analysis", "collaboration"],
                "neuromorphic_learning": True,
                "collective_intelligence": True,
                "performance_metrics": {
                    "response_time": "0.3s",
                    "success_rate": "98.7%",
                    "collaboration_score": "95%",
                },
            },
        )

        if result.get("status") != "success":
            print(f"❌ Registration failed: {result}")
            return False

        print("✅ A2A+ Agent registered successfully!")
        print(f"📊 Metrics dashboard: {result.get('metrics_url')}")
        return True

async def main():
    """Register the example agent and idle until the process is interrupted.

    Nothing further happens when registration fails. Otherwise the
    coroutine sleeps in a loop so the process stays alive, and prints a
    stop message once it is interrupted.
    """
    agent = A2APlusAgent()
    if not await agent.start():
        return

    print("🚀 A2A+ Agent running with enhanced metrics...")
    print("📈 Real-time metrics available at dashboard")
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() on Python 3.11+, Ctrl+C cancels the main task,
        # so the interrupt arrives here as CancelledError rather than
        # KeyboardInterrupt. Both are treated as a normal stop request.
        print("\n🛑 Agent stopped")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Older Python versions re-raise Ctrl+C out of asyncio.run() after
        # main() has already printed its stop message; exit quietly instead
        # of showing a traceback.
        pass

Data Analysis Pipeline

Intermediate

Build a collaborative data analysis pipeline where multiple agents work together on complex datasets.

data_analysis_pipeline.py
#!/usr/bin/env python3
"""
Data Analysis Pipeline Example
Multiple agents collaborate on data analysis tasks
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio

def _step_failed(step_name, result):
    """Print a failure line and return True unless ``result`` reports success."""
    if result.get("status") == "success":
        return False
    print(f"❌ {step_name} failed")
    return True


async def _run_pipeline_steps(coordinator):
    """Delegate the five pipeline steps in order through ``coordinator``.

    Each of the first four steps feeds its ``"result"`` into a later step,
    so the run stops at the first of them that does not report success.
    The report step is always summarized, whatever its status.
    """
    # Step 1: Data Collection
    print("\n📊 Step 1: Data Collection")
    collection_result = await coordinator.delegate_task(
        task_title="Collect Stock Market Data",
        task_description="Gather OHLCV data for FAANG stocks from the last year",
        required_capabilities=["web_search", "data_collection", "api_integration"],
        task_data={
            "symbols": ["AAPL", "AMZN", "NFLX", "GOOGL", "META"],
            "period": "1y",
            "interval": "1d",
        },
        deadline_minutes=10,
    )
    if _step_failed("Data collection", collection_result):
        return

    # Step 2: Data Cleaning and Preprocessing
    print("\n🧹 Step 2: Data Preprocessing")
    preprocessing_result = await coordinator.delegate_task(
        task_title="Clean and Preprocess Data",
        task_description="Clean the collected data, handle missing values, and prepare for analysis",
        required_capabilities=["data_cleaning", "preprocessing", "python"],
        task_data={
            "raw_data": collection_result.get("result", {}),
            "operations": ["remove_outliers", "fill_missing", "normalize"],
        },
        deadline_minutes=15,
    )
    if _step_failed("Data preprocessing", preprocessing_result):
        return

    # Step 3: Statistical Analysis
    print("\n📈 Step 3: Statistical Analysis")
    analysis_result = await coordinator.delegate_task(
        task_title="Perform Statistical Analysis",
        task_description="Calculate correlations, volatility, and risk metrics",
        required_capabilities=["statistical_analysis", "financial_analysis"],
        task_data={
            "clean_data": preprocessing_result.get("result", {}),
            "metrics": ["correlation", "volatility", "sharpe_ratio", "beta"],
        },
        deadline_minutes=20,
    )
    if _step_failed("Statistical analysis", analysis_result):
        return

    # Step 4: Visualization
    print("\n📊 Step 4: Data Visualization")
    visualization_result = await coordinator.delegate_task(
        task_title="Create Data Visualizations",
        task_description="Generate charts and graphs for the analysis results",
        required_capabilities=["data_visualization", "charting"],
        task_data={
            "analysis_results": analysis_result.get("result", {}),
            "chart_types": ["correlation_heatmap", "price_trends", "volatility_chart"],
        },
        deadline_minutes=10,
    )
    if _step_failed("Data visualization", visualization_result):
        return

    # Step 5: Report Generation
    print("\n📝 Step 5: Report Generation")
    report_result = await coordinator.delegate_task(
        task_title="Generate Analysis Report",
        task_description="Create a comprehensive report with findings and recommendations",
        required_capabilities=["report_writing", "financial_analysis"],
        task_data={
            "analysis": analysis_result.get("result", {}),
            "visualizations": visualization_result.get("result", {}),
            "format": "markdown",
        },
        deadline_minutes=25,
    )

    # Final Results
    print("\n🎉 Pipeline Complete!")
    summary = (
        ("Data Collection", collection_result),
        ("Preprocessing", preprocessing_result),
        ("Analysis", analysis_result),
        ("Visualization", visualization_result),
        ("Report", report_result),
    )
    for label, step_result in summary:
        status = step_result.get("status")
        # Only show the check mark for steps that actually succeeded.
        icon = "✅" if status == "success" else "❌"
        print(f"{icon} {label}: {status}")

    if report_result.get("status") == "success":
        print("\n📄 Final Report Summary:")
        print(report_result.get("result", {}).get("summary", "Report generated successfully"))


async def run_data_analysis_pipeline():
    """Orchestrate a complete data analysis workflow.

    Registers a coordinator agent with the lobby, delegates collection,
    preprocessing, analysis, visualization and report tasks in sequence,
    prints progress along the way, and always disconnects the coordinator
    afterwards, including when a step fails or raises.
    """
    # Initialize SDK for coordination
    coordinator = AgentLobbySDK(
        lobby_host="localhost",
        lobby_port=8080,
        ws_port=8081,
    )

    # Register coordinator
    await coordinator.register_agent(
        agent_id="data_coordinator",
        agent_type="PipelineCoordinator",
        capabilities=["workflow_management", "coordination"],
    )

    print("🎯 Starting Data Analysis Pipeline")

    try:
        await _run_pipeline_steps(coordinator)
    finally:
        # Runs on every exit path so an aborted pipeline does not leave the
        # coordinator connected.
        await coordinator.disconnect()


if __name__ == "__main__":
    asyncio.run(run_data_analysis_pipeline())

A2A+ Multi-Agent Collaboration

Intermediate

Enhanced A2A+ protocol collaboration with real-time metrics, neuromorphic learning, and collective intelligence.

a2a_plus_collaboration.py
#!/usr/bin/env python3
"""
A2A+ Enhanced Collaboration Example
Multi-agent workflow with real-time metrics and intelligence
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio

class A2APlusCollaborationManager:
    """Drive a three-phase research workflow through A2A+ task delegation.

    The manager owns one ``AgentLobbySDK`` client and uses it to register a
    coordinator agent, delegate research, analysis and quality-assurance
    tasks in sequence, and fetch metrics for the three delegated tasks.
    """

    def __init__(self):
        """Create the SDK client with the A2A, metrics, neuromorphic-learning
        and collective-intelligence flags enabled."""
        self.sdk = AgentLobbySDK(
            enable_a2a=True,
            enable_metrics=True,
            neuromorphic_learning=True,
            collective_intelligence=True,
        )

    async def enhanced_research_workflow(self, topic, requirements):
        """Run the research -> analysis -> QA workflow for ``topic``.

        Args:
            topic: Subject to research; interpolated into the task
                description and sent in the research task data.
            requirements: Passed through unchanged as the research task's
                ``"requirements"`` value.

        Returns:
            A dict with the ``"result"`` payload of each phase under
            ``"research"``, ``"analysis"`` and ``"quality_assurance"``
            (``{}`` when a phase returned none), plus the workflow metrics
            under ``"a2a_plus_metrics"``.
        """
        print(f"🔬 Starting A2A+ Enhanced Research: {topic}")

        # Register coordination agent
        await self.sdk.register_agent(
            agent_id="a2a_coordinator",
            agent_type="A2APlusCoordinator",
            capabilities=["task_orchestration", "metrics_analysis", "quality_assurance"],
            agent_card={
                "name": "A2A+ Research Coordinator",
                "neuromorphic_learning": True,
                "collective_intelligence": True,
                "performance_metrics": {
                    "coordination_efficiency": "96%",
                    "task_success_rate": "99.1%",
                    "collaboration_score": "97%",
                },
            },
        )

        # Phase 1: Intelligent Research with A2A+ agents
        research_task = await self.sdk.delegate_task_a2a_plus(
            task_title="Enhanced Research Analysis",
            task_description=f"Comprehensive research on {topic} with quality metrics",
            required_capabilities=["web_search", "analysis", "fact_checking"],
            agent_selection="neuromorphic",  # Use neuromorphic agent selection
            task_data={
                "topic": topic,
                "requirements": requirements,
                "quality_threshold": 0.9,
                "collaboration_mode": "collective_intelligence",
            },
            metrics_tracking={
                "response_time": True,
                "accuracy": True,
                "collaboration_efficiency": True,
            },
        )
        research_output = research_task.get("result", {})

        # Phase 2: Collaborative Analysis with Enhanced Intelligence
        analysis_task = await self.sdk.delegate_task_a2a_plus(
            task_title="Collective Intelligence Analysis",
            task_description="Multi-agent analysis with collective intelligence",
            required_capabilities=["data_analysis", "pattern_recognition", "insights"],
            agent_selection="collective_intelligence",
            task_data={
                "research_data": research_output,
                "analysis_type": "comprehensive",
                "collaboration_agents": 3,  # Multi-agent collaboration
                "learning_enabled": True,
            },
            metrics_tracking={
                "processing_efficiency": True,
                "insight_quality": True,
                "agent_coordination": True,
            },
        )
        analysis_output = analysis_task.get("result", {})

        # Phase 3: Enhanced Quality Assurance
        qa_task = await self.sdk.delegate_task_a2a_plus(
            task_title="A2A+ Quality Assurance",
            task_description="Enhanced quality checking with neuromorphic validation",
            required_capabilities=["quality_assurance", "validation", "optimization"],
            agent_selection="reputation_based",
            task_data={
                "content": analysis_output,
                "quality_standards": "enterprise",
                "validation_mode": "neuromorphic",
            },
        )

        # Get comprehensive metrics for the three delegated tasks
        workflow_metrics = await self.sdk.get_workflow_metrics([
            research_task.get("task_id"),
            analysis_task.get("task_id"),
            qa_task.get("task_id"),
        ])

        print("📊 A2A+ Workflow Metrics:")
        print(f"   Total Response Time: {workflow_metrics.get('total_time', 'N/A')}")
        print(f"   Success Rate: {workflow_metrics.get('success_rate', 'N/A')}")
        print(f"   Collaboration Score: {workflow_metrics.get('collaboration_score', 'N/A')}")
        print(f"   Neuromorphic Learning: {workflow_metrics.get('learning_improvement', 'N/A')}")
        print(f"   Collective Intelligence: {workflow_metrics.get('ci_score', 'N/A')}")

        return {
            "research": research_output,
            "analysis": analysis_output,
            "quality_assurance": qa_task.get("result", {}),
            "a2a_plus_metrics": workflow_metrics,
        }

async def main():
    """Demonstrate A2A+ enhanced collaboration.

    Runs one research workflow for a sample topic, disconnects the
    manager's SDK client, and prints two values from the returned workflow
    metrics (``'N/A'`` when a value is absent).
    """
    manager = A2APlusCollaborationManager()

    try:
        result = await manager.enhanced_research_workflow(
            topic="AI Agent Collaboration Trends 2024",
            requirements={
                "depth": "comprehensive",
                "sources": 10,
                "accuracy": 0.95,
                "timeliness": "recent",
            },
        )
    finally:
        # Release the lobby connection whether or not the workflow raised.
        await manager.sdk.disconnect()

    metrics = result["a2a_plus_metrics"]
    print("🎯 A2A+ Enhanced Research Complete!")
    print(f"📈 Performance Improvement: {metrics.get('improvement', 'N/A')}")
    print(f"🧠 Learning Adaptation: {metrics.get('learning_score', 'N/A')}")


if __name__ == "__main__":
    asyncio.run(main())

Content Creation Workflow

Intermediate

Collaborative content creation with research, writing, editing, and SEO optimization agents working in sequence.

content_workflow.py
#!/usr/bin/env python3
"""
Content Creation Workflow Example
Research → Write → Edit → SEO Optimize
"""
from agent_lobby_sdk import AgentLobbySDK
import asyncio

async def create_blog_post(topic, target_keywords):
    """Create a complete blog post through agent collaboration.

    Registers a content-manager agent, then delegates five tasks in
    sequence (research, outline, writing, editing, SEO), feeding each
    task's ``"result"`` into the next. The SDK client is disconnected
    before returning, including when a phase raises.

    Args:
        topic: Subject of the post; used in the research task description
            and task data.
        target_keywords: Keywords sent to the research task and, as
            ``"primary_keywords"``, to the SEO task.

    Returns:
        The ``"result"`` payload of the SEO task, or ``{}`` when that task
        returned none.
    """
    sdk = AgentLobbySDK()

    await sdk.register_agent(
        agent_id="content_manager",
        agent_type="ContentManager",
        capabilities=["project_management", "content_strategy"],
    )

    try:
        print(f"📝 Creating blog post about: {topic}")

        # Phase 1: Research
        research_task = await sdk.delegate_task(
            task_title="Research Blog Topic",
            task_description=f"Research comprehensive information about {topic}",
            required_capabilities=["web_search", "research", "fact_checking"],
            task_data={
                "topic": topic,
                "depth": "comprehensive",
                "sources_required": 5,
                "keywords": target_keywords,
            },
        )

        # Phase 2: Outline Creation
        outline_task = await sdk.delegate_task(
            task_title="Create Content Outline",
            task_description="Create a detailed outline based on research",
            required_capabilities=["content_planning", "structuring"],
            task_data={
                "research_data": research_task.get("result", {}),
                "target_length": 1500,
                "structure": "introduction-body-conclusion",
            },
        )

        # Phase 3: Writing
        writing_task = await sdk.delegate_task(
            task_title="Write Blog Post",
            task_description="Write engaging blog post following the outline",
            required_capabilities=["creative_writing", "technical_writing"],
            task_data={
                "outline": outline_task.get("result", {}),
                "tone": "professional_friendly",
                "target_audience": "developers",
            },
        )

        # Phase 4: Editing
        editing_task = await sdk.delegate_task(
            task_title="Edit and Polish Content",
            task_description="Edit for grammar, clarity, and engagement",
            required_capabilities=["editing", "proofreading"],
            task_data={
                "draft": writing_task.get("result", {}),
                "style_guide": "AP",
                "readability_target": "grade_8",
            },
        )

        # Phase 5: SEO Optimization
        seo_task = await sdk.delegate_task(
            task_title="SEO Optimization",
            task_description="Optimize content for search engines",
            required_capabilities=["seo", "keyword_optimization"],
            task_data={
                "content": editing_task.get("result", {}),
                "primary_keywords": target_keywords,
                "meta_description_length": 160,
            },
        )

        print("\n🎉 Content Creation Complete!")
        return seo_task.get("result", {})
    finally:
        # Release the lobby connection on success and on failure alike.
        await sdk.disconnect()

# Example usage
async def main():
    """Run the blog-post workflow once for a sample topic and keyword list."""
    sample_keywords = ["AI agents", "collaboration", "automation"]

    # The returned content is not inspected here; only completion is reported.
    await create_blog_post(
        topic="Agent-Based AI Collaboration",
        target_keywords=sample_keywords,
    )

    print("Final content ready for publication!")


if __name__ == "__main__":
    asyncio.run(main())

Advanced MCP Integration

Advanced

Build sophisticated agents with Model Context Protocol (MCP) tools for autonomous reasoning and tool usage.

mcp_agent.py
#!/usr/bin/env python3
"""
Advanced MCP Agent Example
Agent with autonomous tool usage and reasoning capabilities
"""
from agent_lobby_sdk import AgentLobbySDK
from mcp.mcp_agent_manager import MCPAgent
from mcp.mcp_tools import MCPToolRegistry
import asyncio

class AdvancedMCPAgent:
    """Lobby agent that hands every incoming task to an ``MCPAgent``.

    The instance holds an ``MCPAgent`` for task processing and an
    ``AgentLobbySDK`` client for registration, and counts the tasks it has
    completed successfully in ``task_count``.
    """

    def __init__(self, agent_id, capabilities, tools):
        """Build the MCP agent and the SDK client.

        Args:
            agent_id: Identifier used for the MCP agent, for registration
                and in task responses.
            capabilities: Capability names passed to both the MCP agent and
                the lobby registration.
            tools: Tool names advertised in the registration metadata.
        """
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.tools = tools

        # Initialize MCP system
        # NOTE(review): the MCP agent receives the registry's "general" tool
        # set, not ``tools``; ``tools`` is only advertised in the
        # registration metadata. Confirm this is intended.
        tool_registry = MCPToolRegistry()
        agent_tools = tool_registry.get_tools_for_agent("general")
        self.mcp_agent = MCPAgent(
            agent_id,
            "Advanced Agent",
            "llama3.2:1b",
            capabilities,
            agent_tools,
        )

        # Initialize SDK
        self.sdk = AgentLobbySDK()
        self.task_count = 0  # tasks completed successfully so far

    async def start(self):
        """Register the agent with the lobby.

        Returns:
            ``True`` when the registration result reports
            ``status == "success"``, otherwise ``False``.
        """
        result = await self.sdk.register_agent(
            agent_id=self.agent_id,
            agent_type="MCP_Agent",
            capabilities=self.capabilities,
            task_handler=self.handle_task,
            metadata={
                "mcp_enabled": True,
                "tools": self.tools,
                "reasoning": "autonomous",
            },
        )

        return result.get("status") == "success"

    async def handle_task(self, task_data):
        """Process one task through the MCP agent and report the outcome.

        Args:
            task_data: Mapping describing the task. ``"task_title"`` and
                ``"task_description"`` are read, and the whole mapping is
                passed to the MCP agent as context.

        Returns:
            A dict whose ``"status"`` is ``"completed"`` (with the response,
            tools used, reasoning steps and confidence), ``"failed"`` (the
            MCP agent reported no success) or ``"error"`` (an exception was
            raised). Every variant carries ``"agent_id"``.
        """
        try:
            task_title = task_data.get("task_title", "Unknown")
            task_description = task_data.get("task_description")

            print(f"🤖 [{self.agent_id}] Processing: {task_title}")

            # Avoid sending a literal "None" to the model when the task has
            # no description.
            if task_description:
                prompt = f"{task_title}: {task_description}"
            else:
                prompt = task_title

            # Use MCP for autonomous task processing
            mcp_result = await self.mcp_agent.think_and_act(
                task=prompt,
                context=task_data,
            )

            if not mcp_result.get("success"):
                return {
                    "status": "failed",
                    "agent_id": self.agent_id,
                    "error": mcp_result.get("error", "Processing failed"),
                }

            # Read the mandatory field before counting the task, so a
            # malformed result is reported as an error and not as completed.
            response = mcp_result["response"]

            # Extract tool usage
            tools_used = [
                tool_result.get("tool", "unknown")
                for tool_result in mcp_result.get("tool_results", [])
            ]

            self.task_count += 1
            print(f"✅ Task completed using tools: {tools_used}")

            return {
                "status": "completed",
                "agent_id": self.agent_id,
                "response": response,
                "tools_used": tools_used,
                "reasoning_steps": mcp_result.get("reasoning_steps", []),
                "confidence": mcp_result.get("confidence", 0.8),
            }

        except Exception as e:
            # Handler boundary: turn any failure into an error reply instead
            # of letting it propagate out of the task handler.
            return {"status": "error", "agent_id": self.agent_id, "error": str(e)}

async def run_mcp_demo():
    """Demonstrate MCP agent capabilities.

    Creates a researcher and a coder agent, starts each one, then idles
    until interrupted. Every agent that started is disconnected on the way
    out, whatever ended the idle loop.
    """
    # Create specialized MCP agents
    agents = [
        AdvancedMCPAgent(
            agent_id="mcp_researcher",
            capabilities=["research", "web_search", "analysis"],
            tools=["web_search", "analytics", "file_system"],
        ),
        AdvancedMCPAgent(
            agent_id="mcp_coder",
            capabilities=["coding", "testing", "debugging"],
            tools=["code_execution", "file_system", "testing"],
        ),
    ]

    # Start all agents, keeping only those whose registration succeeded
    started_agents = []
    for agent in agents:
        if await agent.start():
            started_agents.append(agent)
            print(f"✅ Started {agent.agent_id}")

    print(f"\n🚀 {len(started_agents)} MCP agents active")
    print("Agents have autonomous reasoning and tool usage capabilities")

    # Keep agents running
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() on Python 3.11+, Ctrl+C cancels the main task,
        # so the interrupt arrives as CancelledError rather than
        # KeyboardInterrupt. Both are treated as a normal stop request.
        print("\n🛑 Stopping MCP agents...")
    finally:
        # Disconnect in ``finally`` so cleanup does not depend on which
        # exception ended the loop.
        for agent in started_agents:
            await agent.sdk.disconnect()


if __name__ == "__main__":
    try:
        asyncio.run(run_mcp_demo())
    except KeyboardInterrupt:
        # Older Python versions re-raise Ctrl+C out of asyncio.run() after
        # the coroutine has already cleaned up; exit without a traceback.
        pass

Run These Examples

All examples are available in our interactive dashboard. Test them without any setup required.