"""
Grid operations for Labelled Feature Image workflows.
This module provides utilities for sectioning, interpolation/resampling,
bounding-box extraction, feature-ID transformations, neighborhood-aware
smoothing, and synthetic LFI test-pattern generation in 2D/3D.
Imports
-------
import upxo.gsdataops.grid_ops as gridOps
Metadata
--------
* Module: upxo.gsdataops.grid_ops
* Package: upxo
* License: GPL-3.0-only
* Author: Dr. Sunil Anandatheertha
* Email: vaasu.anandatheertha@ukaea.uk
* Status: Active development
* Last updated: 2026-03-12
Applications
------------
* 2D/3D Labelled Feature Image section extraction
* Grid upscaling/downscaling and anisotropic stretching
* Per-feature extended bounding-box extraction
* Feature-ID shuffling and merge operations
* Majority-filter smoothing for grain-label fields
* Seed generation and synthetic LFI benchmark creation
Definitions
-----------
* LFI: Labelled Feature Image
"""
import numpy as np
from numba import njit, prange
from copy import deepcopy
from collections import deque
import upxo._sup.decorators as decorators
# ###########################################################################
# ###########################################################################
# SECTIONING OPERATIPONS
# Search ID: SID_3dSectioningOps
# ###########################################################################
# ###########################################################################
[docs]
def section_from_3d(db, axis=0, location=0):
"""
Extract a 2D section from a 3D grid along a specified axis.
Parameters
----------
db : ndarray
3D input array from which to extract the section.
axis : int, optional
Axis along which to take the section (0, 1, or 2). Default is 0.
location : int, optional
Location index along the specified axis to extract the section. Default is 0.
"""
if axis == 0:
section = db[location, :, :]
elif axis == 1:
section = db[:, location, :]
elif axis == 2:
section = db[:, :, location]
else:
raise ValueError("Axis must be 0, 1, or 2.")
return section
[docs]
def build_pvgrid(data=None, origin=(0, 0, 0), spacing=(1.0, 1.0, 1.0)):
"""Build a PyVista `ImageData` grid from a NumPy array.
Parameters
----------
data : ndarray, optional
Input scalar array to assign as cell data.
origin : tuple, optional
Grid origin coordinates.
spacing : tuple, optional
Grid spacing along each axis.
Returns
-------
pyvista.ImageData
Configured PyVista image grid.
"""
import pyvista as pv
grid = pv.ImageData()
grid.dimensions = np.array(data.shape)+1
grid.origin = origin
grid.spacing = spacing
grid.cell_data["values"] = data.flatten(order="F")
return grid
def _interpolate_grid_2d(data, new_h, new_w, method='nearest'):
"""
Core interpolation function for 2D grid resampling.
Parameters
----------
data : ndarray
2D input array to be resampled.
new_h : int
New height (number of rows).
new_w : int
New width (number of columns).
method : str, optional
Interpolation method. Options are 'nearest', 'linear', etc.
Default is 'nearest'.
Returns
-------
ndarray
Resampled 2D array with shape (new_h, new_w), maintaining original dtype.
"""
from scipy.interpolate import RegularGridInterpolator
h, w = data.shape
x_old, y_old = np.linspace(0, 1, h), np.linspace(0, 1, w)
interp = RegularGridInterpolator((x_old, y_old), data, method=method, bounds_error=False, fill_value=None)
x_new, y_new = np.linspace(0, 1, new_h), np.linspace(0, 1, new_w)
X, Y = np.meshgrid(x_new, y_new, indexing='ij')
pts = np.array([X.ravel(), Y.ravel()]).T
return interp(pts).reshape(new_h, new_w).astype(data.dtype)
[docs]
def mask_featIDImg_at_coords(featIDImg, bsegCoords, local_seg_ids,
featName='fbseg', maskDType=np.int32):
"""
Example
-------
mask_featIDImg_at_coords(lgi_boundaries, bsegCoords, local_seg_ids,
featName='fbseg', maskDType=np.int32)
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.mask_featIDImg_at_coords
"""
segID_masked_lfi = np.asarray(deepcopy(featIDImg), dtype=np.float32)
for cg, ngcoords in bsegCoords.items():
for i, (ng, ngsegcoords) in enumerate(ngcoords.items()):
segid = local_seg_ids[cg][i]
for sc in ngsegcoords:
segID_masked_lfi[sc[0]][sc[1]] = segid
return segID_masked_lfi
# ###########################################################################
# ###########################################################################
#----------------------- BOUNDING BOX OPERATIONS --------------------------
# Search ID: SID_BBoxOps
# ###########################################################################
# ###########################################################################
[docs]
def find_feature_extended_bbox_pix(fid=None, lfi=None, make_binary=False):
"""
Find the extended bounded box of a given fid in a 2D local feature ID array.
Parameters
----------
fid : int
Feature ID for which to find the extended bounding box. Feature
ID is generally the grain ID. Default is None.
lfi : np.ndarray
2D Labelled Feature Image. Default is None.
make_binary : bool, optional
If True, the returned bounding box array is a binary mask. Default is False.
Returns
-------
feat_lfi_ExtBBox : np.ndarray
Extended bounding box of the specified feature ID.
Example
-------
from upxo.ggrowth.mcgs import mcgs
pxtal = mcgs(study='independent',
input_dashboard='input_dashboard_for_testing_50x50_alg202.xls')
pxtal.simulate()
pxtal.detect_grains()
pxtal.gs[16].find_extended_bounding_box(10)
Notes
-----
Applicable only for 2D grain structures of pixellated type.
The extended bounding box adds a one-pixel margin around the feature's
actual bounding box, taking care to handle edge cases where the feature
touches the edges of the array. You could also use this function independently
to find the extended bounding box of any feature in a 2D local feature ID array.
For example, youcould use this to find the extended bounding box of a given
grain boundary segment ID in a grain boundary segment local feature ID array. You
could use either local or global feature IDs for this purpose, but make sure to
use the right lfi array.
Comments
--------
The line `pxtal.gs[16].find_extended_bounding_box(fid)` is an orchestrator
function call in the temporal slice object. The present function is called
internally by that function.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.find_feature_extended_bbox_pix
"""
xmax, ymax = lfi.shape
loc = np.where(lfi == fid)
if len(loc[0]) == 0:
return None
xi, xj = loc[0].min(), loc[0].max()
yi, yj = loc[1].min(), loc[1].max()
feat_lfi_ExtBBox = lfi[max(0, xi-1):min(xmax, xj+2), max(0, yi-1):min(ymax, yj+2)]
if make_binary:
feat_lfi_ExtBBox = (feat_lfi_ExtBBox == fid).astype(np.int32)
return feat_lfi_ExtBBox
[docs]
def find_extended_bbox_pix_fids(fids=None, lfi=None, make_binary=False):
"""
Find the extended bounded box of a given fid in a 2D local feature ID array.
Parameters
----------
fid : int
Feature ID for which to find the extended bounding box. Feature
ID is generally the grain ID. Default is None.
lfi : np.ndarray
2D Labelled Feature Image. Default is None.
make_binary : bool, optional
If True, the returned bounding box arrays are binary masks. Default is False.
Returns
-------
bboxes : dict
Dictionary with feature IDs as keys and their extended bounding boxes as values.
Notes
-----
Refer to find_feature_extended_bbox_pix for detailed documentation.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.find_extended_bbox_pix_fids
"""
fids = [fids] if type(fids) is int else fids
xmax, ymax = lfi.shape
bboxes = {}
for fid in fids:
loc = np.where(lfi == fid)
if len(loc[0]) == 0:
continue
xi, xj = loc[0].min(), loc[0].max()
yi, yj = loc[1].min(), loc[1].max()
bbox = lfi[max(0, xi-1):min(xmax, xj+2), max(0, yi-1):min(ymax, yj+2)]
if make_binary:
bbox = (bbox == fid).astype(np.int32)
bboxes[fid] = bbox
return bboxes
[docs]
@decorators.port_doc(module_path='upxo.gsdataops.grid_ops', func_name='find_extended_bbox_pix_fids')
def find_extended_bounding_box_all_grains(fids=None, lfi=None, make_binary=False):
"""Return extended bounding boxes for all requested feature IDs.
Parameters
----------
fids : int or iterable, optional
Feature IDs to process.
lfi : ndarray, optional
2D Labelled Feature Image.
make_binary : bool, optional
If True, output bounding boxes as binary masks.
Returns
-------
dict
Mapping ``fid -> extended bounding-box array``.
"""
grain_lgi_ex_all = find_extended_bbox_pix_fids(fids=fids, lfi=lfi, make_binary=make_binary)
return grain_lgi_ex_all
# ###########################################################################
# ###########################################################################
# ###########################################################################
# ###########################################################################
[docs]
def rescale_grid_2d(data, scale_factor, method='nearest'):
"""
Rescale a 2D grid by a uniform scale factor using interpolation.
Parameters
----------
data : ndarray
2D input array to be rescaled.
scale_factor : float
Uniform scaling factor to apply to both dimensions. Values > 1 enlarge
the grid, values < 1 shrink it.
method : str, optional
Interpolation method to use. Options are 'nearest', 'linear', etc.
Default is 'nearest'.
Returns
-------
ndarray
Rescaled 2D array with shape determined by scale_factor, maintaining
the original data type.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
"""
h, w = data.shape
new_h, new_w = int(round(h * scale_factor)), int(round(w * scale_factor))
return _interpolate_grid_2d(data, new_h, new_w, method)
[docs]
def stretch_grid_2d(data, stretch_x=1.0, stretch_y=1.0, px_size_x=1.0, px_size_y=1.0, method='nearest'):
"""
Stretch a 2D grid by different factors in each dimension while accounting
for pixel size and physical dimensions.
Parameters
----------
data : ndarray
2D input array to be stretched.
stretch_x : float, optional
Stretch factor in the x (width) dimension. Default is 1.0 (no stretch).
stretch_y : float, optional
Stretch factor in the y (height) dimension. Default is 1.0 (no stretch).
px_size_x : float, optional
Physical pixel size in the x dimension. Default is 1.0.
px_size_y : float, optional
Physical pixel size in the y dimension. Default is 1.0.
method : str, optional
Interpolation method to use. Options are 'nearest', 'linear', etc.
Default is 'nearest'.
Returns
-------
stretched_data : ndarray
Stretched 2D array maintaining the original data type.
px_size_x : float
Physical pixel size in the x dimension (unchanged).
px_size_y : float
Physical pixel size in the y dimension (unchanged).
new_shape : tuple
New shape of the stretched data as (new_h, new_w).
Notes
-----
The stretch is applied to the physical dimensions of the grid, with final
grid size determined by dividing new physical dimensions by pixel sizes.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
"""
h, w = data.shape
new_physical_x, new_physical_y = w*px_size_x*stretch_x, h*px_size_y*stretch_y
new_w, new_h = int(round(new_physical_x / px_size_x)), int(round(new_physical_y / px_size_y))
stretched_data = _interpolate_grid_2d(data, new_h, new_w, method)
return stretched_data, px_size_x, px_size_y, (new_h, new_w)
[docs]
def resample_grid_2d(data, uigrid, sf=0.5, method='nearest'):
"""
Resample a 2D array at every nth row and column.
Parameters
----------
data : np.ndarray
2D state matrix to resample
uigrid : object or None
Grid configuration object with attributes: xmin, xmax, xinc, ymin, ymax, yinc.
If None, grid attributes are inferred from data shape: xmin=ymin=0, xinc=yinc=1,
xmax=data.shape[1]-1, ymax=data.shape[0]-1.
sf : float
Sampling factor (0 < sf <= 1). sf=0.5 means every 2nd pixel, sf=0.33 means every 3rd pixel
method : str, optional
Interpolation method ('nearest', 'linear', etc.). Default is 'nearest'.
Returns
-------
resampled_data : np.ndarray
Resampled 2D state matrix
x_new : np.ndarray
New x grid coordinates
y_new : np.ndarray
New y grid coordinates
xinc_new : float
New x increment
yinc_new : float
New y increment
"""
from scipy.interpolate import RegularGridInterpolator
if uigrid is None:
class _G:
pass
uigrid = _G()
uigrid.xmin, uigrid.ymin = 0.0, 0.0
uigrid.xinc, uigrid.yinc = 1.0, 1.0
uigrid.xmax = float(data.shape[1] - 1)
uigrid.ymax = float(data.shape[0] - 1)
xgr = np.arange(uigrid.xmin, uigrid.xmax+uigrid.xinc, uigrid.xinc)
ygr = np.arange(uigrid.ymin, uigrid.ymax+uigrid.yinc, uigrid.yinc)
xinc_new, yinc_new = uigrid.xinc/sf, uigrid.yinc/sf
x_new = np.arange(uigrid.xmin, uigrid.xmax + xinc_new, xinc_new)
y_new = np.arange(uigrid.ymin, uigrid.ymax + yinc_new, yinc_new)
# Ensure we don't exceed original bounds
x_new, y_new = x_new[x_new <= uigrid.xmax], y_new[y_new <= uigrid.ymax]
interp = RegularGridInterpolator((ygr, xgr), data, method=method, bounds_error=False, fill_value=None)
Y_new, X_new = np.meshgrid(y_new, x_new, indexing='ij')
pts = np.array([Y_new.ravel(), X_new.ravel()]).T
resampled_data = interp(pts).reshape(len(y_new), len(x_new)).astype(data.dtype)
return resampled_data, x_new, y_new, xinc_new, yinc_new
import numpy as np
def _interpolate_grid_3d(data, new_d, new_h, new_w, method='nearest'):
"""
Core interpolation function for 3D grid resampling.
Parameters
----------
data : ndarray
3D input array to be resampled.
new_d : int
New depth (number of slices).
new_h : int
New height (number of rows).
new_w : int
New width (number of columns).
method : str, optional
Interpolation method. Options: 'nearest', 'linear', etc.
Default is 'nearest'.
Returns
-------
ndarray
Resampled 3D array with shape (new_d, new_h, new_w),
maintaining original dtype.
"""
from scipy.interpolate import RegularGridInterpolator
d, h, w = data.shape
x_old = np.linspace(0, 1, d)
y_old = np.linspace(0, 1, h)
z_old = np.linspace(0, 1, w)
interp = RegularGridInterpolator((x_old, y_old, z_old),
data, method=method, bounds_error=False, fill_value=None)
x_new = np.linspace(0, 1, new_d)
y_new = np.linspace(0, 1, new_h)
z_new = np.linspace(0, 1, new_w)
X, Y, Z = np.meshgrid(x_new, y_new, z_new, indexing='ij')
pts = np.array([X.ravel(), Y.ravel(), Z.ravel()]).T
return interp(pts).reshape(new_d, new_h, new_w).astype(data.dtype)
[docs]
def rescale_grid_3d(data, scale_factor, method='nearest'):
"""
Rescale a 3D grid by a uniform scale factor using interpolation.
Parameters
----------
data : ndarray
3D input array to be rescaled.
scale_factor : float
Uniform scaling factor applied to all three dimensions.
Values > 1 enlarge, values < 1 shrink.
method : str, optional
Interpolation method. Options: 'nearest', 'linear', etc.
Default is 'nearest'.
Returns
-------
ndarray
Rescaled 3D array with shape determined by scale_factor,
maintaining original dtype.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Call
----
gridOps.rescale_grid_3d(data, scale_factor, method)
"""
d, h, w = data.shape
new_d = int(round(d*scale_factor))
new_h = int(round(h*scale_factor))
new_w = int(round(w*scale_factor))
return _interpolate_grid_3d(data, new_d, new_h, new_w, method)
[docs]
def detect_grains_cc3d(image_data, connectivity=18, delta=0, return_num_grains=True,
verbose=False):
"""
THe following documentation taken verbatim from
https://seunglab.org/connected-components-3d/ to help UPXO users.
Only a littlbit of doicumentation has been provided below. For a comprehensive
documentation, the user is referred to the above link.
# Only 4,8 (2D) and 26, 18, and 6 (3D) are allowed
# By default, cc3d works on multivalued labelings
If you need the borders to wrap around specify periodic_boundary=True,
currently only supported for 4 and 8 (2d) and 6 (3d) connectivities.
If you're working with continuously valued images like microscopy
images you can use cc3d to perform a very rough segmentation.
If delta = 0, standard high speed processing. If delta > 0, then
neighbor voxel values <= delta are considered the same component.
The algorithm can be 2-10x slower though. Zero is considered
background and will not join to any other voxel.
If you're working with an image that's larger than memory you can
use mmapped files. The input and output files can be used independently.
In this case an array labels.bin that is 5000x5000x2000 voxels and uint32_t
in Fortran order is computed and the results are written to out.bin in Fortran
order. You can find the properties of the file (shape, dtype, order) by inspecting
labels_out.
labels_in = np.memmap("labels.bin", order="F", dtype=np.uint32, shape=(5000, 5000, 2000))
labels_out = cc3d.connected_components(labels_in, out_file="out.bin")
Here's another strategy that you can use for huge files that won't even
take up any disk space. Provide any iterator to this function that produces
thick z sections of the input array that are in sequential order.
The output is a highly compressed CrackleArray that is still random access.
See: https://github.com/seung-lab/crackle
You need to pip install connected-components-3d[stack] to get the extra modules.
def sections(labels_in):
'''
A generator that produces thick Z slices
of an image
'''
for z in range(0, labels_in.shape[2], 100):
yield labels_in[:,:,z:z+100]
# You can access compressed_labels_out using array notation
compressed_labels_out = cc3d.connected_components_stack(sections(labels))
# convert to numpy array, probably a big mistake since
# you probably expected it was going to blow up RAM
cc_labels = compressed_labels_out.numpy()
# if you don't like hanging onto this exotic format, you
# can write it as a numpy array to disk in a memory efficient way.
compressed_labels_out.save("example.npy.gz")
# or hang onto it
compressed_labels_out.save("example.ckl")
"""
import cc3d
if verbose:
print("Finding features like grains.")
labels_out, N = cc3d.connected_components(image_data, connectivity=connectivity,
delta=delta, return_N=True)
return labels_out, N, connectivity
[docs]
def detect_grains_3d(state_array, connectivity=3, return_num_grains=False):
"""
Detect grains in a 3D Monte Carlo state array using connected component labeling.
Grains are identified as connected regions of voxels with the same state ID.
Returns a grain ID (lfi - Local Feature ID) array with the same shape as input.
Parameters
----------
state_array : ndarray
3D integer array where each value represents a state ID (0 to S-1).
Shape: (nx, ny, nz)
connectivity : int, optional
Connectivity type for 3D neighbor detection:
- 1: Face connectivity (6 neighbors, most restrictive)
- 2: Face + edge connectivity (18 neighbors)
- 3: Face + edge + corner connectivity (26 neighbors, default)
return_num_grains : bool, optional
If True, returns tuple (lfi_array, num_grains). Default is False.
Returns
-------
lfi_array : ndarray
3D integer array of grain IDs with same shape as state_array.
Grain IDs start from 1. Background (if any) is labeled 0.
num_grains : int (only if return_num_grains=True)
Total number of grains detected.
Notes
-----
- Uses scipy.ndimage.label for connected component analysis
- Connectivity=1 (6-neighbor) is strictest, useful for well-separated grains
- Connectivity=3 (26-neighbor) is most common, matches typical grain definitions
- Processing time scales with array size: ~0.1s for 100³, ~1s for 300³ voxels
- Memory usage: ~2x input array size for intermediate calculations
Examples
--------
>>> import numpy as np
>>> from upxo.gsdataops.grid_ops import detect_grains_3d
>>>
>>> # Simple 3D state array with two grains
>>> s = np.zeros((10, 10, 10), dtype=np.int16)
>>> s[:5, :, :] = 1 # State 1 in left half
>>> s[5:, :, :] = 2 # State 2 in right half
>>>
>>> # Detect grains
>>> lfi = detect_grains_3d(s)
>>> print(f"Grain IDs: {np.unique(lfi)}") # [1, 2]
>>>
>>> # With grain count
>>> lfi, num_grains = detect_grains_3d(s, return_num_grains=True)
>>> print(f"Detected {num_grains} grains")
>>>
>>> # Use strict connectivity
>>> lfi_strict = detect_grains_3d(s, connectivity=1)
See Also
--------
scipy.ndimage.label : Underlying labeling function
upxo.ggrowth.mcgs.detect_grains : 2D/3D grain detection in MCGS objects
"""
from scipy import ndimage
# Validate input
if state_array.ndim != 3:
raise ValueError(f"Expected 3D array, got {state_array.ndim}D array")
if connectivity not in [1, 2, 3]:
raise ValueError(f"Connectivity must be 1, 2, or 3, got {connectivity}")
# Generate connectivity structure for scipy
# connectivity=1 -> 3x3x3 structure with only face neighbors
# connectivity=2 -> 3x3x3 structure with face + edge neighbors
# connectivity=3 -> 3x3x3 structure with all 26 neighbors
structure = ndimage.generate_binary_structure(3, connectivity)
# Get unique state IDs
unique_states = np.unique(state_array)
# Initialize grain ID array
lfi_array = np.zeros_like(state_array, dtype=np.int32)
# Counter for grain IDs across all states
current_grain_id = 1
# Label connected components for each state separately
for state_id in unique_states:
# Create binary mask for current state
state_mask = (state_array == state_id)
# Label connected components within this state
labeled_state, num_features = ndimage.label(state_mask, structure=structure)
# Assign unique grain IDs to this state's grains
# Avoid relabeling zeros (background)
mask_nonzero = labeled_state > 0
lfi_array[mask_nonzero] = labeled_state[mask_nonzero] + current_grain_id - 1
# Update grain ID counter
current_grain_id += num_features
# Total number of grains detected
num_grains = current_grain_id - 1
if return_num_grains:
return lfi_array, num_grains
else:
return lfi_array
[docs]
def detect_grains_3d_optimized(state_array, connectivity=3, return_num_grains=False):
"""Detect connected grains in a 3D state array using grouped labeling.
Parameters
----------
state_array : ndarray
3D state array.
connectivity : int, optional
Connectivity order for 3D labeling (1, 2, or 3).
return_num_grains : bool, optional
If True, also return number of detected grains.
Returns
-------
ndarray or tuple
3D Labelled Feature Image, optionally with grain count.
"""
from scipy import ndimage
# Validate input
if state_array.ndim != 3:
raise ValueError(f"Expected 3D array, got {state_array.ndim}D array")
if connectivity not in [1, 2, 3]:
raise ValueError(f"Connectivity must be 1, 2, or 3, got {connectivity}")
# Generate connectivity structure for scipy
# connectivity=1 -> 3x3x3 structure with only face neighbors
# connectivity=2 -> 3x3x3 structure with face + edge neighbors
# connectivity=3 -> 3x3x3 structure with all 26 neighbors
structure = ndimage.generate_binary_structure(3, connectivity)
lfi_array = np.zeros_like(state_array, dtype=np.int32)
# 1. Flatten and get indices that would sort the array
flat_array = state_array.ravel()
sorted_idx = np.argsort(flat_array)
sorted_states = flat_array[sorted_idx]
# 2. Find where the state IDs change
diffs = np.diff(sorted_states)
split_indices = np.where(diffs != 0)[0] + 1
# Split the indices into groups, one for each state ID
groups = np.split(sorted_idx, split_indices)
unique_states = sorted_states[np.concatenate(([0], split_indices))]
current_grain_id = 1
# 3. Process each state using its pre-found indices
for state_id, indices in zip(unique_states, groups):
# Instead of state_array == state_id, create mask using known indices
# This mask is sparse and only touches necessary memory
state_mask = np.zeros(state_array.size, dtype=bool)
state_mask[indices] = True
state_mask = state_mask.reshape(state_array.shape)
# Label connected components
labeled_state, num_features = ndimage.label(state_mask, structure=structure)
if num_features > 0:
mask_nonzero = labeled_state > 0
lfi_array[mask_nonzero] = labeled_state[mask_nonzero] + (current_grain_id - 1)
current_grain_id += num_features
num_grains = current_grain_id - 1
return (lfi_array, num_grains) if return_num_grains else lfi_array
[docs]
def detect_grains_using_s(scalar_data, scalar_ids):
"""Detect and relabel connected regions for selected scalar IDs.
Parameters
----------
scalar_data : ndarray
3D scalar/state field.
scalar_ids : iterable
Scalar IDs to segment and relabel.
Returns
-------
ndarray
3D Labelled Feature Image with unique global labels.
"""
from scipy import ndimage
from scipy.ndimage import generate_binary_structure as GBS
results = [ndimage.label(scalar_data == s, structure=GBS(3, 3)) for s in scalar_ids]
label_volumes = [res[0] for res in results]
counts = [res[1] for res in results]
offsets = np.cumsum([0] + counts[:-1])
final_labels = []
for i in range(len(label_volumes)):
vol = label_volumes[i]
offset = offsets[i]
mask = (vol > 0)
vol[mask] += offset
final_labels.append(vol)
grain_id_map = np.sum(final_labels, axis=0)
return grain_id_map
[docs]
def detect_grains_fast_v3(scalar_data, scalar_ids):
"""Fast 3D grain labeling by state-wise grouped bounding-box labeling.
Parameters
----------
scalar_data : ndarray
3D scalar/state field.
scalar_ids : iterable
Scalar IDs to include in detection.
Returns
-------
ndarray
3D Labelled Feature Image.
"""
from scipy import ndimage
from scipy.ndimage import generate_binary_structure as GBS
grain_id_map = np.zeros(scalar_data.shape, dtype=np.int32)
structure = GBS(3, 3)
current_offset = 0
# 1. Sort indices by state ID (The "Grouping" step)
# This is the secret to avoiding scanning the whole volume repeatedly
flat_data = scalar_data.ravel()
sort_idx = np.argsort(flat_data)
sorted_values = flat_data[sort_idx]
# 2. Find transitions between IDs
diffs = np.where(np.diff(sorted_values) != 0)[0] + 1
# Only include IDs that are in your 'scalar_ids' list
split_indices = np.split(sort_idx, diffs)
found_ids = sorted_values[np.concatenate(([0], diffs))]
# 3. Process each group
for s_id, pixels in zip(found_ids, split_indices):
if s_id not in scalar_ids: continue
# Create a local coordinate mask only for the pixels we KNOW belong to this ID
# We find the local bounding box of these specific pixels
z, y, x = np.unravel_index(pixels, scalar_data.shape)
zmin, zmax = z.min(), z.max() + 1
ymin, ymax = y.min(), y.max() + 1
xmin, xmax = x.min(), x.max() + 1
# Slice the region and create a binary mask
sub_vol = scalar_data[zmin:zmax, ymin:ymax, xmin:xmax]
mask = (sub_vol == s_id)
# Label the sub-volume (handles multiple grains of the same state perfectly)
labeled_sub, num_features = ndimage.label(mask, structure=structure)
if num_features > 0:
# Shift IDs to global unique IDs and place back
dest = grain_id_map[zmin:zmax, ymin:ymax, xmin:xmax]
nonzero = labeled_sub > 0
dest[nonzero] = labeled_sub[nonzero] + current_offset
current_offset += num_features
return grain_id_map
[docs]
def detect_grains_greedy_3d(state_array):
"""Detect 3D grains with greedy flood-fill using 26-neighbour traversal.
Parameters
----------
state_array : ndarray
3D state array.
Returns
-------
ndarray
3D Labelled Feature Image.
"""
nx, ny, nz = state_array.shape
lfi_array = np.zeros_like(state_array, dtype=np.int32)
visited = np.zeros_like(state_array, dtype=bool)
current_grain_id = 1
# Define neighbor offsets for 26-connectivity (connectivity=3)
# This is the "greedy" reach of the algorithm
offsets = np.array(np.meshgrid([-1,0,1], [-1,0,1], [-1,0,1])).T.reshape(-1, 3)
offsets = offsets[~np.all(offsets == 0, axis=1)] # Remove [0,0,0]
for x in range(nx):
for y in range(ny):
for z in range(nz):
# If we haven't visited this voxel, it's the start of a new grain
if not visited[x, y, z]:
state_id = state_array[x, y, z]
# Start a Greedy BFS (Flood Fill)
queue = deque([(x, y, z)])
visited[x, y, z] = True
lfi_array[x, y, z] = current_grain_id
while queue:
cx, cy, cz = queue.popleft()
for dx, dy, dz in offsets:
nx_idx, ny_idx, nz_idx = cx+dx, cy+dy, cz+dz
# Bounds check
if 0 <= nx_idx < nx and 0 <= ny_idx < ny and 0 <= nz_idx < nz:
# Check if same state and not yet visited
if not visited[nx_idx, ny_idx, nz_idx] and \
state_array[nx_idx, ny_idx, nz_idx] == state_id:
visited[nx_idx, ny_idx, nz_idx] = True
lfi_array[nx_idx, ny_idx, nz_idx] = current_grain_id
queue.append((nx_idx, ny_idx, nz_idx))
current_grain_id += 1
return lfi_array
# ###########################################################################
# ###########################################################################
# Intra-grain location finding operations
# Search ID: SID_IntraGrainLocOps
# ###########################################################################
# ###########################################################################
# ###########################################################################
# ###########################################################################
# General LFI operations
# Search ID: SID_GeneralLFIOps
[docs]
def shuffle_feature_IDs(lfi):
"""
Shuffle the feature IDs in a local feature ID array (lfi) while keeping the
same number of features and their spatial distribution intact.
Parameters
----------
lfi : np.ndarray
N-dimensional Labelled Feature Image.
Returns
-------
np.ndarray
Local feature ID array with shuffled feature IDs.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.shuffle_feature_IDs
"""
unique_ids = np.unique(lfi)
if 0 in unique_ids:
unique_ids = unique_ids[unique_ids != 0]
shuffled_ids = unique_ids.copy()
np.random.shuffle(shuffled_ids)
id_map = dict(zip(unique_ids, shuffled_ids))
lfi = np.vectorize(lambda x: id_map.get(x, x))(lfi)
return lfi
[docs]
def merge_features_to_neighs(lfi, fids, neigh_fids):
"""
Update the labelled feature ID array (lfi) by merging specified fids
into their corresponding neighboring feature IDs (neigh_fids).
Parameters
----------
lfi : np.ndarray
N-dimensional Labelled Feature Image.
fids : list or array-like
List of feature IDs to be merged.
neigh_fids : dict
Dictionary mapping each feature ID in fids to its neighboring feature ID.
Returns
-------
np.ndarray
Updated lfi array with specified fids merged into their neighbors.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.merge_features_to_neighs
"""
for island in fids:
lfi[lfi==island] = neigh_fids[island]
return lfi
# #################################################################################
# #################################################################################
'''
VOXEL MORPHOLOGY SMOOTHING OPERATIONS
Search ID: SID_VoxelMorphSmoothOps
List of definitions
-------------------
get_ball_footprint
smooth_voxMorph_npass
smooth_voxMorph_1pass
majority_filter_2d: numba implementation
majority_filter_3d_npass
majority_filter_3d_1pass
'''
[docs]
def smooth_voxMorph_npass(lfi, niterations=2, DILfpSizes=[4, 4],
ERSfpSizes=[4, 4], footprints=['ball', 'ball'],
removeEndVox=[True, True]):
"""Apply multiple morphology smoothing passes to a 3D LFI.
Parameters
----------
lfi : ndarray
3D Labelled Feature Image.
niterations : int, optional
Number of smoothing passes.
DILfpSizes : list, optional
Dilation footprint sizes per pass.
ERSfpSizes : list, optional
Erosion footprint sizes per pass.
footprints : list, optional
Footprint type per pass.
removeEndVox : list, optional
End-voxel trimming flags per pass.
Returns
-------
ndarray
Smoothed 3D Labelled Feature Image.
"""
for i in np.arange(1, niterations+1):
lfi = smooth_voxMorph_1pass(lfi, DILfpSize=DILfpSizes[i-1],
ERSfpSize=ERSfpSizes[i-1], footprint=footprints[i-1],
removeEndVox=removeEndVox[i-1])
return lfi
[docs]
def smooth_voxMorph_1pass(lfi, DILfpSize=4, ERSfpSize=4,
footprint='ball', removeEndVox=True):
"""Apply one morphology smoothing pass (dilation then erosion).
Parameters
----------
lfi : ndarray
3D Labelled Feature Image.
DILfpSize : int, optional
Dilation footprint size.
ERSfpSize : int, optional
Erosion footprint size.
footprint : str, optional
Footprint type, currently ``'ball'``/``'sphere'``.
removeEndVox : bool, optional
Reserved control for endpoint-voxel handling.
Returns
-------
ndarray
Smoothed 3D Labelled Feature Image.
"""
from skimage.morphology import dilation, erosion
if footprint in ('ball', 'sphere'):
DILfp = get_ball_footprint(DILfpSize)
ERSfp = get_ball_footprint(ERSfpSize)
else:
raise NotImplementedError(f"Footprint {footprint} not implemented yet.")
return erosion(dilation(lfi, footprint=DILfp), footprint=ERSfp)
@njit(parallel=True)
def majority_filter_2d(lfi):
"""
Apply a 2D majority filter to the local feature ID array (lfi) to smooth
the grain structure by replacing each pixel's feature ID with the most common
feature ID among its 8 neighbors (including itself).
Parameters
----------
lfi : np.ndarray
2D Labelled Feature Image.
Returns
-------
np.ndarray
Smoothed 2D array of feature IDs after applying the majority filter.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.majority_filter_2d
"""
h, w = lfi.shape
lfi_new = lfi.copy()
for i in prange(1, h-1):
for j in range(1, w-1):
# Extract 3x3 neighborhood
neigh = lfi[i-1:i+2, j-1:j+2].ravel()
max_label = -1
max_count = 0
tie = False
# Count frequencies
for a in range(neigh.size):
label = neigh[a]
count = 0
for b in range(neigh.size):
if neigh[b] == label:
count += 1
if count > max_count:
max_count = count
max_label = label
tie = False
elif count == max_count and label != max_label:
tie = True
if not tie:
lfi_new[i, j] = max_label
return lfi_new
[docs]
def majority_filter_3d_npass(lfi=None, n=2, sizes=[3, 3]):
"""
Apply multiple passes of a 3D majority filter to the local feature ID array (lfi)
to iteratively smooth the grain structure.
Parameters
----------
lfi : np.ndarray
3D Labelled Feature Image to be smoothed. Default is None,
which will raise an error if not provided.
n : int
Number of passes to apply. Default is 2.
sizes : list of int
List of window sizes for each pass. Must have length n. Default is [3, 3].
Returns
-------
np.ndarray
Smoothed 3D array of feature IDs after applying n passes of the majority filter.
"""
for i in np.arange(1, n+1):
lfi_new = _majority_filter_3d_(lfi, size=sizes[i-1])
diff = np.sum(lfi_new != lfi)
print(f"Smoothing pass {i}. Number of voxels modified: {diff}")
lfi = lfi_new
return lfi
[docs]
def majority_filter_3d_1pass(lfi=None, size=3):
"""
Apply a 3D majority filter to the local feature ID array (lfi) to smooth
the grain structure by replacing each voxel's feature ID with the most common
feature ID among its neighbors in a 3D window.
Parameters
----------
lfi : np.ndarray
3D Labelled Feature Image. Default is None, which will raise
an error if not provided.
size : int, optional
Size of the neighborhood window (must be odd). Default is 3 for a 3x3x3 window.
Returns
-------
np.ndarray
Smoothed 3D array of feature IDs after applying the majority filter.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.majority_filter_3d
"""
return _majority_filter_3d_(lfi, size=size)
@njit(parallel=True)
def _majority_filter_3d_(lfi, size=3):
"""
Apply a more general 3D majority filter to the local feature ID array (lfi)
using a window of the specified size.
Parameters
----------
lfi : np.ndarray
3D Labelled Feature Image.
size : int
Size of the neighborhood window (must be odd). Default is 3 for a 3x3x3 window.
Returns
-------
np.ndarray
Smoothed 3D array.
"""
r = size // 2 # radius
d, h, w = lfi.shape
lfi_new = lfi.copy()
for i in prange(r, d - r):
for j in range(r, h - r):
for k in range(r, w - r):
max_label = -1
max_count = 0
tie = False
# Count frequencies inside window
for ii in range(i - r, i + r + 1):
for jj in range(j - r, j + r + 1):
for kk in range(k - r, k + r + 1):
label = lfi[ii, jj, kk]
count = 0
# Count occurrences of this label
for iii in range(i - r, i + r + 1):
for jjj in range(j - r, j + r + 1):
for kkk in range(k - r, k + r + 1):
if lfi[iii, jjj, kkk] == label:
count += 1
if count > max_count:
max_count = count
max_label = label
tie = False
elif count == max_count and label != max_label:
tie = True
if not tie:
lfi_new[i, j, k] = max_label
return lfi_new
[docs]
def compute_grain_bounding_boxes(lfi, padding=1):
"""
Identifies the min/max spatial extent for every grain ID in the volume.
Parameters:
-----------
lfi : ndarray
3D Labelled Feature Image.
padding : int
Extra voxels to include around the box to ensure boundary
gradients are captured.
Returns:
--------
bboxes : dict
Mapping of GrainID -> tuple(slice_x, slice_y, slice_z)
"""
bboxes = {}
unique_ids = np.unique(lfi)
# We skip 0 if it represents the background/exterior
for gid in unique_ids:
if gid == 0:
continue
# Find coordinates where the grain exists
coords = np.argwhere(lfi == gid)
if coords.size == 0:
continue
# Get min and max for each axis
min_coords = coords.min(axis=0) - padding
max_coords = coords.max(axis=0) + padding + 1
# Clip to ensure we stay within the array dimensions
min_coords = np.maximum(min_coords, 0)
max_coords = np.minimum(max_coords, lfi.shape)
# Create slice objects for easy indexing
bboxes[gid] = tuple(slice(min_coords[i], max_coords[i]) for i in range(3))
return bboxes
[docs]
def compute_local_grain_mask(lfi, bounding_box, grain_id):
"""
Extracts a local boolean mask for a specific grain within its bounding box.
Parameters:
-----------
lfi : ndarray
3D Labelled Feature Image (can be the original or lfiex).
bounding_box : tuple of slices
The (slice_x, slice_y, slice_z) identified for this grain.
grain_id : int
The ID of the grain to mask.
Returns:
--------
grain_mask : ndarray[bool]
A 3D boolean array of the same shape as the bounding box crop,
True where the voxel belongs to the grain_id.
"""
# 1. Crop the global array to the bounding box
local_crop = lfi[bounding_box]
# 2. Create the boolean mask for the target grain
grain_mask = (local_crop == grain_id)
return grain_mask
# #################################################################################
# #################################################################################
# STANDARD LFI GENERATORS
[docs]
def generate_test_2D_LFI_1(plotlfi=True, figsize=(6, 6), dpi=100):
"""
Generate a 2D Local Feature ID (LFI) array with multiple grains and features
for testing purposes. The array includes distinct quadrants, a central feature,
an island, a thin neck, and a triple point cluster to provide a variety of
grain structures for testing grain detection and analysis algorithms.
Returns
-------
np.ndarray
A 2D array of shape (200, 200) representing the LFI with various features.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.generate_test_2D_LFI_1
"""
M, N = 200, 200
lfi = np.zeros((M, N), dtype=int)
# --- Original Quadrants and Central Features ---
lfi[:100, :100] = 1
lfi[100:, :100] = 2
lfi[:100, 100:] = 3
lfi[100:, 100:] = 4
lfi[75:125, 75:125] = 5 # Central square
lfi[20:30, 20:40] = 6 # Island
lfi[150:152, 10:190] = 7 # The Neck (Thin horizontal strip)
lfi[95:105, 95:105] = 8 # Triple Point Cluster
# --- New Features using Coordinate Grids ---
Y, X = np.ogrid[:M, :N]
# 3. Sinusoidal Interface (challenging Grain 2 / Grain 4 boundary)
# Redefine the boundary between Y > 100 and Y < 100 with a wave
wave = 100 + 8 * np.sin(X / 15.0)
mask_wave = (Y < wave) & (X > 100)
lfi[mask_wave] = 4
lfi[~mask_wave & (X > 100) & (Y < 125)] = 3
# 4. The "Swiss Cheese" Grain (Grain 12 with holes 13, 14)
center_cheese = (50, 150)
dist_cheese = np.sqrt((X - center_cheese[1])**2 + (Y - center_cheese[0])**2)
lfi[dist_cheese < 25] = 12
# Add holes
lfi[np.sqrt((X-140)**2 + (Y-45)**2) < 5] = 13
lfi[np.sqrt((X-160)**2 + (Y-55)**2) < 5] = 14
# 1. Circle inside a Circle (Concentric)
# Located in the bottom-right quadrant (Grain 4 area)
center_concentric = (170, 170) # (y, x)
dist_concentric = np.sqrt((X - center_concentric[1])**2 + (Y - center_concentric[0])**2)
lfi[dist_concentric < 20] = 9 # Outer circle
lfi[dist_concentric < 8] = 10 # Inner circle
# 2. Coalescing Circles (Two overlapping features)
# Located in the top-left quadrant (Grain 1 area)
c1, c2 = (40, 70), (40, 85) # (y, x) Centers shifted horizontally
dist1 = np.sqrt((X - c1[1])**2 + (Y - c1[0])**2)
dist2 = np.sqrt((X - c2[1])**2 + (Y - c2[0])**2)
# Both circles assigned to the same Grain ID to simulate coalescence
lfi[(dist1 < 12) | (dist2 < 12)] = 11
# 5. Triple Triangles meeting at a single pixel (Grains 15, 16, 17)
# Define exact center (must be floats for precise distance calculation)
cy, cx = 115.0, 35.0
radius = 22.0
# 1. Create relative coordinate grids
# Y, X are already defined as np.ogrid[:M, :N] in your function
dy = Y - cy
dx = X - cx
# 2. Calculate Distance and Angle
# We use degrees [0, 360] to make the 120-degree logic foolproof
dist = np.sqrt(dx**2 + dy**2)
# arctan2(y, x) returns [-pi, pi], so we shift to [0, 360]
angle = (np.arctan2(dy, dx) * 180 / np.pi) % 360
# 3. Create the feature mask
mask = dist <= radius
# 4. Assign Sectors (Triangle 120-degree divisions)
# Triangle 1 (Bottom sector in your image)
lfi[mask & (angle >= 340) & (angle < 360)] = 15
# Triangle 2 (Top-Right sector)
lfi[mask & (angle >= 30) & (angle < 50)] = 16
# Triangle 3 (Top-Left sector)
lfi[mask & (angle >= 120) & (angle < 240)] = 17
if plotlfi:
# --- Visualization Check ---
import matplotlib.pyplot as plt
plt.figure(figsize=figsize, dpi=dpi)
plt.imshow(lfi, origin='lower', cmap='tab20')
plt.title("Updated LFI with Concentric and Coalescing Features")
plt.colorbar(label='Grain ID')
plt.show()
return lfi
# #################################################################################
# #################################################################################
[docs]
def generate_constrained_hybrid_seeds(lfi, target_spacing=0.5, bulk_spacing=10.0,
jitter_factor=1.0, margin=0.0, padding=1.0, plot_seeds=False,
figsize=(8, 8), dpi=50, markersize=2):
"""
Seeding with Rigid Guard Rails to eliminate RVE boundary irregularities.
Parameters
----------
lfi : np.ndarray
2D Labelled Feature Image.
target_spacing : float
Desired spacing between seeds along the boundary. Default is 0.5 units.
bulk_spacing : float
Desired spacing between seeds in the bulk (internal) region. Default is 10.0
units.
jitter_factor : float
Factor to control the amount of jitter applied to bulk seeds. Default is 1.0
(0 means no jitter, 1 means up to ±bulk_spacing/2 jitter).
margin : float
Margin to define an internal buffer to prevent bulk seeds from pushing against RVE edges. Default is 0.0.
padding : float
Padding to define an additional buffer around the domain edges for guard rails. Default is 1.0.
plot_seeds : bool
Whether to plot the seeds on top of the LFI for visualization. Default is False.
figsize : tuple
Size of the figure for plotting seeds if plot_seeds is True. Default is (8, 8).
dpi : int
Dots per inch for the seed plot if plot_seeds is True. Default is 50.
markersize : int
Size of the markers for the seeds if plot_seeds is True. Default is 2.
Returns
-------
np.ndarray
Array of seed coordinates with shape (num_seeds, 2), where each row is [x, y].
Notes
-----
- The function first detects the grain boundaries in the LFI and samples seeds along
these boundaries at a specified target spacing.
- It then identifies internal regions that are sufficiently far from the boundaries
and samples seeds in these bulk areas with a specified bulk spacing, applying jitter
to avoid regular patterns.
- Finally, it adds rigid guard rails of seeds along the domain edges to ensure that
the Voronoi tessellation produces straight edges at the boundaries, eliminating
irregularities caused by boundary effects.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.constrained_hybrid_seeds
"""
from scipy.ndimage import distance_transform_edt
import numpy as np
M, N = lfi.shape
# Interface Detection
gy, gx = np.gradient(lfi)
boundary_mask = (gx != 0) | (gy != 0)
# Perform Equidistant Boundary Sampling
coords_boundary = np.argwhere(boundary_mask).astype(float)
stride = max(1, int(target_spacing / 1.0))
seeds_boundary = coords_boundary[::stride]
# Jittered Bulk Sampling (Internal Only)
dist_to_boundary = distance_transform_edt(~boundary_mask)
# Define an internal buffer to prevent bulk seeds from pushing against RVE edges
Y, X = np.indices((M, N))
internal_mask = (X > margin) & (X < N-margin) & (Y > margin) & (Y < M-margin)
bulk_mask = (dist_to_boundary > target_spacing * 3) & internal_mask
coords_bulk = np.argwhere(bulk_mask).astype(float)
seeds_bulk = np.empty((0, 2))
if len(coords_bulk) > 0:
bulk_stride = max(1, int(bulk_spacing))
seeds_bulk = coords_bulk[::bulk_stride]
# Apply jitter to break inclined/aliased patterns
jitter = (np.random.rand(*seeds_bulk.shape) - 0.5) * (bulk_stride * jitter_factor)
seeds_bulk += jitter
# RIGID GUARD RAILS (The Boundary Fix)
# Place seeds exactly 1 unit outside the boundary to create flat bisectors
guard_spacing = target_spacing
pad = padding
rail_coords = []
# Horizontal rails (Top and Bottom)
steps_x = np.arange(-pad, N + pad, guard_spacing)
for x in steps_x:
rail_coords.append([-pad, x]) # Bottom rail
rail_coords.append([M + pad, x]) # Top rail
# Vertical rails (Left and Right)
steps_y = np.arange(-pad, M + pad, guard_spacing)
for y in steps_y:
rail_coords.append([y, -pad]) # Left rail
rail_coords.append([y, N + pad]) # Right rail
seeds_guard = np.array(rail_coords)
all_seeds = np.vstack([seeds_boundary, seeds_bulk, seeds_guard])[:, [1, 0]]
if plot_seeds:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
ax.plot(seeds_boundary[:, 1], seeds_boundary[:, 0], 'ko', label='Boundary Seeds', markersize=markersize)
ax.plot(seeds_bulk[:, 1], seeds_bulk[:, 0], 'b.', label='Bulk Seeds', markersize=markersize)
ax.plot(seeds_guard[:, 1], seeds_guard[:, 0], 'ro', label='Guard Rails', markersize=markersize+2)
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
# Formatting
# ax.set_title("RVE optimized constrained hybrid Voronoi tessellation seeds", fontsize=8)
ax.grid(True, linestyle='--', alpha=0.3)
ax.set_aspect('equal')
# Rigid RVE Cropping
ax.set_xlim(0, N)
ax.set_ylim(0, M)
plt.show()
# 5. Standardize to [x, y] Cartesian coordinates
return all_seeds
import numpy as np
from scipy.ndimage import distance_transform_edt
[docs]
def generate_constrained_hybrid_seeds_3d(lfi, target_spacing=0.5, bulk_spacing=10.0,
jitter_factor=1.0, margin=0.0, padding=1.0):
"""
3D Seeding with Guard Sheets to eliminate RVE boundary irregularities.
Parameters
----------
lfi : np.ndarray
3D Labelled Feature Volume (D, H, W).
target_spacing : float
Spacing along the grain boundary surfaces.
bulk_spacing : float
Spacing within the interior of grains.
jitter_factor : float
Magnitude of random displacement for bulk seeds.
margin : float
Internal buffer to keep bulk seeds away from RVE faces.
padding : float
Distance outside the RVE where guard seeds are placed.
Returns
-------
np.ndarray
Array of seed coordinates with shape (num_seeds, 3) as [x, y, z].
"""
D, H, W = lfi.shape
# 1. Interface Detection (3D Gradients)
# Detects where the label changes in any direction
gz, gy, gx = np.gradient(lfi)
boundary_mask = (gx != 0) | (gy != 0) | (gz != 0)
# Surface Sampling
coords_boundary = np.argwhere(boundary_mask).astype(float)
stride = max(1, int(target_spacing))
seeds_boundary = coords_boundary[::stride]
# 2. Jittered Bulk Sampling
dist_to_boundary = distance_transform_edt(~boundary_mask)
# Create internal volume mask
Z, Y, X = np.indices((D, H, W))
internal_mask = (X > margin) & (X < W - margin) & \
(Y > margin) & (Y < H - margin) & \
(Z > margin) & (Z < D - margin)
bulk_mask = (dist_to_boundary > target_spacing * 3) & internal_mask
coords_bulk = np.argwhere(bulk_mask).astype(float)
seeds_bulk = []
if len(coords_bulk) > 0:
bulk_stride = max(1, int(bulk_spacing))
seeds_bulk = coords_bulk[::bulk_stride]
# Apply 3D jitter
jitter = (np.random.rand(*seeds_bulk.shape) - 0.5) * (bulk_stride * jitter_factor)
seeds_bulk += jitter
# 3. RIGID GUARD SHEETS (The 3D Boundary Fix)
# We create 6 planes of seeds just outside the domain boundaries
guard_spacing = target_spacing
pad = padding
sheet_coords = []
# Helper to generate a 2D grid for the faces
def get_grid(dim1_range, dim2_range, fixed_val, axis):
"""Return the grid."""
grid_a, grid_b = np.meshgrid(
np.arange(-pad, dim1_range + pad, guard_spacing),
np.arange(-pad, dim2_range + pad, guard_spacing)
)
points = np.zeros((grid_a.size, 3))
if axis == 0: # X-Face
points[:, 0], points[:, 1], points[:, 2] = fixed_val, grid_a.ravel(), grid_b.ravel()
elif axis == 1: # Y-Face
points[:, 0], points[:, 1], points[:, 2] = grid_a.ravel(), fixed_val, grid_b.ravel()
else: # Z-Face
points[:, 0], points[:, 1], points[:, 2] = grid_a.ravel(), grid_b.ravel(), fixed_val
return points
# X-Faces (Left/Right)
sheet_coords.append(get_grid(H, D, -pad, axis=0))
sheet_coords.append(get_grid(H, D, W + pad, axis=0))
# Y-Faces (Front/Back)
sheet_coords.append(get_grid(W, D, -pad, axis=1))
sheet_coords.append(get_grid(W, D, H + pad, axis=1))
# Z-Faces (Top/Bottom)
sheet_coords.append(get_grid(W, H, -pad, axis=2))
sheet_coords.append(get_grid(W, H, D + pad, axis=2))
seeds_guard = np.vstack(sheet_coords)
# 4. Combine and Standardize
# Input is (z, y, x) from argwhere, so we flip to (x, y, z)
all_seeds = np.vstack([
seeds_boundary,
seeds_bulk,
seeds_guard
])
# Reorder from [z, y, x] to [x, y, z] for standard Cartesian output
return all_seeds[:, [2, 1, 0]]
[docs]
def generate_poisson_disk_seeds(xbound, ybound, radius=5, k=6, see_seeds=False, **plotKwargs):
"""
Generate Poisson Disk Sampling seeds within a specified rectangular domain defined by xbound and ybound.
Parameters
----------
xbound : tuple
A tuple (x_min, x_max) defining the horizontal bounds of the sampling area.
ybound : tuple
A tuple (y_min, y_max) defining the vertical bounds of the sampling area.
radius : float
The minimum distance between any two seeds. Default is 5 units.
k : int
The number of attempts to place a new seed around an existing seed before giving up. Default is 6.
Returns
-------
np.ndarray
An array of seed coordinates with shape (num_seeds, 2), where each row is [x, y].
Notes
-----
- Poisson Disk Sampling is a method for generating points that are evenly distributed while maintaining
a minimum distance between them.
- The algorithm typically starts with an initial random seed and iteratively attempts to place new seeds
around existing seeds, ensuring that they are at least 'radius' distance apart. The 'k' parameter
controls how many attempts are made to place a new seed around an existing seed before it is discarded.
- This method is particularly useful for applications like seeding in Voronoi tessellations, where a more
uniform distribution of seeds is desired compared to purely random sampling.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.generate_poisson_disk_seeds
"""
# pds: Poisson Disk Sampling
xstart, ystart = xbound[0], ybound[0]
# xend, yend = xbound[1], ybound[1]
from upxo.statops.sampling import bridson_uniform_density as bud
# bud(width=1, height=1, radius=0.2, k=20)
points = bud(width=xbound[1]-xbound[0], height=ybound[1]-ybound[0], radius=radius, k=k)
locx, locy = [_[0]+xstart for _ in points], [_[1]+ystart for _ in points]
seeds = np.array(list(zip(locx, locy)))
if see_seeds:
from upxo.viz import gsviz
gsviz.see_2dPoints(seeds, figsize=plotKwargs.get('figsize', (6, 6)),
dpi=plotKwargs.get('dpi', 100), title=plotKwargs.get('title', 'Poisson Disk Seeds'),
xlabel=plotKwargs.get('xlabel', 'X-axis'), ylabel=plotKwargs.get('ylabel', 'Y-axis'),
point_color=plotKwargs.get('point_color', 'black'),
point_size=plotKwargs.get('point_size', 20),
point_marker=plotKwargs.get('point_marker', '.'),
point_alpha=plotKwargs.get('point_alpha', 0.8))
return seeds
[docs]
def generate_darted_seeds(xbound, ybound, radius=5, k=6, see_seeds=False, **plotKwargs):
"""
Generate Darted Sampling seeds within a specified rectangular domain defined by xbound and ybound.
Parameters
----------
xbound : tuple
A tuple (x_min, x_max) defining the horizontal bounds of the sampling area.
ybound : tuple
A tuple (y_min, y_max) defining the vertical bounds of the sampling area.
radius : float
The minimum distance between any two seeds. Default is 5 units.
k : int
The number of attempts to place a new seed around an existing seed before giving up. Default
is 6.
Returns
-------
np.ndarray
An array of seed coordinates with shape (num_seeds, 2), where each row is [x, y].
Notes
-----
- Darted Sampling is a method for generating points that are evenly distributed while maintaining
a minimum distance between them, similar to Poisson Disk Sampling. However, Darted Sampling
typically involves a more aggressive rejection of candidate points, which can lead to a more
uniform distribution in certain cases.
- The algorithm starts with an initial random seed and iteratively attempts to place new seeds around
existing seeds. If a candidate seed is too close to an existing seed (within the specified radius),
it is rejected (or "darted"). The 'k' parameter controls how many attempts are made to place a new
seed around an existing seed before it is discarded.
- This method can be particularly effective for applications like seeding in Voronoi tessellations,
where a more uniform distribution of seeds is desired compared to purely random sampling, and
can sometimes produce better results than Poisson Disk Sampling in terms of uniformity.
- The 'darting' aspect of the algorithm can help to break up clusters of points that might occur in
Poisson Disk Sampling, leading to a more even distribution of seeds across the domain.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.generate_darted_seeds
"""
from upxo.statops.sampling import dart
xstart, ystart = xbound[0], ybound[0]
points = dart(width=xbound[1]-xbound[0], height=ybound[1]-ybound[0], radius=radius, k=k)
locx = [_[0]+xstart for _ in points]
locy = [_[1]+ystart for _ in points]
seeds = np.array(list(zip(locx, locy)))
if see_seeds:
from upxo.viz import gsviz
gsviz.see_2dPoints(seeds, figsize=plotKwargs.get('figsize', (6, 6)),
dpi=plotKwargs.get('dpi', 100), title=plotKwargs.get('title', 'Darted Seeds'),
xlabel=plotKwargs.get('xlabel', 'X-axis'), ylabel=plotKwargs.get('ylabel', 'Y-axis'),
point_color=plotKwargs.get('point_color', 'blue'),
point_size=plotKwargs.get('point_size', 20),
point_marker=plotKwargs.get('point_marker', '.'),
point_alpha=plotKwargs.get('point_alpha', 0.8),
plot_legend=plotKwargs.get('plot_legend', True)
)
return seeds
# #################################################################################
# #################################################################################
[docs]
def pad_lfi(lfi, pad_width, padder):
"""
Pad a local feature ID array (lfi) with a specified value on all sides.
Parameters
----------
lfi : np.ndarray
N-dimensional Labelled Feature Image to be padded.
pad_width : int or tuple
The width of the padding to be applied on each side. If an integer is provided,
the same padding width will be applied to all sides. If a tuple is provided, it
should specify the padding width for each side in the format ((before_1, after_1),
(before_2, after_2), ...).
padder : int
The value to use for padding the array. This value will be assigned to the padded
regions around the original array.
Returns
-------
np.ndarray
The padded local feature ID array with the specified padding width and padder value.
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.pad_lfi
"""
def pad_with(vector, pad_width, iaxis, kwargs):
"""Pad with."""
# REF: https://numpy.org/doc/stable/reference/generated/numpy.pad.html
pad_value = kwargs.get('padder', 10)
vector[:pad_width[0]] = pad_value
vector[-pad_width[1]:] = pad_value
lgi_padded = np.pad(lfi, pad_width=pad_width, mode=pad_with, padder=padder)
return lgi_padded
[docs]
def find_gb_v1(gsimage, plot_gb=True, figsize=(6, 6), dpi=100, cmap='nipy_spectral'):
"""Find gb v1."""
from skimage.segmentation import find_boundaries
gsimg_padded_boundaries = find_boundaries(gsimage, connectivity=1, mode='thick', background=0)*gsimage
# gsimg_boundaries = gsimg_padded_boundaries[1:-1, 1:-1]
gsimg_boundaries = gsimg_padded_boundaries # The above line would make it specific.
# As we have commeneted out the padding, we can keep the boundaries as is without cropping.
# NOTE: USERS WILL HAVE TO MORE CAREFUL in theire data entry.
if plot_gb:
import matplotlib.pyplot as plt
plt.figure(figsize=figsize, dpi=dpi)
plt.imshow(gsimg_boundaries, cmap=cmap, origin='lower')
plt.title("Grain Boundaries (Thick, Connectivity=1)")
plt.colorbar(label='Boundary ID')
plt.show()
return gsimg_boundaries
[docs]
def segment_grain_boundaries(gsimage, gbimage, neigh_fid, connectivity=8):
"""Segment grain boundaries."""
from collections import defaultdict
rows, cols = np.where(gbimage)
interfaces = defaultdict(list)
if connectivity == 4:
offsets = [(0, 1), (0, -1), (1, 0), (-1, 0)]
elif connectivity == 8:
offsets = [(0, 1), (0, -1), (1, 0), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]
gsimage_int = gsimage.astype(int)
max_r, max_c = gsimage.shape
for r, c in zip(rows, cols):
g1 = gsimage_int[r, c]
if g1 == 0: continue
for dr, dc in offsets:
nr, nc = r+dr, c+dc
if 0 <= nr < max_r and 0 <= nc < max_c:
g2 = gsimage_int[nr, nc]
if g2 != 0 and g2 != g1:
interfaces[(g1, g2)].append((r, c))
segments = {pair: np.unique(np.array(pts), axis=0) for pair, pts in interfaces.items()}
segments = {cg: {ng: segments[(cg, ng)] for ng in ngs} for cg, ngs in neigh_fid.items()}
nsegments = {cg: len(segments[cg]) for cg in neigh_fid.keys()}
return segments, nsegments
[docs]
def make_gbsegImage(gbMask, segments, nsegments, neigh_fid):
"""Build and return gbsegImage."""
new_ids = {cg: np.round(np.linspace(cg, cg+1, nsegments[cg]+1)[:-1], 4)
for cg, ngs in neigh_fid.items()}
from copy import deepcopy
new_ids = {cg: np.round(np.linspace(cg, cg+1, nsegments[cg]+1)[:-1], 4)
for cg, ngs in neigh_fid.items()}
from copy import deepcopy
gbSegMask = np.asarray(deepcopy(gbMask), dtype=np.float32)
for cg, ngcoords in segments.items():
for i, (ng, ngsegcoords) in enumerate(ngcoords.items()):
segid = new_ids[cg][i]
for sc in ngsegcoords:
gbSegMask[sc[0]][sc[1]] = segid
return gbSegMask
[docs]
def combine_partitions(image_data, combinations):
"""
Combine multiple partition IDs in an image array according to specified groupings.
This method merges partition regions by replacing multiple partition IDs with a single
target ID for each group. After combining, the partition IDs are renumbered sequentially
starting from 1.
Parameters
----------
image_data : numpy.ndarray
Input image array containing partition IDs as integer values.
combinations : list of list of int
List of groups where each group is a list of partition IDs to be combined.
The first valid ID in each group becomes the target ID for that group.
Returns
-------
numpy.ndarray
Modified image array with combined partitions and renumbered IDs starting from 1.
Has the same shape as the input image_data.
Notes
-----
- Only partition IDs that are present in the image_data are considered valid.
- Groups with fewer than 2 valid IDs are skipped.
- The final renumbering ensures sequential partition IDs without gaps.
Example
-------
import numpy as np
image_data = np.array([[1, 1, 2, 2],
[1, 3, 3, 2],
[4, 4, 3, 2]])
combinations = [[1, 2], [3, 4]]
combined_image = combine_partitions(image_data, combinations)
print(combined_image)
# Output:
# [[1 1 1 1]
# [1 2 2 1]
# [2 2 2 1]]
Usage
-----
import upxo.gsdataops.grid_ops as gridOps
Use as: gridOps.combine_partitions
"""
original_shape = image_data.shape
present_values = set(np.unique(image_data))
for group in combinations:
valid_ids = [val for val in group if val in present_values]
if len(valid_ids) < 2:
continue
target_id = valid_ids[0]
ids_to_replace = valid_ids[1:]
mask = np.isin(image_data, ids_to_replace)
image_data[mask] = target_id
for old_id in ids_to_replace:
present_values.discard(old_id)
_, inverse_indices = np.unique(image_data, return_inverse=True)
img_data_mod = inverse_indices.reshape(original_shape)+1
return img_data_mod
[docs]
def detect_features_in_image_MCstateWise_2d(image_data, binary_structure_order=2,):
"""
Detect features in an image and label them uniquely.
Parameters
----------
image_data : numpy.ndarray
Input image array containing different states as integer values.
binary_structure_order : int, optional
Order of the binary structure for connectivity. Default is 2 (8-connectivity).
Returns
-------
labeled_image : numpy.ndarray
Labeled image where each detected feature has a unique integer label.
original_to_labels : dict
Mapping from original state values to lists of new feature labels.
labels_to_original : dict
Mapping from new feature labels to their corresponding original state values.
Example
-------
import numpy as np
image_data = np.array([[1, 1, 0, 2, 2],
[1, 0, 0, 2, 2],
[0, 0, 3, 3, 0],
[4, 4, 0, 0, 0]])
labeled_image, orig_to_labels, labels_to_orig = detect_features_in_image_MCstateWise_2d(image_data, binary_structure_order=2)
print("Labeled Image:\n", labeled_image)
print("Original to Labels Mapping:\n", orig_to_labels)
print("Labels to Original Mapping:\n", labels_to_orig)
"""
from scipy.ndimage import label as nd_label
from scipy.ndimage import generate_binary_structure
binstr = generate_binary_structure(2, binary_structure_order)
labeled_image = np.zeros_like(image_data, dtype=int)
original_to_labels = {} # {original_state_id : [new_label_1, new_label_2]}
labels_to_original = {} # {new_label_id: original_state_id}
current_label_offset = 1
unique_states = np.unique(image_data)
for state_val in unique_states:
mask = (image_data == state_val)
temp_labels, num_features = nd_label(mask, structure=binstr)
if num_features == 0:
continue
feature_mask = temp_labels > 0
temp_labels[feature_mask] += (current_label_offset-1)
labeled_image += temp_labels
new_ids = list(range(current_label_offset,
current_label_offset+num_features))
original_to_labels[state_val] = new_ids
for new_id in new_ids:
labels_to_original[new_id] = state_val
current_label_offset += num_features
return labeled_image, original_to_labels, labels_to_original
# ---------------------------------------------------------------------------
# Twin lamella analysis helpers (SID_TwinAnalysisOps)
# ---------------------------------------------------------------------------
[docs]
def compute_twin_thickness_stats(
parent_info: dict,
prop_ebsd: dict,
step_size_um: float,
thickness_key=None,
lfi=None,
abrupt_threshold: float = 0.8,
) -> dict:
"""
Compute twin lamella thickness statistics from EBSD grain properties.
Parameters
----------
parent_info : dict
Output of ``identify_parent_grains``.
prop_ebsd : dict
``{grain_id: {'minor_axis_length': ..., ...}}``.
step_size_um : float
EBSD step size in µm/pixel.
thickness_key : str or None
Property key to use as thickness proxy. Defaults to
``'minor_axis_length'`` when available, else ``'eq_diameter'``.
lfi : ndarray or None
EBSD label field image. When supplied, abrupt-twin detection is
performed by comparing each twin's extent along the parent's major
axis direction against the parent's own extent.
abrupt_threshold : float
Twins with ``twin_span / parent_span < abrupt_threshold`` are
counted as abruptly-ending. Default 0.8.
Returns
-------
dict with keys: ``gids``, ``thick_px``, ``thick_um``, ``col``,
``step_um``, ``mean``, ``median``, ``std``, ``min``, ``max``,
``pct10``, ``pct90``, ``n_abrupt_ebsd``, ``abrupt_frac_ebsd``.
"""
import numpy as np
twin_gids: set = set()
for info in parent_info.values():
twin_gids.update(int(g) for g in np.asarray(info['pure_twins']).ravel())
twin_gids.update(int(g) for g in np.asarray(info['intermediates']).ravel())
gids_in_prop = set(prop_ebsd.keys())
twin_gids_found = sorted(twin_gids & gids_in_prop)
sample_keys = list(prop_ebsd[next(iter(prop_ebsd))].keys())
if thickness_key is None:
if 'minor_axis_length' in sample_keys:
thickness_key = 'minor_axis_length'
elif 'eq_diameter' in sample_keys:
thickness_key = 'eq_diameter'
else:
raise RuntimeError(f'No suitable thickness key. Available: {sample_keys}')
thick_px = np.array(
[prop_ebsd[g][thickness_key] for g in twin_gids_found
if prop_ebsd[g][thickness_key] is not None
and prop_ebsd[g][thickness_key] > 0],
dtype=np.float64,
)
thick_um = thick_px * float(step_size_um)
result = {
'gids': twin_gids_found,
'thick_px': thick_px,
'thick_um': thick_um,
'col': thickness_key,
'step_um': float(step_size_um),
'mean': float(thick_um.mean()),
'median': float(np.median(thick_um)),
'std': float(thick_um.std(ddof=1)),
'min': float(thick_um.min()),
'max': float(thick_um.max()),
'pct10': float(np.percentile(thick_um, 10)),
'pct90': float(np.percentile(thick_um, 90)),
'n_abrupt_ebsd': 0,
'abrupt_frac_ebsd': 0.0,
}
if lfi is None or len(twin_gids_found) == 0:
return result
# Abrupt-twin detection — single regionprops pass avoids repeated np.where
import cc3d
from skimage.measure import regionprops as _skrp
gid2props = {p.label: p for p in _skrp(lfi.astype(np.int32))}
pure_parents_all: set = set()
for info in parent_info.values():
pure_parents_all.update(int(g) for g in np.asarray(info.get('pure_parents', [])).ravel())
twin_gids_set = set(int(g) for g in twin_gids_found)
edges = cc3d.region_graph(lfi.astype(np.int32), connectivity=4)
twin_to_parent: dict = {}
for a, b in edges:
a, b = int(a), int(b)
if a in twin_gids_set and b in pure_parents_all and a not in twin_to_parent:
twin_to_parent[a] = b
elif b in twin_gids_set and a in pure_parents_all and b not in twin_to_parent:
twin_to_parent[b] = a
n_abrupt = 0
n_checked = 0
for twin_gid in twin_gids_found:
parent_gid = twin_to_parent.get(twin_gid)
if parent_gid is None or twin_gid not in gid2props or parent_gid not in gid2props:
continue
rp_par = gid2props[parent_gid]
rp_twin = gid2props[twin_gid]
d = np.array([-np.sin(rp_par.orientation), np.cos(rp_par.orientation)])
proj_par = rp_par.coords @ d
proj_twin = rp_twin.coords @ d
parent_span = float(proj_par.max() - proj_par.min())
twin_span = float(proj_twin.max() - proj_twin.min())
if parent_span > 0 and (twin_span / parent_span) < abrupt_threshold:
n_abrupt += 1
n_checked += 1
if n_checked > 0:
result['n_abrupt_ebsd'] = n_abrupt
result['abrupt_frac_ebsd'] = n_abrupt / n_checked
return result
[docs]
def compute_grain_intercept_lengths(rp, lfi, n_sample_pts=None) -> 'np.ndarray':
"""
Compute chord (intercept) lengths through a grain perpendicular to its
major axis, by ray-casting from evenly-spaced sample points along the
major axis.
Parameters
----------
rp : skimage.measure.RegionProperties
regionprops object for the grain — pre-computed by the caller so
the LFI is only iterated once for the full grain set.
lfi : ndarray (ny, nx)
Label field image.
n_sample_pts : int or None
Number of sample points along the major axis. Defaults to
``int(major_axis_length)``, capped from above by that value.
Returns
-------
ndarray(n,)
Intercept lengths in pixels, one value per sample point.
"""
import numpy as np
orientation = rp.orientation
major_ax = rp.major_axis_length
centroid = np.array(rp.centroid, dtype=np.float64)
gid = rp.label
if major_ax < 1.0:
return np.array([float(len(rp.coords))], dtype=np.float64)
# Long-axis and perpendicular (minor-axis) directions in (row, col) space
d = np.array([-np.sin(orientation), np.cos(orientation)])
pd = np.array([ np.cos(orientation), np.sin(orientation)])
half = major_ax / 2.0
n = min(n_sample_pts or int(major_ax), max(1, int(major_ax)))
pts = np.linspace(centroid - half * d, centroid + half * d, n)
ny, nx = lfi.shape
max_step = max(ny, nx)
intercepts = np.zeros(n, dtype=np.float64)
for i, pt in enumerate(pts):
count_pos = 0
for step in range(1, max_step):
r = int(round(pt[0] + step * pd[0]))
c = int(round(pt[1] + step * pd[1]))
if 0 <= r < ny and 0 <= c < nx and lfi[r, c] == gid:
count_pos += 1
else:
break
count_neg = 0
for step in range(1, max_step):
r = int(round(pt[0] - step * pd[0]))
c = int(round(pt[1] - step * pd[1]))
if 0 <= r < ny and 0 <= c < nx and lfi[r, c] == gid:
count_neg += 1
else:
break
intercepts[i] = float(count_pos + count_neg + 1)
return intercepts
# #################################################################################
# #################################################################################
# This module is expected to grow quite a lot. In the interest of locating
# the right definitions whilst development, I have provided the below
# search ids. Just copy the search IDs and search for it. The definition
# must be near by to the location identified.
# DEVELOPER :---- FUNCTIONALITY LOCATION SEARCH
# Sectioning and PV grid helpers. Search ID: SID_3dSectioningOps
# Bounding box operations. Search ID: SID_BBoxOps
# Intra-grain location finding operations. Search ID: SID_IntraGrainLocOps
# General LFI operations. Search ID: SID_GeneralLFIOps
# Voxel morphology smoothing operations. Search ID: SID_VoxelMorphSmoothOps