-
Notifications
You must be signed in to change notification settings - Fork 297
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
01d6c18
commit f4b2278
Showing
5 changed files
with
1,381 additions
and
1,017 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
.. autoclass:: testcontainers.mailpit.MailpitContainer | ||
.. title:: testcontainers.mailpit.MailpitContainer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
# not use this file except in compliance with the License. You may obtain | ||
# a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations | ||
# under the License. | ||
import logging | ||
import os | ||
import tempfile | ||
from datetime import UTC, datetime, timedelta | ||
from typing import NamedTuple, Self | ||
|
||
from cryptography import x509 | ||
from cryptography.hazmat.primitives import hashes, serialization | ||
from cryptography.hazmat.primitives.asymmetric import rsa | ||
from cryptography.hazmat.primitives.serialization import ( | ||
NoEncryption, | ||
) | ||
from cryptography.x509.oid import NameOID | ||
|
||
from testcontainers.core.container import DockerContainer | ||
from testcontainers.core.waiting_utils import wait_for_logs | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class MailpitUser(NamedTuple): | ||
username: str | ||
password: str | ||
|
||
|
||
class MailpitContainer(DockerContainer): # type: ignore[misc] | ||
""" | ||
Test container for Mailpit. The example below spins up a Mailpit server | ||
Default configuration supports SMTP with STARTTLS and allows login with any | ||
user/password. | ||
Options: | ||
- require_tls = True forces the use of SSL | ||
- users = [MailpitUser("jane", "secret"), MailpitUser("ron", "pass2")] only | ||
allows login with jane:secret or ron:pass2 | ||
Example: | ||
.. doctest:: | ||
>>> import smtplib | ||
>>> from testcontainers.mailpit import MailpitContainer | ||
>>> with MailpitContainer() as mailpit_container: | ||
... host_ip = mailpit_container.get_container_host_ip() | ||
... host_port = mailpit_container.get_exposed_smtp_port() | ||
... server = smtplib.SMTP( | ||
... mailpit_container.get_container_host_ip(), | ||
... mailpit_container.get_exposed_smtp_port(), | ||
... ) | ||
... code, _ = server.login("any", "auth") | ||
... assert code == 235 # authentication successful | ||
... # use server.sendmail(...) to send emails | ||
""" | ||
|
||
def __init__( # type: ignore[no-untyped-def] | ||
self, | ||
image: str = "axllent/mailpit", | ||
*, | ||
smtp_port: int = 1025, | ||
ui_port: int = 8025, | ||
users: list[MailpitUser] | None = None, | ||
require_tls: bool = False, | ||
**kwargs, | ||
) -> None: | ||
super().__init__(image=image, **kwargs) | ||
self.smtp_port = smtp_port | ||
self.ui_port = ui_port | ||
|
||
self.users = users if users is not None else [] | ||
self.auth_accept_any = int(len(self.users) == 0) | ||
|
||
self.require_tls = int(require_tls) | ||
self.tls_key, self.tls_cert = _generate_tls_certificates() | ||
with tempfile.NamedTemporaryFile(delete=False, delete_on_close=False) as tls_key_file: | ||
tls_key_file.write(self.tls_key) | ||
self.tls_key_file = tls_key_file.name | ||
|
||
with tempfile.NamedTemporaryFile(delete=False, delete_on_close=False) as tls_cert_file: | ||
tls_cert_file.write(self.tls_cert) | ||
self.tls_cert_file = tls_cert_file.name | ||
|
||
@property | ||
def _users_conf(self) -> str: | ||
"""Mailpit user configuration string | ||
"user:password user2:pass2 ...] | ||
""" | ||
return " ".join(f"{user.username}:{user.password}" for user in self.users) | ||
|
||
def _configure(self) -> None: | ||
if self.users: | ||
self.with_env("MP_SMTP_AUTH", self._users_conf) | ||
self.with_env("MP_SMTP_AUTH_ACCEPT_ANY", str(self.auth_accept_any)) | ||
|
||
self.with_env("MP_SMTP_REQUIRE_TLS", str(self.require_tls)) | ||
|
||
self.with_volume_mapping(self.tls_cert_file, "/cert.pem") | ||
self.with_volume_mapping(self.tls_key_file, "/key.pem") | ||
self.with_env("MP_SMTP_TLS_CERT", "/cert.pem") | ||
self.with_env("MP_SMTP_TLS_KEY", "/key.pem") | ||
|
||
self.with_exposed_ports(self.smtp_port, self.ui_port) | ||
|
||
def start(self) -> Self: | ||
super().start() | ||
wait_for_logs(self, ".*accessible via.*") | ||
return self | ||
|
||
def stop(self, *args, **kwargs) -> None: | ||
super().stop(*args, **kwargs) | ||
os.remove(self.tls_key_file) | ||
os.remove(self.tls_cert_file) | ||
|
||
def get_exposed_smtp_port(self) -> int: | ||
return int(self.get_exposed_port(self.smtp_port)) | ||
|
||
|
||
class _TLSCertificates(NamedTuple): | ||
private_key: bytes | ||
certificate: bytes | ||
|
||
|
||
def _generate_tls_certificates() -> _TLSCertificates: | ||
"""Generate self-signed TLS certificates as bytes""" | ||
private_key = _generate_private_key() | ||
certificate = _generate_self_signed_certificate(private_key) | ||
|
||
private_key_bytes = private_key.private_bytes( | ||
encoding=serialization.Encoding.PEM, | ||
format=serialization.PrivateFormat.TraditionalOpenSSL, | ||
encryption_algorithm=NoEncryption(), | ||
) | ||
certificate_bytes = certificate.public_bytes(serialization.Encoding.PEM) | ||
|
||
return _TLSCertificates(private_key_bytes, certificate_bytes) | ||
|
||
|
||
def _generate_private_key() -> rsa.RSAPrivateKey: | ||
"""Generate RSA private key""" | ||
return rsa.generate_private_key( | ||
public_exponent=65537, | ||
key_size=4096, | ||
) | ||
|
||
|
||
def _generate_self_signed_certificate( | ||
private_key: rsa.RSAPrivateKey, | ||
) -> x509.Certificate: | ||
"""Generate self-signed certificate with RSA private key""" | ||
domain = "mydomain.com" | ||
subject = issuer = x509.Name( | ||
[ | ||
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), | ||
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), | ||
x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), | ||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "The Post Office"), | ||
x509.NameAttribute(NameOID.COMMON_NAME, domain), | ||
] | ||
) | ||
|
||
return ( | ||
x509.CertificateBuilder() | ||
.subject_name(subject) | ||
.issuer_name(issuer) | ||
.public_key(private_key.public_key()) | ||
.serial_number(x509.random_serial_number()) | ||
.not_valid_before(datetime.now(UTC)) | ||
.not_valid_after(datetime.now(UTC) + timedelta(days=3650)) # 10 years | ||
.add_extension( | ||
x509.SubjectAlternativeName([x509.DNSName(domain)]), | ||
critical=False, | ||
) | ||
.sign(private_key, hashes.SHA256()) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import smtplib | ||
from email.mime.text import MIMEText | ||
from email.mime.multipart import MIMEMultipart | ||
|
||
import pytest | ||
|
||
from testcontainers.mailpit import MailpitContainer, MailpitUser | ||
|
||
_sender = "[email protected]" | ||
_receivers = ["[email protected]"] | ||
_msg = MIMEMultipart("mixed") | ||
_msg["From"] = _sender | ||
_msg["To"] = ", ".join(_receivers) | ||
_msg["Subject"] = "test" | ||
_msg.attach(MIMEText("test", "plain")) | ||
_sendmail_args = (_sender, _receivers, _msg.as_string()) | ||
|
||
|
||
def test_mailpit_basic(): | ||
config = MailpitContainer() | ||
with config as mailpit: | ||
server = smtplib.SMTP( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.login("any", "auth") | ||
server.sendmail(*_sendmail_args) | ||
|
||
|
||
def test_mailpit_starttls(): | ||
config = MailpitContainer() | ||
with config as mailpit: | ||
server = smtplib.SMTP( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.starttls() | ||
server.login("any", "auth") | ||
server.sendmail(*_sendmail_args) | ||
|
||
|
||
def test_mailpit_force_tls(): | ||
config = MailpitContainer(require_tls=True) | ||
with config as mailpit: | ||
server = smtplib.SMTP_SSL( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.login("any", "auth") | ||
server.sendmail(*_sendmail_args) | ||
|
||
|
||
def test_mailpit_basic_with_users_pass_auth(): | ||
users = [MailpitUser("user", "password")] | ||
config = MailpitContainer(users=users) | ||
with config as mailpit: | ||
server = smtplib.SMTP( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.login(mailpit.users[0].username, mailpit.users[0].password) | ||
server.sendmail(*_sendmail_args) | ||
|
||
|
||
def test_mailpit_basic_with_users_fail_auth(): | ||
users = [MailpitUser("user", "password")] | ||
config = MailpitContainer(users=users) | ||
with pytest.raises(smtplib.SMTPAuthenticationError): | ||
with config as mailpit: | ||
server = smtplib.SMTP( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.login("not", "good") | ||
|
||
|
||
def test_mailpit_starttls_with_users_pass_auth(): | ||
users = [MailpitUser("user", "password")] | ||
config = MailpitContainer(users=users) | ||
with config as mailpit: | ||
server = smtplib.SMTP( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.starttls() | ||
server.login(mailpit.users[0].username, mailpit.users[0].password) | ||
server.sendmail(*_sendmail_args) | ||
|
||
|
||
def test_mailpit_starttls_with_users_fail_auth(): | ||
users = [MailpitUser("user", "password")] | ||
config = MailpitContainer(users=users) | ||
with pytest.raises(smtplib.SMTPAuthenticationError): | ||
with config as mailpit: | ||
server = smtplib.SMTP( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.starttls() | ||
server.login("not", "good") | ||
|
||
|
||
def test_mailpit_force_tls_with_users_pass_auth(): | ||
users = [MailpitUser("user", "password")] | ||
config = MailpitContainer(users=users, require_tls=True) | ||
with config as mailpit: | ||
server = smtplib.SMTP_SSL( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.login(mailpit.users[0].username, mailpit.users[0].password) | ||
server.sendmail(*_sendmail_args) | ||
|
||
|
||
def test_mailpit_force_tls_with_users_fail_auth(): | ||
users = [MailpitUser("user", "password")] | ||
config = MailpitContainer(users=users, require_tls=True) | ||
with pytest.raises(smtplib.SMTPAuthenticationError): | ||
with config as mailpit: | ||
server = smtplib.SMTP_SSL( | ||
mailpit.get_container_host_ip(), | ||
mailpit.get_exposed_smtp_port(), | ||
) | ||
server.login("not", "good") |
Oops, something went wrong.