aggregate spatial geojson2vectorcube (issue #141) #142

Closed
49 changes: 25 additions & 24 deletions openeo_driver/ProcessGraphDeserializer.py
@@ -740,32 +740,24 @@ def fit_class_random_forest(args: dict, env: EvalEnv) -> DriverMlModel:
return DriverMlModel()

predictors = extract_arg(args, 'predictors')
if not isinstance(predictors, AggregatePolygonSpatialResult):
# TODO #114 EP-3981 add support for real vector cubes.
if not isinstance(predictors, (AggregatePolygonSpatialResult, DriverVectorCube)):
# TODO #114 EP-3981 drop AggregatePolygonSpatialResult support.
raise ProcessParameterInvalidException(
parameter="predictors", process="fit_class_random_forest",
reason=f"should be non-temporal vector-cube (got `{type(predictors)}`)."
)
target = extract_arg(args, 'target')
if not (
isinstance(target, dict)
and target.get("type") == "FeatureCollection"
and isinstance(target.get("features"), list)
):
# TODO #114 EP-3981 vector cube support
raise ProcessParameterInvalidException(
parameter="target", process="fit_class_random_forest",
reason='only GeoJSON FeatureCollection is currently supported.',
parameter="predictors",
process="fit_class_random_forest",
reason=f"should be non-temporal vector-cube, got {type(predictors)}.",
)
if any(
# TODO: allow string based target labels too?
not isinstance(deep_get(f, "properties", "target", default=None), int)
for f in target.get("features", [])
):

target = extract_arg(args, "target")
if isinstance(target, DriverVectorCube):
pass
elif isinstance(target, dict) and target.get("type") == "FeatureCollection":
target = env.backend_implementation.DriverVectorCube.from_geojson(target)
else:
raise ProcessParameterInvalidException(
parameter="target",
process="fit_class_random_forest",
reason="Each feature (from target feature collection) should have an integer 'target' property.",
reason=f"expected vector-cube like value but got {type(target)}.",
)

# TODO: get defaults from process spec?
@@ -1056,7 +1048,7 @@ def aggregate_spatial(args: dict, env: EvalEnv) -> DriverDataCube:
if isinstance(geoms, DriverVectorCube):
geoms = geoms
elif isinstance(geoms, dict):
geoms = geojson_to_geometry(geoms)
geoms = env.backend_implementation.DriverVectorCube.from_geojson(geoms)
elif isinstance(geoms, DelayedVector):
geoms = geoms.path
else:
@@ -1321,6 +1313,12 @@ def run_udf(args: dict, env: EvalEnv):
# TODO #114 add support for DriverVectorCube
if isinstance(data, AggregatePolygonResult):
pass
if isinstance(data, DriverVectorCube):
# TODO: this is a temporary stopgap measure, converting to old-style save results to stay backward compatible.
# Better have proper DriverVectorCube support in run_udf?
# How does that fit in UdfData and UDF function signatures?
data = data.to_legacy_save_result()

if isinstance(data, (DelayedVector, dict)):
if isinstance(data, dict):
data = DelayedVector.from_json_dict(data)
@@ -1338,7 +1336,10 @@ def run_udf(args: dict, env: EvalEnv):
)
else:
raise ProcessParameterInvalidException(
parameter='data', process='run_udf', reason=f"Invalid data type {type(data)!r} expected raster-cube.")
parameter="data",
process="run_udf",
reason=f"Unsupported data type {type(data)}.",
)

_log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
result_data = openeo.udf.run_udf_code(udf, data)
@@ -1550,7 +1551,7 @@ def to_vector_cube(args: Dict, env: EvalEnv):
# TODO: standardization of something like this? https://github.com/Open-EO/openeo-processes/issues/346
data = extract_arg(args, "data", process_id="to_vector_cube")
if isinstance(data, dict) and data.get("type") in {"Polygon", "MultiPolygon", "Feature", "FeatureCollection"}:
return DriverVectorCube.from_geojson(data)
return env.backend_implementation.DriverVectorCube.from_geojson(data)
# TODO: support more inputs: string with geojson, string with WKT, list of WKT, string with URL to GeoJSON, ...
raise FeatureUnsupportedException(f"Converting {type(data)} to vector cube is not supported")

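For context on the reworked target handling in fit_class_random_forest above: a hand-written sketch of a GeoJSON FeatureCollection that now gets normalized to a vector cube (coordinates and the integer "target" class label are illustrative; the integer-label convention comes from the old validation being removed):

    target = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [5.0, 51.2]},
                "properties": {"target": 3},  # integer class label
            },
        ],
    }
    # With this change, fit_class_random_forest wraps such a dict as:
    #     target = env.backend_implementation.DriverVectorCube.from_geojson(target)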
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
@@ -1 +1 @@
__version__ = '0.20.4a1'
__version__ = "0.21.0a1"
5 changes: 4 additions & 1 deletion openeo_driver/backend.py
@@ -23,7 +23,7 @@
from openeo.capabilities import ComparableVersion
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.util import rfc3339, dict_no_none
from openeo_driver.datacube import DriverDataCube, DriverMlModel
from openeo_driver.datacube import DriverDataCube, DriverMlModel, DriverVectorCube
from openeo_driver.datastructs import SarBackscatterArgs
from openeo_driver.dry_run import SourceConstraint
from openeo_driver.errors import CollectionNotFoundException, ServiceUnsupportedException, FeatureUnsupportedException
@@ -570,6 +570,9 @@ class OpenEoBackendImplementation:
enable_basic_auth = True
enable_oidc_auth = True

# Overridable vector cube implementation
DriverVectorCube = DriverVectorCube

def __init__(
self,
secondary_services: Optional[SecondaryServices] = None,
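The new class attribute makes the vector cube implementation pluggable per backend; a minimal sketch of overriding it (class names hypothetical, mirroring the DummyVectorCube pattern in dummy_backend.py below):

    from openeo_driver.backend import OpenEoBackendImplementation
    from openeo_driver.datacube import DriverVectorCube


    class MyVectorCube(DriverVectorCube):
        # Backend-specific behaviour, e.g. a real fit_class_random_forest override
        ...


    class MyBackend(OpenEoBackendImplementation):
        # Process implementations resolve vector cube construction through this hook,
        # e.g. env.backend_implementation.DriverVectorCube.from_geojson(...)
        DriverVectorCube = MyVectorCube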
53 changes: 52 additions & 1 deletion openeo_driver/datacube.py
@@ -177,7 +177,9 @@ def __init__(
def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX) -> "DriverVectorCube":
"""Create new vector cube with same geometries but new cube"""
log.info(f"Creating vector cube with new cube {cube.name!r}")
return DriverVectorCube(geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix)
return type(self)(
geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
)

@classmethod
def from_fiona(cls, paths: List[str], driver: str, options: dict) -> "DriverVectorCube":
@@ -226,6 +228,7 @@ def _as_geopandas_df(self) -> gpd.GeoDataFrame:
return df

def to_geojson(self):
"""Export as GeoJSON FeatureCollection."""
return shapely.geometry.mapping(self._as_geopandas_df())

def to_wkt(self) -> List[str]:
@@ -242,6 +245,11 @@ def write_assets(
format_info = IOFORMATS.get(format)
# TODO: check if format can be used for vector data?
path = directory / f"vectorcube.{format_info.extension}"

if format_info.format == "JSON":
# TODO: eliminate this legacy format?
return self._write_legacy_aggregate_polygon_result_json(directory=directory)

self._as_geopandas_df().to_file(path, driver=format_info.fiona_driver)

if not format_info.multi_file:
@@ -274,6 +282,40 @@ def write_assets(
def to_multipolygon(self) -> shapely.geometry.MultiPolygon:
return shapely.ops.unary_union(self._geometries.geometry)

def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]:
"""
Export to legacy AggregatePolygonResult/JSONResult objects.
Provided as a temporary adaptation layer while migrating to real vector cubes.
"""
# TODO: eliminate these legacy, non-standard formats?
from openeo_driver.save_result import AggregatePolygonResult, JSONResult

cube = self._cube
# TODO: more flexible temporal/band dimension detection?
if cube.dims == (self.DIM_GEOMETRIES, "t"):
# Add single band dimension
cube = cube.expand_dims({"bands": ["band"]}, axis=-1)
if cube.dims == (self.DIM_GEOMETRIES, "t", "bands"):
cube = cube.transpose("t", self.DIM_GEOMETRIES, "bands")
timeseries = {
t.item(): t_slice.values.tolist()
for t, t_slice in zip(cube.coords["t"], cube)
}
return AggregatePolygonResult(timeseries=timeseries, regions=self)
elif cube.dims == (self.DIM_GEOMETRIES, "bands"):
cube = cube.transpose(self.DIM_GEOMETRIES, "bands")
return JSONResult(data=cube.values.tolist())
raise ValueError(
f"Unsupported cube configuration {cube.dims} for to_legacy_save_result"
)

def _write_legacy_aggregate_polygon_result_json(
self, directory: Path
) -> Dict[str, StacAsset]:
"""Export to legacy AggregatePolygonResult JSON format"""
# TODO: eliminate this legacy, non-standard format?
return self.to_legacy_save_result().write_assets(directory)

def get_bounding_box(self) -> Tuple[float, float, float, float]:
return tuple(self._geometries.total_bounds)

@@ -293,6 +335,15 @@ def __eq__(self, other):
return (isinstance(other, DriverVectorCube)
and np.array_equal(self._as_geopandas_df().values, other._as_geopandas_df().values))

def fit_class_random_forest(
self,
target: "DriverVectorCube",
num_trees: int = 100,
max_variables: Optional[Union[int, str]] = None,
seed: Optional[int] = None,
) -> "DriverMlModel":
raise NotImplementedError


class DriverMlModel:
"""Base class for driver-side 'ml-model' data structures"""
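The dims-based dispatch in to_legacy_save_result can be illustrated with plain xarray (a standalone sketch under the same dimension conventions; values are made up):

    import numpy as np
    import xarray

    # A (geometries, t) cube: 2 geometries, 3 timestamps
    cube = xarray.DataArray(
        np.arange(6.0).reshape(2, 3),
        dims=("geometries", "t"),
        coords={"t": ["2017-11-21", "2017-11-22", "2017-11-23"]},
    )
    # Append a singleton band dimension, as to_legacy_save_result does ...
    cube = cube.expand_dims({"bands": ["band"]}, axis=-1)
    # ... then reorder to (t, geometries, bands) for the timeseries layout
    cube = cube.transpose("t", "geometries", "bands")
    timeseries = {
        t.item(): t_slice.values.tolist()
        for t, t_slice in zip(cube.coords["t"], cube)
    }
    # => {"2017-11-21": [[0.0], [3.0]], "2017-11-22": [[1.0], [4.0]], ...}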
6 changes: 4 additions & 2 deletions openeo_driver/dry_run.py
@@ -350,8 +350,10 @@ def get_source_constraints(self, merge=True) -> List[SourceConstraint]:
return source_constraints

def get_geometries(
self, operation="aggregate_spatial"
) -> List[Union[shapely.geometry.base.BaseGeometry, DelayedVector]]:
self, operation="aggregate_spatial"
) -> List[
Union[shapely.geometry.base.BaseGeometry, DelayedVector, DriverVectorCube]
]:
"""Get geometries (polygons or DelayedVector), as used by aggregate_spatial"""
geometries_by_id = {}
for leaf in self.get_trace_leaves():
34 changes: 32 additions & 2 deletions openeo_driver/dummy/dummy_backend.py
@@ -227,8 +227,16 @@ def assert_polygon_sequence(geometries: Union[Sequence, BaseMultipartGeometry])
dims += (self.metadata.band_dimension.name,)
coords[self.metadata.band_dimension.name] = self.metadata.band_names
shape = [len(coords[d]) for d in dims]
data = numpy.arange(numpy.prod(shape)).reshape(shape)
cube = xarray.DataArray(data=data, dims=dims, coords=coords, name="aggregate_spatial")
data = numpy.arange(numpy.prod(shape), dtype="float")
# Start with some more interesting values (e.g. to test NaN/null/None handling)
data[0] = 2.345
data[1] = float("nan")
cube = xarray.DataArray(
data=data.reshape(shape),
dims=dims,
coords=coords,
name="aggregate_spatial",
)
return geometries.with_cube(cube=cube, flatten_prefix="agg")
elif isinstance(geometries, str):
geometries = [geometry for geometry in DelayedVector(geometries).geometries]
@@ -276,6 +284,25 @@ def fit_class_random_forest(
)


class DummyVectorCube(DriverVectorCube):
def fit_class_random_forest(
self,
target: DriverVectorCube,
num_trees: int = 100,
max_variables: Optional[Union[int, str]] = None,
seed: Optional[int] = None,
) -> "DriverMlModel":
return DummyMlModel(
process_id="fit_class_random_forest",
# TODO: handle `to_geojson` in `DummyMlModel.write_assets` instead of here?
data=self.to_geojson(),
target=target.to_geojson(),
num_trees=num_trees,
max_variables=max_variables,
seed=seed,
)


class DummyMlModel(DriverMlModel):

def __init__(self, **kwargs):
@@ -600,6 +627,9 @@ def delete(self, user_id: str, process_id: str) -> None:


class DummyBackendImplementation(OpenEoBackendImplementation):

DriverVectorCube = DummyVectorCube

def __init__(self, processing: Optional[Processing] = None):
super(DummyBackendImplementation, self).__init__(
secondary_services=DummySecondaryServices(),
4 changes: 2 additions & 2 deletions openeo_driver/save_result.py
@@ -161,7 +161,7 @@ def __init__(self, data, format: str = "json", options: dict = None):
super().__init__(format=format, options=options)
self.data = data

def write_assets(self, path:str) -> Dict[str, StacAsset]:
def write_assets(self, path: Union[str, Path]) -> Dict[str, StacAsset]:
Review comment (Member Author): the "path" arg name is inconsistent with the other write_assets implementations.

"""
Save generated assets into a directory, return asset metadata.
TODO: can an asset also be a full STAC item? In principle, one openEO job can either generate a full STAC collection, or one STAC item with multiple assets...
@@ -220,7 +220,7 @@ def get_data(self):
# By default, keep original (proprietary) result format
return self.data

def write_assets(self, directory: str) -> Dict[str, StacAsset]:
def write_assets(self, directory: Union[str, Path]) -> Dict[str, StacAsset]:
"""
Save generated assets into a directory, return asset metadata.
TODO: can an asset also be a full STAC item? In principle, one openEO job can either generate a full STAC collection, or one STAC item with multiple assets...
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -42,7 +42,7 @@ def udp_registry(backend_implementation) -> UserDefinedProcesses:
def flask_app(backend_implementation) -> flask.Flask:
app = build_app(
backend_implementation=backend_implementation,
# error_handling=False
# error_handling=False,
)
app.config.from_mapping(TEST_APP_CONFIG)
return app
2 changes: 1 addition & 1 deletion tests/data/pg/1.0/no_nested_json_result.json
@@ -67,7 +67,7 @@
"result": true,
"process_id": "save_result",
"arguments": {
"format": "GTIFF",
"format": "GeoJSON",
"data": {
"from_node": "aggregatespatial1"
},
4 changes: 4 additions & 0 deletions tests/data/pg/1.0/run_udf_on_timeseries.json
@@ -12,6 +12,10 @@
"temporal_extent": [
"2017-11-21",
"2017-11-21"
],
"bands": [
"B02",
"B03"
]
}
},