Shiny Integration: Streaming Chat with Interrupt
Source: vignettes/articles/shiny-streaming.Rmd

This article shows the recommended pattern for streaming Claude responses token-by-token inside a Shiny app with reliable interrupt support.
The key design combines `coro::async`, `client$poll_messages()`, and Shiny's `ExtendedTask`. Each `await()` call yields control back to the R event loop, so Shiny can process input events (e.g., an interrupt button) between streamed tokens.
Why not `receive_response_async()`?

`receive_response_async()` drives its own `later::later()` polling loop internally. Because `later` callbacks run at a higher priority than Shiny input observers, an interrupt button press is not processed until the promise resolves, meaning the user has to wait for the entire response before the interrupt takes effect.
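For contrast, a sketch of that pattern (what the promise resolves to is an assumption here, not confirmed by this article):

```r
# The pattern this article avoids: receive_response_async() polls
# internally, and its promise resolves only after the full response.
library(promises)

client$send("Tell me a long story")
promises::then(
  client$receive_response_async(),
  onFulfilled = function(msgs) {
    # Everything arrives here at once, after the response completes; an
    # interrupt observer queued during streaming never got a turn.
  }
)
```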
The `coro::async` pattern solves this: every `await()` surrenders control back to the Shiny session scheduler, so observers fire between tokens.
Complete App
```r
library(shiny)
library(bslib)
library(shinychat)
library(ClaudeAgentSDK)
library(promises)
library(coro)

ui <- page_fillable(
  chat_ui("chat", fill = TRUE, placeholder = "Ask Claude anything…"),
  # ESC key sends an interrupt signal with high priority
  tags$script(HTML("
    document.addEventListener('keydown', function(e) {
      if (e.key === 'Escape')
        Shiny.setInputValue('esc', Math.random(), {priority: 'event'});
    });
  "))
)

server <- function(input, output, session) {
  client <- ClaudeSDKClient$new(ClaudeAgentOptions(
    max_turns = 1L,
    permission_mode = "bypassPermissions",
    include_partial_messages = TRUE  # enables StreamEvent token stream
  ))
  client$connect()
  onStop(function() client$disconnect())

  interrupt_flag <- reactiveVal(FALSE)

  # -------------------------------------------------------------------------
  # coro::async streaming function
  #
  # Interrupt flow:
  #   1. At the top of every loop iteration, check interrupt_flag.
  #   2. On first detection: call client$interrupt(), append "[Interrupted]".
  #   3. Enter *drain mode*: skip all messages until ResultMessage arrives.
  #      This clears the internal buffer so the next send() starts clean.
  # -------------------------------------------------------------------------
  do_stream <- coro::async(function(client, interrupt_flag, session) {
    chunk_started <- FALSE
    interrupted <- FALSE

    repeat {
      # Check for interrupt at the top of each iteration
      if (!interrupted && shiny::isolate(interrupt_flag())) {
        interrupted <- TRUE
        tryCatch(client$interrupt(), error = function(e) NULL)
        if (chunk_started) {
          chat_append_message("chat",
            list(role = "assistant", content = "\n\n_[Interrupted]_"),
            chunk = "end", session = session)
          chunk_started <- FALSE
        } else {
          chat_append_message("chat",
            list(role = "assistant", content = "_[Interrupted]_"),
            chunk = FALSE, session = session)
        }
      }

      msgs <- tryCatch(client$poll_messages(), error = function(e) list())

      if (length(msgs) == 0L) {
        # No messages yet: yield for 50 ms so Shiny can process inputs
        await(promises::promise(function(resolve, reject) {
          later::later(function() resolve(TRUE), 0.05)
        }))
        next
      }

      drain_done <- FALSE
      for (msg in msgs) {
        await(promises::promise_resolve(TRUE))  # yield between each message

        # Drain mode: skip everything until ResultMessage
        if (interrupted) {
          if (inherits(msg, "ResultMessage")) { drain_done <- TRUE; break }
          next
        }

        # StreamEvent: extract text_delta tokens
        if (inherits(msg, "StreamEvent") && is.list(msg$event)) {
          evt <- msg$event
          if (identical(evt$type, "content_block_delta") &&
              is.list(evt$delta) &&
              identical(evt$delta$type, "text_delta") &&
              !is.null(evt$delta$text)) {
            if (!chunk_started) {
              chunk_started <- TRUE
              chat_append_message("chat",
                list(role = "assistant", content = ""),
                chunk = "start", session = session)
            }
            chat_append_message("chat",
              list(role = "assistant", content = evt$delta$text),
              chunk = TRUE, session = session)
          }
        }

        # ResultMessage: finalize chunk and return
        if (inherits(msg, "ResultMessage")) {
          if (chunk_started) {
            chat_append_message("chat",
              list(role = "assistant", content = ""),
              chunk = "end", session = session)
          }
          return("done")
        }
      }
      if (drain_done) break
    }
    "done"
  })

  # -------------------------------------------------------------------------
  # ExtendedTask: runs do_stream asynchronously; Shiny stays responsive
  # -------------------------------------------------------------------------
  stream_task <- ExtendedTask$new(function(user_input) {
    client$send(user_input)
    do_stream(client, interrupt_flag, session)
  })

  observeEvent(input$chat_user_input, {
    if (stream_task$status() == "running") return()
    interrupt_flag(FALSE)
    stream_task$invoke(input$chat_user_input)
  })

  # Interrupt on ESC key
  observeEvent(input$esc, {
    if (stream_task$status() == "running") interrupt_flag(TRUE)
  })
}

shinyApp(ui, server)
```

How It Works
| Step | What happens |
|---|---|
| User sends message | `client$send()` queues the prompt; `ExtendedTask` starts `do_stream()` |
| Tokens arrive | `poll_messages()` returns `StreamEvent` objects; each text delta is appended via `chunk = TRUE` |
| No tokens yet | `await()` on a 50 ms `later::later()` promise yields so Shiny can process inputs |
| User presses ESC | `interrupt_flag(TRUE)` is set; caught at the top of the next loop iteration |
| Interrupt detected | `client$interrupt()` signals the CLI; drain mode waits for `ResultMessage` |
| `ResultMessage` received | chunk closed with `chunk = "end"`; coroutine returns `"done"` |
Key Design Points

**`include_partial_messages = TRUE`.** Without this option the CLI buffers the full response before sending it. Setting it to `TRUE` enables the `StreamEvent` token stream.
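These events mirror the Anthropic streaming format. The fields that `do_stream()` actually reads look like this (shape taken from the parsing code above; other event types also arrive and are simply ignored):

```r
# A text-delta StreamEvent as consumed by do_stream(); only the fields
# checked in the app are shown here.
msg$event
#> $type
#> [1] "content_block_delta"
#>
#> $delta
#> $delta$type
#> [1] "text_delta"
#>
#> $delta$text
#> [1] "Hello"
```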
**Drain after interrupt.** After calling `client$interrupt()` the CLI still sends a final `ResultMessage`. The drain loop consumes it so the internal buffer is empty before the next `client$send()` call; otherwise the next response would return immediately with the stale result.
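The drain logic in isolation, condensed from `do_stream()` above:

```r
# Keep polling and discard everything until the final ResultMessage,
# yielding between polls so the UI stays responsive.
drain <- coro::async(function(client) {
  repeat {
    msgs <- tryCatch(client$poll_messages(), error = function(e) list())
    for (msg in msgs) {
      if (inherits(msg, "ResultMessage")) return(invisible(NULL))
    }
    await(promises::promise(function(resolve, reject) {
      later::later(function() resolve(TRUE), 0.05)
    }))
  }
})
```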
**`{priority: 'event'}` in JS.** The `Shiny.setInputValue(..., {priority: 'event'})` flag ensures the ESC keydown fires its observer before any pending `later` callbacks, giving the interrupt near-instant latency even under heavy streaming.
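The same `interrupt_flag` also works with a visible button instead of the ESC key. A hypothetical variant, not part of the example app:

```r
# Hypothetical variant: a Stop button instead of the ESC key. Each click
# increments the button value, so the observer fires on every press and
# no custom JavaScript is needed.
ui <- page_fillable(
  chat_ui("chat", fill = TRUE),
  actionButton("stop", "Stop streaming")
)

# In server(), alongside the other observers:
observeEvent(input$stop, {
  if (stream_task$status() == "running") interrupt_flag(TRUE)
})
```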
Running the Example
```r
shiny::runApp("examples/13_shinychat_streaming.R")
```