公告

任何建议或需求可联系我!


Skip to content

STDIO 传输

STDIO(标准输入/输出)传输是最常见的 MCP 传输方式,非常适合命令行工具、桌面应用程序和本地集成。

适用场景

STDIO 传输在以下场景中表现出色:

  • 命令行工具:LLM 可以调用的 CLI 工具
  • 桌面应用程序:IDE 插件、文本编辑器、本地工具
  • 子进程通信:管理 MCP 服务器的父进程
  • 本地开发:测试和调试 MCP 实现
  • 单用户场景:个人生产力工具

示例应用:

  • IDE 文件系统浏览器
  • 本地数据库查询工具
  • Git 仓库分析器
  • 系统监控工具
  • 开发工作流自动化

实现

基本的 STDIO 服务器

go
package main

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "strings"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

func main() {
    s := server.NewMCPServer("File Tools", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // 添加文件列表工具
    s.AddTool(
        mcp.NewTool("list_files",
            mcp.WithDescription("List files in a directory"),
            mcp.WithString("path",
                mcp.Required(),
                mcp.Description("Directory path to list"),
            ),
            mcp.WithBoolean("recursive",
                mcp.DefaultBool(false),
                mcp.Description("List files recursively"),
            ),
        ),
        handleListFiles,
    )

    // 添加文件内容资源
    s.AddResource(
        mcp.NewResource(
            "file://{path}",
            "File Content",
            mcp.WithResourceDescription("Read file contents"),
            mcp.WithMIMEType("text/plain"),
        ),
        handleFileContent,
    )

    // 启动 STDIO 服务器
    if err := server.ServeStdio(s); err != nil {
        fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
        os.Exit(1)
    }
}

func handleListFiles(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    path, err := req.RequireString("path")
    if err != nil {
        return mcp.NewToolResultError(err.Error()), nil
    }

    recursive, err := req.RequireBool("recursive")
    if err != nil {
        return mcp.NewToolResultError(err.Error()), nil
    }

    // 安全:验证路径
    if !isValidPath(path) {
        return mcp.NewToolResultError(fmt.Sprintf("invalid path: %s", path)), nil
    }

    files, err := listFiles(path, recursive)
    if err != nil {
        return mcp.NewToolResultError(fmt.Sprintf("failed to list files: %v", err)), nil
    }

    return mcp.NewToolResultText(fmt.Sprintf(`{"path":"%s","files":%v,"count":%d,"recursive":%t}`,
        path, files, len(files), recursive)), nil
}

func handleFileContent(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    // 从 URI 提取路径:"file:///path/to/file" -> "/path/to/file"
    path := extractPathFromURI(req.Params.URI)

    if !isValidPath(path) {
        return nil, fmt.Errorf("invalid path: %s", path)
    }

    content, err := os.ReadFile(path)
    if err != nil {
        return nil, fmt.Errorf("failed to read file: %w", err)
    }

    return []mcp.ResourceContents{
        mcp.TextResourceContents{
            URI:      req.Params.URI,
            MIMEType: detectMIMEType(path),
            Text:     string(content),
        },
    }, nil
}

func isValidPath(path string) bool {
    // 清理路径以解析任何 . 或 .. 组件
    clean := filepath.Clean(path)

    // 检查目录遍历模式
    if strings.Contains(clean, "..") {
        return false
    }

    // 对于绝对路径,确保它们在安全的基础目录内
    if filepath.IsAbs(clean) {
        // 定义安全基础目录(根据您的用例进行调整)
        safeBaseDirs := []string{
            "/tmp",
            "/var/tmp",
            "/home",
            "/Users", // macOS
        }

        // 检查路径是否以任何安全基础目录开头
        for _, baseDir := range safeBaseDirs {
            if strings.HasPrefix(clean, baseDir) {
                return true
            }
        }
        return false
    }

    // 对于相对路径,确保它们不会逃逸到当前目录
    return !strings.HasPrefix(clean, "..")
}

// 示例辅助函数
func listFiles(path string, recursive bool) ([]string, error) {
    // 占位符实现
    return []string{"file1.txt", "file2.txt"}, nil
}

func extractPathFromURI(uri string) string {
    // 从 URI 提取路径:"file:///path/to/file" -> "/path/to/file"
    if strings.HasPrefix(uri, "file://") {
        return strings.TrimPrefix(uri, "file://")
    }
    return uri
}

func detectMIMEType(path string) string {
    // 基于扩展名的简单 MIME 类型检测
    ext := filepath.Ext(path)
    switch ext {
    case ".txt":
        return "text/plain"
    case ".json":
        return "application/json"
    case ".html":
        return "text/html"
    default:
        return "application/octet-stream"
    }
}

高级 STDIO 服务器

go
package main
import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

func main() {
    s := server.NewMCPServer("Advanced CLI Tool", "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // 添加综合工具
    addSystemTools(s)
    addFileTools(s)
    addGitTools(s)
    addDatabaseTools(s)

    // 处理优雅关闭
    setupGracefulShutdown(s)

    // 启动并处理错误
    if err := server.ServeStdio(s); err != nil {
        logError(fmt.Sprintf("Server error: %v", err))
        os.Exit(1)
    }
}

// 高级示例辅助函数
func logToFile(message string) {
    // 占位符实现
    log.Println(message)
}

func logError(message string) {
    // 占位符实现
    log.Printf("ERROR: %s", message)
}

func addSystemTools(s *server.MCPServer) {
    // 占位符实现
}

func addFileTools(s *server.MCPServer) {
    // 占位符实现
}

func addGitTools(s *server.MCPServer) {
    // 占位符实现
}

func addDatabaseTools(s *server.MCPServer) {
    // 占位符实现
}

func setupGracefulShutdown(s *server.MCPServer) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-c
        logToFile("Received shutdown signal")

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        if err := s.Shutdown(ctx); err != nil {
            logError(fmt.Sprintf("Shutdown error: %v", err))
        }

        os.Exit(0)
    }()
}

客户端集成

LLM 应用程序如何连接

LLM 应用程序通常通过以下方式连接到 STDIO MCP 服务器:

  1. 生成进程:将服务器作为子进程启动
  2. 管道通信:使用 stdin/stdout 进行 JSON-RPC 消息传递
  3. 生命周期管理:处理进程启动、关闭和错误

Claude Desktop 集成

在 Claude Desktop 中配置您的 STDIO 服务器:

json
{
  "mcpServers": {
    "file-tools": {
      "command": "go",
      "args": ["run", "/path/to/your/server/main.go"],
      "env": {
        "LOG_LEVEL": "info"
      }
    }
  }
}

macOS~/Library/Application Support/Claude/claude_desktop_config.jsonWindows%APPDATA%\Claude\claude_desktop_config.json

自定义客户端集成

go
package main

import (
    "context"
    "log"
    "time"


    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/mcp"
)

func main() {
    // 创建 STDIO 客户端
    c, err := client.NewStdioClient(
        "go", nil /* inherit env */, "run", "/path/to/server/main.go",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // 初始化连接
    _, err = c.Initialize(ctx, mcp.InitializeRequest{
        Params: mcp.InitializeRequestParams{
            ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
            ClientInfo: mcp.Implementation{
                Name:    "test-client",
                Version: "1.0.0",
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    // 列出可用工具
    tools, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Available tools: %d", len(tools.Tools))
    for _, tool := range tools.Tools {
        log.Printf("- %s: %s", tool.Name, tool.Description)
    }

    // 调用工具
    result, err := c.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolParams{
            Name: "list_files",
            Arguments: map[string]interface{}{
                "path":      ".",
                "recursive": false,
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Tool result: %+v", result)
}

自定义子进程执行

如果您需要更多控制来创建新 STDIO 客户端时如何生成子进程,可以使用 NewStdioMCPClientWithOptions 而不是 NewStdioMCPClient

通过传递 WithCommandFunc 选项,您可以提供一个自定义工厂函数来创建启动服务器的 exec.Cmd。这允许配置环境变量、工作目录和系统级进程属性。

参考前面的示例,我们可以将创建客户端的行:

go
c, err := client.NewStdioClient(
    "go", nil, "run", "/path/to/server/main.go",
)

替换为支持选项的版本:

go
c, err := client.NewStdioMCPClientWithOptions(
	"go",
	nil,
	[]string {"run", "/path/to/server/main.go"},
	transport.WithCommandFunc(func(ctx context.Context, command string, args []string, env []string) (*exec.Cmd, error) {
        cmd := exec.CommandContext(ctx, command, args...)
        cmd.Env = env // 子进程的显式环境。
        cmd.Dir = "/var/sandbox/mcp-server" // 工作目录(除非与 chroot 或 namespace 配对,否则不会隔离)。

        // 应用低级进程隔离和权限降级。
        cmd.SysProcAttr = &syscall.SysProcAttr{
            // 降级到非 root 用户(例如:user/group ID 1001)
            Credential: &syscall.Credential{
                Uid: 1001,
                Gid: 1001,
            },
            // 文件系统隔离:仅在以 root 运行时有效。
            Chroot: "/var/sandbox/mcp-server",

            // Linux namespace 隔离(仅限 Linux):
            // 防止访问其他进程、挂载、IPC、网络等。
            Cloneflags: syscall.CLONE_NEWIPC | // 隔离进程间通信
                syscall.CLONE_NEWNS  | // 隔离文件系统挂载
                syscall.CLONE_NEWPID | // 隔离 PID 命名空间(子进程看到自己为 PID 1)
                syscall.CLONE_NEWUTS | // 隔离主机名
                syscall.CLONE_NEWNET,  // 隔离网络(可选)
        }

		return cmd, nil
	}),
)

调试

命令行测试

直接从命令行测试您的 STDIO 服务器:

bash
# 启动服务器
go run main.go

# 发送初始化请求
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"clientInfo":{"name":"test","version":"1.0.0"}}}' | go run main.go

# 列出工具
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}' | go run main.go

# 调用工具
echo '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"list_files","arguments":{"path":".","recursive":false}}}' | go run main.go

交互式测试脚本

bash
#!/bin/bash

# interactive_test.sh
SERVER_CMD="go run main.go"

echo "Starting MCP STDIO server test..."

# 发送 JSON-RPC 请求的函数
send_request() {
    local request="$1"
    echo "Sending: $request"
    echo "$request" | $SERVER_CMD
    echo "---"
}

# 初始化
send_request '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"clientInfo":{"name":"test","version":"1.0.0"}}}'

# 列出工具
send_request '{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}'

# 列出资源
send_request '{"jsonrpc":"2.0","id":3,"method":"resources/list","params":{}}'

# 调用工具
send_request '{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"list_files","arguments":{"path":".","recursive":false}}}'

echo "Test completed."

调试日志

为您的 STDIO 服务器添加调试日志:

go
func main() {
    // 设置调试日志
    logFile, err := os.OpenFile("mcp-server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
    if err != nil {
        log.Fatal(err)
    }
    defer logFile.Close()

    logger := log.New(logFile, "[MCP] ", log.LstdFlags|log.Lshortfile)

    s := server.NewMCPServer("Debug Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // 添加带调试日志的工具
    s.AddTool(
        mcp.NewTool("debug_echo",
            mcp.WithDescription("Echo with debug logging"),
            mcp.WithString("message", mcp.Required()),
        ),
        func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            message := req.GetString("message", "")
            logger.Printf("Echo tool called with message: %s", message)
            return mcp.NewToolResultText(fmt.Sprintf("Echo: %s", message)), nil
        },
    )

    logger.Println("Starting STDIO server...")
    if err := server.ServeStdio(s); err != nil {
        logger.Printf("Server error: %v", err)
    }
}

MCP Inspector 集成

使用 MCP Inspector 进行可视化调试:

bash
# 安装 MCP Inspector
npm install -g @modelcontextprotocol/inspector

# 使用 inspector 运行服务器
mcp-inspector go run main.go

这会打开一个 Web 界面,您可以:

  • 查看可用工具和资源
  • 交互式测试工具调用
  • 检查请求/响应消息
  • 调试协议问题

错误处理

健壮的错误处理

go
func handleToolWithErrors(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 验证必需参数
    path, err := req.RequireString("path")
    if err != nil {
        return nil, fmt.Errorf("path parameter is required and must be a string")
    }

    // 验证路径安全性
    if !isValidPath(path) {
        return nil, fmt.Errorf("invalid or unsafe path: %s", path)
    }

    // 检查路径是否存在
    if _, err := os.Stat(path); os.IsNotExist(err) {
        return nil, fmt.Errorf("path does not exist: %s", path)
    }

    // 处理上下文取消
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }

    // 使用超时执行操作
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    result, err := performOperation(ctx, path)
    if err != nil {
        // 记录错误用于调试
        logError(fmt.Sprintf("Operation failed for path %s: %v", path, err))

        // 返回用户友好的错误
        if errors.Is(err, context.DeadlineExceeded) {
            return nil, fmt.Errorf("operation timed out")
        }

        return nil, fmt.Errorf("operation failed: %w", err)
    }

    return mcp.NewToolResultText(fmt.Sprintf("%v", result)), nil
}

进程管理

go
func main() {
    // 优雅地处理 panic
    defer func() {
        if r := recover(); r != nil {
            logError(fmt.Sprintf("Server panic: %v", r))
            os.Exit(1)
        }
    }()

    s := server.NewMCPServer("Robust Server", "1.0.0",
        server.WithRecovery(), // 内置 panic 恢复
    )

    // 设置信号处理
    setupSignalHandling()

    // 带重试逻辑启动服务器
    for attempts := 0; attempts < 3; attempts++ {
        if err := server.ServeStdio(s); err != nil {
            logError(fmt.Sprintf("Server attempt %d failed: %v", attempts+1, err))
            if attempts == 2 {
                os.Exit(1)
            }
            time.Sleep(time.Second * time.Duration(attempts+1))
        } else {
            break
        }
    }
}

func setupSignalHandling() {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    go func() {
        sig := <-c
        logToFile(fmt.Sprintf("Received signal: %v", sig))
        os.Exit(0)
    }()
}

性能优化

高效的资源使用

go
// 为数据库工具使用连接池
var dbPool *sql.DB

func init() {
    var err error
    dbPool, err = sql.Open("sqlite3", "data.db")
    if err != nil {
        log.Fatal(err)
    }

    dbPool.SetMaxOpenConns(10)
    dbPool.SetMaxIdleConns(5)
    dbPool.SetConnMaxLifetime(time.Hour)
}

// 缓存频繁访问的数据
var fileCache = make(map[string]cacheEntry)
var cacheMutex sync.RWMutex

type cacheEntry struct {
    content   string
    timestamp time.Time
}

func getCachedFile(path string) (string, bool) {
    cacheMutex.RLock()
    defer cacheMutex.RUnlock()

    entry, exists := fileCache[path]
    if !exists || time.Since(entry.timestamp) > 5*time.Minute {
        return "", false
    }

    return entry.content, true
}

内存管理

go
func handleLargeFile(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    path := req.GetString("path", "")

    // 使用流式处理大文件而不是加载到内存
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // 分块处理
    const chunkSize = 64 * 1024
    buffer := make([]byte, chunkSize)

    var result strings.Builder
    for {
        n, err := file.Read(buffer)
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, err
        }

        // 处理块
        processed := processChunk(buffer[:n])
        result.WriteString(processed)

        // 检查取消
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }
    }

    return mcp.NewToolResultText(result.String()), nil
}

后续步骤