StreamableHTTP 传输
StreamableHTTP 传输为 MCP 服务器提供传统的请求/响应通信,非常适合类似 REST 的交互、无状态客户端以及与现有 Web 基础设施的集成。
用例
StreamableHTTP 传输在以下场景中表现出色:
- Web 服务:传统 REST API 模式
- 无状态交互:每个请求都是独立的
- 负载均衡:跨多个服务器分发请求
- 缓存:利用 HTTP 缓存机制
- 集成:与现有 HTTP 基础设施配合使用
- 公共 API:将 MCP 功能作为 Web API 公开
示例应用:
- 微服务架构
- 公共 API 端点
- 与 API 网关集成
- 缓存数据服务
- 速率限制服务
- 多租户应用
实现
基本 StreamableHTTP 服务器
package main
import (
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
)
// 添加 RESTful 工具
s.AddTool(
mcp.NewTool("get_user",
mcp.WithDescription("获取用户信息"),
mcp.WithString("user_id", mcp.Required()),
),
handleGetUser,
)
s.AddTool(
mcp.NewTool("create_user",
mcp.WithDescription("创建新用户"),
mcp.WithString("name", mcp.Required()),
mcp.WithString("email", mcp.Required()),
mcp.WithNumber("age", mcp.Min(0)),
),
handleCreateUser,
)
s.AddTool(
mcp.NewTool("search_users",
mcp.WithDescription("使用过滤器搜索用户"),
mcp.WithString("query", mcp.Description("搜索查询")),
mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)),
mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)),
),
handleSearchUsers,
)
// 添加资源
s.AddResource(
mcp.NewResource(
"users://{user_id}",
"User Profile",
mcp.WithResourceDescription("用户资料数据"),
mcp.WithMIMEType("application/json"),
),
handleUserResource,
)
// 启动 StreamableHTTP 服务器
log.Println("Starting StreamableHTTP server on :8080")
httpServer := server.NewStreamableHTTPServer(s)
if err := httpServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
userID := req.GetString("user_id", "")
if userID == "" {
return nil, fmt.Errorf("user_id is required")
}
// 模拟数据库查找
user, err := getUserFromDB(userID)
if err != nil {
return nil, fmt.Errorf("user not found: %s", userID)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`,
user.ID, user.Name, user.Email, user.Age)), nil
}
func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
name := req.GetString("name", "")
email := req.GetString("email", "")
age := req.GetInt("age", 0)
if name == "" || email == "" {
return nil, fmt.Errorf("name and email are required")
}
// 验证输入
if !isValidEmail(email) {
return nil, fmt.Errorf("invalid email format: %s", email)
}
// 创建用户
user := &User{
ID: generateID(),
Name: name,
Email: email,
Age: age,
CreatedAt: time.Now(),
}
if err := saveUserToDB(user); err != nil {
return nil, fmt.Errorf("failed to create user: %w", err)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","message":"User created successfully","user":{"id":"%s","name":"%s","email":"%s","age":%d}}`,
user.ID, user.ID, user.Name, user.Email, user.Age)), nil
}
// 示例辅助函数和类型
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age"`
CreatedAt time.Time `json:"created_at"`
}
func getUserFromDB(userID string) (*User, error) {
// 占位符实现
return &User{
ID: userID,
Name: "John Doe",
Email: "john@example.com",
Age: 30,
}, nil
}
func isValidEmail(email string) bool {
return strings.Contains(email, "@") && strings.Contains(email, ".")
}
func generateID() string {
// 占位符实现
return fmt.Sprintf("user_%d", time.Now().UnixNano())
}
func saveUserToDB(user *User) error {
// 占位符实现
return nil
}
func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
query := req.GetString("query", "")
limit := req.GetInt("limit", 10)
offset := req.GetInt("offset", 0)
// 使用分页搜索用户
users, total, err := searchUsersInDB(query, limit, offset)
if err != nil {
return nil, fmt.Errorf("search failed: %w", err)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"users":[{"id":"1","name":"John Doe","email":"john@example.com","age":30},{"id":"2","name":"Jane Smith","email":"jane@example.com","age":25}],"total":%d,"limit":%d,"offset":%d,"query":"%s"}`,
total, limit, offset, query)), nil
}
func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
userID := extractUserIDFromURI(req.Params.URI)
user, err := getUserFromDB(userID)
if err != nil {
return nil, fmt.Errorf("user not found: %s", userID)
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`, user.ID, user.Name, user.Email, user.Age),
},
}, nil
}
// 其他辅助函数
func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) {
// 占位符实现
users := []*User{
{ID: "1", Name: "John Doe", Email: "john@example.com", Age: 30},
{ID: "2", Name: "Jane Smith", Email: "jane@example.com", Age: 25},
}
return users, len(users), nil
}
func extractUserIDFromURI(uri string) string {
// 从 "users://123" 这样的 URI 中提取用户 ID
if len(uri) > 8 && uri[:8] == "users://" {
return uri[8:]
}
return uri
}高级 StreamableHTTP 配置
func main() {
s := server.NewMCPServer("Advanced StreamableHTTP Server", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
server.WithLogging(),
)
// 添加全面的工具和资源
addCRUDTools(s)
addBatchTools(s)
addAnalyticsTools(s)
log.Println("Starting advanced StreamableHTTP server on :8080")
httpServer := server.NewStreamableHTTPServer(s,
server.WithEndpointPath("/api/v1/mcp"),
server.WithHeartbeatInterval(30*time.Second),
server.WithStateLess(false),
server.WithSessionIdleTTL(10*time.Minute), // 10 分钟后清理空闲会话
)
if err := httpServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}端点
标准 MCP 端点
当您启动 StreamableHTTP MCP 服务器时,它会自动创建这些端点:
POST /mcp/initialize - 初始化 MCP 会话
POST /mcp/tools/list - 列出可用工具
POST /mcp/tools/call - 调用工具
POST /mcp/resources/list - 列出可用资源
POST /mcp/resources/read - 读取资源
POST /mcp/prompts/list - 列出可用提示词
POST /mcp/prompts/get - 获取提示词
GET /mcp/health - 健康检查
GET /mcp/capabilities - 服务器能力自定义端点
在旁边添加自定义 HTTP 端点 MCP:
func main() {
s := server.NewMCPServer("Custom StreamableHTTP Server", "1.0.0")
// 创建带有自定义路由的 HTTP 服务器
mux := http.NewServeMux()
// 添加 MCP 端点
mux.Handle("/mcp", server.NewStreamableHTTPServer(s))
// 添加自定义端点
mux.HandleFunc("/api/status", handleStatus)
mux.HandleFunc("/api/metrics", handleMetrics)
mux.HandleFunc("/api/users", handleUsersAPI)
mux.HandleFunc("/api/upload", handleFileUpload)
// 添加中间件
handler := addMiddleware(mux)
log.Println("Starting custom StreamableHTTP server on :8080")
if err := http.ListenAndServe(":8080", handler); err != nil {
log.Fatal(err)
}
}会话管理
会话空闲 TTL
当客户端断开连接而未发送 DELETE 请求时,每个会话的传输状态(工具、资源、日志级别等)可能会泄漏内存。使用 WithSessionIdleTTL 自动清理空闲会话:
httpServer := server.NewStreamableHTTPServer(s,
server.WithSessionIdleTTL(10*time.Minute), // 清理空闲 10+ 分钟的会话
)清理器在后台运行,删除在 TTL 内未收到任何请求的会话的每个会话状态。它还会使被清理的会话 ID 失效,以防止重复使用。零或负 TTL 禁用清理器(默认行为)。清理器在 Shutdown() 时自动停止。
有状态与无状态
无状态设计(推荐)
// 每个请求都是独立的
func handleStatelessTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 从请求中提取所有需要的信息
userID := extractUserFromToken(ctx)
params := req.Params.Arguments
// 处理时不依赖服务器状态
result, err := processRequest(userID, params)
if err != nil {
return nil, err
}
return mcp.NewToolResultJSON(result)
}有状态设计(需要时)
type HTTPSessionManager struct {
sessions map[string]*HTTPSession
mutex sync.RWMutex
cleanup *time.Ticker
}
type HTTPSession struct {
ID string
UserID string
CreatedAt time.Time
LastAccess time.Time
Data map[string]interface{}
ExpiresAt time.Time
}
func NewHTTPSessionManager() *HTTPSessionManager {
sm := &HTTPSessionManager{
sessions: make(map[string]*HTTPSession),
cleanup: time.NewTicker(1 * time.Minute),
}
go sm.cleanupExpiredSessions()
return sm
}身份验证和授权
type AuthMiddleware struct {
jwtSecret []byte
userStore UserStore
}
func NewAuthMiddleware(secret []byte, store UserStore) *AuthMiddleware {
return &AuthMiddleware{
jwtSecret: secret,
userStore: store,
}
}
func (m *AuthMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 从 Authorization 头提取令牌
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, "Bearer ") {
http.Error(w, "Missing or invalid authorization header", http.StatusUnauthorized)
return
}
token := strings.TrimPrefix(authHeader, "Bearer ")
// 验证 JWT 令牌
claims, err := m.validateJWT(token)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
// 加载用户信息
user, err := m.userStore.GetUser(claims.UserID)
if err != nil {
http.Error(w, "User not found", http.StatusUnauthorized)
return
}
// 将用户添加到请求上下文
ctx := context.WithValue(r.Context(), "user", user)
next.ServeHTTP(w, r.WithContext(ctx))
})
}OAuth 保护的资源元数据(RFC 9728)
当您的 MCP 服务器需要 OAuth 时,客户端需要一种发现支持的授权服务器、作用域和承载方法的方法。MCP 授权规范将客户端指向 RFC 9728 /.well-known/oauth-protected-resource 端点来执行此操作。
mcp-go 提供两种方式来服务此元数据。
选项 A:通过服务器选项自动挂载
最简单的方法——启动时自动注册已知端点,在服务器作为 http.Handler 通过 ServeHTTP 挂载时也会被调度。
mcpServer := server.NewMCPServer("my-server", "1.0.0")
httpServer := server.NewStreamableHTTPServer(mcpServer,
server.WithProtectedResourceMetadata(server.ProtectedResourceMetadataConfig{
Resource: "https://my-mcp-server.com",
AuthorizationServers: []string{"https://auth.example.com"},
ScopesSupported: []string{"mcp:read", "mcp:write"},
BearerMethodsSupported: []string{"header"},
ResourceName: "My MCP Server",
}),
)
httpServer.Start(":8080")
// GET http://localhost:8080/.well-known/oauth-protected-resource
// -> 200 application/json 带有上述元数据对于 SSE 服务器,使用等效的 WithSSEProtectedResourceMetadata。
选项 B:用于自定义路由的独立处理程序
如果您构建自己的 http.ServeMux(或使用 chi / gin 等路由器),直接使用 NewProtectedResourceMetadataHandler。
跨域资源共享(CORS)
MCP 端点本身默认不启用 CORS——每个部署都有不同的来源要求,无条件发出允许的 headers 将是安全回归。除非您选择加入,否则托管在与服务器不同来源的基于浏览器的 MCP 客户端将被阻止。
使用 server.WithStreamableHTTPCORS(或 SSE 传输的 server.WithSSECORS)来启用 CORS 处理。
采样支持
StreamableHTTP 传输现在支持双向采样,允许服务器从客户端请求 LLM 完成。这实现了服务器可以利用客户端 LLM 能力的高级场景。
::::warning[安全:需要人工介入] 根据 MCP 规范,实现 应该 始终在有能力拒绝采样请求的用户循环中。
您的采样处理程序实现必须:
- 在执行前向用户显示采样请求供审查
- 允许用户在发送到 LLM 之前查看和编辑提示
- 在返回服务器前显示生成的响应供批准
- 提供清晰的 UI 来接受或拒绝采样请求
未能实现批准流程会造成严重的安全和信任风险。 ::::
采样要求
要使用 StreamableHTTP 传输启用采样,客户端必须使用 WithContinuousListening() 选项:
// 使用采样支持设置客户端
httpTransport, err := transport.NewStreamableHTTP(
serverURL,
transport.WithContinuousListening(), // 采样需要
)
// 使用采样处理程序创建客户端
mcpClient := client.NewClient(httpTransport,
client.WithSamplingHandler(samplingHandler))服务器端实现
在您的 StreamableHTTP 服务器中启用采样:
mcpServer := server.NewMCPServer("HTTP Sampling Server", "1.0.0")
mcpServer.EnableSampling()
// 添加使用采样的工具
mcpServer.AddTool(mcp.Tool{
Name: "ask-llm",
Description: "向 LLM 提问",
// ...
}, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 从客户端请求采样
samplingRequest := mcp.CreateMessageRequest{
// ...
}
result, err := mcpServer.RequestSampling(ctx, samplingRequest)
// ...
})工作原理
- 持久连接:启用
WithContinuousListening()时,客户端维护到服务器的持久 SSE 连接 - 双向通信:服务器可以通过 SSE 流发送采样请求
- 响应通道:客户端通过同一端点的 HTTP POST 响应采样请求
- 会话关联:使用会话 ID 关联响应以确保它们到达正确的处理程序
限制
- 采样需要
WithContinuousListening()来维护 SSE 连接 - 没有持续监听,传输仅在无状态请求/响应模式下运行
- 网络中断可能需要重新连接和重新建立采样通道
在非 net/http 框架中嵌入
StreamableHTTPServer 暴露两个等效的入口点:
ServeHTTP(w http.ResponseWriter, r *http.Request)— 标准net/http处理程序Handle(w HTTPResponseWriter, r *HTTPRequest)— 传输无关的入口点,允许您嵌入 MCP 到不通过net/http的 HTTP 框架(例如 fasthttp 或 fiber)
下一步
- In-Process 传输 - 了解嵌入式场景
- 客户端开发 - 为 HTTP 传输构建 MCP 客户端
- 服务器基础 - 回顾基本服务器概念

