Skip to content

Add Chat SDK messenger example with managed fiber durability#1563

Merged
threepointone merged 8 commits into
mainfrom
chat-sdk-messenger-example
May 19, 2026
Merged

Add Chat SDK messenger example with managed fiber durability#1563
threepointone merged 8 commits into
mainfrom
chat-sdk-messenger-example

Conversation

@threepointone
Copy link
Copy Markdown
Contributor

@threepointone threepointone commented May 19, 2026

Summary

This PR adds a Chat SDK messenger example and the managed fiber APIs needed to make webhook-driven AI replies durable, idempotent, inspectable, and recoverable.

The example uses Chat SDK as the messenger abstraction, Telegram as the current adapter, Agents subagents for Chat SDK state, and a Think-backed conversation subagent for AI replies. The core Agents SDK gains startFiber() and related APIs so application-owned jobs, such as webhook reply work, can be accepted once, deduped across retries, inspected later, cancelled, cleaned up, and explicitly recovered after eviction.

Why

Webhook-driven chat integrations have an awkward failure boundary:

  • Providers can retry the same webhook after a timeout.
  • LLM turns can run longer than webhook deadlines.
  • A Durable Object can be evicted while a visible reply is mid-stream.
  • Retrying blindly can duplicate replies.
  • Returning quickly without a retained job record makes status and recovery opaque.
  • Raw runFiber() is useful for crash recovery, but it does not provide a stable public job identity, idempotency key, retained terminal status, or an explicit application recovery result.

This PR adds a higher-level managed fiber layer on top of runFiber() for those application-owned jobs, then uses it in the Chat SDK messenger example.

What Changed

Chat SDK messenger example

Adds examples/chat-sdk-messenger, a server-only Worker example that demonstrates:

  • Chat SDK webhook ingress using Telegram as the current provider adapter.
  • A Chat SDK StateAdapter backed by an Agents subagent.
  • A Think-backed ConversationAgent subagent for AI replies.
  • Streaming AI text into Chat SDK message updates.
  • Recovery behavior for interrupted reply jobs.
  • Documentation that frames Chat SDK as the core abstraction and Telegram as a replaceable adapter.

The example intentionally keeps provider-specific behavior thin so future adapters can reuse the same shape.

Think streaming and cancellation

The Think path now supports RPC-safe streamed turns for this use case:

  • chat() streams text deltas through a serializable callback target.
  • The callback receives a start event with a request ID.
  • Callers can cancel with cancelChat(requestId) / cancelAllChats() instead of passing non-serializable AbortSignals across Durable Object RPC.
  • Streaming edge cases are handled in the messenger example, including empty responses, partial failures, post failures, and cancellation.

Managed fiber jobs

Adds a retained managed job layer to Agent:

  • startFiber() durably accepts a background job.
  • idempotencyKey dedupes retries.
  • fiberId gives stable identity.
  • inspectFiber() / inspectFiberByKey() expose retained status.
  • listFibers() supports status inspection.
  • cancelFiber() / cancelFiberByKey() record cooperative cancellation.
  • deleteFibers() cleans up retained terminal rows.
  • resolveFiber() lets app-level recovery resolve interrupted jobs.
  • onFiberRecovered() can return a FiberRecoveryResult to record explicit recovery policy.

Managed fibers retain terminal status separately from the transient cf_agents_runs rows used by raw fibers. This gives callers a durable job ledger without breaking the existing runFiber() primitive.

waitForCompletion

Adds waitForCompletion: true to startFiber() for cases where the caller should wait until the retained job reaches a terminal status.

The messenger example uses this to preserve Chat SDK's visible reply serialization semantics. Without it, durable acceptance would return quickly and allow multiple visible replies to overlap in the same thread.

The implementation waits on managed fiber terminal ledger status, not just the callback promise, so cancellation and recovery unblock waiters correctly.

Recovery semantics

Managed fiber recovery is explicit:

  • Successful execution records completed.
  • Thrown callbacks record error.
  • Cancellation records aborted.
  • Eviction or lost execution records interrupted.
  • onFiberRecovered() may return { status: "completed" | "error" | "aborted" | "interrupted" }.
  • Returning undefined leaves the row interrupted.
  • Throwing during recovery leaves the row interrupted and records the recovery error.
  • resolveFiber() only updates currently interrupted rows.

deleteFibers() defaults to deleting completed, error, and aborted rows, while preserving interrupted rows for inspection or manual/application-level resolution.

Public API Notes

New managed fiber APIs are additive and backward-compatible with existing runFiber() usage.

Important naming choices:

  • Public inspection uses settledAt, because the timestamp applies to any terminal state, not only successful completion.
  • Cleanup uses settledBefore for the same reason.
  • The database column remains internal and does not affect the public API.

Documentation

Updates docs for:

  • Durable execution and managed fiber recipes.
  • Agent class reference.
  • Long-running agent guidance.
  • Think turn API selection.
  • Think programmatic submissions.
  • Webhook and server-driven message guidance.
  • Chat SDK messenger production behavior and future TODOs.
  • Think package and example README references.

The docs clarify the intended boundary:

  • submitMessages() owns durable Think conversation admission.
  • startFiber() owns external application jobs around a turn.
  • Workflows own multi-step orchestration.
  • Think/AIChat internals continue using raw runFiber() for stream recovery because those are internal recovery fibers, not externally inspectable application jobs.

Tests

Adds and updates unit and E2E coverage for managed fibers and the messenger example.

Covered cases include:

  • New managed fiber acceptance.
  • Duplicate idempotency keys.
  • Explicit fiberId behavior.
  • Cancellation.
  • Callback failures.
  • waitForCompletion for new and duplicate jobs.
  • Waiters resolving after cancellation.
  • Default cleanup preserving interrupted rows.
  • Managed fiber recovery result application.
  • Sub-agent managed fiber recovery.
  • Process-kill E2E recovery through wrangler dev.
  • Duplicate waitForCompletion after restart/stale ledger recovery.
  • Messenger state adapter behavior.
  • Messenger AI reply policy helpers.

Verification

Ran:

  • npm run test:workers -w agents -- "src/tests/run-fiber.test.ts"
  • npm run test:e2e -w agents
  • npm run test -w @cloudflare/agents-chat-sdk-messenger
  • npm run check

Note: one npm run check attempt hit a transient experimental/gadgets-chat agents/react type resolution failure. Rerunning npm run check passed with all 85 projects typechecking successfully.

Review Focus

Suggested review areas:

  • Managed fiber API shape and naming.
  • waitForCompletion semantics, especially duplicate calls, cancellation, and recovery.
  • Whether interrupted should remain excluded from default cleanup.
  • Messenger example architecture: Chat SDK as core, Telegram as adapter, Agents subagents for state, Think subagent for intelligence.
  • Recovery policy DX: whether FiberRecoveryResult is explicit enough for applications.
  • Docs clarity around runFiber() vs startFiber() vs submitMessages() vs Workflows.

Follow-Ups

Not included in this PR:

  • Telegram-specific retry buttons or admin commands.
  • Messenger-specific process-kill E2E tests.
  • Moving Think or AIChat internals from raw runFiber() to managed fibers.
  • Production bot features such as operator dashboards or provider-specific recovery UI.

Made with Cursor


Open in Devin Review

threepointone and others added 5 commits May 19, 2026 18:51
Demonstrates Chat SDK ingress on Agents with subagent-backed state and Think-owned conversation replies.

Co-authored-by: Cursor <cursoragent@cursor.com>
Adds Think chat streaming with RPC-safe cancellation so messenger delivery failures can stop the corresponding sub-agent turn.

Co-authored-by: Cursor <cursoragent@cursor.com>
Introduce managed fiber jobs on top of runFiber so agents can durably accept idempotent background work, inspect retained status, cancel running jobs, explicitly resolve interrupted jobs, and record recovery policy decisions. This adds the cf_agents_fibers ledger, schema v8 migration, status/list/delete/resolve APIs, cooperative cancellation signals, and waitForCompletion support that waits on terminal ledger state instead of only the callback promise.

Tighten crash recovery semantics for managed work by reconciling stale run rows, recovering ledger-only pending/running rows, skipping recovery for already-terminal fibers, settling setup failures, and letting onFiberRecovered return a FiberRecoveryResult to move interrupted fibers to completed, error, aborted, or intentionally interrupted. The implementation also tracks active managed executions and terminal waiters so duplicate requests can join in-memory work when possible while post-restart retries drive the same recovery path.

Use the new managed fiber API in the Chat SDK messenger example for AI replies. Telegram messages now get a stable per-message idempotency boundary, completion waiting preserves Chat SDK per-thread visible reply serialization, and recovery policy is explicit: accepted replies are replayed while mid-stream interruptions post a concise apology and settle the retained job.

Expand coverage across unit, sub-agent, schema, and real eviction tests. The E2E harness now starts wrangler dev with persisted SQLite state, kills it mid-managed-fiber, restarts it, and verifies interrupted retention, recovery-result settlement, duplicate waitForCompletion retries after restart, and sub-agent managed fiber recovery through the parent alarm.

Document the new durable job surface in the Agent and durable execution docs, including waitForCompletion, cancellation behavior, retained terminal records, explicit recovery outcomes, and how this differs from Think message admission.

Co-authored-by: Cursor <cursoragent@cursor.com>
Rename the public managed-fiber terminal timestamp from completedAt to settledAt, and rename the cleanup filter from completedBefore to settledBefore. These names better describe terminal rows across completed, error, aborted, and interrupted states while keeping the existing SQLite completed_at column internal.

Make default deleteFibers() cleanup preserve interrupted rows. Interrupted managed fibers often need inspection or explicit application-level resolution, so callers must now opt in to deleting them by passing status: "interrupted".

Clarify FiberContext.snapshot documentation so it does not imply callbacks are automatically re-entered with recovered snapshots; recovery snapshots are delivered through onFiberRecovered(). Add a regression test that default cleanup deletes completed rows while preserving interrupted rows, then verifies explicit interrupted cleanup still works.

Co-authored-by: Cursor <cursoragent@cursor.com>
Add practical guidance for using managed fibers around webhook-style application jobs, including retained cleanup with settledBefore, interrupted recovery, resolveFiber, and waitForCompletion behavior.

Clarify the boundary between Think submissions and managed fibers across the Think docs, package README, server-driven messaging docs, webhook docs, and examples so users can distinguish durable Think turn admission from app-owned side-effect jobs.

Co-authored-by: Cursor <cursoragent@cursor.com>
@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 19, 2026

🦋 Changeset detected

Latest commit: 0b72b49

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 2 packages
Name Type
agents Patch
@cloudflare/think Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 potential issues.

View 7 additional findings in Devin Review.

Open in Devin Review

Comment thread .changeset/quiet-chats-cancel.md Outdated
Comment thread examples/chat-sdk-messenger/src/state/agent.ts
threepointone and others added 2 commits May 19, 2026 23:09
Use the workspace dependency for the Chat SDK messenger example's Think package so npm ci can resolve the merged branch after main's version-package release.

Always run npm ci in the shared GitHub install action while relying on setup-node's npm package cache, avoiding stale node_modules cache hits that can mask lockfile drift.

Co-authored-by: Cursor <cursoragent@cursor.com>
@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented May 19, 2026

Open in StackBlitz

agents

npm i https://pkg.pr.new/agents@1563

@cloudflare/ai-chat

npm i https://pkg.pr.new/@cloudflare/ai-chat@1563

@cloudflare/codemode

npm i https://pkg.pr.new/@cloudflare/codemode@1563

hono-agents

npm i https://pkg.pr.new/hono-agents@1563

@cloudflare/shell

npm i https://pkg.pr.new/@cloudflare/shell@1563

@cloudflare/think

npm i https://pkg.pr.new/@cloudflare/think@1563

@cloudflare/voice

npm i https://pkg.pr.new/@cloudflare/voice@1563

@cloudflare/worker-bundler

npm i https://pkg.pr.new/@cloudflare/worker-bundler@1563

commit: 0b72b49

Correct the malformed Think changeset frontmatter so Changesets can parse the release metadata.

Ensure waitForCompletion waits for a terminal managed fiber status even when duplicate calls race with an already-running recovery pass, and cover the race with a regression test. Also document and test the Chat SDK state adapter's list-level TTL behavior.

Co-authored-by: Cursor <cursoragent@cursor.com>
@threepointone threepointone merged commit 32cde40 into main May 19, 2026
4 checks passed
@threepointone threepointone deleted the chat-sdk-messenger-example branch May 19, 2026 22:53
@github-actions github-actions Bot mentioned this pull request May 19, 2026
cjol pushed a commit that referenced this pull request May 20, 2026
* Add Chat SDK messenger example

Demonstrates Chat SDK ingress on Agents with subagent-backed state and Think-owned conversation replies.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Stream Chat SDK messenger replies

Adds Think chat streaming with RPC-safe cancellation so messenger delivery failures can stop the corresponding sub-agent turn.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Add managed fiber jobs

Introduce managed fiber jobs on top of runFiber so agents can durably accept idempotent background work, inspect retained status, cancel running jobs, explicitly resolve interrupted jobs, and record recovery policy decisions. This adds the cf_agents_fibers ledger, schema v8 migration, status/list/delete/resolve APIs, cooperative cancellation signals, and waitForCompletion support that waits on terminal ledger state instead of only the callback promise.

Tighten crash recovery semantics for managed work by reconciling stale run rows, recovering ledger-only pending/running rows, skipping recovery for already-terminal fibers, settling setup failures, and letting onFiberRecovered return a FiberRecoveryResult to move interrupted fibers to completed, error, aborted, or intentionally interrupted. The implementation also tracks active managed executions and terminal waiters so duplicate requests can join in-memory work when possible while post-restart retries drive the same recovery path.

Use the new managed fiber API in the Chat SDK messenger example for AI replies. Telegram messages now get a stable per-message idempotency boundary, completion waiting preserves Chat SDK per-thread visible reply serialization, and recovery policy is explicit: accepted replies are replayed while mid-stream interruptions post a concise apology and settle the retained job.

Expand coverage across unit, sub-agent, schema, and real eviction tests. The E2E harness now starts wrangler dev with persisted SQLite state, kills it mid-managed-fiber, restarts it, and verifies interrupted retention, recovery-result settlement, duplicate waitForCompletion retries after restart, and sub-agent managed fiber recovery through the parent alarm.

Document the new durable job surface in the Agent and durable execution docs, including waitForCompletion, cancellation behavior, retained terminal records, explicit recovery outcomes, and how this differs from Think message admission.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Polish managed fiber cleanup API

Rename the public managed-fiber terminal timestamp from completedAt to settledAt, and rename the cleanup filter from completedBefore to settledBefore. These names better describe terminal rows across completed, error, aborted, and interrupted states while keeping the existing SQLite completed_at column internal.

Make default deleteFibers() cleanup preserve interrupted rows. Interrupted managed fibers often need inspection or explicit application-level resolution, so callers must now opt in to deleting them by passing status: "interrupted".

Clarify FiberContext.snapshot documentation so it does not imply callbacks are automatically re-entered with recovered snapshots; recovery snapshots are delivered through onFiberRecovered(). Add a regression test that default cleanup deletes completed rows while preserving interrupted rows, then verifies explicit interrupted cleanup still works.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Document managed fiber adoption patterns

Add practical guidance for using managed fibers around webhook-style application jobs, including retained cleanup with settledBefore, interrupted recovery, resolveFiber, and waitForCompletion behavior.

Clarify the boundary between Think submissions and managed fibers across the Think docs, package README, server-driven messaging docs, webhook docs, and examples so users can distinguish durable Think turn admission from app-owned side-effect jobs.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Fix PR install after main package bumps

Use the workspace dependency for the Chat SDK messenger example's Think package so npm ci can resolve the merged branch after main's version-package release.

Always run npm ci in the shared GitHub install action while relying on setup-node's npm package cache, avoiding stale node_modules cache hits that can mask lockfile drift.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Fix managed fiber review issues

Correct the malformed Think changeset frontmatter so Changesets can parse the release metadata.

Ensure waitForCompletion waits for a terminal managed fiber status even when duplicate calls race with an already-running recovery pass, and cover the race with a regression test. Also document and test the Chat SDK state adapter's list-level TTL behavior.

Co-authored-by: Cursor <cursoragent@cursor.com>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant