Skip to content

Commit

Permalink
Add implementation-specific descriptors (python-graphblas#343)
Browse files Browse the repository at this point in the history
* Add implementation-specific descriptors

* Coverage (skip some hard-to-reach safety exceptions)

* pre-commit autoupdate

* Use latest coveralls (that doesn't yet support latest coverage)

* Is quote needed?

* Closer. Try '"'

* Why is windows being stupid?

* Have both forward and reverse mappings for config and descriptor enumerations

* Still create descriptors where `**opts` is ignored

* Coverage
  • Loading branch information
eriknw authored Dec 22, 2022
1 parent 70a0d17 commit ae0a232
Show file tree
Hide file tree
Showing 22 changed files with 1,215 additions and 588 deletions.
141 changes: 73 additions & 68 deletions graphblas/core/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def _new(self, updater, expr, *, in_composite=False):
return parent._as_vector()
return

opts = updater.opts
if agg._composite is not None:
# Masks are applied throughout the aggregation, including composite aggregations.
# Aggregations done while `in_composite is True` should return the updater parent
Expand All @@ -173,14 +174,14 @@ def _new(self, updater, expr, *, in_composite=False):
for cur_agg in agg._composite:
cur_agg = cur_agg[self.type] # Hopefully works well enough
arg = expr.construct_output(cur_agg.return_type)
results.append(cur_agg._new(arg(mask=mask), expr, in_composite=True))
final_expr = agg._finalize(*results)
results.append(cur_agg._new(arg(mask=mask, **opts), expr, in_composite=True))
final_expr = agg._finalize(*results, opts)
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
updater << final_expr
elif expr.cfunc_name.startswith("GrB_Vector_reduce") or expr.cfunc_name.startswith(
"GrB_Matrix_reduce"
):
final = final_expr.new()
final = final_expr.new(**opts)
updater << final[0]
else:
raise NotImplementedError(f"{agg.name} with {expr.cfunc_name}")
Expand All @@ -192,7 +193,7 @@ def _new(self, updater, expr, *, in_composite=False):
return

if agg._custom is not None:
return agg._custom(self, updater, expr, in_composite=in_composite)
return agg._custom(self, updater, expr, opts, in_composite=in_composite)

semiring = get_typed_op(agg._semiring, self.type, agg._initdtype)
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
Expand All @@ -201,12 +202,12 @@ def _new(self, updater, expr, *, in_composite=False):
orig_updater = updater
if agg._finalize is not None:
step1 = expr.construct_output(semiring.return_type)
updater = step1(mask=updater.kwargs.get("mask"))
updater = step1(mask=updater.kwargs.get("mask"), **opts)
if expr.method_name == "reduce_columnwise":
A = A.T
size = A._ncols
init = expr._new_vector(agg._initdtype, size=size)
init[...] = agg._initval # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = agg._initval # O(1) dense vector in SuiteSparse 5
if agg._switch:
updater << semiring(init @ A.T)
else:
Expand All @@ -220,17 +221,17 @@ def _new(self, updater, expr, *, in_composite=False):
v = expr.args[0]
step1 = expr._new_vector(semiring.return_type, size=1)
init = expr._new_matrix(agg._initdtype, nrows=v._size, ncols=1)
init[...] = agg._initval # O(1) dense column vector in SuiteSparse 5
init(**opts)[...] = agg._initval # O(1) dense column vector in SuiteSparse 5
if agg._switch:
step1 << semiring(init.T @ v)
step1(**opts) << semiring(init.T @ v)
else:
step1 << semiring(v @ init)
step1(**opts) << semiring(v @ init)
if agg._finalize is not None:
finalize = agg._finalize[semiring.return_type]
if step1.dtype == finalize.return_type:
step1 << finalize(step1)
step1(**opts) << finalize(step1)
else:
step1 = finalize(step1).new(finalize.return_type)
step1 = finalize(step1).new(finalize.return_type, **opts)
if in_composite:
return step1
updater << step1[0]
Expand All @@ -241,23 +242,23 @@ def _new(self, updater, expr, *, in_composite=False):
# This has not been benchmarked or optimized.
# We may be able to intelligently choose the faster path.
init1 = expr._new_vector(agg._initdtype, size=A._ncols)
init1[...] = agg._initval # O(1) dense vector in SuiteSparse 5
init1(**opts)[...] = agg._initval # O(1) dense vector in SuiteSparse 5
step1 = expr._new_vector(semiring.return_type, size=A._nrows)
if agg._switch:
step1 << semiring(init1 @ A.T)
step1(**opts) << semiring(init1 @ A.T)
else:
step1 << semiring(A @ init1)
step1(**opts) << semiring(A @ init1)
init2 = expr._new_matrix(agg._initdtype, nrows=A._nrows, ncols=1)
init2[...] = agg._initval # O(1) dense vector in SuiteSparse 5
init2(**opts)[...] = agg._initval # O(1) dense vector in SuiteSparse 5
semiring2 = agg._semiring2[semiring.return_type]
step2 = expr._new_vector(semiring2.return_type, size=1)
step2 << semiring2(step1 @ init2)
step2(**opts) << semiring2(step1 @ init2)
if agg._finalize is not None:
finalize = agg._finalize[semiring2.return_type]
if step2.dtype == finalize.return_type:
step2 << finalize(step2)
else:
step2 = finalize(step2).new(finalize.return_type)
step2 = finalize(step2).new(finalize.return_type, **opts)
if in_composite:
return step2
updater << step2[0]
Expand Down Expand Up @@ -342,53 +343,53 @@ def __reduce__(self):


# Composite
def _mean_finalize(c, x):
def _mean_finalize(c, x, opts):
return binary.truediv(x & c)


def _ptp_finalize(max, min):
def _ptp_finalize(max, min, opts):
return binary.minus(max & min)


def _varp_finalize(c, x, x2):
def _varp_finalize(c, x, x2, opts):
# <x2> / n - (<x> / n)**2
left = binary.truediv(x2 & c).new()
right = binary.truediv(x & c).new()
right << binary.pow(right, 2)
left = binary.truediv(x2 & c).new(**opts)
right = binary.truediv(x & c).new(**opts)
right(**opts) << binary.pow(right, 2)
return binary.minus(left & right)


def _vars_finalize(c, x, x2):
def _vars_finalize(c, x, x2, opts):
# <x2> / (n-1) - <x>**2 / (n * (n-1))
x << binary.pow(x, 2)
right = binary.truediv(x & c).new()
c << binary.minus(c, 1)
right << binary.truediv(right & c)
left = binary.truediv(x2 & c).new()
x(**opts) << binary.pow(x, 2)
right = binary.truediv(x & c).new(**opts)
c(**opts) << binary.minus(c, 1)
right(**opts) << binary.truediv(right & c)
left = binary.truediv(x2 & c).new(**opts)
return binary.minus(left & right)


def _stdp_finalize(c, x, x2):
val = _varp_finalize(c, x, x2).new()
def _stdp_finalize(c, x, x2, opts):
val = _varp_finalize(c, x, x2, opts).new(**opts)
return unary.sqrt(val)


def _stds_finalize(c, x, x2):
val = _vars_finalize(c, x, x2).new()
def _stds_finalize(c, x, x2, opts):
val = _vars_finalize(c, x, x2, opts).new(**opts)
return unary.sqrt(val)


def _geometric_mean_finalize(c, x):
right = unary.minv["FP64"](c).new()
def _geometric_mean_finalize(c, x, opts):
right = unary.minv["FP64"](c).new(**opts)
return binary.pow(x & right)


def _harmonic_mean_finalize(c, x):
def _harmonic_mean_finalize(c, x, opts):
return binary.truediv(c & x)


def _root_mean_square_finalize(c, x2):
val = binary.truediv(x2 & c).new()
def _root_mean_square_finalize(c, x2, opts):
val = binary.truediv(x2 & c).new(**opts)
return unary.sqrt(val)


Expand Down Expand Up @@ -453,6 +454,7 @@ def _argminmaxij(
agg,
updater,
expr,
opts,
*,
in_composite,
monoid,
Expand All @@ -462,51 +464,52 @@ def _argminmaxij(
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
A = expr.args[0]
if expr.method_name == "reduce_rowwise":
step1 = A.reduce_rowwise(monoid).new()
step1 = A.reduce_rowwise(monoid).new(**opts)

D = step1.diag()

masked = semiring.any_eq(D @ A).new()
masked(mask=masked.V, replace=True) << masked # Could use select
masked = semiring.any_eq(D @ A).new(**opts)
masked(mask=masked.V, replace=True, **opts) << masked # Could use select
init = expr._new_vector(bool, size=A._ncols)
init[...] = False # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
updater << row_semiring(masked @ init)
if in_composite:
return updater.parent
else:
step1 = A.reduce_columnwise(monoid).new()
step1 = A.reduce_columnwise(monoid).new(**opts)

D = step1.diag()

masked = semiring.any_eq(A @ D).new()
masked(mask=masked.V, replace=True) << masked # Could use select
masked = semiring.any_eq(A @ D).new(**opts)
masked(mask=masked.V, replace=True, **opts) << masked # Could use select
init = expr._new_vector(bool, size=A._nrows)
init[...] = False # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
updater << col_semiring(init @ masked)
if in_composite:
return updater.parent
elif expr.cfunc_name.startswith("GrB_Vector_reduce"):
v = expr.args[0]
step1 = v.reduce(monoid, allow_empty=False).new()
masked = binary.eq(v, step1).new()
masked(mask=masked.V, replace=True) << masked # Could use select
step1 = v.reduce(monoid, allow_empty=False).new(**opts)
masked = binary.eq(v, step1).new(**opts)
masked(mask=masked.V, replace=True, **opts) << masked # Could use select
init = expr._new_matrix(bool, nrows=v._size, ncols=1)
init[...] = False # O(1) dense column vector in SuiteSparse 5
step2 = col_semiring(masked @ init).new()
init(**opts)[...] = False # O(1) dense column vector in SuiteSparse 5
step2 = col_semiring(masked @ init).new(**opts)
if in_composite:
return step2
updater << step2[0]
else:
raise NotImplementedError(f"{agg.name} with {expr.cfunc_name}")


def _argminmax(agg, updater, expr, *, in_composite, monoid):
def _argminmax(agg, updater, expr, opts, *, in_composite, monoid):
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
if expr.method_name == "reduce_rowwise":
return _argminmaxij(
agg,
updater,
expr,
opts,
in_composite=in_composite,
monoid=monoid,
row_semiring=semiring.min_firstj,
Expand All @@ -516,6 +519,7 @@ def _argminmax(agg, updater, expr, *, in_composite, monoid):
agg,
updater,
expr,
opts,
in_composite=in_composite,
monoid=monoid,
row_semiring=semiring.min_firsti,
Expand All @@ -526,6 +530,7 @@ def _argminmax(agg, updater, expr, *, in_composite, monoid):
agg,
updater,
expr,
opts,
in_composite=in_composite,
monoid=monoid,
row_semiring=semiring.min_firsti,
Expand All @@ -549,52 +554,52 @@ def _argminmax(agg, updater, expr, *, in_composite, monoid):
)


def _first_last(agg, updater, expr, *, in_composite, semiring_):
def _first_last(agg, updater, expr, opts, *, in_composite, semiring_):
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
A = expr.args[0]
if expr.method_name == "reduce_columnwise":
A = A.T
init = expr._new_vector(bool, size=A._ncols)
init[...] = False # O(1) dense vector in SuiteSparse 5
step1 = semiring_(A @ init).new()
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
step1 = semiring_(A @ init).new(**opts)
Is, Js = step1.to_coo()

Matrix_ = type(expr._new_matrix(bool))
P = Matrix_.from_coo(Js, Is, 1, nrows=A._ncols, ncols=A._nrows)
mask = step1.diag()
result = semiring.any_first(A @ P).new(mask=mask.S).diag()
result = semiring.any_first(A @ P).new(mask=mask.S, **opts).diag(**opts)

updater << result
if in_composite:
return updater.parent
elif expr.cfunc_name.startswith("GrB_Vector_reduce"):
v = expr.args[0]
init = expr._new_matrix(bool, nrows=v._size, ncols=1)
init[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(v @ init).new()
init(**opts)[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(v @ init).new(**opts)
index = step1[0].new().value
# `==` instead of `is` automatically triggers index.compute() in dask-graphblas:
if index == None: # noqa
index = 0
if in_composite:
return v[[index]].new()
return v[[index]].new(**opts)
updater << v[index]
else: # GrB_Matrix_reduce
A = expr.args[0]
init1 = expr._new_matrix(bool, nrows=A._ncols, ncols=1)
init1[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(A @ init1).new()
init1(**opts)[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(A @ init1).new(**opts)
init2 = expr._new_vector(bool, size=A._nrows)
init2[...] = False # O(1) dense vector in SuiteSparse 5
step2 = semiring_(step1.T @ init2).new()
init2(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
step2 = semiring_(step1.T @ init2).new(**opts)
i = step2[0].new().value
# `==` instead of `is` automatically triggers i.compute() in dask-graphblas:
if i == None: # noqa
i = j = 0
else:
j = step1[i, 0].new().value
if in_composite:
return A[i, [j]].new()
return A[i, [j]].new(**opts)
updater << A[i, j]


Expand All @@ -612,22 +617,22 @@ def _first_last(agg, updater, expr, *, in_composite, semiring_):
)


def _first_last_index(agg, updater, expr, *, in_composite, semiring):
def _first_last_index(agg, updater, expr, opts, *, in_composite, semiring):
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
A = expr.args[0]
if expr.method_name == "reduce_columnwise":
A = A.T
init = expr._new_vector(bool, size=A._ncols)
init[...] = False # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
expr = semiring(A @ init)
updater << expr
if in_composite:
return updater.parent
elif expr.cfunc_name.startswith("GrB_Vector_reduce"):
v = expr.args[0]
init = expr._new_matrix(bool, nrows=v._size, ncols=1)
init[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring(v @ init).new()
init(**opts)[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring(v @ init).new(**opts)
if in_composite:
return step1
updater << step1[0]
Expand Down
Loading

0 comments on commit ae0a232

Please sign in to comment.