diff --git a/docs/source/gflow_utils.rst b/docs/source/gflow_utils.rst new file mode 100644 index 000000000..f6be1232f --- /dev/null +++ b/docs/source/gflow_utils.rst @@ -0,0 +1,17 @@ +Gflow Utils +=========== + +:mod:`graphqomb.gflow_utils` module ++++++++++++++++++++++++++++++++++++ + +.. automodule:: graphqomb.gflow_utils + +Functions +--------- + +.. autofunction:: graphqomb.gflow_utils.gflow_wrapper + +Constants +--------- + +.. autodata:: graphqomb.gflow_utils._EQUIV_MEAS_BASIS_MAP diff --git a/docs/source/references.rst b/docs/source/references.rst index 5bc984987..f6039a287 100644 --- a/docs/source/references.rst +++ b/docs/source/references.rst @@ -15,6 +15,7 @@ Module reference random_objects feedforward focus_flow + gflow_utils command pattern pauli_frame @@ -22,3 +23,4 @@ Module reference scheduler stim_compiler visualizer + zxgraphstate diff --git a/docs/source/zxgraphstate.rst b/docs/source/zxgraphstate.rst new file mode 100644 index 000000000..6961dc11a --- /dev/null +++ b/docs/source/zxgraphstate.rst @@ -0,0 +1,21 @@ +ZXGraphState +============ + +:mod:`graphqomb.zxgraphstate` module ++++++++++++++++++++++++++++++++++++++ + +.. automodule:: graphqomb.zxgraphstate + +ZX Graph State +-------------- + +.. autoclass:: graphqomb.zxgraphstate.ZXGraphState + :members: + :show-inheritance: + :member-order: bysource + +Functions +--------- + +.. autofunction:: graphqomb.zxgraphstate.to_zx_graphstate +.. autofunction:: graphqomb.zxgraphstate.complete_graph_edges diff --git a/examples/zxgraph_simplification.py b/examples/zxgraph_simplification.py new file mode 100644 index 000000000..464cb2e05 --- /dev/null +++ b/examples/zxgraph_simplification.py @@ -0,0 +1,154 @@ +""" +Basic example of simplifying a ZX-diagram. +========================================== + +By using the full_reduce method, +we can remove all the internal Clifford nodes and some non-Clifford nodes from the graph state, +which generates a simpler ZX-diagram. +This example is a simple demonstration of the simplification process. + +Note that as a result of the simplification, local Clifford operations are applied to the input/output nodes. +""" + +# %% + +from __future__ import annotations + +from copy import deepcopy + +import numpy as np + +from graphqomb.gflow_utils import gflow_wrapper +from graphqomb.qompiler import qompile +from graphqomb.random_objects import generate_random_flow_graph +from graphqomb.simulator import PatternSimulator, SimulatorBackend +from graphqomb.visualizer import visualize +from graphqomb.zxgraphstate import ZXGraphState, to_zx_graphstate + +FlowLike = dict[int, set[int]] + +# %% +# Prepare an initial random graph state with flow +graph, flow = generate_random_flow_graph(width=3, depth=4, edge_p=0.5) +zx_graph, _ = to_zx_graphstate(graph) +visualize(zx_graph) + +# %% +# We can compile the graph state into a measurement pattern, simulate it, and get the resulting statevector. +pattern = qompile(zx_graph, flow) +sim = PatternSimulator(pattern, backend=SimulatorBackend.StateVector) +sim.simulate() +statevec_original = sim.state + + +# %% +def print_boundary_lcs(zxgraph: ZXGraphState) -> None: + lc_map = zxgraph.local_cliffords + for node in zxgraph.input_node_indices | zxgraph.output_node_indices: + # check lc on input and output nodes + lc = lc_map.get(node, None) + if lc is not None: + if node in zxgraph.input_node_indices: + print(f"Input node {node} has local Clifford: alpha={lc.alpha}, beta={lc.beta}, gamma={lc.gamma}") + else: + print(f"Output node {node} has local Clifford: alpha={lc.alpha}, beta={lc.beta}, gamma={lc.gamma}") + else: + print(f"Node {node} has no local Clifford.") + + +def print_meas_bses(graph: ZXGraphState) -> None: + print("node | plane | angle (/pi)") + for node in graph.input_node_indices: + print(f"{node} (input)", graph.meas_bases[node].plane, graph.meas_bases[node].angle / np.pi) + for node in graph.physical_nodes - set(graph.input_node_indices) - set(graph.output_node_indices): + print(node, graph.meas_bases[node].plane, graph.meas_bases[node].angle / np.pi) + for node in graph.output_node_indices: + print(f"{node} (output)", "-", "-") + + +# %% +print_boundary_lcs(zx_graph) + +# %% +# Initial graph state before simplification +print_meas_bses(zx_graph) + + +# %% +# Simplify the graph state by full_reduce method +zx_graph_smp = deepcopy(zx_graph) +zx_graph_smp.full_reduce() + +# %% +# Simplified graph state after full_reduce. +visualize(zx_graph_smp) +print_meas_bses(zx_graph_smp) +print_boundary_lcs(zx_graph_smp) + +# %% +# NOTE: +# At first glance, the input/output nodes appear to remain unaffected. +# However, note that a local Clifford operation is actually applied as a result of the action of the full_reduce method. + +# If you visualize the graph state after executing the `expand_local_cliffords` method, +# you will see additional nodes connected to the former input/output nodes. + + +# %% +# Let us compare the graph state before and after simplification. +# We simulate the pattern obtained from the simplified graph state. +# Note that we need to call the `expand_local_cliffords` method before generating the pattern to get the gflow. + +zx_graph_smp.expand_local_cliffords() +zx_graph_smp.to_xy() # to improve gflow search performance +zx_graph_smp.to_xz() # to improve gflow search performance +print("input_node_indices: ", set(zx_graph_smp.input_node_indices)) +print("output_node_indices: ", set(zx_graph_smp.output_node_indices)) +print("local_cliffords: ", zx_graph_smp.local_cliffords) + +print_meas_bses(zx_graph_smp) +visualize(zx_graph_smp) +print_boundary_lcs(zx_graph_smp) + +# %% +# Now we can obtain the gflow for the simplified graph state. +# Then, we compile the simplified graph state into a measurement pattern, +# simulate it, and get the resulting statevector. + +# NOTE: +# gflow_wrapper does not support graph states with multiple subgraph structures in the gflow search wrapper below. +# Hence, in case you fail, ensure that the simplified graph state consists of a single connected component. +# To calculate the graph states with multiple subgraph structures, +# you need to calculate gflow for each connected component separately. +gflow_smp = gflow_wrapper(zx_graph_smp) +pattern_smp = qompile(zx_graph_smp, gflow_smp) +sim_smp = PatternSimulator(pattern_smp, backend=SimulatorBackend.StateVector) +sim_smp.simulate() + +# %% +statevec_smp = sim_smp.state +# %% +# normalization check +print("norm of original statevector:", np.linalg.norm(statevec_original.state())) +print("norm of simplified statevector:", np.linalg.norm(statevec_smp.state())) + +# %% +# Finally, we compare the expectation values of random observables before and after simplification. +rng = np.random.default_rng() +for i in range(len(zx_graph.input_node_indices)): + rand_mat = rng.random((2, 2)) + 1j * rng.random((2, 2)) + rand_mat += rand_mat.T.conj() + exp = statevec_original.expectation(rand_mat, [i]) + exp_cr = statevec_smp.expectation(rand_mat, [i]) + print("Expectation values for rand_mat\n===============================") + print("rand_mat: \n", rand_mat) + print("Original: \t\t", exp) + print("After simplification: \t", exp_cr) + +print("norm: ", np.linalg.norm(statevec_original.state()), np.linalg.norm(statevec_smp.state())) +print("data shape: ", statevec_original.state().shape, statevec_smp.state().shape) +psi_org = statevec_original.state() +psi_smp = statevec_smp.state() +print("inner product: ", np.abs(np.vdot(psi_org, psi_smp))) + +# %% diff --git a/graphqomb/common.py b/graphqomb/common.py index b55ca78b5..883fc8cd4 100644 --- a/graphqomb/common.py +++ b/graphqomb/common.py @@ -397,3 +397,45 @@ def meas_basis(plane: Plane, angle: float) -> NDArray[np.complex128]: else: typing_extensions.assert_never(plane) return basis.astype(np.complex128) + + +def basis2tuple(meas_basis: MeasBasis) -> tuple[Plane, float]: + r"""Return the key (tuple[Plane, float]) for _EQUIV_MEAS_BASIS_MAP from a measurement basis. + + Parameters + ---------- + meas_basis : `MeasBasis` + measurement basis + + Returns + ------- + tuple[Plane, float] + key for _EQUIV_MEAS_BASIS_MAP + """ + angle = round_clifford_angle(meas_basis.angle) + return (meas_basis.plane, angle) + + +def round_clifford_angle(angle: float, atol: float = 1e-9) -> float: + r"""Round the Clifford angle numerically to the nearest Clifford angle. + + Parameters + ---------- + angle : `float` + angle in radians + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + angle : `float` + For Clifford angles, the rounded angle. + If the angle is not a Clifford angle, return the original angle. + """ + clifford_angles = [0.0, np.pi / 2, np.pi, 3 * np.pi / 2] + for ca in clifford_angles: + if is_close_angle(angle, ca, atol): + angle = ca + break + + return angle diff --git a/graphqomb/euler.py b/graphqomb/euler.py index 0b4e0bb92..a33ff7aae 100644 --- a/graphqomb/euler.py +++ b/graphqomb/euler.py @@ -290,7 +290,7 @@ def update_lc_basis(lc: LocalClifford, basis: MeasBasis) -> PlannerMeasBasis: `PlannerMeasBasis` updated `PlannerMeasBasis` """ - matrix = lc.matrix() + matrix = lc.matrix().conjugate().T vector = basis.vector() updated_vector = np.asarray(matrix @ vector, dtype=np.complex128) diff --git a/graphqomb/gflow_utils.py b/graphqomb/gflow_utils.py new file mode 100644 index 000000000..8c8c89a39 --- /dev/null +++ b/graphqomb/gflow_utils.py @@ -0,0 +1,111 @@ +"""Utilities for generalized flow (gflow) computation. + +This module provides: + +- `gflow_wrapper`: Thin adapter around ``swiflow.gflow`` so that gflow can be computed directly + from a `BaseGraphState` instance. +- `_EQUIV_MEAS_BASIS_MAP`: A mapping between equivalent measurement bases used to improve gflow finding performance. +""" + +from __future__ import annotations + +import math +from typing import TYPE_CHECKING + +import networkx as nx +from swiflow import gflow +from swiflow.common import Plane as SfPlane + +from graphqomb.common import Plane, PlannerMeasBasis + +if TYPE_CHECKING: + from typing import Any + + from networkx import Graph as NxGraph + + from graphqomb.graphstate import BaseGraphState + + FlowLike = dict[int, set[int]] + + +def gflow_wrapper(graphstate: BaseGraphState) -> FlowLike: + """Utilize ``swiflow.gflow`` to search gflow. + + Parameters + ---------- + graphstate : `BaseGraphState` + graph state to find gflow + + Returns + ------- + ``FlowLike`` + gflow object + + Raises + ------ + ValueError + If no gflow is found + + Notes + ----- + This wrapper does not support graph states with multiple subgraph structures. + """ + graph: NxGraph[Any] = nx.Graph() + graph.add_nodes_from(graphstate.physical_nodes) + graph.add_edges_from(graphstate.physical_edges) + + bases = graphstate.meas_bases + planes = {node: bases[node].plane for node in bases} + swiflow_planes: dict[int, SfPlane] = {} + for node, plane in planes.items(): + if plane == Plane.XY: + swiflow_planes[node] = SfPlane.XY + elif plane == Plane.YZ: + swiflow_planes[node] = SfPlane.YZ + elif plane == Plane.XZ: + swiflow_planes[node] = SfPlane.XZ + else: + msg = f"No match {plane}" + raise ValueError(msg) + + gflow_object = gflow.find( + graph, set(graphstate.input_node_indices), set(graphstate.output_node_indices), swiflow_planes + ) + if gflow_object is None: + msg = "No flow found" + raise ValueError(msg) + + gflow_obj = gflow_object.f + + return {node: {child for child in children if child != node} for node, children in gflow_obj.items()} + + +#: Mapping between equivalent measurement bases. +#: +#: This map is used to replace a measurement basis by an equivalent one +#: to improve gflow search performance. +#: +#: Key: +#: ``(Plane, angle)`` where angle is in radians. +#: Value: +#: :class:`~graphqomb.common.PlannerMeasBasis`. +_EQUIV_MEAS_BASIS_MAP: dict[tuple[Plane, float], PlannerMeasBasis] = { + # (XY, 0) <-> (XZ, pi/2) + (Plane.XY, 0.0): PlannerMeasBasis(Plane.XZ, 0.5 * math.pi), + (Plane.XZ, 0.5 * math.pi): PlannerMeasBasis(Plane.XY, 0.0), + # (XY, pi/2) <-> (YZ, pi/2) + (Plane.XY, 0.5 * math.pi): PlannerMeasBasis(Plane.YZ, 0.5 * math.pi), + (Plane.YZ, 0.5 * math.pi): PlannerMeasBasis(Plane.XY, 0.5 * math.pi), + # (XY, -pi/2) == (XY, 3pi/2) <-> (YZ, 3pi/2) + (Plane.XY, 1.5 * math.pi): PlannerMeasBasis(Plane.YZ, 1.5 * math.pi), + (Plane.YZ, 1.5 * math.pi): PlannerMeasBasis(Plane.XY, 1.5 * math.pi), + # (XY, pi) <-> (XZ, -pi/2) == (XZ, 3pi/2) + (Plane.XY, math.pi): PlannerMeasBasis(Plane.XZ, 1.5 * math.pi), + (Plane.XZ, 1.5 * math.pi): PlannerMeasBasis(Plane.XY, math.pi), + # (XZ, 0) <-> (YZ, 0) + (Plane.XZ, 0.0): PlannerMeasBasis(Plane.YZ, 0.0), + (Plane.YZ, 0.0): PlannerMeasBasis(Plane.XZ, 0.0), + # (XZ, pi) <-> (YZ, pi) + (Plane.XZ, math.pi): PlannerMeasBasis(Plane.YZ, math.pi), + (Plane.YZ, math.pi): PlannerMeasBasis(Plane.XZ, math.pi), +} diff --git a/graphqomb/graphstate.py b/graphqomb/graphstate.py index 4253ac542..b3550fbe5 100644 --- a/graphqomb/graphstate.py +++ b/graphqomb/graphstate.py @@ -21,15 +21,14 @@ from abc import ABC from collections.abc import Hashable, Iterable, Mapping, Sequence from collections.abc import Set as AbstractSet -from typing import TYPE_CHECKING, NamedTuple, TypeVar +from typing import NamedTuple, TypeVar +import numpy as np import typing_extensions -from graphqomb.common import MeasBasis, Plane, PlannerMeasBasis -from graphqomb.euler import update_lc_basis, update_lc_lc - -if TYPE_CHECKING: - from graphqomb.euler import LocalClifford +from graphqomb.common import MeasBasis, Plane, PlannerMeasBasis, basis2tuple, is_close_angle, round_clifford_angle +from graphqomb.euler import LocalClifford, update_lc_basis, update_lc_lc +from graphqomb.gflow_utils import _EQUIV_MEAS_BASIS_MAP NodeT = TypeVar("NodeT", bound=Hashable) @@ -474,7 +473,7 @@ def apply_local_clifford(self, node: int, lc: LocalClifford) -> None: self.__local_cliffords[node] = lc else: self._check_meas_basis() - new_meas_basis = update_lc_basis(lc.conjugate(), self.meas_bases[node]) + new_meas_basis = update_lc_basis(lc, self.meas_bases[node]) self.assign_meas_basis(node, new_meas_basis) @typing_extensions.override @@ -552,28 +551,25 @@ def _expand_input_local_cliffords(self) -> dict[int, LocalCliffordExpansion]: """ node_index_addition_map: dict[int, LocalCliffordExpansion] = {} new_input_indices: dict[int, int] = {} - for input_node, q_index in self.input_node_indices.items(): - lc = self._pop_local_clifford(input_node) + for old_input_node, q_index in self.input_node_indices.items(): + lc = self._pop_local_clifford(old_input_node) if lc is None: - new_input_indices[input_node] = q_index - continue + lc = LocalClifford(0.0, 0.0, 0.0) - new_node_index0 = self.add_physical_node() - new_input_indices[new_node_index0] = q_index - new_node_index1 = self.add_physical_node() - new_node_index2 = self.add_physical_node() + new_input = self.add_physical_node() + new_node = self.add_physical_node() + new_input_indices[new_input] = q_index - self.add_physical_edge(new_node_index0, new_node_index1) - self.add_physical_edge(new_node_index1, new_node_index2) - self.add_physical_edge(new_node_index2, input_node) + self.add_physical_edge(new_input, new_node) + self.add_physical_edge(new_node, old_input_node) - self.assign_meas_basis(new_node_index0, PlannerMeasBasis(Plane.XY, lc.alpha)) - self.assign_meas_basis(new_node_index1, PlannerMeasBasis(Plane.XY, lc.beta)) - self.assign_meas_basis(new_node_index2, PlannerMeasBasis(Plane.XY, lc.gamma)) - - node_index_addition_map[input_node] = LocalCliffordExpansion( - new_node_index0, new_node_index1, new_node_index2 - ) + self.assign_meas_basis(new_input, PlannerMeasBasis(Plane.XY, 0.0)) + self.assign_meas_basis(new_node, PlannerMeasBasis(Plane.XY, 0.0)) + meas_basis = self.meas_bases[old_input_node] + new_meas_basis = update_lc_basis(lc, meas_basis) + self.assign_meas_basis(old_input_node, new_meas_basis) + self._assure_gflow_input_expansion(old_input_node) + node_index_addition_map[old_input_node] = LocalCliffordExpansion(new_input, new_node) self.__input_node_indices = {} for new_input_index, q_index in new_input_indices.items(): @@ -581,6 +577,32 @@ def _expand_input_local_cliffords(self) -> dict[int, LocalCliffordExpansion]: return node_index_addition_map + def _assure_gflow_input_expansion(self, node: int) -> None: + r"""Assure gflow existence after input local Clifford expansion. + + Parameters + ---------- + node : `int` + node index + """ + cur = self.meas_bases[node] + rounded = round_clifford_angle(cur.angle) + self.assign_meas_basis(node, PlannerMeasBasis(cur.plane, rounded)) + + cur = self.meas_bases[node] + cur_key = basis2tuple(cur) + + # if the updated basis is self-inclusion type, push it to an XY-equivalent one. + if (cur.plane in {Plane.XZ, Plane.YZ} and is_close_angle(cur.angle, 0.0)) or ( + is_close_angle(cur.angle, np.pi) and cur_key in _EQUIV_MEAS_BASIS_MAP + ): + self.assign_meas_basis(node, _EQUIV_MEAS_BASIS_MAP[cur_key]) + + # ensure XY if possible. + cur = self.meas_bases[node] + if cur.plane != Plane.XY and cur_key in _EQUIV_MEAS_BASIS_MAP: + self.assign_meas_basis(node, _EQUIV_MEAS_BASIS_MAP[cur_key]) + def _expand_output_local_cliffords(self) -> dict[int, LocalCliffordExpansion]: r"""Expand local Clifford operators applied on the output nodes. @@ -590,33 +612,28 @@ def _expand_output_local_cliffords(self) -> dict[int, LocalCliffordExpansion]: A dictionary mapping output node indices to the new node indices created. """ node_index_addition_map: dict[int, LocalCliffordExpansion] = {} - new_output_index_map: dict[int, int] = {} - for output_node, q_index in self.output_node_indices.items(): - lc = self._pop_local_clifford(output_node) + new_output_node_index_map: dict[int, int] = {} + for old_output_node, q_index in self.output_node_indices.items(): + lc = self._pop_local_clifford(old_output_node) if lc is None: - new_output_index_map[output_node] = q_index + new_output_node_index_map[old_output_node] = q_index continue - new_node_index0 = self.add_physical_node() - new_node_index1 = self.add_physical_node() - new_node_index2 = self.add_physical_node() - new_output_index_map[new_node_index2] = q_index + new_node = self.add_physical_node() + new_output_node = self.add_physical_node() + new_output_node_index_map[new_output_node] = q_index - self.add_physical_edge(output_node, new_node_index0) - self.add_physical_edge(new_node_index0, new_node_index1) - self.add_physical_edge(new_node_index1, new_node_index2) + self.__output_node_indices.pop(old_output_node) + self.register_output(new_output_node, q_index) - self.assign_meas_basis(output_node, PlannerMeasBasis(Plane.XY, lc.alpha)) - self.assign_meas_basis(new_node_index0, PlannerMeasBasis(Plane.XY, lc.beta)) - self.assign_meas_basis(new_node_index1, PlannerMeasBasis(Plane.XY, lc.gamma)) + self.add_physical_edge(old_output_node, new_node) + self.add_physical_edge(new_node, new_output_node) - node_index_addition_map[output_node] = LocalCliffordExpansion( - new_node_index0, new_node_index1, new_node_index2 - ) + self.assign_meas_basis(new_node, PlannerMeasBasis(Plane.XY, 0.0)) + meas_basis = update_lc_basis(lc, PlannerMeasBasis(Plane.XY, 0.0)) + self.assign_meas_basis(old_output_node, meas_basis) - self.__output_node_indices = {} - for new_output_index, q_index in new_output_index_map.items(): - self.register_output(new_output_index, q_index) + node_index_addition_map[old_output_node] = LocalCliffordExpansion(new_node, new_output_node) return node_index_addition_map @@ -794,11 +811,10 @@ def from_base_graph_state( class LocalCliffordExpansion(NamedTuple): - """Local Clifford expansion map for each input/output node.""" + """Local Clifford expansion map.""" node1: int node2: int - node3: int class ExpansionMaps(NamedTuple): diff --git a/graphqomb/random_objects.py b/graphqomb/random_objects.py index d43bffc0d..d299d5b74 100644 --- a/graphqomb/random_objects.py +++ b/graphqomb/random_objects.py @@ -11,10 +11,13 @@ import numpy as np +from graphqomb.circuit import MBQCCircuit from graphqomb.common import default_meas_basis from graphqomb.graphstate import GraphState if TYPE_CHECKING: + from collections.abc import Sequence + from numpy.random import Generator @@ -80,3 +83,48 @@ def generate_random_flow_graph( flow[node_index - width] = {node_index} return graph, flow + + +def random_circ( + width: int, + depth: int, + rng: np.random.Generator | None = None, + edge_p: float = 0.5, + angle_candidates: Sequence[float] = (0.0, np.pi / 3, 2 * np.pi / 3, np.pi), +) -> MBQCCircuit: + r"""Generate a random MBQC circuit. + + Parameters + ---------- + width : `int` + circuit width + depth : `int` + circuit depth + rng : `numpy.random.Generator`, optional + random number generator, by default numpy.random.default_rng() + edge_p : `float`, optional + probability of adding CZ gate, by default 0.5 + angle_candidates : `collections.abc.Sequence[float]`, optional + sequence of angles, by default (0, np.pi / 3, 2 * np.pi / 3, np.pi) + + Returns + ------- + `MBQCCircuit` + generated MBQC circuit + """ + if rng is None: + rng = np.random.default_rng() + circ = MBQCCircuit(width) + for d in range(depth): + for j in range(width): + circ.j(j, rng.choice(angle_candidates)) + if d < depth - 1: + for j in range(width): + if rng.random() < edge_p: + circ.cz(j, (j + 1) % width) + num = rng.integers(0, width) + if num > 0: + target = sorted(set(rng.choice(range(width), num))) + circ.phase_gadget(target, rng.choice(angle_candidates)) + + return circ diff --git a/graphqomb/zxgraphstate.py b/graphqomb/zxgraphstate.py new file mode 100644 index 000000000..88f0878fe --- /dev/null +++ b/graphqomb/zxgraphstate.py @@ -0,0 +1,800 @@ +"""ZXGraph State classes for Measurement-based Quantum Computing. + +This module provides: + +- `ZXGraphState`: Graph State for the ZX-calculus. +- `to_zx_graphstate`: Convert input GraphState to ZXGraphState. +- `complete_graph_edges`: Return a set of edges for the complete graph on the given nodes. +""" + +from __future__ import annotations + +import sys +from collections import defaultdict +from itertools import combinations +from typing import TYPE_CHECKING + +import numpy as np + +from graphqomb.common import ( + Plane, + PlannerMeasBasis, + basis2tuple, + is_clifford_angle, + is_close_angle, + round_clifford_angle, +) +from graphqomb.euler import LocalClifford +from graphqomb.gflow_utils import _EQUIV_MEAS_BASIS_MAP +from graphqomb.graphstate import BaseGraphState, GraphState, bipartite_edges + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +if TYPE_CHECKING: + from collections.abc import Callable, Iterable + from collections.abc import Set as AbstractSet + + CliffordRule: TypeAlias = tuple[Callable[[int, float], bool], Callable[[int], None]] + + +class ZXGraphState(GraphState): + r"""Graph State for the ZX-calculus. + + Attributes + ---------- + input_nodes : `set`\[`int`\] + set of input nodes + output_nodes : `set`\[`int`\] + set of output nodes + physical_nodes : `set`\[`int`\] + set of physical nodes + physical_edges : `set`\[`tuple`\[`int`, `int`\]\] + physical edges + meas_bases : `dict`\[`int`, `MeasBasis`\] + measurement bases + q_indices : `dict`\[`int`, `int`\] + qubit indices + local_cliffords : `dict`\[`int`, `LocalClifford`\] + local clifford operators + """ + + def __init__(self) -> None: + super().__init__() + + @property + def _clifford_rules(self) -> tuple[CliffordRule, ...]: + r"""Tuple of rules (check_func, action_func) for removing local clifford nodes. + + The rules are applied in the order they are defined. + + Returns + ------- + `tuple`\[`CliffordRule`, ...\] + Tuple of rules (check_func, action_func) before removing local clifford nodes. + If check_func(node) returns True, action_func(node) is executed. + Then, the removal of the local clifford node is performed if possible. + """ + return ( + (self._needs_lc, self.local_complement), + (self._is_trivial_meas, lambda _: None), + ( + self._needs_pivot, + lambda node: self.pivot(node, min(self.neighbors(node) - set(self.input_node_indices))), + ), + (self._needs_pivot_on_boundary, self.pivot_on_boundary), + ) + + def _assure_gflow(self, node: int, plane_map: dict[Plane, Plane], old_basis: tuple[Plane, float]) -> None: + r"""Transform the measurement basis after applying operation to assure gflow existence. + + This method is used to assure gflow existence + after the Clifford angle measurement basis is transformed by LocalClifford. + + Parameters + ---------- + node : `int` + node index + plane_map : `dict`\[`Plane`, `Plane`\] + mapping of planes + old_basis : `tuple[Plane, float]` + basis before applying the operation (such as local complement, pivot etc.) + """ + # Round first + cur = self.meas_bases[node] + rounded = round_clifford_angle(cur.angle) + self.assign_meas_basis(node, PlannerMeasBasis(cur.plane, rounded)) + + # Re-read after rounding + cur = self.meas_bases[node] + cur_key = basis2tuple(cur) + + # Convert to an equivalent basis if plane mismatch + if plane_map[old_basis[0]] != cur.plane: + self.assign_meas_basis(node, _EQUIV_MEAS_BASIS_MAP[cur_key]) + + def _update_connections( + self, rmv_edges: AbstractSet[tuple[int, int]], new_edges: AbstractSet[tuple[int, int]] + ) -> None: + r"""Update the physical edges of the graph state. + + Parameters + ---------- + rmv_edges : `collections.abc.Set`\[`tuple`\[`int`, `int`\]\] + edges to remove + new_edges : `collections.abc.Set`\[`tuple`\[`int`, `int`\]\] + edges to add + """ + for edge in rmv_edges: + self.remove_physical_edge(edge[0], edge[1]) + for edge in new_edges: + self.add_physical_edge(edge[0], edge[1]) + + def local_complement(self, node: int) -> None: + r"""Local complement operation on the graph state: G*u. + + Non-input node u gets Rx(pi/2) and its neighbors get Rz(-pi/2). + The edges between the neighbors of u are complemented. + + Parameters + ---------- + node : `int` + node index. The node must not be an input node. + + Raises + ------ + ValueError + If the node does not exist, is an input node, or the graph is not a ZX-diagram. + + Notes + ----- + Here we adopt the definition (lemma) of local complementation from [1]. + In some literature, local complementation is defined with Rx(-pi/2) on the target node + and Rz(pi/2) on the neighbors, which is strictly equivalent. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 2.31 and lemma 4.3 + """ + self._ensure_node_exists(node) + if node in self.input_node_indices: + msg = "Cannot apply local complement to input node." + raise ValueError(msg) + self._check_meas_basis() + + nbrs: set[int] = self.neighbors(node) + nbr_pairs: set[tuple[int, int]] = complete_graph_edges(nbrs) + new_edges = nbr_pairs - self.physical_edges + rmv_edges = self.physical_edges & nbr_pairs + + self._update_connections(rmv_edges, new_edges) + + # apply local clifford to node and assure gflow existence + lc = LocalClifford(0, np.pi / 2, 0) + old_meas_basis = self.meas_bases.get(node, None) + if old_meas_basis is None: + self.apply_local_clifford(node, lc) + else: + old_basis = basis2tuple(old_meas_basis) + self.apply_local_clifford(node, lc) + plane_map: dict[Plane, Plane] = {Plane.XY: Plane.XZ, Plane.XZ: Plane.XY, Plane.YZ: Plane.YZ} + self._assure_gflow(node, plane_map, old_basis) + + # apply local clifford to neighbors and assure gflow existence + lc = LocalClifford(-np.pi / 2, 0, 0) + plane_map = {Plane.XY: Plane.XY, Plane.XZ: Plane.YZ, Plane.YZ: Plane.XZ} + for v in nbrs: + old_meas_basis = self.meas_bases.get(v, None) + if old_meas_basis is None: + self.apply_local_clifford(v, lc) + continue + + self.apply_local_clifford(v, lc) + old_basis = basis2tuple(old_meas_basis) + self._assure_gflow(v, plane_map, old_basis) + + def _pivot(self, node1: int, node2: int) -> None: + """Pivot edges around nodes u and v in the graph state. + + Parameters + ---------- + node1 : `int` + node index + node2 : `int` + node index + """ + node1_nbrs = self.neighbors(node1) - {node2} + node2_nbrs = self.neighbors(node2) - {node1} + nbr_a = node1_nbrs & node2_nbrs + nbr_b = node1_nbrs - node2_nbrs + nbr_c = node2_nbrs - node1_nbrs + nbr_pairs = [ + bipartite_edges(nbr_a, nbr_b), + bipartite_edges(nbr_a, nbr_c), + bipartite_edges(nbr_b, nbr_c), + ] + + # complement edges between nbr_a, nbr_b, nbr_c + rmv_edges: set[tuple[int, int]] = set() + rmv_edges.update(*(p & self.physical_edges for p in nbr_pairs)) + add_edges: set[tuple[int, int]] = set() + add_edges.update(*(p - self.physical_edges for p in nbr_pairs)) + self._update_connections(rmv_edges, add_edges) + + # swap node u and node v + for b in nbr_b: + self.remove_physical_edge(node1, b) + self.add_physical_edge(node2, b) + for c in nbr_c: + self.remove_physical_edge(node2, c) + self.add_physical_edge(node1, c) + + def pivot(self, node1: int, node2: int) -> None: + """Pivot operation on the graph state: G∧(uv) (= G*u*v*u = G*v*u*v) for neighboring nodes u and v. + + Parameters + ---------- + node1 : `int` + node index. The node must not be an input node. + node2 : `int` + node index. The node must not be an input node. + + Raises + ------ + ValueError + If the nodes are input nodes, or the graph is not a ZX-diagram. + + Notes + ----- + Here we adopt the definition (lemma) of pivot from [1]. + In some literature, pivot is defined as below:: + + Rz(pi/2) Rx(-pi/2) Rz(pi/2) on the target nodes, + Rz(pi) on all the neighbors of both target nodes (not including the target nodes). + + This definition is strictly equivalent to the one adopted here. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 2.32 and lemma 4.5 + """ + self._ensure_node_exists(node1) + self._ensure_node_exists(node2) + if node1 in self.input_node_indices or node2 in self.input_node_indices: + msg = "Cannot apply pivot to input node" + raise ValueError(msg) + self._check_meas_basis() + self._pivot(node1, node2) + + # update node1 and node2 measurement + plane_map: dict[Plane, Plane] = {Plane.XY: Plane.YZ, Plane.XZ: Plane.XZ, Plane.YZ: Plane.XY} + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + for a in {node1, node2}: + old_meas_basis = self.meas_bases.get(a, None) + if old_meas_basis is None: + self.apply_local_clifford(a, lc) + continue + + self.apply_local_clifford(a, lc) + old_basis = basis2tuple(old_meas_basis) + self._assure_gflow(a, plane_map, old_basis) + + # update nodes measurement of neighbors + plane_map = {Plane.XY: Plane.XY, Plane.XZ: Plane.XZ, Plane.YZ: Plane.YZ} + lc = LocalClifford(np.pi, 0, 0) + for w in self.neighbors(node1) & self.neighbors(node2): + old_meas_basis = self.meas_bases.get(w, None) + if old_meas_basis is None: + self.apply_local_clifford(w, lc) + continue + + self.apply_local_clifford(w, lc) + old_basis = basis2tuple(old_meas_basis) + self._assure_gflow(w, plane_map, old_basis) + + def _is_trivial_meas(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node does not need any operation in order to perform _remove_clifford. + + For this operation, the followings must hold: + measurement plane = YZ or XZ + measurement angle = 0 or pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node is a removable Clifford node. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.7 + """ + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + return (self.meas_bases[node].plane in {Plane.YZ, Plane.XZ}) and is_close_angle(2 * alpha, 0, atol) + + def _needs_lc(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node needs a local complementation in order to perform _remove_clifford. + + For this operation, the followings must hold: + measurement plane = XY or YZ + measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node needs a local complementation. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.8 + """ + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + return self.meas_bases[node].plane in {Plane.XY, Plane.YZ} and is_close_angle(2 * (alpha - np.pi / 2), 0, atol) + + def _needs_pivot(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node needs a pivot operation in order to perform _remove_clifford. + + The pivot operation is performed on the node and its non-input neighbor. + For this operation, either of the following must hold: + (a) measurement plane = XY and measurement angle = 0 or pi (mod 2pi) + (b) measurement plane = XZ and measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node needs a pivot operation. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.9 + """ + non_input_nbrs = self.neighbors(node) - set(self.input_node_indices) + if not non_input_nbrs: + return False + + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + # (a) measurement plane = XY and measurement angle = 0 or pi (mod 2pi) + case_a = self.meas_bases[node].plane == Plane.XY and is_close_angle(2 * alpha, 0, atol) + # (b) measurement plane = XZ and measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + case_b = self.meas_bases[node].plane == Plane.XZ and is_close_angle(2 * (alpha - np.pi / 2), 0, atol) + return case_a or case_b + + def _needs_pivot_on_boundary(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node is non-input and all neighbors are input or output nodes. + + If True, pivot operation is performed on the node and its non-input neighbor, and then the node will be removed. + + For this operation, one of the following must hold: + (a) measurement plane = XY and measurement angle = 0 or pi (mod 2pi) + (b) measurement plane = XZ and measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node is non-input and all neighbors are input or output nodes. + + Notes + ----- + In order to follow the algorithm in Theorem 4.12 of Quantum 5, 421 (2021), + this function is not commonalized into _needs_pivot. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.11 + """ + non_input_nbrs = self.neighbors(node) - set(self.input_node_indices) + # check non_input_nbrs is composed of only output nodes and is not empty + if not (non_input_nbrs.issubset(set(self.output_node_indices)) and non_input_nbrs): + return False + + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + # (a) measurement plane = XY and measurement angle = 0 or pi (mod 2pi) + case_a = self.meas_bases[node].plane == Plane.XY and is_close_angle(2 * alpha, 0, atol) + # (b) measurement plane = XZ and measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + case_b = self.meas_bases[node].plane == Plane.XZ and is_close_angle(2 * (alpha - np.pi / 2), 0, atol) + return case_a or case_b + + def pivot_on_boundary(self, node: int) -> None: + """Perform the Clifford node removal on a corner case. + + Parameters + ---------- + node : `int` + node index + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.11 + """ + output_nbr = min(self.neighbors(node) - set(self.input_node_indices)) + self.pivot(node, output_nbr) + + def _remove_clifford(self, node: int, atol: float = 1e-9) -> None: + """Perform the Clifford node removal. + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Raises + ------ + ValueError + If the node is not a Clifford node. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.7 + """ + a_pi = self.meas_bases[node].angle % (2.0 * np.pi) + if not is_close_angle(2 * a_pi, 0, atol): + msg = "This node cannot be removed by _remove_clifford." + raise ValueError(msg) + + lc = LocalClifford(a_pi, 0, 0) + plane_map = {Plane.XY: Plane.XY, Plane.XZ: Plane.XZ, Plane.YZ: Plane.YZ} + for v in self.neighbors(node): + old_meas_basis = self.meas_bases.get(v, None) + if old_meas_basis is None: + self.apply_local_clifford(v, lc) + continue + + self.apply_local_clifford(v, lc) + old_basis = basis2tuple(old_meas_basis) + self._assure_gflow(v, plane_map, old_basis) + + self.remove_physical_node(node) + + def remove_clifford(self, node: int, atol: float = 1e-9) -> None: + """Remove the local Clifford node. + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Raises + ------ + ValueError + 1. If the node is an input node. + 2. If the node is not a Clifford node. + 3. If all neighbors are input nodes + in some special cases ((meas_plane, meas_angle) = (XY, a pi), (XZ, a pi/2) for a = 0, 1). + 4. If the node has no neighbors that are not connected only to output nodes. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Theorem 4.12 + """ + self._ensure_node_exists(node) + if node in self.input_node_indices or node in self.output_node_indices: + msg = "Clifford node removal not allowed for input or output nodes." + raise ValueError(msg) + + if not ( + is_clifford_angle(self.meas_bases[node].angle, atol) + and self.meas_bases[node].plane in {Plane.XY, Plane.XZ, Plane.YZ} + ): + msg = "This node is not a Clifford node." + raise ValueError(msg) + + for check, action in self._clifford_rules: + if not check(node, atol): + continue + action(node) + self._remove_clifford(node, atol) + return + + msg = "This Clifford node is unremovable." + raise ValueError(msg) + + def is_removable_clifford(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node is a removable Clifford node. + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node is a removable Clifford node. + """ + return any( + [ + self._is_trivial_meas(node, atol), + self._needs_lc(node, atol), + self._needs_pivot(node, atol), + self._needs_pivot_on_boundary(node, atol), + ] + ) + + def remove_cliffords(self, atol: float = 1e-9) -> None: + """Remove all local clifford nodes which are removable. + + Parameters + ---------- + atol : `float`, optional + absolute tolerance, by default 1e-9 + """ + self._check_meas_basis() + while any( + self.is_removable_clifford(n, atol) + for n in (self.physical_nodes - set(self.input_node_indices) - set(self.output_node_indices)) + ): + for check, action in self._clifford_rules: + while True: + candidates = self.physical_nodes - set(self.input_node_indices) - set(self.output_node_indices) + clifford_node = next((node for node in candidates if check(node, atol)), None) + if clifford_node is None: + break + action(clifford_node) + self._remove_clifford(clifford_node, atol) + + def to_xy(self) -> None: + r"""Update some special measurement basis to logically equivalent XY-basis. + + - (Plane.XZ, \pm pi/2) -> (Plane.XY, 0 or pi) + - (Plane.YZ, \pm pi/2) -> (Plane.XY, \pm pi/2) + + This method is mainly used in convert_to_phase_gadget. + """ + for node, basis in self.meas_bases.items(): + if basis.plane == Plane.XZ and is_close_angle(basis.angle, np.pi / 2): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, 0.0)) + elif basis.plane == Plane.XZ and is_close_angle(basis.angle, -np.pi / 2): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, np.pi)) + elif basis.plane == Plane.YZ and is_close_angle(basis.angle, np.pi / 2): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, np.pi / 2)) + elif basis.plane == Plane.YZ and is_close_angle(basis.angle, -np.pi / 2): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, -np.pi / 2)) + + def to_yz(self) -> None: + r"""Update some special measurement basis to logically equivalent YZ-basis. + + - (Plane.XZ, 0) -> (Plane.YZ, 0) + - (Plane.XZ, pi) -> (Plane.YZ, pi) + + This method is mainly used in convert_to_phase_gadget. + """ + for node, basis in self.meas_bases.items(): + if basis.plane == Plane.XZ and is_close_angle(basis.angle, 0.0): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, 0.0)) + elif basis.plane == Plane.XZ and is_close_angle(basis.angle, np.pi): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, np.pi)) + + def to_xz(self) -> None: + r"""Update some special measurement basis to logically equivalent XZ-basis. + + This method is mainly used when we want to find a gflow. + """ + inputs = set(self.input_node_indices) + for node, basis in self.meas_bases.items(): + if node in inputs: + continue + if basis.plane == Plane.YZ and is_close_angle(basis.angle, 0.0): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, 0.0)) + elif basis.plane == Plane.YZ and is_close_angle(basis.angle, np.pi): + self.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, np.pi)) + + def _extract_yz_adjacent_pair(self) -> tuple[int, int] | None: + r"""Call inside convert_to_phase_gadget. + + Find a pair of adjacent nodes that are both measured in the YZ-plane. + + Returns + ------- + `tuple`\[`int`, `int`\] | `None` + A pair of adjacent nodes that are both measured in the YZ-plane, or None if no such pair exists. + """ + yz_nodes = {node for node, basis in self.meas_bases.items() if basis.plane == Plane.YZ} + for u, v in self.physical_edges: + if u in yz_nodes and v in yz_nodes: + return (min(u, v), max(u, v)) + return None + + def _extract_xz_node(self) -> int | None: + """Call inside convert_to_phase_gadget. + + Find a node that is measured in the XZ-plane. + + Returns + ------- + `int` | `None` + A node that is measured in the XZ-plane, or None if no such node exists. + """ + for node, basis in self.meas_bases.items(): + if basis.plane == Plane.XZ: + return node + return None + + def convert_to_phase_gadget(self) -> None: + """Convert a ZX-diagram with gflow in MBQC+LC form into its phase-gadget form while preserving gflow.""" + while True: + self.to_xy() + self.to_yz() + if pair := self._extract_yz_adjacent_pair(): + self.pivot(*pair) + continue + if u := self._extract_xz_node(): + self.local_complement(u) + continue + break + + def merge_yz_to_xy(self) -> None: + """Merge YZ-measured nodes that have only one neighbor with an XY-measured node. + + If a node u is measured in the YZ-plane and u has only one neighbor v with a XY-measurement, + then the node u can be merged into the node v. + """ + target_candidates = { + u for u, basis in self.meas_bases.items() if (basis.plane == Plane.YZ and len(self.neighbors(u)) == 1) + } + target_nodes = { + u + for u in target_candidates + if ( + (v := next(iter(self.neighbors(u)))) + and (mb := self.meas_bases.get(v, None)) is not None + and mb.plane == Plane.XY + ) + } + for u in target_nodes: + (v,) = self.neighbors(u) + new_angle = (self.meas_bases[u].angle + self.meas_bases[v].angle) % (2.0 * np.pi) + self.assign_meas_basis(v, PlannerMeasBasis(Plane.XY, new_angle)) + self.remove_physical_node(u) + + def merge_yz_nodes(self) -> None: + """Merge isolated YZ-measured nodes into a single node. + + If u, v nodes are measured in the YZ-plane and u, v have the same neighbors, + then u, v can be merged into a single node. + """ + min_nodes = 2 + yz_nodes = {u for u, basis in self.meas_bases.items() if basis.plane == Plane.YZ} + if len(yz_nodes) < min_nodes: + return + neighbor_groups: dict[frozenset[int], list[int]] = defaultdict(list) + for u in yz_nodes: + neighbors = frozenset(self.neighbors(u)) + neighbor_groups[neighbors].append(u) + + for neighbors, nodes in neighbor_groups.items(): + if len(nodes) < min_nodes or len(neighbors) < min_nodes: + continue + new_angle = sum(self.meas_bases[v].angle for v in nodes) % (2.0 * np.pi) + self.assign_meas_basis(nodes[0], PlannerMeasBasis(Plane.YZ, new_angle)) + for v in nodes[1:]: + self.remove_physical_node(v) + + def full_reduce(self, atol: float = 1e-9) -> None: + """Reduce all Clifford nodes and some non-Clifford nodes. + + Repeat the following steps until there are no non-Clifford nodes: + 1. remove_cliffords + 2. convert_to_phase_gadget + 3. merge_yz_to_xy + 4. merge_yz_nodes + 5. if there are some removable Clifford nodes, back to step 1. + + Parameters + ---------- + atol : `float`, optional + absolute tolerance, by default 1e-9 + """ + while True: + self.remove_cliffords(atol) + self.convert_to_phase_gadget() + self.merge_yz_to_xy() + self.merge_yz_nodes() + if not any( + self.is_removable_clifford(node, atol) + for node in self.physical_nodes - set(self.input_node_indices) - set(self.output_node_indices) + ): + break + + +def to_zx_graphstate(graph: BaseGraphState) -> tuple[ZXGraphState, dict[int, int]]: + r"""Convert input graph to ZXGraphState. + + Parameters + ---------- + graph : `BaseGraphState` + The graph state to convert. + + Returns + ------- + `tuple`\[`ZXGraphState`, `dict`\[`int`, `int`\]\] + Converted ZXGraphState and node map for old node index to new node index. + + Raises + ------ + TypeError + If the input graph is not an instance of GraphState. + """ + graph.check_canonical_form() + if not isinstance(graph, GraphState): + msg = "The input graph must be an instance of GraphState." + raise TypeError(msg) + + node_map: dict[int, int] = {} + zx_graph = ZXGraphState() + + # Copy all physical nodes and measurement bases + for node in graph.physical_nodes: + node_index = zx_graph.add_physical_node() + node_map[node] = node_index + meas_basis = graph.meas_bases.get(node, None) + if meas_basis is not None: + zx_graph.assign_meas_basis(node_index, meas_basis) + + # Register input nodes + for node, q_index in graph.input_node_indices.items(): + zx_graph.register_input(node_map[node], q_index) + + # Register output nodes + for node, q_index in graph.output_node_indices.items(): + zx_graph.register_output(node_map[node], q_index) + + # Copy all physical edges + for u, v in graph.physical_edges: + zx_graph.add_physical_edge(node_map[u], node_map[v]) + + # Copy local Clifford operators + for node, lc in graph.local_cliffords.items(): + zx_graph.apply_local_clifford(node_map[node], lc) + + return zx_graph, node_map + + +def complete_graph_edges(nodes: Iterable[int]) -> set[tuple[int, int]]: + r"""Return a set of edges for the complete graph on the given nodes. + + Parameters + ---------- + nodes : `Iterable`\[`int`\] + nodes + + Returns + ------- + `set`\[`tuple`\[`int`, `int`\]\] + edges of the complete graph + """ + return {(min(u, v), max(u, v)) for u, v in combinations(nodes, 2)} diff --git a/requirements.txt b/requirements.txt index 59ee09d07..2316d62c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ numpy>=1.22,<3 matplotlib networkx ortools>=9,<10 +swiflow typing_extensions diff --git a/tests/test_euler.py b/tests/test_euler.py index fb3614998..71be3a17f 100644 --- a/tests/test_euler.py +++ b/tests/test_euler.py @@ -130,8 +130,8 @@ def test_lc_basis_update( angle = rng.uniform(0, 2 * np.pi) basis = PlannerMeasBasis(plane, angle) basis_updated = update_lc_basis(lc, basis) - ref_updated_basis = lc.matrix() @ basis.vector() - inner_product = abs(np.vdot(basis_updated.vector(), ref_updated_basis)) + ref_updated_vector = lc.conjugate().matrix() @ basis.vector() + inner_product = abs(np.vdot(basis_updated.vector(), ref_updated_vector)) assert np.isclose(inner_product, 1) @@ -140,17 +140,16 @@ def test_local_complement_target_update(plane: Plane, rng: np.random.Generator) lc = LocalClifford(0, np.pi / 2, 0) measurement_action: dict[Plane, tuple[Plane, Callable[[float], float]]] = { Plane.XY: (Plane.XZ, lambda angle: angle + np.pi / 2), - Plane.XZ: (Plane.XY, lambda angle: np.pi / 2 - angle), + Plane.XZ: (Plane.XY, lambda angle: -angle + np.pi / 2), Plane.YZ: (Plane.YZ, lambda angle: angle + np.pi / 2), } angle = rng.random() * 2 * np.pi meas_basis = PlannerMeasBasis(plane, angle) - result_basis = update_lc_basis(lc.conjugate(), meas_basis) + result_basis = update_lc_basis(lc, meas_basis) ref_plane, ref_angle_func = measurement_action[plane] ref_angle = ref_angle_func(angle) - assert result_basis.plane == ref_plane assert is_close_angle(result_basis.angle, ref_angle) @@ -167,9 +166,80 @@ def test_local_complement_neighbors(plane: Plane, rng: np.random.Generator) -> N angle = rng.random() * 2 * np.pi meas_basis = PlannerMeasBasis(plane, angle) - result_basis = update_lc_basis(lc.conjugate(), meas_basis) + result_basis = update_lc_basis(lc, meas_basis) ref_plane, ref_angle_func = measurement_action[plane] ref_angle = ref_angle_func(angle) assert result_basis.plane == ref_plane assert is_close_angle(result_basis.angle, ref_angle) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_pivot_target_update(plane: Plane, rng: np.random.Generator) -> None: + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + measurement_action: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.YZ, lambda angle: -1 * angle), + Plane.XZ: (Plane.XZ, lambda angle: (np.pi / 2 - angle)), + Plane.YZ: (Plane.XY, lambda angle: -1 * angle), + } + + angle = rng.random() * 2 * np.pi + + meas_basis = PlannerMeasBasis(plane, angle) + result_basis = update_lc_basis(lc, meas_basis) + ref_plane, ref_angle_func = measurement_action[plane] + ref_angle = ref_angle_func(angle) + + assert result_basis.plane == ref_plane + assert is_close_angle(result_basis.angle, ref_angle) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_pivot_neighbors(plane: Plane, rng: np.random.Generator) -> None: + lc = LocalClifford(np.pi, 0, 0) + measurement_action: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XY, lambda angle: (angle + np.pi) % (2.0 * np.pi)), + Plane.XZ: (Plane.XZ, lambda angle: -1 * angle), + Plane.YZ: (Plane.YZ, lambda angle: -1 * angle), + } + + angle = rng.random() * 2 * np.pi + + meas_basis = PlannerMeasBasis(plane, angle) + result_basis = update_lc_basis(lc, meas_basis) + ref_plane, ref_angle_func = measurement_action[plane] + ref_angle = ref_angle_func(angle) + + assert result_basis.plane == ref_plane + assert is_close_angle(result_basis.angle, ref_angle) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_remove_clifford_update(plane: Plane, rng: np.random.Generator) -> None: + measurement_action: dict[Plane, tuple[Plane, Callable[[float, float], float]]] = { + Plane.XY: ( + Plane.XY, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0) else alpha + np.pi) % (2.0 * np.pi), + ), + Plane.XZ: ( + Plane.XZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0) else -alpha) % (2.0 * np.pi), + ), + Plane.YZ: ( + Plane.YZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0) else -alpha) % (2.0 * np.pi), + ), + } + + angle = rng.random() * 2 * np.pi + + a_pi = np.pi + for a_pi in (0.0, np.pi): + lc = LocalClifford(a_pi, 0, 0) + meas_basis = PlannerMeasBasis(plane, angle) + result_basis = update_lc_basis(lc, meas_basis) + ref_plane, ref_angle_func = measurement_action[plane] + ref_angle = ref_angle_func(a_pi, angle) + + assert result_basis.plane == ref_plane + assert is_close_angle(result_basis.angle, ref_angle) diff --git a/tests/test_graphstate.py b/tests/test_graphstate.py index e649ff916..565431889 100644 --- a/tests/test_graphstate.py +++ b/tests/test_graphstate.py @@ -5,8 +5,8 @@ import numpy as np import pytest -from graphqomb.common import Plane, PlannerMeasBasis -from graphqomb.euler import LocalClifford +from graphqomb.common import Plane, PlannerMeasBasis, is_close_angle +from graphqomb.euler import LocalClifford, update_lc_basis from graphqomb.graphstate import GraphState, bipartite_edges, odd_neighbors @@ -193,6 +193,123 @@ def test_assign_meas_basis(graph: GraphState) -> None: assert graph.meas_bases[node_index].angle == 0.5 * np.pi +@pytest.mark.parametrize( + "gamma", + [0, np.pi / 2, np.pi, 3 * np.pi / 2], +) +def test_expand_input_local_cliffords_xy_plane(graph: GraphState, gamma: float) -> None: + """Test expanding local Clifford operators on an input node with XY measurement plane.""" + old_input_node = graph.add_physical_node() + output_node = graph.add_physical_node() + graph.add_physical_edge(old_input_node, output_node) + graph.register_input(old_input_node, 0) + graph.register_output(output_node, 0) + old_input_angle = np.pi / 3 + graph.assign_meas_basis(old_input_node, PlannerMeasBasis(Plane.XY, old_input_angle)) + + alpha = 0.0 + beta = 0.0 + lc = LocalClifford(alpha=alpha, beta=beta, gamma=gamma) + graph.apply_local_clifford(old_input_node, lc) + graph.expand_local_cliffords() + + new_input_node = 2 + assert graph.input_node_indices == {new_input_node: 0} + for node in graph.physical_nodes - set(graph.output_node_indices): + assert graph.meas_bases[node].plane == Plane.XY + assert is_close_angle(graph.meas_bases[new_input_node].angle, 0.0) + assert is_close_angle(graph.meas_bases[new_input_node + 1].angle, 0.0) + assert is_close_angle(graph.meas_bases[old_input_node].angle, old_input_angle - gamma) + + +@pytest.mark.parametrize( + "gamma", + [0, np.pi, np.pi / 2, 3 * np.pi / 2], +) +def test_expand_input_local_cliffords_yz_plane(graph: GraphState, gamma: float) -> None: + """Test expanding local Clifford operators on an input node with YZ measurement plane.""" + old_input_node = graph.add_physical_node() + output_node = graph.add_physical_node() + graph.add_physical_edge(old_input_node, output_node) + graph.register_input(old_input_node, 0) + graph.register_output(output_node, 0) + old_input_angle = np.pi / 3 + graph.assign_meas_basis(old_input_node, PlannerMeasBasis(Plane.YZ, old_input_angle)) + + alpha = 0.0 + beta = 0.0 + lc = LocalClifford(alpha=alpha, beta=beta, gamma=gamma) + graph.apply_local_clifford(old_input_node, lc) + graph.expand_local_cliffords() + + exp_angle: float = old_input_angle + if is_close_angle(2 * gamma, 0): + assert graph.meas_bases[old_input_node].plane == Plane.YZ + exp_angle = old_input_angle if is_close_angle(gamma, 0) else -old_input_angle + elif is_close_angle(2 * (gamma - np.pi / 2), 0): + assert graph.meas_bases[old_input_node].plane == Plane.XZ + exp_angle = old_input_angle if is_close_angle(gamma - np.pi / 2, 0) else -old_input_angle + assert is_close_angle(graph.meas_bases[old_input_node].angle, exp_angle) + new_input_node = 2 + assert graph.meas_bases[new_input_node].plane == Plane.XY + assert is_close_angle(graph.meas_bases[new_input_node].angle, 0.0) + + +@pytest.mark.parametrize( + "gamma", + [0, np.pi / 2, np.pi, 3 * np.pi / 2], +) +def test_expand_input_local_cliffords_xz_plane(graph: GraphState, gamma: float) -> None: + """Test expanding local Clifford operators on an input node with XZ measurement plane.""" + old_input_node = graph.add_physical_node() + output_node = graph.add_physical_node() + graph.add_physical_edge(old_input_node, output_node) + graph.register_input(old_input_node, 0) + graph.register_output(output_node, 0) + old_input_angle = np.pi / 3 + graph.assign_meas_basis(old_input_node, PlannerMeasBasis(Plane.XZ, old_input_angle)) + + alpha = 0.0 + beta = 0.0 + lc = LocalClifford(alpha=alpha, beta=beta, gamma=gamma) + graph.apply_local_clifford(old_input_node, lc) + graph.expand_local_cliffords() + + exp_angle: float = old_input_angle + if is_close_angle(2 * gamma, 0): + assert graph.meas_bases[old_input_node].plane == Plane.XZ + exp_angle = old_input_angle if is_close_angle(gamma, 0) else -old_input_angle + elif is_close_angle(2 * (gamma - np.pi / 2), 0): + assert graph.meas_bases[old_input_node].plane == Plane.YZ + exp_angle = old_input_angle if is_close_angle(gamma + np.pi / 2, 0) else -old_input_angle + assert is_close_angle(graph.meas_bases[old_input_node].angle, exp_angle) + new_input_node = 2 + assert graph.meas_bases[new_input_node].plane == Plane.XY + assert is_close_angle(graph.meas_bases[new_input_node].angle, 0.0) + + +def test_expand_output_local_cliffords(graph: GraphState) -> None: + """Test expanding local Clifford operators on an output node.""" + input_node = graph.add_physical_node() + old_output_node = graph.add_physical_node() + graph.add_physical_edge(input_node, old_output_node) + graph.register_input(input_node, 0) + graph.register_output(old_output_node, 0) + graph.assign_meas_basis(input_node, PlannerMeasBasis(Plane.XY, 0.0)) + + lc = LocalClifford(alpha=np.pi / 2, beta=np.pi, gamma=3 * np.pi / 2) + graph.apply_local_clifford(old_output_node, lc) + graph.expand_local_cliffords() + + new_output_node = 5 + assert graph.output_node_indices == {new_output_node: 0} + assert graph.meas_bases[old_output_node + 1].plane == Plane.XY + assert is_close_angle(graph.meas_bases[old_output_node + 1].angle, 0.0) + meas_basis = update_lc_basis(lc, PlannerMeasBasis(Plane.XY, 0.0)) + assert graph.meas_bases[old_output_node].plane == meas_basis.plane + assert is_close_angle(graph.meas_bases[old_output_node].angle, meas_basis.angle) + + def test_check_canonical_form_true(canonical_graph: GraphState) -> None: """Test if the graph is in canonical form.""" canonical_graph.check_canonical_form() diff --git a/tests/test_zxgraphstate.py b/tests/test_zxgraphstate.py new file mode 100644 index 000000000..c6d6aa8d5 --- /dev/null +++ b/tests/test_zxgraphstate.py @@ -0,0 +1,1141 @@ +"""Tests for ZXGraphState + +Measurement actions for the followings are used: + - Local complement (LC): MEAS_ACTION_LC_* + - Pivot (PV): MEAS_ACTION_PV_* + - Remove Cliffords (RC): MEAS_ACTION_RC + +Reference: + M. Backens et al., Quantum 5, 421 (2021). + https://doi.org/10.22331/q-2021-03-25-421 +""" + +from __future__ import annotations + +import itertools +import operator +from copy import deepcopy +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +from graphqomb.common import Plane, PlannerMeasBasis, is_close_angle +from graphqomb.euler import LocalClifford +from graphqomb.gflow_utils import gflow_wrapper +from graphqomb.qompiler import qompile +from graphqomb.random_objects import generate_random_flow_graph +from graphqomb.simulator import PatternSimulator, SimulatorBackend +from graphqomb.zxgraphstate import ZXGraphState, to_zx_graphstate + +if TYPE_CHECKING: + from collections.abc import Sequence + from typing import Callable + + Measurements = list[tuple[int, PlannerMeasBasis]] + +MEAS_ACTION_LC_TARGET: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XZ, lambda angle: angle + np.pi / 2), + Plane.XZ: (Plane.XY, lambda angle: -angle + np.pi / 2), + Plane.YZ: (Plane.YZ, lambda angle: angle + np.pi / 2), +} +MEAS_ACTION_LC_NEIGHBORS: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XY, lambda angle: angle + np.pi / 2), + Plane.XZ: (Plane.YZ, lambda angle: angle), + Plane.YZ: (Plane.XZ, operator.neg), +} +MEAS_ACTION_PV_TARGET: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.YZ, operator.neg), + Plane.XZ: (Plane.XZ, lambda angle: (np.pi / 2 - angle)), + Plane.YZ: (Plane.XY, operator.neg), +} +MEAS_ACTION_PV_NEIGHBORS: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XY, lambda angle: (angle + np.pi) % (2.0 * np.pi)), + Plane.XZ: (Plane.XZ, operator.neg), + Plane.YZ: (Plane.YZ, operator.neg), +} +ATOL = 1e-9 +MEAS_ACTION_RC: dict[Plane, tuple[Plane, Callable[[float, float], float]]] = { + Plane.XY: ( + Plane.XY, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0, ATOL) else alpha + np.pi) % (2.0 * np.pi), + ), + Plane.XZ: ( + Plane.XZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0, ATOL) else -alpha) % (2.0 * np.pi), + ), + Plane.YZ: ( + Plane.YZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0, ATOL) else -alpha) % (2.0 * np.pi), + ), +} + + +def plane_combinations(n: int) -> list[tuple[Plane, ...]]: + """Generate all combinations of planes of length n. + + Parameters + ---------- + n : int + The length of the combinations. n > 1. + + Returns + ------- + list[tuple[Plane, ...]] + A list of tuples containing all combinations of planes of length n. + """ + return list(itertools.product(Plane, repeat=n)) + + +@pytest.fixture +def rng() -> np.random.Generator: + return np.random.default_rng() + + +@pytest.fixture +def zx_graph() -> ZXGraphState: + """Generate an empty ZXGraphState object. + + Returns + ------- + ZXGraphState: An empty ZXGraphState object. + """ + return ZXGraphState() + + +def _initialize_graph( + zx_graph: ZXGraphState, + nodes: range, + edges: set[tuple[int, int]], + inputs: Sequence[int] = (), + outputs: Sequence[int] = (), +) -> None: + """Initialize a ZXGraphState object with the given nodes and edges. + + Parameters + ---------- + zx_graph : ZXGraphState + The ZXGraphState object to initialize. + nodes : range + nodes to add to the graph. + edges : list[tuple[int, int]] + edges to add to the graph. + inputs : Sequence[int], optional + input nodes, by default (). + outputs : Sequence[int], optional + output nodes, by default (). + + Raises + ------ + ValueError + If the number of output nodes is greater than the number of input nodes. + """ + for _ in nodes: + zx_graph.add_physical_node() + for i, j in edges: + zx_graph.add_physical_edge(i, j) + + node2q_index: dict[int, int] = {} + q_indices: list[int] = [] + + for i, node in enumerate(inputs): + zx_graph.register_input(node, q_index=i) + node2q_index[node] = i + q_indices.append(i) + + if len(outputs) > len(q_indices): + msg = "Cannot assign valid q_index to all output nodes." + raise ValueError(msg) + + for i, node in enumerate(outputs): + q_index = node2q_index.get(node, q_indices[i]) + zx_graph.register_output(node, q_index) + + +def _apply_measurements(zx_graph: ZXGraphState, measurements: Measurements) -> None: + for node_id, planner_meas_basis in measurements: + if node_id in zx_graph.output_node_indices: + continue + zx_graph.assign_meas_basis(node_id, planner_meas_basis) + + +def _test( + zx_graph: ZXGraphState, + exp_nodes: set[int], + exp_edges: set[tuple[int, int]], + exp_measurements: Measurements, +) -> None: + assert zx_graph.physical_nodes == exp_nodes + assert zx_graph.physical_edges == exp_edges + for node_id, planner_meas_basis in exp_measurements: + assert zx_graph.meas_bases[node_id].plane == planner_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node_id].angle, planner_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xy_1(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XY, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(-np.pi / 2, 0.0, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XY, angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xy_2(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XY, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(0.0, np.pi / 2, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XZ, angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xy_3(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XY, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.YZ, -angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xz_1(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, angle)) + lc = LocalClifford(-np.pi / 2, 0.0, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.YZ, angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xz_2(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, angle)) + lc = LocalClifford(0.0, np.pi / 2, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XY, -angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xz_3(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, angle)) + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XZ, -angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_yz_1(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(YZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, angle)) + lc = LocalClifford(-np.pi / 2, 0.0, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XZ, -angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_yz_2(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(YZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, angle)) + lc = LocalClifford(0.0, np.pi / 2, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.YZ, angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_yz_3(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(YZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, angle)) + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XY, -angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_local_complement_fails_if_nonexistent_node(zx_graph: ZXGraphState) -> None: + """Test local complement raises an error if the node does not exist.""" + with pytest.raises(ValueError, match="Node does not exist node=0"): + zx_graph.local_complement(0) + + +def test_local_complement_fails_if_not_zx_graph(zx_graph: ZXGraphState) -> None: + """Test local complement raises an error if the graph is not a ZX-diagram.""" + node = zx_graph.add_physical_node() + with pytest.raises(ValueError, match="Measurement basis not set for node 0"): + zx_graph.local_complement(node) + + +def test_local_complement_fails_with_input_node(zx_graph: ZXGraphState) -> None: + """Test local complement fails with input node.""" + node = zx_graph.add_physical_node() + zx_graph.register_input(node, q_index=0) + with pytest.raises(ValueError, match=r"Cannot apply local complement to input node."): + zx_graph.local_complement(node) + + +def test_local_clifford_updates_xy_basis(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Applying an LC should update stored XY angles using the public convention.""" + node = zx_graph.add_physical_node() + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(0.0, 0.0, -np.pi / 2) + + zx_graph.apply_local_clifford(node, lc) + meas_basis_vector = zx_graph.meas_bases[node].vector() + ref_vector = PlannerMeasBasis(Plane.XY, angle + np.pi / 2).vector() + inner_product = abs(np.vdot(meas_basis_vector, ref_vector)) + assert np.isclose(inner_product, 1) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_local_complement_with_no_edge(zx_graph: ZXGraphState, plane: Plane, rng: np.random.Generator) -> None: + """Test local complement with a graph with a single node and no edges.""" + angle = rng.random() * 2 * np.pi + ref_plane, ref_angle_func = MEAS_ACTION_LC_TARGET[plane] + ref_angle = ref_angle_func(angle) + node = zx_graph.add_physical_node() + zx_graph.assign_meas_basis(node, PlannerMeasBasis(plane, angle)) + + zx_graph.local_complement(node) + assert zx_graph.physical_edges == set() + assert zx_graph.meas_bases[node].plane == ref_plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_angle) + + +@pytest.mark.parametrize("planes", plane_combinations(3)) +def test_local_complement_with_minimal_graph( + zx_graph: ZXGraphState, planes: tuple[Plane, Plane, Plane], rng: np.random.Generator +) -> None: + """Test local complement with a minimal graph.""" + for _ in range(3): + zx_graph.add_physical_node() + for i, j in [(0, 1), (1, 2)]: + zx_graph.add_physical_edge(i, j) + angles = [rng.random() * 2 * np.pi for _ in range(3)] + for i in range(3): + zx_graph.assign_meas_basis(i, PlannerMeasBasis(planes[i], angles[i])) + zx_graph.local_complement(1) + ref_plane0, ref_angle_func0 = MEAS_ACTION_LC_NEIGHBORS[planes[0]] + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_TARGET[planes[1]] + ref_plane2, ref_angle_func2 = MEAS_ACTION_LC_NEIGHBORS[planes[2]] + ref_angle0 = ref_angle_func0(angles[0]) + ref_angle1 = ref_angle_func1(angles[1]) + ref_angle2 = ref_angle_func2(angles[2]) + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle0)), + (1, PlannerMeasBasis(ref_plane1, ref_angle1)), + (2, PlannerMeasBasis(ref_plane2, ref_angle2)), + ] + _test(zx_graph, exp_nodes={0, 1, 2}, exp_edges={(0, 1), (0, 2), (1, 2)}, exp_measurements=exp_measurements) + + zx_graph.local_complement(1) + ref_plane0, ref_angle_func0 = MEAS_ACTION_LC_NEIGHBORS[ref_plane0] + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_TARGET[ref_plane1] + ref_plane2, ref_angle_func2 = MEAS_ACTION_LC_NEIGHBORS[ref_plane2] + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle_func0(ref_angle0))), + (1, PlannerMeasBasis(ref_plane1, ref_angle_func1(ref_angle1))), + (2, PlannerMeasBasis(ref_plane2, ref_angle_func2(ref_angle2))), + ] + _test( + zx_graph, + exp_nodes={0, 1, 2}, + exp_edges={(0, 1), (1, 2)}, + exp_measurements=exp_measurements, + ) + + +@pytest.mark.parametrize(("plane1", "plane3"), plane_combinations(2)) +def test_local_complement_on_output_node( + zx_graph: ZXGraphState, plane1: Plane, plane3: Plane, rng: np.random.Generator +) -> None: + """Test local complement on an output node.""" + _initialize_graph(zx_graph, nodes=range(5), edges={(0, 1), (1, 2), (2, 3), (3, 4)}, inputs=(0, 4), outputs=(2,)) + angle1 = rng.random() * 2 * np.pi + angle3 = rng.random() * 2 * np.pi + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.0)), + (1, PlannerMeasBasis(plane1, angle1)), + (3, PlannerMeasBasis(plane3, angle3)), + (4, PlannerMeasBasis(Plane.XY, 0.0)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.local_complement(2) + + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_NEIGHBORS[plane1] + ref_plane3, ref_angle_func3 = MEAS_ACTION_LC_NEIGHBORS[plane3] + exp_measurements = [ + (1, PlannerMeasBasis(ref_plane1, ref_angle_func1(measurements[1][1].angle))), + (3, PlannerMeasBasis(ref_plane3, ref_angle_func3(measurements[2][1].angle))), + ] + _test( + zx_graph, + exp_nodes={0, 1, 2, 3, 4}, + exp_edges={(0, 1), (1, 2), (1, 3), (2, 3), (3, 4)}, + exp_measurements=exp_measurements, + ) + assert zx_graph.meas_bases.get(2) is None + + +@pytest.mark.parametrize(("plane0", "plane1"), plane_combinations(2)) +def test_local_complement_with_two_nodes_graph( + zx_graph: ZXGraphState, plane0: Plane, plane1: Plane, rng: np.random.Generator +) -> None: + """Test local complement with a graph with two nodes.""" + zx_graph.add_physical_node() + zx_graph.add_physical_node() + zx_graph.add_physical_edge(0, 1) + angle0 = rng.random() * 2 * np.pi + angle1 = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(0, PlannerMeasBasis(plane0, angle0)) + zx_graph.assign_meas_basis(1, PlannerMeasBasis(plane1, angle1)) + zx_graph.local_complement(0) + + ref_plane0, ref_angle_func0 = MEAS_ACTION_LC_TARGET[plane0] + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_NEIGHBORS[plane1] + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle_func0(angle0))), + (1, PlannerMeasBasis(ref_plane1, ref_angle_func1(angle1))), + ] + _test(zx_graph, exp_nodes={0, 1}, exp_edges={(0, 1)}, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize("planes", plane_combinations(3)) +def test_local_complement_4_times( + zx_graph: ZXGraphState, planes: tuple[Plane, Plane, Plane], rng: np.random.Generator +) -> None: + """Test 4 times local complement returns to the original graph.""" + for _ in range(3): + zx_graph.add_physical_node() + for i, j in [(0, 1), (1, 2)]: + zx_graph.add_physical_edge(i, j) + angles = [rng.random() * 2 * np.pi for _ in range(3)] + for i in range(3): + zx_graph.assign_meas_basis(i, PlannerMeasBasis(planes[i], angles[i])) + + for _ in range(4): + zx_graph.local_complement(1) + + exp_measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(3)] + _test(zx_graph, exp_nodes={0, 1, 2}, exp_edges={(0, 1), (1, 2)}, exp_measurements=exp_measurements) + + +def test_local_clifford_expansion() -> None: + graph, flow = generate_random_flow_graph(width=1, depth=3, edge_p=0.5) + zx_graph, _ = to_zx_graphstate(graph) + + pattern = qompile(zx_graph, flow) + sim = PatternSimulator(pattern, backend=SimulatorBackend.StateVector) + sim.simulate() + psi_original = sim.state.state() + + zx_graph_cp = deepcopy(zx_graph) + lc = LocalClifford(2 * np.pi, 2 * np.pi, 2 * np.pi) + zx_graph_cp.apply_local_clifford(0, lc) + lc = LocalClifford(2 * np.pi, 0.0, 0.0) + zx_graph_cp.apply_local_clifford(2, lc) + zx_graph_cp.expand_local_cliffords() + zx_graph_cp.to_xy() + zx_graph_cp.to_xz() + gflow_cp = gflow_wrapper(zx_graph_cp) + pattern_cp = qompile(zx_graph_cp, gflow_cp) + sim_cp = PatternSimulator(pattern_cp, backend=SimulatorBackend.StateVector) + sim_cp.simulate() + psi_cp = sim_cp.state.state() + assert np.isclose(np.abs(np.vdot(psi_original, psi_cp)), 1.0) + + +def test_pivot_fails_with_nonexistent_nodes(zx_graph: ZXGraphState) -> None: + """Test pivot fails with nonexistent nodes.""" + with pytest.raises(ValueError, match="Node does not exist node=0"): + zx_graph.pivot(0, 1) + zx_graph.add_physical_node() + with pytest.raises(ValueError, match="Node does not exist node=1"): + zx_graph.pivot(0, 1) + + +def test_pivot_fails_with_input_node(zx_graph: ZXGraphState) -> None: + """Test pivot fails with input node.""" + node1 = zx_graph.add_physical_node() + node2 = zx_graph.add_physical_node() + zx_graph.register_input(node1, q_index=0) + with pytest.raises(ValueError, match="Cannot apply pivot to input node"): + zx_graph.pivot(node1, node2) + + +def test_pivot_with_obvious_graph(zx_graph: ZXGraphState) -> None: + """Test pivot with an obvious graph.""" + # 0---1---2 -> 0---2---1 + for _ in range(3): + zx_graph.add_physical_node() + + for i, j in [(0, 1), (1, 2)]: + zx_graph.add_physical_edge(i, j) + + measurements = [ + (0, PlannerMeasBasis(Plane.XY, np.pi)), + (1, PlannerMeasBasis(Plane.XZ, 1.4 * np.pi)), + (2, PlannerMeasBasis(Plane.YZ, 0.4 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + + zx_graph.pivot(1, 2) + assert zx_graph.physical_edges == {(0, 2), (1, 2)} + ref_plane1, ref_angle_func1 = MEAS_ACTION_PV_TARGET[Plane.XZ] + ref_plane2, ref_angle_func2 = MEAS_ACTION_PV_TARGET[Plane.YZ] + assert zx_graph.meas_bases[0].plane == Plane.XY + assert zx_graph.meas_bases[1].plane == ref_plane1 + assert zx_graph.meas_bases[2].plane == ref_plane2 + assert is_close_angle(zx_graph.meas_bases[0].angle, np.pi) + assert is_close_angle(zx_graph.meas_bases[1].angle, ref_angle_func1(1.4 * np.pi)) + assert is_close_angle(zx_graph.meas_bases[2].angle, ref_angle_func2(0.4 * np.pi)) + + +@pytest.mark.parametrize("planes", plane_combinations(5)) +def test_pivot_with_minimal_graph( + zx_graph: ZXGraphState, planes: tuple[Plane, Plane, Plane, Plane, Plane], rng: np.random.Generator +) -> None: + """Test pivot with a minimal graph.""" + # 0---1---2---4 + # \ / + # 3 + for _ in range(5): + zx_graph.add_physical_node() + + for i, j in [(0, 1), (1, 2), (1, 3), (2, 3), (2, 4)]: + zx_graph.add_physical_edge(i, j) + + angles = [rng.random() * 2 * np.pi for _ in range(5)] + measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(5)] + _apply_measurements(zx_graph, measurements) + zx_graph_cp = deepcopy(zx_graph) + + zx_graph.pivot(1, 2) + zx_graph_cp.local_complement(1) + zx_graph_cp.local_complement(2) + zx_graph_cp.local_complement(1) + assert zx_graph.physical_edges == zx_graph_cp.physical_edges + assert zx_graph.meas_bases[1].plane == zx_graph_cp.meas_bases[1].plane + assert zx_graph.meas_bases[2].plane == zx_graph_cp.meas_bases[2].plane + + _, ref_angle_func1 = MEAS_ACTION_PV_TARGET[planes[1]] + _, ref_angle_func2 = MEAS_ACTION_PV_TARGET[planes[2]] + _, ref_angle_func3 = MEAS_ACTION_PV_NEIGHBORS[planes[3]] + ref_angle1 = ref_angle_func1(angles[1]) + ref_angle2 = ref_angle_func2(angles[2]) + ref_angle3 = ref_angle_func3(angles[3]) + assert is_close_angle(zx_graph.meas_bases[1].angle, ref_angle1) + assert is_close_angle(zx_graph.meas_bases[2].angle, ref_angle2) + assert is_close_angle(zx_graph.meas_bases[3].angle, ref_angle3) + + +def test_remove_clifford_fails_if_nonexistent_node(zx_graph: ZXGraphState) -> None: + """Test remove_clifford raises an error if the node does not exist.""" + with pytest.raises(ValueError, match="Node does not exist node=0"): + zx_graph.remove_clifford(0) + + +def test_remove_clifford_fails_with_input_node(zx_graph: ZXGraphState) -> None: + node = zx_graph.add_physical_node() + zx_graph.register_input(node, q_index=0) + with pytest.raises(ValueError, match="Clifford node removal not allowed for input or output nodes"): + zx_graph.remove_clifford(node) + + +def test_remove_clifford_fails_with_invalid_plane(zx_graph: ZXGraphState) -> None: + """Test remove_clifford fails if the measurement plane is invalid.""" + zx_graph.add_physical_node() + zx_graph.assign_meas_basis( + 0, + PlannerMeasBasis("test_plane", 0.5 * np.pi), # type: ignore[reportArgumentType, arg-type, unused-ignore] + ) + with pytest.raises(ValueError, match="This node is not a Clifford node"): + zx_graph.remove_clifford(0) + + +def test_remove_clifford_fails_for_non_clifford_node(zx_graph: ZXGraphState) -> None: + zx_graph.add_physical_node() + zx_graph.assign_meas_basis(0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)) + with pytest.raises(ValueError, match="This node is not a Clifford node"): + zx_graph.remove_clifford(0) + + +def graph_1(zx_graph: ZXGraphState) -> None: + # _is_trivial_meas + # 3---0---1 3 1 + # | -> + # 2 2 + _initialize_graph(zx_graph, nodes=range(4), edges={(0, 1), (0, 2), (0, 3)}) + + +def graph_2(zx_graph: ZXGraphState) -> None: + # _needs_lc + # 0---1---2 -> 0---2 + _initialize_graph(zx_graph, nodes=range(3), edges={(0, 1), (1, 2)}) + + +def graph_3(zx_graph: ZXGraphState) -> None: + # _needs_pivot_1 on (1, 2) + # 3(I) 3(I) + # / \ / | \ + # 0(I) - 1 - 2 - 5 -> 0(I) - 2 5 - 0(I) + # \ / \ | / + # 4(I) 4(I) + _initialize_graph( + zx_graph, nodes=range(6), edges={(0, 1), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)}, inputs=(0, 3, 4) + ) + + +def _test_remove_clifford( + zx_graph: ZXGraphState, + node: int, + measurements: Measurements, + exp_graph: tuple[set[int], set[tuple[int, int]]], + exp_measurements: Measurements, +) -> None: + _apply_measurements(zx_graph, measurements) + zx_graph.remove_clifford(node) + exp_nodes = exp_graph[0] + exp_edges = exp_graph[1] + _test(zx_graph, exp_nodes, exp_edges, exp_measurements) + + +@pytest.mark.parametrize( + "planes", + list(itertools.product(list(Plane), [Plane.XZ, Plane.YZ], list(Plane))), +) +def test_remove_clifford( + zx_graph: ZXGraphState, + planes: tuple[Plane, Plane, Plane], + rng: np.random.Generator, +) -> None: + graph_2(zx_graph) + angles = [rng.random() * 2 * np.pi for _ in range(3)] + epsilon = 1e-10 + angles[1] = rng.choice([0.0, np.pi, 2 * np.pi - epsilon]) + measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(3)] + ref_plane0, ref_angle_func0 = MEAS_ACTION_RC[planes[0]] + ref_plane2, ref_angle_func2 = MEAS_ACTION_RC[planes[2]] + ref_angle0 = ref_angle_func0(angles[1], angles[0]) + ref_angle2 = ref_angle_func2(angles[1], angles[2]) + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle0)), + (2, PlannerMeasBasis(ref_plane2, ref_angle2)), + ] + _test_remove_clifford( + zx_graph, node=1, measurements=measurements, exp_graph=({0, 2}, set()), exp_measurements=exp_measurements + ) + + +def test_unremovable_clifford_node(zx_graph: ZXGraphState) -> None: + _initialize_graph(zx_graph, nodes=range(3), edges={(0, 1), (1, 2)}, inputs=(0, 2)) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + with pytest.raises(ValueError, match=r"This Clifford node is unremovable."): + zx_graph.remove_clifford(1) + + +def test_remove_cliffords(zx_graph: ZXGraphState) -> None: + """Test removing multiple Clifford nodes.""" + _initialize_graph(zx_graph, nodes=range(4), edges={(0, 1), (0, 2), (0, 3)}) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test(zx_graph, {2}, set(), []) + + +def test_remove_cliffords_graph1(zx_graph: ZXGraphState) -> None: + """Test removing multiple Clifford nodes.""" + graph_1(zx_graph) + measurements = [ + (0, PlannerMeasBasis(Plane.YZ, np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.2 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.3 * np.pi)), + ] + exp_measurements = [ + (1, PlannerMeasBasis(Plane.XY, 1.1 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 1.8 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 1.7 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test(zx_graph, {1, 2, 3}, set(), exp_measurements=exp_measurements) + + +def test_remove_cliffords_graph2(zx_graph: ZXGraphState) -> None: + graph_2(zx_graph) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 1.5 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.2 * np.pi)), + ] + exp_measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.6 * np.pi)), + (2, PlannerMeasBasis(Plane.YZ, 0.2 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test(zx_graph, {0, 2}, {(0, 2)}, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize( + "planes", + list( + itertools.product( + list(Plane), + [Plane.XY], + ) + ), +) +def test_needs_pivot_on_boundary_xy( + zx_graph: ZXGraphState, + planes: tuple[Plane, Plane], + rng: np.random.Generator, +) -> None: + graph_2(zx_graph) + zx_graph.register_input(0, q_index=0) + zx_graph.register_output(2, q_index=0) + angles = [rng.random() * 2 * np.pi for _ in range(2)] + angles[1] = rng.choice([0.0, np.pi]) + measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(2)] + _apply_measurements(zx_graph, measurements) + assert zx_graph._needs_pivot_on_boundary(1, atol=1e-9) is True + + +@pytest.mark.parametrize( + "planes", + list( + itertools.product( + list(Plane), + [Plane.XZ], + ) + ), +) +def test_needs_pivot_on_boundary_xz( + zx_graph: ZXGraphState, + planes: tuple[Plane, Plane], + rng: np.random.Generator, +) -> None: + graph_2(zx_graph) + zx_graph.register_input(0, q_index=0) + zx_graph.register_output(2, q_index=0) + angles = [rng.random() * 2 * np.pi for _ in range(2)] + angles[1] = rng.choice([0.5 * np.pi, 1.5 * np.pi]) + measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(2)] + _apply_measurements(zx_graph, measurements) + assert zx_graph._needs_pivot_on_boundary(1, atol=1e-9) is True + + +def test_remove_cliffords_graph3(zx_graph: ZXGraphState) -> None: + graph_3(zx_graph) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XZ, 1.5 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.2 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.3 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + (5, PlannerMeasBasis(Plane.XZ, 0.5 * np.pi)), + ] + exp_measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 1.7 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.3 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + (5, PlannerMeasBasis(Plane.XZ, 1.5 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test( + zx_graph, + {0, 2, 3, 4, 5}, + {(0, 2), (0, 3), (0, 4), (0, 5), (2, 3), (2, 4), (3, 5), (4, 5)}, + exp_measurements=exp_measurements, + ) + + +def test_random_graph(zx_graph: ZXGraphState) -> None: + """Test removing multiple Clifford nodes from a random graph.""" + random_graph, _ = generate_random_flow_graph(5, 5) + zx_graph, _ = to_zx_graphstate(random_graph) + + for i in zx_graph.physical_nodes - set(zx_graph.output_node_indices): + rng = np.random.default_rng(seed=0) + rnd = rng.random() + if 0 <= rnd < 0.33: + pass + elif 0.33 <= rnd < 0.66: + angle = zx_graph.meas_bases[i].angle + zx_graph.assign_meas_basis(i, PlannerMeasBasis(Plane.XZ, angle)) + else: + angle = zx_graph.meas_bases[i].angle + zx_graph.assign_meas_basis(i, PlannerMeasBasis(Plane.YZ, angle)) + + zx_graph.remove_cliffords() + atol = 1e-9 + nodes = zx_graph.physical_nodes - set(zx_graph.input_node_indices) - set(zx_graph.output_node_indices) + clifford_nodes = [node for node in nodes if zx_graph.is_removable_clifford(node, atol)] + assert clifford_nodes == [] + + +@pytest.mark.parametrize( + ("measurements", "exp_measurements", "exp_edges"), + [ + # no pair of adjacent nodes with YZ measurements + # and no node with XZ measurement + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.55 * np.pi)), + (5, PlannerMeasBasis(Plane.XY, 0.66 * np.pi)), + ], + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.55 * np.pi)), + (5, PlannerMeasBasis(Plane.XY, 0.66 * np.pi)), + ], + {(0, 1), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)}, + ), + ], +) +def test_convert_to_phase_gadget( + zx_graph: ZXGraphState, + measurements: Measurements, + exp_measurements: Measurements, + exp_edges: set[tuple[int, int]], +) -> None: + initial_edges = {(0, 1), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)} + _initialize_graph(zx_graph, nodes=range(6), edges=initial_edges) + _apply_measurements(zx_graph, measurements) + zx_graph.convert_to_phase_gadget() + _test(zx_graph, exp_nodes={0, 1, 2, 3, 4, 5}, exp_edges=exp_edges, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize( + ("initial_edges", "measurements", "exp_measurements", "exp_edges"), + [ + # 3(XY) 3(XY) + # | -> | + # 0(YZ) - 1(XY) - 2(XY) 1(XY) - 2(XY) + ( + {(0, 1), (1, 2), (1, 3)}, + [ + (0, PlannerMeasBasis(Plane.YZ, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + ], + [ + (1, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + ], + {(1, 2), (1, 3)}, + ), + # 3(YZ) 3(YZ) + # | \ -> | \ + # 0(YZ) - 1(XY) - 2(XY) 1(XY) - 2(XY) + ( + {(0, 1), (1, 2), (1, 3), (2, 3)}, + [ + (0, PlannerMeasBasis(Plane.YZ, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + ], + [ + (1, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + ], + {(1, 2), (1, 3), (2, 3)}, + ), + ], +) +def test_merge_yz_to_xy( + zx_graph: ZXGraphState, + initial_edges: set[tuple[int, int]], + measurements: Measurements, + exp_measurements: Measurements, + exp_edges: set[tuple[int, int]], +) -> None: + _initialize_graph(zx_graph, nodes=range(4), edges=initial_edges) + _apply_measurements(zx_graph, measurements) + zx_graph.merge_yz_to_xy() + _test(zx_graph, exp_nodes={1, 2, 3}, exp_edges=exp_edges, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize( + ("initial_edges", "measurements", "exp_zxgraph"), + [ + # 3(YZ) 3(YZ) + # / \ / \ + # 0(XY) - 1(XY) - 2(XY) -> 0(XY) - 1(XY) - 2(XY) + # \ / + # 4(YZ) + ( + {(0, 1), (0, 3), (0, 4), (1, 2), (2, 3), (2, 4)}, + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 0.55 * np.pi)), + ], + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.99 * np.pi)), + ], + {(0, 1), (0, 3), (1, 2), (2, 3)}, + {0, 1, 2, 3}, + ), + ), + # 3(YZ) + # / \ + # 0(XY) - 1(YZ) - 2(XY) -> 0(XY) - 1(YZ) - 2(XY) + # \ / + # 4(YZ) + ( + {(0, 1), (0, 3), (0, 4), (1, 2), (2, 3), (2, 4)}, + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 0.55 * np.pi)), + ], + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 1.21 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + ], + {(0, 1), (1, 2)}, + {0, 1, 2}, + ), + ), + # 3(YZ) + # / \ + # 0(XY) - 1(YZ) - 2(XY) - 0(XY) -> 0(XY) - 1(YZ) - 2(XY) - 0(XY) + # \ / + # 4(YZ) + ( + {(0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (2, 3), (2, 4)}, + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 0.55 * np.pi)), + ], + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 1.21 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + ], + {(0, 1), (0, 2), (1, 2)}, + {0, 1, 2}, + ), + ), + ], +) +def test_merge_yz_nodes( + zx_graph: ZXGraphState, + initial_edges: set[tuple[int, int]], + measurements: Measurements, + exp_zxgraph: tuple[Measurements, set[tuple[int, int]], set[int]], +) -> None: + _initialize_graph(zx_graph, nodes=range(5), edges=initial_edges) + _apply_measurements(zx_graph, measurements) + zx_graph.merge_yz_nodes() + exp_measurements, exp_edges, exp_nodes = exp_zxgraph + _test(zx_graph, exp_nodes, exp_edges, exp_measurements) + + +@pytest.mark.parametrize( + ("initial_zxgraph", "measurements", "exp_zxgraph"), + [ + # test for a phase gadget: apply merge_yz_to_xy then remove_cliffords + ( + (range(4), {(0, 1), (1, 2), (1, 3)}), + [ + (0, PlannerMeasBasis(Plane.YZ, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.3 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + ], + ( + [ + (2, PlannerMeasBasis(Plane.XY, 1.8 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 1.9 * np.pi)), + ], + {(2, 3)}, + {2, 3}, + ), + ), + # apply convert_to_phase_gadget, merge_yz_to_xy, then remove_cliffords + ( + (range(4), {(0, 1), (1, 2), (1, 3)}), + [ + (0, PlannerMeasBasis(Plane.YZ, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.8 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + ], + ( + [ + (2, PlannerMeasBasis(Plane.XY, 0.2 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + ], + {(2, 3)}, + {2, 3}, + ), + ), + # apply remove_cliffords, convert_to_phase_gadget, merge_yz_to_xy, then remove_cliffords + ( + (range(6), {(0, 1), (1, 2), (1, 3), (2, 5), (3, 4)}), + [ + (0, PlannerMeasBasis(Plane.YZ, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + (2, PlannerMeasBasis(Plane.YZ, 1.2 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 1.4 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 1.0 * np.pi)), + (5, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + ], + ( + [ + (2, PlannerMeasBasis(Plane.XY, 1.8 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + ], + {(2, 3)}, + {2, 3}, + ), + ), + ], +) +def test_full_reduce( + zx_graph: ZXGraphState, + initial_zxgraph: tuple[range, set[tuple[int, int]]], + measurements: Measurements, + exp_zxgraph: tuple[Measurements, set[tuple[int, int]], set[int]], +) -> None: + nodes, edges = initial_zxgraph + _initialize_graph(zx_graph, nodes, edges) + exp_measurements, exp_edges, exp_nodes = exp_zxgraph + _apply_measurements(zx_graph, measurements) + zx_graph.full_reduce() + _test(zx_graph, exp_nodes, exp_edges, exp_measurements) + + +if __name__ == "__main__": + pytest.main()