-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstress_test.py
105 lines (87 loc) · 3.35 KB
/
stress_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import asyncio
from asyncio import AbstractEventLoop
from concurrent.futures import ProcessPoolExecutor
import time
from functools import partial
from random import randint
import random
from typing import Callable
from algorithms import (insert_sort, selection_sort,
bubble_sort, count_sort,
quick_sort, merge_sort)
random.seed(1)
class CustomBenchamrk:
"""Class for CPU benchmark by means of a stress test."""
def __init__(self,
loop: AbstractEventLoop,
chosen_mods: set[str],
arrays_number: int,
numbers_amount: int,
alghoritm: str,
output_results: Callable,
activate_start_button: Callable) -> None:
self._load_test_future = None
self._loop = loop
self._chosen_mods = chosen_mods
self._arrays_number = arrays_number
self._alghoritm = alghoritm
self._output_results = output_results
self._numbers_amount = numbers_amount
self._activate_start_button = activate_start_button
# Local data
self._func_choice: dict[int, Callable] = {
0: insert_sort,
1: selection_sort,
2: bubble_sort,
3: count_sort,
4: quick_sort,
5: merge_sort
}
self.single_time: float = None
self.multi_time: float = None
def start(self) -> None:
future = asyncio.run_coroutine_threadsafe(self._test_1(), self._loop)
self._load_test_future = future
def cancel(self) -> None:
if self._load_test_future:
self._loop.call_soon_threadsafe(self._load_test_future.cancel)
self._output_results("Отмена", "Отмена")
async def _test_1(self) -> None:
nums = self.get_test_data()
# testing single-processing
if "Однопроцессность" in self._chosen_mods:
start_s = time.time()
for num in nums:
self._func_choice[self._alghoritm](num)
end_s = time.time()
self.single_time = end_s - start_s
# creates a new array with the same numbers
# because prev array was sorted
nums = self.get_test_data()
# gives chill out to our CPU for 3 seconds
await asyncio.sleep(3)
# testing multi-processing
if "Многопроцессность" in self._chosen_mods:
start_m = time.time()
with ProcessPoolExecutor() as process_pool:
calls = [
partial(self._func_choice[self._alghoritm], num)
for num in nums
]
call_coros = []
for call in calls:
call_coros.append(
self._loop.run_in_executor(process_pool, call)
)
await asyncio.gather(*call_coros)
end_m = time.time()
self.multi_time = end_m - start_m
# output result data into GUI
self._output_results(self.single_time, self.multi_time)
self._activate_start_button()
def get_test_data(self) -> list[list[int]]:
return [
[
randint(1, 1000) for n in range(self._numbers_amount)
] for _ in range(self._arrays_number)
]