公告

任何建议或需求可联系我!


Skip to content

StreamableHTTP 传输

StreamableHTTP 传输为 MCP 服务器提供传统的请求/响应通信,非常适合类似 REST 的交互、无状态客户端以及与现有 Web 基础设施的集成。

用例

StreamableHTTP 传输在以下场景中表现出色:

  • Web 服务:传统 REST API 模式
  • 无状态交互:每个请求都是独立的
  • 负载均衡:跨多个服务器分发请求
  • 缓存:利用 HTTP 缓存机制
  • 集成:与现有 HTTP 基础设施配合使用
  • 公共 API:将 MCP 功能作为 Web API 公开

示例应用:

  • 微服务架构
  • 公共 API 端点
  • 与 API 网关集成
  • 缓存数据服务
  • 速率限制服务
  • 多租户应用

实现

基本 StreamableHTTP 服务器

go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "strings"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

func main() {
    s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // 添加 RESTful 工具
    s.AddTool(
        mcp.NewTool("get_user",
            mcp.WithDescription("获取用户信息"),
            mcp.WithString("user_id", mcp.Required()),
        ),
        handleGetUser,
    )

    s.AddTool(
        mcp.NewTool("create_user",
            mcp.WithDescription("创建新用户"),
            mcp.WithString("name", mcp.Required()),
            mcp.WithString("email", mcp.Required()),
            mcp.WithNumber("age", mcp.Min(0)),
        ),
        handleCreateUser,
    )

    s.AddTool(
        mcp.NewTool("search_users",
            mcp.WithDescription("使用过滤器搜索用户"),
            mcp.WithString("query", mcp.Description("搜索查询")),
            mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)),
            mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)),
        ),
        handleSearchUsers,
    )

    // 添加资源
    s.AddResource(
        mcp.NewResource(
            "users://{user_id}",
            "User Profile",
            mcp.WithResourceDescription("用户资料数据"),
            mcp.WithMIMEType("application/json"),
        ),
        handleUserResource,
    )

    // 启动 StreamableHTTP 服务器
    log.Println("Starting StreamableHTTP server on :8080")
    httpServer := server.NewStreamableHTTPServer(s)
    if err := httpServer.Start(":8080"); err != nil {
        log.Fatal(err)
    }
}

func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    userID := req.GetString("user_id", "")
    if userID == "" {
        return nil, fmt.Errorf("user_id is required")
    }
    
    // 模拟数据库查找
    user, err := getUserFromDB(userID)
    if err != nil {
        return nil, fmt.Errorf("user not found: %s", userID)
    }

    return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`, 
        user.ID, user.Name, user.Email, user.Age)), nil
}

func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    name := req.GetString("name", "")
    email := req.GetString("email", "")
    age := req.GetInt("age", 0)

    if name == "" || email == "" {
        return nil, fmt.Errorf("name and email are required")
    }

    // 验证输入
    if !isValidEmail(email) {
        return nil, fmt.Errorf("invalid email format: %s", email)
    }

    // 创建用户
    user := &User{
        ID:        generateID(),
        Name:      name,
        Email:     email,
        Age:       age,
        CreatedAt: time.Now(),
    }

    if err := saveUserToDB(user); err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }

    return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","message":"User created successfully","user":{"id":"%s","name":"%s","email":"%s","age":%d}}`,
        user.ID, user.ID, user.Name, user.Email, user.Age)), nil
}

// 示例辅助函数和类型
type User struct {
    ID        string    `json:"id"`
    Name      string    `json:"name"`
    Email     string    `json:"email"`
    Age       int       `json:"age"`
    CreatedAt time.Time `json:"created_at"`
}

func getUserFromDB(userID string) (*User, error) {
    // 占位符实现
    return &User{
        ID:    userID,
        Name:  "John Doe",
        Email: "john@example.com",
        Age:   30,
    }, nil
}

func isValidEmail(email string) bool {
    return strings.Contains(email, "@") && strings.Contains(email, ".")
}

func generateID() string {
    // 占位符实现
    return fmt.Sprintf("user_%d", time.Now().UnixNano())
}

func saveUserToDB(user *User) error {
    // 占位符实现
    return nil
}

func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    query := req.GetString("query", "")
    limit := req.GetInt("limit", 10)
    offset := req.GetInt("offset", 0)

    // 使用分页搜索用户
    users, total, err := searchUsersInDB(query, limit, offset)
    if err != nil {
        return nil, fmt.Errorf("search failed: %w", err)
    }

    return mcp.NewToolResultText(fmt.Sprintf(`{"users":[{"id":"1","name":"John Doe","email":"john@example.com","age":30},{"id":"2","name":"Jane Smith","email":"jane@example.com","age":25}],"total":%d,"limit":%d,"offset":%d,"query":"%s"}`,
        total, limit, offset, query)), nil
}

func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    userID := extractUserIDFromURI(req.Params.URI)
    
    user, err := getUserFromDB(userID)
    if err != nil {
        return nil, fmt.Errorf("user not found: %s", userID)
    }

    return []mcp.ResourceContents{
        mcp.TextResourceContents{
            URI:      req.Params.URI,
            MIMEType: "application/json",
            Text:     fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`, user.ID, user.Name, user.Email, user.Age),
        },
    }, nil
}

// 其他辅助函数

func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) {
    // 占位符实现
    users := []*User{
        {ID: "1", Name: "John Doe", Email: "john@example.com", Age: 30},
        {ID: "2", Name: "Jane Smith", Email: "jane@example.com", Age: 25},
    }
    return users, len(users), nil
}

func extractUserIDFromURI(uri string) string {
    // 从 "users://123" 这样的 URI 中提取用户 ID
    if len(uri) > 8 && uri[:8] == "users://" {
        return uri[8:]
    }
    return uri
}

高级 StreamableHTTP 配置

go
func main() {
    s := server.NewMCPServer("Advanced StreamableHTTP Server", "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // 添加全面的工具和资源
    addCRUDTools(s)
    addBatchTools(s)
    addAnalyticsTools(s)

    log.Println("Starting advanced StreamableHTTP server on :8080")
    httpServer := server.NewStreamableHTTPServer(s,
        server.WithEndpointPath("/api/v1/mcp"),
        server.WithHeartbeatInterval(30*time.Second),
        server.WithStateLess(false),
        server.WithSessionIdleTTL(10*time.Minute), // 10 分钟后清理空闲会话
    )
    
    if err := httpServer.Start(":8080"); err != nil {
        log.Fatal(err)
    }
}

端点

标准 MCP 端点

当您启动 StreamableHTTP MCP 服务器时,它会自动创建这些端点:

POST /mcp/initialize     - 初始化 MCP 会话
POST /mcp/tools/list     - 列出可用工具
POST /mcp/tools/call     - 调用工具
POST /mcp/resources/list - 列出可用资源
POST /mcp/resources/read - 读取资源
POST /mcp/prompts/list   - 列出可用提示词
POST /mcp/prompts/get    - 获取提示词
GET  /mcp/health         - 健康检查
GET  /mcp/capabilities   - 服务器能力

自定义端点

在旁边添加自定义 HTTP 端点 MCP:

go
func main() {
    s := server.NewMCPServer("Custom StreamableHTTP Server", "1.0.0")
    
    // 创建带有自定义路由的 HTTP 服务器
    mux := http.NewServeMux()
    
    // 添加 MCP 端点
    mux.Handle("/mcp", server.NewStreamableHTTPServer(s))
    
    // 添加自定义端点
    mux.HandleFunc("/api/status", handleStatus)
    mux.HandleFunc("/api/metrics", handleMetrics)
    mux.HandleFunc("/api/users", handleUsersAPI)
    mux.HandleFunc("/api/upload", handleFileUpload)
    
    // 添加中间件
    handler := addMiddleware(mux)
    
    log.Println("Starting custom StreamableHTTP server on :8080")
    if err := http.ListenAndServe(":8080", handler); err != nil {
        log.Fatal(err)
    }
}

会话管理

会话空闲 TTL

当客户端断开连接而未发送 DELETE 请求时,每个会话的传输状态(工具、资源、日志级别等)可能会泄漏内存。使用 WithSessionIdleTTL 自动清理空闲会话:

go
httpServer := server.NewStreamableHTTPServer(s,
    server.WithSessionIdleTTL(10*time.Minute), // 清理空闲 10+ 分钟的会话
)

清理器在后台运行,删除在 TTL 内未收到任何请求的会话的每个会话状态。它还会使被清理的会话 ID 失效,以防止重复使用。零或负 TTL 禁用清理器(默认行为)。清理器在 Shutdown() 时自动停止。

有状态与无状态

无状态设计(推荐)

go
// 每个请求都是独立的
func handleStatelessTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 从请求中提取所有需要的信息
    userID := extractUserFromToken(ctx)
    params := req.Params.Arguments
    
    // 处理时不依赖服务器状态
    result, err := processRequest(userID, params)
    if err != nil {
        return nil, err
    }
    
    return mcp.NewToolResultJSON(result)
}

有状态设计(需要时)

go
type HTTPSessionManager struct {
    sessions map[string]*HTTPSession
    mutex    sync.RWMutex
    cleanup  *time.Ticker
}

type HTTPSession struct {
    ID          string
    UserID      string
    CreatedAt   time.Time
    LastAccess  time.Time
    Data        map[string]interface{}
    ExpiresAt   time.Time
}

func NewHTTPSessionManager() *HTTPSessionManager {
    sm := &HTTPSessionManager{
        sessions: make(map[string]*HTTPSession),
        cleanup:  time.NewTicker(1 * time.Minute),
    }
    
    go sm.cleanupExpiredSessions()
    return sm
}

身份验证和授权

go
type AuthMiddleware struct {
    jwtSecret []byte
    userStore UserStore
}

func NewAuthMiddleware(secret []byte, store UserStore) *AuthMiddleware {
    return &AuthMiddleware{
        jwtSecret: secret,
        userStore: store,
    }
}

func (m *AuthMiddleware) Middleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 从 Authorization 头提取令牌
        authHeader := r.Header.Get("Authorization")
        if !strings.HasPrefix(authHeader, "Bearer ") {
            http.Error(w, "Missing or invalid authorization header", http.StatusUnauthorized)
            return
        }
        
        token := strings.TrimPrefix(authHeader, "Bearer ")
        
        // 验证 JWT 令牌
        claims, err := m.validateJWT(token)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }
        
        // 加载用户信息
        user, err := m.userStore.GetUser(claims.UserID)
        if err != nil {
            http.Error(w, "User not found", http.StatusUnauthorized)
            return
        }
        
        // 将用户添加到请求上下文
        ctx := context.WithValue(r.Context(), "user", user)
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

OAuth 保护的资源元数据(RFC 9728)

当您的 MCP 服务器需要 OAuth 时,客户端需要一种发现支持的授权服务器、作用域和承载方法的方法。MCP 授权规范将客户端指向 RFC 9728 /.well-known/oauth-protected-resource 端点来执行此操作。

mcp-go 提供两种方式来服务此元数据。

选项 A:通过服务器选项自动挂载

最简单的方法——启动时自动注册已知端点,在服务器作为 http.Handler 通过 ServeHTTP 挂载时也会被调度。

go
mcpServer := server.NewMCPServer("my-server", "1.0.0")

httpServer := server.NewStreamableHTTPServer(mcpServer,
    server.WithProtectedResourceMetadata(server.ProtectedResourceMetadataConfig{
        Resource:               "https://my-mcp-server.com",
        AuthorizationServers:   []string{"https://auth.example.com"},
        ScopesSupported:        []string{"mcp:read", "mcp:write"},
        BearerMethodsSupported: []string{"header"},
        ResourceName:           "My MCP Server",
    }),
)

httpServer.Start(":8080")
// GET http://localhost:8080/.well-known/oauth-protected-resource
//   -> 200 application/json 带有上述元数据

对于 SSE 服务器,使用等效的 WithSSEProtectedResourceMetadata

选项 B:用于自定义路由的独立处理程序

如果您构建自己的 http.ServeMux(或使用 chi / gin 等路由器),直接使用 NewProtectedResourceMetadataHandler

跨域资源共享(CORS)

MCP 端点本身默认不启用 CORS——每个部署都有不同的来源要求,无条件发出允许的 headers 将是安全回归。除非您选择加入,否则托管在与服务器不同来源的基于浏览器的 MCP 客户端将被阻止。

使用 server.WithStreamableHTTPCORS(或 SSE 传输的 server.WithSSECORS)来启用 CORS 处理。

采样支持

StreamableHTTP 传输现在支持双向采样,允许服务器从客户端请求 LLM 完成。这实现了服务器可以利用客户端 LLM 能力的高级场景。

::::warning[安全:需要人工介入] 根据 MCP 规范,实现 应该 始终在有能力拒绝采样请求的用户循环中。

您的采样处理程序实现必须:

  • 在执行前向用户显示采样请求供审查
  • 允许用户在发送到 LLM 之前查看和编辑提示
  • 在返回服务器前显示生成的响应供批准
  • 提供清晰的 UI 来接受或拒绝采样请求

未能实现批准流程会造成严重的安全和信任风险。 ::::

采样要求

要使用 StreamableHTTP 传输启用采样,客户端必须使用 WithContinuousListening() 选项:

go
// 使用采样支持设置客户端
httpTransport, err := transport.NewStreamableHTTP(
    serverURL,
    transport.WithContinuousListening(), // 采样需要
)

// 使用采样处理程序创建客户端
mcpClient := client.NewClient(httpTransport, 
    client.WithSamplingHandler(samplingHandler))

服务器端实现

在您的 StreamableHTTP 服务器中启用采样:

go
mcpServer := server.NewMCPServer("HTTP Sampling Server", "1.0.0")
mcpServer.EnableSampling()

// 添加使用采样的工具
mcpServer.AddTool(mcp.Tool{
    Name:        "ask-llm",
    Description: "向 LLM 提问",
    // ...
}, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 从客户端请求采样
    samplingRequest := mcp.CreateMessageRequest{
        // ...
    }
    
    result, err := mcpServer.RequestSampling(ctx, samplingRequest)
    // ...
})

工作原理

  1. 持久连接:启用 WithContinuousListening() 时,客户端维护到服务器的持久 SSE 连接
  2. 双向通信:服务器可以通过 SSE 流发送采样请求
  3. 响应通道:客户端通过同一端点的 HTTP POST 响应采样请求
  4. 会话关联:使用会话 ID 关联响应以确保它们到达正确的处理程序

限制

  • 采样需要 WithContinuousListening() 来维护 SSE 连接
  • 没有持续监听,传输仅在无状态请求/响应模式下运行
  • 网络中断可能需要重新连接和重新建立采样通道

在非 net/http 框架中嵌入

StreamableHTTPServer 暴露两个等效的入口点:

  • ServeHTTP(w http.ResponseWriter, r *http.Request) — 标准 net/http 处理程序
  • Handle(w HTTPResponseWriter, r *HTTPRequest) — 传输无关的入口点,允许您嵌入 MCP 到通过 net/http 的 HTTP 框架(例如 fasthttpfiber

下一步