进程内传输
进程内传输支持在同一个进程内直接集成 MCP 服务器,消除了网络开销,为嵌入式场景提供无缝集成。
适用场景
进程内传输非常适合以下场景:
- 嵌入式服务器:现有应用程序中的 MCP 功能
- 测试与开发:快速、可靠的测试,无需网络开销
- 库集成:作为库组件的 MCP
- 单进程架构:具有 MCP 能力的单体应用程序
示例应用:
- 具有插件架构的桌面应用程序
- 测试框架
- 嵌入式分析引擎
- 具有 AI 工具集成的游戏引擎
实现
基本的进程内服务器
go
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/mark3labs/mcp-go/client"
)
// main wires a calculator MCP server to an in-process client, initializes the
// session, calls the "calculate" tool once, and prints the returned text.
func main() {
	// Build the server and register the calculator tool on it.
	srv := server.NewMCPServer("Calculator Server", "1.0.0",
		server.WithToolCapabilities(true),
	)
	calcTool := mcp.NewTool("calculate",
		mcp.WithDescription("Perform basic mathematical calculations"),
		mcp.WithString("operation",
			mcp.Required(),
			mcp.Enum("add", "subtract", "multiply", "divide"),
			mcp.Description("The operation to perform"),
		),
		mcp.WithNumber("x", mcp.Required(), mcp.Description("First number")),
		mcp.WithNumber("y", mcp.Required(), mcp.Description("Second number")),
	)
	srv.AddTool(calcTool, handleCalculate)

	// Attach a client to the server inside this process.
	inproc, err := client.NewInProcessClient(srv)
	if err != nil {
		log.Fatal(err)
	}
	defer inproc.Close()

	ctx := context.Background()

	// Perform the MCP initialize handshake.
	initReq := mcp.InitializeRequest{
		Params: mcp.InitializeRequestParams{
			ProtocolVersion: "2024-11-05",
			Capabilities: mcp.ClientCapabilities{
				Tools: &mcp.ToolsCapability{},
			},
			ClientInfo: mcp.Implementation{
				Name:    "test-client",
				Version: "1.0.0",
			},
		},
	}
	if _, err := inproc.Initialize(ctx, initReq); err != nil {
		log.Fatal(err)
	}

	// Invoke the calculator with 10 + 5.
	callReq := mcp.CallToolRequest{
		Params: mcp.CallToolParams{
			Name: "calculate",
			Arguments: map[string]any{
				"operation": "add",
				"x":         10.0,
				"y":         5.0,
			},
		},
	}
	res, err := inproc.CallTool(ctx, callReq)
	if err != nil {
		log.Fatal(err)
	}

	// Print the text of the first content item, if there is one.
	if len(res.Content) == 0 {
		return
	}
	if text, ok := mcp.AsTextContent(res.Content[0]); ok {
		fmt.Printf("Result: %s\n", text.Text)
	}
}
// handleCalculate is the handler for the "calculate" tool. It reads the
// "operation", "x" and "y" arguments (passing "" and 0 as fallback values),
// applies the requested arithmetic, and returns the result formatted with two
// decimal places. Division by zero is returned as a tool error result; an
// unrecognized operation is returned as a Go error.
func handleCalculate(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	op := req.GetString("operation", "")
	lhs := req.GetFloat("x", 0)
	rhs := req.GetFloat("y", 0)

	// reply wraps a computed value in the tool's text payload.
	reply := func(v float64) (*mcp.CallToolResult, error) {
		return mcp.NewToolResultText(fmt.Sprintf("%.2f", v)), nil
	}

	switch op {
	case "add":
		return reply(lhs + rhs)
	case "subtract":
		return reply(lhs - rhs)
	case "multiply":
		return reply(lhs * rhs)
	case "divide":
		if rhs == 0 {
			return mcp.NewToolResultError("division by zero"), nil
		}
		return reply(lhs / rhs)
	}
	return nil, fmt.Errorf("unknown operation: %s", op)
}嵌入式应用程序集成
go
// Application is a larger program that embeds an MCP server and talks to it
// through an in-process client.
type Application struct {
	// mcpServer is the embedded MCP server the application registers tools on.
	mcpServer *server.MCPServer
	// mcpClient is the client connected to mcpServer. It is assigned from
	// client.NewInProcessClient, which yields a *client.Client.
	mcpClient *client.Client
	// config holds the settings that the registered tools read and update.
	config *Config
}
// NewApplication builds an Application around config: it creates the embedded
// MCP server, registers the application tools, and connects an in-process
// client for internal use. It panics if the client cannot be created.
func NewApplication(config *Config) *Application {
	// Create the embedded MCP server.
	srv := server.NewMCPServer("Embedded Server", "1.0.0",
		server.WithToolCapabilities(true),
	)

	app := &Application{
		config:    config,
		mcpServer: srv,
	}

	// Register the application-specific tools.
	app.addApplicationTools()

	// Connect the in-process client used for internal calls.
	inproc, err := client.NewInProcessClient(srv)
	if err != nil {
		panic(err)
	}
	app.mcpClient = inproc

	return app
}
// Config carries the application settings that the MCP tools expose.
type Config struct {
	// AppName is the application name reported by the status tool.
	AppName string
	// Debug is the debug flag reported by the status tool.
	Debug bool
}
// addApplicationTools registers two tools on the embedded server:
// "get_app_status", which reports the current configuration, and
// "update_config", which changes the "debug" or "app_name" setting.
func (app *Application) addApplicationTools() {
	// Application status tool.
	statusTool := mcp.NewTool("get_app_status",
		mcp.WithDescription("Get current application status"),
	)
	reportStatus := func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		// AppName is interpolated as-is, without JSON escaping.
		payload := fmt.Sprintf(`{"app_name":"%s","debug":%t,"status":"running"}`,
			app.config.AppName, app.config.Debug)
		return mcp.NewToolResultText(payload), nil
	}
	app.mcpServer.AddTool(statusTool, reportStatus)

	// Configuration tool.
	configTool := mcp.NewTool("update_config",
		mcp.WithDescription("Update application configuration"),
		mcp.WithString("key", mcp.Required()),
		mcp.WithString("value", mcp.Required()),
	)
	applyUpdate := func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		key := req.GetString("key", "")
		value := req.GetString("value", "")

		// Reject keys this tool does not know before touching the config.
		if key != "debug" && key != "app_name" {
			return mcp.NewToolResultError(fmt.Sprintf("unknown config key: %s", key)), nil
		}

		if key == "debug" {
			// Only the exact string "true" enables debug.
			app.config.Debug = value == "true"
		} else {
			app.config.AppName = value
		}
		return mcp.NewToolResultText(fmt.Sprintf("Updated %s to %s", key, value)), nil
	}
	app.mcpServer.AddTool(configTool, applyUpdate)
}
// ProcessWithMCP calls the "calculate" tool through the embedded client, using
// the given operation with fixed operands x=10 and y=5. It returns the text of
// the first content item, the string "no result" when there is no content or
// the first item is not text, or the error from the tool call.
//
// NOTE(review): addApplicationTools registers only "get_app_status" and
// "update_config"; confirm that "calculate" is registered on the embedded
// server before relying on this call.
func (app *Application) ProcessWithMCP(ctx context.Context, operation string) (interface{}, error) {
	args := map[string]interface{}{
		"operation": operation,
		"x":         10.0,
		"y":         5.0,
	}

	// Use the MCP tool as part of internal processing.
	res, err := app.mcpClient.CallTool(ctx, mcp.CallToolRequest{
		Params: mcp.CallToolParams{
			Name:      "calculate",
			Arguments: args,
		},
	})
	if err != nil {
		return nil, err
	}

	// Pull the text out of the first content item.
	if len(res.Content) == 0 {
		return "no result", nil
	}
	text, ok := mcp.AsTextContent(res.Content[0])
	if !ok {
		return "no result", nil
	}
	return text.Text, nil
}
// Usage example: build the application, initialize its embedded MCP client,
// and run one operation through ProcessWithMCP.
func main() {
	app := NewApplication(&Config{
		AppName: "My App",
		Debug:   true,
	})

	ctx := context.Background()

	// Initialize the embedded MCP client.
	initReq := mcp.InitializeRequest{
		Params: mcp.InitializeRequestParams{
			ProtocolVersion: "2024-11-05",
			Capabilities: mcp.ClientCapabilities{
				Tools: &mcp.ToolsCapability{},
			},
			ClientInfo: mcp.Implementation{
				Name:    "embedded-client",
				Version: "1.0.0",
			},
		},
	}
	if _, err := app.mcpClient.Initialize(ctx, initReq); err != nil {
		log.Fatal(err)
	}

	// Use MCP functionality from within the application.
	out, err := app.ProcessWithMCP(ctx, "add")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Application result: %v\n", out)
}采样支持
进程内传输支持采样功能,允许服务器向客户端请求 LLM 补全。这使得双向通信成为可能,服务器可以利用客户端侧的 LLM 能力。
启用采样
要在进程内服务器中启用采样,请在服务器设置期间调用 EnableSampling():
go
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
// MockSamplingHandler implements client.SamplingHandler for demonstration
// purposes. It carries no state.
type MockSamplingHandler struct{}
// CreateMessage answers a sampling request with a canned assistant reply that
// echoes the text of the first user message whose content is mcp.TextContent.
// When no such message exists the echoed text is empty. It never returns an
// error.
func (h *MockSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
	// Find the first user message that carries text.
	prompt := ""
	for i := range request.Messages {
		m := request.Messages[i]
		if m.Role != mcp.RoleUser {
			continue
		}
		text, isText := m.Content.(mcp.TextContent)
		if !isText {
			continue
		}
		prompt = text.Text
		break
	}

	// Build the mock reply.
	reply := mcp.SamplingMessage{
		Role: mcp.RoleAssistant,
		Content: mcp.TextContent{
			Type: "text",
			Text: fmt.Sprintf("Mock LLM response to: '%s'", prompt),
		},
	}
	return &mcp.CreateMessageResult{
		SamplingMessage: reply,
		Model:           "mock-llm-v1",
		StopReason:      "endTurn",
	}, nil
}
// main demonstrates sampling over the in-process transport. It creates a
// server with sampling enabled, registers an "ask_llm" tool that forwards its
// question to the client via RequestSampling, connects a client backed by
// MockSamplingHandler, and calls the tool once, printing the text result.
func main() {
	// Create a server with sampling enabled.
	mcpServer := server.NewMCPServer("sampling-server", "1.0.0")
	mcpServer.EnableSampling()

	// Register a tool that uses sampling.
	mcpServer.AddTool(mcp.Tool{
		Name:        "ask_llm",
		Description: "Ask the LLM a question using sampling",
		InputSchema: mcp.ToolInputSchema{
			Type: "object",
			Properties: map[string]any{
				"question": map[string]any{
					"type":        "string",
					"description": "The question to ask the LLM",
				},
				"system_prompt": map[string]any{
					"type":        "string",
					"description": "Optional system prompt",
				},
			},
			Required: []string{"question"},
		},
	}, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		question, err := request.RequireString("question")
		if err != nil {
			return nil, err
		}
		systemPrompt := request.GetString("system_prompt", "You are a helpful assistant.")

		// Build the sampling request.
		samplingRequest := mcp.CreateMessageRequest{
			CreateMessageParams: mcp.CreateMessageParams{
				Messages: []mcp.SamplingMessage{
					{
						Role: mcp.RoleUser,
						Content: mcp.TextContent{
							Type: "text",
							Text: question,
						},
					},
				},
				SystemPrompt: systemPrompt,
				MaxTokens:    1000,
				Temperature:  0.7,
			},
		}

		// Ask the client to perform the sampling.
		result, err := mcpServer.RequestSampling(ctx, samplingRequest)
		if err != nil {
			return &mcp.CallToolResult{
				Content: []mcp.Content{
					mcp.TextContent{
						Type: "text",
						Text: fmt.Sprintf("Error requesting sampling: %v", err),
					},
				},
				IsError: true,
			}, nil
		}

		// The sampling handler decides what content type it returns, so check
		// the assertion instead of letting a non-text reply panic the handler.
		answer, ok := result.Content.(mcp.TextContent)
		if !ok {
			return &mcp.CallToolResult{
				Content: []mcp.Content{
					mcp.TextContent{
						Type: "text",
						Text: fmt.Sprintf("Unexpected sampling content type: %T", result.Content),
					},
				},
				IsError: true,
			}, nil
		}

		// Return the LLM response.
		return &mcp.CallToolResult{
			Content: []mcp.Content{
				mcp.TextContent{
					Type: "text",
					Text: fmt.Sprintf("LLM Response (model: %s): %s",
						result.Model, answer.Text),
				},
			},
		}, nil
	})

	// Create a client that carries the sampling handler.
	mockHandler := &MockSamplingHandler{}
	mcpClient, err := client.NewInProcessClientWithSamplingHandler(mcpServer, mockHandler)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer mcpClient.Close()

	// Start and initialize the client.
	ctx := context.Background()
	if err := mcpClient.Start(ctx); err != nil {
		log.Fatalf("Failed to start client: %v", err)
	}

	initRequest := mcp.InitializeRequest{}
	initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
	initRequest.Params.ClientInfo = mcp.Implementation{
		Name:    "sampling-client",
		Version: "1.0.0",
	}

	_, err = mcpClient.Initialize(ctx, initRequest)
	if err != nil {
		log.Fatalf("Failed to initialize: %v", err)
	}

	// Call the tool that uses sampling.
	result, err := mcpClient.CallTool(ctx, mcp.CallToolRequest{
		Params: mcp.CallToolParams{
			Name: "ask_llm",
			Arguments: map[string]any{
				"question":      "What is the capital of France?",
				"system_prompt": "You are a helpful geography assistant.",
			},
		},
	})
	if err != nil {
		log.Fatalf("Tool call failed: %v", err)
	}

	// Print the result.
	if len(result.Content) > 0 {
		if textContent, ok := result.Content[0].(mcp.TextContent); ok {
			fmt.Printf("Tool result: %s\n", textContent.Text)
		}
	}
}真实 LLM 集成
对于生产环境使用,请用真实的 LLM 集成替换模拟处理程序:
OpenAI 集成
go
import (
"github.com/sashabaranov/go-openai"
)
// OpenAISamplingHandler serves sampling requests by calling the OpenAI chat
// completion API through the go-openai client it wraps.
type OpenAISamplingHandler struct {
	// client is the go-openai client used for every completion call.
	client *openai.Client
}
// NewOpenAISamplingHandler returns a handler whose OpenAI client is created
// from apiKey.
func NewOpenAISamplingHandler(apiKey string) *OpenAISamplingHandler {
	h := new(OpenAISamplingHandler)
	h.client = openai.NewClient(apiKey)
	return h
}
// CreateMessage fulfils an MCP sampling request with an OpenAI chat completion.
// The system prompt (when non-empty) and every message whose content is
// mcp.TextContent are converted to OpenAI chat messages; messages with other
// content types are skipped. The first returned choice becomes the assistant
// reply. It returns an error when the API call fails or yields no choices.
func (h *OpenAISamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
	// Translate the MCP conversation into OpenAI's message format.
	var chat []openai.ChatCompletionMessage

	// Prepend the system prompt when one is supplied.
	if prompt := request.SystemPrompt; prompt != "" {
		chat = append(chat, openai.ChatCompletionMessage{
			Role:    openai.ChatMessageRoleSystem,
			Content: prompt,
		})
	}

	for _, m := range request.Messages {
		text, isText := m.Content.(mcp.TextContent)
		if !isText {
			continue
		}

		// Roles other than user/assistant leave the OpenAI role empty.
		role := ""
		switch m.Role {
		case mcp.RoleUser:
			role = openai.ChatMessageRoleUser
		case mcp.RoleAssistant:
			role = openai.ChatMessageRoleAssistant
		}

		chat = append(chat, openai.ChatCompletionMessage{
			Role:    role,
			Content: text.Text,
		})
	}

	// Call the OpenAI API.
	resp, err := h.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
		Model:       openai.GPT3Dot5Turbo,
		Messages:    chat,
		MaxTokens:   request.MaxTokens,
		Temperature: float32(request.Temperature),
	})
	if err != nil {
		return nil, fmt.Errorf("OpenAI API call failed: %w", err)
	}
	if len(resp.Choices) == 0 {
		return nil, fmt.Errorf("no response from OpenAI")
	}
	first := resp.Choices[0]

	// Map OpenAI's finish reason onto the stop reason reported to MCP.
	stop := "other"
	switch first.FinishReason {
	case "stop":
		stop = "endTurn"
	case "length":
		stop = "maxTokens"
	}

	reply := mcp.SamplingMessage{
		Role: mcp.RoleAssistant,
		Content: mcp.TextContent{
			Type: "text",
			Text: first.Message.Content,
		},
	}
	return &mcp.CreateMessageResult{
		SamplingMessage: reply,
		Model:           resp.Model,
		StopReason:      stop,
	}, nil
}
// Usage: plug the OpenAI-backed sampling handler into an in-process client.
func main() {
	// Create the client with an OpenAI sampling handler. mcpServer is not
	// declared in this snippet; it must be created beforehand.
	mcpClient, err := client.NewInProcessClientWithSamplingHandler(
		mcpServer,
		NewOpenAISamplingHandler("your-api-key"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// ... rest of the setup
}采样请求参数
CreateMessageRequest 支持各种参数来控制 LLM 行为:
go
// Example sampling request showing the parameters that can be set on
// mcp.CreateMessageParams.
samplingRequest := mcp.CreateMessageRequest{
CreateMessageParams: mcp.CreateMessageParams{
// Required: the messages to send to the LLM
Messages: []mcp.SamplingMessage{
{
Role: mcp.RoleUser, // or mcp.RoleAssistant
Content: mcp.TextContent{ // or mcp.ImageContent
Type: "text",
Text: "Your message here",
},
},
},
// Optional: system prompt that provides context
SystemPrompt: "You are a helpful assistant.",
// Optional: maximum number of tokens to generate
// NOTE(review): the MCP sampling specification appears to list maxTokens
// as required — confirm whether "optional" is accurate here.
MaxTokens: 1000,
// Optional: temperature controlling randomness (0.0 to 1.0)
Temperature: 0.7,
// Optional: top-p sampling parameter
// NOTE(review): top-p does not appear among the MCP sampling parameters —
// confirm that mcp.CreateMessageParams declares a TopP field.
TopP: 0.9,
// Optional: stop sequences
StopSequences: []string{"\n\n"},
},
}错误处理
始终优雅地处理采样错误:
go
// Request sampling from the client; on failure, log the cause and hand the
// caller a generic tool error result instead of the raw error.
result, err := mcpServer.RequestSampling(ctx, samplingRequest)
if err != nil {
// Log the error
log.Printf("Sampling request failed: %v", err)
// Return an appropriate error response
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "Sorry, I couldn't process your request at this time.",
},
},
IsError: true,
}, nil
}
