
Agents

Introduction

Agents are PydanticAI's primary interface for interacting with LLMs.

In some use cases a single Agent will control an entire application or component, but multiple agents can also interact to embody more complex workflows.

The Agent class is well documented, but in essence you can think of an agent as a container for:

  • A system prompt — a set of instructions for the LLM written by the developer
  • One or more retrievers — functions that the LLM may call to get information while generating a response
  • An optional structured result type — the structured datatype the LLM must return at the end of a run
  • A dependency type constraint — system prompt functions, retrievers and result validators may all use dependencies when they're run
  • An optional default model — the model to use can also be defined when running the agent

In typing terms, agents are generic in their dependency and result types, e.g. an agent which required Foobar dependencies and returned data of type list[str] would have type Agent[Foobar, list[str]].
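
For example, a minimal sketch (Foobar here is a stand-in dependency class, not part of PydanticAI):

agent_types.py
from dataclasses import dataclass

from pydantic_ai import Agent


@dataclass
class Foobar:
    """Stand-in dependency type, just for illustration."""

    api_key: str


# deps_type and result_type drive the generic parameters, so a static
# type checker sees this as Agent[Foobar, list[str]]
agent = Agent(
    'openai:gpt-4o',
    deps_type=Foobar,
    result_type=list[str],
)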

Here's a toy example of an agent that simulates a roulette wheel:

roulette_wheel.py
from pydantic_ai import Agent, CallContext

roulette_agent = Agent(  # (1)!
    'openai:gpt-4o',
    deps_type=int,
    result_type=bool,
    system_prompt=(
        'Use the `roulette_wheel` to see if the '
        'customer has won based on the number they provide.'
    ),
)


@roulette_agent.retriever_context
async def roulette_wheel(ctx: CallContext[int], square: int) -> str:  # (2)!
    """check if the square is a winner"""
    return 'winner' if square == ctx.deps else 'loser'


# Run the agent
success_number = 18  # (3)!
result = roulette_agent.run_sync('Put my money on square eighteen', deps=success_number)
print(result.data)  # (4)!
#> True

result = roulette_agent.run_sync('I bet five is the winner', deps=success_number)
print(result.data)
#> False
  1. Create an agent, which expects an integer dependency and returns a boolean result; this agent will have type Agent[int, bool].
  2. Define a retriever that checks if the square is a winner. Here CallContext is parameterized with the dependency type int; if you got the dependency type wrong you'd get a typing error.
  3. In reality, you might want to use a random number here, e.g. random.randint(0, 36).
  4. result.data will be a boolean indicating if the square is a winner. Pydantic performs the result validation, and the value is typed as a bool since its type is derived from the result_type generic parameter of the agent.

Agents are Singletons, like FastAPI

Agents are intended to be instantiated once and reused; you can think of them as similar to a small FastAPI app or an APIRouter.
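
In practice, that usually means defining an agent once as a module global and importing it wherever it's needed — a minimal sketch (the module and agent names are illustrative):

my_agents.py
from pydantic_ai import Agent

# defined once at import time and reused across the application,
# much like a FastAPI app or APIRouter instance
support_agent = Agent(
    'openai:gpt-4o',
    system_prompt='Give concise answers.',
)

Elsewhere you'd write from my_agents import support_agent and call support_agent.run_sync(...) as needed.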

Running Agents

There are three ways to run an agent:

  1. agent.run() — a coroutine which returns a RunResult containing a completed response
  2. agent.run_sync() — a plain, synchronous function which returns a RunResult containing a completed response (internally, this just calls asyncio.run(self.run()))
  3. agent.run_stream() — a coroutine which returns a StreamedRunResult, containing methods to stream a response as an async iterable

Here's a simple example demonstrating all three:

run_agent.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_sync = agent.run_sync('What is the capital of Italy?')
print(result_sync.data)
#> Rome


async def main():
    result = await agent.run('What is the capital of France?')
    print(result.data)
    #> Paris

    async with agent.run_stream('What is the capital of the UK?') as response:
        print(await response.get_data())
        #> London
(This example is complete, it can be run "as is")

You can also pass messages from previous runs to continue a conversation or provide context, as described in Messages and Chat History.

Runs vs. Conversations

An agent run might represent an entire conversation — there's no limit to how many messages can be exchanged in a single run. However, a conversation might also be composed of multiple runs, especially if you need to maintain state between separate interactions or API calls.

Here's an example of a conversation comprised of multiple runs:

conversation_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

# First run
result1 = agent.run_sync('Who was Albert Einstein?')
print(result1.data)
#> Albert Einstein was a German-born theoretical physicist.

# Second run, passing previous messages
result2 = agent.run_sync(
    'What was his most famous equation?', message_history=result1.new_messages()  # (1)!
)
print(result2.data)
#> Albert Einstein's most famous equation is (E = mc^2).
1. Continue the conversation; without message_history the model would not know who "he" was referring to.

(This example is complete, it can be run "as is")

System Prompts

System prompts might seem simple at first glance since they're just strings (or sequences of strings that are concatenated), but crafting the right system prompt is key to getting the model to behave as you want.

Generally, system prompts fall into two categories:

  1. Static system prompts: These are known when writing the code and can be defined via the system_prompt parameter of the Agent constructor.
  2. Dynamic system prompts: These aren't known until runtime and should be defined via functions decorated with @agent.system_prompt.

You can add both to a single agent; they're concatenated in the order they're defined at runtime.

Here's an example using both types of system prompts:

system_prompts.py
from datetime import date

from pydantic_ai import Agent, CallContext

agent = Agent(
    'openai:gpt-4o',
    deps_type=str,  # (1)!
    system_prompt="Use the customer's name while replying to them.",  # (2)!
)


@agent.system_prompt  # (3)!
def add_the_users_name(ctx: CallContext[str]) -> str:
    return f"The user's named is {ctx.deps}."


@agent.system_prompt
def add_the_date() -> str:  # (4)!
    return f'The date is {date.today()}.'


result = agent.run_sync('What is the date?', deps='Frank')
print(result.data)
#> Hello Frank, the date today is 2032-01-02.
  1. The agent expects a string dependency.
  2. Static system prompt defined at agent creation time.
  3. Dynamic system prompt defined via a decorator.
  4. Another dynamic system prompt; system prompts don't have to have the CallContext parameter.

(This example is complete, it can be run "as is")

Retrievers

Retrievers provide a mechanism for models to request extra information to help them generate a response.

They're useful when it is impractical or impossible to put all the context an agent might need into the system prompt, or when you want to make agents' behavior more deterministic by deferring some of the logic required to generate a response to another tool.

Retrievers vs. RAG

Retrievers are basically the "R" of RAG (Retrieval-Augmented Generation) — they augment what the model can do by letting it request extra information.

The main semantic difference between PydanticAI retrievers and RAG is that RAG is synonymous with vector search, while PydanticAI retrievers are more general-purpose. (Note: we might add support for some vector search functionality in the future, particularly an API for generating embeddings, see #58.)

There are two different decorator functions to register retrievers:

  1. @agent.retriever_plain — for retrievers that don't need access to the agent context
  2. @agent.retriever_context — for retrievers that do need access to the agent context

Here's an example using both:

dice_game.py
import random

from pydantic_ai import Agent, CallContext

agent = Agent(
    'gemini-1.5-flash',  # (1)!
    deps_type=str,  # (2)!
    system_prompt=(
        "You're a dice game, you should roll the dice and see if the number "
        "you got back matches the user's guess, if so tell them they're a winner. "
        "Use the player's name in the response."
    ),
)


@agent.retriever_plain  # (3)!
def roll_dice() -> str:
    """Roll a six-sided dice and return the result."""
    return str(random.randint(1, 6))


@agent.retriever_context  # (4)!
def get_player_name(ctx: CallContext[str]) -> str:
    """Get the player's name."""
    return ctx.deps


dice_result = agent.run_sync('My guess is 4', deps='Adam')  # (5)!
print(dice_result.data)
#> Congratulations Adam, you guessed correctly! You're a winner!
  1. This is a pretty simple task, so we can use the fast and cheap Gemini flash model.
  2. To keep things simple, we pass the player's name, as a string, as the dependency.
  3. This retriever doesn't need any context; it just returns a random number. You could probably use a dynamic system prompt in this case.
  4. This retriever needs the player's name, so it uses CallContext to access the dependencies, which here are just the player's name.
  5. Run the agent, passing the player's name as the dependency.

(This example is complete, it can be run "as is")

Let's print the messages from that game to see what happened:

dice_game_messages.py
from dice_game import dice_result

print(dice_result.all_messages())
"""
[
    SystemPrompt(
        content="You're a dice game, you should roll the dice and see if the number you got back matches the user's guess, if so tell them they're a winner. Use the player's name in the response.",
        role='system',
    ),
    UserPrompt(
        content='My guess is 4',
        timestamp=datetime.datetime(...),
        role='user',
    ),
    ModelStructuredResponse(
        calls=[
            ToolCall(
                tool_name='roll_dice', args=ArgsObject(args_object={}), tool_id=None
            )
        ],
        timestamp=datetime.datetime(...),
        role='model-structured-response',
    ),
    ToolReturn(
        tool_name='roll_dice',
        content='4',
        tool_id=None,
        timestamp=datetime.datetime(...),
        role='tool-return',
    ),
    ModelStructuredResponse(
        calls=[
            ToolCall(
                tool_name='get_player_name',
                args=ArgsObject(args_object={}),
                tool_id=None,
            )
        ],
        timestamp=datetime.datetime(...),
        role='model-structured-response',
    ),
    ToolReturn(
        tool_name='get_player_name',
        content='Adam',
        tool_id=None,
        timestamp=datetime.datetime(...),
        role='tool-return',
    ),
    ModelTextResponse(
        content="Congratulations Adam, you guessed correctly! You're a winner!",
        timestamp=datetime.datetime(...),
        role='model-text-response',
    ),
]
"""

We can represent this flow as a diagram:

[Dice game flow diagram]

Retrievers, tools, and schema

Under the hood, retrievers use the model's "tools" or "functions" API to let the model know what retrievers are available to call. Tools or functions are also used to define the schema(s) for structured responses, thus a model might have access to many tools, some of which call retrievers while others end the run and return a result.

Function parameters are extracted from the function signature, and all parameters except CallContext are used to build the schema for that tool call.

Even better, PydanticAI extracts the docstring from retriever functions and (thanks to griffe) extracts parameter descriptions from the docstring and adds them to the schema.

Griffe supports extracting parameter descriptions from google, numpy and sphinx style docstrings; PydanticAI will infer the format to use based on the docstring. We'll add support in the future to explicitly set the style to use, and to warn/error if not all parameters are documented, see #59.

To demonstrate retriever schema, here we use FunctionModel to print the schema a model would receive:

retriever_schema.py
from pydantic_ai import Agent
from pydantic_ai.messages import Message, ModelAnyResponse, ModelTextResponse
from pydantic_ai.models.function import AgentInfo, FunctionModel

agent = Agent()


@agent.retriever_plain
def foobar(a: int, b: str, c: dict[str, list[float]]) -> str:
    """Get me foobar.

    Args:
        a: apple pie
        b: banana cake
        c: carrot smoothie
    """
    return f'{a} {b} {c}'


def print_schema(messages: list[Message], info: AgentInfo) -> ModelAnyResponse:
    retriever = info.retrievers['foobar']
    print(retriever.description)
    #> Get me foobar.
    print(retriever.json_schema)
    """
    {
        'description': 'Get me foobar.',
        'properties': {
            'a': {'description': 'apple pie', 'title': 'A', 'type': 'integer'},
            'b': {'description': 'banana cake', 'title': 'B', 'type': 'string'},
            'c': {
                'additionalProperties': {'items': {'type': 'number'}, 'type': 'array'},
                'description': 'carrot smoothie',
                'title': 'C',
                'type': 'object',
            },
        },
        'required': ['a', 'b', 'c'],
        'type': 'object',
        'additionalProperties': False,
    }
    """
    return ModelTextResponse(content='foobar')


agent.run_sync('hello', model=FunctionModel(print_schema))

(This example is complete, it can be run "as is")

The return type of a retriever can be any valid JSON object (JsonData): some models (e.g. Gemini) support semi-structured return values, while others expect text (OpenAI) but seem to be just as good at extracting meaning from the data. If a Python object is returned and the model expects a string, the value will be serialized to JSON.
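
For example, a retriever might return a dict directly — a minimal sketch (the hard-coded weather data is purely illustrative):

weather_retriever.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


@agent.retriever_plain
def get_weather(city: str) -> dict[str, str]:
    """Get the current weather for a city."""
    # a real retriever would call a weather API; hard-coded for illustration
    return {'city': city, 'temperature': '21C', 'conditions': 'sunny'}

With a model that supports semi-structured return values (e.g. Gemini) the dict is passed as-is; with a model that expects text (e.g. OpenAI) it's serialized to JSON first.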

Reflection and self-correction

Validation errors from both retriever parameter validation and structured result validation can be passed back to the model with a request to retry.

You can also raise ModelRetry from within a retriever or result validator function to tell the model it should retry (a result validator sketch follows the retriever example below).

Here's an example:

retriever_retry.py
from fake_database import DatabaseConn
from pydantic import BaseModel

from pydantic_ai import Agent, CallContext, ModelRetry


class ChatResult(BaseModel):
    user_id: int
    message: str


agent = Agent(
    'openai:gpt-4o',
    deps_type=DatabaseConn,
    result_type=ChatResult,
)


@agent.retriever_context(retries=2)
def get_user_by_name(ctx: CallContext[DatabaseConn], name: str) -> int:
    """Get a user's ID from their full name."""
    print(name)
    #> John
    #> John Doe
    user_id = ctx.deps.users.get(name=name)
    if user_id is None:
        raise ModelRetry(
            f'No user found with name {name!r}, remember to provide their full name'
        )
    return user_id


result = agent.run_sync(
    'Send a message to John Doe asking for coffee next week', deps=DatabaseConn()
)
print(result.data)
"""
user_id=123 message='Hello John, would you be free for coffee sometime next week? Let me know what works for you!'
"""

Model errors

If models behave unexpectedly (e.g., the retry limit is exceeded, or their API returns 503), agent runs will raise UnexpectedModelBehaviour.

In these cases, agent.last_run_messages can be used to access the messages exchanged during the run to help diagnose the issue.

from pydantic_ai import Agent, ModelRetry, UnexpectedModelBehaviour

agent = Agent('openai:gpt-4o')


@agent.retriever_plain
def calc_volume(size: int) -> int:  # (1)!
    if size == 42:
        return size**3
    else:
        raise ModelRetry('Please try again.')


try:
    result = agent.run_sync('Please get me the volume of a box with size 6.')
except UnexpectedModelBehaviour as e:
    print('An error occurred:', e)
    #> An error occurred: Retriever exceeded max retries count of 1
    print('cause:', repr(e.__cause__))
    #> cause: ModelRetry('Please try again.')
    print('messages:', agent.last_run_messages)
    """
    messages:
    [
        UserPrompt(
            content='Please get me the volume of a box with size 6.',
            timestamp=datetime.datetime(...),
            role='user',
        ),
        ModelStructuredResponse(
            calls=[
                ToolCall(
                    tool_name='calc_volume',
                    args=ArgsObject(args_object={'size': 6}),
                    tool_id=None,
                )
            ],
            timestamp=datetime.datetime(...),
            role='model-structured-response',
        ),
        RetryPrompt(
            content='Please try again.',
            tool_name='calc_volume',
            tool_id=None,
            timestamp=datetime.datetime(...),
            role='retry-prompt',
        ),
        ModelStructuredResponse(
            calls=[
                ToolCall(
                    tool_name='calc_volume',
                    args=ArgsObject(args_object={'size': 6}),
                    tool_id=None,
                )
            ],
            timestamp=datetime.datetime(...),
            role='model-structured-response',
        ),
    ]
    """
else:
    print(result.data)
1. Define a retriever that will raise ModelRetry repeatedly in this case.

(This example is complete, it can be run "as is")

API Reference

Bases: Generic[AgentDeps, ResultData]

Class for defining "agents" - a way to have a specific type of "conversation" with an LLM.

Agents are generic in the dependency type they take AgentDeps and the result data type they return, ResultData.

By default, if neither generic parameter is customised, agents have type Agent[None, str].

Minimal usage example:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
result = agent.run_sync('What is the capital of France?')
print(result.data)
#> Paris
Source code in pydantic_ai/agent.py
@final
@dataclass(init=False)
class Agent(Generic[AgentDeps, ResultData]):
    """Class for defining "agents" - a way to have a specific type of "conversation" with an LLM.

    Agents are generic in the dependency type they take [`AgentDeps`][pydantic_ai.dependencies.AgentDeps]
    and the result data type they return, [`ResultData`][pydantic_ai.result.ResultData].

    By default, if neither generic parameter is customised, agents have type `Agent[None, str]`.

    Minimal usage example:

    ```py
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')
    result = agent.run_sync('What is the capital of France?')
    print(result.data)
    #> Paris
    ```
    """

    # dataclass fields mostly for my sanity — knowing what attributes are available
    model: models.Model | models.KnownModelName | None
    """The default model configured for this agent."""
    _result_schema: _result.ResultSchema[ResultData] | None
    _result_validators: list[_result.ResultValidator[AgentDeps, ResultData]]
    _allow_text_result: bool
    _system_prompts: tuple[str, ...]
    _retrievers: dict[str, _r.Retriever[AgentDeps, Any]]
    _default_retries: int
    _system_prompt_functions: list[_system_prompt.SystemPromptRunner[AgentDeps]]
    _deps_type: type[AgentDeps]
    _max_result_retries: int
    _current_result_retry: int
    _override_deps: _utils.Option[AgentDeps] = None
    _override_model: _utils.Option[models.Model] = None
    last_run_messages: list[_messages.Message] | None = None
    """The messages from the last run, useful when a run raised an exception.

    Note: these are not used by the agent, e.g. in future runs, they are just stored for developers' convenience.
    """

    def __init__(
        self,
        model: models.Model | models.KnownModelName | None = None,
        result_type: type[ResultData] = str,
        *,
        system_prompt: str | Sequence[str] = (),
        deps_type: type[AgentDeps] = NoneType,
        retries: int = 1,
        result_tool_name: str = 'final_result',
        result_tool_description: str | None = None,
        result_retries: int | None = None,
        defer_model_check: bool = False,
    ):
        """Create an agent.

        Args:
            model: The default model to use for this agent, if not provided,
                you must provide the model when calling the agent.
            result_type: The type of the result data, used to validate the result data, defaults to `str`.
            system_prompt: Static system prompts to use for this agent, you can also register system
                prompts via a function with [`system_prompt`][pydantic_ai.Agent.system_prompt].
            deps_type: The type used for dependency injection, this parameter exists solely to allow you to fully
                parameterize the agent, and therefore get the best out of static type checking.
                If you're not using deps, but want type checking to pass, you can set `deps=None` to satisfy Pyright
                or add a type hint `: Agent[None, <return type>]`.
            retries: The default number of retries to allow before raising an error.
            result_tool_name: The name of the tool to use for the final result.
            result_tool_description: The description of the final result tool.
            result_retries: The maximum number of retries to allow for result validation, defaults to `retries`.
            defer_model_check: by default, if you provide a [named][pydantic_ai.models.KnownModelName] model,
                it's evaluated to create a [`Model`][pydantic_ai.models.Model] instance immediately,
                which checks for the necessary environment variables. Set this to `false`
                to defer the evaluation until the first run. Useful if you want to
                [override the model][pydantic_ai.Agent.override_model] for testing.
        """
        if model is None or defer_model_check:
            self.model = model
        else:
            self.model = models.infer_model(model)

        self._result_schema = _result.ResultSchema[result_type].build(
            result_type, result_tool_name, result_tool_description
        )
        # if the result tool is None, or its schema allows `str`, we allow plain text results
        self._allow_text_result = self._result_schema is None or self._result_schema.allow_text_result

        self._system_prompts = (system_prompt,) if isinstance(system_prompt, str) else tuple(system_prompt)
        self._retrievers: dict[str, _r.Retriever[AgentDeps, Any]] = {}
        self._deps_type = deps_type
        self._default_retries = retries
        self._system_prompt_functions = []
        self._max_result_retries = result_retries if result_retries is not None else retries
        self._current_result_retry = 0
        self._result_validators = []

    async def run(
        self,
        user_prompt: str,
        *,
        message_history: list[_messages.Message] | None = None,
        model: models.Model | models.KnownModelName | None = None,
        deps: AgentDeps = None,
    ) -> result.RunResult[ResultData]:
        """Run the agent with a user prompt in async mode.

        Args:
            user_prompt: User input to start/continue the conversation.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.

        Returns:
            The result of the run.
        """
        model_used, custom_model, agent_model = await self._get_agent_model(model)

        deps = self._get_deps(deps)

        new_message_index, messages = await self._prepare_messages(deps, user_prompt, message_history)
        self.last_run_messages = messages

        for retriever in self._retrievers.values():
            retriever.reset()

        cost = result.Cost()

        with _logfire.span(
            'agent run {prompt=}',
            prompt=user_prompt,
            agent=self,
            custom_model=custom_model,
            model_name=model_used.name(),
        ) as run_span:
            run_step = 0
            while True:
                run_step += 1
                with _logfire.span('model request {run_step=}', run_step=run_step) as model_req_span:
                    model_response, request_cost = await agent_model.request(messages)
                    model_req_span.set_attribute('response', model_response)
                    model_req_span.set_attribute('cost', request_cost)
                    model_req_span.message = f'model request -> {model_response.role}'

                messages.append(model_response)
                cost += request_cost

                with _logfire.span('handle model response') as handle_span:
                    either = await self._handle_model_response(model_response, deps)

                    if isinstance(either, _MarkFinalResult):
                        # we have a final result, end the conversation
                        result_data = either.data
                        run_span.set_attribute('all_messages', messages)
                        run_span.set_attribute('cost', cost)
                        handle_span.set_attribute('result', result_data)
                        handle_span.message = 'handle model response -> final result'
                        return result.RunResult(messages, new_message_index, result_data, cost)
                    else:
                        # continue the conversation
                        tool_responses = either
                        handle_span.set_attribute('tool_responses', tool_responses)
                        response_msgs = ' '.join(m.role for m in tool_responses)
                        handle_span.message = f'handle model response -> {response_msgs}'
                        messages.extend(tool_responses)

    def run_sync(
        self,
        user_prompt: str,
        *,
        message_history: list[_messages.Message] | None = None,
        model: models.Model | models.KnownModelName | None = None,
        deps: AgentDeps = None,
    ) -> result.RunResult[ResultData]:
        """Run the agent with a user prompt synchronously.

        This is a convenience method that wraps `self.run` with `asyncio.run()`.

        Args:
            user_prompt: User input to start/continue the conversation.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.

        Returns:
            The result of the run.
        """
        return asyncio.run(self.run(user_prompt, message_history=message_history, model=model, deps=deps))

    @asynccontextmanager
    async def run_stream(
        self,
        user_prompt: str,
        *,
        message_history: list[_messages.Message] | None = None,
        model: models.Model | models.KnownModelName | None = None,
        deps: AgentDeps = None,
    ) -> AsyncIterator[result.StreamedRunResult[AgentDeps, ResultData]]:
        """Run the agent with a user prompt in async mode, returning a streamed response.

        Args:
            user_prompt: User input to start/continue the conversation.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.

        Returns:
            The result of the run.
        """
        model_used, custom_model, agent_model = await self._get_agent_model(model)

        deps = self._get_deps(deps)

        new_message_index, messages = await self._prepare_messages(deps, user_prompt, message_history)
        self.last_run_messages = messages

        for retriever in self._retrievers.values():
            retriever.reset()

        cost = result.Cost()

        with _logfire.span(
            'agent run stream {prompt=}',
            prompt=user_prompt,
            agent=self,
            custom_model=custom_model,
            model_name=model_used.name(),
        ) as run_span:
            run_step = 0
            while True:
                run_step += 1
                with _logfire.span('model request {run_step=}', run_step=run_step) as model_req_span:
                    async with agent_model.request_stream(messages) as model_response:
                        model_req_span.set_attribute('response_type', model_response.__class__.__name__)
                        # We want to end the "model request" span here, but we can't exit the context manager
                        # in the traditional way
                        model_req_span.__exit__(None, None, None)

                        with _logfire.span('handle model response') as handle_span:
                            either = await self._handle_streamed_model_response(model_response, deps)

                            if isinstance(either, _MarkFinalResult):
                                result_stream = either.data
                                run_span.set_attribute('all_messages', messages)
                                handle_span.set_attribute('result_type', result_stream.__class__.__name__)
                                handle_span.message = 'handle model response -> final result'
                                yield result.StreamedRunResult(
                                    messages,
                                    new_message_index,
                                    cost,
                                    result_stream,
                                    self._result_schema,
                                    deps,
                                    self._result_validators,
                                )
                                return
                            else:
                                tool_responses = either
                                handle_span.set_attribute('tool_responses', tool_responses)
                                response_msgs = ' '.join(m.role for m in tool_responses)
                                handle_span.message = f'handle model response -> {response_msgs}'
                                messages.extend(tool_responses)
                                # the model_response should have been fully streamed by now, so we can add its cost
                                cost += model_response.cost()

    @contextmanager
    def override_deps(self, overriding_deps: AgentDeps) -> Iterator[None]:
        """Context manager to temporarily override agent dependencies, this is particularly useful when testing.

        Args:
            overriding_deps: The dependencies to use instead of the dependencies passed to the agent run.
        """
        override_deps_before = self._override_deps
        self._override_deps = _utils.Some(overriding_deps)
        try:
            yield
        finally:
            self._override_deps = override_deps_before

    @contextmanager
    def override_model(self, overriding_model: models.Model | models.KnownModelName) -> Iterator[None]:
        """Context manager to temporarily override the model used by the agent.

        Args:
            overriding_model: The model to use instead of the model passed to the agent run.
        """
        override_model_before = self._override_model
        self._override_model = _utils.Some(models.infer_model(overriding_model))
        try:
            yield
        finally:
            self._override_model = override_model_before

    def system_prompt(
        self, func: _system_prompt.SystemPromptFunc[AgentDeps]
    ) -> _system_prompt.SystemPromptFunc[AgentDeps]:
        """Decorator to register a system prompt function that optionally takes `CallContext` as it's only argument."""
        self._system_prompt_functions.append(_system_prompt.SystemPromptRunner(func))
        return func

    def result_validator(
        self, func: _result.ResultValidatorFunc[AgentDeps, ResultData]
    ) -> _result.ResultValidatorFunc[AgentDeps, ResultData]:
        """Decorator to register a result validator function."""
        self._result_validators.append(_result.ResultValidator(func))
        return func

    @overload
    def retriever_context(
        self, func: RetrieverContextFunc[AgentDeps, RetrieverParams], /
    ) -> _r.Retriever[AgentDeps, RetrieverParams]: ...

    @overload
    def retriever_context(
        self, /, *, retries: int | None = None
    ) -> Callable[[RetrieverContextFunc[AgentDeps, RetrieverParams]], _r.Retriever[AgentDeps, RetrieverParams]]: ...

    def retriever_context(
        self,
        func: RetrieverContextFunc[AgentDeps, RetrieverParams] | None = None,
        /,
        *,
        retries: int | None = None,
    ) -> Any:
        """Decorator to register a retriever function."""
        if func is None:

            def retriever_decorator(
                func_: RetrieverContextFunc[AgentDeps, RetrieverParams],
            ) -> _r.Retriever[AgentDeps, RetrieverParams]:
                # noinspection PyTypeChecker
                return self._register_retriever(_utils.Either(left=func_), retries)

            return retriever_decorator
        else:
            # noinspection PyTypeChecker
            return self._register_retriever(_utils.Either(left=func), retries)

    @overload
    def retriever_plain(
        self, func: RetrieverPlainFunc[RetrieverParams], /
    ) -> _r.Retriever[AgentDeps, RetrieverParams]: ...

    @overload
    def retriever_plain(
        self, /, *, retries: int | None = None
    ) -> Callable[[RetrieverPlainFunc[RetrieverParams]], _r.Retriever[AgentDeps, RetrieverParams]]: ...

    def retriever_plain(
        self, func: RetrieverPlainFunc[RetrieverParams] | None = None, /, *, retries: int | None = None
    ) -> Any:
        """Decorator to register a retriever function."""
        if func is None:

            def retriever_decorator(
                func_: RetrieverPlainFunc[RetrieverParams],
            ) -> _r.Retriever[AgentDeps, RetrieverParams]:
                # noinspection PyTypeChecker
                return self._register_retriever(_utils.Either(right=func_), retries)

            return retriever_decorator
        else:
            return self._register_retriever(_utils.Either(right=func), retries)

    def _register_retriever(
        self, func: _r.RetrieverEitherFunc[AgentDeps, RetrieverParams], retries: int | None
    ) -> _r.Retriever[AgentDeps, RetrieverParams]:
        """Private utility to register a retriever function."""
        retries_ = retries if retries is not None else self._default_retries
        retriever = _r.Retriever[AgentDeps, RetrieverParams](func, retries_)

        if self._result_schema and retriever.name in self._result_schema.tools:
            raise ValueError(f'Retriever name conflicts with result schema name: {retriever.name!r}')

        if retriever.name in self._retrievers:
            raise ValueError(f'Retriever name conflicts with existing retriever: {retriever.name!r}')

        self._retrievers[retriever.name] = retriever
        return retriever

    async def _get_agent_model(
        self, model: models.Model | models.KnownModelName | None
    ) -> tuple[models.Model, models.Model | None, models.AgentModel]:
        """Create a model configured for this agent.

        Args:
            model: model to use for this run, required if `model` was not set when creating the agent.

        Returns:
            a tuple of `(model used, custom_model if any, agent_model)`
        """
        model_: models.Model
        if some_model := self._override_model:
            # we don't want `override_model()` to cover up errors from the model not being defined, hence this check
            if model is None and self.model is None:
                raise exceptions.UserError(
                    '`model` must be set either when creating the agent or when calling it. '
                    '(Even when `override_model()` is customizing the model that will actually be called)'
                )
            model_ = some_model.value
            custom_model = None
        elif model is not None:
            custom_model = model_ = models.infer_model(model)
        elif self.model is not None:
            # noinspection PyTypeChecker
            model_ = self.model = models.infer_model(self.model)
            custom_model = None
        else:
            raise exceptions.UserError('`model` must be set either when creating the agent or when calling it.')

        result_tools = list(self._result_schema.tools.values()) if self._result_schema else None
        return model_, custom_model, model_.agent_model(self._retrievers, self._allow_text_result, result_tools)

    async def _prepare_messages(
        self, deps: AgentDeps, user_prompt: str, message_history: list[_messages.Message] | None
    ) -> tuple[int, list[_messages.Message]]:
        # if message history includes system prompts, we don't want to regenerate them
        if message_history and any(m.role == 'system' for m in message_history):
            # shallow copy messages
            messages = message_history.copy()
        else:
            messages = await self._init_messages(deps)
            if message_history:
                messages += message_history

        new_message_index = len(messages)
        messages.append(_messages.UserPrompt(user_prompt))
        return new_message_index, messages

    async def _handle_model_response(
        self, model_response: _messages.ModelAnyResponse, deps: AgentDeps
    ) -> _MarkFinalResult[ResultData] | list[_messages.Message]:
        """Process a non-streamed response from the model.

        Returns:
            Return `Either` — left: final result data, right: list of messages to send back to the model.
        """
        if model_response.role == 'model-text-response':
            # plain string response
            if self._allow_text_result:
                result_data_input = cast(ResultData, model_response.content)
                try:
                    result_data = await self._validate_result(result_data_input, deps, None)
                except _result.ToolRetryError as e:
                    self._incr_result_retry()
                    return [e.tool_retry]
                else:
                    return _MarkFinalResult(result_data)
            else:
                self._incr_result_retry()
                response = _messages.RetryPrompt(
                    content='Plain text responses are not permitted, please call one of the functions instead.',
                )
                return [response]
        elif model_response.role == 'model-structured-response':
            if self._result_schema is not None:
                # if there's a result schema, and any of the calls match one of its tools, return the result
                # NOTE: this means we ignore any other tools called here
                if match := self._result_schema.find_tool(model_response):
                    call, result_tool = match
                    try:
                        result_data = result_tool.validate(call)
                        result_data = await self._validate_result(result_data, deps, call)
                    except _result.ToolRetryError as e:
                        self._incr_result_retry()
                        return [e.tool_retry]
                    else:
                        return _MarkFinalResult(result_data)

            if not model_response.calls:
                raise exceptions.UnexpectedModelBehaviour('Received empty tool call message')

            # otherwise we run all retriever functions in parallel
            messages: list[_messages.Message] = []
            tasks: list[asyncio.Task[_messages.Message]] = []
            for call in model_response.calls:
                if retriever := self._retrievers.get(call.tool_name):
                    tasks.append(asyncio.create_task(retriever.run(deps, call), name=call.tool_name))
                else:
                    messages.append(self._unknown_tool(call.tool_name))

            with _logfire.span('running {tools=}', tools=[t.get_name() for t in tasks]):
                messages += await asyncio.gather(*tasks)
            return messages
        else:
            assert_never(model_response)

    async def _handle_streamed_model_response(
        self, model_response: models.EitherStreamedResponse, deps: AgentDeps
    ) -> _MarkFinalResult[models.EitherStreamedResponse] | list[_messages.Message]:
        """Process a streamed response from the model.

        TODO: change the response type to `models.EitherStreamedResponse | list[_messages.Message]` once we drop 3.9
        (with 3.9 we get `TypeError: Subscripted generics cannot be used with class and instance checks`)

        Returns:
            Return `Either` — left: final result data, right: list of messages to send back to the model.
        """
        if isinstance(model_response, models.StreamTextResponse):
            # plain string response
            if self._allow_text_result:
                return _MarkFinalResult(model_response)
            else:
                self._incr_result_retry()
                response = _messages.RetryPrompt(
                    content='Plain text responses are not permitted, please call one of the functions instead.',
                )
                # stream the response, so cost is correct
                async for _ in model_response:
                    pass

                return [response]
        else:
            assert isinstance(model_response, models.StreamStructuredResponse), f'Unexpected response: {model_response}'
            if self._result_schema is not None:
                # if there's a result schema, iterate over the stream until we find at least one tool
                # NOTE: this means we ignore any other tools called here
                structured_msg = model_response.get()
                while not structured_msg.calls:
                    try:
                        await model_response.__anext__()
                    except StopAsyncIteration:
                        break
                    structured_msg = model_response.get()

                if self._result_schema.find_tool(structured_msg):
                    return _MarkFinalResult(model_response)

            # the model is calling a retriever function, consume the response to get the next message
            async for _ in model_response:
                pass
            structured_msg = model_response.get()
            if not structured_msg.calls:
                raise exceptions.UnexpectedModelBehaviour('Received empty tool call message')
            messages: list[_messages.Message] = [structured_msg]

            # we now run all retriever functions in parallel
            tasks: list[asyncio.Task[_messages.Message]] = []
            for call in structured_msg.calls:
                if retriever := self._retrievers.get(call.tool_name):
                    tasks.append(asyncio.create_task(retriever.run(deps, call), name=call.tool_name))
                else:
                    messages.append(self._unknown_tool(call.tool_name))

            with _logfire.span('running {tools=}', tools=[t.get_name() for t in tasks]):
                messages += await asyncio.gather(*tasks)
            return messages

    async def _validate_result(
        self, result_data: ResultData, deps: AgentDeps, tool_call: _messages.ToolCall | None
    ) -> ResultData:
        for validator in self._result_validators:
            result_data = await validator.validate(result_data, deps, self._current_result_retry, tool_call)
        return result_data

    def _incr_result_retry(self) -> None:
        self._current_result_retry += 1
        if self._current_result_retry > self._max_result_retries:
            raise exceptions.UnexpectedModelBehaviour(
                f'Exceeded maximum retries ({self._max_result_retries}) for result validation'
            )

    async def _init_messages(self, deps: AgentDeps) -> list[_messages.Message]:
        """Build the initial messages for the conversation."""
        messages: list[_messages.Message] = [_messages.SystemPrompt(p) for p in self._system_prompts]
        for sys_prompt_runner in self._system_prompt_functions:
            prompt = await sys_prompt_runner.run(deps)
            messages.append(_messages.SystemPrompt(prompt))
        return messages

    def _unknown_tool(self, tool_name: str) -> _messages.RetryPrompt:
        self._incr_result_retry()
        names = list(self._retrievers.keys())
        if self._result_schema:
            names.extend(self._result_schema.tool_names())
        if names:
            msg = f'Available tools: {", ".join(names)}'
        else:
            msg = 'No tools available.'
        return _messages.RetryPrompt(content=f'Unknown tool name: {tool_name!r}. {msg}')

    def _get_deps(self, deps: AgentDeps) -> AgentDeps:
        """Get deps for a run.

        If we've overridden deps via `_override_deps_stack`, use that, otherwise use the deps passed to the call.

        We could do runtime type checking of deps against `self._deps_type`, but that's a slippery slope.
        """
        if some_deps := self._override_deps:
            return some_deps.value
        else:
            return deps
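
For example, the override_deps and override_model context managers defined above might be used in tests like this — a sketch, reusing the hypothetical fake_database module from the retry example:

test_overrides.py
from fake_database import DatabaseConn

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', deps_type=DatabaseConn)

test_db = DatabaseConn()  # in a real test this might be a test double

with agent.override_deps(test_db):
    # runs inside this block ignore the deps passed to run/run_sync and use test_db
    ...

with agent.override_model('gemini-1.5-flash'):
    # runs inside this block use the overridden model
    ...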

__init__

__init__(
    model: Model | KnownModelName | None = None,
    result_type: type[ResultData] = str,
    *,
    system_prompt: str | Sequence[str] = (),
    deps_type: type[AgentDeps] = NoneType,
    retries: int = 1,
    result_tool_name: str = "final_result",
    result_tool_description: str | None = None,
    result_retries: int | None = None,
    defer_model_check: bool = False
)

Create an agent.

Parameters:

  • model (Model | KnownModelName | None, default None) — The default model to use for this agent; if not provided, you must provide the model when calling the agent.
  • result_type (type[ResultData], default str) — The type of the result data, used to validate the result data.
  • system_prompt (str | Sequence[str], default ()) — Static system prompts to use for this agent; you can also register system prompts via a function with system_prompt.
  • deps_type (type[AgentDeps], default NoneType) — The type used for dependency injection; this parameter exists solely to allow you to fully parameterize the agent, and therefore get the best out of static type checking. If you're not using deps, but want type checking to pass, you can set deps=None to satisfy Pyright or add a type hint : Agent[None, <return type>].
  • retries (int, default 1) — The default number of retries to allow before raising an error.
  • result_tool_name (str, default 'final_result') — The name of the tool to use for the final result.
  • result_tool_description (str | None, default None) — The description of the final result tool.
  • result_retries (int | None, default None) — The maximum number of retries to allow for result validation; defaults to retries.
  • defer_model_check (bool, default False) — By default, if you provide a named model, it's evaluated to create a Model instance immediately, which checks for the necessary environment variables. Set this to false to defer the evaluation until the first run; useful if you want to override the model for testing.
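
Putting several of these parameters together, a fuller constructor call might look like this (a sketch; the values are illustrative):

full_constructor.py
from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o',
    result_type=list[str],
    system_prompt='Reply with a list of short strings.',
    retries=2,
    result_tool_description='The final list of strings.',
    defer_model_check=True,  # skip evaluating the model until the first run
)
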
Source code in pydantic_ai/agent.py
def __init__(
    self,
    model: models.Model | models.KnownModelName | None = None,
    result_type: type[ResultData] = str,
    *,
    system_prompt: str | Sequence[str] = (),
    deps_type: type[AgentDeps] = NoneType,
    retries: int = 1,
    result_tool_name: str = 'final_result',
    result_tool_description: str | None = None,
    result_retries: int | None = None,
    defer_model_check: bool = False,
):
    """Create an agent.

    Args:
        model: The default model to use for this agent, if not provided,
            you must provide the model when calling the agent.
        result_type: The type of the result data, used to validate the result data, defaults to `str`.
        system_prompt: Static system prompts to use for this agent, you can also register system
            prompts via a function with [`system_prompt`][pydantic_ai.Agent.system_prompt].
        deps_type: The type used for dependency injection, this parameter exists solely to allow you to fully
            parameterize the agent, and therefore get the best out of static type checking.
            If you're not using deps, but want type checking to pass, you can set `deps=None` to satisfy Pyright
            or add a type hint `: Agent[None, <return type>]`.
        retries: The default number of retries to allow before raising an error.
        result_tool_name: The name of the tool to use for the final result.
        result_tool_description: The description of the final result tool.
        result_retries: The maximum number of retries to allow for result validation, defaults to `retries`.
        defer_model_check: by default, if you provide a [named][pydantic_ai.models.KnownModelName] model,
            it's evaluated to create a [`Model`][pydantic_ai.models.Model] instance immediately,
            which checks for the necessary environment variables. Set this to `false`
            to defer the evaluation until the first run. Useful if you want to
            [override the model][pydantic_ai.Agent.override_model] for testing.
    """
    if model is None or defer_model_check:
        self.model = model
    else:
        self.model = models.infer_model(model)

    self._result_schema = _result.ResultSchema[result_type].build(
        result_type, result_tool_name, result_tool_description
    )
    # if the result tool is None, or its schema allows `str`, we allow plain text results
    self._allow_text_result = self._result_schema is None or self._result_schema.allow_text_result

    self._system_prompts = (system_prompt,) if isinstance(system_prompt, str) else tuple(system_prompt)
    self._retrievers: dict[str, _r.Retriever[AgentDeps, Any]] = {}
    self._deps_type = deps_type
    self._default_retries = retries
    self._system_prompt_functions = []
    self._max_result_retries = result_retries if result_retries is not None else retries
    self._current_result_retry = 0
    self._result_validators = []

run async

run(
    user_prompt: str,
    *,
    message_history: list[Message] | None = None,
    model: Model | KnownModelName | None = None,
    deps: AgentDeps = None
) -> RunResult[ResultData]

Run the agent with a user prompt in async mode.

Parameters:

  • user_prompt (str, required) — User input to start/continue the conversation.
  • message_history (list[Message] | None, default None) — History of the conversation so far.
  • model (Model | KnownModelName | None, default None) — Optional model to use for this run, required if model was not set when creating the agent.
  • deps (AgentDeps, default None) — Optional dependencies to use for this run.

Returns:

  • RunResult[ResultData] — The result of the run.

Source code in pydantic_ai/agent.py
async def run(
    self,
    user_prompt: str,
    *,
    message_history: list[_messages.Message] | None = None,
    model: models.Model | models.KnownModelName | None = None,
    deps: AgentDeps = None,
) -> result.RunResult[ResultData]:
    """Run the agent with a user prompt in async mode.

    Args:
        user_prompt: User input to start/continue the conversation.
        message_history: History of the conversation so far.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.

    Returns:
        The result of the run.
    """
    model_used, custom_model, agent_model = await self._get_agent_model(model)

    deps = self._get_deps(deps)

    new_message_index, messages = await self._prepare_messages(deps, user_prompt, message_history)
    self.last_run_messages = messages

    for retriever in self._retrievers.values():
        retriever.reset()

    cost = result.Cost()

    with _logfire.span(
        'agent run {prompt=}',
        prompt=user_prompt,
        agent=self,
        custom_model=custom_model,
        model_name=model_used.name(),
    ) as run_span:
        run_step = 0
        while True:
            run_step += 1
            with _logfire.span('model request {run_step=}', run_step=run_step) as model_req_span:
                model_response, request_cost = await agent_model.request(messages)
                model_req_span.set_attribute('response', model_response)
                model_req_span.set_attribute('cost', request_cost)
                model_req_span.message = f'model request -> {model_response.role}'

            messages.append(model_response)
            cost += request_cost

            with _logfire.span('handle model response') as handle_span:
                either = await self._handle_model_response(model_response, deps)

                if isinstance(either, _MarkFinalResult):
                    # we have a final result, end the conversation
                    result_data = either.data
                    run_span.set_attribute('all_messages', messages)
                    run_span.set_attribute('cost', cost)
                    handle_span.set_attribute('result', result_data)
                    handle_span.message = 'handle model response -> final result'
                    return result.RunResult(messages, new_message_index, result_data, cost)
                else:
                    # continue the conversation
                    tool_responses = either
                    handle_span.set_attribute('tool_responses', tool_responses)
                    response_msgs = ' '.join(m.role for m in tool_responses)
                    handle_span.message = f'handle model response -> {response_msgs}'
                    messages.extend(tool_responses)

run_sync

run_sync(
    user_prompt: str,
    *,
    message_history: list[Message] | None = None,
    model: Model | KnownModelName | None = None,
    deps: AgentDeps = None
) -> RunResult[ResultData]

Run the agent with a user prompt synchronously.

This is a convenience method that wraps self.run with asyncio.run().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_prompt` | `str` | User input to start/continue the conversation. | *required* |
| `message_history` | `list[Message] \| None` | History of the conversation so far. | `None` |
| `model` | `Model \| KnownModelName \| None` | Optional model to use for this run, required if `model` was not set when creating the agent. | `None` |
| `deps` | `AgentDeps` | Optional dependencies to use for this run. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `RunResult[ResultData]` | The result of the run. |

Source code in pydantic_ai/agent.py
def run_sync(
    self,
    user_prompt: str,
    *,
    message_history: list[_messages.Message] | None = None,
    model: models.Model | models.KnownModelName | None = None,
    deps: AgentDeps = None,
) -> result.RunResult[ResultData]:
    """Run the agent with a user prompt synchronously.

    This is a convenience method that wraps `self.run` with `asyncio.run()`.

    Args:
        user_prompt: User input to start/continue the conversation.
        message_history: History of the conversation so far.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.

    Returns:
        The result of the run.
    """
    return asyncio.run(self.run(user_prompt, message_history=message_history, model=model, deps=deps))
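
Because run_sync hands self.run to asyncio.run(), it cannot be called from inside an already-running event loop. A sketch of the per-run model argument, assuming an agent constructed without a default model (the model attribute's None type permits this); the model name and output are illustrative:

run_sync_model.py
from pydantic_ai import Agent

agent = Agent()  # no default model configured

result = agent.run_sync(
    'What is the capital of Japan?',
    model='openai:gpt-4o',  # required here since the agent has no default model
)
print(result.data)
#> Tokyo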

run_stream async

run_stream(
    user_prompt: str,
    *,
    message_history: list[Message] | None = None,
    model: Model | KnownModelName | None = None,
    deps: AgentDeps = None
) -> AsyncIterator[
    StreamedRunResult[AgentDeps, ResultData]
]

Run the agent with a user prompt in async mode, returning a streamed response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_prompt` | `str` | User input to start/continue the conversation. | *required* |
| `message_history` | `list[Message] \| None` | History of the conversation so far. | `None` |
| `model` | `Model \| KnownModelName \| None` | Optional model to use for this run, required if `model` was not set when creating the agent. | `None` |
| `deps` | `AgentDeps` | Optional dependencies to use for this run. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `AsyncIterator[StreamedRunResult[AgentDeps, ResultData]]` | An async context manager that yields a streamed result of the run. |

Source code in pydantic_ai/agent.py
@asynccontextmanager
async def run_stream(
    self,
    user_prompt: str,
    *,
    message_history: list[_messages.Message] | None = None,
    model: models.Model | models.KnownModelName | None = None,
    deps: AgentDeps = None,
) -> AsyncIterator[result.StreamedRunResult[AgentDeps, ResultData]]:
    """Run the agent with a user prompt in async mode, returning a streamed response.

    Args:
        user_prompt: User input to start/continue the conversation.
        message_history: History of the conversation so far.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.

    Returns:
        The result of the run.
    """
    model_used, custom_model, agent_model = await self._get_agent_model(model)

    deps = self._get_deps(deps)

    new_message_index, messages = await self._prepare_messages(deps, user_prompt, message_history)
    self.last_run_messages = messages

    for retriever in self._retrievers.values():
        retriever.reset()

    cost = result.Cost()

    with _logfire.span(
        'agent run stream {prompt=}',
        prompt=user_prompt,
        agent=self,
        custom_model=custom_model,
        model_name=model_used.name(),
    ) as run_span:
        run_step = 0
        while True:
            run_step += 1
            with _logfire.span('model request {run_step=}', run_step=run_step) as model_req_span:
                async with agent_model.request_stream(messages) as model_response:
                    model_req_span.set_attribute('response_type', model_response.__class__.__name__)
                    # We want to end the "model request" span here, but we can't exit the context manager
                    # in the traditional way
                    model_req_span.__exit__(None, None, None)

                    with _logfire.span('handle model response') as handle_span:
                        either = await self._handle_streamed_model_response(model_response, deps)

                        if isinstance(either, _MarkFinalResult):
                            result_stream = either.data
                            run_span.set_attribute('all_messages', messages)
                            handle_span.set_attribute('result_type', result_stream.__class__.__name__)
                            handle_span.message = 'handle model response -> final result'
                            yield result.StreamedRunResult(
                                messages,
                                new_message_index,
                                cost,
                                result_stream,
                                self._result_schema,
                                deps,
                                self._result_validators,
                            )
                            return
                        else:
                            tool_responses = either
                            handle_span.set_attribute('tool_responses', tool_responses)
                            response_msgs = ' '.join(m.role for m in tool_responses)
                            handle_span.message = f'handle model response -> {response_msgs}'
                            messages.extend(tool_responses)
                            # the model_response should have been fully streamed by now, so we can add its cost
                            cost += model_response.cost()
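
Since run_stream is decorated with @asynccontextmanager, it is consumed with async with. A minimal sketch; get_data() is assumed here to be the StreamedRunResult method that waits for the stream to finish and returns the validated data:

run_stream_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.run_stream('What is the capital of Spain?') as response:
        print(await response.get_data())
        #> Madrid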

model instance-attribute

model: Model | KnownModelName | None

The default model configured for this agent.

override_deps

override_deps(overriding_deps: AgentDeps) -> Iterator[None]

Context manager to temporarily override agent dependencies; this is particularly useful when testing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overriding_deps` | `AgentDeps` | The dependencies to use instead of the dependencies passed to the agent run. | *required* |
Source code in pydantic_ai/agent.py
@contextmanager
def override_deps(self, overriding_deps: AgentDeps) -> Iterator[None]:
    """Context manager to temporarily override agent dependencies, this is particularly useful when testing.

    Args:
        overriding_deps: The dependencies to use instead of the dependencies passed to the agent run.
    """
    override_deps_before = self._override_deps
    self._override_deps = _utils.Some(overriding_deps)
    try:
        yield
    finally:
        self._override_deps = override_deps_before
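
A sketch of the testing pattern this enables; the connection string, prompt, and string-typed deps are hypothetical:

override_deps_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', deps_type=str)


def test_with_fake_deps():
    # every run inside this block sees the fake deps,
    # regardless of what is passed to run/run_sync
    with agent.override_deps('fake-connection-string'):
        result = agent.run_sync('Check the database status')
    assert result.data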

override_model

override_model(
    overriding_model: Model | KnownModelName,
) -> Iterator[None]

Context manager to temporarily override the model used by the agent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overriding_model` | `Model \| KnownModelName` | The model to use instead of the model passed to the agent run. | *required* |
Source code in pydantic_ai/agent.py
@contextmanager
def override_model(self, overriding_model: models.Model | models.KnownModelName) -> Iterator[None]:
    """Context manager to temporarily override the model used by the agent.

    Args:
        overriding_model: The model to use instead of the model passed to the agent run.
    """
    override_model_before = self._override_model
    self._override_model = _utils.Some(models.infer_model(overriding_model))
    try:
        yield
    finally:
        self._override_model = override_model_before
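
The same pattern applies to models, e.g. pinning a different model for a block of runs; the model names and output are illustrative:

override_model_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

with agent.override_model('gemini-1.5-flash'):
    # this run uses gemini-1.5-flash, not the agent's default model
    result = agent.run_sync('What is the capital of Norway?')
print(result.data)
#> Oslo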

last_run_messages class-attribute instance-attribute

last_run_messages: list[Message] | None = None

The messages from the last run, useful when a run raised an exception.

Note: these are not used by the agent (e.g. in future runs); they are stored purely for developers' convenience.
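
This makes post-mortem debugging straightforward; a minimal sketch:

last_run_messages_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

try:
    result = agent.run_sync('What is the capital of Peru?')
except Exception:
    # inspect what was exchanged with the model before the failure
    print(agent.last_run_messages)
    raise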

system_prompt

Decorator to register a system prompt function that optionally takes CallContext as its only argument.

Source code in pydantic_ai/agent.py
def system_prompt(
    self, func: _system_prompt.SystemPromptFunc[AgentDeps]
) -> _system_prompt.SystemPromptFunc[AgentDeps]:
    """Decorator to register a system prompt function that optionally takes `CallContext` as it's only argument."""
    self._system_prompt_functions.append(_system_prompt.SystemPromptRunner(func))
    return func
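
A sketch of a dynamic system prompt that reads the run's dependencies; the user-name deps are illustrative:

system_prompt_example.py
from pydantic_ai import Agent, CallContext

agent = Agent('openai:gpt-4o', deps_type=str, system_prompt='Be concise.')


@agent.system_prompt
async def add_user_name(ctx: CallContext[str]) -> str:
    # the returned text is added to the system prompt for the run
    return f"The user's name is {ctx.deps!r}."


result = agent.run_sync('Greet me', deps='Anne')
print(result.data)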

retriever_plain

retriever_plain(
    func: RetrieverPlainFunc[RetrieverParams],
) -> Retriever[AgentDeps, RetrieverParams]
retriever_plain(
    *, retries: int | None = None
) -> Callable[
    [RetrieverPlainFunc[RetrieverParams]],
    Retriever[AgentDeps, RetrieverParams],
]
retriever_plain(
    func: RetrieverPlainFunc[RetrieverParams] | None = None,
    /,
    *,
    retries: int | None = None,
) -> Any

Decorator to register a retriever function.

Source code in pydantic_ai/agent.py
def retriever_plain(
    self, func: RetrieverPlainFunc[RetrieverParams] | None = None, /, *, retries: int | None = None
) -> Any:
    """Decorator to register a retriever function."""
    if func is None:

        def retriever_decorator(
            func_: RetrieverPlainFunc[RetrieverParams],
        ) -> _r.Retriever[AgentDeps, RetrieverParams]:
            # noinspection PyTypeChecker
            return self._register_retriever(_utils.Either(right=func_), retries)

        return retriever_decorator
    else:
        return self._register_retriever(_utils.Either(right=func), retries)
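
As the overloads above show, the decorator works bare or with a retries keyword; a minimal sketch with illustrative retrievers:

retriever_plain_example.py
from datetime import datetime, timezone

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


@agent.retriever_plain
async def current_time() -> str:
    """Get the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


@agent.retriever_plain(retries=3)
async def lookup_value(key: str) -> str:
    """Look up a value, allowing up to three retries on failure."""
    return f'value for {key}'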

retriever_context

retriever_context(
    func: RetrieverContextFunc[AgentDeps, RetrieverParams]
) -> Retriever[AgentDeps, RetrieverParams]
retriever_context(
    *, retries: int | None = None
) -> Callable[
    [RetrieverContextFunc[AgentDeps, RetrieverParams]],
    Retriever[AgentDeps, RetrieverParams],
]
retriever_context(
    func: (
        RetrieverContextFunc[AgentDeps, RetrieverParams]
        | None
    ) = None,
    /,
    *,
    retries: int | None = None,
) -> Any

Decorator to register a retriever function.

Source code in pydantic_ai/agent.py
def retriever_context(
    self,
    func: RetrieverContextFunc[AgentDeps, RetrieverParams] | None = None,
    /,
    *,
    retries: int | None = None,
) -> Any:
    """Decorator to register a retriever function."""
    if func is None:

        def retriever_decorator(
            func_: RetrieverContextFunc[AgentDeps, RetrieverParams],
        ) -> _r.Retriever[AgentDeps, RetrieverParams]:
            # noinspection PyTypeChecker
            return self._register_retriever(_utils.Either(left=func_), retries)

        return retriever_decorator
    else:
        # noinspection PyTypeChecker
        return self._register_retriever(_utils.Either(left=func), retries)
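
Unlike retriever_plain, the decorated function's first argument must be a CallContext parameterized with the agent's deps type. A sketch using the retries keyword form; the dict-of-settings deps are hypothetical:

retriever_context_example.py
from pydantic_ai import Agent, CallContext

agent = Agent('openai:gpt-4o', deps_type=dict)


@agent.retriever_context(retries=2)
async def lookup_setting(ctx: CallContext[dict], key: str) -> str:
    """Read a configuration value from the run's dependencies."""
    return str(ctx.deps.get(key, 'not set'))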

result_validator

Decorator to register a result validator function.

Source code in pydantic_ai/agent.py
def result_validator(
    self, func: _result.ResultValidatorFunc[AgentDeps, ResultData]
) -> _result.ResultValidatorFunc[AgentDeps, ResultData]:
    """Decorator to register a result validator function."""
    self._result_validators.append(_result.ResultValidator(func))
    return func
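
A minimal sketch with an illustrative length check, assuming result validators may raise ModelRetry (as retrievers do) to ask the model to try again:

result_validator_example.py
from pydantic_ai import Agent, CallContext, ModelRetry

agent = Agent('openai:gpt-4o', deps_type=None, result_type=str)


@agent.result_validator
async def must_be_short(ctx: CallContext[None], data: str) -> str:
    if len(data) > 280:
        raise ModelRetry('Response too long, answer in under 280 characters.')
    return data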

ModelRetry

Bases: Exception

Exception raised when a retriever function should be retried.

The agent will return the message to the model and ask it to try calling the function/tool again.

Source code in pydantic_ai/exceptions.py
class ModelRetry(Exception):
    """Exception raised when a retriever function should be retried.

    The agent will return the message to the model and ask it to try calling the function/tool again.
    """

    message: str
    """The message to return to the model."""

    def __init__(self, message: str):
        self.message = message
        super().__init__(message)

message instance-attribute

message: str

The message to return to the model.
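
A typical use inside a retriever, pushing a recoverable problem back to the model instead of failing the run; the lookup table is hypothetical:

model_retry_example.py
from pydantic_ai import Agent, ModelRetry

agent = Agent('openai:gpt-4o')

CAPITALS = {'France': 'Paris', 'Italy': 'Rome'}


@agent.retriever_plain
async def capital_of(country: str) -> str:
    """Return the capital of a country."""
    if country not in CAPITALS:
        raise ModelRetry(f'Unknown country {country!r}, try its English name.')
    return CAPITALS[country]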

UserError

Bases: RuntimeError

Error caused by a usage mistake by the application developer — You!

Source code in pydantic_ai/exceptions.py
class UserError(RuntimeError):
    """Error caused by a usage mistake by the application developer — You!"""

    message: str
    """Description of the mistake."""

    def __init__(self, message: str):
        self.message = message
        super().__init__(message)

message instance-attribute

message: str

Description of the mistake.

UnexpectedModelBehaviour

Bases: RuntimeError

Error caused by unexpected Model behavior, e.g. an unexpected response code.

Source code in pydantic_ai/exceptions.py
class UnexpectedModelBehaviour(RuntimeError):
    """Error caused by unexpected Model behavior, e.g. an unexpected response code."""

    message: str
    """Description of the unexpected behavior."""
    body: str | None
    """The body of the response, if available."""

    def __init__(self, message: str, body: str | None = None):
        self.message = message
        if body is None:
            self.body: str | None = None
        else:
            try:
                self.body = json.dumps(json.loads(body), indent=2)
            except ValueError:
                self.body = body
        super().__init__(message)

    def __str__(self) -> str:
        if self.body:
            return f'{self.message}, body:\n{self.body}'
        else:
            return self.message

message instance-attribute

message: str

Description of the unexpected behavior.

body instance-attribute

body: str | None

The body of the response, if available.