Source code for langchain_core.vectorstores

"""**Vector store** stores embedded data and performs vector search.

One of the most common ways to store and search over unstructured data is to
embed it and store the resulting embedding vectors, and then query the store
and retrieve the data that are 'most similar' to the embedded query.

**Class hierarchy:**

.. code-block::

    VectorStore --> <name>  # Examples: Annoy, FAISS, Milvus

    BaseRetriever --> VectorStoreRetriever --> <name>Retriever  # Example: VespaRetriever

**Main helpers:**

.. code-block::

    Embeddings, Document
"""  # noqa: E501
from __future__ import annotations

import logging
import math
import warnings
from abc import ABC, abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Collection,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
)

from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables.config import run_in_executor

if TYPE_CHECKING:
    from langchain_core.callbacks.manager import (
        AsyncCallbackManagerForRetrieverRun,
        CallbackManagerForRetrieverRun,
    )
    from langchain_core.documents import Document

logger = logging.getLogger(__name__)

VST = TypeVar("VST", bound="VectorStore")


[docs]class VectorStore(ABC): """Interface for vector store."""
[docs] @abstractmethod def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore. Args: texts: Iterable of strings to add to the vectorstore. metadatas: Optional list of metadatas associated with the texts. kwargs: vectorstore specific parameters Returns: List of ids from adding the texts into the vectorstore. """
@property def embeddings(self) -> Optional[Embeddings]: """Access the query embedding object if available.""" logger.debug( f"{Embeddings.__name__} is not implemented for {self.__class__.__name__}" ) return None
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: """Delete by vector ID or other criteria. Args: ids: List of ids to delete. **kwargs: Other keyword arguments that subclasses might use. Returns: Optional[bool]: True if deletion is successful, False otherwise, None if not implemented. """ raise NotImplementedError("delete method must be implemented by subclass.")
[docs] async def adelete( self, ids: Optional[List[str]] = None, **kwargs: Any ) -> Optional[bool]: """Delete by vector ID or other criteria. Args: ids: List of ids to delete. **kwargs: Other keyword arguments that subclasses might use. Returns: Optional[bool]: True if deletion is successful, False otherwise, None if not implemented. """ return await run_in_executor(None, self.delete, ids, **kwargs)
[docs] async def aadd_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore.""" return await run_in_executor(None, self.add_texts, texts, metadatas, **kwargs)
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]: """Run more documents through the embeddings and add to the vectorstore. Args: documents (List[Document]: Documents to add to the vectorstore. Returns: List[str]: List of IDs of the added texts. """ # TODO: Handle the case where the user doesn't provide ids on the Collection texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return self.add_texts(texts, metadatas, **kwargs)
[docs] async def aadd_documents( self, documents: List[Document], **kwargs: Any ) -> List[str]: """Run more documents through the embeddings and add to the vectorstore. Args: documents (List[Document]: Documents to add to the vectorstore. Returns: List[str]: List of IDs of the added texts. """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return await self.aadd_texts(texts, metadatas, **kwargs)
[docs] def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]: """Return docs most similar to query using specified search type.""" if search_type == "similarity": return self.similarity_search(query, **kwargs) elif search_type == "mmr": return self.max_marginal_relevance_search(query, **kwargs) else: raise ValueError( f"search_type of {search_type} not allowed. Expected " "search_type to be 'similarity' or 'mmr'." )
[docs] async def asearch( self, query: str, search_type: str, **kwargs: Any ) -> List[Document]: """Return docs most similar to query using specified search type.""" if search_type == "similarity": return await self.asimilarity_search(query, **kwargs) elif search_type == "mmr": return await self.amax_marginal_relevance_search(query, **kwargs) else: raise ValueError( f"search_type of {search_type} not allowed. Expected " "search_type to be 'similarity' or 'mmr'." )
@staticmethod def _euclidean_relevance_score_fn(distance: float) -> float: """Return a similarity score on a scale [0, 1].""" # The 'correct' relevance function # may differ depending on a few things, including: # - the distance / similarity metric used by the VectorStore # - the scale of your embeddings (OpenAI's are unit normed. Many # others are not!) # - embedding dimensionality # - etc. # This function converts the euclidean norm of normalized embeddings # (0 is most similar, sqrt(2) most dissimilar) # to a similarity function (0 to 1) return 1.0 - distance / math.sqrt(2) @staticmethod def _cosine_relevance_score_fn(distance: float) -> float: """Normalize the distance to a score on a scale [0, 1].""" return 1.0 - distance @staticmethod def _max_inner_product_relevance_score_fn(distance: float) -> float: """Normalize the distance to a score on a scale [0, 1].""" if distance > 0: return 1.0 - distance return -1.0 * distance def _select_relevance_score_fn(self) -> Callable[[float], float]: """ The 'correct' relevance function may differ depending on a few things, including: - the distance / similarity metric used by the VectorStore - the scale of your embeddings (OpenAI's are unit normed. Many others are not!) - embedding dimensionality - etc. Vectorstores should define their own selection based method of relevance. """ raise NotImplementedError
[docs] def similarity_search_with_score( self, *args: Any, **kwargs: Any ) -> List[Tuple[Document, float]]: """Run similarity search with distance.""" raise NotImplementedError
[docs] async def asimilarity_search_with_score( self, *args: Any, **kwargs: Any ) -> List[Tuple[Document, float]]: """Run similarity search with distance asynchronously.""" # This is a temporary workaround to make the similarity search # asynchronous. The proper solution is to make the similarity search # asynchronous in the vector store implementations. return await run_in_executor( None, self.similarity_search_with_score, *args, **kwargs )
def _similarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ Default similarity search with relevance scores. Modify if necessary in subclass. Return docs and relevance scores in the range [0, 1]. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ relevance_score_fn = self._select_relevance_score_fn() docs_and_scores = self.similarity_search_with_score(query, k, **kwargs) return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores] async def _asimilarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ Default async similarity search with relevance scores. Modify if necessary in subclass. Return docs and relevance scores in the range [0, 1]. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ relevance_score_fn = self._select_relevance_score_fn() docs_and_scores = await self.asimilarity_search_with_score(query, k, **kwargs) return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
[docs] def similarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs and relevance scores in the range [0, 1]. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ score_threshold = kwargs.pop("score_threshold", None) docs_and_similarities = self._similarity_search_with_relevance_scores( query, k=k, **kwargs ) if any( similarity < 0.0 or similarity > 1.0 for _, similarity in docs_and_similarities ): warnings.warn( "Relevance scores must be between" f" 0 and 1, got {docs_and_similarities}" ) if score_threshold is not None: docs_and_similarities = [ (doc, similarity) for doc, similarity in docs_and_similarities if similarity >= score_threshold ] if len(docs_and_similarities) == 0: warnings.warn( "No relevant docs were retrieved using the relevance score" f" threshold {score_threshold}" ) return docs_and_similarities
[docs] async def asimilarity_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs and relevance scores in the range [0, 1], asynchronously. 0 is dissimilar, 1 is most similar. Args: query: input text k: Number of Documents to return. Defaults to 4. **kwargs: kwargs to be passed to similarity search. Should include: score_threshold: Optional, a floating point value between 0 to 1 to filter the resulting set of retrieved docs Returns: List of Tuples of (doc, similarity_score) """ score_threshold = kwargs.pop("score_threshold", None) docs_and_similarities = await self._asimilarity_search_with_relevance_scores( query, k=k, **kwargs ) if any( similarity < 0.0 or similarity > 1.0 for _, similarity in docs_and_similarities ): warnings.warn( "Relevance scores must be between" f" 0 and 1, got {docs_and_similarities}" ) if score_threshold is not None: docs_and_similarities = [ (doc, similarity) for doc, similarity in docs_and_similarities if similarity >= score_threshold ] if len(docs_and_similarities) == 0: warnings.warn( "No relevant docs were retrieved using the relevance score" f" threshold {score_threshold}" ) return docs_and_similarities
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, **kwargs: Any ) -> List[Document]: """Return docs most similar to embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. Returns: List of Documents most similar to the query vector. """ raise NotImplementedError
[docs] async def asimilarity_search_by_vector( self, embedding: List[float], k: int = 4, **kwargs: Any ) -> List[Document]: """Return docs most similar to embedding vector.""" # This is a temporary workaround to make the similarity search # asynchronous. The proper solution is to make the similarity search # asynchronous in the vector store implementations. return await run_in_executor( None, self.similarity_search_by_vector, embedding, k=k, **kwargs )
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. Returns: List of Documents selected by maximal marginal relevance. """ raise NotImplementedError
[docs] async def amax_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance.""" return await run_in_executor( None, self.max_marginal_relevance_search_by_vector, embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, **kwargs, )
[docs] @classmethod def from_documents( cls: Type[VST], documents: List[Document], embedding: Embeddings, **kwargs: Any, ) -> VST: """Return VectorStore initialized from documents and embeddings.""" texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
[docs] @classmethod async def afrom_documents( cls: Type[VST], documents: List[Document], embedding: Embeddings, **kwargs: Any, ) -> VST: """Return VectorStore initialized from documents and embeddings.""" texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)
[docs] @classmethod @abstractmethod def from_texts( cls: Type[VST], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> VST: """Return VectorStore initialized from texts and embeddings."""
[docs] @classmethod async def afrom_texts( cls: Type[VST], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> VST: """Return VectorStore initialized from texts and embeddings.""" return await run_in_executor( None, cls.from_texts, texts, embedding, metadatas, **kwargs )
def _get_retriever_tags(self) -> List[str]: """Get tags for retriever.""" tags = [self.__class__.__name__] if self.embeddings: tags.append(self.embeddings.__class__.__name__) return tags
[docs] def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever: """Return VectorStoreRetriever initialized from this VectorStore. Args: search_type (Optional[str]): Defines the type of search that the Retriever should perform. Can be "similarity" (default), "mmr", or "similarity_score_threshold". search_kwargs (Optional[Dict]): Keyword arguments to pass to the search function. Can include things like: k: Amount of documents to return (Default: 4) score_threshold: Minimum relevance threshold for similarity_score_threshold fetch_k: Amount of documents to pass to MMR algorithm (Default: 20) lambda_mult: Diversity of results returned by MMR; 1 for minimum diversity and 0 for maximum. (Default: 0.5) filter: Filter by document metadata Returns: VectorStoreRetriever: Retriever class for VectorStore. Examples: .. code-block:: python # Retrieve more documents with higher diversity # Useful if your dataset has many similar documents docsearch.as_retriever( search_type="mmr", search_kwargs={'k': 6, 'lambda_mult': 0.25} ) # Fetch more documents for the MMR algorithm to consider # But only return the top 5 docsearch.as_retriever( search_type="mmr", search_kwargs={'k': 5, 'fetch_k': 50} ) # Only retrieve documents that have a relevance score # Above a certain threshold docsearch.as_retriever( search_type="similarity_score_threshold", search_kwargs={'score_threshold': 0.8} ) # Only get the single most similar document from the dataset docsearch.as_retriever(search_kwargs={'k': 1}) # Use a filter to only retrieve documents from a specific paper docsearch.as_retriever( search_kwargs={'filter': {'paper_title':'GPT-4 Technical Report'}} ) """ tags = kwargs.pop("tags", None) or [] tags.extend(self._get_retriever_tags()) return VectorStoreRetriever(vectorstore=self, **kwargs, tags=tags)
[docs]class VectorStoreRetriever(BaseRetriever): """Base Retriever class for VectorStore.""" vectorstore: VectorStore """VectorStore to use for retrieval.""" search_type: str = "similarity" """Type of search to perform. Defaults to "similarity".""" search_kwargs: dict = Field(default_factory=dict) """Keyword arguments to pass to the search function.""" allowed_search_types: ClassVar[Collection[str]] = ( "similarity", "similarity_score_threshold", "mmr", ) class Config: """Configuration for this pydantic object.""" arbitrary_types_allowed = True @root_validator() def validate_search_type(cls, values: Dict) -> Dict: """Validate search type.""" search_type = values["search_type"] if search_type not in cls.allowed_search_types: raise ValueError( f"search_type of {search_type} not allowed. Valid values are: " f"{cls.allowed_search_types}" ) if search_type == "similarity_score_threshold": score_threshold = values["search_kwargs"].get("score_threshold") if (score_threshold is None) or (not isinstance(score_threshold, float)): raise ValueError( "`score_threshold` is not specified with a float value(0~1) " "in `search_kwargs`." ) return values def _get_relevant_documents( self, query: str, *, run_manager: CallbackManagerForRetrieverRun ) -> List[Document]: if self.search_type == "similarity": docs = self.vectorstore.similarity_search(query, **self.search_kwargs) elif self.search_type == "similarity_score_threshold": docs_and_similarities = ( self.vectorstore.similarity_search_with_relevance_scores( query, **self.search_kwargs ) ) docs = [doc for doc, _ in docs_and_similarities] elif self.search_type == "mmr": docs = self.vectorstore.max_marginal_relevance_search( query, **self.search_kwargs ) else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs async def _aget_relevant_documents( self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun ) -> List[Document]: if self.search_type == "similarity": docs = await self.vectorstore.asimilarity_search( query, **self.search_kwargs ) elif self.search_type == "similarity_score_threshold": docs_and_similarities = ( await self.vectorstore.asimilarity_search_with_relevance_scores( query, **self.search_kwargs ) ) docs = [doc for doc, _ in docs_and_similarities] elif self.search_type == "mmr": docs = await self.vectorstore.amax_marginal_relevance_search( query, **self.search_kwargs ) else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]: """Add documents to vectorstore.""" return self.vectorstore.add_documents(documents, **kwargs)
[docs] async def aadd_documents( self, documents: List[Document], **kwargs: Any ) -> List[str]: """Add documents to vectorstore.""" return await self.vectorstore.aadd_documents(documents, **kwargs)