Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and start a new "In Progress" section above it.
- `DiskWorkspace`: support unified asset keys ([Open-EO/openeo-geopyspark-driver#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111))
- Support persisting results metadata URI in job registry ([Open-EO/openeo-geopyspark-driver#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255))
- More fine-grained `convert_node` cache control ([Open-EO/openeo-geopyspark-driver#1331](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1331)/[#422](https://github.com/Open-EO/openeo-python-driver/pull/422))
- `get_result_metadata` can return items ([Open-EO/openeo-geopyspark-driver#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111))


## 0.134.0
Expand Down
147 changes: 133 additions & 14 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,16 +1160,20 @@ def job_results_canonical_url() -> str:
if TREAT_JOB_RESULTS_V100_LIKE_V110 or requested_api_version().at_least("1.1.0"):
ml_model_metadata = None

def job_result_item_url(item_id) -> str:
def job_result_item_url(item_id, is11 = False) -> str:
signer = get_backend_config().url_signer

method_start = ".get_job_result_item"
if is11:
method_start = method_start + "11"
if not signer:
return url_for(".get_job_result_item", job_id=job_id, item_id=item_id, _external=True)
return url_for(method_start, job_id=job_id, item_id=item_id, _external=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to avoid a new endpoint (/jobs/<job_id>/results/items11/<item_id>)? I suppose the original _get_job_result_item method can also check whether the item ID corresponds to either:

  • an item derived from an asset (old case) or;
  • an actual item generated in the batch job (new case aka stac11).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By keeping it separate, we can simply migrate to 1.1 and then completely delete the 1.0 method.
Is there really an advantage to keeping the path the same, or is this just a minor remark?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. We discussed this in the meanwhile and I'm fine with some temporary code duplication if the 1.0 method is going to disappear eventually.


expires = signer.get_expires()
secure_key = signer.sign_job_item(job_id=job_id, user_id=user_id, item_id=item_id, expires=expires)
user_base64 = user_id_b64_encode(user_id)
return url_for(
".get_job_result_item_signed",
method_start + "_signed",
job_id=job_id,
user_base64=user_base64,
secure_key=secure_key,
Expand All @@ -1178,19 +1182,27 @@ def job_result_item_url(item_id) -> str:
_external=True,
)

for filename, metadata in result_assets.items():
if ("data" in metadata.get("roles", []) and
any(media_type in metadata.get("type", "") for media_type in
["geotiff", "netcdf", "text/csv", "application/parquet"])):
links.append(
{"rel": "item", "href": job_result_item_url(item_id=filename), "type": stac_item_media_type}
)
elif metadata.get("ml_model_metadata", False):
# TODO: Currently we only support one ml_model per batch job.
ml_model_metadata = metadata

if len(result_metadata.items) > 0 :
for item_id in result_metadata.items.keys():
links.append(
{"rel": "item", "href": job_result_item_url(item_id=filename), "type": "application/json"}
{"rel": "item", "href": job_result_item_url(item_id=item_id, is11=True), "type": stac_item_media_type}
)
else:

for filename, metadata in result_assets.items():
if ("data" in metadata.get("roles", []) and
any(media_type in metadata.get("type", "") for media_type in
["geotiff", "netcdf", "text/csv", "application/parquet"])):
links.append(
{"rel": "item", "href": job_result_item_url(item_id=filename), "type": stac_item_media_type}
)
elif metadata.get("ml_model_metadata", False):
# TODO: Currently we only support one ml_model per batch job.
ml_model_metadata = metadata
links.append(
{"rel": "item", "href": job_result_item_url(item_id=filename), "type": "application/json"}
)

result = dict_no_none(
{
Expand Down Expand Up @@ -1357,11 +1369,118 @@ def get_job_result_item_signed(job_id, user_base64, secure_key, item_id):
signer.verify_job_item(signature=secure_key, job_id=job_id, user_id=user_id, item_id=item_id, expires=expires)
return _get_job_result_item(job_id, item_id, user_id)

@api_endpoint
@blueprint.route('/jobs/<job_id>/results/items11/<user_base64>/<secure_key>/<item_id>', methods=['GET'])
def get_job_result_item11_signed(job_id, user_base64, secure_key, item_id):
    """Serve a batch-job-generated ("stac11") result item through a signed, unauthenticated URL.

    The signature in the path is verified against the decoded user id, job id,
    item id and the ``expires`` query parameter before delegating to
    ``_get_job_result_item11``.
    """
    # NOTE(review): unlike `get_job_result_item11` this endpoint is not gated on
    # API version >= 1.1.0 — confirm whether signed URLs should stay version-agnostic.
    expires = request.args.get('expires')
    signer = get_backend_config().url_signer
    # NOTE(review): `signer` can be None when no url_signer is configured, which
    # would raise AttributeError below — verify this route is only reachable
    # when signing is enabled (signed hrefs are only generated with a signer).
    user_id = user_id_b64_decode(user_base64)
    signer.verify_job_item(signature=secure_key, job_id=job_id, user_id=user_id, item_id=item_id, expires=expires)
    return _get_job_result_item11(job_id, item_id, user_id)

@blueprint.route('/jobs/<job_id>/results/items/<item_id>', methods=['GET'])
@auth_handler.requires_bearer_auth
def get_job_result_item(job_id: str, item_id: str, user: User) -> flask.Response:
    """Serve a legacy (asset-derived) job result item for the authenticated user."""
    return _get_job_result_item(job_id, item_id, user.user_id)

@api_endpoint(version=ComparableVersion("1.1.0").or_higher)
@blueprint.route('/jobs/<job_id>/results/items11/<item_id>', methods=['GET'])
@auth_handler.requires_bearer_auth
def get_job_result_item11(job_id: str, item_id: str, user: User) -> flask.Response:
    """Serve a batch-job-generated ("stac11") result item (API >= 1.1.0) for the authenticated user."""
    return _get_job_result_item11(job_id, item_id, user.user_id)

def _get_job_result_item11(job_id: str, item_id: str, user_id: str) -> flask.Response:
    """Build the STAC Item response for an item generated by the batch job itself
    (the "stac11" case), as opposed to items derived from single assets
    (see ``_get_job_result_item``).

    :param job_id: id of the batch job the item belongs to
    :param item_id: id of the item within the job's result metadata
    :param user_id: id of the (already authenticated/verified) user
    :return: a JSON response with the STAC Item, using the STAC item media type
    :raises OpenEOApiException: (404) when the job metadata has no item with this id
    """
    # TODO(review): can an ml_model metadata file ever show up as a job-generated item?
    if item_id == DriverMlModel.METADATA_FILE_NAME:
        return _download_ml_model_metadata(job_id, item_id, user_id)

    metadata = backend_implementation.batch_jobs.get_result_metadata(
        job_id=job_id, user_id=user_id
    )

    if item_id not in metadata.items:
        raise OpenEOApiException(
            f"Item with id {item_id!r} not found in job {job_id!r}", status_code=404
        )
    item_metadata = metadata.items[item_id]

    job_info = backend_implementation.batch_jobs.get_job_info(job_id, user_id)

    assets = {
        asset_key: _asset_object(job_id, user_id, asset_key, asset, job_info)
        for asset_key, asset in item_metadata.get("assets", {}).items()
    }

    # Item-level geometry/bbox win; fall back to job-level values.
    geometry = item_metadata.get("geometry", job_info.geometry)
    bbox = item_metadata.get("bbox", job_info.bbox)

    properties = item_metadata.get("properties", {"datetime": item_metadata.get("datetime")})
    # Use .get(): an item-supplied `properties` dict is not guaranteed to contain
    # a "datetime" key (previously this raised KeyError in that case).
    if properties.get("datetime") is None:
        # STAC requires the "datetime" key (possibly null) when start/end are used instead.
        properties.setdefault("datetime", None)
        to_datetime = Rfc3339(propagate_none=True).datetime

        start_datetime = item_metadata.get("start_datetime") or to_datetime(job_info.start_datetime)
        end_datetime = item_metadata.get("end_datetime") or to_datetime(job_info.end_datetime)

        if start_datetime == end_datetime:
            properties["datetime"] = start_datetime
        else:
            if start_datetime:
                properties["start_datetime"] = start_datetime
            if end_datetime:
                properties["end_datetime"] = end_datetime

    if job_info.proj_shape:
        properties["proj:shape"] = job_info.proj_shape
    if job_info.proj_bbox:
        properties["proj:bbox"] = job_info.proj_bbox
    if job_info.epsg:
        properties["proj:epsg"] = job_info.epsg

    if job_info.proj_bbox and job_info.epsg:
        # Derive lon/lat bbox/geometry from the projected job bbox when the
        # item metadata did not provide them.
        if not bbox:
            bbox = BoundingBox.from_wsen_tuple(job_info.proj_bbox, job_info.epsg).reproject(4326).as_wsen_tuple()
        if not geometry:
            geometry = BoundingBox.from_wsen_tuple(job_info.proj_bbox, job_info.epsg).as_polygon()
            geometry = mapping(reproject_geometry(geometry, CRS.from_epsg(job_info.epsg), CRS.from_epsg(4326)))

    stac_item = {
        "type": "Feature",
        "stac_version": "1.0.0",
        "stac_extensions": [
            STAC_EXTENSION.EO_V110,
            STAC_EXTENSION.FILEINFO,
            STAC_EXTENSION.PROJECTION,
        ],
        "id": item_id,
        "geometry": geometry,
        "bbox": bbox,
        "properties": properties,
        "links": [
            {
                "rel": "self",
                # MUST be absolute; point at the 1.1 endpoint that serves this item
                # (was ".get_job_result_item", which serves the legacy asset-derived items).
                "href": url_for(".get_job_result_item11", job_id=job_id, item_id=item_id, _external=True),
                "type": stac_item_media_type,
            },
            {
                "rel": "collection",
                "href": url_for(".list_job_results", job_id=job_id, _external=True),  # SHOULD be absolute
                "type": "application/json",
            },
        ],
        "assets": assets,
        "collection": job_id,
    }
    # Add optional top-level fields, skipping those that are None.
    stac_item.update(
        **dict_no_none(
            {
                "epsg": job_info.epsg,
            }
        )
    )

    resp = jsonify(stac_item)
    resp.mimetype = stac_item_media_type
    return resp


def _get_job_result_item(job_id, item_id, user_id):
if item_id == DriverMlModel.METADATA_FILE_NAME:
return _download_ml_model_metadata(job_id, item_id, user_id)
Expand Down
Loading