Add Chat SDK messenger example with managed fiber durability #1563
Merged
Demonstrates Chat SDK ingress on Agents with subagent-backed state and Think-owned conversation replies. Co-authored-by: Cursor <cursoragent@cursor.com>
Adds Think chat streaming with RPC-safe cancellation so messenger delivery failures can stop the corresponding sub-agent turn. Co-authored-by: Cursor <cursoragent@cursor.com>
Introduce managed fiber jobs on top of runFiber so agents can durably accept idempotent background work, inspect retained status, cancel running jobs, explicitly resolve interrupted jobs, and record recovery policy decisions. This adds the cf_agents_fibers ledger, schema v8 migration, status/list/delete/resolve APIs, cooperative cancellation signals, and waitForCompletion support that waits on terminal ledger state instead of only the callback promise. Tighten crash recovery semantics for managed work by reconciling stale run rows, recovering ledger-only pending/running rows, skipping recovery for already-terminal fibers, settling setup failures, and letting onFiberRecovered return a FiberRecoveryResult to move interrupted fibers to completed, error, aborted, or intentionally interrupted. The implementation also tracks active managed executions and terminal waiters so duplicate requests can join in-memory work when possible while post-restart retries drive the same recovery path. Use the new managed fiber API in the Chat SDK messenger example for AI replies. Telegram messages now get a stable per-message idempotency boundary, completion waiting preserves Chat SDK per-thread visible reply serialization, and recovery policy is explicit: accepted replies are replayed while mid-stream interruptions post a concise apology and settle the retained job. Expand coverage across unit, sub-agent, schema, and real eviction tests. The E2E harness now starts wrangler dev with persisted SQLite state, kills it mid-managed-fiber, restarts it, and verifies interrupted retention, recovery-result settlement, duplicate waitForCompletion retries after restart, and sub-agent managed fiber recovery through the parent alarm. Document the new durable job surface in the Agent and durable execution docs, including waitForCompletion, cancellation behavior, retained terminal records, explicit recovery outcomes, and how this differs from Think message admission. 
Co-authored-by: Cursor <cursoragent@cursor.com>
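The recovery rule in this commit (already-terminal fibers are skipped, and a recovery hook may move an interrupted fiber to an explicit outcome) can be sketched as a small pure function. This is a minimal in-memory model, not the Agents SDK implementation: `Row`, `RecoveryHook`, and `recover` are hypothetical names, and treating only `pending` and `running` rows as recoverable is an assumption.

```typescript
// Hypothetical model of the recovery decision; not the Agents SDK code.
type Status =
  | "pending"
  | "running"
  | "completed"
  | "error"
  | "aborted"
  | "interrupted";

// Shape assumed from the commit message: a result names a terminal status.
interface RecoveryResult {
  status: "completed" | "error" | "aborted" | "interrupted";
}

interface Row {
  fiberId: string;
  status: Status;
}

type RecoveryHook = (row: Row) => RecoveryResult | undefined;

function recover(row: Row, hook: RecoveryHook): Row {
  // Assumption: only rows that were still pending or running are recovered.
  if (row.status !== "pending" && row.status !== "running") return row;

  // The row is first marked interrupted, then the hook may settle it.
  const interrupted: Row = { ...row, status: "interrupted" };
  const result = hook(interrupted);

  // No result from the hook leaves the row interrupted for later resolution.
  return result ? { ...interrupted, status: result.status } : interrupted;
}
```

In this sketch the hook is never called for a row that already reached a terminal status, so the policy decision is recorded at most once per interruption.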
Rename the public managed-fiber terminal timestamp from completedAt to settledAt, and rename the cleanup filter from completedBefore to settledBefore. These names better describe terminal rows across completed, error, aborted, and interrupted states while keeping the existing SQLite completed_at column internal. Make default deleteFibers() cleanup preserve interrupted rows. Interrupted managed fibers often need inspection or explicit application-level resolution, so callers must now opt in to deleting them by passing status: "interrupted". Clarify FiberContext.snapshot documentation so it does not imply callbacks are automatically re-entered with recovered snapshots; recovery snapshots are delivered through onFiberRecovered(). Add a regression test that default cleanup deletes completed rows while preserving interrupted rows, then verifies explicit interrupted cleanup still works. Co-authored-by: Cursor <cursoragent@cursor.com>
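The cleanup rule described in this commit can be illustrated with a small filter over retained rows. This is a sketch under stated assumptions, not the `deleteFibers()` implementation: `FiberRow`, `CleanupFilter`, and `selectForCleanup` are hypothetical names, `settledAt` is assumed to be a millisecond timestamp, and `settledBefore` is assumed to be an exclusive upper bound.

```typescript
// Hypothetical model of the default cleanup rule; not the Agents SDK code.
type TerminalStatus = "completed" | "error" | "aborted" | "interrupted";

interface FiberRow {
  fiberId: string;
  status: TerminalStatus;
  settledAt: number; // assumed: epoch milliseconds
}

interface CleanupFilter {
  status?: TerminalStatus;
  settledBefore?: number; // assumed: exclusive upper bound on settledAt
}

// Interrupted rows are left out of the default set so they stay inspectable.
const DEFAULT_STATUSES: readonly TerminalStatus[] = [
  "completed",
  "error",
  "aborted",
];

function selectForCleanup(
  rows: readonly FiberRow[],
  filter: CleanupFilter = {},
): FiberRow[] {
  // An explicit status (including "interrupted") replaces the default set.
  const statuses: readonly TerminalStatus[] = filter.status
    ? [filter.status]
    : DEFAULT_STATUSES;
  return rows.filter(
    (row) =>
      statuses.includes(row.status) &&
      (filter.settledBefore === undefined ||
        row.settledAt < filter.settledBefore),
  );
}
```

Calling `selectForCleanup(rows)` with no filter never returns an interrupted row; passing `{ status: "interrupted" }` is the explicit opt-in.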
Add practical guidance for using managed fibers around webhook-style application jobs, including retained cleanup with settledBefore, interrupted recovery, resolveFiber, and waitForCompletion behavior. Clarify the boundary between Think submissions and managed fibers across the Think docs, package README, server-driven messaging docs, webhook docs, and examples so users can distinguish durable Think turn admission from app-owned side-effect jobs. Co-authored-by: Cursor <cursoragent@cursor.com>
🦋 Changeset detected. Latest commit: 0b72b49. The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages.
Use the workspace dependency for the Chat SDK messenger example's Think package so npm ci can resolve the merged branch after main's version-package release. Always run npm ci in the shared GitHub install action while relying on setup-node's npm package cache, avoiding stale node_modules cache hits that can mask lockfile drift. Co-authored-by: Cursor <cursoragent@cursor.com>
Correct the malformed Think changeset frontmatter so Changesets can parse the release metadata. Ensure waitForCompletion waits for a terminal managed fiber status even when duplicate calls race with an already-running recovery pass, and cover the race with a regression test. Also document and test the Chat SDK state adapter's list-level TTL behavior. Co-authored-by: Cursor <cursoragent@cursor.com>
cjol pushed a commit that referenced this pull request on May 20, 2026
* Add Chat SDK messenger example
* Stream Chat SDK messenger replies
* Add managed fiber jobs
* Polish managed fiber cleanup API
* Document managed fiber adoption patterns
* Fix PR install after main package bumps
* Fix managed fiber review issues
Co-authored-by: Cursor <cursoragent@cursor.com>
Summary
This PR adds a Chat SDK messenger example and the managed fiber APIs needed to make webhook-driven AI replies durable, idempotent, inspectable, and recoverable.
The example uses Chat SDK as the messenger abstraction, Telegram as the current adapter, Agents subagents for Chat SDK state, and a Think-backed conversation subagent for AI replies. The core Agents SDK gains startFiber() and related APIs so application-owned jobs, such as webhook reply work, can be accepted once, deduped across retries, inspected later, cancelled, cleaned up, and explicitly recovered after eviction.
Why
Webhook-driven chat integrations have an awkward failure boundary:
runFiber() is useful for crash recovery, but it does not provide a stable public job identity, idempotency key, retained terminal status, or an explicit application recovery result.
This PR adds a higher-level managed fiber layer on top of runFiber() for those application-owned jobs, then uses it in the Chat SDK messenger example.
What Changed
Chat SDK messenger example
Adds examples/chat-sdk-messenger, a server-only Worker example that demonstrates:
* StateAdapter backed by an Agents subagent.
* ConversationAgent subagent for AI replies.
The example intentionally keeps provider-specific behavior thin so future adapters can reuse the same shape.
Think streaming and cancellation
The Think path now supports RPC-safe streamed turns for this use case:
* chat() streams text deltas through a serializable callback target.
* cancelChat(requestId) / cancelAllChats() instead of passing non-serializable AbortSignals across Durable Object RPC.
Managed fiber jobs
Adds a retained managed job layer to Agent:
* startFiber() durably accepts a background job.
* idempotencyKey dedupes retries.
* fiberId gives stable identity.
* inspectFiber() / inspectFiberByKey() expose retained status.
* listFibers() supports status inspection.
* cancelFiber() / cancelFiberByKey() record cooperative cancellation.
* deleteFibers() cleans up retained terminal rows.
* resolveFiber() lets app-level recovery resolve interrupted jobs.
* onFiberRecovered() can return a FiberRecoveryResult to record explicit recovery policy.
Managed fibers retain terminal status separately from the transient cf_agents_runs rows used by raw fibers. This gives callers a durable job ledger without breaking the existing runFiber() primitive.
waitForCompletion
Adds waitForCompletion: true to startFiber() for cases where the caller should wait until the retained job reaches a terminal status.
The messenger example uses this to preserve Chat SDK's visible reply serialization semantics. Without it, durable acceptance would return quickly and allow multiple visible replies to overlap in the same thread.
The implementation waits on managed fiber terminal ledger status, not just the callback promise, so cancellation and recovery unblock waiters correctly.
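The two ideas above (accept a job once per idempotency key, and wait on the ledger's terminal status instead of the callback promise) can be sketched with an in-memory ledger. This is an illustrative model only, not the Agents SDK implementation or its API: `JobLedger`, `start`, `settle`, and `waitForTerminal` are hypothetical names, and the real ledger is persisted rather than held in a `Map`.

```typescript
// Hypothetical in-memory model; not the Agents SDK implementation.
type Status =
  | "running"
  | "completed"
  | "error"
  | "aborted"
  | "interrupted";

const TERMINAL: ReadonlySet<Status> = new Set<Status>([
  "completed",
  "error",
  "aborted",
  "interrupted",
]);

interface Row {
  id: string;
  key: string;
  status: Status;
}

class JobLedger {
  private rows = new Map<string, Row>(); // keyed by idempotency key
  private waiters = new Map<string, Array<(row: Row) => void>>();
  private nextId = 1;

  // Accept a job once per key; a duplicate call reuses the existing row.
  start(key: string, work: () => Promise<void>): { row: Row; created: boolean } {
    const existing = this.rows.get(key);
    if (existing) return { row: existing, created: false };
    const row: Row = { id: `job-${this.nextId++}`, key, status: "running" };
    this.rows.set(key, row);
    work().then(
      () => this.settle(key, "completed"),
      () => this.settle(key, "error"),
    );
    return { row, created: true };
  }

  // Record a terminal status once; later attempts to settle are ignored.
  settle(key: string, status: Status): void {
    const row = this.rows.get(key);
    if (!row || TERMINAL.has(row.status)) return;
    row.status = status;
    for (const wake of this.waiters.get(key) ?? []) wake(row);
    this.waiters.delete(key);
  }

  // Resolve when the ledger row is terminal, whatever the callback is doing.
  waitForTerminal(key: string): Promise<Row> {
    const row = this.rows.get(key);
    if (!row) return Promise.reject(new Error(`unknown job: ${key}`));
    if (TERMINAL.has(row.status)) return Promise.resolve(row);
    return new Promise((resolve) => {
      const list = this.waiters.get(key) ?? [];
      list.push(resolve);
      this.waiters.set(key, list);
    });
  }
}
```

Because waiters are woken by `settle` and not by the work promise, a call such as `settle(key, "aborted")` unblocks them even when the callback itself never resolves, which is the property the paragraph above describes for cancellation and recovery.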
Recovery semantics
Managed fiber recovery is explicit:
* completed.
* error.
* aborted.
* interrupted.
* onFiberRecovered() may return { status: "completed" | "error" | "aborted" | "interrupted" }.
* undefined leaves the row interrupted.
* interrupted and records the recovery error.
* resolveFiber() only updates currently interrupted rows.
* deleteFibers() defaults to deleting completed, error, and aborted rows, while preserving interrupted rows for inspection or manual/application-level resolution.
Public API Notes
New managed fiber APIs are additive and backward-compatible with existing runFiber() usage.
Important naming choices:
* settledAt, because the timestamp applies to any terminal state, not only successful completion.
* settledBefore for the same reason.
Documentation
Updates docs for:
The docs clarify the intended boundary:
* submitMessages() owns durable Think conversation admission.
* startFiber() owns external application jobs around a turn.
* runFiber() for stream recovery because those are internal recovery fibers, not externally inspectable application jobs.
Tests
Adds and updates unit and E2E coverage for managed fibers and the messenger example.
Covered cases include:
* fiberId behavior.
* waitForCompletion for new and duplicate jobs.
* wrangler dev.
* waitForCompletion after restart/stale ledger recovery.
Verification
Ran:
* npm run test:workers -w agents -- "src/tests/run-fiber.test.ts"
* npm run test:e2e -w agents
* npm run test -w @cloudflare/agents-chat-sdk-messenger
* npm run check
Note: one npm run check attempt hit a transient experimental/gadgets-chat agents/react type resolution failure. Rerunning npm run check passed with all 85 projects typechecking successfully.
Review Focus
Suggested review areas:
* waitForCompletion semantics, especially duplicate calls, cancellation, and recovery.
* interrupted should remain excluded from default cleanup.
* FiberRecoveryResult is explicit enough for applications.
* runFiber() vs startFiber() vs submitMessages() vs Workflows.
Follow-Ups
Not included in this PR:
* runFiber() to managed fibers.
Made with Cursor