Added the mapper tool to map trace to vSwarm proxies.
Added documentation for the mapper tool.

Signed-off-by: KarthikL1729 <[email protected]>
KarthikL1729 committed Oct 30, 2024
1 parent b6aad78 commit bc1d3a7
Showing 6 changed files with 528 additions and 0 deletions.
88 changes: 88 additions & 0 deletions docs/mapper.md
@@ -0,0 +1,88 @@
# Mapper

The mapper tool maps the functions in a given trace directory (containing memory and duration traces) to proxy functions in the [`vSwarm`](https://github.com/vhive-serverless/vSwarm/tree/main/) benchmark suite. The benchmarks in the vSwarm suite have been profiled, and their memory-utilization and duration profiles are stored in the `profile.json` file. Each function in the trace is mapped to the benchmark function that is its closest proxy, based on memory and duration correlation.

The `profile.json` output file is generated by the [`profiler` tool](https://github.com/vhive-serverless/vSwarm/tree/load-generator/tools/profiler#profiler), which profiles the benchmark suite functions.

### Usage

```bash
usage: mapper.py [-h] -t TRACE_DIRECTORYPATH -p PROFILE_FILEPATH [-o OUTPUT_FILEPATH] [-u UNIQUE_ASSIGNMENT]

Arguments:
-h, --help show this help message and exit
-t TRACE_DIRECTORYPATH, --trace-directorypath TRACE_DIRECTORYPATH
Path to the directory containing the trace files (required)
-p PROFILE_FILEPATH, --profile-filepath PROFILE_FILEPATH
                    Path to the profile file containing the proxy functions
-o OUTPUT_FILEPATH    Path to the output file (default: mapper_output.json in the trace directory)
-u UNIQUE_ASSIGNMENT, --unique-assignment UNIQUE_ASSIGNMENT
                    Whether to assign unique proxy functions to each trace function
```
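For example, a typical invocation (paths are illustrative, and the `-u` flag is assumed to take a boolean-like value) might look like:

```bash
# Map the functions in trace/ to vSwarm proxies, requesting a one-to-one assignment
python3 mapper.py -t trace/ -p profile.json -u true
```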
The tool reads the trace information (memory and duration details) from the `trace/` directory (configurable via the `-t`/`--trace-directorypath` flag). The directory must contain `memory.csv` and `durations.csv` files with trace information in the format described in [*Azure Functions Dataset 2019*](https://github.com/Azure/AzurePublicDataset/blob/master/AzureFunctionsDataset2019.md).

#### Function Execution Duration `durations.csv` Schema

| Field | Description |
|--|--|
| HashOwner | Unique ID of the application owner |
| HashApp | Unique ID for the application name |
| HashFunction | Unique ID for the function name within the app |
| Average | Average execution time (ms) across all invocations of the 24-hour period |
| Count | Number of executions used in computing the average |
| Minimum | Minimum execution time (ms) |
| Maximum | Maximum execution time (ms) |
| percentile_Average_0 | Weighted 0th percentile of the execution time *average* |
| percentile_Average_1 | Weighted 1st percentile of the execution time *average* |
| percentile_Average_25 | Weighted 25th percentile of the execution time *average* |
| percentile_Average_50 | Weighted 50th percentile of the execution time *average* |
| percentile_Average_75 | Weighted 75th percentile of the execution time *average* |
| percentile_Average_99 | Weighted 99th percentile of the execution time *average* |
| percentile_Average_100 | Weighted 100th percentile of the execution time *average* |

All execution times are in milliseconds.

#### Function Memory Usage `memory.csv` Schema

| Field | Description |
|--|--|
| HashOwner | Unique ID of the application owner |
| HashApp | Unique ID for the application name |
| HashFunction | Unique ID for the function name within the app |
| SampleCount | Number of samples used for computing the average |
| AverageAllocatedMb | Average allocated memory (MB) across all SampleCount measurements |
| AverageAllocatedMb_pct1 | 1st percentile of the average allocated memory |
| AverageAllocatedMb_pct5 | 5th percentile of the average allocated memory |
| AverageAllocatedMb_pct25 | 25th percentile of the average allocated memory |
| AverageAllocatedMb_pct50 | 50th percentile of the average allocated memory |
| AverageAllocatedMb_pct75 | 75th percentile of the average allocated memory |
| AverageAllocatedMb_pct95 | 95th percentile of the average allocated memory |
| AverageAllocatedMb_pct99 | 99th percentile of the average allocated memory |
| AverageAllocatedMb_pct100 | 100th percentile of the average allocated memory |
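As an illustration of how these schemas are consumed, the following sketch (assuming pandas is installed; column names follow the tables above) extracts the 75th-percentile duration and memory that the mapper correlates:

```python
import pandas as pd

# Read the two Azure-format trace files from the trace directory.
durations = pd.read_csv("trace/durations.csv")
memory = pd.read_csv("trace/memory.csv")

# Join on the hash columns identifying a function, then keep the
# 75th-percentile duration (ms) and allocated memory (MB).
keys = ["HashOwner", "HashApp", "HashFunction"]
merged = durations.merge(memory, on=keys)
print(merged[keys + ["percentile_Average_75", "AverageAllocatedMb_pct75"]].head())
```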

The [`sampler`](https://github.com/vhive-serverless/invitro/tree/main/sampler) tool in InVitro can be used to generate the sampled traces from the original Azure traces.

For every function in the trace, the closest function in the [`vSwarm`](https://github.com/vhive-serverless/vSwarm/tree/main/) benchmark suite is set as its proxy, with the 75th-percentile memory and 75th-percentile duration used to find the highest correlation. If the `-u` (or `--unique-assignment`) flag is set to true, the tool tries to find a one-to-one (injective) mapping between trace functions and proxy functions by modelling it as a *linear sum assignment* problem, solved with the SciPy implementation of the *Jonker-Volgenant algorithm*. The 75th percentile is used so that the mapping does not merely track the peak values of the workload but yields a representative proxy function. If the number of trace functions exceeds the number of proxy functions, or if a one-to-one mapping cannot be found, the injective constraint is dropped and the closest proxy function is used instead. Currently, the tool uses only _Serving Functions_ that are _not pipelined_ as proxy functions. A sketch of the assignment step is shown below.
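This sketch uses made-up numbers; the cost mirrors the log-space Euclidean distance computed in `find_proxy_function.py`:

```python
import math
import numpy as np
from scipy.optimize import linear_sum_assignment

# Hypothetical 75th-percentile (memory MB, duration ms) pairs.
trace_fns = [(120.0, 300.0), (45.0, 90.0)]                   # trace functions
proxy_fns = [(50.0, 100.0), (130.0, 250.0), (400.0, 20.0)]   # vSwarm proxies

# Cost = Euclidean distance between log-scaled memory/duration values.
cost = np.array([
    [math.hypot(math.log(t_mem) - math.log(p_mem),
                math.log(t_dur) - math.log(p_dur))
     for (p_mem, p_dur) in proxy_fns]
    for (t_mem, t_dur) in trace_fns
])

# Jonker-Volgenant solver: each trace row is assigned a distinct proxy column.
rows, cols = linear_sum_assignment(cost)
for r, c in zip(rows, cols):
    print(f"trace {r} -> proxy {c} (error {cost[r, c]:.3f})")
```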

The mapping requires profiles of the benchmark functions before they can be used as proxies. The tool reads the `profile.json` output file generated by the [`profiler` tool](https://github.com/vhive-serverless/vSwarm/tree/load-generator/tools/profiler#profiler) to obtain the profile of the benchmark suite functions. The user can configure the path of this JSON file through the `-p` (or `--profile-filepath`) flag; by default it is `profile.json`, which first needs to be unzipped.
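Since the repository tracks the profile as a compressed archive (see the `.gitattributes` entry below), it can be unpacked with, for example:

```bash
tar -xzf profile.tar.gz    # assumed to unpack profile.json into the working directory
```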

An example of a generated output file is as follows:

```json
{
    "c13acdc7567b225971cef2416a3a2b03c8a4d8d154df48afe75834e2f5c59ddf": {
        "proxy-function": "video-processing-python-10"
    },
    "a2faad786b3c813b12ce57d349d5e62f6d0f22ceecfa86cd72a962853383b600": {
        "proxy-function": "image-rotate-go-11"
    },
    "7dc5aeabc131669912e8c793c8925cc9928321f45f13a4af031592b4611630d7": {
        "proxy-function": "video-processing-python-70"
    },
    "ae8a1640fa932024f59b38a0b001808b5c64612bd60c6f3eb80ba9461ba2d091": {
        "proxy-function": "video-processing-python-20"
    }
}
```

The mapper output file will be stored in the trace directory with the name `mapper_output.json` by default. The output file contains the mapping of the trace functions to the proxy functions in the vSwarm benchmark suite.
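Downstream tooling can consume this file directly; a minimal sketch (path and hash are illustrative):

```python
import json

# Load the mapper output and look up the proxy for one trace-function hash.
with open("trace/mapper_output.json") as f:
    mapping = json.load(f)

fn_hash = "c13acdc7567b225971cef2416a3a2b03c8a4d8d154df48afe75834e2f5c59ddf"
print(mapping[fn_hash]["proxy-function"])
```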

---
1 change: 1 addition & 0 deletions tools/mapper/.gitattributes
@@ -0,0 +1 @@
profile.tar.gz filter=lfs diff=lfs merge=lfs -text
252 changes: 252 additions & 0 deletions tools/mapper/find_proxy_function.py
@@ -0,0 +1,252 @@
import numpy as np
import scipy.optimize as sp
import math

from collections import OrderedDict

from log_config import *
from typing import Tuple

def get_error(trace_function, proxy_function) -> float:
"""
    Returns a float value indicating how close the trace function is to the proxy function; the lower the value, the better the correlation.
    Euclidean distance between log-transformed memory and duration values is used.
Parameters:
- `trace_function` (dict): Dictionary containing information regarding trace function
- `proxy_function` (dict): Dictionary containing information regarding proxy function
Returns:
- `float`: closeness value
"""

try:
trace_memory = trace_function["memory"]["75-percentile"]
proxy_memory = proxy_function["memory"]["75-percentile"]
trace_duration = trace_function["duration"]["75-percentile"]
proxy_duration = proxy_function["duration"]["75-percentile"]
except KeyError as e:
log.warning(f"Correlation cannot be found. Error: {e}")
return math.inf

    # NOTE: Better error metrics could be considered to improve the correlation.
    # Currently only the 75th-percentile memory and duration are considered;
    # the Euclidean distance between log-transformed memory and duration is used.
try:
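        # Nudge zero values slightly so the log transform below is defined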
if trace_memory == 0: trace_memory += 0.01
if trace_duration == 0: trace_duration += 0.01
diff_memory = (math.log(trace_memory) - math.log(proxy_memory))
diff_duration = (math.log(trace_duration) - math.log(proxy_duration))
error = math.sqrt((diff_memory) ** 2 + (diff_duration) ** 2)
return error
except ValueError as e:
log.warning(f"Correlation cannot be found. Error: {e}")
return math.inf


def get_proxy_function_using_linear_sum_assignment(
trace_functions: dict, proxy_functions: dict
) -> Tuple[dict, int]:
"""
Obtains the one-to-one mapped proxy function for every trace function
Parameters:
- `trace_functions` (dict): Dictionary containing information regarding trace functions
- `proxy_functions` (dict): Dictionary containing information regarding proxy functions
Returns:
- `dict`: Dictionary containing information regarding trace functions with the associated proxy functions
- `int`: 0 if no error. -1 if error
"""

try:

trace_functions = OrderedDict(trace_functions)
proxy_functions = OrderedDict(proxy_functions)

trace_list = []
for tf in trace_functions:
trace_list.append(trace_functions[tf])
trace_functions[tf]["index"] = len(trace_list) - 1

proxy_list = []
for pf in proxy_functions:
proxy_list.append(proxy_functions[pf])
proxy_functions[pf]["index"] = len(proxy_list) - 1

# Creating error matrix
m, n = len(trace_functions.keys()), len(proxy_functions.keys())
error_matrix = np.empty((m, n))

        # This uses the Jonker-Volgenant algorithm (SciPy's linear_sum_assignment)
        # to calculate the best possible assignment for the trace functions.
        # Time complexity: O(n^3), where n is the larger of the number of rows/columns
for i in range(m):
for j in range(n):
error_matrix[i, j] = get_error(trace_list[i], proxy_list[j])

        # Solve the linear sum assignment problem
row_indices, col_indices = sp.linear_sum_assignment(error_matrix)
assignments = list(zip(row_indices, col_indices))

# Go through the assignment solution
for assignment in assignments:
row_index = assignment[0]
col_index = assignment[1]
trace = ""
proxy = ""
for tf in trace_functions:
if row_index == trace_functions[tf]["index"]:
trace = tf
break
for pf in proxy_functions:
if col_index == proxy_functions[pf]["index"]:
proxy = pf
break
trace_functions[trace]["proxy-function"] = proxy
trace_functions[trace]["proxy-correlation"] = get_error(
trace_functions[trace], proxy_functions[proxy]
)
log.debug(
f"Found proxy function for {trace}: {trace_functions[trace]['proxy-function']} with correlation: {trace_functions[trace]['proxy-correlation']}"
)

        # Go through the trace functions to ensure a proxy function exists; if not, report it
        for tf in trace_functions:
            if trace_functions[tf].get("proxy-function", "") == "":
                log.warning(f"Mapping for function {tf} not found")

        # Delete the temporary index fields
for tf in trace_functions:
del trace_functions[tf]["index"]
for pf in proxy_functions:
del proxy_functions[pf]["index"]

return trace_functions, 0

except Exception as e:
log.error(f"Mapping through linear sum assignment failed. Error: {e}")
return trace_functions, -1


def get_closest_proxy_function(
trace_functions: dict, proxy_functions: dict
) -> Tuple[dict, int]:
"""
Obtains the closest proxy function for every trace function
Parameters:
- `trace_functions` (dict): Dictionary containing information regarding trace functions
- `proxy_functions` (dict): Dictionary containing information regarding proxy functions
Returns:
- `dict`: Dictionary containing information regarding trace functions with the associated proxy functions
- `int`: 0 if no error. -1 if error
"""

try:
proxy_list = []
for function_name in proxy_functions:
proxy_list.append(proxy_functions[function_name])
proxy_functions[function_name]["index"] = len(proxy_list) - 1

for function_name in trace_functions:
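            # Scan all proxy functions and keep the one with the smallest error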
min_error = math.inf
min_error_index = -1
for i in range(0, len(proxy_list)):
error = get_error(trace_functions[function_name], proxy_list[i])
if error < min_error:
min_error = error
min_error_index = i

if min_error == math.inf:
log.warning(f"Proxy function for function {function_name} not found")
continue

trace_functions[function_name]["proxy-function"] = proxy_list[
min_error_index
]["name"]
trace_functions[function_name]["proxy-correlation"] = get_error(
trace_functions[function_name], proxy_list[min_error_index]
)
log.debug(
f"Found proxy function for {function_name}: {trace_functions[function_name]['proxy-function']} with correlation: {trace_functions[function_name]['proxy-correlation']}"
)

for function_name in proxy_functions:
del proxy_functions[function_name]["index"]

return trace_functions, 0

except Exception as e:
log.error(f"Finding closest proxy function failed. Error: {e}")
return trace_functions, -1


def get_proxy_function(
trace_functions: dict, proxy_functions: dict, unique_assignment: bool
) -> Tuple[dict, int]:
"""
    Obtains a proxy function for every trace function (one-to-one if requested and feasible)
Parameters:
- `trace_functions` (dict): Dictionary containing information regarding trace functions
- `proxy_functions` (dict): Dictionary containing information regarding proxy functions
    - `unique_assignment` (bool): If `True`, the trace-proxy function mapping is one-to-one, provided #(proxy functions) >= #(trace functions)
Returns:
- `dict`: Dictionary containing information regarding trace functions with the associated proxy functions
- `int`: 0 if no error. -1 if error
"""

trace_functions = OrderedDict(trace_functions)
proxy_functions = OrderedDict(proxy_functions)

    log.info(
        "The lower the correlation value, the better the proxy function approximates the trace function"
    )

if (unique_assignment) and (len(trace_functions) <= len(proxy_functions)):
log.info(
f"Getting One-To-One mapping between trace function and proxy function using Linear-Sum-Assignment"
)
trace_functions, err = get_proxy_function_using_linear_sum_assignment(
trace_functions=trace_functions, proxy_functions=proxy_functions
)
if err == -1:
log.error(
f"One-To-One mapping between trace function and proxy function not obtained"
)
log.info(
f"Getting closest proxy function for every trace function. Note: Mapping may not be unique"
)
trace_functions, err = get_closest_proxy_function(
trace_functions=trace_functions, proxy_functions=proxy_functions
)

elif (unique_assignment) and (len(trace_functions) > len(proxy_functions)):
log.warning(
f"One-To-One mapping between trace function and proxy function not possible since number of trace functions is greater than available proxy functions"
)
log.info(
f"Getting closest proxy function for every trace function. Note: Mapping may not be unique"
)
trace_functions, err = get_closest_proxy_function(
trace_functions=trace_functions, proxy_functions=proxy_functions
)

else:
log.info(
f"Getting closest proxy function for every trace function. Note: Mapping may not be unique"
)
trace_functions, err = get_closest_proxy_function(
trace_functions=trace_functions, proxy_functions=proxy_functions
)

if err == -1:
log.critical(f"Mapping between trace function and proxy function not obtained")
return trace_functions, -1

return trace_functions, 0
37 changes: 37 additions & 0 deletions tools/mapper/log_config.py
@@ -0,0 +1,37 @@
import logging

class CustomFormatter(logging.Formatter):

blue = "\x1b[34;20m"
green = "\x1b[32;20m"
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
bold_red_white_bg = "\x1b[1;37;41m"
reset = "\x1b[0m"
format = "%(asctime)s - %(levelname)s - (%(filename)s:%(lineno)d) - %(message)s"
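    # Base log format; FORMATS below captures this string at class-body evaluation
    # time, before the format() method further down shadows the name.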

FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: green + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red_white_bg + format + reset,
}

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)


log = logging.getLogger("Benchmark")
log.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
log.addHandler(ch)

def setLogLevel(level: str) -> None:
if (level == "INFO"): log.setLevel(logging.INFO)
elif (level == "DEBUG"): log.setLevel(logging.DEBUG)