"""Geometrification module for converting raster grain structures to polygon representations."""
import numpy as np
import rasterio
from abc import ABC, abstractmethod
from copy import deepcopy
import matplotlib.pyplot as plt
from rasterio.features import shapes
from shapely.strtree import STRtree
from shapely.geometry import Point
import upxo._sup.data_ops as DO
from shapely import affinity as ShAff
from upxo._sup import dataTypeHandlers as dth
from shapely.geometry import LineString, MultiLineString
from shapely.geometry import shape as ShShape
from shapely.geometry import Polygon, MultiPolygon
from shapely.geometry.collection import GeometryCollection
from shapely.geometry import Polygon, MultiPoint
from upxo.geoEntities.point2d import Point2d
from upxo.geoEntities.mulpoint2d import MPoint2d
from shapely.geometry import Point as ShPoint2d
from upxo.geoEntities.mulsline2d import MSline2d
from upxo.geoEntities.mulsline2d import ring2d
from upxo._sup.data_ops import find_common_coordinates
# from meshpy.triangle import MeshInfo, build
from scipy.ndimage import generic_filter
[docs]
class polygonised_grain_structure():
"""Polygonised grain structure for geometrification of raster-format grain structures."""
__slots__ = ('lgi', 'gid', 'n', 'polygons', 'pxtal', 'neigh_gid',
'gsmp', 'gbsegments_raw', 'gbsegments_mls', 'gbseg_smoothed',
'neigh_gid_pxtal', 'GBSEG', 'GBPOINTS',
'raster_img_polygonisation_results', 'polygons_raw_exteriors',
'polygons_raw_holes', 'neigh_pols', 'centroids_raw',
'centroids', 'xyoffset', 'grain_loc_ids',
'JNP', 'GBP', 'GBP_pure', 'GBP_at_boundary', 'jnp_all_coords',
'quality', 'sorted_segs',
'jnp_all_upxo', 'jnp_all_shapely', 'gbp_all_coords',
'gbp_all_upxo', 'gbp_all_shapely', 'jnp_grain_wise_indices',
'gbp_grain_wise_coords', 'gbp_grain_wise_indices',
'gbp_grain_wise_points', 'gbmullines_grain_wise',
'junction_points_coord', 'jnp_all_sorted_coords',
'jnp_all_sorted_upxo', 'gbsegments', 'gid_pair_ids',
'gid_pair_ids_unique_lr', 'gid_pair_ids_unique_rl',
'nconn', 'GBSEGMENTS', 'consolidated_segments',
'GB', 'GBCoords', 'GRAINS', 'POLYXTAL', 'mids_all_gbsegs',
'sgseg_obj_list', 'smoothed')
EPS_coord_coincide = 1E-8
def __init__(self, lgi, gids, neigh_gid_pxtal):
"""Initialise the instance."""
self.lgi, self.gid, self.neigh_gid_pxtal = lgi, gids, neigh_gid_pxtal
self.n = len(np.unique(self.gid))
self.GBSEG = {gid: [] for gid in self.gid}
# mp = self.polygonize(verbose=True)
self.polygons = None
self.polygons_raw_exteriors = {gid: [] for gid in self.gid}
self.polygons_raw_holes = {gid: [] for gid in self.gid}
self.gsmp = None
self.smoothed = {}
[docs]
def geometrify(self, verbose=True):
"""Geometrify."""
self.polygonize(self.lgi, self.gid, verbose=verbose)
[docs]
def polygonize(self, user_lgi=False, lgi=None, user_gids=False, gids=None,
verbose=True):
"""Polygonize grains in self.lgi."""
if verbose:
print("Polygonizing the raster image of the grain structure.")
if not user_lgi:
lgi, gid = self.lgi, self.gid
# ----------------------------------------
# Validations
# ----------------------------------------
rioshapes = rasterio.features.shapes
with rasterio.Env():
profile = rasterio.profiles.DefaultGTiffProfile()
profile.update(width=lgi.shape[1], height=lgi.shape[0],
count=1, dtype=lgi.dtype,
transform=rasterio.transform.Affine.identity())
# ----------------------------------------
with rasterio.MemoryFile() as memfile:
with memfile.open(**profile) as dataset:
dataset.write(lgi, 1)
self.raster_img_polygonisation_results = []
for _gid_ in gid:
mask = (lgi == _gid_).astype(np.uint8)
rs = list(rioshapes(mask, mask=mask,
transform=dataset.transform))
self.raster_img_polygonisation_results.append(rs)
[docs]
def setup_gsmp_datastructure(self):
"""Setup gsmp datastructure."""
self.gsmp = {'raw': None, 'smoothed': None}
[docs]
def make_gsmp(self, verbose=True):
"""Build and return gsmp."""
if verbose:
print("Setting up grain structure multi-polygon data structure.")
if not self.gsmp:
self.setup_gsmp_datastructure()
# --------------------------------
if not self.holes_exist:
self.gsmp['raw'] = MultiPolygon(self.allpol)
else:
print('Not implemented yet.')
[docs]
def make_polygonal_grains_raw(self, verbose=True, perform_xtal_dict_check=True):
"""Build and return polygonal grains raw."""
if verbose:
print("Setting up raw polygon data structure for the grain structure.")
for i, rpolrs in enumerate(self.raster_img_polygonisation_results,
start=1):
if len(rpolrs) == 0:
print(40*'-', f"\n No geometry at index {i}.")
elif len(rpolrs) == 1:
geometry = rpolrs[0]
# geometry[1] is the raster value and is not useful anymore.
featcoords = geometry[0]['coordinates']
nfeatures = len(featcoords) # Number of features found
self.polygons_raw_exteriors[i] = Polygon(featcoords[0])
if nfeatures > 1:
if nfeatures == 2:
self.polygons_raw_holes[i] = Polygon(featcoords[1])
else:
self.polygons_raw_holes[i] = {j: Polygon(fc)
for j, fc in enumerate(featcoords[1:],
start=1)}
elif len(rpolrs) > 1:
print(40*'-', f"\n More than one anticipated geometry at index {i}.")
if perform_xtal_dict_check:
VALUES = list(self.polygons_raw_exteriors.values())
A = {i: v for i, v in self.polygons_raw_exteriors.items() if type(v)!=list}
self.polygons_raw_exteriors = A
@property
def expol(self):
"""Expol."""
return list(self.polygons_raw_exteriors.values())
@property
def hlpol(self):
"""Hlpol."""
# Return all holes in the grain structure. Searches 3 levels deep.
if not self.holes_exist:
return []
holes = []
for hp1 in self.polygons_raw_holes.values():
if isinstance(hp1, Polygon):
# Level 1 hole feature. Hole inside a grain
# Example: isand grain
holes.append(hp1)
elif isinstance(hp1, dict):
# Level 2 hole feature
# hole inside a hole which is inside a grain
# Example: particle inside an island grain insie a grain.
for hp2 in hp1.values():
if isinstance(hp2, Polygon):
holes.append(hp2)
elif isinstance(hp2, dict):
# Level 3 hole feature
# whatever could be the application!! Just having it.
for hp3 in hp2.values():
if isinstance(hp2, Polygon):
holes.append(hp3)
else:
# We will end here and not have anymore holes!
pass
return holes
@property
def holes_exist(self):
"""Holes exist."""
return len(self.polygons_raw_holes.keys()) == 0
@property
def allpol(self):
"""Allpol."""
if not self.holes_exist:
return self.expol
else:
return self.expol + self.hlpol
[docs]
def set_polygons(self, verbose=True):
"""Set self.polygons to the list of external grain polygons."""
if verbose:
print("Setting up raw polygon data structure for the grain structure.")
self.polygons = self.allpol
[docs]
def find_neighbors(self, verbose=True):
"""Calculate neighbouring polygon IDs for all polygons in the grain structure.
Parameters
----------
polygons : list
A list of Shapely Polygon objects.
Returns
-------
dict
Keys are polygon IDs (1-based) and values are lists of neighbouring polygon IDs.
"""
if verbose:
print("Finding neighboring grains based on polygon intersections.")
if self.gid.size == 1:
self.neigh_gid = {int(self.gid[0]): [int(self.gid[0])]}
return
polygons, self.neigh_gid = self.allpol, {}
for i, polygon in enumerate(polygons, start=1):
self.neigh_gid[i] = []
for j, other_polygon in enumerate(polygons, start=1):
if i != j and polygon.touches(other_polygon):
self.neigh_gid[i].append(j)
[docs]
def val_neigh_gid(self, pixl_gs_neigh_gid):
"""
Parameters
----------
pixl_gs_neigh_gid : dict
neigh_gid dict of the pixellated grain structure (i.e. mcgs).
Returns
-------
bool, bool
True if validation passes, else False.
"""
diffid, neighcount = [], []
for n1, n2 in zip(pixl_gs_neigh_gid.values(), self.neigh_gid.values()):
count_diff = abs(len(n1)-len(n2))
neighcount.append(count_diff)
if count_diff == 0:
n1, n2 = np.sort(n1), np.sort(n2)
diffid.append(np.argwhere(n1 != n2).squeeze().size)
else:
diffid.append(-1)
return not(any(neighcount)), not(any(diffid))
[docs]
def make_neighpols(self):
"""Build and return neighpols."""
polygons = self.allpol
self.neigh_pols = {pid: [polygons[i-1] for i in self.neigh_gid[pid]]
for pid in self.gid}
[docs]
def get_polygon_touch_lines(self, poly1, poly2):
"""Calculate the lines or points where two Shapely polygons touch.
Parameters
----------
poly1, poly2 : shapely.geometry.Polygon
The two polygon objects to test.
Returns
-------
list
List of Shapely LineString objects if the polygons share edges, or
Shapely Point objects if they touch at a single point.
"""
intersection = poly1.intersection(poly2)
# -------------------------
if intersection.is_empty:
return []
# -------------------------
if isinstance(intersection, Point):
# Single point contact
return [intersection]
elif isinstance(intersection, LineString):
# Single line contact
return [intersection]
else:
# MultiLineString or GeometryCollection
# Multiple lines or points
return list(intersection.geoms)
[docs]
def get_multilinestring_touch_points(mls1, mls2):
"""Calculate the point(s) where two Shapely MultiLineStrings touch.
Parameters
----------
mls1, mls2 : shapely.geometry.MultiLineString
The two MultiLineString objects to test.
Returns
-------
list
List of Shapely Point objects at touch points, or empty list if none.
"""
touch_points = []
for line1 in mls1.geoms: # Iterate through linestrings in mls1
for line2 in mls2.geoms: # Iterate through linestrings in mls2
intersection = line1.intersection(line2) # Find intersection
if isinstance(intersection, Point):
touch_points.append(intersection)
return touch_points
[docs]
def area_gid(self, gid, gsrepr='raw'):
"""
Return area of gid grain. Valid values for gsrepr are self.gsmp.keys().
In any case, the returned area will be geometric and not pixellated.
Examples
--------
.. code-block:: python
self.area_gid(1, gsrepr='raw')
"""
return self.gsmp[gsrepr].geoms[gid-1].area
[docs]
def plot_linestrings(linestrings, ax=None, color='blue', linewidth=1, **kwargs):
"""Plot a list of Shapely LineStrings and return the axis.
Parameters
----------
linestrings : list
A list of LineString or MultiLineString objects.
ax : matplotlib.axes.Axes, optional
Axes object to plot on. If None, a new figure and axis are created.
color : str, optional
Line colour. Default is ``'blue'``.
linewidth : float, optional
Line width. Default is ``1``.
**kwargs
Additional keyword arguments passed to ``plt.plot()``.
Returns
-------
matplotlib.axes.Axes
The Axes object on which the lines were plotted.
"""
if ax is None:
fig, ax = plt.subplots() # Create a figure and axis if not provided
for linestring in linestrings:
if isinstance(linestring, MultiLineString):
for geom in linestring.geoms: # Plot each LineString in a MultiLineString
x, y = geom.xy
ax.plot(x, y, color=color, linewidth=linewidth, **kwargs)
else:
x, y = linestring.xy
ax.plot(x, y, color=color, linewidth=linewidth, **kwargs)
ax.set_aspect('equal') # Ensure equal aspect ratio for accurate representation
#plt.title('Plot of LineStrings') # Optional title
plt.show() # Show the plot (optional)
return ax # Return the axis object
[docs]
def plot_gsmp(self, raw=True, overlay_on_lgi=False,
xoffset=0.5, yoffset=0.5, ax=None):
"""
Plot multi-polygon form of the grain structure.
Parameters
----------
raw : bool, optional
If True, plot the raw gsmp. Default is True.
overlay_on_lgi : bool, optional
If True, overlay the polygons on the lgi image. Default is False.
xoffset, yoffset : float, optional
Coordinate offsets applied before plotting. Default is 0.5.
ax : matplotlib.axes.Axes, optional
Axes object to plot on. If None, a new figure and axis are created.
Returns
-------
matplotlib.axes.Axes
"""
if ax is None:
fig, ax = plt.subplots()
if overlay_on_lgi:
ax.imshow(self.lgi, cmap='viridis', origin='lower')
# -----------------------------------
# Access the grain structure multi-polygon object.
if raw or self.gsmp['smooth_1'] is not None:
GSMP = self.gsmp['raw']
if not raw and isinstance(self.gsmp['smooth_1'], MultiPolygon):
GSMP = self.gsmp['smooth_1']
# -----------------------------------
for i, polygon in enumerate(GSMP.geoms, start=1):
x, y = polygon.exterior.xy
x, y = np.array(list(x))-xoffset, np.array(list(y))-yoffset
ax.plot(x, y, color='black', lw=1, ls='-', marker='.')
pcx, pcy = polygon.centroid.coords.xy
pcx, pcy = pcx[0], pcy[0]
ax.plot(pcx, pcy, 'ko')
ax.text(pcx, pcy, str(i), color='white', fontsize=12,
fontweight='bold')
ax.set_aspect('equal')
return ax
[docs]
def plot_grains_gids(self, gids, add_points=True, points=None, gclr='color', title="user grains",
cmap_name='viridis', plot_centroids=True,
add_gid_text=True, plot_gbseg=False,
bjp_kwargs={'marker': 'o', 'mfc': 'yellow',
'mec': 'black', 'ms': 2.5},
addpoints_kwargs={'marker': 'x', 'mfc': 'black',
'mec': 'black', 'ms': 5}
):
"""
Parameters
----------
gids : int or list
Either a single grain index number or a list of them.
title : str, optional
Plot title. Default is ``"user grains"``.
gclr : str, optional
Colour mode. Options: ``'color'``, ``'binary'``, ``'grayscale'``.
Returns
-------
matplotlib.axes.Axes
Examples
--------
.. code-block:: python
# After acquiring gids for aspect_ratio between ranks 80 and 100,
# visualize those grains.
gid, _, __ = PXGS.gs[8].get_gid_prop_range(PROP_NAME='aspect_ratio',
range_type='rank',
rank_range=[80, 100])
PXGS.gs[8].plot_grains_gids(gid, cmap_name='CMRmap_r')
"""
# Validations
if not dth.IS_ITER(gids):
gids = [gids]
# -------------------------------
# Validtions
# -------------------------------
if gclr not in ('binary', 'grayscale'):
lgi_masked, masker = self.mask_lgi_with_gids(gids)
fig, ax = plt.subplots(1, figsize=(5, 5), dpi=120)
im = ax.imshow(lgi_masked, cmap=cmap_name, vmin=1)
# -------------------------------
if gclr in ('binary', 'grayscale'):
lgi_masked, masker = self.mask_lgi_with_gids(gids, masker=-10)
lgi_masked[lgi_masked != 0] = 1
fig, ax = plt.subplots(1, figsize=(5, 5), dpi=120)
im = ax.imshow(lgi_masked, cmap='gray_r', vmin=0, vmax=1)
# -------------------------------
fig.colorbar(im, ax=ax)
# -------------------------------
if plot_centroids:
self.plot_grain_centroids(gids, ax, add_gid_text=add_gid_text)
# -------------------------------
if plot_gbseg:
self.plot_contour_grains_gids(gids,
simple_all_preference='simple',
new_fig=False, ax=ax,
bjp_kwargs={'marker': bjp_kwargs['marker'],
'mfc': bjp_kwargs['mfc'],
'mec': bjp_kwargs['mec'],
'ms': bjp_kwargs['ms']}
)
if add_points:
ax.plot(points[:, 0], points[:, 1],
marker=addpoints_kwargs['marker'],
mfc=addpoints_kwargs['mfc'],
mec=addpoints_kwargs['mec'],
ms=addpoints_kwargs['ms'])
# -------------------------------
ax.set_title(title)
ax.set_xlabel(r"X-axis, $\mu m$", fontsize=12)
ax.set_ylabel(r"Y-axis, $\mu m$", fontsize=12)
return ax
[docs]
def set_minimum_nnodes_per_gbseg(self):
"""Set or update minimum nnodes per gbseg."""
raise NotImplementedError("set_minimum_nnodes_per_gbseg is not yet implemented.")
[docs]
def subdivide_gbsegments(self, method=1):
"""Subdivide grain boundary segments by inserting additional nodes."""
raise NotImplementedError("subdivide_gbsegments is not yet implemented.")
[docs]
def smooth_moving_avg(self, n=3):
"""Smooth moving avg."""
raise NotImplementedError("smooth_moving_avg is not yet implemented.")
[docs]
def smooth_polynomial(self):
"""Smooth polynomial."""
raise NotImplementedError("smooth_polynomial is not yet implemented.")
[docs]
def heal_edges(self, verbose=True):
"""Heal edges."""
raise NotImplementedError("heal_edges is not yet implemented.")
[docs]
def plot_gbseg(self, gid, segid=[]):
"""Visualise gbseg using Matplotlib or PyVista."""
raise NotImplementedError("plot_gbseg is not yet implemented.")
[docs]
def link_polygons(self):
"""Link polygons."""
raise NotImplementedError("link_polygons is not yet implemented.")
[docs]
def heal_polygons(self):
"""Heal polygons."""
raise NotImplementedError("heal_polygons is not yet implemented.")
[docs]
def write_abq_script_data(self):
"""Export or convert to the ABQ script data format."""
raise NotImplementedError("write_abq_script_data is not yet implemented.")
[docs]
def set_mesh_properties(self):
"""Set or update mesh properties."""
raise NotImplementedError("set_mesh_properties is not yet implemented.")
[docs]
def mesh(self):
"""Mesh."""
raise NotImplementedError("mesh is not yet implemented.")
[docs]
def assess_mesh_quality(self):
"""Assess mesh quality."""
raise NotImplementedError("assess_mesh_quality is not yet implemented.")
[docs]
def export_mesh(self):
"""Export the mesh data."""
raise NotImplementedError("export_mesh is not yet implemented.")
[docs]
def set_grain_centroids_raw(self, verbose=True):
"""Set raw grain centroids computed from lgi pixel locations."""
# Validations
if verbose:
print("Extracting grain boundary segments based on polygon intersections.")
centroids_raw = []
for gid in self.gid:
centroid = list(np.argwhere(self.lgi == gid).mean(axis=0)-0.5)
#coordinates = self.raster_img_polygonisation_results[gid-1][0][0]['coordinates'][0][:-1]
#centroids_raw.append(list(np.array(coordinates).mean(axis=0)))
centroids_raw.append(centroid)
self.centroids_raw = np.array(centroids_raw)
[docs]
def set_grain_centroids(self, verbose=True):
"""Set grain centroids from polygon coordinates."""
# Validations
self.centroids = []
# To do
[docs]
def set_polygonization_xyoffset(self, xyoffset, verbose=True):
"""Set or update polygonization xyoffset."""
# Validations
if verbose:
print(f"Setting up polygonization xy-offset to {xyoffset}.")
self.xyoffset = xyoffset
[docs]
def pix_to_geom(self, polygonisation_offset=0.5, verbose=True, perform_xtal_dict_check=True):
"""Pix to geom."""
if verbose:
print(40*'-', "\n", "Starting geometrification of the grain structure.")
if verbose:
print("\n \n ------ > Phase-1 <----- \n \n")
self.set_up_quality_measures(verbose=verbose)
self.polygonize(verbose=verbose)
self.set_polygonization_xyoffset(polygonisation_offset,
verbose=verbose)
self.make_polygonal_grains_raw(verbose=verbose, perform_xtal_dict_check=perform_xtal_dict_check)
self.set_polygons(verbose=verbose)
self.make_gsmp(verbose=verbose)
self.find_neighbors(verbose=verbose)
self.set_grain_centroids_raw(verbose=verbose)
self.set_grain_loc_ids(verbose=verbose)
self.get_junction_points_from_grain_intersections(verbose=verbose)
self.extract_GBP(verbose=verbose)
self.find_GBP_at_boundary(verbose=verbose)
self.update_JNP_from_GBP_at_boundary(verbose=verbose)
if verbose:
print("\n \n ------ > Phase-2 <----- \n \n")
self.build_jnp_objects(verbose=verbose)
self.build_all_gbp_objects(verbose=verbose)
self.get_gbp_grain_wise_coords(verbose=verbose)
if verbose:
print("\n \n ------ > Phase-3 <----- \n \n")
self.build_jnp_grain_wise_indices(verbose=verbose)
self.build_gbp_grain_wise_indices_geometric(verbose=verbose)
self.build_gbp_grain_wise_indices_coordbased(verbose=verbose)
self.build_gbp_grain_wise_indices_pointsbased(verbose=verbose)
self.build_gbmullines_grain_wise(verbose=verbose)
self.build_sorted_jnp_objects(plot=False, verbose=verbose)
if verbose:
print("\n \n ------ > Phase-4 <----- \n \n")
self.align_gbmullines_start_to_jnp_start(plot_bf=False,
plot_af=False, verbose=verbose)
self.splice_grain_boundary_segments_at_junction_points(verbose=verbose)
self.find_quality_of_grain_boundary_segmentation(verbose=verbose)
if verbose:
print("\n \n ------ > Phase-5 <----- \n \n")
self.create_neigh_gid_pair_ids(self.neigh_gid, verbose=verbose)
self.setup_neigh_connectivity_flags_DS(verbose=verbose)
self.set_neigh_connectivity_flags_DS(centroid_eq_EPS=1E-8,
verbose=verbose)
if verbose:
print("\n \n ------ > Phase-6 <----- \n \n")
self.gather_grain_boundary_segments_of_all_pairs(verbose=verbose)
self.consolidate_gbsegments(squeeze_segment_data_structure=True,
verbose=verbose)
# problematic_grains = self.get_problematic_grains()
if verbose:
print("\n \n ------ > Phase-7 <----- \n \n")
self.update_consolidated_segments_with_boundary_grain_gids(plot=False,
verbose=verbose)
if verbose:
print("\n \n ------ > Phase-8 <----- \n \n")
self.sort_gbsegments_by_original_order(verbose=verbose)
if verbose:
print("\n \n ------ > Phase-9 <----- \n \n")
self.flip_segments_to_reorder_GBS(plot_each_grain_details=False,
verbose=verbose)
if verbose:
print("\n \n ------ > Phase-10 <----- \n \n")
self.update_segflip_requirements(verbose=verbose)
self.calculate_grain_boundary_coordinates_after_gbseg_reordering(verbose=verbose)
if verbose:
print("\n \n ------ > Phase-11 <----- \n \n")
self.AssembleGBSEGS(self.GB, saa=True, throw=False,
verbose=verbose)
if verbose:
print("\n \n ------ > Phase-12 <----- \n \n")
self.construct_geometric_polyxtal_from_gbcoords(self.GBCoords, dtype='shapely',
saa=True, throw=False, verbose=verbose)
if verbose:
print("\n \n ------ > Phase-13 <----- \n \n")
self.set_pure_gbpoints(verbose=verbose)
if verbose:
print(40*'-', "\n", "Completed geometrification of the grain structure.")
[docs]
def get_bounds_from_grain_boundary_points(self):
"""
Get x and y bounds from grain boundary points.
Notes
-----
Works on grain boundary points obtained from polygonization of the raster
image. ``self.xyoffset`` is subtracted from polygonization end coordinates
to align with MCGS 2D results.
Examples
--------
.. code-block:: python
xmin, xmax, ymin, ymax = self.get_bounds_from_grain_boundary_points()
"""
for gid in self.gid:
_ripolres_ = self.raster_img_polygonisation_results[gid-1][0][0]
if gid == 1:
gbpoints = np.array(_ripolres_['coordinates'][0][:-1])-self.xyoffset
else:
new_gbpoints = np.array(_ripolres_['coordinates'][0][:-1])-self.xyoffset
gbpoints = np.array(list(gbpoints) + list(new_gbpoints))
gbpoints = np.unique(gbpoints, axis=1)
xmin, xmax = gbpoints[:, 0].min(), gbpoints[:, 0].max()
ymin, ymax = gbpoints[:, 1].min(), gbpoints[:, 1].max()
return xmin, xmax, ymin, ymax
[docs]
def set_grain_loc_ids(self, verbose=True):
"""
Identify the location of grains and place their IDs in self.grain_loc_ids
dictionary.
"""
if verbose:
print("Setting up grain location IDs based on grain centroids.")
if self.gid.size == 1:
self.grain_loc_ids = {'internal': [int(self.gid[0])],
'boundary': [int(self.gid[0])],
'left': [int(self.gid[0])],
'bottom': [int(self.gid[0])],
'right': [int(self.gid[0])],
'top': [int(self.gid[0])],
'pure_left': [int(self.gid[0])],
'pure_bottom': [int(self.gid[0])],
'pure_right': [int(self.gid[0])],
'pure_top': [int(self.gid[0])],
'corner': [int(self.gid[0])],
'bottom_left_corner': [int(self.gid[0])],
'bottom_right_corner': [int(self.gid[0])],
'top_right_corner': [int(self.gid[0])],
'top_left_corner': [int(self.gid[0])],
}
return
# ----------------------------------
xmin, xmax, ymin, ymax = self.get_bounds_from_grain_boundary_points()
# ----------------------------------
# Label grains by their location in the grain structure.
border_grain_flags = [False for gid in self.gid]
internal_grain_flags = [False for gid in self.gid]
corner_grain_flags = [False for gid in self.gid]
# ----------------------------------
bl_grain_flags = [False for gid in self.gid]
tl_grain_flags = [False for gid in self.gid]
br_grain_flags = [False for gid in self.gid]
tr_grain_flags = [False for gid in self.gid]
# ----------------------------------
for gid in self.gid:
_ripolres_ = self.raster_img_polygonisation_results[gid-1][0][0]
coords = np.array(_ripolres_['coordinates'][0])-self.xyoffset
c1 = xmin in coords[:, 0]
c2 = xmax in coords[:, 0]
c3 = ymin in coords[:, 1]
c4 = ymax in coords[:, 1]
if any((c1, c2, c3, c4)):
border_grain_flags[gid-1] = True
if any((coords[:, 0] == xmin) & (coords[:, 1] == ymin)):
corner_grain_flags[gid-1] = True
bl_grain_flags[gid-1] = True # Bottom left
elif any((coords[:, 0] == xmin) & (coords[:, 1] == ymax)):
corner_grain_flags[gid-1] = True
tl_grain_flags[gid-1] = True # Top left
elif any((coords[:, 0] == xmax) & (coords[:, 1] == ymin)):
corner_grain_flags[gid-1] = True
br_grain_flags[gid-1] = True # Bottom right
elif any((coords[:, 0] == xmax) & (coords[:, 1] == ymax)):
corner_grain_flags[gid-1] = True
tr_grain_flags[gid-1] = True # Top right
else:
internal_grain_flags[gid-1] = True
# ----------------------------------
_border_grain_gids_ = np.argwhere(border_grain_flags).squeeze()+1
_internal_grain_gids_ = np.argwhere(internal_grain_flags).squeeze()+1
_corner_grain_gids_ = np.argwhere(corner_grain_flags).squeeze()+1
if isinstance(_border_grain_gids_, np.integer):
border_grain_gids = [int(_border_grain_gids_)]
else:
border_grain_gids = _border_grain_gids_.tolist()
if isinstance(_internal_grain_gids_, np.integer):
internal_grain_gids = [int(_internal_grain_gids_)]
else:
internal_grain_gids = _internal_grain_gids_.tolist()
if isinstance(_corner_grain_gids_, np.integer):
corner_grain_gids = [int(_corner_grain_gids_)]
else:
corner_grain_gids = _corner_grain_gids_.tolist()
# ----------------------------------
_bl_grain_gids_ = np.argwhere(bl_grain_flags).squeeze()+1
_tl_grain_gids_ = np.argwhere(tl_grain_flags).squeeze()+1
_br_grain_gids_ = np.argwhere(br_grain_flags).squeeze()+1
_tr_grain_gids_ = np.argwhere(tr_grain_flags).squeeze()+1
if isinstance(_bl_grain_gids_, np.integer):
bl_grain_gids = [int(_bl_grain_gids_)]
else:
bl_grain_gids = _bl_grain_gids_.tolist()
if isinstance(_tl_grain_gids_, np.integer):
tl_grain_gids = [int(_tl_grain_gids_)]
else:
tl_grain_gids = _tl_grain_gids_.tolist()
if isinstance(_br_grain_gids_, np.integer):
br_grain_gids = [int(_br_grain_gids_)]
else:
br_grain_gids = _br_grain_gids_.tolist()
if isinstance(_tr_grain_gids_, np.integer):
tr_grain_gids = [int(_tr_grain_gids_)]
else:
tr_grain_gids = _tr_grain_gids_.tolist()
# ----------------------------------
# Identify grains which are pure edge grains. These grains are not on corner.
l_grain_flags = [False for gid in self.gid]
r_grain_flags = [False for gid in self.gid]
b_grain_flags = [False for gid in self.gid]
t_grain_flags = [False for gid in self.gid]
# ----------------------------------
for gid in self.gid:
_ripolres_ = self.raster_img_polygonisation_results[gid-1][0][0]
coords = np.array(_ripolres_['coordinates'][0])-self.xyoffset
if any(coords[:, 0] == xmin):
l_grain_flags[gid-1] = True
if any(coords[:, 0] == xmax):
r_grain_flags[gid-1] = True
if any(coords[:, 1] == ymin):
b_grain_flags[gid-1] = True
if any(coords[:, 1] == ymax):
t_grain_flags[gid-1] = True
# ----------------------------------
_l_grain_gids_ = np.argwhere(l_grain_flags).squeeze()+1
_r_grain_gids_ = np.argwhere(r_grain_flags).squeeze()+1
_b_grain_gids_ = np.argwhere(b_grain_flags).squeeze()+1
_t_grain_gids_ = np.argwhere(t_grain_flags).squeeze()+1
if isinstance(_l_grain_gids_, np.integer):
l_grain_gids = [int(_l_grain_gids_)]
else:
l_grain_gids = _l_grain_gids_.tolist()
if isinstance(_r_grain_gids_, np.integer):
r_grain_gids = [int(_r_grain_gids_)]
else:
r_grain_gids = _r_grain_gids_.tolist()
if isinstance(_b_grain_gids_, np.integer):
b_grain_gids = [int(_b_grain_gids_)]
else:
b_grain_gids = _b_grain_gids_.tolist()
if isinstance(_t_grain_gids_, np.integer):
t_grain_gids = [int(_t_grain_gids_)]
else:
t_grain_gids = _t_grain_gids_.tolist()
# ----------------------------------
pl_grain_gids = list(set(l_grain_gids) - set(bl_grain_gids) - set(tl_grain_gids))
pr_grain_gids = list(set(r_grain_gids) - set(br_grain_gids) - set(tr_grain_gids))
pb_grain_gids = list(set(b_grain_gids) - set(bl_grain_gids) - set(br_grain_gids))
pt_grain_gids = list(set(t_grain_gids) - set(tl_grain_gids) - set(tr_grain_gids))
# ----------------------------------
self.grain_loc_ids = {'internal': internal_grain_gids,
'boundary': border_grain_gids,
'left': l_grain_gids,
'bottom': b_grain_gids,
'right': r_grain_gids,
'top': t_grain_gids,
'pure_left': pl_grain_gids,
'pure_bottom': pb_grain_gids,
'pure_right': pr_grain_gids,
'pure_top': pt_grain_gids,
'corner': corner_grain_gids,
'bottom_left_corner': bl_grain_gids,
'bottom_right_corner': br_grain_gids,
'top_right_corner': tr_grain_gids,
'top_left_corner': tl_grain_gids,
}
[docs]
def get_junction_points_from_grain_intersections(self, verbose=True):
"""Return the junction points from grain intersections."""
if verbose:
print("Extracting grain boundary segments and junction points")
self.JNP = []
for gid1 in self.gid:
gid1_xy = []
if gid1 % 50 == 0:
_s_ = "Extracting Junction points"
print(f'{_s_} {np.round(gid1*100/self.n, 2)} % complete.')
for gid2 in self.gid:
if gid2 <= gid1:
continue
# print(gid1, gid2)
self.polygons[gid1-1]
self.polygons[gid2-1]
intersec = self.polygons[gid1-1].intersection(self.polygons[gid2-1])
# intersec = self.polygons[gid1-1].boundary.intersection(self.polygons[gid2-1].boundary)
# print(type(intersec))
# print(isinstance(intersec, MultiLineString))
# print(isinstance(intersec, LineString))
if isinstance(intersec, LineString):
# print(intersec.coords.self.JNP)
self.JNP.append([intersec.coords.xy[0][0], intersec.coords.xy[1][0]])
pass
elif isinstance(intersec, MultiLineString):
# print(dir(intersec))
# print(intersec.boundary)#.coords.xy)
if isinstance(intersec.boundary, MultiPoint):
for pnt in intersec.boundary.geoms:
self.JNP.append([pnt.x, pnt.y])
if isinstance(intersec.boundary, tuple):
self.JNP.append([intersec.boundary[0][0], intersec.boundary[1][0]])
self.JNP.append([intersec.boundary[0][1], intersec.boundary[1][1]])
#for line in intersec.geoms:
# self.JNP.append([line.coords.self.JNP[0][0], line.coords.self.JNP[1][0]])
# self.JNP.append([line.coords.self.JNP[0][1], line.coords.self.JNP[1][1]])
elif isinstance(intersec, Point):
self.JNP.append([intersec.x, intersec.y])
self.JNP = np.unique(self.JNP, axis = 0)-self.xyoffset
[docs]
def set_up_quality_measures(self, verbose=True):
"""
Initiate the quality dictionary to hold results on operation quality.
"""
if verbose:
print("Setting up initial data structures and data")
self.quality = {'gb_segmentation': [],
'GBS_reordering_success': None, }
[docs]
def get_bounds_from_GBP(self):
"""Calculate the bounds of the polygonized grain structure."""
xmin = self.GBP[:, 0].min()
xmax = self.GBP[:, 0].max()
ymin = self.GBP[:, 1].min()
ymax = self.GBP[:, 1].max()
return xmin, xmax, ymin, ymax
[docs]
def find_GBP_at_boundary(self, verbose=True):
"""Identify grain boundary points lying on the grain structure boundary."""
if verbose:
print("Finding grain boundary points at the boundary of the grain structure.")
xmin, xmax, ymin, ymax = self.get_bounds_from_GBP()
GBPx, GBPy = self.GBP[:, 0], self.GBP[:, 1]
GBP_left, GBP_right = self.GBP[GBPx == xmin], self.GBP[GBPx == xmax]
GBP_bot, GBP_top = self.GBP[GBPy == ymin], self.GBP[GBPy == ymax]
self.GBP_at_boundary = np.unique(np.vstack((GBP_left, GBP_bot,
GBP_right, GBP_top)), axis=0)
# End
## == ## == ## == ## == ## == ## == ## == ## == ## == ## == ## == ## == ## ==
# Add missing points to the junction points.
if self.gid.size > 1:
GBP_at_boundary_flag_to_remove = [False for _ in self.GBP_at_boundary]
for i, gbpboundary in enumerate(self.GBP_at_boundary):
x, y = gbpboundary
if not any((self.JNP[:, 0] == x) & (self.JNP[:, 1] == y)):
GBP_at_boundary_flag_to_remove[i] = True
self.GBP_at_boundary[GBP_at_boundary_flag_to_remove]
[docs]
def update_JNP_from_GBP_at_boundary(self, verbose=True):
"""
Update junction points with the grain boundary points which are on the
boundaries of the poly-xtal.
"""
if verbose:
print("Updating junction point objects based on grain boundary segment coordinates.")
if self.JNP.size == 0:
self.JNP = np.unique(self.GBP_at_boundary, axis=0)
return
self.JNP = np.append(self.JNP, self.GBP_at_boundary, axis=0)
self.JNP = np.unique(self.JNP, axis=0)
[docs]
def build_jnp_objects(self, verbose=True):
"""
Build coordinates, UPXO Point2d objects and shapely point objects of all
junction points.
"""
if verbose:
print("Building grain boundary point objects based on grain boundary segment coordinates.")
self.jnp_all_coords = self.JNP
self.jnp_all_upxo = np.array([Point2d(jnp[0], jnp[1]) for jnp in self.JNP])
self.jnp_all_shapely = [ShPoint2d(jnp[0], jnp[1]) for jnp in self.JNP]
# jnp_all_upxo_mp = MPoint2d.from_xy(jnp_all_coords.T)
# jnp_all_upxo_mp = MPoint2d.from_upxo_points2d(jnp_all_upxo, zloc=0.0)
[docs]
def build_all_gbp_objects(self, verbose=True):
"""Build coordinates of all grain boundary points. Global."""
if verbose:
print("Building grain boundary point objects based on grain boundary segment coordinates.")
self.gbp_all_coords = []
postpolcoords = self.raster_img_polygonisation_results
# len(gbp_all_coords)
for gid in self.gid:
postpolcoords_gid = postpolcoords[gid-1][0][0]['coordinates']
_ = np.array(postpolcoords_gid[0][:-1]) - self.xyoffset
self.gbp_all_coords.extend(_.tolist())
self.gbp_all_coords = np.unique(self.gbp_all_coords, axis=0)
# Build UPXO points from gbp_all_coords data.
self.gbp_all_upxo = np.array([Point2d(gbp[0], gbp[1])
for gbp in self.gbp_all_coords])
self.gbp_all_shapely = [ShPoint2d(gbp[0], gbp[1])
for gbp in self.gbp_all_coords]
# gbp_all_upxo_mp = MPoint2d.from_upxo_points2d(gbp_all_upxo, zloc=0.0)
[docs]
def get_gbp_grain_wise_coords(self, verbose=True):
"""Build coordinates of all grain boundary points, grain-wise."""
if verbose:
print("Building grain boundary point objects based on grain-wise coordinates.")
self.gbp_grain_wise_coords = {}
postpolcoords = self.raster_img_polygonisation_results
for gid in self.gid:
_ = np.array(postpolcoords[gid-1][0][0]['coordinates'][0][:-1]) - self.xyoffset
self.gbp_grain_wise_coords[gid] = _
[docs]
def build_jnp_grain_wise_indices(self, verbose=True):
"""
Build indices of jnp for every grain. Indices will be from jnp_all_coords.
These indices relate to:
* jnp_all_coords, jnp_all_upxo, jnp_all_shapely
* jnp_all_upxo_mp.points, jnp_all_upxo_mp.coords
Examples
--------
.. code-block:: python
# Data access:
jnp_all_coords[jnp_grain_wise_indices[gid]]
jnp_all_upxo[jnp_grain_wise_indices[gid]]
jnp_all_upxo_mp.points[jnp_grain_wise_indices[gid]]
# Verification:
gid = 10
plt.imshow(geom.lgi)
coord = jnp_all_coords[jnp_grain_wise_indices[gid]]
plt.plot(coord[:, 0], coord[:, 1], 'ko')
Notes
-----
jnp_all_upxo and jnp_all_upxo_mp.points share the same objects:
``id(jnp_all_upxo[i]) == id(jnp_all_upxo_mp.points[i])``
"""
if verbose:
print("Building grain boundary point objects based on grain-wise coordinates.")
self.jnp_grain_wise_indices = {}
for gid in self.gid:
self.jnp_grain_wise_indices[gid] = []
pol_shapely = ShAff.translate(self.polygons[gid-1],
xoff=-self.xyoffset,
yoff=-self.xyoffset)
for i, jnp_shapely in enumerate(self.jnp_all_shapely, start=0):
if pol_shapely.touches(jnp_shapely):
self.jnp_grain_wise_indices[gid].append(i)
[docs]
def build_gbp_grain_wise_indices_geometric(self, verbose=True):
"""
This is to correct inconsistencies in build_jnp_grain_wise_indices method.
There are cases which lead to mulline constityutent lines intersecting each other.
This step is crucial to avoid it.
"""
if verbose:
print("Building grain boundary point objects based on grain-wise coordinates.")
self.gbp_grain_wise_indices = {}
for gid in self.gid:
self.gbp_grain_wise_indices[gid] = []
pol_shapely = ShAff.translate(self.polygons[gid-1],
xoff=-self.xyoffset,
yoff=-self.xyoffset)
for i, gbp_shapely in enumerate(self.gbp_all_shapely, start=0):
if pol_shapely.touches(gbp_shapely):
self.gbp_grain_wise_indices[gid].append(i)
[docs]
def build_gbp_grain_wise_indices_coordbased(self, verbose=True):
"""Build and return gbp grain wise indices coordbased."""
if verbose:
print("Building grain boundary point objects based on grain-wise indices and geometric coordinates.")
findloc = DO.find_coorda_loc_in_coords_arrayb
self.gbp_grain_wise_indices = {gid: None for gid in self.gid}
for gid in self.gid:
plist = self.gbp_grain_wise_coords[gid]
self.gbp_grain_wise_indices[gid] = [findloc(p, self.gbp_all_coords) for p in plist]
[docs]
def build_gbp_grain_wise_indices_pointsbased(self, verbose=True):
"""Build and return gbp grain wise indices pointsbased."""
if verbose:
print("Building grain boundary point objects based on grain-wise indices.")
self.gbp_grain_wise_points = {gid: None for gid in self.gid}
for gid in self.gid:
plist = self.gbp_grain_wise_coords[gid]
self.gbp_grain_wise_points[gid] = self.gbp_all_upxo[self.gbp_grain_wise_indices[gid]]
[docs]
def build_gbmullines_grain_wise(self, verbose=True):
"""Build grain boundary multi-linestring objects from grain boundary point data."""
if verbose:
print("Building grain boundary multi-linestrings")
# Build lines from gbp_grain_wise_coordinates.
self.gbmullines_grain_wise = {gid: [] for gid in self.gid}
for gid in self.gid:
nodes = self.gbp_grain_wise_points[gid].tolist()
self.gbmullines_grain_wise[gid] = MSline2d.by_nodes(nodes, close=False)
# ax = gbmullines_grain_wise[gid].plot()
for gid in self.gid:
self.gbmullines_grain_wise[gid].close(reclose=False)
for gid in self.gid:
# Build the jnp coordinates which are not there in gbp array.
jnp_indices_toinsert = []
for i, jnp in enumerate(self.jnp_all_coords[self.jnp_grain_wise_indices[gid]]):
if not DO.is_a_in_b(jnp, self.gbp_grain_wise_coords[gid]):
# This means jnp should be inserted into self.gbp_grain_wise_coords[gid].
jnp_indices_toinsert.append(i)
jnps_to_insert = self.jnp_all_upxo[self.jnp_grain_wise_indices[gid]][jnp_indices_toinsert]
# --------------------------
#if len(jnps_to_insert) > 0:
self.gbmullines_grain_wise[gid].add_nodes(jnps_to_insert)
[docs]
def arrange_junction_point_coords_new(self, gbcoords_thisgrain, junction_points_coord):
"""Arrange junction point coords new."""
# Create a dictionary to map coordinates to their indices in gbcoords_thisgrain
coord_index_map = {tuple(coord): idx for idx, coord in enumerate(gbcoords_thisgrain)}
# Generate a list of indices for sorting
sorted_indices = sorted(range(len(junction_points_coord)), key=lambda i: coord_index_map[tuple(junction_points_coord[i])])
# Sort junction_points_coord based on the sorted indices
sorted_junction_points = np.array([junction_points_coord[i] for i in sorted_indices])
return sorted_junction_points, sorted_indices
[docs]
def build_sorted_jnp_objects(self, plot=False, verbose=True):
"""Build and return sorted jnp objects."""
if verbose:
print("Building sorted junction point objects")
# find_coord_loc = DO.find_coorda_loc_in_coords_arrayb
self.jnp_all_sorted_coords = {gid: None for gid in self.gid}
self.jnp_all_sorted_upxo = {gid: None for gid in self.gid}
for gid in self.gid:
# gbpoints_thisgrain = self.gbmullines_grain_wise[gid].nodes
gbcoords_thisgrain = self.gbmullines_grain_wise[gid].get_node_coords()
junction_points_upxo, junction_points_coord = [], []
for i, jnpcoord in enumerate(self.jnp_all_coords):
if DO.is_a_in_b(jnpcoord, gbcoords_thisgrain):
junction_points_coord.append(jnpcoord)
junction_points_upxo.append(self.jnp_all_upxo[i])
# junction_points_coord = arrange_junction_point_coords(gbcoords_thisgrain, junction_points_coord)
# self.arrange_junction_point_coords_new(self.gbcoords_thisgrain, self.junction_points_coord)
junction_points_coord, _ = self.arrange_junction_point_coords_new(gbcoords_thisgrain, junction_points_coord)
junction_points_upxo = list(np.array(junction_points_upxo)[_])
junction_points_coord, sorted_indices = self.arrange_junction_point_coords_new(gbcoords_thisgrain,
junction_points_coord)
junction_points_upxo = list(np.array(junction_points_upxo)[sorted_indices])
# junction_points_upxo, _ = arrange_junction_points_upxo(gbpoints_thisgrain, junction_points_upxo)
self.jnp_all_sorted_coords[gid] = junction_points_coord
self.jnp_all_sorted_upxo[gid] = junction_points_upxo
if plot:
for gid in self.gid:
plt.figure()
coord = self.gbmullines_grain_wise[gid].get_node_coords()
plt.plot(coord[:, 0], coord[:, 1], '-k.')
for i, c in enumerate(coord[:-1,:], 0):
plt.text(c[0]+0.15, c[1]+0.15, i)
jnp = self.jnp_all_sorted_coords[gid]
plt.plot(jnp[:, 0], jnp[:, 1], 'ro', mfc='c', alpha=0.5)
for i, j in enumerate(jnp):
plt.text(j[0]+0.15, j[1], i, color='red')
[docs]
def align_gbmullines_start_to_jnp_start(self, plot_bf=False, plot_af=False, verbose=True):
"""Align gbmullines start to jnp start."""
if verbose:
print("Aligning grain boundary multi-linestrings with junction points.")
if plot_bf:
for gid in self.gid:
plt.figure()
coord = self.gbmullines_grain_wise[gid].get_node_coords()
plt.plot(coord[:, 0], coord[:, 1], '-k.')
for i, c in enumerate(coord[:-1,:], 0):
plt.text(c[0]+0.15, c[1]+0.15, i)
jnp = self.jnp_all_sorted_coords[gid]
plt.plot(jnp[:, 0], jnp[:, 1], 'ro', mfc='c', alpha=0.5)
for i, j in enumerate(jnp):
plt.text(j[0]+0.15, j[1], i, color='red')
# ---------------------------------------------------
for gid in self.gid:
roll_distance = DO.find_coorda_loc_in_coords_arrayb(self.jnp_all_sorted_coords[gid][0],
self.gbmullines_grain_wise[gid].get_node_coords())
self.gbmullines_grain_wise[gid].roll(roll_distance)
# ---------------------------------------------------
if plot_af:
for gid in self.gid:
plt.figure()
coord = self.gbmullines_grain_wise[gid].get_node_coords()
plt.plot(coord[:, 0], coord[:, 1], '-k.')
for i, c in enumerate(coord[:-1,:], 0):
plt.text(c[0]+0.15, c[1]+0.15, i)
jnp = self.jnp_all_sorted_coords[gid]
plt.plot(jnp[:, 0], jnp[:, 1], 'ro', mfc='c', alpha=0.5)
for i, j in enumerate(jnp):
plt.text(j[0]+0.15, j[1], i, color='red')
[docs]
def splice_grain_boundary_segments_at_junction_points(self, plot=False, verbose=True):
"""
Splice the grain boundary into grain boundary segments using jnp point data
"""
if verbose:
print("Finding junction points at grain boundary segment intersections and splicing grain boundary segments at junction points.")
self.gbsegments = {gid: [] for gid in self.gid}
for gid in self.gid:
if len(self.jnp_all_sorted_upxo[gid]) == 1:
segment = self.gbmullines_grain_wise[gid].lines
elif len(self.jnp_all_sorted_upxo[gid]) > 1:
choplocs = []
for point in self.jnp_all_sorted_upxo[gid]:
location = point.eq_fast(self.gbmullines_grain_wise[gid].nodes[:-1], point_spec=2)
choplocs.append(np.argwhere(location).squeeze().tolist())
if choplocs[0] != 0:
choplocs = [0] + choplocs
if choplocs[-1] == len(self.gbmullines_grain_wise[gid].lines):
choplocs = choplocs[:-1]
ranges = []
for i in range(1, len(choplocs)):
ranges.append([choplocs[i-1], choplocs[i]])
for r in ranges:
lines = self.gbmullines_grain_wise[gid].lines[r[0]:r[1]]
self.gbsegments[gid].append(MSline2d.from_lines(lines, close=False))
rem_lines = self.gbmullines_grain_wise[gid].lines[r[1]:len(self.gbmullines_grain_wise[gid].lines)]
self.gbsegments[gid].append(MSline2d.from_lines(rem_lines, close=False))
if plot:
fig, ax = plt.subplots()
ax.imshow(self.lgi)
for gid in self.gid:
for gbseg in self.gbsegments[gid]:
coords = gbseg.get_node_coords()
ax.plot(coords[:, 0], coords[:, 1], '-o', ms=5)
ax.plot(self.jnp_all_sorted_coords[gid][:, 0],
self.jnp_all_sorted_coords[gid][:, 1],
'k*', ms = 7)
centroid = self.gbmullines_grain_wise[gid].get_node_coords()[:-1].mean(axis=0)
plt.text(centroid[0], centroid[1], gid, color='white', fontsize=12)
[docs]
def find_quality_of_grain_boundary_segmentation(self, verbose=True):
"""Assess quality of grain boundary segmentation by comparing segment counts per grain."""
if verbose:
print("Building sorted grain boundary segment objects after splicing at junction points.")
GBSEG = []
gbseg_map_indices = {gid: [] for gid in self.gid}
i = 0
for gid in self.gid:
for gbseg in self.gbsegments[gid]:
GBSEG.append(gbseg)
gbseg_map_indices[gid].append(i)
i += 1
quality = []
for gid in self.gid:
quality.append(int(len(gbseg_map_indices[gid]) == len(self.neigh_gid)))
quality = (self.n-sum(quality))*100/self.n
self.quality['gb_segmentation'] = quality
if verbose:
print(f'Grain boundary segmentation quality measure 1: {quality} %')
[docs]
def create_neigh_gid_pair_ids(self, neigh_gid, verbose=True):
"""Create a dictionary mapping unique grain pairs to integer IDs.
Parameters
----------
neigh_gid : dict
Keys are grain IDs and values are lists of neighbouring grain IDs.
Returns
-------
dict
Keys are integer pair IDs and values are lists of two grain IDs.
"""
if verbose:
print("Setting up neighbor connectivity flags and consolidating grain boundary segments.")
self.gid_pair_ids = {}
pair_id = 1 # Start with pair ID 1
# ----------------------------------------
for gid, neighbors in neigh_gid.items():
for neighbor in neighbors:
# Create a sorted tuple of the pair (ensures uniqueness)
pair = tuple(sorted((gid, neighbor)))
# Assign a new pair ID if not seen before
if pair not in self.gid_pair_ids:
self.gid_pair_ids[pair_id] = list(pair)
pair_id += 1
# ----------------------------------------
self.gid_pair_ids_unique_lr = np.unique(np.array(list(self.gid_pair_ids.values())), axis=0)
self.gid_pair_ids_unique_rl = np.flip(self.gid_pair_ids_unique_lr, axis=1)
[docs]
def get_random_gbpoint_between_jnpoints(self, gbcoords, jnpcoords):
"""Return a random grain boundary point located between two junction points."""
first = DO.find_coorda_loc_in_coords_arrayb(jnpcoords[0], gbcoords)
last = DO.find_coorda_loc_in_coords_arrayb(jnpcoords[1], gbcoords)
if last-first >= 2:
return gbcoords[np.random.randint(first+1, last)]
else:
return None
[docs]
def get_random_gbpoints_between_jnpoints(self, gid):
"""Return random grain boundary points between junction points for a given grain."""
gbcoords = self.gbmullines_grain_wise[gid].get_node_coords()
seg_ends = self.extract_end_coordinates_of_grain_boundary_segments_grain_wise(gid)
random_gb_points = {i: None for i in seg_ends.keys()}
# ------------------------------------------
for key, seg_end in seg_ends.items():
first = DO.find_coorda_loc_in_coords_arrayb(seg_end[0], gbcoords)
if sum(seg_end[1] - seg_ends[0][0]) == 0.0:
last = len(gbcoords)
else:
last = DO.find_coorda_loc_in_coords_arrayb(seg_end[1], gbcoords)
# ------------------------------------------
if last-first >= 2:
random_gb_points[key] = gbcoords[np.random.randint(first+1, last)]
else:
random_gb_points[key] = None
return random_gb_points
[docs]
def setup_neigh_connectivity_flags_DS(self, neigh_sense='lr',
field_names=['gbseg',
'nnodes_eq',
'length_eq', 'n',
'areas_raw',
'uniquified'],
verbose=True):
"""Setup neigh connectivity flags ds."""
if verbose:
print("Setting up neighbor connectivity flags for grain boundary segments.")
if neigh_sense == 'lr':
unique_pair_ids = self.gid_pair_ids_unique_lr
elif neigh_sense == 'rl':
unique_pair_ids = self.gid_pair_ids_unique_rl
self.nconn = {tuple(neighpair): {fn: [] for fn in field_names}
for neighpair in unique_pair_ids}
[docs]
def set_neigh_connectivity_flags_DS(self, centroid_eq_EPS=1E-8, verbose=True):
"""Set or update neigh connectivity flags DS."""
if verbose:
print("Setting up neighbor connectivity flags for grain boundary segments based on centroid proximity.")
for i, pair in enumerate(self.gid_pair_ids_unique_lr):
# flag = nconn[tuple(pair)]['gbseg']
pair_rl = tuple((pair[1], pair[0]))
self.nconn[tuple(pair)]['areas_raw'].append(self.area_gid(pair[0], gsrepr='raw'))
self.nconn[tuple(pair)]['areas_raw'].append(self.area_gid(pair[1], gsrepr='raw'))
# ====================================
for gbseg1 in self.gbsegments[pair[0]]:
# Iterate through all grain boundary segments of the centre grain (pair[0]).
# gbseg1 = gbsegments[pair[0]][0]
gbseg1_nnodes = gbseg1.nnodes # Number of nodes
gbseg1_centroid = gbseg1.centroid_p2dl # Centroidal point object
gbseg1_length = gbseg1.length # Total lemngth
# ====================================
for gbseg2 in self.gbsegments[pair[1]]:
# Iterate through all grain boundary segments of the neighbour grain (pair[1]).
gbseg2_nnodes = gbseg2.nnodes
proceed = False
# Prepare for the nnodes equality test.
nnodes_equality = gbseg1_nnodes == gbseg2_nnodes
if nnodes_equality:
# nnodes equality test passed. Prepare for the next test.
_fx_ = gbseg1_centroid.is_p2dl_within_cor
centroid_equality = _fx_(gbseg2.centroid_p2dl,
centroid_eq_EPS)
else:
continue
# ----------------------------
if centroid_equality:
# centroid equality test passed. Prepare for the next test.
ldiff = abs(gbseg1_length - gbseg2.length)
length_equality = ldiff <= centroid_eq_EPS
else:
continue
# ----------------------------
self.nconn[tuple(pair)]['gbseg'].append(gbseg1)
self.nconn[tuple(pair)]['gbseg'].append(gbseg2)
# ----------------------------
self.nconn[tuple(pair)]['nnodes_eq'].append(gbseg1.nnodes == gbseg2.nnodes)
# ----------------------------
self.nconn[tuple(pair)]['length_eq'].append(gbseg1.length == gbseg2.length)
# ----------------------------
self.nconn[tuple(pair)]['n'].append(len(self.nconn[tuple(pair)]['gbseg']))
self.nconn[tuple(pair)]['uniquified'] = False
[docs]
def get_unique_object_indices(self, nnodes, lengths, centroids, tol=1e-8):
"""
Find indices of unique objects based on nnodes, lengths, and centroids.
Parameters
----------
nnodes : np.ndarray
1D array of nnode values.
lengths : np.ndarray
1D array of length values.
centroids : np.ndarray
2D array of centroid coordinates.
tol : float, optional
Tolerance for centroid coordinate comparison. Default is ``1e-8``.
Returns
-------
np.ndarray
1D array of indices corresponding to unique objects.
"""
# Create a structured array for combined properties
dtype = [('nnodes', nnodes.dtype),
('lengths', lengths.dtype),
('centroids', centroids.dtype, (2,))]
data = np.empty(nnodes.shape[0], dtype=dtype)
data['nnodes'] = nnodes
data['lengths'] = lengths
data['centroids'] = centroids
# Round centroids to handle floating-point errors
data['centroids'] = np.round(data['centroids'], decimals=int(-np.log10(tol)))
# Find unique entries
_, unique_indices = np.unique(data, return_index=True)
return unique_indices
[docs]
def gather_grain_boundary_segments_of_all_pairs(self, verbose=True):
"""Gather grain boundary segments of all pairs."""
# Gather grain boundary segments of all pairs
if verbose:
print("Gathering grain boundary segments of all pairs and consolidating them.")
print("Constructing grain neigh pair ID keyed and gbseg valued dictionary")
self.GBSEGMENTS = {tuple(pair): None for pair in self.gid_pair_ids_unique_lr}
for pair in self.gid_pair_ids_unique_lr:
_gbseg_ = self.nconn[tuple(pair)]['gbseg']
if len(_gbseg_) > 0:
###########################################
# MAKE UNIQUE THE LIST OF gbsegments in _gbseg_.
nnodes = np.array([seg.nnodes for seg in self.nconn[tuple(pair)]['gbseg']])
lengths = np.array([seg.length for seg in self.nconn[tuple(pair)]['gbseg']])
centroids = np.array([seg.centroid for seg in self.nconn[tuple(pair)]['gbseg']])
ui = self.get_unique_object_indices(nnodes, lengths, centroids, tol=1e-8)
###########################################
self.GBSEGMENTS[tuple(pair)] = [self.nconn[tuple(pair)]['gbseg'][_ui_] for _ui_ in ui]
'''As we no longer need any duplicates, we will override the repeated
ones in ncoon as well.'''
self.nconn[tuple(pair)]['gbseg'] = [self.nconn[tuple(pair)]['gbseg'][_ui_] for _ui_ in ui]
# No longer need duplicates — override repeated ones in nconn as well.
self.nconn[tuple(pair)]['uniquified'] = True
# GBSEGMENTS[tuple(pair)] = _gbseg_[0]
###########################################
# Access
# [nconn[tuple((32, 36))]['gbseg'][_ui_] for _ui_ in ui]
[docs]
def consolidate_gbsegments(self, squeeze_segment_data_structure=False, verbose=True):
"""Consolidate grain boundary segments by grain ID.
Parameters
----------
squeeze_segment_data_structure : bool, optional
If True, flatten nested segment lists. Default is False.
Returns
-------
dict
Keys are grain IDs and values are lists of grain boundary segments.
"""
if verbose:
print("Consolidating grain boundary segments and updating them with boundary grain IDs.")
if self.gid.size == 1:
self.consolidated_segments = self.gbsegments
return
grain_segments = {} # To store the consolidated segments
for pair, segment in self.GBSEGMENTS.items():
gid1, gid2 = pair
grain_segments.setdefault(gid1, []).append(segment) # Add to gid1
grain_segments.setdefault(gid2, []).append(segment) # Add to gid2
# -------------------------------------
self.consolidated_segments = {}
for index in np.unique(list(grain_segments.keys())):
self.consolidated_segments[index] = grain_segments[index]
# -------------------------------------
if squeeze_segment_data_structure:
for index in np.unique(list(grain_segments.keys())):
squeezed = []
if self.consolidated_segments[index] is not None:
for a in self.consolidated_segments[index]:
if a is not None:
squeezed.extend(a)
self.consolidated_segments[index] = squeezed
[docs]
def get_problematic_grains(self, plot=False):
"""Def name must be changed."""
areas, nidentical_seg, nnodes_eq, length_eq = [], [], [], []
for nconn_val in self.nconn.values():
areas.append(nconn_val['areas_raw'])
nidentical_seg.append(nconn_val['n'])
nnodes_eq.append(nconn_val['nnodes_eq'])
length_eq.append(nconn_val['length_eq'])
areas = np.array(areas)
# Identify the problematic grains
areas_min = np.array(areas).min(axis=1)
areas_min_pair_locations = np.argwhere(areas_min == 1)
pairids = self.gid_pair_ids_unique_lr[areas_min_pair_locations].squeeze()
pairids_areas = areas[areas_min_pair_locations.squeeze()]
# End
## == ## == ## == ## == ## == ## == ## == ## == ## == ## == ## == ## == ## =
# Start
problematic_grains = {gid: self.polygons[gid-1] for gid in
np.unique(np.hstack((pairids[:, 0][pairids_areas[:, 0] == 1],
pairids[:, 1][pairids_areas[:, 1] == 1])))}
if plot:
fig, ax = plt.subplots()
ax.imshow(self.lgi)
for prbgrain in problematic_grains.values():
cenx, ceny = prbgrain.centroid.xy
cenx, ceny = cenx[0]-0.5, ceny[0]-0.5
ax.plot(cenx, ceny, 'kx')
# problematic_grains
return problematic_grains
[docs]
def find_segs_at_loc(self, gbsegs, axis='y', location=-0.5):
"""Find segs at loc."""
gb_indices, gbsegs_at_location = [], []
column = 0 if axis == 'x' else 1 if axis == 'y' else None
for i, gbseg in enumerate(gbsegs, start=0):
if all(gbseg.get_node_coords()[:, column] == location):
gb_indices.append(i)
gbsegs_at_location.append(gbseg)
return gb_indices, gbsegs_at_location
[docs]
def update_consolidated_segments_with_boundary_grain_gids(self, plot=False, verbose=True):
"""Set or update te consolidated segments with boundary grain gids."""
if verbose:
print("Updating consolidated grain boundary segments with boundary grain IDs.")
if self.gid.size == 1:
return
xmin, xmax, ymin, ymax = self.get_bounds_from_GBP()
for fid in self.grain_loc_ids['bottom_left_corner']:
for seg in self.gbsegments[fid]:
if seg.has_coord([xmin, ymin]):
self.consolidated_segments[fid].append(seg)
for fid in self.grain_loc_ids['bottom_right_corner']:
for seg in self.gbsegments[fid]:
if seg.has_coord([xmax, ymin]):
self.consolidated_segments[fid].append(seg)
for fid in self.grain_loc_ids['top_right_corner']:
for seg in self.gbsegments[fid]:
if seg.has_coord([xmax, ymax]):
self.consolidated_segments[fid].append(seg)
for fid in self.grain_loc_ids['top_left_corner']:
for seg in self.gbsegments[fid]:
if seg.has_coord([xmin, ymax]):
self.consolidated_segments[fid].append(seg)
for gid in self.grain_loc_ids['pure_bottom']:
gbind, gbs = self.find_segs_at_loc(self.gbsegments[gid],
axis='y', location=ymin)
if len(gbs) > 0:
for _gbs_ in gbs:
self.consolidated_segments[gid].append(_gbs_)
for gid in self.grain_loc_ids['pure_right']:
gbind, gbs = self.find_segs_at_loc(self.gbsegments[gid],
axis='x', location=xmax)
if len(gbs) > 0:
for _gbs_ in gbs:
self.consolidated_segments[gid].append(_gbs_)
for gid in self.grain_loc_ids['pure_top']:
gbind, gbs = self.find_segs_at_loc(self.gbsegments[gid],
axis='y', location=ymax)
if len(gbs) > 0:
for _gbs_ in gbs:
self.consolidated_segments[gid].append(_gbs_)
for gid in self.grain_loc_ids['pure_left']:
gbind, gbs = self.find_segs_at_loc(self.gbsegments[gid],
axis='x', location=xmin)
if len(gbs) > 0:
for _gbs_ in gbs:
self.consolidated_segments[gid].append(_gbs_)
if plot:
self.plot_consolidated_segments()
[docs]
def plot_consolidated_segments(self):
"""Visualise consolidated segments using Matplotlib or PyVista."""
fig, ax = plt.subplots()
ax.imshow(self.lgi)
for gid in self.gid:
for _ in self.consolidated_segments[gid]:
_.plot(ax=ax)
coord = self.gbmullines_grain_wise[gid].get_node_coords()
plt.plot(coord[:, 0], coord[:, 1], 'k.')
jnp = self.jnp_all_sorted_coords[gid]
plt.plot(jnp[:, 0], jnp[:, 1], 'ks', mfc='c', ms=10, alpha=0.25)
[docs]
def flip_segments_to_reorder_GBS(self, plot_each_grain_details=False, verbose=True):
"""Flip segments to reorder gbs."""
if verbose:
print("Flipping grain boundary segments to reorder them in a consistent manner.")
self.GB = {gid: None for gid in self.gid}
self.quality['GBS_reordering_success'] = {gid: None for gid in self.gid}
niterations = {gid: 0 for gid in self.gid}
from upxo.geoEntities.mulsline2d import ring2d
for gid in self.gid:
gbsegs = self.consolidated_segments[gid]
if verbose:
print(40*'-')
_gbseg_ring_ = ring2d(segments=gbsegs,
segids=list(range(len(gbsegs))),
segflips=[False for _ in gbsegs])
# segments=None, segids=None, segflips=None
# _gbseg_ring_.segments
if plot_each_grain_details:
_gbseg_ring_.plot_segs(plot_centroid=True, centroid_text=gid,
plot_coord_order=True, visualize_flip_req=False)
# _gbseg_ring_.create_polygon_from_coords()
continuity, flip_needed, i_precede_chain = _gbseg_ring_.assess_spatial_continuity()
if verbose:
print(f'gid={gid}: ', 'gbsegs continuous' if continuity else 'gbsegs not continuous. Attempting reorder.')
# print(40*'-', '\n Extreme coordinates of gbsegs are:\n')
# for i, gbseg in enumerate(gbsegs, start = 0):
# print(f'Segment {i}: ', gbseg.nodes[0], gbseg.nodes[-1])
# print(40*'-')
# plt.imshow(geom.lgi==gid)
if continuity:
# If the segments are indeed continuous, ring formation is straightforward.
# segments=None, segids=None, segflips=None
NSEG_rng = range(len(gbsegs))
self.GB[gid] = ring2d(gbsegs, list(NSEG_rng), [False for _ in NSEG_rng])
# GB[gid].segments
# GB[gid].segids
# GB[gid].segflips
self.quality['GBS_reordering_success'][gid] = True
if plot_each_grain_details:
self.GB[gid].plot_segs(plot_centroid=True, centroid_text=gid,
plot_coord_order=True, visualize_flip_req=True)
if not continuity:
# If segments are not continuous, ring formation requires computing exact
# segids and segflip values to ensure spatial continuity of adjacent segment nodes.
self.quality['GBS_reordering_success'][gid] = False
# ------------------------
segids = list(range(len(gbsegs)))
# segstart_flip set to True if gbsegs[0] goes counter-clockwise.
# Setting to False for now — needs separate assessment.
segstart_flip = False
self.GB[gid] = ring2d([gbsegs[0]], [0], [segstart_flip])
# GB[gid].segments
# GB[gid].segids
# GB[gid].segflips
seg_num = 0
used_segids = [seg_num]
search_segids = set(segids)
max_iterations = 10*len(gbsegs)
itcount = 1 # iteration_count
while len(search_segids) > 0:
current_seg = self.GB[gid].segments[-1]
flip_req_previous = self.GB[gid].segflips[-1]
# print(40*'-', '\n Current segment end nodes:\n')
# print(current_seg.nodes[0], current_seg.nodes[-1], '\n', 40*'-')
search_segids = set(segids) - set(used_segids)
adj = []
for candidate_segid in search_segids:
# candidate_segid = 2
# print(40*'-', '\n Current segment end nodes:\n')
# print(current_seg.nodes[0], current_seg.nodes[-1])
candidate_seg = gbsegs[candidate_segid]
if flip_req_previous:
adjacency = current_seg.do_i_proceed(candidate_seg)
else:
adjacency = current_seg.do_i_precede(candidate_seg)
adj.append(adjacency)
# print(40*'-', f'\n Cand. seg. {candidate_segid} end nodes:')
# print(candidate_seg.nodes[0], candidate_seg.nodes[-1])
# print(40*'-', f'\n Current precede candidate: {adjacency}\n', 40*'-')
# adjacency = current_seg.do_i_proceed(candidate_seg)
# print(adjacency)
if adjacency[0]:
used_segids.append(candidate_segid)
self.GB[gid].add_segment_unsafe(candidate_seg)
self.GB[gid].add_segid(candidate_segid)
self.GB[gid].add_segflip(adjacency[1])
'''
GB[gid].segments
GB[gid].segids
GB[gid].segflips
GB[gid].plot_segs()
for i, gbseg in enumerate(GB[gid].segments, start = 0):
print(f'Segment {i}: ', gbseg.nodes[0], gbseg.nodes[-1])
'''
break
if itcount >= max_iterations:
'Prevent infinite loop.'
break
itcount += 1
if len(search_segids) == 0:
# --------------------------------
# Ensure complete connectivity
if not self.GB[gid].segments[0].nodes[0].eq_fast(self.GB[gid].segments[-1].nodes[-1]):
self.GB[gid].segflips[-1] = True
# --------------------------------
self.quality['GBS_reordering_success'][gid] = True
niterations[gid] = itcount
if verbose:
print(f'Re-ordering success. gbsegs are continous. N.Segs={len(gbsegs)}. N.Iterations={itcount}')
else:
# self.quality['GBS_reordering_success'][gid] = False < -- BY default.
# So, nothinhg more to do here.
pass
if plot_each_grain_details:
self.GB[gid].plot_segs(plot_centroid=True, centroid_text=gid,
plot_coord_order=True, visualize_flip_req=True)
# GB[gid].segflips
if verbose:
print(40*'-', f'\nTotal number of iterations: {sum(list(niterations.values()))}')
[docs]
def update_segflip_requirements(self, verbose=True):
"""Set or update te segflip requirements."""
# Re-assessment - segflips of the last segment.
if verbose:
print("Updating segment flip requirements to ensure spatial continuity of grain boundary segments.")
for gid in self.gid:
self.GB[gid]
self.GB[gid].segments[0]
start = self.GB[gid].segments[0].nodes[0]
end0 = self.GB[gid].segments[-1].nodes[0]
end1 = self.GB[gid].segments[-1].nodes[-1]
condition1 = start.eq_fast(end0)[0]
condition2 = start.eq_fast(end1)[0]
if condition1:
self.GB[gid].segflips[-1] = True
[docs]
def sort_subsets_by_original_order(self, CA, subsets):
"""
Sort subsets of coordinates based on their original order in the CA array.
Parameters
----------
CA : np.ndarray
The original 2D coordinate array (N x 2).
subsets : list
A list of np.ndarrays, each representing a subset of coordinates.
Returns
-------
list
Sorted np.ndarrays, each subset ordered according to its position in CA.
np.ndarray
1D array of the original subset indices in sorted order.
"""
coord_to_index = {tuple(coord): idx for idx, coord in enumerate(CA)}
sorted_subsets = []
subset_indices = [] # To track the original indices of subsets
for i, subset in enumerate(subsets):
sorted_indices = np.argsort([coord_to_index[tuple(coord)] for coord in subset])
sorted_subsets.append(subset[sorted_indices])
subset_indices.append(i) # Record the original index
# Sort subset indices based on the first element of each sorted subset
sort_order = np.argsort([coord_to_index[tuple(s[0])] for s in sorted_subsets])
sorted_subsets = [sorted_subsets[i] for i in sort_order]
subset_indices = np.array(subset_indices)[sort_order] # Convert to NumPy array and reorder
return sorted_subsets, subset_indices
[docs]
def sort_gbsegments_by_original_order(self, verbose=True):
"""Sort gbsegments by original order."""
if verbose:
print("Sorting grain boundary segments by original order.")
self.sorted_segs = {gid: None for gid in self.gid}
_sorter_ = self.sort_subsets_by_original_order
for gid in self.gid:
_, subset_indices = _sorter_(self.gbmullines_grain_wise[gid].get_node_coords(),
[seg.get_node_coords() for seg in self.consolidated_segments[gid]])
self.sorted_segs[gid] = [self.consolidated_segments[gid][ssind] for ssind in subset_indices]
[docs]
def calculate_grain_boundary_coordinates_after_gbseg_reordering(self, verbose=True):
"""Calculate grain boundary coordinates after gbseg reordering."""
if verbose:
print("Calculating grain boundary coordinates after grain boundary segment reordering.")
self.GBCoords = {gid: None for gid in self.gid}
for gid in self.gid:
if gid in self.grain_loc_ids['boundary']:
coord = self.GB[gid].create_coords_from_segments(force_close=True)
else:
coord = self.GB[gid].create_coords_from_segments(force_close=False)
self.GBCoords[gid] = coord
[docs]
def are_grains_closed_usenodes(self):
"""Are grains closed usenodes."""
return [gb.check_closed() for gb in self.GB.values()]
[docs]
def are_grains_closed_usecoords(self):
"""Are grains closed usecoords."""
coords = self.GBCoords
flags = []
for gid in self.gid:
flag = np.abs(coords[gid][0] - coords[gid][-1]).sum() <= self.EPS_coord_coincide
flags.append(flag)
return flags
[docs]
def set_pure_gbpoints(self, verbose=True):
"""Set or update pure gbpoints."""
if verbose:
print("Setting up pure grain boundary points.")
GBP_pure = []
for gbp in np.unique(self.GBP, axis=0):
if DO.is_a_in_b(gbp, np.unique(self.JNP, axis=0)):
pass
else:
GBP_pure.append(gbp)
self.GBP_pure = np.array(GBP_pure)
[docs]
def plot_reordered_GBCoords(self, gid, force_close=True):
"""Visualise reordered GBCoords using Matplotlib or PyVista."""
# gid = 43
# GB[gid].segments
# GB[gid].segflips
coords = self.GB[gid].segments[0].get_node_coords()
for i, seg in enumerate(self.GB[gid].segments[1:], start=1):
if self.GB[gid].segflips[i]:
thissegcoords = np.flip(seg.get_node_coords(), axis=0)
coords = np.vstack((coords, thissegcoords[1:]))
else:
coords = np.vstack((coords, seg.get_node_coords()[1:]))
if force_close:
coords = self.force_close_coordinates(coords, assess_first=True)
plt.figure()
for gid in self.gid:
c = self.GBCoords[gid]
plt.plot(c[:, 0], c[:, 1])
[docs]
def plot_user_gbcoords(self, gbcoords, lw=1.5):
"""Visualise user gbcoords using Matplotlib or PyVista."""
fig, ax = plt.subplots()
for gid in self.gid:
plt.plot(gbcoords[gid][:, 0], gbcoords[gid][:, 1],
ls='solid', lw=lw)
[docs]
def plot_user_gbcoords1(self, gbcoords, lw=1.5):
"""Visualise user gbcoords1 using Matplotlib or PyVista."""
fig, ax = plt.subplots()
for gid in self.gid:
gbc = gbcoords[gid]
if not DO.is_a_in_b(gbc[0], gbc[1:]):
gbc = np.vstack((gbc, gbc[-1]))
plt.plot(gbc[:, 0], gbc[:, 1],
ls='solid', lw=lw)
[docs]
def construct_geometric_xtals_from_gbcoords(self,
coord_loop_dict,
dtype='shapely',
saa=True, throw=False):
""" self.construct_geometric_xtals_from_gbcoords(GBCoords)."""
if dtype == 'shapely':
GRAINS = {gid: Polygon(coord_loop_dict[gid]) for gid in self.gid}
if saa:
self.GRAINS = GRAINS
if throw:
return GRAINS
[docs]
def construct_geometric_polyxtal_from_xtals(self,
xtal_list,
dtype='shapely',
saa=True, throw=False):
"""
self.construct_geometric_polyxtal_from_xtals(self.GRAINS.values(), dtype='shapely')
"""
if dtype == 'shapely':
POLYXTAL = MultiPolygon(xtal_list)
if saa:
self.POLYXTAL = POLYXTAL
if throw:
return POLYXTAL
[docs]
def construct_geometric_polyxtal_from_gbcoords(self,
coord_loop_dict,
dtype='shapely',
saa=True, throw=False,
plot_polyxtal=False,
verbose=True
):
"""Construct geometric polyxtal from gbcoords."""
if verbose:
print("Constructing geometric polyxtal from grain boundary coordinates.")
if dtype == 'shapely':
if saa:
self.construct_geometric_xtals_from_gbcoords(coord_loop_dict,
saa=True,
throw=False)
self.POLYXTAL = MultiPolygon(self.GRAINS.values())
if plot_polyxtal:
self.plot_multipolygon(self.POLYXTAL, invert_y=True)
if throw:
return self.GRAINS, self.POLYXTAL
if not saa and throw:
GRAINS = self.construct_geometric_xtals_from_gbcoords(coord_loop_dict,
saa=False,
throw=True)
POLYXTAL = MultiPolygon(GRAINS.values())
if plot_polyxtal:
self.plot_multipolygon(POLYXTAL, invert_y=True)
return GRAINS, POLYXTAL
[docs]
def AssembleGBSEGS(self, GB, saa=True, throw=False, verbose=True):
"""Assemblegbsegs."""
if verbose:
print("Assembling grain boundary segments into grain boundary multi-linestrings")
mids_all_gbsegs = []
for gb in GB.values():
mids_all_gbsegs.extend([id(seg) for seg in gb.segments])
mids_all_gbsegs = np.unique(mids_all_gbsegs)
sgseg_obj_list = [None for mid in mids_all_gbsegs]
# --------------------------
for i, mid in enumerate(mids_all_gbsegs, start=0):
for gid in self.gid:
for gb in GB[gid].segments:
if mid == id(gb):
sgseg_obj_list[i] = gb
sgseg_obj_list = np.array(sgseg_obj_list)
# --------------------------
if saa:
self.mids_all_gbsegs = mids_all_gbsegs
self.sgseg_obj_list = sgseg_obj_list
if throw:
return mids_all_gbsegs, sgseg_obj_list
[docs]
def get_mids_gbsegs(self, gid):
"""Return the mids gbsegs."""
return [id(seg) for seg in self.GB[gid].segments]
[docs]
def get_gbmid_indices_at_gid(self, gid, all_mids):
"""Return the gbmid indices at gid."""
segmids = self.get_mids_gbsegs(gid)
locs = [np.argwhere(all_mids == segmid)[0][0] for segmid in segmids]
# for segmid in segmids:
# locs.append(np.argwhere(all_mids == segmid)[0][0])
return locs
[docs]
def smooth_gbsegs(self, GB, npasses=2, max_smooth_levels=[3, 3], plot=True,
name='kali'):
"""Smooth gbsegs."""
# Validations
if type(max_smooth_levels) in dth.dt.NUMBERS:
max_smooth_levels = [max_smooth_levels]
# -----------------------------------------------
GB_smooth = deepcopy(GB)
all_mids, sgseg_list = self.AssembleGBSEGS(GB_smooth, saa=False, throw=True)
for np in range(npasses):
print(f"Carrying out smoothing pass: {np+1}")
slevel = max_smooth_levels[np]
for seg in sgseg_list:
seg.smooth(max_smooth_level=slevel)
# -------------------------------------
if plot:
for gid in self.gid:
GB_smooth[gid].plot_segs()
for gid in self.gid:
self.GB[gid].plot_segs()
# -------------------------------------
GBCoords_smoothed = {gid: None for gid in self.gid}
# -------------------------------------
for gid in self.gid:
if gid in self.grain_loc_ids['boundary']:
coord = GB_smooth[gid].create_coords_from_segments(force_close=True)
else:
coord = GB_smooth[gid].create_coords_from_segments(force_close=False)
GBCoords_smoothed[gid] = coord
# -------------------------------------
if plot:
plt.figure()
for gid in self.gid:
c = GBCoords_smoothed[gid]
plt.plot(c[:, 0], c[:, 1])
# -------------------------------------
_fn_ = self.construct_geometric_xtals_from_gbcoords
GRAINS = _fn_(GBCoords_smoothed, saa=False, throw=True)
POLYXTAL = MultiPolygon(GRAINS.values())
# -------------------------------------
self.smoothed[name] = {'GB': GB_smooth,
'GBCoords': GBCoords_smoothed,
'GRAINS': GRAINS,
'POLYXTAL': POLYXTAL,
}
[docs]
def plotgs(self, gs_geometric, fig=None, ax=None, cmap='tab20', edgecolor='black',
alpha=0.7, lw=1, figsize=(10, 10), dpi=100):
"""Plotgs."""
from upxo.viz.gsviz import plot_multipolygon_geometric
fig, ax = plot_multipolygon_geometric(gs_geometric, fig=fig, ax=ax, cmap=cmap,
edgecolor=edgecolor, alpha=alpha, lw=lw,
figsize=figsize, dpi=dpi)
return fig, ax
# =====================================================================================
# =====================================================================================
# =====================================================================================
[docs]
class VoronoiMasking(ABC):
"""Abstract base class for Voronoi-based masking of a label field image (LFI)."""
def __init__(self, lfi):
"""Initialise the instance."""
self.lfi = lfi # n x m (x o)
self.seeds = None # Seed coordinates
self.cell_to_id = {} # Mapping: {cell_idx: grain_id}
self.lfi_field = None # Optional: Specific field to sample from
self.cells = {} # {cell_id: geometric object}
[docs]
def map_seeds_to_lfi(self, channel=0):
"""
Samples the LFI at seed coordinates.
Correctly handles the conversion from Cartesian (x, y) seeds
to array indices (row, col) for sampling.
"""
from scipy.ndimage import map_coordinates
# Handle multi-channel vs single-channel indexing
# self.seeds.shape[1] is the spatial dimension (2 for 2D, 3 for 3D)
if self.lfi.ndim > self.seeds.shape[1]:
target_data = self.lfi[..., channel]
else:
target_data = self.lfi
# CRITICAL FIX:
# map_coordinates requires coordinates in (row, col, depth) order.
# Our seeds are in (x, y, z) order.
# For 2D: we swap [x, y] -> [y, x] to get [row, col]
# For 3D: we swap [x, y, z] -> [z, y, x] to get [plane, row, col]
coords_for_sampling = self.seeds[:, ::-1].T
# order=0 is essential for labeled images to avoid interpolating between IDs
ids = map_coordinates(target_data, coords_for_sampling, order=0, mode='nearest')
# Store mapping: {cell_index: entity_id}
self.cell_to_id = {i: int(val) for i, val in enumerate(ids)}
[docs]
@classmethod
@abstractmethod
def by_tessellation(cls, lfi, seeds):
"""Factory method to create instance via new Voronoi computation."""
pass
[docs]
@classmethod
@abstractmethod
def load_tessellation(cls, lfi, filepath):
"""Factory method to create instance from a saved geometry file."""
pass
[docs]
@abstractmethod
def assemble_cells(self):
"""Dimension-specific: Shapely UnaryUnion vs PyVista Merge."""
pass
# =====================================================================================
# =====================================================================================
# =====================================================================================
from shapely.ops import unary_union
from collections import defaultdict
[docs]
class GrainManifold2D(VoronoiMasking):
"""2D grain manifold constructed by Voronoi tessellation and label field sampling."""
[docs]
@classmethod
def by_tessellation(cls, lfi, seeds, channel=0):
"""By tessellation."""
instance = cls(lfi)
instance.seeds = seeds
# 1. Map seeds
instance.map_seeds_to_lfi(channel=channel)
# 2. Generate clipped polygons (your internal logic)
polygons = instance._generate_clipped_polygons(seeds)
# 3. Assemble into 'cells' (Grains/Twins/etc)
instance.assemble_cells(polygons)
return instance
[docs]
def assemble_cells(self, polygons):
"""Steps 5 & 6: Groups by ID and performs manifold union."""
groups = defaultdict(list)
for cell_idx, entity_id in self.cell_to_id.items():
groups[entity_id].append(polygons[cell_idx])
for entity_id, poly_list in groups.items():
# Merges all Voronoi cells into a single manifold entity
self.cells[entity_id] = unary_union(poly_list)
def _generate_clipped_polygons(self, seeds):
"""
Step 3: Internal helper to generate finite, clipped Voronoi polygons.
"""
from scipy.spatial import Voronoi
from shapely.geometry import Polygon, box
# 1. Define RVE boundaries based on LFI shape
height, width = self.lfi.shape[:2]
boundary_box = box(0, 0, width, height)
# 2. Ghost Seed Padding (Reflect boundary points to trap infinite rays)
# Using a 10% buffer based on RVE size
pad = max(width, height) * 0.1
left = seeds[seeds[:, 0] < pad].copy()
left[:, 0] = -left[:, 0]
right = seeds[seeds[:, 0] > (width - pad)].copy()
right[:, 0] = 2 * width - right[:, 0]
bottom = seeds[seeds[:, 1] < pad].copy()
bottom[:, 1] = -bottom[:, 1]
top = seeds[seeds[:, 1] > (height - pad)].copy()
top[:, 1] = 2 * height - top[:, 1]
# Stack original seeds with ghosts
all_seeds = np.vstack([seeds, left, right, bottom, top])
# 3. Generate Voronoi
vor = Voronoi(all_seeds)
# 4. Extract and Clip Polygons
polygons = []
self.cell_vertices_raw = [] # Initialize storage for reconstruction
# Only iterate through the original seeds (first len(seeds) points)
for i in range(len(seeds)):
region_idx = vor.point_region[i]
region = vor.regions[region_idx]
# In a padded setup, regions for original seeds are guaranteed to be finite
# but we clip with the bounding box to ensure perfect RVE edges
verts = vor.vertices[region]
# Store the raw vertex coordinates for this cell
# We store them as a list of tuples to be hashable for the map
self.cell_vertices_raw.append([tuple(v) for v in verts])
poly = Polygon(verts)
# Intersection ensures the cell stays within [0,0] to [width, height]
clipped_poly = poly.intersection(boundary_box)
polygons.append(clipped_poly)
return polygons # This prevents the TypeError
[docs]
@classmethod
def load_tessellation(cls, lfi, filepath):
"""Placeholder implementation to satisfy abstract requirement."""
instance = cls(lfi)
# Add logic to load shapely geometries from file here
return instance
[docs]
def smooth_interfaces(self, iterations=10, lmbda=0.5, mu=-0.53,
method='taubin', ma_window=3, corner_angle_deg=30.0,
thin_grain_px=0.0):
"""Smooth interfaces."""
if method == 'taubin':
from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
adj = self._get_vertex_adjacency()
coords = {p: p for p in adj.keys()}
frozen: set = set()
if corner_angle_deg > 0:
for geom in self.cells.values():
polys = (list(geom.geoms)
if isinstance(geom, (MultiPolygon, GeometryCollection))
else [geom])
for poly in polys:
if not isinstance(poly, Polygon):
continue
c = np.array(poly.exterior.coords)
for i, ang in enumerate(self._compute_vertex_angles(c)):
if ang < corner_angle_deg:
frozen.add(tuple(c[i]))
frozen |= self._collect_thin_grain_vertices(thin_grain_px)
for _ in range(iterations):
coords = self._laplacian_step(lmbda, coords_map=coords, frozen=frozen)
coords = self._laplacian_step(mu, coords_map=coords, frozen=frozen)
self._reconstruct_from_coords(coords)
elif method == 'moving_average':
for _ in range(iterations):
self._apply_moving_average_v2(window_size=ma_window,
corner_angle_deg=corner_angle_deg,
thin_grain_px=thin_grain_px)
else:
raise ValueError(f"Unknown smoothing method: {method!r}. "
f"Choose 'taubin' or 'moving_average'.")
def _laplacian_step(self, factor, coords_map=None, frozen=None):
"""
Smoothing step. Calculates displacement for each vertex toward the average of its neighbors.
"""
# Use the current state of vertices if no intermediate map is provided
if coords_map is None:
# We use tuple keys to map original coordinates to current positions
coords_map = {p: p for p in self._get_vertex_adjacency().keys()}
adj = self._get_vertex_adjacency()
height, width = self.lfi.shape[:2]
new_coords = {}
for p, neighbors in adj.items():
x, y = p
# --- TOPOLOGICAL PINNING ---
# 1. Freeze RVE boundaries (x=0, x=width, y=0, y=height)
is_on_boundary = (x <= 0 or x >= width or y <= 0 or y >= height)
# 2. Freeze Triple/Quadruple Points (> 2 neighbors)
is_junction = len(neighbors) > 2
# 3. Handle Endpoints (isolated segments)
is_endpoint = len(neighbors) < 2
# 4. Freeze explicitly supplied vertices (e.g. sharp-angle tips)
is_frozen = frozen is not None and p in frozen
if is_on_boundary or is_junction or is_endpoint or is_frozen:
# Anchor these points to maintain manifold integrity
new_coords[p] = coords_map[p]
else:
# Discrete Laplacian: v_new = v_old + factor * (Average(neighbors) - v_old)
neighbor_list = list(neighbors)
# Pull coordinates from the current map
p1_coords = coords_map[neighbor_list[0]]
p2_coords = coords_map[neighbor_list[1]]
current_p = coords_map[p]
# Calculate centroid of neighbors
avg_x = (p1_coords[0]+p2_coords[0]) / 2
avg_y = (p1_coords[1]+p2_coords[1]) / 2
# Apply displacement
dx, dy = avg_x - current_p[0], avg_y-current_p[1]
new_coords[p] = (current_p[0]+factor*dx, current_p[1]+factor*dy)
return new_coords
def _get_vertex_adjacency_old(self):
"""
Builds a map of shared vertices and their neighbors.
Triple points and RVE corners are identified to be pinned.
"""
from collections import defaultdict
adj = defaultdict(set)
for gid, poly in self.cells.items():
# Get exterior coordinates (x, y)
coords = list(poly.exterior.coords)
for i in range(len(coords) - 1):
p1, p2 = coords[i], coords[i+1]
adj[p1].add(p2)
adj[p2].add(p1)
return adj
def _get_vertex_adjacenc_old2(self):
"""
Builds a map of shared vertices and their neighbors.
Robustly handles MultiPolygons (islands/fragmented grains).
"""
from collections import defaultdict
from shapely.geometry import MultiPolygon
adj = defaultdict(set)
for gid, geom in self.cells.items():
# Recursive check for multi-part geometries
if isinstance(geom, MultiPolygon):
polys = list(geom.geoms)
else:
polys = [geom]
for poly in polys:
# Extract exterior ring coordinates
coords = list(poly.exterior.coords)
for i in range(len(coords) - 1):
p1, p2 = coords[i], coords[i+1]
# Map adjacency bidirectionally
adj[p1].add(p2)
adj[p2].add(p1)
return adj
def _get_vertex_adjacency(self):
"""Build and return a shared-vertex adjacency map for all grain polygons."""
from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
from collections import defaultdict
adj = defaultdict(set)
for entity_id, geom in self.cells.items():
# Helper to extract polygons from any geometry type
def get_polys(g):
"""Return the polys."""
if isinstance(g, Polygon):
return [g]
elif isinstance(g, (MultiPolygon, GeometryCollection)):
# Recursively find polygons inside collections
res = []
for part in g.geoms:
res.extend(get_polys(part))
return res
return [] # Ignore Points and LineStrings
polys = get_polys(geom)
for poly in polys:
# Now safe to access .exterior
coords = list(poly.exterior.coords)
for i in range(len(coords) - 1):
p1, p2 = tuple(coords[i]), tuple(coords[i+1])
adj[p1].add(p2)
adj[p2].add(p1)
return adj
[docs]
def gb_smooth(self, ma_window=3, taubin_iter=10, lmbda=0.5, mu=-0.5, ma_smoother_version=2):
"""Gb smooth."""
# 1. Stage 1: Local Moving Average
if ma_smoother_version == 1:
self._apply_moving_average_v1(window_size=ma_window)
else:
self._apply_moving_average_v2(window_size=ma_window)
# 2. CRITICAL SYNC: Re-build adjacency based on the MOVED vertices
# This prevents Taubin from using the old jagged voxel positions.
self.vertex_adj = self._get_vertex_adjacency()
# 3. Stage 2: Global Taubin
self.apply_taubin_smoothing(iterations=taubin_iter, lmbda=lmbda, mu=mu)
def _apply_moving_average_v1(self, window_size=3):
"""
Improved Moving Average that preserves topological anchors.
"""
from shapely.geometry import Polygon
import numpy as np
adj = self._get_vertex_adjacency()
height, width = self.lfi.shape[:2]
# Identify anchor points (Junctions and RVE boundaries)
anchors = {p for p, neighbors in adj.items()
if len(neighbors) > 2 or p[0] <= 0 or p[0] >= width or p[1] <= 0 or p[1] >= height}
new_polygons = []
for i in range(len(self.seeds)):
verts = np.array(self.cell_vertices_raw[i])
smoothed_verts = np.copy(verts)
# Apply smoothing only to non-anchor vertices
for j in range(1, len(verts) - 1):
p = tuple(verts[j])
if p not in anchors:
# Average with immediate neighbors in the vertex list
prev_p = verts[j-1]
next_p = verts[j+1]
smoothed_verts[j] = (prev_p + verts[j] + next_p) / 3
# Ensure closing vertex matches starting vertex
smoothed_verts[-1] = smoothed_verts[0]
# Heal with buffer(0) to prevent GEOS errors
new_polygons.append(Polygon(smoothed_verts).buffer(0))
self._re_assemble_from_polygons(new_polygons)
return new_polygons
@staticmethod
def _compute_vertex_angles(coords: np.ndarray) -> np.ndarray:
"""Interior angle (degrees) at each vertex of a closed polygon ring."""
n = len(coords) - 1 # last coord repeats first
angles = np.full(n, 180.0)
for i in range(n):
u = coords[i - 1] - coords[i]
w = coords[(i + 1) % n] - coords[i]
denom = np.linalg.norm(u) * np.linalg.norm(w)
if denom > 1e-12:
cos_a = np.clip(np.dot(u, w) / denom, -1.0, 1.0)
angles[i] = np.degrees(np.arccos(cos_a))
return angles
def _collect_thin_grain_vertices(self, width_threshold: float) -> set:
"""
Return the set of all exterior vertex coords belonging to polygons whose
minimum bounding-rectangle width is below `width_threshold` (pixel units).
Freezing all vertices of a thin polygon prevents it from self-intersecting
during smoothing.
"""
from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
thin_verts: set = set()
if width_threshold <= 0:
return thin_verts
for geom in self.cells.values():
polys = (list(geom.geoms)
if isinstance(geom, (MultiPolygon, GeometryCollection))
else [geom])
for poly in polys:
if not isinstance(poly, Polygon) or poly.is_empty:
continue
mrr = poly.minimum_rotated_rectangle
c = list(mrr.exterior.coords)
d1 = ((c[0][0]-c[1][0])**2 + (c[0][1]-c[1][1])**2) ** 0.5
d2 = ((c[1][0]-c[2][0])**2 + (c[1][1]-c[2][1])**2) ** 0.5
if min(d1, d2) < width_threshold:
for x, y in list(poly.exterior.coords)[:-1]:
thin_verts.add((float(x), float(y)))
return thin_verts
def _apply_moving_average_v2(self, window_size=3, corner_angle_deg=30.0, thin_grain_px=0.0):
"""
Applies segment-based smoothing using the user's mean_coordinates logic.
Each boundary segment between anchor points is smoothed independently.
"""
from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
from shapely.ops import unary_union
import numpy as np
# Internal helper to compute moving average (mode='valid')
def local_ma(data, w):
"""Local ma."""
if len(data) < w: return data
return np.convolve(data, np.ones(w) / w, mode='valid')
# Internal helper to apply smoothing and re-cap endpoints
def local_smooth(coords, w):
"""Local smooth."""
if len(coords) < w: return coords
x, y = coords[:, 0], coords[:, 1]
x_smooth = local_ma(x, w)
y_smooth = local_ma(y, w)
if len(x_smooth) > 0:
return np.vstack([
[x[0], y[0]], # Keep start anchor
np.column_stack([x_smooth, y_smooth]),
[x[-1], y[-1]] # Keep end anchor
])
return coords
# 1. Identify Anchor Points (Triple points and RVE boundaries)
adj = self._get_vertex_adjacency()
height, width = self.lfi.shape[:2]
anchors = {p for p, neighbors in adj.items()
if len(neighbors) > 2
or p[0] <= 0 or p[0] >= width
or p[1] <= 0 or p[1] >= height}
# Also freeze vertices with very acute interior angles (thin grain tips / twin lamellae)
if corner_angle_deg > 0:
for geom in self.cells.values():
polys_to_scan = (list(geom.geoms)
if isinstance(geom, (MultiPolygon, GeometryCollection))
else [geom])
for poly in polys_to_scan:
if not isinstance(poly, Polygon):
continue
coords = np.array(poly.exterior.coords)
angles = self._compute_vertex_angles(coords)
for i, ang in enumerate(angles):
if ang < corner_angle_deg:
anchors.add(tuple(coords[i]))
# Freeze ALL vertices of geometrically thin polygons (entire grain frozen)
anchors |= self._collect_thin_grain_vertices(thin_grain_px)
new_cells = {}
for gid, geom in self.cells.items():
if isinstance(geom, MultiPolygon):
parts = list(geom.geoms)
elif isinstance(geom, GeometryCollection):
parts = [g for g in geom.geoms if isinstance(g, Polygon)]
else:
parts = [geom]
smoothed_parts = []
for part in parts:
coords = np.array(part.exterior.coords)
anchor_indices = sorted(list(set([i for i, p in enumerate(coords) if tuple(p) in anchors])))
if not anchor_indices:
# Handle closed loops with no anchors (Islands)
smoothed_ring = local_smooth(coords, window_size)
else:
# Ensure the loop is fully covered
if anchor_indices[0] != 0: anchor_indices = [0] + anchor_indices
if anchor_indices[-1] != len(coords) - 1: anchor_indices = anchor_indices + [len(coords) - 1]
segments = []
for i in range(len(anchor_indices) - 1):
seg = coords[anchor_indices[i] : anchor_indices[i+1] + 1]
smoothed_seg = local_smooth(seg, window_size)
segments.append(smoothed_seg[:-1])
segments.append(smoothed_seg[-1:])
smoothed_ring = np.vstack(segments)
new_poly = Polygon(smoothed_ring)
smoothed_parts.append(new_poly.buffer(0))
merged = unary_union(smoothed_parts) if len(smoothed_parts) > 1 else smoothed_parts[0]
if isinstance(merged, GeometryCollection) and not isinstance(merged, MultiPolygon):
polys = [g for g in merged.geoms if isinstance(g, Polygon) and g.area > 0]
merged = unary_union(polys) if polys else merged
new_cells[gid] = merged
self.cells = new_cells
# Update raw vertices so Taubin uses the smoothed segment nodes
for gid, geom in self.cells.items():
if isinstance(geom, Polygon):
self.cell_vertices_raw[gid] = list(geom.exterior.coords)
return self.cells
def _taubin_step(self, adj, coords_map, factor):
"""Pass of Taubin filter with RVE boundary freezing."""
height, width = self.lfi.shape[:2]
new_coords = {}
for p, neighbors in adj.items():
x, y = p
# --- FREEZE LOGIC ---
# 1. Freeze RVE boundaries (x=0, x=width, y=0, y=height)
is_on_boundary = (x <= 0 or x >= width or y <= 0 or y >= height)
# 2. Freeze Triple Points (more than 2 neighbors)
is_triple_point = len(neighbors) > 2
if is_on_boundary or is_triple_point:
new_coords[p] = p # Keep original coordinates exactly
else:
# Standard Taubin smoothing for internal interface points
neighbor_list = list(neighbors)
avg_x = sum(coords_map[n][0] for n in neighbor_list) / 2
avg_y = sum(coords_map[n][1] for n in neighbor_list) / 2
dx = avg_x - coords_map[p][0]
dy = avg_y - coords_map[p][1]
new_coords[p] = (coords_map[p][0] + factor * dx,
coords_map[p][1] + factor * dy)
return new_coords
[docs]
def apply_taubin_smoothing(self, iterations=10, lmbda=0.5, mu=-0.53):
"""
Step 8: Smooths interfaces while keeping triple points pinned.
"""
adj = self._get_vertex_adjacency()
# Current state of all vertices in the system
coords = {p: p for p in adj.keys()}
for _ in range(iterations):
# Step 1: Shrink
coords = self._taubin_step(adj, coords, lmbda)
# Step 2: Inflate
coords = self._taubin_step(adj, coords, mu)
# Update the Shapely geometries with smoothed coordinates
self._reconstruct_from_coords(coords)
def _reconstruct_from_coords(self, smoothed_coords_map):
"""Reconstruct Shapely cell polygons from a smoothed coordinate map."""
from shapely.geometry import Polygon
from shapely.ops import unary_union
from collections import defaultdict
import shapely
updated_polygons = []
for i in range(len(self.seeds)):
original_verts = self.cell_vertices_raw[i]
new_verts = [smoothed_coords_map.get(v, v) for v in original_verts]
# Create polygon and ensure it is valid
poly = Polygon(new_verts)
if not poly.is_valid:
poly = poly.buffer(0) # Standard fix for self-intersections
updated_polygons.append(poly)
self.cells = {}
groups = defaultdict(list)
for cell_idx, entity_id in self.cell_to_id.items():
groups[entity_id].append(updated_polygons[cell_idx])
for entity_id, poly_list in groups.items():
try:
# Attempt union
self.cells[entity_id] = unary_union(poly_list)
except Exception:
# If union fails, clean geometries further and try again
clean_polys = [p.buffer(0.0001) for p in poly_list] # Tiny expansion to force overlap
self.cells[entity_id] = unary_union(clean_polys)
def _re_assemble_from_polygons_old(self, updated_polygons):
"""
Groups Voronoi cells by their Grain/Entity ID and merges them.
Includes robustness checks to handle GEOS TopologyExceptions.
Stores individual constituent cells in self.constituent_cells.
"""
from shapely.ops import unary_union
from collections import defaultdict
# import shapely
# 1. Initialize the storage for constituent cells {entity_id: [poly1, poly2, ...]}
self.constituent_cells = defaultdict(list)
# Group individual smoothed Voronoi polygons by their assigned ID
groups = defaultdict(list)
for cell_idx, entity_id in self.cell_to_id.items():
poly = updated_polygons[cell_idx]
groups[entity_id].append(poly)
# Store in the persistent attribute for sub-grain analysis
self.constituent_cells[entity_id].append(poly)
# 2. Clear current cells and perform the manifold union
self.cells = {}
for entity_id, poly_list in groups.items():
try:
# Standard union for well-behaved smoothed cells
union_geom = unary_union(poly_list)
# Ensure the result is valid (fixes self-intersections from MA)
if not union_geom.is_valid:
union_geom = union_geom.buffer(0)
self.cells[entity_id] = union_geom
except Exception as e:
# Fallback: If MA caused vertices to overlap exactly,
# a tiny buffer helps GEOS resolve the 'side location conflict'.
print(f"Heuristic repair triggered for Grain {entity_id} due to: {e}")
cleaned_polys = [p.buffer(1e-7) if not p.is_valid else p for p in poly_list]
self.cells[entity_id] = unary_union(cleaned_polys).buffer(-1e-7)
return self.cells
def _re_assemble_from_polygons_old1(self, updated_polygons):
"""
Groups Voronoi cells by their ID and merges them into a clean manifold.
Prunes GeometryCollections to ensure only Polygons remain.
"""
from shapely.ops import unary_union
from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
from collections import defaultdict
import shapely
# 1. Group and capture constituents
self.constituent_cells = defaultdict(list)
groups = defaultdict(list)
for cell_idx, entity_id in self.cell_to_id.items():
poly = updated_polygons[cell_idx]
groups[entity_id].append(poly)
self.constituent_cells[entity_id].append(poly)
# 2. Perform the manifold union with artifact pruning
self.cells = {}
for entity_id, poly_list in groups.items():
try:
# Stage 1: Initial Union
union_geom = unary_union(poly_list)
# Stage 2: Precision Snapping (Fixes sliver/topology errors)
# This prevents GEOS from seeing 'nearly identical' lines as separate
union_geom = shapely.set_precision(union_geom, grid_size=1e-6)
# Stage 3: GeometryCollection Pruning
if isinstance(union_geom, GeometryCollection):
# Recursively extract only the polygonal parts
polys = [g for g in union_geom.geoms if isinstance(g, (Polygon, MultiPolygon))]
union_geom = unary_union(polys)
# Final validation and healing
if not union_geom.is_valid:
union_geom = union_geom.buffer(0)
# Ensure Grain 1 doesn't store that stray POINT
self.cells[entity_id] = union_geom
except Exception as e:
print(f"Heuristic repair triggered for Grain {entity_id}: {e}")
cleaned_polys = [p.buffer(1e-7) if not p.is_valid else p for p in poly_list]
self.cells[entity_id] = unary_union(cleaned_polys).buffer(-1e-7)
return self.cells
def _re_assemble_from_polygons(self, updated_polygons):
"""
Groups Voronoi cells by Grain ID and merges them.
Explicitly ignores guard seeds (ID == -1) to ensure clean RVE boundaries.
"""
from shapely.ops import unary_union
from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
from collections import defaultdict
import shapely
# 1. Initialize storage
self.constituent_cells = defaultdict(list)
groups = defaultdict(list)
# 2. Group individual polygons by their assigned ID
for cell_idx, entity_id in self.cell_to_id.items():
# --- IGNORE GUARD SEEDS ---
# Guard seeds have ID -1; they are only used to 'shape' the boundary
if entity_id == -1:
continue
poly = updated_polygons[cell_idx]
groups[entity_id].append(poly)
self.constituent_cells[entity_id].append(poly)
# 3. Perform the manifold union with artifact pruning
self.cells = {}
for entity_id, poly_list in groups.items():
try:
# Merge individual cells into a single Grain geometry
union_geom = unary_union(poly_list)
# Snap to precision to fix sliver/topology errors
union_geom = shapely.set_precision(union_geom, grid_size=1e-6)
# Prune GeometryCollections to ensure only Polygons remain
if isinstance(union_geom, GeometryCollection):
polys = [g for g in union_geom.geoms if isinstance(g, (Polygon, MultiPolygon))]
union_geom = unary_union(polys)
# Final 'healing' pass to fix self-intersections from smoothing
if not union_geom.is_valid:
union_geom = union_geom.buffer(0)
self.cells[entity_id] = union_geom
except Exception as e:
print(f"Heuristic repair triggered for Grain {entity_id}: {e}")
cleaned_polys = [p.buffer(1e-7) if not p.is_valid else p for p in poly_list]
self.cells[entity_id] = unary_union(cleaned_polys).buffer(-1e-7)
return self.cells
def _capture_grain_constituents(self):
"""
Groups individual Voronoi cells by their assigned Grain ID
and stores them in an attribute for later analysis or retrieval.
"""
from collections import defaultdict
# Initialize the container: {grain_id: [list_of_shapely_polygons]}
self.grain_to_cells = defaultdict(list)
# Iterate through all generated Voronoi cells
# Note: self.cell_to_id maps the index of the seed to the Grain ID from the LFI
for i in range(len(self.seeds)):
grain_id = self.cell_to_id[i]
# Retrieve the individual Voronoi polygon for this seed
# Assuming they are stored in a temporary list or self.cell_vertices_raw
from shapely.geometry import Polygon
cell_polygon = Polygon(self.cell_vertices_raw[i])
# Store the individual cell against the final grain ID key
self.grain_to_cells[grain_id].append(cell_polygon)
return self.grain_to_cells
[docs]
def trim_to_rve(self, bounds=(0, 0, 200, 200)):
"""
Trims all grain geometries to the exact RVE bounding box.
This ensures perfectly straight, rectangular external faces.
"""
from shapely.geometry import box, Polygon, MultiPolygon, GeometryCollection
import shapely
# 1. Define the 'Cookie Cutter' (The RVE Box)
minx, miny, maxx, maxy = bounds
rve_box = box(minx, miny, maxx, maxy)
trimmed_cells = {}
for gid, geom in self.cells.items():
# 2. Perform Intersection with the RVE Box
# This cuts off any 'leakage' outside the 200x200 domain
cut_geom = geom.intersection(rve_box)
# 3. Artifact Pruning (Similar to our re_assemble logic)
if isinstance(cut_geom, GeometryCollection):
polys = [g for g in cut_geom.geoms if isinstance(g, (Polygon, MultiPolygon))]
cut_geom = shapely.ops.unary_union(polys)
# 4. Snap to precision to ensure edges are exactly on the boundary
# This prevents floating point errors like 199.99999999
cut_geom = shapely.set_precision(cut_geom, grid_size=1e-6)
# Final healing pass
if not cut_geom.is_valid:
cut_geom = cut_geom.buffer(0)
trimmed_cells[gid] = cut_geom
# Update the internal manifold state
self.cells = trimmed_cells
return self.cells
# =====================================================================================
# =====================================================================================
# =====================================================================================
[docs]
class GrainManifold3D(VoronoiMasking):
[docs]
@classmethod
def by_tessellation(cls, lfi, seeds, channel=0):
"""By tessellation."""
instance = cls(lfi)
instance.seeds = seeds
instance.map_seeds_to_lfi(channel=channel)
# 3D specific tessellation (returns list of PyVista UnstructuredGrids)
solids = instance._generate_clipped_solids(seeds)
instance.assemble_cells(solids)
return instance
[docs]
def assemble_cells(self, solids):
"""Assemble cells."""
# Use PyVista/VTK boolean_union or append filters here
pass