Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
Merge pull request #16 from katulu-io/fivenum-boxplot
Browse files Browse the repository at this point in the history
feat(develop): Federated Analytics : Boxplot and five number summary core
  • Loading branch information
msuzen authored Jun 29, 2022
2 parents 553b861 + 0b4f5d2 commit e50b291
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .correlation import CorrelationProvider
from .data import data
from .fedhist import HistogramProvider
from .fedbox import BoxPlotProvider
from .provider import AnalyticsProvider

log = logging.getLogger(__name__)
Expand All @@ -17,6 +18,7 @@
providers: List[AnalyticsProvider] = [
CorrelationProvider(d),
HistogramProvider(d),
BoxPlotProvider(d)
]

log.info(f"starting client with {', '.join(p.name for p in providers) } providers")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Relative import: this module is loaded as a package submodule
# (`from .fedbox import BoxPlotProvider`), so a bare `fedhist` would not resolve.
from .fedhist import HistogramProvider


class BoxPlotProvider(HistogramProvider):
    """Provider registered under the name ``"boxplot"``.

    All behaviour is inherited unchanged from ``HistogramProvider``; only the
    provider name is overridden.
    """

    @property
    def name(self) -> str:
        """Return the identifier this provider is registered and logged under."""
        return "boxplot"
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import io
import json
from .fivenum import compute_fivesum, FiveNum
from .fedhist import HistogramProvider
import matplotlib.pyplot as plt


class BoxPlotProvider(HistogramProvider):
    """Histogram-based provider that reports a five number summary as a box plot.

    ``aggregate`` reuses the parent's histogram aggregation and then reduces
    the histogram to a ``FiveNum``; ``result_metadata_json`` renders that
    summary as an inline SVG box plot.
    """

    def aggregate(self) -> None:
        """Aggregate the client histograms and store the five number summary.

        The parent implementation is expected to leave the aggregated
        histogram in ``self._result``; it is replaced here by the value
        returned from ``compute_fivesum``.
        """
        super().aggregate()
        histogram = self._result
        client_input = self.client_input_data()
        self._result = compute_fivesum(
            client_input["hmin"], client_input["hmax"], histogram
        )

    def result_metadata_json(self) -> str:
        """Render the five number summary as an SVG box plot.

        Returns:
            A JSON document (``version`` 1) with a single inline ``web-app``
            output whose ``source`` is the SVG markup of the plot.
        """
        summary = self._result

        # Keys follow the statistics-dict format consumed by ``Axes.bxp``:
        # ``whislo`` is the lower whisker, ``whishi`` the upper one.
        stats = [
            {
                "label": "box",  # not required
                "med": summary.median,
                "q1": summary.quartile_first,
                "q3": summary.quartile_third,
                "whislo": summary.minimum,
                "whishi": summary.maximum,
                "fliers": [],
            }
        ]

        fig_svg = io.BytesIO()
        fig, axes = plt.subplots(1, 1)
        try:
            axes.bxp(stats)
            axes.set_title("Box plot out of fiveNumSummary ")
            axes.set_ylabel("Values")
            fig.savefig(
                fig_svg, facecolor="w", edgecolor="w", transparent=False, format="svg"
            )
        finally:
            # pyplot keeps every figure alive until it is closed explicitly;
            # release it so repeated calls do not accumulate figures.
            plt.close(fig)

        metadata = {
            "version": 1,
            "outputs": [
                {
                    "type": "web-app",
                    "storage": "inline",
                    "source": fig_svg.getvalue().decode("utf-8"),
                },
            ],
        }

        return json.dumps(metadata)

    @property
    def name(self) -> str:
        """Return the identifier of this provider."""
        return "boxplot"
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Five-number summary computed from a histogram.
Basis for a box plot.
Katulu GmbH
(c) 2022
"""

import numpy as np
from dataclasses import dataclass
from typing import List, Optional
from .fedhist import HistogramData


@dataclass
class FiveNum:
    """Five number summary used to draw a box plot."""

    # Lower end of the summarised range.
    minimum: float
    # 25 % quantile.
    quartile_first: float
    # 50 % quantile.
    median: float
    # 75 % quantile.
    quartile_third: float
    # Upper end of the summarised range.
    maximum: float


def compute_fivesum(hmin: float, hmax: float, histogram: HistogramData) -> FiveNum:
    """Compute the five number summary of the data described by a histogram.

    The quartiles are quantiles of the binned observations: the cumulative
    count locates the bin in which a fraction ``q`` of all observations is
    reached, and the position inside that bin is linearly interpolated.

    Args:
        hmin: Value reported as ``minimum`` (passed through unchanged).
        hmax: Value reported as ``maximum`` (passed through unchanged).
        histogram: Object exposing ``counts`` (observations per bin) and
            ``bins``. When ``bins`` holds one more entry than ``counts`` it is
            treated as bin edges; otherwise the entry of the located bin is
            returned without interpolation.

    Returns:
        The ``FiveNum`` summary. The quartiles are ``nan`` when the histogram
        holds no observations.
    """
    counts = np.asarray(histogram.counts, dtype=float)
    bins = np.asarray(histogram.bins, dtype=float)
    cumulative = np.cumsum(counts)
    total = float(cumulative[-1]) if cumulative.size else 0.0
    has_edges = bins.size == counts.size + 1

    def quantile_from_histogram(q: float) -> float:
        """Return the value below which a fraction ``q`` of the observations lie."""
        if total <= 0.0:
            # Quantiles are undefined without observations.
            return float("nan")
        target = q * total
        # First bin whose cumulative count reaches the target. That bin is
        # never empty, so the division below is safe.
        idx = min(int(np.searchsorted(cumulative, target, side="left")), counts.size - 1)
        if not has_edges:
            return float(bins[idx])
        below = cumulative[idx - 1] if idx > 0 else 0.0
        fraction = (target - below) / counts[idx]
        return float(bins[idx] + fraction * (bins[idx + 1] - bins[idx]))

    return FiveNum(
        minimum=hmin,
        quartile_first=quantile_from_histogram(0.25),
        median=quantile_from_histogram(0.50),
        quartile_third=quantile_from_histogram(0.75),
        maximum=hmax,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Federated FedBox Utilities : Functional tests
Katulu GmbH
(c) 2022
"""
import io
import argparse
import numpy as np
from flwr.common import Scalar
from flwr_analytics_server.fedbox import BoxPlotProvider

def _numpy_to_scalar(input: np.ndarray) -> Scalar:
    """Serialize an array with ``np.save`` and return the resulting bytes."""
    with io.BytesIO() as stream:
        np.save(stream, input)
        return stream.getvalue()


def run_two_client_example(_rtol=1e-05, _atol=1e-08) -> None:
    """Aggregate two client histograms and check the five number summary.

    A seeded sample of 1000 normal values is split into two halves, each
    half is binned locally, and both histograms are fed to a
    ``BoxPlotProvider``. The aggregated summary is compared with
    precomputed values using ``np.allclose`` and the given tolerances.
    """
    # Deterministic test data, split evenly between the two clients.
    np.random.seed(4242)
    samples = np.random.normal(size=1000, loc=20)

    # Local histograms: every client bins its share over the same range.
    local_histograms = [
        np.histogram(share, bins=100, range=(17.0, 24.0))
        for share in (samples[0:500], samples[500:1000])
    ]

    # Server-side aggregation into the global summary.
    provider = BoxPlotProvider()
    provider.set_arguments(argparse.Namespace(nbins=100, hmin=17.0, hmax=24.0))
    for counts, edges in local_histograms:
        provider.add_client_data(
            {
                "counts": _numpy_to_scalar(counts),
                "bins": _numpy_to_scalar(edges),
            }
        )
    provider.aggregate()

    summary = provider._result
    observed = [
        summary.minimum,
        summary.quartile_first,
        summary.median,
        summary.quartile_third,
        summary.maximum,
    ]
    # NOTE(review): a median of 17.91 for samples drawn around 20 looks
    # implausible -- confirm these pinned values are the intended quartiles.
    expected = [17.0, 17.21, 17.91, 19.03, 24.0]

    assert np.allclose(observed, expected, atol=_atol, rtol=_rtol)


def test_two_clients_normal() -> None:
    """Run the two-client example with its default tolerances."""
    run_two_client_example()
129 changes: 129 additions & 0 deletions components/kubeflow-pipeline/python/fl_suite/analytics/_boxplot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from typing import Callable, Optional, Tuple

from kfp import Client
from kfp.components import load_component
from kfp.components._structures import InputSpec, InputValuePlaceholder
from kfp.dsl import ContainerOp, ExitHandler, pipeline

from .._version import __version__
from ..pipelines import (
add_envoy_proxy,
build_image,
cleanup_kubernetes_resources,
create_image_tag,
setup_kubernetes_resources,
)
from ._analytics import Parameter, analytics_server_spec


# pylint: disable-next=too-many-arguments
def boxplot(
    data_func: Callable[[str, bool], ContainerOp],
    nbins: int,
    hrange: Tuple[float, float],
    min_available_clients: int = 2,
    host: Optional[str] = None,
    experiment_name: Optional[str] = None,
    registry: str = "ghcr.io/katulu-io/fl-suite",
    verify_registry_tls: bool = True,
) -> None:
    """Run a distributed boxplot over data provided by multiple clients.

    Builds the boxplot pipeline, submits it as a run through a ``Client``
    created for ``host`` and waits up to 3600 seconds for completion.

    Args:
        data_func: Passed to the pipeline as ``fl_client``.
        nbins: Submitted as the ``nbins`` run argument.
        hrange: ``(hmin, hmax)``; submitted as ``hmin`` and ``hmax``.
        min_available_clients: Submitted as ``min_available_clients``.
        host: Passed to ``Client``.
        experiment_name: Experiment the run is created under.
        registry: Registry prefix of the analytics server image; also
            forwarded to the pipeline.
        verify_registry_tls: Forwarded to the pipeline.

    Raises:
        RuntimeError: If the run ends as ``failed``, ``skipped`` or ``error``.
    """
    server_image = f"{registry}/analytics-server:{__version__}"
    kfp_client = Client(host)

    pipeline_func = boxplot_pipeline(
        fl_client=data_func,
        fl_server_image=server_image,
        registry=registry,
        verify_registry_tls=verify_registry_tls,
    )
    run_arguments = {
        "min_available_clients": min_available_clients,
        "nbins": nbins,
        "hmin": hrange[0],
        "hmax": hrange[1],
    }
    submitted = kfp_client.create_run_from_pipeline_func(
        pipeline_func,
        arguments=run_arguments,
        experiment_name=experiment_name,
    )

    finished = kfp_client.wait_for_run_completion(submitted.run_id, timeout=3600)
    status = finished.run.status.lower()
    if status in ("failed", "skipped", "error"):
        raise RuntimeError(f"Run {status}")


def boxplot_server(
    server_image: str,
    min_available_clients: int,
    nbins: int,
    hrange: Tuple[float, float],
) -> ContainerOp:
    """Create the container op that runs the analytics server for a boxplot.

    The server spec uses the ``boxplot`` subcommand and declares three
    inputs (``nbins``, ``hmin``, ``hmax``), each bound to its command line
    flag. Caching is disabled on the resulting op and an envoy proxy is
    added to it.

    Args:
        server_image: Image passed to the analytics server spec.
        min_available_clients: First positional argument of the component.
        nbins: Value for the ``nbins`` input.
        hrange: ``(hmin, hmax)`` values for the ``hmin``/``hmax`` inputs.

    Returns:
        The configured analytics server ``ContainerOp``.
    """
    # (input name, input type, command line flag)
    inputs = (
        ("nbins", "Integer", "--nbins"),
        ("hmin", "Float", "--hmin"),
        ("hmax", "Float", "--hmax"),
    )
    server_spec = analytics_server_spec(
        server_image=server_image,
        subcommand="boxplot",
        parameters=[
            Parameter(InputSpec(name, type=kind), flag, InputValuePlaceholder(name))
            for name, kind, flag in inputs
        ],
    )
    make_server_op = load_component(component_spec=server_spec)
    # pylint: disable-next=not-callable
    server_op: ContainerOp = make_server_op(
        min_available_clients,
        nbins,
        hrange[0],
        hrange[1],
    )
    server_op.enable_caching = False
    add_envoy_proxy(server_op)

    return server_op


def boxplot_pipeline(
    registry: str,
    verify_registry_tls: bool,
    fl_client: Callable[[str, bool], ContainerOp],
    fl_server_image: str,
):
    """Create a boxplot pipeline.

    Args:
        registry: Passed to ``fl_client`` and to the image build step.
        verify_registry_tls: Passed to ``fl_client`` and to the image build step.
        fl_client: Called with ``(registry, verify_registry_tls)``; its
            ``build_context_path`` output feeds the image build.
        fl_server_image: Image used for the analytics server op.

    Returns:
        The pipeline function, decorated with ``@pipeline(name="boxplot")``.
    """

    @pipeline(name="boxplot")
    def h_pipeline(
        min_available_clients: int,
        nbins: int,
        hmin: float,
        hmax: float,
        image_tag: str = create_image_tag("boxplot-client"),
    ) -> None:
        # NOTE(review): all steps are assumed to sit inside the exit handler
        # so the cleanup op runs after them -- confirm the nesting.
        with ExitHandler(cleanup_kubernetes_resources()):
            context_op = fl_client(registry, verify_registry_tls)
            build_image(
                build_context_path=context_op.outputs["build_context_path"],
                image_tag=image_tag,
                registry=registry,
                verify_registry_tls=verify_registry_tls,
            )

            setup_op = setup_kubernetes_resources()
            server_op = boxplot_server(
                fl_server_image, min_available_clients, nbins, (hmin, hmax)
            )
            # The server must only start once the cluster resources exist.
            server_op.after(setup_op)

    return h_pipeline

0 comments on commit e50b291

Please sign in to comment.