langchain_postgres.checkpoint.PostgresSaver

class langchain_postgres.checkpoint.PostgresSaver[source]

Bases: BaseCheckpointSaver

LangGraph checkpoint saver for Postgres.

This implementation of a checkpoint saver uses a Postgres database to save and retrieve checkpoints. It uses the psycopg3 package to interact with the Postgres database.

The checkpoint accepts either a sync_connection in the form of a psycopg.Connection or a psycopg.ConnectionPool object, or an async_connection in the form of a psycopg.AsyncConnection or psycopg.AsyncConnectionPool object.

Usage:

  1. First time use: create schema in the database using the create_tables method or the async version acreate_tables method.

  2. Create a PostgresCheckpoint object with a serializer and an appropriate connection object. It’s recommended to use a connection pool object for the connection. If using a connection object, you are responsible for closing the connection when done.

Examples:

Sync usage with a connection pool:

from psycopg_pool import ConnectionPool
from langchain_postgres import (
    PostgresCheckpoint, PickleCheckpointSerializer
)

pool = ConnectionPool(
    # Example configuration
    conninfo="postgresql://user:password@localhost:5432/dbname",
    max_size=20,
)

# Uses the pickle module for serialization
# Make sure that you're only de-serializing trusted data
# (e.g., payloads that you have serialized yourself).
# Or implement a custom serializer.
checkpoint = PostgresCheckpoint(
    serializer=PickleCheckpointSerializer(),
    sync_connection=pool,
)

# Use the checkpoint object to put, get, list checkpoints, etc.

Async usage with a connection pool:

from psycopg_pool import AsyncConnectionPool
from langchain_postgres import (
    PostgresCheckpoint, PickleCheckpointSerializer
)

pool = AsyncConnectionPool(
    # Example configuration
    conninfo="postgresql://user:password@localhost:5432/dbname",
    max_size=20,
)

# Uses the pickle module for serialization
# Make sure that you're only de-serializing trusted data
# (e.g., payloads that you have serialized yourself).
# Or implement a custom serializer.
checkpoint = PostgresCheckpoint(
    serializer=PickleCheckpointSerializer(),
    async_connection=pool,
)

# Use the checkpoint object to put, get, list checkpoints, etc.

Async usage with a connection object:

from psycopg import AsyncConnection
from langchain_postgres import (
    PostgresCheckpoint, PickleCheckpointSerializer
)

conninfo="postgresql://user:password@localhost:5432/dbname"
# Take care of closing the connection when done
async with AsyncConnection(conninfo=conninfo) as conn:
    # Uses the pickle module for serialization
    # Make sure that you're only de-serializing trusted data
    # (e.g., payloads that you have serialized yourself).
    # Or implement a custom serializer.
    checkpoint = PostgresCheckpoint(
        serializer=PickleCheckpointSerializer(),
        async_connection=conn,
    )

    # Use the checkpoint object to put, get, list checkpoints, etc.
    ...

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

param async_connection: Optional[Union[AsyncConnection, AsyncConnectionPool]] = None

The asynchronous connection or pool to the Postgres database.

If providing a connection object, please ensure that the connection is open and remember to close the connection when done.

param at: CheckpointAt = CheckpointAt.END_OF_RUN
param serializer: CheckpointSerializer [Required]

The serializer for serializing and deserializing objects to and from bytes.

param sync_connection: Optional[Union[Connection, ConnectionPool]] = None

The synchronous connection or pool to the Postgres database.

If providing a connection object, please ensure that the connection is open and remember to close the connection when done.

async static acreate_tables(connection: Union[AsyncConnection, AsyncConnectionPool], /) None[source]

Create the schema for the checkpoint saver.

Parameters

connection (Union[AsyncConnection, AsyncConnectionPool]) –

Return type

None

async static adrop_tables(connection: AsyncConnection, /) None[source]

Drop the table for the checkpoint saver.

Parameters

connection (AsyncConnection) –

Return type

None

async aget(config: RunnableConfig) Optional[Checkpoint]
Parameters

config (RunnableConfig) –

Return type

Optional[Checkpoint]

async aget_tuple(config: RunnableConfig) Optional[CheckpointTuple][source]

Get the checkpoint tuple for the given configuration.

Parameters

config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key which is a dict with a thread_id key and an optional thread_ts key. For example, { ‘configurable’: { ‘thread_id’: ‘test_thread’ } }

Returns

The checkpoint tuple for the given configuration if it exists, otherwise None.

If thread_ts is None, the latest checkpoint is returned if it exists.

Return type

Optional[CheckpointTuple]

async alist(config: RunnableConfig) AsyncIterator[CheckpointTuple][source]

Get all the checkpoints for the given configuration.

Parameters

config (RunnableConfig) –

Return type

AsyncIterator[CheckpointTuple]

async aput(config: RunnableConfig, checkpoint: Checkpoint) RunnableConfig[source]

Put the checkpoint for the given configuration.

Parameters
  • config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key which is a dict with a thread_id key and an optional thread_ts key. For example, { ‘configurable’: { ‘thread_id’: ‘test_thread’ } }

  • checkpoint (Checkpoint) – The checkpoint to persist.

Returns

The RunnableConfig that describes the checkpoint that was just created. It’ll contain the thread_id and thread_ts of the checkpoint.

Return type

RunnableConfig

classmethod construct(_fields_set: Optional[SetStr] = None, **values: Any) Model

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed. Behaves as if Config.extra = ‘allow’ was set since it adds all passed values

Parameters
  • _fields_set (Optional[SetStr]) –

  • values (Any) –

Return type

Model

copy(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, update: Optional[DictStrAny] = None, deep: bool = False) Model

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters
  • include (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) – fields to include in new model

  • exclude (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) – fields to exclude from new model, as with values this takes precedence over include

  • update (Optional[DictStrAny]) – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data

  • deep (bool) – set to True to make a deep copy of the model

  • self (Model) –

Returns

new model instance

Return type

Model

static create_tables(connection: Union[Connection, ConnectionPool], /) None[source]

Create the schema for the checkpoint saver.

Parameters

connection (Union[Connection, ConnectionPool]) –

Return type

None

dict(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) DictStrAny

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Parameters
  • include (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –

  • exclude (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –

  • by_alias (bool) –

  • skip_defaults (Optional[bool]) –

  • exclude_unset (bool) –

  • exclude_defaults (bool) –

  • exclude_none (bool) –

Return type

DictStrAny

static drop_tables(connection: Connection, /) None[source]

Drop the table for the checkpoint saver.

Parameters

connection (Connection) –

Return type

None

classmethod from_orm(obj: Any) Model
Parameters

obj (Any) –

Return type

Model

get(config: RunnableConfig) Optional[Checkpoint]
Parameters

config (RunnableConfig) –

Return type

Optional[Checkpoint]

classmethod get_lc_namespace() List[str]

Get the namespace of the langchain object.

For example, if the class is langchain.llms.openai.OpenAI, then the namespace is [“langchain”, “llms”, “openai”]

Return type

List[str]

get_tuple(config: RunnableConfig) Optional[CheckpointTuple][source]

Get the checkpoint tuple for the given configuration.

Parameters

config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key which is a dict with a thread_id key and an optional thread_ts key. For example, { ‘configurable’: { ‘thread_id’: ‘test_thread’ } }

Returns

The checkpoint tuple for the given configuration if it exists, otherwise None.

If thread_ts is None, the latest checkpoint is returned if it exists.

Return type

Optional[CheckpointTuple]

classmethod is_lc_serializable() bool

Is this class serializable?

Return type

bool

json(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) unicode

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Parameters
  • include (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –

  • exclude (Optional[Union[AbstractSetIntStr, MappingIntStrAny]]) –

  • by_alias (bool) –

  • skip_defaults (Optional[bool]) –

  • exclude_unset (bool) –

  • exclude_defaults (bool) –

  • exclude_none (bool) –

  • encoder (Optional[Callable[[Any], Any]]) –

  • models_as_dict (bool) –

  • dumps_kwargs (Any) –

Return type

unicode

classmethod lc_id() List[str]

A unique identifier for this class for serialization purposes.

The unique identifier is a list of strings that describes the path to the object.

Return type

List[str]

list(config: RunnableConfig) Generator[CheckpointTuple, None, None][source]

Get all the checkpoints for the given configuration.

Parameters

config (RunnableConfig) –

Return type

Generator[CheckpointTuple, None, None]

classmethod parse_file(path: Union[str, Path], *, content_type: unicode = None, encoding: unicode = 'utf8', proto: Protocol = None, allow_pickle: bool = False) Model
Parameters
  • path (Union[str, Path]) –

  • content_type (unicode) –

  • encoding (unicode) –

  • proto (Protocol) –

  • allow_pickle (bool) –

Return type

Model

classmethod parse_obj(obj: Any) Model
Parameters

obj (Any) –

Return type

Model

classmethod parse_raw(b: Union[str, bytes], *, content_type: unicode = None, encoding: unicode = 'utf8', proto: Protocol = None, allow_pickle: bool = False) Model
Parameters
  • b (Union[str, bytes]) –

  • content_type (unicode) –

  • encoding (unicode) –

  • proto (Protocol) –

  • allow_pickle (bool) –

Return type

Model

put(config: RunnableConfig, checkpoint: Checkpoint) RunnableConfig[source]

Put the checkpoint for the given configuration.

Parameters
  • config (RunnableConfig) – The configuration for the checkpoint. A dict with a configurable key which is a dict with a thread_id key and an optional thread_ts key. For example, { ‘configurable’: { ‘thread_id’: ‘test_thread’ } }

  • checkpoint (Checkpoint) – The checkpoint to persist.

Returns

The RunnableConfig that describes the checkpoint that was just created. It’ll contain the thread_id and thread_ts of the checkpoint.

Return type

RunnableConfig

classmethod schema(by_alias: bool = True, ref_template: unicode = '#/definitions/{model}') DictStrAny
Parameters
  • by_alias (bool) –

  • ref_template (unicode) –

Return type

DictStrAny

classmethod schema_json(*, by_alias: bool = True, ref_template: unicode = '#/definitions/{model}', **dumps_kwargs: Any) unicode
Parameters
  • by_alias (bool) –

  • ref_template (unicode) –

  • dumps_kwargs (Any) –

Return type

unicode

to_json() Union[SerializedConstructor, SerializedNotImplemented]
Return type

Union[SerializedConstructor, SerializedNotImplemented]

to_json_not_implemented() SerializedNotImplemented
Return type

SerializedNotImplemented

classmethod update_forward_refs(**localns: Any) None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

Parameters

localns (Any) –

Return type

None

classmethod validate(value: Any) Model
Parameters

value (Any) –

Return type

Model

property config_specs: list[langchain_core.runnables.utils.ConfigurableFieldSpec]

Return the configuration specs for this runnable.

property lc_attributes: Dict

List of attribute names that should be included in the serialized kwargs.

These attributes must be accepted by the constructor.

property lc_secrets: Dict[str, str]

A map of constructor argument names to secret ids.

For example,

{“openai_api_key”: “OPENAI_API_KEY”}