Overview

The Channel component provides a secure, authenticated communication layer between agents using WebSockets. It enables agents to establish connections, exchange messages, and maintain persistent communication channels with minimal setup.

Installation

The Channel component is included in the Authed SDK:

pip install authed

Basic Usage

Creating a Channel

After initializing the Authed SDK (as described in the SDK documentation), you can create a Channel instance:

from authed import Authed
from authed.sdk.channel import Channel
from authed.sdk.channel.protocol import MessageType

# Initialize from environment variables (recommended)
auth = Authed.from_env()

# Create a channel using the Authed instance
channel = auth.create_channel()

Alternatively, you can create a Channel directly with agent credentials:

# Create a channel with explicit credentials
channel = Channel(
    agent_id="your-agent-id",
    agent_secret="your-agent-secret",
    registry_url="https://api.getauthed.dev"
)

Setting Up a WebSocket Endpoint

To receive incoming connections, set up a WebSocket endpoint in your application:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        # Let the channel handle the WebSocket connection
        await channel.handle_websocket(websocket, "/ws")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        await websocket.close()

Connecting to Another Agent

To initiate a connection to another agent:

# Open a channel to another agent
target_channel = await channel.open_channel(
    target_agent_id="target-agent-id",
    websocket_url="wss://target-agent-domain.com/ws"
)

# Send a message
content_data = {
    "text": "Hello from Agent A!",
    "timestamp": channel.get_iso_timestamp()
}

message_id = await channel.send_message(
    channel=target_channel,
    content_type=MessageType.REQUEST,
    content_data=content_data
)

# Receive a response (with optional timeout)
response = await channel.receive_message(target_channel, timeout=5.0)

# Close the channel when done
await channel.close_channel(target_channel)

Message Handling

Registering Message Handlers

You can register custom handlers for different message types:

from authed.sdk.channel.protocol import MessageType

# Define a custom message handler
async def handle_text_message(message):
    # Extract message content
    content_data = message["content"]["data"]
    text = content_data.get("text", "")
    
    # Process the message...
    
    # Return a response
    return {
        "type": MessageType.RESPONSE,
        "data": {
            "text": f"Received: {text}",
            "timestamp": channel.get_iso_timestamp()
        }
    }

# Register the handler for REQUEST message type
channel.register_handler(MessageType.REQUEST, handle_text_message)

If you don’t register a handler for the REQUEST message type, a default echo handler will be used.

Default Message Types

The Channel component defines standard message types:

Message TypeDescription
CHANNEL_OPENInitiates a new channel
CHANNEL_ACCEPTAccepts a channel connection
CHANNEL_REJECTRejects a channel connection
CHANNEL_CLOSECloses an existing channel
HEARTBEATKeeps the connection alive
ERRORIndicates an error condition
REQUESTApplication-level request
RESPONSEApplication-level response
EVENTApplication-level event notification

Complete Example

Here’s a complete example of an agent that can both receive connections and initiate them:

import logging
import os
from fastapi import FastAPI, WebSocket
import uvicorn
from authed import Authed
from authed.sdk.channel import Channel
from authed.sdk.channel.protocol import MessageType

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastAPI app
app = FastAPI()

# Initialize Authed from environment variables
auth = Authed.from_env()

# Create a channel
channel = auth.create_channel()

# Define a custom message handler
async def handle_text_message(message):
    content_data = message["content"]["data"]
    text = content_data.get("text", "")
    
    logger.info(f"Received message: {text}")
    
    return {
        "type": MessageType.RESPONSE,
        "data": {
            "text": f"Echo: {text}",
            "timestamp": channel.get_iso_timestamp()
        }
    }

# Register the message handler
channel.register_handler(MessageType.REQUEST, handle_text_message)

# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        await channel.handle_websocket(websocket, "/ws")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        await websocket.close()

# API endpoint to send a message to another agent
@app.get("/send-message/{agent_id}")
async def send_message(agent_id: str, message: str, websocket_url: str):
    try:
        # Open a channel to the target agent
        target_channel = await channel.open_channel(
            target_agent_id=agent_id,
            websocket_url=websocket_url
        )
        
        # Send a message
        content_data = {
            "text": message,
            "timestamp": channel.get_iso_timestamp()
        }
        
        message_id = await channel.send_message(
            channel=target_channel,
            content_type=MessageType.REQUEST,
            content_data=content_data
        )
        
        # Wait for a response
        response = await channel.receive_message(target_channel, timeout=5.0)
        
        # Extract response text
        response_text = response["content"]["data"].get("text", "")
        
        # Keep the channel open for future messages
        
        return {
            "status": "success",
            "message_id": message_id,
            "response": response_text
        }
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        return {
            "status": "error",
            "error": str(e)
        }

# Run the server
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=port)

Advanced Usage

Managing Multiple Channels

The Channel component automatically manages connections:

# Open multiple channels
channel_a = await channel.open_channel(
    target_agent_id="agent-a-id",
    websocket_url="wss://agent-a.com/ws"
)

channel_b = await channel.open_channel(
    target_agent_id="agent-b-id",
    websocket_url="wss://agent-b.com/ws"
)

# Get an existing channel if it's already open
existing_channel = channel.get_channel(
    target_agent_id="agent-a-id",
    websocket_url="wss://agent-a.com/ws"
)

# Close all channels when shutting down
await channel.close_all_channels()

Custom Message Formats

While the Channel component provides default message handling, you can customize the message format:

# Send a custom message
await channel.send_message(
    channel=target_channel,
    content_type="custom.message.type",
    content_data={
        "custom_field": "custom_value",
        "nested": {
            "data": "structure"
        }
    }
)

Authentication

The Channel component leverages Authed’s authentication system to ensure secure communication:

  1. When connecting to another agent, the Channel obtains an interaction token from the registry
  2. The token is used to authenticate the WebSocket connection
  3. Messages are exchanged only after successful authentication
  4. The target agent must have granted permission to the source agent

Make sure to set up permissions correctly using the CLI or Platform UI before attempting to establish a channel.