
Commit f83d1cf

Merge branch 'issue197-vector-cube-udf'
2 parents 29af246 + 5864c8b

File tree: 7 files changed, +509 −73 lines


openeo_driver/_version.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.59.0a1"
+__version__ = "0.60.0a1"

openeo_driver/datacube.py

Lines changed: 150 additions & 24 deletions
@@ -7,7 +7,7 @@
 import io
 
 import geopandas as gpd
-import numpy as np
+import numpy
 import pyproj
 import shapely.geometry
 import shapely.geometry.base
@@ -17,11 +17,13 @@
 import requests
 
 from openeo.metadata import CollectionMetadata
-from openeo.util import ensure_dir
+from openeo.util import ensure_dir, str_truncate
+import openeo.udf
 from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
 from openeo_driver.errors import FeatureUnsupportedException, InternalException
 from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
 from openeo_driver.util.ioformats import IOFORMATS
+from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph
 from openeo_driver.util.utm import area_in_square_meters
 from openeo_driver.utils import EvalEnv

@@ -214,13 +216,15 @@ class DriverVectorCube:
     These components are "joined" on the GeoPandas dataframe's index and DataArray first dimension
     """
     DIM_GEOMETRIES = "geometries"
-    FLATTEN_PREFIX = "vc"
+    DIM_BANDS = "bands"
+    DIM_PROPERTIES = "properties"
+    COLUMN_SELECTION_ALL = "all"
+    COLUMN_SELECTION_NUMERICAL = "numerical"
 
     def __init__(
         self,
         geometries: gpd.GeoDataFrame,
         cube: Optional[xarray.DataArray] = None,
-        flatten_prefix: str = FLATTEN_PREFIX,
     ):
         """
@@ -234,18 +238,77 @@ def __init__(
             log.error(f"First cube dim should be {self.DIM_GEOMETRIES!r} but got dims {cube.dims!r}")
             raise VectorCubeError("Cube's first dimension is invalid.")
         if not geometries.index.equals(cube.indexes[cube.dims[0]]):
-            log.error(f"Invalid VectorCube components {geometries.index!r} != {cube.indexes[cube.dims[0]]!r}")
+            log.error(f"Invalid VectorCube components {geometries.index=} != {cube.indexes[cube.dims[0]]=}")
             raise VectorCubeError("Incompatible vector cube components")
         self._geometries: gpd.GeoDataFrame = geometries
         self._cube = cube
-        self._flatten_prefix = flatten_prefix
 
-    def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX) -> "DriverVectorCube":
+    def with_cube(self, cube: xarray.DataArray) -> "DriverVectorCube":
         """Create new vector cube with same geometries but new cube"""
         log.info(f"Creating vector cube with new cube {cube.name!r}")
-        return type(self)(
-            geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
-        )
+        return type(self)(geometries=self._geometries, cube=cube)
+
+    @classmethod
+    def from_geodataframe(
+        cls,
+        data: gpd.GeoDataFrame,
+        *,
+        columns_for_cube: Union[List[str], str] = COLUMN_SELECTION_NUMERICAL,
+        dimension_name: str = DIM_PROPERTIES,
+    ) -> "DriverVectorCube":
+        """
+        Build a DriverVectorCube from given GeoPandas data frame,
+        using the data frame geometries as vector cube geometries
+        and other columns (as specified) as cube values along a "bands" dimension
+
+        :param data: geopandas data frame
+        :param columns_for_cube: which data frame columns to use as cube values.
+            One of:
+            - "numerical": automatically pick numerical columns
+            - "all": use all columns as cube values
+            - list of column names
+        :param dimension_name: name of the "bands" dimension
+        :return: vector cube
+        """
+        available_columns = [c for c in data.columns if c != "geometry"]
+
+        if columns_for_cube is None:
+            # TODO #114: what should default selection be?
+            columns_for_cube = cls.COLUMN_SELECTION_NUMERICAL
+
+        if columns_for_cube == cls.COLUMN_SELECTION_NUMERICAL:
+            columns_for_cube = [c for c in available_columns if numpy.issubdtype(data[c].dtype, numpy.number)]
+        elif columns_for_cube == cls.COLUMN_SELECTION_ALL:
+            columns_for_cube = available_columns
+        elif isinstance(columns_for_cube, list):
+            # TODO #114 limit to subset with available columns (and automatically fill in missing columns with nodata)?
+            columns_for_cube = columns_for_cube
+        else:
+            raise ValueError(columns_for_cube)
+        assert isinstance(columns_for_cube, list)
+
+        if columns_for_cube:
+            cube_df = data[columns_for_cube]
+            # TODO: remove `columns_for_cube` from geopandas data frame?
+            #       Enabling that triggers failure of some existing tests that use `aggregate_spatial`
+            #       to "enrich" a vector cube with pre-existing properties
+            #       Also see https://github.com/Open-EO/openeo-api/issues/504
+            # geometries_df = data.drop(columns=columns_for_cube)
+            geometries_df = data
+
+            # TODO: leverage pandas `to_xarray` and xarray `to_array` instead of this manual building?
+            cube: xarray.DataArray = xarray.DataArray(
+                data=cube_df.values,
+                dims=[cls.DIM_GEOMETRIES, dimension_name],
+                coords={
+                    cls.DIM_GEOMETRIES: data.geometry.index.to_list(),
+                    dimension_name: cube_df.columns,
+                },
+            )
+            return cls(geometries=geometries_df, cube=cube)
+
+        else:
+            return cls(geometries=data)
 
     @classmethod
     def from_fiona(
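
A minimal usage sketch of the new `from_geodataframe` factory (the data frame and its "pop"/"name" columns are invented for illustration): numerical columns become cube values along the "properties" dimension, while non-numerical columns are skipped under the default "numerical" selection.

import geopandas as gpd
import shapely.geometry

from openeo_driver.datacube import DriverVectorCube

df = gpd.GeoDataFrame(
    {
        "geometry": [shapely.geometry.Point(1, 2), shapely.geometry.Point(3, 4)],
        "pop": [1200, 3400],     # numerical: picked up as cube values by default
        "name": ["foo", "bar"],  # non-numerical: skipped by default
    },
    crs="EPSG:4326",
)

# Default "numerical" selection: only the "pop" column lands in the cube
vc = DriverVectorCube.from_geodataframe(df)
assert vc.get_cube().dims == ("geometries", "properties")

# Explicit selection: use all columns (including "name") as cube values
vc_all = DriverVectorCube.from_geodataframe(df, columns_for_cube="all")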
@@ -258,15 +321,21 @@ def from_fiona(
         if len(paths) != 1:
             # TODO #114 EP-3981: support multiple paths
             raise FeatureUnsupportedException(message="Loading a vector cube from multiple files is not supported")
+        columns_for_cube = (options or {}).get("columns_for_cube", cls.COLUMN_SELECTION_NUMERICAL)
         # TODO #114 EP-3981: lazy loading like/with DelayedVector
         # note for GeoJSON: will consider Feature.id as well as Feature.properties.id
         if "parquet" == driver:
-            return cls.from_parquet(paths=paths)
+            return cls.from_parquet(paths=paths, columns_for_cube=columns_for_cube)
         else:
-            return cls(geometries=gpd.read_file(paths[0], driver=driver))
+            gdf = gpd.read_file(paths[0], driver=driver)
+            return cls.from_geodataframe(gdf, columns_for_cube=columns_for_cube)
 
     @classmethod
-    def from_parquet(cls, paths: List[Union[str, Path]]):
+    def from_parquet(
+        cls,
+        paths: List[Union[str, Path]],
+        columns_for_cube: Union[List[str], str] = COLUMN_SELECTION_NUMERICAL,
+    ):
         if len(paths) != 1:
             # TODO #114 EP-3981: support multiple paths
             raise FeatureUnsupportedException(
@@ -284,10 +353,14 @@ def from_parquet(cls, paths: List[Union[str, Path]]):
         if "OGC:CRS84" in str(df.crs) or "WGS 84 (CRS84)" in str(df.crs):
             # workaround for not being able to decode ogc:crs84
             df.crs = CRS.from_epsg(4326)
-        return cls(geometries=df)
+        return cls.from_geodataframe(df, columns_for_cube=columns_for_cube)
 
     @classmethod
-    def from_geojson(cls, geojson: dict) -> "DriverVectorCube":
+    def from_geojson(
+        cls,
+        geojson: dict,
+        columns_for_cube: Union[List[str], str] = COLUMN_SELECTION_NUMERICAL,
+    ) -> "DriverVectorCube":
         """Construct vector cube from GeoJson dict structure"""
         validate_geojson_coordinates(geojson)
         # TODO support more geojson types?
@@ -305,7 +378,8 @@ def from_geojson(cls, geojson: dict) -> "DriverVectorCube":
             raise FeatureUnsupportedException(
                 f"Can not construct DriverVectorCube from {geojson.get('type', type(geojson))!r}"
             )
-        return cls(geometries=gpd.GeoDataFrame.from_features(features))
+        gdf = gpd.GeoDataFrame.from_features(features)
+        return cls.from_geodataframe(gdf, columns_for_cube=columns_for_cube)
 
     @classmethod
     def from_geometry(
@@ -320,7 +394,9 @@ def from_geometry(
             geometry = [geometry]
         return cls(geometries=gpd.GeoDataFrame(geometry=geometry))
 
-    def _as_geopandas_df(self) -> gpd.GeoDataFrame:
+    def _as_geopandas_df(
+        self, flatten_prefix: Optional[str] = None, flatten_name_joiner: str = "~"
+    ) -> gpd.GeoDataFrame:
         """Join geometries and cube as a geopandas dataframe"""
         # TODO: avoid copy?
         df = self._geometries.copy(deep=True)
@@ -331,18 +407,20 @@ def _as_geopandas_df(self) -> gpd.GeoDataFrame:
             if self._cube.dims[1:]:
                 stacked = self._cube.stack(prop=self._cube.dims[1:])
                 log.info(f"Flattened cube component of vector cube to {stacked.shape[1]} properties")
+                name_prefix = [flatten_prefix] if flatten_prefix else []
                 for p in stacked.indexes["prop"]:
-                    name = "~".join(str(x) for x in [self._flatten_prefix] + list(p))
+                    name = flatten_name_joiner.join(str(x) for x in name_prefix + list(p))
                     # TODO: avoid column collisions?
                     df[name] = stacked.sel(prop=p)
             else:
-                df[self._flatten_prefix] = self._cube
+                # TODO: better fallback column/property name in this case?
+                df[flatten_prefix or "_vc"] = self._cube
 
         return df
 
-    def to_geojson(self) -> dict:
+    def to_geojson(self, flatten_prefix: Optional[str] = None) -> dict:
         """Export as GeoJSON FeatureCollection."""
-        return shapely.geometry.mapping(self._as_geopandas_df())
+        return shapely.geometry.mapping(self._as_geopandas_df(flatten_prefix=flatten_prefix))
 
     def to_wkt(self) -> List[str]:
         wkts = [str(g) for g in self._geometries.geometry]
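
Since `flatten_prefix` is now a call-time option rather than instance state, flattened property names are derived from the cube coordinates alone unless a prefix is passed explicitly. A rough sketch of the resulting names, assuming a vector cube `vc` whose cube has dims ("geometries", "bands") with band labels "B01" and "B02" (all names invented for illustration):

vc.to_geojson()                      # feature properties: "B01", "B02"
vc.to_geojson(flatten_prefix="agg")  # feature properties: "agg~B01", "agg~B02"

In `write_assets` below, the prefix is picked up from the save-result format options via `options.get("flatten_prefix")`.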
@@ -366,7 +444,8 @@ def write_assets(
             )
             return self.to_legacy_save_result().write_assets(directory)
 
-        self._as_geopandas_df().to_file(path, driver=format_info.fiona_driver)
+        gdf = self._as_geopandas_df(flatten_prefix=options.get("flatten_prefix"))
+        gdf.to_file(path, driver=format_info.fiona_driver)
 
         if not format_info.multi_file:
             # single file format
@@ -461,6 +540,9 @@ def geometry_count(self) -> int:
     def get_geometries(self) -> Sequence[shapely.geometry.base.BaseGeometry]:
         return self._geometries.geometry
 
+    def get_cube(self) -> Optional[xarray.DataArray]:
+        return self._cube
+
     def get_ids(self) -> Optional[Sequence]:
         return self._geometries.get("id")
 
@@ -471,8 +553,9 @@ def get_xarray_cube_basics(self) -> Tuple[tuple, dict]:
         return dims, coords
 
     def __eq__(self, other):
-        return (isinstance(other, DriverVectorCube)
-                and np.array_equal(self._as_geopandas_df().values, other._as_geopandas_df().values))
+        return isinstance(other, DriverVectorCube) and numpy.array_equal(
+            self._as_geopandas_df().values, other._as_geopandas_df().values
+        )
 
     def fit_class_random_forest(
         self,
@@ -504,6 +587,49 @@ def buffer_points(self, distance: float = 10) -> "DriverVectorCube":
             ]
         )
 
+    def apply_dimension(
+        self,
+        process: dict,
+        *,
+        dimension: str,
+        target_dimension: Optional[str] = None,
+        context: Optional[dict] = None,
+        env: EvalEnv,
+    ) -> "DriverVectorCube":
+        single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process)
+
+        if single_run_udf:
+            # Process with single "run_udf" node
+            # TODO: check provided dimension with actual dimension of the cube
+            if dimension in (self.DIM_BANDS, self.DIM_PROPERTIES) and target_dimension is None:
+                log.warning(
+                    f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
+                )
+                # TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data
+                gdf = self._as_geopandas_df()
+                feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf)
+                udf_data = openeo.udf.UdfData(
+                    proj={"EPSG": self._geometries.crs.to_epsg()},
+                    feature_collection_list=[feature_collection],
+                    user_context=context,
+                )
+                log.info(f"[run_udf] Running UDF {str_truncate(single_run_udf.udf, width=256)!r} on {udf_data!r}")
+                result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data)
+                log.info(f"[run_udf] UDF resulted in {result_data!r}")
+
+                if not isinstance(result_data, openeo.udf.UdfData):
+                    raise ValueError(f"UDF should return UdfData, but got {type(result_data)}")
+                result_features = result_data.get_feature_collection_list()
+                if not (result_features and len(result_features) == 1):
+                    raise ValueError(
+                        f"UDF should return single feature collection but got {result_features and len(result_features)}"
+                    )
+                return DriverVectorCube(geometries=result_features[0].data)
+
+        raise FeatureUnsupportedException(
+            message=f"DriverVectorCube.apply_dimension with {dimension=} and {bool(single_run_udf)=}"
+        )
 
 
 class DriverMlModel:
     """Base class for driver-side 'ml-model' data structures"""

openeo_driver/dummy/dummy_backend.py

Lines changed: 1 addition & 1 deletion
@@ -265,7 +265,7 @@ def assert_polygon_sequence(geometries: Union[Sequence, BaseMultipartGeometry])
            coords=coords,
            name="aggregate_spatial",
        )
-        return geometries.with_cube(cube=cube, flatten_prefix="agg")
+        return geometries.with_cube(cube=cube)
    elif isinstance(geometries, str):
        geometries = [geometry for geometry in DelayedVector(geometries).geometries]
        n_geometries = assert_polygon_sequence(geometries)

openeo_driver/util/pgparsing.py

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+import dataclasses
+from typing import Optional
+
+
+class NotASingleRunUDFProcessGraph(ValueError):
+    pass
+
+
+@dataclasses.dataclass(frozen=True)
+class SingleRunUDFProcessGraph:
+    """
+    Container (and parser) for a callback process graph containing only a single `run_udf` node.
+    """
+
+    data: dict
+    udf: str
+    runtime: str
+    version: Optional[str] = None
+    context: Optional[dict] = None
+
+    @classmethod
+    def parse(cls, process_graph: dict) -> "SingleRunUDFProcessGraph":
+        try:
+            (node,) = process_graph.values()
+            assert node["process_id"] == "run_udf"
+            assert node["result"] is True
+            arguments = node["arguments"]
+            assert {"data", "udf", "runtime"}.issubset(arguments.keys())
+
+            return cls(
+                data=arguments["data"],
+                udf=arguments["udf"],
+                runtime=arguments["runtime"],
+                version=arguments.get("version"),
+                context=arguments.get("context") or {},
+            )
+        except Exception as e:
+            raise NotASingleRunUDFProcessGraph(str(e)) from e
+
+    @classmethod
+    def parse_or_none(cls, process_graph: dict) -> Optional["SingleRunUDFProcessGraph"]:
+        try:
+            return cls.parse(process_graph=process_graph)
+        except NotASingleRunUDFProcessGraph:
+            return None
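
A quick behavioral sketch of the parser (graph contents invented for illustration): a lone result `run_udf` node parses into the dataclass, anything else yields None from `parse_or_none`.

pg = {
    "udf": {
        "process_id": "run_udf",
        "arguments": {"data": {"from_parameter": "data"}, "udf": "...", "runtime": "Python"},
        "result": True,
    }
}
parsed = SingleRunUDFProcessGraph.parse_or_none(pg)
assert parsed is not None and parsed.runtime == "Python"

# Multiple nodes (or a non-run_udf node) do not raise; they parse to None:
assert SingleRunUDFProcessGraph.parse_or_none({"a": {}, "b": {}}) is None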
