| from datetime import datetime |
| import json |
| import base64 |
| from googleapiclient.discovery import build |
| from utils.auth import authenticate_google_services |
|
|
class Gmail_Agent:
    """Small wrapper around the Gmail v1 API client.

    Provides helpers to search the mailbox, render messages as plain text,
    send mail, and describe the query/send operations as tool definitions
    (name, description, JSON-schema parameters).
    """

    # Separator emitted before the first message and after every message.
    _SEPARATOR = "\n\n==========\n\n"

    def __init__(self, creds=None):
        """Build the Gmail service client.

        Args:
            creds: Credentials passed to ``build``. When falsy, they are
                obtained from ``authenticate_google_services()``.
        """
        if not creds:
            creds = authenticate_google_services()
        self.service = build('gmail', 'v1', credentials=creds)

    def read_gmail_messages(self, queries, max_results=5):
        """List messages matching the given Gmail search terms.

        Args:
            queries: Iterable of search terms; they are joined with spaces
                into one query string. A single string is accepted as one
                term, and ``None`` is treated as no terms.
            max_results: Value passed as ``maxResults`` to the list call.

        Returns:
            The ``messages`` list from the API response, or ``[]`` when the
            response has no ``messages`` key.
        """
        # str.join on a bare string would insert a space between every
        # character, so wrap a single term in a list first.
        if isinstance(queries, str):
            queries = [queries]
        query = " ".join(queries or [])
        results = (
            self.service.users()
            .messages()
            .list(userId='me', maxResults=max_results, q=query)
            .execute()
        )
        return results.get('messages', [])

    def format_messages(self, messages):
        """Fetch each listed message and render date and plain-text body.

        Args:
            messages: Iterable of dicts carrying an ``id`` key, as returned
                by ``read_gmail_messages``.

        Returns:
            One string: a separator, then for every message its date line,
            its content line and another separator. The content is rendered
            as ``None`` when no plain-text body could be extracted.
        """
        sections = []
        for message in messages:
            msg = (
                self.service.users()
                .messages()
                .get(userId='me', id=message['id'])
                .execute()
            )
            # internalDate is epoch milliseconds; rendered in local time.
            received_at = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
            formatted_date = received_at.strftime('%Y-%m-%d %H:%M:%S')
            # Parse the payload itself rather than payload['parts']:
            # single-part messages carry their body directly on the payload
            # and have no 'parts' key at all.
            content = self._parse_gmail_response([msg['payload']])
            sections.append(f"信件日期: {formatted_date}\n信件內容: {content}")
        return self._SEPARATOR + "".join(
            section + self._SEPARATOR for section in sections
        )

    def send_email(self, to, subject, message_content):
        """Send a plain-text email and report the outcome as a status string.

        Args:
            to: Iterable of recipient addresses; a single address string is
                also accepted.
            subject: Subject line.
            message_content: Plain-text body.

        Returns:
            A success message, or a failure message embedding the error
            text. Errors are reported through the return value, not raised.
        """
        from email.message import EmailMessage

        try:
            # Joining a bare string would split the address into characters.
            recipients = [to] if isinstance(to, str) else to
            message = EmailMessage()
            message.set_content(message_content)
            message["To"] = ", ".join(recipients)
            message["Subject"] = subject
            # The API expects the whole RFC 2822 message, base64url-encoded.
            encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
            sent = (
                self.service.users()
                .messages()
                .send(userId="me", body={"raw": encoded_message})
                .execute()
            )
            print(f'Message Id: {sent["id"]}')
            return "✅ Gmail 已傳送給所有使用者,請至您的 Gmail 寄件備份確認!"
        except Exception as error:
            # Deliberate catch-all: the caller consumes a status string.
            print(f"An error occurred: {error}")
            return f"❌ 信件未能成功寄出,以下是錯誤訊息(英文):{error}"

    def get_all_tools(self):
        """Return the tool definitions for the query and send operations.

        Returns:
            A new list of two dicts (``query_gmail_tool`` and
            ``send_email_tool``), each with ``name``, ``description`` and a
            JSON-schema ``parameters`` object.
        """
        return [
            {
                "name": "query_gmail_tool",
                "description": "query the gmails with specified queries",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "queries": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Queries that can be used to filter the gmails. (e.g., ['before:2025/06/01', 'after:2025/05/01', 'from:abd@gmail.com'])",
                        },
                    },
                    "required": ["queries"],
                },
            },
            {
                "name": "send_email_tool",
                "description": "Send an email to a specified recipient with a specified subject and message",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "to": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "An array of email addresses of all the recipients",
                        },
                        "subject": {
                            "type": "string",
                            "description": "The subject of the email",
                        },
                        "message_content": {
                            "type": "string",
                            "description": "The content of the email",
                        },
                    },
                    "required": ["to", "subject", "message_content"],
                },
            },
        ]

    def _parse_gmail_response(self, parts):
        """Return the first non-empty ``text/plain`` body found in ``parts``.

        Nested ``parts`` are searched depth-first before the part itself is
        examined.

        Args:
            parts: Iterable of message-part dicts (``mimeType``, ``body``,
                optionally ``parts``). ``None`` is treated as empty.

        Returns:
            The decoded text, or ``None`` when no part qualifies.
        """
        for part in parts or []:
            if "parts" in part:
                nested_text = self._parse_gmail_response(part["parts"])
                if nested_text:
                    return nested_text
            body = part.get('body') or {}
            data = body.get('data')
            # A text/plain attachment reports a size but has no inline
            # 'data' (only an attachmentId), so require data explicitly.
            if part.get('mimeType') == 'text/plain' and body.get('size', 0) != 0 and data:
                # Restore any stripped '=' padding before decoding.
                padded = data + "=" * (-len(data) % 4)
                # Bodies in other charsets degrade to replacement characters
                # instead of raising.
                return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
        return None
|
|