Skip to content

Commit

Permalink
Add tests for multiprocessing forkserver and dependencies
Browse files Browse the repository at this point in the history
* **Lib/test/test_multiprocessing_forkserver.py**
  - Add tests for shared memory access, process return code, and signal handling in forkserver.

* **Lib/test/test_dependencies.py**
  - Add tests to check for the existence of `pyvenv.cfg` and availability of `multiprocessing.shared_memory`, `ssl`, `pdb`, and `warnings` modules.
  • Loading branch information
gianfrancomauro committed Nov 30, 2024
1 parent 29d2791 commit 74f7d7f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
36 changes: 36 additions & 0 deletions Lib/test/test_dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import unittest
import os
import sys

class TestDependencies(unittest.TestCase):

def test_pyvenv_cfg_exists(self):
venv_path = os.path.join(sys.prefix, 'pyvenv.cfg')
self.assertTrue(os.path.exists(venv_path), f"{venv_path} does not exist")

def test_shared_memory_access(self):
try:
import multiprocessing.shared_memory
except ImportError:
self.fail("multiprocessing.shared_memory module is not available")

def test_ssl_module(self):
try:
import ssl
except ImportError:
self.fail("ssl module is not available")

def test_pdb_module(self):
try:
import pdb
except ImportError:
self.fail("pdb module is not available")

def test_warnings_module(self):
try:
import warnings
except ImportError:
self.fail("warnings module is not available")

if __name__ == '__main__':
unittest.main()
48 changes: 48 additions & 0 deletions Lib/test/test_multiprocessing_forkserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import unittest
import multiprocessing
import os
import signal
import time

class TestForkserver(unittest.TestCase):

def setUp(self):
self.manager = multiprocessing.Manager()
self.shared_dict = self.manager.dict()

def tearDown(self):
self.manager.shutdown()

def test_shared_memory_access(self):
def worker(shared_dict):
shared_dict['key'] = 'value'

process = multiprocessing.Process(target=worker, args=(self.shared_dict,))
process.start()
process.join()

self.assertEqual(self.shared_dict['key'], 'value')

def test_forkserver_process_return_code(self):
def worker():
os._exit(1)

process = multiprocessing.Process(target=worker)
process.start()
process.join()

self.assertEqual(process.exitcode, 1)

def test_forkserver_signal_handling(self):
def worker():
time.sleep(5)

process = multiprocessing.Process(target=worker)
process.start()
os.kill(process.pid, signal.SIGTERM)
process.join()

self.assertEqual(process.exitcode, -signal.SIGTERM)

if __name__ == '__main__':
unittest.main()

0 comments on commit 74f7d7f

Please sign in to comment.