公告

任何建议或需求可联系我!


Skip to content

进程内传输

进程内传输支持在同一个进程内直接集成 MCP 服务器,消除了网络开销,为嵌入式场景提供无缝集成。

适用场景

进程内传输非常适合以下场景:

  • 嵌入式服务器:现有应用程序中的 MCP 功能
  • 测试与开发:快速、可靠的测试,无需网络开销
  • 库集成:作为库组件的 MCP
  • 单进程架构:具有 MCP 能力的单体应用程序

示例应用:

  • 具有插件架构的桌面应用程序
  • 测试框架
  • 嵌入式分析引擎
  • 具有 AI 工具集成的游戏引擎

实现

基本的进程内服务器

go
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
    "github.com/mark3labs/mcp-go/client"
)

func main() {
    // 创建服务器
    s := server.NewMCPServer("Calculator Server", "1.0.0",
        server.WithToolCapabilities(true),
    )

    // 添加计算器工具
    s.AddTool(
        mcp.NewTool("calculate",
            mcp.WithDescription("Perform basic mathematical calculations"),
            mcp.WithString("operation", 
                mcp.Required(),
                mcp.Enum("add", "subtract", "multiply", "divide"),
                mcp.Description("The operation to perform"),
            ),
            mcp.WithNumber("x", mcp.Required(), mcp.Description("First number")),
            mcp.WithNumber("y", mcp.Required(), mcp.Description("Second number")),
        ),
        handleCalculate,
    )

    // 创建进程内客户端
    mcpClient, err := client.NewInProcessClient(s)
    if err != nil {
        log.Fatal(err)
    }
    defer mcpClient.Close()

    ctx := context.Background()

    // 初始化
    _, err = mcpClient.Initialize(ctx, mcp.InitializeRequest{
        Params: mcp.InitializeRequestParams{
            ProtocolVersion: "2024-11-05",
            Capabilities: mcp.ClientCapabilities{
                Tools: &mcp.ToolsCapability{},
            },
            ClientInfo: mcp.Implementation{
                Name:    "test-client",
                Version: "1.0.0",
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    // 使用计算器
    result, err := mcpClient.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolParams{
            Name: "calculate",
            Arguments: map[string]interface{}{
                "operation": "add",
                "x":         10.0,
                "y":         5.0,
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    // 从第一个内容项中提取文本
    if len(result.Content) > 0 {
        if textContent, ok := mcp.AsTextContent(result.Content[0]); ok {
            fmt.Printf("Result: %s\n", textContent.Text)
        }
    }
}

func handleCalculate(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    operation := req.GetString("operation", "")
    x := req.GetFloat("x", 0)
    y := req.GetFloat("y", 0)

    var result float64
    switch operation {
    case "add":
        result = x + y
    case "subtract":
        result = x - y
    case "multiply":
        result = x * y
    case "divide":
        if y == 0 {
            return mcp.NewToolResultError("division by zero"), nil
        }
        result = x / y
    default:
        return nil, fmt.Errorf("unknown operation: %s", operation)
    }

    return mcp.NewToolResultText(fmt.Sprintf("%.2f", result)), nil
}

嵌入式应用程序集成

go
// 在更大应用程序中的嵌入式 MCP 服务器
type Application struct {
    mcpServer *server.MCPServer
    mcpClient *client.InProcessClient
    config    *Config
}

func NewApplication(config *Config) *Application {
    app := &Application{
        config: config,
    }

    // 创建嵌入式 MCP 服务器
    app.mcpServer = server.NewMCPServer("Embedded Server", "1.0.0",
        server.WithToolCapabilities(true),
    )

    // 添加应用程序特定的工具
    app.addApplicationTools()

    // 创建用于内部使用的进程内客户端
    var err error
    app.mcpClient, err = client.NewInProcessClient(app.mcpServer)
    if err != nil {
        panic(err)
    }

    return app
}

type Config struct {
    AppName string
    Debug   bool
}

func (app *Application) addApplicationTools() {
    // 应用程序状态工具
    app.mcpServer.AddTool(
        mcp.NewTool("get_app_status",
            mcp.WithDescription("Get current application status"),
        ),
        func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            return mcp.NewToolResultText(fmt.Sprintf(`{"app_name":"%s","debug":%t,"status":"running"}`, 
                app.config.AppName, app.config.Debug)), nil
        },
    )

    // 配置工具
    app.mcpServer.AddTool(
        mcp.NewTool("update_config",
            mcp.WithDescription("Update application configuration"),
            mcp.WithString("key", mcp.Required()),
            mcp.WithString("value", mcp.Required()),
        ),
        func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            key := req.GetString("key", "")
            value := req.GetString("value", "")

            // 根据 key 更新配置
            switch key {
            case "debug":
                app.config.Debug = value == "true"
            case "app_name":
                app.config.AppName = value
            default:
                return mcp.NewToolResultError(fmt.Sprintf("unknown config key: %s", key)), nil
            }

            return mcp.NewToolResultText(fmt.Sprintf("Updated %s to %s", key, value)), nil
        },
    )
}

func (app *Application) ProcessWithMCP(ctx context.Context, operation string) (interface{}, error) {
    // 在内部处理中使用 MCP 工具
    result, err := app.mcpClient.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolParams{
            Name: "calculate",
            Arguments: map[string]interface{}{
                "operation": operation,
                "x":         10.0,
                "y":         5.0,
            },
        },
    })
    if err != nil {
        return nil, err
    }

    // 从第一个内容项中提取文本
    if len(result.Content) > 0 {
        if textContent, ok := mcp.AsTextContent(result.Content[0]); ok {
            return textContent.Text, nil
        }
    }
    
    return "no result", nil
}

// 使用示例
func main() {
    config := &Config{
        AppName: "My App",
        Debug:   true,
    }

    app := NewApplication(config)
    ctx := context.Background()

    // 初始化嵌入式 MCP 客户端
    _, err := app.mcpClient.Initialize(ctx, mcp.InitializeRequest{
        Params: mcp.InitializeRequestParams{
            ProtocolVersion: "2024-11-05",
            Capabilities: mcp.ClientCapabilities{
                Tools: &mcp.ToolsCapability{},
            },
            ClientInfo: mcp.Implementation{
                Name:    "embedded-client",
                Version: "1.0.0",
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    // 在应用程序中使用 MCP 功能
    result, err := app.ProcessWithMCP(ctx, "add")
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Application result: %v\n", result)
}

采样支持

进程内传输支持采样功能,允许服务器从客户端请求 LLM 完成。这使得双向通信成为可能,服务器可以利用客户端端的 LLM 能力。

启用采样

要在进程内服务器中启用采样,请在服务器设置期间调用 EnableSampling()

go
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

// MockSamplingHandler 实现 client.SamplingHandler 用于演示
type MockSamplingHandler struct{}

func (h *MockSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 提取用户消息
    var userMessage string
    for _, msg := range request.Messages {
        if msg.Role == mcp.RoleUser {
            if textContent, ok := msg.Content.(mcp.TextContent); ok {
                userMessage = textContent.Text
                break
            }
        }
    }

    // 生成模拟响应
    mockResponse := fmt.Sprintf("Mock LLM response to: '%s'", userMessage)

    return &mcp.CreateMessageResult{
        SamplingMessage: mcp.SamplingMessage{
            Role: mcp.RoleAssistant,
            Content: mcp.TextContent{
                Type: "text",
                Text: mockResponse,
            },
        },
        Model:      "mock-llm-v1",
        StopReason: "endTurn",
    }, nil
}

func main() {
    // 创建启用采样的服务器
    mcpServer := server.NewMCPServer("sampling-server", "1.0.0")
    mcpServer.EnableSampling()

    // 添加使用采样的工具
    mcpServer.AddTool(mcp.Tool{
        Name:        "ask_llm",
        Description: "Ask the LLM a question using sampling",
        InputSchema: mcp.ToolInputSchema{
            Type: "object",
            Properties: map[string]any{
                "question": map[string]any{
                    "type":        "string",
                    "description": "The question to ask the LLM",
                },
                "system_prompt": map[string]any{
                    "type":        "string",
                    "description": "Optional system prompt",
                },
            },
            Required: []string{"question"},
        },
    }, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        question, err := request.RequireString("question")
        if err != nil {
            return nil, err
        }

        systemPrompt := request.GetString("system_prompt", "You are a helpful assistant.")

        // 创建采样请求
        samplingRequest := mcp.CreateMessageRequest{
            CreateMessageParams: mcp.CreateMessageParams{
                Messages: []mcp.SamplingMessage{
                    {
                        Role: mcp.RoleUser,
                        Content: mcp.TextContent{
                            Type: "text",
                            Text: question,
                        },
                    },
                },
                SystemPrompt: systemPrompt,
                MaxTokens:    1000,
                Temperature:  0.7,
            },
        }

        // 从客户端请求采样
        result, err := mcpServer.RequestSampling(ctx, samplingRequest)
        if err != nil {
            return &mcp.CallToolResult{
                Content: []mcp.Content{
                    mcp.TextContent{
                        Type: "text",
                        Text: fmt.Sprintf("Error requesting sampling: %v", err),
                    },
                },
                IsError: true,
            }, nil
        }

        // 返回 LLM 响应
        return &mcp.CallToolResult{
            Content: []mcp.Content{
                mcp.TextContent{
                    Type: "text",
                    Text: fmt.Sprintf("LLM Response (model: %s): %s",
                        result.Model, result.Content.(mcp.TextContent).Text),
                },
            },
        }, nil
    })

    // 创建带有采样处理程序的客户端
    mockHandler := &MockSamplingHandler{}
    mcpClient, err := client.NewInProcessClientWithSamplingHandler(mcpServer, mockHandler)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer mcpClient.Close()

    // 启动并初始化客户端
    ctx := context.Background()
    if err := mcpClient.Start(ctx); err != nil {
        log.Fatalf("Failed to start client: %v", err)
    }

    initRequest := mcp.InitializeRequest{}
    initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
    initRequest.Params.ClientInfo = mcp.Implementation{
        Name:    "sampling-client",
        Version: "1.0.0",
    }

    _, err = mcpClient.Initialize(ctx, initRequest)
    if err != nil {
        log.Fatalf("Failed to initialize: %v", err)
    }

    // 调用使用采样的工具
    result, err := mcpClient.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolParams{
            Name: "ask_llm",
            Arguments: map[string]any{
                "question":      "What is the capital of France?",
                "system_prompt": "You are a helpful geography assistant.",
            },
        },
    })
    if err != nil {
        log.Fatalf("Tool call failed: %v", err)
    }

    // 打印结果
    if len(result.Content) > 0 {
        if textContent, ok := result.Content[0].(mcp.TextContent); ok {
            fmt.Printf("Tool result: %s\n", textContent.Text)
        }
    }
}

真实 LLM 集成

对于生产环境使用,请用真实的 LLM 集成替换模拟处理程序:

OpenAI 集成

go
import (
    "github.com/sashabaranov/go-openai"
)

type OpenAISamplingHandler struct {
    client *openai.Client
}

func NewOpenAISamplingHandler(apiKey string) *OpenAISamplingHandler {
    return &OpenAISamplingHandler{
        client: openai.NewClient(apiKey),
    }
}

func (h *OpenAISamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // 将 MCP 消息转换为 OpenAI 格式
    var messages []openai.ChatCompletionMessage

    // 如果提供了系统消息则添加
    if request.SystemPrompt != "" {
        messages = append(messages, openai.ChatCompletionMessage{
            Role:    openai.ChatMessageRoleSystem,
            Content: request.SystemPrompt,
        })
    }

    // 转换 MCP 消息
    for _, msg := range request.Messages {
        var role string
        switch msg.Role {
        case mcp.RoleUser:
            role = openai.ChatMessageRoleUser
        case mcp.RoleAssistant:
            role = openai.ChatMessageRoleAssistant
        }

        if textContent, ok := msg.Content.(mcp.TextContent); ok {
            messages = append(messages, openai.ChatCompletionMessage{
                Role:    role,
                Content: textContent.Text,
            })
        }
    }

    // 创建 OpenAI 请求
    req := openai.ChatCompletionRequest{
        Model:       openai.GPT3Dot5Turbo,
        Messages:    messages,
        MaxTokens:   request.MaxTokens,
        Temperature: float32(request.Temperature),
    }

    // 调用 OpenAI API
    resp, err := h.client.CreateChatCompletion(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("OpenAI API call failed: %w", err)
    }

    if len(resp.Choices) == 0 {
        return nil, fmt.Errorf("no response from OpenAI")
    }

    choice := resp.Choices[0]

    // 转换停止原因
    var stopReason string
    switch choice.FinishReason {
    case "stop":
        stopReason = "endTurn"
    case "length":
        stopReason = "maxTokens"
    default:
        stopReason = "other"
    }

    return &mcp.CreateMessageResult{
        SamplingMessage: mcp.SamplingMessage{
            Role: mcp.RoleAssistant,
            Content: mcp.TextContent{
                Type: "text",
                Text: choice.Message.Content,
            },
        },
        Model:      resp.Model,
        StopReason: stopReason,
    }, nil
}

// 使用方法
func main() {
    // 创建 OpenAI 处理程序
    openaiHandler := NewOpenAISamplingHandler("your-api-key")
    
    // 使用 OpenAI 采样创建客户端
    mcpClient, err := client.NewInProcessClientWithSamplingHandler(mcpServer, openaiHandler)
    if err != nil {
        log.Fatal(err)
    }
    
    // ... 其余设置
}

采样请求参数

CreateMessageRequest 支持各种参数来控制 LLM 行为:

go
samplingRequest := mcp.CreateMessageRequest{
    CreateMessageParams: mcp.CreateMessageParams{
        // 必需:发送给 LLM 的消息
        Messages: []mcp.SamplingMessage{
            {
                Role: mcp.RoleUser,        // 或 mcp.RoleAssistant
                Content: mcp.TextContent{   // 或 mcp.ImageContent
                    Type: "text",
                    Text: "Your message here",
                },
            },
        },
        
        // 可选:用于上下文的系统提示
        SystemPrompt: "You are a helpful assistant.",
        
        // 可选:生成的最大令牌数
        MaxTokens: 1000,
        
        // 可选:随机性温度 (0.0 到 1.0)
        Temperature: 0.7,
        
        // 可选:Top-p 采样参数
        TopP: 0.9,
        
        // 可选:停止序列
        StopSequences: []string{"\n\n"},
    },
}

错误处理

始终优雅地处理采样错误:

go
result, err := mcpServer.RequestSampling(ctx, samplingRequest)
if err != nil {
    // 记录错误
    log.Printf("Sampling request failed: %v", err)
    
    // 返回适当的错误响应
    return &mcp.CallToolResult{
        Content: []mcp.Content{
            mcp.TextContent{
                Type: "text",
                Text: "Sorry, I couldn't process your request at this time.",
            },
        },
        IsError: true,
    }, nil
}

后续步骤