Working with Streams
Learn how to work with Server-Sent Events (SSE) for real-time updates in ChordianAI workflows.
What is SSE?
Server-Sent Events (SSE) is a technology that allows servers to push real-time updates to clients over HTTP. ChordianAI uses SSE to provide live progress updates during long-running operations.
When to Use SSE
Use SSE streaming for:
- Workflow progress - Real-time updates during data collection
- Research workflows - Live updates from research agents
- CSV imports - Progress tracking during file processing
- Enrichment - Status updates during data enrichment
SSE vs Polling
Polling (Traditional)
// Check status every 5 seconds
setInterval(async () => {
const response = await fetch(`/api/workflow/status/${threadId}`);
const status = await response.json();
console.log(status);
}, 5000);Pros: Simple, works everywhere
Cons: Delayed updates, more requests, higher latency
SSE (Streaming)
// Real-time updates
const eventSource = new EventSource(`/api/research/stream/${threadId}`);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(data); // Instant updates!
};Pros: Real-time, efficient, lower latency
Cons: Requires SSE support
Basic SSE Implementation
JavaScript/Browser
const threadId = 'research_abc123';
const eventSource = new EventSource(
`https://chordian-core.chordian.ai/api/research/stream/${threadId}`
);
// Handle messages
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Update:', data);
};
// Handle errors
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
eventSource.close();
};
// Close when done
eventSource.close();Event Types
Different endpoints emit different event types:
Research Stream Events
eventSource.addEventListener('status', (event) => {
const data = JSON.parse(event.data);
console.log('Status:', data.status);
});
eventSource.addEventListener('clarification', (event) => {
const data = JSON.parse(event.data);
console.log('Question:', data.question);
// Prompt user for response
});
eventSource.addEventListener('brief', (event) => {
const data = JSON.parse(event.data);
console.log('Research Brief:', data.research_brief);
});
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`Agent ${data.agent}: ${data.status}`);
});
eventSource.addEventListener('complete', (event) => {
const data = JSON.parse(event.data);
console.log('Final Report:', data.final_report);
eventSource.close();
});React Example
import { useEffect, useState } from 'react';
function ResearchProgress({ threadId }) {
const [status, setStatus] = useState('connecting');
const [progress, setProgress] = useState([]);
const [report, setReport] = useState(null);
useEffect(() => {
const eventSource = new EventSource(
`https://chordian-core.chordian.ai/api/research/stream/${threadId}`
);
eventSource.addEventListener('status', (event) => {
const data = JSON.parse(event.data);
setStatus(data.status);
});
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
setProgress(prev => [...prev, data]);
});
eventSource.addEventListener('complete', (event) => {
const data = JSON.parse(event.data);
setReport(data.final_report);
eventSource.close();
});
eventSource.onerror = () => {
setStatus('error');
eventSource.close();
};
return () => eventSource.close();
}, [threadId]);
return (
<div>
<h2>Status: {status}</h2>
<div>
{progress.map((p, i) => (
<div key={i}>{p.agent}: {p.status}</div>
))}
</div>
{report && <pre>{report}</pre>}
</div>
);
}Node.js Example
import { EventSource } from 'eventsource';
const threadId = 'research_abc123';
const eventSource = new EventSource(
`https://chordian-core.chordian.ai/api/research/stream/${threadId}`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Update:', data);
};
eventSource.addEventListener('complete', (event) => {
const data = JSON.parse(event.data);
console.log('Complete:', data.final_report);
eventSource.close();
});Note: Install eventsource package: npm install eventsource
Error Handling
Reconnection Strategy
let reconnectAttempts = 0;
const MAX_RECONNECTS = 3;
function connectSSE(threadId) {
const eventSource = new EventSource(
`https://chordian-core.chordian.ai/api/research/stream/${threadId}`
);
eventSource.onerror = (error) => {
console.error('Connection error:', error);
eventSource.close();
if (reconnectAttempts < MAX_RECONNECTS) {
reconnectAttempts++;
console.log(`Reconnecting... (${reconnectAttempts}/${MAX_RECONNECTS})`);
setTimeout(() => connectSSE(threadId), 2000);
} else {
console.error('Max reconnection attempts reached');
// Fall back to polling
pollStatus(threadId);
}
};
eventSource.onmessage = (event) => {
reconnectAttempts = 0; // Reset on successful message
const data = JSON.parse(event.data);
handleUpdate(data);
};
return eventSource;
}Timeout Handling
const TIMEOUT_MS = 60000; // 60 seconds
const eventSource = new EventSource(`/api/research/stream/${threadId}`);
let timeoutId;
function resetTimeout() {
clearTimeout(timeoutId);
timeoutId = setTimeout(() => {
console.warn('No updates for 60 seconds, closing connection');
eventSource.close();
}, TIMEOUT_MS);
}
eventSource.onmessage = (event) => {
resetTimeout();
const data = JSON.parse(event.data);
console.log('Update:', data);
};
resetTimeout(); // Start timeoutBest Practices
? Info: Always close connections when you’re done to prevent memory leaks!
1. Always Close Connections
// Close when component unmounts (React)
useEffect(() => {
const eventSource = new EventSource(url);
return () => eventSource.close();
}, []);
// Close when done
eventSource.addEventListener('complete', () => {
eventSource.close();
});2. Handle Errors Gracefully
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
eventSource.close();
// Fall back to polling or show error to user
};3. Implement Timeouts
Don’t let connections hang forever. Implement timeouts and fallback strategies.
4. Parse Data Safely
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
handleUpdate(data);
} catch (error) {
console.error('Failed to parse SSE data:', error);
}
};5. Use Event Types
Listen for specific event types instead of just onmessage:
eventSource.addEventListener('progress', handleProgress);
eventSource.addEventListener('complete', handleComplete);
eventSource.addEventListener('error', handleError);Debugging SSE
Browser DevTools
- Open DevTools (F12)
- Go to Network tab
- Look for requests with type
eventsource - Click to see real-time messages
Common Issues
| Issue | Cause | Solution |
|---|---|---|
| Connection closes immediately | Invalid thread ID | Verify thread ID is correct |
| No messages received | Workflow not started | Check workflow status first |
| CORS errors | Cross-origin request | Ensure proper CORS headers |
| Connection timeout | Long-running operation | Implement reconnection logic |
Next Steps
- Try the Workflow Tutorial
- Learn about Deep Research APIs
- Explore Workflow APIs