diff --git a/src/sgis/__init__.py b/src/sgis/__init__.py index f74502c5..96037f1a 100644 --- a/src/sgis/__init__.py +++ b/src/sgis/__init__.py @@ -55,15 +55,20 @@ from .geopandas_tools.overlay import clean_overlay from .geopandas_tools.point_operations import snap_all, snap_within_distance from .geopandas_tools.polygon_operations import ( + PolygonsAsRings, close_all_holes, close_small_holes, eliminate_by_largest, eliminate_by_longest, eliminate_by_smallest, + get_gaps, get_holes, get_polygon_clusters, ) +from .geopandas_tools.polygons_as_rings import PolygonsAsRings +from .geopandas_tools.polygons_to_lines import get_cheap_centerlines from .geopandas_tools.sfilter import sfilter, sfilter_inverse, sfilter_split +from .geopandas_tools.snap_polygons import coverage_clean, snap_polygons from .helpers import get_object_name, sort_nans_last from .io.opener import opener from .io.read_parquet import read_parquet_url diff --git a/src/sgis/geopandas_tools/polygon_operations.py b/src/sgis/geopandas_tools/polygon_operations.py index 99055c68..a3953b12 100644 --- a/src/sgis/geopandas_tools/polygon_operations.py +++ b/src/sgis/geopandas_tools/polygon_operations.py @@ -42,6 +42,7 @@ from .geometry_types import get_geom_type, make_all_singlepart, to_single_geom_type from .neighbors import get_neighbor_indices from .overlay import clean_overlay +from .polygons_as_rings import PolygonsAsRings from .sfilter import sfilter, sfilter_inverse @@ -505,238 +506,6 @@ def _eliminate(gdf, to_eliminate, aggfunc, crs, **kwargs): return pd.concat(to_concat).sort_index() -class PolygonsAsRings: - def __init__(self, polys: GeoDataFrame | GeoSeries | GeometryArray, crs=None): - if not isinstance(polys, (pd.DataFrame, pd.Series, GeometryArray)): - raise TypeError(type(polys)) - - self.polyclass = polys.__class__ - - if not isinstance(polys, pd.DataFrame): - polys = to_gdf(polys, crs) - - self.gdf = polys.reset_index(drop=True) - - if crs is not None: - self.crs = crs - elif hasattr(polys, "crs"): - self.crs = polys.crs - else: - self.crs = None - - if not len(self.gdf): - self.rings = pd.Series() - return - - # TODO: change to get_rings with return_index=True?? - - exterior = pd.Series( - get_exterior_ring(self.gdf.geometry.values), - index=self.exterior_index, - ) - - self.max_rings: int = np.max(get_num_interior_rings(self.gdf.geometry.values)) - - if not self.max_rings: - self.rings = exterior - return - - # series same length as number of potential inner rings - interiors = pd.Series( - ( - [ - [get_interior_ring(geom, i) for i in range(self.max_rings)] - for geom in self.gdf.geometry - ] - ), - ).explode() - - interiors.index = self.interiors_index - - interiors = interiors.dropna() - - self.rings = pd.concat([exterior, interiors]) - - def get_rings(self, agg: bool = False): - gdf = self.gdf.copy() - rings = self.rings.copy() - if agg: - gdf.geometry = rings.groupby(level=1).agg(unary_union) - else: - rings.index = rings.index.get_level_values(1) - rings.name = "geometry" - gdf = gdf.drop(columns="geometry").join(rings) - - if issubclass(self.polyclass, pd.DataFrame): - return GeoDataFrame(gdf, crs=self.crs) - if issubclass(self.polyclass, pd.Series): - return GeoSeries(gdf.geometry) - return self.polyclass(gdf.geometry.values) - - def apply_numpy_func_to_interiors( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - arr: NDArray[LinearRing] = self.rings.loc[self.is_interior].values - index: pd.Index = self.rings.loc[self.is_interior].index - results = pd.Series( - np.array(func(arr, *args, **kwargs)), - index=index, - ) - self.rings.loc[self.is_interior] = results - return self - - def apply_geoseries_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - ser: pd.Series = self.rings.loc[self.is_interior] - index: pd.Index = self.rings.loc[self.is_interior].index - - results = pd.Series( - func( - GeoSeries(ser, crs=self.crs, index=self.rings.index), - *args, - **kwargs, - ), - index=index, - ) - self.rings.loc[self.is_interior] = results - - return self - - def apply_numpy_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - self.rings.loc[:] = np.array(func(self.rings.values, *args, **kwargs)) - return self - - def apply_geoseries_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - self.rings.loc[:] = np.array( - func( - GeoSeries(self.rings, crs=self.crs, index=self.rings.index), - *args, - **kwargs, - ) - ) - - return self - - def apply_gdf_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - gdf = GeoDataFrame( - {"geometry": self.rings.values}, - crs=self.crs, - index=self.rings.index.get_level_values(1), - ).join(self.gdf.drop(columns="geometry")) - - assert len(gdf) == len(self.rings) - - gdf.index = self.rings.index - - self.rings.loc[:] = func( - gdf, - *args, - **kwargs, - ).geometry.values - - return self - - @property - def is_interior(self): - return self.rings.index.get_level_values(0) == 1 - - @property - def is_exterior(self): - return self.rings.index.get_level_values(0) == 0 - - @property - def interiors_index(self): - """A three-leveled MultiIndex. - - Used to separate interior and exterior and sort the interior in - the 'to_numpy' method. - - level 0: all 1s, indicating "is interior". - level 1: gdf index repeated *self.max_rings* times. - level 2: interior number index. 0 * len(gdf), 1 * len(gdf), 2 * len(gdf)... - """ - if not self.max_rings: - return pd.MultiIndex() - len_gdf = len(self.gdf) - n_potential_interiors = len_gdf * self.max_rings - gdf_index = sorted(list(self.gdf.index) * self.max_rings) - interior_number_index = np.tile(np.arange(self.max_rings), len_gdf) - one_for_interior = np.repeat(1, n_potential_interiors) - - return pd.MultiIndex.from_arrays( - [one_for_interior, gdf_index, interior_number_index] - ) - - @property - def exterior_index(self): - """A three-leveled MultiIndex. - - Used to separate interior and exterior in the 'to_numpy' method. - Only leve 1 is used for the exterior. - - level 0: all 0s, indicating "not interior". - level 1: gdf index. - level 2: All 0s. - """ - zero_for_not_interior = np.repeat(0, len(self.gdf)) - return pd.MultiIndex.from_arrays( - [zero_for_not_interior, self.gdf.index, zero_for_not_interior] - ) - - def to_gdf(self) -> GeoDataFrame: - """Return the GeoDataFrame with polygons.""" - self.gdf.geometry = self.to_numpy() - return self.gdf - - def to_numpy(self) -> NDArray[Polygon]: - """Return a numpy array of polygons.""" - exterior = self.rings.loc[self.is_exterior].sort_index().values - assert exterior.shape == (len(self.gdf),) - - nonempty_interiors = self.rings.loc[self.is_interior] - - if not len(nonempty_interiors): - return make_valid(polygons(exterior)) - - empty_interiors = pd.Series( - [None for _ in range(len(self.gdf) * self.max_rings)], - index=self.interiors_index, - ).loc[lambda x: ~x.index.isin(nonempty_interiors.index)] - - interiors = ( - pd.concat([nonempty_interiors, empty_interiors]) - .sort_index() - # make each ring level a column with same length and order as gdf - .unstack(level=2) - .sort_index() - .values - ) - assert interiors.shape == (len(self.gdf), self.max_rings), interiors.shape - - return make_valid(polygons(exterior, interiors)) - - def close_thin_holes(gdf: GeoDataFrame, tolerance: int | float) -> GeoDataFrame: holes = get_holes(gdf) inside_holes = sfilter(gdf, holes, predicate="within").unary_union diff --git a/src/sgis/geopandas_tools/polygons_as_rings.py b/src/sgis/geopandas_tools/polygons_as_rings.py new file mode 100644 index 00000000..39783169 --- /dev/null +++ b/src/sgis/geopandas_tools/polygons_as_rings.py @@ -0,0 +1,291 @@ +import functools +import itertools +from typing import Callable, Iterable + +import geopandas as gpd +import igraph +import networkx as nx +import numpy as np +import pandas as pd +from geopandas import GeoDataFrame, GeoSeries +from geopandas.array import GeometryArray +from IPython.display import display +from networkx.algorithms import approximation as approx +from numpy import ndarray +from numpy.typing import NDArray +from pandas import Index +from shapely import ( + Geometry, + STRtree, + area, + box, + buffer, + centroid, + difference, + distance, + extract_unique_points, + get_coordinates, + get_exterior_ring, + get_interior_ring, + get_num_interior_rings, + get_parts, + get_rings, + intersection, + intersects, + is_empty, + is_ring, + length, + line_merge, + linearrings, + linestrings, + make_valid, + points, + polygons, + segmentize, + simplify, + unary_union, + voronoi_polygons, +) +from shapely.errors import GEOSException +from shapely.geometry import ( + LinearRing, + LineString, + MultiLineString, + MultiPoint, + Point, + Polygon, +) + +from .conversion import to_gdf + + +class PolygonsAsRings: + def __init__(self, polys: GeoDataFrame | GeoSeries | GeometryArray, crs=None): + if not isinstance(polys, (pd.DataFrame, pd.Series, GeometryArray)): + raise TypeError(type(polys)) + + self.polyclass = polys.__class__ + + if not isinstance(polys, pd.DataFrame): + polys = to_gdf(polys, crs) + + self.gdf = polys.reset_index(drop=True) + + if crs is not None: + self.crs = crs + elif hasattr(polys, "crs"): + self.crs = polys.crs + else: + self.crs = None + + if not len(self.gdf): + self.rings = pd.Series() + return + + # TODO: change to get_rings with return_index=True?? + + exterior = pd.Series( + get_exterior_ring(self.gdf.geometry.values), + index=self.exterior_index, + ) + + self.max_rings: int = np.max(get_num_interior_rings(self.gdf.geometry.values)) + + if not self.max_rings: + self.rings = exterior + return + + # series same length as number of potential inner rings + interiors = pd.Series( + ( + [ + [get_interior_ring(geom, i) for i in range(self.max_rings)] + for geom in self.gdf.geometry + ] + ), + ).explode() + + interiors.index = self.interiors_index + + interiors = interiors.dropna() + + self.rings = pd.concat([exterior, interiors]) + + def get_rings(self, agg: bool = False): + gdf = self.gdf.copy() + rings = self.rings.copy() + if agg: + gdf.geometry = rings.groupby(level=1).agg(unary_union) + else: + rings.index = rings.index.get_level_values(1) + rings.name = "geometry" + gdf = gdf.drop(columns="geometry").join(rings) + + if issubclass(self.polyclass, pd.DataFrame): + return GeoDataFrame(gdf, crs=self.crs) + if issubclass(self.polyclass, pd.Series): + return GeoSeries(gdf.geometry) + return self.polyclass(gdf.geometry.values) + + def apply_numpy_func_to_interiors( + self, func: Callable, args: tuple | None = None, kwargs: dict | None = None + ): + kwargs = kwargs or {} + args = args or () + arr: NDArray[LinearRing] = self.rings.loc[self.is_interior].values + index: pd.Index = self.rings.loc[self.is_interior].index + results = pd.Series( + np.array(func(arr, *args, **kwargs)), + index=index, + ) + self.rings.loc[self.is_interior] = results + return self + + def apply_geoseries_func( + self, func: Callable, args: tuple | None = None, kwargs: dict | None = None + ): + kwargs = kwargs or {} + args = args or () + + ser: pd.Series = self.rings.loc[self.is_interior] + index: pd.Index = self.rings.loc[self.is_interior].index + + results = pd.Series( + func( + GeoSeries(ser, crs=self.crs, index=self.rings.index), + *args, + **kwargs, + ), + index=index, + ) + self.rings.loc[self.is_interior] = results + + return self + + def apply_numpy_func( + self, func: Callable, args: tuple | None = None, kwargs: dict | None = None + ): + kwargs = kwargs or {} + args = args or () + + self.rings.loc[:] = np.array(func(self.rings.values, *args, **kwargs)) + return self + + def apply_geoseries_func( + self, func: Callable, args: tuple | None = None, kwargs: dict | None = None + ): + kwargs = kwargs or {} + args = args or () + + self.rings.loc[:] = np.array( + func( + GeoSeries(self.rings, crs=self.crs, index=self.rings.index), + *args, + **kwargs, + ) + ) + + return self + + def apply_gdf_func( + self, func: Callable, args: tuple | None = None, kwargs: dict | None = None + ): + kwargs = kwargs or {} + args = args or () + + gdf = GeoDataFrame( + {"geometry": self.rings.values}, + crs=self.crs, + index=self.rings.index.get_level_values(1), + ).join(self.gdf.drop(columns="geometry")) + + assert len(gdf) == len(self.rings) + + gdf.index = self.rings.index + + self.rings.loc[:] = func( + gdf, + *args, + **kwargs, + ).geometry.values + + return self + + @property + def is_interior(self): + return self.rings.index.get_level_values(0) == 1 + + @property + def is_exterior(self): + return self.rings.index.get_level_values(0) == 0 + + @property + def interiors_index(self): + """A three-leveled MultiIndex. + + Used to separate interior and exterior and sort the interior in + the 'to_numpy' method. + + level 0: all 1s, indicating "is interior". + level 1: gdf index repeated *self.max_rings* times. + level 2: interior number index. 0 * len(gdf), 1 * len(gdf), 2 * len(gdf)... + """ + if not self.max_rings: + return pd.MultiIndex() + len_gdf = len(self.gdf) + n_potential_interiors = len_gdf * self.max_rings + gdf_index = sorted(list(self.gdf.index) * self.max_rings) + interior_number_index = np.tile(np.arange(self.max_rings), len_gdf) + one_for_interior = np.repeat(1, n_potential_interiors) + + return pd.MultiIndex.from_arrays( + [one_for_interior, gdf_index, interior_number_index] + ) + + @property + def exterior_index(self): + """A three-leveled MultiIndex. + + Used to separate interior and exterior in the 'to_numpy' method. + Only leve 1 is used for the exterior. + + level 0: all 0s, indicating "not interior". + level 1: gdf index. + level 2: All 0s. + """ + zero_for_not_interior = np.repeat(0, len(self.gdf)) + return pd.MultiIndex.from_arrays( + [zero_for_not_interior, self.gdf.index, zero_for_not_interior] + ) + + def to_gdf(self) -> GeoDataFrame: + """Return the GeoDataFrame with polygons.""" + self.gdf.geometry = self.to_numpy() + return self.gdf + + def to_numpy(self) -> NDArray[Polygon]: + """Return a numpy array of polygons.""" + exterior = self.rings.loc[self.is_exterior].sort_index().values + assert exterior.shape == (len(self.gdf),) + + nonempty_interiors = self.rings.loc[self.is_interior] + + if not len(nonempty_interiors): + return make_valid(polygons(exterior)) + + empty_interiors = pd.Series( + [None for _ in range(len(self.gdf) * self.max_rings)], + index=self.interiors_index, + ).loc[lambda x: ~x.index.isin(nonempty_interiors.index)] + + interiors = ( + pd.concat([nonempty_interiors, empty_interiors]) + .sort_index() + # make each ring level a column with same length and order as gdf + .unstack(level=2) + .sort_index() + .values + ) + assert interiors.shape == (len(self.gdf), self.max_rings), interiors.shape + + return make_valid(polygons(exterior, interiors)) diff --git a/src/sgis/geopandas_tools/polygons_to_lines.py b/src/sgis/geopandas_tools/polygons_to_lines.py index aa966524..b5808de3 100644 --- a/src/sgis/geopandas_tools/polygons_to_lines.py +++ b/src/sgis/geopandas_tools/polygons_to_lines.py @@ -75,6 +75,7 @@ ) from .overlay import clean_overlay from .polygon_operations import close_small_holes, close_thin_holes, get_gaps, get_holes +from .polygons_as_rings import PolygonsAsRings from .sfilter import sfilter, sfilter_inverse, sfilter_split @@ -156,7 +157,9 @@ def connect_multilines(multiline): lines = get_parts(multiline) lines_between = [] for line in lines: - lines_between += [LineString(nearest_points(line, lines.difference(line)))] + lines_between += [ + LineString(nearest_points(line, multiline.difference(line))) + ] return MultiLineString(list(lines) + lines_between) is_multiline = GeoSeries(centerlines).geom_type == "MultiLineString" @@ -185,238 +188,6 @@ def connect_multilines(multiline): return centerlines -class PolygonsAsRings: - def __init__(self, polys: GeoDataFrame | GeoSeries | GeometryArray, crs=None): - if not isinstance(polys, (pd.DataFrame, pd.Series, GeometryArray)): - raise TypeError(type(polys)) - - self.polyclass = polys.__class__ - - if not isinstance(polys, pd.DataFrame): - polys = to_gdf(polys, crs) - - self.gdf = polys.reset_index(drop=True) - - if crs is not None: - self.crs = crs - elif hasattr(polys, "crs"): - self.crs = polys.crs - else: - self.crs = None - - if not len(self.gdf): - self.rings = pd.Series() - return - - # TODO: change to get_rings with return_index=True?? - - exterior = pd.Series( - get_exterior_ring(self.gdf.geometry.values), - index=self.exterior_index, - ) - - self.max_rings: int = np.max(get_num_interior_rings(self.gdf.geometry.values)) - - if not self.max_rings: - self.rings = exterior - return - - # series same length as number of potential inner rings - interiors = pd.Series( - ( - [ - [get_interior_ring(geom, i) for i in range(self.max_rings)] - for geom in self.gdf.geometry - ] - ), - ).explode() - - interiors.index = self.interiors_index - - interiors = interiors.dropna() - - self.rings = pd.concat([exterior, interiors]) - - def get_rings(self, agg: bool = False): - gdf = self.gdf.copy() - rings = self.rings.copy() - if agg: - gdf.geometry = rings.groupby(level=1).agg(unary_union) - else: - rings.index = rings.index.get_level_values(1) - rings.name = "geometry" - gdf = gdf.drop(columns="geometry").join(rings) - - if issubclass(self.polyclass, pd.DataFrame): - return GeoDataFrame(gdf, crs=self.crs) - if issubclass(self.polyclass, pd.Series): - return GeoSeries(gdf.geometry) - return self.polyclass(gdf.geometry.values) - - def apply_numpy_func_to_interiors( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - arr: NDArray[LinearRing] = self.rings.loc[self.is_interior].values - index: pd.Index = self.rings.loc[self.is_interior].index - results = pd.Series( - np.array(func(arr, *args, **kwargs)), - index=index, - ) - self.rings.loc[self.is_interior] = results - return self - - def apply_geoseries_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - ser: pd.Series = self.rings.loc[self.is_interior] - index: pd.Index = self.rings.loc[self.is_interior].index - - results = pd.Series( - func( - GeoSeries(ser, crs=self.crs, index=self.rings.index), - *args, - **kwargs, - ), - index=index, - ) - self.rings.loc[self.is_interior] = results - - return self - - def apply_numpy_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - self.rings.loc[:] = np.array(func(self.rings.values, *args, **kwargs)) - return self - - def apply_geoseries_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - self.rings.loc[:] = np.array( - func( - GeoSeries(self.rings, crs=self.crs, index=self.rings.index), - *args, - **kwargs, - ) - ) - - return self - - def apply_gdf_func( - self, func: Callable, args: tuple | None = None, kwargs: dict | None = None - ): - kwargs = kwargs or {} - args = args or () - - gdf = GeoDataFrame( - {"geometry": self.rings.values}, - crs=self.crs, - index=self.rings.index.get_level_values(1), - ).join(self.gdf.drop(columns="geometry")) - - assert len(gdf) == len(self.rings) - - gdf.index = self.rings.index - - self.rings.loc[:] = func( - gdf, - *args, - **kwargs, - ).geometry.values - - return self - - @property - def is_interior(self): - return self.rings.index.get_level_values(0) == 1 - - @property - def is_exterior(self): - return self.rings.index.get_level_values(0) == 0 - - @property - def interiors_index(self): - """A three-leveled MultiIndex. - - Used to separate interior and exterior and sort the interior in - the 'to_numpy' method. - - level 0: all 1s, indicating "is interior". - level 1: gdf index repeated *self.max_rings* times. - level 2: interior number index. 0 * len(gdf), 1 * len(gdf), 2 * len(gdf)... - """ - if not self.max_rings: - return pd.MultiIndex() - len_gdf = len(self.gdf) - n_potential_interiors = len_gdf * self.max_rings - gdf_index = sorted(list(self.gdf.index) * self.max_rings) - interior_number_index = np.tile(np.arange(self.max_rings), len_gdf) - one_for_interior = np.repeat(1, n_potential_interiors) - - return pd.MultiIndex.from_arrays( - [one_for_interior, gdf_index, interior_number_index] - ) - - @property - def exterior_index(self): - """A three-leveled MultiIndex. - - Used to separate interior and exterior in the 'to_numpy' method. - Only leve 1 is used for the exterior. - - level 0: all 0s, indicating "not interior". - level 1: gdf index. - level 2: All 0s. - """ - zero_for_not_interior = np.repeat(0, len(self.gdf)) - return pd.MultiIndex.from_arrays( - [zero_for_not_interior, self.gdf.index, zero_for_not_interior] - ) - - def to_gdf(self) -> GeoDataFrame: - """Return the GeoDataFrame with polygons.""" - self.gdf.geometry = self.to_numpy() - return self.gdf - - def to_numpy(self) -> NDArray[Polygon]: - """Return a numpy array of polygons.""" - exterior = self.rings.loc[self.is_exterior].sort_index().values - assert exterior.shape == (len(self.gdf),) - - nonempty_interiors = self.rings.loc[self.is_interior] - - if not len(nonempty_interiors): - return make_valid(polygons(exterior)) - - empty_interiors = pd.Series( - [None for _ in range(len(self.gdf) * self.max_rings)], - index=self.interiors_index, - ).loc[lambda x: ~x.index.isin(nonempty_interiors.index)] - - interiors = ( - pd.concat([nonempty_interiors, empty_interiors]) - .sort_index() - # make each ring level a column with same length and order as gdf - .unstack(level=2) - .sort_index() - .values - ) - assert interiors.shape == (len(self.gdf), self.max_rings), interiors.shape - - return make_valid(polygons(exterior, interiors)) - - def get_approximate_polygon_endpoints(geoms: GeoSeries) -> GeoSeries: # TODO # la det være mulig med flere? hvis polygonet er et kors, trekant osv. diff --git a/src/sgis/geopandas_tools/snap_polygons.py b/src/sgis/geopandas_tools/snap_polygons.py index b69fbe20..45db6fcd 100644 --- a/src/sgis/geopandas_tools/snap_polygons.py +++ b/src/sgis/geopandas_tools/snap_polygons.py @@ -66,7 +66,8 @@ from .neighbors import get_all_distances, k_nearest_neighbors from .overlay import clean_overlay from .polygon_operations import close_small_holes, close_thin_holes, get_gaps, get_holes -from .polygons_to_lines import PolygonsAsRings, get_cheap_centerlines +from .polygons_as_rings import PolygonsAsRings +from .polygons_to_lines import get_cheap_centerlines from .sfilter import sfilter, sfilter_inverse, sfilter_split