X Tutup
{ "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6-final" }, "orig_nbformat": 2, "kernelspec": { "name": "python36664bitea6884f10f474b21a2a2f022451e0d09", "display_name": "Python 3.6.6 64-bit" } }, "nbformat": 4, "nbformat_minor": 2, "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import pickle\n", "# Gmail API utils\n", "from googleapiclient.discovery import build\n", "from google_auth_oauthlib.flow import InstalledAppFlow\n", "from google.auth.transport.requests import Request\n", "# for encoding/decoding messages in base64\n", "from base64 import urlsafe_b64decode, urlsafe_b64encode\n", "# for dealing with attachement MIME types\n", "from email.mime.text import MIMEText\n", "from email.mime.multipart import MIMEMultipart\n", "from email.mime.image import MIMEImage\n", "from email.mime.audio import MIMEAudio\n", "from email.mime.base import MIMEBase\n", "from email.mime.multipart import MIMEMultipart\n", "from mimetypes import guess_type as guess_mime_type" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Request all access (permission to read/send/receive emails, manage the inbox, and more)\n", "SCOPES = ['https://mail.google.com/']\n", "our_email = 'your_gmail@gmail.com'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def gmail_authenticate():\n", " creds = None\n", " # the file token.pickle stores the user's access and refresh tokens, and is\n", " # created automatically when the authorization flow completes for the first time\n", " if os.path.exists(\"token.pickle\"):\n", " with open(\"token.pickle\", \"rb\") as token:\n", " creds = pickle.load(token)\n", " # if there are no (valid) credentials availablle, let the user log in.\n", " if not creds or not creds.valid:\n", " if creds and creds.expired and creds.refresh_token:\n", " creds.refresh(Request())\n", " else:\n", " flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)\n", " creds = flow.run_local_server(port=0)\n", " # save the credentials for the next run\n", " with open(\"token.pickle\", \"wb\") as token:\n", " pickle.dump(creds, token)\n", " return build('gmail', 'v1', credentials=creds)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get the Gmail API service\n", "service = gmail_authenticate()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Adds the attachment with the given filename to the given message\n", "def add_attachment(message, filename):\n", " content_type, encoding = guess_mime_type(filename)\n", " if content_type is None or encoding is not None:\n", " content_type = 'application/octet-stream'\n", " main_type, sub_type = content_type.split('/', 1)\n", " if main_type == 'text':\n", " fp = open(filename, 'rb')\n", " msg = MIMEText(fp.read().decode(), _subtype=sub_type)\n", " fp.close()\n", " elif main_type == 'image':\n", " fp = open(filename, 'rb')\n", " msg = MIMEImage(fp.read(), _subtype=sub_type)\n", " fp.close()\n", " elif main_type == 'audio':\n", " fp = open(filename, 'rb')\n", " msg = MIMEAudio(fp.read(), _subtype=sub_type)\n", " fp.close()\n", " else:\n", " fp = open(filename, 'rb')\n", " msg = MIMEBase(main_type, sub_type)\n", " msg.set_payload(fp.read())\n", " fp.close()\n", " filename = os.path.basename(filename)\n", " msg.add_header('Content-Disposition', 'attachment', filename=filename)\n", " message.attach(msg)\n", "\n", "def build_message(destination, obj, body, attachments=[]):\n", " if not attachments: # no attachments given\n", " message = MIMEText(body)\n", " message['to'] = destination\n", " message['from'] = our_email\n", " message['subject'] = obj\n", " else:\n", " message = MIMEMultipart()\n", " message['to'] = destination\n", " message['from'] = our_email\n", " message['subject'] = obj\n", " message.attach(MIMEText(body))\n", " for filename in attachments:\n", " add_attachment(message, filename)\n", " return {'raw': urlsafe_b64encode(message.as_bytes()).decode()}\n", "\n", "def send_message(service, destination, obj, body, attachments=[]):\n", " return service.users().messages().send(\n", " userId=\"me\",\n", " body=build_message(destination, obj, body, attachments)\n", " ).execute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# test send email\n", "send_message(service, \"destination@domain.com\", \"This is a subject\", \n", " \"This is the body of the email\", [\"test.txt\", \"credentials.json\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def search_messages(service, query):\n", " result = service.users().messages().list(userId='me',q=query).execute()\n", " messages = [ ]\n", " if 'messages' in result:\n", " messages.extend(result['messages'])\n", " while 'nextPageToken' in result:\n", " page_token = result['nextPageToken']\n", " result = service.users().messages().list(userId='me',q=query, pageToken=page_token).execute()\n", " if 'messages' in result:\n", " messages.extend(result['messages'])\n", " return messages" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# utility functions\n", "def get_size_format(b, factor=1024, suffix=\"B\"):\n", " \"\"\"\n", " Scale bytes to its proper byte format\n", " e.g:\n", " 1253656 => '1.20MB'\n", " 1253656678 => '1.17GB'\n", " \"\"\"\n", " for unit in [\"\", \"K\", \"M\", \"G\", \"T\", \"P\", \"E\", \"Z\"]:\n", " if b < factor:\n", " return f\"{b:.2f}{unit}{suffix}\"\n", " b /= factor\n", " return f\"{b:.2f}Y{suffix}\"\n", "\n", "\n", "def clean(text):\n", " # clean text for creating a folder\n", " return \"\".join(c if c.isalnum() else \"_\" for c in text)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def parse_parts(service, parts, folder_name):\n", " \"\"\"\n", " Utility function that parses the content of an email partition\n", " \"\"\"\n", " if parts:\n", " for part in parts:\n", " filename = part.get(\"filename\")\n", " mimeType = part.get(\"mimeType\")\n", " body = part.get(\"body\")\n", " data = body.get(\"data\")\n", " file_size = body.get(\"size\")\n", " part_headers = part.get(\"headers\")\n", " if part.get(\"parts\"):\n", " # recursively call this function when we see that a part\n", " # has parts inside\n", " parse_parts(service, part.get(\"parts\"), folder_name)\n", " if mimeType == \"text/plain\":\n", " # if the email part is text plain\n", " if data:\n", " text = urlsafe_b64decode(data).decode()\n", " print(text)\n", " elif mimeType == \"text/html\":\n", " # if the email part is an HTML content\n", " # save the HTML file and optionally open it in the browser\n", " if not filename:\n", " filename = \"index.html\"\n", " filepath = os.path.join(folder_name, filename)\n", " print(\"Saving HTML to\", filepath)\n", " with open(filepath, \"wb\") as f:\n", " f.write(urlsafe_b64decode(data))\n", " else:\n", " # attachment other than a plain text or HTML\n", " for part_header in part_headers:\n", " part_header_name = part_header.get(\"name\")\n", " part_header_value = part_header.get(\"value\")\n", " if part_header_name == \"Content-Disposition\":\n", " if \"attachment\" in part_header_value:\n", " # we get the attachment ID \n", " # and make another request to get the attachment itself\n", " print(\"Saving the file:\", filename, \"size:\", get_size_format(file_size))\n", " attachment_id = body.get(\"attachmentId\")\n", " attachment = service.users().messages() \\\n", " .attachments().get(id=attachment_id, userId='me', messageId=msg['id']).execute()\n", " data = attachment.get(\"data\")\n", " filepath = os.path.join(folder_name, filename)\n", " if data:\n", " with open(filepath, \"wb\") as f:\n", " f.write(urlsafe_b64decode(data)) \n", "\n", "\n", "def read_message(service, message_id):\n", " \"\"\"\n", " This function takes Gmail API `service` and the given `message_id` and does the following:\n", " - Downloads the content of the email\n", " - Prints email basic information (To, From, Subject & Date) and plain/text parts\n", " - Creates a folder for each email based on the subject\n", " - Downloads text/html content (if available) and saves it under the folder created as index.html\n", " - Downloads any file that is attached to the email and saves it in the folder created\n", " \"\"\"\n", " msg = service.users().messages().get(userId='me', id=message_id['id'], format='full').execute()\n", " # parts can be the message body, or attachments\n", " payload = msg['payload']\n", " headers = payload.get(\"headers\")\n", " parts = payload.get(\"parts\")\n", " folder_name = \"email\"\n", " if headers:\n", " # this section prints email basic info & creates a folder for the email\n", " for header in headers:\n", " name = header.get(\"name\")\n", " value = header.get(\"value\")\n", " if name == 'From':\n", " # we print the From address\n", " print(\"From:\", value)\n", " if name == \"To\":\n", " # we print the To address\n", " print(\"To:\", value)\n", " if name == \"Subject\":\n", " # make a directory with the name of the subject\n", " folder_name = clean(value)\n", " # we will also handle emails with the same subject name\n", " folder_counter = 0\n", " while os.path.isdir(folder_name):\n", " folder_counter += 1\n", " # we have the same folder name, add a number next to it\n", " if folder_name[-1].isdigit() and folder_name[-2] == \"_\":\n", " folder_name = f\"{folder_name[:-2]}_{folder_counter}\"\n", " elif folder_name[-2:].isdigit() and folder_name[-3] == \"_\":\n", " folder_name = f\"{folder_name[:-3]}_{folder_counter}\"\n", " else:\n", " folder_name = f\"{folder_name}_{folder_counter}\"\n", " os.mkdir(folder_name)\n", " print(\"Subject:\", value)\n", " if name == \"Date\":\n", " # we print the date when the message was sent\n", " print(\"Date:\", value)\n", " \n", " parse_parts(service, parts, folder_name)\n", " print(\"=\"*50)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get emails that match the query you specify\n", "results = search_messages(service, \"Python Code\")\n", "# for each email matched, read it (output plain/text to console & save HTML and attachments)\n", "for msg in results:\n", " read_message(service, msg)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def mark_as_read(service, query):\n", " messages_to_mark = search_messages(service, query)\n", " return service.users().messages().batchModify(\n", " userId='me',\n", " body={\n", " 'ids': [ msg['id'] for msg in messages_to_mark ],\n", " 'removeLabelIds': ['UNREAD']\n", " }\n", " ).execute()\n", "\n", "def mark_as_unread(service, query):\n", " messages_to_mark = search_messages(service, query)\n", " return service.users().messages().batchModify(\n", " userId='me',\n", " body={\n", " 'ids': [ msg['id'] for msg in messages_to_mark ],\n", " 'addLabelIds': ['UNREAD']\n", " }\n", " ).execute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mark_as_read(service, \"Google\")\n", "# search query by sender/receiver\n", "mark_as_unread(service, \"email@domain.com\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def delete_messages(service, query):\n", " messages_to_delete = search_messages(service, query)\n", " # it's possible to delete a single message with the delete API, like this:\n", " # service.users().messages().delete(userId='me', id=msg['id'])\n", " # but it's also possible to delete all the selected messages with one query, batchDelete\n", " return service.users().messages().batchDelete(\n", " userId='me',\n", " body={\n", " 'ids': [ msg['id'] for msg in messages_to_delete]\n", " }\n", " ).execute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "delete_messages(service, \"Google Alerts\")" ] } ] }
X Tutup