"""
crystal_orientation.py
======================
Stand-alone module for crystallographic orientation mathematics.
All definitions are extracted (by reference, not moved) from
``upxo.pxtal.mcgs3_temporal_slice`` so that the original class is
unchanged. Callers should import from here rather than reaching into
the temporal-slice module.
Public API
----------
Constants
FCC_TEXTURE_COMPONENTS – dict {name: (phi1, Phi, phi2)} in degrees (Bunge)
CUBIC_SYMM_OPS_CACHE – module-level cache for 24 cubic SO(3) operators
Primitive rotation helpers (pure functions, no class needed)
Rz(a) – 3x3 rotation about Z axis (angle in radians)
Rx(a) – 3x3 rotation about X axis (angle in radians)
euler_bunge_to_matrix(phi1, Phi, phi2, degrees=True) – single Bunge ZXZ R
euler_bunge_to_matrix_batch(phi1, Phi, phi2, ...) – vectorized version
matrix_to_euler_bunge(R, degrees=True) – R -> (phi1, Phi, phi2)
axis_angle_to_R(axis, angle_rad) – Rodrigues formula
normalize_euler_bunge(ea, degrees=True) – canonical Euler range
proj_to_so3(R) – SVD-project to SO(3)
unique_rotations(rotations, tol=1e-8) – deduplicate R matrices
Symmetry helpers
cubic_symmetry_operators() – list of 24 proper cubic rotation matrices
get_cubic_ops_np() – (24,3,3) ndarray (cached)
fcc_symmetrise_ori(bea) – symmetric equivalents of one orientation
rand_unit_vector(rng) – uniform S2 sample
rand_uniform_SO3(rng) – uniform SO(3) sample
Misorientation
cubic_rotation_angle(R) – disorientation angle from single dR
cubic_rotation_angle_batch(rstack) – vectorized version
cubic_rotation_axis(R, angle) – axis for a single dR
cubic_rotation_axis_batch(rstack, angles) – vectorized version
cubic_misorientation(EA1, EA2, ...) – fast vectorized cubic misorientation
cubic_misorientation_scalar(EA1, EA2, ...) – scalar (loop) reference version
High-level utilities
grain_boundary_misorientation_distribution(euler_array, pairs, ...)
Compute the GBMD (misorientation angle distribution) for a list of
grain-boundary pairs given per-grain Bunge Euler angles.
gbmd_from_lfi(lfi, euler_array, connectivity=4, ...)
Convenience wrapper: auto-detects grain boundary pairs from a
label-field image and returns the GBMD.
ipf_color(euler_deg, sample_direction=[0,0,1])
IPF colour (R,G,B) tuple.
Special orientation relationships
get_ks_rotations() – 24 Kurdjumov-Sachs BCC variant matrices
Usage example
-------------
>>> from upxo.xtalphy.crystal_orientation import (
... cubic_misorientation,
... grain_boundary_misorientation_distribution,
... gbmd_from_lfi,
... FCC_TEXTURE_COMPONENTS,
... )
>>> angle, axis, top3 = cubic_misorientation([0, 0, 0], [45, 0, 0])
>>> print(f"Misorientation: {angle:.2f} deg, axis: {axis}")
"""
from __future__ import annotations
import numpy as np
from itertools import permutations, product
from scipy.spatial.transform import Rotation
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
FCC_TEXTURE_COMPONENTS: dict[str, tuple[float, float, float]] = {
"copper": (90.0, 35.0, 45.0), # {112}<111> Rolling
"brass": (35.0, 45.0, 0.0), # {110}<112> Rolling
"s": (59.0, 37.0, 63.0), # {123}<634> Rolling
"goss": (90.0, 90.0, 45.0), # {110}<001> Rolling
"cube": (0.0, 0.0, 0.0), # {001}<100> Annealing/RX
"rotated_cube": (45.0, 0.0, 0.0), # {001}<110> Annealing/RX
"P": (90.0, 45.0, 0.0), # {011}<122> Annealing/RX
"A1": (35.0, 45.0, 90.0), # {111}<110> Shear
"A2": (55.0, 90.0, 45.0), # {111}<112> Shear
"B": (45.0, 90.0, 45.0), # {112}<110> Shear
"C": (0.0, 90.0, 45.0), # {001}<110> Shear
"Q": (35.0, 55.0, 45.0), # {013}<231> Minor
"D": (59.0, 37.0, 26.0), # {4411}<1118> Minor
}
# Module-level cache for the cubic symmetry operators (numpy array)
CUBIC_SYMM_OPS_CACHE: np.ndarray | None = None
# ---------------------------------------------------------------------------
# Primitive elementary rotation matrices
# ---------------------------------------------------------------------------
[docs]
def Rz(a: float) -> np.ndarray:
"""3x3 rotation matrix about Z axis. *a* in radians."""
c, s = np.cos(a), np.sin(a)
return np.array([[ c, -s, 0.],
[ s, c, 0.],
[0., 0., 1.]])
[docs]
def Rx(a: float) -> np.ndarray:
"""3x3 rotation matrix about X axis. *a* in radians."""
c, s = np.cos(a), np.sin(a)
return np.array([[1., 0., 0.],
[0., c, -s],
[0., s, c]])
# ---------------------------------------------------------------------------
# Euler ↔ rotation matrix
# ---------------------------------------------------------------------------
[docs]
def euler_bunge_to_matrix(phi1: float, Phi: float, phi2: float,
degrees: bool = True) -> np.ndarray:
"""
Bunge ZXZ convention: R = Rz(phi1) · Rx(Phi) · Rz(phi2).
Parameters
----------
phi1, Phi, phi2 : float
Bunge Euler angles.
degrees : bool
If True, inputs are in degrees. Default True.
Returns
-------
R : ndarray, shape (3, 3)
"""
if degrees:
phi1, Phi, phi2 = np.deg2rad([phi1, Phi, phi2])
return Rz(phi1) @ Rx(Phi) @ Rz(phi2)
[docs]
def euler_bunge_to_matrix_batch(phi1: np.ndarray,
Phi: np.ndarray,
phi2: np.ndarray,
degrees: bool = True,
dtype=np.float64) -> np.ndarray:
"""
Vectorized Bunge ZXZ rotation matrix from arrays of Euler angles.
Parameters
----------
phi1, Phi, phi2 : array-like, shape (N,)
Bunge Euler angles.
degrees : bool
Default True.
dtype : numpy dtype
Output dtype.
Returns
-------
R : ndarray, shape (N, 3, 3)
"""
phi1 = np.asarray(phi1, dtype=dtype)
Phi = np.asarray(Phi, dtype=dtype)
phi2 = np.asarray(phi2, dtype=dtype)
if degrees:
phi1, Phi, phi2 = np.deg2rad(phi1), np.deg2rad(Phi), np.deg2rad(phi2)
c1, s1 = np.cos(phi1), np.sin(phi1)
c, s = np.cos(Phi), np.sin(Phi)
c2, s2 = np.cos(phi2), np.sin(phi2)
N = phi1.shape[0]
R = np.empty((N, 3, 3), dtype=dtype)
R[:, 0, 0] = c1*c2 - s1*s2*c
R[:, 0, 1] = -c1*s2 - s1*c2*c
R[:, 0, 2] = s1*s
R[:, 1, 0] = s1*c2 + c1*s2*c
R[:, 1, 1] = -s1*s2 + c1*c2*c
R[:, 1, 2] = -c1*s
R[:, 2, 0] = s2*s
R[:, 2, 1] = c2*s
R[:, 2, 2] = c
return R
[docs]
def matrix_to_euler_bunge(R: np.ndarray,
degrees: bool = True) -> tuple[float, float, float]:
"""
Convert a (3×3) rotation matrix to Bunge ZXZ Euler angles.
Returns
-------
(phi1, Phi, phi2) : tuple of float
In degrees (default) or radians.
"""
R = np.asarray(R, dtype=float)
c = np.clip(R[2, 2], -1.0, 1.0)
Phi_rad = np.arccos(c)
if abs(Phi_rad) < 1e-12:
phi1_rad = np.arctan2(R[1, 0], R[0, 0])
phi2_rad = 0.0
elif abs(Phi_rad - np.pi) < 1e-12:
phi1_rad = np.arctan2(R[1, 2], R[0, 2])
phi2_rad = 0.0
else:
phi1_rad = np.arctan2(R[2, 0], -R[2, 1])
phi2_rad = np.arctan2(R[0, 2], R[1, 2])
if degrees:
return (float(np.degrees(phi1_rad) % 360.0),
float(np.degrees(Phi_rad)),
float(np.degrees(phi2_rad) % 360.0))
return (float(phi1_rad % (2*np.pi)), float(Phi_rad), float(phi2_rad % (2*np.pi)))
[docs]
def normalize_euler_bunge(ea: np.ndarray,
degrees: bool = True,
eps: float = 1e-6) -> np.ndarray:
"""
Normalize Bunge ZXZ Euler angles to canonical ranges:
phi1, phi2 in [0, 360°) Phi in [0, 180°]
Parameters
----------
ea : array-like, shape (..., 3) or (3,)
degrees : bool
eps : float
Snap tolerance near 0 and 180.
Returns
-------
Normalized array, same shape as input.
"""
A = np.asarray(ea, dtype=float)
A2 = np.atleast_2d(A)
phi1, Phi, phi2 = A2[:, 0].copy(), A2[:, 1].copy(), A2[:, 2].copy()
two_pi, pi = (360.0, 180.0) if degrees else (2*np.pi, np.pi)
phi1[:] = np.mod(phi1, two_pi)
phi2[:] = np.mod(phi2, two_pi)
Phi[:] = ((Phi + pi) % (2*pi)) - pi
neg = Phi < 0.0
if np.any(neg):
Phi[neg] = -Phi[neg]
phi1[neg] = np.mod(phi1[neg] + pi, two_pi)
phi2[neg] = np.mod(phi2[neg] + pi, two_pi)
over = Phi > pi
if np.any(over):
Phi[over] = 2*pi - Phi[over]
phi1[over] = np.mod(phi1[over] + pi, two_pi)
phi2[over] = np.mod(phi2[over] + pi, two_pi)
if eps is not None:
Phi[np.abs(Phi) < eps] = 0.0
Phi[np.abs(Phi - pi) < eps] = pi
out = np.stack([phi1, Phi, phi2], axis=-1)
return out if A2.shape[0] > 1 else out[0]
[docs]
def axis_angle_to_R(axis: np.ndarray, angle_rad: float) -> np.ndarray:
"""
Rodrigues' rotation formula: axis-angle → 3×3 rotation matrix.
Parameters
----------
axis : array-like, shape (3,)
angle_rad : float
Returns
-------
R : ndarray, shape (3, 3)
"""
ax = np.asarray(axis, dtype=float)
n = np.linalg.norm(ax)
if n < 1e-15 or abs(angle_rad) < 1e-15:
return np.eye(3)
u = ax / n
ux, uy, uz = u
K = np.array([[ 0, -uz, uy],
[ uz, 0, -ux],
[-uy, ux, 0]], dtype=float)
return np.eye(3) + np.sin(angle_rad)*K + (1.0 - np.cos(angle_rad))*(K @ K)
[docs]
def proj_to_so3(R: np.ndarray) -> np.ndarray:
"""Project *R* to the nearest proper rotation matrix via SVD."""
U, _, Vt = np.linalg.svd(R)
Rn = U @ Vt
if np.linalg.det(Rn) < 0:
U[:, -1] *= -1
Rn = U @ Vt
return Rn
[docs]
def unique_rotations(rotations: list[np.ndarray],
tol: float = 1e-8) -> list[np.ndarray]:
"""Return deduplicated list of rotation matrices (Frobenius norm criterion)."""
uniq: list[np.ndarray] = []
for R in rotations:
if not any(np.linalg.norm(R - Q, ord='fro') < tol for Q in uniq):
uniq.append(R)
return uniq
# ---------------------------------------------------------------------------
# Symmetry operators
# ---------------------------------------------------------------------------
[docs]
def cubic_symmetry_operators() -> list[np.ndarray]:
"""
Return the 24 proper rotation matrices for cubic m-3m symmetry.
Uses signed-permutation matrices with determinant +1.
Returns
-------
list of ndarray, each shape (3, 3)
"""
ops: list[np.ndarray] = []
for p in permutations(range(3)):
P = np.eye(3)[list(p)]
for signs in product([-1, 1], repeat=3):
S = P * np.array(signs)[None, :]
if round(np.linalg.det(S)) == 1:
ops.append(S.astype(float))
return unique_rotations(ops)
[docs]
def get_cubic_ops_np() -> np.ndarray:
"""
Return the 24 cubic symmetry operators as a cached (24, 3, 3) ndarray.
"""
global CUBIC_SYMM_OPS_CACHE
if CUBIC_SYMM_OPS_CACHE is None:
CUBIC_SYMM_OPS_CACHE = np.asarray(cubic_symmetry_operators(),
dtype=float)
return CUBIC_SYMM_OPS_CACHE
[docs]
def fcc_symmetrise_ori(bea: tuple[float, float, float],
dtype=np.float32) -> np.ndarray:
"""
Generate all symmetrically equivalent Bunge Euler angle triplets for
one FCC orientation.
Parameters
----------
bea : (phi1, Phi, phi2) in degrees
dtype : numpy dtype for output
Returns
-------
ndarray, shape (M, 3), M ≤ 24
"""
g = euler_bunge_to_matrix(*bea, degrees=True)
sym_ops = cubic_symmetry_operators()
eq_mats = [proj_to_so3(S @ g) for S in sym_ops]
eq_mats = unique_rotations(eq_mats, tol=1e-8)
return np.array([list(matrix_to_euler_bunge(R, degrees=True))
for R in eq_mats], dtype=dtype)
# ---------------------------------------------------------------------------
# Random orientation sampling
# ---------------------------------------------------------------------------
[docs]
def rand_unit_vector(rng) -> np.ndarray:
"""Uniform random unit vector on S². *rng* must expose ``.gauss(mu, s)``."""
v = np.array([rng.gauss(0, 1), rng.gauss(0, 1), rng.gauss(0, 1)], dtype=float)
n = np.linalg.norm(v)
return v / n if n >= 1e-15 else np.array([1., 0., 0.])
# ---------------------------------------------------------------------------
# Rotation angle / axis (scalar and batch)
# ---------------------------------------------------------------------------
[docs]
def cubic_rotation_angle(R: np.ndarray) -> float:
"""Disorientation angle (radians) of a proper rotation matrix *R*."""
x = np.clip((np.trace(R) - 1.0) / 2.0, -1.0, 1.0)
return float(np.arccos(x))
[docs]
def cubic_rotation_angle_batch(rstack: np.ndarray) -> np.ndarray:
"""
Vectorized rotation angle for a stack of matrices.
Parameters
----------
rstack : ndarray, shape (..., 3, 3)
Returns
-------
angles : ndarray, same leading shape as *rstack* minus last two dims
"""
x = np.clip((np.trace(rstack, axis1=-2, axis2=-1) - 1.0) / 2.0, -1.0, 1.0)
return np.arccos(x)
[docs]
def cubic_rotation_axis(R: np.ndarray, angle: float) -> np.ndarray:
"""
Return unit rotation axis for a proper rotation *R* given its angle (rad).
Falls back to [1,0,0] for near-zero angles.
"""
if angle < 1e-8:
return np.array([1., 0., 0.])
A = (R - R.T) / (2.0 * np.sin(angle))
axis = np.array([A[2, 1], A[0, 2], A[1, 0]])
n = np.linalg.norm(axis)
return axis / n if n > 0 else np.array([1., 0., 0.])
[docs]
def cubic_rotation_axis_batch(rstack: np.ndarray,
angles: np.ndarray) -> np.ndarray:
"""
Vectorized rotation axis for a stack of rotation matrices.
Parameters
----------
rstack : ndarray, shape (N, 3, 3)
angles : ndarray, shape (N,) in radians
Returns
-------
axes : ndarray, shape (N, 3)
"""
N = rstack.shape[0]
axes = np.empty((N, 3), dtype=rstack.dtype)
small = np.abs(angles) < 1e-8
axes[small] = np.array([1., 0., 0.])
if np.any(~small):
R_big = rstack[~small]
ang_big = angles[~small]
denom = 2.0 * np.sin(ang_big)
A = (R_big - np.transpose(R_big, (0, 2, 1))) / denom[:, None, None]
ax_big = np.stack([A[:, 2, 1], A[:, 0, 2], A[:, 1, 0]], axis=1)
norms = np.linalg.norm(ax_big, axis=1, keepdims=True)
norms = np.where(norms > 0, norms, 1.0)
axes[~small] = ax_big / norms
return axes
# ---------------------------------------------------------------------------
# Misorientation (fast vectorized + scalar reference)
# ---------------------------------------------------------------------------
[docs]
def cubic_misorientation(EA1, EA2,
unique_tol_deg: float = 1e-4,
degrees: bool = True
) -> tuple[float, np.ndarray, list[float]]:
"""
Vectorized cubic (m-3m) misorientation between two orientations.
Parameters
----------
EA1, EA2 : array-like
Each can be a Bunge Euler triplet (phi1, Phi, phi2) **or** a
(3×3) rotation matrix.
unique_tol_deg : float
Tolerance (degrees) for deduplicating the top-3 angles.
degrees : bool
Applies to Euler inputs only. Default True.
Returns
-------
angle_deg_min : float
Minimum (fundamental) misorientation angle in degrees.
axis_min : ndarray, shape (3,)
Corresponding rotation axis (sample frame).
top3_angles_deg : list[float]
Up to 3 smallest unique misorientation angles (degrees), ascending.
"""
def _as_R(x):
""" as r."""
arr = np.asarray(x, dtype=float)
if arr.shape == (3, 3):
return arr
phi1, Phi, phi2 = float(arr[0]), float(arr[1]), float(arr[2])
return euler_bunge_to_matrix(phi1, Phi, phi2, degrees=degrees)
gA = _as_R(EA1)
gB = _as_R(EA2)
S = get_cubic_ops_np() # (24, 3, 3)
RA = S @ gA # (24, 3, 3)
RB = S @ gB
RtA = np.transpose(RA, (0, 2, 1))
dR = RB[:, None, :, :] @ RtA[None, :, :, :] # (24, 24, 3, 3)
tr = dR[..., 0, 0] + dR[..., 1, 1] + dR[..., 2, 2]
x = np.clip((tr - 1.0) * 0.5, -1.0, 1.0)
ang = np.arccos(x) # (24, 24) radians
idx_flat = int(np.argmin(ang))
l_min, k_min = divmod(idx_flat, ang.shape[1])
best_angle_rad = float(ang[l_min, k_min])
dR_min = dR[l_min, k_min]
if best_angle_rad < 1e-12:
axis_min = np.array([1., 0., 0.])
else:
sin_a = np.sin(best_angle_rad)
A = (dR_min - dR_min.T) / (2.0 * sin_a)
axis_min = np.array([A[2, 1], A[0, 2], A[1, 0]])
n = np.linalg.norm(axis_min)
axis_min = axis_min / n if n > 0 else np.array([1., 0., 0.])
angles_deg_flat = np.degrees(np.sort(ang, axis=None))
top3: list[float] = []
for a in angles_deg_flat:
if not top3 or abs(a - top3[-1]) > unique_tol_deg:
top3.append(float(a))
if len(top3) >= 3:
break
return float(np.degrees(best_angle_rad)), axis_min, top3
[docs]
def cubic_misorientation_scalar(EA1, EA2,
unique_tol_deg: float = 1e-4,
degrees: bool = True
) -> tuple[float, np.ndarray, list[float]]:
"""
Scalar (double-loop) reference implementation of cubic misorientation.
Slower but more transparent. Same return signature as
``cubic_misorientation``.
"""
def _as_R(x):
""" as r."""
arr = np.asarray(x, dtype=float)
if arr.shape == (3, 3):
return arr
return euler_bunge_to_matrix(float(arr[0]), float(arr[1]),
float(arr[2]), degrees=degrees)
gA = _as_R(EA1)
gB = _as_R(EA2)
sym_ops = cubic_symmetry_operators()
angles: list[float] = []
best_angle = np.inf
best_axis = np.array([1., 0., 0.], dtype=float)
for SA in sym_ops:
RA = SA @ gA
for SB in sym_ops:
dR = (SB @ gB) @ RA.T
ang = cubic_rotation_angle(dR)
angles.append(ang)
if ang < best_angle:
best_angle = ang
best_axis = cubic_rotation_axis(dR, ang)
angles_deg = np.degrees(np.sort(angles))
top3: list[float] = []
for a in angles_deg:
if not top3 or abs(a - top3[-1]) > unique_tol_deg:
top3.append(float(a))
if len(top3) >= 3:
break
return float(np.degrees(best_angle)), best_axis, top3
# ---------------------------------------------------------------------------
# IPF colour
# ---------------------------------------------------------------------------
[docs]
def ipf_color(euler_deg, sample_direction=(0., 0., 1.)) -> tuple[float, float, float]:
"""
Approximate IPF colour for a single cubic orientation.
Parameters
----------
euler_deg : array-like, shape (3,)
Bunge Euler angles in degrees.
sample_direction : array-like, shape (3,)
Sample reference direction. Default [001].
Returns
-------
(r, g, b) : tuple of float in [0, 1]
"""
sd = np.asarray(sample_direction, dtype=float)
R = euler_bunge_to_matrix(*euler_deg, degrees=True)
v = np.abs(R.T @ sd)
v_max = np.max(v)
if v_max > 0:
v /= v_max
return tuple(v)
# ---------------------------------------------------------------------------
# Grain-boundary misorientation distribution
# ---------------------------------------------------------------------------
[docs]
def grain_boundary_misorientation_distribution(
euler_array: np.ndarray,
pairs: np.ndarray,
grain_ids: np.ndarray | None = None,
degrees: bool = True,
n_bins: int = 36,
angle_range: tuple[float, float] = (0.0, 65.0),
) -> dict:
"""
Compute the Grain Boundary Misorientation Distribution (GBMD) for a
set of grain-boundary pairs.
Parameters
----------
euler_array : ndarray, shape (N_grains, 3)
Bunge Euler angles (phi1, Phi, phi2) for each grain.
If *grain_ids* is given, row *i* corresponds to ``grain_ids[i]``.
Otherwise row *i* corresponds to grain ID *i*.
pairs : ndarray, shape (N_pairs, 2), dtype int
Each row [gid_a, gid_b] is one grain-boundary pair.
grain_ids : ndarray, shape (N_grains,) or None
Grain ID labels that index into *euler_array*.
If None, grain IDs are assumed to be 0, 1, ..., N_grains-1.
degrees : bool
If True, *euler_array* is in degrees. Default True.
n_bins : int
Number of histogram bins over *angle_range*. Default 36.
angle_range : (float, float)
(min_deg, max_deg) for the histogram. Default (0, 65).
Returns
-------
result : dict with keys
``'misorientation_angles'`` – ndarray of angle per pair (degrees)
``'misorientation_axes'`` – ndarray, shape (N_pairs, 3)
``'pairs'`` – same as input *pairs*
``'hist_counts'`` – bin counts
``'hist_bin_edges'`` – bin edge values (degrees)
``'hist_bin_centers'`` – bin centre values (degrees)
``'mean_angle'`` – mean misorientation angle (degrees)
``'median_angle'`` – median misorientation angle (degrees)
``'std_angle'`` – std deviation of angles (degrees)
``'n_pairs'`` – total number of pairs processed
Notes
-----
The computation uses the full 24×24 cubic symmetry exhaustive search
(``cubic_misorientation``). For very large datasets (> 10 000 pairs)
this may be slow; consider sub-sampling or parallelising the loop.
Examples
--------
>>> from upxo.xtalphy.crystal_orientation import (
... grain_boundary_misorientation_distribution, gbmd_from_lfi)
# With explicit pairs
>>> ea = np.random.rand(100, 3) * [360, 180, 360] # random orientations
>>> pairs = np.array([[0, 1], [1, 2], [3, 4]])
>>> result = grain_boundary_misorientation_distribution(ea, pairs)
>>> import matplotlib.pyplot as plt
>>> plt.bar(result['hist_bin_centers'], result['hist_counts'],
... width=np.diff(result['hist_bin_edges'])[0])
>>> plt.xlabel('Misorientation angle (°)'); plt.ylabel('Count')
>>> plt.title('GBMD'); plt.show()
"""
euler_array = np.asarray(euler_array, dtype=float)
pairs = np.asarray(pairs, dtype=int)
# Build fast ID → row-index lookup
if grain_ids is None:
id_to_row = {i: i for i in range(len(euler_array))}
else:
grain_ids = np.asarray(grain_ids, dtype=int)
id_to_row = {int(gid): i for i, gid in enumerate(grain_ids)}
angles: list[float] = []
axes: list[np.ndarray] = []
for gid_a, gid_b in pairs:
row_a = id_to_row.get(int(gid_a))
row_b = id_to_row.get(int(gid_b))
if row_a is None or row_b is None:
continue
ea_a = euler_array[row_a]
ea_b = euler_array[row_b]
angle, axis, _ = cubic_misorientation(ea_a, ea_b, degrees=degrees)
angles.append(angle)
axes.append(axis)
angles_arr = np.asarray(angles)
axes_arr = np.asarray(axes) if axes else np.empty((0, 3))
counts, edges = np.histogram(angles_arr, bins=n_bins, range=angle_range)
centers = 0.5 * (edges[:-1] + edges[1:])
return {
'misorientation_angles': angles_arr,
'misorientation_axes': axes_arr,
'pairs': pairs,
'hist_counts': counts,
'hist_bin_edges': edges,
'hist_bin_centers': centers,
'mean_angle': float(np.mean(angles_arr)) if len(angles_arr) else float('nan'),
'median_angle': float(np.median(angles_arr)) if len(angles_arr) else float('nan'),
'std_angle': float(np.std(angles_arr)) if len(angles_arr) else float('nan'),
'n_pairs': len(angles_arr),
}
[docs]
def gbmd_from_lfi(lfi: np.ndarray,
euler_array: np.ndarray,
grain_ids: np.ndarray | None = None,
connectivity: int = 4,
degrees: bool = True,
n_bins: int = 36,
angle_range: tuple[float, float] = (0.0, 65.0),
) -> dict:
"""
Convenience wrapper: detect grain-boundary pairs from a label-field
image *lfi* and compute the GBMD.
Parameters
----------
lfi : ndarray, shape (ny, nx), dtype int
Integer label field. Each unique positive integer is a grain ID.
Pixels with value ≤ 0 are ignored (unindexed).
euler_array : ndarray, shape (N_grains, 3)
Bunge Euler angles for each grain (row-order matches *grain_ids*).
grain_ids : ndarray or None
Grain ID labels corresponding to rows of *euler_array*.
If None, grain IDs are 0 … N_grains-1.
connectivity : {4, 8}
Pixel adjacency for boundary detection. Default 4.
degrees : bool
Default True.
n_bins : int
Default 36.
angle_range : (float, float)
Default (0, 65).
Returns
-------
Same dict as ``grain_boundary_misorientation_distribution`` plus
an extra key ``'gb_pairs'`` containing the deduplicated set of
adjacent grain-ID pairs.
Notes
-----
The pair-detection uses integer-shift neighbour comparison, which is
O(ny·nx) and memory-efficient.
"""
lfi = np.asarray(lfi, dtype=int)
# --- detect all adjacent grain-boundary pairs via pixel shifts ----------
pairs_set: set[tuple[int, int]] = set()
def _add(a_arr, b_arr):
""" add."""
mask = (a_arr > 0) & (b_arr > 0) & (a_arr != b_arr)
for a, b in zip(a_arr[mask].ravel(), b_arr[mask].ravel()):
pair = (int(min(a, b)), int(max(a, b)))
pairs_set.add(pair)
# horizontal neighbour
_add(lfi[:, :-1], lfi[:, 1:])
# vertical neighbour
_add(lfi[:-1, :], lfi[1:, :])
if connectivity == 8:
# diagonals
_add(lfi[:-1, :-1], lfi[1:, 1:])
_add(lfi[:-1, 1:], lfi[1:, :-1])
pairs_arr = np.array(sorted(pairs_set), dtype=int) if pairs_set else np.empty((0, 2), dtype=int)
result = grain_boundary_misorientation_distribution(
euler_array, pairs_arr, grain_ids=grain_ids,
degrees=degrees, n_bins=n_bins, angle_range=angle_range,
)
result['gb_pairs'] = pairs_arr
return result
# ---------------------------------------------------------------------------
# Quaternion utilities
# ---------------------------------------------------------------------------
[docs]
def quat_to_R_batch(q: np.ndarray) -> np.ndarray:
"""
Convert a stack of unit quaternions to rotation matrices.
Parameters
----------
q : ndarray, shape (N, 4)
Unit quaternions in [w, x, y, z] convention.
Returns
-------
R : ndarray, shape (N, 3, 3)
Examples
--------
from upxo.xtalphy.crystal_orientation import quat_to_R_batch
import numpy as np
q = np.array([[1., 0., 0., 0.]]) # identity
quat_to_R_batch(q) # → [[[1,0,0],[0,1,0],[0,0,1]]]
"""
q = np.asarray(q, dtype=np.float64)
w, x, y, z = q[:, 0], q[:, 1], q[:, 2], q[:, 3]
R = np.empty((len(q), 3, 3), dtype=np.float64)
R[:, 0, 0] = 1 - 2*(y*y + z*z); R[:, 0, 1] = 2*(x*y - z*w); R[:, 0, 2] = 2*(x*z + y*w)
R[:, 1, 0] = 2*(x*y + z*w); R[:, 1, 1] = 1 - 2*(x*x + z*z); R[:, 1, 2] = 2*(y*z - x*w)
R[:, 2, 0] = 2*(x*z - y*w); R[:, 2, 1] = 2*(y*z + x*w); R[:, 2, 2] = 1 - 2*(x*x + y*y)
return R
[docs]
def grain_avg_quats(lfi: np.ndarray, quat: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Compute per-grain average quaternion from a pixel-wise quaternion map.
Handles the cyclic ambiguity of Euler angles by averaging in quaternion
space (enforcing positive-w hemisphere before accumulation).
Parameters
----------
lfi : ndarray, shape (ny, nx), dtype int
Integer grain label field. Positive values are grain IDs;
values ≤ 0 are ignored.
quat : ndarray, shape (ny, nx, 4)
Per-pixel unit quaternions in [w, x, y, z] convention.
Returns
-------
gids : ndarray, shape (N_grains,), dtype int
Sorted array of unique positive grain IDs found in *lfi*.
q_mean : ndarray, shape (N_grains, 4)
Normalised mean quaternion for each grain (row order matches *gids*).
Examples
--------
from upxo.xtalphy.crystal_orientation import grain_avg_quats
gids, q_mean = grain_avg_quats(rg.lfi_ebsd, rg.quat_ebsd)
"""
lfi = np.asarray(lfi, dtype=int)
quat = np.asarray(quat, dtype=np.float64)
gids = np.unique(lfi)
gids = gids[gids > 0]
N_labels = int(gids.max()) + 1
q_flat = quat.reshape(-1, 4)
lfi_flat = lfi.ravel()
# Enforce positive-w hemisphere before averaging
q_flat = np.where(q_flat[:, 0:1] < 0, -q_flat, q_flat)
q_sum = np.zeros((N_labels, 4), dtype=np.float64)
q_count = np.zeros(N_labels, dtype=np.int64)
valid = lfi_flat > 0
np.add.at(q_sum, lfi_flat[valid], q_flat[valid])
np.add.at(q_count, lfi_flat[valid], 1)
q_mean = q_sum[gids] / q_count[gids, None]
norms = np.linalg.norm(q_mean, axis=1, keepdims=True)
q_mean = q_mean / np.where(norms > 0, norms, 1.0)
return gids, q_mean
[docs]
def compute_mdf_from_quats(
lfi: np.ndarray,
quat: np.ndarray,
neigh_gid: dict,
n_bins: int = 65,
angle_range: tuple[float, float] = (0.0, 65.0),
) -> dict:
"""
Compute the grain-boundary Misorientation Distribution Function (MDF)
from a pixel quaternion map and a neighbour-graph dict.
Uses per-grain quaternion averaging (correct for cyclic Euler angles)
and a fully vectorised 24×24 cubic symmetry exhaustive search.
Parameters
----------
lfi : ndarray, shape (ny, nx), dtype int
Integer grain label field.
quat : ndarray, shape (ny, nx, 4)
Per-pixel unit quaternions [w, x, y, z].
neigh_gid : dict
Mapping ``{grain_id: array_of_neighbour_ids}`` as produced by
``EBSDReader.characterise()`` or ``find_neighs2d()``.
n_bins : int
Number of histogram bins. Default 65 (1° per bin over 0–65°).
angle_range : (float, float)
Histogram range in degrees. Default (0, 65).
Returns
-------
result : dict with keys
``'miso_deg'`` – ndarray (N_pairs,) of disorientation angles
``'pairs'`` – ndarray (N_pairs, 2) of grain-ID pairs used
``'hist_counts'`` – bin counts (int)
``'hist_density'`` – probability density (float, integrates to 1)
``'hist_bin_edges'`` – bin edges (degrees)
``'hist_bin_centers'`` – bin centres (degrees)
``'mean_angle'`` – mean disorientation (degrees)
``'std_angle'`` – std deviation (degrees)
``'n_pairs'`` – number of unique boundary pairs
Examples
--------
from upxo.xtalphy.crystal_orientation import compute_mdf_from_quats
mdf = compute_mdf_from_quats(rg.lfi_ebsd, rg.quat_ebsd, rg.neigh_gid_ebsd)
print(mdf['mean_angle'], '°')
"""
# 1. Grain-average quaternions
gids, q_mean = grain_avg_quats(lfi, quat)
N_labels = int(gids.max()) + 1
gid_to_idx = np.zeros(N_labels, dtype=int)
gid_to_idx[gids] = np.arange(len(gids))
# 2. Rotation matrices
R_all = quat_to_R_batch(q_mean) # (N_grains, 3, 3)
# 3. Unique neighbour pairs
pairs_set = set()
for gA_id, neighbours in neigh_gid.items():
for gB_id in np.asarray(neighbours).ravel():
gB_id = int(gB_id)
if gB_id > 0 and int(gA_id) != gB_id:
pairs_set.add((min(int(gA_id), gB_id), max(int(gA_id), gB_id)))
pairs = np.array(sorted(pairs_set), dtype=int) # (P, 2)
# 4. Vectorised disorientation (full 24×24 cubic symmetry)
S = get_cubic_ops_np()
iA = gid_to_idx[pairs[:, 0]]
iB = gid_to_idx[pairs[:, 1]]
RA = R_all[iA] # (P, 3, 3)
RB = R_all[iB]
SA = np.einsum('sij,pjk->psik', S, RA) # (P, 24, 3, 3)
SB = np.einsum('sij,pjk->psik', S, RB)
dR = np.einsum('pmab,pncb->pmnac', SB, SA) # (P, 24, 24, 3, 3)
tr = dR[..., 0, 0] + dR[..., 1, 1] + dR[..., 2, 2]
cos_ang = np.clip((tr - 1.0) * 0.5, -1.0, 1.0)
miso_deg = np.degrees(np.arccos(cos_ang).min(axis=(1, 2))) # (P,)
# 5. Histogram
counts, edges = np.histogram(miso_deg, bins=n_bins, range=angle_range)
centers = 0.5 * (edges[:-1] + edges[1:])
bin_width = edges[1] - edges[0]
density = counts / (counts.sum() * bin_width) if counts.sum() > 0 else counts.astype(float)
return {
'miso_deg': miso_deg,
'pairs': pairs,
'hist_counts': counts,
'hist_density': density,
'hist_bin_edges': edges,
'hist_bin_centers': centers,
'mean_angle': float(np.mean(miso_deg)),
'std_angle': float(np.std(miso_deg)),
'n_pairs': len(pairs),
}
# ---------------------------------------------------------------------------
# MDF peak detection
# ---------------------------------------------------------------------------
#: Default cubic CSL reference angles (degrees)
CUBIC_CSL: dict[str, float] = {
'S3 (twin)': 60.00, # {111}<110>
'S5': 36.87, # {210}<001>
'S7': 38.21, # {211}<111>
'S9': 38.94, # {221}<110>
'S11': 50.48,
'S13b': 27.80,
}
[docs]
def detect_mdf_peaks(
mdf: dict,
prominence: float = 0.002,
distance: int = 3,
csl: dict[str, float] | None = None,
csl_tol: float = 2.0,
bw_method: str | float = 'scott',
n_kde: int = 500,
angle_max: float = 65.0,
) -> dict:
"""
Detect peaks in a pre-computed MDF, match them against CSL angles, and
compute a KDE curve over the raw disorientation angles.
Parameters
----------
mdf : dict
Output of ``compute_mdf_from_quats()``. Must contain keys
``'hist_bin_centers'``, ``'hist_density'``, and ``'miso_deg'``.
prominence : float
Minimum peak prominence for ``scipy.signal.find_peaks``. Default 0.002.
distance : int
Minimum number of bins between two peaks. Default 3.
csl : dict or None
Mapping ``{label: angle_degrees}``. If None, uses ``CUBIC_CSL``.
csl_tol : float
Tolerance in degrees for declaring a peak "near" a CSL. Default 2.0.
bw_method : str or float
Bandwidth method passed to ``scipy.stats.gaussian_kde``. Default ``'scott'``.
n_kde : int
Number of points in the KDE evaluation grid. Default 500.
angle_max : float
Upper end of the KDE evaluation grid (degrees). Default 65.
Returns
-------
result : dict with keys
``'peak_indices'`` – ndarray of bin indices of detected peaks
``'peak_angles'`` – list of float, angle of each detected peak
``'peak_labels'`` – list of human-readable label strings
``'csl_nearest'`` – list of (nearest_label, delta) tuples per peak
``'kde'`` – fitted ``gaussian_kde`` object
``'kde_vals'`` – ndarray (n_kde,) of KDE density values
``'theta_fine'`` – ndarray (n_kde,) evaluation grid (degrees)
``'csl'`` – the CSL dict used
``'csl_tol'`` – the tolerance used
"""
from scipy.signal import find_peaks as _find_peaks
from scipy.stats import gaussian_kde as _gaussian_kde
if csl is None:
csl = CUBIC_CSL
centers = np.asarray(mdf['hist_bin_centers'])
density = np.asarray(mdf['hist_density'])
peak_indices, _ = _find_peaks(density, prominence=prominence, distance=distance)
peak_angles: list[float] = [float(centers[pi]) for pi in peak_indices]
csl_nearest: list[tuple[str, float]] = []
peak_labels: list[str] = []
for pi, angle in zip(peak_indices, peak_angles):
nearest = min(csl, key=lambda k: abs(csl[k] - angle))
delta = angle - csl[nearest]
within = abs(delta) <= csl_tol
csl_nearest.append((nearest, float(delta)))
csl_tag = f' ≈ {nearest} (Δ={delta:+.1f}°)' if within else ''
peak_labels.append(f'{angle:.1f}° (density={density[pi]:.4f}){csl_tag}')
kde = _gaussian_kde(mdf['miso_deg'], bw_method=bw_method)
theta_fine = np.linspace(0, angle_max, n_kde)
kde_vals = kde(theta_fine)
return {
'peak_indices': peak_indices,
'peak_angles': peak_angles,
'peak_labels': peak_labels,
'csl_nearest': csl_nearest,
'kde': kde,
'kde_vals': kde_vals,
'theta_fine': theta_fine,
'csl': csl,
'csl_tol': csl_tol,
}
[docs]
def segregate_csl_pairs(
mdf: dict,
selected_peaks: dict,
csl: dict[str, float] | None = None,
csl_tol: float = 2.0,
) -> dict:
"""
Segregate grain-boundary pairs into CSL categories based on
user-selected MDF peaks.
For every selected peak, the nearest CSL reference angle is found; pairs
whose disorientation lies within *csl_tol* of that reference are collected
under that CSL label.
Parameters
----------
mdf : dict
Output of ``compute_mdf_from_quats()``. Must contain
``'pairs'`` (N,2) and ``'miso_deg'`` (N,).
selected_peaks : dict
Output of ``mdf_peak_selector()``.
Keys: ``'angles'`` (list[float]) and ``'indices'`` (list[int]).
csl : dict or None
``{label: reference_angle_degrees}``. Defaults to ``CUBIC_CSL``.
csl_tol : float
Tolerance in degrees. Pairs with ``|Δθ| ≤ csl_tol`` are included.
Default 2.0.
Returns
-------
result : dict
Keyed by CSL label. Each value is a dict with keys:
``'csl_angle'`` (float), ``'pairs'`` (ndarray M×2),
``'miso_deg'`` (ndarray M), ``'grains_A'``, ``'grains_B'``,
``'grains_all'`` (ndarrays of unique grain IDs).
Notes
-----
A selected peak that maps to the same CSL label as another peak
(within csl_tol) produces only one key in the output.
Pairs can match more than one CSL category if tolerances overlap.
"""
if csl is None:
csl = CUBIC_CSL
pairs = np.asarray(mdf['pairs'], dtype=int)
miso_deg = np.asarray(mdf['miso_deg'], dtype=float)
# ── Collect unique CSL targets from the selected peaks ──────────────────
# Multiple selected peaks may map to the same CSL; deduplicate by label.
csl_targets: dict[str, float] = {}
for angle in selected_peaks['angles']:
nearest = min(csl, key=lambda k: abs(csl[k] - angle))
if abs(angle - csl[nearest]) <= csl_tol:
csl_targets[nearest] = csl[nearest]
# ── Segregate ────────────────────────────────────────────────────────────
result: dict = {}
for label, ref_ang in csl_targets.items():
mask = np.abs(miso_deg - ref_ang) <= csl_tol
sel_pairs = pairs[mask]
sel_miso = miso_deg[mask]
grains_A = np.unique(sel_pairs[:, 0]) if len(sel_pairs) else np.empty(0, int)
grains_B = np.unique(sel_pairs[:, 1]) if len(sel_pairs) else np.empty(0, int)
grains_all = np.unique(sel_pairs) if len(sel_pairs) else np.empty(0, int)
result[label] = {
'csl_angle': ref_ang,
'pairs': sel_pairs,
'miso_deg': sel_miso,
'grains_A': grains_A,
'grains_B': grains_B,
'grains_all': grains_all,
'n_pairs': int(mask.sum()),
'n_grains': int(len(grains_all)),
}
return result
[docs]
def csl_volume_fractions(
lfi: np.ndarray,
csl_grains: dict,
) -> dict:
"""
Compute the area (volume) fraction of the indexed map occupied by grains
that participate in each CSL boundary type.
A grain is counted if it appears in *any* boundary of that CSL type
(i.e. it belongs to ``csl_grains[label]['grains_all']``).
Grains can contribute to multiple CSL categories simultaneously, so
fractions do not necessarily sum to 1.
Parameters
----------
lfi : ndarray, shape (ny, nx)
Integer grain label field. Positive values = grain IDs; ≤0 = unindexed.
csl_grains : dict
Output of ``segregate_csl_pairs()``.
Returns
-------
result : dict keyed by CSL label, each value a dict with
``'n_pixels'`` – int, total pixels occupied by CSL grains
``'vf_indexed'`` – float, fraction of *indexed* pixels
``'vf_total'`` – float, fraction of *all* pixels (including unindexed)
``'csl_angle'`` – float, reference CSL angle (degrees)
``'n_grains'`` – int, number of grains in this CSL type
"""
lfi = np.asarray(lfi, dtype=int)
n_total = lfi.size
n_indexed = int(np.sum(lfi > 0))
result: dict = {}
for label, info in csl_grains.items():
gids = np.asarray(info['grains_all'], dtype=int)
n_pixels = int(np.isin(lfi, gids).sum())
result[label] = {
'n_pixels': n_pixels,
'vf_indexed': n_pixels / n_indexed if n_indexed > 0 else 0.0,
'vf_total': n_pixels / n_total if n_total > 0 else 0.0,
'csl_angle': info['csl_angle'],
'n_grains': info['n_grains'],
}
return result
[docs]
def identify_parent_grains(
csl_grains: dict,
prop: dict,
) -> dict:
"""
For each CSL boundary pair, identify the **parent** (larger) grain and
the **twin / child** (smaller) grain using grain area as the discriminator.
A grain can play both roles across different pairs (twin chain:
A→B→C where B is twin of A but parent of C). Grains are therefore
classified per their *net* role:
* ``pure_parents`` – appear as the larger grain in every one of their pairs
* ``pure_twins`` – appear as the smaller grain in every one of their pairs
* ``intermediates`` – appear as parent in some pairs and twin in others
Parameters
----------
csl_grains : dict
Output of ``segregate_csl_pairs()``.
prop : dict
Grain property dict as returned by ``EBSDReader.characterise()['prop']``
or stored in ``repgen2d.prop_ebsd``.
Must contain key ``'area'`` for each grain.
Returns
-------
result : dict keyed by CSL label, each value a dict with
``'pairs_labeled'`` – list of ``(parent_gid, twin_gid)`` tuples
``'all_parents'`` – ndarray, grains that are parent in ≥1 pair
``'all_twins'`` – ndarray, grains that are twin in ≥1 pair
``'pure_parents'`` – ndarray, grains that are *only ever* a parent
``'pure_twins'`` – ndarray, grains that are *only ever* a twin
``'intermediates'`` – ndarray, grains in both roles (twin chains)
``'n_pure_parents'`` – int
``'n_pure_twins'`` – int
``'n_intermediates'`` – int
``'csl_angle'`` – float
"""
result: dict = {}
for label, info in csl_grains.items():
pairs = info['pairs'] # (N, 2) array of grain-ID pairs
csl_angle = info['csl_angle']
pairs_labeled: list[tuple[int, int]] = []
parent_set: set[int] = set()
twin_set: set[int] = set()
for gA, gB in pairs:
aA = prop.get(int(gA), {}).get('area', 0.0)
aB = prop.get(int(gB), {}).get('area', 0.0)
if aA >= aB:
parent, twin = int(gA), int(gB)
else:
parent, twin = int(gB), int(gA)
pairs_labeled.append((parent, twin))
parent_set.add(parent)
twin_set.add(twin)
all_parents = np.array(sorted(parent_set), dtype=int)
all_twins = np.array(sorted(twin_set), dtype=int)
pure_parents = np.array(sorted(parent_set - twin_set), dtype=int)
pure_twins = np.array(sorted(twin_set - parent_set), dtype=int)
intermediates = np.array(sorted(parent_set & twin_set), dtype=int)
result[label] = {
'pairs_labeled': pairs_labeled,
'all_parents': all_parents,
'all_twins': all_twins,
'pure_parents': pure_parents,
'pure_twins': pure_twins,
'intermediates': intermediates,
'n_pure_parents': len(pure_parents),
'n_pure_twins': len(pure_twins),
'n_intermediates': len(intermediates),
'csl_angle': csl_angle,
}
return result
[docs]
def classify_grain_roles_extended(parent_info: dict) -> dict:
"""
Extend the EBSD parent_info dict with ``'primary_twins'`` and
``'secondary_twins'`` keys, derived from the existing ``'pairs_labeled'``
and ``'intermediates'`` arrays.
Operates entirely on EBSD grain IDs — no MC data involved.
A **primary twin** is a ``pure_twin`` whose direct parent (the larger grain
in its CSL pair) is itself a ``pure_parent``. It is a first-generation twin
with no twins of its own.
A **secondary twin** is a ``pure_twin`` whose direct parent is an
``intermediate`` grain (a grain that is both someone's twin and someone's
parent). It is a twin-of-a-twin — second generation or deeper.
Parameters
----------
parent_info : dict
Output of :func:`identify_parent_grains`. Each value must contain
``'pairs_labeled'``, ``'pure_twins'``, and ``'intermediates'``.
Returns
-------
dict
Same structure as ``parent_info`` with two extra keys per CSL label:
``'primary_twins'`` — ndarray of first-generation twin grain IDs
``'secondary_twins'`` — ndarray of second-generation twin grain IDs
``'n_primary_twins'`` — int
``'n_secondary_twins'`` — int
Notes
-----
``pure_twins ≡ primary_twins ∪ secondary_twins`` (the sets are disjoint and
their union equals the existing ``pure_twins`` array).
"""
result = {}
for label, info in parent_info.items():
intermediates = set(info['intermediates'].tolist())
pure_twins = set(info['pure_twins'].tolist())
twin_to_parent = {twin: parent
for parent, twin in info['pairs_labeled']}
primary_twins = np.array(sorted(
gid for gid in pure_twins
if twin_to_parent.get(gid) not in intermediates
), dtype=int)
secondary_twins = np.array(sorted(
gid for gid in pure_twins
if twin_to_parent.get(gid) in intermediates
), dtype=int)
result[label] = {
**info,
'primary_twins': primary_twins,
'secondary_twins': secondary_twins,
'n_primary_twins': len(primary_twins),
'n_secondary_twins': len(secondary_twins),
}
return result
[docs]
def compute_grain_role_ratios(
lfi: 'np.ndarray',
parent_info: dict,
) -> dict:
"""
Compute the ratio of each grain role (pure parent, pure twin,
intermediate, non-role) to the total number of indexed grains,
using the same priority rules as ``plot_grain_role_property_stats``
(intermediate > parent > twin).
Parameters
----------
lfi : ndarray, shape (ny, nx)
Integer grain label field (positive = grain ID).
parent_info : dict
Output of :func:`identify_parent_grains`.
Returns
-------
dict
Keys: ``'total'``, ``'pure_parents'``, ``'pure_twins'``,
``'intermediates'``, ``'non_role'``, ``'all_role'``.
Each value is a sub-dict with ``'count'`` and ``'ratio'``.
"""
all_pp: set = set()
all_pt: set = set()
all_im: set = set()
for info in parent_info.values():
all_pp.update(info['pure_parents'].tolist())
all_pt.update(info['pure_twins'].tolist())
all_im.update(info['intermediates'].tolist())
only_pp_or_pt = (all_pp | all_pt) - all_im
pure_parent_set = only_pp_or_pt & all_pp
pure_twin_set = (only_pp_or_pt & all_pt) - pure_parent_set
intermediate_set = all_im
all_role_set = pure_parent_set | pure_twin_set | intermediate_set
total_gids = set(int(g) for g in np.unique(lfi[lfi > 0]))
non_role_set = total_gids - all_role_set
N = len(total_gids)
def _entry(gset: set) -> dict:
""" entry."""
n = len(gset)
return {'count': n, 'ratio': n / N if N else 0.0, 'grain_ids': gset}
return {
'total': {'count': N, 'ratio': 1.0, 'grain_ids': total_gids},
'pure_parents': _entry(pure_parent_set),
'pure_twins': _entry(pure_twin_set),
'intermediates': _entry(intermediate_set),
'non_role': _entry(non_role_set),
'all_role': _entry(all_role_set),
}
[docs]
def assign_grain_roles_by_ratio(
lfi: 'np.ndarray',
grain_areas: dict,
target_ratios: dict,
) -> dict:
"""
Assign pure-parent / pure-twin / intermediate / non-role labels to grains
in a grain structure so that the resulting role fractions match the
*target_ratios* (e.g. derived from an EBSD dataset via
:func:`compute_grain_role_ratios`).
Grains are sorted by area (descending). The largest grains receive the
*pure parent* label, the next tier *pure twin*, the next *intermediate*,
and the remainder *non-role*. This reflects the physical expectation that
parent grains are typically larger than their twins.
Parameters
----------
lfi : ndarray, shape (ny, nx)
Integer grain label field (positive = grain ID).
grain_areas : dict
``{grain_id: area}`` mapping. Can be built from an EBSD
``prop_ebsd`` dict or from a ``mcgs2_grain_structure.prop``
DataFrame column.
target_ratios : dict
Output of :func:`compute_grain_role_ratios`. The ``'ratio'`` field
of ``'pure_parents'``, ``'pure_twins'``, ``'intermediates'``, and
``'non_role'`` entries is used.
Returns
-------
dict
Same structure as :func:`compute_grain_role_ratios`:
keys ``'total'``, ``'pure_parents'``, ``'pure_twins'``,
``'intermediates'``, ``'non_role'``, ``'all_role'``;
each value has ``'count'``, ``'ratio'``, ``'grain_ids'``.
"""
total_gids = sorted(int(g) for g in np.unique(lfi[lfi > 0]))
N = len(total_gids)
if N == 0:
empty = {'count': 0, 'ratio': 0.0, 'grain_ids': set()}
return {k: empty for k in ('total', 'pure_parents', 'pure_twins',
'intermediates', 'non_role', 'all_role')}
# Sort grain IDs by area descending
sorted_gids = sorted(total_gids,
key=lambda g: grain_areas.get(g, 0.0),
reverse=True)
# Compute target counts from ratios (round, ensure sum = N)
r_pp = target_ratios['pure_parents']['ratio']
r_pt = target_ratios['pure_twins']['ratio']
r_im = target_ratios['intermediates']['ratio']
n_pp = int(round(r_pp * N))
n_pt = int(round(r_pt * N))
n_im = int(round(r_im * N))
# Clamp so totals don't exceed N
n_pp = min(n_pp, N)
n_pt = min(n_pt, N - n_pp)
n_im = min(n_im, N - n_pp - n_pt)
n_nr = N - n_pp - n_pt - n_im
# Slice sorted list
pp_set = set(sorted_gids[:n_pp])
pt_set = set(sorted_gids[n_pp: n_pp + n_pt])
im_set = set(sorted_gids[n_pp + n_pt: n_pp + n_pt + n_im])
nr_set = set(sorted_gids[n_pp + n_pt + n_im:])
all_role_set = pp_set | pt_set | im_set
def _entry(gset: set) -> dict:
""" entry."""
n = len(gset)
return {'count': n, 'ratio': n / N, 'grain_ids': gset}
return {
'total': {'count': N, 'ratio': 1.0, 'grain_ids': set(total_gids)},
'pure_parents': _entry(pp_set),
'pure_twins': _entry(pt_set),
'intermediates': _entry(im_set),
'non_role': _entry(nr_set),
'all_role': _entry(all_role_set),
}
[docs]
def introduce_twins_by_csl(
lfi: 'np.ndarray',
parent_grain_ids: 'set | list',
csl_label: str,
twin_half_width: float = 2.0,
twin_angle_deg: float | None = None,
n_twins_per_parent: int = 1,
angle_perturb_deg: float = 0.0,
width_perturb: float = 0.0,
rng_seed: int | None = None,
) -> dict:
"""
Introduce straight twin lamellae into selected parent grains on a pixel
grid using :class:`~upxo.geoEntities.sline2d.Sline2d`.
For each parent grain a base angle is chosen; all lamellae within that
grain are parallel (same angle) but can receive small independent
perturbations to their angle and half-width to model realistic scatter.
Parameters
----------
lfi : ndarray, shape (ny, nx)
Integer grain label field. **Modified in-place.**
parent_grain_ids : set or list
Grain IDs of the parent grains to twin.
csl_label : str
CSL type label (e.g. ``'Σ3 (twin)'``). Stored in output metadata.
twin_half_width : float
Nominal half-width in pixels of each twin lamella.
twin_angle_deg : float or None
Fixed base inclination angle (degrees, from the x-axis). If ``None``,
a random angle in [0°, 180°) is drawn independently for each parent.
n_twins_per_parent : int
Number of parallel twin lamellae to insert per parent grain.
angle_perturb_deg : float
Maximum angular perturbation (degrees) applied independently to each
lamella angle around the parent's base angle. A uniform random draw
in ``[-angle_perturb_deg, +angle_perturb_deg]`` is used. Set to 0
for strictly parallel lamellae.
width_perturb : float
Maximum half-width perturbation (pixels) applied independently to each
lamella. A uniform random draw in
``[-width_perturb, +width_perturb]`` is added to *twin_half_width*.
The effective half-width is clamped to a minimum of 0.5 px.
rng_seed : int or None
Seed for the random number generator (reproducibility).
Returns
-------
dict
A dict with keys::
{'lfi': ndarray,
'twin_lines': dict[parent_gid -> list[Sline2d]],
'new_twin_gids': dict[parent_gid -> list[new_gid]],
'csl_label': str}
"""
from upxo.geoEntities.sline2d import Sline2d as Sl2d
lfi = np.asarray(lfi, dtype=int)
rng = np.random.default_rng(rng_seed)
next_gid = int(lfi.max()) + 1
twin_lines: dict = {}
new_twin_gids: dict = {}
for parent_gid in parent_grain_ids:
parent_gid = int(parent_gid)
rows, cols = np.where(lfi == parent_gid)
if rows.size == 0:
continue
# Bounding box of parent grain (in pixel coords)
r_min, r_max = rows.min(), rows.max()
c_min, c_max = cols.min(), cols.max()
cx = (c_min + c_max) / 2.0
cy = (r_min + r_max) / 2.0
diag = np.sqrt((r_max - r_min) ** 2 + (c_max - c_min) ** 2) + 1.0
twin_lines[parent_gid] = []
new_twin_gids[parent_gid] = []
# Parent pixel coordinates as (x=col, y=row) for Sline2d
parent_xs = cols.astype(float)
parent_ys = rows.astype(float)
# Base angle for this parent (all lamellae are nominally parallel)
base_angle = (float(twin_angle_deg) if twin_angle_deg is not None
else float(rng.uniform(0.0, 180.0)))
# Build base line to compute normal direction for parallel spacing
base_line = Sl2d.by_LFAL(
location=[cx, cy],
factor=0.5,
angle=base_angle,
length=diag,
degree=True,
)
nv = base_line.normal_vector(ratio=0.0, return_type='sl2d')
nx_unit, ny_unit = nv.x0, nv.y0
norm = np.hypot(nx_unit, ny_unit)
if norm > 0:
nx_unit /= norm
ny_unit /= norm
# Nominal spacing: 2*half_width + 1 px gap
spacing_px = max(2.0 * twin_half_width + 1.0, 4.0)
# Symmetric offsets along the normal
half_n = (n_twins_per_parent - 1) / 2.0
all_lines = []
perturbed_widths = []
for k in range(n_twins_per_parent):
# Per-lamella angle perturbation
da = (rng.uniform(-angle_perturb_deg, angle_perturb_deg)
if angle_perturb_deg > 0.0 else 0.0)
lam_angle = base_angle + da
# Per-lamella width perturbation
dw = (rng.uniform(-width_perturb, width_perturb)
if width_perturb > 0.0 else 0.0)
lam_half_width = max(0.5, twin_half_width + dw)
offset = (k - half_n) * spacing_px
ox = cx + offset * nx_unit
oy = cy + offset * ny_unit
line = Sl2d.by_LFAL(
location=[ox, oy],
factor=0.5,
angle=lam_angle,
length=diag,
degree=True,
)
all_lines.append(line)
perturbed_widths.append(lam_half_width)
# Assign pixels for each lamella; skip already-relabelled pixels
for line, lam_hw in zip(all_lines, perturbed_widths):
twin_lines[parent_gid].append(line)
still_parent = (lfi[rows, cols] == parent_gid)
if not still_parent.any():
continue
sub_xs = parent_xs[still_parent]
sub_ys = parent_ys[still_parent]
sub_rows = rows[still_parent]
sub_cols = cols[still_parent]
twin_pixel_idx = line.find_neigh_point_by_perp_distance(
(sub_xs, sub_ys),
r=lam_hw,
use_bounding_rec=True,
)
if not twin_pixel_idx:
continue
t_rows = sub_rows[twin_pixel_idx]
t_cols = sub_cols[twin_pixel_idx]
lfi[t_rows, t_cols] = next_gid
new_twin_gids[parent_gid].append(next_gid)
next_gid += 1
return {
'lfi': lfi,
'twin_lines': twin_lines,
'new_twin_gids': new_twin_gids,
'csl_label': csl_label,
}
# ---------------------------------------------------------------------------
# Grain-size matching scale factor
# ---------------------------------------------------------------------------
[docs]
def compute_scale_factor_grain_size(
lfi_sim: np.ndarray,
mean_area_ebsd_um2: float,
ebsd_step_size_um: float,
prop_ebsd: dict | None = None,
) -> dict:
"""
Compute the linear scale factor (µm per sim-pixel) that makes the mean
grain area of the synthetic label field equal to the EBSD mean grain area.
The derivation is:
.. math::
s = \\sqrt{\\bar{A}_{\\text{EBSD}} / \\bar{A}_{\\text{sim}}}
where :math:`s` is in µm/px, :math:`\\bar{A}_{\\text{EBSD}}` is in µm²,
and :math:`\\bar{A}_{\\text{sim}}` is in px².
Parameters
----------
lfi_sim : ndarray (ny, nx)
Synthetic label field (integer grain IDs, background ≤ 0).
mean_area_ebsd_um2 : float
Mean grain area of the EBSD target in µm².
Typically ``rg.stat_ebsd['area']['mean']``.
ebsd_step_size_um : float
EBSD step size in µm (used for reporting only).
prop_ebsd : dict or None
``{grain_id: {'area': ..., ...}}`` dict from EBSD characterisation.
When provided, per-grain EBSD areas are stored in the result for
downstream plotting.
Returns
-------
dict with keys
``'scale_factor'`` : float — µm per sim-pixel
``'mean_area_sim_px2'`` : float — mean sim grain area (px²)
``'mean_area_ebsd_um2'`` : float — target EBSD mean area (µm²)
``'mean_area_sim_um2'`` : float — sim mean area after scaling (µm²)
``'ebsd_step_size_um'`` : float
``'sim_domain_px'`` : tuple (ny, nx)
``'sim_domain_um'`` : ndarray (ny_um, nx_um) — physical size
``'gids_sim'`` : ndarray — grain IDs from lfi_sim (> 0)
``'counts_sim'`` : ndarray — pixel counts per grain
``'areas_sim_um2'`` : ndarray — per-grain areas after scaling (µm²)
``'areas_ebsd_um2'`` : ndarray or None — per-grain EBSD areas (µm²)
``'ratio_pixel_sizes'`` : float — scale_factor / ebsd_step_size_um
"""
gids, counts = np.unique(lfi_sim, return_counts=True)
valid = gids > 0
gids, counts = gids[valid], counts[valid]
mean_area_sim_px2 = counts.mean()
scale_factor = np.sqrt(mean_area_ebsd_um2 / mean_area_sim_px2)
areas_sim_um2 = counts * scale_factor**2
sim_domain_um = np.array(lfi_sim.shape, dtype=float) * scale_factor
areas_ebsd_um2 = None
if prop_ebsd is not None:
areas_ebsd_um2 = np.array([v['area'] for v in prop_ebsd.values()],
dtype=float)
return {
'scale_factor': float(scale_factor),
'mean_area_sim_px2': float(mean_area_sim_px2),
'mean_area_ebsd_um2': float(mean_area_ebsd_um2),
'mean_area_sim_um2': float(areas_sim_um2.mean()),
'ebsd_step_size_um': float(ebsd_step_size_um),
'sim_domain_px': tuple(lfi_sim.shape),
'sim_domain_um': sim_domain_um,
'gids_sim': gids,
'counts_sim': counts,
'areas_sim_um2': areas_sim_um2,
'areas_ebsd_um2': areas_ebsd_um2,
'ratio_pixel_sizes': float(scale_factor / ebsd_step_size_um),
}
# ---------------------------------------------------------------------------
# Twin thickness statistics
# ---------------------------------------------------------------------------
[docs]
def compute_twin_thickness_stats(
parent_info: dict,
prop_ebsd: dict,
step_size_um: float,
thickness_key: str | None = None,
) -> dict:
"""
Compute twin lamella thickness statistics from EBSD grain properties.
Parameters
----------
parent_info : dict
Output of ``identify_parent_grains``. Each value must expose
``'pure_twins'`` and ``'intermediates'`` arrays of EBSD grain IDs.
prop_ebsd : dict
Per-grain property dict ``{grain_id: {'minor_axis_length': ..., ...}}``.
step_size_um : float
EBSD step size in µm/pixel used to convert pixel lengths to µm.
thickness_key : str or None
Key to use as the thickness proxy. If *None* (default), uses
``'minor_axis_length'`` when present, otherwise ``'eq_diameter'``.
Returns
-------
dict with keys
``'gids'`` : list[int] — twin grain IDs with data
``'thick_px'`` : ndarray(N,) — thickness in pixels
``'thick_um'`` : ndarray(N,) — thickness in µm
``'col'`` : str — property key used
``'step_um'`` : float — step size used
``'mean'`` : float
``'median'`` : float
``'std'`` : float
``'min'`` : float
``'max'`` : float
``'pct10'`` : float (10th percentile)
``'pct90'`` : float (90th percentile)
"""
# 1. Collect twin + intermediate grain IDs
twin_gids: set[int] = set()
for info in parent_info.values():
twin_gids.update(int(g) for g in np.asarray(info['pure_twins']).ravel())
twin_gids.update(int(g) for g in np.asarray(info['intermediates']).ravel())
gids_in_prop = set(prop_ebsd.keys())
twin_gids_found = sorted(twin_gids & gids_in_prop)
# 2. Choose property key
sample_keys = list(prop_ebsd[next(iter(prop_ebsd))].keys())
if thickness_key is None:
if 'minor_axis_length' in sample_keys:
thickness_key = 'minor_axis_length'
elif 'eq_diameter' in sample_keys:
thickness_key = 'eq_diameter'
else:
raise RuntimeError(
f"No suitable thickness key found. Available: {sample_keys}"
)
# 3. Extract and filter
thick_px = np.array(
[prop_ebsd[g][thickness_key] for g in twin_gids_found
if prop_ebsd[g][thickness_key] is not None
and prop_ebsd[g][thickness_key] > 0],
dtype=np.float64,
)
thick_um = thick_px * float(step_size_um)
return {
'gids': twin_gids_found,
'thick_px': thick_px,
'thick_um': thick_um,
'col': thickness_key,
'step_um': float(step_size_um),
'mean': float(thick_um.mean()),
'median': float(np.median(thick_um)),
'std': float(thick_um.std(ddof=1)),
'min': float(thick_um.min()),
'max': float(thick_um.max()),
'pct10': float(np.percentile(thick_um, 10)),
'pct90': float(np.percentile(thick_um, 90)),
}
# ---------------------------------------------------------------------------
# Orientation assignment – MDF-matched simulation
# ---------------------------------------------------------------------------
# Σ3 twinning rotation: 60° about [111]/√3 (w, x, y, z) convention
# Use ** 0.5 (pure Python) so module-level init works when numpy is mocked.
_SIGMA3_Q = np.array([
3.0 ** 0.5 / 2.0,
1.0 / (2.0 * 3.0 ** 0.5),
1.0 / (2.0 * 3.0 ** 0.5),
1.0 / (2.0 * 3.0 ** 0.5),
], dtype=np.float64)
# Σ5 rotations: 36.87° about each cubic <100> axis — three symmetry-equivalent variants
import math as _math
_S5_HW = _math.radians(36.87 / 2.0)
_SIGMA5_Q_VARIANTS = np.array([
[_math.cos(_S5_HW), _math.sin(_S5_HW), 0.0, 0.0 ], # [100]
[_math.cos(_S5_HW), 0.0, _math.sin(_S5_HW), 0.0 ], # [010]
[_math.cos(_S5_HW), 0.0, 0.0, _math.sin(_S5_HW)], # [001]
], dtype=np.float64)
def _quat_mul(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
"""Hamilton product of two unit quaternions stored as (w, x, y, z)."""
w1, x1, y1, z1 = q1
w2, x2, y2, z2 = q2
return np.array([
w1*w2 - x1*x2 - y1*y2 - z1*z2,
w1*x2 + x1*w2 + y1*z2 - z1*y2,
w1*y2 - x1*z2 + y1*w2 + z1*x2,
w1*z2 + x1*y2 - y1*x2 + z1*w2,
], dtype=np.float64)
def _positive_w(q: np.ndarray) -> np.ndarray:
"""Return *q* in the canonical hemisphere (w ≥ 0)."""
return q if q[0] >= 0.0 else -q
[docs]
def compute_s3_lamella_angle_2d(q: 'np.ndarray') -> float:
"""
Compute the 2D S3 twin lamella trace angle in the sample XY plane.
The active {111} plane variant is the one whose sample-frame normal has
the largest XY-plane projected norm. The lamella trace direction is
perpendicular to that projected normal.
Parameters
----------
q : array-like, shape (4,)
Host grain quaternion [w, x, y, z].
Returns
-------
float
Lamella trace angle in degrees, in [0°, 180°).
"""
q = np.asarray(q, dtype=np.float64)
q = q / np.linalg.norm(q)
w, x, y, z = q
R = np.array([
[1 - 2*(y*y + z*z), 2*(x*y - w*z), 2*(x*z + w*y)],
[ 2*(x*y + w*z), 1 - 2*(x*x + z*z), 2*(y*z - w*x)],
[ 2*(x*z - w*y), 2*(y*z + w*x), 1 - 2*(x*x + y*y)],
])
inv_sqrt3 = 1.0 / np.sqrt(3.0)
n_crystal = np.array([
[ 1.0, 1.0, 1.0],
[-1.0, 1.0, 1.0],
[ 1.0, -1.0, 1.0],
[ 1.0, 1.0, -1.0],
]) * inv_sqrt3
n_sample = (R @ n_crystal.T).T # shape (4, 3)
n_xy = n_sample[:, :2] # project onto XY plane
norms = np.linalg.norm(n_xy, axis=1)
best = np.argmax(norms)
n_sel = n_xy[best]
# Trace is perpendicular to the selected projected normal
trace = np.array([-n_sel[1], n_sel[0]])
return float(np.degrees(np.arctan2(trace[1], trace[0])) % 180.0)
[docs]
def assign_orientations_mdf_matched(
lfi_after: np.ndarray,
gs_roles: dict,
twin_result: dict,
ebsd_lfi: np.ndarray,
ebsd_quat: np.ndarray,
ebsd_parent_info: dict,
rng_seed=None,
) -> dict:
"""
Assign grain-averaged quaternion orientations to every domain in the
twinned simulated label field *lfi_after* so that the grain-boundary
MDF of the simulation approximates the EBSD target.
Assignment strategy
-------------------
Pure-parent sim grains
Sampled randomly from the pool of EBSD pure-parent grain-averaged
quaternions.
Geometrically-introduced twin grains (new IDs in *twin_result*)
Derived from the parent's assigned quaternion via the Σ3 twinning
rotation: q_twin = Σ3_Q ⊗ q_parent
Pre-labelled pure-twin sim grains (from Section 15, not in twin_result)
Sampled from the EBSD pure-twin pool.
Intermediate sim grains
Sampled from the EBSD intermediate pool.
Non-role sim grains
Sampled from the EBSD non-role pool.
Pools are sampled *with replacement* when the pool is smaller than the
number of grains needing assignment. The neighbour graph of *lfi_after*
(4-connected pixel contacts) is returned so the caller can immediately
compute the simulated MDF and compare it with the EBSD reference.
Parameters
----------
lfi_after : ndarray (ny, nx)
Label field after twin introduction (integer grain IDs, 0 = background).
gs_roles : dict
Output of ``assign_grain_roles_by_ratio``. Must contain keys
``'pure_parents'``, ``'pure_twins'``, ``'intermediates'``, ``'non_role'``
each with a ``'grain_ids'`` entry that is a set of integer grain IDs.
twin_result : dict
Output of ``introduce_twins_by_csl``. Must contain key
``'new_twin_gids'`` : {parent_gid : [twin_gid, ...]}.
ebsd_lfi : ndarray (ny_e, nx_e)
EBSD label field (grain IDs, 0 = boundary / unindexed).
ebsd_quat : ndarray (ny_e, nx_e, 4)
Per-pixel quaternions from EBSD in (w, x, y, z) convention.
ebsd_parent_info : dict
Output of ``find_csl_grains`` (keyed by CSL label). Each value
must expose ``'pure_parents'``, ``'pure_twins'``, and
``'intermediates'`` arrays of EBSD grain IDs.
rng_seed : int or None
Seed for the random-number generator (reproducibility).
Returns
-------
dict with keys
``'grain_quats'`` : {grain_id : ndarray(4,)}
Grain-averaged quaternion for every non-background grain in
*lfi_after*.
``'quat_pixel'`` : ndarray(ny, nx, 4)
Per-pixel quaternion map built by flooding each grain with its
assigned quaternion.
``'neigh_gid_sim'`` : {grain_id : list[int]}
4-connected contact neighbour graph of *lfi_after*.
"""
rng = np.random.default_rng(rng_seed)
# ------------------------------------------------------------------
# 1. EBSD grain-averaged quaternions
# ------------------------------------------------------------------
gids_ebsd, q_mean_ebsd = grain_avg_quats(ebsd_lfi, ebsd_quat)
gid_to_q_ebsd = {int(g): q_mean_ebsd[i] for i, g in enumerate(gids_ebsd)}
# ------------------------------------------------------------------
# 2. Build EBSD role-stratified pools
# Priority: intermediate > (parent + twin) to match ratio logic.
# ------------------------------------------------------------------
all_pp, all_pt, all_im = set(), set(), set()
for info in ebsd_parent_info.values():
all_pp.update(int(g) for g in np.asarray(info['pure_parents']).ravel())
all_pt.update(int(g) for g in np.asarray(info['pure_twins']).ravel())
all_im.update(int(g) for g in np.asarray(info['intermediates']).ravel())
all_role = all_pp | all_pt | all_im
all_gids_ebsd = set(gid_to_q_ebsd.keys())
all_nr = all_gids_ebsd - all_role
# Remove intermediates from pure sets (same priority rule as compute_grain_role_ratios)
pure_pp_set = all_pp - all_im
pure_pt_set = (all_pt - all_im) - pure_pp_set
im_set = all_im
def _build_pool(gid_set):
""" build pool."""
qs = [gid_to_q_ebsd[g] for g in gid_set if g in gid_to_q_ebsd]
return np.array(qs, dtype=np.float64) if qs else np.empty((0, 4), dtype=np.float64)
pool_pp = _build_pool(pure_pp_set)
pool_pt = _build_pool(pure_pt_set)
pool_im = _build_pool(im_set)
pool_nr = _build_pool(all_nr)
def _sample_pool(pool, n):
"""Draw *n* quaternions from *pool* with replacement; fallback to
random unit quaternions when pool is empty."""
if n == 0:
return []
if len(pool) == 0:
q = rng.standard_normal((n, 4))
q /= np.linalg.norm(q, axis=1, keepdims=True)
return [_positive_w(q[i]) for i in range(n)]
idx = rng.integers(0, len(pool), size=n)
return [_positive_w(pool[i].copy()) for i in idx]
# ------------------------------------------------------------------
# 3. Classify sim grains
# ------------------------------------------------------------------
new_twin_map = twin_result.get('new_twin_gids', {}) # parent → [twins]
introduced_twin_gids: set[int] = set()
parent_of_twin: dict[int, int] = {}
for pg, tg_list in new_twin_map.items():
for tg in tg_list:
introduced_twin_gids.add(int(tg))
parent_of_twin[int(tg)] = int(pg)
# Remove introduced twin IDs from pre-labelled role sets
sim_pp_gids = gs_roles['pure_parents']['grain_ids'] - introduced_twin_gids
sim_pt_gids = gs_roles['pure_twins']['grain_ids'] - introduced_twin_gids
sim_im_gids = gs_roles['intermediates']['grain_ids'] - introduced_twin_gids
sim_nr_gids = gs_roles['non_role']['grain_ids'] - introduced_twin_gids
# ------------------------------------------------------------------
# 4. Assign orientations by role
# ------------------------------------------------------------------
grain_quats: dict[int, np.ndarray] = {}
# Parents first – introduced twins depend on them
for gid, q in zip(sorted(sim_pp_gids), _sample_pool(pool_pp, len(sim_pp_gids))):
grain_quats[gid] = q
for gid, q in zip(sorted(sim_pt_gids), _sample_pool(pool_pt, len(sim_pt_gids))):
grain_quats[gid] = q
for gid, q in zip(sorted(sim_im_gids), _sample_pool(pool_im, len(sim_im_gids))):
grain_quats[gid] = q
for gid, q in zip(sorted(sim_nr_gids), _sample_pool(pool_nr, len(sim_nr_gids))):
grain_quats[gid] = q
# Introduced twin grains: q_twin = Σ3_Q ⊗ q_parent
for tg, pg in parent_of_twin.items():
if pg not in grain_quats:
# Parent might not have been assigned if it was classified differently;
# assign it now from the parent pool as fallback.
q_fb = _sample_pool(pool_pp, 1)
grain_quats[pg] = q_fb[0] if q_fb else np.array([1., 0., 0., 0.])
grain_quats[tg] = _positive_w(_quat_mul(_SIGMA3_Q, grain_quats[pg]))
# ------------------------------------------------------------------
# 5. Per-pixel quaternion array
# ------------------------------------------------------------------
ny, nx = lfi_after.shape
quat_pixel = np.zeros((ny, nx, 4), dtype=np.float64)
quat_pixel[..., 0] = 1.0 # identity as default (background)
for gid, q in grain_quats.items():
quat_pixel[lfi_after == gid] = q
# ------------------------------------------------------------------
# 6. 4-connected neighbour graph
# ------------------------------------------------------------------
all_sim_ids = np.unique(lfi_after)
all_sim_ids = all_sim_ids[all_sim_ids > 0]
neigh_gid_sim: dict[int, set] = {int(g): set() for g in all_sim_ids}
left, right = lfi_after[:, :-1], lfi_after[:, 1:]
mask_h = (left > 0) & (right > 0) & (left != right)
for a, b in zip(left[mask_h].ravel(), right[mask_h].ravel()):
a, b = int(a), int(b)
neigh_gid_sim[a].add(b)
neigh_gid_sim[b].add(a)
top, bot = lfi_after[:-1, :], lfi_after[1:, :]
mask_v = (top > 0) & (bot > 0) & (top != bot)
for a, b in zip(top[mask_v].ravel(), bot[mask_v].ravel()):
a, b = int(a), int(b)
neigh_gid_sim[a].add(b)
neigh_gid_sim[b].add(a)
neigh_gid_sim_lists = {k: list(v) for k, v in neigh_gid_sim.items()}
return {
'grain_quats': grain_quats,
'quat_pixel': quat_pixel,
'neigh_gid_sim': neigh_gid_sim_lists,
}
[docs]
def assign_parent_orientations(
host_gids: np.ndarray,
sim_lfi: np.ndarray,
ebsd_parent_gids: np.ndarray,
ebsd_lfi: np.ndarray,
ebsd_quat: np.ndarray,
rng_seed=None,
max_retries: int = 50,
) -> dict:
"""
Assign EBSD pure-parent quaternions to MC host grains with a
neighbour-conflict-free constraint: no two directly adjacent host grains
receive the same quaternion (which would imply zero misorientation between
them, contradicting the existence of a grain boundary).
When the EBSD parent pool is exhausted (all remaining pool entries still
conflict with every neighbour after *max_retries* redraws), the function
falls back to FCC-texture quaternions generated by
``tops.synth_fcc_quats``.
Parameters
----------
host_gids : array-like of int
MC grain IDs that will host twins.
sim_lfi : ndarray (ny, nx)
MC grain label field.
ebsd_parent_gids : array-like of int
EBSD pure-parent grain IDs (typically
``parent_info[csl_label]['pure_parents']``).
ebsd_lfi : ndarray (ny_e, nx_e)
EBSD grain label field (``rg.lfi_ebsd``).
ebsd_quat : ndarray (ny_e, nx_e, 4)
Per-pixel EBSD quaternions in [w, x, y, z] convention
(``rg.quat_ebsd``).
rng_seed : int or None
Seed for reproducibility.
max_retries : int
Maximum redraws from the EBSD pool before switching to FCC fallback
for a given grain.
Returns
-------
dict with keys
``'host_quats'`` : {gid: ndarray(4,)} — assigned quaternion per host
``'pool_size'`` : int — unique EBSD parent orientations available
``'n_hosts'`` : int
``'n_fallback'`` : int — grains that needed the synth-FCC fallback
"""
import cc3d
from upxo.xtalphy.texops import tops
rng = np.random.default_rng(rng_seed)
host_gids = np.asarray(host_gids, dtype=int)
host_set = set(host_gids.tolist())
# -- EBSD parent quaternion pool ----------------------------------------
all_gids_e, q_mean_e = grain_avg_quats(ebsd_lfi, ebsd_quat)
gid2q = {int(g): q_mean_e[i] for i, g in enumerate(all_gids_e)}
parent_set = {int(g) for g in np.asarray(ebsd_parent_gids).ravel()}
pool = np.array([gid2q[g] for g in sorted(parent_set) if g in gid2q],
dtype=np.float64)
n_pool = len(pool)
# -- Fallback FCC pool (generated lazily on first need) -----------------
_state: dict = {'fcc': None, 'idx': 0}
def _get_fallback() -> np.ndarray:
""" get fallback."""
if _state['fcc'] is None or _state['idx'] >= len(_state['fcc']):
_state['fcc'] = tops.synth_fcc_quats(N=max(200, len(host_set)))
_state['idx'] = 0
q = _state['fcc'][_state['idx']]
_state['idx'] += 1
return _positive_w(q.copy())
# -- 4-connected adjacency restricted to host grains --------------------
edges = cc3d.region_graph(sim_lfi.astype(np.int32), connectivity=4)
adjacency: dict[int, set] = {int(g): set() for g in host_set}
for a, b in edges:
a, b = int(a), int(b)
if a in host_set and b in host_set:
adjacency[a].add(b)
adjacency[b].add(a)
# -- Neighbour-conflict-free assignment ---------------------------------
host_quats: dict[int, np.ndarray] = {}
n_fallback = 0
for gid in rng.permutation(host_gids).tolist():
used_by_neigh = {
host_quats[nb].tobytes()
for nb in adjacency[int(gid)]
if nb in host_quats
}
chosen = None
for _ in range(max_retries):
candidate = _positive_w(pool[int(rng.integers(0, n_pool))].copy())
if candidate.tobytes() not in used_by_neigh:
chosen = candidate
break
if chosen is None:
chosen = _get_fallback()
n_fallback += 1
host_quats[int(gid)] = chosen
return {
'host_quats': host_quats,
'pool_size': n_pool,
'n_hosts': len(host_gids),
'n_fallback': n_fallback,
}
# ---------------------------------------------------------------------------
# Special orientation relationships
# ---------------------------------------------------------------------------
[docs]
def get_ks_rotations() -> np.ndarray:
"""
Return the 24 Kurdjumov-Sachs (K-S) orientation relationship operators
as (24, 3, 3) rotation matrices.
The K-S OR is defined by the relationship between FCC austenite and
BCC martensite/ferrite: ``{111}γ ∥ {110}α, <110>γ ∥ <111>α``.
Returns
-------
ks_variants : ndarray, shape (24, 3, 3)
"""
r1 = Rotation.from_euler('Z', 45, degrees=True)
r2 = Rotation.from_rotvec(
np.deg2rad(54.7356) * np.array([-1., 1., 0.]) / np.sqrt(2)
)
g_ks = (r2 * r1).as_matrix()
ops: list[np.ndarray] = []
for p in permutations(range(3)):
P = np.eye(3)[list(p)]
for signs in product([-1, 1], repeat=3):
S = P * np.array(signs)
if round(np.linalg.det(S)) == 1:
ops.append(S)
sym_ops = np.array(ops) # (48, 3, 3)
ks_variants = sym_ops @ g_ks # (48, 3, 3) — degenerate variants included
return ks_variants
# ---------------------------------------------------------------------------
# Self-sufficient examples (run individually or via exmp_all())
# Each function is fully standalone: imports nothing from outer scope beyond
# this module and standard library.
# ---------------------------------------------------------------------------
[docs]
def exmp_cubic_misorientation():
"""
Demonstrate cubic_misorientation() for a single grain-boundary pair.
Computes the fundamental misorientation angle (degrees), the rotation
axis, and the three smallest unique equivalent angles between two grains
whose orientations are given as Bunge Euler angles.
"""
from upxo.xtalphy.crystal_orientation import cubic_misorientation
ea_grain_A = [0.0, 35.0, 45.0] # Copper component
ea_grain_B = [90.0, 45.0, 0.0] # Goss component
angle, axis, top3 = cubic_misorientation(ea_grain_A, ea_grain_B, degrees=True)
print("=== exmp_cubic_misorientation ===")
print(f" Grain A (Copper): phi1={ea_grain_A[0]}, Phi={ea_grain_A[1]}, phi2={ea_grain_A[2]} deg")
print(f" Grain B (Goss): phi1={ea_grain_B[0]}, Phi={ea_grain_B[1]}, phi2={ea_grain_B[2]} deg")
print(f" Fundamental misorientation angle : {angle:.4f} deg")
print(f" Rotation axis (sample frame) : [{axis[0]:.4f}, {axis[1]:.4f}, {axis[2]:.4f}]")
print(f" Top-3 unique equivalent angles : {[round(a, 4) for a in top3]} deg")
[docs]
def exmp_grain_boundary_misorientation_distribution():
"""
Demonstrate grain_boundary_misorientation_distribution() with explicit
grain-boundary pairs and random per-grain Euler angles.
Prints histogram statistics and plots the GBMD bar chart.
"""
import numpy as np
import matplotlib.pyplot as plt
from upxo.xtalphy.crystal_orientation import (
grain_boundary_misorientation_distribution,
)
rng = np.random.default_rng(seed=42)
N_grains = 30
grain_ids = np.arange(1, N_grains + 1)
# Random Bunge Euler angles in degrees
euler = rng.uniform([0., 0., 0.], [360., 180., 360.], size=(N_grains, 3))
# Build a simple chain of adjacent pairs: 1-2, 2-3, ..., (N-1)-N
pairs = np.column_stack([grain_ids[:-1], grain_ids[1:]])
result = grain_boundary_misorientation_distribution(
euler, pairs,
grain_ids=grain_ids,
degrees=True,
n_bins=18,
angle_range=(0.0, 65.0),
)
print("=== exmp_grain_boundary_misorientation_distribution ===")
print(f" Pairs processed : {result['n_pairs']}")
print(f" Mean angle : {result['mean_angle']:.3f} deg")
print(f" Median angle : {result['median_angle']:.3f} deg")
print(f" Std deviation : {result['std_angle']:.3f} deg")
fig, ax = plt.subplots(figsize=(7, 4))
width = result['hist_bin_edges'][1] - result['hist_bin_edges'][0]
ax.bar(result['hist_bin_centers'], result['hist_counts'], width=width,
edgecolor='k', color='steelblue', alpha=0.8)
ax.set_xlabel('Misorientation angle (°)')
ax.set_ylabel('Number of grain boundaries')
ax.set_title(f"GBMD (n={result['n_pairs']} pairs, "
f"mean={result['mean_angle']:.1f}°, "
f"std={result['std_angle']:.1f}°)")
ax.set_xlim(0, 65)
plt.tight_layout()
plt.show()
[docs]
def exmp_gbmd_from_lfi():
"""
Demonstrate gbmd_from_lfi() on a small synthetic label-field image.
A 20×20 pixel domain is partitioned into a regular 4×4 grid of grains
(16 grains total) each given a random Bunge Euler angle. Grain
boundaries are detected automatically from the label field.
"""
import numpy as np
import matplotlib.pyplot as plt
from upxo.xtalphy.crystal_orientation import gbmd_from_lfi
rng = np.random.default_rng(seed=7)
ny, nx = 20, 20
n_cols = 4
n_rows = 4
N_grains = n_rows * n_cols
# Build label field: grain IDs 1..16 tiled in a 4×4 block pattern
lfi = np.zeros((ny, nx), dtype=int)
tile_h = ny // n_rows
tile_w = nx // n_cols
grain_ids = []
for row in range(n_rows):
for col in range(n_cols):
gid = row * n_cols + col + 1
grain_ids.append(gid)
r0, r1 = row*tile_h, (row+1)*tile_h
c0, c1 = col*tile_w, (col+1)*tile_w
lfi[r0:r1, c0:c1] = gid
grain_ids = np.asarray(grain_ids)
euler = rng.uniform([0., 0., 0.], [360., 180., 360.],
size=(N_grains, 3))
result = gbmd_from_lfi(
lfi, euler,
grain_ids=grain_ids,
connectivity=4,
n_bins=13,
angle_range=(0.0, 65.0),
)
print("=== exmp_gbmd_from_lfi ===")
print(f" GB pairs detected : {len(result['gb_pairs'])}")
print(f" Pairs processed : {result['n_pairs']}")
print(f" Mean angle : {result['mean_angle']:.3f} deg")
print(f" Median angle : {result['median_angle']:.3f} deg")
print(f" Std deviation : {result['std_angle']:.3f} deg")
fig, axes = plt.subplots(1, 2, figsize=(11, 4))
# Left: label field
axes[0].imshow(lfi, cmap='tab20', interpolation='nearest')
axes[0].set_title('Label field (grain IDs)')
axes[0].set_xlabel('x (pixels)'); axes[0].set_ylabel('y (pixels)')
# Right: GBMD
width = result['hist_bin_edges'][1] - result['hist_bin_edges'][0]
axes[1].bar(result['hist_bin_centers'], result['hist_counts'], width=width,
edgecolor='k', color='darkorange', alpha=0.85)
axes[1].set_xlabel('Misorientation angle (°)')
axes[1].set_ylabel('Number of grain boundaries')
axes[1].set_title(f"GBMD (n={result['n_pairs']} pairs, "
f"mean={result['mean_angle']:.1f}°)")
axes[1].set_xlim(0, 65)
plt.tight_layout()
plt.show()
[docs]
def exmp_euler_bunge_to_matrix():
"""
Demonstrate euler_bunge_to_matrix() for well-known FCC texture components
and verify R is orthogonal and det(R)=+1.
"""
import numpy as np
from upxo.xtalphy.crystal_orientation import (
euler_bunge_to_matrix,
FCC_TEXTURE_COMPONENTS,
)
print("=== exmp_euler_bunge_to_matrix ===")
for name, (phi1, Phi, phi2) in FCC_TEXTURE_COMPONENTS.items():
R = euler_bunge_to_matrix(phi1, Phi, phi2, degrees=True)
det = np.linalg.det(R)
err = np.linalg.norm(R @ R.T - np.eye(3), ord='fro')
print(f" {name:<14s} phi1={phi1:5.1f} Phi={Phi:5.1f} phi2={phi2:5.1f} "
f" det={det:.6f} ||RR^T - I||_F={err:.2e}")
[docs]
def exmp_normalize_euler_bunge():
"""
Demonstrate normalize_euler_bunge() on edge-case inputs including
negative Phi, Phi > 180, and out-of-range phi1/phi2.
"""
import numpy as np
from upxo.xtalphy.crystal_orientation import normalize_euler_bunge
raw = np.array([
[ 370., -20., 400.], # phi1 > 360, Phi < 0, phi2 > 360
[ -10., 200., -5.], # phi1 < 0, Phi > 180
[ 45., 90., 90.], # already canonical
[ 0., 0., 0.], # identity
])
norm = normalize_euler_bunge(raw, degrees=True)
print("=== exmp_normalize_euler_bunge ===")
print(f" {'Raw':>30s} {'Normalised':>30s}")
for r, n in zip(raw, norm):
rs = f"({r[0]:6.1f}, {r[1]:6.1f}, {r[2]:6.1f})"
ns = f"({n[0]:6.1f}, {n[1]:6.1f}, {n[2]:6.1f})"
print(f" {rs} -> {ns}")
[docs]
def exmp_fcc_symmetrise_ori():
"""
Demonstrate fcc_symmetrise_ori() for the Copper texture component.
Prints the number of distinct symmetrically equivalent orientations
and the first few triplets.
"""
import numpy as np
from upxo.xtalphy.crystal_orientation import fcc_symmetrise_ori
bea_copper = (90.0, 35.0, 45.0) # Copper: phi1, Phi, phi2 in degrees
equiv = fcc_symmetrise_ori(bea_copper)
print("=== exmp_fcc_symmetrise_ori ===")
print(f" Input orientation (Copper): phi1={bea_copper[0]}, "
f"Phi={bea_copper[1]}, phi2={bea_copper[2]} deg")
print(f" Number of distinct equivalent orientations: {len(equiv)}")
print(" First 5 equivalents (phi1, Phi, phi2):")
for row in equiv[:5]:
print(f" ({row[0]:7.3f}, {row[1]:7.3f}, {row[2]:7.3f})")
[docs]
def exmp_ipf_color():
"""
Demonstrate ipf_color() for standard FCC texture components along [001].
"""
from upxo.xtalphy.crystal_orientation import ipf_color, FCC_TEXTURE_COMPONENTS
print("=== exmp_ipf_color ===")
print(f" {'Component':<14s} {'R':>6s} {'G':>6s} {'B':>6s}")
for name, ea in FCC_TEXTURE_COMPONENTS.items():
r, g, b = ipf_color(ea, sample_direction=[0., 0., 1.])
print(f" {name:<14s} {r:.4f} {g:.4f} {b:.4f}")
[docs]
def exmp_get_ks_rotations():
"""
Demonstrate get_ks_rotations().
Prints shape, verifies all returned matrices are proper rotations.
"""
import numpy as np
from upxo.xtalphy.crystal_orientation import get_ks_rotations
ks = get_ks_rotations()
dets = np.linalg.det(ks)
errs = np.array([np.linalg.norm(R @ R.T - np.eye(3), ord='fro') for R in ks])
print("=== exmp_get_ks_rotations ===")
print(f" Output shape : {ks.shape}")
print(f" All det(R) ≈ +1 : {np.allclose(dets, 1.0, atol=1e-10)}")
print(f" All ||RR^T - I||_F < 1e-10 : {np.all(errs < 1e-10)}")
print(f" Max orthogonality error : {errs.max():.2e}")
[docs]
def exmp_all():
"""
Run every self-sufficient example in sequence.
Suitable for a quick sanity-check of the module.
"""
exmp_euler_bunge_to_matrix()
print()
exmp_normalize_euler_bunge()
print()
exmp_fcc_symmetrise_ori()
print()
exmp_ipf_color()
print()
exmp_get_ks_rotations()
print()
exmp_cubic_misorientation()
print()
exmp_grain_boundary_misorientation_distribution()
print()
exmp_gbmd_from_lfi()