"""
Module: neighbour_ops
---------------------
Neighbour (connectivity) operations for 2D grain structures.
Identifies neighbouring grain IDs and grain-boundary pixel segments for
individual grains or across an entire grain structure. Provides first- through
nth-order neighbourhood queries with optional Numba acceleration for large
datasets.
Usage
-----
::
import upxo.connops.neighbour_ops as neighOps
"""
import math
import random
import numpy as np
from numba import njit, prange
from copy import deepcopy
@njit(parallel=True)
def NMB_get_neighbor_mask(neigh_mask, gid):
"""Mark the boundary pixels of a single grain in a 2D label sub-array.
JIT-compiled with Numba (parallel outer loop via ``prange``). For every
pixel that belongs to ``gid``, the four 4-connected neighbours that do
*not* belong to ``gid`` are marked ``-1``. All remaining pixels are set
to ``0``.
Parameters
----------
neigh_mask : numpy.ndarray of int, shape (R, C)
Sub-array of the full labelled image covering the extended bounding box
of the grain. Modified in-place and returned.
gid : int
Label value of the grain whose boundary-adjacent pixels are to be
identified.
Returns
-------
neigh_mask : numpy.ndarray of int, shape (R, C)
Same array, now containing ``-1`` at pixels that are direct
4-connected neighbours of ``gid`` but belong to a different grain, and
``0`` everywhere else.
Notes
-----
The outer row loop is parallelised with ``prange``; the inner column loop
is sequential. Bounds checks are explicit — no zero-padding of the input
array is required.
"""
nrows, ncols = neigh_mask.shape
for row in prange(nrows):
for col in range(ncols):
if neigh_mask[row, col] == gid:
if row - 1 >= 0:
if neigh_mask[row-1, col] != gid:
neigh_mask[row-1, col] = -1
if row + 1 < nrows:
if neigh_mask[row+1, col] != gid:
neigh_mask[row+1, col] = -1
if col - 1 >= 0:
if neigh_mask[row, col-1] != gid:
neigh_mask[row, col-1] = -1
if col + 1 < ncols:
if neigh_mask[row, col+1] != gid:
neigh_mask[row, col+1] = -1
if neigh_mask[row, col] != -1:
neigh_mask[row, col] = 0
return neigh_mask
[docs]
def find_neigh_fid(gdict, lfi, fid, nfeatures, include_central_grain=False,
update_grain_object=True, user_defined_bbox_ex_bounds=False,
bbox_ex_bounds_fid=None, use_numba=False, _char_fx_version_=2,
get_gbsegs=True, save_gbsegs=False, dtype_gbseg=np.int32,
throw=False, throw_gbsegs=False):
"""Find the neighbour IDs of a single grain and optionally its boundary segments.
Extracts the extended bounding-box sub-array for ``fid``, constructs a
neighbour mask (via Numba or pure Python depending on ``nfeatures``), and
returns the unique IDs of all directly touching grains. Grain-boundary
segment maps can be generated and stored in the grain object's sparse
format.
Parameters
----------
gdict : dict
Grain-object dictionary keyed by grain ID. Each value is either a
grain object directly (``_char_fx_version_=2``) or a dict with a
``'grain'`` key (``_char_fx_version_=1``).
lfi : numpy.ndarray of int, shape (R, C)
Full labelled feature image for the grain structure.
fid : int
Grain ID for which neighbours are to be found.
nfeatures : int
Total number of grains in the structure. Governs the Numba/pure-Python
branch threshold (Numba is used when ``nfeatures > 2000`` or
``use_numba`` is True).
include_central_grain : bool, default False
Include ``fid`` itself in the returned neighbour array.
update_grain_object : bool, default True
Write the neighbour ID array into the grain object as ``grain.neigh``.
user_defined_bbox_ex_bounds : bool, default False
Use ``bbox_ex_bounds_fid`` instead of the bounding-box stored in the
grain object.
bbox_ex_bounds_fid : tuple of int or None, default None
``(rmin, rmax, cmin, cmax)`` bounding-box extents to use when
``user_defined_bbox_ex_bounds`` is True.
use_numba : bool, default False
Force use of :func:`NMB_get_neighbor_mask` regardless of ``nfeatures``.
_char_fx_version_ : {1, 2}, default 2
Selects how grain objects are accessed from ``gdict``. Version 2
accesses grain attributes directly; version 1 assumes a nested dict
with a ``'grain'`` key.
get_gbsegs : bool, default True
Compute the grain-boundary segment map (a 2D array where each
non-zero pixel contains the neighbour grain's ID at that boundary
location).
save_gbsegs : bool, default False
Serialise the boundary segment map into the grain object in sparse
format (``shape``, ``NZI``, ``NZV``, ``dtype`` keys).
dtype_gbseg : numpy dtype, default numpy.int32
Data type recorded in the sparse ``dtype`` field when saving segments.
throw : bool, default False
Return ``neighbour_ids`` (or a tuple) instead of ``None``.
throw_gbsegs : bool, default False
When both ``get_gbsegs`` and ``throw_gbsegs`` are True, return
``(neighbour_ids, gbsegs)`` instead of ``neighbour_ids`` alone.
Returns
-------
None
When ``throw`` is False.
neighbour_ids : numpy.ndarray of int
Unique IDs of all grains directly touching ``fid``. Returned when
``throw`` is True and ``throw_gbsegs`` is False.
(neighbour_ids, gbsegs) : tuple
Tuple of the neighbour ID array and the 2D boundary segment map.
Returned when both ``throw`` and ``throw_gbsegs`` are True.
Notes
-----
Grain-boundary segments are always stored in the grain object in sparse
format (non-zero indices and values). To reconstruct the dense array use
the ``lfi_gbsegs`` property of the grain object, or rebuild manually from
the ``shape``, ``NZI``, and ``NZV`` fields.
Examples
--------
Typical usage through the grain-structure object (which calls this
function internally):
.. code-block:: python
from upxo.ggrowth.mcgs import mcgs
pxtal = mcgs(input_dashboard='input_dashboard.xls')
pxtal.simulate()
pxtal.detect_grains()
tslice = 10
pxtal.gs[tslice].char_morph_2d(char_gb=True)
pxtal.gs[tslice].find_neigh(include_central_grain=False)
print(pxtal.gs[tslice].neigh_gid[10])
For direct use at module level:
.. code-block:: python
import upxo.connops.neighbour_ops as neighOps
neigh_ids = neighOps.find_neigh_fid(
gdict, lfi, fid=5, nfeatures=200,
throw=True, throw_gbsegs=False)
"""
if user_defined_bbox_ex_bounds:
bounds = bbox_ex_bounds_fid
else:
if _char_fx_version_ == 1:
bounds = gdict[fid]['grain'].bbox_ex_bounds
elif _char_fx_version_ == 2:
bounds = gdict[fid].bbox_ex_bounds
probable_grains_locs = lfi[bounds[0]:bounds[1]+1, bounds[2]:bounds[3]+1]
neigh_mask = deepcopy(probable_grains_locs)
if nfeatures > 2000 or use_numba:
neigh_mask = NMB_get_neighbor_mask(neigh_mask, fid)
else:
for row in range(neigh_mask.shape[0]):
for col in range(neigh_mask.shape[1]):
if neigh_mask[row, col] == fid:
if row-1 >= 0:
if neigh_mask[row-1, col] != fid:
neigh_mask[row-1, col] = -1
if row+1 < neigh_mask.shape[0]:
if neigh_mask[row+1, col] != fid:
neigh_mask[row+1, col] = -1
if col-1 >= 0:
if neigh_mask[row, col-1] != fid:
neigh_mask[row, col-1] = -1
if col+1 < neigh_mask.shape[1]:
if neigh_mask[row, col+1] != fid:
neigh_mask[row, col+1] = -1
if neigh_mask[row, col] != -1:
neigh_mask[row, col] = 0
neigh_pixel_locs = np.argwhere(neigh_mask == -1)
neigh_pixel_grain_ids = probable_grains_locs[neigh_pixel_locs[:, 0], neigh_pixel_locs[:, 1]]
if include_central_grain:
neighbour_ids = np.unique(np.append(neigh_pixel_grain_ids, fid))
else:
neighbour_ids = np.unique(neigh_pixel_grain_ids[neigh_pixel_grain_ids != fid])
if update_grain_object:
if _char_fx_version_ == 1:
gdict[fid]['grain'].neigh = np.asarray(neighbour_ids, dtype=np.int32)
elif _char_fx_version_ == 2:
gdict[fid].neigh = np.asarray(neighbour_ids, dtype=np.int32)
if get_gbsegs:
gbsegs = np.zeros_like(neigh_mask)
for ni in neighbour_ids:
gbsegs[np.logical_and(neigh_mask == -1, probable_grains_locs == ni)] = ni
if save_gbsegs:
gbseg_data = {}
gbsegs_raveled = gbsegs.ravel()
initShape = gbsegs.shape
NZI = np.where(gbsegs_raveled)[0]
NZV = gbsegs_raveled[NZI]
gbseg_data['shape'] = initShape
gbseg_data['NZI'] = NZI
gbseg_data['NZV'] = NZV
gbseg_data['dtype'] = dtype_gbseg
if _char_fx_version_ == 1:
gdict[fid]['grain'].gbsegs = gbseg_data
elif _char_fx_version_ == 2:
gdict[fid].gbsegs = gbseg_data
if throw:
neigh_gid__gbsegs = neighbour_ids
else:
neigh_gid__gbsegs = None
if get_gbsegs and throw_gbsegs:
neigh_gid__gbsegs = (neighbour_ids, gbsegs)
return neigh_gid__gbsegs
[docs]
def find_neigh(lfi, fids, gdict, nfeatures, include_central_grain=False,
char_fx_version=2, print_msg=True,
user_defined_bbox_ex_bounds=False, bbox_ex_bounds=None,
update_grain_object=True, use_numba=False):
"""Find the neighbour IDs for every grain in the structure.
Iterates over all grain IDs in ``fids``, calls :func:`find_neigh_fid` for
each, and collects the results into a single dictionary.
Parameters
----------
lfi : numpy.ndarray of int, shape (R, C)
Full labelled feature image for the grain structure.
fids : iterable of int
Grain IDs to process. Typically ``range(1, nfeatures + 1)``.
gdict : dict
Grain-object dictionary keyed by grain ID (see :func:`find_neigh_fid`).
nfeatures : int
Total number of grains; governs the Numba/pure-Python branch in
:func:`find_neigh_fid`.
include_central_grain : bool, default False
Include each grain's own ID in its neighbour list.
char_fx_version : {1, 2}, default 2
Grain-object access version forwarded to :func:`find_neigh_fid`.
print_msg : bool, default True
Print a progress message before the loop begins.
user_defined_bbox_ex_bounds : bool, default False
Use bounding-box extents from ``bbox_ex_bounds`` instead of those
stored in the grain objects.
bbox_ex_bounds : dict or None, default None
Mapping of grain ID → ``(rmin, rmax, cmin, cmax)`` when
``user_defined_bbox_ex_bounds`` is True.
update_grain_object : bool, default True
Write each grain's neighbour array into its grain object.
use_numba : bool, default False
Force Numba acceleration regardless of ``nfeatures``.
Returns
-------
neigh_gid : dict[int, list of int]
Mapping of grain ID → list of neighbour grain IDs.
Examples
--------
.. code-block:: python
from upxo.ggrowth.mcgs import mcgs
pxtal = mcgs(input_dashboard='input_dashboard.xls')
pxtal.simulate()
pxtal.detect_grains()
tslice = 10
pxtal.gs[tslice].char_morph_2d(char_gb=True)
# With central grain included
pxtal.gs[tslice].find_neigh(include_central_grain=True)
print(pxtal.gs[tslice].neigh_gid[10])
# Without central grain
pxtal.gs[tslice].find_neigh(include_central_grain=False)
print(pxtal.gs[tslice].neigh_gid[10])
"""
neigh_gid = {}
if print_msg:
print('\nExtracting neigh list for all grains\n')
for fid in fids:
bbox_ex_bounds_fid = bbox_ex_bounds[fid] if user_defined_bbox_ex_bounds else None
neigh_gid[fid] = [int(ngid) for ngid in find_neigh_fid(gdict, lfi, fid, nfeatures,
include_central_grain=include_central_grain,
update_grain_object=update_grain_object,
_char_fx_version_=char_fx_version,
get_gbsegs=True, save_gbsegs=False, dtype_gbseg=np.int32,
throw=True, throw_gbsegs=False,
user_defined_bbox_ex_bounds=user_defined_bbox_ex_bounds,
bbox_ex_bounds_fid=bbox_ex_bounds_fid,
use_numba=use_numba)]
return neigh_gid
[docs]
def get_upto_nth_order_neighbors(neigh_gid, grain_id, neigh_order,
include_parent=True, output_type='list'):
"""Return all grains reachable within ``neigh_order`` hops from ``grain_id``.
Starting from the 1st-order neighbours of ``grain_id``, the function
iteratively expands the set by one hop per order until ``neigh_order``
hops are exhausted.
Parameters
----------
neigh_gid : dict[int, list of int]
Pre-computed neighbour map — typically ``gs.neigh_gid`` populated by
:func:`find_neigh`.
grain_id : int
ID of the central grain.
neigh_order : int
Maximum hop count. ``1`` returns direct neighbours; ``2`` returns
neighbours-of-neighbours, and so on. ``0`` returns ``[grain_id]``.
include_parent : bool, default True
Include ``grain_id`` in the returned set.
output_type : {'list', 'nparray', 'set'}, default 'list'
Return type. ``'nparray'`` also accepts ``'np'``, ``'np.array'``,
``'numpy'``.
Returns
-------
list, numpy.ndarray, or set
All grain IDs within ``neigh_order`` hops of ``grain_id``.
Examples
--------
.. code-block:: python
import upxo.connops.neighbour_ops as neighOps
# neigh_gid populated by find_neigh or find_neigh_v2
result = neighOps.get_upto_nth_order_neighbors(
gs.neigh_gid, grain_id=6, neigh_order=3,
include_parent=True, output_type='list')
See Also
--------
get_nth_order_neighbors : Returns only the grains at exactly order n.
"""
if neigh_order == 0:
singleton = {grain_id}
if output_type in ('np', 'nparray', 'np.array', 'numpy'):
return np.array([grain_id])
if output_type == 'set':
return singleton
return [grain_id]
neighbors = set(neigh_gid.get(grain_id, []))
for _ in range(neigh_order - 1):
new_neighbors = set()
for neighbor in neighbors:
new_neighbors.update(neigh_gid.get(neighbor, []))
neighbors.update(new_neighbors)
if not include_parent:
neighbors.discard(grain_id)
if output_type == 'list':
return list(neighbors)
if output_type in ('np', 'nparray', 'np.array', 'numpy'):
return np.array(list(neighbors))
elif output_type == 'set':
return neighbors
[docs]
def get_nth_order_neighbors(neigh_gid, grain_id, neigh_order, include_parent=True):
"""Return the grains reachable in *exactly* ``neigh_order`` hops.
Computes the set difference between the up-to-*n* and up-to-*(n-1)*
neighbourhood sets to isolate grains that first become reachable at
exactly the requested order.
Parameters
----------
neigh_gid : dict[int, list of int]
Pre-computed neighbour map — typically ``gs.neigh_gid``.
grain_id : int
ID of the central grain.
neigh_order : int
Exact hop count. Grains reachable in fewer hops are excluded.
include_parent : bool, default True
Include ``grain_id`` in the candidate set before differencing.
Returns
-------
list of int
Grain IDs reachable in exactly ``neigh_order`` hops from
``grain_id``.
Examples
--------
.. code-block:: python
import upxo.connops.neighbour_ops as neighOps
result = neighOps.get_nth_order_neighbors(
gs.neigh_gid, grain_id=10, neigh_order=2, include_parent=True)
"""
neigh_upto_n_minus_1 = set(get_upto_nth_order_neighbors(
neigh_gid, grain_id, neigh_order - 1,
include_parent=include_parent, output_type='set'))
neigh_upto_n = set(get_upto_nth_order_neighbors(
neigh_gid, grain_id, neigh_order,
include_parent=include_parent, output_type='set'))
return list(neigh_upto_n.difference(neigh_upto_n_minus_1))
[docs]
def get_upto_nth_order_neighbors_all_grains(neigh_gid, gids, neigh_order,
include_parent=True, output_type='list'):
"""Return up-to-nth-order neighbours for every grain.
Calls :func:`get_upto_nth_order_neighbors` for each grain ID in
``gids`` and collects the results into a dictionary.
Parameters
----------
neigh_gid : dict[int, list of int]
Pre-computed neighbour map — typically ``gs.neigh_gid``.
gids : array-like of int
Grain IDs to process — typically ``gs.gid``.
neigh_order : int
Maximum hop count.
include_parent : bool, default True
Include each grain's own ID in its neighbour list.
output_type : {'list', 'nparray', 'set'}, default 'list'
Return type for each grain's neighbour collection.
Returns
-------
dict[int, list or numpy.ndarray or set]
Mapping of grain ID → neighbours up to order ``neigh_order``.
Examples
--------
.. code-block:: python
import upxo.connops.neighbour_ops as neighOps
result = neighOps.get_upto_nth_order_neighbors_all_grains(
gs.neigh_gid, gs.gid, neigh_order=2,
include_parent=True, output_type='list')
print(result[5]) # neighbours of grain 5 up to order 2
"""
return {gid: get_upto_nth_order_neighbors(neigh_gid, gid, neigh_order,
include_parent=include_parent,
output_type=output_type)
for gid in gids}
[docs]
def get_nth_order_neighbors_all_grains(neigh_gid, gids, neigh_order, include_parent=True):
"""Return the exactly-nth-order neighbours for every grain.
Calls :func:`get_nth_order_neighbors` for each grain ID in ``gids``
and collects results into a dictionary. Only grains first reachable
at exactly ``neigh_order`` hops are included.
Parameters
----------
neigh_gid : dict[int, list of int]
Pre-computed neighbour map — typically ``gs.neigh_gid``.
gids : array-like of int
Grain IDs to process — typically ``gs.gid``.
neigh_order : int
Exact hop count.
include_parent : bool, default True
Include each grain's own ID in the candidate set before differencing.
Returns
-------
dict[int, list of int]
Mapping of grain ID → list of grains reachable in exactly
``neigh_order`` hops.
Examples
--------
.. code-block:: python
import upxo.connops.neighbour_ops as neighOps
result = neighOps.get_nth_order_neighbors_all_grains(
gs.neigh_gid, gs.gid, neigh_order=2, include_parent=True)
print(result[10]) # grains at exactly order 2 from grain 10
"""
return {gid: get_nth_order_neighbors(neigh_gid, gid, neigh_order,
include_parent=include_parent)
for gid in gids}
[docs]
def get_upto_nth_order_neighbors_all_grains_prob(neigh_gid, gids, neigh_order,
include_parent=False,
print_msg=False,
_int_approx_=0.05):
"""Return up-to-nth-order neighbours for all grains with probabilistic fractional orders.
Extends :func:`get_upto_nth_order_neighbors_all_grains` to accept
non-integer ``neigh_order`` values. A fractional order such as ``1.5``
returns all grains up to order 1 plus a random subset of order-2 grains,
where the fraction included equals the decimal part of ``neigh_order``.
Parameters
----------
neigh_order : int or float
Neighbourhood order.
* **Integer** (or float within ``_int_approx_`` of an integer):
delegates directly to :func:`get_upto_nth_order_neighbors_all_grains`.
* **Non-integer float** (e.g. ``1.5``): returns all up-to-floor
neighbours plus ``ceil(decimal_part × |order-n neighbours|)``
randomly chosen grains from the next order shell.
recalculate : bool, default False
Force recomputation of the base neighbour map.
include_parent : bool, default False
Include each grain's own ID in its neighbour set.
print_msg : bool, default False
Print a diagnostic message indicating which branch was taken.
_int_approx_ : float, default 0.05
Maximum distance from the nearest integer for ``neigh_order`` to be
treated as an integer.
Returns
-------
dict[int, list of int]
When ``neigh_order`` is (effectively) an integer, a mapping of grain
ID → neighbours up to that integer order.
When ``neigh_order`` is a true fraction, a mapping of grain ID →
blended neighbour list (floor-order neighbours + probabilistic
sample of the next order shell).
Raises
------
ValueError
If ``neigh_order`` is neither an int nor a float.
Examples
--------
.. code-block:: python
from upxo.ggrowth.mcgs import mcgs
pxtal = mcgs(input_dashboard='input_dashboard.xls')
pxtal.simulate()
pxtal.detect_grains()
tslice = 10
fn = pxtal.gs[tslice].get_upto_nth_order_neighbors_all_grains_prob
neigh_int = fn(1, recalculate=False, include_parent=True)
neigh_near = fn(1.06, recalculate=False, include_parent=True)
neigh_half = fn(1.5, recalculate=False, include_parent=True)
print(neigh_int[22]) # integer-order result for grain 22
print(neigh_near[22]) # near-integer result (treated as order 1)
print(neigh_half[22]) # probabilistic blend for grain 22
"""
no = neigh_order
if isinstance(no, (int, np.int32)):
if print_msg:
print('neigh_order is of type int. Adopting the usual method.')
return get_upto_nth_order_neighbors_all_grains(
neigh_gid, gids, no, include_parent=include_parent)
elif isinstance(no, (float, np.float64)):
if abs(no - round(no)) < _int_approx_:
if print_msg:
print('neigh_order is close to being int. Adopting usual method.')
return get_upto_nth_order_neighbors_all_grains(
neigh_gid, gids, math.floor(no), include_parent=include_parent)
else:
no_low = math.floor(no)
neigh_upto_low = get_upto_nth_order_neighbors_all_grains(
neigh_gid, gids, no_low, include_parent=include_parent)
neigh_at_high = get_nth_order_neighbors_all_grains(
neigh_gid, gids, no_low + 1, include_parent=False)
delno = np.round(abs(no - no_low), 4)
neighbours = {}
for gid in gids:
nselect = math.ceil(delno * len(neigh_at_high[gid]))
if len(neigh_at_high[gid]) > 1:
neighbours[gid] = (neigh_upto_low[gid] +
random.sample(neigh_at_high[gid], nselect))
return neighbours
else:
raise ValueError('Invalid neigh_order')