任务增强工具
任务增强工具允许异步执行长时间运行的操作。不是阻塞直到完成,而是服务器立即创建任务,客户端可以轮询状态更新。这非常适合批处理、数据分析或可能需要超过几秒钟的任何工作。
概述
使用常规工具,处理程序同步运行——客户端在继续之前等待完整结果。任务增强工具通过立即返回 CreateTaskResult 来改变这一点,而实际工作在后台继续。客户端然后可以:
- 轮询 任务的当前状态
- 列出 所有活动任务
- 取消 正在运行的任务
任务支持模式
每个工具通过 mcp.TaskSupport 声明它如何与任务系统交互。有三种模式:
| 模式 | 常量 | 行为 |
|---|---|---|
| 禁止 | mcp.TaskSupportForbidden | 默认。工具不能作为任务调用。它像常规工具一样同步运行。 |
| 可选 | mcp.TaskSupportOptional | 工具可以作为任务调用或同步运行,取决于客户端的请求。 |
| 必需 | mcp.TaskSupportRequired | 工具必须作为任务调用。同步调用会被拒绝并返回错误。 |
// 创建工具时设置任务支持
tool := mcp.NewTool("long_process",
mcp.WithDescription("长时间运行的操作"),
mcp.WithTaskSupport(mcp.TaskSupportRequired),
)启用任务能力
要使用任务增强工具,在服务器上同时启用工具和任务能力。WithTaskCapabilities 接受三个布尔值来控制哪些任务操作可用:
- list — 允许客户端列出任务
- cancel — 允许客户端取消正在运行的任务
- toolCallTasks — 为工具调用启用任务增强
s := server.NewMCPServer("Task Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithTaskCapabilities(true, true, true), // list, cancel, toolCallTasks
)注册任务工具
AddTaskTool
使用 AddTaskTool 注册带有其处理程序的单个任务增强工具:
tool := mcp.NewTool("long_process",
mcp.WithDescription("异步运行长时间进程"),
mcp.WithTaskSupport(mcp.TaskSupportRequired),
mcp.WithString("input", mcp.Required(), mcp.Description("输入数据")),
)
s.AddTaskTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
input := req.GetString("input", "")
// 在这里执行实际工作 — 这在后台运行。
// 检查 ctx.Done() 以支持取消。
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// 模拟工作
time.Sleep(100 * time.Millisecond)
}
}
// 返回结果 — 服务器管理任务生命周期
return &mcp.CreateTaskResult{
Task: mcp.NewTask("unique-task-id",
mcp.WithTaskStatusMessage("Processing started"),
mcp.WithTaskPollInterval(2000), // 建议每 2 秒轮询一次
mcp.WithTaskTTL(300000), // 任务 5 分钟后过期
),
}, nil
})AddTaskTools
使用 AddTaskTools 一次注册多个任务工具:
s.AddTaskTools(
server.ServerTaskTool{Tool: tool1, Handler: handler1},
server.ServerTaskTool{Tool: tool2, Handler: handler2},
)处理程序签名
任务工具处理程序使用专用函数类型,返回 *mcp.CreateTaskResult 而不是常规的 *mcp.CallToolResult:
type TaskToolHandlerFunc func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CreateTaskResult, error)处理程序在后台异步运行。服务器负责创建任务条目、跟踪状态以及向客户端发送通知。
任务生命周期
任务通过明确定义的状态集:
Working ──→ Completed
──→ Failed
──→ Cancelled
──→ InputRequired ──→ Working (resumed)可用的状态常量:
| 状态 | 常量 | 描述 |
|---|---|---|
| Working | mcp.TaskStatusWorking | 任务正在积极处理 |
| Completed | mcp.TaskStatusCompleted | 任务成功完成 |
| Failed | mcp.TaskStatusFailed | 任务遇到错误 |
| Cancelled | mcp.TaskStatusCancelled | 任务被客户端取消 |
| Input Required | mcp.TaskStatusInputRequired | 任务需要额外输入才能继续 |
使用 IsTerminal() 检查任务是否已达到最终状态:
task := mcp.NewTask("my-task")
// 之后,检查任务是否完成
if task.Status.IsTerminal() {
// 任务已完成、失败或已取消
}任务选项
使用 mcp.NewTask 创建任务时,可以使用函数式选项配置它:
task := mcp.NewTask("task-123",
mcp.WithTaskStatus(mcp.TaskStatusWorking), // 设置初始状态(默认:Working)
mcp.WithTaskStatusMessage("初始化中..."), // 附加人类可读的状态消息
mcp.WithTaskTTL(300000), // TTL 毫秒数(5 分钟)
mcp.WithTaskPollInterval(2000), // 客户端建议的轮询间隔(毫秒)
mcp.WithTaskCreatedAt("2025-01-15T10:30:00Z"), // 自定义创建时间戳
)| 选项 | 描述 |
|---|---|
WithTaskStatus | 设置任务的初始状态。默认为 TaskStatusWorking。 |
WithTaskStatusMessage | 附加描述当前状态的人类可读消息。 |
WithTaskTTL | 毫秒为单位的生存时间。超过此时间后,任务可能会被删除。 |
WithTaskPollInterval | 客户端建议的轮询间隔(毫秒)。 |
WithTaskCreatedAt | 覆盖创建时间戳(默认为 time.Now())。 |
模型即时响应
服务器可以在任务继续在后台执行时向模型提供即时响应。这允许模型在不等完整结果的情况下确认请求:
s.AddTaskTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
// ... 开始处理 ...
task := mcp.NewTask("task-456",
mcp.WithTaskStatusMessage("处理已开始"),
mcp.WithTaskPollInterval(5000),
)
return &mcp.CreateTaskResult{
Task: task,
Result: mcp.Result{
Meta: mcp.WithModelImmediateResponse("正在处理您的请求。这可能需要几分钟。"),
},
}, nil
})WithModelImmediateResponse 辅助函数设置 io.modelcontextprotocol/model-immediate-response 元数据键,提供客户端可以直接传递给模型的字符串作为临时结果。
相关任务元数据
您可以使用 WithRelatedTask 将消息或结果与特定任务关联。这在发送引用现有任务的随访数据时很有用:
meta := mcp.WithRelatedTask("task-id-123")元数据包含带有任务 ID 的 io.modelcontextprotocol/related-task 键,使客户端能够将响应与其原始任务关联。
任务钩子
任务钩子提供对任务生命周期的可观察性。使用 server.TaskHooks 注册创建、完成、失败、取消和任何状态更改的回调:
taskHooks := &server.TaskHooks{}
taskHooks.AddOnTaskCreated(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s created for tool %s", metrics.TaskID, metrics.ToolName)
})
taskHooks.AddOnTaskCompleted(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s completed in %v", metrics.TaskID, metrics.Duration)
})
taskHooks.AddOnTaskFailed(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s failed: %v", metrics.TaskID, metrics.Error)
})
taskHooks.AddOnTaskCancelled(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s was cancelled", metrics.TaskID)
})
taskHooks.AddOnTaskStatusChanged(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s status: %s", metrics.TaskID, metrics.Status)
})使用 WithTaskHooks 将钩子传递给服务器:
s := server.NewMCPServer("Task Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithTaskCapabilities(true, true, true),
server.WithTaskHooks(taskHooks),
)TaskMetrics 字段
server.TaskMetrics 结构为每个钩子提供丰富的上下文:
| 字段 | 类型 | 描述 |
|---|---|---|
TaskID | string | 任务的唯一标识符 |
ToolName | string | 创建任务的工具名称 |
Status | mcp.TaskStatus | 任务的当前状态 |
StatusMessage | string | 可选的状态消息 |
CreatedAt | time.Time | 任务创建时间 |
CompletedAt | *time.Time | 任务完成时间(如果仍在运行则为 nil) |
Duration | time.Duration | 任务运行时间(如果未完成则为 0) |
SessionID | string | 拥有此任务的会话 |
Error | error | 任务失败时的错误(否则为 nil) |
并发限制
使用 WithMaxConcurrentTasks 限制可以同时运行的任务数。当达到限制时,新的任务请求会被拒绝并返回错误:
s := server.NewMCPServer("Task Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithTaskCapabilities(true, true, true),
server.WithMaxConcurrentTasks(10),
)任务状态通知
每当任务状态更改时,服务器会自动向连接的客户端发送 notifications/tasks/status。这意味着客户端不必仅依赖轮询——它们还可以监听推送通知以实时对状态转换做出反应。
完整示例
这是一个结合所有概念的完整示例——带有必需任务工具、可选任务工具和可观察性钩子的服务器:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
// 设置可观察性钩子
taskHooks := &server.TaskHooks{}
taskHooks.AddOnTaskCreated(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s created for tool %s", metrics.TaskID, metrics.ToolName)
})
taskHooks.AddOnTaskCompleted(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s completed in %v", metrics.TaskID, metrics.Duration)
})
taskHooks.AddOnTaskFailed(func(ctx context.Context, metrics server.TaskMetrics) {
log.Printf("Task %s failed: %v", metrics.TaskID, metrics.Error)
})
// 创建服务器
s := server.NewMCPServer("Task Example", "1.0.0",
server.WithToolCapabilities(true),
server.WithTaskCapabilities(true, true, true),
server.WithTaskHooks(taskHooks),
server.WithMaxConcurrentTasks(10),
)
// 必需任务工具:必须作为任务调用
batchTool := mcp.NewTool("process_batch",
mcp.WithDescription("异步处理一批项目"),
mcp.WithTaskSupport(mcp.TaskSupportRequired),
mcp.WithArray("items",
mcp.Description("要处理的项目"),
mcp.Required(),
),
)
s.AddTaskTool(batchTool, handleBatch)
// 可选任务工具:可以作为同步或异步运行
analyzeTool := mcp.NewTool("analyze",
mcp.WithDescription("分析数据 — 如果作为任务调用则异步运行"),
mcp.WithTaskSupport(mcp.TaskSupportOptional),
mcp.WithString("data", mcp.Required(), mcp.Description("要分析的数据")),
)
s.AddTaskTool(analyzeTool, handleAnalyze)
// 启动服务器
if err := server.ServeStdio(s); err != nil {
log.Fatalf("Server error: %v", err)
}
}
func handleBatch(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
// 处理项目 — 检查取消
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
time.Sleep(1 * time.Second)
}
}
return &mcp.CreateTaskResult{
Task: mcp.NewTask("batch-task",
mcp.WithTaskStatusMessage("批处理完成"),
),
}, nil
}
func handleAnalyze(ctx context.Context, req mcp.CallToolRequest) (*mcp.CreateTaskResult, error) {
data := req.GetString("data", "")
// 模拟分析
time.Sleep(2 * time.Second)
return &mcp.CreateTaskResult{
Task: mcp.NewTask("analyze-task",
mcp.WithTaskStatusMessage(fmt.Sprintf("分析了 %d 个字符", len(data))),
),
Result: mcp.Result{
Meta: mcp.WithModelImmediateResponse("分析正在进行中。结果将很快可用。"),
},
}, nil
}
