公告

任何建议或需求可联系我!


Skip to content

SSE 传输

服务器发送事件(SSE)传输支持 MCP 客户端与服务器之间的实时、Web 友好的通信。非常适合需要实时更新和多客户端支持的 Web 应用程序。

适用场景

SSE 传输非常适合:

  • Web 应用程序:基于浏览器的 LLM 界面
  • 实时仪表板:实时数据监控和可视化
  • 协作工具:具有共享状态的多用户环境
  • 流式响应:具有进度更新的长时间运行操作
  • 事件驱动系统:需要服务器主动通信的应用程序

示例应用:

  • 基于 Web 的 LLM 聊天界面
  • 实时分析仪表板
  • 协作文档编辑
  • 实时系统监控工具
  • 流式数据处理界面

实现

基本的 SSE 服务器

go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

func main() {
    s := server.NewMCPServer("SSE Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // 添加实时工具
    s.AddTool(
        mcp.NewTool("stream_data",
            mcp.WithDescription("Stream data with real-time updates"),
            mcp.WithString("source", mcp.Required()),
            mcp.WithNumber("count", mcp.DefaultNumber(10)),
        ),
        handleStreamData,
    )

    s.AddTool(
        mcp.NewTool("monitor_system",
            mcp.WithDescription("Monitor system metrics in real-time"),
            mcp.WithNumber("duration", mcp.DefaultNumber(60)),
        ),
        handleSystemMonitor,
    )

    // 添加动态资源
    s.AddResource(
        mcp.NewResource(
            "metrics://current",
            "Current System Metrics",
            mcp.WithResourceDescription("Real-time system metrics"),
            mcp.WithMIMEType("application/json"),
        ),
        handleCurrentMetrics,
    )

    // 启动 SSE 服务器
    log.Println("Starting SSE server on :8080")
    sseServer := server.NewSSEServer(s)
    if err := sseServer.Start(":8080"); err != nil {
        log.Fatal(err)
    }
}

func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 访问请求头
    headers := req.Header

    // 使用 headers 进行身份验证、追踪等
    authToken := headers.Get("Authorization")
    if authToken == "" {
        return nil, fmt.Errorf("authentication required")
    }

    // 访问其他 headers
    requestID := headers.Get("X-Request-ID")
    userAgent := headers.Get("User-Agent")

    source := req.GetString("source", "")
    count := req.GetInt("count", 10)

    // 从上下文获取服务器用于通知
    mcpServer := server.ServerFromContext(ctx)

    // 带进度更新的数据流
    var results []map[string]interface{}
    for i := 0; i < count; i++ {
        // 检查取消
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }

        // 模拟数据处理
        data := generateData(source, i)
        results = append(results, data)

        // 发送进度通知
        if mcpServer != nil {
            err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{
                "progress": i + 1,
                "total":    count,
                "message":  fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source),
            })
            if err != nil {
                log.Printf("Failed to send notification: %v", err)
            }
        }

        time.Sleep(100 * time.Millisecond)
    }

    return mcp.NewToolResultText(fmt.Sprintf(`{"source":"%s","results":%v,"count":%d}`,
        source, results, len(results))), nil
}

// 示例辅助函数
func generateData(source string, index int) map[string]interface{} {
    return map[string]interface{}{
        "source": source,
        "index":  index,
        "value":  fmt.Sprintf("data_%d", index),
    }
}

func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    duration := req.GetInt("duration", 60)

    mcpServer := server.ServerFromContext(ctx)

    // 监控指定持续时间的系统
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    timeout := time.After(time.Duration(duration) * time.Second)
    var metrics []map[string]interface{}

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-timeout:
            return mcp.NewToolResultText(fmt.Sprintf(`{"duration":%d,"metrics":%v,"samples":%d}`,
                duration, metrics, len(metrics))), nil
        case <-ticker.C:
            // 收集当前指标
            currentMetrics := collectSystemMetrics()
            metrics = append(metrics, currentMetrics)

            // 发送实时更新
            if mcpServer != nil {
                err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics)
                if err != nil {
                    log.Printf("Failed to send system metrics notification: %v", err)
                }
            }
        }
    }
}

func collectSystemMetrics() map[string]interface{} {
    // 占位符实现
    return map[string]interface{}{
        "cpu":    50.5,
        "memory": 75.2,
        "disk":   30.1,
    }
}

func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    metrics := collectSystemMetrics()
    return []mcp.ResourceContents{
        mcp.TextResourceContents{
            URI:      req.Params.URI,
            MIMEType: "application/json",
            Text:     fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`, metrics["cpu"], metrics["memory"], metrics["disk"]),
        },
    }, nil
}

高级 SSE 配置

go
func main() {
    s := server.NewMCPServer("Advanced SSE Server", "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // 添加协作工具
    addCollaborativeTools(s)
    addRealTimeResources(s)

    log.Println("Starting advanced SSE server on :8080")
    sseServer := server.NewSSEServer(s,
        server.WithStaticBasePath("/mcp"),
        server.WithKeepAliveInterval(30*time.Second),
        server.WithBaseURL("http://localhost:8080"),
    )

    if err := sseServer.Start(":8080"); err != nil {
        log.Fatal(err)
    }
}

// 高级示例辅助函数
func addRealTimeResources(s *server.MCPServer) {
    // 占位符实现 - 会添加实时资源
}

func addCollaborativeTools(s *server.MCPServer) {
    // 共享文档编辑
    s.AddTool(
        mcp.NewTool("edit_document",
            mcp.WithDescription("Edit a shared document"),
            mcp.WithString("doc_id", mcp.Required()),
            mcp.WithString("operation", mcp.Required()),
            mcp.WithObject("data", mcp.Required()),
        ),
        handleDocumentEdit,
    )

    // 实时聊天
    s.AddTool(
        mcp.NewTool("send_message",
            mcp.WithDescription("Send a message to all connected clients"),
            mcp.WithString("message", mcp.Required()),
            mcp.WithString("channel", mcp.DefaultString("general")),
        ),
        handleSendMessage,
    )

    // 实时数据更新
    s.AddTool(
        mcp.NewTool("subscribe_updates",
            mcp.WithDescription("Subscribe to real-time data updates"),
            mcp.WithString("topic", mcp.Required()),
            mcp.WithArray("filters", mcp.Description("Optional filters")),
        ),
        handleSubscribeUpdates,
    )
}

配置

SSE 服务器选项

SSE 服务器可以通过各种选项进行配置:

go
sseServer := server.NewSSEServer(s,
    // 设置 SSE 端点的基本路径
    server.WithStaticBasePath("/api/mcp"),

    // 配置 keep-alive 间隔
    server.WithKeepAliveInterval(30*time.Second),

    // 设置客户端连接的基础 URL
    server.WithBaseURL("http://localhost:8080"),

    // 配置 SSE 和消息端点
    server.WithSSEEndpoint("/sse"),
    server.WithMessageEndpoint("/message"),

    // 添加用于请求处理的上下文函数
    server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
        // 从 headers 添加自定义上下文值
        return ctx
    }),

    // 为基于浏览器的客户端启用跨域资源共享
    server.WithSSECORS(
        server.WithCORSAllowedOrigins("https://my-ai-app.com"),
        server.WithCORSAllowCredentials(),
    ),
)

CORS 处理是可选的。如果没有使用 WithSSECORS,SSE 流会回退到历史默认的 Access-Control-Allow-Origin: * 以保持向后兼容性,而消息端点不会发送 CORS headers。 请参阅 流式 HTTP CORS 文档 获取两种传输共享的完整 CORSOption 辅助函数列表。

结果端点:

  • SSE 流:http://localhost:8080/api/mcp/sse
  • 消息端点:http://localhost:8080/api/mcp/message

SSE 客户端超时选项

在客户端使用传统 SSE 传输时,您可以配置端点和响应超时:

go
import "github.com/mark3labs/mcp-go/client/transport"

sseTransport, err := transport.NewSSE("http://localhost:8080/sse",
    // Start() 期间等待 SSE 端点 URL 的最长时间(默认:30s)
    transport.WithEndpointTimeout(60*time.Second),

    // 发送请求后等待响应的最长时间(默认:60s)
    transport.WithResponseTimeout(120*time.Second),
)

零值或负值会被静默忽略以防止配置错误。如果传递给操作上下文的截止时间更短,则以较短值为准。

实时通知

SSE 传输通过通知支持实时的服务器到客户端通信。使用服务器上下文发送通知:

go
func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 从上下文获取 MCP 服务器
    mcpServer := server.ServerFromContext(ctx)

    // 向客户端发送通知
    if mcpServer != nil {
        err := mcpServer.SendNotificationToClient(ctx, "custom_event", map[string]interface{}{
            "message":  "Real-time update",
            "timestamp": time.Now().Unix(),
        })
        if err != nil {
            log.Printf("Failed to send notification: %v", err)
        }
    }

    return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil
}

会话管理

SSE 服务器自动处理会话管理。您可以使用服务器的通知方法向特定会话发送事件:

go
// 向当前客户端会话发送通知
mcpServer.SendNotificationToClient(ctx, "progress_update", progressData)

// 向所有连接的客户端发送通知(如果支持)
// 注意:检查服务器实现以获取广播功能

请求头

与流式 HTTP 传输一样,SSE 传输将 HTTP 请求头传递给 MCP 处理程序。这允许您在工具和资源处理程序中访问随 SSE 连接发送的原始 HTTP headers。

在处理程序中访问 Headers

SSE 连接中的 headers 在所有 MCP 请求对象中都可用:

go
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 访问请求头
    headers := req.Header

    // 使用 headers 进行身份验证、追踪等
    authToken := headers.Get("Authorization")
    if authToken == "" {
        return nil, fmt.Errorf("authentication required")
    }

    // 访问其他 headers
    requestID := headers.Get("X-Request-ID")
    userAgent := headers.Get("User-Agent")

    // 处理程序的其余代码...
    mcpServer := server.ServerFromContext(ctx)
    // ...
}

这适用于所有 MCP 请求类型,包括:

  • CallToolRequest
  • ReadResourceRequest
  • ListToolsRequest
  • ListResourcesRequest
  • InitializeRequest
  • 以及其他 MCP 请求类型

headers 由 SSE 传输层从初始 SSE 连接自动填充,在处理程序中无需额外配置即可使用。

注意:由于 SSE 保持持久连接,headers 在建立连接时被捕获,并在该连接整个生命周期内保持不变。

后续步骤