公告

任何建议或需求可联系我!


Skip to content

客户端传输

了解传输特定的客户端实现以及如何为你的用例选择正确的传输方式。

传输概述

MCP-Go 为所有支持的传输方式提供客户端实现。每种传输方式都有不同的特性,针对不同的场景进行了优化。

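| 传输方式 | 适用于 | 连接方式 | 实时性 | 多客户端 |
| --- | --- | --- | --- | --- |
| STDIO | CLI 工具、桌面应用 | 进程管道 | ❌ | ❌ |
| StreamableHTTP | Web 服务、API | HTTP 请求 | ❌ | ✅ |
| SSE | Web 应用、实时 | HTTP + EventSource | ✅ | ✅ |
| In-Process | 测试、嵌入式 | 直接调用 | ✅ | ❌ |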

STDIO 客户端

STDIO 客户端通过标准输入/输出与服务端通信,通常通过启动子进程。

基本 STDIO 客户端

go
package main

import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "log"
    "net/http"
    "os"
    "sync"
    "time"

    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

// createStdioClient launches an MCP server as a child process, performs the
// initialize handshake over the STDIO transport, and logs how many tools the
// server reports. Any failure terminates the program via log.Fatal.
func createStdioClient() {
    // Create the client; this starts the server as a subprocess. The empty
    // slice is the extra environment, the remaining values are the command
    // arguments.
    c, err := client.NewStdioMCPClient(
        "go", []string{}, "run", "/path/to/server/main.go",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Describe this client to the server for the initialize handshake.
    initRequest := mcp.InitializeRequest{}
    initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
    initRequest.Params.ClientInfo = mcp.Implementation{
        Name:    "example-client",
        Version: "1.0.0",
    }

    // Initialize the connection. The returned server description is not
    // needed here, only the error.
    if _, err := c.Initialize(ctx, initRequest); err != nil {
        log.Fatal(err)
    }

    // Use the client.
    tools, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Available tools: %d", len(tools.Tools))
}

带自定义配置的 STDIO 客户端

go
// createCustomStdioClient starts a STDIO client with a custom environment, a
// debug logger and a custom command factory, then initializes the connection.
// Any failure terminates the program via log.Fatal.
func createCustomStdioClient() {
    // Custom logger used for debugging.
    debugLog := myCustomLogger{}

    // Command factory: inherit the parent environment, append the extra
    // variables, and run the server from a fixed working directory.
    newCmd := func(ctx context.Context, command string, args []string, env []string) (*exec.Cmd, error) {
        cmd := exec.CommandContext(ctx, command, args...)
        cmd.Env = append(os.Environ(), env...)
        cmd.Dir = "/path/to/working/directory"
        return cmd, nil
    }

    extraEnv := []string{"GOCACHE=/tmp/gocache"} // custom environment
    cmdArgs := []string{"run", "/path/to/server/main.go"}

    // Create the STDIO client with the custom options.
    c, err := client.NewStdioMCPClientWithOptions(
        "go",
        extraEnv,
        cmdArgs,
        transport.WithCommandLogger(debugLog),
        transport.WithCommandFunc(newCmd),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Initialize the connection.
    err = c.Initialize(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Use the client...
}

STDIO 错误处理

go
// Sentinel errors for STDIO client failures. Compare with errors.Is.
var (
    // ErrProcessExited reports that the server process has exited.
    ErrProcessExited = errors.New("process exited")

    // ErrProcessTimeout reports that the server process timed out.
    ErrProcessTimeout = errors.New("process timeout")

    // ErrBrokenPipe reports that the pipe to the server process is broken.
    ErrBrokenPipe = errors.New("broken pipe")
)

// handleStdioErrors calls the "example_tool" tool and reacts to the STDIO
// sentinel errors: an exited process or a broken pipe triggers a restart, a
// timeout kills the process first and then restarts it. Any other error is
// only logged. On success the tool result is logged.
func handleStdioErrors(c *client.StdioClient) {
    ctx := context.Background()

    req := mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "example_tool",
        },
    }

    result, err := c.CallTool(ctx, req)
    if err == nil {
        log.Printf("Tool result: %+v", result)
        return
    }

    // restart relaunches the server and logs a failure to do so.
    restart := func() {
        if restartErr := c.Restart(); restartErr != nil {
            log.Printf("Failed to restart: %v", restartErr)
        }
    }

    switch {
    case errors.Is(err, ErrProcessExited):
        log.Println("Server process exited unexpectedly")
        // Try to bring the process back.
        restart()

    case errors.Is(err, ErrProcessTimeout):
        log.Println("Server process timed out")
        // Terminate the process, then restart it.
        c.Kill()
        restart()

    case errors.Is(err, ErrBrokenPipe):
        log.Println("Communication pipe broken")
        // The process may have crashed; restart it.
        restart()

    default:
        log.Printf("Unexpected error: %v", err)
    }
}

STDIO 进程管理

go
// ManagedStdioClient wraps a STDIO client together with a background
// goroutine that restarts it on request.
type ManagedStdioClient struct {
    // options is the configuration used for every (re)start.
    options client.StdioOptions
    // client is the currently active client; replaced by start.
    client *client.StdioClient

    // restartChan carries restart requests to the monitor goroutine.
    restartChan chan struct{}

    // ctx and cancel bound the lifetime of the monitor goroutine; wg lets
    // Close wait for it to finish.
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

// NewManagedStdioClient starts a client from options and launches the
// monitor goroutine that services restart requests. If the initial start
// fails, the context is cancelled and the error is returned.
func NewManagedStdioClient(options client.StdioOptions) (*ManagedStdioClient, error) {
    msc := &ManagedStdioClient{
        options:     options,
        restartChan: make(chan struct{}, 1),
    }
    msc.ctx, msc.cancel = context.WithCancel(context.Background())

    err := msc.start()
    if err != nil {
        msc.cancel()
        return nil, err
    }

    // One unit for the monitor goroutine; it calls wg.Done when it exits.
    msc.wg.Add(1)
    go msc.monitorProcess()

    return msc, nil
}

// start creates a client from the stored options, initializes it with the
// manager's context, and installs it as the active client. A client whose
// initialization fails is closed before the error is returned.
func (msc *ManagedStdioClient) start() error {
    c, err := client.NewStdioClientWithOptions(msc.options)
    if err != nil {
        return err
    }

    err = c.Initialize(msc.ctx)
    if err != nil {
        c.Close()
        return err
    }

    msc.client = c
    return nil
}

// monitorProcess is the background supervisor goroutine. It blocks until a
// restart is requested on restartChan or the manager's context is cancelled.
// On a restart request it closes the current client, waits briefly, and calls
// start; if that fails it waits longer and queues another attempt.
//
// Every wait is interruptible by the context so that Close, which cancels the
// context and then waits on wg, is not held up by a pending delay.
func (msc *ManagedStdioClient) monitorProcess() {
    defer msc.wg.Done()

    // pause waits for d and reports true, or reports false as soon as the
    // context is cancelled.
    pause := func(d time.Duration) bool {
        select {
        case <-time.After(d):
            return true
        case <-msc.ctx.Done():
            return false
        }
    }

    for {
        select {
        case <-msc.ctx.Done():
            return
        case <-msc.restartChan:
        }

        log.Println("Restarting STDIO client...")

        if msc.client != nil {
            msc.client.Close()
        }

        // Wait before restarting.
        if !pause(1 * time.Second) {
            return
        }

        if err := msc.start(); err != nil {
            log.Printf("Failed to restart client: %v", err)
            // Retry after a delay. The send is non-blocking because the
            // channel holds at most one pending request.
            if !pause(5 * time.Second) {
                return
            }
            select {
            case msc.restartChan <- struct{}{}:
            default:
            }
            continue
        }

        log.Println("Client restarted successfully")
    }
}

// CallTool forwards req to the active client. When the call fails with one of
// the process-level errors recognised by isProcessError, a restart is
// requested (without blocking) and a wrapped error is returned instead of the
// result.
func (msc *ManagedStdioClient) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    if msc.client == nil {
        return nil, fmt.Errorf("client not available")
    }

    result, err := msc.client.CallTool(ctx, req)
    if err == nil || !isProcessError(err) {
        return result, err
    }

    // Ask the monitor goroutine to restart; drop the request if one is
    // already queued.
    select {
    case msc.restartChan <- struct{}{}:
    default:
    }
    return nil, fmt.Errorf("process error, restarting: %w", err)
}

// Close cancels the manager's context, waits for the monitor goroutine to
// exit, and then closes the active client, returning its Close error. It
// returns nil when there is no active client.
func (msc *ManagedStdioClient) Close() error {
    msc.cancel()
    msc.wg.Wait()

    if msc.client == nil {
        return nil
    }
    return msc.client.Close()
}

// isProcessError reports whether err matches (via errors.Is) one of the STDIO
// process sentinels: ErrProcessExited, ErrBrokenPipe or ErrProcessTimeout.
func isProcessError(err error) bool {
    for _, target := range []error{ErrProcessExited, ErrBrokenPipe, ErrProcessTimeout} {
        if errors.Is(err, target) {
            return true
        }
    }
    return false
}
go
// Sentinel errors for connection-level failures. Compare with errors.Is.
var (
    // ErrConnectionLost reports that the connection was lost.
    ErrConnectionLost = errors.New("connection lost")

    // ErrConnectionFailed reports that the connection failed.
    ErrConnectionFailed = errors.New("connection failed")

    // ErrUnauthorized reports an unauthorized request.
    ErrUnauthorized = errors.New("unauthorized")

    // ErrForbidden reports a forbidden request.
    ErrForbidden = errors.New("forbidden")
)

StreamableHTTP 客户端

StreamableHTTP 客户端使用传统 HTTP 请求与服务端通信。

自动回退到传统 SSE

当连接到仅支持传统 SSE 传输的服务端时,StreamableHTTP 客户端将在初始化期间收到 4xx 错误。使用 ErrLegacySSEServer 哨兵错误检测此情况并自动回退:

go
import "github.com/mark3labs/mcp-go/client/transport"

// connectWithFallback connects to url using the StreamableHTTP transport and,
// if starting the client fails with transport.ErrLegacySSEServer, retries
// with the legacy SSE transport. It returns the started client, or the first
// error that is not the legacy-server sentinel.
func connectWithFallback(ctx context.Context, url string) (*client.Client, error) {
    // Try StreamableHTTP first.
    streamableTransport, err := transport.NewStreamableHTTP(url)
    if err != nil {
        return nil, err
    }

    // client.NewClient is the constructor used elsewhere in this document
    // for wrapping a transport.
    c := client.NewClient(streamableTransport)
    startErr := c.Start(ctx)
    if startErr == nil {
        return c, nil
    }
    if !errors.Is(startErr, transport.ErrLegacySSEServer) {
        return nil, startErr
    }

    // Fall back to the legacy SSE transport.
    sseTransport, err := transport.NewSSE(url)
    if err != nil {
        return nil, err
    }
    c = client.NewClient(sseTransport)
    if err := c.Start(ctx); err != nil {
        return nil, err
    }
    return c, nil
}

基本 StreamableHTTP 客户端

go
// createStreamableHTTPClient connects to a local MCP endpoint over
// StreamableHTTP, initializes the client, and logs how many tools the server
// reports. Any failure terminates the program via log.Fatal.
func createStreamableHTTPClient() {
    const endpoint = "http://localhost:8080/mcp"

    // Create the StreamableHTTP client.
    c := client.NewStreamableHttpClient(endpoint)
    defer c.Close()

    ctx := context.Background()

    // Initialize.
    err := c.Initialize(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Use the client.
    listing, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Available tools: %d", len(listing.Tools))
}

带自定义配置的 StreamableHTTP 客户端

go
// createCustomStreamableHTTPClient builds a StreamableHTTP client with a
// custom logger, a 30-second HTTP timeout, extra request headers and a tuned
// http.Client, then initializes it. Any failure terminates the program via
// log.Fatal.
func createCustomStreamableHTTPClient() {
    // Custom logger used for debugging.
    debugLog := myCustomLogger{}

    requestHeaders := map[string]string{
        "User-Agent": "MyApp/1.0",
        "Accept":     "application/json",
    }

    // Connection-pool limits; certificate verification stays enabled.
    roundTripper := &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: false,
        },
    }
    httpClient := &http.Client{Transport: roundTripper}

    // Create the StreamableHTTP client with the options above.
    c := client.NewStreamableHttpClient("https://api.example.com/mcp",
        transport.WithLogger(debugLog),
        transport.WithHTTPTimeout(30*time.Second),
        transport.WithHTTPHeaders(requestHeaders),
        transport.WithHTTPBasicClient(httpClient),
    )
    defer c.Close()

    ctx := context.Background()

    err := c.Initialize(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Use the client...
}

StreamableHTTP 身份验证

go
func createAuthenticatedStreamableHTTPClient() {
    // 使用 OAuth 创建 StreamableHTTP 客户端
    c := client.NewStreamableHttpClient("http://localhost:8080/mcp",
        transport.WithHTTPOAuth(transport.OAuthConfig{
            ClientID:     "your-client-id",
            ClientSecret: "your-client-secret",
            TokenURL:     "https://auth.example.com/token",
            Scopes:       []string{"mcp:read", "mcp:write"},
        }),
    )
    defer c.Close()

    ctx := context.Background()

    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }

    // 使用客户端...
}

// isAuthError reports whether err matches (via errors.Is) ErrUnauthorized or
// ErrForbidden.
func isAuthError(err error) bool {
    if errors.Is(err, ErrUnauthorized) {
        return true
    }
    return errors.Is(err, ErrForbidden)
}

StreamableHTTP 连接池

go
// StreamableHTTPClientPool is a channel-backed pool of StreamableHTTP
// clients.
type StreamableHTTPClientPool struct {
    // maxSize is the capacity the clients channel was created with.
    maxSize int
    // factory builds a new client.
    factory func() *client.Client
    // clients holds the idle clients available for reuse.
    clients chan *client.Client
}

// NewStreamableHTTPClientPool creates a pool for baseURL with room for
// maxSize idle clients and fills it completely up front.
func NewStreamableHTTPClientPool(baseURL string, maxSize int) *StreamableHTTPClientPool {
    newClient := func() *client.Client {
        return client.NewStreamableHttpClient(baseURL)
    }

    pool := &StreamableHTTPClientPool{
        clients: make(chan *client.Client, maxSize),
        factory: newClient,
        maxSize: maxSize,
    }

    // Pre-populate the pool.
    for remaining := maxSize; remaining > 0; remaining-- {
        pool.clients <- pool.factory()
    }

    return pool
}

// Get returns an idle client from the pool, or a freshly built one when no
// idle client is available. It never blocks.
func (pool *StreamableHTTPClientPool) Get() *client.Client {
    select {
    case idle := <-pool.clients:
        return idle
    default:
    }
    // Nothing idle: build a new client instead of waiting.
    return pool.factory()
}

// Put hands c back to the pool. When the pool is already full the client is
// closed instead. It never blocks.
func (pool *StreamableHTTPClientPool) Put(c *client.Client) {
    select {
    case pool.clients <- c:
        return
    default:
    }
    // Pool is full; close the surplus client.
    c.Close()
}

// CallTool borrows a client from the pool, runs the tool call on it, and
// gives the client back when the call returns, whether or not it succeeded.
func (pool *StreamableHTTPClientPool) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    borrowed := pool.Get()
    defer pool.Put(borrowed)

    result, err := borrowed.CallTool(ctx, req)
    return result, err
}

使用预配置会话的 StreamableHTTP

你还可以使用预配置会话创建 StreamableHTTP 客户端,这允许你在多个请求之间重用同一会话。

go
func createStreamableHTTPClientWithSession() {
    // 使用选项创建 StreamableHTTP 客户端
    sessionID := // 获取现有会话 ID
    c := client.NewStreamableHttpClient("https://api.example.com/mcp",
        transport.WithSession(sessionID),
    )
    defer c.Close()

    ctx := context.Background()
    // 使用客户端...
    _, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    // 如果会话终止,必须重新初始化客户端
    if errors.Is(err, transport.ErrSessionTerminated) {
        c.Initialize(ctx) // 如果会话终止则重新初始化
        // 重新初始化后会话 ID 应该会改变
        sessionID = c.GetSessionId() // 更新会话 ID
    }
}

SSE 客户端

SSE(Server-Sent Events)客户端提供与服务端的实时通信能力。

基本 SSE 客户端

go
// createSSEClient connects to an SSE endpoint with a bearer token,
// initializes the client, logs incoming notifications from a background
// goroutine, and logs how many tools the server reports. Any failure
// terminates the program via log.Fatal.
func createSSEClient() {
    authHeaders := map[string]string{
        "Authorization": "Bearer your-token",
    }

    // Create the SSE client.
    c, err := client.NewSSEMCPClient("http://localhost:8080/mcp/sse",
        transport.WithHeaders(authHeaders),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Initialize.
    err = c.Initialize(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Subscribe to notifications.
    notifications, err := c.Subscribe(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Handle notifications in the background; the loop ends when the
    // channel is closed.
    go func() {
        for n := range notifications {
            log.Printf("Notification: %+v", n)
        }
    }()

    // Use the client for regular operations.
    listing, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Available tools: %d", len(listing.Tools))
}

带自定义配置的 SSE 客户端

go
// createCustomSSEClient builds an SSE client with a custom logger, extra
// request headers and an http.Client that has a 30-second timeout, then
// initializes it. Any failure terminates the program via log.Fatal.
func createCustomSSEClient() {
    // Custom logger used for debugging.
    debugLog := myCustomLogger{}

    requestHeaders := map[string]string{
        "Authorization": "Bearer your-token",
        "User-Agent":    "MyApp/1.0",
    }
    httpClient := &http.Client{
        Timeout: 30 * time.Second,
    }

    // Create the SSE client with the custom options.
    c, err := client.NewSSEMCPClient("http://localhost:8080/mcp/sse",
        transport.WithSSELogger(debugLog),
        transport.WithHeaders(requestHeaders),
        transport.WithHTTPClient(httpClient),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Initialize.
    err = c.Initialize(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Use the client...
}

带重连的 SSE 客户端

go
// ResilientSSEClient wraps an SSE client with a background loop that
// re-establishes the connection on request.
type ResilientSSEClient struct {
    // baseURL and headers are the connection settings used by connect.
    baseURL string
    headers map[string]string

    // mutex guards client and headers.
    mutex sync.RWMutex
    // client is the current connection; nil until connect has succeeded.
    client *client.Client

    // reconnectCh carries reconnect requests to the reconnect loop.
    reconnectCh chan struct{}

    // ctx and cancel bound the lifetime of the reconnect loop.
    ctx    context.Context
    cancel context.CancelFunc
}

// NewResilientSSEClient creates a resilient client for baseURL with an empty
// header set and starts its reconnect loop. No connection is attempted here.
func NewResilientSSEClient(baseURL string) *ResilientSSEClient {
    rsc := &ResilientSSEClient{
        baseURL:     baseURL,
        headers:     make(map[string]string),
        reconnectCh: make(chan struct{}, 1),
    }
    rsc.ctx, rsc.cancel = context.WithCancel(context.Background())

    // The loop exits when rsc.ctx is cancelled (see Close).
    go rsc.reconnectLoop()

    return rsc
}

// SetHeader stores value under key in the header set passed to the SSE
// client on the next connect. The write is made under the write lock.
func (rsc *ResilientSSEClient) SetHeader(key, value string) {
    rsc.mutex.Lock()
    defer rsc.mutex.Unlock()

    rsc.headers[key] = value
}

// connect replaces the current connection with a freshly created and
// initialized SSE client. It holds the write lock for the whole operation.
//
// The previous client, if any, is closed first. If the new client cannot be
// created or initialized the error is returned and rsc.client is left
// unchanged (still referring to the closed previous client, if there was
// one).
func (rsc *ResilientSSEClient) connect() error {
    rsc.mutex.Lock()
    defer rsc.mutex.Unlock()

    if rsc.client != nil {
        rsc.client.Close()
    }

    c, err := client.NewSSEMCPClient(rsc.baseURL,
        transport.WithHeaders(rsc.headers),
    )
    if err != nil {
        return err
    }

    if err := c.Initialize(rsc.ctx); err != nil {
        // Release the half-open client instead of leaking it, mirroring
        // ManagedStdioClient.start.
        c.Close()
        return err
    }

    rsc.client = c
    return nil
}

// reconnectLoop runs until the client's context is cancelled. Each reconnect
// request triggers up to five connect attempts; after a failed attempt it
// waits attempt seconds (1s, 2s, ...) before continuing, and the wait is
// abandoned if the context is cancelled.
func (rsc *ResilientSSEClient) reconnectLoop() {
    const maxAttempts = 5

    for {
        select {
        case <-rsc.ctx.Done():
            return
        case <-rsc.reconnectCh:
        }

        log.Println("Reconnecting SSE client...")

        for attempt := 1; attempt <= maxAttempts; attempt++ {
            err := rsc.connect()
            if err == nil {
                log.Println("Reconnected successfully")
                break
            }
            log.Printf("Reconnection attempt %d failed: %v", attempt, err)

            // Linear back-off, interruptible by shutdown.
            backoff := time.Duration(attempt) * time.Second
            select {
            case <-time.After(backoff):
            case <-rsc.ctx.Done():
                return
            }
        }
    }
}

// CallTool forwards req to the current SSE client.
//
// When no client is connected yet, it asks the reconnect loop to establish a
// connection and returns a "client not connected" error; callers may retry
// once the connection is up. Without that request nothing would ever start
// the first connection, because the reconnect loop only acts on signals.
//
// When the call fails with an error recognised by isConnectionError, a
// reconnect is requested and a wrapped error is returned.
func (rsc *ResilientSSEClient) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // requestReconnect signals the reconnect loop without blocking; a
    // request that is already pending is sufficient.
    requestReconnect := func() {
        select {
        case rsc.reconnectCh <- struct{}{}:
        default:
        }
    }

    // Snapshot the client under the read lock so the call itself runs
    // without holding it.
    rsc.mutex.RLock()
    c := rsc.client
    rsc.mutex.RUnlock()

    if c == nil {
        requestReconnect()
        return nil, fmt.Errorf("client not connected")
    }

    result, err := c.CallTool(ctx, req)
    if err != nil && isConnectionError(err) {
        // Trigger a reconnect.
        requestReconnect()
        return nil, fmt.Errorf("connection error: %w", err)
    }

    return result, err
}

// Subscribe forwards to Subscribe on the current SSE client. It returns a
// "client not connected" error when there is no current client.
func (rsc *ResilientSSEClient) Subscribe(ctx context.Context) (<-chan mcp.Notification, error) {
    // Snapshot the client under the read lock.
    rsc.mutex.RLock()
    current := rsc.client
    rsc.mutex.RUnlock()

    if current != nil {
        return current.Subscribe(ctx)
    }
    return nil, fmt.Errorf("client not connected")
}

// Close cancels the client's context, which stops the reconnect loop, and
// then closes the current SSE client under the write lock, returning its
// Close error. It returns nil when there is no current client.
func (rsc *ResilientSSEClient) Close() error {
    rsc.cancel()

    rsc.mutex.Lock()
    defer rsc.mutex.Unlock()

    if current := rsc.client; current != nil {
        return current.Close()
    }
    return nil
}

// isConnectionError reports whether err matches (via errors.Is)
// ErrConnectionLost or ErrConnectionFailed.
func isConnectionError(err error) bool {
    if errors.Is(err, ErrConnectionLost) {
        return true
    }
    return errors.Is(err, ErrConnectionFailed)
}

SSE 事件处理

go
// SSEEventHandler dispatches notifications received from an SSE client to
// callbacks registered per notification method.
type SSEEventHandler struct {
    // client is the SSE client whose notifications are consumed.
    client *client.SSEClient

    // mutex guards handlers, which maps a notification method to the
    // callbacks registered for it.
    mutex    sync.RWMutex
    handlers map[string][]func(mcp.Notification)

    // ctx and cancel stop the dispatch goroutine; wg lets Stop wait for it.
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

// NewSSEEventHandler creates a handler for c with no callbacks registered
// and its own cancellable context. Call Start to begin dispatching.
func NewSSEEventHandler(c *client.SSEClient) *SSEEventHandler {
    seh := &SSEEventHandler{
        client:   c,
        handlers: make(map[string][]func(mcp.Notification)),
    }
    seh.ctx, seh.cancel = context.WithCancel(context.Background())
    return seh
}

// Start subscribes to the client's notifications and launches a goroutine
// that passes each one to handleNotification. It returns the subscription
// error, if any.
//
// The goroutine exits when the handler's context is cancelled (see Stop) or
// when the notifications channel is closed.
func (seh *SSEEventHandler) Start() error {
    notifications, err := seh.client.Subscribe(seh.ctx)
    if err != nil {
        return err
    }

    seh.wg.Add(1)
    go func() {
        defer seh.wg.Done()

        for {
            select {
            case notification, ok := <-notifications:
                if !ok {
                    // A closed channel yields zero values forever;
                    // stop instead of spinning on them.
                    return
                }
                seh.handleNotification(notification)
            case <-seh.ctx.Done():
                return
            }
        }
    }()

    return nil
}

// Stop cancels the handler's context and blocks until the dispatch goroutine
// started by Start has exited.
func (seh *SSEEventHandler) Stop() {
    defer seh.wg.Wait()
    seh.cancel()
}

// OnProgress registers handler for "notifications/progress" notifications.
func (seh *SSEEventHandler) OnProgress(handler func(mcp.Notification)) {
    const method = "notifications/progress"
    seh.addHandler(method, handler)
}

// OnMessage registers handler for "notifications/message" notifications.
func (seh *SSEEventHandler) OnMessage(handler func(mcp.Notification)) {
    const method = "notifications/message"
    seh.addHandler(method, handler)
}

// OnResourceUpdate registers handler for "notifications/resources/updated"
// notifications.
func (seh *SSEEventHandler) OnResourceUpdate(handler func(mcp.Notification)) {
    const method = "notifications/resources/updated"
    seh.addHandler(method, handler)
}

// OnToolUpdate registers handler for "notifications/tools/list_changed"
// notifications, the method name servers use to announce a change to their
// tool list.
func (seh *SSEEventHandler) OnToolUpdate(handler func(mcp.Notification)) {
    seh.addHandler("notifications/tools/list_changed", handler)
}

// addHandler appends handler to the callbacks registered for method, under
// the write lock.
func (seh *SSEEventHandler) addHandler(method string, handler func(mcp.Notification)) {
    seh.mutex.Lock()
    defer seh.mutex.Unlock()

    registered := seh.handlers[method]
    seh.handlers[method] = append(registered, handler)
}

// handleNotification looks up the callbacks registered for the
// notification's method (under the read lock) and runs each one in its own
// goroutine. The goroutines are not tracked by wg.
func (seh *SSEEventHandler) handleNotification(notification mcp.Notification) {
    seh.mutex.RLock()
    registered := seh.handlers[notification.Method]
    seh.mutex.RUnlock()

    for i := range registered {
        go registered[i](notification)
    }
}

进程内客户端

进程内客户端在同一进程内提供与服务端的直接通信能力。

基本进程内客户端

go
// createInProcessClient builds an MCP server with a single "test_tool" tool,
// connects an in-process client to it, and calls the tool once, logging the
// result. Any client-side failure terminates the program via log.Fatal.
func createInProcessClient() {
    // Create the server.
    s := server.NewMCPServer("Test Server", "1.0.0")

    // Add a tool to the server.
    s.AddTool(
        mcp.NewTool("test_tool",
            mcp.WithDescription("Test tool"),
            mcp.WithString("input", mcp.Required()),
        ),
        func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            // Validate the argument instead of panicking when it is
            // missing or not a string.
            input, ok := req.Params.Arguments["input"].(string)
            if !ok {
                return nil, fmt.Errorf("input must be a string, got %T", req.Params.Arguments["input"])
            }
            return mcp.NewToolResultText("Processed: " + input), nil
        },
    )

    // Create the in-process client.
    c := client.NewInProcessClient(s)
    defer c.Close()

    ctx := context.Background()

    // Initialize (no network overhead).
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }

    // Use the client.
    result, err := c.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "test_tool",
            Arguments: map[string]interface{}{
                "input": "test data",
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Tool result: %+v", result)
}

用于测试的进程内客户端

go
type TestClient struct {
    server *server.MCPServer
    client *client.InProcessClient
}

// NewTestClient creates a "Test Server" with all capabilities enabled and an
// in-process client connected to it.
func NewTestClient() *TestClient {
    srv := server.NewMCPServer("Test Server", "1.0.0",
        server.WithAllCapabilities(),
    )

    tc := &TestClient{server: srv}
    tc.client = client.NewInProcessClient(srv)
    return tc
}

// AddTool registers a tool with the given name and description on the
// underlying server, served by handler.
func (tc *TestClient) AddTool(name, description string, handler server.ToolHandler) {
    tc.server.AddTool(
        mcp.NewTool(name, mcp.WithDescription(description)),
        handler,
    )
}

// AddResource registers a resource with the given URI and name on the
// underlying server, served by handler.
func (tc *TestClient) AddResource(uri, name string, handler server.ResourceHandler) {
    tc.server.AddResource(mcp.NewResource(uri, name), handler)
}

// Initialize initializes the in-process client.
func (tc *TestClient) Initialize(ctx context.Context) error {
    err := tc.client.Initialize(ctx)
    return err
}

// CallTool invokes the tool called name with args through the in-process
// client.
func (tc *TestClient) CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error) {
    req := mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name:      name,
            Arguments: args,
        },
    }
    return tc.client.CallTool(ctx, req)
}

// ReadResource reads the resource identified by uri through the in-process
// client.
func (tc *TestClient) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error) {
    req := mcp.ReadResourceRequest{
        Params: mcp.ReadResourceRequestParams{
            URI: uri,
        },
    }
    return tc.client.ReadResource(ctx, req)
}

// Close closes the in-process client.
func (tc *TestClient) Close() error {
    err := tc.client.Close()
    return err
}

// TestWithInProcessClient shows TestClient in a test: it registers an "echo"
// tool, initializes the client, calls the tool, and checks that the first
// content item carries the input text back.
func TestWithInProcessClient(t *testing.T) {
    tc := NewTestClient()
    defer tc.Close()

    // Register the tool under test.
    echo := func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        input := req.Params.Arguments["input"].(string)
        return mcp.NewToolResultText(input), nil
    }
    tc.AddTool("echo", "Echo input", echo)

    ctx := context.Background()
    require.NoError(t, tc.Initialize(ctx))

    // Exercise the tool call.
    args := map[string]interface{}{
        "input": "hello world",
    }
    result, err := tc.CallTool(ctx, "echo", args)
    require.NoError(t, err)
    assert.Equal(t, "hello world", result.Content[0].Text)
}

传输选择

决策矩阵

根据这些因素选择你的传输方式:

go
// TransportRequirements describes what an application needs from a
// transport; SelectTransport maps it to a transport name.
type TransportRequirements struct {
    RealTime        bool   // server-pushed, real-time updates are needed
    MultiClient     bool   // several clients must share one server
    NetworkRequired bool   // client and server are not on the same machine
    Performance     string // "high", "medium", "low"
    Complexity      string // "low", "medium", "high"
}

// SelectTransport returns the name of the transport that fits req:
// "inprocess", "stdio", "sse" or "streamablehttp".
//
// Cases are evaluated in order:
//   - no network and high performance  -> "inprocess"
//   - no network and a single client   -> "stdio"
//   - real-time and multiple clients   -> "sse"
//   - network required                 -> "streamablehttp"
//   - anything else                    -> "stdio"
//
// A network-required request therefore never resolves to "stdio" or
// "inprocess", which only work inside one machine. Complexity is not
// consulted.
func SelectTransport(req TransportRequirements) string {
    switch {
    case !req.NetworkRequired && req.Performance == "high":
        return "inprocess"

    case !req.NetworkRequired && !req.MultiClient:
        return "stdio"

    case req.RealTime && req.MultiClient:
        return "sse"

    case req.NetworkRequired:
        return "streamablehttp"

    default:
        return "stdio" // default fallback
    }
}

// demonstrateTransportSelection prints the transport SelectTransport picks
// for three sample requirement sets: high-performance testing, a real-time
// web application, and a CLI tool.
func demonstrateTransportSelection() {
    // High-performance testing: everything local, single client.
    testing := TransportRequirements{
        Performance: "high",
        Complexity:  "low",
    }
    fmt.Printf("Testing: %s\n", SelectTransport(testing))

    // Real-time web application: networked, many clients.
    webApp := TransportRequirements{
        RealTime:        true,
        MultiClient:     true,
        NetworkRequired: true,
        Performance:     "medium",
        Complexity:      "medium",
    }
    fmt.Printf("Web app: %s\n", SelectTransport(webApp))

    // CLI tool: local, single client, moderate performance.
    cliTool := TransportRequirements{
        Performance: "medium",
        Complexity:  "low",
    }
    fmt.Printf("CLI tool: %s\n", SelectTransport(cliTool))
}

多传输客户端工厂

go
// ClientFactory creates clients for different transports from previously
// stored per-transport configuration.
type ClientFactory struct {
    // configs maps a transport name ("stdio", "streamablehttp", "sse") to
    // the configuration value stored by the matching Set*Config method.
    configs map[string]interface{}
}

// NewClientFactory returns a factory with an empty configuration map.
func NewClientFactory() *ClientFactory {
    cf := new(ClientFactory)
    cf.configs = make(map[string]interface{})
    return cf
}

// SetStdioConfig stores the command and arguments used to create "stdio"
// clients.
func (cf *ClientFactory) SetStdioConfig(command string, args ...string) {
    opts := client.StdioOptions{
        Command: command,
        Args:    args,
    }
    cf.configs["stdio"] = opts
}

// SetStreamableHTTPConfig stores the base URL and headers used to create
// "streamablehttp" clients. The value is kept as an anonymous struct whose
// shape CreateClient asserts on, so the two must stay in sync.
func (cf *ClientFactory) SetStreamableHTTPConfig(baseURL string, headers map[string]string) {
    var cfg struct {
        BaseURL string
        Headers map[string]string
    }
    cfg.BaseURL = baseURL
    cfg.Headers = headers
    cf.configs["streamablehttp"] = cfg
}

// SetSSEConfig stores the base URL and headers used to create "sse" clients.
// The value is kept as an anonymous struct whose shape CreateClient asserts
// on, so the two must stay in sync.
func (cf *ClientFactory) SetSSEConfig(baseURL string, headers map[string]string) {
    var cfg struct {
        BaseURL string
        Headers map[string]string
    }
    cfg.BaseURL = baseURL
    cfg.Headers = headers
    cf.configs["sse"] = cfg
}

// CreateClient builds a client for the named transport ("stdio",
// "streamablehttp" or "sse") from the configuration stored by the matching
// Set*Config call.
//
// It returns an error when the requested transport has not been configured,
// when the name is unknown, or when the stdio or sse constructor fails.
//
// The parameter is named transportName so that it does not shadow the
// imported transport package, whose option types are used in the body. The
// result is a *client.Client, the pointer form used elsewhere in this
// document, which also makes the nil returns on error valid.
func (cf *ClientFactory) CreateClient(transportName string) (*client.Client, error) {
    switch transportName {
    case "stdio":
        config, ok := cf.configs["stdio"].(client.StdioOptions)
        if !ok {
            return nil, fmt.Errorf("stdio config not set")
        }
        // NOTE(review): assumes NewStdioClientWithOptions yields a
        // (*client.Client, error) pair; confirm against the client package.
        return client.NewStdioClientWithOptions(config)

    case "streamablehttp":
        config, ok := cf.configs["streamablehttp"].(struct {
            BaseURL string
            Headers map[string]string
        })
        if !ok {
            return nil, fmt.Errorf("streamablehttp config not set")
        }

        var options []transport.StreamableHTTPCOption
        if len(config.Headers) > 0 {
            options = append(options, transport.WithHTTPHeaders(config.Headers))
        }

        return client.NewStreamableHttpClient(config.BaseURL, options...), nil

    case "sse":
        config, ok := cf.configs["sse"].(struct {
            BaseURL string
            Headers map[string]string
        })
        if !ok {
            return nil, fmt.Errorf("sse config not set")
        }

        var options []transport.ClientOption
        if len(config.Headers) > 0 {
            options = append(options, transport.WithHeaders(config.Headers))
        }

        return client.NewSSEMCPClient(config.BaseURL, options...)

    default:
        return nil, fmt.Errorf("unknown transport: %s", transportName)
    }
}

// demonstrateClientFactory configures a ClientFactory for all three
// transports and creates a client for the one named by the MCP_TRANSPORT
// environment variable, defaulting to "stdio" when it is unset or empty.
func demonstrateClientFactory() {
    factory := NewClientFactory()

    // Configure every transport.
    bearer := map[string]string{
        "Authorization": "Bearer token",
    }
    factory.SetStdioConfig("go", "run", "server.go")
    factory.SetStreamableHTTPConfig("http://localhost:8080/mcp", bearer)
    factory.SetSSEConfig("http://localhost:8080/mcp/sse", map[string]string{
        "Authorization": "Bearer token",
    })

    // Pick the transport from the environment.
    kind := os.Getenv("MCP_TRANSPORT")
    if kind == "" {
        kind = "stdio"
    }

    c, err := factory.CreateClient(kind)
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Use the client...
}

日志配置

所有客户端传输都支持自定义日志记录。每种传输都提供了一个日志记录器选项,接受 util.Logger 接口的任何实现。

go
// myCustomLogger is a placeholder logger with Infof and Errorf methods; both
// are currently no-ops.
type myCustomLogger struct{}

// Infof is a no-op placeholder for informational messages.
func (l myCustomLogger) Infof(format string, args ...any) {
    // TODO: forward to the application's logger.
}

// Errorf is a no-op placeholder for error messages.
func (l myCustomLogger) Errorf(format string, args ...any) {
    // TODO: forward to the application's logger.
}

使用 LoggingTransport 进行 JSON-RPC 追踪

为了调试底层通信协议(wire protocol)本身,可以用 transport.NewLogging 包装任何传输。该包装器是一个透明装饰器:它实现了 transport.Interface,并且在内部传输支持的情况下同样实现 BidirectionalInterface / HTTPConnection,因此客户端中的类型断言仍会按预期工作。

go
import (
    "log/slog"
    "os"

    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/client/transport"
)

// tracedClient returns a client whose STDIO transport is wrapped in a
// logging decorator, so that JSON-RPC messages are written to stderr as
// debug-level slog text records with payloads included.
func tracedClient() (*client.Client, error) {
    stdio, err := transport.NewStdio("my-server", nil)
    if err != nil {
        return nil, err
    }

    handlerOpts := &slog.HandlerOptions{
        Level: slog.LevelDebug,
    }
    logger := slog.New(slog.NewTextHandler(os.Stderr, handlerOpts))

    // Wrap the transport so that every JSON-RPC message is logged.
    traced := transport.NewLogging(stdio, logger,
        transport.WithLoggingLevel(slog.LevelDebug),
        transport.WithLoggingPayloads(true), // include params/result bodies
    )

    c := client.NewClient(traced)
    return c, nil
}

每条记录都带有消息方向、方法名、请求 ID 和(对于响应)往返时间:

text
level=DEBUG msg="→ request"  method=initialize id=1
level=DEBUG msg="← response" method=initialize id=1 duration=12ms
level=DEBUG msg="→ request"  method=tools/list id=2
level=DEBUG msg="← response" method=tools/list id=2 duration=3ms
level=DEBUG msg="← notification" method=notifications/tools/list_changed

使用 WithLoggingPayloads(false) 在只需要方法/ID 元数据时抑制消息体,使用 WithLoggingLevel 更改发出记录的 slog 级别。