diff --git a/symmer/__init__.py b/symmer/__init__.py
index 2078d331..5252ee98 100644
--- a/symmer/__init__.py
+++ b/symmer/__init__.py
@@ -1,3 +1,4 @@
 """Main init for package."""
+from symmer.process_handler import process
 from symmer.operators import PauliwordOp, QuantumState
 from symmer.projection import QubitTapering, ContextualSubspace, QubitSubspaceManager
\ No newline at end of file
diff --git a/symmer/operators/base.py b/symmer/operators/base.py
index 3c853d4d..c12e7f7b 100644
--- a/symmer/operators/base.py
+++ b/symmer/operators/base.py
@@ -1,7 +1,6 @@
 import os
 import quimb
 from symmer.operators.utils import *
-import ray
 import numpy as np
 import pandas as pd
 import networkx as nx
@@ -10,11 +9,10 @@
 from functools import reduce
 from typing import List, Union, Optional
 from numbers import Number
-import multiprocessing as mp
 from cached_property import cached_property
 from scipy.sparse import csr_matrix, csc_matrix, coo_matrix, dok_matrix
+from symmer import process
 from symmer.operators.utils import _cref_binary
-
 from openfermion import QubitOperator, count_qubits
 import matplotlib.pyplot as plt
 from qiskit.opflow import PauliSumOp as ibm_PauliSumOp
@@ -828,10 +826,11 @@ def expval(self, psi: "QuantumState") -> complex:
             complex: The expectation value.
         """
         if self.n_terms > 1:
-            # parallelize if number of terms greater than one
-            psi_ray_store = ray.put(psi)
-            expvals = np.array(ray.get(
-                [single_term_expval.remote(P, psi_ray_store) for P in self]))
+            @process.parallelize
+            def f(P, psi):
+                return single_term_expval(P, psi)
+
+            expvals = np.array(f(self, psi))
         else:
             expvals = np.array(single_term_expval(self, psi))
 
@@ -2399,16 +2398,7 @@ def get_ij_operator(i:int, j:int, n_qubits:int,
     else:
         return ij_symp_matrix, coeffs
 
-@ray.remote(num_cpus=os.cpu_count(),
-            runtime_env={
-                "env_vars": {
-                    "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
-                    # "OMP_NUM_THREADS": str(os.cpu_count()),
-                    "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
-                    "NUMEXPR_MAX_THREADS": str(os.cpu_count())
-                }
-            }
-            )
+
def single_term_expval(P_op: PauliwordOp, psi: QuantumState) -> float:
     """
     Expectation value calculation for a single Pauli operator given a QuantumState psi
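For reference, the calling convention of the new decorator (defined in `symmer/process_handler.py` further down): `process.parallelize` wraps a function written for a *single* element, with a shared read-only object as its second argument, and the decorated function is then called with the whole iterable. In `expval` above the iterable is the operator itself, since iterating a `PauliwordOp` yields its single-term operators. A minimal standalone sketch of the same pattern (toy function, not part of the patch):

```python
from symmer import process

@process.parallelize
def add_n(item, n):             # item: one element of the iterable; n: the shared object
    return item + n

if __name__ == '__main__':      # guard needed for the 'mp' backend on spawn platforms
    process.method = 'single_thread'   # or 'mp' (default) / 'ray'
    print(add_n(range(8), 3))          # [3, 4, 5, 6, 7, 8, 9, 10]
```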
diff --git a/symmer/operators/independent_op.py b/symmer/operators/independent_op.py
index 30b4bf55..1e339a23 100644
--- a/symmer/operators/independent_op.py
+++ b/symmer/operators/independent_op.py
@@ -1,10 +1,9 @@
 import numpy as np
-from typing import Dict, List, Tuple, Union
 import warnings
-import multiprocessing as mp
+from typing import Dict, List, Tuple, Union
+from symmer import process
 from symmer.operators.utils import _rref_binary, _cref_binary, check_independent
 from symmer.operators import PauliwordOp, QuantumState, symplectic_to_string, single_term_expval
-import ray
 
 class IndependentOp(PauliwordOp):
     """
@@ -292,17 +291,7 @@ def update_sector(self,
         assert ref_state._is_normalized(), 'Reference state is not normalized.'
 
         ### update the stabilizers assignments in parallel
-        # with mp.Pool(mp.cpu_count()) as pool:
-        #     self.coeff_vec = np.array(
-        #         list(pool.starmap(assign_value, [(S, ref_state, threshold) for S in self]))
-        #     )
-        ref_state_ray_store = ray.put(ref_state)
-        self.coeff_vec = np.array(ray.get(
-            [single_term_expval.remote(S, ref_state_ray_store) for S in self]))
-        # set anything below threshold to be zero
-        self.coeff_vec[np.abs(self.coeff_vec)<threshold] = 0
+        self.coeff_vec = np.array(assign_value(self, ref_state))
@@ -369,5 +358,5 @@
-
-def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> int:
+@process.parallelize
+def assign_value(S: PauliwordOp, ref_state: QuantumState) -> int:
     """
     Measure expectation value of stabilizer on input reference state.
     Args:
@@ -378,6 +367,7 @@ def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> i
     Returns:
         Expectation Value (int) of stabilizer on input reference state.
     """
+    threshold = 0.5
     expval = single_term_expval(S, ref_state)
     # if this expval exceeds some predefined threshold then assign the corresponding
     # ±1 eigenvalue. Otherwise, return 0 as insufficient evidence to fix the value.
diff --git a/symmer/operators/noncontextual_op.py b/symmer/operators/noncontextual_op.py
index eff8edaa..ce1f97b5 100644
--- a/symmer/operators/noncontextual_op.py
+++ b/symmer/operators/noncontextual_op.py
@@ -1,9 +1,7 @@
 import warnings
-import os
 import itertools
 import numpy as np
 import networkx as nx
-import multiprocessing as mp
 import qubovert as qv
 from cached_property import cached_property
 from time import time
@@ -13,8 +11,8 @@
 from scipy.optimize import differential_evolution, shgo
 from symmer.operators import PauliwordOp, IndependentOp, AntiCommutingOp, QuantumState
 from symmer.operators.utils import binomial_coefficient, perform_noncontextual_sweep
-import ray
 from symmer.utils import random_anitcomm_2n_1_PauliwordOp
+from symmer import process
 
 class NoncontextualOp(PauliwordOp):
     """
@@ -748,14 +746,7 @@ def energy_via_brute_force(self) -> Tuple[float, np.array, np.array]:
         nu_list[:,~self.fixed_ev_mask] = np.array(list(itertools.product([-1,1],repeat=np.sum(~self.fixed_ev_mask))))
 
         # # optimize over all discrete value assignments of nu in parallel
-        noncon_H = ray.put(self.NC_op)
-        tracker = np.array(ray.get(
-            [get_noncon_energy.remote(noncon_H, nu_vec) for nu_vec in nu_list]))
-
-        # with mp.Pool(mp.cpu_count()) as pool:
-        #    tracker = pool.map(self.NC_op.get_energy, nu_list)
-
-        # find the lowest energy eigenvalue assignment from the full list
+        tracker = get_noncon_energy(nu_list, self.NC_op)
         full_search_results = zip(tracker, nu_list)
         energy, fixed_nu = min(full_search_results, key=lambda x:x[0])
 
@@ -878,20 +869,9 @@ def energy_xUSO(self) -> Tuple[float, np.array, np.array]:
         return self.NC_op.get_energy(nu_vec), nu_vec
 
-
-@ray.remote(num_cpus=os.cpu_count(),
-            runtime_env={
-                "env_vars": {
-                    "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
-                    # "OMP_NUM_THREADS": str(os.cpu_count()),
-                    "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
-                    "NUMEXPR_MAX_THREADS": str(os.cpu_count())
-                }
-            }
-            )
-def get_noncon_energy(noncon_H:NoncontextualOp, nu: np.array) -> float:
+@process.parallelize
+def get_noncon_energy(nu: np.array, noncon_H:NoncontextualOp) -> float:
     """
     The classical objective function that encodes the noncontextual energies.
 
     """
-    s0, si = noncon_H.get_symmetry_contributions(nu)
-    return s0 - np.linalg.norm(si)
\ No newline at end of file
+    return noncon_H.get_energy(nu)
\ No newline at end of file
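Two details worth noting in this hunk. First, the decorated function now takes the iterated element (`nu`) first and the shared object (`noncon_H`) second, matching the `(iter, shared)` convention of `process.parallelize`, so the brute-force call becomes `get_noncon_energy(nu_list, self.NC_op)` and returns the list of energies directly. Second, the explicit symmetry-contribution formula is dropped in favour of `NoncontextualOp.get_energy`; judging by the commented-out `pool.map(self.NC_op.get_energy, nu_list)` removed above, the two are assumed to coincide:

```python
import numpy as np

# what the deleted body computed explicitly; NoncontextualOp.get_energy is
# assumed (per the removed pool.map comment) to encapsulate the same formula
def get_noncon_energy_explicit(noncon_H, nu):
    s0, si = noncon_H.get_symmetry_contributions(nu)
    return s0 - np.linalg.norm(si)
```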
""" - s0, si = noncon_H.get_symmetry_contributions(nu) - return s0 - np.linalg.norm(si) \ No newline at end of file + return noncon_H.get_energy(nu) \ No newline at end of file diff --git a/symmer/process_handler.py b/symmer/process_handler.py index 87499731..e540703e 100644 --- a/symmer/process_handler.py +++ b/symmer/process_handler.py @@ -1,35 +1,119 @@ -import multiprocessing as mp -import ray +import os +import numpy as np +from ray import remote, put, get +from multiprocessing import Process, Queue class ProcessHandler: - method = 'mp' + method = 'mp' + verbose = False def __init__(self): - self.n_logical_cores = mp.cpu_count() + self.n_logical_cores = os.cpu_count() - def _process_ray(self, func): - def worker(iter, shared=None): - return func(iter, shared) - return worker - - def _process_mp(self, func): - def worker(iter, shared=None): + def prepare_chunks(self, iter): + """ split a list into smaller sized chunks + """ + iter = list(iter) + self.n_chunks = min(len(iter), self.n_logical_cores) + chunk_size = int(np.ceil(len(iter)/self.n_chunks)) + indices = np.append(np.arange(self.n_chunks)*chunk_size, None) + for i,j in zip(indices[:-1], indices[1:]): + yield iter[i:j] + + def _process_ray(self, func, iter, shared): + """ Helper function for ray processing + """ + if self.verbose: + print(f'*** executing in ray mode ***') + # duplicate func with ray.remote wrapper : + @remote(num_cpus=self.n_logical_cores, + # runtime_env={ + # "env_vars": { + # "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), + # "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), + # "NUMEXPR_MAX_THREADS": str(self.n_logical_cores) + # } + # } + ) + def _func(iter, shared): return func(iter, shared) - return worker + # place into shared memory: + shared_obj = put(shared) + # split iterable into smaller chunks and parallelize remote instances: + results = get( + [ + _func.remote(chunk, shared_obj) + for chunk in self.prepare_chunks(iter) + ] + ) + # flatten the list and return: + return [a for b in results for a in b] + + def _process_mp(self, func, iter, shared): + """ Helper function for multiprocessing + """ + if self.verbose: + print(f'*** executing in multiprocessing mode ***') + # wrapper function for putting results into queue + def _func(iter, shared, _queue=None): + data_out = func(iter, shared) + _queue.put(data_out) + chunks = list(self.prepare_chunks(iter)) + procs = [] # for storing processes + queue = Queue(self.n_chunks) # storage of data from processes + for chunk in chunks: + proc = Process(target=_func, args=(chunk, shared, queue)) + procs.append(proc) + proc.start() + # retrieve data from the queue + data = [] + for _ in range(self.n_chunks): + data += queue.get() + # complete the processes + for proc in procs: + proc.join() + return data + + def _process_single(self, func, iter, shared): + """ Helper function for single threading + """ + if self.verbose: + print(f'*** executing in single-threaded mode ***') + return func(iter, shared) + def parallelize(self, func): - if self.method == 'mp': - return self._process_mp(func) - elif self.method == 'ray': - return self._process_ray(func) - -if __name__ == '__main__': - PH = ProcessHandler() - @PH.parallelize - def add_n(l, n): - return [i+n for i in l] + def wrapper(iter, shared): + + _func = lambda iter,shared: [func(i, shared) for i in iter] + + if self.method == 'mp': + return self._process_mp(_func, iter, shared) + elif self.method == 'ray': + return self._process_ray(_func, iter, shared) + elif self.method == 'single_thread': + return 
diff --git a/symmer/utils.py b/symmer/utils.py
index 6cefe6e7..7f3844d5 100644
--- a/symmer/utils.py
+++ b/symmer/utils.py
@@ -7,7 +7,6 @@
 from scipy.sparse import csr_matrix
 from scipy.sparse import kron as sparse_kron
 from symmer.operators.utils import _rref_binary
-import multiprocessing as mp
 import ray
 import os
 # from psutil import cpu_count
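One caveat with the `_process_mp` path above: `queue.get()` returns chunks in completion order, not submission order, so the concatenated results can come back permuted at chunk granularity. This matters wherever results are aligned positionally with the input, such as the coefficient vectors built in `expval` and `update_sector`. If ordering turns out to be an issue, a minimal order-preserving variant (a sketch, not part of this patch) tags each chunk with its index:

```python
from multiprocessing import Process, Queue

def _func(idx, chunk, shared, _queue):
    # toy per-chunk work; the chunk index travels with the result
    _queue.put((idx, [i*shared for i in chunk]))

if __name__ == '__main__':
    chunks = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    queue = Queue(len(chunks))
    procs = [Process(target=_func, args=(n, c, 2, queue)) for n, c in enumerate(chunks)]
    for proc in procs:
        proc.start()
    results = dict(queue.get() for _ in procs)   # index -> partial results
    for proc in procs:
        proc.join()
    # reassemble in input order regardless of completion order
    data = [x for n in sorted(results) for x in results[n]]
    print(data)   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```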