langchain_postgres.checkpoint.PostgresSaver
- class langchain_postgres.checkpoint.PostgresSaver
Bases: BaseCheckpointSaver
LangGraph checkpoint saver for Postgres.
This implementation of a checkpoint saver uses a Postgres database to save and retrieve checkpoints. It uses the psycopg3 package to interact with the Postgres database.
The saver accepts either a sync_connection (a psycopg.Connection or a psycopg_pool.ConnectionPool object) or an async_connection (a psycopg.AsyncConnection or a psycopg_pool.AsyncConnectionPool object).
Usage:
First-time use: create the schema in the database with the create_tables method, or with its async counterpart, acreate_tables.
Create a PostgresSaver object with a serializer and an appropriate connection object. A connection pool is recommended. If you pass a single connection object instead, you are responsible for closing it when done.
Examples:
Sync usage with a connection pool:
    from psycopg_pool import ConnectionPool
    from langchain_postgres import PickleCheckpointSerializer
    from langchain_postgres.checkpoint import PostgresSaver

    pool = ConnectionPool(
        # Example configuration
        conninfo="postgresql://user:password@localhost:5432/dbname",
        max_size=20,
    )

    # Uses the pickle module for serialization.
    # Make sure that you're only de-serializing trusted data
    # (e.g., payloads that you have serialized yourself).
    # Or implement a custom serializer.
    checkpoint = PostgresSaver(
        serializer=PickleCheckpointSerializer(),
        sync_connection=pool,
    )

    # Use the checkpoint object to put, get, list checkpoints, etc.
Async usage with a connection pool:
    from psycopg_pool import AsyncConnectionPool
    from langchain_postgres import PickleCheckpointSerializer
    from langchain_postgres.checkpoint import PostgresSaver

    pool = AsyncConnectionPool(
        # Example configuration
        conninfo="postgresql://user:password@localhost:5432/dbname",
        max_size=20,
    )

    # Uses the pickle module for serialization.
    # Make sure that you're only de-serializing trusted data
    # (e.g., payloads that you have serialized yourself).
    # Or implement a custom serializer.
    checkpoint = PostgresSaver(
        serializer=PickleCheckpointSerializer(),
        async_connection=pool,
    )

    # Use the checkpoint object to put, get, list checkpoints, etc.
Async usage with a connection object:
    import asyncio

    from psycopg import AsyncConnection
    from langchain_postgres import PickleCheckpointSerializer
    from langchain_postgres.checkpoint import PostgresSaver

    conninfo = "postgresql://user:password@localhost:5432/dbname"

    async def main():
        # The async context manager closes the connection on exit.
        async with await AsyncConnection.connect(conninfo) as conn:
            # Uses the pickle module for serialization.
            # Make sure that you're only de-serializing trusted data
            # (e.g., payloads that you have serialized yourself).
            # Or implement a custom serializer.
            checkpoint = PostgresSaver(
                serializer=PickleCheckpointSerializer(),
                async_connection=conn,
            )
            # Use the checkpoint object to put, get, list checkpoints, etc.
            ...

    asyncio.run(main())
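The two usage steps above (create the schema once, then construct the saver) can be sketched as a single helper. This is a minimal sketch and not part of the library: `make_saver` and its `first_run` flag are hypothetical names, and the saver class is passed in as a parameter so that the helper relies only on the `create_tables` static method and the `serializer` / `sync_connection` keyword arguments documented on this page.

```python
from typing import Any


def make_saver(saver_cls: Any, serializer: Any, pool: Any, *, first_run: bool = False) -> Any:
    """Hypothetical helper: optionally create the schema, then build the saver."""
    if first_run:
        # Step 1 (first use only): create the checkpoint schema.
        saver_cls.create_tables(pool)
    # Step 2: construct the saver with a serializer and a sync connection or pool.
    return saver_cls(serializer=serializer, sync_connection=pool)
```

With the real classes this would presumably be called as `make_saver(PostgresSaver, PickleCheckpointSerializer(), pool, first_run=True)`, where `pool` is an open connection pool.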
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- param async_connection: Optional[Union[AsyncConnection, AsyncConnectionPool]] = None
The asynchronous connection or pool to the Postgres database.
If providing a connection object, please ensure that the connection is open and remember to close the connection when done.
- param at: CheckpointAt = CheckpointAt.END_OF_RUN
- param serializer: CheckpointSerializer [Required]
The serializer for serializing and deserializing objects to and from bytes.
- param sync_connection: Optional[Union[Connection, ConnectionPool]] = None
The synchronous connection or pool to the Postgres database.
If providing a connection object, please ensure that the connection is open and remember to close the connection when done.
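The `serializer` parameter converts checkpoint objects to and from bytes. To illustrate what a non-pickle serializer might look like, here is a minimal JSON-based sketch. It assumes the serializer interface is a pair of `dumps` / `loads` methods, which this page does not spell out; `JsonCheckpointSerializer` is a hypothetical, standalone class that does not subclass the library's `CheckpointSerializer`, and it handles only JSON-compatible values.

```python
import json
from typing import Any


class JsonCheckpointSerializer:
    """Hypothetical serializer sketch; assumes a dumps/loads interface."""

    def dumps(self, obj: Any) -> bytes:
        # sort_keys gives a stable byte representation for equal dicts.
        return json.dumps(obj, sort_keys=True).encode("utf-8")

    def loads(self, data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))


serializer = JsonCheckpointSerializer()
payload = serializer.dumps({"step": 1, "values": ["a", "b"]})
restored = serializer.loads(payload)
```

Unlike pickle, JSON decoding does not execute code from the payload, but it cannot round-trip arbitrary Python objects, so whether it fits depends on what the checkpoints contain.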
- async static acreate_tables(connection: Union[AsyncConnection, AsyncConnectionPool], /) -> None
Create the schema for the checkpoint saver.
- Parameters
connection (Union[AsyncConnection, AsyncConnectionPool]) –
- Return type
None
- async static adrop_tables(connection: AsyncConnection, /) -> None
Drop the table for the checkpoint saver.
- Parameters
connection (AsyncConnection) –
- Return type
None
- async aget(config: RunnableConfig) -> Optional[Checkpoint]
- Parameters
config (RunnableConfig) –
- Return type
Optional[Checkpoint]
- async aget_tuple(config: RunnableConfig) -> Optional[CheckpointTuple]
Get the checkpoint tuple for the given configuration.
- Parameters
config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key whose value is a dict with a thread_id key and an optional thread_ts key. For example, {"configurable": {"thread_id": "test_thread"}}
- Returns
The checkpoint tuple for the given configuration if it exists, otherwise None.
If thread_ts is None, the latest checkpoint is returned if it exists.
- Return type
Optional[CheckpointTuple]
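To make the lookup rule concrete (a missing `thread_ts` means "latest checkpoint for the thread"), the sketch below models it with an in-memory dict. `make_config` and `lookup` are hypothetical helpers, not library functions, and the sketch assumes that `thread_ts` values sort chronologically as strings (for example ISO 8601 timestamps), which this page does not state.

```python
from typing import Any, Dict, Optional, Tuple


def make_config(thread_id: str, thread_ts: Optional[str] = None) -> Dict[str, Any]:
    """Build a config dict in the shape described above."""
    configurable: Dict[str, Any] = {"thread_id": thread_id}
    if thread_ts is not None:
        configurable["thread_ts"] = thread_ts
    return {"configurable": configurable}


def lookup(store: Dict[Tuple[str, str], Any], config: Dict[str, Any]) -> Optional[Any]:
    """In-memory stand-in for the lookup rule; not the Postgres implementation."""
    thread_id = config["configurable"]["thread_id"]
    thread_ts = config["configurable"].get("thread_ts")
    if thread_ts is not None:
        # A specific checkpoint was requested.
        return store.get((thread_id, thread_ts))
    # No thread_ts: return the checkpoint with the greatest timestamp, if any.
    candidates = [ts for (tid, ts) in store if tid == thread_id]
    return store[(thread_id, max(candidates))] if candidates else None


store = {
    ("test_thread", "2024-01-01T00:00:00"): "first",
    ("test_thread", "2024-01-02T00:00:00"): "second",
}
latest = lookup(store, make_config("test_thread"))
pinned = lookup(store, make_config("test_thread", "2024-01-01T00:00:00"))
missing = lookup(store, make_config("other_thread"))
```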
- async alist(config: RunnableConfig) -> AsyncIterator[CheckpointTuple]
Get all the checkpoints for the given configuration.
- Parameters
config (RunnableConfig) –
- Return type
AsyncIterator[CheckpointTuple]
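Because `alist` returns an `AsyncIterator`, its results are consumed with `async for` rather than awaited as a single value. The sketch below shows that consumption pattern against a stand-in async generator; `fake_alist` is hypothetical and only mimics the return type, yielding plain strings instead of `CheckpointTuple` objects.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_alist(thread_id: str) -> AsyncIterator[str]:
    """Stand-in for an async listing method; yields labels, not CheckpointTuples."""
    for ts in ("t3", "t2", "t1"):
        await asyncio.sleep(0)  # yield control, as a real database call would
        yield f"{thread_id}:{ts}"


async def collect(thread_id: str) -> List[str]:
    # An async comprehension drains an AsyncIterator into a list.
    return [item async for item in fake_alist(thread_id)]


items = asyncio.run(collect("test_thread"))
```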
- async aput(config: RunnableConfig, checkpoint: Checkpoint) -> RunnableConfig
Put the checkpoint for the given configuration.
- Parameters
config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key whose value is a dict with a thread_id key and an optional thread_ts key. For example, {"configurable": {"thread_id": "test_thread"}}
checkpoint (Checkpoint) – The checkpoint to persist.
- Returns
The RunnableConfig that describes the checkpoint that was just created. It contains the thread_id and thread_ts of the checkpoint.
- Return type
RunnableConfig
- classmethod construct(_fields_set: Optional[SetStr] = None, **values: Any) -> Model
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.
- Parameters
_fields_set (Optional[SetStr]) –
values (Any) –
- Return type
Model
- copy(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, update: Optional[DictStrAny] = None, deep: bool = False) -> Model
Duplicate a model, optionally choose which fields to include, exclude and change.
- Parameters
include (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) – fields to include in new model
exclude (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) – fields to exclude from new model, as with values this takes precedence over include
update (Optional[DictStrAny]) – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
deep (bool) – set to True to make a deep copy of the model
self (Model) –
- Returns
new model instance
- Return type
Model
- static create_tables(connection: Union[Connection, ConnectionPool], /) -> None
Create the schema for the checkpoint saver.
- Parameters
connection (Union[Connection, ConnectionPool]) –
- Return type
None
- dict(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) -> DictStrAny
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- Parameters
include (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –
exclude (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –
by_alias (bool) –
skip_defaults (Optional[bool]) –
exclude_unset (bool) –
exclude_defaults (bool) –
exclude_none (bool) –
- Return type
DictStrAny
- static drop_tables(connection: Connection, /) -> None
Drop the table for the checkpoint saver.
- Parameters
connection (Connection) –
- Return type
None
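In a test suite, one plausible use of `create_tables` and `drop_tables` together is to create the schema before a test and drop it afterwards. The following is a minimal sketch under that assumption: `temporary_tables` is a hypothetical name, and the saver class is passed in so that only the two static methods documented here are used. Since `drop_tables` is documented as taking a `Connection` rather than a pool, the sketch expects a single connection.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def temporary_tables(saver_cls: Any, connection: Any) -> Iterator[Any]:
    """Hypothetical helper: create the schema, yield, then drop it."""
    saver_cls.create_tables(connection)
    try:
        yield connection
    finally:
        # Runs even if the body of the with-block raises.
        saver_cls.drop_tables(connection)
```

With the real class this would be used as `with temporary_tables(PostgresSaver, conn): ...`, where `conn` is an open connection.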
- classmethod from_orm(obj: Any) -> Model
- Parameters
obj (Any) –
- Return type
Model
- get(config: RunnableConfig) -> Optional[Checkpoint]
- Parameters
config (RunnableConfig) –
- Return type
Optional[Checkpoint]
- classmethod get_lc_namespace() -> List[str]
Get the namespace of the langchain object.
For example, if the class is langchain.llms.openai.OpenAI, then the namespace is ["langchain", "llms", "openai"]
- Return type
List[str]
- get_tuple(config: RunnableConfig) -> Optional[CheckpointTuple]
Get the checkpoint tuple for the given configuration.
- Parameters
config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key whose value is a dict with a thread_id key and an optional thread_ts key. For example, {"configurable": {"thread_id": "test_thread"}}
- Returns
The checkpoint tuple for the given configuration if it exists, otherwise None.
If thread_ts is None, the latest checkpoint is returned if it exists.
- Return type
Optional[CheckpointTuple]
- classmethod is_lc_serializable() -> bool
Is this class serializable?
- Return type
bool
- json(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) -> unicode
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
- Parameters
include (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –
exclude (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –
by_alias (bool) –
skip_defaults (Optional[bool]) –
exclude_unset (bool) –
exclude_defaults (bool) –
exclude_none (bool) –
encoder (Optional[Callable[[Any], Any]]) –
models_as_dict (bool) –
dumps_kwargs (Any) –
- Return type
unicode
- classmethod lc_id() -> List[str]
A unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
- Return type
List[str]
- list(config: RunnableConfig) -> Generator[CheckpointTuple, None, None]
Get all the checkpoints for the given configuration.
- Parameters
config (RunnableConfig) –
- Return type
Generator[CheckpointTuple, None, None]
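Since `list` returns a generator, callers can stop early without materializing every checkpoint. The sketch below demonstrates this with `itertools.islice` against a stand-in generator; `fake_list` and `first_n` are hypothetical names, and the stand-in yields strings rather than `CheckpointTuple` objects. The order in which the real method yields checkpoints is not stated on this page, so the sketch makes no claim about it.

```python
from itertools import islice
from typing import Generator, Iterable, List


def fake_list(count: int) -> Generator[str, None, None]:
    """Stand-in for a listing generator; yields labels, not CheckpointTuples."""
    for i in range(count):
        yield f"checkpoint-{i}"


def first_n(items: Iterable[str], n: int) -> List[str]:
    # islice stops pulling from the generator after n items.
    return list(islice(items, n))


first_two = first_n(fake_list(1000), 2)
```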
- classmethod parse_file(path: Union[str, Path], *, content_type: unicode = None, encoding: unicode = 'utf8', proto: Protocol = None, allow_pickle: bool = False) -> Model
- Parameters
path (Union[str, Path]) –
content_type (unicode) –
encoding (unicode) –
proto (Protocol) –
allow_pickle (bool) –
- Return type
Model
- classmethod parse_obj(obj: Any) -> Model
- Parameters
obj (Any) –
- Return type
Model
- classmethod parse_raw(b: Union[str, bytes], *, content_type: unicode = None, encoding: unicode = 'utf8', proto: Protocol = None, allow_pickle: bool = False) -> Model
- Parameters
b (Union[str, bytes]) –
content_type (unicode) –
encoding (unicode) –
proto (Protocol) –
allow_pickle (bool) –
- Return type
Model
- put(config: RunnableConfig, checkpoint: Checkpoint) -> RunnableConfig
Put the checkpoint for the given configuration.
- Parameters
config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key whose value is a dict with a thread_id key and an optional thread_ts key. For example, {"configurable": {"thread_id": "test_thread"}}
checkpoint (Checkpoint) – The checkpoint to persist.
- Returns
The RunnableConfig that describes the checkpoint that was just created. It contains the thread_id and thread_ts of the checkpoint.
- Return type
RunnableConfig
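The config returned by `put` identifies the checkpoint just written, so it can be passed back to a lookup to retrieve that exact checkpoint. The sketch below illustrates this round trip with an in-memory stand-in. `InMemorySaverSketch` is hypothetical and is not the Postgres implementation; in particular, its counter-based `thread_ts` is an assumption made to keep the example deterministic.

```python
from typing import Any, Dict, Optional, Tuple


class InMemorySaverSketch:
    """Hypothetical stand-in that mimics the put/get contract described above."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Any] = {}
        self._counter = 0

    def put(self, config: Dict[str, Any], checkpoint: Any) -> Dict[str, Any]:
        thread_id = config["configurable"]["thread_id"]
        self._counter += 1
        thread_ts = f"{self._counter:08d}"  # stand-in for a real timestamp
        self._rows[(thread_id, thread_ts)] = checkpoint
        # The returned config carries both thread_id and thread_ts.
        return {"configurable": {"thread_id": thread_id, "thread_ts": thread_ts}}

    def get(self, config: Dict[str, Any]) -> Optional[Any]:
        conf = config["configurable"]
        return self._rows.get((conf["thread_id"], conf["thread_ts"]))


saver = InMemorySaverSketch()
saved_config = saver.put({"configurable": {"thread_id": "test_thread"}}, {"v": 1})
roundtrip = saver.get(saved_config)
```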
- classmethod schema(by_alias: bool = True, ref_template: unicode = '#/definitions/{model}') -> DictStrAny
- Parameters
by_alias (bool) –
ref_template (unicode) –
- Return type
DictStrAny
- classmethod schema_json(*, by_alias: bool = True, ref_template: unicode = '#/definitions/{model}', **dumps_kwargs: Any) -> unicode
- Parameters
by_alias (bool) –
ref_template (unicode) –
dumps_kwargs (Any) –
- Return type
unicode
- to_json() -> Union[SerializedConstructor, SerializedNotImplemented]
- Return type
Union[SerializedConstructor, SerializedNotImplemented]
- to_json_not_implemented() -> SerializedNotImplemented
- Return type
SerializedNotImplemented
- classmethod update_forward_refs(**localns: Any) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
- Parameters
localns (Any) –
- Return type
None
- classmethod validate(value: Any) -> Model
- Parameters
value (Any) –
- Return type
Model
- property config_specs: list[langchain_core.runnables.utils.ConfigurableFieldSpec]
Return the configuration specs for this runnable.
- property lc_attributes: Dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
- property lc_secrets: Dict[str, str]
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}