diff --git a/symmer/__init__.py b/symmer/__init__.py index 2078d331..5252ee98 100644 --- a/symmer/__init__.py +++ b/symmer/__init__.py @@ -1,3 +1,4 @@ """Main init for package.""" +from symmer.process_handler import process from symmer.operators import PauliwordOp, QuantumState from symmer.projection import QubitTapering, ContextualSubspace, QubitSubspaceManager \ No newline at end of file diff --git a/symmer/evolution/variational_optimization.py b/symmer/evolution/variational_optimization.py index 813e87f4..f5a6d382 100644 --- a/symmer/evolution/variational_optimization.py +++ b/symmer/evolution/variational_optimization.py @@ -1,7 +1,7 @@ from cached_property import cached_property from qiskit.opflow import CircuitStateFn from qiskit import QuantumCircuit -from symmer import QuantumState, PauliwordOp +from symmer import process, QuantumState, PauliwordOp from symmer.operators.utils import ( symplectic_to_string, safe_PauliwordOp_to_dict, safe_QuantumState_to_dict ) @@ -10,7 +10,6 @@ from scipy.optimize import minimize from copy import deepcopy import numpy as np -import multiprocessing as mp from typing import * class VQE_Driver: @@ -164,11 +163,11 @@ def gradient(self, x: np.array) -> np.array: Ansatz parameter gradient (np.array) """ if self.expectation_eval.find('projector') == -1: - with mp.Pool(mp.cpu_count()) as pool: - grad_vec = pool.starmap( - self.partial_derivative, - [(x, i) for i in range(self.circuit.num_parameters)] - ) + @process.parallelize + def f(index, param): + return self.partial_derivative(param,index) + grad_vec = f(range(self.circuit.num_parameters), x) + else: grad_vec = [self.partial_derivative(x, i) for i in range(self.circuit.num_parameters)] @@ -280,9 +279,11 @@ def commutators(self) -> List[PauliwordOp]: Returns: List of commutators [H, P] """ - with mp.Pool(mp.cpu_count()) as pool: - commutators = pool.map(self.observable.commutator, self.excitation_pool) - return list(map(lambda x:x*1j, commutators)) + @process.parallelize + def f(P, obs): + return obs.commutator(P)*1j + commutators = f(self.excitation_pool, self.observable) + return commutators def _derivative_from_commutators(self, index: int) -> float: """ @@ -334,8 +335,10 @@ def pool_gradient(self): self.current_state = self.get_state(circuit_temp, self.opt_parameters) if self.expectation_eval in ['sparse_array', 'symbolic_direct', 'observable_rotation']: # the commutator method may be parallelized since the state is constant - with mp.Pool(mp.cpu_count()) as pool: - gradient = pool.map(self._derivative_from_commutators, range(self.excitation_pool.n_terms)) + @process.parallelize + def f(index, obs): + return obs._derivative_from_commutators(index) + gradient = f(range(self.excitation_pool.n_terms), self) else: # ... unless using symbolic_projector since this is multiprocessed gradient = list(map(self._derivative_from_commutators, range(self.excitation_pool.n_terms))) diff --git a/symmer/operators/base.py b/symmer/operators/base.py index 3c853d4d..5b5d5831 100644 --- a/symmer/operators/base.py +++ b/symmer/operators/base.py @@ -1,25 +1,21 @@ -import os -import quimb -from symmer.operators.utils import * -import ray +import warnings import numpy as np import pandas as pd import networkx as nx +import matplotlib.pyplot as plt +from symmer import process +from symmer.operators.utils import * +from symmer.operators.utils import _cref_binary from tqdm.auto import tqdm from copy import deepcopy from functools import reduce from typing import List, Union, Optional from numbers import Number -import multiprocessing as mp from cached_property import cached_property +from scipy.stats import unitary_group from scipy.sparse import csr_matrix, csc_matrix, coo_matrix, dok_matrix -from symmer.operators.utils import _cref_binary - from openfermion import QubitOperator, count_qubits -import matplotlib.pyplot as plt from qiskit.opflow import PauliSumOp as ibm_PauliSumOp -from scipy.stats import unitary_group -import warnings warnings.simplefilter('always', UserWarning) from numba.core.errors import NumbaDeprecationWarning, NumbaPendingDeprecationWarning @@ -68,6 +64,12 @@ def __init__(self, assert(self.n_terms==len(self.coeff_vec)), 'coeff list and Pauliwords not same length' self.X_block = self.symp_matrix[:, :self.n_qubits] self.Z_block = self.symp_matrix[:, self.n_qubits:] + + def set_processing_method(self, method): + """ Set the method to use when running parallelizable processes. + Valid options are: mp, ray, single_thread. + """ + process.method = method @classmethod def random(cls, @@ -828,10 +830,11 @@ def expval(self, psi: "QuantumState") -> complex: complex: The expectation value. """ if self.n_terms > 1: - # parallelize if number of terms greater than one - psi_ray_store = ray.put(psi) - expvals = np.array(ray.get( - [single_term_expval.remote(P, psi_ray_store) for P in self])) + @process.parallelize + def f(P, psi): + return single_term_expval(P, psi) + + expvals = np.array(f(self, psi)) else: expvals = np.array(single_term_expval(self, psi)) @@ -2399,16 +2402,7 @@ def get_ij_operator(i:int, j:int, n_qubits:int, else: return ij_symp_matrix, coeffs -@ray.remote(num_cpus=os.cpu_count(), - runtime_env={ - "env_vars": { - "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), - # "OMP_NUM_THREADS": str(os.cpu_count()), - "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), - "NUMEXPR_MAX_THREADS": str(os.cpu_count()) - } - } - ) + def single_term_expval(P_op: PauliwordOp, psi: QuantumState) -> float: """ Expectation value calculation for a single Pauli operator given a QuantumState psi diff --git a/symmer/operators/independent_op.py b/symmer/operators/independent_op.py index 30b4bf55..1e339a23 100644 --- a/symmer/operators/independent_op.py +++ b/symmer/operators/independent_op.py @@ -1,10 +1,9 @@ import numpy as np -from typing import Dict, List, Tuple, Union import warnings -import multiprocessing as mp +from typing import Dict, List, Tuple, Union +from symmer import process from symmer.operators.utils import _rref_binary, _cref_binary, check_independent from symmer.operators import PauliwordOp, QuantumState, symplectic_to_string, single_term_expval -import ray class IndependentOp(PauliwordOp): """ @@ -292,17 +291,7 @@ def update_sector(self, assert ref_state._is_normalized(), 'Reference state is not normalized.' ### update the stabilizers assignments in parallel - # with mp.Pool(mp.cpu_count()) as pool: - # self.coeff_vec = np.array( - # list(pool.starmap(assign_value, [(S, ref_state, threshold) for S in self])) - # ) - ref_state_ray_store = ray.put(ref_state) - self.coeff_vec = np.array(ray.get( - [single_term_expval.remote(S, ref_state_ray_store) for S in self])) - # set anything below threshold to be zero - self.coeff_vec[np.abs(self.coeff_vec) int: +@process.parallelize +def assign_value(S: PauliwordOp, ref_state: QuantumState) -> int: """ Measure expectation value of stabilizer on input reference state. Args: @@ -378,6 +367,7 @@ def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> i Returns: Expectation Value (int) of stabilizer on input reference state. """ + threshold = 0.5 expval = single_term_expval(S, ref_state) # if this expval exceeds some predefined threshold then assign the corresponding # ±1 eigenvalue. Otherwise, return 0 as insufficient evidence to fix the value. diff --git a/symmer/operators/noncontextual_op.py b/symmer/operators/noncontextual_op.py index eff8edaa..ce1f97b5 100644 --- a/symmer/operators/noncontextual_op.py +++ b/symmer/operators/noncontextual_op.py @@ -1,9 +1,7 @@ import warnings -import os import itertools import numpy as np import networkx as nx -import multiprocessing as mp import qubovert as qv from cached_property import cached_property from time import time @@ -13,8 +11,8 @@ from scipy.optimize import differential_evolution, shgo from symmer.operators import PauliwordOp, IndependentOp, AntiCommutingOp, QuantumState from symmer.operators.utils import binomial_coefficient, perform_noncontextual_sweep -import ray from symmer.utils import random_anitcomm_2n_1_PauliwordOp +from symmer import process class NoncontextualOp(PauliwordOp): """ @@ -748,14 +746,7 @@ def energy_via_brute_force(self) -> Tuple[float, np.array, np.array]: nu_list[:,~self.fixed_ev_mask] = np.array(list(itertools.product([-1,1],repeat=np.sum(~self.fixed_ev_mask)))) # # optimize over all discrete value assignments of nu in parallel - noncon_H = ray.put(self.NC_op) - tracker = np.array(ray.get( - [get_noncon_energy.remote(noncon_H, nu_vec) for nu_vec in nu_list])) - - # with mp.Pool(mp.cpu_count()) as pool: - # tracker = pool.map(self.NC_op.get_energy, nu_list) - - # find the lowest energy eigenvalue assignment from the full list + tracker = get_noncon_energy(nu_list, self.NC_op) full_search_results = zip(tracker, nu_list) energy, fixed_nu = min(full_search_results, key=lambda x:x[0]) @@ -878,20 +869,9 @@ def energy_xUSO(self) -> Tuple[float, np.array, np.array]: return self.NC_op.get_energy(nu_vec), nu_vec - -@ray.remote(num_cpus=os.cpu_count(), - runtime_env={ - "env_vars": { - "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), - # "OMP_NUM_THREADS": str(os.cpu_count()), - "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), - "NUMEXPR_MAX_THREADS": str(os.cpu_count()) - } - } - ) -def get_noncon_energy(noncon_H:NoncontextualOp, nu: np.array) -> float: +@process.parallelize +def get_noncon_energy(nu: np.array, noncon_H:NoncontextualOp) -> float: """ The classical objective function that encodes the noncontextual energies. """ - s0, si = noncon_H.get_symmetry_contributions(nu) - return s0 - np.linalg.norm(si) \ No newline at end of file + return noncon_H.get_energy(nu) \ No newline at end of file diff --git a/symmer/process_handler.py b/symmer/process_handler.py new file mode 100644 index 00000000..6ec4ad0c --- /dev/null +++ b/symmer/process_handler.py @@ -0,0 +1,126 @@ +import os +import sys +import numpy as np +import quimb +from ray import remote, put, get +from multiprocessing import Process, Queue, set_start_method + +if sys.platform.lower() in ['linux', 'darwin']: + set_start_method('fork', force = True) +else: + set_start_method('spawn', force = True) + +class ProcessHandler: + + method = 'mp' + verbose = False + + def __init__(self): + self.n_logical_cores = os.cpu_count() + + def prepare_chunks(self, iter): + """ split a list into smaller sized chunks + """ + iter = list(iter) + self.n_chunks = min(len(iter), self.n_logical_cores) + chunk_size = int(np.ceil(len(iter)/self.n_chunks)) + indices = np.append(np.arange(self.n_chunks)*chunk_size, None) + for i,j in zip(indices[:-1], indices[1:]): + yield iter[i:j] + + def _process_ray(self, func, iter, shared): + """ Helper function for ray processing + """ + if self.verbose: + print(f'*** executing in ray mode ***') + # duplicate func with ray.remote wrapper : + @remote(num_cpus=self.n_logical_cores, + runtime_env={ + "env_vars": { + "NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), + "OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"), + "NUMEXPR_MAX_THREADS": str(self.n_logical_cores) + } + } + ) + def _func(iter, shared): + return func(iter, shared) + # place into shared memory: + shared_obj = put(shared) + # split iterable into smaller chunks and parallelize remote instances: + results = get( + [ + _func.remote(chunk, shared_obj) + for chunk in self.prepare_chunks(iter) + ] + ) + # flatten the list and return: + return [a for b in results for a in b] + + def _process_mp(self, func, iter, shared): + """ Helper function for multiprocessing + """ + if self.verbose: + print(f'*** executing in multiprocessing mode ***') + # wrapper function for putting results into queue + def _func(iter, shared, _queue=None): + data_out = func(iter, shared) + _queue.put(data_out) + + chunks = list(self.prepare_chunks(iter)) + procs = [] # for storing processes + queue = Queue(self.n_chunks) # storage of data from processes + for chunk in chunks: + proc = Process(target=_func, args=(chunk, shared, queue)) + procs.append(proc) + proc.start() + # retrieve data from the queue + data = [] + for _ in range(self.n_chunks): + data += queue.get() + # complete the processes + for proc in procs: + proc.join() + return data + + def _process_single(self, func, iter, shared): + """ Helper function for single threading + """ + if self.verbose: + print(f'*** executing in single-threaded mode ***') + return func(iter, shared) + + def parallelize(self, func): + + def wrapper(iter, shared): + + _func = lambda iter,shared: [func(i, shared) for i in iter] + + if self.method == 'mp': + return self._process_mp(_func, iter, shared) + elif self.method == 'ray': + return self._process_ray(_func, iter, shared) + elif self.method == 'single_thread': + return self._process_single(_func, iter, shared) + else: + raise ValueError(f'Invalid processing method {self.method}, must be ray, mp or single_thread.') + + return wrapper + +process = ProcessHandler() + +if __name__ == '__main__': + + @process.parallelize + def multiply_list(iter, shared): + return [i*shared for i in iter] + + l = list(range(100)) + + process.method = 'single_thread' + print(multiply_list(l,2)) + process.method = 'mp' + print(multiply_list(l,2)) + process.method = 'ray' + print(multiply_list(l,2)) + diff --git a/symmer/utils.py b/symmer/utils.py index 6cefe6e7..7f3844d5 100644 --- a/symmer/utils.py +++ b/symmer/utils.py @@ -7,7 +7,6 @@ from scipy.sparse import csr_matrix from scipy.sparse import kron as sparse_kron from symmer.operators.utils import _rref_binary -import multiprocessing as mp import ray import os # from psutil import cpu_count