agents_core/
events.rs

1//! Event system for agent lifecycle tracking and progress broadcasting
2
3use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11    AgentStarted(AgentStartedEvent),
12    AgentCompleted(AgentCompletedEvent),
13    ToolStarted(ToolStartedEvent),
14    ToolCompleted(ToolCompletedEvent),
15    ToolFailed(ToolFailedEvent),
16    SubAgentStarted(SubAgentStartedEvent),
17    SubAgentCompleted(SubAgentCompletedEvent),
18    TodosUpdated(TodosUpdatedEvent),
19    StateCheckpointed(StateCheckpointedEvent),
20    PlanningComplete(PlanningCompleteEvent),
21    TokenUsage(TokenUsageEvent),
22    StreamingToken(StreamingTokenEvent),
23}
24
25impl AgentEvent {
26    pub fn event_type_name(&self) -> &'static str {
27        match self {
28            AgentEvent::AgentStarted(_) => "agent_started",
29            AgentEvent::AgentCompleted(_) => "agent_completed",
30            AgentEvent::ToolStarted(_) => "tool_started",
31            AgentEvent::ToolCompleted(_) => "tool_completed",
32            AgentEvent::ToolFailed(_) => "tool_failed",
33            AgentEvent::SubAgentStarted(_) => "sub_agent_started",
34            AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
35            AgentEvent::TodosUpdated(_) => "todos_updated",
36            AgentEvent::StateCheckpointed(_) => "state_checkpointed",
37            AgentEvent::PlanningComplete(_) => "planning_complete",
38            AgentEvent::TokenUsage(_) => "token_usage",
39            AgentEvent::StreamingToken(_) => "streaming_token",
40        }
41    }
42
43    pub fn metadata(&self) -> &EventMetadata {
44        match self {
45            AgentEvent::AgentStarted(e) => &e.metadata,
46            AgentEvent::AgentCompleted(e) => &e.metadata,
47            AgentEvent::ToolStarted(e) => &e.metadata,
48            AgentEvent::ToolCompleted(e) => &e.metadata,
49            AgentEvent::ToolFailed(e) => &e.metadata,
50            AgentEvent::SubAgentStarted(e) => &e.metadata,
51            AgentEvent::SubAgentCompleted(e) => &e.metadata,
52            AgentEvent::TodosUpdated(e) => &e.metadata,
53            AgentEvent::StateCheckpointed(e) => &e.metadata,
54            AgentEvent::PlanningComplete(e) => &e.metadata,
55            AgentEvent::TokenUsage(e) => &e.metadata,
56            AgentEvent::StreamingToken(e) => &e.metadata,
57        }
58    }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct EventMetadata {
63    pub thread_id: String,
64    pub correlation_id: String,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub customer_id: Option<String>,
67    pub timestamp: String,
68}
69
70impl EventMetadata {
71    pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
72        Self {
73            thread_id,
74            correlation_id,
75            customer_id,
76            timestamp: chrono::Utc::now().to_rfc3339(),
77        }
78    }
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct AgentStartedEvent {
83    pub metadata: EventMetadata,
84    pub agent_name: String,
85    pub message_preview: String,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct AgentCompletedEvent {
90    pub metadata: EventMetadata,
91    pub agent_name: String,
92    pub duration_ms: u64,
93    pub response_preview: String, // Truncated for logs (~100 chars)
94    pub response: String,         // Full response text
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ToolStartedEvent {
99    pub metadata: EventMetadata,
100    pub tool_name: String,
101    pub input_summary: String,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct ToolCompletedEvent {
106    pub metadata: EventMetadata,
107    pub tool_name: String,
108    pub duration_ms: u64,
109    pub result_summary: String,
110    pub success: bool,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct ToolFailedEvent {
115    pub metadata: EventMetadata,
116    pub tool_name: String,
117    pub duration_ms: u64,
118    pub error_message: String,
119    pub is_recoverable: bool,
120    pub retry_count: u32,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SubAgentStartedEvent {
125    pub metadata: EventMetadata,
126    pub agent_name: String,
127    pub instruction_summary: String,
128    pub delegation_depth: u32,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct SubAgentCompletedEvent {
133    pub metadata: EventMetadata,
134    pub agent_name: String,
135    pub duration_ms: u64,
136    pub result_summary: String,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct TodosUpdatedEvent {
141    pub metadata: EventMetadata,
142    pub todos: Vec<TodoItem>,
143    pub pending_count: usize,
144    pub in_progress_count: usize,
145    pub completed_count: usize,
146    pub last_updated: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct StateCheckpointedEvent {
151    pub metadata: EventMetadata,
152    pub checkpoint_id: String,
153    pub state_size_bytes: usize,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct PlanningCompleteEvent {
158    pub metadata: EventMetadata,
159    pub action_type: String,
160    pub action_summary: String,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct TokenUsageEvent {
165    pub metadata: EventMetadata,
166    pub usage: TokenUsage,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct StreamingTokenEvent {
171    pub metadata: EventMetadata,
172    pub agent_name: String,
173    pub token: String,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct TokenUsage {
178    /// Number of input tokens
179    pub input_tokens: u32,
180    /// Number of output tokens
181    pub output_tokens: u32,
182    /// Total tokens used
183    pub total_tokens: u32,
184    /// Estimated cost in USD
185    pub estimated_cost: f64,
186    /// Provider name
187    pub provider: String,
188    /// Model name
189    pub model: String,
190    /// Request duration in milliseconds
191    pub duration_ms: u64,
192    /// Timestamp of the request
193    pub timestamp: String,
194}
195
196impl TokenUsage {
197    pub fn new(
198        input_tokens: u32,
199        output_tokens: u32,
200        provider: impl Into<String>,
201        model: impl Into<String>,
202        duration_ms: u64,
203        estimated_cost: f64,
204    ) -> Self {
205        let provider = provider.into();
206        let model = model.into();
207        let total_tokens = input_tokens + output_tokens;
208
209        Self {
210            input_tokens,
211            output_tokens,
212            total_tokens,
213            estimated_cost,
214            provider,
215            model,
216            duration_ms,
217            timestamp: chrono::Utc::now().to_rfc3339(),
218        }
219    }
220}
221
222#[async_trait]
223pub trait EventBroadcaster: Send + Sync {
224    fn id(&self) -> &str;
225    async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
226    fn should_broadcast(&self, _event: &AgentEvent) -> bool {
227        true
228    }
229
230    /// Indicates whether this broadcaster supports streaming token events.
231    /// Default is false for backward compatibility.
232    fn supports_streaming(&self) -> bool {
233        false
234    }
235}
236
237pub struct EventDispatcher {
238    broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
239}
240
241impl EventDispatcher {
242    pub fn new() -> Self {
243        Self {
244            broadcasters: std::sync::RwLock::new(Vec::new()),
245        }
246    }
247
248    /// Add a broadcaster (supports dynamic addition with interior mutability)
249    pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
250        if let Ok(mut broadcasters) = self.broadcasters.write() {
251            broadcasters.push(broadcaster);
252        } else {
253            tracing::error!("Failed to acquire write lock on broadcasters");
254        }
255    }
256
257    pub async fn dispatch(&self, event: AgentEvent) {
258        let broadcasters = {
259            if let Ok(guard) = self.broadcasters.read() {
260                guard.clone()
261            } else {
262                tracing::error!("Failed to acquire read lock on broadcasters");
263                return;
264            }
265        };
266
267        for broadcaster in broadcasters {
268            let event_clone = event.clone();
269            tokio::spawn(async move {
270                // Skip streaming tokens for broadcasters that don't support them
271                if matches!(event_clone, AgentEvent::StreamingToken(_))
272                    && !broadcaster.supports_streaming()
273                {
274                    return;
275                }
276
277                if broadcaster.should_broadcast(&event_clone) {
278                    if let Err(e) = broadcaster.broadcast(&event_clone).await {
279                        tracing::warn!(
280                            broadcaster_id = broadcaster.id(),
281                            error = %e,
282                            "Failed to broadcast event"
283                        );
284                    }
285                }
286            });
287        }
288    }
289}
290
291impl Default for EventDispatcher {
292    fn default() -> Self {
293        Self::new()
294    }
295}