agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
/// Names of the built-in tools exposed by middlewares.
///
/// Only these names are checked against the agent's `builtin_tools` selection;
/// any tool whose name is not listed here is always exposed. The `task` tool
/// for subagents is not listed and is therefore not gated.
const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66    enable_pii_sanitization: bool,
67}
68
69impl DeepAgent {
70    fn collect_tools(&self) -> HashMap<String, ToolBox> {
71        let mut tools: HashMap<String, ToolBox> = HashMap::new();
72        for tool in &self.base_tools {
73            tools.insert(tool.schema().name.clone(), tool.clone());
74        }
75        for middleware in &self.middlewares {
76            for tool in middleware.tools() {
77                let tool_name = tool.schema().name.clone();
78                if self.should_include(&tool_name) {
79                    tools.insert(tool_name, tool);
80                }
81            }
82        }
83        tools
84    }
85    // no streaming path in baseline
86
87    fn should_include(&self, name: &str) -> bool {
88        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
89        if !is_builtin {
90            return true;
91        }
92        match &self.builtin_tools {
93            None => true,
94            Some(selected) => selected.contains(name),
95        }
96    }
97
98    fn append_history(&self, message: AgentMessage) {
99        if let Ok(mut history) = self.history.write() {
100            history.push(message);
101        }
102    }
103
104    fn current_history(&self) -> Vec<AgentMessage> {
105        self.history.read().map(|h| h.clone()).unwrap_or_default()
106    }
107
108    fn emit_event(&self, event: agents_core::events::AgentEvent) {
109        if let Some(dispatcher) = &self.event_dispatcher {
110            let dispatcher_clone = dispatcher.clone();
111            tokio::spawn(async move {
112                dispatcher_clone.dispatch(event).await;
113            });
114        }
115    }
116
117    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
118        agents_core::events::EventMetadata::new(
119            "default".to_string(),
120            uuid::Uuid::new_v4().to_string(),
121            None,
122        )
123    }
124
125    fn truncate_message(&self, message: &AgentMessage) -> String {
126        let text = match &message.content {
127            MessageContent::Text(t) => t.clone(),
128            MessageContent::Json(v) => v.to_string(),
129        };
130
131        if self.enable_pii_sanitization {
132            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
133        } else {
134            // No sanitization - just truncate
135            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
136        }
137    }
138
139    fn get_full_message_text(&self, message: &AgentMessage) -> String {
140        match &message.content {
141            MessageContent::Text(t) => t.clone(),
142            MessageContent::Json(v) => v.to_string(),
143        }
144    }
145
146    fn summarize_payload(&self, payload: &Value) -> String {
147        if self.enable_pii_sanitization {
148            agents_core::security::sanitize_tool_payload(
149                payload,
150                agents_core::security::MAX_PREVIEW_LENGTH,
151            )
152        } else {
153            // No sanitization - just truncate JSON string
154            let json_str = payload.to_string();
155            agents_core::security::truncate_string(
156                &json_str,
157                agents_core::security::MAX_PREVIEW_LENGTH,
158            )
159        }
160    }
161
162    /// Save the current agent state to the configured checkpointer.
163    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
164        if let Some(ref checkpointer) = self.checkpointer {
165            let state = self
166                .state
167                .read()
168                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
169                .clone();
170
171            // Calculate state size before saving
172            let state_json = serde_json::to_string(&state)?;
173            let state_size = state_json.len();
174
175            // Save state to checkpointer
176            checkpointer.save_state(thread_id, &state).await?;
177
178            // Emit StateCheckpointed event after successful save
179            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
180                agents_core::events::StateCheckpointedEvent {
181                    metadata: self.create_event_metadata(),
182                    checkpoint_id: thread_id.to_string(),
183                    state_size_bytes: state_size,
184                },
185            ));
186
187            tracing::debug!(
188                thread_id = %thread_id,
189                state_size_bytes = state_size,
190                "πŸ’Ύ State checkpointed and event emitted"
191            );
192
193            Ok(())
194        } else {
195            tracing::warn!("Attempted to save state but no checkpointer is configured");
196            Ok(())
197        }
198    }
199
200    /// Load agent state from the configured checkpointer.
201    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
202        if let Some(ref checkpointer) = self.checkpointer {
203            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
204                *self
205                    .state
206                    .write()
207                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
208                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
209                Ok(true)
210            } else {
211                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
212                Ok(false)
213            }
214        } else {
215            tracing::warn!("Attempted to load state but no checkpointer is configured");
216            Ok(false)
217        }
218    }
219
220    /// Delete saved state for the specified thread.
221    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
222        if let Some(ref checkpointer) = self.checkpointer {
223            checkpointer.delete_thread(thread_id).await
224        } else {
225            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
226            Ok(())
227        }
228    }
229
230    /// List all threads with saved state.
231    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
232        if let Some(ref checkpointer) = self.checkpointer {
233            checkpointer.list_threads().await
234        } else {
235            Ok(Vec::new())
236        }
237    }
238
239    async fn execute_tool(
240        &self,
241        tool: ToolBox,
242        _tool_name: String,
243        payload: Value,
244    ) -> anyhow::Result<AgentMessage> {
245        let state_snapshot = self.state.read().unwrap().clone();
246        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
247
248        let result = tool.execute(payload, ctx).await?;
249        Ok(self.apply_tool_result(result))
250    }
251
252    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
253        match result {
254            ToolResult::Message(message) => {
255                // Tool results are not added to conversation history
256                // Only the final LLM response after tool execution is added
257                message
258            }
259            ToolResult::WithStateUpdate {
260                message,
261                state_diff,
262            } => {
263                // Check if todos were updated
264                let todos_updated = state_diff.todos.is_some();
265
266                if let Ok(mut state) = self.state.write() {
267                    let command = agents_core::command::Command::with_state(state_diff);
268                    command.apply_to(&mut state);
269
270                    // Emit TodosUpdated event if todos were modified
271                    if todos_updated {
272                        let (pending_count, in_progress_count, completed_count) =
273                            count_todos(&state.todos);
274
275                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
276                            agents_core::events::TodosUpdatedEvent {
277                                metadata: self.create_event_metadata(),
278                                todos: state.todos.clone(),
279                                pending_count,
280                                in_progress_count,
281                                completed_count,
282                                last_updated: chrono::Utc::now().to_rfc3339(),
283                            },
284                        ));
285
286                        tracing::debug!(
287                            pending = pending_count,
288                            in_progress = in_progress_count,
289                            completed = completed_count,
290                            total = state.todos.len(),
291                            "πŸ“ Todos updated and event emitted"
292                        );
293                    }
294                }
295                // Tool results are not added to conversation history
296                // Only the final LLM response after tool execution is added
297                message
298            }
299        }
300    }
301
302    /// Get the current pending interrupt, if any.
303    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
304        self.state
305            .read()
306            .ok()
307            .and_then(|guard| guard.pending_interrupts.first().cloned())
308    }
309
310    /// Add a broadcaster dynamically to the agent's event dispatcher.
311    ///
312    /// Add a single broadcaster dynamically after the agent is built.
313    ///
314    /// This is useful for per-conversation or per-customer broadcasters.
315    ///
316    /// # Example
317    /// ```no_run
318    /// use std::sync::Arc;
319    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
320    /// ```
321    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
322        if let Some(dispatcher) = &self.event_dispatcher {
323            dispatcher.add_broadcaster(broadcaster);
324            tracing::debug!("Broadcaster added to event dispatcher");
325        } else {
326            tracing::warn!("add_broadcaster called but no event dispatcher configured");
327        }
328    }
329
330    /// Add multiple broadcasters at once.
331    ///
332    /// This is useful when you need to add several broadcasters for a conversation
333    /// (e.g., WhatsApp, SSE, DynamoDB).
334    ///
335    /// # Example
336    /// ```no_run
337    /// use std::sync::Arc;
338    /// // agent.add_broadcasters(vec![
339    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
340    /// //     Arc::new(SseBroadcaster::new(channel)),
341    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
342    /// // ]);
343    /// ```
344    pub fn add_broadcasters(
345        &self,
346        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
347    ) {
348        if let Some(dispatcher) = &self.event_dispatcher {
349            for broadcaster in broadcasters {
350                dispatcher.add_broadcaster(broadcaster);
351            }
352            tracing::debug!("Multiple broadcasters added to event dispatcher");
353        } else {
354            tracing::warn!("add_broadcasters called but no event dispatcher configured");
355        }
356    }
357
358    /// Resume execution after human approval of an interrupt.
359    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
360        // Get the first pending interrupt
361        let interrupt = {
362            let state_guard = self
363                .state
364                .read()
365                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
366            state_guard
367                .pending_interrupts
368                .first()
369                .cloned()
370                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
371        };
372
373        let result_message = match action {
374            HitlAction::Accept => {
375                // Execute with original args
376                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
377                tracing::info!(
378                    tool_name = %hitl.tool_name,
379                    call_id = %hitl.call_id,
380                    "βœ… HITL: Tool approved, executing with original arguments"
381                );
382
383                let tools = self.collect_tools();
384                let tool = tools
385                    .get(&hitl.tool_name)
386                    .cloned()
387                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
388
389                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
390                    .await?
391            }
392
393            HitlAction::Edit {
394                tool_name,
395                tool_args,
396            } => {
397                // Execute with modified args
398                tracing::info!(
399                    tool_name = %tool_name,
400                    "✏️ HITL: Tool edited, executing with modified arguments"
401                );
402
403                let tools = self.collect_tools();
404                let tool = tools
405                    .get(&tool_name)
406                    .cloned()
407                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
408
409                self.execute_tool(tool, tool_name, tool_args).await?
410            }
411
412            HitlAction::Reject { reason } => {
413                // Don't execute - return rejection message
414                tracing::info!("❌ HITL: Tool rejected");
415
416                let text = reason
417                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
418
419                let message = AgentMessage {
420                    role: MessageRole::Tool,
421                    content: MessageContent::Text(text),
422                    metadata: None,
423                };
424
425                self.append_history(message.clone());
426                message
427            }
428
429            HitlAction::Respond { message } => {
430                // Don't execute - return custom message
431                tracing::info!("πŸ’¬ HITL: Custom response provided");
432
433                self.append_history(message.clone());
434                message
435            }
436        };
437
438        // Clear the interrupt from state
439        {
440            let mut state_guard = self
441                .state
442                .write()
443                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
444            state_guard.clear_interrupts();
445        }
446
447        // Persist cleared state
448        if let Some(checkpointer) = &self.checkpointer {
449            let state_clone = self
450                .state
451                .read()
452                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
453                .clone();
454            checkpointer
455                .save_state(&ThreadId::default(), &state_clone)
456                .await?;
457        }
458
459        Ok(result_message)
460    }
461
462    /// Handle message from string input - converts string to AgentMessage internally
463    pub async fn handle_message(
464        &self,
465        input: impl AsRef<str>,
466        state: Arc<AgentStateSnapshot>,
467    ) -> anyhow::Result<AgentMessage> {
468        self.handle_message_with_metadata(input, None, state).await
469    }
470
471    /// Handle message from string input with metadata - converts string to AgentMessage internally
472    pub async fn handle_message_with_metadata(
473        &self,
474        input: impl AsRef<str>,
475        metadata: Option<MessageMetadata>,
476        state: Arc<AgentStateSnapshot>,
477    ) -> anyhow::Result<AgentMessage> {
478        let agent_message = AgentMessage {
479            role: MessageRole::User,
480            content: MessageContent::Text(input.as_ref().to_string()),
481            metadata,
482        };
483        self.handle_message_internal(agent_message, state).await
484    }
485
486    /// Internal method that contains the actual message handling logic
487    async fn handle_message_internal(
488        &self,
489        input: AgentMessage,
490        loaded_state: Arc<AgentStateSnapshot>,
491    ) -> anyhow::Result<AgentMessage> {
492        let start_time = std::time::Instant::now();
493
494        // Initialize internal state with loaded state from checkpointer
495        // This ensures conversation context is maintained across sessions
496        if let Ok(mut state_guard) = self.state.write() {
497            *state_guard = (*loaded_state).clone();
498        }
499
500        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
501            agents_core::events::AgentStartedEvent {
502                metadata: self.create_event_metadata(),
503                agent_name: self.descriptor.name.clone(),
504                message_preview: self.truncate_message(&input),
505            },
506        ));
507
508        self.append_history(input.clone());
509
510        // ReAct loop: continue until LLM responds with text (not tool calls)
511        let max_iterations = 10;
512        let mut iteration = 0;
513
514        loop {
515            iteration += 1;
516            if iteration > max_iterations {
517                tracing::warn!(
518                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
519                    max_iterations
520                );
521                let response = AgentMessage {
522                    role: MessageRole::Agent,
523                    content: MessageContent::Text(
524                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
525                    ),
526                    metadata: None,
527                };
528                self.append_history(response.clone());
529                return Ok(response);
530            }
531
532            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
533
534            // Build request with current history
535            let mut request = ModelRequest::new(&self.instructions, self.current_history());
536            let tools = self.collect_tools();
537            for middleware in &self.middlewares {
538                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
539                middleware.modify_model_request(&mut ctx).await?;
540            }
541
542            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
543            let context = PlannerContext {
544                history: request.messages.clone(),
545                system_prompt: request.system_prompt.clone(),
546                tools: tool_schemas,
547            };
548            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
549
550            // Ask LLM what to do
551            let decision = self.planner.plan(context, state_snapshot).await?;
552
553            // Emit PlanningComplete event
554            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
555                agents_core::events::PlanningCompleteEvent {
556                    metadata: self.create_event_metadata(),
557                    action_type: match &decision.next_action {
558                        PlannerAction::Respond { .. } => "respond".to_string(),
559                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
560                        PlannerAction::Terminate => "terminate".to_string(),
561                    },
562                    action_summary: match &decision.next_action {
563                        PlannerAction::Respond { message } => {
564                            format!("Respond: {}", self.truncate_message(message))
565                        }
566                        PlannerAction::CallTool { tool_name, .. } => {
567                            format!("Call tool: {}", tool_name)
568                        }
569                        PlannerAction::Terminate => "Terminate".to_string(),
570                    },
571                },
572            ));
573
574            match decision.next_action {
575                PlannerAction::Respond { message } => {
576                    // LLM decided to respond with text - exit loop
577                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
578                        agents_core::events::AgentCompletedEvent {
579                            metadata: self.create_event_metadata(),
580                            agent_name: self.descriptor.name.clone(),
581                            duration_ms: start_time.elapsed().as_millis() as u64,
582                            response_preview: self.truncate_message(&message),
583                            response: self.get_full_message_text(&message),
584                        },
585                    ));
586
587                    self.append_history(message.clone());
588                    return Ok(message);
589                }
590                PlannerAction::CallTool { tool_name, payload } => {
591                    // Add AI's decision to call tool to history
592                    // This is needed for OpenAI's API which expects:
593                    // 1. Assistant message with tool call
594                    // 2. Tool message with result
595                    let tool_call_message = AgentMessage {
596                        role: MessageRole::Agent,
597                        content: MessageContent::Text(format!(
598                            "Calling tool: {} with args: {}",
599                            tool_name,
600                            serde_json::to_string(&payload).unwrap_or_default()
601                        )),
602                        metadata: None,
603                    };
604                    self.append_history(tool_call_message);
605
606                    if let Some(tool) = tools.get(&tool_name).cloned() {
607                        // Check all middleware for interrupts before executing tool
608                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
609                        for middleware in &self.middlewares {
610                            if let Some(interrupt) = middleware
611                                .before_tool_execution(&tool_name, &payload, &call_id)
612                                .await?
613                            {
614                                // Save interrupt to state
615                                {
616                                    let mut state_guard = self.state.write().map_err(|_| {
617                                        anyhow::anyhow!("Failed to acquire write lock on state")
618                                    })?;
619                                    state_guard.add_interrupt(interrupt.clone());
620                                }
621
622                                // Persist state with checkpointer
623                                if let Some(checkpointer) = &self.checkpointer {
624                                    let state_clone = self
625                                        .state
626                                        .read()
627                                        .map_err(|_| {
628                                            anyhow::anyhow!("Failed to acquire read lock on state")
629                                        })?
630                                        .clone();
631                                    checkpointer
632                                        .save_state(&ThreadId::default(), &state_clone)
633                                        .await?;
634                                }
635
636                                // Return interrupt message - execution pauses here
637                                let interrupt_message = AgentMessage {
638                                    role: MessageRole::System,
639                                    content: MessageContent::Text(format!(
640                                        "⏸️ Execution paused: Tool '{}' requires human approval",
641                                        tool_name
642                                    )),
643                                    metadata: None,
644                                };
645                                self.append_history(interrupt_message.clone());
646                                return Ok(interrupt_message);
647                            }
648                        }
649
650                        // No interrupt - execute tool
651                        let tool_start_time = std::time::Instant::now();
652
653                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
654                            agents_core::events::ToolStartedEvent {
655                                metadata: self.create_event_metadata(),
656                                tool_name: tool_name.clone(),
657                                input_summary: self.summarize_payload(&payload),
658                            },
659                        ));
660
661                        tracing::warn!(
662                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
663                            tool_name,
664                            serde_json::to_string(&payload)
665                                .unwrap_or_else(|_| "invalid json".to_string())
666                        );
667
668                        let result = self
669                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
670                            .await;
671
672                        let duration = tool_start_time.elapsed();
673                        match result {
674                            Ok(tool_result_message) => {
675                                let content_preview = match &tool_result_message.content {
676                                    MessageContent::Text(t) => {
677                                        if t.len() > 100 {
678                                            format!("{}... ({} chars)", &t[..100], t.len())
679                                        } else {
680                                            t.clone()
681                                        }
682                                    }
683                                    MessageContent::Json(v) => {
684                                        format!("JSON: {} bytes", v.to_string().len())
685                                    }
686                                };
687
688                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
689                                    agents_core::events::ToolCompletedEvent {
690                                        metadata: self.create_event_metadata(),
691                                        tool_name: tool_name.clone(),
692                                        duration_ms: duration.as_millis() as u64,
693                                        result_summary: content_preview.clone(),
694                                        success: true,
695                                    },
696                                ));
697
698                                tracing::warn!(
699                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
700                                    tool_name,
701                                    duration,
702                                    content_preview
703                                );
704
705                                // Add tool result to history and continue ReAct loop
706                                self.append_history(tool_result_message);
707                                // Loop continues - LLM will see tool result and decide next action
708                            }
709                            Err(e) => {
710                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
711                                    agents_core::events::ToolFailedEvent {
712                                        metadata: self.create_event_metadata(),
713                                        tool_name: tool_name.clone(),
714                                        duration_ms: duration.as_millis() as u64,
715                                        error_message: e.to_string(),
716                                        is_recoverable: true,
717                                        retry_count: 0,
718                                    },
719                                ));
720
721                                tracing::error!(
722                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
723                                    tool_name,
724                                    duration,
725                                    e
726                                );
727
728                                // Add error to history and continue - let LLM handle the error
729                                let error_message = AgentMessage {
730                                    role: MessageRole::Tool,
731                                    content: MessageContent::Text(format!(
732                                        "Error executing {}: {}",
733                                        tool_name, e
734                                    )),
735                                    metadata: None,
736                                };
737                                self.append_history(error_message);
738                                // Loop continues - LLM will see error and decide how to handle it
739                            }
740                        }
741                    } else {
742                        // Tool not found - add error to history and continue
743                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
744                        let error_message = AgentMessage {
745                            role: MessageRole::Tool,
746                            content: MessageContent::Text(format!(
747                                "Tool '{}' not found. Available tools: {}",
748                                tool_name,
749                                tools
750                                    .keys()
751                                    .map(|k| k.as_str())
752                                    .collect::<Vec<_>>()
753                                    .join(", ")
754                            )),
755                            metadata: None,
756                        };
757                        self.append_history(error_message);
758                        // Loop continues - LLM will see error and try something else
759                    }
760                }
761                PlannerAction::Terminate => {
762                    // LLM decided to terminate - exit loop
763                    tracing::debug!("πŸ›‘ Agent terminated");
764                    let message = AgentMessage {
765                        role: MessageRole::Agent,
766                        content: MessageContent::Text("Task completed.".into()),
767                        metadata: None,
768                    };
769                    self.append_history(message.clone());
770                    return Ok(message);
771                }
772            }
773        }
774    }
775}
776
777#[async_trait]
778impl AgentHandle for DeepAgent {
779    async fn describe(&self) -> AgentDescriptor {
780        self.descriptor.clone()
781    }
782
783    async fn handle_message(
784        &self,
785        input: AgentMessage,
786        _state: Arc<AgentStateSnapshot>,
787    ) -> anyhow::Result<AgentMessage> {
788        self.handle_message_internal(input, _state).await
789    }
790
791    async fn handle_message_stream(
792        &self,
793        input: AgentMessage,
794        _state: Arc<AgentStateSnapshot>,
795    ) -> anyhow::Result<agents_core::agent::AgentStream> {
796        use crate::planner::LlmBackedPlanner;
797        use agents_core::llm::{LlmRequest, StreamChunk};
798        use futures::StreamExt;
799
800        // Add input to history
801        self.append_history(input.clone());
802
803        // Build the request similar to handle_message_internal
804        let mut request = ModelRequest::new(&self.instructions, self.current_history());
805        let tools = self.collect_tools();
806
807        // Apply middleware modifications
808        for middleware in &self.middlewares {
809            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
810            middleware.modify_model_request(&mut ctx).await?;
811        }
812
813        // Convert ModelRequest to LlmRequest and add tools
814        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
815        let llm_request = LlmRequest {
816            system_prompt: request.system_prompt.clone(),
817            messages: request.messages.clone(),
818            tools: tool_schemas,
819        };
820
821        // Try to get the underlying LLM model for streaming
822        let planner_any = self.planner.as_any();
823
824        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
825            // We have an LlmBackedPlanner, use its model for streaming
826            let model = llm_planner.model().clone();
827            let stream = model.generate_stream(llm_request).await?;
828
829            // Wrap stream to emit events to broadcasters
830            let agent_name = self.descriptor.name.clone();
831            let event_dispatcher = self.event_dispatcher.clone();
832
833            let wrapped_stream = stream.then(move |chunk_result| {
834                let dispatcher = event_dispatcher.clone();
835                let name = agent_name.clone();
836
837                async move {
838                    match &chunk_result {
839                        Ok(StreamChunk::TextDelta(token)) => {
840                            // Emit streaming token event
841                            if let Some(ref dispatcher) = dispatcher {
842                                let event = agents_core::events::AgentEvent::StreamingToken(
843                                    agents_core::events::StreamingTokenEvent {
844                                        metadata: agents_core::events::EventMetadata::new(
845                                            "default".to_string(),
846                                            uuid::Uuid::new_v4().to_string(),
847                                            None,
848                                        ),
849                                        agent_name: name.clone(),
850                                        token: token.clone(),
851                                    },
852                                );
853                                dispatcher.dispatch(event).await;
854                            }
855                        }
856                        Ok(StreamChunk::Done { message }) => {
857                            // Emit agent completed event
858                            if let Some(ref dispatcher) = dispatcher {
859                                let full_text = match &message.content {
860                                    agents_core::messaging::MessageContent::Text(t) => t.clone(),
861                                    agents_core::messaging::MessageContent::Json(v) => {
862                                        v.to_string()
863                                    }
864                                };
865
866                                let preview = if full_text.len() > 100 {
867                                    format!("{}...", &full_text[..100])
868                                } else {
869                                    full_text.clone()
870                                };
871
872                                let event = agents_core::events::AgentEvent::AgentCompleted(
873                                    agents_core::events::AgentCompletedEvent {
874                                        metadata: agents_core::events::EventMetadata::new(
875                                            "default".to_string(),
876                                            uuid::Uuid::new_v4().to_string(),
877                                            None,
878                                        ),
879                                        agent_name: name.clone(),
880                                        duration_ms: 0, // Duration not tracked in streaming mode
881                                        response_preview: preview,
882                                        response: full_text,
883                                    },
884                                );
885                                dispatcher.dispatch(event).await;
886                            }
887                        }
888                        _ => {}
889                    }
890                    chunk_result
891                }
892            });
893
894            Ok(Box::pin(wrapped_stream))
895        } else {
896            // Fallback to non-streaming
897            let response = self.handle_message_internal(input, _state).await?;
898            Ok(Box::pin(futures::stream::once(async move {
899                Ok(StreamChunk::Done { message: response })
900            })))
901        }
902    }
903
904    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
905        let state_guard = self
906            .state
907            .read()
908            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
909        Ok(state_guard.pending_interrupts.first().cloned())
910    }
911
912    async fn resume_with_approval(
913        &self,
914        action: agents_core::hitl::HitlAction,
915    ) -> anyhow::Result<AgentMessage> {
916        self.resume_with_approval(action).await
917    }
918}
919
920/// Create a deep agent from configuration - matches Python middleware assembly exactly
921///
922/// This function assembles the middleware stack in the same order as the Python SDK:
923/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
924pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
925    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
926    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
927
928    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
929    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
930
931    // Build sub-agents from configurations
932    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
933
934    for subagent_config in &config.subagent_configs {
935        // Determine the planner for this sub-agent
936        let sub_planner = if let Some(ref model) = subagent_config.model {
937            // Sub-agent has its own model - wrap it in a planner
938            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
939        } else {
940            // Inherit parent's planner
941            config.planner.clone()
942        };
943
944        // Create a DeepAgentConfig for this sub-agent
945        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
946
947        // Configure tools
948        if let Some(ref tools) = subagent_config.tools {
949            tracing::debug!(
950                "  - Configuring {} tools for {}",
951                tools.len(),
952                subagent_config.name
953            );
954            for tool in tools {
955                sub_cfg = sub_cfg.with_tool(tool.clone());
956            }
957        }
958
959        // Configure built-in tools
960        if let Some(ref builtin) = subagent_config.builtin_tools {
961            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
962        }
963
964        // Sub-agents should not have their own sub-agents
965        sub_cfg = sub_cfg.with_auto_general_purpose(false);
966
967        // Configure prompt caching
968        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
969
970        // Inherit PII sanitization setting from parent
971        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
972
973        // Build the sub-agent recursively
974        let sub_agent = create_deep_agent_from_config(sub_cfg);
975
976        // Register the sub-agent
977        registrations.push(SubAgentRegistration {
978            descriptor: SubAgentDescriptor {
979                name: subagent_config.name.clone(),
980                description: subagent_config.description.clone(),
981            },
982            agent: Arc::new(sub_agent),
983        });
984
985        tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
986    }
987
988    tracing::info!("=> Total sub-agents registered: {}", registrations.len());
989
990    // Optionally inject a general-purpose subagent
991    if config.auto_general_purpose {
992        let has_gp = registrations
993            .iter()
994            .any(|r| r.descriptor.name == "general-purpose");
995        if !has_gp {
996            // Create a subagent with inherited planner/tools and same instructions
997            let mut sub_cfg =
998                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
999                    .with_auto_general_purpose(false)
1000                    .with_prompt_caching(config.enable_prompt_caching)
1001                    .with_pii_sanitization(config.enable_pii_sanitization);
1002            if let Some(ref selected) = config.builtin_tools {
1003                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1004            }
1005            if let Some(ref sum) = config.summarization {
1006                sub_cfg = sub_cfg.with_summarization(sum.clone());
1007            }
1008            for t in &config.tools {
1009                sub_cfg = sub_cfg.with_tool(t.clone());
1010            }
1011
1012            let gp = create_deep_agent_from_config(sub_cfg);
1013            registrations.push(SubAgentRegistration {
1014                descriptor: SubAgentDescriptor {
1015                    name: "general-purpose".into(),
1016                    description: "Default reasoning agent".into(),
1017                },
1018                agent: Arc::new(gp),
1019            });
1020        }
1021    }
1022
1023    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1024        registrations,
1025        config.event_dispatcher.clone(),
1026    ));
1027    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1028    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
1029    let summarization = config.summarization.as_ref().map(|cfg| {
1030        Arc::new(SummarizationMiddleware::new(
1031            cfg.messages_to_keep,
1032            cfg.summary_note.clone(),
1033        ))
1034    });
1035    let hitl = if config.tool_interrupts.is_empty() {
1036        None
1037    } else {
1038        // Validate that checkpointer is configured when HITL is enabled
1039        if config.checkpointer.is_none() {
1040            tracing::error!(
1041                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
1042                 HITL will be disabled. Please configure a checkpointer to enable HITL."
1043            );
1044            None
1045        } else {
1046            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
1047            Some(Arc::new(HumanInLoopMiddleware::new(
1048                config.tool_interrupts.clone(),
1049            )))
1050        }
1051    };
1052
1053    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
1054    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
1055    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1056        base_prompt,
1057        deep_agent_prompt,
1058        planning,
1059        filesystem,
1060        subagent,
1061    ];
1062    if let Some(ref summary) = summarization {
1063        middlewares.push(summary.clone());
1064    }
1065    if config.enable_prompt_caching {
1066        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1067    }
1068    if let Some(ref hitl_mw) = hitl {
1069        middlewares.push(hitl_mw.clone());
1070    }
1071
1072    DeepAgent {
1073        descriptor: AgentDescriptor {
1074            name: "deep-agent".into(),
1075            version: "0.0.1".into(),
1076            description: Some("Rust deep agent".into()),
1077        },
1078        instructions: config.instructions,
1079        planner: config.planner,
1080        middlewares,
1081        base_tools: config.tools,
1082        state,
1083        history,
1084        _summarization: summarization,
1085        _hitl: hitl,
1086        builtin_tools: config.builtin_tools,
1087        checkpointer: config.checkpointer,
1088        event_dispatcher: config.event_dispatcher,
1089        enable_pii_sanitization: config.enable_pii_sanitization,
1090    }
1091}