公告

任何建议或需求可联系我!


Skip to content

采样

学习如何实现能够处理来自服务端采样请求的 MCP 客户端,实现双向通信,客户端向服务端提供 LLM 能力。

概述

采样允许 MCP 客户端响应来自服务端的 LLM 完成请求。当服务端需要生成内容、回答问题或执行推理任务时,它可以向客户端发送采样请求,然后客户端使用 LLM 处理并返回结果。

[关键安全要求]

根据 MCP 规范,采样实现应该始终包含一个人工审核环节,用户能够拒绝采样请求。

你必须实现批准流程:

  • 在执行前向用户展示每个采样请求供审核
  • 允许用户在发送到 LLM 之前查看和编辑提示词
  • 在返回服务端之前显示生成的响应供用户批准
  • 在每个阶段提供清晰的 UI 来接受或拒绝请求

没有人批准,你的实现:

  • 允许服务端在未经用户同意的情况下发出未经授权的 LLM 请求
  • 可能通过未审核的提示词暴露敏感信息
  • 从自动采样创建不受控制的 API 成本
  • 违反用户信任和安全最佳实践

下面的示例展示了基本的处理器实现。在生产使用之前,你必须添加批准逻辑

实现采样处理器

通过实现 SamplingHandler 接口创建采样处理器:

go
package main

import (
    "context"
    "fmt"
    
    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/client/transport"
    "github.com/mark3labs/mcp-go/mcp"
)

type MySamplingHandler struct {
    // 为你的 LLM 客户端添加字段(OpenAI、Anthropic 等)
}

func (h *MySamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 提取请求参数
    messages := request.Messages
    systemPrompt := request.SystemPrompt
    maxTokens := request.MaxTokens
    temperature := request.Temperature
    
    // 使用你的 LLM 处理
    response, err := h.callLLM(ctx, messages, systemPrompt, maxTokens, temperature)
    if err != nil {
        return nil, fmt.Errorf("LLM call failed: %w", err)
    }
    
    // 返回 MCP 格式的结果
    return &mcp.CreateMessageResult{
        Model: "your-model-name",
        Role:  mcp.RoleAssistant,
        Content: mcp.TextContent{
            Type: "text",
            Text: response,
        },
        StopReason: "endTurn",
    }, nil
}

func (h *MySamplingHandler) callLLM(ctx context.Context, messages []mcp.SamplingMessage, systemPrompt string, maxTokens int, temperature float64) (string, error) {
    // 在这里实现你的 LLM 集成
    // 这里你可以调用 OpenAI、Anthropic 或其他 LLM API
    return "Your LLM response here", nil
}

配置客户端

创建客户端时通过提供处理器来启用采样:

go
func main() {
    // 创建采样处理器
    samplingHandler := &MySamplingHandler{}
    
    // 创建 stdio 传输
    stdioTransport := transport.NewStdio("/path/to/mcp/server", nil)
    
    // 使用采样支持创建客户端
    mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(samplingHandler))
    
    // 启动客户端
    ctx := context.Background()
    if err := mcpClient.Start(ctx); err != nil {
        log.Fatalf("Failed to start client: %v", err)
    }
    defer mcpClient.Close()
    if err := mcpClient.Connect(ctx); err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    
    // 客户端现在将自动处理来自服务端的采样请求
    // 使用你的处理器
}

模拟实现示例

用于测试的完整模拟实现:

go
package main

import (
import (
    "context"
    "fmt"
    "log"
    "strings"
    "os"

    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/mcp"
)
type MockSamplingHandler struct{}

func (h *MockSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 记录请求用于调试
    log.Printf("Mock LLM received sampling request:")
    log.Printf("  System prompt: %s", request.SystemPrompt)
    log.Printf("  Max tokens: %d", request.MaxTokens)
    log.Printf("  Temperature: %f", request.Temperature)
    
    // 提取用户消息
    var userMessage string
    for _, msg := range request.Messages {
        if msg.Role == mcp.RoleUser {
            if textContent, ok := msg.Content.(mcp.TextContent); ok {
                userMessage = textContent.Text
                log.Printf("  User message: %s", userMessage)
                break
            }
        }
    }
    
    // 生成模拟响应
    mockResponse := fmt.Sprintf(
        "Mock LLM response to: '%s'. This is a simulated response from a mock LLM handler.",
        userMessage,
    )
    
    return &mcp.CreateMessageResult{
        Model: "mock-llm-v1",
        Role:  mcp.RoleAssistant,
        Content: mcp.TextContent{
            Type: "text",
            Text: mockResponse,
        },
        StopReason: "endTurn",
    }, nil
}

func main() {
    if len(os.Args) < 2 {
        log.Fatal("Usage: sampling_client <server_path>")
    }
    
    serverPath := os.Args[1]
    
    // 创建 stdio 传输
    stdioTransport := transport.NewStdio(serverPath, nil)
    
    // 使用模拟采样处理器创建客户端
    mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(&MockSamplingHandler{}))
    
    // 启动客户端
    ctx := context.Background()
    if err := mcpClient.Start(ctx); err != nil {
        log.Fatalf("Failed to start client: %v", err)
    }
    defer mcpClient.Close()
    if err := mcpClient.Connect(ctx); err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    
    // 测试使用采样的服务端工具
    result, err := mcpClient.CallTool(ctx, "ask_llm", map[string]any{
        "question": "What is the capital of France?",
        "system_prompt": "You are a helpful geography assistant.",
    })
    if err != nil {
        log.Fatalf("Tool call failed: %v", err)
    }
    
    fmt.Printf("Tool result: %+v\n", result)
}

真实 LLM 集成

OpenAI 集成

go
import (
    "github.com/sashabaranov/go-openai"
)

type OpenAISamplingHandler struct {
    client *openai.Client
}

func NewOpenAISamplingHandler(apiKey string) *OpenAISamplingHandler {
    return &OpenAISamplingHandler{
        client: openai.NewClient(apiKey),
    }
}

func (h *OpenAISamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 将 MCP 消息转换为 OpenAI 格式
    var messages []openai.ChatCompletionMessage
    
    // 如果提供了系统消息则添加
    if request.SystemPrompt != "" {
        messages = append(messages, openai.ChatCompletionMessage{
            Role:    openai.ChatMessageRoleSystem,
            Content: request.SystemPrompt,
        })
    }
    
    // 转换 MCP 消息
    for _, msg := range request.Messages {
        var role string
        switch msg.Role {
        case mcp.RoleUser:
            role = openai.ChatMessageRoleUser
        case mcp.RoleAssistant:
            role = openai.ChatMessageRoleAssistant
        }
        
        if textContent, ok := msg.Content.(mcp.TextContent); ok {
            messages = append(messages, openai.ChatCompletionMessage{
                Role:    role,
                Content: textContent.Text,
            })
        }
    }
    
    // 创建 OpenAI 请求
    req := openai.ChatCompletionRequest{
        Model:       openai.GPT3Dot5Turbo,
        Messages:    messages,
        MaxTokens:   request.MaxTokens,
        Temperature: float32(request.Temperature),
    }
    
    // 调用 OpenAI API
    resp, err := h.client.CreateChatCompletion(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("OpenAI API call failed: %w", err)
    }
    
    if len(resp.Choices) == 0 {
        return nil, fmt.Errorf("no response from OpenAI")
    }
    
    choice := resp.Choices[0]
    
    // 转换停止原因
    var stopReason string
    switch choice.FinishReason {
    case "stop":
        stopReason = "endTurn"
    case "length":
        stopReason = "maxTokens"
    default:
        stopReason = "other"
    }
    
    return &mcp.CreateMessageResult{
        Model: resp.Model,
        Role:  mcp.RoleAssistant,
        Content: mcp.TextContent{
            Type: "text",
            Text: choice.Message.Content,
        },
        StopReason: stopReason,
    }, nil
}

Anthropic 集成

go
import (
    "bytes"
    "encoding/json"
    "net/http"
)

type AnthropicSamplingHandler struct {
    apiKey string
    client *http.Client
}

func NewAnthropicSamplingHandler(apiKey string) *AnthropicSamplingHandler {
    return &AnthropicSamplingHandler{
        apiKey: apiKey,
        client: &http.Client{},
    }
}

func (h *AnthropicSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 转换为 Anthropic 格式
    anthropicReq := map[string]any{
        "model":      "claude-3-sonnet-20240229",
        "max_tokens": request.MaxTokens,
        "messages":   h.convertMessages(request.Messages),
    }
    
    if request.SystemPrompt != "" {
        anthropicReq["system"] = request.SystemPrompt
    }
    
    if request.Temperature > 0 {
        anthropicReq["temperature"] = request.Temperature
    }
    
    // 发出 API 调用
    reqBody, _ := json.Marshal(anthropicReq)
    httpReq, _ := http.NewRequestWithContext(ctx, "POST", 
        "https://api.anthropic.com/v1/messages", bytes.NewBuffer(reqBody))
    
    httpReq.Header.Set("Content-Type", "application/json")
    httpReq.Header.Set("x-api-key", h.apiKey)
    httpReq.Header.Set("anthropic-version", "2023-06-01")
    
    resp, err := h.client.Do(httpReq)
    if err != nil {
        return nil, fmt.Errorf("Anthropic API call failed: %w", err)
    }
    defer resp.Body.Close()
    
    var anthropicResp struct {
        Content []struct {
            Text string `json:"text"`
            Type string `json:"type"`
        } `json:"content"`
        Model      string `json:"model"`
        StopReason string `json:"stop_reason"`
    }
    
    if err := json.NewDecoder(resp.Body).Decode(&anthropicResp); err != nil {
        return nil, fmt.Errorf("failed to decode Anthropic response: %w", err)
    }
    
    // 提取文本内容
    var text string
    for _, content := range anthropicResp.Content {
        if content.Type == "text" {
            text += content.Text
        }
    }
    
    return &mcp.CreateMessageResult{
        Model: anthropicResp.Model,
        Role:  mcp.RoleAssistant,
        Content: mcp.TextContent{
            Type: "text",
            Text: text,
        },
        StopReason: anthropicResp.StopReason,
    }, nil
}

func (h *AnthropicSamplingHandler) convertMessages(messages []mcp.SamplingMessage) []map[string]any {
    var result []map[string]any
    for _, msg := range messages {
        if textContent, ok := msg.Content.(mcp.TextContent); ok {
            result = append(result, map[string]any{
                "role":    string(msg.Role),
                "content": textContent.Text,
            })
        }
    }
    return result
}

自动能力声明

当你提供采样处理器时,客户端在初始化期间自动声明采样能力:

go
// 这会自动添加采样能力
stdioTransport := transport.NewStdio(serverPath, nil)
mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(handler))

客户端将在初始化请求中包含:

json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "sampling": {}
    },
    "clientInfo": {
      "name": "your-client",
      "version": "1.0.0"
    }
  }
}

采样子能力(2025-11-25)

2025-11-25 协议修订将采样分为基线能力(上面的空 "sampling": {} 对象)加上两个可选子能力,客户端使用它们来通告自己实际理解哪些可选的 CreateMessageParams 字段:

go
type SamplingCapability struct {
    // Context 声明客户端遵守 CreateMessageParams.IncludeContext。
    // 如果为 nil,服务端应该只发送 IncludeContext "none"(或省略它)。
    Context *struct{} `json:"context,omitempty"`
    // Tools 声明客户端遵守 CreateMessageParams.Tools 和
    // CreateMessageParams.ToolChoice(带工具的采样)。
    Tools   *struct{} `json:"tools,omitempty"`
}

声明两个子能力时对应的 wire 格式:

json
{
  "capabilities": {
    "sampling": { "context": {}, "tools": {} }
  }
}

client.WithSamplingHandler 只通告基线("sampling": {})。尚未暴露通过高级客户端通告可选的 Context / Tools 子能力 —— 当注册采样处理器时,Initialize 流程会覆盖所提供请求上的 Sampling 字段。在添加专用选项之前,需要 wire 上的子能力的代码必须在较低层级声明它们(例如,通过传输直接发送自己的 initialize 请求)。该类型已经到位,以便:

  • 服务端可以检查其连接的客户端通过 session.GetClientCapabilities().Sampling 通告的内容(参见服务端页面上的能力门控)。
  • 自定义客户端集成可以直接构建初始化请求,今天就可以填充子能力。

::::warning[编译时破坏性变更] ClientCapabilities.SamplingServerCapabilities.Sampling 现在是 *mcp.SamplingCapability(之前是 *struct{})。使用 &struct{}{} 字面量构造能力的代码必须更新为 &mcp.SamplingCapability{}。基线情况的 JSON wire 格式不变。 ::::

处理带工具的采样请求

如果你的处理器声明了 Sampling.Tools,传入的 mcp.CreateMessageRequest 值可能包含 request.Tools(模型允许调用的工具列表)和 request.ToolChoice(控制模型应该多积极地使用它们)。在你的处理器中检查它们并呈现给你的 LLM:

go
func (h *MySamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    var allowedTools []mcp.Tool
    if len(request.Tools) > 0 {
        allowedTools = request.Tools
    }

    mode := mcp.ToolChoiceModeAuto // 省略 ToolChoice 时的规范默认值
    if request.ToolChoice != nil && request.ToolChoice.Mode != "" {
        mode = request.ToolChoice.Mode
    }

    switch mode {
    case mcp.ToolChoiceModeNone:
        // 此请求禁用了工具使用。
    case mcp.ToolChoiceModeRequired:
        // 模型必须至少调用一个工具。
    case mcp.ToolChoiceModeAuto:
        // 模型决定。
    }

    // ... 将 `allowedTools` 和 `mode` 转发到你的 LLM 客户端并返回
    // 作为 SamplingMessage 的响应(除了 TextContent / ImageContent / AudioContent
    // 之外还可能携带 ToolUseContent / ToolResultContent)。
    return nil, nil
}

构造函数 mcp.NewToolUseContentmcp.NewToolResultContent 构建相应的 SamplingMessage.Content 值,无需手动设置 Type 判别器。

错误处理

在采样处理器中优雅地处理错误:

go
func (h *MySamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 验证请求
    if len(request.Messages) == 0 {
        return nil, fmt.Errorf("no messages provided")
    }
    
    // 检查上下文取消
    if err := ctx.Err(); err != nil {
        return nil, fmt.Errorf("request cancelled: %w", err)
    }
    
    // 调用 LLM 并处理错误
    response, err := h.callLLM(ctx, request)
    if err != nil {
        // 记录错误用于调试
        log.Printf("LLM call failed: %v", err)
        
        // 返回适当的错误
        if strings.Contains(err.Error(), "rate limit") {
            return nil, fmt.Errorf("rate limit exceeded, please try again later")
        }
        return nil, fmt.Errorf("LLM service unavailable: %w", err)
    }
    
    return response, nil
}

最佳实践

  1. 实现适当的错误处理:始终优雅地处理 LLM API 错误
  2. 遵守速率限制:实现速率限制和退避策略
  3. 验证输入:在处理前检查消息内容和参数
  4. 使用上下文:遵守上下文取消和超时
  5. 适当日志记录:为调试记录请求,但避免记录敏感数据
  6. 模型选择:允许配置使用哪个 LLM 模型
  7. 内容过滤:根据用例需要实施内容过滤

测试你的实现

使用采样服务端示例测试你的采样处理器:

bash
# 构建采样服务端
cd examples/sampling_server
go build -o sampling_server

# 构建你的客户端
go build -o my_client

# 测试集成
./my_client ./sampling_server

传输支持

以下传输方式支持采样:

STDIO 传输

对于 STDIO 客户端,分别创建传输和客户端:

go
stdioTransport := transport.NewStdio("/path/to/server", nil)
mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(&MySamplingHandler{}))

进程内传输

对于进程内客户端,使用专用构造函数:

go
mcpClient, err := client.NewInProcessClientWithSamplingHandler(
    mcpServer, 
    &MySamplingHandler{},
)

进程内采样使用直接方法调用而不是 JSON-RPC 序列化。

不支持的传输

由于 SSE 和 StreamableHTTP 传输的单向或无状态性质,它们不支持采样。

下一步