This article shows the recommended pattern for streaming Claude responses token-by-token inside a Shiny app with reliable interrupt support.

The key design combines coro::async, client$poll_messages(), and Shiny's ExtendedTask. Each await() call yields control back to the R event loop, so Shiny can process input events (e.g., an interrupt button press) between streamed tokens.

Why not receive_response_async()?

receive_response_async() drives its own later::later() polling loop internally. Because those later callbacks run at a higher priority than Shiny's input observers, an interrupt button press is not processed until the promise resolves: the user has to wait for the entire response before the interrupt takes effect.

The coro::async pattern solves this: every await() surrenders control back to the Shiny session scheduler, so observers fire between tokens.
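
For intuition, here is the yielding pattern in isolation, stripped of any chat logic. This is a minimal sketch; wait_ms() and demo_loop() are illustrative helpers, not part of the SDK:

library(coro)
library(promises)

# wait_ms(): a promise that resolves after `ms` milliseconds.
# Awaiting it hands control back to the event loop for that long.
wait_ms <- function(ms) {
  promises::promise(function(resolve, reject) {
    later::later(function() resolve(invisible(NULL)), ms / 1000)
  })
}

demo_loop <- coro::async(function(n) {
  for (i in seq_len(n)) {
    await(wait_ms(50))   # pending callbacks (e.g. Shiny observers) run here
    message("tick ", i)
  }
})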

Complete App

library(shiny)
library(bslib)
library(shinychat)
library(ClaudeAgentSDK)
library(promises)
library(coro)

ui <- page_fillable(
  chat_ui("chat", fill = TRUE, placeholder = "Ask Claude anything…"),
  # ESC key sends an interrupt signal with high priority
  tags$script(HTML("
    document.addEventListener('keydown', function(e) {
      if (e.key === 'Escape')
        Shiny.setInputValue('esc', Math.random(), {priority: 'event'});
    });
  "))
)

server <- function(input, output, session) {

  client <- ClaudeSDKClient$new(ClaudeAgentOptions(
    max_turns                = 1L,
    permission_mode          = "bypassPermissions",
    include_partial_messages = TRUE   # enables StreamEvent token stream
  ))
  client$connect()
  onStop(function() client$disconnect())

  interrupt_flag <- reactiveVal(FALSE)

  # -------------------------------------------------------------------------
  # coro::async streaming function
  #
  # Interrupt flow:
  #   1. At the top of every loop iteration, check interrupt_flag.
  #   2. On first detection: call client$interrupt(), append "[Interrupted]".
  #   3. Enter *drain mode*: skip all messages until ResultMessage arrives.
  #      This clears the internal buffer so the next send() starts clean.
  # -------------------------------------------------------------------------
  do_stream <- coro::async(function(client, interrupt_flag, session) {
    chunk_started <- FALSE
    interrupted   <- FALSE

    repeat {
      # Check for interrupt at the top of each iteration
      if (!interrupted && shiny::isolate(interrupt_flag())) {
        interrupted <- TRUE
        tryCatch(client$interrupt(), error = function(e) NULL)
        if (chunk_started) {
          chat_append_message("chat",
            list(role = "assistant", content = "\n\n_[Interrupted]_"),
            chunk = "end", session = session)
          chunk_started <- FALSE
        } else {
          chat_append_message("chat",
            list(role = "assistant", content = "_[Interrupted]_"),
            chunk = FALSE, session = session)
        }
      }

      msgs <- tryCatch(client$poll_messages(), error = function(e) list())

      if (length(msgs) == 0L) {
        # No messages yet — yield for 50 ms so Shiny can process inputs
        await(promises::promise(function(resolve, reject) {
          later::later(function() resolve(TRUE), 0.05)
        }))
        next
      }

      drain_done <- FALSE
      for (msg in msgs) {
        await(promises::promise_resolve(TRUE))  # yield between each message

        # Drain mode: skip everything until ResultMessage
        if (interrupted) {
          if (inherits(msg, "ResultMessage")) { drain_done <- TRUE; break }
          next
        }

        # StreamEvent: extract text_delta tokens
        if (inherits(msg, "StreamEvent") && is.list(msg$event)) {
          evt <- msg$event
          if (identical(evt$type, "content_block_delta") &&
              is.list(evt$delta) &&
              identical(evt$delta$type, "text_delta") &&
              !is.null(evt$delta$text)) {
            if (!chunk_started) {
              chunk_started <- TRUE
              chat_append_message("chat",
                list(role = "assistant", content = ""),
                chunk = "start", session = session)
            }
            chat_append_message("chat",
              list(role = "assistant", content = evt$delta$text),
              chunk = TRUE, session = session)
          }
        }

        # ResultMessage: finalize chunk and return
        if (inherits(msg, "ResultMessage")) {
          if (chunk_started) {
            chat_append_message("chat",
              list(role = "assistant", content = ""),
              chunk = "end", session = session)
          }
          return("done")
        }
      }

      if (drain_done) break
    }

    "done"
  })

  # -------------------------------------------------------------------------
  # ExtendedTask: runs do_stream asynchronously; Shiny stays responsive
  # -------------------------------------------------------------------------
  stream_task <- ExtendedTask$new(function(user_input) {
    client$send(user_input)
    do_stream(client, interrupt_flag, session)
  })

  observeEvent(input$chat_user_input, {
    if (stream_task$status() == "running") return()
    interrupt_flag(FALSE)
    stream_task$invoke(input$chat_user_input)
  })

  # Interrupt on ESC key
  observeEvent(input$esc, {
    if (stream_task$status() == "running") interrupt_flag(TRUE)
  })
}

shinyApp(ui, server)

How It Works

Step                      What happens
User sends message        client$send() queues the prompt; ExtendedTask starts do_stream()
Tokens arrive             poll_messages() returns StreamEvent objects; each text delta is appended via chunk = TRUE
No tokens yet             the coroutine awaits a promise resolved by later::later() after 50 ms, yielding so Shiny can process inputs
User presses ESC          interrupt_flag(TRUE) is set; detected at the top of the next loop iteration
Interrupt detected        client$interrupt() signals the CLI; drain mode waits for ResultMessage
ResultMessage received    the chunk is closed with chunk = "end"; the coroutine returns "done"
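
Note that ExtendedTask$new() expects its function to return a promise, and coro::async functions return one automatically, which is why do_stream() can be handed to the task directly. Below is a stripped-down sketch of that pairing, where slow_echo() is an illustrative stand-in for do_stream():

library(shiny)
library(coro)
library(promises)

# slow_echo(): waits half a second without blocking, then returns a value.
slow_echo <- coro::async(function(x) {
  await(promises::promise(function(resolve, reject) {
    later::later(function() resolve(NULL), 0.5)
  }))
  paste("echo:", x)
})

# The task function returns the promise produced by the async call.
task <- ExtendedTask$new(function(x) slow_echo(x))
# task$invoke("hi") starts it; task$result() yields "echo: hi" once done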

Key Design Points

include_partial_messages = TRUE: Without this option the CLI buffers the full response before sending it; setting it to TRUE enables the StreamEvent token stream.
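
For reference, this is the shape of a text-delta event as consumed by do_stream() above. The list below is hand-constructed to mirror the fields the app checks, not literal SDK output:

# A StreamEvent's $event field for a single streamed token:
evt <- list(
  type  = "content_block_delta",
  delta = list(type = "text_delta", text = "Hello")
)
# do_stream() appends evt$delta$text ("Hello") with chunk = TRUE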

Drain after interrupt: After calling client$interrupt() the CLI still sends a final ResultMessage. The drain loop consumes it so that the internal buffer is empty before the next client$send() call; otherwise the next response would return immediately with the stale result.
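
The drain logic in isolation looks like this (a sketch extracted from do_stream() above; it must run inside a coro::async function so that await() is available):

# Keep polling, discarding everything until the final ResultMessage.
repeat {
  msgs <- tryCatch(client$poll_messages(), error = function(e) list())
  if (any(vapply(msgs, inherits, logical(1), "ResultMessage"))) break
  await(wait_ms(50))  # wait_ms() as defined in the earlier sketch
}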

{priority: 'event'} in JS: Shiny.setInputValue(..., {priority: 'event'}) tells Shiny to treat every call as a fresh event, even if the value happens to repeat, so each ESC press reliably triggers the interrupt observer while streaming is underway.

Running the Example

shiny::runApp("examples/13_shinychat_streaming.R")