Skip to content

Commit

Permalink
Merge pull request #167 from UCL-CCS/parallelization
Browse files Browse the repository at this point in the history
Parallelization
  • Loading branch information
TimWeaving authored Nov 7, 2023
2 parents 5f2ab5c + ad238b3 commit c842a50
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 78 deletions.
1 change: 1 addition & 0 deletions symmer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Main init for package."""
from symmer.process_handler import process
from symmer.operators import PauliwordOp, QuantumState
from symmer.projection import QubitTapering, ContextualSubspace, QubitSubspaceManager
27 changes: 15 additions & 12 deletions symmer/evolution/variational_optimization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from cached_property import cached_property
from qiskit.opflow import CircuitStateFn
from qiskit import QuantumCircuit
from symmer import QuantumState, PauliwordOp
from symmer import process, QuantumState, PauliwordOp
from symmer.operators.utils import (
symplectic_to_string, safe_PauliwordOp_to_dict, safe_QuantumState_to_dict
)
Expand All @@ -10,7 +10,6 @@
from scipy.optimize import minimize
from copy import deepcopy
import numpy as np
import multiprocessing as mp
from typing import *

class VQE_Driver:
Expand Down Expand Up @@ -164,11 +163,11 @@ def gradient(self, x: np.array) -> np.array:
Ansatz parameter gradient (np.array)
"""
if self.expectation_eval.find('projector') == -1:
with mp.Pool(mp.cpu_count()) as pool:
grad_vec = pool.starmap(
self.partial_derivative,
[(x, i) for i in range(self.circuit.num_parameters)]
)
@process.parallelize
def f(index, param):
return self.partial_derivative(param,index)
grad_vec = f(range(self.circuit.num_parameters), x)

else:
grad_vec = [self.partial_derivative(x, i) for i in range(self.circuit.num_parameters)]

Expand Down Expand Up @@ -280,9 +279,11 @@ def commutators(self) -> List[PauliwordOp]:
Returns:
List of commutators [H, P]
"""
with mp.Pool(mp.cpu_count()) as pool:
commutators = pool.map(self.observable.commutator, self.excitation_pool)
return list(map(lambda x:x*1j, commutators))
@process.parallelize
def f(P, obs):
return obs.commutator(P)*1j
commutators = f(self.excitation_pool, self.observable)
return commutators

def _derivative_from_commutators(self, index: int) -> float:
"""
Expand Down Expand Up @@ -334,8 +335,10 @@ def pool_gradient(self):
self.current_state = self.get_state(circuit_temp, self.opt_parameters)
if self.expectation_eval in ['sparse_array', 'symbolic_direct', 'observable_rotation']:
# the commutator method may be parallelized since the state is constant
with mp.Pool(mp.cpu_count()) as pool:
gradient = pool.map(self._derivative_from_commutators, range(self.excitation_pool.n_terms))
@process.parallelize
def f(index, obs):
return obs._derivative_from_commutators(index)
gradient = f(range(self.excitation_pool.n_terms), self)
else:
# ... unless using symbolic_projector since this is multiprocessed
gradient = list(map(self._derivative_from_commutators, range(self.excitation_pool.n_terms)))
Expand Down
42 changes: 18 additions & 24 deletions symmer/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
import os
import quimb
from symmer.operators.utils import *
import ray
import warnings
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from symmer import process
from symmer.operators.utils import *
from symmer.operators.utils import _cref_binary
from tqdm.auto import tqdm
from copy import deepcopy
from functools import reduce
from typing import List, Union, Optional
from numbers import Number
import multiprocessing as mp
from cached_property import cached_property
from scipy.stats import unitary_group
from scipy.sparse import csr_matrix, csc_matrix, coo_matrix, dok_matrix
from symmer.operators.utils import _cref_binary

from openfermion import QubitOperator, count_qubits
import matplotlib.pyplot as plt
from qiskit.opflow import PauliSumOp as ibm_PauliSumOp
from scipy.stats import unitary_group
import warnings
warnings.simplefilter('always', UserWarning)

from numba.core.errors import NumbaDeprecationWarning, NumbaPendingDeprecationWarning
Expand Down Expand Up @@ -68,6 +64,12 @@ def __init__(self,
assert(self.n_terms==len(self.coeff_vec)), 'coeff list and Pauliwords not same length'
self.X_block = self.symp_matrix[:, :self.n_qubits]
self.Z_block = self.symp_matrix[:, self.n_qubits:]

def set_processing_method(self, method):
""" Set the method to use when running parallelizable processes.
Valid options are: mp, ray, single_thread.
"""
process.method = method

@classmethod
def random(cls,
Expand Down Expand Up @@ -828,10 +830,11 @@ def expval(self, psi: "QuantumState") -> complex:
complex: The expectation value.
"""
if self.n_terms > 1:
# parallelize if number of terms greater than one
psi_ray_store = ray.put(psi)
expvals = np.array(ray.get(
[single_term_expval.remote(P, psi_ray_store) for P in self]))
@process.parallelize
def f(P, psi):
return single_term_expval(P, psi)

expvals = np.array(f(self, psi))
else:
expvals = np.array(single_term_expval(self, psi))

Expand Down Expand Up @@ -2399,16 +2402,7 @@ def get_ij_operator(i:int, j:int, n_qubits:int,
else:
return ij_symp_matrix, coeffs

@ray.remote(num_cpus=os.cpu_count(),
runtime_env={
"env_vars": {
"NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
# "OMP_NUM_THREADS": str(os.cpu_count()),
"OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
"NUMEXPR_MAX_THREADS": str(os.cpu_count())
}
}
)

def single_term_expval(P_op: PauliwordOp, psi: QuantumState) -> float:
"""
Expectation value calculation for a single Pauli operator given a QuantumState psi
Expand Down
22 changes: 6 additions & 16 deletions symmer/operators/independent_op.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import numpy as np
from typing import Dict, List, Tuple, Union
import warnings
import multiprocessing as mp
from typing import Dict, List, Tuple, Union
from symmer import process
from symmer.operators.utils import _rref_binary, _cref_binary, check_independent
from symmer.operators import PauliwordOp, QuantumState, symplectic_to_string, single_term_expval
import ray

class IndependentOp(PauliwordOp):
"""
Expand Down Expand Up @@ -292,17 +291,7 @@ def update_sector(self,
assert ref_state._is_normalized(), 'Reference state is not normalized.'

### update the stabilizers assignments in parallel
# with mp.Pool(mp.cpu_count()) as pool:
# self.coeff_vec = np.array(
# list(pool.starmap(assign_value, [(S, ref_state, threshold) for S in self]))
# )
ref_state_ray_store = ray.put(ref_state)
self.coeff_vec = np.array(ray.get(
[single_term_expval.remote(S, ref_state_ray_store) for S in self]))
# set anything below threshold to be zero
self.coeff_vec[np.abs(self.coeff_vec)<threshold] = 0
self.coeff_vec = np.sign(self.coeff_vec)

self.coeff_vec = np.array(assign_value(self, ref_state))
# raise a warning if any stabilizers are assigned a zero value
if np.any(self.coeff_vec==0):
S_zero = self[self.coeff_vec==0]; S_zero.coeff_vec[:]=1
Expand Down Expand Up @@ -366,8 +355,8 @@ def __iter__(self):
"""
return iter([self[i] for i in range(self.n_terms)])


def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> int:
@process.parallelize
def assign_value(S: PauliwordOp, ref_state: QuantumState) -> int:
"""
Measure expectation value of stabilizer on input reference state.
Args:
Expand All @@ -378,6 +367,7 @@ def assign_value(S: PauliwordOp, ref_state: QuantumState, threshold: float) -> i
Returns:
Expectation Value (int) of stabilizer on input reference state.
"""
threshold = 0.5
expval = single_term_expval(S, ref_state)
# if this expval exceeds some predefined threshold then assign the corresponding
# ±1 eigenvalue. Otherwise, return 0 as insufficient evidence to fix the value.
Expand Down
30 changes: 5 additions & 25 deletions symmer/operators/noncontextual_op.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import warnings
import os
import itertools
import numpy as np
import networkx as nx
import multiprocessing as mp
import qubovert as qv
from cached_property import cached_property
from time import time
Expand All @@ -13,8 +11,8 @@
from scipy.optimize import differential_evolution, shgo
from symmer.operators import PauliwordOp, IndependentOp, AntiCommutingOp, QuantumState
from symmer.operators.utils import binomial_coefficient, perform_noncontextual_sweep
import ray
from symmer.utils import random_anitcomm_2n_1_PauliwordOp
from symmer import process

class NoncontextualOp(PauliwordOp):
"""
Expand Down Expand Up @@ -748,14 +746,7 @@ def energy_via_brute_force(self) -> Tuple[float, np.array, np.array]:
nu_list[:,~self.fixed_ev_mask] = np.array(list(itertools.product([-1,1],repeat=np.sum(~self.fixed_ev_mask))))

# # optimize over all discrete value assignments of nu in parallel
noncon_H = ray.put(self.NC_op)
tracker = np.array(ray.get(
[get_noncon_energy.remote(noncon_H, nu_vec) for nu_vec in nu_list]))

# with mp.Pool(mp.cpu_count()) as pool:
# tracker = pool.map(self.NC_op.get_energy, nu_list)

# find the lowest energy eigenvalue assignment from the full list
tracker = get_noncon_energy(nu_list, self.NC_op)
full_search_results = zip(tracker, nu_list)
energy, fixed_nu = min(full_search_results, key=lambda x:x[0])

Expand Down Expand Up @@ -878,20 +869,9 @@ def energy_xUSO(self) -> Tuple[float, np.array, np.array]:

return self.NC_op.get_energy(nu_vec), nu_vec


@ray.remote(num_cpus=os.cpu_count(),
runtime_env={
"env_vars": {
"NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
# "OMP_NUM_THREADS": str(os.cpu_count()),
"OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
"NUMEXPR_MAX_THREADS": str(os.cpu_count())
}
}
)
def get_noncon_energy(noncon_H:NoncontextualOp, nu: np.array) -> float:
@process.parallelize
def get_noncon_energy(nu: np.array, noncon_H:NoncontextualOp) -> float:
"""
The classical objective function that encodes the noncontextual energies.
"""
s0, si = noncon_H.get_symmetry_contributions(nu)
return s0 - np.linalg.norm(si)
return noncon_H.get_energy(nu)
126 changes: 126 additions & 0 deletions symmer/process_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import os
import sys
import numpy as np
import quimb
from ray import remote, put, get
from multiprocessing import Process, Queue, set_start_method

if sys.platform.lower() in ['linux', 'darwin']:
set_start_method('fork', force = True)
else:
set_start_method('spawn', force = True)

class ProcessHandler:

method = 'mp'
verbose = False

def __init__(self):
self.n_logical_cores = os.cpu_count()

def prepare_chunks(self, iter):
""" split a list into smaller sized chunks
"""
iter = list(iter)
self.n_chunks = min(len(iter), self.n_logical_cores)
chunk_size = int(np.ceil(len(iter)/self.n_chunks))
indices = np.append(np.arange(self.n_chunks)*chunk_size, None)
for i,j in zip(indices[:-1], indices[1:]):
yield iter[i:j]

def _process_ray(self, func, iter, shared):
""" Helper function for ray processing
"""
if self.verbose:
print(f'*** executing in ray mode ***')
# duplicate func with ray.remote wrapper :
@remote(num_cpus=self.n_logical_cores,
runtime_env={
"env_vars": {
"NUMBA_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
"OMP_NUM_THREADS": os.getenv("NUMBA_NUM_THREADS"),
"NUMEXPR_MAX_THREADS": str(self.n_logical_cores)
}
}
)
def _func(iter, shared):
return func(iter, shared)
# place into shared memory:
shared_obj = put(shared)
# split iterable into smaller chunks and parallelize remote instances:
results = get(
[
_func.remote(chunk, shared_obj)
for chunk in self.prepare_chunks(iter)
]
)
# flatten the list and return:
return [a for b in results for a in b]

def _process_mp(self, func, iter, shared):
""" Helper function for multiprocessing
"""
if self.verbose:
print(f'*** executing in multiprocessing mode ***')
# wrapper function for putting results into queue
def _func(iter, shared, _queue=None):
data_out = func(iter, shared)
_queue.put(data_out)

chunks = list(self.prepare_chunks(iter))
procs = [] # for storing processes
queue = Queue(self.n_chunks) # storage of data from processes
for chunk in chunks:
proc = Process(target=_func, args=(chunk, shared, queue))
procs.append(proc)
proc.start()
# retrieve data from the queue
data = []
for _ in range(self.n_chunks):
data += queue.get()
# complete the processes
for proc in procs:
proc.join()
return data

def _process_single(self, func, iter, shared):
""" Helper function for single threading
"""
if self.verbose:
print(f'*** executing in single-threaded mode ***')
return func(iter, shared)

def parallelize(self, func):

def wrapper(iter, shared):

_func = lambda iter,shared: [func(i, shared) for i in iter]

if self.method == 'mp':
return self._process_mp(_func, iter, shared)
elif self.method == 'ray':
return self._process_ray(_func, iter, shared)
elif self.method == 'single_thread':
return self._process_single(_func, iter, shared)
else:
raise ValueError(f'Invalid processing method {self.method}, must be ray, mp or single_thread.')

return wrapper

process = ProcessHandler()

if __name__ == '__main__':

@process.parallelize
def multiply_list(iter, shared):
return [i*shared for i in iter]

l = list(range(100))

process.method = 'single_thread'
print(multiply_list(l,2))
process.method = 'mp'
print(multiply_list(l,2))
process.method = 'ray'
print(multiply_list(l,2))

1 change: 0 additions & 1 deletion symmer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from scipy.sparse import csr_matrix
from scipy.sparse import kron as sparse_kron
from symmer.operators.utils import _rref_binary
import multiprocessing as mp
import ray
import os
# from psutil import cpu_count
Expand Down

0 comments on commit c842a50

Please sign in to comment.