Skip to content

Commit

Permalink
module shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
mortewle committed Oct 15, 2023
1 parent 55091aa commit c3de41c
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 466 deletions.
5 changes: 5 additions & 0 deletions src/sgis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,20 @@
from .geopandas_tools.overlay import clean_overlay
from .geopandas_tools.point_operations import snap_all, snap_within_distance
from .geopandas_tools.polygon_operations import (
PolygonsAsRings,
close_all_holes,
close_small_holes,
eliminate_by_largest,
eliminate_by_longest,
eliminate_by_smallest,
get_gaps,
get_holes,
get_polygon_clusters,
)
from .geopandas_tools.polygons_as_rings import PolygonsAsRings
from .geopandas_tools.polygons_to_lines import get_cheap_centerlines
from .geopandas_tools.sfilter import sfilter, sfilter_inverse, sfilter_split
from .geopandas_tools.snap_polygons import coverage_clean, snap_polygons
from .helpers import get_object_name, sort_nans_last
from .io.opener import opener
from .io.read_parquet import read_parquet_url
Expand Down
233 changes: 1 addition & 232 deletions src/sgis/geopandas_tools/polygon_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from .geometry_types import get_geom_type, make_all_singlepart, to_single_geom_type
from .neighbors import get_neighbor_indices
from .overlay import clean_overlay
from .polygons_as_rings import PolygonsAsRings
from .sfilter import sfilter, sfilter_inverse


Expand Down Expand Up @@ -505,238 +506,6 @@ def _eliminate(gdf, to_eliminate, aggfunc, crs, **kwargs):
return pd.concat(to_concat).sort_index()


class PolygonsAsRings:
def __init__(self, polys: GeoDataFrame | GeoSeries | GeometryArray, crs=None):
if not isinstance(polys, (pd.DataFrame, pd.Series, GeometryArray)):
raise TypeError(type(polys))

self.polyclass = polys.__class__

if not isinstance(polys, pd.DataFrame):
polys = to_gdf(polys, crs)

self.gdf = polys.reset_index(drop=True)

if crs is not None:
self.crs = crs
elif hasattr(polys, "crs"):
self.crs = polys.crs
else:
self.crs = None

if not len(self.gdf):
self.rings = pd.Series()
return

# TODO: change to get_rings with return_index=True??

exterior = pd.Series(
get_exterior_ring(self.gdf.geometry.values),
index=self.exterior_index,
)

self.max_rings: int = np.max(get_num_interior_rings(self.gdf.geometry.values))

if not self.max_rings:
self.rings = exterior
return

# series same length as number of potential inner rings
interiors = pd.Series(
(
[
[get_interior_ring(geom, i) for i in range(self.max_rings)]
for geom in self.gdf.geometry
]
),
).explode()

interiors.index = self.interiors_index

interiors = interiors.dropna()

self.rings = pd.concat([exterior, interiors])

def get_rings(self, agg: bool = False):
gdf = self.gdf.copy()
rings = self.rings.copy()
if agg:
gdf.geometry = rings.groupby(level=1).agg(unary_union)
else:
rings.index = rings.index.get_level_values(1)
rings.name = "geometry"
gdf = gdf.drop(columns="geometry").join(rings)

if issubclass(self.polyclass, pd.DataFrame):
return GeoDataFrame(gdf, crs=self.crs)
if issubclass(self.polyclass, pd.Series):
return GeoSeries(gdf.geometry)
return self.polyclass(gdf.geometry.values)

def apply_numpy_func_to_interiors(
self, func: Callable, args: tuple | None = None, kwargs: dict | None = None
):
kwargs = kwargs or {}
args = args or ()
arr: NDArray[LinearRing] = self.rings.loc[self.is_interior].values
index: pd.Index = self.rings.loc[self.is_interior].index
results = pd.Series(
np.array(func(arr, *args, **kwargs)),
index=index,
)
self.rings.loc[self.is_interior] = results
return self

def apply_geoseries_func(
self, func: Callable, args: tuple | None = None, kwargs: dict | None = None
):
kwargs = kwargs or {}
args = args or ()

ser: pd.Series = self.rings.loc[self.is_interior]
index: pd.Index = self.rings.loc[self.is_interior].index

results = pd.Series(
func(
GeoSeries(ser, crs=self.crs, index=self.rings.index),
*args,
**kwargs,
),
index=index,
)
self.rings.loc[self.is_interior] = results

return self

def apply_numpy_func(
self, func: Callable, args: tuple | None = None, kwargs: dict | None = None
):
kwargs = kwargs or {}
args = args or ()

self.rings.loc[:] = np.array(func(self.rings.values, *args, **kwargs))
return self

def apply_geoseries_func(
self, func: Callable, args: tuple | None = None, kwargs: dict | None = None
):
kwargs = kwargs or {}
args = args or ()

self.rings.loc[:] = np.array(
func(
GeoSeries(self.rings, crs=self.crs, index=self.rings.index),
*args,
**kwargs,
)
)

return self

def apply_gdf_func(
self, func: Callable, args: tuple | None = None, kwargs: dict | None = None
):
kwargs = kwargs or {}
args = args or ()

gdf = GeoDataFrame(
{"geometry": self.rings.values},
crs=self.crs,
index=self.rings.index.get_level_values(1),
).join(self.gdf.drop(columns="geometry"))

assert len(gdf) == len(self.rings)

gdf.index = self.rings.index

self.rings.loc[:] = func(
gdf,
*args,
**kwargs,
).geometry.values

return self

@property
def is_interior(self):
return self.rings.index.get_level_values(0) == 1

@property
def is_exterior(self):
return self.rings.index.get_level_values(0) == 0

@property
def interiors_index(self):
"""A three-leveled MultiIndex.
Used to separate interior and exterior and sort the interior in
the 'to_numpy' method.
level 0: all 1s, indicating "is interior".
level 1: gdf index repeated *self.max_rings* times.
level 2: interior number index. 0 * len(gdf), 1 * len(gdf), 2 * len(gdf)...
"""
if not self.max_rings:
return pd.MultiIndex()
len_gdf = len(self.gdf)
n_potential_interiors = len_gdf * self.max_rings
gdf_index = sorted(list(self.gdf.index) * self.max_rings)
interior_number_index = np.tile(np.arange(self.max_rings), len_gdf)
one_for_interior = np.repeat(1, n_potential_interiors)

return pd.MultiIndex.from_arrays(
[one_for_interior, gdf_index, interior_number_index]
)

@property
def exterior_index(self):
"""A three-leveled MultiIndex.
Used to separate interior and exterior in the 'to_numpy' method.
Only leve 1 is used for the exterior.
level 0: all 0s, indicating "not interior".
level 1: gdf index.
level 2: All 0s.
"""
zero_for_not_interior = np.repeat(0, len(self.gdf))
return pd.MultiIndex.from_arrays(
[zero_for_not_interior, self.gdf.index, zero_for_not_interior]
)

def to_gdf(self) -> GeoDataFrame:
"""Return the GeoDataFrame with polygons."""
self.gdf.geometry = self.to_numpy()
return self.gdf

def to_numpy(self) -> NDArray[Polygon]:
"""Return a numpy array of polygons."""
exterior = self.rings.loc[self.is_exterior].sort_index().values
assert exterior.shape == (len(self.gdf),)

nonempty_interiors = self.rings.loc[self.is_interior]

if not len(nonempty_interiors):
return make_valid(polygons(exterior))

empty_interiors = pd.Series(
[None for _ in range(len(self.gdf) * self.max_rings)],
index=self.interiors_index,
).loc[lambda x: ~x.index.isin(nonempty_interiors.index)]

interiors = (
pd.concat([nonempty_interiors, empty_interiors])
.sort_index()
# make each ring level a column with same length and order as gdf
.unstack(level=2)
.sort_index()
.values
)
assert interiors.shape == (len(self.gdf), self.max_rings), interiors.shape

return make_valid(polygons(exterior, interiors))


def close_thin_holes(gdf: GeoDataFrame, tolerance: int | float) -> GeoDataFrame:
holes = get_holes(gdf)
inside_holes = sfilter(gdf, holes, predicate="within").unary_union
Expand Down
Loading

0 comments on commit c3de41c

Please sign in to comment.