Vector cube support (aggregate_spatial) #116

Merged: 4 commits, Apr 20, 2022
30 changes: 17 additions & 13 deletions openeo_driver/ProcessGraphDeserializer.py
@@ -427,7 +427,7 @@ def _extract_load_parameters(env: EvalEnv, source_id: tuple) -> LoadParameters:
params.bands = constraints.get("bands", None)
params.properties = constraints.get("properties", {})
params.aggregate_spatial_geometries = constraints.get("aggregate_spatial", {}).get("geometries")
if params.aggregate_spatial_geometries == None:
if params.aggregate_spatial_geometries is None:
params.aggregate_spatial_geometries = constraints.get("filter_spatial", {}).get("geometries")
params.sar_backscatter = constraints.get("sar_backscatter", None)
params.process_types = process_types
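For illustration, a minimal sketch of the fallback behaviour above, using a toy constraints dict (not part of this diff): geometries attached by an aggregate_spatial constraint take precedence, and filter_spatial geometries are only used when the former are absent.

```python
# Toy constraints dict, shaped like the per-source constraints used above.
constraints = {
    "filter_spatial": {"geometries": {"type": "Point", "coordinates": [3.0, 51.0]}},
}

geometries = constraints.get("aggregate_spatial", {}).get("geometries")
if geometries is None:
    # Fall back to geometries attached by filter_spatial.
    geometries = constraints.get("filter_spatial", {}).get("geometries")

print(geometries)  # {'type': 'Point', 'coordinates': [3.0, 51.0]}
```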
@@ -521,7 +521,7 @@ def vector_buffer(args: Dict, env: EvalEnv) -> dict:
input_crs = 'epsg:4326'
buffer_resolution = 3

# TODO EP-3981 convert `geometry` to vector cube and move buffer logic to there
# TODO #114 EP-3981 convert `geometry` to vector cube and move buffer logic to there
if isinstance(geometry, str):
# TODO: assumption here that `geometry` is a path/url
geoms = list(DelayedVector(geometry).geometries)
@@ -664,7 +664,7 @@ def chunk_polygon(args: dict, env: EvalEnv) -> DriverDataCube:
data_cube = extract_arg(args, 'data')

# Chunks parameter check.
# TODO EP-3981 normalize first to vector cube and simplify logic
# TODO #114 EP-3981 normalize first to vector cube and simplify logic
if isinstance(chunks, DelayedVector):
polygons = list(chunks.geometries)
for p in polygons:
@@ -705,7 +705,7 @@ def fit_class_random_forest(args: dict, env: EvalEnv) -> DriverMlModel:

predictors = extract_arg(args, 'predictors')
if not isinstance(predictors, AggregatePolygonSpatialResult):
# TODO EP-3981 add support for real vector cubes.
# TODO #114 EP-3981 add support for real vector cubes.
raise ProcessParameterInvalidException(
parameter="predictors", process="fit_class_random_forest",
reason=f"should be non-temporal vector-cube (got `{type(predictors)}`)."
@@ -716,7 +716,7 @@
and target.get("type") == "FeatureCollection"
and isinstance(target.get("features"), list)
):
# TODO EP-3981 vector cube support
# TODO #114 EP-3981 vector cube support
raise ProcessParameterInvalidException(
parameter="target", process="fit_class_random_forest",
reason='only GeoJSON FeatureCollection is currently supported.',
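For reference, a minimal `target` that passes the FeatureCollection guard above (toy feature; the class label property name is an assumption):

```python
target = {
    "type": "FeatureCollection",
    "features": [{
        "type": "Feature",
        "geometry": {"type": "Point", "coordinates": [3.2, 51.1]},
        "properties": {"target": 0},  # assumption: integer class label per feature
    }],
}

# Mirrors the guard in fit_class_random_forest:
assert isinstance(target, dict)
assert target.get("type") == "FeatureCollection"
assert isinstance(target.get("features"), list)
```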
@@ -949,7 +949,10 @@ def aggregate_spatial(args: dict, env: EvalEnv) -> DriverDataCube:
target_dimension = args.get('target_dimension', None)

geoms = extract_arg(args, 'geometries')
if isinstance(geoms, dict):
# TODO #114: convert all cases to DriverVectorCube first and just work with that
if isinstance(geoms, DriverVectorCube):
geoms = geoms
elif isinstance(geoms, dict):
geoms = geojson_to_geometry(geoms)
elif isinstance(geoms, DelayedVector):
geoms = geoms.path
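The if-elif chain above, as a standalone sketch (the helper name is hypothetical, and the `geojson_to_geometry` import path is an assumption):

```python
from openeo_driver.datacube import DriverVectorCube
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.utils import geojson_to_geometry  # assumption: utility lives here


def normalize_aggregate_geometries(geoms):
    """Hypothetical helper mirroring the dispatch in aggregate_spatial."""
    if isinstance(geoms, DriverVectorCube):
        return geoms  # new in this PR: vector cubes pass through unchanged
    elif isinstance(geoms, dict):
        return geojson_to_geometry(geoms)  # GeoJSON mapping -> shapely geometry
    elif isinstance(geoms, DelayedVector):
        return geoms.path  # backends load lazily from the path
    raise ValueError(f"Unsupported geometries type: {type(geoms)}")
```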
@@ -990,11 +993,11 @@ def mask_polygon(args: dict, env: EvalEnv) -> DriverDataCube:
replacement = args.get('replacement', None)
inside = args.get('inside', False)

# TODO: instead of if-elif-else chain: generically "cast" to VectorCube first (e.g. for wide input
# TODO #114: instead of if-elif-else chain: generically "cast" to VectorCube first (e.g. for wide input
# support: GeoJSON, WKT, ...) and then convert to MultiPolygon?
if isinstance(mask, DelayedVector):
# TODO: avoid reading DelayedVector twice due to dry-run?
# TODO EP-3981 embed DelayedVector in VectorCube implementation
# TODO #114 EP-3981 embed DelayedVector in VectorCube implementation
polygon = shapely.ops.unary_union(list(mask.geometries))
elif isinstance(mask, DriverVectorCube):
polygon = mask.to_multipolygon()
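To make the DriverVectorCube branch above concrete: a toy mask whose geometries collapse to a single multipolygon (data invented for illustration):

```python
import geopandas as gpd
import shapely.geometry

from openeo_driver.datacube import DriverVectorCube

mask = DriverVectorCube(geometries=gpd.GeoDataFrame(geometry=[
    shapely.geometry.box(0.0, 0.0, 1.0, 1.0),
    shapely.geometry.box(2.0, 0.0, 3.0, 1.0),
]))
polygon = mask.to_multipolygon()  # unary_union over the geometry column
print(polygon.geom_type)  # "MultiPolygon" (the two boxes are disjoint)
```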
@@ -1070,7 +1073,7 @@ def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
geometries = extract_arg(args, 'geometries')

if not isinstance(geometries, dict):
# TODO: support DelayedVector
# TODO #114: support DriverVectorCube
raise NotImplementedError("filter_spatial only supports dict but got {g!r}".format(g=geometries))

geometries = geojson_to_geometry(geometries)
@@ -1170,6 +1173,7 @@ def run_udf(args: dict, env: EvalEnv):
if dry_run_tracer and isinstance(data, AggregatePolygonResult):
return JSONResult({})

# TODO #114 add support for DriverVectorCube
if isinstance(data, AggregatePolygonResult):
pass
if isinstance(data, (DelayedVector, dict)):
@@ -1343,15 +1347,15 @@ def apply_process(process_id: str, args: dict, namespace: Union[str, None], env:
.returns("TODO", schema={"type": "object", "subtype": "vector-cube"})
)
def read_vector(args: Dict, env: EvalEnv) -> DelayedVector:
# TODO EP-3981: deprecated in favor of load_uploaded_files/load_external? https://github.com/Open-EO/openeo-processes/issues/322
# TODO #114 EP-3981: deprecated in favor of load_uploaded_files/load_external? https://github.com/Open-EO/openeo-processes/issues/322
path = extract_arg(args, 'filename')
return DelayedVector(path)


@process_registry_100.add_function(spec=read_spec("openeo-processes/1.x/proposals/load_uploaded_files.json"))
def load_uploaded_files(args: dict, env: EvalEnv) -> DriverVectorCube:
# TODO EP-3981 process name is still under discussion https://github.com/Open-EO/openeo-processes/issues/322
# TODO EP-3981 also other return types: raster data cube, array, ...
# TODO #114 EP-3981 process name is still under discussion https://github.com/Open-EO/openeo-processes/issues/322
# TODO also other return types: raster data cube, array, ...
paths = extract_arg(args, 'paths', process_id="load_uploaded_files")
format = extract_arg(args, 'format', process_id="load_uploaded_files")
options = args.get("options", {})
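A hypothetical invocation of the fiona-based factory that backs this process (path invented for illustration; see the `from_fiona` addition in datacube.py below):

```python
from openeo_driver.datacube import DriverVectorCube

vc = DriverVectorCube.from_fiona(
    paths=["/tmp/uploads/fields.geojson"],  # hypothetical upload location
    driver="GeoJSON",  # fiona/OGR driver name
    options={},
)
# Note: multiple paths currently raise FeatureUnsupportedException.
```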
@@ -1373,7 +1377,7 @@ def load_uploaded_files(args: dict, env: EvalEnv) -> DriverVectorCube:
.returns("TODO", schema={"type": "object", "subtype": "vector-cube"})
)
def get_geometries(args: Dict, env: EvalEnv) -> Union[DelayedVector, dict]:
# TODO: standardize or deprecate this? EP-3981 https://github.com/Open-EO/openeo-processes/issues/322
# TODO: standardize or deprecate this? #114 EP-3981 https://github.com/Open-EO/openeo-processes/issues/322
feature_collection = args.get('feature_collection', None)
path = args.get('filename', None)
if path is not None:
1 change: 1 addition & 0 deletions openeo_driver/backend.py
@@ -148,6 +148,7 @@ class LoadParameters(dict):
global_extent = dict_item(default={})
bands = dict_item(default=None)
properties = dict_item(default={})
# TODO: rename this to filter_spatial_geometries (because it is used for load_collection-time filtering)?
aggregate_spatial_geometries = dict_item(default=None)
sar_backscatter: Union[SarBackscatterArgs, None] = dict_item(default=None)
process_types = dict_item(default=set())
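For reference, `LoadParameters` is a dict subclass with attribute-style items, so the geometry hint defaults to None until a constraint sets it. A minimal sketch:

```python
from openeo_driver.backend import LoadParameters

params = LoadParameters()
assert params.aggregate_spatial_geometries is None

params.aggregate_spatial_geometries = {"type": "Point", "coordinates": [5.0, 51.2]}
assert "aggregate_spatial_geometries" in params  # items stay plain dict entries
```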
107 changes: 92 additions & 15 deletions openeo_driver/datacube.py
@@ -1,21 +1,26 @@
import inspect
import logging
import zipfile
from pathlib import Path
from typing import List, Union, Optional, Dict, Any
from typing import List, Union, Optional, Dict, Any, Tuple, Sequence

import geopandas as gpd
import pandas as pd
import shapely.geometry
import shapely.geometry.base
import shapely.ops
import xarray

from openeo import ImageCollection
from openeo.metadata import CollectionMetadata
from openeo.util import ensure_dir
from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
from openeo_driver.errors import FeatureUnsupportedException
from openeo_driver.errors import FeatureUnsupportedException, InternalException
from openeo_driver.util.ioformats import IOFORMATS
from openeo_driver.utils import EvalEnv

log = logging.getLogger(__name__)


class DriverDataCube(ImageCollection):
"""Base class for "driver" side raster data cubes."""
@@ -103,10 +108,10 @@ def aggregate_temporal(self, intervals: list, reducer, labels: list = None,

def aggregate_spatial(
self,
geometries: Union[shapely.geometry.base.BaseGeometry, str],
geometries: Union[shapely.geometry.base.BaseGeometry, str, "DriverVectorCube"],
reducer: dict,
target_dimension: str = "result",
) -> Union["AggregatePolygonResult", "AggregatePolygonSpatialResult"]:
) -> Union["AggregatePolygonResult", "AggregatePolygonSpatialResult", "DriverVectorCube"]:
self._not_implemented()

def zonal_statistics(self, regions, func:str) -> 'DriverDataCube':
@@ -132,32 +137,92 @@ def resolution_merge(self, args: ResolutionMergeArgs) -> 'DriverDataCube':
self._not_implemented()


class VectorCubeError(InternalException):
code = "VectorCubeError"

def __init__(self, message="Unspecified VectorCube error"):
super(VectorCubeError, self).__init__(message=message)


class DriverVectorCube:
"""
Base class for driver-side 'vector cubes'

Conceptually comparable to GeoJSON FeatureCollections, but possibly more advanced with more dimensions, bands, ...
Internal design has two components:
- a GeoPandas dataframe for holding the GeoJSON-style properties (possibly heterogeneously typed or sparse/free-style)
- optional xarray DataArray for holding the data cube data (homogeneously typed and rigorously indexed/gridded)
These components are "joined" on the GeoPandas dataframe's index and DataArray first dimension
"""

def __init__(self, data: gpd.GeoDataFrame):
# TODO EP-3981: consider other data containers (xarray) and lazy loading?
self.data = data
DIM_GEOMETRIES = "geometries"
FLATTEN_PREFIX = "vc"

def __init__(
self, geometries: gpd.GeoDataFrame, cube: Optional[xarray.DataArray] = None,
flatten_prefix: str = FLATTEN_PREFIX
):
"""

:param geometries:
:param cube:
:param flatten_prefix: prefix for column/field/property names when flattening the cube
"""
# TODO #114 EP-3981: lazy loading (like DelayedVector)?
if cube is not None:
if cube.dims[0] != self.DIM_GEOMETRIES:
log.error(f"First cube dim should be {self.DIM_GEOMETRIES!r} but got dims {cube.dims!r}")
raise VectorCubeError("Cube's first dimension is invalid.")
if not geometries.index.equals(cube.indexes[cube.dims[0]]):
log.error(f"Invalid VectorCube components {geometries.index!r} != {cube.indexes[cube.dims[0]]!r}")
raise VectorCubeError("Incompatible vector cube components")
self._geometries = geometries
self._cube = cube
self._flatten_prefix = flatten_prefix

def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX) -> "DriverVectorCube":
"""Create new vector cube with same geometries but new cube"""
log.info(f"Creating vector cube with new cube {cube.name!r}")
return DriverVectorCube(geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix)

@classmethod
def from_fiona(cls, paths: List[str], driver: str, options: dict):
"""Factory to load vector cube data using fiona/GeoPandas."""
if len(paths) != 1:
# TODO EP-3981: support multiple paths
# TODO #114 EP-3981: support multiple paths
raise FeatureUnsupportedException(message="Loading a vector cube from multiple files is not supported")
# TODO EP-3981: lazy loading like/with DelayedVector
return cls(data=gpd.read_file(paths[0], driver=driver))
# TODO #114 EP-3981: lazy loading like/with DelayedVector
return cls(geometries=gpd.read_file(paths[0], driver=driver))

def _as_geopandas_df(self) -> gpd.GeoDataFrame:
"""Join geometries and cube as a geopandas dataframe"""
# TODO: avoid copy?
df = self._geometries.copy(deep=True)
if self._cube is not None:
assert self._cube.dims[0] == self.DIM_GEOMETRIES
# TODO: better way to combine cube with geometries
# Flatten multiple (non-geometry) dimensions from cube to new properties in geopandas dataframe
if self._cube.dims[1:]:
stacked = self._cube.stack(prop=self._cube.dims[1:])
log.info(f"Flattened cube component of vector cube to {stacked.shape[1]} properties")
for p in stacked.indexes["prop"]:
name = "~".join(str(x) for x in [self._flatten_prefix] + list(p))
# TODO: avoid column collisions?
df[name] = stacked.sel(prop=p)
else:
df[self._flatten_prefix] = self._cube

def write_assets(self, directory: Union[str, Path], format: str, options: Optional[dict] = None) -> Dict[str, StacAsset]:
return df

def to_geojson(self):
return shapely.geometry.mapping(self._as_geopandas_df())

def write_assets(
self, directory: Union[str, Path], format: str, options: Optional[dict] = None
) -> Dict[str, StacAsset]:
directory = ensure_dir(directory)
format_info = IOFORMATS.get(format)
# TODO: check if format can be used for vector data?
path = directory / f"vectorcube.{format_info.extension}"
self.data.to_file(path, driver=format_info.fiona_driver)
self._as_geopandas_df().to_file(path, driver=format_info.fiona_driver)

if not format_info.multi_file:
# single file format
Expand Down Expand Up @@ -187,7 +252,19 @@ def write_assets(self, directory: Union[str, Path], format: str, options: Option
return {p.name: {"href": p} for p in components}

def to_multipolygon(self) -> shapely.geometry.MultiPolygon:
return shapely.ops.unary_union(self.data.geometry)
return shapely.ops.unary_union(self._geometries.geometry)

def get_bounding_box(self) -> Tuple[float, float, float, float]:
return tuple(self._geometries.total_bounds)

def get_geometries(self) -> Sequence[shapely.geometry.base.BaseGeometry]:
return self._geometries.geometry

def get_xarray_cube_basics(self) -> Tuple[tuple, dict]:
"""Get initial dims/coords for xarray DataArray construction"""
dims = (self.DIM_GEOMETRIES,)
coords = {self.DIM_GEOMETRIES: self._geometries.index.to_list()}
return dims, coords


class DriverMlModel:
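To make the two-component design concrete, a runnable toy construction flow using only the methods added above (all data invented for illustration):

```python
import geopandas as gpd
import shapely.geometry
import xarray

from openeo_driver.datacube import DriverVectorCube

gdf = gpd.GeoDataFrame(
    {"pop": [120, 340]},
    geometry=[shapely.geometry.Point(3, 51), shapely.geometry.Point(4, 52)],
)
vc = DriverVectorCube(geometries=gdf)

# The cube's first dimension must be "geometries" and must share the
# dataframe's index, otherwise __init__ raises VectorCubeError.
dims, coords = vc.get_xarray_cube_basics()
dims += ("bands",)
coords["bands"] = ["B02", "B03"]
cube = xarray.DataArray([[1.0, 2.0], [3.0, 4.0]], dims=dims, coords=coords)

vc2 = vc.with_cube(cube, flatten_prefix="agg")
print(vc2.get_bounding_box())    # (3.0, 51.0, 4.0, 52.0)
# Flattening turns cube entries into per-feature properties like "agg~B02":
print(vc2.to_geojson()["type"])  # "FeatureCollection"
```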
1 change: 1 addition & 0 deletions openeo_driver/delayed_vector.py
@@ -199,6 +199,7 @@ def _read_shapefile_crs(shp_path: str) -> pyproj.CRS:

@staticmethod
def _as_geometry_collection(feature_collection: Dict) -> Dict:
# TODO #71 #114 Deprecate/avoid usage of GeometryCollection
geometries = (feature['geometry'] for feature in feature_collection['features'])

return {
20 changes: 12 additions & 8 deletions openeo_driver/dry_run.py
@@ -42,7 +42,7 @@

from openeo.metadata import CollectionMetadata
from openeo_driver import filter_properties
from openeo_driver.datacube import DriverDataCube
from openeo_driver.datacube import DriverDataCube, DriverVectorCube
from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult
@@ -439,24 +439,27 @@ def mask_polygon(self, mask, replacement=None, inside: bool = False) -> 'DriverD

def aggregate_spatial(
self,
geometries: Union[BaseGeometry, str],
geometries: Union[BaseGeometry, str, DriverVectorCube],
reducer: dict,
target_dimension: str = "result",
) -> Union[AggregatePolygonResult, AggregatePolygonSpatialResult]:
# TODO EP-3981 normalize to vector cube instead of GeometryCollection
# TODO #71 #114 EP-3981 normalize to vector cube instead of GeometryCollection
geometries, bbox = self._normalize_geometry(geometries)
cube = self.filter_bbox(**bbox, operation="_weak_spatial_extent")
cube._process(operation="aggregate_spatial", arguments={"geometries": geometries})
if isinstance(geometries, (Polygon, MultiPolygon)):
geometries = GeometryCollection([geometries])
return AggregatePolygonResult(timeseries={}, regions=geometries)

def _normalize_geometry(self, geometries):
def _normalize_geometry(self, geometries) -> Tuple[Union[DriverVectorCube, DelayedVector, BaseGeometry], dict]:
"""
Helper to preprocess geometries (as used in aggregate_spatial and mask_polygon) and apply related filter_bbox
Helper to preprocess geometries (as used in aggregate_spatial and mask_polygon)
and extract bbox (e.g. for filter_bbox)
"""
# TODO EP-3981 normalize to vector cube instead of GeometryCollection
if isinstance(geometries, dict):
# TODO #71 #114 EP-3981 normalize to vector cube instead of GeometryCollection
if isinstance(geometries, DriverVectorCube):
bbox = geometries.get_bounding_box()
elif isinstance(geometries, dict):
return self._normalize_geometry(geojson_to_geometry(geometries))
elif isinstance(geometries, str):
return self._normalize_geometry(DelayedVector(geometries))
@@ -475,7 +478,7 @@ def _normalize_geometry(self, geometries):
bbox = dict(west=bbox[0], south=bbox[1], east=bbox[2], north=bbox[3], crs="EPSG:4326")
return geometries, bbox

# TODO: this is a workaround until vectorcube is fully upgraded
# TODO: #114 this is a workaround until vectorcube is fully upgraded
def raster_to_vector(self):
return AggregatePolygonResult(timeseries={}, regions=None)

@@ -501,6 +504,7 @@ def reduce_dimension(self, reducer, dimension: str, context: Any, env: EvalEnv)

def chunk_polygon(self, reducer, chunks: MultiPolygon, mask_value: float, env: EvalEnv, context={}) -> 'DryRunDataCube':
polygons: List[Polygon] = chunks.geoms
# TODO #71 #114 Deprecate/avoid usage of GeometryCollection
geometries, bbox = self._normalize_geometry(GeometryCollection(polygons))
cube = self.filter_bbox(**bbox, operation="_weak_spatial_extent")
return cube._process("chunk_polygon", arguments={"geometries": geometries})
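A sketch of what the dry run now derives from a vector cube: its bounding box feeds the weak spatial extent used for filter_bbox (toy geometry; CRS handling as in the code above):

```python
import geopandas as gpd
import shapely.geometry

from openeo_driver.datacube import DriverVectorCube

vc = DriverVectorCube(geometries=gpd.GeoDataFrame(
    geometry=[shapely.geometry.box(3.1, 51.0, 3.4, 51.2)],
))
west, south, east, north = vc.get_bounding_box()
bbox = dict(west=west, south=south, east=east, north=north, crs="EPSG:4326")
print(bbox)
```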