Hv issue719 job manager threaded job start #736

Open

Wants to merge 73 commits into base: master

Commits (73)
3335070
Issue #719 job manager WIP: start jobs in worker thread
soxofaan Feb 14, 2025
ff80f58
Issue #719/#730 Some quick-fixes to avoid hanging tests
soxofaan Feb 14, 2025
d5c65c1
post_process queue
HansVRP Feb 19, 2025
d6bf073
fix concurrency
HansVRP Feb 19, 2025
c5f47bc
fix threaded unit test
HansVRP Feb 19, 2025
3162a3f
fix final unit test
HansVRP Feb 19, 2025
ac57284
fix final unit test
HansVRP Feb 19, 2025
138ef85
propose additional unit tests
HansVRP Feb 19, 2025
d0ffda7
fix linting
HansVRP Feb 19, 2025
1e4db19
improve logging
HansVRP Feb 20, 2025
4461d03
update
HansVRP Feb 20, 2025
5f43281
fix oversight
HansVRP Feb 20, 2025
55d9def
fix oversight
HansVRP Feb 20, 2025
eeaec40
fix unit test
HansVRP Feb 20, 2025
a5d5967
simplify
HansVRP Feb 20, 2025
07adae9
test
HansVRP Feb 24, 2025
69ceb80
fix log test
HansVRP Feb 24, 2025
a6f94bc
homogeneous unit tests
HansVRP Feb 25, 2025
703d150
unit tests
HansVRP Feb 26, 2025
85ea738
split off worker thread logic
HansVRP Feb 26, 2025
8eafa54
lastest updates
HansVRP Feb 28, 2025
0380b33
fix
HansVRP Feb 28, 2025
b589afb
introduce threadpool
HansVRP Feb 28, 2025
2dcb484
revise feedback
HansVRP Feb 28, 2025
d99996a
fix endless unit test
HansVRP Mar 14, 2025
eeb8ab0
fix tests
HansVRP Mar 14, 2025
da0e961
fix tests
HansVRP Mar 14, 2025
8529efc
clean up print statements
HansVRP Mar 14, 2025
c154706
clean up
HansVRP Mar 14, 2025
b3dbca4
clean up
HansVRP Mar 14, 2025
1157507
work on feedback
HansVRP Mar 17, 2025
768b2fc
fix unit tests
HansVRP Mar 17, 2025
af015ed
fix unit tests
HansVRP Mar 17, 2025
869d1c3
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
soxofaan Mar 17, 2025
fce55c8
PR #736 Code style cleanup
soxofaan Mar 17, 2025
64a4ad9
split off job class
HansVRP Mar 20, 2025
3e8a2e6
split off job class
HansVRP Mar 20, 2025
fabc346
add job manager unit test
HansVRP Mar 20, 2025
69e9dee
fix
HansVRP Mar 20, 2025
b367062
test printing caplog
HansVRP Mar 20, 2025
7ea8202
fix logging
HansVRP Mar 20, 2025
da728d1
generalize
HansVRP Mar 25, 2025
9f02ae6
status update
HansVRP Mar 26, 2025
c49feb1
atomic updates to dataframe
HansVRP Apr 3, 2025
a9c3c8b
simplification
HansVRP Apr 3, 2025
ed70d0f
add flexibility
HansVRP Apr 3, 2025
a9b1fb9
improve naming
HansVRP Apr 3, 2025
ecd04f4
clean up
HansVRP Apr 4, 2025
9fa693d
small refactor and clean up
HansVRP Apr 16, 2025
24ccda3
fixing unit tests
HansVRP Apr 17, 2025
7b6efe7
fix
HansVRP Apr 17, 2025
69147eb
don't break stac database
HansVRP Apr 17, 2025
8b51f84
fix
HansVRP Apr 17, 2025
14f1971
additional testing for processingworkerupdates
HansVRP Apr 17, 2025
48c4565
added 'integration test'
HansVRP Apr 17, 2025
0860840
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
HansVRP Apr 17, 2025
c28a2c5
always provide a task_resukt
HansVRP Apr 17, 2025
23ce981
docstrings
HansVRP Apr 18, 2025
54c8f97
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
soxofaan Apr 30, 2025
4b6aeea
fixup! don't break stac database
soxofaan Apr 30, 2025
30c65db
PR #736 Code style cleanup
soxofaan Apr 30, 2025
f6e2bc0
PR #736 various tweaks based on review notes
soxofaan Apr 30, 2025
8c238c4
fixup! PR #736 various tweaks based on review notes
soxofaan Apr 30, 2025
78337ec
fixup! PR #736 various tweaks based on review notes
soxofaan Apr 30, 2025
f359f56
PR #736 finetune test_thread_worker
soxofaan Apr 30, 2025
23ac000
Merge branch 'master' into hv_issue719-job-manager-threaded-job-start
HansVRP May 15, 2025
a13fee0
Merge branch 'hv_issue719-job-manager-threaded-job-start' of https://…
HansVRP May 15, 2025
7c0a4f4
remove update_row and rely on persisting
HansVRP May 15, 2025
0cfa735
include row index as a standard for tracking and updating the datefra…
HansVRP May 16, 2025
0dc56e8
start adapting the unit tests to take in df_idx
HansVRP May 16, 2025
ccb28e1
further fixes to unit test
HansVRP May 16, 2025
e5763a2
improve error logging and add unit testing
HansVRP May 19, 2025
bb4c70b
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
soxofaan May 22, 2025
151 changes: 130 additions & 21 deletions openeo/extra/job_management/__init__.py
@@ -32,12 +32,17 @@
from requests.adapters import HTTPAdapter, Retry

from openeo import BatchJob, Connection
from openeo.extra.job_management._thread_worker import (
_JobManagerWorkerThreadPool,
_JobStartTask,
)
from openeo.internal.processes.parse import (
Parameter,
Process,
parse_remote_process_definition,
)
from openeo.rest import OpenEoApiError
from openeo.rest.auth.auth import BearerAuth
from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339

_log = logging.getLogger(__name__)
@@ -105,6 +110,7 @@ def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
"""
...


def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
raise NotImplementedError("No 'start_job' callable provided")

@@ -186,6 +192,7 @@ def start_job(

# Expected columns in the job DB dataframes.
# TODO: make this part of public API when settled?
# TODO: move non-official statuses to a separate column (not_started, queued_for_start)
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
"id": _ColumnProperties(dtype="str"),
"backend_name": _ColumnProperties(dtype="str"),
@@ -222,6 +229,7 @@ def __init__(
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
self._thread = None
self._worker_pool = None

def add_backend(
self,
@@ -358,21 +366,27 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
_log.info(f"Resuming `run_jobs` from existing {job_db}")

self._stop_thread = False
self._worker_pool = _JobManagerWorkerThreadPool()

def run_loop():

# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

while (
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued", "queued_for_start", "running"]
).values()
)
> 0
and not self._stop_thread
):
self._job_update_loop(job_db=job_db, start_job=start_job)
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

# Show current stats and sleep
_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
# Do sequence of micro-sleeps to allow for quick thread exit
for _ in range(int(max(1, self.poll_sleep))):
time.sleep(1)
if self._stop_thread:
@@ -391,6 +405,8 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):

.. versionadded:: 0.32.0
"""
self._worker_pool.shutdown()

if self._thread is not None:
self._stop_thread = True
if timeout_seconds is _UNSET:
@@ -493,7 +509,16 @@ def run_jobs(
# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._worker_pool = _JobManagerWorkerThreadPool()

while (
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued_for_start", "queued", "running"]
).values()
)
> 0
):
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

@@ -502,6 +527,9 @@ def run_jobs(
time.sleep(self.poll_sleep)
stats["sleep"] += 1

# TODO: run post-processing after shutdown once more to ensure completion?
self._worker_pool.shutdown()

return stats

def _job_update_loop(
@@ -524,7 +552,7 @@ def _job_update_loop(
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created", "queued", "running"])
running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"])
stats["job_db get_by_status"] += 1
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
@@ -541,7 +569,9 @@
stats["job_db persist"] += 1
total_added += 1

# Act on jobs
self._process_threadworker_updates(self._worker_pool, job_db, stats)

# TODO: move this back closer to the `_track_statuses` call above, once job done/error handling is also handled in threads?
for job, row in jobs_done:
self.on_job_done(job, row)

Expand All @@ -551,7 +581,6 @@ def _job_update_loop(
for job, row in jobs_cancel:
self.on_job_cancel(job, row)


def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs

@@ -598,26 +627,106 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
df.loc[i, "start_time"] = rfc3339.now_utc()
if job:
df.loc[i, "id"] = job.job_id
_log.info(f"Job created: {job.job_id}")
with ignore_connection_errors(context="get status"):
status = job.status()
stats["job get status"] += 1
df.loc[i, "status"] = status
if status == "created":
# start job if not yet done by callback
try:
job.start()
stats["job start"] += 1
df.loc[i, "status"] = job.status()
stats["job get status"] += 1
job_con = job.connection
task = _JobStartTask(
root_url=job_con.root_url,
bearer_token=job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
job_id=job.job_id,
df_idx=i,
)
_log.info(f"Submitting task {task} to thread pool")
self._worker_pool.submit_task(task)

stats["job_queued_for_start"] += 1
df.loc[i, "status"] = "queued_for_start"
except OpenEoApiError as e:
_log.error(e)
df.loc[i, "status"] = "start_failed"
stats["job start error"] += 1
_log.info(f"Failed submitting task {task} to thread pool with error: {e}")
df.loc[i, "status"] = "queued_for_start_failed"
stats["job queued for start failed"] += 1
else:
# TODO: what is this "skipping" about actually?
df.loc[i, "status"] = "skipped"
stats["start_job skipped"] += 1

def _process_threadworker_updates(
self,
worker_pool: '_JobManagerWorkerThreadPool',
job_db: 'JobDatabaseInterface',
stats: Dict[str, int],
) -> None:
"""
Fetch completed TaskResult objects from the worker pool and apply
their db_update and stats_update payloads. Only existing DataFrame rows
(matched by df_idx) are upserted via job_db.persist(); results
targeting unknown df_idx values are logged as errors but not persisted.

:param worker_pool: Thread pool managing asynchronous task execution
:param job_db: Interface to append/upsert to the job database
:param stats: Dictionary accumulating statistics counters
"""
# Retrieve completed task results immediately
results, _ = worker_pool.process_futures(timeout=0)
if not isinstance(results, list):
raise TypeError(f"Expected list of TaskResult, got {results}")

# Collect update dicts
updates: List[Dict[str, Any]] = []
for res in results:
# Process database updates
if res.db_update:
try:
if 'id' in res.db_update or 'df_idx' in res.db_update:
raise KeyError("db_update must not override 'id' or 'df_idx'")
updates.append({
'id': res.job_id,
'df_idx': res.df_idx,
**res.db_update,
})
except Exception as e:
_log.error(f"Skipping invalid db_update for job '{res.job_id}': {e}")

# Process stats updates
if res.stats_update:
for key, val in res.stats_update.items():
try:
count = int(val)
stats[key] = stats.get(key, 0) + count
except Exception:
_log.error(
f"Skipping invalid stats_update for job '{res.job_id}': "
f"key={key!r}, val={val!r}"
)

# No valid updates: nothing to persist
if not updates:
return

# Build DataFrame of updates indexed by df_idx
df_updates = pd.DataFrame(updates).set_index('df_idx', drop=True)

# Determine which rows to upsert
existing_indices = set(df_updates.index).intersection(job_db.read().index)
if existing_indices:
df_upsert = df_updates.loc[sorted(existing_indices)]
job_db.persist(df_upsert)
stats['job_db persist'] = stats.get('job_db persist', 0) + 1

# Any df_idx not in original index are errors
missing = set(df_updates.index) - existing_indices
if missing:
_log.error(f"Unknown df_idx values, skipping updates for: {sorted(missing)}")


def on_job_done(self, job: BatchJob, row):
"""
Handles jobs that have finished. Can be overridden to provide custom behaviour.
@@ -673,20 +782,19 @@ def _cancel_prolonged_job(self, job: BatchJob, row):
try:
# Ensure running start time is valid
job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

# Parse the current time into a datetime object with timezone info
current_time = rfc3339.parse_datetime(rfc3339.now_utc(), with_timezone=True)

# Calculate the elapsed time between job start and now
elapsed = current_time - job_running_start_time

if elapsed > self._cancel_running_job_after:

_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()

except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

@@ -715,7 +823,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
"""
stats = stats if stats is not None else collections.defaultdict(int)

active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
active = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"]).copy()

jobs_done = []
jobs_error = []
@@ -737,7 +845,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
)

if new_status == "finished":
if previous_status != "finished" and new_status == "finished":
stats["job finished"] += 1
jobs_done.append((the_job, active.loc[i]))

@@ -749,7 +857,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
stats["job canceled"] += 1
jobs_cancel.append((the_job, active.loc[i]))

if previous_status in {"created", "queued"} and new_status == "running":
if previous_status in {"created", "queued", "queued_for_start"} and new_status == "running":
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.now_utc()

@@ -873,11 +981,12 @@ def get_by_status(self, statuses, max=None) -> pd.DataFrame:

def _merge_into_df(self, df: pd.DataFrame):
if self._df is not None:
self._df.update(df, overwrite=True)
self._df.update(df, overwrite=True)  # TODO: the index is not consistent here, which creates an issue. When getting a row, we need to give it the right index; best solution is to pass the index along when creating a task.
else:
self._df = df



class CsvJobDatabase(FullDataFrameJobDatabase):
"""
Persist/load job metadata with a CSV file.
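Note for reviewers (not part of the diff): a rough usage sketch of the job manager API whose job-start logic this PR moves into a worker thread pool. The backend URL, collection id, extents and CSV path below are made-up placeholders; the rest follows the existing MultiBackendJobManager / CsvJobDatabase API. With this PR, run_jobs (and the start_job_thread / stop_job_thread pair) submit job starts as _JobStartTask items to a _JobManagerWorkerThreadPool, and the affected rows pass through the new "queued_for_start" status instead of being started synchronously in the polling loop.

```python
import openeo
import pandas as pd

from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager


def start_job(row: pd.Series, connection: openeo.Connection, **kwargs):
    # Build a batch job for one dataframe row; the job manager decides when to
    # start it (with this PR: via a _JobStartTask on the worker thread pool).
    cube = connection.load_collection(
        "SENTINEL2_L2A",  # placeholder collection
        spatial_extent={"west": row["west"], "south": row["south"], "east": row["east"], "north": row["north"]},
        temporal_extent=["2024-01-01", "2024-02-01"],
        bands=["B04", "B08"],
    )
    return cube.create_job(title=f"job for row {row.name}")


manager = MultiBackendJobManager(poll_sleep=30)
manager.add_backend("example", connection=openeo.connect("https://openeo.example"), parallel_jobs=2)

df = pd.DataFrame([
    {"west": 4.00, "south": 51.00, "east": 4.10, "north": 51.10},
    {"west": 4.10, "south": 51.00, "east": 4.20, "north": 51.10},
])
job_db = CsvJobDatabase(path="jobs.csv").initialize_from_df(df)

# Rows move not_started -> queued_for_start -> queued/running -> finished;
# worker-thread results are merged back via _process_threadworker_updates().
stats = manager.run_jobs(start_job=start_job, job_db=job_db)
print(stats)
```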