diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 09961f8..5275b3e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -62,8 +62,10 @@ repos: - id: ruff args: ["--fix", "--show-fixes"] types_or: [python, pyi, jupyter] + exclude: "src/qutip_qoc/examples.ipynb" - id: ruff-format types_or: [python, pyi, jupyter] + exclude: "src/qutip_qoc/examples.ipynb" # Also run Black on examples in the documentation - repo: https://github.com/adamchainz/blacken-docs diff --git a/src/qutip_qoc/analytical_control.py b/src/qutip_qoc/analytical_control.py index 9de9600..a02fefb 100644 --- a/src/qutip_qoc/analytical_control.py +++ b/src/qutip_qoc/analytical_control.py @@ -173,15 +173,16 @@ def opt_callback(self, x, f, accept): def optimize_pulses( - objectives, - pulse_options, - time_interval, - time_options, - algorithm_kwargs, - optimizer_kwargs, - minimizer_kwargs, - integrator_kwargs, - crab_optimizer=None): + objectives, + pulse_options, + time_interval, + time_options, + algorithm_kwargs, + optimizer_kwargs, + minimizer_kwargs, + integrator_kwargs, + crab_optimizer=None, +): """ Optimize a pulse sequence to implement a given target unitary by optimizing the parameters of the pulse functions. The algorithm is a two-layered @@ -304,16 +305,27 @@ def optimize_pulses( **integrator_kwargs, ) elif algorithm_kwargs.get("alg") == "GOAT": - multi_objective = Multi_GOAT(objectives, time_interval, time_options, - pulse_options, algorithm_kwargs, - guess_params=optimizer_kwargs["x0"], - **integrator_kwargs) + multi_objective = Multi_GOAT( + objectives, + time_interval, + time_options, + pulse_options, + algorithm_kwargs, + guess_params=optimizer_kwargs["x0"], + **integrator_kwargs, + ) elif algorithm_kwargs.get("alg") == "CRAB": - multi_objective = Multi_CRAB(crab_optimizer, - objectives, time_interval, time_options, - pulse_options, algorithm_kwargs, - guess_params=optimizer_kwargs["x0"], - **integrator_kwargs) + multi_objective = Multi_CRAB( + crab_optimizer, + objectives, + time_interval, + time_options, + pulse_options, + algorithm_kwargs, + guess_params=optimizer_kwargs["x0"], + **integrator_kwargs, + ) + minimizer_kwargs.setdefault("method", "Nelder-Mead") # optimizer specific settings opt_method = optimizer_kwargs.get( @@ -331,7 +343,7 @@ def optimize_pulses( ), ) - if len(bounds) != 0:# realizes boundaries through minimizer + if len(bounds) != 0: # realizes boundaries through minimizer minimizer_kwargs.setdefault("bounds", np.concatenate(bounds)) elif opt_method == "dual_annealing": @@ -345,7 +357,7 @@ def optimize_pulses( ), ) - if len(bounds) != 0:# realizes boundaries through optimizer + if len(bounds) != 0: # realizes boundaries through optimizer optimizer_kwargs.setdefault("bounds", np.concatenate(bounds)) # remove overload from optimizer_kwargs @@ -369,9 +381,9 @@ def optimize_pulses( min_res = optimizer( func=multi_objective.goal_fun, minimizer_kwargs={ - 'jac': None,#multi_objective.grad_fun, - 'callback': cllbck.min_callback, - **minimizer_kwargs + "jac": multi_objective.grad_fun, + "callback": cllbck.min_callback, + **minimizer_kwargs, }, callback=cllbck.opt_callback, **optimizer_kwargs, diff --git a/src/qutip_qoc/crab.py b/src/qutip_qoc/crab.py index 95ead23..65ba6e3 100644 --- a/src/qutip_qoc/crab.py +++ b/src/qutip_qoc/crab.py @@ -3,15 +3,16 @@ import qutip_qtrl.optimizer as opt import types import copy + logger = logging.get_logger() + class CRAB(opt.OptimizerCrab): def __init__(self, cfg, dyn, params, termination_conditions): super().__init__(cfg, dyn, params) self.init_optim(termination_conditions) - # overwrite def fid_err_func_wrapper(self, *args): """ Get the fidelity error achieved using the ctrl amplitudes passed @@ -32,15 +33,12 @@ def fid_err_func_wrapper(self, *args): self.stats.num_fidelity_func_calls = self.num_fid_func_calls if self.log_level <= logging.DEBUG: logger.debug( - "fidelity error call {}".format( - self.stats.num_fidelity_func_calls - ) + "fidelity error call {}".format(self.stats.num_fidelity_func_calls) ) amps = self._get_ctrl_amps(args[0].copy()) self.dynamics.update_ctrl_amps(amps) - tc = self.termination_conditions err = self.dynamics.fid_computer.get_fid_err() if self.iter_summary: @@ -54,13 +52,12 @@ def fid_err_func_wrapper(self, *args): """ if err <= tc.fid_err_targ: raise errors.GoalAchievedTerminate(err) - + if self.num_fid_func_calls > tc.max_fid_func_calls: raise errors.MaxFidFuncCallTerminate() """ return err -# overwrite def fid_err_grad_wrapper(self, *args): """ @@ -84,9 +81,7 @@ def fid_err_grad_wrapper(self, *args): if self.stats is not None: self.stats.num_grad_func_calls = self.num_grad_func_calls if self.log_level <= logging.DEBUG: - logger.debug( - "gradient call {}".format(self.stats.num_grad_func_calls) - ) + logger.debug("gradient call {}".format(self.stats.num_grad_func_calls)) amps = self._get_ctrl_amps(args[0].copy()) self.dynamics.update_ctrl_amps(amps) fid_comp = self.dynamics.fid_computer @@ -114,19 +109,30 @@ def fid_err_grad_wrapper(self, *args): return grad.flatten() -class Multi_CRAB(): +class Multi_CRAB: """ Composite class for multiple GOAT instances to optimize multiple objectives simultaneously """ - def __init__(self, crab_optimizer, objectives, time_interval, time_options, pulse_options, - alg_kwargs, guess_params, **integrator_kwargs): - - self.crabs = [] - - for obj in objectives: - crab = copy.deepcopy(crab_optimizer) + crabs: list = [] + grad_fun = None + + def __init__( + self, + crab_optimizer, + objectives, + time_interval, + time_options, + pulse_options, + alg_kwargs, + guess_params, + **integrator_kwargs, + ): + if not isinstance(crab_optimizer, list): + crab_optimizer = [crab_optimizer] + for optim in crab_optimizer: + crab = copy.deepcopy(optim) crab.fid_err_func_wrapper = types.MethodType(fid_err_func_wrapper, crab) # Stack for each objective self.crabs.append(crab) @@ -145,14 +151,3 @@ def goal_fun(self, params): self.mean_infid = np.mean(infid_sum) return self.mean_infid - - def grad_fun(self, params): - """ - Calculates the sum of gradients over all objectives - """ - grads = 0 - - for c in self.crabs: - grads += c.fid_err_grad_wrapper(params) - - return grads \ No newline at end of file diff --git a/src/qutip_qoc/optimize.py b/src/qutip_qoc/optimize.py index 37f1312..ea84e5f 100644 --- a/src/qutip_qoc/optimize.py +++ b/src/qutip_qoc/optimize.py @@ -120,70 +120,138 @@ def optimize_pulses( """ alg = algorithm_kwargs.get("alg", "GRAPE") - if alg == "GOAT" or alg == "JOAT": - return opt_pulses( - objectives, - pulse_options, - time_interval, - time_options, - algorithm_kwargs, - optimizer_kwargs, - minimizer_kwargs, - integrator_kwargs, - ) - - elif alg == "GRAPE" or alg == "CRAB": - if len(objectives) != 1: + if isinstance(objectives, list): + if alg == "GRAPE" and len(objectives) != 1: raise TypeError( - "GRAPE and CRAB optimization only supports one objective at a " - "time. Please use GOAT or JOAT for multiple objectives." + "GRAPE optimization only supports one objective at a time. Please use CRAB, GOAT or JOAT for multiple objectives." ) - objective = objectives[0] + else: + objectives = [objectives] + Hd_lst, Hc_lst = [], [] + for objective in objectives: # extract drift and control Hamiltonians from the objective - Hd = objective.H[0] - Hc_lst = [H[0] for H in objective.H[1:]] - - # extract initial and target states/operators from the objective - init = objective.initial - targ = objective.target - - # extract guess and bounds for the control pulses - x0, bounds = [], [] - for key in pulse_options.keys(): - x0.append(pulse_options[key].get("guess")) - bounds.append(pulse_options[key].get("bounds")) - - try: - lbound = [b[0][0] for b in bounds] - ubound = [b[0][1] for b in bounds] - except Exception: - lbound = [b[0] for b in bounds] - ubound = [b[1] for b in bounds] - - # default "log_level" if not specified - if algorithm_kwargs.get("disp", False): - log_level = logging.INFO - else: - log_level = logging.WARN - - # low level minimizer overrides high level algorithm kwargs - max_iter = minimizer_kwargs.get("options", {}).get( - "maxiter", algorithm_kwargs.get("max_iter", 1000) + Hd_lst.append(objective.H[0]) + Hc_lst.append([H[0] if isinstance(H, list) else H for H in objective.H[1:]]) + + # extract initial and target states/operators from the objective + init = objective.initial + targ = objective.target + + # extract guess and bounds for the control pulses + x0, bounds = [], [] + for key in pulse_options.keys(): + x0.append(pulse_options[key].get("guess")) + bounds.append(pulse_options[key].get("bounds")) + + try: + lbound = [b[0][0] for b in bounds] + ubound = [b[0][1] for b in bounds] + except Exception: + lbound = [b[0] for b in bounds] + ubound = [b[1] for b in bounds] + + # default "log_level" if not specified + if algorithm_kwargs.get("disp", False): + log_level = logging.INFO + else: + log_level = logging.WARN + + # low level minimizer overrides high level algorithm kwargs + max_iter = minimizer_kwargs.get("options", {}).get( + "maxiter", algorithm_kwargs.get("max_iter", 1000) + ) + + optim_method = minimizer_kwargs.get( + "method", algorithm_kwargs.get("optim_method", "BFGS") + ) + + result = Result(objectives, time_interval) + + start_time = time.time() + + if alg == "GRAPE": + # only allow for scalar bound + lbound = lbound[0] + ubound = ubound[0] + + min_g = minimizer_kwargs.get("gtol", 1e-10) + + algorithm_kwargs["alg_params"] = { + "init_amps": np.array(x0).T, + } + + res = optimize_pulse( + drift=Hd_lst[0], + ctrls=Hc_lst[0], + initial=init, + target=targ, + num_tslots=time_interval.n_tslots, + evo_time=time_interval.evo_time, + tau=None, # implicitly derived from tslots + amp_lbound=lbound, + amp_ubound=ubound, + fid_err_targ=algorithm_kwargs.get("fid_err_targ", 1e-10), + min_grad=min_g, + max_iter=max_iter, + max_wall_time=algorithm_kwargs.get("max_wall_time", 180), + alg=alg, + optim_method=optim_method, + method_params=minimizer_kwargs, + optim_alg=None, # deprecated + max_metric_corr=None, # deprecated + accuracy_factor=None, # deprecated + alg_params=algorithm_kwargs.get("alg_params", None), + optim_params=algorithm_kwargs.get("optim_params", None), + dyn_type=algorithm_kwargs.get("dyn_type", "GEN_MAT"), + dyn_params=algorithm_kwargs.get("dyn_params", None), + prop_type=algorithm_kwargs.get("prop_type", "DEF"), + prop_params=algorithm_kwargs.get("prop_params", None), + fid_type=algorithm_kwargs.get("fid_type", "DEF"), + fid_params=algorithm_kwargs.get("fid_params", None), + phase_option=None, # deprecated + fid_err_scale_factor=None, # deprecated + tslot_type=algorithm_kwargs.get("tslot_type", "DEF"), + tslot_params=algorithm_kwargs.get("tslot_params", None), + amp_update_mode=None, # deprecated + init_pulse_type=algorithm_kwargs.get("init_pulse_type", "DEF"), + init_pulse_params=algorithm_kwargs.get("init_pulse_params", None), + pulse_scaling=algorithm_kwargs.get("pulse_scaling", 1.0), + pulse_offset=algorithm_kwargs.get("pulse_offset", 0.0), + ramping_pulse_type=algorithm_kwargs.get("ramping_pulse_type", None), + ramping_pulse_params=algorithm_kwargs.get("ramping_pulse_params", None), + log_level=algorithm_kwargs.get("log_level", log_level), + out_file_ext=algorithm_kwargs.get("out_file_ext", None), + gen_stats=algorithm_kwargs.get("gen_stats", False), ) + end_time = time.time() - optim_method = minimizer_kwargs.get( - "method", algorithm_kwargs.get("optim_method", "BFGS")) - - result = Result(objectives, time_interval) - - start_time = time.time() - - if alg == "CRAB": - + # extract runtime information + result.start_local_time = time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(start_time) + ) + result.end_local_time = time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(end_time) + ) + total_seconds = end_time - start_time + result.iter_seconds = [res.num_iter / total_seconds] * res.num_iter + result.n_iters = res.num_iter + result.message = res.termination_reason + result.final_states = [res.evo_full_final] + result.infidelity = res.fid_err + result.guess_params = res.initial_amps.T + result.optimized_params = res.final_amps.T + + # not present in analytical results + result.stats = res.stats + return result # GRAPE result + + crab_optimizer = [] + if alg == "CRAB": + for i, objective in enumerate(objectives): alg_params = { - "drift": Hd, - "ctrls": Hc_lst, + "drift": Hd_lst[i], + "ctrls": Hc_lst[i], "initial": init, "target": targ, "num_tslots": time_interval.n_tslots, @@ -192,7 +260,7 @@ def optimize_pulses( "amp_lbound": lbound, "amp_ubound": ubound, "fid_err_targ": algorithm_kwargs.get("fid_err_targ", 1e-10), - "min_grad": 0., + "min_grad": 0.0, "max_iter": max_iter, "max_wall_time": algorithm_kwargs.get("max_wall_time", 180), "alg": alg, @@ -202,9 +270,9 @@ def optimize_pulses( "max_metric_corr": None, # deprecated "accuracy_factor": None, # deprecated "alg_params": { - "num_coeffs": None, # TODO - "init_coeff_scaling": None, # TODO - "crab_pulse_params": None, # TODO + "num_coeffs": algorithm_kwargs.get("num_coeffs"), + "init_coeff_scaling": algorithm_kwargs.get("init_coeff_scaling"), + "crab_pulse_params": algorithm_kwargs.get("crab_pulse_params"), }, "optim_params": algorithm_kwargs.get("optim_params", None), "dyn_type": algorithm_kwargs.get("dyn_type", "GEN_MAT"), @@ -219,20 +287,24 @@ def optimize_pulses( "tslot_params": algorithm_kwargs.get("tslot_params", None), "amp_update_mode": None, # deprecated "init_pulse_type": algorithm_kwargs.get("init_pulse_type", "DEF"), - "init_pulse_params": algorithm_kwargs.get("init_pulse_params", None), # wavelength, frequency, phase etc. + "init_pulse_params": algorithm_kwargs.get( + "init_pulse_params", None + ), # wavelength, frequency, phase etc. "pulse_scaling": algorithm_kwargs.get("pulse_scaling", 1.0), "pulse_offset": algorithm_kwargs.get("pulse_offset", 0.0), - "ramping_pulse_type": algorithm_kwargs.get( - "ramping_pulse_type", None), + "ramping_pulse_type": algorithm_kwargs.get("ramping_pulse_type", None), "ramping_pulse_params": algorithm_kwargs.get( - "ramping_pulse_params", None), - "log_level": algorithm_kwargs.get("log_level", log_level), # TODO: deprecate + "ramping_pulse_params", None + ), + "log_level": algorithm_kwargs.get( + "log_level", log_level + ), # TODO: deprecate "gen_stats": algorithm_kwargs.get("gen_stats", False), } ###### code from qutip_qtrl.pulseoptim.optimize_pulse ###### - crab_optimizer = create_pulse_optimizer( + crab_optim = create_pulse_optimizer( drift=alg_params["drift"], ctrls=alg_params["ctrls"], initial=alg_params["initial"], @@ -275,116 +347,47 @@ def optimize_pulses( gen_stats=alg_params["gen_stats"], ) - dyn = crab_optimizer.dynamics + dyn = crab_optim.dynamics dyn.init_timeslots() - # Generate initial pulses for each control + # Generate initial pulses for each control through generator init_amps = np.zeros([dyn.num_tslots, dyn.num_ctrls]) + + # Check wether guess referes to amplitudes or parameters + use_as_amps = len(x0[0]) == time_interval.n_tslots + for j in range(dyn.num_ctrls): - pgen = crab_optimizer.pulse_generator[j] - pgen.init_pulse() - init_amps[:, j] = pgen.gen_pulse() + pgen = crab_optim.pulse_generator[j] + pgen.init_pulse() # generator for each control + + if use_as_amps: + init_amps[:, j] = x0[j] + else: + pgen.set_optim_var_vals(np.array(x0[j])) + init_amps[:, j] = pgen.gen_pulse() # Initialise the starting amplitudes dyn.initialize_controls(init_amps) - # And store the corresponding parameters (x0) - init_params = crab_optimizer._get_optim_var_vals() - - # guess and bounds for parameters are not supported - # the amplitude bounds are taken care of by the pulse generator - x0, bounds = [], [] - num_params = len(init_params) // len(pulse_options) - for i, key in enumerate(pulse_options.keys()): - pulse_options[key]["guess"] = init_params[i*num_params:(i+1)*num_params] - pulse_options[key]["bounds"] = None - - return opt_pulses( - objectives, - pulse_options, - time_interval, - time_options, - algorithm_kwargs, - optimizer_kwargs, - minimizer_kwargs, - integrator_kwargs, - crab_optimizer, - ) - - else: - # only alowes for scalar bound - lbound = lbound[0] - ubound = ubound[0] - - min_g = minimizer_kwargs.get("gtol", 1e-10) - - algorithm_kwargs["alg_params"] = { - "init_amps": np.array(x0).T, - } - - res = optimize_pulse( - drift=Hd, - ctrls=Hc_lst, - initial=init, - target=targ, - num_tslots=time_interval.n_tslots, - evo_time=time_interval.evo_time, - tau=None, # implicitly derived from tslots - amp_lbound=lbound, - amp_ubound=ubound, - fid_err_targ=algorithm_kwargs.get("fid_err_targ", 1e-10), - min_grad=min_g, - max_iter=max_iter, - max_wall_time=algorithm_kwargs.get("max_wall_time", 180), - alg=alg, - optim_method=optim_method, - method_params=minimizer_kwargs, - - optim_alg=None, # deprecated - max_metric_corr=None, # deprecated - accuracy_factor=None, # deprecated - alg_params=algorithm_kwargs.get("alg_params", None), - optim_params=algorithm_kwargs.get("optim_params", None), - dyn_type=algorithm_kwargs.get("dyn_type", "GEN_MAT"), - dyn_params=algorithm_kwargs.get("dyn_params", None), - prop_type=algorithm_kwargs.get("prop_type", "DEF"), - prop_params=algorithm_kwargs.get("prop_params", None), - fid_type=algorithm_kwargs.get("fid_type", "DEF"), - fid_params=algorithm_kwargs.get("fid_params", None), - phase_option=None, # deprecated - fid_err_scale_factor=None, # deprecated - tslot_type=algorithm_kwargs.get("tslot_type", "DEF"), - tslot_params=algorithm_kwargs.get("tslot_params", None), - amp_update_mode=None, # deprecated - init_pulse_type=algorithm_kwargs.get("init_pulse_type", "DEF"), - init_pulse_params=algorithm_kwargs.get("init_pulse_params", None), - pulse_scaling=algorithm_kwargs.get("pulse_scaling", 1.0), - pulse_offset=algorithm_kwargs.get("pulse_offset", 0.0), - ramping_pulse_type=algorithm_kwargs.get( - "ramping_pulse_type", None), - ramping_pulse_params=algorithm_kwargs.get( - "ramping_pulse_params", None), - log_level=algorithm_kwargs.get("log_level", log_level), - out_file_ext=algorithm_kwargs.get("out_file_ext", None), - gen_stats=algorithm_kwargs.get("gen_stats", False), - ) - - end_time = time.time() - - # extract runtime information - result.start_local_time = time.strftime( - '%Y-%m-%d %H:%M:%S', time.localtime(start_time)) - result.end_local_time = time.strftime( - '%Y-%m-%d %H:%M:%S', time.localtime(end_time)) - total_seconds = end_time - start_time - result.iter_seconds = [res.num_iter / total_seconds] * res.num_iter - result.n_iters = res.num_iter - result.message = res.termination_reason - result.final_states = [res.evo_full_final] - result.infidelity = res.fid_err - result.guess_params = res.initial_amps.T - result.optimized_params = res.final_amps.T - - # not present in analytical results - result.stats = res.stats - - return result + # And store the corresponding parameters + init_params = crab_optim._get_optim_var_vals() + + if use_as_amps: # For the global optimizer + num_params = len(init_params) // len(pulse_options) + for i, key in enumerate(pulse_options.keys()): + pulse_options[key]["guess"] = init_params[ + i * num_params : (i + 1) * num_params + ] # amplitude bounds are taken care of by pulse generator + + crab_optimizer.append(crab_optim) + + return opt_pulses( + objectives, + pulse_options, + time_interval, + time_options, + algorithm_kwargs, + optimizer_kwargs, + minimizer_kwargs, + integrator_kwargs, + crab_optimizer, + ) diff --git a/src/qutip_qoc/paper.py b/src/qutip_qoc/paper.py new file mode 100644 index 0000000..047f89a --- /dev/null +++ b/src/qutip_qoc/paper.py @@ -0,0 +1,80 @@ +import qutip as qt +import numpy as np +import qutip_qoc as qoc + +# Identity ---> Hadamard +initial = qt.qeye(2) +target = qt.gates.hadamard_transform() + +# convert to superoperator +initial = qt.sprepost(initial, initial.dag()) +target = qt.sprepost(target, target.dag()) + +# control Hamiltonian +sx, sy, sz = qt.sigmax(), qt.sigmay(), qt.sigmaz() +Hc = [sx, sy, sz] +Hc = [qt.liouvillian(H) for H in Hc] + +# energy splitting, tunneling, amplitude damping +omega, delta, gamma = 0.1, 1.0, 0.1 + +# drift Hamiltonian +Hd = 1 / 2 * (omega * sz + delta * sx) # drift +Hd = qt.liouvillian(H=Hd, c_ops=[np.sqrt(gamma) * qt.sigmam()]) +# full Hamiltonian +H = [Hd, Hc[0], Hc[1], Hc[2]] + +# time interval +pi, N = 3.14, 100 +interval = qoc.TimeInterval(evo_time=2 * pi, n_tslots=N) +# initial control amplitudes +initial_x = np.sin(interval()) +initial_y = np.cos(interval()) +initial_z = np.tan(interval()) + +res_grape = qoc.optimize_pulses( + objectives=[qoc.Objective(initial, H, target)], + pulse_options={ + "ctrl_x": {"guess": initial_x, "bounds": [-1, 1]}, + "ctrl_y": {"guess": initial_y, "bounds": [-1, 1]}, + "ctrl_z": {"guess": initial_z, "bounds": [-1, 1]}, + }, + time_interval=interval, + algorithm_kwargs={ + "alg": "GRAPE", + "fid_err_targ": 1, + }, +) + +print(res_grape) + +# initial control amplitudes +initial_x = np.sin(interval()) +initial_y = np.cos(interval()) +initial_z = np.tan(interval()) + +res_grape = qoc.optimize_pulses( + objectives=qoc.Objective(initial, H, target), + pulse_options={ + "ctrl_x": { + "guess": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5], + "bounds": [(-1, 1) for _ in range(8)], + }, + "ctrl_y": { + "guess": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5], + "bounds": [(-1, 1) for _ in range(8)], + }, + "ctrl_z": { + "guess": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5], + "bounds": [(-1, 1) for _ in range(8)], + }, + }, + time_interval=interval, + algorithm_kwargs={ + "alg": "CRAB", + "fid_err_targ": 0.5, + "num_coeffs": 4, + }, +) + +print(res_grape) diff --git a/src/qutip_qoc/pulse.c b/src/qutip_qoc/pulse.c index 0cb01d0..8ba1589 100644 --- a/src/qutip_qoc/pulse.c +++ b/src/qutip_qoc/pulse.c @@ -31597,7 +31597,7 @@ static struct PyGetSetDef __pyx_getsets_9qutip_qoc_5pulse_Pulse[] = { static PyType_Slot __pyx_type_9qutip_qoc_5pulse_Pulse_slots[] = { {Py_tp_dealloc, (void *)__pyx_tp_dealloc_9qutip_qoc_5pulse_Pulse}, {Py_tp_call, (void *)__pyx_pw_9qutip_qoc_5pulse_5Pulse_3__call__}, - {Py_tp_doc, (void *)PyDoc_STR("\n Base class for analytically defined pulse shapes.\n\n Attributes:\n n_sup (int): \n number of superpositions i.e. summands\n\n n_var (int): \n number of parameters for each summand\n\n n_par (int): \n total number of parameters\n ")}, + {Py_tp_doc, (void *)PyDoc_STR("\n Base class for analytically defined pulse shapes.\n\n Attributes:\n n_sup (int):\n number of superpositions i.e. summands\n\n n_var (int):\n number of parameters for each summand\n\n n_par (int):\n total number of parameters\n ")}, {Py_tp_methods, (void *)__pyx_methods_9qutip_qoc_5pulse_Pulse}, {Py_tp_getset, (void *)__pyx_getsets_9qutip_qoc_5pulse_Pulse}, {Py_tp_init, (void *)__pyx_pw_9qutip_qoc_5pulse_5Pulse_1__init__}, @@ -31644,7 +31644,7 @@ static PyTypeObject __pyx_type_9qutip_qoc_5pulse_Pulse = { 0, /*tp_setattro*/ 0, /*tp_as_buffer*/ Py_TPFLAGS_DEFAULT|Py_TPFLAGS_HAVE_VERSION_TAG|Py_TPFLAGS_CHECKTYPES|Py_TPFLAGS_HAVE_NEWBUFFER|Py_TPFLAGS_BASETYPE, /*tp_flags*/ - PyDoc_STR("\n Base class for analytically defined pulse shapes.\n\n Attributes:\n n_sup (int): \n number of superpositions i.e. summands\n\n n_var (int): \n number of parameters for each summand\n\n n_par (int): \n total number of parameters\n "), /*tp_doc*/ + PyDoc_STR("\n Base class for analytically defined pulse shapes.\n\n Attributes:\n n_sup (int):\n number of superpositions i.e. summands\n\n n_var (int):\n number of parameters for each summand\n\n n_par (int):\n total number of parameters\n "), /*tp_doc*/ 0, /*tp_traverse*/ 0, /*tp_clear*/ 0, /*tp_richcompare*/ @@ -32302,7 +32302,7 @@ static PyMethodDef __pyx_methods_9qutip_qoc_5pulse_FourierPulse[] = { }; #if CYTHON_USE_TYPE_SPECS static PyType_Slot __pyx_type_9qutip_qoc_5pulse_FourierPulse_slots[] = { - {Py_tp_doc, (void *)PyDoc_STR("\n Fourier pulse with summands of shape:\n A0 + A1 * cos(2 * pi / period * 1 * t) + B1 * sin(2 * pi / period * 1 * t) \n + A2 * cos(2 * pi / period * 2 * t) + B2 * sin(2 * pi / period * 2 * t) \n + ...\n where the period is the first parameter.\n ")}, + {Py_tp_doc, (void *)PyDoc_STR("\n Fourier pulse with summands of shape:\n A0 + A1 * cos(2 * pi / period * 1 * t) + B1 * sin(2 * pi / period * 1 * t)\n + A2 * cos(2 * pi / period * 2 * t) + B2 * sin(2 * pi / period * 2 * t)\n + ...\n where the period is the first parameter.\n ")}, {Py_tp_methods, (void *)__pyx_methods_9qutip_qoc_5pulse_FourierPulse}, {Py_tp_init, (void *)__pyx_pw_9qutip_qoc_5pulse_12FourierPulse_1__init__}, {Py_tp_new, (void *)__pyx_tp_new_9qutip_qoc_5pulse_FourierPulse}, @@ -32352,7 +32352,7 @@ static PyTypeObject __pyx_type_9qutip_qoc_5pulse_FourierPulse = { 0, /*tp_setattro*/ 0, /*tp_as_buffer*/ Py_TPFLAGS_DEFAULT|Py_TPFLAGS_HAVE_VERSION_TAG|Py_TPFLAGS_CHECKTYPES|Py_TPFLAGS_HAVE_NEWBUFFER|Py_TPFLAGS_BASETYPE, /*tp_flags*/ - PyDoc_STR("\n Fourier pulse with summands of shape:\n A0 + A1 * cos(2 * pi / period * 1 * t) + B1 * sin(2 * pi / period * 1 * t) \n + A2 * cos(2 * pi / period * 2 * t) + B2 * sin(2 * pi / period * 2 * t) \n + ...\n where the period is the first parameter.\n "), /*tp_doc*/ + PyDoc_STR("\n Fourier pulse with summands of shape:\n A0 + A1 * cos(2 * pi / period * 1 * t) + B1 * sin(2 * pi / period * 1 * t)\n + A2 * cos(2 * pi / period * 2 * t) + B2 * sin(2 * pi / period * 2 * t)\n + ...\n where the period is the first parameter.\n "), /*tp_doc*/ 0, /*tp_traverse*/ 0, /*tp_clear*/ 0, /*tp_richcompare*/ diff --git a/src/qutip_qoc/qtrl.py b/src/qutip_qoc/qtrl.py deleted file mode 100644 index cb94f83..0000000 --- a/src/qutip_qoc/qtrl.py +++ /dev/null @@ -1,499 +0,0 @@ -""" -This module contains a global optimization wrapper -for the GRAPE and CRAB pulse optimization. -""" -import time -import numpy as np -import scipy as sp -import qutip as qt - -from scipy.optimize import OptimizeResult - -from qutip_qoc.result import Result -from qutip_qoc.joat import Multi_JOAT -from qutip_qoc.goat import Multi_GOAT - -from qutip_qtrl.pulseoptim import optimize_pulse - - -__all__ = ["optimize_pulses"] - - -def get_init_and_bounds_from_options(lst, input): - """ - Extract initial and boundary values of any kind and shape - from the pulse_options and time_options dictionary. - """ - if input is None: - return lst - if isinstance(input, (list, np.ndarray)): - lst.append(input) - elif isinstance(input, (tuple)): - lst.append([input]) - elif np.isscalar(input): - lst.append([input]) - else: # jax Array - lst.append(np.array(input)) - return lst - - -class Callback: - """ - Callback functions for the local and global optimization algorithm. - Keeps track of time and saves intermediate results. - Terminates the optimization if the infidelity error target is reached. - Class initialization starts the clock. - """ - - def __init__(self, result, fid_err_targ, max_wall_time, bounds, disp): - self.result = result - self.fid_err_targ = fid_err_targ - self.max_wall_time = max_wall_time - self.bounds = bounds - self.disp = disp - - self.elapsed_time = 0 - self.iter_seconds = [] - self.start_time = self.iter_time = time.time() - - def stop_clock(self): - """ - Stops the clock and saves the start-,end- and iterations- time in result. - """ - self.end_time = time.time() - - self.result.start_local_time = time.strftime( - '%Y-%m-%d %H:%M:%S', time.localtime(self.start_time)) - self.result.end_local_time = time.strftime( - '%Y-%m-%d %H:%M:%S', time.localtime(self.end_time)) - - self.result.iter_seconds = self.iter_seconds - - def time_iter(self): - """ - Calculates and stores the time for each iteration. - """ - iter_time = time.time() - diff = round(iter_time - self.iter_time, 4) - self.iter_time = iter_time - self.iter_seconds.append(diff) - return diff - - def time_elapsed(self): - """ - Calculates and stores the elapsed time since the start of the optimization. - """ - self.elapsed_time = round(time.time() - self.start_time, 4) - return self.elapsed_time - - def inside_bounds(self, x): - """ - Check if the current parameters are inside the boundaries - used for the global and local optimization callback. - """ - idx = 0 - for bound in self.bounds: - for b in bound: - if not (b[0] <= x[idx] <= b[1]): - if self.disp: - print("parameter out of bounds, continuing optimization") - return False - idx += 1 - return True - - def min_callback(self, intermediate_result: OptimizeResult): - """ - Callback function for the local minimizer, - terminates if the infidelity target is reached or - the maximum wall time is exceeded. - """ - terminate = False - - if intermediate_result.fun <= self.fid_err_targ: - terminate = True - reason = "fid_err_targ reached" - elif self.time_elapsed() >= self.max_wall_time: - terminate = True - reason = "max_wall_time reached" - - if self.disp: - message = "minimizer step, infidelity: %.5f" % intermediate_result.fun - if terminate: - message += "\n" + reason + ", terminating minimization" - print(message) - - if terminate: # manually save the result and exit - if intermediate_result.fun < self.result.infidelity: - if intermediate_result.fun > 0: - if self.inside_bounds(intermediate_result.x): - self.result.update(intermediate_result.fun, - intermediate_result.x) - raise StopIteration - - def opt_callback(self, x, f, accept): - """ - Callback function for the global optimizer, - terminates if the infidelity target is reached or - the maximum wall time is exceeded. - """ - terminate = False - global_step_seconds = self.time_iter() - - if f <= self.fid_err_targ: - terminate = True - self.result.message = "fid_err_targ reached" - elif self.time_elapsed() >= self.max_wall_time: - terminate = True - self.result.message = "max_wall_time reached" - - if self.disp: - message = "optimizer step, infidelity: %.5f" % f +\ - ", took %.2f seconds" % global_step_seconds - if terminate: - message += "\n" + self.result.message + ", terminating optimization" - print(message) - - if terminate: # manually save the result and exit - if f < self.result.infidelity: - if f < 0: - print( - "WARNING: infidelity < 0 -> inaccurate integration, " - "try reducing integrator tolerance (atol, rtol), " - "continuing with global optimization") - terminate = False - elif self.inside_bounds(x): - self.result.update(f, x) - - return terminate - - -def optimize_pulses( - objectives, - pulse_options, - time_interval, - time_options, - algorithm_kwargs, - optimizer_kwargs, - minimizer_kwargs, - integrator_kwargs): - """ - Optimize a pulse sequence to implement a given target unitary by optimizing - the parameters of the pulse functions. The algorithm is a two-layered - approach. The outer layer does a global optimization using basin-hopping or - dual annealing. The inner layer does a local optimization using a gradient- - based method. Gradients and error values are calculated in the GOAT/JOAT - module. - - Parameters - ---------- - objectives : list of :class:`qutip_qoc.Objective` - List of objectives to be optimized. - - pulse_options : dict - Dictionary of options for the control pulse optimization. - For each control function it must specify: - - control_id : dict - - guess: ndarray, shape (n,) - Initial guess. Array of real elements of size (n,), - where ``n`` is the number of independent variables. - - - bounds : sequence, optional - Sequence of ``(min, max)`` pairs for each element in - `guess`. None is used to specify no bound. - - time_interval : :class:`qutip_qoc.TimeInterval` - Time interval for the optimization. - - time_options : dict, optional - Only supported by GOAT and JOAT. - Dictionary of options for the time interval optimization. - It must specify both: - - - guess: ndarray, shape (n,) - Initial guess. Array of real elements of size (n,), - where ``n`` is the number of independent variables. - - - bounds : sequence, optional - Sequence of ``(min, max)`` pairs for each element in `guess`. - None is used to specify no bound. - - algorithm_kwargs : dict, optional - Dictionary of options for the optimization algorithm. - - - alg : str - Algorithm to use for the optimization. - Supported are: "GOAT", "JOAT". - - - fid_err_targ : float, optional - Fidelity error target for the optimization. - - - max_iter : int, optional - Maximum number of global iterations to perform. - Can be overridden by specifying in - optimizer_kwargs/minimizer_kwargs. - - optimizer_kwargs : dict, optional - Dictionary of options for the global optimizer. - Only supported by GOAT and JOAT. - - - method : str, optional - Algorithm to use for the global optimization. - Supported are: "basinhopping", "dual_annealing" - - - max_iter : int, optional - Maximum number of iterations to perform. - - Full list of options can be found in - :func:`scipy.optimize.basinhopping` - and :func:`scipy.optimize.dual_annealing`. - - minimizer_kwargs : dict, optional - Dictionary of options for the local minimizer. - - - method : str, optional - Algorithm to use for the local optimization. - Gradient driven methods are supported. - - Full list of options and methods can be found in - :func:`scipy.optimize.minimize`. - - integrator_kwargs : dict, optional - Dictionary of options for the integrator. - Only supported by GOAT and JOAT. - Options for the solver, see :obj:`MESolver.options` and - `Integrator <./classes.html#classes-ode>`_ for a list of all options. - - Returns - ------- - result : :class:`qutip_qoc.Result` - Optimization result. - """ - if len(objectives) != 1: - raise TypeError( - "GRAPE and CRAB optimization only supports one objective at a " - "time. Please use GOAT or JOAT for multiple objectives." - ) - objective = objectives[0] - - # extract drift and control Hamiltonians from the objective - Hd = objective.H[0] - Hc_lst = [H[0] for H in objective.H[1:]] - - # extract initial and target states/operators from the objective - init = objective.initial - targ = objective.target - - # extract guess and bounds for the control pulses - x0, bounds = [], [] - for key in pulse_options.keys(): - x0.append(pulse_options[key].get("guess")) - bounds.append(pulse_options[key].get("bounds")) - - try: - lbound = [b[0][0] for b in bounds] - ubound = [b[0][1] for b in bounds] - except Exception: - lbound = [b[0] for b in bounds] - ubound = [b[1] for b in bounds] - - alg = algorithm_kwargs.get("alg", "GRAPE") - if alg == "CRAB": - min_g = 0. - algorithm_kwargs["alg_params"] = { - "guess_pulse": x0, - } - - elif alg == "GRAPE": - # only alowes for scalar bound - lbound = lbound[0] - ubound = ubound[0] - - min_g = minimizer_kwargs.get("gtol", 1e-10) - - algorithm_kwargs["alg_params"] = { - "init_amps": np.array(x0).T, - } - - # default "log_level" if not specified - if algorithm_kwargs.get("disp", False): - log_level = logging.INFO - else: - log_level = logging.WARN - - # low level minimizer overrides high level algorithm kwargs - max_iter = minimizer_kwargs.get("options", {}).get( - "maxiter", algorithm_kwargs.get("max_iter", 1000)) - - optim_method = minimizer_kwargs.get( - "method", algorithm_kwargs.get("optim_method", "DEF")) - - result = Result(objectives, time_interval) - - - def optimize_pulse_wrapper(): - res = optimize_pulse( - drift=Hd, - ctrls=Hc_lst, - initial=init, - target=targ, - num_tslots=time_interval.n_tslots, - evo_time=time_interval.evo_time, - tau=None, # implicitly derived from tslots - amp_lbound=lbound, - amp_ubound=ubound, - fid_err_targ=algorithm_kwargs.get("fid_err_targ", 1e-10), - min_grad=min_g, - max_iter=max_iter, - max_wall_time=algorithm_kwargs.get("max_wall_time", 180), - alg=alg, - optim_method=optim_method, - method_params=minimizer_kwargs, - - optim_alg=None, # deprecated - max_metric_corr=None, # deprecated - accuracy_factor=None, # deprecated - alg_params=algorithm_kwargs.get("alg_params", None), - optim_params=algorithm_kwargs.get("optim_params", None), - dyn_type=algorithm_kwargs.get("dyn_type", "GEN_MAT"), - dyn_params=algorithm_kwargs.get("dyn_params", None), - prop_type=algorithm_kwargs.get("prop_type", "DEF"), - prop_params=algorithm_kwargs.get("prop_params", None), - fid_type=algorithm_kwargs.get("fid_type", "DEF"), - fid_params=algorithm_kwargs.get("fid_params", None), - phase_option=None, # deprecated - fid_err_scale_factor=None, # deprecated - tslot_type=algorithm_kwargs.get("tslot_type", "DEF"), - tslot_params=algorithm_kwargs.get("tslot_params", None), - amp_update_mode=None, # deprecated - init_pulse_type=algorithm_kwargs.get("init_pulse_type", "DEF"), - init_pulse_params=algorithm_kwargs.get("init_pulse_params", None), - pulse_scaling=algorithm_kwargs.get("pulse_scaling", 1.0), - pulse_offset=algorithm_kwargs.get("pulse_offset", 0.0), - ramping_pulse_type=algorithm_kwargs.get( - "ramping_pulse_type", None), - ramping_pulse_params=algorithm_kwargs.get( - "ramping_pulse_params", None), - log_level=algorithm_kwargs.get("log_level", log_level), - out_file_ext=algorithm_kwargs.get("out_file_ext", None), - gen_stats=algorithm_kwargs.get("gen_stats", False), - ) - - - - # extract initial and boundary values - x0, bounds = [], [] - for key in pulse_options.keys(): - get_init_and_bounds_from_options(x0, pulse_options[key].get("guess")) - get_init_and_bounds_from_options(bounds, pulse_options[key].get("bounds")) - - get_init_and_bounds_from_options(x0, time_options.get("guess", None)) - get_init_and_bounds_from_options(bounds, time_options.get("bounds", None)) - - optimizer_kwargs.setdefault("x0", np.concatenate(x0)) - - # algorithm specific settings - if algorithm_kwargs.get("alg") == "JOAT": - with qt.CoreOptions(default_dtype="jax"): - multi_objective = Multi_JOAT(objectives, time_interval, - time_options, pulse_options, - algorithm_kwargs, - guess_params=optimizer_kwargs["x0"], - **integrator_kwargs) - elif algorithm_kwargs.get("alg") == "GOAT": - multi_objective = Multi_GOAT(objectives, time_interval, time_options, - pulse_options, algorithm_kwargs, - guess_params=optimizer_kwargs["x0"], - **integrator_kwargs) - - # optimizer specific settings - opt_method = optimizer_kwargs.get( - "method", algorithm_kwargs.get("method", "basinhopping")) - - if opt_method == "basinhopping": - optimizer = sp.optimize.basinhopping - - # if not specified through optimizer_kwargs "niter" - optimizer_kwargs.setdefault( # or "max_iter" - "niter", optimizer_kwargs.get( # use algorithm_kwargs - "max_iter", algorithm_kwargs.get("max_iter", 1000))) - - # realizes boundaries through minimizer - minimizer_kwargs.setdefault("bounds", np.concatenate(bounds)) - - elif opt_method == "dual_annealing": - optimizer = sp.optimize.dual_annealing - - # if not specified through optimizer_kwargs "maxiter" - optimizer_kwargs.setdefault( # or "max_iter" - "maxiter", optimizer_kwargs.get( # use algorithm_kwargs - "max_iter", algorithm_kwargs.get("max_iter", 1000))) - - # realizes boundaries through optimizer - optimizer_kwargs.setdefault("bounds", np.concatenate(bounds)) - - # remove overload from optimizer_kwargs - optimizer_kwargs.pop("max_iter", None) - optimizer_kwargs.pop("method", None) - - # should optimization include time - var_t = True if time_options.get("guess", False) else False - - # define the result Krotov style - result = Result(objectives, - time_interval, - guess_params=x0, - var_time=var_t) - - # Callback instance for termination and logging - max_wall_time = algorithm_kwargs.get("max_wall_time", 1e10) - fid_err_targ = algorithm_kwargs.get("fid_err_targ", 1e-10) - disp = algorithm_kwargs.get("disp", False) - # start the clock - cllbck = Callback(result, fid_err_targ, max_wall_time, bounds, disp) - - # run the optimization - min_res = optimizer( - func=optimize_pulse_wrapper, - minimizer_kwargs={ - 'jac': multi_objective.grad_fun, - 'callback': cllbck.min_callback, - **minimizer_kwargs - }, - callback=cllbck.opt_callback, - **optimizer_kwargs - ) - - cllbck.stop_clock() # stop the clock - - end_time = time.time() - - # extract runtime information - result.start_local_time = time.strftime( - '%Y-%m-%d %H:%M:%S', time.localtime(start_time)) - result.end_local_time = time.strftime( - '%Y-%m-%d %H:%M:%S', time.localtime(end_time)) - total_seconds = end_time - start_time - result.iter_seconds = [res.num_iter / total_seconds] * res.num_iter - result.n_iters = res.num_iter - result.message = res.termination_reason - result.final_states = [res.evo_full_final] - result.infidelity = res.fid_err - result.guess_params = res.initial_amps.T - result.optimized_params = res.final_amps.T - - # not present in analytical results - result.stats = res.stats - - # some global optimization methods do not return the minimum result - # when terminated through StopIteration (see min_callback) - if min_res.fun < result.infidelity: - if cllbck.inside_bounds(min_res.x): - result.update(min_res.fun, min_res.x) - - # save runtime information in result - result.n_iters = min_res.nit - if result.message is None: - result.message = min_res.message - - return result