You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Code :
import asyncio
from aioimaplib import aioimaplib
import asyncio
from aioimaplib import aioimaplib
from email import message_from_bytes
from email.header import decode_header
async def fetch_email_content(imap_client, email_id):
"""
Fetch and decode the content of the email with the given email_id.
"""
res, data = await imap_client.fetch(email_id, '(BODY[HEADER.FIELDS (FROM SUBJECT DATE)] BODY[TEXT])')
print('reached')
if res == 'OK':
email_data = data[0][1]
email_message = message_from_bytes(email_data)
# Get the sender
sender = decode_header(email_message["From"])[0][0]
if isinstance(sender, bytes):
sender = sender.decode()
# Get the subject
subject = decode_header(email_message["Subject"])[0][0]
if isinstance(subject, bytes):
subject = subject.decode()
# Get the date
date = email_message["Date"]
# Get the content
# Email body content may be multipart, so we need to handle that
if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
content = part.get_payload(decode=True).decode()
break
else:
content = email_message.get_payload(decode=True).decode()
# Print the email content in the desired format
print(f"Sender: {sender}")
print(f"Subject: {subject}")
print(f"Content: {content}")
print(f"Date: {date}")
else:
print("Failed to fetch email")
async def check_mailbox(host, user, password):
"""
Continuously check mailbox using IMAP IDLE for new email alerts.
"""
imap_client = aioimaplib.IMAP4_SSL(host=host)
await imap_client.wait_hello_from_server()
# Log in to the server
await imap_client.login(user, password)
# Select the INBOX folder
res, data = await imap_client.select("INBOX")
if res != 'OK':
print(f"Failed to select inbox: {res}")
return
print(f"There are {data[0]} messages in the INBOX.")
# Start monitoring the mailbox
print("Monitoring mailbox for new emails...")
while True:
try:
# Enter IDLE mode to wait for new emails
await imap_client.idle_start()
data = await imap_client.wait_server_push()
# Debugging: print the raw response to understand its structure
print(f"Server response: {data}")
# Check if a new email notification is received
if b"EXISTS" in data[0]:
print("New email received!")
# Fetch the latest email id (use the most recent one)
data=imap_client.fetch('1', '(RFC822)',timeout=400)
await asyncio.sleep(5)
print (res,data)
# if data:
# # email_id = data[0][0].decode() # Extract the email ID
# await fetch_email_content(imap_client, '1')
# # Break IDLE and re-enter to catch more updates
imap_client.idle_done()
except Exception as e:
print(f"Error: {e}")
break
# Logout gracefully
await imap_client.logout()
The text was updated successfully, but these errors were encountered:
Try moving imap_client.idle_done() before you fetch the email, like so:
# Log in to the serverawaitimap_client.login(user, password)
# Select the INBOX folderres, data=awaitimap_client.select("INBOX")
ifres!='OK':
print(f"Failed to select inbox: {res}")
returnprint(f"There are {data[0]} messages in the INBOX.")
# Start monitoring the mailboxprint("Monitoring mailbox for new emails...")
whileTrue:
try:
# Enter IDLE mode to wait for new emailsawaitimap_client.idle_start()
data=awaitimap_client.wait_server_push()
# Debugging: print the raw response to understand its structureprint(f"Server response: {data}")
# Check if a new email notification is received# # Break IDLE and re-enter to catch more updatesimap_client.idle_done()
ifb"EXISTS"indata[0]:
print("New email received!")
# Fetch the latest email id (use the most recent one)data=imap_client.fetch('1', '(RFC822)',timeout=400)
awaitasyncio.sleep(5)
print (res,data)
# if data:# # email_id = data[0][0].decode() # Extract the email ID# await fetch_email_content(imap_client, '1')exceptExceptionase:
print(f"Error: {e}")
break# Logout gracefullyawaitimap_client.logout()
I assume you are not be able to send any requests to the imap server in idle mode. If you want to fetch the most recent email (the one that was received), you can parse the id from the EXISTS message in data and fetch using that instead of '1'. Spent way too much time fixing the same issue, so hopefully this helps someone.
Whenever I fetch email, it stucks infinitely.
Code :
import asyncio
from aioimaplib import aioimaplib
import asyncio
from aioimaplib import aioimaplib
from email import message_from_bytes
from email.header import decode_header
async def fetch_email_content(imap_client, email_id):
"""
Fetch and decode the content of the email with the given email_id.
"""
res, data = await imap_client.fetch(email_id, '(BODY[HEADER.FIELDS (FROM SUBJECT DATE)] BODY[TEXT])')
print('reached')
if res == 'OK':
email_data = data[0][1]
email_message = message_from_bytes(email_data)
async def check_mailbox(host, user, password):
"""
Continuously check mailbox using IMAP IDLE for new email alerts.
"""
imap_client = aioimaplib.IMAP4_SSL(host=host)
await imap_client.wait_hello_from_server()
The text was updated successfully, but these errors were encountered: