DRY refactoring #60
@@ -128,5 +128,6 @@ dmypy.json
# Pyre type checker
.pyre/

# Plugin configuration file
configuration.json
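configuration.json itself is not part of the diff, but the Step subclasses below read per-job sections from the pipeline configuration (configuration['bcl2fastq'], ['bcl-convert'], ['qc'] and ['seqpro'], the last of which carries a 'seqpro_path' entry). A minimal, hypothetical sketch of its shape, with placeholder values only:

    # Hypothetical sketch; section names come from the code in this PR,
    # every value and extra key is a placeholder, not the plugin's defaults.
    import json

    example_config = {
        "bcl2fastq":   {"executable_path": "/usr/local/bin/bcl2fastq"},
        "bcl-convert": {"executable_path": "/usr/local/bin/bcl-convert"},
        "qc":          {},  # QCJob resources, paths, etc.
        "seqpro":      {"seqpro_path": "/usr/local/bin/seqpro"},
    }

    with open("configuration.json", "w") as f:
        json.dump(example_config, f, indent=2)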
@@ -0,0 +1,104 @@
from os import listdir, makedirs
from os.path import join, isfile, basename
from shutil import copyfile
from qp_klp.Step import Step


class Amplicon(Step):
    def __init__(self, pipeline, master_qiita_job_id, sn_tid_map_by_project,
                 status_update_callback=None):
        super().__init__(pipeline,
                         master_qiita_job_id,
                         sn_tid_map_by_project,
                         status_update_callback)

        if pipeline.pipeline_type != Step.AMPLICON_TYPE:
            raise ValueError("Cannot create an Amplicon run using a "
                             f"{pipeline.pipeline_type}-configured Pipeline.")

    def convert_bcl_to_fastq(self):
        # The 'bcl2fastq' key is a convention hard-coded into mg-scripts and
        # qp-klp projects. By design and history, amplicon jobs use EMP
        # primers and are demuxed downstream of qp-klp by Qiita. This
        # necessitates the use of bcl2fastq and a 'dummy' sample-sheet to
        # avoid the demultiplexing that otherwise occurs at this stage. The
        # name and path of the executable, the resource requirements to
        # instantiate a SLURM job with, etc. are stored in
        # configuration['bcl2fastq'].
        config = self.pipeline.configuration['bcl2fastq']
        super()._convert_bcl_to_fastq(config, self.pipeline.sample_sheet)

    def quality_control(self):
        # Quality control for Amplicon runs occurs downstream.
        # Do not perform QC at this time.

        # Simulate QCJob's output directory for use as input into FastQCJob.
        projects = self.pipeline.get_project_info()
        projects = [x['project_name'] for x in projects]

        for project_name in projects:
            # copy the files from ConvertJob output to faked QCJob output
            # folder: $WKDIR/$RUN_ID/QCJob/$PROJ_NAME/amplicon
            output_folder = join(self.pipeline.output_path,
                                 'QCJob',
                                 project_name,
                                 Step.AMPLICON_TYPE)

            makedirs(output_folder)

            raw_fastq_files_path = join(self.pipeline.output_path,
                                        'ConvertJob')

            # get list of all raw output files to be copied.
            job_output = [join(raw_fastq_files_path, x) for x in
                          listdir(raw_fastq_files_path)]
            job_output = [x for x in job_output if isfile(x)]
            job_output = [x for x in job_output if x.endswith('fastq.gz')]
            # Undetermined files are very small and should be filtered from
            # results.
            job_output = [x for x in job_output if
                          not basename(x).startswith('Undetermined')]

            # copy the files
            for fastq_file in job_output:
                new_path = join(output_folder, basename(fastq_file))
                copyfile(fastq_file, new_path)

            # FastQC expects the ConvertJob output to also be organized by
            # project. Since this would entail running the same ConvertJob
            # multiple times on the same input with just a name-change in
            # the dummy sample-sheet, we instead perform ConvertJob once
            # and copy the output from ConvertJob's output folder into
            # ConvertJob's output folder/project1, project2 ... projectN.
            output_folder = join(raw_fastq_files_path, project_name)
            makedirs(output_folder)

            job_output = [join(raw_fastq_files_path, x) for x in
                          listdir(raw_fastq_files_path)]
            job_output = [x for x in job_output if isfile(x) and x.endswith(
                'fastq.gz') and not basename(x).startswith('Undetermined')]
Comment on lines +75 to +78

Review comment: I guess these 2 can be merged, no?

Reply: I think it might be better to keep them separate, because you want x in isfile(x) in the second one to be the full path created by join(raw_fastq_files_path, x) in the first one. If we merged them, we'd have something like:

    job_output = [join(raw_fastq_files_path, x) for x in
                  listdir(raw_fastq_files_path)
                  if isfile(join(raw_fastq_files_path, x)) and
                  x.endswith('fastq.gz') and
                  not basename(x).startswith('Undetermined')]

The join() has to be present twice.
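As an aside (not something this PR changes), one way to sidestep the repeated join() entirely would be os.scandir, whose entries already carry their full path. A minimal sketch, assuming the same filtering rules; list_raw_fastqs is a hypothetical helper name, not part of the diff:

    from os import scandir


    def list_raw_fastqs(raw_fastq_files_path):
        # DirEntry objects expose both .name and the joined .path, so a
        # single comprehension can filter on regular files, the fastq.gz
        # suffix, and the Undetermined prefix without calling join() twice.
        return [entry.path for entry in scandir(raw_fastq_files_path)
                if entry.is_file()
                and entry.name.endswith('fastq.gz')
                and not entry.name.startswith('Undetermined')]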
            for raw_fastq_file in job_output:
                new_path = join(output_folder, basename(raw_fastq_file))
                copyfile(raw_fastq_file, new_path)

    def generate_reports(self, input_file_path):
        super()._generate_reports(self.pipeline.mapping_file)
        return None  # amplicon doesn't need project names

    def generate_prep_file(self):
        config = self.pipeline.configuration['seqpro']

        seqpro_path = config['seqpro_path'].replace('seqpro', 'seqpro_mf')
Review comment: What's the plan to remove seqpro_mf?

Reply: It's an easy enough change on this side of things, and a little bit of refactoring for the metapool package. I can make the change by the end of the week in between other things.
        job = super()._generate_prep_file(config,
                                          self.pipeline.mapping_file,
                                          seqpro_path,
                                          self.project_names)

        self.prep_file_paths = job.prep_file_paths

    def generate_commands(self):
        super()._generate_commands()
        self.cmds.append(f'cd {self.pipeline.output_path}; '
                         'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports')
        self.write_commands_to_output_path()
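For orientation only, a hypothetical driver for this class might look like the sketch below. The objects pipeline, master_qiita_job_id and sn_tid_map_by_project are assumed to come from the qp-klp plugin, the module path qp_klp.Amplicon is assumed from the package layout, and the call order simply mirrors the methods defined above rather than any code in this PR:

    # Hypothetical usage sketch; not part of this PR.
    from qp_klp.Amplicon import Amplicon

    step = Amplicon(pipeline, master_qiita_job_id, sn_tid_map_by_project)
    step.convert_bcl_to_fastq()   # bcl2fastq with the 'dummy' sample-sheet
    step.quality_control()        # only fakes the QCJob layout for FastQCJob
    step.generate_reports(None)   # delegates to Step._generate_reports()
    step.generate_prep_file()     # seqpro_mf against the mapping file
    step.generate_commands()      # writes the tar commands to output_path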
@@ -0,0 +1,161 @@

from os import walk
from os.path import exists, join, basename
from sequence_processing_pipeline.PipelineError import PipelineError
import pandas as pd
from qp_klp.Step import Step, FailedSamplesRecord


class Metagenomic(Step):
    def __init__(self, pipeline, master_qiita_job_id, sn_tid_map_by_project,
                 status_update_callback=None):
        super().__init__(pipeline,
                         master_qiita_job_id,
                         sn_tid_map_by_project,
                         status_update_callback)

        if pipeline.pipeline_type not in Step.META_TYPES:
            raise ValueError("Cannot instantiate Metagenomic object from "
                             f"pipeline of type '{pipeline.pipeline_type}'")

        # Note: FailedSamplesRecord is not used when processing amplicon as
        # the samples are processed as a single fastq file and hence that
        # info is not available.
        self.fsr = FailedSamplesRecord(self.pipeline.output_path,
                                       pipeline.sample_sheet.samples)
        self.project_names = None

    def convert_bcl_to_fastq(self):
        # The 'bcl-convert' key is a convention hard-coded into mg-scripts
        # and qp-klp projects. Currently meta*omic jobs use bcl-convert for
        # its improved performance over bcl2fastq. The name and path of the
        # executable, the resource requirements to instantiate a SLURM job
        # with, etc. are stored in configuration['bcl-convert'].
        config = self.pipeline.configuration['bcl-convert']
        job = super()._convert_bcl_to_fastq(config,
                                            self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')
Review comment: It looks like at the end of each method something similar to this line is called; should it be more general?

Reply: I tried this out, but it seems like it will just create another function along those lines, and it doesn't really do much. The meat of each audit is in each Job class, while the structure and writing of each failed samples record is in the FailedSamplesRecord class.
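The snippet referenced in that reply is not shown here; purely as a hypothetical reconstruction (method and argument names are assumptions), the kind of wrapper being discussed would be roughly:

    # Hypothetical helper on the Metagenomic/Step class; not part of this PR.
    def _audit_and_record(self, job, job_name):
        # Each Job subclass implements audit(), which reports the sample-ids
        # that failed; FailedSamplesRecord owns the bookkeeping and writing.
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), job_name)

Each call site would then shrink to self._audit_and_record(job, 'ConvertJob'), which is why the reply concludes it adds little.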
    def quality_control(self):
        config = self.pipeline.configuration['qc']
        job = super()._quality_control(config,
                                       self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')

    def generate_reports(self):
        job = super()._generate_reports(self.pipeline.sample_sheet.path)
        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'FastQCJob')

        self.project_names = job.project_names

    def generate_prep_file(self):
        config = self.pipeline.configuration['seqpro']

        if self.project_names is None:
            raise ValueError("reports not yet generated")

        job = super()._generate_prep_file(config,
                                          self.pipeline.sample_sheet.path,
                                          config['seqpro_path'],
                                          self.project_names)

        self.prep_file_paths = job.prep_file_paths

    def generate_commands(self, special_map, server_url,
                          touched_studies_prep_info):
        super()._generate_commands()

        out_dir = self.pipeline.output_path
        output_path = self.pipeline.output_path

        self.cmds.append(f'cd {self.pipeline.output_path}; '
                         'tar zcvf logs-QCJob.tgz QCJob/logs')

        self.cmds.append(f'cd {self.pipeline.output_path}; '
                         'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports '
                         'ConvertJob/Logs')

        self.write_commands_to_output_path()

        if self.sifs:
            # just use the filenames for tarballing the sifs.
            # the sifs should all be stored in the {out_dir} by default.
            tmp = [basename(x) for x in self.sifs]
            # convert sifs into a list of filenames.
            tmp = ' '.join(tmp)
            self.cmds.append(f'cd {out_dir}; tar zcvf sample-files.tgz {tmp}')

        csv_fps = []
        for root, dirs, files in walk(join(output_path, 'PrepFiles')):
            for csv_file in files:
                csv_fps.append(join(root, csv_file))

        touched_studies = []

        for project, upload_dir, qiita_id in special_map:
            # sif filenames are of the form:
            blanks_file = f'{self.pipeline.run_id}_{project}_blanks.tsv'
            if self.sifs and [x for x in self.sifs if blanks_file in x]:
                # move uncompressed sifs to upload_dir.
                tmp = f'cd {out_dir}; mv {blanks_file} {upload_dir}'
                self.cmds.append(tmp)

            # record that something is being moved into a Qiita Study.
            # this will allow us to notify the user which Studies to
            # review upon completion.
            touched_studies.append((qiita_id, project))

            if self.pipeline.pipeline_type in Step.META_TYPES:
                self.cmds.append(f'cd {out_dir}; tar zcvf reports-QCJob.tgz '
                                 f'QCJob/{project}/fastp_reports_dir')

            if exists(f'{out_dir}/QCJob/{project}/filtered_sequences'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/filtered_sequences/* '
                                 f'{upload_dir}')
            elif exists(f'{out_dir}/QCJob/{project}/trimmed_sequences'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/trimmed_sequences/* '
                                 f'{upload_dir}')
            elif exists(f'{out_dir}/QCJob/{project}/amplicon'):
                self.cmds.append(f'cd {out_dir}; mv '
                                 f'QCJob/{project}/amplicon/* '
                                 f'{upload_dir}')
            else:
                raise PipelineError("QCJob output not in expected location")

            for csv_file in csv_fps:
                if project in csv_file:
                    tmp = f'cd {out_dir}; mv {csv_file} {upload_dir}'
                    self.cmds.append(tmp)
                    break

        # create a set of unique study-ids that were touched by the Pipeline
        # and return this information to the user.
        touched_studies = sorted(list(set(touched_studies)))

        data = []
        for qiita_id, project in touched_studies:
            for prep_id in touched_studies_prep_info[qiita_id]:
                study_url = f'{server_url}/study/description/{qiita_id}'
                prep_url = (f'{server_url}/study/description/'
                            f'{qiita_id}?prep_id={prep_id}')
                data.append({'Project': project, 'Qiita Study ID': qiita_id,
                             'Qiita Prep ID': prep_id, 'Qiita URL': study_url,
                             'Prep URL': prep_url})

        df = pd.DataFrame(data)

        with open(join(out_dir, 'touched_studies.html'), 'w') as f:
            f.write(df.to_html(border=2, index=False, justify="left",
                               render_links=True, escape=False))

        # copy all tgz files, including sample-files.tgz, to final_results.
        self.cmds.append(f'cd {out_dir}; mv *.tgz final_results')
        self.cmds.append(f'cd {out_dir}; mv FastQCJob/multiqc final_results')

        if exists(join(out_dir, 'touched_studies.html')):
            tmp = f'cd {out_dir}; mv touched_studies.html final_results'
            self.cmds.append(tmp)

        if exists(join(out_dir, 'failed_samples.html')):
            tmp = f'cd {out_dir}; mv failed_samples.html final_results'
            self.cmds.append(tmp)
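For orientation, the shapes of generate_commands()'s arguments can be read off the loop above: special_map yields (project, upload_dir, qiita_id) tuples, and touched_studies_prep_info maps a Qiita study id to its prep ids. A hypothetical call, where metagenomic_step is assumed to be an already-processed Metagenomic instance and every value is a placeholder rather than real data:

    # Placeholder values for illustration only; real ones are supplied by
    # the qp-klp plugin at runtime.
    special_map = [
        # (project name, Qiita upload directory, Qiita study id)
        ('MyProject_12345', '/path/to/uploads/12345', '12345'),
    ]
    touched_studies_prep_info = {
        # Qiita study id -> prep ids registered for that study
        '12345': [678],
    }

    metagenomic_step.generate_commands(special_map,
                                       'https://qiita.example.org',
                                       touched_studies_prep_info)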