Source code for

import base64
import re
from typing import Any, Iterator

from langchain_core._api.deprecation import deprecated
from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage

def _extract_email_content(msg: Any) -> HumanMessage:
    from_email = None
    for values in msg["payload"]["headers"]:
        name = values["name"]
        if name == "From":
            from_email = values["value"]
    if from_email is None:
        raise ValueError
    for part in msg["payload"]["parts"]:
        if part["mimeType"] == "text/plain":
            data = part["body"]["data"]
            data = base64.urlsafe_b64decode(data).decode("utf-8")
            # Regular expression to split the email body at the first
            # occurrence of a line that starts with "On ... wrote:"
            pattern = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")
            # Split the email body and extract the first part
            newest_response = re.split(pattern, data)[0]
            message = HumanMessage(
                content=newest_response, additional_kwargs={"sender": from_email}
            return message
    raise ValueError

def _get_message_data(service: Any, message: Any) -> ChatSession:
    msg = service.users().messages().get(userId="me", id=message["id"]).execute()
    message_content = _extract_email_content(msg)
    in_reply_to = None
    email_data = msg["payload"]["headers"]
    for values in email_data:
        name = values["name"]
        if name == "In-Reply-To":
            in_reply_to = values["value"]
    if in_reply_to is None:
        raise ValueError

    thread_id = msg["threadId"]

    thread = service.users().threads().get(userId="me", id=thread_id).execute()
    messages = thread["messages"]

    response_email = None
    for message in messages:
        email_data = message["payload"]["headers"]
        for values in email_data:
            if values["name"] == "Message-ID":
                message_id = values["value"]
                if message_id == in_reply_to:
                    response_email = message
    if response_email is None:
        raise ValueError
    starter_content = _extract_email_content(response_email)
    return ChatSession(messages=[starter_content, message_content])

[docs]@deprecated( since="0.0.32", removal="0.3.0", alternative_import="langchain_google_community.GMailLoader", ) class GMailLoader(BaseChatLoader): """Load data from `GMail`. There are many ways you could want to load data from GMail. This loader is currently fairly opinionated in how to do so. The way it does it is it first looks for all messages that you have sent. It then looks for messages where you are responding to a previous email. It then fetches that previous email, and creates a training example of that email, followed by your email. Note that there are clear limitations here. For example, all examples created are only looking at the previous email for context. To use: - Set up a Google Developer Account: Go to the Google Developer Console, create a project, and enable the Gmail API for that project. This will give you a credentials.json file that you'll need later. """
[docs] def __init__(self, creds: Any, n: int = 100, raise_error: bool = False) -> None: super().__init__() self.creds = creds self.n = n self.raise_error = raise_error
[docs] def lazy_load(self) -> Iterator[ChatSession]: from googleapiclient.discovery import build service = build("gmail", "v1", credentials=self.creds) results = ( service.users() .messages() .list(userId="me", labelIds=["SENT"], maxResults=self.n) .execute() ) messages = results.get("messages", []) for message in messages: try: yield _get_message_data(service, message) except Exception as e: # TODO: handle errors better if self.raise_error: raise e else: pass