1use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11 AgentStarted(AgentStartedEvent),
12 AgentCompleted(AgentCompletedEvent),
13 ToolStarted(ToolStartedEvent),
14 ToolCompleted(ToolCompletedEvent),
15 ToolFailed(ToolFailedEvent),
16 SubAgentStarted(SubAgentStartedEvent),
17 SubAgentCompleted(SubAgentCompletedEvent),
18 TodosUpdated(TodosUpdatedEvent),
19 StateCheckpointed(StateCheckpointedEvent),
20 PlanningComplete(PlanningCompleteEvent),
21 TokenUsage(TokenUsageEvent),
22 StreamingToken(StreamingTokenEvent),
23}
24
25impl AgentEvent {
26 pub fn event_type_name(&self) -> &'static str {
27 match self {
28 AgentEvent::AgentStarted(_) => "agent_started",
29 AgentEvent::AgentCompleted(_) => "agent_completed",
30 AgentEvent::ToolStarted(_) => "tool_started",
31 AgentEvent::ToolCompleted(_) => "tool_completed",
32 AgentEvent::ToolFailed(_) => "tool_failed",
33 AgentEvent::SubAgentStarted(_) => "sub_agent_started",
34 AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
35 AgentEvent::TodosUpdated(_) => "todos_updated",
36 AgentEvent::StateCheckpointed(_) => "state_checkpointed",
37 AgentEvent::PlanningComplete(_) => "planning_complete",
38 AgentEvent::TokenUsage(_) => "token_usage",
39 AgentEvent::StreamingToken(_) => "streaming_token",
40 }
41 }
42
43 pub fn metadata(&self) -> &EventMetadata {
44 match self {
45 AgentEvent::AgentStarted(e) => &e.metadata,
46 AgentEvent::AgentCompleted(e) => &e.metadata,
47 AgentEvent::ToolStarted(e) => &e.metadata,
48 AgentEvent::ToolCompleted(e) => &e.metadata,
49 AgentEvent::ToolFailed(e) => &e.metadata,
50 AgentEvent::SubAgentStarted(e) => &e.metadata,
51 AgentEvent::SubAgentCompleted(e) => &e.metadata,
52 AgentEvent::TodosUpdated(e) => &e.metadata,
53 AgentEvent::StateCheckpointed(e) => &e.metadata,
54 AgentEvent::PlanningComplete(e) => &e.metadata,
55 AgentEvent::TokenUsage(e) => &e.metadata,
56 AgentEvent::StreamingToken(e) => &e.metadata,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct EventMetadata {
63 pub thread_id: String,
64 pub correlation_id: String,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub customer_id: Option<String>,
67 pub timestamp: String,
68}
69
70impl EventMetadata {
71 pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
72 Self {
73 thread_id,
74 correlation_id,
75 customer_id,
76 timestamp: chrono::Utc::now().to_rfc3339(),
77 }
78 }
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct AgentStartedEvent {
83 pub metadata: EventMetadata,
84 pub agent_name: String,
85 pub message_preview: String,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct AgentCompletedEvent {
90 pub metadata: EventMetadata,
91 pub agent_name: String,
92 pub duration_ms: u64,
93 pub response_preview: String, pub response: String, }
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ToolStartedEvent {
99 pub metadata: EventMetadata,
100 pub tool_name: String,
101 pub input_summary: String,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct ToolCompletedEvent {
106 pub metadata: EventMetadata,
107 pub tool_name: String,
108 pub duration_ms: u64,
109 pub result_summary: String,
110 pub success: bool,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct ToolFailedEvent {
115 pub metadata: EventMetadata,
116 pub tool_name: String,
117 pub duration_ms: u64,
118 pub error_message: String,
119 pub is_recoverable: bool,
120 pub retry_count: u32,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SubAgentStartedEvent {
125 pub metadata: EventMetadata,
126 pub agent_name: String,
127 pub instruction_summary: String,
128 pub delegation_depth: u32,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct SubAgentCompletedEvent {
133 pub metadata: EventMetadata,
134 pub agent_name: String,
135 pub duration_ms: u64,
136 pub result_summary: String,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct TodosUpdatedEvent {
141 pub metadata: EventMetadata,
142 pub todos: Vec<TodoItem>,
143 pub pending_count: usize,
144 pub in_progress_count: usize,
145 pub completed_count: usize,
146 pub last_updated: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct StateCheckpointedEvent {
151 pub metadata: EventMetadata,
152 pub checkpoint_id: String,
153 pub state_size_bytes: usize,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct PlanningCompleteEvent {
158 pub metadata: EventMetadata,
159 pub action_type: String,
160 pub action_summary: String,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct TokenUsageEvent {
165 pub metadata: EventMetadata,
166 pub usage: TokenUsage,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct StreamingTokenEvent {
171 pub metadata: EventMetadata,
172 pub agent_name: String,
173 pub token: String,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct TokenUsage {
178 pub input_tokens: u32,
180 pub output_tokens: u32,
182 pub total_tokens: u32,
184 pub estimated_cost: f64,
186 pub provider: String,
188 pub model: String,
190 pub duration_ms: u64,
192 pub timestamp: String,
194}
195
196impl TokenUsage {
197 pub fn new(
198 input_tokens: u32,
199 output_tokens: u32,
200 provider: impl Into<String>,
201 model: impl Into<String>,
202 duration_ms: u64,
203 estimated_cost: f64,
204 ) -> Self {
205 let provider = provider.into();
206 let model = model.into();
207 let total_tokens = input_tokens + output_tokens;
208
209 Self {
210 input_tokens,
211 output_tokens,
212 total_tokens,
213 estimated_cost,
214 provider,
215 model,
216 duration_ms,
217 timestamp: chrono::Utc::now().to_rfc3339(),
218 }
219 }
220}
221
222#[async_trait]
223pub trait EventBroadcaster: Send + Sync {
224 fn id(&self) -> &str;
225 async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
226 fn should_broadcast(&self, _event: &AgentEvent) -> bool {
227 true
228 }
229
230 fn supports_streaming(&self) -> bool {
233 false
234 }
235}
236
237pub struct EventDispatcher {
238 broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
239}
240
241impl EventDispatcher {
242 pub fn new() -> Self {
243 Self {
244 broadcasters: std::sync::RwLock::new(Vec::new()),
245 }
246 }
247
248 pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
250 if let Ok(mut broadcasters) = self.broadcasters.write() {
251 broadcasters.push(broadcaster);
252 } else {
253 tracing::error!("Failed to acquire write lock on broadcasters");
254 }
255 }
256
257 pub async fn dispatch(&self, event: AgentEvent) {
258 let broadcasters = {
259 if let Ok(guard) = self.broadcasters.read() {
260 guard.clone()
261 } else {
262 tracing::error!("Failed to acquire read lock on broadcasters");
263 return;
264 }
265 };
266
267 for broadcaster in broadcasters {
268 let event_clone = event.clone();
269 tokio::spawn(async move {
270 if matches!(event_clone, AgentEvent::StreamingToken(_))
272 && !broadcaster.supports_streaming()
273 {
274 return;
275 }
276
277 if broadcaster.should_broadcast(&event_clone) {
278 if let Err(e) = broadcaster.broadcast(&event_clone).await {
279 tracing::warn!(
280 broadcaster_id = broadcaster.id(),
281 error = %e,
282 "Failed to broadcast event"
283 );
284 }
285 }
286 });
287 }
288 }
289}
290
291impl Default for EventDispatcher {
292 fn default() -> Self {
293 Self::new()
294 }
295}