Source code for upxo.interfaces.defdap.ebsd_reader

"""
upxo.interfaces.defdap.ebsd_reader
===================================
Thin, UPXO-native wrapper around DefDAP for loading 2D EBSD maps from
Oxford Instruments files (.ctf / .crc) and exposing the data as plain
NumPy arrays that the rest of UPXO (repgen2d, gsan2d, etc.) can consume
directly.

Supported file formats
----------------------
.ctf   Oxford Instruments text   -> data_type='OxfordText'
.crc   Oxford Instruments binary -> data_type='OxfordBinary'

Extracted arrays (all stored as plain NumPy — no DefDAP objects leak out)
-------------------------------------------------------------------------
lfi_ebsd   : np.ndarray, int,   shape (ny, nx)
    Grain label field.  Values >= 1 are grain IDs.
    -1  = remnant grain-boundary pixel (not assigned to any grain).
    -2  = pixel belonging to a grain smaller than min_grain_size.
    0   = non-indexed point.

euler_ebsd : np.ndarray, float, shape (ny, nx, 3)
    Bunge Euler angles (phi1, Phi, phi2) in **radians**.
    Transposed from DefDAP's native (3, ny, nx) to image-convention
    (ny, nx, 3) so that euler_ebsd[row, col] gives a 3-vector.

quat_ebsd  : np.ndarray, float, shape (ny, nx, 4)
    Unit quaternion per pixel (q0, q1, q2, q3) extracted from DefDAP's
    object array.  Positive-hemisphere convention (q0 >= 0).

step_size  : float
    Pixel step size in microns, read directly from the file metadata.

Usage
-----
from upxo.interfaces.defdap.ebsd_reader import EBSDReader

rdr = EBSDReader.from_file('scan.ctf', min_grain_size=10)
# or
rdr = EBSDReader.from_file('scan.crc', min_grain_size=10)

rdr.lfi_ebsd    # (ny, nx) int array
rdr.euler_ebsd  # (ny, nx, 3) float array, radians
rdr.quat_ebsd   # (ny, nx, 4) float array
rdr.step_size   # float, microns
"""

import os
import pathlib
import numpy as np

# DefDAP is an optional dependency; import lazily so that the rest of UPXO
# does not break if it is not installed.
try:
    import defdap.ebsd as _defdap_ebsd
    _DEFDAP_AVAILABLE = True
except ImportError:
    _DEFDAP_AVAILABLE = False

# ---------------------------------------------------------------------------
# File-format registry
# ---------------------------------------------------------------------------
_EXT_TO_DATATYPE = {
    '.ctf': 'OxfordText',
    '.crc': 'OxfordBinary',
}


[docs] class EBSDReader: """ UPXO-native EBSD file reader backed by DefDAP. Parameters are not passed directly; use the class-methods as constructors. Attributes ---------- lfi_ebsd : np.ndarray, int, shape (ny, nx) Grain label field (see module docstring for value conventions). euler_ebsd : np.ndarray, float, shape (ny, nx, 3) Bunge Euler angles phi1, Phi, phi2 in radians. quat_ebsd : np.ndarray, float, shape (ny, nx, 4) Unit quaternion coefficients q0, q1, q2, q3 per pixel. step_size : float Step size in microns. file_path : str Absolute path to the source EBSD file. shape : tuple(int, int) Map shape (ny, nx). """ __slots__ = ('lfi_ebsd', 'euler_ebsd', 'quat_ebsd', 'step_size', 'file_path', 'shape', 'prop') def __init__(self): # Slots are populated by the class-methods; direct construction # is intentionally left empty. pass # ------------------------------------------------------------------ # Constructors # ------------------------------------------------------------------
[docs] @classmethod def from_file(cls, file_path, min_grain_size=10, misori_tol=10, data_type=None): """ Load an EBSD map from a .ctf or .crc file and extract all UPXO-relevant arrays. Parameters ---------- file_path : str or pathlib.Path Path to the EBSD file. Extension must be .ctf or .crc unless data_type is supplied explicitly. min_grain_size : int, optional Minimum grain size in pixels passed to DefDAP's ``find_grains()``. Grains smaller than this are labelled -2 in ``lfi_ebsd``. Default 10. misori_tol : float, optional Misorientation tolerance in degrees for grain boundary detection inside DefDAP. Default 10. data_type : str or None, optional Override the DefDAP data_type string. When None (default) the format is inferred from the file extension: '.ctf' -> 'OxfordText' '.crc' -> 'OxfordBinary' Returns ------- EBSDReader Populated instance with ``lfi_ebsd``, ``euler_ebsd``, ``quat_ebsd``, ``step_size``, ``file_path``, ``shape``. Raises ------ ImportError If DefDAP is not installed. FileNotFoundError If ``file_path`` does not exist. ValueError If the file extension is not recognised and ``data_type`` is not provided. """ _require_defdap() file_path = pathlib.Path(file_path).resolve() _check_file(file_path) if data_type is None: data_type = _infer_data_type(file_path) # DefDAP's OxfordTextLoader appends the extension (.ctf/.crc) itself, # so we must pass the path WITHOUT the extension (stem only). ebsd_map = _defdap_ebsd.Map(str(file_path.with_suffix('')), dataType=data_type) # buildQuatArray() must be called before findBoundaries() — DefDAP # does not build it automatically on load (0.93.x behaviour). ebsd_map.buildQuatArray() # Detect grain boundaries and grains # findBoundaries uses 'boundDef' (not misori_tol) in 0.93.x ebsd_map.findBoundaries(boundDef=misori_tol) ebsd_map.findGrains(minGrainSize=min_grain_size) obj = cls() obj.file_path = str(file_path) obj.step_size = float(ebsd_map.stepSize) # stepSize in 0.93.x obj.shape = tuple(ebsd_map.shape) # (ny, nx) obj.lfi_ebsd = _extract_lfi(ebsd_map) obj.euler_ebsd = _extract_euler(ebsd_map) obj.quat_ebsd = _extract_quat(ebsd_map) obj.prop = None return obj
[docs] @classmethod def from_file_subsampled( cls, src_path, dst_path, stride_x: int = 2, stride_y: int = 2, min_grain_size: int = 10, misori_tol: float = 10, data_type=None, ): """ Sub-sample a CTF file, write the result to *dst_path*, then load it via the normal ``from_file`` pipeline. This is Method C sub-sampling: the file is reduced before DefDAP ever reads it, so grain detection runs on the smaller map and memory usage is proportional to the sub-sampled size. Parameters ---------- src_path : str or pathlib.Path Source ``.ctf`` file. dst_path : str or pathlib.Path Destination for the written sub-sampled CTF file. The parent directory must already exist. stride_x : int Keep every *stride_x*-th pixel along X. Default 2. stride_y : int Keep every *stride_y*-th pixel along Y. Default 2. min_grain_size : int Forwarded to ``from_file()``. Default 10. misori_tol : float Forwarded to ``from_file()``. Default 10. data_type : str or None Forwarded to ``from_file()``. Default None (auto-detected). Returns ------- EBSDReader Instance loaded from the written sub-sampled CTF file. """ out = write_subsampled_ctf(src_path, dst_path, stride_x=stride_x, stride_y=stride_y) return cls.from_file(out, min_grain_size=min_grain_size, misori_tol=misori_tol, data_type=data_type)
# ------------------------------------------------------------------ # Convenience properties # ------------------------------------------------------------------ @property def ny(self): """Number of rows (y pixels).""" return self.shape[0] @property def nx(self): """Number of columns (x pixels).""" return self.shape[1] @property def n_grains(self): """Number of grains detected (label values >= 1).""" return int(self.lfi_ebsd.max()) # ------------------------------------------------------------------ # Repr # ------------------------------------------------------------------ def __repr__(self): try: return (f"EBSDReader(file='{os.path.basename(self.file_path)}', " f"shape={self.shape}, n_grains={self.n_grains}, " f"step_size={self.step_size} um)") except AttributeError: return "EBSDReader(uninitialised)" # ------------------------------------------------------------------ # Spatial crop # ------------------------------------------------------------------
[docs] def crop(self, region, inplace=False): """ Crop the EBSD map to a rectangular sub-region specified as **percentage** extents of the full map. Parameters ---------- region : sequence of 4 numbers ``[xstart%, ystart%, xend%, yend%]`` — all values in the range [0, 100]. * ``xstart%`` / ``xend%`` are measured along the *column* (x) axis (i.e. nx dimension). * ``ystart%`` / ``yend%`` are measured along the *row* (y) axis (i.e. ny dimension). Example: ``[10, 10, 80, 80]`` keeps the central 70 % of the map in both directions, discarding a 10 % border on each side. inplace : bool, optional If ``True``, modify ``self`` and return ``None``. If ``False`` (default), return a new :class:`EBSDReader` with the cropped arrays; ``self`` is left unchanged. Returns ------- EBSDReader or None Cropped reader (when ``inplace=False``) or ``None`` (when ``inplace=True``). Raises ------ ValueError If *region* does not have exactly 4 elements, any value is outside [0, 100], or start >= end in either axis. Notes ----- DefDAP (0.93.x) has no built-in crop, so the crop is performed entirely on the extracted NumPy arrays. ``step_size`` is unchanged (cropping does not affect pixel pitch). Grain IDs in ``lfi_ebsd`` are preserved as-is — they are *not* re-numbered after cropping. """ xs, ys, xe, ye = region if not (len(region) == 4): raise ValueError("region must have exactly 4 elements " "[xstart%, ystart%, xend%, yend%]") for v in (xs, ys, xe, ye): if not (0 <= v <= 100): raise ValueError(f"All region values must be in [0, 100]; got {v}") if xs >= xe: raise ValueError(f"xstart% ({xs}) must be less than xend% ({xe})") if ys >= ye: raise ValueError(f"ystart% ({ys}) must be less than yend% ({ye})") ny, nx = self.shape # Convert percentage to pixel indices (inclusive on both ends) col0 = int(round(xs / 100.0 * nx)) col1 = int(round(xe / 100.0 * nx)) row0 = int(round(ys / 100.0 * ny)) row1 = int(round(ye / 100.0 * ny)) # Clamp to valid range col0 = max(0, min(col0, nx)) col1 = max(0, min(col1, nx)) row0 = max(0, min(row0, ny)) row1 = max(0, min(row1, ny)) lfi_c = self.lfi_ebsd[row0:row1, col0:col1].copy() euler_c = self.euler_ebsd[row0:row1, col0:col1, :].copy() quat_c = self.quat_ebsd[row0:row1, col0:col1, :].copy() new_shape = lfi_c.shape # (ny_crop, nx_crop) print(f"[crop] region=[{xs}, {ys}, {xe}, {ye}]% → " f"rows {row0}:{row1}, cols {col0}:{col1} | " f"original {ny}×{nx} → cropped {new_shape[0]}×{new_shape[1]}", flush=True) if inplace: self.lfi_ebsd = lfi_c self.euler_ebsd = euler_c self.quat_ebsd = quat_c self.shape = new_shape return None else: obj = EBSDReader.__new__(EBSDReader) obj.lfi_ebsd = lfi_c obj.euler_ebsd = euler_c obj.quat_ebsd = quat_c obj.step_size = self.step_size obj.file_path = self.file_path obj.shape = new_shape return obj
# ------------------------------------------------------------------ # Full EBSD characterisation pipeline # ------------------------------------------------------------------
[docs] def characterise(self, connectivity=4, min_grain_size=0): """ Run the complete post-load characterisation pipeline on ``self`` and return a result dict ready for consumption by ``repgen2d.rechar()`` or directly by user code. Steps ----- 1. ``rechar_lfi(connectivity)`` — fill non-positive pixels, update ``euler_ebsd`` / ``quat_ebsd`` in-place. 2. ``_char_lfi(lfi_ebsd, step_size, min_grain_size)`` — compute per-grain morphological properties; grains smaller than ``min_grain_size`` pixels are excluded. 3. ``find_neighs2d(lfi_ebsd, conn)`` — build first-order neighbour dict. Parameters ---------- connectivity : int cc3d connectivity for ``rechar_lfi`` and ``find_neighs2d``. Valid 2D values: 4 or 8. Default 4. min_grain_size : int, optional Minimum grain size in pixels. Grains with fewer pixels are excluded from the returned ``'prop'`` dict. Default 0 (all grains included). Returns ------- dict with keys: ``'lfi'`` np.ndarray int32 (ny, nx) cleaned label field ``'euler'`` np.ndarray float64 (ny, nx, 3) Bunge angles, radians ``'quat'`` np.ndarray float64 (ny, nx, 4) unit quaternions ``'neigh_gid'`` dict grain_id -> list[int] first-order neighbours ``'prop'`` dict grain_id -> property dict (see ``_char_lfi``) ``'step_size'`` float pixel step size in microns """ from upxo.gsdataops.gid_ops import find_neighs2d self.rechar_lfi(connectivity=connectivity) prop = _char_lfi(self.lfi_ebsd, px_size=getattr(self, 'step_size', 1.0), min_grain_size=min_grain_size) # Relabel lfi_ebsd so surviving grain IDs are contiguous 1…N. # Done after _char_lfi so min_grain_size drops are already resolved. # Small-grain pixels (not in prop) are zeroed out in lfi_ebsd. # euler_ebsd / quat_ebsd need no remapping — they are (row,col) arrays. surviving = sorted(prop.keys()) lut = np.zeros(int(self.lfi_ebsd.max()) + 1, dtype=self.lfi_ebsd.dtype) for new_id, old_id in enumerate(surviving, start=1): lut[old_id] = new_id self.lfi_ebsd = lut[self.lfi_ebsd] prop = {new_id: prop[old_id] for new_id, old_id in enumerate(surviving, start=1)} neigh = find_neighs2d(self.lfi_ebsd, conn=connectivity) self.prop = prop return { 'lfi': self.lfi_ebsd, 'euler': self.euler_ebsd, 'quat': self.quat_ebsd, 'neigh_gid': neigh, 'prop': prop, 'step_size': getattr(self, 'step_size', 1.0), }
# ------------------------------------------------------------------ # LFI re-characterisation # ------------------------------------------------------------------
[docs] def rechar_lfi(self, connectivity=4): """ Re-characterise ``lfi_ebsd`` by filling every non-positive pixel (values <= 0, including DefDAP's -1 remnant-boundary and -2 sub-minimum-size codes, and non-indexed 0) with the label of the spatially largest grain that borders that connected region. ``euler_ebsd`` and ``quat_ebsd`` for the filled pixels are then updated with the grain-average orientation of the assigned grain. Algorithm --------- 1. Identify all non-positive pixels as the *unknown* mask. 2. Use cc3d to label connected components of the unknown mask (connectivity 4 or 8, matching the supplied parameter). 3. Assign each connected component a temporary unique label ``n_grains + cc_id`` in a working copy of ``lfi_ebsd``. 4. Call ``find_neighs2d`` on the augmented label field to obtain the adjacency list for every label (standard + temporary). 5. For each temporary CC label, find the neighbouring valid grain with the greatest pixel count and assign that grain's ID to all pixels of the CC. 6. Compute per-grain mean Euler angles and mean (normalised) quaternion from the *original* valid pixels of each grain. 7. Write those averages into ``euler_ebsd`` and ``quat_ebsd`` at the newly filled pixel positions. 8. Store the updated arrays back to ``self``. Parameters ---------- connectivity : int cc3d connectivity for labelling the unknown region. Valid 2D values: 4 (edge-only) or 8 (edge+corner). Default 4. Raises ------ ValueError If ``connectivity`` is not 4 or 8. Notes ----- After calling this method ``lfi_ebsd`` should contain only positive grain labels (>= 1). ``euler_ebsd`` and ``quat_ebsd`` are updated in-place on ``self``. The grain-average orientation used for filling is the arithmetic mean of the *original* valid pixels — adequate for intra-grain spread; it is not a proper quaternion mean but sufficient for neighbourhood-assignment. """ import cc3d from upxo.gsdataops.gid_ops import find_neighs2d try: from tqdm.auto import tqdm as _tqdm except ImportError: def _tqdm(it, **kw): # silent fallback if tqdm not installed print(f"[rechar_lfi] {kw.get('desc', '')} ...", flush=True) return it if connectivity not in (4, 8): raise ValueError("connectivity must be 4 or 8 for 2D cc3d.") lfi = self.lfi_ebsd.astype(np.int32) unknown_mask = lfi <= 0 if not unknown_mask.any(): print("[rechar_lfi] No non-positive pixels found — nothing to do.") return n_unknown = int(unknown_mask.sum()) ny, nx = lfi.shape print(f"[rechar_lfi] Map {ny}×{nx} | unknown pixels: {n_unknown} " f"({100*n_unknown/(ny*nx):.2f}%) | connectivity={connectivity}", flush=True) # ------------------------------------------------------------------ # Step 1-3: connected-component labelling of unknown pixels # ------------------------------------------------------------------ print("[rechar_lfi] Step 1/5 — labelling unknown connected components …", flush=True) n_valid = int(lfi.max()) cc_labels = cc3d.connected_components( unknown_mask.astype(np.uint8), connectivity=connectivity ) # (ny, nx), values 1..n_cc, 0 = valid n_cc = int(cc_labels.max()) print(f"[rechar_lfi] {n_cc} connected components found.", flush=True) lfi_aug = lfi.copy() lfi_aug[unknown_mask] = (n_valid + cc_labels[unknown_mask]).astype(np.int32) # ------------------------------------------------------------------ # Step 4: adjacency # ------------------------------------------------------------------ print("[rechar_lfi] Step 2/5 — computing grain adjacency …", flush=True) neigh = find_neighs2d(lfi_aug, conn=connectivity) # ------------------------------------------------------------------ # Step 5: assign each CC to largest neighbouring valid grain # ------------------------------------------------------------------ print("[rechar_lfi] Step 3/5 — assigning CC pixels to grains …", flush=True) valid_pixels = lfi[~unknown_mask] ids, counts = np.unique(valid_pixels, return_counts=True) grain_size = dict(zip(ids.tolist(), counts.tolist())) lfi_filled = lfi.copy() cc_assignment = {} for cc_id in _tqdm(range(1, n_cc + 1), desc=" assigning CCs", unit="CC", leave=False): temp_label = n_valid + cc_id valid_neighbours = [ nb for nb in neigh.get(temp_label, []) if 1 <= nb <= n_valid ] if not valid_neighbours: continue largest = max(valid_neighbours, key=lambda g: grain_size.get(g, 0)) cc_mask = cc_labels == cc_id lfi_filled[cc_mask] = largest cc_assignment[cc_id] = largest n_filled = sum((cc_labels == cc_id).sum() for cc_id in cc_assignment) print(f"[rechar_lfi] {len(cc_assignment)}/{n_cc} CCs assigned " f"({n_filled} pixels filled).", flush=True) # ------------------------------------------------------------------ # Step 6-7: vectorized grain-average orientation fill # Avoid per-grain Python loops by using scipy.ndimage.mean over the # full array at once — O(ny*nx) regardless of grain count. # ------------------------------------------------------------------ print("[rechar_lfi] Step 4/5 — computing grain-average orientations " "(vectorized) …", flush=True) from scipy.ndimage import mean as _ndmean euler = self.euler_ebsd.copy() # (ny, nx, 3) quat = self.quat_ebsd.copy() # (ny, nx, 4) assigned_grains = sorted(set(cc_assignment.values())) n_ag = len(assigned_grains) print(f"[rechar_lfi] {n_ag} unique grains to average over.", flush=True) # Compute per-grain mean for every channel in one ndimage.mean call. # Labels array is the *original* lfi (valid pixels only). valid_lfi = lfi.copy() valid_lfi[unknown_mask] = 0 # neutralize unknowns for ndimage # euler: 3 channels print("[rechar_lfi] averaging Euler channels (3) …", flush=True) avg_euler_arr = np.stack( [_ndmean(euler[:, :, ch], labels=valid_lfi, index=assigned_grains) for ch in range(3)], axis=1 ) # (n_assigned_grains, 3) print("[rechar_lfi] averaging quaternion channels (4) …", flush=True) # quat: 4 channels avg_quat_arr = np.stack( [_ndmean(quat[:, :, ch], labels=valid_lfi, index=assigned_grains) for ch in range(4)], axis=1 ) # (n_assigned_grains, 4) # Normalise quaternions and enforce positive hemisphere norms = np.linalg.norm(avg_quat_arr, axis=1, keepdims=True) norms = np.where(norms > 0, norms, 1.0) avg_quat_arr /= norms neg = avg_quat_arr[:, 0] < 0 avg_quat_arr[neg] *= -1 print("[rechar_lfi] orientation averages ready.", flush=True) # ------------------------------------------------------------------ # Step 5/5 — vectorised orientation write (O(n_unknown), no loop) # ------------------------------------------------------------------ # lfi_filled already stores the assigned grain ID for every pixel # (including the ones that were unknown). Build a compact lookup: # grain_ids[i] → assigned_grains[i] # gid_to_idx[gid] → row index in avg_euler_arr / avg_quat_arr # Then fancy-index the two arrays for all unknown pixels at once. # ------------------------------------------------------------------ n_unk = int(unknown_mask.sum()) print(f"[rechar_lfi] Step 5/5 — writing orientations for {n_unk} " "unknown pixels (vectorised, no loop) …", flush=True) grain_ids = np.array(assigned_grains, dtype=np.intp) max_gid = int(grain_ids.max()) + 1 gid_to_idx = np.full(max_gid, -1, dtype=np.intp) gid_to_idx[grain_ids] = np.arange(len(grain_ids), dtype=np.intp) unk_y, unk_x = np.where(unknown_mask) assigned_gids = lfi_filled[unk_y, unk_x] # grain ID per unknown px row_idx = gid_to_idx[assigned_gids] # row in avg arrays # Only write pixels that were actually assigned (row_idx >= 0) valid_write = row_idx >= 0 vy, vx, vr = unk_y[valid_write], unk_x[valid_write], row_idx[valid_write] euler[vy, vx] = avg_euler_arr[vr] # (n_written, 3) quat[vy, vx] = avg_quat_arr[vr] # (n_written, 4) n_written = int(valid_write.sum()) print(f"[rechar_lfi] {n_written} pixels written " f"({n_unk - n_written} isolated islands left unfilled).", flush=True) # ------------------------------------------------------------------ # Step 8: store back # ------------------------------------------------------------------ self.lfi_ebsd = lfi_filled self.euler_ebsd = euler self.quat_ebsd = quat print("[rechar_lfi] Done.", flush=True)
# ------------------------------------------------------------------ # Visualisation orchestrator — thin wrappers around ebsdviz # ------------------------------------------------------------------
[docs] def plot_grain_map(self, **kwargs): """Plot the grain label field (lfi_ebsd) with optional boundary overlay. All keyword arguments are forwarded to ``upxo.viz.ebsdviz.plot_grain_labels``. Returns ------- fig, ax """ from upxo.viz import ebsdviz as _ev return _ev.plot_grain_labels(self, **kwargs)
[docs] def plot_euler_maps(self, **kwargs): """Plot phi1, Phi, phi2 Euler angle maps side-by-side. All keyword arguments are forwarded to ``upxo.viz.ebsdviz.plot_euler_maps``. Returns ------- fig, axes """ from upxo.viz import ebsdviz as _ev return _ev.plot_euler_maps(self, **kwargs)
[docs] def plot_grain_size_histogram(self, **kwargs): """Plot a grain area histogram in physical units (µm²). All keyword arguments are forwarded to ``upxo.viz.ebsdviz.plot_grain_size_histogram``. Returns ------- fig, ax """ from upxo.viz import ebsdviz as _ev return _ev.plot_grain_size_histogram(self, **kwargs)
[docs] def plot_grain_structure_with_boundaries(self, **kwargs): """Plot grain label map and Euler channel side-by-side with boundaries. All keyword arguments are forwarded to ``upxo.viz.ebsdviz.plot_grain_structure_with_boundaries``. Returns ------- fig, axes """ from upxo.viz import ebsdviz as _ev return _ev.plot_grain_structure_with_boundaries(self, **kwargs)
[docs] def see_distr(self, prop='area', nbins=40, vis='hist', show_kde=True, show_stats=True, color='steelblue', figsize=(7, 4), log_scale=False): """ Visualise the distribution of a grain morphological property. Requires :meth:`characterise` to have been called first (it populates ``self.prop`` with per-grain property dicts). Parameters ---------- prop : str Grain property to plot. Valid names (all scaled to physical units via ``step_size``): ``'area'``, ``'perimeter'``, ``'eq_diameter'``, ``'aspect_ratio'``, ``'major_axis_length'``, ``'minor_axis_length'``, ``'eccentricity'``, ``'solidity'``, ``'npixels'``. nbins : int Number of histogram bins. Default 40. vis : str Plot style: ``'hist'`` — histogram with optional KDE overlay; ``'kde'`` — pure kernel density estimate; ``'hist_kde'`` — density-normalised histogram + KDE. Default ``'hist'``. show_kde : bool Overlay a KDE curve on the histogram (``vis='hist'`` only). Default True. show_stats : bool Annotate mean and median lines. Default True. color : str Histogram / KDE fill colour. Default ``'steelblue'``. figsize : tuple Figure size (width, height) in inches. Default ``(7, 4)``. log_scale : bool Apply log scale to the x-axis. Default False. Returns ------- fig, ax : matplotlib Figure and Axes Raises ------ RuntimeError If ``self.prop`` is not populated (characterise not yet called). KeyError If *prop* is not present in the property dicts. ValueError If *vis* is not one of the supported values. Examples -------- >>> fig, ax = rdr.see_distr(prop='area', nbins=40) >>> plt.show() >>> fig, ax = rdr.see_distr(prop='aspect_ratio', vis='hist_kde') >>> plt.show() """ import numpy as np from upxo.viz.vizDistr import DistrViz, PROP_UNITS if not hasattr(self, 'prop') or self.prop is None: raise RuntimeError( "self.prop is not populated. Call rdr.characterise() first." ) data = np.array([v[prop] for v in self.prop.values()]) label = prop.replace('_', ' ').title() dv = DistrViz(data, label=label, units=PROP_UNITS.get(prop, '')) return dv.plot(vis=vis, bins=nbins, show_kde=show_kde, show_stats=show_stats, color=color, figsize=figsize, log_scale=log_scale, step_size=self.step_size)
# --------------------------------------------------------------------------- # Module-level helper: quick skimage characterisation of a label field # --------------------------------------------------------------------------- def _char_lfi(lfi, px_size=1.0, min_grain_size=0): """ Compute basic grain morphological properties from a 2D integer label field using ``skimage.measure.regionprops``. Parameters ---------- lfi : np.ndarray, int, shape (ny, nx) Grain label field. Labels must be >= 1. Non-positive values are ignored. px_size : float, optional Physical size of one pixel (microns or simulation units). Area is reported in px_size² and lengths in px_size. Default 1.0. min_grain_size : int, optional Minimum grain size in **pixels**. Grains with fewer pixels than this threshold are excluded from the returned dict. Default 0 (all grains included). Returns ------- dict Mapping grain_id (int) -> dict of properties: ``area`` float — grain area in px_size² units ``perimeter`` float — grain perimeter in px_size units ``eq_diameter`` float — equivalent circular diameter ``aspect_ratio`` float — major_axis_length / minor_axis_length ``major_axis_length`` float — major axis in px_size units ``minor_axis_length`` float — minor axis in px_size units ``eccentricity`` float — eccentricity of best-fit ellipse ``solidity`` float — area / convex hull area ``euler_number`` int — Euler characteristic ``centroid`` tuple — (row, col) in pixel coordinates ``bbox`` tuple — (min_row, min_col, max_row, max_col) ``npixels`` int — number of pixels """ from skimage.measure import regionprops props_dict = {} ps = float(px_size) for region in regionprops(lfi): gid = region.label if gid <= 0: continue if region.area < min_grain_size: continue maj = region.major_axis_length * ps mn = region.minor_axis_length * ps ar = (maj / mn) if mn > 0 else float('nan') props_dict[gid] = { 'area': region.area * ps ** 2, 'perimeter': region.perimeter * ps, 'eq_diameter': region.equivalent_diameter * ps, 'aspect_ratio': ar, 'major_axis_length': maj, 'minor_axis_length': mn, 'eccentricity': region.eccentricity, 'solidity': region.solidity, 'euler_number': region.euler_number, 'centroid': region.centroid, 'bbox': region.bbox, 'npixels': region.area, } return props_dict # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _require_defdap(): if not _DEFDAP_AVAILABLE: raise ImportError( "DefDAP is required to read EBSD files. " "Install it with: pip install defdap" ) def _check_file(path): if not path.exists(): raise FileNotFoundError(f"EBSD file not found: {path}") def _infer_data_type(path): ext = path.suffix.lower() if ext not in _EXT_TO_DATATYPE: raise ValueError( f"Unrecognised file extension '{ext}'. " f"Supported: {list(_EXT_TO_DATATYPE.keys())}. " f"Pass data_type explicitly to override." ) return _EXT_TO_DATATYPE[ext]
[docs] def write_subsampled_ctf( src_path, dst_path, stride_x: int = 2, stride_y: int = 2, ) -> pathlib.Path: """ Read a CTF file, sub-sample every *stride_x*-th column and *stride_y*-th row, update the header metadata, and write a new valid CTF file to *dst_path*. The X / Y physical-coordinate values in the kept data rows are left unchanged — they already carry the correct micron positions. Only the ``XCells``, ``YCells``, ``XStep``, and ``YStep`` header fields are updated to reflect the new pixel pitch and map dimensions. Parameters ---------- src_path : str or pathlib.Path Source ``.ctf`` file. A ``ValueError`` is raised for other extensions. dst_path : str or pathlib.Path Destination path for the sub-sampled CTF file. The parent directory must already exist. stride_x : int Keep every *stride_x*-th pixel along the X (column) axis. Default 2. stride_y : int Keep every *stride_y*-th pixel along the Y (row) axis. Default 2. Returns ------- pathlib.Path Absolute path to the written file (*dst_path*), so the caller can chain directly into ``EBSDReader.from_file()``. Raises ------ ValueError If *src_path* does not have a ``.ctf`` extension. FileNotFoundError If *src_path* does not exist. """ import math src_path = pathlib.Path(src_path).resolve() dst_path = pathlib.Path(dst_path).resolve() if src_path.suffix.lower() != '.ctf': raise ValueError( f"write_subsampled_ctf only supports .ctf files; " f"got '{src_path.suffix}'." ) _check_file(src_path) # ── 1. Parse header ──────────────────────────────────────────────── header_lines = [] # verbatim header lines (to be modified below) col_header = '' # the "Phase X Y Euler1 ..." line xcells = ycells = None xstep = ystep = None with src_path.open('r', encoding='latin-1') as fh: for raw in fh: line = raw.rstrip('\n') stripped = line.strip() # Detect end of header: the column-name line starts with the # token "Phase" (case-insensitive) regardless of whether the # separator is a tab or a space. tokens = stripped.split() if tokens and tokens[0].lower() == 'phase': col_header = raw break # Extract key header fields if stripped.startswith('XCells'): xcells = int(stripped.split()[-1]) elif stripped.startswith('YCells'): ycells = int(stripped.split()[-1]) elif stripped.startswith('XStep'): xstep = float(stripped.split()[-1]) elif stripped.startswith('YStep'): ystep = float(stripped.split()[-1]) header_lines.append(raw) # Read all remaining data lines data_lines = fh.readlines() if None in (xcells, ycells, xstep, ystep): raise ValueError( "Could not parse XCells/YCells/XStep/YStep from CTF header. " f"File: {src_path}" ) # ── 2. Compute updated header values ────────────────────────────── new_xcells = math.ceil(xcells / stride_x) new_ycells = math.ceil(ycells / stride_y) new_xstep = xstep * stride_x new_ystep = ystep * stride_y def _patch_header(lines): out = [] for raw in lines: s = raw.strip() if s.startswith('XCells'): out.append(f'XCells\t{new_xcells}\n') elif s.startswith('YCells'): out.append(f'YCells\t{new_ycells}\n') elif s.startswith('XStep'): out.append(f'XStep\t{new_xstep}\n') elif s.startswith('YStep'): out.append(f'YStep\t{new_ystep}\n') else: out.append(raw if raw.endswith('\n') else raw + '\n') return out # ── 3. Sub-sample data rows ──────────────────────────────────────── # Rows are ordered row-major: Y-index (iy) increments slowest, # X-index (ix) fastest. Pixel at flat index k: # iy = k // xcells, ix = k % xcells kept = [ row for k, row in enumerate(data_lines) if (k // xcells) % stride_y == 0 and (k % xcells) % stride_x == 0 ] # ── 4. Write new CTF file ────────────────────────────────────────── with dst_path.open('w', encoding='latin-1') as fh: fh.writelines(_patch_header(header_lines)) fh.write(col_header) fh.writelines(kept) return dst_path
def _extract_lfi(ebsd_map): """ Return grain label field as int32 array of shape (ny, nx). DefDAP 0.93.x stores the label field in ``ebsd_map.grains`` (not ``ebsd_map.data.grains``). Values: >=1 proper grain IDs, -1 remnant boundary, -2 sub-minimum-size, 0 non-indexed. """ return np.array(ebsd_map.grains, dtype=np.int32) def _extract_euler(ebsd_map): """ Return Bunge Euler angles (phi1, Phi, phi2) in radians. DefDAP 0.93.x stores as ``eulerAngleArray`` with shape (3, ny, nx). We transpose to (ny, nx, 3) for image-convention access: euler_ebsd[row, col] -> [phi1, Phi, phi2]. """ # eulerAngleArray shape: (3, ny, nx) -> transpose to (ny, nx, 3) return np.asarray(ebsd_map.eulerAngleArray, dtype=np.float64).transpose(1, 2, 0) def _extract_quat(ebsd_map): """ Return quaternion coefficients (q0, q1, q2, q3) per pixel. DefDAP 0.93.x stores an object array of Quat instances in ``ebsd_map.quatArray`` with shape (ny, nx). Each Quat exposes ``.quatCoef`` (camelCase) as a (4,) float array. Positive-hemisphere convention applied (q0 >= 0). Returns shape (ny, nx, 4), float64. """ # Ensure quatArray is populated (it may need buildQuatArray() first) if ebsd_map.quatArray is None: ebsd_map.buildQuatArray() # DefDAP 0.93.x builds quatComps internally during buildQuatArray(). # That array has shape (4, ny, nx) and lives at ebsd_map._quatComps # (private) — however the public path is to re-derive it from quatArray # using np.vectorize, which is ~50× faster than a pure Python double loop. ori = ebsd_map.quatArray # (ny, nx) object array of Quat ny, nx = ori.shape # Vectorized extraction: stack all .quatCoef arrays along a new axis. # np.frompyfunc avoids Python-level loop overhead. get_coef = np.frompyfunc(lambda q: q.quatCoef, 1, 1) coef_obj = get_coef(ori) # (ny, nx) object array of (4,) arrays # Stack into (ny, nx, 4) — np.array on an object array of equal-shape # sub-arrays triggers a single C-level copy. quat_arr = np.array(coef_obj.tolist(), dtype=np.float64) # (ny, nx, 4) # Positive-hemisphere convention: flip rows where q0 < 0 neg = quat_arr[:, :, 0] < 0 quat_arr[neg] *= -1 return quat_arr