采样
学习如何实现能够处理来自服务端采样请求的 MCP 客户端,实现双向通信,客户端向服务端提供 LLM 能力。
概述
采样允许 MCP 客户端响应来自服务端的 LLM 完成请求。当服务端需要生成内容、回答问题或执行推理任务时,它可以向客户端发送采样请求,然后客户端使用 LLM 处理并返回结果。
[关键安全要求]
根据 MCP 规范,采样实现应该始终包含一个人工审核环节,用户能够拒绝采样请求。
你必须实现批准流程:
- 在执行前向用户展示每个采样请求供审核
- 允许用户在发送到 LLM 之前查看和编辑提示词
- 在返回服务端之前显示生成的响应供用户批准
- 在每个阶段提供清晰的 UI 来接受或拒绝请求
没有人批准,你的实现:
- 允许服务端在未经用户同意的情况下发出未经授权的 LLM 请求
- 可能通过未审核的提示词暴露敏感信息
- 从自动采样创建不受控制的 API 成本
- 违反用户信任和安全最佳实践
下面的示例展示了基本的处理器实现。在生产使用之前,你必须添加批准逻辑。
实现采样处理器
通过实现 SamplingHandler 接口创建采样处理器:
package main
import (
"context"
"fmt"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/client/transport"
"github.com/mark3labs/mcp-go/mcp"
)
type MySamplingHandler struct {
// 为你的 LLM 客户端添加字段(OpenAI、Anthropic 等)
}
func (h *MySamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
// 提取请求参数
messages := request.Messages
systemPrompt := request.SystemPrompt
maxTokens := request.MaxTokens
temperature := request.Temperature
// 使用你的 LLM 处理
response, err := h.callLLM(ctx, messages, systemPrompt, maxTokens, temperature)
if err != nil {
return nil, fmt.Errorf("LLM call failed: %w", err)
}
// 返回 MCP 格式的结果
return &mcp.CreateMessageResult{
Model: "your-model-name",
Role: mcp.RoleAssistant,
Content: mcp.TextContent{
Type: "text",
Text: response,
},
StopReason: "endTurn",
}, nil
}
func (h *MySamplingHandler) callLLM(ctx context.Context, messages []mcp.SamplingMessage, systemPrompt string, maxTokens int, temperature float64) (string, error) {
// 在这里实现你的 LLM 集成
// 这里你可以调用 OpenAI、Anthropic 或其他 LLM API
return "Your LLM response here", nil
}配置客户端
创建客户端时通过提供处理器来启用采样:
func main() {
// 创建采样处理器
samplingHandler := &MySamplingHandler{}
// 创建 stdio 传输
stdioTransport := transport.NewStdio("/path/to/mcp/server", nil)
// 使用采样支持创建客户端
mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(samplingHandler))
// 启动客户端
ctx := context.Background()
if err := mcpClient.Start(ctx); err != nil {
log.Fatalf("Failed to start client: %v", err)
}
defer mcpClient.Close()
if err := mcpClient.Connect(ctx); err != nil {
log.Fatalf("Failed to connect: %v", err)
}
// 客户端现在将自动处理来自服务端的采样请求
// 使用你的处理器
}模拟实现示例
用于测试的完整模拟实现:
package main
import (
import (
"context"
"fmt"
"log"
"strings"
"os"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
)
type MockSamplingHandler struct{}
func (h *MockSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
// 记录请求用于调试
log.Printf("Mock LLM received sampling request:")
log.Printf(" System prompt: %s", request.SystemPrompt)
log.Printf(" Max tokens: %d", request.MaxTokens)
log.Printf(" Temperature: %f", request.Temperature)
// 提取用户消息
var userMessage string
for _, msg := range request.Messages {
if msg.Role == mcp.RoleUser {
if textContent, ok := msg.Content.(mcp.TextContent); ok {
userMessage = textContent.Text
log.Printf(" User message: %s", userMessage)
break
}
}
}
// 生成模拟响应
mockResponse := fmt.Sprintf(
"Mock LLM response to: '%s'. This is a simulated response from a mock LLM handler.",
userMessage,
)
return &mcp.CreateMessageResult{
Model: "mock-llm-v1",
Role: mcp.RoleAssistant,
Content: mcp.TextContent{
Type: "text",
Text: mockResponse,
},
StopReason: "endTurn",
}, nil
}
func main() {
if len(os.Args) < 2 {
log.Fatal("Usage: sampling_client <server_path>")
}
serverPath := os.Args[1]
// 创建 stdio 传输
stdioTransport := transport.NewStdio(serverPath, nil)
// 使用模拟采样处理器创建客户端
mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(&MockSamplingHandler{}))
// 启动客户端
ctx := context.Background()
if err := mcpClient.Start(ctx); err != nil {
log.Fatalf("Failed to start client: %v", err)
}
defer mcpClient.Close()
if err := mcpClient.Connect(ctx); err != nil {
log.Fatalf("Failed to connect: %v", err)
}
// 测试使用采样的服务端工具
result, err := mcpClient.CallTool(ctx, "ask_llm", map[string]any{
"question": "What is the capital of France?",
"system_prompt": "You are a helpful geography assistant.",
})
if err != nil {
log.Fatalf("Tool call failed: %v", err)
}
fmt.Printf("Tool result: %+v\n", result)
}真实 LLM 集成
OpenAI 集成
import (
"github.com/sashabaranov/go-openai"
)
type OpenAISamplingHandler struct {
client *openai.Client
}
func NewOpenAISamplingHandler(apiKey string) *OpenAISamplingHandler {
return &OpenAISamplingHandler{
client: openai.NewClient(apiKey),
}
}
func (h *OpenAISamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
// 将 MCP 消息转换为 OpenAI 格式
var messages []openai.ChatCompletionMessage
// 如果提供了系统消息则添加
if request.SystemPrompt != "" {
messages = append(messages, openai.ChatCompletionMessage{
Role: openai.ChatMessageRoleSystem,
Content: request.SystemPrompt,
})
}
// 转换 MCP 消息
for _, msg := range request.Messages {
var role string
switch msg.Role {
case mcp.RoleUser:
role = openai.ChatMessageRoleUser
case mcp.RoleAssistant:
role = openai.ChatMessageRoleAssistant
}
if textContent, ok := msg.Content.(mcp.TextContent); ok {
messages = append(messages, openai.ChatCompletionMessage{
Role: role,
Content: textContent.Text,
})
}
}
// 创建 OpenAI 请求
req := openai.ChatCompletionRequest{
Model: openai.GPT3Dot5Turbo,
Messages: messages,
MaxTokens: request.MaxTokens,
Temperature: float32(request.Temperature),
}
// 调用 OpenAI API
resp, err := h.client.CreateChatCompletion(ctx, req)
if err != nil {
return nil, fmt.Errorf("OpenAI API call failed: %w", err)
}
if len(resp.Choices) == 0 {
return nil, fmt.Errorf("no response from OpenAI")
}
choice := resp.Choices[0]
// 转换停止原因
var stopReason string
switch choice.FinishReason {
case "stop":
stopReason = "endTurn"
case "length":
stopReason = "maxTokens"
default:
stopReason = "other"
}
return &mcp.CreateMessageResult{
Model: resp.Model,
Role: mcp.RoleAssistant,
Content: mcp.TextContent{
Type: "text",
Text: choice.Message.Content,
},
StopReason: stopReason,
}, nil
}Anthropic 集成
import (
"bytes"
"encoding/json"
"net/http"
)
type AnthropicSamplingHandler struct {
apiKey string
client *http.Client
}
func NewAnthropicSamplingHandler(apiKey string) *AnthropicSamplingHandler {
return &AnthropicSamplingHandler{
apiKey: apiKey,
client: &http.Client{},
}
}
func (h *AnthropicSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
// 转换为 Anthropic 格式
anthropicReq := map[string]any{
"model": "claude-3-sonnet-20240229",
"max_tokens": request.MaxTokens,
"messages": h.convertMessages(request.Messages),
}
if request.SystemPrompt != "" {
anthropicReq["system"] = request.SystemPrompt
}
if request.Temperature > 0 {
anthropicReq["temperature"] = request.Temperature
}
// 发出 API 调用
reqBody, _ := json.Marshal(anthropicReq)
httpReq, _ := http.NewRequestWithContext(ctx, "POST",
"https://api.anthropic.com/v1/messages", bytes.NewBuffer(reqBody))
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("x-api-key", h.apiKey)
httpReq.Header.Set("anthropic-version", "2023-06-01")
resp, err := h.client.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("Anthropic API call failed: %w", err)
}
defer resp.Body.Close()
var anthropicResp struct {
Content []struct {
Text string `json:"text"`
Type string `json:"type"`
} `json:"content"`
Model string `json:"model"`
StopReason string `json:"stop_reason"`
}
if err := json.NewDecoder(resp.Body).Decode(&anthropicResp); err != nil {
return nil, fmt.Errorf("failed to decode Anthropic response: %w", err)
}
// 提取文本内容
var text string
for _, content := range anthropicResp.Content {
if content.Type == "text" {
text += content.Text
}
}
return &mcp.CreateMessageResult{
Model: anthropicResp.Model,
Role: mcp.RoleAssistant,
Content: mcp.TextContent{
Type: "text",
Text: text,
},
StopReason: anthropicResp.StopReason,
}, nil
}
func (h *AnthropicSamplingHandler) convertMessages(messages []mcp.SamplingMessage) []map[string]any {
var result []map[string]any
for _, msg := range messages {
if textContent, ok := msg.Content.(mcp.TextContent); ok {
result = append(result, map[string]any{
"role": string(msg.Role),
"content": textContent.Text,
})
}
}
return result
}自动能力声明
当你提供采样处理器时,客户端在初始化期间自动声明采样能力:
// 这会自动添加采样能力
stdioTransport := transport.NewStdio(serverPath, nil)
mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(handler))客户端将在初始化请求中包含:
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"sampling": {}
},
"clientInfo": {
"name": "your-client",
"version": "1.0.0"
}
}
}采样子能力(2025-11-25)
2025-11-25 协议修订将采样分为基线能力(上面的空 "sampling": {} 对象)加上两个可选子能力,客户端使用它们来通告自己实际理解哪些可选的 CreateMessageParams 字段:
type SamplingCapability struct {
// Context 声明客户端遵守 CreateMessageParams.IncludeContext。
// 如果为 nil,服务端应该只发送 IncludeContext "none"(或省略它)。
Context *struct{} `json:"context,omitempty"`
// Tools 声明客户端遵守 CreateMessageParams.Tools 和
// CreateMessageParams.ToolChoice(带工具的采样)。
Tools *struct{} `json:"tools,omitempty"`
}声明两个子能力时对应的 wire 格式:
{
"capabilities": {
"sampling": { "context": {}, "tools": {} }
}
}client.WithSamplingHandler 只通告基线("sampling": {})。尚未暴露通过高级客户端通告可选的 Context / Tools 子能力 —— 当注册采样处理器时,Initialize 流程会覆盖所提供请求上的 Sampling 字段。在添加专用选项之前,需要 wire 上的子能力的代码必须在较低层级声明它们(例如,通过传输直接发送自己的 initialize 请求)。该类型已经到位,以便:
- 服务端可以检查其连接的客户端通过
session.GetClientCapabilities().Sampling通告的内容(参见服务端页面上的能力门控)。 - 自定义客户端集成可以直接构建初始化请求,今天就可以填充子能力。
::::warning[编译时破坏性变更] ClientCapabilities.Sampling 和 ServerCapabilities.Sampling 现在是 *mcp.SamplingCapability(之前是 *struct{})。使用 &struct{}{} 字面量构造能力的代码必须更新为 &mcp.SamplingCapability{}。基线情况的 JSON wire 格式不变。 ::::
处理带工具的采样请求
如果你的处理器声明了 Sampling.Tools,传入的 mcp.CreateMessageRequest 值可能包含 request.Tools(模型允许调用的工具列表)和 request.ToolChoice(控制模型应该多积极地使用它们)。在你的处理器中检查它们并呈现给你的 LLM:
func (h *MySamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
var allowedTools []mcp.Tool
if len(request.Tools) > 0 {
allowedTools = request.Tools
}
mode := mcp.ToolChoiceModeAuto // 省略 ToolChoice 时的规范默认值
if request.ToolChoice != nil && request.ToolChoice.Mode != "" {
mode = request.ToolChoice.Mode
}
switch mode {
case mcp.ToolChoiceModeNone:
// 此请求禁用了工具使用。
case mcp.ToolChoiceModeRequired:
// 模型必须至少调用一个工具。
case mcp.ToolChoiceModeAuto:
// 模型决定。
}
// ... 将 `allowedTools` 和 `mode` 转发到你的 LLM 客户端并返回
// 作为 SamplingMessage 的响应(除了 TextContent / ImageContent / AudioContent
// 之外还可能携带 ToolUseContent / ToolResultContent)。
return nil, nil
}构造函数 mcp.NewToolUseContent 和 mcp.NewToolResultContent 构建相应的 SamplingMessage.Content 值,无需手动设置 Type 判别器。
错误处理
在采样处理器中优雅地处理错误:
func (h *MySamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
// 验证请求
if len(request.Messages) == 0 {
return nil, fmt.Errorf("no messages provided")
}
// 检查上下文取消
if err := ctx.Err(); err != nil {
return nil, fmt.Errorf("request cancelled: %w", err)
}
// 调用 LLM 并处理错误
response, err := h.callLLM(ctx, request)
if err != nil {
// 记录错误用于调试
log.Printf("LLM call failed: %v", err)
// 返回适当的错误
if strings.Contains(err.Error(), "rate limit") {
return nil, fmt.Errorf("rate limit exceeded, please try again later")
}
return nil, fmt.Errorf("LLM service unavailable: %w", err)
}
return response, nil
}最佳实践
- 实现适当的错误处理:始终优雅地处理 LLM API 错误
- 遵守速率限制:实现速率限制和退避策略
- 验证输入:在处理前检查消息内容和参数
- 使用上下文:遵守上下文取消和超时
- 适当日志记录:为调试记录请求,但避免记录敏感数据
- 模型选择:允许配置使用哪个 LLM 模型
- 内容过滤:根据用例需要实施内容过滤
测试你的实现
使用采样服务端示例测试你的采样处理器:
# 构建采样服务端
cd examples/sampling_server
go build -o sampling_server
# 构建你的客户端
go build -o my_client
# 测试集成
./my_client ./sampling_server传输支持
以下传输方式支持采样:
STDIO 传输
对于 STDIO 客户端,分别创建传输和客户端:
stdioTransport := transport.NewStdio("/path/to/server", nil)
mcpClient := client.NewClient(stdioTransport, client.WithSamplingHandler(&MySamplingHandler{}))进程内传输
对于进程内客户端,使用专用构造函数:
mcpClient, err := client.NewInProcessClientWithSamplingHandler(
mcpServer,
&MySamplingHandler{},
)进程内采样使用直接方法调用而不是 JSON-RPC 序列化。
不支持的传输
由于 SSE 和 StreamableHTTP 传输的单向或无状态性质,它们不支持采样。

