· web development · 7 min read

Mastering Stream Detection: Using MutationObserver to Track LLM Responses in Real-Time

A vanilla JavaScript solution for detecting when streaming responses from Large Language Models have completed

Introduction

While building a Chrome extension that monitors streaming responses from ChatGPT, I ran into an interesting issue:

A streamed response can vary in length and duration. If I want to perform some operation on the completed response exactly once[1] and I am not able to monitor network traffic, how can I identify when the response is complete?

TL;DR? We can monitor changes to the DOM using the browser’s MutationObserver API, though we need to use two MutationObservers in order to handle some of the complexities with LLM streaming responses.

While I encountered this while building a browser extension, it might still be relevant if you’re using a library like HTMX or Hotwire, or if you find yourself in a situation where a lightweight, frontend-only solution is preferable.

Understanding the Streaming Pattern

Before we dive into the proposed approach, it’s important to understand a common pattern used in LLM streaming interfaces. ChatGPT uses a specific class (e.g. result-streaming) to indicate the status of the response:

  1. When a new response starts streaming, a new DOM node is created with a result-streaming class.
  2. As the response continues to stream, content is added to this node.
  3. Once the streaming is complete, the result-streaming class is removed from the node.

This pattern provides a clear, DOM-based signal for the start and end of streaming, which we can leverage in our solution.
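The three steps above can be sketched with a tiny stand-in for a DOM node. This is only an illustration: fakeNode and isStreaming are hypothetical helpers (not part of any library), and the class name result-streaming is taken from the example above and may differ in the interface you are targeting.

```javascript
// Assumption: 'result-streaming' is the class that marks an in-progress response.
const STREAMING_CLASS = 'result-streaming';

// Returns true while the node still carries the streaming class.
function isStreaming(node) {
  return node.classList.contains(STREAMING_CLASS);
}

// Hypothetical stand-in for a DOM element so the sketch also runs outside a browser.
function fakeNode(...classes) {
  const classSet = new Set(classes);
  return {
    classList: {
      contains: (name) => classSet.has(name),
      add: (name) => classSet.add(name),
      remove: (name) => classSet.delete(name),
    },
  };
}

// Step 1: the response node appears with the streaming class.
const responseNode = fakeNode(STREAMING_CLASS);
console.log(isStreaming(responseNode)); // true

// Step 3: the class is removed once streaming has finished.
responseNode.classList.remove(STREAMING_CLASS);
console.log(isStreaming(responseNode)); // false
```

In a real page, isStreaming would receive an actual element, whose classList exposes the same contains method.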

Enter the MutationObserver

We can use MutationObservers to subscribe to changes in the DOM, either on a specific node or on a node and all of its descendants. When the observed changes occur, the observer invokes a callback, allowing us to attach side effects to those mutations.

We can configure which DOM changes trigger the callback that we register:

  1. Child List Changes: When elements are added, removed, or moved within a specified node.
  2. Attribute Changes: When any attribute of a specified node is added, removed, or modified.
  3. Character Data Changes: When the text content (character data) of a text node is modified.

These are broad categories of DOM changes that can occur very frequently. We can reduce how often the callback is triggered by configuring additional properties for the MutationObserver, such as an attributeFilter. As you might expect, the observer then ignores all attribute mutations except those for the attributes named in the filter.
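As a rough illustration of these options, here is a minimal sketch of an observer that reports only class changes. The helper name watchClassChanges is hypothetical, and attributeOldValue is included only to show that the previous value can be requested; the rest of this post does not rely on it.

```javascript
// Options for watching only `class` changes on a node and its descendants.
const classOnlyOptions = {
  attributes: true,            // report attribute mutations
  attributeFilter: ['class'],  // ...but only for the `class` attribute
  attributeOldValue: true,     // include the previous value on each record
  subtree: true,               // also watch descendants of the observed node
};

// Hypothetical helper: attach an observer that uses the options above.
function watchClassChanges(target, onChange) {
  const observer = new MutationObserver((records) => {
    for (const record of records) {
      onChange(record.target, record.oldValue);
    }
  });
  observer.observe(target, classOnlyOptions);
  return observer;
}

// Browser-only usage; skipped where no DOM is available.
if (typeof MutationObserver !== 'undefined' && typeof document !== 'undefined') {
  watchClassChanges(document.body, (element, oldClass) => {
    console.log('class changed from', oldClass, 'to', element.getAttribute('class'));
  });
}
```

Without the attributeFilter, the same callback would run for every attribute change in the subtree, which is usually far more than we need.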

Once the observer is defined, you can call observer.disconnect() on it to stop subscribing to all future DOM mutations.

Our distilled approach

For our solution, we broke the problem down into 3 discrete steps.

  1. Detect when new response nodes are added to the DOM (which could happen at any time).
  2. For each new node, watch for the removal of the ‘streaming’ class.
  3. Perform an action exactly once when the streaming is complete.

In order to both watch for new streamed responses and process the completed LLM responses exactly once, we took a two-stage approach of setting up MutationObservers.

To understand why using a single MutationObserver was insufficient, let’s walk through two examples.

The Problem with Premature Disconnection

Since we care about performing our action exactly once per completed response, we might try to guarantee this by disconnecting the observer after processing the response:

// This approach would NOT work correctly
const observer = new MutationObserver((mutations) => {
  for (const mutation of mutations) {
    // hasAssistantAttribute is a utility function that identifies whether a given
    // node is a ChatGPT response
    if (hasAssistantAttribute(mutation.target)) {
      // true once the node no longer carries the streaming class
      const streamingClassRemoved = !mutation.target.classList.contains('result-streaming');
      if (streamingClassRemoved) {
        // perform some operation here using the completed response...
        observer.disconnect(); // stop all observing after processing the completed response
      }
    }
  }
});

observer.observe(document.body, { attributes: true, subtree: true });

The issue here is that observer.disconnect() would stop the entire observation process. This means we would miss any new ChatGPT responses appearing after the first one completes.

The Pitfall of Duplicate Events

On the other hand, if we don’t disconnect the observer at all in an attempt to keep watching for new messages, we risk triggering duplicate events. Consider this scenario:

// This approach could trigger duplicate events
const observer = new MutationObserver((mutations) => {
  for (const mutation of mutations) {
    // true once the node no longer carries the streaming class
    const streamingClassRemoved = !mutation.target.classList.contains('result-streaming');
    if (hasAssistantAttribute(mutation.target) && streamingClassRemoved) {
      // perform some operation here using the completed response...
      // Notice: No observer.disconnect() here, which means we'll always keep observing
    }
  }
});

// Observe all attribute changes within the entire DOM tree
observer.observe(document.body, { attributes: true, subtree: true });

In this case, the observer continues to watch for mutations even after the streaming is complete. If any attribute of the observed node changes after the result-streaming class was removed, our logic would trigger again and reprocess the completed message.
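One way to see why the duplicates occur: the callback checks only the current state of the node ("the class is absent"), not the transition ("the class was just removed"). As a point of comparison, here is a minimal sketch of a transition check. It assumes the observer is configured with attributeOldValue: true, which the examples in this post do not do, and classWasRemoved is a hypothetical helper. It is an alternative illustration, not the approach this post settles on.

```javascript
// Returns true only for a record in which `className` went from present to absent.
// Assumes the observer was created with { attributes: true, attributeOldValue: true }.
function classWasRemoved(record, className) {
  if (record.type !== 'attributes' || record.attributeName !== 'class') {
    return false;
  }
  // oldValue holds the class attribute as it was before this mutation (or null).
  const hadClassBefore = (record.oldValue || '').split(/\s+/).includes(className);
  // classList reflects the node's state at the time the callback runs.
  const hasClassNow = record.target.classList.contains(className);
  return hadClassBefore && !hasClassNow;
}
```

A later, unrelated class change on the same node would have an oldValue without result-streaming, so this check would return false for it.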

The Two-Stage Solution

To address these issues, we can implement a two-stage approach using separate observers:

  1. A main observer that continuously watches for new messages.
  2. Individual observers for each message that monitor for the completion of streaming.

By implementing this two-stage approach, we create a robust system that efficiently handles the dynamic nature of these streaming interfaces. In this way, we address the challenges of premature disconnection while guaranteeing that any side effects triggered once the streamed response ends happen exactly once.

Implementing the Two-Stage MutationObserver Solution

Step 1: Observing Individual Assistant Messages

First, we create a function to observe each assistant message for the completion of streaming:

const observedElements = new WeakMap();

function observeForRemovalOfStreamingClass(node) {
  // Reuse the existing observer if this node is already being watched
  if (observedElements.has(node)) {
    return observedElements.get(node);
  }

  const observer = new MutationObserver((mutations) => {
    for (const mutation of mutations) {
      if (mutation.type === 'attributes' && mutation.attributeName === 'class') {
        if (!node.classList.contains('result-streaming')) {
          // perform some operation here using the completed response...
          observer.disconnect();
          observedElements.delete(node);
          return; // ignore any remaining records in this batch
        }
      }
    }
  });

  observer.observe(node, {
    attributes: true,
    attributeFilter: ['class'],
    subtree: true,
  });

  // Register the observer so repeated calls for the same node return it
  observedElements.set(node, observer);
  return observer;
}

This function creates a new MutationObserver for a specific assistant message node. It watches for changes to the class attribute and, once the result-streaming class has been removed, runs our operation on the completed message.

To ensure that each node is observed once, we used a WeakMap to track observed nodes. If a node is already being observed, it returns the existing observer, preventing duplicate observers. After processing the message and disconnecting the observer, the node is removed from the WeakMap to clean up resources effectively.
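One caveat worth considering: once a node's entry is deleted from the WeakMap, a later class change on that same node could lead the main observer to attach a fresh observer to it. A minimal sketch of an extra guard, assuming we are willing to remember completed nodes in a WeakSet, might look like the following. The names completedNodes and runOncePerNode are hypothetical and not part of the original solution.

```javascript
// Nodes whose completed response has already been handled.
// A WeakSet does not prevent removed nodes from being garbage collected.
const completedNodes = new WeakSet();

// Runs `action(node)` the first time it is called for a node and
// does nothing on later calls. Returns whether the action ran.
function runOncePerNode(node, action) {
  if (completedNodes.has(node)) {
    return false;
  }
  completedNodes.add(node);
  action(node);
  return true;
}
```

The per-message observer could call runOncePerNode(node, handleCompletedResponse) in place of the "perform some operation" comment, where handleCompletedResponse stands for whatever you want to do with the finished message.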

Step 2: Setting Up the Main Observer

Next, we set up the main observer that watches for new assistant messages:

function setupMutationObserver() {
  const observer = new MutationObserver((mutations) => {
    for (let mutation of mutations) {
      if (mutation.type === 'attributes') {
        if (hasAssistantAttribute(mutation.target)) {
          observeForRemovalOfStreamingClass(mutation.target);
        }
      }
    }
  });

  observer.observe(document.body, {
    attributes: true,
    attributeFilter: ['class'],
    subtree: true,
  });
}

This main observer watches the entire document body for changes to the class attribute. When a change occurs on a node with the assistant attribute, it sets up an observer for that specific node. The main observer keeps running, so it catches any new assistant messages that appear in the DOM.
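The hasAssistantAttribute utility is used throughout but not shown. A minimal sketch follows, assuming assistant messages are marked with a data-message-author-role="assistant" attribute. That attribute name is an assumption about the page's markup, so inspect the DOM of the interface you are targeting and adjust it.

```javascript
// Assumption: assistant messages carry data-message-author-role="assistant".
const ROLE_ATTRIBUTE = 'data-message-author-role';

// Returns true when `node` is an element marked as an assistant message.
function hasAssistantAttribute(node) {
  return (
    node != null &&
    typeof node.getAttribute === 'function' && // text nodes have no getAttribute
    node.getAttribute(ROLE_ATTRIBUTE) === 'assistant'
  );
}
```

The getAttribute guard keeps the helper safe if it is ever handed a text node or null.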

The class attribute filter ensures that changes related to message streaming status are detected. This is critical for identifying when the result-streaming class is added to or removed from the mutation.target’s child nodes (for a side-note, you may be wondering why we didn’t observe).

The Power of This Approach

This two-stage approach offers two key advantages:

  1. Flexibility: The solution is not tied to specific timing or length of responses, relying instead on DOM changes. This makes it adaptable to various LLM behaviors.

  2. Precise Control: We can perform actions on completed responses without risk of duplication or missing events.

Added Considerations

While our solution is robust for most scenarios, there are some added considerations to keep in mind:

  1. Performance Optimization: For applications with a high volume of messages, implement a cleanup strategy to remove observers for old messages.

  2. Customization: The current solution assumes specific class names and attributes. Adjust these based on the particular LLM interface you’re working with.

  3. Targeted Observation: Consider observing only the chat message container instead of the entire document body to improve efficiency.
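The cleanup and targeted-observation points can be sketched together. The helper below observes a given container rather than document.body and returns a function that stops observing. The name observeContainer and the #chat-container selector are hypothetical, and the ObserverImpl parameter exists only so the sketch can be exercised without a browser.

```javascript
// Hypothetical helper: watch class changes under `root` and return a stop function.
function observeContainer(root, onClassChange, ObserverImpl = globalThis.MutationObserver) {
  const observer = new ObserverImpl((records) => {
    for (const record of records) {
      onClassChange(record.target);
    }
  });
  observer.observe(root, {
    attributes: true,
    attributeFilter: ['class'],
    subtree: true,
  });
  // Calling the returned function disconnects the observer.
  return () => observer.disconnect();
}

// Browser-only usage; '#chat-container' is a placeholder selector.
if (typeof document !== 'undefined' && typeof MutationObserver !== 'undefined') {
  const container = document.querySelector('#chat-container') || document.body;
  const stopObserving = observeContainer(container, (element) => {
    console.log('class changed on', element);
  });
  // Later, for example when the chat view is torn down: stopObserving();
}
```

Narrowing the root means the callback is not invoked for class changes elsewhere on the page, and keeping the stop function around gives us a single place to clean up.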

Conclusion

Our two-stage MutationObserver approach effectively addresses the challenge of detecting the completion of streaming responses in a dynamic DOM environment.

By implementing a main observer to catch new response nodes and individual observers to monitor the end of streaming, we gain control over when actions are triggered, ensuring they occur exactly once per completed response.

Acknowledgements

Big thanks to Guy Grinapell for proofreading, ideating, and reminding me that complex topics still require simple explanations.

Footnotes

  1. Exactly Once Delivery
