diff --git a/qiita_db/private.py b/qiita_db/private.py index 78a286f51..0f10187ca 100644 --- a/qiita_db/private.py +++ b/qiita_db/private.py @@ -47,8 +47,27 @@ def build_analysis_files(job): j.submit() sleep(1) + # The validator jobs no longer finish the job automatically so we need + # to release the validators here + job.release_validators() -TASK_DICT = {'build_analysis_files': build_analysis_files} + +def release_validators(job): + """Waits until all the validators of a job are completed + + Parameters + ---------- + job : qiita_db.processing_job.ProcessingJob + The processing job with the information of the parent job + """ + with qdb.sql_connection.TRN: + qdb.processing_job.ProcessingJob( + job.parameters.values['job']).release_validators() + job._set_status('success') + + +TASK_DICT = {'build_analysis_files': build_analysis_files, + 'release_validators': release_validators} def private_task(job_id): diff --git a/qiita_db/processing_job.py b/qiita_db/processing_job.py index fa4d93341..86453d4f5 100644 --- a/qiita_db/processing_job.py +++ b/qiita_db/processing_job.py @@ -14,6 +14,7 @@ from itertools import chain from collections import defaultdict from json import dumps, loads +from time import sleep from future.utils import viewitems, viewvalues import networkx as nx @@ -420,8 +421,9 @@ def release_validators(self): "Only artifact transformation and private jobs can " "release validators") - # Check if all the validators are ready by checking that there is - # no validator processing job whose status is not waiting + # Check if all the validators are completed. 
Validator jobs can be + # in two states when completed: 'waiting' in case of success + # or 'error' otherwise sql = """SELECT COUNT(1) FROM qiita.processing_job_validator pjv JOIN qiita.processing_job pj ON @@ -429,12 +431,49 @@ def release_validators(self): JOIN qiita.processing_job_status USING (processing_job_status_id) WHERE pjv.processing_job_id = %s - AND processing_job_status != %s""" - qdb.sql_connection.TRN.add(sql, [self.id, 'waiting']) + AND processing_job_status NOT IN %s""" + sql_args = [self.id, ('waiting', 'error')] + qdb.sql_connection.TRN.add(sql, sql_args) remaining = qdb.sql_connection.TRN.execute_fetchlast() - if remaining == 0: - # All validators have completed + # Active polling - wait until all validator jobs are completed + while remaining != 0: + self.step = "Validating outputs (%d remaining)" % remaining + sleep(10) + qdb.sql_connection.TRN.add(sql, sql_args) + remaining = qdb.sql_connection.TRN.execute_fetchlast() + + # Check if any of the validators errored + sql = """SELECT validator_id + FROM qiita.processing_job_validator pjv + JOIN qiita.processing_job pj + ON pjv.validator_id = pj.processing_job_id + JOIN qiita.processing_job_status USING + (processing_job_status_id) + WHERE pjv.processing_job_id = %s AND + processing_job_status = %s""" + qdb.sql_connection.TRN.add(sql, [self.id, 'error']) + errored = qdb.sql_connection.TRN.execute_fetchflatten() + + if errored: + # At least one of the validators failed. Set the rest of the + # validators and the current job as failed + qdb.sql_connection.TRN.add(sql, [self.id, 'waiting']) + waiting = qdb.sql_connection.TRN.execute_fetchflatten() + + common_error = "\n".join( + ["Validator %s error message: %s" + % (j, ProcessingJob(j).log.msg) for j in errored]) + + val_error = "%d sister validator jobs failed: %s" % ( + len(errored), common_error) + for j in waiting: + ProcessingJob(j)._set_error(val_error) + + self._set_error('%d validator jobs failed: %s' + % (len(errored), common_error)) + 
else: + # All validators have successfully completed sql = """SELECT validator_id FROM qiita.processing_job_validator WHERE processing_job_id = %s""" @@ -460,8 +499,6 @@ def release_validators(self): self._update_and_launch_children(mapping) self._set_status('success') - else: - self.step = "Validating outputs (%d remaining)" % remaining def _complete_artifact_definition(self, artifact_data): """"Performs the needed steps to complete an artifact definition job @@ -487,7 +524,6 @@ def _complete_artifact_definition(self, artifact_data): if job_params['provenance'] is not None: # The artifact is a result from a previous job provenance = loads(job_params['provenance']) - job = ProcessingJob(provenance['job']) if provenance.get('data_type') is not None: artifact_data = {'data_type': provenance['data_type'], 'artifact_data': artifact_data} @@ -500,7 +536,6 @@ def _complete_artifact_definition(self, artifact_data): qdb.sql_connection.TRN.execute() # Can't create the artifact until all validators are completed self._set_status('waiting') - job.release_validators() else: # The artifact is uploaded by the user or is the initial # artifact of an analysis @@ -619,6 +654,16 @@ def _complete_artifact_transformation(self, artifacts_data): for j in validator_jobs: j.submit() + # Submit the job that will release all the validators + plugin = qdb.software.Software.from_name_and_version( + 'Qiita', 'alpha') + cmd = plugin.get_command('release_validators') + params = qdb.software.Parameters.load( + cmd, values_dict={'job': self.id}) + job = ProcessingJob.create(self.user, params) + # Doing the submission outside of the transaction + job.submit() + def _set_validator_jobs(self, validator_jobs): """Sets the validator jobs for the current job @@ -673,15 +718,6 @@ def complete(self, success, artifacts_data=None, error=None): else: self._set_status('success') else: - if self.command.software.type == 'artifact definition': - job_params = self.parameters.values - if 
job_params.get('provenance') is not None: - # This artifact definition job is a result of a command - # run, if it fails, set up the status of the "parent" - # job also as failed, and assign the sem error message - provenance = loads(job_params['provenance']) - job = ProcessingJob(provenance['job']) - job._set_error(error) self._set_error(error) @property diff --git a/qiita_db/support_files/patches/57.sql b/qiita_db/support_files/patches/57.sql new file mode 100644 index 000000000..ac5970659 --- /dev/null +++ b/qiita_db/support_files/patches/57.sql @@ -0,0 +1,19 @@ +-- Aug 8, 2017 +-- Add release validators internal Qiita command + +DO $do$ +DECLARE + qiita_sw_id bigint; + rv_cmd_id bigint; +BEGIN + SELECT software_id INTO qiita_sw_id + FROM qiita.software + WHERE name = 'Qiita' AND version = 'alpha'; + + INSERT INTO qiita.software_command (software_id, name, description) + VALUES (qiita_sw_id, 'release_validators', 'Releases the job validators') + RETURNING command_id INTO rv_cmd_id; + + INSERT INTO qiita.command_parameter (command_id, parameter_name, parameter_type, required, default_value) + VALUES (rv_cmd_id, 'job', 'string', True, NULL); +END $do$; diff --git a/qiita_db/test/test_artifact.py b/qiita_db/test/test_artifact.py index 6441f6baf..ca4160c3b 100644 --- a/qiita_db/test/test_artifact.py +++ b/qiita_db/test/test_artifact.py @@ -1011,6 +1011,7 @@ def test_delete_as_output_job(self): job.complete(True, artifacts_data=data) job = qdb.processing_job.ProcessingJob( "bcc7ebcd-39c1-43e4-af2d-822e3589f14d") + job.release_validators() artifact = job.outputs['OTU table'] self._clean_up_files.extend([afp for _, afp, _ in artifact.filepaths]) diff --git a/qiita_db/test/test_processing_job.py b/qiita_db/test/test_processing_job.py index 0bfdbb540..84fb9ef4f 100644 --- a/qiita_db/test/test_processing_job.py +++ b/qiita_db/test/test_processing_job.py @@ -359,6 +359,12 @@ def test_complete_multiple_outputs(self): artifact_data_2 = {'filepaths': [(fp2, 'biom')], 
'artifact_type': 'BIOM'} obs2._complete_artifact_definition(artifact_data_2) + self.assertEqual(obs1.status, 'waiting') + self.assertEqual(obs2.status, 'waiting') + self.assertEqual(job.status, 'running') + + job.release_validators() + self.assertEqual(obs1.status, 'success') self.assertEqual(obs2.status, 'success') self.assertEqual(job.status, 'success') @@ -386,7 +392,8 @@ def test_complete_artifact_definition(self): qdb.user.User('test@foo.bar'), params) job._set_validator_jobs([obs]) obs._complete_artifact_definition(artifact_data) - self.assertEqual(job.status, 'success') + self.assertEqual(obs.status, 'waiting') + self.assertEqual(job.status, 'running') # Upload case implicitly tested by "test_complete_type" def test_complete_artifact_transformation(self): @@ -476,7 +483,9 @@ def test_complete_success(self): obsjobs = set(self._get_all_job_ids()) - self.assertEqual(len(obsjobs), len(alljobs) + 1) + # The complete call above submits 2 new jobs: the validator job and + # the release validators job. 
Hence the +2 + self.assertEqual(len(obsjobs), len(alljobs) + 2) self._wait_for_job(job) def test_complete_failure(self): @@ -501,12 +510,17 @@ def test_complete_failure(self): ) obs = qdb.processing_job.ProcessingJob.create( qdb.user.User('test@foo.bar'), params) + job._set_validator_jobs([obs]) obs.complete(False, error="Validation failure") self.assertEqual(obs.status, 'error') self.assertEqual(obs.log.msg, 'Validation failure') + self.assertEqual(job.status, 'running') + job.release_validators() self.assertEqual(job.status, 'error') - self.assertEqual(job.log.msg, 'Validation failure') + self.assertEqual( + job.log.msg, '1 validator jobs failed: Validator %s ' + 'error message: Validation failure' % obs.id) def test_complete_error(self): with self.assertRaises( @@ -628,6 +642,7 @@ def test_outputs(self): job._set_validator_jobs([obs]) exp_artifact_count = qdb.util.get_count('qiita.artifact') + 1 obs._complete_artifact_definition(artifact_data) + job.release_validators() self.assertEqual(job.status, 'success') obs = job.outputs