3
votes

With the objective of having an application that runs on Python 3 and reads incoming emails on a specific Gmail account, how would one listen for the arrival of these emails?

What it should do is wait until a new email arrives in the inbox, then read its subject and body and extract the plain text from the body (without formatting).

This is what I got so far:

import imaplib
import email
import datetime
import time

mail = imaplib.IMAP4_SSL('imap.gmail.com', 993)
mail.login(user, password)
mail.list()
mail.select('inbox')

status, data = mail.search(None, 'ALL')
for num in data[0].split():
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg)
    maintype = email_msg.get_content_maintype()
    if maintype == 'multipart':
        for part in email_msg.get_payload():
            if part.get_content_maintype() == 'text':
                print(part.get_payload())
    elif maintype == 'text':
        print(email_msg.get_payload())

But this has a couple of problems: when the message is multipart, each part is printed, and sometimes the last part is basically the whole message again, but in HTML format.

Also, this prints all the messages in the inbox. How would one listen for new emails with imaplib, or with another library?

2 Answers

4
votes

Have you checked the script below (3_emailcheck.py), posted here by git user nickoala? It's a Python 2 script; in Python 3 you need to decode the bytes holding the email content first.

import time
from itertools import chain
import email
import imaplib

imap_ssl_host = 'imap.gmail.com'  # imap.mail.yahoo.com
imap_ssl_port = 993
username = 'USERNAME or EMAIL ADDRESS'
password = 'PASSWORD'

# Restrict mail search. Be very specific.
# Machine should be very selective to receive messages.
criteria = {
    'FROM':    'PRIVILEGED EMAIL ADDRESS',
    'SUBJECT': 'SPECIAL SUBJECT LINE',
    'BODY':    'SECRET SIGNATURE',
}
uid_max = 0


def search_string(uid_max, criteria):
    c = list(map(lambda t: (t[0], '"'+str(t[1])+'"'), criteria.items())) + [('UID', '%d:*' % (uid_max+1))]
    return '(%s)' % ' '.join(chain(*c))
    # Produce search string in IMAP format:
    #   e.g. (FROM "me@gmail.com" SUBJECT "abcde" BODY "123456789" UID 9999:*)


def get_first_text_block(msg):
    type = msg.get_content_maintype()

    if type == 'multipart':
        for part in msg.get_payload():
            if part.get_content_maintype() == 'text':
                return part.get_payload()
    elif type == 'text':
        return msg.get_payload()


server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
server.login(username, password)
server.select('INBOX')

result, data = server.uid('search', None, search_string(uid_max, criteria))

uids = [int(s) for s in data[0].split()]
if uids:
    uid_max = max(uids)
    # Initialize `uid_max`. Any UID less than or equal to `uid_max` will be ignored subsequently.

server.logout()


# Keep checking messages ...
# I don't like using IDLE because Yahoo does not support it.
while 1:
    # Have to login/logout each time because that's the only way to get fresh results.

    server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
    server.login(username, password)
    server.select('INBOX')

    result, data = server.uid('search', None, search_string(uid_max, criteria))

    uids = [int(s) for s in data[0].split()]
    for uid in uids:
        # Have to check again because Gmail sometimes does not obey UID criterion.
        if uid > uid_max:
            result, data = server.uid('fetch', uid, '(RFC822)')  # fetch entire message
            msg = email.message_from_string(data[0][1])

            uid_max = uid

            text = get_first_text_block(msg)
            print 'New message :::::::::::::::::::::'
            print text

    server.logout()
    time.sleep(1)
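
For Python 3, the fetch-and-parse step above needs the bytes handling mentioned at the top. The following is only a minimal sketch of that step, not part of nickoala's script: the helper name `first_text_block` and the sample message are made up for illustration, and the fallback to UTF-8 when no charset is declared is an assumption.

```python
import email


def first_text_block(raw: bytes) -> str:
    """Return the first text part of a raw RFC 822 message as a str."""
    # In Python 3 the IMAP fetch hands back bytes, so parse from bytes.
    msg = email.message_from_bytes(raw)
    for part in msg.walk():
        if part.get_content_maintype() == 'text':
            # decode=True undoes base64 / quoted-printable and returns bytes.
            payload = part.get_payload(decode=True)
            # Assumption: fall back to UTF-8 if the part declares no charset.
            charset = part.get_content_charset() or 'utf-8'
            return payload.decode(charset, errors='replace')
    return ''


# Hypothetical sample message with a base64-encoded body.
raw = (
    b"From: a@example.com\r\n"
    b"Subject: hi\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"Content-Transfer-Encoding: base64\r\n"
    b"\r\n"
    b"aGVsbG8gd29ybGQ=\r\n"
)
print(first_text_block(raw))
```

In the loop above this would be called as `first_text_block(data[0][1])`. The UID probably also has to be passed as a string under Python 3, e.g. `server.uid('fetch', str(uid), '(RFC822)')`, and the two `print` statements become `print(...)` calls.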

3
votes

I'm not sure about a synchronous way of doing that, but if you don't mind having an async loop and treating unread emails as your target, then this could work.
(I didn't implement the IMAP polling loop, only the email fetching loop)

My changes

  1. Change the IMAP search filter from 'ALL' to '(UNSEEN)' to fetch only unread emails.
  2. Change the parsing policy to policy.SMTP from the default policy.compat32.
  3. Use the message's walk() method to iterate over & filter the message parts.
  4. Replace the legacy email API calls with the new ones as described in the docs,
    and demonstrated in these examples.
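
To illustrate what the policy change buys you, here is a small offline sketch. The sample message and the helper name `plain_text_body` are invented for the example. With the default compat32 policy the parser returns a legacy `Message` object, while with `policy.SMTP` it returns an `EmailMessage`, which offers `get_body()` and `get_content()`. Asking `get_body()` for the plain alternative is one way to avoid printing the HTML copy of a multipart message.

```python
import email
from email import policy

# Hypothetical multipart/alternative message: same text as plain and HTML.
raw = (
    b"From: a@example.com\r\n"
    b"Subject: demo\r\n"
    b"MIME-Version: 1.0\r\n"
    b'Content-Type: multipart/alternative; boundary="XX"\r\n'
    b"\r\n"
    b"--XX\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"\r\n"
    b"Hello in plain text\r\n"
    b"--XX\r\n"
    b"Content-Type: text/html; charset=utf-8\r\n"
    b"\r\n"
    b"<p>Hello in <b>HTML</b></p>\r\n"
    b"--XX--\r\n"
)


def plain_text_body(raw_bytes):
    """Return the text/plain body of a message, or '' if there is none."""
    msg = email.message_from_bytes(raw_bytes, policy=policy.SMTP)
    body = msg.get_body(preferencelist=('plain',))
    return body.get_content() if body is not None else ''


# Default policy: legacy Message object without get_body().
legacy = email.message_from_bytes(raw)
print(type(legacy).__name__, hasattr(legacy, 'get_body'))

# Modern policy: only the plain-text alternative is returned.
print(plain_text_body(raw).strip())
```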

The result code

import imaplib, email, getpass
from email import policy

imap_host = 'imap.gmail.com'
imap_user = 'example@gmail.com'

# init imap connection
mail = imaplib.IMAP4_SSL(imap_host, 993)
rc, resp = mail.login(imap_user, getpass.getpass())

# select only unread messages from inbox
mail.select('Inbox')
status, data = mail.search(None, '(UNSEEN)')

# for each e-mail message, print its text content
for num in data[0].split():
    # get a single message and parse it by policy.SMTP (RFC compliant)
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg, policy=policy.SMTP)

    print("\n----- MESSAGE START -----\n")

    print("From: %s\nTo: %s\nDate: %s\nSubject: %s\n\n" % (
        str(email_msg['From']),
        str(email_msg['To']),
        str(email_msg['Date']),
        str(email_msg['Subject'])))

    # print only message parts that contain text data
    for part in email_msg.walk():
        if part.get_content_type() == "text/plain":
            for line in part.get_content().splitlines():
                print(line)

    print("\n----- MESSAGE END -----\n")
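
The polling loop that was left out could look roughly like the sketch below. It is untested against a real Gmail account. The function names `fetch_unseen` and `poll`, the 30-second default interval, and the `rounds` parameter (only there so the loop can be stopped) are all assumptions. It relies on the fact that fetching `(RFC822)` from a mailbox selected read-write marks the message as seen, so each unread email should be handled once.

```python
import email
import time
from email import policy


def fetch_unseen(conn):
    """Return the unread messages of the currently selected mailbox, parsed."""
    status, data = conn.search(None, '(UNSEEN)')
    messages = []
    if status != 'OK' or not data or not data[0]:
        return messages
    for num in data[0].split():
        # Fetching RFC822 also sets the \Seen flag on the server.
        status, parts = conn.fetch(num, '(RFC822)')
        if status == 'OK':
            messages.append(
                email.message_from_bytes(parts[0][1], policy=policy.SMTP))
    return messages


def poll(conn, handle, interval=30, rounds=None):
    """Call handle(msg) for every unread message, sleeping between checks.

    conn is a logged-in imaplib connection with a mailbox already selected.
    rounds=None means loop forever.
    """
    done = 0
    while rounds is None or done < rounds:
        conn.noop()  # gives the server a chance to report mailbox changes
        for msg in fetch_unseen(conn):
            handle(msg)
        done += 1
        if rounds is None or done < rounds:
            time.sleep(interval)
```

With the connection from the code above, usage would be something like `poll(mail, lambda m: print(m['Subject']))` after `mail.select('Inbox')`. A long-running loop would also need to handle dropped connections, which this sketch ignores.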