Skip to content

Commit b5f3b47

Browse files
committed
RESTJob.download_result and execute_batch: finetune and unit testing
1 parent e57aa6b commit b5f3b47

File tree

3 files changed

+82
-40
lines changed

3 files changed

+82
-40
lines changed

openeo/rest/imagecollectionclient.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ def execute_batch(
969969
self,
970970
outputfile: str, out_format: str = None,
971971
print=print, max_poll_interval=60, connection_retry_interval=30,
972-
job_options = None,**format_options):
972+
job_options=None, **format_options):
973973
"""
974974
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
975975
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.
@@ -986,12 +986,13 @@ def execute_batch(
986986
job = self.send_job(out_format,job_options=job_options, **format_options)
987987
job.start_job()
988988

989-
job_id = None
989+
job_id = job.job_id
990990
job_info = None
991991
status = None
992-
poll_interval = 5
992+
poll_interval = min(5, max_poll_interval)
993993
start_time = time.time()
994994
while True:
995+
# TODO: also allow a hard time limit on this infinite poll loop?
995996
elapsed = str(datetime.timedelta(seconds=time.time() - start_time))
996997
try:
997998
job_info = job.describe_job()
@@ -1000,11 +1001,11 @@ def execute_batch(
10001001
time.sleep(connection_retry_interval)
10011002
continue
10021003

1003-
job_id = job_info.get("id", "N/A")
10041004
status = job_info.get("status", "N/A")
10051005
print("{t} Job {i}: {s} (progress {p})".format(
1006-
t=elapsed, i=job_id, s=status, p=job_info.get("progress", "N/A"))
1007-
)
1006+
t=elapsed, i=job_id, s=status,
1007+
p='{p}%'.format(p=job_info["progress"]) if "progress" in job_info else "N/A"
1008+
))
10081009
if status not in ('submitted', 'queued', 'running'):
10091010
break
10101011

openeo/rest/job.py

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
import logging
12
import typing
23
import urllib.request
34
from typing import List
45

56
from openeo.job import Job, JobResult
7+
from openeo.rest import OpenEoClientException
68

79
if hasattr(typing, 'TYPE_CHECKING') and typing.TYPE_CHECKING:
810
# Only import this for type hinting purposes. Runtime import causes circular dependency issues.
911
# Note: the `hasattr` check is necessary for Python versions before 3.5.2.
1012
from openeo.rest.connection import Connection
1113

14+
logger = logging.getLogger(__name__)
15+
1216

1317
class RESTJobResult(JobResult):
1418
def __init__(self, url):
@@ -54,9 +58,10 @@ def estimate_job(self):
5458
def start_job(self):
5559
""" Start / queue a job for processing."""
5660
# POST /jobs/{job_id}/results
57-
request = self.connection.post("/jobs/{}/results".format(self.job_id))
58-
59-
return request.status_code
61+
url = "/jobs/{}/results".format(self.job_id)
62+
request = self.connection.post(url)
63+
if request.status_code != 202:
64+
logger.warning("{u} returned with status code {s} instead of 202".format(u=url, s=request.status_code))
6065

6166
def stop_job(self):
6267
""" Stop / cancel job processing."""
@@ -72,34 +77,24 @@ def list_results(self, type=None):
7277

7378
def download_results(self, target):
7479
""" Download job results."""
75-
# GET /jobs/{job_id}/results > ...
76-
77-
download_url = "/jobs/{}/results".format(self.job_id)
78-
r = self.connection.get(download_url, stream=True)
79-
80-
if r.status_code == 200:
81-
82-
url = r.json()
83-
if "links" in url:
84-
download_url = url["links"][0]
85-
if "href" in download_url:
86-
download_url = download_url["href"]
87-
88-
with open(target, 'wb') as handle:
89-
response = self.connection.get(download_url, stream=True)
90-
91-
for block in response.iter_content(1024):
92-
93-
if not block:
94-
break
95-
96-
handle.write(block)
97-
else:
98-
raise ConnectionAbortedError(r.text)
99-
return r.status_code
100-
101-
# TODO: All below methods are deprecated (at least not specified in the coreAPI)
102-
def download(self, outputfile:str, outputformat=None):
80+
results_url = "/jobs/{}/results".format(self.job_id)
81+
r = self.connection.get(results_url, expected_status=200)
82+
links = r.json()["links"]
83+
if len(links) != 1:
84+
# TODO handle download of multiple files?
85+
raise OpenEoClientException("Expected 1 result file to download, but got {c}".format(c=len(links)))
86+
file_url = links[0]["href"]
87+
88+
with open(target, 'wb') as handle:
89+
response = self.connection.get(file_url, stream=True)
90+
for block in response.iter_content(1024):
91+
if not block:
92+
break
93+
handle.write(block)
94+
return target
95+
96+
# TODO: All below methods are deprecated (at least not specified in the coreAPI)
97+
def download(self, outputfile: str, outputformat=None):
10398
""" Download the result as a raster."""
10499
try:
105100
return self.connection.download_job(self.job_id, outputfile, outputformat)
@@ -117,6 +112,3 @@ def queue(self):
117112
def results(self) -> List[RESTJobResult]:
118113
""" Returns this job's results. """
119114
return [RESTJobResult(link['href']) for link in self.connection.job_results(self.job_id)['links']]
120-
121-
122-

tests/rest/test_job.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import re
2+
3+
import openeo
4+
import pytest
5+
6+
API_URL = "https://oeo.net"
7+
8+
9+
@pytest.fixture
10+
def session040(requests_mock):
11+
requests_mock.get(API_URL + "/", json={"api_version": "0.4.0"})
12+
session = openeo.connect(API_URL)
13+
return session
14+
15+
16+
def test_execute_batch(session040, requests_mock, tmpdir):
17+
requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})
18+
requests_mock.post(API_URL + "/jobs", headers={"OpenEO-Identifier": "f00ba5"})
19+
requests_mock.post(API_URL + "/jobs/f00ba5/results")
20+
requests_mock.get(API_URL + "/jobs/f00ba5", [
21+
{'json': {"status": "submitted"}},
22+
{'json': {"status": "queued"}},
23+
{'json': {"status": "running", "progress": 15}},
24+
{'json': {"status": "running", "progress": 80}},
25+
{'json': {"status": "finished", "progress": 100}},
26+
])
27+
requests_mock.get(API_URL + "/jobs/f00ba5/results", json={
28+
"links": [{"href": API_URL + "/jobs/f00ba5/files/output.tiff"}]
29+
})
30+
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
31+
32+
path = tmpdir.join("tmp.tiff")
33+
log = []
34+
35+
def print(msg):
36+
log.append(msg)
37+
38+
session040.load_collection("SENTINEL2").execute_batch(
39+
outputfile=str(path), out_format="GTIFF",
40+
max_poll_interval=.1, print=print
41+
)
42+
43+
assert re.match(r"0:00:00(.0\d*)? Job f00ba5: submitted \(progress N/A\)", log[0])
44+
assert re.match(r"0:00:00.1\d* Job f00ba5: queued \(progress N/A\)", log[1])
45+
assert re.match(r"0:00:00.2\d* Job f00ba5: running \(progress 15%\)", log[2])
46+
assert re.match(r"0:00:00.3\d* Job f00ba5: running \(progress 80%\)", log[3])
47+
assert re.match(r"0:00:00.4\d* Job f00ba5: finished \(progress 100%\)", log[4])
48+
49+
assert path.read() == "tiffdata"

0 commit comments

Comments
 (0)