Working with Streams

Learn how to work with Server-Sent Events (SSE) for real-time updates in ChordianAI workflows.

What is SSE?

Server-Sent Events (SSE) is a web standard that lets a server push real-time updates to a client over a single long-lived HTTP connection. ChordianAI uses SSE to provide live progress updates during long-running operations.
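
On the wire, an SSE stream is plain text: each event is a block of `field: value` lines (most commonly `event:` and `data:`) terminated by a blank line. The sketch below parses one such block to illustrate what `EventSource` does on your behalf. `parseSSEBlock` is a hypothetical helper written for this illustration, not part of any ChordianAI SDK, and the sample payload values are made up.

```javascript
// Illustrative only: parses a single SSE event block (the text between blank lines).
// parseSSEBlock is a hypothetical helper; EventSource does this internally.
function parseSSEBlock(block) {
  let type = 'message'; // default event type when no "event:" field is present
  const dataLines = [];

  for (const line of block.split(/\r\n|\n|\r/)) {
    if (line === '' || line.startsWith(':')) continue; // blank line or comment
    const idx = line.indexOf(':');
    const field = idx === -1 ? line : line.slice(0, idx);
    let value = idx === -1 ? '' : line.slice(idx + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space

    if (field === 'event') type = value;
    else if (field === 'data') dataLines.push(value);
    // "id" and "retry" fields are ignored in this sketch
  }

  // Multiple data lines are joined with newlines, as the SSE format specifies
  return { type, data: dataLines.join('\n') };
}

// Sample block with made-up values
const parsed = parseSSEBlock('event: progress\ndata: {"agent":"a1","status":"running"}');
console.log(parsed.type, JSON.parse(parsed.data).status);
```

An event without an `event:` line gets the default type `message`, which is the type that `onmessage` handles.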

When to Use SSE

Use SSE streaming for:

  • Workflow progress - Real-time updates during data collection
  • Research workflows - Live updates from research agents
  • CSV imports - Progress tracking during file processing
  • Enrichment - Status updates during data enrichment

SSE vs Polling

Polling (Traditional)

// Check status every 5 seconds
setInterval(async () => {
  const response = await fetch(`/api/workflow/status/${threadId}`);
  const status = await response.json();
  console.log(status);
}, 5000);

Pros: Simple, works everywhere
Cons: Updates arrive up to one polling interval late; many requests return nothing new

SSE (Streaming)

// Real-time updates
const eventSource = new EventSource(`/api/research/stream/${threadId}`);
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log(data); // Instant updates!
};

Pros: Real-time, efficient, lower latency
Cons: Requires EventSource support on the client; the stream is one-way (server to client)


Basic SSE Implementation

JavaScript/Browser

const threadId = 'research_abc123';
const eventSource = new EventSource(
  `https://chordian-core.chordian.ai/api/research/stream/${threadId}`
);
 
// Handle messages
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Update:', data);
};
 
// Handle errors
eventSource.onerror = (error) => {
  console.error('SSE Error:', error);
  eventSource.close();
};
 
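// Close when done, e.g. once the final update has arrived.
// Do not call this right after connecting, or no messages will be received.
function stopStreaming() {
  eventSource.close();
}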

Event Types

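Different endpoints emit different named event types. Register a listener for each one with addEventListener; onmessage only receives events sent without an event name.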

Research Stream Events

eventSource.addEventListener('status', (event) => {
  const data = JSON.parse(event.data);
  console.log('Status:', data.status);
});
 
eventSource.addEventListener('clarification', (event) => {
  const data = JSON.parse(event.data);
  console.log('Question:', data.question);
  // Prompt user for response
});
 
eventSource.addEventListener('brief', (event) => {
  const data = JSON.parse(event.data);
  console.log('Research Brief:', data.research_brief);
});
 
eventSource.addEventListener('progress', (event) => {
  const data = JSON.parse(event.data);
  console.log(`Agent ${data.agent}: ${data.status}`);
});
 
eventSource.addEventListener('complete', (event) => {
  const data = JSON.parse(event.data);
  console.log('Final Report:', data.final_report);
  eventSource.close();
});

React Example

import { useEffect, useState } from 'react';
 
function ResearchProgress({ threadId }) {
  const [status, setStatus] = useState('connecting');
  const [progress, setProgress] = useState([]);
  const [report, setReport] = useState(null);
 
  useEffect(() => {
    const eventSource = new EventSource(
      `https://chordian-core.chordian.ai/api/research/stream/${threadId}`
    );
 
    eventSource.addEventListener('status', (event) => {
      const data = JSON.parse(event.data);
      setStatus(data.status);
    });
 
    eventSource.addEventListener('progress', (event) => {
      const data = JSON.parse(event.data);
      setProgress(prev => [...prev, data]);
    });
 
    eventSource.addEventListener('complete', (event) => {
      const data = JSON.parse(event.data);
      setReport(data.final_report);
      eventSource.close();
    });
 
    eventSource.onerror = () => {
      setStatus('error');
      eventSource.close();
    };
 
    return () => eventSource.close();
  }, [threadId]);
 
  return (
    <div>
      <h2>Status: {status}</h2>
      <div>
        {progress.map((p, i) => (
          <div key={i}>{p.agent}: {p.status}</div>
        ))}
      </div>
      {report && <pre>{report}</pre>}
    </div>
  );
}

Node.js Example

import { EventSource } from 'eventsource';
 
const threadId = 'research_abc123';
const eventSource = new EventSource(
  `https://chordian-core.chordian.ai/api/research/stream/${threadId}`
);
 
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Update:', data);
};
 
eventSource.addEventListener('complete', (event) => {
  const data = JSON.parse(event.data);
  console.log('Complete:', data.final_report);
  eventSource.close();
});

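Note: Install the eventsource package first: npm install eventsource. The named import above matches version 3 and later of the package; earlier versions use a default export.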


Error Handling

Reconnection Strategy

let reconnectAttempts = 0;
const MAX_RECONNECTS = 3;
 
function connectSSE(threadId) {
  const eventSource = new EventSource(
    `https://chordian-core.chordian.ai/api/research/stream/${threadId}`
  );
 
  eventSource.onerror = (error) => {
    console.error('Connection error:', error);
    eventSource.close();
 
    if (reconnectAttempts < MAX_RECONNECTS) {
      reconnectAttempts++;
      console.log(`Reconnecting... (${reconnectAttempts}/${MAX_RECONNECTS})`);
      setTimeout(() => connectSSE(threadId), 2000);
    } else {
      console.error('Max reconnection attempts reached');
      // Fall back to polling (pollStatus is your own polling function)
      pollStatus(threadId);
    }
  };
 
  eventSource.onmessage = (event) => {
    reconnectAttempts = 0; // Reset on successful message
    const data = JSON.parse(event.data);
    handleUpdate(data);
  };
 
  return eventSource;
}
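
The example above retries after a fixed 2-second delay. A common refinement is exponential backoff with a cap, so that repeated failures do not hammer the server. The following is a minimal sketch: `backoffDelay`, `withJitter`, and their default values are assumptions for illustration, not ChordianAI recommendations.

```javascript
// Hypothetical helper: delay in ms before the given (1-based) reconnect attempt.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  // Doubles on each attempt (1000, 2000, 4000, ...) and is capped at maxMs
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Optional jitter spreads out reconnects when many clients fail at once.
function withJitter(delayMs, random = Math.random) {
  // Returns a value in the range [delayMs / 2, delayMs)
  return delayMs / 2 + random() * (delayMs / 2);
}

console.log([1, 2, 3, 4].map((n) => backoffDelay(n))); // [ 1000, 2000, 4000, 8000 ]
```

In the reconnection example, this could replace the fixed delay: `setTimeout(() => connectSSE(threadId), backoffDelay(reconnectAttempts))`.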

Timeout Handling

const TIMEOUT_MS = 60000; // 60 seconds
 
const eventSource = new EventSource(`/api/research/stream/${threadId}`);
let timeoutId;
 
function resetTimeout() {
  clearTimeout(timeoutId);
  timeoutId = setTimeout(() => {
    console.warn('No updates for 60 seconds, closing connection');
    eventSource.close();
  }, TIMEOUT_MS);
}
 
eventSource.onmessage = (event) => {
  resetTimeout();
  const data = JSON.parse(event.data);
  console.log('Update:', data);
};
 
resetTimeout(); // Start timeout

Best Practices

Info: Always close connections when you’re done to prevent memory leaks!

1. Always Close Connections

// Close when component unmounts (React)
useEffect(() => {
  const eventSource = new EventSource(url);
  return () => eventSource.close();
}, [url]); // reconnect if the URL changes
 
// Close when done
eventSource.addEventListener('complete', () => {
  eventSource.close();
});

2. Handle Errors Gracefully

eventSource.onerror = (error) => {
  console.error('SSE Error:', error);
  eventSource.close();
  // Fall back to polling or show error to user
};

3. Implement Timeouts

Don’t let connections hang forever. Implement timeouts and fallback strategies.
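
One possible fallback is to poll the status endpoint from the polling example once the stream has gone quiet. The sketch below is a generic polling loop. `pollUntilDone`, its options, and the `status === 'complete'` check are assumptions, since the shape of the status payload is not specified here.

```javascript
// Hypothetical fallback: call fetchStatus repeatedly until it reports completion.
// fetchStatus is any async function that resolves to the current status object.
async function pollUntilDone(fetchStatus, { intervalMs = 5000, maxAttempts = 60, isDone } = {}) {
  // Assumed completion check; adjust it to match the real payload
  const done = isDone ?? ((s) => s.status === 'complete');

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const status = await fetchStatus();
    if (done(status)) return status;
    if (attempt < maxAttempts) {
      // Wait before the next poll
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  throw new Error(`No completion after ${maxAttempts} polls`);
}

// Usage (not executed here):
// pollUntilDone(() => fetch(`/api/workflow/status/${threadId}`).then((r) => r.json()));
```

Bounding the number of attempts keeps the fallback itself from hanging forever.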

4. Parse Data Safely

eventSource.onmessage = (event) => {
  try {
    const data = JSON.parse(event.data);
    handleUpdate(data);
  } catch (error) {
    console.error('Failed to parse SSE data:', error);
  }
};

5. Use Event Types

Listen for specific event types instead of just onmessage:

eventSource.addEventListener('progress', handleProgress);
eventSource.addEventListener('complete', handleComplete);
eventSource.addEventListener('error', handleError);
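
Because every named event needs the same parse-then-handle step, a small helper can register them all at once. This is a sketch: `attachHandlers` is a hypothetical helper, and it works with any object that exposes `addEventListener`, such as an `EventSource`.

```javascript
// Hypothetical helper: register one JSON-parsing listener per event type.
function attachHandlers(source, handlers, onParseError = console.error) {
  for (const [type, handler] of Object.entries(handlers)) {
    source.addEventListener(type, (event) => {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        onParseError(err, type); // malformed payload: report it and skip the handler
        return;
      }
      handler(data, event);
    });
  }
}

// Usage (not executed here):
// attachHandlers(eventSource, {
//   progress: (data) => console.log(`Agent ${data.agent}: ${data.status}`),
//   complete: (data) => { console.log(data.final_report); eventSource.close(); },
// });
```

Connection errors are best left on `onerror`, since those events carry no `data` payload to parse.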

Debugging SSE

Browser DevTools

  1. Open DevTools (F12)
  2. Go to Network tab
  3. Look for requests with type eventsource
  4. Click the request to watch messages arrive in real time (in Chrome, under the EventStream tab)

Common Issues

Issue                          Cause                   Solution
Connection closes immediately  Invalid thread ID       Verify thread ID is correct
No messages received           Workflow not started    Check workflow status first
CORS errors                    Cross-origin request    Ensure proper CORS headers
Connection timeout             Long-running operation  Implement reconnection logic

Next Steps