Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(modules): Mailpit Container #625

Merged
3 changes: 3 additions & 0 deletions modules/mailpit/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.. autoclass:: testcontainers.mailpit.MailpitUser
.. autoclass:: testcontainers.mailpit.MailpitContainer
.. title:: testcontainers.mailpit.MailpitContainer
243 changes: 243 additions & 0 deletions modules/mailpit/testcontainers/mailpit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
import tempfile
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, NamedTuple

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import (
NoEncryption,
)
from cryptography.x509.oid import NameOID

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

if TYPE_CHECKING:
from typing_extensions import Self


class MailpitUser(NamedTuple):
"""Mailpit user for authentication

Helper class to define a user for Mailpit authentication.

This is just a named tuple for username and password.
Comment on lines +35 to +40
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine this could be a controversial addition, since it's the first class being exported from community modules that isn't a container.

The reason I've done this is because it's a more descriptive and intuitive datatype (to me):

- users:  list[tuple[str, str]] | None
+ users:  list[MailpitUser] | None



Example:

.. doctest::

>>> from testcontainers.mailpit import MailpitUser

>>> users = [
... MailpitUser("jane", "secret"),
... MailpitUser("ron", "pass2"),
... ]

>>> for user in users:
... print(user.username, user.password)
...
jane secret
ron pass2

>>> username, password = users[0]

>>> print(username, password)
jane secret
"""

username: str
password: str


class MailpitContainer(DockerContainer):
"""
Test container for Mailpit. The example below spins up a Mailpit server

Default configuration supports SMTP with STARTTLS and allows login with any
user/password.

Options:

* ``require_tls = True`` forces the use of SSL
* ``users = [MailpitUser("jane", "secret"), MailpitUser("ron", "pass2")]`` \
only allows login with ``jane:secret`` or ``ron:pass2``

Simple example:

.. doctest::

>>> import smtplib

>>> from testcontainers.mailpit import MailpitContainer

>>> with MailpitContainer() as mailpit_container:
... host_ip = mailpit_container.get_container_host_ip()
... host_port = mailpit_container.get_exposed_smtp_port()
... server = smtplib.SMTP(
... mailpit_container.get_container_host_ip(),
... mailpit_container.get_exposed_smtp_port(),
... )
... code, _ = server.login("any", "auth")
... assert code == 235 # authentication successful
... # use server.sendmail(...) to send emails

Example with auth and forced TLS:

.. doctest::

>>> import smtplib

>>> from testcontainers.mailpit import MailpitContainer, MailpitUser

>>> users = [MailpitUser("jane", "secret"), MailpitUser("ron", "pass2")]

>>> with MailpitContainer(users=users, require_tls=True) as mailpit_container:
... host_ip = mailpit_container.get_container_host_ip()
... host_port = mailpit_container.get_exposed_smtp_port()
... server = smtplib.SMTP_SSL(
... mailpit_container.get_container_host_ip(),
... mailpit_container.get_exposed_smtp_port(),
... )
... code, _ = server.login("jane", "secret")
... assert code == 235 # authentication successful
... # use server.sendmail(...) to send emails
"""

def __init__(
self,
image: str = "axllent/mailpit",
*,
smtp_port: int = 1025,
ui_port: int = 8025,
users: list[MailpitUser] | None = None,
require_tls: bool = False,
**kwargs: Any,
) -> None:
super().__init__(image=image, **kwargs)
self.smtp_port = smtp_port
self.ui_port = ui_port

self.users = users if users is not None else []
self.auth_accept_any = int(len(self.users) == 0)

self.require_tls = int(require_tls)
self.tls_key, self.tls_cert = _generate_tls_certificates()
with tempfile.NamedTemporaryFile(delete=False) as tls_key_file:
tls_key_file.write(self.tls_key)
self.tls_key_file = tls_key_file.name

with tempfile.NamedTemporaryFile(delete=False) as tls_cert_file:
tls_cert_file.write(self.tls_cert)
self.tls_cert_file = tls_cert_file.name

@property
def _users_conf(self) -> str:
"""Mailpit user configuration string

"user:password user2:pass2 ...]
"""
return " ".join(f"{user.username}:{user.password}" for user in self.users)

def _configure(self) -> None:
if self.users:
self.with_env("MP_SMTP_AUTH", self._users_conf)
self.with_env("MP_SMTP_AUTH_ACCEPT_ANY", str(self.auth_accept_any))

self.with_env("MP_SMTP_REQUIRE_TLS", str(self.require_tls))

self.with_volume_mapping(self.tls_cert_file, "/cert.pem")
self.with_volume_mapping(self.tls_key_file, "/key.pem")
self.with_env("MP_SMTP_TLS_CERT", "/cert.pem")
self.with_env("MP_SMTP_TLS_KEY", "/key.pem")

self.with_exposed_ports(self.smtp_port, self.ui_port)

def start(self) -> Self:
super().start()
wait_for_logs(self, ".*accessible via.*")
return self

def stop(self, *args: Any, **kwargs: Any) -> None:
super().stop(*args, **kwargs)
os.remove(self.tls_key_file)
os.remove(self.tls_cert_file)

def get_exposed_smtp_port(self) -> int:
return int(self.get_exposed_port(self.smtp_port))


class _TLSCertificates(NamedTuple):
private_key: bytes
certificate: bytes


def _generate_tls_certificates() -> _TLSCertificates:
"""Generate self-signed TLS certificates as bytes"""
private_key = _generate_private_key()
certificate = _generate_self_signed_certificate(private_key)

private_key_bytes = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption(),
)
certificate_bytes = certificate.public_bytes(serialization.Encoding.PEM)

return _TLSCertificates(private_key_bytes, certificate_bytes)


def _generate_private_key() -> rsa.RSAPrivateKey:
"""Generate RSA private key"""
return rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
)


def _generate_self_signed_certificate(
private_key: rsa.RSAPrivateKey,
) -> x509.Certificate:
"""Generate self-signed certificate with RSA private key"""
domain = "mydomain.com"
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "The Post Office"),
x509.NameAttribute(NameOID.COMMON_NAME, domain),
]
)

return (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(timezone.utc))
.not_valid_after(datetime.now(timezone.utc) + timedelta(days=3650)) # 10 years
.add_extension(
x509.SubjectAlternativeName([x509.DNSName(domain)]),
critical=False,
)
.sign(private_key, hashes.SHA256())
)
Copy link
Contributor Author

@oliverlambson oliverlambson Jun 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've also added a py.typed as this module is fully type-hinted (see #504). Running mypy on it reveals the type hint issues in core, but raises no issues within this module

Empty file.
124 changes: 124 additions & 0 deletions modules/mailpit/tests/test_mailpit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

import pytest

from testcontainers.mailpit import MailpitContainer, MailpitUser

_sender = "[email protected]"
_receivers = ["[email protected]"]
_msg = MIMEMultipart("mixed")
_msg["From"] = _sender
_msg["To"] = ", ".join(_receivers)
_msg["Subject"] = "test"
_msg.attach(MIMEText("test", "plain"))
_sendmail_args = (_sender, _receivers, _msg.as_string())


def test_mailpit_basic():
config = MailpitContainer()
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("any", "auth")
server.sendmail(*_sendmail_args)


def test_mailpit_starttls():
config = MailpitContainer()
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.starttls()
server.login("any", "auth")
server.sendmail(*_sendmail_args)


def test_mailpit_force_tls():
config = MailpitContainer(require_tls=True)
with config as mailpit:
server = smtplib.SMTP_SSL(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("any", "auth")
server.sendmail(*_sendmail_args)


def test_mailpit_basic_with_users_pass_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login(mailpit.users[0].username, mailpit.users[0].password)
server.sendmail(*_sendmail_args)


def test_mailpit_basic_with_users_fail_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with pytest.raises(smtplib.SMTPAuthenticationError):
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("not", "good")


def test_mailpit_starttls_with_users_pass_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.starttls()
server.login(mailpit.users[0].username, mailpit.users[0].password)
server.sendmail(*_sendmail_args)


def test_mailpit_starttls_with_users_fail_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with pytest.raises(smtplib.SMTPAuthenticationError):
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.starttls()
server.login("not", "good")


def test_mailpit_force_tls_with_users_pass_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users, require_tls=True)
with config as mailpit:
server = smtplib.SMTP_SSL(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login(mailpit.users[0].username, mailpit.users[0].password)
server.sendmail(*_sendmail_args)


def test_mailpit_force_tls_with_users_fail_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users, require_tls=True)
with pytest.raises(smtplib.SMTPAuthenticationError):
with config as mailpit:
server = smtplib.SMTP_SSL(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("not", "good")
Loading