Skip to content

Gateway 内部机制

hermes agent Gateway 内部机制

Messaging gateway 是一个长期运行的进程,它通过统一架构将 Hermes 连接到 20+ 个外部消息平台。

文件用途
gateway/run.pyGatewayRunner —— 主循环、斜杠命令、消息分发(大型文件;当前 LOC 请查看 git)
gateway/session.pySessionStore —— conversation 持久化和 session key 构造
gateway/delivery.py向目标平台 / channels 发送 outbound messages
gateway/pairing.py用于用户授权的 DM pairing flow
gateway/channel_directory.py将 chat IDs 映射为人类可读名称,用于 cron delivery
gateway/hooks.pyHook discovery、loading 和 lifecycle event dispatch
gateway/mirror.py用于 send_message 的跨 session message mirroring
gateway/status.pyProfile-scoped gateway instances 的 token lock management
gateway/builtin_hooks/始终注册的 hooks 的扩展点(当前未随附任何内容)
gateway/platforms/Platform adapters(每个 messaging platform 一个)
┌─────────────────────────────────────────────────┐
│ GatewayRunner │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Telegram │ │ Discord │ │ Slack │ │
│ │ Adapter │ │ Adapter │ │ Adapter │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ ▼ │
│ _handle_message() │
│ │ │
│ ┌───────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ Slash command AIAgent Queue/BG │
│ dispatch creation sessions │
│ │ │
│ ▼ │
│ SessionStore │
│ (SQLite persistence) │
└───────┴─────────────┴─────────────┴─────────────┘

当任何平台收到一条消息时:

  1. Platform adapter 接收原始 event,并将其规范化为 MessageEvent
  2. Base adapter 检查 active session guard:
    • 如果 agent 正在为该 session 运行 → 将消息加入 queue,并设置 interrupt event
    • 如果是 /approve/deny/stop → 绕过 guard(inline dispatch)
  3. GatewayRunner._handle_message() 接收 event:
    • 通过 _session_key_for_source() 解析 session key(格式:agent:main:{platform}:{chat_type}:{chat_id}
    • 检查 authorization(见下面的 Authorization)
    • 检查它是否是 slash command → 分发到 command handler
    • 检查 agent 是否已经在运行 → 拦截 /stop/status 等命令
    • 否则 → 创建 AIAgent 实例并运行 conversation
  4. Response 会通过 platform adapter 发回

Session keys 会编码完整的 routing context:

agent:main:{platform}:{chat_type}:{chat_id}

例如:

agent:main:telegram:private:123456789

支持 thread 的平台(Telegram forum topics、Discord threads、Slack threads)可能会在 chat_id 部分包含 thread IDs。永远不要手动构造 session keys —— 始终使用 gateway/session.py 中的 build_session_key()

当 agent 正在 active 运行时,传入 messages 会经过两个连续 guard:

  1. Level 1 —— Base adapter(gateway/platforms/base.py):检查 _active_sessions。如果 session active,则将消息加入 _pending_messages 并设置 interrupt event。这会在消息到达 gateway runner 之前捕获它们。

  2. Level 2 —— Gateway runner(gateway/run.py):检查 _running_agents。拦截特定命令(/stop/new/queue/status/approve/deny),并将它们路由到合适位置。其他所有内容都会触发 running_agent.interrupt()

当 agent 被阻塞时仍必须到达 runner 的命令(例如 /approve)会通过 await self._message_handler(event) inline dispatch —— 它们会绕过 background task system,以避免 race conditions。

Gateway 使用多层 authorization check,按顺序评估:

  1. Per-platform allow-all flag(例如 TELEGRAM_ALLOW_ALL_USERS)—— 如果设置,则该平台上的所有用户都被授权
  2. Platform allowlist(例如 TELEGRAM_ALLOWED_USERS)—— 逗号分隔的 user IDs
  3. DM pairing —— 已认证用户可以通过 pairing code 配对新用户
  4. Global allow-all(GATEWAY_ALLOW_ALL_USERS)—— 如果设置,则所有平台上的所有用户都被授权
  5. 默认:deny —— 未授权用户会被拒绝
Admin: /pair
Gateway: "Pairing code: ABC123. Share with the user."
New user: ABC123
Gateway: "Paired! You're now authorized."

Pairing state 会持久化在 gateway/pairing.py 中,并且在重启后仍然保留。

Gateway 中的所有斜杠命令都会经过同一个解析 pipeline:

  1. hermes_cli/commands.py 中的 resolve_command() 会将输入映射到 canonical name(处理 aliases、prefix matching)
  2. canonical name 会与 GATEWAY_KNOWN_COMMANDS 进行检查
  3. _handle_message() 中的 handler 会根据 canonical name 进行分发
  4. 某些命令会受 config 限制(CommandDef 上的 gateway_config_gate

当 agent 正在处理时,不能执行的命令会被提前拒绝:

if _quick_key in self._running_agents:
if canonical == "model":
return "⏳ Agent is running — wait for it to finish or /stop first."

Bypass commands(/stop/new/approve/deny/queue/status)有特殊处理。

Gateway 从多个来源读取配置:

Source提供内容
~/.hermes/.envAPI keys、bot tokens、platform credentials
~/.hermes/config.yamlModel settings、tool configuration、display options
Environment variables覆盖上述任意配置

不同于 CLI(使用带硬编码 defaults 的 load_cli_config()),gateway 会直接通过 YAML loader 读取 config.yaml。这意味着 CLI 的 defaults dict 中存在、但用户 config 文件中不存在的配置 key,在 CLI 和 gateway 之间可能表现不同。

每个 messaging platform 都在 gateway/platforms/ 中有一个 adapter:

gateway/platforms/
├── base.py # BaseAdapter — 所有平台的共享逻辑
├── telegram.py # Telegram Bot API(long polling 或 webhook)
├── discord.py # 通过 discord.py 使用 Discord bot
├── slack.py # Slack Socket Mode
├── whatsapp.py # WhatsApp Business Cloud API
├── signal.py # 通过 signal-cli REST API 使用 Signal
├── matrix.py # 通过 mautrix 使用 Matrix(可选 E2EE)
├── mattermost.py # Mattermost WebSocket API
├── email.py # 通过 IMAP/SMTP 使用 Email
├── sms.py # 通过 Twilio 使用 SMS
├── dingtalk.py # DingTalk WebSocket
├── feishu.py # Feishu/Lark WebSocket 或 webhook
├── wecom.py # WeCom(WeChat Work)callback
├── weixin.py # 通过 iLink Bot API 使用 Weixin(个人微信)
├── bluebubbles.py # 通过 BlueBubbles macOS server 使用 Apple iMessage
├── qqbot/ # QQ Bot(腾讯 QQ)通过 Official API v2(子包:adapter.py、crypto.py、keyboards.py 等)
├── yuanbao.py # Yuanbao(腾讯)DM/group adapter
├── feishu_comment.py # Feishu document/drive comment-reply handler
├── msgraph_webhook.py # Microsoft Graph change-notification webhook(Teams、Outlook 等)
├── webhook.py # Inbound/outbound webhook adapter
├── api_server.py # REST API server adapter
└── homeassistant.py # Home Assistant conversation integration

Adapters 实现一个通用接口:

  • connect() / disconnect() —— lifecycle management
  • send_message() —— outbound message delivery
  • on_message() —— inbound message normalization → MessageEvent

使用唯一 credentials 连接的 adapters 会在 connect() 中调用 acquire_scoped_lock(),并在 disconnect() 中调用 release_scoped_lock()。这可以防止两个 profiles 同时使用同一个 bot token。

Outgoing deliveries(gateway/delivery.py)处理:

  • Direct reply —— 将 response 发回原始 chat
  • Home channel delivery —— 将 cron job outputs 和 background results 路由到配置好的 home channel
  • Explicit target delivery —— send_message tool 指定 telegram:-1001234567890,或者 hermes send CLI 包装同一个 tool 给 shell scripts 使用
  • Cross-platform delivery —— 投递到不同于原始消息的平台

Cron job deliveries 不会被 mirror 到 gateway session history 中 —— 它们只存在于自己的 cron session 中。这是一个有意的设计选择,用于避免 message alternation violations。

Gateway hooks 是响应 lifecycle events 的 Python modules:

Event触发时机
gateway:startupGateway process 启动
session:start新 conversation session 开始
session:endSession 完成或超时
session:reset用户使用 /new 重置 session
agent:startAgent 开始处理 message
agent:stepAgent 完成一个 tool-calling iteration
agent:endAgent 完成并返回 response
command:*任意 slash command 被执行

Hooks 会从 gateway/builtin_hooks/(扩展点 —— 当前发布版本为空;_register_builtin_hooks() 是 no-op stub)和 ~/.hermes/hooks/(用户安装)中发现。每个 hook 都是一个包含 HOOK.yaml manifest 和 handler.py 的目录。

当启用 memory provider plugin(例如 Honcho)时:

  1. Gateway 会为每条 message 创建一个带 session ID 的 AIAgent
  2. MemoryManager 会使用 session context 初始化 provider
  3. Provider tools(例如 honcho_profileviking_search)会通过以下路径路由:
AIAgent._invoke_tool()
→ self._memory_manager.handle_tool_call(name, args)
→ provider.handle_tool_call(name, args)
  1. 在 session end/reset 时,会触发 on_session_end() 用于 cleanup 和 final data flush。

当 session 被 reset、resumed 或 expires 时:

  1. Built-in memories 会 flush 到磁盘
  2. Memory provider 的 on_session_end() hook 会触发
  3. 一个临时 AIAgent 会运行一个 memory-only conversation turn
  4. Context 随后会被丢弃或归档

Gateway 会在处理消息的同时运行周期性维护:

  • Cron ticking —— 检查 job schedules 并触发到期 jobs
  • Session expiry —— 在超时后清理 abandoned sessions
  • Memory flush —— 在 session expiry 前主动 flush memory
  • Cache refresh —— 刷新 model lists 和 provider status

Gateway 是长期运行的进程,通过以下方式管理:

  • hermes gateway start / hermes gateway stop —— 手动控制
  • systemctl(Linux)或 launchctl(macOS)—— service management
  • 位于 ~/.hermes/gateway.pid 的 PID file —— profile-scoped process tracking

Profile-scoped vs global:start_gateway() 使用 profile-scoped PID files。hermes gateway stop 只会停止当前 profile 的 gateway。hermes gateway stop --all 使用全局 ps aux 扫描来杀掉所有 gateway processes(更新期间使用)。

  • Session Storage
  • Cron Internals
  • ACP Internals
  • Agent Loop Internals
  • Messaging Gateway(User Guide)
-
0:000:00