# Source code for the Gmail tool utils module.

"""Gmail tool utils."""

from __future__ import annotations

import logging
import os
from typing import TYPE_CHECKING, List, Optional, Tuple

from langchain_core.utils import guard_import

if TYPE_CHECKING:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from googleapiclient.discovery import Resource
    from googleapiclient.discovery import build as build_resource

logger = logging.getLogger(__name__)

def import_google() -> Tuple[Request, Credentials]:
    """Look up the Google auth ``Request`` and ``Credentials`` classes.

    Both modules are resolved through ``guard_import`` at call time, each
    with ``google-auth-httplib2`` given as the pip package name.

    Returns:
        Tuple[Request, Credentials]: The ``Request`` class taken from
            ``google.auth.transport.requests`` followed by the
            ``Credentials`` class taken from ``google.oauth2.credentials``.
    """
    # Resolve the modules in the same order as the returned tuple.
    transport_module = guard_import(
        module_name="google.auth.transport.requests",
        pip_name="google-auth-httplib2",
    )
    request_cls = transport_module.Request

    credentials_module = guard_import(
        module_name="google.oauth2.credentials",
        pip_name="google-auth-httplib2",
    )
    return request_cls, credentials_module.Credentials
def import_installed_app_flow() -> InstalledAppFlow:
    """Look up the ``InstalledAppFlow`` class.

    The ``google_auth_oauthlib.flow`` module is resolved through
    ``guard_import`` at call time, with ``google-auth-oauthlib`` given as the
    pip package name.

    Returns:
        InstalledAppFlow: The ``InstalledAppFlow`` class itself (not an
            instance).
    """
    flow_module = guard_import(
        module_name="google_auth_oauthlib.flow",
        pip_name="google-auth-oauthlib",
    )
    return flow_module.InstalledAppFlow
def import_googleapiclient_resource_builder() -> build_resource:
    """Look up the ``build`` function of ``googleapiclient.discovery``.

    The module is resolved through ``guard_import`` at call time, with
    ``google-api-python-client`` given as the pip package name.

    Returns:
        build_resource: The ``build`` function exposed by
            ``googleapiclient.discovery``.
    """
    discovery_module = guard_import(
        module_name="googleapiclient.discovery",
        pip_name="google-api-python-client",
    )
    return discovery_module.build
def get_gmail_credentials(
    token_file: Optional[str] = None,
    client_secrets_file: Optional[str] = None,
    scopes: Optional[List[str]] = None,
) -> Credentials:
    """Load, refresh, or newly obtain Gmail credentials.

    Credentials stored in ``token_file`` are returned as-is while they are
    valid. Expired credentials that carry a refresh token are refreshed;
    otherwise a new authorization flow is run from ``client_secrets_file``.
    Refreshed or newly obtained credentials are written back to
    ``token_file`` as JSON.

    Args:
        token_file: Path of the stored-token file. Falls back to
            ``DEFAULT_CREDS_TOKEN_FILE`` when falsy.
        client_secrets_file: Path of the OAuth client secrets file. Falls
            back to ``DEFAULT_CLIENT_SECRETS_FILE`` when falsy.
        scopes: Scopes to request. Falls back to ``DEFAULT_SCOPES`` when
            falsy.

    Returns:
        Credentials: The usable credentials object.
    """
    request_cls, credentials_cls = import_google()
    flow_cls = import_installed_app_flow()

    scopes = scopes or DEFAULT_SCOPES
    token_file = token_file or DEFAULT_CREDS_TOKEN_FILE
    client_secrets_file = client_secrets_file or DEFAULT_CLIENT_SECRETS_FILE

    # The token file holds the result of an earlier authorization; it only
    # exists once a flow has completed and been saved below.
    creds = (
        credentials_cls.from_authorized_user_file(token_file, scopes)
        if os.path.exists(token_file)
        else None
    )

    # Nothing to do (and nothing to rewrite) when the cached token is usable.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(request_cls())
    else:
        # No refreshable credentials: have the user authorize again.
        # NOTE(review): port=0 presumably lets the OS choose a free port for
        # the local redirect server -- confirm against google-auth-oauthlib.
        flow = flow_cls.from_client_secrets_file(client_secrets_file, scopes)
        creds = flow.run_local_server(port=0)

    # Persist the credentials so the next call can reuse them.
    with open(token_file, "w") as token_handle:
        token_handle.write(creds.to_json())
    return creds
def build_resource_service(
    credentials: Optional[Credentials] = None,
    service_name: str = "gmail",
    service_version: str = "v1",
) -> Resource:
    """Build a Google API service resource, Gmail ``v1`` by default.

    Args:
        credentials: Credentials to build the service with. When falsy they
            are obtained from ``get_gmail_credentials()`` with its defaults.
        service_name: Name of the service passed to the builder.
        service_version: Version of the service passed to the builder.

    Returns:
        Resource: Whatever the ``googleapiclient.discovery`` builder returns
            for the given service name, version, and credentials.
    """
    if not credentials:
        credentials = get_gmail_credentials()

    make_service = import_googleapiclient_resource_builder()
    return make_service(service_name, service_version, credentials=credentials)
def clean_email_body(body: str) -> str:
    """Extract the text of an email body, best effort.

    The body is parsed with BeautifulSoup's ``html.parser`` and the text
    returned by ``get_text()`` is used. The function never fails because of
    the cleaning step itself: if BeautifulSoup is not installed, or parsing
    raises, the body is returned unchanged (as a string).

    Args:
        body: The email body; it is converted with ``str()`` before use.

    Returns:
        str: The extracted text, or ``str(body)`` when cleaning was skipped
            or failed.
    """
    raw_text = str(body)

    # Keep the optional-dependency check separate from the parsing step so
    # each handler only covers the statement it is meant for.
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        logger.warning("BeautifulSoup not installed. Skipping cleaning.")
        return raw_text

    try:
        return str(BeautifulSoup(raw_text, "html.parser").get_text())
    except Exception:
        # Deliberately broad: cleaning is best effort and must not break the
        # caller. logger.exception records the traceback, which a plain
        # logger.error(e) would drop.
        logger.exception("Failed to clean email body. Returning it unchanged.")
        return raw_text