公告

任何建议或需求可联系我!


Skip to content

任务增强工具

任务增强工具允许异步执行长时间运行的操作。不是阻塞直到完成,而是服务器立即创建任务,客户端可以轮询状态更新。这非常适合批处理、数据分析或可能需要超过几秒钟的任何工作。

概述

使用常规工具,处理程序同步运行——客户端在继续之前等待完整结果。任务增强工具通过立即返回 CreateTaskResult 来改变这一点,而实际工作在后台继续。客户端然后可以:

  • 轮询 任务的当前状态
  • 列出 所有活动任务
  • 取消 正在运行的任务

任务支持模式

每个工具通过 mcp.TaskSupport 声明它如何与任务系统交互。有三种模式:

模式常量行为
禁止mcp.TaskSupportForbidden默认。工具不能作为任务调用。它像常规工具一样同步运行。
可选mcp.TaskSupportOptional工具可以作为任务调用或同步运行,取决于客户端的请求。
必需mcp.TaskSupportRequired工具必须作为任务调用。同步调用会被拒绝并返回错误。
go
// 创建工具时设置任务支持
tool := mcp.NewTool("long_process",
    mcp.WithDescription("长时间运行的操作"),
    mcp.WithTaskSupport(mcp.TaskSupportRequired),
)

启用任务能力

要使用任务增强工具,在服务器上同时启用工具和任务能力。WithTaskCapabilities 接受三个布尔值来控制哪些任务操作可用:

  1. list — 允许客户端列出任务
  2. cancel — 允许客户端取消正在运行的任务
  3. toolCallTasks — 为工具调用启用任务增强
go
s := server.NewMCPServer("Task Server", "1.0.0",
    server.WithToolCapabilities(true),
    server.WithTaskCapabilities(true, true, true), // list, cancel, toolCallTasks
)

注册任务工具

AddTaskTool

使用 AddTaskTool 注册带有其处理程序的单个任务增强工具:

go
tool := mcp.NewTool("long_process",
    mcp.WithDescription("异步运行长时间进程"),
    mcp.WithTaskSupport(mcp.TaskSupportRequired),
    mcp.WithString("input", mcp.Required(), mcp.Description("输入数据")),
)

s.AddTaskTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
    input := req.GetString("input", "")

    // 在这里执行实际工作 — 这在后台运行。
    // 检查 ctx.Done() 以支持取消。
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
            // 模拟工作
            time.Sleep(100 * time.Millisecond)
        }
    }

    // 返回结果 — 服务器管理任务生命周期
    return &mcp.CreateTaskResult{
        Task: mcp.NewTask("unique-task-id",
            mcp.WithTaskStatusMessage("Processing started"),
            mcp.WithTaskPollInterval(2000), // 建议每 2 秒轮询一次
            mcp.WithTaskTTL(300000),        // 任务 5 分钟后过期
        ),
    }, nil
})

AddTaskTools

使用 AddTaskTools 一次注册多个任务工具:

go
s.AddTaskTools(
    server.ServerTaskTool{Tool: tool1, Handler: handler1},
    server.ServerTaskTool{Tool: tool2, Handler: handler2},
)

处理程序签名

任务工具处理程序使用专用函数类型,返回 *mcp.CreateTaskResult 而不是常规的 *mcp.CallToolResult

go
type TaskToolHandlerFunc func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CreateTaskResult, error)

处理程序在后台异步运行。服务器负责创建任务条目、跟踪状态以及向客户端发送通知。

任务生命周期

任务通过明确定义的状态集:

Working ──→ Completed
       ──→ Failed
       ──→ Cancelled
       ──→ InputRequired ──→ Working (resumed)

可用的状态常量:

状态常量描述
Workingmcp.TaskStatusWorking任务正在积极处理
Completedmcp.TaskStatusCompleted任务成功完成
Failedmcp.TaskStatusFailed任务遇到错误
Cancelledmcp.TaskStatusCancelled任务被客户端取消
Input Requiredmcp.TaskStatusInputRequired任务需要额外输入才能继续

使用 IsTerminal() 检查任务是否已达到最终状态:

go
task := mcp.NewTask("my-task")

// 之后,检查任务是否完成
if task.Status.IsTerminal() {
    // 任务已完成、失败或已取消
}

任务选项

使用 mcp.NewTask 创建任务时,可以使用函数式选项配置它:

go
task := mcp.NewTask("task-123",
    mcp.WithTaskStatus(mcp.TaskStatusWorking),       // 设置初始状态(默认:Working)
    mcp.WithTaskStatusMessage("初始化中..."),           // 附加人类可读的状态消息
    mcp.WithTaskTTL(300000),                          // TTL 毫秒数(5 分钟)
    mcp.WithTaskPollInterval(2000),                   // 客户端建议的轮询间隔(毫秒)
    mcp.WithTaskCreatedAt("2025-01-15T10:30:00Z"),    // 自定义创建时间戳
)
选项描述
WithTaskStatus设置任务的初始状态。默认为 TaskStatusWorking
WithTaskStatusMessage附加描述当前状态的人类可读消息。
WithTaskTTL毫秒为单位的生存时间。超过此时间后,任务可能会被删除。
WithTaskPollInterval客户端建议的轮询间隔(毫秒)。
WithTaskCreatedAt覆盖创建时间戳(默认为 time.Now())。

模型即时响应

服务器可以在任务继续在后台执行时向模型提供即时响应。这允许模型在不等完整结果的情况下确认请求:

go
s.AddTaskTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
    // ... 开始处理 ...

    task := mcp.NewTask("task-456",
        mcp.WithTaskStatusMessage("处理已开始"),
        mcp.WithTaskPollInterval(5000),
    )

    return &mcp.CreateTaskResult{
        Task: task,
        Result: mcp.Result{
            Meta: mcp.WithModelImmediateResponse("正在处理您的请求。这可能需要几分钟。"),
        },
    }, nil
})

WithModelImmediateResponse 辅助函数设置 io.modelcontextprotocol/model-immediate-response 元数据键,提供客户端可以直接传递给模型的字符串作为临时结果。

相关任务元数据

您可以使用 WithRelatedTask 将消息或结果与特定任务关联。这在发送引用现有任务的随访数据时很有用:

go
meta := mcp.WithRelatedTask("task-id-123")

元数据包含带有任务 ID 的 io.modelcontextprotocol/related-task 键,使客户端能够将响应与其原始任务关联。

任务钩子

任务钩子提供对任务生命周期的可观察性。使用 server.TaskHooks 注册创建、完成、失败、取消和任何状态更改的回调:

go
taskHooks := &server.TaskHooks{}

taskHooks.AddOnTaskCreated(func(ctx context.Context, metrics server.TaskMetrics) {
    log.Printf("Task %s created for tool %s", metrics.TaskID, metrics.ToolName)
})

taskHooks.AddOnTaskCompleted(func(ctx context.Context, metrics server.TaskMetrics) {
    log.Printf("Task %s completed in %v", metrics.TaskID, metrics.Duration)
})

taskHooks.AddOnTaskFailed(func(ctx context.Context, metrics server.TaskMetrics) {
    log.Printf("Task %s failed: %v", metrics.TaskID, metrics.Error)
})

taskHooks.AddOnTaskCancelled(func(ctx context.Context, metrics server.TaskMetrics) {
    log.Printf("Task %s was cancelled", metrics.TaskID)
})

taskHooks.AddOnTaskStatusChanged(func(ctx context.Context, metrics server.TaskMetrics) {
    log.Printf("Task %s status: %s", metrics.TaskID, metrics.Status)
})

使用 WithTaskHooks 将钩子传递给服务器:

go
s := server.NewMCPServer("Task Server", "1.0.0",
    server.WithToolCapabilities(true),
    server.WithTaskCapabilities(true, true, true),
    server.WithTaskHooks(taskHooks),
)

TaskMetrics 字段

server.TaskMetrics 结构为每个钩子提供丰富的上下文:

字段类型描述
TaskIDstring任务的唯一标识符
ToolNamestring创建任务的工具名称
Statusmcp.TaskStatus任务的当前状态
StatusMessagestring可选的状态消息
CreatedAttime.Time任务创建时间
CompletedAt*time.Time任务完成时间(如果仍在运行则为 nil)
Durationtime.Duration任务运行时间(如果未完成则为 0)
SessionIDstring拥有此任务的会话
Errorerror任务失败时的错误(否则为 nil)

并发限制

使用 WithMaxConcurrentTasks 限制可以同时运行的任务数。当达到限制时,新的任务请求会被拒绝并返回错误:

go
s := server.NewMCPServer("Task Server", "1.0.0",
    server.WithToolCapabilities(true),
    server.WithTaskCapabilities(true, true, true),
    server.WithMaxConcurrentTasks(10),
)

任务状态通知

每当任务状态更改时,服务器会自动向连接的客户端发送 notifications/tasks/status。这意味着客户端不必仅依赖轮询——它们还可以监听推送通知以实时对状态转换做出反应。

完整示例

这是一个结合所有概念的完整示例——带有必需任务工具、可选任务工具和可观察性钩子的服务器:

go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

func main() {
    // 设置可观察性钩子
    taskHooks := &server.TaskHooks{}
    taskHooks.AddOnTaskCreated(func(ctx context.Context, metrics server.TaskMetrics) {
        log.Printf("Task %s created for tool %s", metrics.TaskID, metrics.ToolName)
    })
    taskHooks.AddOnTaskCompleted(func(ctx context.Context, metrics server.TaskMetrics) {
        log.Printf("Task %s completed in %v", metrics.TaskID, metrics.Duration)
    })
    taskHooks.AddOnTaskFailed(func(ctx context.Context, metrics server.TaskMetrics) {
        log.Printf("Task %s failed: %v", metrics.TaskID, metrics.Error)
    })

    // 创建服务器
    s := server.NewMCPServer("Task Example", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithTaskCapabilities(true, true, true),
        server.WithTaskHooks(taskHooks),
        server.WithMaxConcurrentTasks(10),
    )

    // 必需任务工具:必须作为任务调用
    batchTool := mcp.NewTool("process_batch",
        mcp.WithDescription("异步处理一批项目"),
        mcp.WithTaskSupport(mcp.TaskSupportRequired),
        mcp.WithArray("items",
            mcp.Description("要处理的项目"),
            mcp.Required(),
        ),
    )
    s.AddTaskTool(batchTool, handleBatch)

    // 可选任务工具:可以作为同步或异步运行
    analyzeTool := mcp.NewTool("analyze",
        mcp.WithDescription("分析数据 — 如果作为任务调用则异步运行"),
        mcp.WithTaskSupport(mcp.TaskSupportOptional),
        mcp.WithString("data", mcp.Required(), mcp.Description("要分析的数据")),
    )
    s.AddTaskTool(analyzeTool, handleAnalyze)

    // 启动服务器
    if err := server.ServeStdio(s); err != nil {
        log.Fatalf("Server error: %v", err)
    }
}

func handleBatch(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
    // 处理项目 — 检查取消
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
            time.Sleep(1 * time.Second)
        }
    }

    return &mcp.CreateTaskResult{
        Task: mcp.NewTask("batch-task",
            mcp.WithTaskStatusMessage("批处理完成"),
        ),
    }, nil
}

func handleAnalyze(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
    data := req.GetString("data", "")

    // 模拟分析
    time.Sleep(2 * time.Second)

    return &mcp.CreateTaskResult{
        Task: mcp.NewTask("analyze-task",
            mcp.WithTaskStatusMessage(fmt.Sprintf("分析了 %d 个字符", len(data))),
        ),
        Result: mcp.Result{
            Meta: mcp.WithModelImmediateResponse("分析正在进行中。结果将很快可用。"),
        },
    }, nil
}

下一步

  • 工具 — 了解常规同步工具
  • 高级功能 — 探索中间件、钩子和类型处理程序