STDIO 传输
STDIO(标准输入/输出)传输是最常见的 MCP 传输方式,非常适合命令行工具、桌面应用程序和本地集成。
适用场景
STDIO 传输在以下场景中表现出色:
- 命令行工具:LLM 可以调用的 CLI 工具
- 桌面应用程序:IDE 插件、文本编辑器、本地工具
- 子进程通信:管理 MCP 服务器的父进程
- 本地开发:测试和调试 MCP 实现
- 单用户场景:个人生产力工具
示例应用:
- IDE 文件系统浏览器
- 本地数据库查询工具
- Git 仓库分析器
- 系统监控工具
- 开发工作流自动化
实现
基本的 STDIO 服务器
go
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("File Tools", "1.0.0",
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
)
// 添加文件列表工具
s.AddTool(
mcp.NewTool("list_files",
mcp.WithDescription("List files in a directory"),
mcp.WithString("path",
mcp.Required(),
mcp.Description("Directory path to list"),
),
mcp.WithBoolean("recursive",
mcp.DefaultBool(false),
mcp.Description("List files recursively"),
),
),
handleListFiles,
)
// 添加文件内容资源
s.AddResource(
mcp.NewResource(
"file://{path}",
"File Content",
mcp.WithResourceDescription("Read file contents"),
mcp.WithMIMEType("text/plain"),
),
handleFileContent,
)
// 启动 STDIO 服务器
if err := server.ServeStdio(s); err != nil {
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
os.Exit(1)
}
}
func handleListFiles(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
path, err := req.RequireString("path")
if err != nil {
return mcp.NewToolResultError(err.Error()), nil
}
recursive, err := req.RequireBool("recursive")
if err != nil {
return mcp.NewToolResultError(err.Error()), nil
}
// 安全:验证路径
if !isValidPath(path) {
return mcp.NewToolResultError(fmt.Sprintf("invalid path: %s", path)), nil
}
files, err := listFiles(path, recursive)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("failed to list files: %v", err)), nil
}
return mcp.NewToolResultText(fmt.Sprintf(`{"path":"%s","files":%v,"count":%d,"recursive":%t}`,
path, files, len(files), recursive)), nil
}
func handleFileContent(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
// 从 URI 提取路径:"file:///path/to/file" -> "/path/to/file"
path := extractPathFromURI(req.Params.URI)
if !isValidPath(path) {
return nil, fmt.Errorf("invalid path: %s", path)
}
content, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: detectMIMEType(path),
Text: string(content),
},
}, nil
}
func isValidPath(path string) bool {
// 清理路径以解析任何 . 或 .. 组件
clean := filepath.Clean(path)
// 检查目录遍历模式
if strings.Contains(clean, "..") {
return false
}
// 对于绝对路径,确保它们在安全的基础目录内
if filepath.IsAbs(clean) {
// 定义安全基础目录(根据您的用例进行调整)
safeBaseDirs := []string{
"/tmp",
"/var/tmp",
"/home",
"/Users", // macOS
}
// 检查路径是否以任何安全基础目录开头
for _, baseDir := range safeBaseDirs {
if strings.HasPrefix(clean, baseDir) {
return true
}
}
return false
}
// 对于相对路径,确保它们不会逃逸到当前目录
return !strings.HasPrefix(clean, "..")
}
// 示例辅助函数
func listFiles(path string, recursive bool) ([]string, error) {
// 占位符实现
return []string{"file1.txt", "file2.txt"}, nil
}
func extractPathFromURI(uri string) string {
// 从 URI 提取路径:"file:///path/to/file" -> "/path/to/file"
if strings.HasPrefix(uri, "file://") {
return strings.TrimPrefix(uri, "file://")
}
return uri
}
func detectMIMEType(path string) string {
// 基于扩展名的简单 MIME 类型检测
ext := filepath.Ext(path)
switch ext {
case ".txt":
return "text/plain"
case ".json":
return "application/json"
case ".html":
return "text/html"
default:
return "application/octet-stream"
}
}高级 STDIO 服务器
go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("Advanced CLI Tool", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
server.WithLogging(),
)
// 添加综合工具
addSystemTools(s)
addFileTools(s)
addGitTools(s)
addDatabaseTools(s)
// 处理优雅关闭
setupGracefulShutdown(s)
// 启动并处理错误
if err := server.ServeStdio(s); err != nil {
logError(fmt.Sprintf("Server error: %v", err))
os.Exit(1)
}
}
// 高级示例辅助函数
func logToFile(message string) {
// 占位符实现
log.Println(message)
}
func logError(message string) {
// 占位符实现
log.Printf("ERROR: %s", message)
}
func addSystemTools(s *server.MCPServer) {
// 占位符实现
}
func addFileTools(s *server.MCPServer) {
// 占位符实现
}
func addGitTools(s *server.MCPServer) {
// 占位符实现
}
func addDatabaseTools(s *server.MCPServer) {
// 占位符实现
}
func setupGracefulShutdown(s *server.MCPServer) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
logToFile("Received shutdown signal")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
logError(fmt.Sprintf("Shutdown error: %v", err))
}
os.Exit(0)
}()
}客户端集成
LLM 应用程序如何连接
LLM 应用程序通常通过以下方式连接到 STDIO MCP 服务器:
- 生成进程:将服务器作为子进程启动
- 管道通信:使用 stdin/stdout 进行 JSON-RPC 消息传递
- 生命周期管理:处理进程启动、关闭和错误
Claude Desktop 集成
在 Claude Desktop 中配置您的 STDIO 服务器:
json
{
"mcpServers": {
"file-tools": {
"command": "go",
"args": ["run", "/path/to/your/server/main.go"],
"env": {
"LOG_LEVEL": "info"
}
}
}
}macOS:~/Library/Application Support/Claude/claude_desktop_config.jsonWindows:%APPDATA%\Claude\claude_desktop_config.json
自定义客户端集成
go
package main
import (
"context"
"log"
"time"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
)
func main() {
// 创建 STDIO 客户端
c, err := client.NewStdioClient(
"go", nil /* inherit env */, "run", "/path/to/server/main.go",
)
if err != nil {
log.Fatal(err)
}
defer c.Close()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 初始化连接
_, err = c.Initialize(ctx, mcp.InitializeRequest{
Params: mcp.InitializeRequestParams{
ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
ClientInfo: mcp.Implementation{
Name: "test-client",
Version: "1.0.0",
},
},
})
if err != nil {
log.Fatal(err)
}
// 列出可用工具
tools, err := c.ListTools(ctx, mcp.ListToolsRequest{})
if err != nil {
log.Fatal(err)
}
log.Printf("Available tools: %d", len(tools.Tools))
for _, tool := range tools.Tools {
log.Printf("- %s: %s", tool.Name, tool.Description)
}
// 调用工具
result, err := c.CallTool(ctx, mcp.CallToolRequest{
Params: mcp.CallToolParams{
Name: "list_files",
Arguments: map[string]interface{}{
"path": ".",
"recursive": false,
},
},
})
if err != nil {
log.Fatal(err)
}
log.Printf("Tool result: %+v", result)
}自定义子进程执行
如果您需要更多控制来创建新 STDIO 客户端时如何生成子进程,可以使用 NewStdioMCPClientWithOptions 而不是 NewStdioMCPClient。
通过传递 WithCommandFunc 选项,您可以提供一个自定义工厂函数来创建启动服务器的 exec.Cmd。这允许配置环境变量、工作目录和系统级进程属性。
参考前面的示例,我们可以将创建客户端的行:
go
c, err := client.NewStdioClient(
"go", nil, "run", "/path/to/server/main.go",
)替换为支持选项的版本:
go
c, err := client.NewStdioMCPClientWithOptions(
"go",
nil,
[]string {"run", "/path/to/server/main.go"},
transport.WithCommandFunc(func(ctx context.Context, command string, args []string, env []string) (*exec.Cmd, error) {
cmd := exec.CommandContext(ctx, command, args...)
cmd.Env = env // 子进程的显式环境。
cmd.Dir = "/var/sandbox/mcp-server" // 工作目录(除非与 chroot 或 namespace 配对,否则不会隔离)。
// 应用低级进程隔离和权限降级。
cmd.SysProcAttr = &syscall.SysProcAttr{
// 降级到非 root 用户(例如:user/group ID 1001)
Credential: &syscall.Credential{
Uid: 1001,
Gid: 1001,
},
// 文件系统隔离:仅在以 root 运行时有效。
Chroot: "/var/sandbox/mcp-server",
// Linux namespace 隔离(仅限 Linux):
// 防止访问其他进程、挂载、IPC、网络等。
Cloneflags: syscall.CLONE_NEWIPC | // 隔离进程间通信
syscall.CLONE_NEWNS | // 隔离文件系统挂载
syscall.CLONE_NEWPID | // 隔离 PID 命名空间(子进程看到自己为 PID 1)
syscall.CLONE_NEWUTS | // 隔离主机名
syscall.CLONE_NEWNET, // 隔离网络(可选)
}
return cmd, nil
}),
)调试
命令行测试
直接从命令行测试您的 STDIO 服务器:
bash
# 启动服务器
go run main.go
# 发送初始化请求
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"clientInfo":{"name":"test","version":"1.0.0"}}}' | go run main.go
# 列出工具
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}' | go run main.go
# 调用工具
echo '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"list_files","arguments":{"path":".","recursive":false}}}' | go run main.go交互式测试脚本
bash
#!/bin/bash
# interactive_test.sh
SERVER_CMD="go run main.go"
echo "Starting MCP STDIO server test..."
# 发送 JSON-RPC 请求的函数
send_request() {
local request="$1"
echo "Sending: $request"
echo "$request" | $SERVER_CMD
echo "---"
}
# 初始化
send_request '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"clientInfo":{"name":"test","version":"1.0.0"}}}'
# 列出工具
send_request '{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}'
# 列出资源
send_request '{"jsonrpc":"2.0","id":3,"method":"resources/list","params":{}}'
# 调用工具
send_request '{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"list_files","arguments":{"path":".","recursive":false}}}'
echo "Test completed."调试日志
为您的 STDIO 服务器添加调试日志:
go
func main() {
// 设置调试日志
logFile, err := os.OpenFile("mcp-server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatal(err)
}
defer logFile.Close()
logger := log.New(logFile, "[MCP] ", log.LstdFlags|log.Lshortfile)
s := server.NewMCPServer("Debug Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithLogging(),
)
// 添加带调试日志的工具
s.AddTool(
mcp.NewTool("debug_echo",
mcp.WithDescription("Echo with debug logging"),
mcp.WithString("message", mcp.Required()),
),
func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
message := req.GetString("message", "")
logger.Printf("Echo tool called with message: %s", message)
return mcp.NewToolResultText(fmt.Sprintf("Echo: %s", message)), nil
},
)
logger.Println("Starting STDIO server...")
if err := server.ServeStdio(s); err != nil {
logger.Printf("Server error: %v", err)
}
}MCP Inspector 集成
使用 MCP Inspector 进行可视化调试:
bash
# 安装 MCP Inspector
npm install -g @modelcontextprotocol/inspector
# 使用 inspector 运行服务器
mcp-inspector go run main.go这会打开一个 Web 界面,您可以:
- 查看可用工具和资源
- 交互式测试工具调用
- 检查请求/响应消息
- 调试协议问题
错误处理
健壮的错误处理
go
func handleToolWithErrors(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 验证必需参数
path, err := req.RequireString("path")
if err != nil {
return nil, fmt.Errorf("path parameter is required and must be a string")
}
// 验证路径安全性
if !isValidPath(path) {
return nil, fmt.Errorf("invalid or unsafe path: %s", path)
}
// 检查路径是否存在
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil, fmt.Errorf("path does not exist: %s", path)
}
// 处理上下文取消
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// 使用超时执行操作
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
result, err := performOperation(ctx, path)
if err != nil {
// 记录错误用于调试
logError(fmt.Sprintf("Operation failed for path %s: %v", path, err))
// 返回用户友好的错误
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("operation timed out")
}
return nil, fmt.Errorf("operation failed: %w", err)
}
return mcp.NewToolResultText(fmt.Sprintf("%v", result)), nil
}进程管理
go
func main() {
// 优雅地处理 panic
defer func() {
if r := recover(); r != nil {
logError(fmt.Sprintf("Server panic: %v", r))
os.Exit(1)
}
}()
s := server.NewMCPServer("Robust Server", "1.0.0",
server.WithRecovery(), // 内置 panic 恢复
)
// 设置信号处理
setupSignalHandling()
// 带重试逻辑启动服务器
for attempts := 0; attempts < 3; attempts++ {
if err := server.ServeStdio(s); err != nil {
logError(fmt.Sprintf("Server attempt %d failed: %v", attempts+1, err))
if attempts == 2 {
os.Exit(1)
}
time.Sleep(time.Second * time.Duration(attempts+1))
} else {
break
}
}
}
func setupSignalHandling() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-c
logToFile(fmt.Sprintf("Received signal: %v", sig))
os.Exit(0)
}()
}性能优化
高效的资源使用
go
// 为数据库工具使用连接池
var dbPool *sql.DB
func init() {
var err error
dbPool, err = sql.Open("sqlite3", "data.db")
if err != nil {
log.Fatal(err)
}
dbPool.SetMaxOpenConns(10)
dbPool.SetMaxIdleConns(5)
dbPool.SetConnMaxLifetime(time.Hour)
}
// 缓存频繁访问的数据
var fileCache = make(map[string]cacheEntry)
var cacheMutex sync.RWMutex
type cacheEntry struct {
content string
timestamp time.Time
}
func getCachedFile(path string) (string, bool) {
cacheMutex.RLock()
defer cacheMutex.RUnlock()
entry, exists := fileCache[path]
if !exists || time.Since(entry.timestamp) > 5*time.Minute {
return "", false
}
return entry.content, true
}内存管理
go
func handleLargeFile(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
path := req.GetString("path", "")
// 使用流式处理大文件而不是加载到内存
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
// 分块处理
const chunkSize = 64 * 1024
buffer := make([]byte, chunkSize)
var result strings.Builder
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
// 处理块
processed := processChunk(buffer[:n])
result.WriteString(processed)
// 检查取消
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
}
return mcp.NewToolResultText(result.String()), nil
}
