实现资源
资源以只读方式向 LLM 暴露数据。可以将它们视为提供对文件、数据库、API 或任何其他数据源访问的 GET 端点。
资源基础
MCP 中的资源通过 URI 标识,可以是静态的(固定内容)或动态的(按需生成)。它们非常适合让 LLM 访问文档、配置文件、数据库记录或 API 响应。
基本资源结构
// 创建一个简单的资源
resource := mcp.NewResource(
"docs://readme", // URI - 唯一标识符
"Project README", // Name - 人类可读名称
mcp.WithResourceDescription("主项目文档"),
mcp.WithMIMEType("text/markdown"),
)资源图标
资源可以包含图标以便更好地视觉识别:
resource := mcp.NewResource(
"docs://readme",
"Project README",
mcp.WithResourceDescription("主项目文档"),
mcp.WithMIMEType("text/markdown"),
mcp.WithResourceIcons(
mcp.Icon{
Src: "https://example.com/icons/document.svg",
MIMEType: "image/svg+xml",
},
),
)显示标题
资源和资源模板可以携带与 name 分离的人类可读显示标题。客户端在文件浏览器和选择器中显示标题;name 保持为稳定的标识符,适合程序化查找。
resource := mcp.NewResource(
"docs://readme",
"readme.md", // name(稳定标识符)
mcp.WithResourceTitle("项目自述文件"), // 在 UI 中显示
mcp.WithResourceDescription("主项目文档"),
mcp.WithMIMEType("text/markdown"),
)
// 资源模板也有相同选项
template := mcp.NewResourceTemplate(
"users://{user_id}",
"user-profile",
mcp.WithTemplateTitle("用户资料"),
mcp.WithTemplateDescription("特定用户的信息"),
)如果 Title 为空,客户端会回退到 Name。该字段使用 omitempty 序列化,因此未设置标题的资源在传输时不会改变。
资源大小
当可以提前知道资源的原始字节大小时,设置它以便主机可以在不获取内容的情况下渲染文件大小并预先预算上下文窗口使用量:
resource := mcp.NewResource(
"file:///var/log/app.log",
"app.log",
mcp.WithResourceTitle("应用日志"),
mcp.WithMIMEType("text/plain"),
mcp.WithResourceSize(2_048_576), // 2 MiB,在 base64 编码之前测量
)Size 在 Resource 结构体上暴露为 *int64,这样明确的零字节资源可以与未知大小区分开来。当大小未知时,完全省略 WithResourceSize。
静态资源
静态资源具有固定的 URI,通常提供预定的内容。
基于文件的资源
从文件系统公开文件:
func main() {
s := server.NewMCPServer("File Server", "1.0.0",
server.WithResourceCapabilities(true),
)
// 添加静态文件资源
s.AddResource(
mcp.NewResource(
"file://README.md",
"Project README",
mcp.WithResourceDescription("主项目文档"),
mcp.WithMIMEType("text/markdown"),
),
handleReadmeFile,
)
server.ServeStdio(s)
}
func handleReadmeFile(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
content, err := os.ReadFile("README.md")
if err != nil {
return nil, fmt.Errorf("failed to read README: %w", err)
}
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
{
URI: req.Params.URI,
MIMEType: "text/markdown",
Text: string(content),
},
},
}, nil
}配置资源
公开应用配置:
// 配置资源
s.AddResource(
mcp.NewResource(
"config://app",
"Application Configuration",
mcp.WithResourceDescription("当前应用设置"),
mcp.WithMIMEType("application/json"),
),
handleAppConfig,
)
func handleAppConfig(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
config := map[string]interface{}{
"database_url": os.Getenv("DATABASE_URL"),
"debug_mode": os.Getenv("DEBUG") == "true",
"version": "1.0.0",
"features": []string{
"authentication",
"caching",
"logging",
},
}
configJSON, err := json.Marshal(config)
if err != nil {
return nil, err
}
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
mcp.TextResourceContent{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(configJSON),
},
},
}, nil
}动态资源
动态资源使用带参数的 URI 模板,允许灵活地参数化访问数据。
URI 模板
使用 {parameter} 语法表示动态部分:
// 带动态用户 ID 的用户资料资源
s.AddResource(
mcp.NewResource(
"users://{user_id}",
"User Profile",
mcp.WithResourceDescription("用户资料信息"),
mcp.WithMIMEType("application/json"),
),
handleUserProfile,
)
func handleUserProfile(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
// 从 URI 中提取 user_id
userID := extractUserID(req.Params.URI) // "users://123" -> "123"
// 获取用户数据(从数据库、API 等)
user, err := getUserFromDB(userID)
if err != nil {
return nil, fmt.Errorf("user not found: %w", err)
}
jsonData, err := json.Marshal(user)
if err != nil {
return nil, err
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(jsonData),
},
}, nil
}
func extractUserID(uri string) string {
// 从 "users://123" 格式中提取 ID
parts := strings.Split(uri, "://")
if len(parts) == 2 {
return parts[1]
}
return ""
}数据库资源
动态公开数据库记录:
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
// 数据库表资源
s.AddResource(
mcp.NewResource(
"db://{table}/{id}",
"Database Record",
mcp.WithResourceDescription("按表和 ID 访问数据库记录"),
mcp.WithMIMEType("application/json"),
),
handleDatabaseRecord,
)
func handleDatabaseRecord(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
table, id := parseDBURI(req.Params.URI) // "db://users/123" -> "users", "123"
// 验证表名以确保安全
allowedTables := map[string]bool{
"users": true,
"products": true,
"orders": true,
}
if !allowedTables[table] {
return nil, fmt.Errorf("table not accessible: %s", table)
}
// 查询数据库
query := fmt.Sprintf("SELECT * FROM %s WHERE id = ?", table)
row := db.QueryRowContext(ctx, query, id)
var data map[string]interface{}
if err := scanRowToMap(row, &data); err != nil {
return nil, fmt.Errorf("record not found: %w", err)
}
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
mcp.TextResourceContent{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(jsonData),
},
},
}, nil
}API 资源
通过资源代理外部 API:
// 天气 API 资源
s.AddResource(
mcp.NewResource(
"weather://{location}",
"Weather Data",
mcp.WithResourceDescription("当前位置的当前天气"),
mcp.WithMIMEType("application/json"),
),
handleWeatherData,
)
func handleWeatherData(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
location := extractLocation(req.Params.URI)
// 调用外部天气 API
apiURL := fmt.Sprintf("https://api.weather.com/v1/current?location=%s&key=%s",
url.QueryEscape(location), os.Getenv("WEATHER_API_KEY"))
resp, err := http.Get(apiURL)
if err != nil {
return nil, fmt.Errorf("weather API error: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(body),
},
},
}, nil
}内容类型
资源可以服务不同类型的内容,并带有适当的 MIME 类型。
文本内容
func handleTextResource(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
content := "这是纯文本内容"
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
{
URI: req.Params.URI,
MIMEType: "text/plain",
Text: content,
},
},
}, nil
}JSON 内容
func handleJSONResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
data := map[string]interface{}{
"message": "Hello, World!",
"timestamp": time.Now().Unix(),
"status": "success",
}
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(jsonData),
},
}, nil
}二进制内容
func handleImageResource(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
imageData, err := os.ReadFile("logo.png")
if err != nil {
return nil, err
}
// 将二进制数据编码为 base64
encoded := base64.StdEncoding.EncodeToString(imageData)
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
{
URI: req.Params.URI,
MIMEType: "image/png",
Blob: encoded,
},
},
}, nil
}多种内容类型
单个资源可以返回多种内容表示:
func handleMultiFormatResource(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
data := map[string]interface{}{
"name": "John Doe",
"age": 30,
"city": "New York",
}
// JSON 表示
jsonData, _ := json.Marshal(data)
// 文本表示
textData := fmt.Sprintf("Name: %s\nAge: %d\nCity: %s",
data["name"], data["age"], data["city"])
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(jsonData),
},
{
URI: req.Params.URI,
MIMEType: "text/plain",
Text: textData,
},
},
}, nil
}错误处理
适当的错误处理确保资源访问的健壮性:
常见错误模式
func handleResourceWithErrors(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
// 验证 URI 格式
if !isValidURI(req.Params.URI) {
return nil, fmt.Errorf("invalid URI format: %s", req.Params.URI)
}
// 检查权限
if !hasPermission(ctx, req.Params.URI) {
return nil, fmt.Errorf("access denied to resource: %s", req.Params.URI)
}
// 处理资源未找到
data, err := fetchResourceData(req.Params.URI)
if err != nil {
if errors.Is(err, ErrResourceNotFound) {
return nil, fmt.Errorf("resource not found: %s", req.Params.URI)
}
return nil, fmt.Errorf("failed to fetch resource: %w", err)
}
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(jsonData),
},
},
}, nil
}超时处理
func handleResourceWithTimeout(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
// 创建超时上下文
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// 在操作中使用上下文
data, err := fetchDataWithContext(ctx, req.Params.URI)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("resource fetch timeout: %s", req.Params.URI)
}
return nil, err
}
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &mcp.ReadResourceResult{
Contents: []mcp.ResourceContent{
mcp.TextResourceContent{
URI: req.Params.URI,
MIMEType: "application/json",
Text: string(jsonData),
},
},
}, nil
}资源列表
实现客户端的资源发现:
func main() {
s := server.NewMCPServer("Resource Server", "1.0.0",
server.WithResourceCapabilities(true),
)
// 添加多个资源
resources := []struct {
uri string
name string
description string
mimeType string
handler server.ResourceHandler
}{
{"docs://readme", "README", "项目文档", "text/markdown", handleReadme},
{"config://app", "App Config", "应用设置", "application/json", handleConfig},
{"users://{id}", "User Profile", "用户信息", "application/json", handleUser},
}
for _, r := range resources {
s.AddResource(
mcp.NewResource(r.uri, r.name,
mcp.WithResourceDescription(r.description),
mcp.WithMIMEType(r.mimeType),
),
r.handler,
)
}
server.ServeStdio(s)
}资源缓存
为昂贵的资源实现缓存:
type CachedResourceHandler struct {
cache map[string]cacheEntry
mutex sync.RWMutex
ttl time.Duration
}
type cacheEntry struct {
data *mcp.ReadResourceResult
timestamp time.Time
}
func (h *CachedResourceHandler) HandleResource(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
h.mutex.RLock()
if entry, exists := h.cache[req.Params.URI]; exists {
if time.Since(entry.timestamp) < h.ttl {
h.mutex.RUnlock()
return entry.data, nil
}
}
h.mutex.RUnlock()
// 获取新数据
data, err := h.fetchFreshData(ctx, req)
if err != nil {
return nil, err
}
// 缓存结果
h.mutex.Lock()
h.cache[req.Params.URI] = cacheEntry{
data: data,
timestamp: time.Now(),
}
h.mutex.Unlock()
return data, nil
}资源订阅
MCP 允许客户端通过 resources/subscribe 订阅单个资源的更新,然后通过 resources/unsubscribe 取消订阅。当订阅的资源发生变化时,服务器稍后会推送 notifications/resources/updated。
使用 WithResourceCapabilities 的第一个参数启用该能力:
s := server.NewMCPServer("Resource Server", "1.0.0",
// (subscribe, listChanged)
server.WithResourceCapabilities(true, true),
)一旦 subscribe 能力被公布,服务器就会接受 resources/subscribe 和 resources/unsubscribe 请求,并用空结果确认它们。如果客户端在能力被禁用(或未配置资源能力)时调用任一方法,服务器会返回 METHOD_NOT_FOUND。
按会话跟踪订阅
默认处理程序确认请求但不自行存储任何订阅状态。要记住哪个会话对哪个 URI 感兴趣,请在自定义会话类型上实现可选的 SessionWithResourceSubscriptions 接口——调度程序会自动调用它:
type mySession struct {
server.ClientSession
mu sync.Mutex
subs map[string]struct{}
}
func (s *mySession) SubscribeToResource(uri string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.subs == nil {
s.subs = make(map[string]struct{})
}
s.subs[uri] = struct{}{}
}
func (s *mySession) UnsubscribeFromResource(uri string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.subs, uri)
}
func (s *mySession) SubscribedResources() []string {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]string, 0, len(s.subs))
for uri := range s.subs {
out = append(out, uri)
}
return out
}
func (s *mySession) IsSubscribedToResource(uri string) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.subs[uri]
return ok
}对未知 URI 的重复订阅和取消订阅被视为空操作,与规范一致。
推送 resources/updated 通知
当资源发生变化时,只向请求它的客户端发送更新。最简单的方法是通过 OnRegisterSession / OnUnregisterSession 钩子自行跟踪会话,然后查找订阅了该 URI 的会话,并调用 SendNotificationToSpecificClient:
var (
sessionsMu sync.Mutex
sessions = map[string]server.ClientSession{}
)
hooks := &server.Hooks{}
hooks.AddOnRegisterSession(func(ctx context.Context, session server.ClientSession) {
sessionsMu.Lock()
defer sessionsMu.Unlock()
sessions[session.SessionID()] = session
})
hooks.AddOnUnregisterSession(func(ctx context.Context, session server.ClientSession) {
sessionsMu.Lock()
defer sessionsMu.Unlock()
delete(sessions, session.SessionID())
})
func notifyResourceUpdated(s *server.MCPServer, uri string) {
sessionsMu.Lock()
snapshot := make([]server.ClientSession, 0, len(sessions))
for _, sess := range sessions {
snapshot = append(snapshot, sess)
}
sessionsMu.Unlock()
for _, sess := range snapshot {
subs, ok := sess.(server.SessionWithResourceSubscriptions)
if !ok || !subs.IsSubscribedToResource(uri) {
continue
}
_ = s.SendNotificationToSpecificClient(
sess.SessionID(),
"notifications/resources/updated",
map[string]any{"uri": uri},
)
}
}订阅钩子
如果您只需要观察订阅/取消订阅流量(用于指标、审计日志或驱动外部发布/订阅系统)而不需要维护每个会话的状态,请附加钩子:
hooks := &server.Hooks{}
hooks.AddBeforeSubscribe(func(ctx context.Context, id any, req *mcp.SubscribeRequest) {
log.Printf("subscribe: %s", req.Params.URI)
})
hooks.AddAfterUnsubscribe(func(ctx context.Context, id any, req *mcp.UnsubscribeRequest, _ *mcp.EmptyResult) {
log.Printf("unsubscribed: %s", req.Params.URI)
})
s := server.NewMCPServer("Resource Server", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithHooks(hooks),
)也有匹配的 AddAfterSubscribe 和 AddBeforeUnsubscribe 钩子可用。请参阅钩子了解更广泛的钩子契约。
高级资源模式
会话特定资源
您可以将资源添加到特定的客户端会话,允许不同的客户端查看不同的资源或使用会话特定实现覆盖全局资源。
使用辅助函数(推荐)
服务器提供方便的辅助函数,与会话工具辅助函数类似:
// 添加单个会话资源
userResource := mcp.NewResource(
"user://profile",
"User Profile",
mcp.WithResourceDescription("当前用户资料数据"),
)
err := s.AddSessionResource(
sessionID,
userResource,
func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
// 此处理程序仅对该特定会话可用
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: request.Params.URI,
MIMEType: "application/json",
Text: getUserProfile(sessionID),
},
}, nil
},
)
if err != nil {
log.Printf("Failed to add session resource: %v", err)
}
// 一次添加多个会话资源
err = s.AddSessionResources(
sessionID,
server.ServerResource{
Resource: mcp.NewResource("user://settings", "User Settings"),
Handler: func(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
return getUserSettings(sessionID)
},
},
server.ServerResource{
Resource: mcp.NewResource("user://history", "User History"),
Handler: func(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
return getUserHistory(sessionID)
},
},
)
if err != nil {
log.Printf("Failed to add session resources: %v", err)
}
// 不再需要时删除会话资源
err = s.DeleteSessionResources(sessionID, "user://profile", "user://settings")
if err != nil {
log.Printf("Failed to delete session resources: %v", err)
}直接使用接口
您也可以直接使用 SessionWithResources 接口:
sseServer := server.NewSSEServer(
s,
server.WithAppendQueryToMessageEndpoint(),
server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
withNewResources := r.URL.Query().Get("withNewResources")
if withNewResources != "1" {
return ctx
}
session := server.ClientSessionFromContext(ctx)
if sessionWithResources, ok := session.(server.SessionWithResources); ok {
// 添加新资源
sessionWithResources.SetSessionResources(map[string]server.ServerResource{
myNewResource.URI: {
Resource: myNewResource,
Handler: myNewResourceHandler,
},
})
}
return ctx
}),
)重要说明
- 会话资源会覆盖具有相同 URI 的全局资源
- 添加/删除资源时会自动发送通知(
resources/list_changed) - 首次添加会话资源时,服务器会自动注册资源能力
- 操作是线程安全的,可以并发调用
- 除非明确在初始化之前添加,否则资源仅对已初始化的会话可用

