From 9fce8506236b0e6f27e97d58e83db68e5b035f50 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Thu, 4 Nov 2021 08:52:37 +0000 Subject: [PATCH 1/3] Add support for scheduling jobs to run in the future --- README.md | 9 +++++++++ django_dbq/migrations/0005_job_run_after.py | 18 ++++++++++++++++++ django_dbq/models.py | 8 +++++++- django_dbq/tests.py | 14 ++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 django_dbq/migrations/0005_job_run_after.py diff --git a/README.md b/README.md index 5a55d00..869798f 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,15 @@ Job.objects.create(name='critical_job', priority=2) Jobs will be ordered by their `priority` (highest to lowest) and then the time which they were created (oldest to newest) and processed in that order. +### Scheduling jobs +If you'd like to create a job but have it run at some time in the future, you can use the `run_after` field on the Job model: + +```python +Job.objects.create(name='scheduled_job', run_after=timezone.now() + timedelta(minutes=10)) +``` + +Of course, the job will only be run if your `python manage.py worker` process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed. + ## Terminology ### Job diff --git a/django_dbq/migrations/0005_job_run_after.py b/django_dbq/migrations/0005_job_run_after.py new file mode 100644 index 0000000..ad86b3b --- /dev/null +++ b/django_dbq/migrations/0005_job_run_after.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2rc1 on 2021-11-04 03:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_dbq', '0004_auto_20210818_0247'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='run_after', + field=models.DateTimeField(db_index=True, null=True), + ), + ] diff --git a/django_dbq/models.py b/django_dbq/models.py index 2f51a78..a0075da 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -68,7 +68,12 @@ def delete_old(self): def to_process(self, queue_name): return self.select_for_update().filter( - queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW) + models.Q(queue_name=queue_name) & + models.Q(state__in=(Job.STATES.READY, Job.STATES.NEW)) & + models.Q( + models.Q(run_after__isnull=True) | + models.Q(run_after__lte=timezone.now()) + ) ) @@ -91,6 +96,7 @@ class STATES(TextChoices): workspace = JSONField(null=True) queue_name = models.CharField(max_length=20, default="default", db_index=True) priority = models.SmallIntegerField(default=0, db_index=True) + run_after = models.DateTimeField(null=True, db_index=True) class Meta: ordering = ["-priority", "created"] diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 33df2f5..2c8e0e9 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -210,6 +210,20 @@ def test_gets_jobs_in_priority_and_date_order(self): self.assertEqual(Job.objects.get_ready_or_none("default"), job_1) self.assertFalse(Job.objects.to_process("default").filter(id=job_2.id).exists()) + def test_ignores_jobs_until_run_after_is_in_the_past(self): + job_1 = Job.objects.create(name="testjob") + job_2 = Job.objects.create(name="testjob", run_after=datetime(2021, 11, 4, 8)) + + with freezegun.freeze_time(datetime(2021, 11, 4, 7)): + self.assertEqual( + {job for job in Job.objects.to_process("default")}, {job_1} + ) + + with freezegun.freeze_time(datetime(2021, 11, 4, 9)): + self.assertEqual( + {job for job in Job.objects.to_process("default")}, {job_1, job_2} + ) + def test_get_next_ready_job_created(self): """ Created jobs should be picked too. From e17d9bd24b24c4bdcbd4290dedaf3f08dce17e81 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Thu, 4 Nov 2021 08:57:18 +0000 Subject: [PATCH 2/3] Format with black --- django_dbq/migrations/0005_job_run_after.py | 6 +++--- django_dbq/models.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/django_dbq/migrations/0005_job_run_after.py b/django_dbq/migrations/0005_job_run_after.py index ad86b3b..67a2c0d 100644 --- a/django_dbq/migrations/0005_job_run_after.py +++ b/django_dbq/migrations/0005_job_run_after.py @@ -6,13 +6,13 @@ class Migration(migrations.Migration): dependencies = [ - ('django_dbq', '0004_auto_20210818_0247'), + ("django_dbq", "0004_auto_20210818_0247"), ] operations = [ migrations.AddField( - model_name='job', - name='run_after', + model_name="job", + name="run_after", field=models.DateTimeField(db_index=True, null=True), ), ] diff --git a/django_dbq/models.py b/django_dbq/models.py index a0075da..5669861 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -68,11 +68,11 @@ def delete_old(self): def to_process(self, queue_name): return self.select_for_update().filter( - models.Q(queue_name=queue_name) & - models.Q(state__in=(Job.STATES.READY, Job.STATES.NEW)) & - models.Q( - models.Q(run_after__isnull=True) | - models.Q(run_after__lte=timezone.now()) + models.Q(queue_name=queue_name) + & models.Q(state__in=(Job.STATES.READY, Job.STATES.NEW)) + & models.Q( + models.Q(run_after__isnull=True) + | models.Q(run_after__lte=timezone.now()) ) ) From d1610281d617e5d790974462ee929691b9ff5598 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Thu, 4 Nov 2021 09:06:15 +0000 Subject: [PATCH 3/3] Add note to README on precision of scheduled job timing --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 869798f..8556a3f 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,9 @@ If you'd like to create a job but have it run at some time in the future, you ca Job.objects.create(name='scheduled_job', run_after=timezone.now() + timedelta(minutes=10)) ``` -Of course, the job will only be run if your `python manage.py worker` process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed. +Of course, the scheduled job will only be run if your `python manage.py worker` process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed. + +It's also worth noting that, by default, scheduled jobs run as part of the same queue as all other jobs, and so if a job is already being processed at the time when your scheduled job is due to run, it won't run until that job has finished. If increased precision is important, you might consider using the `queue_name` feature to run a separate worker dedicated to only running scheduled jobs. ## Terminology