Skip to content
tom-kuca edited this page May 5, 2024 · 57 revisions

User-Contributed Hooks

Auto-Bcc

https://github.com/pazz/alot/issues/563#issuecomment-13717496

async def pre_envelope_send(ui, dbm):                                                 
        ADDR = "[email protected]"                                   
        BCC_ADDR = "[email protected]"                               
        from_fulladdr = ui.current_buffer.envelope.get_all("From")[0]           
        if ADDR in from_fulladdr:                                               
                ui.current_buffer.envelope.add("Bcc:", BCC_ADDR)

Auto-refresh search buffer

If it bothers you that a search buffer still shows unread messages after you have marked them read in another buffer and return to your search buffer, you may add the following hook:

def pre_buffer_focus(ui, dbm, buf):
	if buf.modename == 'search':
		buf.rebuild()

Save marked position in search buffer

To maintain the position of focus in a search buffer after you open another buffer and return to the search buffer use the following hooks (based on pull request #633)

def pre_buffer_open(ui, dbm, buf):
    current = ui.current_buffer
    if isinstance(current, alot.buffers.SearchBuffer):
        current.focused_thread = current.get_selected_thread()   # remember focus

def post_buffer_focus(ui, dbm, buf, success):
    if success and hasattr(buf, "focused_thread"):  # if buffer has saved focus
        if buf.focused_thread is not None:
            tid = buf.focused_thread.get_thread_id() 
            for pos, tlw in enumerate(buf.threadlist.get_lines()):
                if tlw.get_thread().get_thread_id() == tid:
                    buf.body.set_focus(pos)
                    break

Check for missing attachment before sending

https://github.com/pazz/alot/issues/395#issuecomment-13859844

import re

async def pre_envelope_send(ui=None, dbm=None, cmd=None):
    e = ui.current_buffer.envelope
    att = r".*(\battach(ed|ing)?\b|\bhere's|\bhere\s+is)"
    if (
        re.match(att, e.body_txt, re.DOTALL | re.IGNORECASE)
        and not e.attachments
    ):
        msg = 'No attachments. Send anyway?'
        if not (await ui.choice(msg, select='yes')) == 'yes':
            raise Exception("Send aborted")

Change From: address based on destination

Sometimes you want to change your From address based on where is the destination. I.e. you want to use work email address for company domain, project email address when addressing some mailing list and personal address otherwise. This will override every other setting:

import re

transitions = [
    ('.*work-domain\.com.*',
     'Your Name <[email protected]>'),
    ('.*[email protected].*',
     'Your Name <[email protected]>')
    ]

addr_trans = []
for addr, fr in transitions:
    addr_trans.append((re.compile("(To|Cc): %s" % addr, re.MULTILINE),
                       "From: %s" % fr))

def pre_edit_translate(bodytext, ui, dbm):
    for addr, new_from in addr_trans:
        if addr.search(bodytext):
            return re.sub('^From: .*$', new_from, bodytext, flags=re.MULTILINE)
    return bodytext

Manually marking a Github notification as read

Github automatically marks a notification as read if one views the corresponding email in a html-aware mailclient. This is done by a getting a small invisible 1-pixel beacon image. alot does not download this image per default. If you want to manually mark some github notification as seen, you can define the hook below and triger it using call:

import re
import urllib2


def github_mark_read(ui):
    msg = ui.current_buffer.get_selected_message()
    msgtext = str(msg.get_email())
    r = r"img src='(https://github.com/notifications/beacon/.*.gif)'"
    beacons = re.findall(r, msgtext)
    if beacons:
        urllib2.urlopen(beacons[0])
        ui.notify('removed from github notifications:\n %s' % beacons[0])

If you want to mark the focussed message (in thread mode) as read by hitting $, add the following to the binding section of the config:

[[bindings]]
  [[thread]]
    $ = call hooks.github_mark_read(ui)

Automatically construct list unsubscribe mails

https://github.com/pazz/alot/issues/656

Recent branches of alot (0.3.5-feature-mailto-666) contain helpers alot.helper.parse_mailto and alot.helper.mailto_to_envelope. The hook below uses those to construct unsubscribe mails for mailing lists by checking if (in thread mode) the currently highlighted message has a List-unsubscribe header that contains a mailto-string.

With the hooks defines as below, use call hooks.unsubscribe(ui) to trigger the function. This can of course also be bound to a keypress.

def unsubscribe(ui):
    """ 
    Unsubscribe from a mailing list.

    This hook reads the 'List-Unsubscribe' header of a mail in thread mode,
    constructs a unsubsribe-mail according to any mailto-url it finds
    and opens the new mail in an envelope buffer.
    """
    from alot.helper import mailto_to_envelope
    from alot.buffers import EnvelopeBuffer
    msg = ui.current_buffer.get_selected_message()
    e = msg.get_email()
    uheader = e['List-Unsubscribe']
    dtheader = e.get('Delivered-To', None)

    if uheader is not None:
        M = re.search(r'<(mailto:\S*?)>', uheader)
        if M is not None:
            env = mailto_to_envelope(M.group(1))
            if dtheader is not None:
                env['From'] = dtheader
            ui.buffer_open(EnvelopeBuffer(ui, env))
    else:
        ui.notify('focussed mail contains no \'List-Unsubscribe\' header',
                  'error')

Translate Message body via google translate (goslate):

update 09/2017: This does not work any more. Apparently google has shut down the API that the goslate package uses. "Free lunch is over. Thanks for using." (https://pypi.python.org/pypi/goslate) A possible alternative is to use the google API in combination with a dev key directly. Contributions welcome!

[bindings]
[[thread]]
', t' = "call hooks.translate(ui)"

the hook code is below. It uses google translate via goslate: https://pypi.python.org/pypi/goslate .

def translate(ui, targetlang='en'):
    # get msg content
    msg = ui.current_buffer.get_selected_message()
    msgtext = msg.accumulate_body()

    # translate
    import goslate
    gs = goslate.Goslate()
    tmsg = gs.translate(msgtext, targetlang)

    # replace message widgets content
    mt=ui.current_buffer.get_selected_messagetree()
    mt.replace_bodytext(tmsg)
    mt.refresh()

    # refresh the thread buffer
    ui.current_buffer.refresh()

reply to an email with PGP inline encryption or with PGP inline signature

To automatically decrypt PGP encrypted emails or remove the PGP header and signature from PGP signed emails, the following hook can be used:

from alot.settings.const import settings
import alot.crypto as crypto
from alot import errors

def text_quote(message):
    # avoid importing a big module by using a simple heuristic to guess the
    # right encoding
    def decode(s, encodings=('ascii', 'utf8', 'latin1')):
        for encoding in encodings:
            try:
                return s.decode(encoding)
            except UnicodeDecodeError:
                pass
        return s.decode('ascii', 'ignore')
    lines = message.splitlines()
    if len(lines) == 0:
        return ""
    # delete empty lines at beginning and end (some email client insert these
    # outside of the pgp signed message...)
    if lines[0] == '' or lines[-1] == '':
        from itertools import dropwhile
        lines = list(dropwhile(lambda l: l == '', lines))
        lines = list(reversed(list(dropwhile(lambda l: l == '', reversed(lines)))))
    if len(lines) > 0 and lines[0] == '-----BEGIN PGP MESSAGE-----' \
            and lines[-1] == '-----END PGP MESSAGE-----':
        try:
            sigs, d = crypto.decrypt_verify(message.encode('utf-8'))
            message = decode(d)
        except errors.GPGProblem:
            pass
    elif len(lines) > 0 and lines[0] == '-----BEGIN PGP SIGNED MESSAGE-----' \
            and lines[-1] == '-----END PGP SIGNATURE-----':
        # gpgme does not seem to be able to extract the plain text part of a signed message
        import gnupg
        gpg = gnupg.GPG()
        d = gpg.decrypt(message.encode('utf8'))
        message = d.data.decode('utf8')
    quote_prefix = settings.get('quote_prefix')
    return "\n".join([quote_prefix + line for line in message.splitlines()])

Add X-Debbugs-No-Ack for debbugs

The debbugs bug tracking system as used by gnu.org and debian.org is controlled via email. By default messages to a bug result in an acknowledgement email. To prevent these emails, the X-Debbugs-No-Ack header has to be added to the emails to the bug server. The following hook automatically adds this header to all emails to [email protected]:

from email.utils import getaddresses
async def pre_envelope_send(ui, dbm, cmd):
    e = ui.current_buffer.envelope
    found = False
    for header in ['To', 'Cc', 'Bcc']:
        for _, address in getaddresses(e.get_all(header, [])):
            if re.match(r'^\[email protected]$', address):
                found = True
                break
        if found:
            break
    if found:
        e.add("X-Debbugs-No-Ack", "kthxbye")

Getting new msg while alot is running

If you need to fetch your email and you've already configured notmuch hooks you may want to add a hook in alot in order to run notmuch new and fetch the new email when you press a key.

First you need to add some code in alot/hooks.py in order to define the function getmail

import subprocess

def getmail(ui=None):
    ui.notify("fetching email..")
    msg=subprocess.Popen("notmuch new".split(),stdout=subprocess.PIPE,stderr=subprocess.PIPE)

Then you can map a key in order to do get your emails just adding the following line in your alot/config file

[bindings]
   G = call hooks.getmail(ui)

Knowing which version/branch/commit of alot you are using

As an alot developer I run alot directly from git. In order to know which version of the code I am currently running (helpful if you switch branches) I use this hook function in my inital_command:

import alot
import os.path
import subprocess
def version_notification(ui):
    dn = os.path.dirname
    directory = dn(dn(alot.__file__))
    output = lambda *x: subprocess.check_output(x, cwd=directory).strip()
    commit = output('git', 'describe', '--tags')
    branch = output('git', 'rev-parse', '--abbrev-ref', 'HEAD')
    ui.notify('Version:    {}\nGit commit: {}\nGit branch: {}'.format(
        alot.__version__, commit, branch), timeout=10)

Open HTML emails in external browser

Useful for viewing and printing HTML emails as they are intended by the sender:

import alot
import tempfile
import webbrowser
from alot.helper import string_sanitize
from alot.helper import string_decode

# Helper method to extract the raw html part of a message. Note that it
# only extracts the first text/html part found.
def _get_raw_html(msg):
    mail = msg.get_email()

    for part in mail.walk():
        ctype = part.get_content_type()

        if ctype != "text/html":
            continue

        cd = part.get('Content-Disposition', '')

        if cd.startswith('attachment'):
            continue

        enc = part.get_content_charset() or 'utf-8'

        raw = string_decode(part.get_payload(decode=True), enc)

        return string_sanitize(raw), enc

    return None, None


# Opens HTML emails in an external browser.
# Related issue:
#  - https://github.com/pazz/alot/issues/1153
def open_in_browser(ui=None):
    ui.notify("Opening message in browser...")
    msg = ui.current_buffer.get_selected_message()

    htmlstr, enc = _get_raw_html(msg)

    if htmlstr == None:
        ui.notify("Email has no html part")
        return

    temp = tempfile.NamedTemporaryFile(prefix="alot-",suffix=".html",
                                       delete=False)
    temp.write(htmlstr.encode(enc))
    temp.flush()
    temp.close()
    webbrowser.open(temp.name)

Import GPG keys

This piece of code implements collection of keys both as ascii-armored inline text and as attached MIME parts, and allows importing them into your main keyring.

A temporary keyring is used to parse the keys found (because gpgme doesn't allow for other forms of key parsing, it seems) and a confirmation is required for each key found. Improvements are very welcome. :-)

Add the following to .config/alot/hooks.py:

import gpg
import os
import shutil
import signal
import subprocess
import tempfile

from contextlib import contextmanager


BEGIN_KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----"
END_KEY = "-----END PGP PUBLIC KEY BLOCK-----"


def _get_inline_keys(content):
    if BEGIN_KEY not in content:
        return []

    keys = []
    while content:
        start = content.find(BEGIN_KEY)
        if start == -1:
            # there are no more inline keys
            break
        content = content[start:]
        end = content.find(END_KEY) + len(END_KEY)
        key = content[0:end]
        keys.append(key)
        content = content[end:]

    return keys


def _get_attached_keys(attachments):
    keys = []
    for attachment in attachments:
        content_type = attachment.get_content_type()
        if content_type == 'application/pgp-keys':
            keys.append(attachment.get_data())
    return keys


@contextmanager
def temp_gpg_context():
    tempdir = tempfile.mkdtemp()
    tempctx = gpg.Context()
    tempctx.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, home_dir=tempdir)
    try:
        yield tempctx
    finally:
        # Kill any gpg-agent's that have been opened
        lookfor = 'gpg-agent --homedir {}'.format(tempdir)
        out = subprocess.check_output(['ps', 'xo', 'pid,cmd'],
                                      stderr=open('/dev/null', 'w'))
        for each in out.strip().split('\n'):
            pid, cmd = each.strip().split(' ', 1)
            if cmd.startswith(lookfor):
                os.kill(int(pid), signal.SIGKILL)
        shutil.rmtree(tempdir)


async def import_keys(ui):
    ui.notify('Looking for keys in message...')
    m = ui.current_buffer.get_selected_message()
    content = m.get_text_content()
    attachments = m.get_attachments()
    inline = _get_inline_keys(content)
    attached = _get_attached_keys(attachments)
    keys = inline + attached

    if not keys:
        ui.notify('No keys found in message.')
        return

    for keydata in keys:
        with temp_gpg_context() as tempctx:
            tempctx.op_import(keydata)
            key = [k for k in tempctx.keylist()].pop()
            fpr = key.fpr
            uids = [u.uid for u in key.uids]
        confirm = 'Found key %s with uids:' % fpr
        for uid in uids:
            confirm += '\n  %s' % uid
        confirm += '\nImport key into keyring?'
        if (await ui.choice(confirm, select='yes')) == 'yes':
            # ***ATTENTION*** - operation in real keyring
            ctx = gpg.Context()
            ctx.op_import(keydata)
            ui.notify('Key imported: %s' % fpr)

You can add a key binding to call the hook in .config/alot/config as such:

[bindings]
  [[thread]]
    k = call hooks.import_keys(ui)

Attach GPG keys

The following hooks allow for searching for key in your default keyring in different ways and attaching them to a message.

Things to improve:

  • an alternative way to see the list of keys found and select one (or many) for attachment.
  • give the option of attaching a minimal key (i.e. using GPGME_EXPORT_MODE_MINIMAL).
  • others?
import gpg

from email.mime.base import MIMEBase
from email.encoders import encode_base64
from email.utils import parseaddr

from alot.db.attachment import Attachment
from alot.settings.const import settings


#
# Attach key
#

def _key_to_mime(ctx, fpr):
    """
    Return an 'application/pgp-keys' MIME part containing an ascii-armored
    OpenPGP public key.
    """
    filename = '0x{}.pub.asc'.format(fpr)
    key = gpg.Data()
    ctx.op_export(fpr, 0, key)
    key.seek(0, 0)
    content = key.read()
    part = MIMEBase('application', 'pgp-keys')
    part.set_payload(content)
    encode_base64(part)
    part.add_header('Content-Disposition', 'attachment', filename=filename)
    return part


def _attach_key(ui, pattern):
    """
    Attach an OpenPGP public key to the current envelope.
    """
    ctx = gpg.Context()
    ctx.armor = True
    keys = _list_keys(pattern)
    for key in keys:
        part = _key_to_mime(ctx, key.fpr)
        attachment = Attachment(part)
        ui.current_buffer.envelope.attachments.append(attachment)
        ui.notify('Attached key %s' % key.fpr)
    ui.current_buffer.rebuild()


def _list_keys(pattern):
    """
    Return a list of OpenPGP keys that match the given pattern.
    """
    ctx = gpg.Context()
    ctx.armor = True
    keys = [k for k in ctx.keylist(pattern)]
    return keys


async def attach_keys(ui):
    """
    Query the user for a pattern, search the default keyring, and offer to
    attach matching keys.
    """
    pattern = await ui.prompt('Search for key to attach')
    ui.notify('Looking for "{}" in keyring...'.format(pattern))
    keys = _list_keys(pattern)

    if not keys:
        ui.notify('No keys found.')
        return

    for key in keys:
        prompt = []
        fpr = "{}".format(key.fpr)
        prompt.append("Key 0x{}:".format(fpr))
        for uid in key.uids:
            prompt.append("  {}".format(uid.uid))
        prompt.append('Attach?')
        if (await ui.choice('\n'.join(prompt), select='yes')) == 'yes':
            _attach_key(ui, fpr)


def attach_my_key(ui):
    """
    Attach my own OpenPGP public key to the current envelope.
    """
    sender = ui.current_buffer.envelope.get('From', "")
    address = parseaddr(sender)[1]
    acc = settings.account_matching_address(address)
    fpr = acc.gpg_key.fpr
    return _attach_key(ui, fpr)


def attach_recipient_keys(ui):
    """
    Attach the OpenPGP public keys of all the recipients of the email.
    """
    to = ui.current_buffer.envelope.get('To', "")
    cc = ui.current_buffer.envelope.get('Cc', "")
    for recipient in to.split(',') + cc.split(','):
        address = parseaddr(recipient)[1]
        if address:
            _attach_key(ui, address)

You can add key bindings to call the hooks in .config/alot/config as such:

[bindings]
  [[envelope]]
    k = call hooks.attach_my_key(ui)
    K = call hooks.attach_keys(ui)
    'control k' = call hooks.attach_recipient_keys(ui)

Apply patches to repositories based on the notmuch tags applied

This hook allows you to bind a key to apply patches to different projects based on the notmuch tags of the email.

create a file called patch.config alongside your main alot config (usually in ~/.config/alot/patch.config), it is organized such that each section heading is a tag, and it has a single option directory which points to the source directory of that repository:

[lists/alot]
directory = ~/src/alot

Add a binding such as:

[bindings]
    [[thread]]
        a = call hooks.apply_patch(ui)
# Copyright © 2017 Dylan Baker
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import os
import subprocess

from alot.settings.utils import read_config



def _get_config():
    config_path = os.path.join(
        os.environ.get('XDG_CONFIG_HOME', os.path.join(os.environ['HOME'], '.config')),
        'alot', 'patch.config')
    return read_config(configpath=config_path)


CONFIG = _get_config()


def apply_patch(ui):
    message = ui.current_buffer.get_selected_message()
    filename = message.get_filename()

    for tag in message.get_tags():
        if tag in CONFIG:
            config = CONFIG[tag]
            break
    else:
        logging.debug('found: ' + ', '.join(message.get_tags()))
        ui.notify('No tags matched a config rule!', priority='error')
        return

    try:
        subprocess.check_output(
            ['git', '-C', os.path.expanduser(config['directory']), 'am', '-3', filename],
            stderr=subprocess.STDOUT)
    except Exception as e:
        ui.notify('Failed to apply patch. Reason:' + str(e), priority='error')
        logging.debug('git am output: ' + e.output)
    else:
        ui.notify('Patch applied.')

"Anonymize" message headers

The following modifications are an attempt to minimize the amount of metadata sent by alot in message headers in case you use it for multiple identities. Note that "real anonymity" is hard, and is by no means fully addresses by these modifications. If you need higher levels of anonymity, you should use Tor and Tails and possibly think about other measures when using the network and registering email accounts.

That said, the following measures intend to make all messages leave your computer with the following headers modified:

  • Message-Id - use localhost instead of machine's hostname/fqdn.
  • User-Agent - omit.
  • Date - use UTC time instead of publishing your timezone.

There are other ways to handle this (example: randomize time, use generic user-agent, etc). Feel free to improve these examples if you have nice ideas. :-)

First, set an empty (or generic custom) user agent configuration in ~/.config/alot/config:

user_agent =

Then, add the following hook to ~/.config/alot/hooks.py:

import random
import time


async def anonymize_message(ui, dbm, cmd):                                      
    anonymize_message_id(ui, dbm, cmd)                                          
    anonymize_date(ui, dbm, cmd)                                                
                                                                                
                                                                                
def anonymize_message_id(ui, dbm, cmd):                                         
    headers = ui.current_buffer.envelope.headers                                
    timeval = int(time.time()*100)                                              
    pid = random.getrandbits(15)                                                
    randint = random.getrandbits(64)                                            
    if 'Message-ID' not in headers:                                             
        msgid = '<{}.{}.{}>@localhost'.format(timeval, pid, randint)            
        headers.update({'Message-ID': [msgid]})                                 
                                                                                
                                                                                
def anonymize_date(ui, dbm, cmd):                                               
    # there's currently no way to modify the date in a hook before the message  
    # is sent, so our option is to monkey-patch the formatdate function and     
    # make it ignore localization.                                              
    from email import utils                                                     
    formatdate = utils.formatdate                                               
                                                                                
    def anonymized_formatdate(*args, **kwargs):                                 
        kwargs['localtime'] = False                                             
        return formatdate(*args, **kwargs)                                      
                                                                                
    utils.formatdate = anonymized_formatdate

Mimic Gmail's reply/forward prefix

def reply_prefix(*args, **kwargs):
  # On Tue, 9 Apr 2019 at 18:17 John Doe, <[email protected]> wrote:
  return "On {2:%a, %-d %b %Y at %H:%M} {0}, <{1}> wrote:\n\n".format(*args)

def forward_prefix(*args, **kwargs):
  # ---------- Forwarded message ---------
  # From: John Doe <[email protected]>
  # Date: Wed, 10 Apr 2019 at 00:23
  # Subject: Re: Cool mail
  # To: Jane Doe <[email protected]>
  # requires patched alot (https://github.com/pazz/alot/pull/1390)
  subject = alot.db.utils.decode_header(kwargs["message"].get("Subject", ""))
  to = alot.db.utils.decode_header(kwargs["message"].get("To", ""))
  return (" ---------- Forwarded message ---------\n"
             " From: {0} <{1}>\n"
             " Date: {2:%a, %-d %b %Y at %H:%M}\n"
             " Subject: {3}\n"
             " To: {4}\n\n\n"
         ).format(*args, subject, to)

UNDO/REDO toggletag actions + Save search position + better refresh (it keeps the current search position)

The following modifications are based on the above, but rewritten to support some undo/redo actions.

  # give the hooks some keybindings
  u=call hooks.undo(ui)
  Z=call hooks.redo(ui)
  `=call hooks.refresh(ui)
undolog = []

def _start_add_undo_entry(ui, dbm, cmd):
    "capture tags before toggletags command"
    store_pos(ui)

    thread = ui.current_buffer.get_selected_thread()
    if not thread:
        return

    undolog.append({
        'thread_id': thread.get_thread_id(),
        'before': thread.get_tags(),
        'ready': False,
        'undone': False,
    })

def _finish_add_undo_entry(ui, dbm, cmd):
    "capture tags after toggletags command"
    entry = undolog and undolog[-1]
    if not entry or entry['ready'] == True:
        return

    after = dbm.get_thread(entry['thread_id']).get_tags()
    before = entry.pop('before')
    entry['added_tags'] = after.difference(before)
    entry['removed_tags'] = before.difference(after)
    entry['ready'] = True

async def pre_search_toggletags(ui, dbm, cmd):
    _start_add_undo_entry(ui, dbm, cmd)

async def post_search_toggletags(ui, dbm, cmd):
    _finish_add_undo_entry(ui, dbm, cmd)

async def pre_thread_toggletags(ui, dbm, cmd):
    _start_add_undo_entry(ui, dbm, cmd)

async def post_thread_toggletags(ui, dbm, cmd):
    _finish_add_undo_entry(ui, dbm, cmd)

def undo(ui):
    "Undo toggletags"
    undoable = [entry for entry in undolog if entry['ready'] and not entry['undone']]
    entry = undoable and undoable[-1]
    if not entry:
        ui.notify("Nothing to undo")
        return
    thread = ui.dbman.get_thread(entry['thread_id'])
    thread.add_tags(entry['removed_tags'])
    thread.remove_tags(entry['added_tags'])
    refresh(ui, entry['thread_id'])
    entry['undone'] = True

    tag_change = ' '.join(['+{}'.format(t) for t in entry['removed_tags']] +
                          ['-{}'.format(t) for t in entry['added_tags']])
    ui.notify("Undo change: {} to: {}".format(tag_change, thread.get_subject()))

def redo(ui):
    "Redo toggletags"
    redoable = [entry for entry in undolog if entry['ready'] and entry['undone']]
    entry = redoable and redoable[0]
    if not entry:
        ui.notify("Nothing to redo")
        return
    thread = ui.dbman.get_thread(entry['thread_id'])
    thread.add_tags(entry['added_tags'])
    thread.remove_tags(entry['removed_tags'])
    refresh(ui, entry['thread_id'])
    entry['undone'] = False

    tag_change = ' '.join(['-{}'.format(t) for t in entry['removed_tags']] +
                          ['+{}'.format(t) for t in entry['added_tags']])
    ui.notify("Redo change: {} to: {}".format(tag_change, thread.get_subject()))

## Auto-refresh search buffer

def store_pos(ui):
    "Remember what position the selection was at"
    buf = ui.current_buffer
    if ui.mode == 'search':
        thread_id = buf and buf.get_selected_thread() and buf.get_selected_thread().get_thread_id()
        if thread_id:
            for pos, tlw in enumerate(buf.threadlist.get_lines()):
                if tlw.get_thread().get_thread_id() == thread_id:
                    buf.focused_thread_pos = pos

def restore_pos(ui, thread_id=None):
    "Restore the previously selected thread"
    buf = ui.current_buffer
    thread_id = getattr(buf, "focused_thread_id", thread_id)
    pos = getattr(buf, "focused_thread_pos", 0)
    if pos and ui.mode == 'search':
        buf.body.set_focus(pos)

def refresh(ui, thread_id=None, store_current=True):
    "Refresh and keep thread selection"
    if store_current:
        store_pos(ui)
    ui.dbman.flush()
    ui.current_buffer.rebuild()
    ui.update()
    restore_pos(ui, thread_id)

def pre_buffer_open(ui, dbm, buf):
    store_pos(ui)

def post_buffer_focus(ui, dbm, buf, success):
    restore_pos(ui)

Ask recipient for read receipt

# hooks.py
def toggle_disposition_notification(ui):
    """
    Ask to be notified when the email is read

    This hook adds the Disposition-Notification-To header to an envelope.
    """
    headers = ui.current_buffer.envelope.headers
    sender = headers.get('From', '')
    
    if headers.get('Disposition-Notification-To', None) is not None:
        headers.pop('Disposition-Notification-To')
    else:
        headers['Disposition-Notification-To'] = sender

    ui.current_buffer.rebuild()
    ui.update()

# config
[bindings]
    [[envelope]]
        n = call hooks.toggle_disposition_notification(ui)

Generate autcrypt header and import keys from it

While we wait for alot to support autocrypt directly (#1370) this hook provides some basic compatibility with the autocrypt header. Adding it to all our sent emails and adding a key binding to import keys from received emails:

# hooks.py
import gpg
import logging
import shutil
import subprocess

from alot.settings.const import settings
from base64 import b64encode, b64decode
from email.utils import parseaddr
from tempfile import mkdtemp


prefer_encrypt = "mutual"
num_cols = 78


class AutocryptError(Exception):
    pass


async def pre_envelope_send(ui, dbm, cmd):
    envelope = ui.current_buffer.envelope
    if envelope.get('Autocrypt', "") != "":
        return

    frm = envelope.get('From', "")
    address = parseaddr(frm)[1]
    acc = settings.account_matching_address(address)
    if not acc.gpg_key:
        return
    
    fpr = acc.gpg_key.fpr
    key = subprocess.check_output(
        ['gpg', '--export', '--export-options', 'export-minimal,no-export-attributes', fpr])
    keydata = b64encode(key).decode('utf-8', 'replace')
    keydata = '\n '.join(
        [keydata[i:i+num_cols] for i in range(0, len(keydata), num_cols)])
    envelope.add(
        'Autocrypt',
        "addr=%s;prefer-encrypt=%s;keydata=\n %s"
        % (address, prefer_encrypt, keydata))


async def import_autocrypt(ui):
    mail = ui.current_buffer.get_selected_message().get_email()
    try:
        autocrypt = parse_autocrypt(mail)
        message = "Import autocrypt key: %(addr)s (%(fpr)s)?" % autocrypt
        if (await ui.choice(message, cancel='no', msg_position='left')
                == 'no'):
            return

        c = gpg.Context()
        c.op_import(autocrypt["bytes"])
        logging.debug("key imported for: %s", autocrypt["addr"])
    except AutocryptError as e:
        logging.debug(str(e))
        ui.notify(str(e), 'error')


def parse_autocrypt(mail):
    autocrypt = {}

    aheader = mail.get('Autocrypt', "")
    frm = mail.get('From', "")
    address = parseaddr(frm)[1]
    if aheader == "":
        raise AutocryptError("No autocrypt header found")

    for i in aheader.split(";"):
        attr = i.split("=", 1)
        if len(attr) != 2:
            logging.debug("Can parse autocrypt attribute: %s", i)
            continue
        autocrypt[attr[0].strip()] = attr[1].strip()

    if "addr" not in autocrypt or "keydata" not in autocrypt:
        raise AutocryptError(
            "Address or keydata not in the autocrypt header: %s"
            % str(autocrypt.keys()))
    if address != autocrypt["addr"]:
        raise AutocryptError("The from doesn't match the autocrypt header")

    fpr, uids, kbytes = parse_keydata(autocrypt["keydata"])
    if autocrypt["addr"] not in uids:
        raise AutocryptError("%s not present in uids: %s"
                             % (autocrypt["addr"], str(uids)))

    autocrypt["fpr"] = fpr
    autocrypt["uids"] = uids
    autocrypt["bytes"] = kbytes
    return autocrypt


def parse_keydata(keydata):
    gpghome = mkdtemp("autocrypt")
    c = gpg.Context(home_dir=gpghome)

    keybytes = b64decode(keydata)
    c.op_import(keybytes)
    k = next(c.keylist())
    uids = [u.email for u in k.uids]

    shutil.rmtree(gpghome)
    return (k.fpr, uids, keybytes)

# config
[bindings]
    [[thread]]
        A = call hooks.import_autocrypt(ui)

Preview attachments with fzf

List attachments of an e-mail or a thread with inline preview of images. Require compliant terminal (kitty, iTerm2, Sixel), depends on fzf. Customize your fzf-preview.sh. Opens attachment with xdg-open. Skips attachments without a filename.

import os
import tempfile
import subprocess


def _preview_attachemnts(emails):
    directory = tempfile.TemporaryDirectory(prefix="alot-", suffix="-attachments")

    for email in emails:
        for part in email.walk():
            # multipart/* are just containers
            if part.get_content_maintype() == "multipart":
                continue
            filename = part.get_filename()
            if filename:
                with open(
                    os.path.join(directory.name, os.path.basename(filename)), "wb"
                ) as fp:
                    fp.write(part.get_payload(decode=True))

    # fzf is interactive, capture_output doesn't work out of the box
    resultfile = tempfile.NamedTemporaryFile(mode="w", suffix="fzf", prefix="alot-")
    with open(resultfile.name, "w") as outfile:
        result = subprocess.run(
            [
                "fzf",
                "--preview",
                "fzf-preview.sh {}",
                "--preview-window",
                "up,80%",
            ],
            cwd=directory.name,
            stdout=outfile,
        )

    if result.returncode == 0:
        with open(resultfile.name) as f:
            for line in f:
                filename = line.rstrip("\n")
                subprocess.run(["xdg-open", os.path.join(directory.name, filename)])
    resultfile.close()


def preview_attachments(ui=None):
    if not ui:
        return

    ui.notify("Listing attachments…")
    if ui.current_buffer.modename == "thread":
        messages = [ui.current_buffer.get_selected_message()]
    elif ui.current_buffer.modename == "search":
        messages = ui.current_buffer.get_selected_thread().get_messages()
    else:
        return

    emails = [msg.get_email() for msg in messages]
    _preview_attachemnts(emails)

Configuration

[[search]]
v = call hooks.preview_attachments(ui)

[[thread]]
v = call hooks.preview_attachments(ui)