Skip to content

Commit

Permalink
Add support for converting sessions to / from ASN1 representation
Browse files Browse the repository at this point in the history
  • Loading branch information
mdulaney committed Dec 14, 2024
1 parent 43a2e36 commit e889343
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 5 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ Changelog
Versions are year-based with a strict backward-compatibility policy.
The third digit is only for regressions.

24.4.0 (UNRELEASED)
-------------------

Backward-incompatible changes:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Deprecations:
^^^^^^^^^^^^^

Changes:
^^^^^^^^

* Added ``OpenSSL.SSL.Session.i2d`` to convert session objects to ASN1. Updated ``OpenSSL.SSL.Session`` constructor to support conversion from ASN1. `#1373 <https://github.com/pyca/pyopenssl/pull/1373>`_.
* ``cryptography`` minimum version is now 44.0.x.

24.3.0 (2024-11-27)
-------------------

Expand Down
5 changes: 4 additions & 1 deletion doc/api/ssl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ Context, Connection.
:noindex:

.. autoclass:: Session
:noindex:


.. py:class:: Connection(context, socket)
Expand Down Expand Up @@ -247,8 +248,10 @@ Context objects have the following methods:
Session objects
---------------

Session objects have no methods.
Session objects have the following methods:

.. autoclass:: OpenSSL.SSL.Session
:members:

.. _openssl-connection:

Expand Down
43 changes: 42 additions & 1 deletion src/OpenSSL/SSL.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,51 @@ class Session:
parameters which may be re-used to speed up the setup of subsequent
connections.
:param data: An optional bytes object of an ASN1 encoded Session.
.. versionadded:: 0.14
"""

_session: Any
_session: Any = None

def __init__(self, data: bytes | None = None) -> None:
if data is None:
return

p = _ffi.new("unsigned char[]", data)
pp = _ffi.new("unsigned char **")
pp[0] = p
length = _ffi.cast("long", len(data))

session = _lib.d2i_SSL_SESSION(_ffi.NULL, pp, length)
if session == _ffi.NULL:
_raise_current_error()

self._session = _ffi.gc(session, _lib.SSL_SESSION_free)

def i2d(self) -> bytes:
"""
Convert the Session object to an ASN1 encoded bytes object.
:return A bytes object representing the ASN1 encoded session.
"""

if self._session is None:
raise ValueError("Not a valid session")

length = _lib.i2d_SSL_SESSION(self._session, _ffi.NULL)
if length == 0:
raise ValueError("Not a valid session")

pp = _ffi.new("unsigned char **")
p = _ffi.new("unsigned char[]", length)
pp[0] = p

length = _lib.i2d_SSL_SESSION(self._session, pp)
if length == 0:
raise ValueError("Not a valid session")

return _ffi.buffer(p, length)[:]


class Context:
Expand Down
100 changes: 98 additions & 2 deletions tests/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,17 +321,21 @@ def _create_certificate_chain():
]


def loopback_client_factory(socket, version=SSLv23_METHOD):
def loopback_client_factory(socket, version=SSLv23_METHOD, session_data=None):
client = Connection(Context(version), socket)
if session_data is not None:
client.set_session(Session(session_data))
client.set_connect_state()
return client


def loopback_server_factory(socket, version=SSLv23_METHOD):
def loopback_server_factory(socket, version=SSLv23_METHOD, session_data=None):
ctx = Context(version)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server = Connection(ctx, socket)
if session_data is not None:
server.set_session(Session(session_data))
server.set_accept_state()
return server

Expand Down Expand Up @@ -2176,6 +2180,98 @@ def test_construction(self):
new_session = Session()
assert isinstance(new_session, Session)

def test_d2i_fail(self):
with pytest.raises(Error) as e:
Session(b"abc" * 1000)

assert e.value.args[0][0] in [
# 1.1.x
(
"asn1 encoding routines",
"asn1_check_tlen",
"wrong tag",
),
# 3.0.x
(
"asn1 encoding routines",
"",
"wrong tag",
),
]

assert e.value.args[0][1] in [
# 1.1.x
(
"asn1 encoding routines",
"asn1_item_embed_d2i",
"nested asn1 error",
),
# 3.0.x
(
"asn1 encoding routines",
"",
"nested asn1 error",
),
]

def test_session_success(self):
session_id = (
b"\x51\x6d\x1d\x18\xc3\xb5\x86\x81\xc6\x79\x89\x2c\x89\x3e\x56\x33"
b"\xa7\x9c\xcd\x9b\x87\xbb\xb3\xdc\xf6\x76\x70\xf9\xc0\xdd\xf4\xef"
)

master_key = (
b"\x0f\xb2\x51\xe3\x15\x60\x2d\xef\x6e\x6d\xd2\x94\x2d\xe5\x37\x96"
b"\x72\xfa\xce\xb0\x39\xcc\x8d\xdf\xab\x32\xcc\x75\x0c\x66\xf9\xfd"
b"\xef\xbc\xc6\x2a\x8f\x9c\x35\x16\xfd\x4d\x38\xd9\xf9\xeb\x1d\xe4"
)

session_data = (
# sequence length=0x71
b"\x30\x71"
# integer (version)
b"\x02\x01\x01"
# integer (SSL version)
b"\x02\x02\x03\x03"
# octet-string (cipher suite)
b"\x04\x02\xc0\x30"
# octet-string length=0x20 (session id)
b"\x04\x20"
+ session_id
+
# octet-string length=0x30 (master secret)
b"\x04\x30"
+ master_key
+
# application (1), integer (time)
b"\xa1\x06\x02\x04"
+ b"\x66\xec\x4c\x2d"
+
# application (2), integer (timeout)
b"\xa2\x04\x02\x02"
+ b"\x02\x58"
+
# application (4), octet-string (session id context)
b"\xa4\x02\x04"
+ b"\x00"
)
serverSocket, clientSocket = socket_pair()

client = loopback_client_factory(
clientSocket, session_data=session_data
)
server = loopback_server_factory(
serverSocket, session_data=session_data
)

assert client.master_key() == master_key
assert server.master_key() == master_key

handshake(client, server)

client.send(b"hello world")
assert b"hello world" == server.recv(len(b"hello world"))


@pytest.fixture(params=["context", "connection"])
def ctx_or_conn(request) -> Union[Context, Connection]:
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ extras =
test
deps =
coverage>=4.2
cryptographyMinimum: cryptography==41.0.5
cryptographyMinimum: cryptography==44.0.0
randomorder: pytest-randomly
setenv =
# Do not allow the executing environment to pollute the test environment
Expand Down

0 comments on commit e889343

Please sign in to comment.