Skip to content

Commit

Permalink
Merge pull request #7214 from simon-mazenoux/feat-refactor-for-taskqu…
Browse files Browse the repository at this point in the history
…euedb

[8.1] Refactor TaskQueueDB for diracx
  • Loading branch information
chrisburr authored Sep 28, 2023
2 parents 4849196 + 14b5cc7 commit c3cea97
Show file tree
Hide file tree
Showing 2 changed files with 278 additions and 46 deletions.
108 changes: 62 additions & 46 deletions src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
""" TaskQueueDB class is a front-end to the task queues db
"""
from collections import defaultdict
import random
import string
from typing import Any

from DIRAC import S_ERROR, S_OK, gConfig
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
Expand Down Expand Up @@ -1162,57 +1164,13 @@ def __setPrioritiesForEntity(self, user, userGroup, share, connObj=False, consol
tqDict = dict(result["Value"])
if not tqDict:
return S_OK()
# Calculate Sum of priorities
totalPrio = 0
for k in tqDict:
if tqDict[k] > 0.1 or not allowBgTQs:
totalPrio += tqDict[k]
# Update prio for each TQ
for tqId in tqDict:
if tqDict[tqId] > 0.1 or not allowBgTQs:
prio = (share / totalPrio) * tqDict[tqId]
else:
prio = TQ_MIN_SHARE
prio = max(prio, TQ_MIN_SHARE)
tqDict[tqId] = prio

# Generate groups of TQs that will have the same prio=sum(prios) maomenos
result = self.retrieveTaskQueues(list(tqDict))
if not result["OK"]:
return result
allTQsData = result["Value"]
tqGroups = {}
for tqid in allTQsData:
tqData = allTQsData[tqid]
for field in ("Jobs", "Priority") + priorityIgnoredFields:
if field in tqData:
tqData.pop(field)
tqHash = []
for f in sorted(tqData):
tqHash.append(f"{f}:{tqData[f]}")
tqHash = "|".join(tqHash)
if tqHash not in tqGroups:
tqGroups[tqHash] = []
tqGroups[tqHash].append(tqid)
tqGroups = [tqGroups[td] for td in tqGroups]

# Do the grouping
for tqGroup in tqGroups:
totalPrio = 0
if len(tqGroup) < 2:
continue
for tqid in tqGroup:
totalPrio += tqDict[tqid]
for tqid in tqGroup:
tqDict[tqid] = totalPrio

# Group by priorities
prioDict = {}
for tqId in tqDict:
prio = tqDict[tqId]
if prio not in prioDict:
prioDict[prio] = []
prioDict[prio].append(tqId)

prioDict = calculate_priority(tqDict, allTQsData, share, allowBgTQs)

# Execute updates
for prio, tqs in prioDict.items():
Expand All @@ -1235,3 +1193,61 @@ def getGroupShares():
for group in groups:
shares[group] = gConfig.getValue(f"/Registry/Groups/{group}/JobShare", DEFAULT_GROUP_SHARE)
return shares


def calculate_priority(
tq_dict: dict[int, float], all_tqs_data: dict[int, dict[str, Any]], share: float, allow_bg_tqs: bool
) -> dict[float, list[int]]:
"""
Calculate the priority for each TQ given a share
:param tq_dict: dict of {tq_id: prio}
:param all_tqs_data: dict of {tq_id: {tq_data}}, where tq_data is a dict of {field: value}
:param share: share to be distributed among TQs
:param allow_bg_tqs: allow background TQs to be used
:return: dict of {priority: [tq_ids]}
"""

def is_background(tq_priority: float, allow_bg_tqs: bool) -> bool:
"""
A TQ is background if its priority is below a threshold and background TQs are allowed
"""
return tq_priority <= 0.1 and allow_bg_tqs

# Calculate Sum of priorities of non background TQs
total_prio = sum([prio for prio in tq_dict.values() if not is_background(prio, allow_bg_tqs)])

# Update prio for each TQ
for tq_id, tq_priority in tq_dict.items():
if is_background(tq_priority, allow_bg_tqs):
prio = TQ_MIN_SHARE
else:
prio = max((share / total_prio) * tq_priority, TQ_MIN_SHARE)
tq_dict[tq_id] = prio

# Generate groups of TQs that will have the same prio=sum(prios) maomenos
tq_groups: dict[str, list[int]] = defaultdict(list)
for tq_id, tq_data in all_tqs_data.items():
for field in ("Jobs", "Priority") + priorityIgnoredFields:
if field in tq_data:
tq_data.pop(field)
tq_hash = []
for f in sorted(tq_data):
tq_hash.append(f"{f}:{tq_data[f]}")
tq_hash = "|".join(tq_hash)
# if tq_hash not in tq_groups:
# tq_groups[tq_hash] = []
tq_groups[tq_hash].append(tq_id)

# Do the grouping
for tq_group in tq_groups.values():
total_prio = sum(tq_dict[tq_id] for tq_id in tq_group)
for tq_id in tq_group:
tq_dict[tq_id] = total_prio

# Group by priorities
result: dict[float, list[int]] = defaultdict(list)
for tq_id, tq_priority in tq_dict.items():
result[tq_priority].append(tq_id)

return result
216 changes: 216 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/tests/Test_TaskQueueDB.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import math
from typing import Any

import pytest
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TQ_MIN_SHARE, calculate_priority


@pytest.mark.parametrize("allow_bg_tqs", [True, False])
@pytest.mark.parametrize("share", [0.5, 1.0, 2.0])
def test_calculate_priority_empty_entry(share: float, allow_bg_tqs: bool) -> None:
"""test of the calculate_priority function"""
# Arrange
tq_dict: dict[int, float] = {}
all_tqs_data: dict[int, dict[str, Any]] = {}

# Act
result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs)

# Assert
assert isinstance(result, dict)
assert len(result.keys()) == 0


@pytest.mark.parametrize("allow_bg_tqs", [True, False])
@pytest.mark.parametrize("share", [0.5, 1.0, 2.0])
def test_calculate_priority_different_priority_same_number_of_jobs(share: float, allow_bg_tqs: bool) -> None:
"""test of the calculate_priority function"""
# Arrange
tq_dict: dict[int, float] = {
1: 3.0,
2: 2.0,
3: 0.3,
}
all_tqs_data: dict[int, dict[str, Any]] = {
1: {
"Priority": 3.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
2: {
"Priority": 2.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
3: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
}

# Act
result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs)

# Assert
assert isinstance(result, dict)
assert len(result.keys()) == 1
key, value = result.popitem()
assert key == pytest.approx(share)
assert value == [1, 2, 3]
assert all(prio >= TQ_MIN_SHARE for prio in result.keys())


@pytest.mark.parametrize("allow_bg_tqs", [True, False])
@pytest.mark.parametrize("share", [0.5, 1.0, 2.0])
def test_calculate_priority_same_cpu_time(share: float, allow_bg_tqs: bool) -> None:
"""test of the calculate_priority function"""
# Arrange

# NOTE: the priority value from the tq_dict is not used in the calculation
# because all task queues end up in the same "priority group" let's say
tq_dict: dict[int, float] = {
1: 3.0,
2: 2.0,
3: 0.3,
}
all_tqs_data: dict[int, dict[str, Any]] = {
1: {
"Priority": 1.0,
"Jobs": 100,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
2: {
"Priority": 2.0,
"Jobs": 14,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
3: {
"Priority": 1.0,
"Jobs": 154,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
}

# Act
result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs)

# Assert
# All the tqs are supporsed to be regrouped in the same priority group
# even though they have different priority values (same cpu time)
assert isinstance(result, dict)
assert len(result.keys()) == 1
priority = set(result.keys()).pop()
assert priority == pytest.approx(share)
assert result[priority] == [1, 2, 3]
assert all(prio >= TQ_MIN_SHARE for prio in result.keys())


@pytest.mark.parametrize("allow_bg_tqs", [True, False])
@pytest.mark.parametrize("share", [0.5, 1.0, 2.0])
def test_calculate_priority_different_cpu_time(share: float, allow_bg_tqs: bool) -> None:
"""test of the calculate_priority function"""
# Arrange
tq_dict: dict[int, float] = {
1: 1.0,
2: 1.0,
3: 1.0,
}
all_tqs_data: dict[int, dict[str, Any]] = {
1: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 150000,
},
2: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 100000,
},
3: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
}

# Act
result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs)

# Assert
assert isinstance(result, dict)
assert len(result.keys()) == 1
priority = set(result.keys()).pop()
assert priority == pytest.approx(share / 3) # different group category
assert result[priority] == [1, 2, 3]


@pytest.mark.parametrize("allow_bg_tqs", [True, False])
@pytest.mark.parametrize("share", [0.5, 1.0, 2.0])
def test_calculate_priority_different_priority_different_number_of_jobs_different_cpu_time(
share: float, allow_bg_tqs: bool
) -> None:
"""test of the calculate_priority function"""
# Arrange
tq_dict: dict[int, float] = {
1: 5.0,
2: 3.0,
3: 2.0,
}
all_tqs_data: dict[int, dict[str, Any]] = {
1: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 150000,
},
2: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 100000,
},
3: {
"Priority": 1.0,
"Jobs": 1,
"Owner": "userName",
"OwnerGroup": "myGroup",
"CPUTime": 50000,
},
}

# Act
result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs)

# Assert
assert isinstance(result, dict)
assert sum(result.keys()) == pytest.approx(share)
assert len(result.keys()) == 3

for priority in result.keys():
# assert that each key is in the following list at maximum epsilon distance
delta = math.inf
for expected_priority in [share * 0.5, share * 0.3, share * 0.2]:
delta = min(delta, abs(priority - expected_priority))
assert delta < 1e-6
assert len(result[priority]) == 1

0 comments on commit c3cea97

Please sign in to comment.