Source code for langchain.agents.agent

"""Chain that takes in an input and produces an action and action input."""

from __future__ import annotations

import asyncio
import json
import logging
import time
from abc import abstractmethod
from pathlib import Path
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import yaml
from langchain_core._api import deprecated
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.callbacks import (
    AsyncCallbackManagerForChainRun,
    AsyncCallbackManagerForToolRun,
    BaseCallbackManager,
    CallbackManagerForChainRun,
    CallbackManagerForToolRun,
    Callbacks,
)
from langchain_core.exceptions import OutputParserException
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import BaseMessage
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.prompts import BasePromptTemplate
from langchain_core.prompts.few_shot import FewShotPromptTemplate
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.runnables import Runnable, RunnableConfig, ensure_config
from langchain_core.runnables.utils import AddableDict
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping

from langchain.agents.agent_iterator import AgentExecutorIterator
from langchain.agents.agent_types import AgentType
from langchain.agents.tools import InvalidTool
from langchain.chains.base import Chain
from langchain.chains.llm import LLMChain
from langchain.utilities.asyncio import asyncio_timeout

logger = logging.getLogger(__name__)


[docs]class BaseSingleActionAgent(BaseModel): """Base Single Action Agent class.""" @property def return_values(self) -> List[str]: """Return values of the agent.""" return ["output"]
[docs] def get_allowed_tools(self) -> Optional[List[str]]: return None
[docs] @abstractmethod def plan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """
[docs] @abstractmethod async def aplan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """
@property @abstractmethod def input_keys(self) -> List[str]: """Return the input keys. :meta private: """
[docs] def return_stopped_response( self, early_stopping_method: str, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any, ) -> AgentFinish: """Return response when agent has been stopped due to max iterations.""" if early_stopping_method == "force": # `force` just returns a constant string return AgentFinish( {"output": "Agent stopped due to iteration limit or time limit."}, "" ) else: raise ValueError( f"Got unsupported early_stopping_method `{early_stopping_method}`" )
[docs] @classmethod def from_llm_and_tools( cls, llm: BaseLanguageModel, tools: Sequence[BaseTool], callback_manager: Optional[BaseCallbackManager] = None, **kwargs: Any, ) -> BaseSingleActionAgent: raise NotImplementedError
@property def _agent_type(self) -> str: """Return Identifier of agent type.""" raise NotImplementedError
[docs] def dict(self, **kwargs: Any) -> Dict: """Return dictionary representation of agent.""" _dict = super().dict() try: _type = self._agent_type except NotImplementedError: _type = None if isinstance(_type, AgentType): _dict["_type"] = str(_type.value) elif _type is not None: _dict["_type"] = _type return _dict
[docs] def save(self, file_path: Union[Path, str]) -> None: """Save the agent. Args: file_path: Path to file to save the agent to. Example: .. code-block:: python # If working with agent executor agent.agent.save(file_path="path/agent.yaml") """ # Convert file to Path object. if isinstance(file_path, str): save_path = Path(file_path) else: save_path = file_path directory_path = save_path.parent directory_path.mkdir(parents=True, exist_ok=True) # Fetch dictionary to save agent_dict = self.dict() if "_type" not in agent_dict: raise NotImplementedError(f"Agent {self} does not support saving") if save_path.suffix == ".json": with open(file_path, "w") as f: json.dump(agent_dict, f, indent=4) elif save_path.suffix.endswith((".yaml", ".yml")): with open(file_path, "w") as f: yaml.dump(agent_dict, f, default_flow_style=False) else: raise ValueError(f"{save_path} must be json or yaml")
[docs] def tool_run_logging_kwargs(self) -> Dict: return {}
[docs]class BaseMultiActionAgent(BaseModel): """Base Multi Action Agent class.""" @property def return_values(self) -> List[str]: """Return values of the agent.""" return ["output"]
[docs] def get_allowed_tools(self) -> Optional[List[str]]: return None
[docs] @abstractmethod def plan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[List[AgentAction], AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with the observations. callbacks: Callbacks to run. **kwargs: User inputs. Returns: Actions specifying what tool to use. """
[docs] @abstractmethod async def aplan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[List[AgentAction], AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with the observations. callbacks: Callbacks to run. **kwargs: User inputs. Returns: Actions specifying what tool to use. """
@property @abstractmethod def input_keys(self) -> List[str]: """Return the input keys. :meta private: """
[docs] def return_stopped_response( self, early_stopping_method: str, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any, ) -> AgentFinish: """Return response when agent has been stopped due to max iterations.""" if early_stopping_method == "force": # `force` just returns a constant string return AgentFinish({"output": "Agent stopped due to max iterations."}, "") else: raise ValueError( f"Got unsupported early_stopping_method `{early_stopping_method}`" )
@property def _agent_type(self) -> str: """Return Identifier of agent type.""" raise NotImplementedError
[docs] def dict(self, **kwargs: Any) -> Dict: """Return dictionary representation of agent.""" _dict = super().dict() try: _dict["_type"] = str(self._agent_type) except NotImplementedError: pass return _dict
[docs] def save(self, file_path: Union[Path, str]) -> None: """Save the agent. Args: file_path: Path to file to save the agent to. Example: .. code-block:: python # If working with agent executor agent.agent.save(file_path="path/agent.yaml") """ # Convert file to Path object. if isinstance(file_path, str): save_path = Path(file_path) else: save_path = file_path # Fetch dictionary to save agent_dict = self.dict() if "_type" not in agent_dict: raise NotImplementedError(f"Agent {self} does not support saving.") directory_path = save_path.parent directory_path.mkdir(parents=True, exist_ok=True) if save_path.suffix == ".json": with open(file_path, "w") as f: json.dump(agent_dict, f, indent=4) elif save_path.suffix.endswith((".yaml", ".yml")): with open(file_path, "w") as f: yaml.dump(agent_dict, f, default_flow_style=False) else: raise ValueError(f"{save_path} must be json or yaml")
[docs] def tool_run_logging_kwargs(self) -> Dict: return {}
[docs]class AgentOutputParser(BaseOutputParser[Union[AgentAction, AgentFinish]]): """Base class for parsing agent output into agent action/finish."""
[docs] @abstractmethod def parse(self, text: str) -> Union[AgentAction, AgentFinish]: """Parse text into agent action/finish."""
[docs]class MultiActionAgentOutputParser( BaseOutputParser[Union[List[AgentAction], AgentFinish]] ): """Base class for parsing agent output into agent actions/finish."""
[docs] @abstractmethod def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]: """Parse text into agent actions/finish."""
[docs]class RunnableAgent(BaseSingleActionAgent): """Agent powered by runnables.""" runnable: Runnable[dict, Union[AgentAction, AgentFinish]] """Runnable to call to get agent action.""" input_keys_arg: List[str] = [] return_keys_arg: List[str] = [] stream_runnable: bool = True """Whether to stream from the runnable or not. If True then underlying LLM is invoked in a streaming fashion to make it possible to get access to the individual LLM tokens when using stream_log with the Agent Executor. If False then LLM is invoked in a non-streaming fashion and individual LLM tokens will not be available in stream_log. """ class Config: """Configuration for this pydantic object.""" arbitrary_types_allowed = True @property def return_values(self) -> List[str]: """Return values of the agent.""" return self.return_keys_arg @property def input_keys(self) -> List[str]: return self.input_keys_arg
[docs] def plan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Based on past history and current inputs, decide what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with the observations. callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}} final_output: Any = None if self.stream_runnable: # Use streaming to make sure that the underlying LLM is invoked in a # streaming # fashion to make it possible to get access to the individual LLM tokens # when using stream_log with the Agent Executor. # Because the response from the plan is not a generator, we need to # accumulate the output into final output and return that. for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}): if final_output is None: final_output = chunk else: final_output += chunk else: final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks}) return final_output
[docs] async def aplan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[ AgentAction, AgentFinish, ]: """Based on past history and current inputs, decide what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs Returns: Action specifying what tool to use. """ inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}} final_output: Any = None if self.stream_runnable: # Use streaming to make sure that the underlying LLM is invoked in a # streaming # fashion to make it possible to get access to the individual LLM tokens # when using stream_log with the Agent Executor. # Because the response from the plan is not a generator, we need to # accumulate the output into final output and return that. async for chunk in self.runnable.astream( inputs, config={"callbacks": callbacks} ): if final_output is None: final_output = chunk else: final_output += chunk else: final_output = await self.runnable.ainvoke( inputs, config={"callbacks": callbacks} ) return final_output
[docs]class RunnableMultiActionAgent(BaseMultiActionAgent): """Agent powered by runnables.""" runnable: Runnable[dict, Union[List[AgentAction], AgentFinish]] """Runnable to call to get agent actions.""" input_keys_arg: List[str] = [] return_keys_arg: List[str] = [] stream_runnable: bool = True """Whether to stream from the runnable or not. If True then underlying LLM is invoked in a streaming fashion to make it possible to get access to the individual LLM tokens when using stream_log with the Agent Executor. If False then LLM is invoked in a non-streaming fashion and individual LLM tokens will not be available in stream_log. """ class Config: """Configuration for this pydantic object.""" arbitrary_types_allowed = True @property def return_values(self) -> List[str]: """Return values of the agent.""" return self.return_keys_arg @property def input_keys(self) -> List[str]: """Return the input keys. Returns: List of input keys. """ return self.input_keys_arg
[docs] def plan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[ List[AgentAction], AgentFinish, ]: """Based on past history and current inputs, decide what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with the observations. callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}} final_output: Any = None if self.stream_runnable: # Use streaming to make sure that the underlying LLM is invoked in a # streaming # fashion to make it possible to get access to the individual LLM tokens # when using stream_log with the Agent Executor. # Because the response from the plan is not a generator, we need to # accumulate the output into final output and return that. for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}): if final_output is None: final_output = chunk else: final_output += chunk else: final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks}) return final_output
[docs] async def aplan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[ List[AgentAction], AgentFinish, ]: """Based on past history and current inputs, decide what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}} final_output: Any = None if self.stream_runnable: # Use streaming to make sure that the underlying LLM is invoked in a # streaming # fashion to make it possible to get access to the individual LLM tokens # when using stream_log with the Agent Executor. # Because the response from the plan is not a generator, we need to # accumulate the output into final output and return that. async for chunk in self.runnable.astream( inputs, config={"callbacks": callbacks} ): if final_output is None: final_output = chunk else: final_output += chunk else: final_output = await self.runnable.ainvoke( inputs, config={"callbacks": callbacks} ) return final_output
[docs]@deprecated( "0.1.0", alternative=( "Use new agent constructor methods like create_react_agent, create_json_agent, " "create_structured_chat_agent, etc." ), removal="0.3.0", ) class LLMSingleActionAgent(BaseSingleActionAgent): """Base class for single action agents.""" llm_chain: LLMChain """LLMChain to use for agent.""" output_parser: AgentOutputParser """Output parser to use for agent.""" stop: List[str] """List of strings to stop on.""" @property def input_keys(self) -> List[str]: """Return the input keys. Returns: List of input keys. """ return list(set(self.llm_chain.input_keys) - {"intermediate_steps"})
[docs] def dict(self, **kwargs: Any) -> Dict: """Return dictionary representation of agent.""" _dict = super().dict() del _dict["output_parser"] return _dict
[docs] def plan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with the observations. callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ output = self.llm_chain.run( intermediate_steps=intermediate_steps, stop=self.stop, callbacks=callbacks, **kwargs, ) return self.output_parser.parse(output)
[docs] async def aplan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ output = await self.llm_chain.arun( intermediate_steps=intermediate_steps, stop=self.stop, callbacks=callbacks, **kwargs, ) return self.output_parser.parse(output)
[docs] def tool_run_logging_kwargs(self) -> Dict: return { "llm_prefix": "", "observation_prefix": "" if len(self.stop) == 0 else self.stop[0], }
[docs]@deprecated( "0.1.0", alternative=( "Use new agent constructor methods like create_react_agent, create_json_agent, " "create_structured_chat_agent, etc." ), removal="0.3.0", ) class Agent(BaseSingleActionAgent): """Agent that calls the language model and deciding the action. This is driven by an LLMChain. The prompt in the LLMChain MUST include a variable called "agent_scratchpad" where the agent can put its intermediary work. """ llm_chain: LLMChain output_parser: AgentOutputParser allowed_tools: Optional[List[str]] = None
[docs] def dict(self, **kwargs: Any) -> Dict: """Return dictionary representation of agent.""" _dict = super().dict() del _dict["output_parser"] return _dict
[docs] def get_allowed_tools(self) -> Optional[List[str]]: return self.allowed_tools
@property def return_values(self) -> List[str]: return ["output"] def _fix_text(self, text: str) -> str: """Fix the text.""" raise ValueError("fix_text not implemented for this agent.") @property def _stop(self) -> List[str]: return [ f"\n{self.observation_prefix.rstrip()}", f"\n\t{self.observation_prefix.rstrip()}", ] def _construct_scratchpad( self, intermediate_steps: List[Tuple[AgentAction, str]] ) -> Union[str, List[BaseMessage]]: """Construct the scratchpad that lets the agent continue its thought process.""" thoughts = "" for action, observation in intermediate_steps: thoughts += action.log thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}" return thoughts
[docs] def plan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ full_inputs = self.get_full_inputs(intermediate_steps, **kwargs) full_output = self.llm_chain.predict(callbacks=callbacks, **full_inputs) return self.output_parser.parse(full_output)
[docs] async def aplan( self, intermediate_steps: List[Tuple[AgentAction, str]], callbacks: Callbacks = None, **kwargs: Any, ) -> Union[AgentAction, AgentFinish]: """Given input, decided what to do. Args: intermediate_steps: Steps the LLM has taken to date, along with observations callbacks: Callbacks to run. **kwargs: User inputs. Returns: Action specifying what tool to use. """ full_inputs = self.get_full_inputs(intermediate_steps, **kwargs) full_output = await self.llm_chain.apredict(callbacks=callbacks, **full_inputs) agent_output = await self.output_parser.aparse(full_output) return agent_output
[docs] def get_full_inputs( self, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any ) -> Dict[str, Any]: """Create the full inputs for the LLMChain from intermediate steps.""" thoughts = self._construct_scratchpad(intermediate_steps) new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop} full_inputs = {**kwargs, **new_inputs} return full_inputs
@property def input_keys(self) -> List[str]: """Return the input keys. :meta private: """ return list(set(self.llm_chain.input_keys) - {"agent_scratchpad"}) @root_validator() def validate_prompt(cls, values: Dict) -> Dict: """Validate that prompt matches format.""" prompt = values["llm_chain"].prompt if "agent_scratchpad" not in prompt.input_variables: logger.warning( "`agent_scratchpad` should be a variable in prompt.input_variables." " Did not find it, so adding it at the end." ) prompt.input_variables.append("agent_scratchpad") if isinstance(prompt, PromptTemplate): prompt.template += "\n{agent_scratchpad}" elif isinstance(prompt, FewShotPromptTemplate): prompt.suffix += "\n{agent_scratchpad}" else: raise ValueError(f"Got unexpected prompt type {type(prompt)}") return values @property @abstractmethod def observation_prefix(self) -> str: """Prefix to append the observation with.""" @property @abstractmethod def llm_prefix(self) -> str: """Prefix to append the LLM call with."""
[docs] @classmethod @abstractmethod def create_prompt(cls, tools: Sequence[BaseTool]) -> BasePromptTemplate: """Create a prompt for this class."""
@classmethod def _validate_tools(cls, tools: Sequence[BaseTool]) -> None: """Validate that appropriate tools are passed in.""" pass @classmethod @abstractmethod def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser: """Get default output parser for this class."""
[docs] @classmethod def from_llm_and_tools( cls, llm: BaseLanguageModel, tools: Sequence[BaseTool], callback_manager: Optional[BaseCallbackManager] = None, output_parser: Optional[AgentOutputParser] = None, **kwargs: Any, ) -> Agent: """Construct an agent from an LLM and tools.""" cls._validate_tools(tools) llm_chain = LLMChain( llm=llm, prompt=cls.create_prompt(tools), callback_manager=callback_manager, ) tool_names = [tool.name for tool in tools] _output_parser = output_parser or cls._get_default_output_parser() return cls( llm_chain=llm_chain, allowed_tools=tool_names, output_parser=_output_parser, **kwargs, )
[docs] def return_stopped_response( self, early_stopping_method: str, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any, ) -> AgentFinish: """Return response when agent has been stopped due to max iterations.""" if early_stopping_method == "force": # `force` just returns a constant string return AgentFinish( {"output": "Agent stopped due to iteration limit or time limit."}, "" ) elif early_stopping_method == "generate": # Generate does one final forward pass thoughts = "" for action, observation in intermediate_steps: thoughts += action.log thoughts += ( f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}" ) # Adding to the previous steps, we now tell the LLM to make a final pred thoughts += ( "\n\nI now need to return a final answer based on the previous steps:" ) new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop} full_inputs = {**kwargs, **new_inputs} full_output = self.llm_chain.predict(**full_inputs) # We try to extract a final answer parsed_output = self.output_parser.parse(full_output) if isinstance(parsed_output, AgentFinish): # If we can extract, we send the correct stuff return parsed_output else: # If we can extract, but the tool is not the final tool, # we just return the full output return AgentFinish({"output": full_output}, full_output) else: raise ValueError( "early_stopping_method should be one of `force` or `generate`, " f"got {early_stopping_method}" )
[docs] def tool_run_logging_kwargs(self) -> Dict: return { "llm_prefix": self.llm_prefix, "observation_prefix": self.observation_prefix, }
[docs]class ExceptionTool(BaseTool): """Tool that just returns the query.""" name: str = "_Exception" """Name of the tool.""" description: str = "Exception tool" """Description of the tool.""" def _run( self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None, ) -> str: return query async def _arun( self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None, ) -> str: return query
NextStepOutput = List[Union[AgentFinish, AgentAction, AgentStep]]
[docs]class AgentExecutor(Chain): """Agent that is using tools.""" agent: Union[BaseSingleActionAgent, BaseMultiActionAgent] """The agent to run for creating a plan and determining actions to take at each step of the execution loop.""" tools: Sequence[BaseTool] """The valid tools the agent can call.""" return_intermediate_steps: bool = False """Whether to return the agent's trajectory of intermediate steps at the end in addition to the final output.""" max_iterations: Optional[int] = 15 """The maximum number of steps to take before ending the execution loop. Setting to 'None' could lead to an infinite loop.""" max_execution_time: Optional[float] = None """The maximum amount of wall clock time to spend in the execution loop. """ early_stopping_method: str = "force" """The method to use for early stopping if the agent never returns `AgentFinish`. Either 'force' or 'generate'. `"force"` returns a string saying that it stopped because it met a time or iteration limit. `"generate"` calls the agent's LLM Chain one final time to generate a final answer based on the previous steps. """ handle_parsing_errors: Union[ bool, str, Callable[[OutputParserException], str] ] = False """How to handle errors raised by the agent's output parser. Defaults to `False`, which raises the error. If `true`, the error will be sent back to the LLM as an observation. If a string, the string itself will be sent to the LLM as an observation. If a callable function, the function will be called with the exception as an argument, and the result of that function will be passed to the agent as an observation. """ trim_intermediate_steps: Union[ int, Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]] ] = -1
[docs] @classmethod def from_agent_and_tools( cls, agent: Union[BaseSingleActionAgent, BaseMultiActionAgent], tools: Sequence[BaseTool], callbacks: Callbacks = None, **kwargs: Any, ) -> AgentExecutor: """Create from agent and tools.""" return cls( agent=agent, tools=tools, callbacks=callbacks, **kwargs, )
@root_validator() def validate_tools(cls, values: Dict) -> Dict: """Validate that tools are compatible with agent.""" agent = values["agent"] tools = values["tools"] allowed_tools = agent.get_allowed_tools() if allowed_tools is not None: if set(allowed_tools) != set([tool.name for tool in tools]): raise ValueError( f"Allowed tools ({allowed_tools}) different than " f"provided tools ({[tool.name for tool in tools]})" ) return values @root_validator() def validate_return_direct_tool(cls, values: Dict) -> Dict: """Validate that tools are compatible with agent.""" agent = values["agent"] tools = values["tools"] if isinstance(agent, BaseMultiActionAgent): for tool in tools: if tool.return_direct: raise ValueError( "Tools that have `return_direct=True` are not allowed " "in multi-action agents" ) return values @root_validator(pre=True) def validate_runnable_agent(cls, values: Dict) -> Dict: """Convert runnable to agent if passed in.""" agent = values["agent"] if isinstance(agent, Runnable): try: output_type = agent.OutputType except Exception as _: multi_action = False else: multi_action = output_type == Union[List[AgentAction], AgentFinish] stream_runnable = values.pop("stream_runnable", True) if multi_action: values["agent"] = RunnableMultiActionAgent( runnable=agent, stream_runnable=stream_runnable ) else: values["agent"] = RunnableAgent( runnable=agent, stream_runnable=stream_runnable ) return values
[docs] def save(self, file_path: Union[Path, str]) -> None: """Raise error - saving not supported for Agent Executors.""" raise ValueError( "Saving not supported for agent executors. " "If you are trying to save the agent, please use the " "`.save_agent(...)`" )
[docs] def save_agent(self, file_path: Union[Path, str]) -> None: """Save the underlying agent.""" return self.agent.save(file_path)
[docs] def iter( self, inputs: Any, callbacks: Callbacks = None, *, include_run_info: bool = False, async_: bool = False, # arg kept for backwards compat, but ignored ) -> AgentExecutorIterator: """Enables iteration over steps taken to reach final output.""" return AgentExecutorIterator( self, inputs, callbacks, tags=self.tags, include_run_info=include_run_info, )
@property def input_keys(self) -> List[str]: """Return the input keys. :meta private: """ return self.agent.input_keys @property def output_keys(self) -> List[str]: """Return the singular output key. :meta private: """ if self.return_intermediate_steps: return self.agent.return_values + ["intermediate_steps"] else: return self.agent.return_values
[docs] def lookup_tool(self, name: str) -> BaseTool: """Lookup tool by name.""" return {tool.name: tool for tool in self.tools}[name]
def _should_continue(self, iterations: int, time_elapsed: float) -> bool: if self.max_iterations is not None and iterations >= self.max_iterations: return False if ( self.max_execution_time is not None and time_elapsed >= self.max_execution_time ): return False return True def _return( self, output: AgentFinish, intermediate_steps: list, run_manager: Optional[CallbackManagerForChainRun] = None, ) -> Dict[str, Any]: if run_manager: run_manager.on_agent_finish(output, color="green", verbose=self.verbose) final_output = output.return_values if self.return_intermediate_steps: final_output["intermediate_steps"] = intermediate_steps return final_output async def _areturn( self, output: AgentFinish, intermediate_steps: list, run_manager: Optional[AsyncCallbackManagerForChainRun] = None, ) -> Dict[str, Any]: if run_manager: await run_manager.on_agent_finish( output, color="green", verbose=self.verbose ) final_output = output.return_values if self.return_intermediate_steps: final_output["intermediate_steps"] = intermediate_steps return final_output def _consume_next_step( self, values: NextStepOutput ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]: if isinstance(values[-1], AgentFinish): assert len(values) == 1 return values[-1] else: return [ (a.action, a.observation) for a in values if isinstance(a, AgentStep) ] def _take_next_step( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], inputs: Dict[str, str], intermediate_steps: List[Tuple[AgentAction, str]], run_manager: Optional[CallbackManagerForChainRun] = None, ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]: return self._consume_next_step( [ a for a in self._iter_next_step( name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager, ) ] ) def _iter_next_step( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], inputs: Dict[str, str], intermediate_steps: List[Tuple[AgentAction, str]], run_manager: Optional[CallbackManagerForChainRun] = None, ) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]: """Take a single step in the thought-action-observation loop. Override this to take control of how the agent makes and acts on choices. """ try: intermediate_steps = self._prepare_intermediate_steps(intermediate_steps) # Call the LLM to see what to do. output = self.agent.plan( intermediate_steps, callbacks=run_manager.get_child() if run_manager else None, **inputs, ) except OutputParserException as e: if isinstance(self.handle_parsing_errors, bool): raise_error = not self.handle_parsing_errors else: raise_error = False if raise_error: raise ValueError( "An output parsing error occurred. " "In order to pass this error back to the agent and have it try " "again, pass `handle_parsing_errors=True` to the AgentExecutor. " f"This is the error: {str(e)}" ) text = str(e) if isinstance(self.handle_parsing_errors, bool): if e.send_to_llm: observation = str(e.observation) text = str(e.llm_output) else: observation = "Invalid or incomplete response" elif isinstance(self.handle_parsing_errors, str): observation = self.handle_parsing_errors elif callable(self.handle_parsing_errors): observation = self.handle_parsing_errors(e) else: raise ValueError("Got unexpected type of `handle_parsing_errors`") output = AgentAction("_Exception", observation, text) if run_manager: run_manager.on_agent_action(output, color="green") tool_run_kwargs = self.agent.tool_run_logging_kwargs() observation = ExceptionTool().run( output.tool_input, verbose=self.verbose, color=None, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) yield AgentStep(action=output, observation=observation) return # If the tool chosen is the finishing tool, then we end and return. if isinstance(output, AgentFinish): yield output return actions: List[AgentAction] if isinstance(output, AgentAction): actions = [output] else: actions = output for agent_action in actions: yield agent_action for agent_action in actions: yield self._perform_agent_action( name_to_tool_map, color_mapping, agent_action, run_manager ) def _perform_agent_action( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], agent_action: AgentAction, run_manager: Optional[CallbackManagerForChainRun] = None, ) -> AgentStep: if run_manager: run_manager.on_agent_action(agent_action, color="green") # Otherwise we lookup the tool if agent_action.tool in name_to_tool_map: tool = name_to_tool_map[agent_action.tool] return_direct = tool.return_direct color = color_mapping[agent_action.tool] tool_run_kwargs = self.agent.tool_run_logging_kwargs() if return_direct: tool_run_kwargs["llm_prefix"] = "" # We then call the tool on the tool input to get an observation observation = tool.run( agent_action.tool_input, verbose=self.verbose, color=color, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) else: tool_run_kwargs = self.agent.tool_run_logging_kwargs() observation = InvalidTool().run( { "requested_tool_name": agent_action.tool, "available_tool_names": list(name_to_tool_map.keys()), }, verbose=self.verbose, color=None, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) return AgentStep(action=agent_action, observation=observation) async def _atake_next_step( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], inputs: Dict[str, str], intermediate_steps: List[Tuple[AgentAction, str]], run_manager: Optional[AsyncCallbackManagerForChainRun] = None, ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]: return self._consume_next_step( [ a async for a in self._aiter_next_step( name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager, ) ] ) async def _aiter_next_step( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], inputs: Dict[str, str], intermediate_steps: List[Tuple[AgentAction, str]], run_manager: Optional[AsyncCallbackManagerForChainRun] = None, ) -> AsyncIterator[Union[AgentFinish, AgentAction, AgentStep]]: """Take a single step in the thought-action-observation loop. Override this to take control of how the agent makes and acts on choices. """ try: intermediate_steps = self._prepare_intermediate_steps(intermediate_steps) # Call the LLM to see what to do. output = await self.agent.aplan( intermediate_steps, callbacks=run_manager.get_child() if run_manager else None, **inputs, ) except OutputParserException as e: if isinstance(self.handle_parsing_errors, bool): raise_error = not self.handle_parsing_errors else: raise_error = False if raise_error: raise ValueError( "An output parsing error occurred. " "In order to pass this error back to the agent and have it try " "again, pass `handle_parsing_errors=True` to the AgentExecutor. " f"This is the error: {str(e)}" ) text = str(e) if isinstance(self.handle_parsing_errors, bool): if e.send_to_llm: observation = str(e.observation) text = str(e.llm_output) else: observation = "Invalid or incomplete response" elif isinstance(self.handle_parsing_errors, str): observation = self.handle_parsing_errors elif callable(self.handle_parsing_errors): observation = self.handle_parsing_errors(e) else: raise ValueError("Got unexpected type of `handle_parsing_errors`") output = AgentAction("_Exception", observation, text) tool_run_kwargs = self.agent.tool_run_logging_kwargs() observation = await ExceptionTool().arun( output.tool_input, verbose=self.verbose, color=None, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) yield AgentStep(action=output, observation=observation) return # If the tool chosen is the finishing tool, then we end and return. if isinstance(output, AgentFinish): yield output return actions: List[AgentAction] if isinstance(output, AgentAction): actions = [output] else: actions = output for agent_action in actions: yield agent_action # Use asyncio.gather to run multiple tool.arun() calls concurrently result = await asyncio.gather( *[ self._aperform_agent_action( name_to_tool_map, color_mapping, agent_action, run_manager ) for agent_action in actions ], ) # TODO This could yield each result as it becomes available for chunk in result: yield chunk async def _aperform_agent_action( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], agent_action: AgentAction, run_manager: Optional[AsyncCallbackManagerForChainRun] = None, ) -> AgentStep: if run_manager: await run_manager.on_agent_action( agent_action, verbose=self.verbose, color="green" ) # Otherwise we lookup the tool if agent_action.tool in name_to_tool_map: tool = name_to_tool_map[agent_action.tool] return_direct = tool.return_direct color = color_mapping[agent_action.tool] tool_run_kwargs = self.agent.tool_run_logging_kwargs() if return_direct: tool_run_kwargs["llm_prefix"] = "" # We then call the tool on the tool input to get an observation observation = await tool.arun( agent_action.tool_input, verbose=self.verbose, color=color, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) else: tool_run_kwargs = self.agent.tool_run_logging_kwargs() observation = await InvalidTool().arun( { "requested_tool_name": agent_action.tool, "available_tool_names": list(name_to_tool_map.keys()), }, verbose=self.verbose, color=None, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) return AgentStep(action=agent_action, observation=observation) def _call( self, inputs: Dict[str, str], run_manager: Optional[CallbackManagerForChainRun] = None, ) -> Dict[str, Any]: """Run text through and get agent response.""" # Construct a mapping of tool name to tool for easy lookup name_to_tool_map = {tool.name: tool for tool in self.tools} # We construct a mapping from each tool to a color, used for logging. color_mapping = get_color_mapping( [tool.name for tool in self.tools], excluded_colors=["green", "red"] ) intermediate_steps: List[Tuple[AgentAction, str]] = [] # Let's start tracking the number of iterations and time elapsed iterations = 0 time_elapsed = 0.0 start_time = time.time() # We now enter the agent loop (until it returns something). while self._should_continue(iterations, time_elapsed): next_step_output = self._take_next_step( name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager=run_manager, ) if isinstance(next_step_output, AgentFinish): return self._return( next_step_output, intermediate_steps, run_manager=run_manager ) intermediate_steps.extend(next_step_output) if len(next_step_output) == 1: next_step_action = next_step_output[0] # See if tool should return directly tool_return = self._get_tool_return(next_step_action) if tool_return is not None: return self._return( tool_return, intermediate_steps, run_manager=run_manager ) iterations += 1 time_elapsed = time.time() - start_time output = self.agent.return_stopped_response( self.early_stopping_method, intermediate_steps, **inputs ) return self._return(output, intermediate_steps, run_manager=run_manager) async def _acall( self, inputs: Dict[str, str], run_manager: Optional[AsyncCallbackManagerForChainRun] = None, ) -> Dict[str, str]: """Run text through and get agent response.""" # Construct a mapping of tool name to tool for easy lookup name_to_tool_map = {tool.name: tool for tool in self.tools} # We construct a mapping from each tool to a color, used for logging. color_mapping = get_color_mapping( [tool.name for tool in self.tools], excluded_colors=["green"] ) intermediate_steps: List[Tuple[AgentAction, str]] = [] # Let's start tracking the number of iterations and time elapsed iterations = 0 time_elapsed = 0.0 start_time = time.time() # We now enter the agent loop (until it returns something). try: async with asyncio_timeout(self.max_execution_time): while self._should_continue(iterations, time_elapsed): next_step_output = await self._atake_next_step( name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager=run_manager, ) if isinstance(next_step_output, AgentFinish): return await self._areturn( next_step_output, intermediate_steps, run_manager=run_manager, ) intermediate_steps.extend(next_step_output) if len(next_step_output) == 1: next_step_action = next_step_output[0] # See if tool should return directly tool_return = self._get_tool_return(next_step_action) if tool_return is not None: return await self._areturn( tool_return, intermediate_steps, run_manager=run_manager ) iterations += 1 time_elapsed = time.time() - start_time output = self.agent.return_stopped_response( self.early_stopping_method, intermediate_steps, **inputs ) return await self._areturn( output, intermediate_steps, run_manager=run_manager ) except (TimeoutError, asyncio.TimeoutError): # stop early when interrupted by the async timeout output = self.agent.return_stopped_response( self.early_stopping_method, intermediate_steps, **inputs ) return await self._areturn( output, intermediate_steps, run_manager=run_manager ) def _get_tool_return( self, next_step_output: Tuple[AgentAction, str] ) -> Optional[AgentFinish]: """Check if the tool is a returning tool.""" agent_action, observation = next_step_output name_to_tool_map = {tool.name: tool for tool in self.tools} return_value_key = "output" if len(self.agent.return_values) > 0: return_value_key = self.agent.return_values[0] # Invalid tools won't be in the map, so we return False. if agent_action.tool in name_to_tool_map: if name_to_tool_map[agent_action.tool].return_direct: return AgentFinish( {return_value_key: observation}, "", ) return None def _prepare_intermediate_steps( self, intermediate_steps: List[Tuple[AgentAction, str]] ) -> List[Tuple[AgentAction, str]]: if ( isinstance(self.trim_intermediate_steps, int) and self.trim_intermediate_steps > 0 ): return intermediate_steps[-self.trim_intermediate_steps :] elif callable(self.trim_intermediate_steps): return self.trim_intermediate_steps(intermediate_steps) else: return intermediate_steps
[docs] def stream( self, input: Union[Dict[str, Any], Any], config: Optional[RunnableConfig] = None, **kwargs: Any, ) -> Iterator[AddableDict]: """Enables streaming over steps taken to reach final output.""" config = ensure_config(config) iterator = AgentExecutorIterator( self, input, config.get("callbacks"), tags=config.get("tags"), metadata=config.get("metadata"), run_name=config.get("run_name"), run_id=config.get("run_id"), yield_actions=True, **kwargs, ) for step in iterator: yield step
[docs] async def astream( self, input: Union[Dict[str, Any], Any], config: Optional[RunnableConfig] = None, **kwargs: Any, ) -> AsyncIterator[AddableDict]: """Enables streaming over steps taken to reach final output.""" config = ensure_config(config) iterator = AgentExecutorIterator( self, input, config.get("callbacks"), tags=config.get("tags"), metadata=config.get("metadata"), run_name=config.get("run_name"), run_id=config.get("run_id"), yield_actions=True, **kwargs, ) async for step in iterator: yield step