{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pickle\n",
"# for encoding/decoding messages in base64\n",
"from base64 import urlsafe_b64decode, urlsafe_b64encode\n",
"# for dealing with attachment MIME types\n",
"from email import encoders\n",
"from email.mime.audio import MIMEAudio\n",
"from email.mime.base import MIMEBase\n",
"from email.mime.image import MIMEImage\n",
"from email.mime.multipart import MIMEMultipart\n",
"from email.mime.text import MIMEText\n",
"from mimetypes import guess_type as guess_mime_type\n",
"\n",
"# Gmail API utils\n",
"from google.auth.transport.requests import Request\n",
"from google_auth_oauthlib.flow import InstalledAppFlow\n",
"from googleapiclient.discovery import build"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Full-access scope: permission to read/send/receive emails, manage the inbox, and more\n",
"SCOPES = [\"https://mail.google.com/\"]\n",
"# address written into the From header of the messages built below\n",
"our_email = \"your_gmail@gmail.com\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def gmail_authenticate():\n",
"    \"\"\"\n",
"    Build an authorized Gmail API client.\n",
"\n",
"    Cached credentials are read from token.pickle when that file exists. When\n",
"    they are missing or not valid they are refreshed (if expired and a refresh\n",
"    token is present) or obtained through the local-server OAuth flow based on\n",
"    credentials.json, and then written back to token.pickle.\n",
"\n",
"    Returns the object produced by build('gmail', 'v1', credentials=...).\n",
"    \"\"\"\n",
"    token_path = \"token.pickle\"\n",
"    creds = None\n",
"    # token.pickle stores the user's access and refresh tokens; it is created\n",
"    # the first time the authorization flow completes\n",
"    if os.path.exists(token_path):\n",
"        # NOTE: unpickling can run arbitrary code, so only load a token file you created yourself\n",
"        with open(token_path, \"rb\") as token_file:\n",
"            creds = pickle.load(token_file)\n",
"    if creds and creds.valid:\n",
"        # the cached credentials are usable as they are: nothing to renew or save\n",
"        return build('gmail', 'v1', credentials=creds)\n",
"    if creds and creds.expired and creds.refresh_token:\n",
"        creds.refresh(Request())\n",
"    else:\n",
"        # no usable credentials available, let the user log in\n",
"        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)\n",
"        creds = flow.run_local_server(port=0)\n",
"    # save the credentials for the next run\n",
"    with open(token_path, \"wb\") as token_file:\n",
"        pickle.dump(creds, token_file)\n",
"    return build('gmail', 'v1', credentials=creds)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# authenticate once and keep the Gmail API service for the cells below\n",
"service = gmail_authenticate()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def add_attachment(message, filename):\n",
"    \"\"\"\n",
"    Attach the file at `filename` to `message`.\n",
"\n",
"    The MIME type is guessed from the file name; when it cannot be guessed, or\n",
"    the file is reported as encoded (e.g. compressed), application/octet-stream\n",
"    is used. The part gets a Content-Disposition header carrying the base name\n",
"    of the file.\n",
"\n",
"    message: multipart message object providing attach()\n",
"    filename: path of the file to attach\n",
"    Raises OSError when the file cannot be read.\n",
"    \"\"\"\n",
"    content_type, encoding = guess_mime_type(filename)\n",
"    if content_type is None or encoding is not None:\n",
"        content_type = 'application/octet-stream'\n",
"    main_type, sub_type = content_type.split('/', 1)\n",
"    # read through a context manager so the handle is closed even if reading fails\n",
"    with open(filename, 'rb') as fp:\n",
"        content = fp.read()\n",
"    if main_type == 'text':\n",
"        msg = MIMEText(content.decode(), _subtype=sub_type)\n",
"    elif main_type == 'image':\n",
"        msg = MIMEImage(content, _subtype=sub_type)\n",
"    elif main_type == 'audio':\n",
"        msg = MIMEAudio(content, _subtype=sub_type)\n",
"    else:\n",
"        msg = MIMEBase(main_type, sub_type)\n",
"        msg.set_payload(content)\n",
"        # MIMEBase leaves the payload unencoded; without base64 the raw bytes\n",
"        # end up in the message and binary files arrive corrupted\n",
"        encoders.encode_base64(msg)\n",
"    msg.add_header('Content-Disposition', 'attachment', filename=os.path.basename(filename))\n",
"    message.attach(msg)\n",
"\n",
"\n",
"def build_message(destination, obj, body, attachments=None):\n",
"    \"\"\"\n",
"    Build the request body for sending an email through the Gmail API.\n",
"\n",
"    destination: value of the To header\n",
"    obj: value of the Subject header\n",
"    body: text of the email\n",
"    attachments: optional iterable of file paths to attach\n",
"    Returns a dict whose 'raw' entry is the URL-safe base64 encoded message.\n",
"    \"\"\"\n",
"    if attachments:\n",
"        message = MIMEMultipart()\n",
"        message.attach(MIMEText(body))\n",
"        for filename in attachments:\n",
"            add_attachment(message, filename)\n",
"    else:\n",
"        # no attachments given: a plain text message is enough\n",
"        message = MIMEText(body)\n",
"    message['to'] = destination\n",
"    message['from'] = our_email\n",
"    message['subject'] = obj\n",
"    return {'raw': urlsafe_b64encode(message.as_bytes()).decode()}\n",
"\n",
"\n",
"def send_message(service, destination, obj, body, attachments=None):\n",
"    \"\"\"\n",
"    Send an email and return the response of the send request.\n",
"\n",
"    service: Gmail API service\n",
"    destination, obj, body, attachments: passed on to build_message\n",
"    \"\"\"\n",
"    return service.users().messages().send(\n",
"        userId=\"me\",\n",
"        body=build_message(destination, obj, body, attachments)\n",
"    ).execute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# test send email: plain text body plus two attached files\n",
"send_message(\n",
"    service,\n",
"    \"destination@domain.com\",\n",
"    \"This is a subject\",\n",
"    \"This is the body of the email\",\n",
"    [\"test.txt\", \"anyfile.png\"],\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def search_messages(service, query):\n",
"    \"\"\"\n",
"    Collect every message entry matching `query`, following pagination.\n",
"\n",
"    service: Gmail API service\n",
"    query: search string sent as the `q` parameter of messages().list\n",
"    Returns the 'messages' entries of all result pages in one list\n",
"    (empty when no page contains any).\n",
"    \"\"\"\n",
"    found = []\n",
"    list_kwargs = {'userId': 'me', 'q': query}\n",
"    while True:\n",
"        page = service.users().messages().list(**list_kwargs).execute()\n",
"        if 'messages' in page:\n",
"            found.extend(page['messages'])\n",
"        if 'nextPageToken' not in page:\n",
"            return found\n",
"        # request the following page on the next round\n",
"        list_kwargs['pageToken'] = page['nextPageToken']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# utility functions\n",
"def get_size_format(b, factor=1024, suffix=\"B\"):\n",
"    \"\"\"\n",
"    Format a byte count with the largest fitting unit prefix\n",
"    e.g:\n",
"        1253656 => '1.20MB'\n",
"        1253656678 => '1.17GB'\n",
"\n",
"    b: number of bytes\n",
"    factor: size ratio between two consecutive units\n",
"    suffix: text appended after the unit prefix\n",
"    \"\"\"\n",
"    units = (\"\", \"K\", \"M\", \"G\", \"T\", \"P\", \"E\", \"Z\")\n",
"    scaled = b\n",
"    position = 0\n",
"    while position < len(units):\n",
"        if scaled < factor:\n",
"            return f\"{scaled:.2f}{units[position]}{suffix}\"\n",
"        scaled /= factor\n",
"        position += 1\n",
"    # anything beyond the listed prefixes is reported with the Y prefix\n",
"    return f\"{scaled:.2f}Y{suffix}\"\n",
"\n",
"\n",
"def clean(text):\n",
"    \"\"\"Return `text` with every non-alphanumeric character replaced by '_' (used for folder names).\"\"\"\n",
"    sanitized = []\n",
"    for char in text:\n",
"        sanitized.append(char if char.isalnum() else \"_\")\n",
"    return \"\".join(sanitized)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def _safe_filename(filename, fallback):\n",
"    \"\"\"\n",
"    Reduce a sender-supplied file name to its last path component.\n",
"\n",
"    Returns `fallback` when nothing usable is left (empty name, '.' or '..').\n",
"    \"\"\"\n",
"    # file names inside an email are untrusted input: drop POSIX and Windows\n",
"    # directory parts so a name such as '../../x' cannot leave the target folder\n",
"    name = os.path.basename((filename or \"\").replace(\"\\\\\", \"/\"))\n",
"    if name in (\"\", \".\", \"..\"):\n",
"        return fallback\n",
"    return name\n",
"\n",
"\n",
"def _unique_folder(base_name):\n",
"    \"\"\"Return `base_name`, or `base_name_<n>` with the smallest n >= 1 that is not taken yet.\"\"\"\n",
"    candidate = base_name\n",
"    counter = 0\n",
"    while os.path.exists(candidate):\n",
"        counter += 1\n",
"        candidate = f\"{base_name}_{counter}\"\n",
"    return candidate\n",
"\n",
"\n",
"def parse_parts(service, parts, folder_name, message):\n",
"    \"\"\"\n",
"    Walk the parts of an email and print or save their content.\n",
"\n",
"    - text/plain parts are decoded and printed\n",
"    - text/html parts are saved under `folder_name` (as index.html when unnamed)\n",
"    - any other part with a Content-Disposition header containing 'attachment'\n",
"      is saved under `folder_name`; it is downloaded first when the part only\n",
"      carries an attachment ID\n",
"    Nested parts are processed recursively.\n",
"\n",
"    service: Gmail API service, used to download attachments\n",
"    parts: list of message part dicts (None or empty is accepted)\n",
"    folder_name: existing directory that receives the saved files\n",
"    message: dict holding the message 'id'\n",
"    \"\"\"\n",
"    if not parts:\n",
"        return\n",
"    for part in parts:\n",
"        filename = part.get(\"filename\")\n",
"        mime_type = part.get(\"mimeType\")\n",
"        # tolerate parts that lack a body or a headers entry\n",
"        body = part.get(\"body\") or {}\n",
"        data = body.get(\"data\")\n",
"        file_size = body.get(\"size\") or 0\n",
"        part_headers = part.get(\"headers\") or []\n",
"        if part.get(\"parts\"):\n",
"            # recursively call this function when we see that a part\n",
"            # has parts inside\n",
"            parse_parts(service, part.get(\"parts\"), folder_name, message)\n",
"        if mime_type == \"text/plain\":\n",
"            # if the email part is text plain\n",
"            if data:\n",
"                print(urlsafe_b64decode(data).decode())\n",
"        elif mime_type == \"text/html\":\n",
"            # save the HTML content; a part without inline data has nothing to write\n",
"            if not data:\n",
"                continue\n",
"            filepath = os.path.join(folder_name, _safe_filename(filename, \"index.html\"))\n",
"            print(\"Saving HTML to\", filepath)\n",
"            with open(filepath, \"wb\") as f:\n",
"                f.write(urlsafe_b64decode(data))\n",
"        else:\n",
"            # attachment other than a plain text or HTML\n",
"            for part_header in part_headers:\n",
"                header_name = part_header.get(\"name\") or \"\"\n",
"                header_value = part_header.get(\"value\") or \"\"\n",
"                # header names are matched case-insensitively, as read_message does\n",
"                if header_name.lower() != \"content-disposition\" or \"attachment\" not in header_value:\n",
"                    continue\n",
"                print(\"Saving the file:\", filename, \"size:\", get_size_format(file_size))\n",
"                attachment_id = body.get(\"attachmentId\")\n",
"                if attachment_id:\n",
"                    # the content is stored separately: make another request to get it\n",
"                    attachment = service.users().messages().attachments().get(\n",
"                        id=attachment_id, userId='me', messageId=message['id']\n",
"                    ).execute()\n",
"                    data = attachment.get(\"data\")\n",
"                # without an attachment ID the content, if any, is the inline body data\n",
"                if data:\n",
"                    filepath = os.path.join(folder_name, _safe_filename(filename, \"attachment\"))\n",
"                    with open(filepath, \"wb\") as f:\n",
"                        f.write(urlsafe_b64decode(data))\n",
"\n",
"\n",
"def read_message(service, message):\n",
"    \"\"\"\n",
"    Download one email, print its basic headers and save its content.\n",
"\n",
"    - Fetches the full message identified by message['id']\n",
"    - Prints From, To, Subject and Date, plus any text/plain parts\n",
"    - Creates a folder named after the cleaned subject (a numeric suffix is\n",
"      added when that name is taken); messages without a Subject header share\n",
"      the folder named email\n",
"    - Saves text/html content and attachments into that folder\n",
"\n",
"    service: Gmail API service\n",
"    message: dict holding the message 'id'\n",
"    \"\"\"\n",
"    msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()\n",
"    payload = msg['payload']\n",
"    headers = payload.get(\"headers\") or []\n",
"    # a message that is not multipart has no parts list: its content sits on\n",
"    # the payload itself, so the payload is handled as the only part\n",
"    parts = payload.get(\"parts\") or [payload]\n",
"    folder_name = None\n",
"    for header in headers:\n",
"        name = (header.get(\"name\") or \"\").lower()\n",
"        value = header.get(\"value\")\n",
"        if name == \"from\":\n",
"            print(\"From:\", value)\n",
"        elif name == \"to\":\n",
"            print(\"To:\", value)\n",
"        elif name == \"subject\":\n",
"            if folder_name is None:\n",
"                # an empty subject would give an empty folder name, so fall back to the default\n",
"                folder_name = _unique_folder(clean(value or \"\") or \"email\")\n",
"                os.mkdir(folder_name)\n",
"            print(\"Subject:\", value)\n",
"        elif name == \"date\":\n",
"            # the date when the message was sent\n",
"            print(\"Date:\", value)\n",
"    if folder_name is None:\n",
"        # no Subject header at all: use the shared default folder\n",
"        folder_name = \"email\"\n",
"        os.makedirs(folder_name, exist_ok=True)\n",
"    parse_parts(service, parts, folder_name, message)\n",
"    print(\"=\" * 50)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# collect the emails matching the search query\n",
"results = search_messages(service, \"Python Code\")\n",
"print(f\"Found {len(results)} results.\")\n",
"# read each match: plain text goes to the output, HTML and attachments are saved to disk\n",
"for msg in results:\n",
"    read_message(service, msg)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# batchModify accepts at most this many message IDs per request\n",
"_BATCH_MODIFY_MAX_IDS = 1000\n",
"\n",
"\n",
"def _modify_labels(service, query, label_changes):\n",
"    \"\"\"\n",
"    Apply `label_changes` to every message matching `query`.\n",
"\n",
"    label_changes: dict with 'addLabelIds' and/or 'removeLabelIds'\n",
"    Returns the response of the last batchModify request, or None when no\n",
"    message matched (no request is sent in that case).\n",
"    \"\"\"\n",
"    messages_to_mark = search_messages(service, query)\n",
"    print(f\"Matched emails: {len(messages_to_mark)}\")\n",
"    ids = [msg['id'] for msg in messages_to_mark]\n",
"    response = None\n",
"    # send the IDs in slices that respect the per-request limit\n",
"    for start in range(0, len(ids), _BATCH_MODIFY_MAX_IDS):\n",
"        response = service.users().messages().batchModify(\n",
"            userId='me',\n",
"            body={'ids': ids[start:start + _BATCH_MODIFY_MAX_IDS], **label_changes}\n",
"        ).execute()\n",
"    return response\n",
"\n",
"\n",
"def mark_as_read(service, query):\n",
"    \"\"\"Remove the UNREAD label from every message matching `query`.\"\"\"\n",
"    return _modify_labels(service, query, {'removeLabelIds': ['UNREAD']})\n",
"\n",
"\n",
"def mark_as_unread(service, query):\n",
"    \"\"\"Add the UNREAD label to every message matching `query`.\"\"\"\n",
"    return _modify_labels(service, query, {'addLabelIds': ['UNREAD']})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# mark everything matching a keyword as read\n",
"mark_as_read(service, \"Google\")\n",
"# queries can also filter by sender/receiver\n",
"mark_as_unread(service, \"from: email@domain.com\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# batchDelete accepts at most this many message IDs per request\n",
"_BATCH_DELETE_MAX_IDS = 1000\n",
"\n",
"\n",
"def delete_messages(service, query):\n",
"    \"\"\"\n",
"    Delete every message matching `query`.\n",
"\n",
"    service: Gmail API service\n",
"    query: search string used to select the messages\n",
"    Returns the response of the last batchDelete request, or None when no\n",
"    message matched (no request is sent in that case).\n",
"    \"\"\"\n",
"    messages_to_delete = search_messages(service, query)\n",
"    print(f\"Deleting {len(messages_to_delete)} emails.\")\n",
"    ids = [msg['id'] for msg in messages_to_delete]\n",
"    response = None\n",
"    # it's possible to delete a single message with the delete API, like this:\n",
"    # service.users().messages().delete(userId='me', id=msg['id'])\n",
"    # batchDelete removes many at once; the IDs are sent in slices that\n",
"    # respect the per-request limit\n",
"    for start in range(0, len(ids), _BATCH_DELETE_MAX_IDS):\n",
"        response = service.users().messages().batchDelete(\n",
"            userId='me',\n",
"            body={'ids': ids[start:start + _BATCH_DELETE_MAX_IDS]}\n",
"        ).execute()\n",
"    return response"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# caution: this deletes every message matching the query\n",
"delete_messages(service, \"Google Alerts\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
},
"vscode": {
"interpreter": {
"hash": "f89a88aed07bbcd763ac68893150ace71e487877d8c6527a76855322f20001c6"
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}