|
29 | 29 | import math
|
30 | 30 | import os
|
31 | 31 | import tempfile
|
| 32 | +import textwrap |
32 | 33 |
|
33 | 34 | import msprime
|
34 | 35 | import numpy as np
|
35 | 36 | import pytest
|
36 | 37 | import vcf
|
37 | 38 |
|
| 39 | +import tests |
38 | 40 | import tests.test_wright_fisher as wf
|
39 | 41 | import tskit
|
40 | 42 | from tests import tsutil
|
@@ -638,3 +640,192 @@ def test_defaults(self):
|
638 | 640 | assert ts.num_sites > 0
|
639 | 641 | with ts_to_pyvcf(ts) as vcf_reader:
|
640 | 642 | assert vcf_reader.samples == ["tsk_0", "tsk_1"]
|
| 643 | + |
| 644 | + |
def drop_header(s):
    """
    Return ``s`` with every line that starts with ``##`` removed.

    The input is split with ``str.splitlines`` and the surviving lines are
    re-joined with a single ``"\\n"``, so the original line terminators
    (including any trailing newline) are not preserved.
    """
    kept = []
    for row in s.splitlines():
        if row.startswith("##"):
            # Lines prefixed with "##" are the ones being dropped.
            continue
        kept.append(row)
    return "\n".join(kept)
| 647 | + |
| 648 | + |
class TestMasking:
    """
    Tests for the ``site_mask`` and ``sample_mask`` arguments of ``as_vcf``.

    Most tests compare the VCF text, with its ``##`` lines removed by
    ``drop_header``, against a literal expected table.
    """

    @tests.cached_example
    def ts(self):
        """
        Return the example tree sequence shared by every test in this class.

        It is built with ``tskit.Tree.generate_balanced(3, span=10)`` and then
        passed through ``tsutil.insert_branch_sites``. The expected tables in
        the tests below show four sites (IDs 0-3 at positions 0, 2, 4 and 6)
        and three samples.
        """
        ts = tskit.Tree.generate_balanced(3, span=10).tree_sequence
        ts = tsutil.insert_branch_sites(ts)
        return ts

    @pytest.mark.parametrize("mask", [[True], np.zeros(5, dtype=bool), []])
    def test_site_mask_wrong_size(self, mask):
        """Site masks of length 1, 5 and 0 are rejected with a ValueError."""
        with pytest.raises(ValueError, match="Site mask must be"):
            self.ts().as_vcf(site_mask=mask)

    @pytest.mark.parametrize("mask", [[[0, 1], [1, 0]], "abcd"])
    def test_site_mask_bad_type(self, mask):
        """A nested list or a string as the site mask raises a ValueError."""
        # Converting to a bool array is pretty lax in what it allows.
        with pytest.raises(ValueError, match="Site mask must be"):
            self.ts().as_vcf(site_mask=mask)

    @pytest.mark.parametrize("mask", [[[0, 1], [1, 0]], "abcd"])
    def test_sample_mask_bad_type(self, mask):
        """A nested list or a string as the sample mask raises a ValueError."""
        # Converting to a bool array is pretty lax in what it allows.
        with pytest.raises(ValueError, match="Sample mask must be"):
            self.ts().as_vcf(sample_mask=mask)

    def test_no_masks(self):
        """With no masks, all four sites and three samples are written."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1"""
        expected = textwrap.dedent(s)
        assert drop_header(self.ts().as_vcf()) == expected

    def test_no_masks_triploid(self):
        """With ``ploidy=3`` the output has a single ``tsk_0`` genotype column."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1|0|0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0|1|1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|1|0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|0|1"""
        expected = textwrap.dedent(s)
        assert drop_header(self.ts().as_vcf(ploidy=3)) == expected

    def test_site_0_masked(self):
        """Masking site 0 omits its row; the other three rows are unchanged."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(site_mask=[True, False, False, False])
        assert drop_header(actual) == expected

    def test_site_0_masked_triploid(self):
        """Masking site 0 combined with ``ploidy=3`` omits the first row."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0|1|1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|1|0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|0|1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(ploidy=3, site_mask=[True, False, False, False])
        assert drop_header(actual) == expected

    def test_site_1_masked(self):
        """Masking site 1 omits only the row with ID 1."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t0
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(site_mask=[False, True, False, False])
        assert drop_header(actual) == expected

    def test_all_sites_masked(self):
        """Masking every site leaves only the ``#CHROM`` column header line."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(site_mask=[True, True, True, True])
        assert drop_header(actual) == expected

    def test_all_sites_not_masked(self):
        """An all-False site mask gives the same rows as passing no mask."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(site_mask=[False, False, False, False])
        assert drop_header(actual) == expected

    @pytest.mark.parametrize(
        "mask",
        [[False, False, False], [0, 0, 0], lambda _: [False, False, False]],
    )
    def test_all_samples_not_masked(self, mask):
        """
        A sample mask that masks nobody leaves every genotype in place.

        The mask is given as a list of bools, a list of ints, and a callable
        returning the list; all three must produce the same output.
        """
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(sample_mask=mask)
        assert drop_header(actual) == expected

    @pytest.mark.parametrize(
        "mask", [[True, False, False], [1, 0, 0], lambda _: [True, False, False]]
    )
    def test_sample_0_masked(self, mask):
        """Masking sample 0 writes ``.`` in the ``tsk_0`` column at every site."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t.\t0\t0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t.\t1\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t.\t1\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t.\t0\t1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(sample_mask=mask)
        assert drop_header(actual) == expected

    @pytest.mark.parametrize(
        "mask", [[False, True, False], [0, 1, 0], lambda _: [False, True, False]]
    )
    def test_sample_1_masked(self, mask):
        """Masking sample 1 writes ``.`` in the ``tsk_1`` column at every site."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t.\t0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t.\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t.\t0
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t.\t1"""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(sample_mask=mask)
        assert drop_header(actual) == expected

    @pytest.mark.parametrize(
        "mask", [[True, True, True], [1, 1, 1], lambda _: [True, True, True]]
    )
    def test_all_samples_masked(self, mask):
        """Masking every sample keeps all four rows with ``.`` as each genotype."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t.\t.\t.
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t.\t.\t.
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t.\t.\t.
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t.\t.\t."""
        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(sample_mask=mask)
        assert drop_header(actual) == expected

    def test_all_functional_sample_mask(self):
        """A callable sample mask can mask a different sample at each site."""
        s = """\
        #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2
        1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t.\t0\t0
        1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t.\t1
        1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t.
        1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t.\t0\t1"""

        def mask(variant):
            # Mask exactly one sample per site: index = site ID modulo 3.
            a = [0, 0, 0]
            a[variant.site.id % 3] = 1
            return a

        expected = textwrap.dedent(s)
        actual = self.ts().as_vcf(sample_mask=mask)
        assert drop_header(actual) == expected

    @pytest.mark.skipif(not _pysam_imported, reason="pysam not available")
    def test_mask_ok_with_pysam(self):
        """
        Output written with a masked sample can be read back via ``ts_to_pysam``.

        Skipped when ``_pysam_imported`` is false.
        """
        # Only the third sample (tsk_2) is masked; the assertions below expect
        # its genotype to come back as (None,) at every site.
        with ts_to_pysam(self.ts(), sample_mask=[0, 0, 1]) as records:
            variants = list(records)
            assert len(variants) == 4
            samples = ["tsk_0", "tsk_1", "tsk_2"]
            gts = [variants[0].samples[key]["GT"] for key in samples]
            assert gts == [(1,), (0,), (None,)]

            gts = [variants[1].samples[key]["GT"] for key in samples]
            assert gts == [(0,), (1,), (None,)]

            gts = [variants[2].samples[key]["GT"] for key in samples]
            assert gts == [(0,), (1,), (None,)]

            gts = [variants[3].samples[key]["GT"] for key in samples]
            assert gts == [(0,), (0,), (None,)]
0 commit comments