Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 185 additions & 112 deletions cirq-core/cirq/transformers/dynamical_decoupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

from __future__ import annotations

from enum import Enum
from functools import reduce
from itertools import cycle
from typing import TYPE_CHECKING

import numpy as np
from attrs import frozen

from cirq import ops, protocols
from cirq.circuits import Circuit, FrozenCircuit, Moment
Expand Down Expand Up @@ -133,10 +135,6 @@ def _calc_busy_moment_range_of_each_qubit(circuit: FrozenCircuit) -> dict[ops.Qi
return busy_moment_range_by_qubit


def _is_insertable_moment(moment: Moment, single_qubit_gate_moments_only: bool) -> bool:
return not single_qubit_gate_moments_only or _is_single_qubit_gate_moment(moment)


def _merge_single_qubit_ops_to_phxz(
q: ops.Qid, operations: tuple[ops.Operation, ...]
) -> ops.Operation:
Expand All @@ -149,34 +147,162 @@ def _merge_single_qubit_ops_to_phxz(
return gate.on(q)


def _calc_pulled_through(moment: Moment, input_pauli_ops: ops.PauliString) -> ops.PauliString:
"""Calculates the pulled_through such that circuit(input_pauli_ops, moment.clifford_ops) is
equivalent to circuit(moment.clifford_ops, pulled_through).
def _backward_set_stopping_slots(
q: ops.Qid,
from_mid: int,
mergable: dict[ops.Qid, dict[int, bool]],
need_to_stop: dict[ops.Qid, dict[int, bool]],
gate_types: dict[ops.Qid, dict[int, _CellType]],
circuit: FrozenCircuit,
):
"""Sets stopping slots for dynamical decoupling insertion.

This function traverses backward from a given moment `from_mid` for a specific qubit `q`.
It identifies moments where a dynamical decoupling sequence needs to be "stopped".

Args:
q: The qubit for which to set stopping slots.
from_mid: The moment ID to start the backward traversal from.
mergable: A dictionary indicating if a single-qubit Clifford gate at (qubit, moment_id)
can be merged with a Pauli gate.
need_to_stop: A dictionary to mark moments where a DD sequence must be stopped.
gate_types: A dictionary indicating the type of gate at each (qubit, moment_id).
circuit: The original frozen circuit.
"""
clifford_ops_in_moment: list[ops.Operation] = [
op for op in moment.operations if _is_clifford_op(op)
]
return input_pauli_ops.after(clifford_ops_in_moment)
affected_qubits: set[ops.Qid] = {q}
for back_mid in range(from_mid, -1, -1):
for back_q in set(affected_qubits):
if gate_types[back_q][back_mid] == _CellType.WALL:
affected_qubits.remove(back_q)
continue
if mergable[back_q][back_mid]:
need_to_stop[back_q][back_mid] = True
affected_qubits.remove(back_q)
continue
op_at_q = circuit[back_mid].operation_at(back_q) or ops.I(q)
affected_qubits.update(op_at_q.qubits)
if not affected_qubits:
break


class _CellType(Enum):
UNKNOWN = '?'
# Non-insertable gates that cannot be pulled through
WALL = 'w'
# Clifford gates where Pauli Gates can be pulled through
DOOR = 'd'
# An empty gate can be used to insert Pauli gates from the dd sequence
INSERTABLE = 'i'


@frozen
class _Grid:
"""A grid representation of the circuit where each gate position is labeled for
dynamical decoupling.

With this representation, a DD sequence can be automatically navigated in a
forward-only process. This avoids issues where a partially inserted DD
sequence encounters a "wall" and a new moment must be inserted because the
remaining DD sequence cannot be absorbed by nearby gates.

This labeled representation pre-calculates where DD pulses can be inserted
and where leftover DD sequences must be merged, avoiding the need for
backtracking.

An example labeled circuit is shown below:
| 0 | 1 | 2 | 3 | 4 |
-----+-----+-----+-----+-----+-----+
q(0) | d | i | i,s | d | w |
q(1) | d | i | d,s | w | w |
q(2) | d | d | d,s | w | w |
where `w`=WALL, `d`=DOOR, `i`=INSERTABLE. `s` represents a stop gate,
meaning that any unfinished DD sequences must be merged at this gate.
"""

gate_types: dict[ops.Qid, dict[int, _CellType]]
need_to_stop: dict[ops.Qid, dict[int, bool]]
circuit: FrozenCircuit

@classmethod
def from_circuit(
cls, circuit: cirq.FrozenCircuit, single_qubit_gate_moments_only: bool
) -> _Grid:
gate_types: dict[ops.Qid, dict[int, _CellType]] = {
q: {mid: _CellType.UNKNOWN for mid in range(len(circuit))} for q in circuit.all_qubits()
}
mergable: dict[ops.Qid, dict[int, bool]] = {
q: {mid: False for mid in range(len(circuit))} for q in circuit.all_qubits()
}
busy_moment_range_by_qubit = _calc_busy_moment_range_of_each_qubit(circuit)

# Set gate types for each (q, mid)
for mid, moment in enumerate(circuit):
is_insertable_moment = (
not single_qubit_gate_moments_only or _is_single_qubit_gate_moment(moment)
)
for q in circuit.all_qubits():
if mid < busy_moment_range_by_qubit[q][0] or mid > busy_moment_range_by_qubit[q][1]:
gate_types[q][mid] = _CellType.WALL
continue
op_at_q = moment.operation_at(q)
if op_at_q is None:
if is_insertable_moment:
gate_types[q][mid] = _CellType.INSERTABLE
mergable[q][mid] = True
else:
gate_types[q][mid] = _CellType.DOOR
else:
if _is_clifford_op(op_at_q):
gate_types[q][mid] = _CellType.DOOR
mergable[q][mid] = _is_single_qubit_operation(op_at_q)
else:
gate_types[q][mid] = _CellType.WALL

need_to_stop: dict[ops.Qid, dict[int, bool]] = {
q: {mid: False for mid in range(len(circuit))} for q in circuit.all_qubits()
}
# Reversely find the last mergeable gate of each qubit, set them as need_to_stop.
for q in circuit.all_qubits():
_backward_set_stopping_slots(
q, len(circuit) - 1, mergable, need_to_stop, gate_types, circuit
)
# Reversely check for each wall gate, mark the closest mergeable gate as need_to_stop.
for mid in range(len(circuit)):
for q in circuit.all_qubits():
if gate_types[q][mid] == _CellType.WALL:
_backward_set_stopping_slots(
q, mid - 1, mergable, need_to_stop, gate_types, circuit
)
return cls(circuit=circuit, gate_types=gate_types, need_to_stop=need_to_stop)

def __str__(self) -> str:
if not self.gate_types:
return "Grid(empty)"

def _get_stop_qubits(moment: Moment) -> set[ops.Qid]:
stop_pulling_through_qubits: set[ops.Qid] = set()
for op in moment:
if (not _is_clifford_op(op) and not _is_single_qubit_operation(op)) or not has_unitary(
op
): # multi-qubit clifford op or non-mergable op.
stop_pulling_through_qubits.update(op.qubits)
return stop_pulling_through_qubits
qubits = sorted(list(self.gate_types.keys()))
num_moments = len(self.gate_types[qubits[0]])

max_qubit_len = max(len(str(q)) for q in qubits) if qubits else 0

def _need_merge_pulled_through(op_at_q: ops.Operation, is_at_last_busy_moment: bool) -> bool:
"""With a pulling through pauli gate before op_at_q, need to merge with the
pauli in the conditions below."""
# The op must be mergable and single-qubit
if not (_is_single_qubit_operation(op_at_q) and has_unitary(op_at_q)):
return False
# Either non-Clifford or at the last busy moment
return is_at_last_busy_moment or not _is_clifford_op(op_at_q)
header = f"{'':>{max_qubit_len}} |"
for i in range(num_moments):
header += f" {i:^3} |"

separator = f"{'-' * max_qubit_len}-+"
separator += '-----+' * num_moments

lines = ["Grid Repr:", header, separator]

for q in qubits:
row_str = f"{str(q):>{max_qubit_len}} |"
for mid in range(num_moments):
gate_type = self.gate_types[q][mid].value
stop = self.need_to_stop[q][mid]
cell = f"{gate_type},s" if stop else f" {gate_type} "
row_str += f" {cell} |"
lines.append(row_str)

return "\n".join(lines)


@transformer_api.transformer
Expand All @@ -188,7 +314,7 @@ def add_dynamical_decoupling(
single_qubit_gate_moments_only: bool = True,
) -> cirq.Circuit:
"""Adds dynamical decoupling gate operations to a given circuit.
This transformer might add new moments and thus change the structure of the original circuit.
This transformer preserves the structure of the original circuit.

Args:
circuit: Input circuit to transform.
Expand All @@ -202,11 +328,18 @@ def add_dynamical_decoupling(
Returns:
A copy of the input circuit with dynamical decoupling operations.
"""
base_dd_sequence, pauli_map = _parse_dd_sequence(schema)

if context is not None and context.deep:
raise ValueError("Deep transformation is not supported.")

orig_circuit = circuit.freeze()

busy_moment_range_by_qubit = _calc_busy_moment_range_of_each_qubit(orig_circuit)
grid = _Grid.from_circuit(orig_circuit, single_qubit_gate_moments_only)

if context is not None and context.logger is not None:
context.logger.log("Preprocessed input circuit grid repr:\n%s", str(grid))

base_dd_sequence, pauli_map = _parse_dd_sequence(schema)
# Stores all the moments of the output circuit chronologically.
transformed_moments: list[Moment] = []
# A PauliString stores the result of 'pulling' Pauli gates past each operations
Expand All @@ -215,90 +348,30 @@ def add_dynamical_decoupling(
# Iterator of gate to be used in dd sequence for each qubit.
dd_iter_by_qubits = {q: cycle(base_dd_sequence) for q in circuit.all_qubits()}

def _update_pulled_through(q: ops.Qid, insert_gate: ops.Gate) -> ops.Operation:
nonlocal pulled_through, pauli_map
pulled_through *= pauli_map[insert_gate].on(q)
return insert_gate.on(q)

# Insert and pull remaining Pauli ops through the whole circuit.
# General ideas are
# * Pull through Clifford gates.
# * Stop at multi-qubit non-Clifford ops (and other non-mergable ops).
# * Merge to single-qubit non-Clifford ops.
# * Insert a new moment if necessary.
# After pulling through pulled_through at `moment`, we expect a transformation of
# (pulled_through, moment) -> (updated_moment, updated_pulled_through) or
# (pulled_through, moment) -> (new_moment, updated_moment, updated_pulled_through)
# Moments structure changes are split into 3 steps:
# 1, (..., last_moment, pulled_through1, moment, ...)
# -> (..., last_moment, new_moment or None, pulled_through2, moment, ...)
# 2, (..., pulled_through2, moment, ...) -> (..., pulled_through3, updated_moment, ...)
# 3, (..., pulled_through3, updated_moment, ...)
# -> (..., updated_moment, pulled_through4, ...)
for moment_id, moment in enumerate(orig_circuit.moments):
# Step 1, insert new_moment if necessary.
# In detail: stop pulling through for multi-qubit non-Clifford ops or gates without
# unitary representation (e.g., measure gates). If there are remaining pulled through ops,
# insert into a new moment before current moment.
stop_pulling_through_qubits: set[ops.Qid] = _get_stop_qubits(moment)
new_moment_ops: list[ops.Operation] = []
for q in stop_pulling_through_qubits:
# Insert the remaining pulled_through
remaining_pulled_through_gate = pulled_through.get(q)
if remaining_pulled_through_gate is not None:
new_moment_ops.append(_update_pulled_through(q, remaining_pulled_through_gate))
# Reset dd sequence
dd_iter_by_qubits[q] = cycle(base_dd_sequence)
# Need to insert a new moment before current moment
if new_moment_ops:
# Fill insertable idle moments in the new moment using dd sequence
for q in orig_circuit.all_qubits() - stop_pulling_through_qubits:
if busy_moment_range_by_qubit[q][0] < moment_id <= busy_moment_range_by_qubit[q][1]:
new_moment_ops.append(_update_pulled_through(q, next(dd_iter_by_qubits[q])))
transformed_moments.append(Moment(new_moment_ops))

# Step 2, calc updated_moment with insertions / merges.
updated_moment_ops: set[cirq.Operation] = set()
for q in orig_circuit.all_qubits():
op_at_q = moment.operation_at(q)
remaining_pulled_through_gate = pulled_through.get(q)
updated_op = op_at_q
if op_at_q is None: # insert into idle op
if not _is_insertable_moment(moment, single_qubit_gate_moments_only):
continue
if (
busy_moment_range_by_qubit[q][0] < moment_id < busy_moment_range_by_qubit[q][1]
): # insert next pauli gate in the dd sequence
updated_op = _update_pulled_through(q, next(dd_iter_by_qubits[q]))
elif ( # insert the remaining pulled through if beyond the ending busy moment
moment_id > busy_moment_range_by_qubit[q][1]
and remaining_pulled_through_gate is not None
):
updated_op = _update_pulled_through(q, remaining_pulled_through_gate)
elif (
remaining_pulled_through_gate is not None
): # merge pulled-through of q to op_at_q if needed
if _need_merge_pulled_through(
op_at_q, moment_id == busy_moment_range_by_qubit[q][1]
):
remaining_op = _update_pulled_through(q, remaining_pulled_through_gate)
updated_op = _merge_single_qubit_ops_to_phxz(q, (remaining_op, op_at_q))
if updated_op is not None:
updated_moment_ops.add(updated_op)

if updated_moment_ops:
updated_moment = Moment(updated_moment_ops)
transformed_moments.append(updated_moment)

# Step 3, update pulled through.
# In detail: pulling current `pulled_through` through updated_moment.
pulled_through = _calc_pulled_through(updated_moment, pulled_through)

# Insert a new moment if there are remaining pulled-through operations.
ending_moment_ops = []
for affected_q, combined_op_in_pauli in pulled_through.items():
ending_moment_ops.append(combined_op_in_pauli.on(affected_q))
if ending_moment_ops:
transformed_moments.append(Moment(ending_moment_ops))
new_op_at_q = moment.operation_at(q)
if grid.gate_types[q][moment_id] == _CellType.INSERTABLE:
new_gate = next(dd_iter_by_qubits[q])
new_op_at_q = new_gate.on(q)
pulled_through *= pauli_map[new_gate].on(q)
if grid.need_to_stop[q][moment_id]:
to_be_merged = pulled_through.get(q)
if to_be_merged is not None:
new_op_at_q = _merge_single_qubit_ops_to_phxz(
q, (to_be_merged.on(q), new_op_at_q or ops.I(q))
)
pulled_through *= to_be_merged.on(q)
if new_op_at_q is not None:
updated_moment_ops.add(new_op_at_q)

updated_moment = Moment(updated_moment_ops)
clifford_ops = [op for op in updated_moment if _is_clifford_op(op)]
pulled_through = pulled_through.after(clifford_ops)
transformed_moments.append(updated_moment)

if len(pulled_through) > 0:
raise RuntimeError("Expect empty remaining Paulis after the dd insertion.")

return Circuit.from_moments(*transformed_moments)
Loading