SSE 传输
服务器发送事件(SSE)传输支持 MCP 客户端与服务器之间的实时、Web 友好的通信。非常适合需要实时更新和多客户端支持的 Web 应用程序。
适用场景
SSE 传输非常适合:
- Web 应用程序:基于浏览器的 LLM 界面
- 实时仪表板:实时数据监控和可视化
- 协作工具:具有共享状态的多用户环境
- 流式响应:具有进度更新的长时间运行操作
- 事件驱动系统:需要服务器主动通信的应用程序
示例应用:
- 基于 Web 的 LLM 聊天界面
- 实时分析仪表板
- 协作文档编辑
- 实时系统监控工具
- 流式数据处理界面
实现
基本的 SSE 服务器
go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("SSE Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
)
// 添加实时工具
s.AddTool(
mcp.NewTool("stream_data",
mcp.WithDescription("Stream data with real-time updates"),
mcp.WithString("source", mcp.Required()),
mcp.WithNumber("count", mcp.DefaultNumber(10)),
),
handleStreamData,
)
s.AddTool(
mcp.NewTool("monitor_system",
mcp.WithDescription("Monitor system metrics in real-time"),
mcp.WithNumber("duration", mcp.DefaultNumber(60)),
),
handleSystemMonitor,
)
// 添加动态资源
s.AddResource(
mcp.NewResource(
"metrics://current",
"Current System Metrics",
mcp.WithResourceDescription("Real-time system metrics"),
mcp.WithMIMEType("application/json"),
),
handleCurrentMetrics,
)
// 启动 SSE 服务器
log.Println("Starting SSE server on :8080")
sseServer := server.NewSSEServer(s)
if err := sseServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 访问请求头
headers := req.Header
// 使用 headers 进行身份验证、追踪等
authToken := headers.Get("Authorization")
if authToken == "" {
return nil, fmt.Errorf("authentication required")
}
// 访问其他 headers
requestID := headers.Get("X-Request-ID")
userAgent := headers.Get("User-Agent")
source := req.GetString("source", "")
count := req.GetInt("count", 10)
// 从上下文获取服务器用于通知
mcpServer := server.ServerFromContext(ctx)
// 带进度更新的数据流
var results []map[string]interface{}
for i := 0; i < count; i++ {
// 检查取消
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// 模拟数据处理
data := generateData(source, i)
results = append(results, data)
// 发送进度通知
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{
"progress": i + 1,
"total": count,
"message": fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source),
})
if err != nil {
log.Printf("Failed to send notification: %v", err)
}
}
time.Sleep(100 * time.Millisecond)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"source":"%s","results":%v,"count":%d}`,
source, results, len(results))), nil
}
// 示例辅助函数
func generateData(source string, index int) map[string]interface{} {
return map[string]interface{}{
"source": source,
"index": index,
"value": fmt.Sprintf("data_%d", index),
}
}
func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
duration := req.GetInt("duration", 60)
mcpServer := server.ServerFromContext(ctx)
// 监控指定持续时间的系统
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
timeout := time.After(time.Duration(duration) * time.Second)
var metrics []map[string]interface{}
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timeout:
return mcp.NewToolResultText(fmt.Sprintf(`{"duration":%d,"metrics":%v,"samples":%d}`,
duration, metrics, len(metrics))), nil
case <-ticker.C:
// 收集当前指标
currentMetrics := collectSystemMetrics()
metrics = append(metrics, currentMetrics)
// 发送实时更新
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics)
if err != nil {
log.Printf("Failed to send system metrics notification: %v", err)
}
}
}
}
}
func collectSystemMetrics() map[string]interface{} {
// 占位符实现
return map[string]interface{}{
"cpu": 50.5,
"memory": 75.2,
"disk": 30.1,
}
}
func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
metrics := collectSystemMetrics()
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`, metrics["cpu"], metrics["memory"], metrics["disk"]),
},
}, nil
}高级 SSE 配置
go
func main() {
s := server.NewMCPServer("Advanced SSE Server", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
server.WithLogging(),
)
// 添加协作工具
addCollaborativeTools(s)
addRealTimeResources(s)
log.Println("Starting advanced SSE server on :8080")
sseServer := server.NewSSEServer(s,
server.WithStaticBasePath("/mcp"),
server.WithKeepAliveInterval(30*time.Second),
server.WithBaseURL("http://localhost:8080"),
)
if err := sseServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
// 高级示例辅助函数
func addRealTimeResources(s *server.MCPServer) {
// 占位符实现 - 会添加实时资源
}
func addCollaborativeTools(s *server.MCPServer) {
// 共享文档编辑
s.AddTool(
mcp.NewTool("edit_document",
mcp.WithDescription("Edit a shared document"),
mcp.WithString("doc_id", mcp.Required()),
mcp.WithString("operation", mcp.Required()),
mcp.WithObject("data", mcp.Required()),
),
handleDocumentEdit,
)
// 实时聊天
s.AddTool(
mcp.NewTool("send_message",
mcp.WithDescription("Send a message to all connected clients"),
mcp.WithString("message", mcp.Required()),
mcp.WithString("channel", mcp.DefaultString("general")),
),
handleSendMessage,
)
// 实时数据更新
s.AddTool(
mcp.NewTool("subscribe_updates",
mcp.WithDescription("Subscribe to real-time data updates"),
mcp.WithString("topic", mcp.Required()),
mcp.WithArray("filters", mcp.Description("Optional filters")),
),
handleSubscribeUpdates,
)
}配置
SSE 服务器选项
SSE 服务器可以通过各种选项进行配置:
go
sseServer := server.NewSSEServer(s,
// 设置 SSE 端点的基本路径
server.WithStaticBasePath("/api/mcp"),
// 配置 keep-alive 间隔
server.WithKeepAliveInterval(30*time.Second),
// 设置客户端连接的基础 URL
server.WithBaseURL("http://localhost:8080"),
// 配置 SSE 和消息端点
server.WithSSEEndpoint("/sse"),
server.WithMessageEndpoint("/message"),
// 添加用于请求处理的上下文函数
server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
// 从 headers 添加自定义上下文值
return ctx
}),
// 为基于浏览器的客户端启用跨域资源共享
server.WithSSECORS(
server.WithCORSAllowedOrigins("https://my-ai-app.com"),
server.WithCORSAllowCredentials(),
),
)CORS 处理是可选的。如果没有使用 WithSSECORS,SSE 流会回退到历史默认的 Access-Control-Allow-Origin: * 以保持向后兼容性,而消息端点不会发送 CORS headers。 请参阅 流式 HTTP CORS 文档 获取两种传输共享的完整 CORSOption 辅助函数列表。
结果端点:
- SSE 流:
http://localhost:8080/api/mcp/sse - 消息端点:
http://localhost:8080/api/mcp/message
SSE 客户端超时选项
在客户端使用传统 SSE 传输时,您可以配置端点和响应超时:
go
import "github.com/mark3labs/mcp-go/client/transport"
sseTransport, err := transport.NewSSE("http://localhost:8080/sse",
// Start() 期间等待 SSE 端点 URL 的最长时间(默认:30s)
transport.WithEndpointTimeout(60*time.Second),
// 发送请求后等待响应的最长时间(默认:60s)
transport.WithResponseTimeout(120*time.Second),
)零值或负值会被静默忽略以防止配置错误。如果传递给操作上下文的截止时间更短,则以较短值为准。
实时通知
SSE 传输通过通知支持实时的服务器到客户端通信。使用服务器上下文发送通知:
go
func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 从上下文获取 MCP 服务器
mcpServer := server.ServerFromContext(ctx)
// 向客户端发送通知
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "custom_event", map[string]interface{}{
"message": "Real-time update",
"timestamp": time.Now().Unix(),
})
if err != nil {
log.Printf("Failed to send notification: %v", err)
}
}
return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil
}会话管理
SSE 服务器自动处理会话管理。您可以使用服务器的通知方法向特定会话发送事件:
go
// 向当前客户端会话发送通知
mcpServer.SendNotificationToClient(ctx, "progress_update", progressData)
// 向所有连接的客户端发送通知(如果支持)
// 注意:检查服务器实现以获取广播功能请求头
与流式 HTTP 传输一样,SSE 传输将 HTTP 请求头传递给 MCP 处理程序。这允许您在工具和资源处理程序中访问随 SSE 连接发送的原始 HTTP headers。
在处理程序中访问 Headers
SSE 连接中的 headers 在所有 MCP 请求对象中都可用:
go
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 访问请求头
headers := req.Header
// 使用 headers 进行身份验证、追踪等
authToken := headers.Get("Authorization")
if authToken == "" {
return nil, fmt.Errorf("authentication required")
}
// 访问其他 headers
requestID := headers.Get("X-Request-ID")
userAgent := headers.Get("User-Agent")
// 处理程序的其余代码...
mcpServer := server.ServerFromContext(ctx)
// ...
}这适用于所有 MCP 请求类型,包括:
CallToolRequestReadResourceRequestListToolsRequestListResourcesRequestInitializeRequest- 以及其他 MCP 请求类型
headers 由 SSE 传输层从初始 SSE 连接自动填充,在处理程序中无需额外配置即可使用。
注意:由于 SSE 保持持久连接,headers 在建立连接时被捕获,并在该连接整个生命周期内保持不变。

