X Tutup
import os
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
# for encoding/decoding messages in base64
from base64 import urlsafe_b64decode, urlsafe_b64encode
# for dealing with attachment MIME types
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from mimetypes import guess_type as guess_mime_type

# Request all access (permission to read/send/receive emails, manage the inbox, and more)
SCOPES = ['https://mail.google.com/']
our_email = 'your_gmail@gmail.com'


def gmail_authenticate(token_path="token.pickle", credentials_path="credentials.json", scopes=None):
    """
    Build an authenticated Gmail API service object.

    Cached credentials are loaded from `token_path` when that file exists.
    If they are missing or invalid they are refreshed (when expired and a
    refresh token is present) or obtained through the local-server OAuth flow,
    and the resulting credentials are written back to `token_path`.

    Parameters:
        token_path: pickle file used to cache the credentials between runs.
        credentials_path: OAuth client secrets file used by the login flow.
        scopes: OAuth scopes to request; defaults to the module-level SCOPES.

    Returns:
        The object returned by build('gmail', 'v1', credentials=creds).
    """
    if scopes is None:
        scopes = SCOPES
    creds = None
    # the token file stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first time
    if os.path.exists(token_path):
        # SECURITY: pickle.load can execute arbitrary code from the file it reads.
        # Only point token_path at a file this function wrote itself.
        with open(token_path, "rb") as token:
            creds = pickle.load(token)
    # if there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
            creds = flow.run_local_server(port=0)
        # save the credentials for the next run; request owner-only permissions
        # at creation time because the file holds tokens for the requested scopes
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "wb") as token:
            pickle.dump(creds, token)
    return build('gmail', 'v1', credentials=creds)
# The driver statements in this section are guarded with ``__name__`` so that
# they still run, in order, inside the notebook kernel or when executed as a
# script, but importing the helpers does not send, modify or delete any mail.

# get the Gmail API service
if __name__ == "__main__":
    service = gmail_authenticate()


def add_attachment(message, filename):
    """
    Attach the file at `filename` to the MIME `message`.

    The MIME type is guessed from the file name; unknown or encoded
    (e.g. compressed) files are sent as application/octet-stream.

    Parameters:
        message: a multipart MIME message exposing `attach()`.
        filename: path of the file to attach; only its base name is used in
            the Content-Disposition header.

    Raises:
        OSError: if the file cannot be read.
        UnicodeDecodeError: if a text/* file is not valid UTF-8.
    """
    # local import keeps this function self-contained (not in the import cell)
    from email import encoders

    content_type, encoding = guess_mime_type(filename)
    if content_type is None or encoding is not None:
        content_type = 'application/octet-stream'
    main_type, sub_type = content_type.split('/', 1)
    # read once, inside a context manager, so the handle is closed even on error
    with open(filename, 'rb') as fp:
        payload = fp.read()
    if main_type == 'text':
        msg = MIMEText(payload.decode(), _subtype=sub_type)
    elif main_type == 'image':
        msg = MIMEImage(payload, _subtype=sub_type)
    elif main_type == 'audio':
        msg = MIMEAudio(payload, _subtype=sub_type)
    else:
        msg = MIMEBase(main_type, sub_type)
        msg.set_payload(payload)
        # MIMEBase applies no transfer encoding on its own; without this the
        # raw bytes payload makes message.as_bytes() fail
        encoders.encode_base64(msg)
    msg.add_header('Content-Disposition', 'attachment', filename=os.path.basename(filename))
    message.attach(msg)


def build_message(destination, obj, body, attachments=None):
    """
    Build the request body for sending a message.

    Parameters:
        destination: value of the "to" header.
        obj: value of the "subject" header.
        body: plain-text body of the email.
        attachments: optional iterable of file paths to attach.

    Returns:
        A dict {'raw': <URL-safe base64 of the serialized message>}.
        The "from" header is taken from the module-level `our_email`.
    """
    # plain MIMEText when there is nothing to attach, multipart otherwise
    message = MIMEMultipart() if attachments else MIMEText(body)
    message['to'] = destination
    message['from'] = our_email
    message['subject'] = obj
    if attachments:
        message.attach(MIMEText(body))
        for filename in attachments:
            add_attachment(message, filename)
    return {'raw': urlsafe_b64encode(message.as_bytes()).decode()}


def send_message(service, destination, obj, body, attachments=None):
    """
    Send an email through `service` and return the executed request's result.

    Parameters are forwarded to build_message(); see that function.
    """
    return service.users().messages().send(
        userId="me",
        body=build_message(destination, obj, body, attachments)
    ).execute()


# test send email
if __name__ == "__main__":
    send_message(service, "destination@domain.com", "This is a subject",
                 "This is the body of the email", ["test.txt", "anyfile.png"])


def search_messages(service, query):
    """
    Return every message entry matching `query`, following pagination.

    Each page is requested with messages().list(userId='me', q=query) and the
    'messages' lists of all pages are concatenated in order.
    """
    messages = []
    page_token = None
    while True:
        request_args = {'userId': 'me', 'q': query}
        if page_token:
            request_args['pageToken'] = page_token
        result = service.users().messages().list(**request_args).execute()
        messages.extend(result.get('messages', []))
        page_token = result.get('nextPageToken')
        if not page_token:
            return messages


# utility functions
def get_size_format(b, factor=1024, suffix="B"):
    """
    Scale bytes to its proper byte format
    e.g:
        1253656 => '1.20MB'
        1253656678 => '1.17GB'
    """
    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
        if b < factor:
            return f"{b:.2f}{unit}{suffix}"
        b /= factor
    return f"{b:.2f}Y{suffix}"


def clean(text):
    """Replace every non-alphanumeric character of `text` with '_' (used for folder names)."""
    return "".join(c if c.isalnum() else "_" for c in text)


def _safe_filename(name, default):
    """Return the base name of `name`, or `default` when nothing usable remains."""
    base = os.path.basename(name or "")
    return default if base in ("", ".", "..") else base


def parse_parts(service, parts, folder_name, message):
    """
    Utility function that parses the content of an email partition.

    - text/plain parts are decoded and printed
    - text/html parts are saved under `folder_name` (index.html when unnamed)
    - other parts carrying a Content-Disposition "attachment" header are
      downloaded (when they have an attachmentId) and saved under `folder_name`
    Nested parts are handled recursively. File names coming from the email are
    reduced to their base name so they cannot point outside `folder_name`.
    """
    for part in parts or []:
        filename = part.get("filename")
        mime_type = part.get("mimeType")
        body = part.get("body") or {}
        data = body.get("data")
        file_size = body.get("size")
        part_headers = part.get("headers") or []
        if part.get("parts"):
            # recursively call this function when we see that a part
            # has parts inside
            parse_parts(service, part.get("parts"), folder_name, message)
        if mime_type == "text/plain":
            # if the email part is text plain
            if data:
                # errors="replace": one badly encoded part should not abort the run
                print(urlsafe_b64decode(data).decode(errors="replace"))
        elif mime_type == "text/html":
            # if the email part is an HTML content, save it to a file
            if data:
                filepath = os.path.join(folder_name, _safe_filename(filename, "index.html"))
                print("Saving HTML to", filepath)
                with open(filepath, "wb") as f:
                    f.write(urlsafe_b64decode(data))
        else:
            # attachment other than a plain text or HTML
            for part_header in part_headers:
                header_name = (part_header.get("name") or "").lower()
                header_value = part_header.get("value") or ""
                if header_name != "content-disposition" or "attachment" not in header_value:
                    continue
                filename = _safe_filename(filename, "attachment")
                print("Saving the file:", filename, "size:", get_size_format(file_size or 0))
                attachment_id = body.get("attachmentId")
                if attachment_id:
                    # make another request to get the attachment itself
                    attachment = service.users().messages() \
                        .attachments().get(id=attachment_id, userId='me', messageId=message['id']).execute()
                    data = attachment.get("data")
                # without an attachmentId, fall back to the inline body data (if any)
                if data:
                    with open(os.path.join(folder_name, filename), "wb") as f:
                        f.write(urlsafe_b64decode(data))
                # one download per part is enough
                break


def _make_unique_folder(base_name):
    """Create `base_name`, or `base_name_<n>` for the first free n >= 1, and return the name used."""
    folder_name = base_name
    counter = 0
    while os.path.exists(folder_name):
        counter += 1
        folder_name = f"{base_name}_{counter}"
    os.mkdir(folder_name)
    return folder_name


def read_message(service, message):
    """
    This function takes Gmail API `service` and a `message` dict (with an 'id' key) and does the following:
        - Downloads the content of the email
        - Prints email basic information (To, From, Subject & Date) and plain/text parts
        - Creates a folder for each email based on the subject
        - Downloads text/html content (if available) and saves it under the folder created as index.html
        - Downloads any file that is attached to the email and saves it in the folder created
    """
    msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
    # parts can be the message body, or attachments
    payload = msg['payload']
    headers = payload.get("headers") or []
    parts = payload.get("parts")
    folder_name = "email"
    has_subject = False
    # this section prints email basic info & creates a folder for the email
    for header in headers:
        name = (header.get("name") or "").lower()
        value = header.get("value")
        if name == "from":
            print("From:", value)
        elif name == "to":
            print("To:", value)
        elif name == "subject":
            has_subject = True
            # folder named after the subject; emails sharing a subject get a
            # numeric suffix, and an empty subject falls back to "email"
            folder_name = _make_unique_folder(clean(value or "") or "email")
            print("Subject:", value)
        elif name == "date":
            print("Date:", value)
    if not has_subject:
        # if the email does not have a subject, then make a folder with "email" name
        # since folders are created based on subjects
        if not os.path.isdir(folder_name):
            os.mkdir(folder_name)
    # a message without sub-parts carries its content directly in the payload
    parse_parts(service, parts if parts else [payload], folder_name, message)
    print("="*50)


if __name__ == "__main__":
    # get emails that match the query you specify
    results = search_messages(service, "Python Code")
    print(f"Found {len(results)} results.")
    # for each email matched, read it (output plain/text to console & save HTML and attachments)
    for msg in results:
        read_message(service, msg)


# maximum number of ids sent in one batch request (see the Gmail API reference)
_BATCH_LIMIT = 1000


def _id_batches(messages, size=_BATCH_LIMIT):
    """Split the ids of `messages` into consecutive lists of at most `size` ids."""
    ids = [entry['id'] for entry in messages]
    return [ids[start:start + size] for start in range(0, len(ids), size)]


def _batch_modify(service, messages, label_change):
    """Apply `label_change` to `messages` in batches; return the last response (None if no messages)."""
    response = None
    for ids in _id_batches(messages):
        body = {'ids': ids}
        body.update(label_change)
        response = service.users().messages().batchModify(userId='me', body=body).execute()
    return response


def mark_as_read(service, query):
    """
    Remove the UNREAD label from every message matching `query`.

    Returns the result of the last batchModify request, or None when
    nothing matched (no request is sent in that case).
    """
    messages_to_mark = search_messages(service, query)
    print(f"Matched emails: {len(messages_to_mark)}")
    return _batch_modify(service, messages_to_mark, {'removeLabelIds': ['UNREAD']})


def mark_as_unread(service, query):
    """
    Add the UNREAD label to every message matching `query`.

    Returns the result of the last batchModify request, or None when
    nothing matched (no request is sent in that case).
    """
    messages_to_mark = search_messages(service, query)
    print(f"Matched emails: {len(messages_to_mark)}")
    return _batch_modify(service, messages_to_mark, {'addLabelIds': ['UNREAD']})


if __name__ == "__main__":
    mark_as_read(service, "Google")
    # search query by sender/receiver
    mark_as_unread(service, "from: email@domain.com")


def delete_messages(service, query):
    """
    Delete every message matching `query` using batchDelete.

    Returns the result of the last batchDelete request, or None when
    nothing matched (no request is sent in that case).
    """
    messages_to_delete = search_messages(service, query)
    print(f"Deleting {len(messages_to_delete)} emails.")
    # it's possible to delete a single message with the delete API, like this:
    # service.users().messages().delete(userId='me', id=msg['id'])
    # but it's also possible to delete all the selected messages with one query, batchDelete
    response = None
    for ids in _id_batches(messages_to_delete):
        response = service.users().messages().batchDelete(
            userId='me',
            body={'ids': ids}
        ).execute()
    return response


if __name__ == "__main__":
    delete_messages(service, "Google Alerts")
X Tutup