Skip to content

Commit

Permalink
Added new process handler that controls the parallelization mode glob…
Browse files Browse the repository at this point in the history
…ally: multiprocessing, ray or single_thread
  • Loading branch information
TimWeaving committed Nov 6, 2023
1 parent f815de7 commit 5477b93
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 82 deletions.
1 change: 1 addition & 0 deletions symmer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Main init for package."""
from symmer.process_handler import process
from symmer.operators import PauliwordOp, QuantumState
from symmer.projection import QubitTapering, ContextualSubspace, QubitSubspaceManager
24 changes: 7 additions & 17 deletions symmer/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import quimb
from symmer.operators.utils import *
import ray
import numpy as np
import pandas as pd
import networkx as nx
Expand All @@ -10,11 +9,10 @@
from functools import reduce
from typing import List, Union, Optional
from numbers import Number
import multiprocessing as mp
from cached_property import cached_property
from scipy.sparse import csr_matrix, csc_matrix, coo_matrix, dok_matrix
from symmer import process
from symmer.operators.utils import _cref_binary

from openfermion import QubitOperator, count_qubits
import matplotlib.pyplot as plt
from qiskit.opflow import PauliSumOp as ibm_PauliSumOp
Expand Down Expand Up @@ -828,10 +826,11 @@ def expval(self, psi: "QuantumState") -> complex:
complex: The expectation value.
"""
if self.n_terms > 1:
# parallelize if number of terms greater than one
psi_ray_store = ray.put(psi)
expvals = np.array(ray.get(
[single_term_expval.remote(P, psi_ray_store) for P in self]))
@process.parallelize
def f(P, psi):
return single_term_expval(P, psi)

expvals = np.array(f(self, psi))
else:
expvals = np.array(single_term_expval(self, psi))

Expand Down Expand Up @@ -2399,16 +2398,7 @@ def get_ij_operator(i:int, j:int, n_qubits:int,
else:
return ij_symp_matrix, coeffs

@ray.remote(num_cpus=os.cpu_count(),
runtime_env={
"env_vars": {
"NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
# "OMP_NUM_THREADS": str(os.cpu_count()),
"OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
"NUMEXPR_MAX_THREADS": str(os.cpu_count())
}
}
)

def single_term_expval(P_op: PauliwordOp, psi: QuantumState) -> float:
"""
Expectation value calculation for a single Pauli operator given a QuantumState psi
Expand Down
22 changes: 6 additions & 16 deletions symmer/operators/independent_op.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import numpy as np
from typing import Dict, List, Tuple, Union
import warnings
import multiprocessing as mp
from typing import Dict, List, Tuple, Union
from symmer import process
from symmer.operators.utils import _rref_binary, _cref_binary, check_independent
from symmer.operators import PauliwordOp, QuantumState, symplectic_to_string, single_term_expval
import ray

class IndependentOp(PauliwordOp):
"""
Expand Down Expand Up @@ -292,17 +291,7 @@ def update_sector(self,
assert ref_state._is_normalized(), 'Reference state is not normalized.'

### update the stabilizers assignments in parallel
# with mp.Pool(mp.cpu_count()) as pool:
# self.coeff_vec = np.array(
# list(pool.starmap(assign_value, [(S, ref_state, threshold) for S in self]))
# )
ref_state_ray_store = ray.put(ref_state)
self.coeff_vec = np.array(ray.get(
[single_term_expval.remote(S, ref_state_ray_store) for S in self]))
# set anything below threshold to be zero
self.coeff_vec[np.abs(self.coeff_vec)<threshold] = 0
self.coeff_vec = np.sign(self.coeff_vec)

self.coeff_vec = np.array(assign_value(self, ref_state))
# raise a warning if any stabilizers are assigned a zero value
if np.any(self.coeff_vec==0):
S_zero = self[self.coeff_vec==0]; S_zero.coeff_vec[:]=1
Expand Down Expand Up @@ -366,8 +355,8 @@ def __iter__(self):
"""
return iter([self[i] for i in range(self.n_terms)])


def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> int:
@process.parallelize
def assign_value(S: PauliwordOp, ref_state: QuantumState) -> int:
"""
Measure expectation value of stabilizer on input reference state.
Args:
Expand All @@ -378,6 +367,7 @@ def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> i
Returns:
Expectation Value (int) of stabilizer on input reference state.
"""
threshold = 0.5
expval = single_term_expval(S, ref_state)
# if this expval exceeds some predefined threshold then assign the corresponding
# ±1 eigenvalue. Otherwise, return 0 as insufficient evidence to fix the value.
Expand Down
30 changes: 5 additions & 25 deletions symmer/operators/noncontextual_op.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import warnings
import os
import itertools
import numpy as np
import networkx as nx
import multiprocessing as mp
import qubovert as qv
from cached_property import cached_property
from time import time
Expand All @@ -13,8 +11,8 @@
from scipy.optimize import differential_evolution, shgo
from symmer.operators import PauliwordOp, IndependentOp, AntiCommutingOp, QuantumState
from symmer.operators.utils import binomial_coefficient, perform_noncontextual_sweep
import ray
from symmer.utils import random_anitcomm_2n_1_PauliwordOp
from symmer import process

class NoncontextualOp(PauliwordOp):
"""
Expand Down Expand Up @@ -748,14 +746,7 @@ def energy_via_brute_force(self) -> Tuple[float, np.array, np.array]:
nu_list[:,~self.fixed_ev_mask] = np.array(list(itertools.product([-1,1],repeat=np.sum(~self.fixed_ev_mask))))

# # optimize over all discrete value assignments of nu in parallel
noncon_H = ray.put(self.NC_op)
tracker = np.array(ray.get(
[get_noncon_energy.remote(noncon_H, nu_vec) for nu_vec in nu_list]))

# with mp.Pool(mp.cpu_count()) as pool:
# tracker = pool.map(self.NC_op.get_energy, nu_list)

# find the lowest energy eigenvalue assignment from the full list
tracker = get_noncon_energy(nu_list, self.NC_op)
full_search_results = zip(tracker, nu_list)
energy, fixed_nu = min(full_search_results, key=lambda x:x[0])

Expand Down Expand Up @@ -878,20 +869,9 @@ def energy_xUSO(self) -> Tuple[float, np.array, np.array]:

return self.NC_op.get_energy(nu_vec), nu_vec


@ray.remote(num_cpus=os.cpu_count(),
runtime_env={
"env_vars": {
"NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
# "OMP_NUM_THREADS": str(os.cpu_count()),
"OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
"NUMEXPR_MAX_THREADS": str(os.cpu_count())
}
}
)
def get_noncon_energy(noncon_H:NoncontextualOp, nu: np.array) -> float:
@process.parallelize
def get_noncon_energy(nu: np.array, noncon_H:NoncontextualOp) -> float:
"""
The classical objective function that encodes the noncontextual energies.
"""
s0, si = noncon_H.get_symmetry_contributions(nu)
return s0 - np.linalg.norm(si)
return noncon_H.get_energy(nu)
130 changes: 107 additions & 23 deletions symmer/process_handler.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,119 @@
import multiprocessing as mp
import ray
import os
import numpy as np
from ray import remote, put, get
from multiprocessing import Process, Queue

class ProcessHandler:

method = 'mp'
method = 'mp'
verbose = False

def __init__(self):
self.n_logical_cores = mp.cpu_count()
self.n_logical_cores = os.cpu_count()

def _process_ray(self, func):
def worker(iter, shared=None):
return func(iter, shared)
return worker

def _process_mp(self, func):
def worker(iter, shared=None):
def prepare_chunks(self, iter):
""" split a list into smaller sized chunks
"""
iter = list(iter)
self.n_chunks = min(len(iter), self.n_logical_cores)
chunk_size = int(np.ceil(len(iter)/self.n_chunks))
indices = np.append(np.arange(self.n_chunks)*chunk_size, None)
for i,j in zip(indices[:-1], indices[1:]):
yield iter[i:j]

def _process_ray(self, func, iter, shared):
""" Helper function for ray processing
"""
if self.verbose:
print(f'*** executing in ray mode ***')
# duplicate func with ray.remote wrapper :
@remote(num_cpus=self.n_logical_cores,
# runtime_env={
# "env_vars": {
# "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
# "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
# "NUMEXPR_MAX_THREADS": str(self.n_logical_cores)
# }
# }
)
def _func(iter, shared):
return func(iter, shared)
return worker
# place into shared memory:
shared_obj = put(shared)
# split iterable into smaller chunks and parallelize remote instances:
results = get(
[
_func.remote(chunk, shared_obj)
for chunk in self.prepare_chunks(iter)
]
)
# flatten the list and return:
return [a for b in results for a in b]

def _process_mp(self, func, iter, shared):
""" Helper function for multiprocessing
"""
if self.verbose:
print(f'*** executing in multiprocessing mode ***')
# wrapper function for putting results into queue
def _func(iter, shared, _queue=None):
data_out = func(iter, shared)
_queue.put(data_out)

chunks = list(self.prepare_chunks(iter))
procs = [] # for storing processes
queue = Queue(self.n_chunks) # storage of data from processes
for chunk in chunks:
proc = Process(target=_func, args=(chunk, shared, queue))
procs.append(proc)
proc.start()
# retrieve data from the queue
data = []
for _ in range(self.n_chunks):
data += queue.get()
# complete the processes
for proc in procs:
proc.join()
return data

def _process_single(self, func, iter, shared):
""" Helper function for single threading
"""
if self.verbose:
print(f'*** executing in single-threaded mode ***')
return func(iter, shared)

def parallelize(self, func):
if self.method == 'mp':
return self._process_mp(func)
elif self.method == 'ray':
return self._process_ray(func)

if __name__ == '__main__':
PH = ProcessHandler()

@PH.parallelize
def add_n(l, n):
return [i+n for i in l]
def wrapper(iter, shared):

_func = lambda iter,shared: [func(i, shared) for i in iter]

if self.method == 'mp':
return self._process_mp(_func, iter, shared)
elif self.method == 'ray':
return self._process_ray(_func, iter, shared)
elif self.method == 'single_thread':
return self._process_single(_func, iter, shared)
else:
raise ValueError(f'Invalid processing method {self.method}, must be ray, mp or single_thread.')

return wrapper

process = ProcessHandler()

print(add_n([1,2,3,4,5,6,7], 3))
if __name__ == '__main__':

@process.parallelize
def multiply_list(iter, shared):
return [i*shared for i in iter]

l = list(range(100))

process.method = 'single_thread'
print(multiply_list(l,2))
process.method = 'mp'
print(multiply_list(l,2))
process.method = 'ray'
print(multiply_list(l,2))

1 change: 0 additions & 1 deletion symmer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from scipy.sparse import csr_matrix
from scipy.sparse import kron as sparse_kron
from symmer.operators.utils import _rref_binary
import multiprocessing as mp
import ray
import os
# from psutil import cpu_count
Expand Down

0 comments on commit 5477b93

Please sign in to comment.