diff --git a/docs/api.rst b/docs/api.rst index 20a787b6c..0520292d0 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -48,6 +48,7 @@ Methods divergence diversity Fst + Garud_h gwas_linear_regression hardy_weinberg_test regenie @@ -88,6 +89,10 @@ Variables variables.pc_relate_phi_spec variables.sample_id_spec variables.sample_pcs_spec + variables.stat_Garud_h1_spec + variables.stat_Garud_h12_spec + variables.stat_Garud_h123_spec + variables.stat_Garud_h2_h1_spec variables.traits_spec variables.variant_allele_spec variables.variant_allele_count_spec diff --git a/sgkit/__init__.py b/sgkit/__init__.py index dc88e8819..ec521c1d5 100644 --- a/sgkit/__init__.py +++ b/sgkit/__init__.py @@ -18,7 +18,7 @@ from .stats.hwe import hardy_weinberg_test from .stats.pc_relate import pc_relate from .stats.pca import pca -from .stats.popgen import Fst, Tajimas_D, divergence, diversity, pbs +from .stats.popgen import Fst, Garud_h, Tajimas_D, divergence, diversity, pbs from .stats.preprocessing import filter_partial_calls from .stats.regenie import regenie from .testing import simulate_genotype_call_dataset @@ -44,6 +44,7 @@ "diversity", "divergence", "Fst", + "Garud_h", "Tajimas_D", "pbs", "pc_relate", diff --git a/sgkit/stats/popgen.py b/sgkit/stats/popgen.py index 5e49b2a30..a02b56d1f 100644 --- a/sgkit/stats/popgen.py +++ b/sgkit/stats/popgen.py @@ -1,3 +1,4 @@ +import collections from typing import Hashable, Optional import dask.array as da @@ -7,7 +8,11 @@ from sgkit.stats.utils import assert_array_shape from sgkit.typing import ArrayLike -from sgkit.utils import conditional_merge_datasets, define_variable_if_absent +from sgkit.utils import ( + conditional_merge_datasets, + define_variable_if_absent, + hash_array, +) from sgkit.window import has_windows, window_statistic from .. import variables @@ -682,3 +687,174 @@ def pbs( {variables.stat_pbs: (["windows", "cohorts_0", "cohorts_1", "cohorts_2"], p)} ) return conditional_merge_datasets(ds, variables.validate(new_ds), merge) + + +N_GARUD_H_STATS = 4 # H1, H12, H123, H2/H1 + + +def _Garud_h(haplotypes: ArrayLike) -> ArrayLike: + # find haplotype counts (sorted in descending order) + counts = sorted(collections.Counter(haplotypes.tolist()).values(), reverse=True) + counts = np.array(counts) + + # find haplotype frequencies + n = haplotypes.shape[0] + f = counts / n + + # compute H1 + h1 = np.sum(f ** 2) + + # compute H12 + h12 = np.sum(f[:2]) ** 2 + np.sum(f[2:] ** 2) + + # compute H123 + h123 = np.sum(f[:3]) ** 2 + np.sum(f[3:] ** 2) + + # compute H2/H1 + h2 = h1 - f[0] ** 2 + h2_h1 = h2 / h1 + + return np.array([h1, h12, h123, h2_h1]) + + +def _Garud_h_cohorts( + gt: ArrayLike, sample_cohort: ArrayLike, n_cohorts: int +) -> ArrayLike: + # transpose to hash columns (haplotypes) + haplotypes = hash_array(gt.transpose()).transpose().flatten() + arr = np.empty((n_cohorts, N_GARUD_H_STATS)) + for c in range(n_cohorts): + arr[c, :] = _Garud_h(haplotypes[sample_cohort == c]) + return arr + + +def Garud_h( + ds: Dataset, + *, + call_genotype: Hashable = variables.call_genotype, + merge: bool = True, +) -> Dataset: + """Compute the H1, H12, H123 and H2/H1 statistics for detecting signatures + of soft sweeps, as defined in Garud et al. (2015). + + By default, values of this statistic are calculated across all variants. + To compute values in windows, call :func:`window` before calling + this function. + + Parameters + ---------- + ds + Genotype call dataset. + call_genotype + Input variable name holding call_genotype as defined by + :data:`sgkit.variables.call_genotype_spec`. + Must be present in ``ds``. + merge + If True (the default), merge the input dataset and the computed + output variables into a single dataset, otherwise return only + the computed output variables. + See :ref:`dataset_merge` for more details. + + Returns + ------- + A dataset containing the following variables: + + - `stat_Garud_h1` (windows, cohorts): Garud H1 statistic. + Defined by :data:`sgkit.variables.stat_Garud_h1_spec`. + + - `stat_Garud_h12` (windows, cohorts): Garud H12 statistic. + Defined by :data:`sgkit.variables.stat_Garud_h12_spec`. + + - `stat_Garud_h123` (windows, cohorts): Garud H123 statistic. + Defined by :data:`sgkit.variables.stat_Garud_h123_spec`. + + - `stat_Garud_h2_h1` (windows, cohorts): Garud H2/H1 statistic. + Defined by :data:`sgkit.variables.stat_Garud_h2_h1_spec`. + + Raises + ------ + NotImplementedError + If the dataset is not diploid. + + Warnings + -------- + This function is currently only implemented for diploid datasets. + + Examples + -------- + + >>> import numpy as np + >>> import sgkit as sg + >>> import xarray as xr + >>> ds = sg.simulate_genotype_call_dataset(n_variant=5, n_sample=4) + + >>> # Divide samples into two cohorts + >>> sample_cohort = np.repeat([0, 1], ds.dims["samples"] // 2) + >>> ds["sample_cohort"] = xr.DataArray(sample_cohort, dims="samples") + + >>> # Divide into windows of size three (variants) + >>> ds = sg.window(ds, size=3, step=3) + + >>> gh = sg.Garud_h(ds) + >>> gh["stat_Garud_h1"].values # doctest: +NORMALIZE_WHITESPACE + array([[0.25 , 0.375], + [0.375, 0.375]]) + >>> gh["stat_Garud_h12"].values # doctest: +NORMALIZE_WHITESPACE + array([[0.375, 0.625], + [0.625, 0.625]]) + >>> gh["stat_Garud_h123"].values # doctest: +NORMALIZE_WHITESPACE + array([[0.625, 1. ], + [1. , 1. ]]) + >>> gh["stat_Garud_h2_h1"].values # doctest: +NORMALIZE_WHITESPACE + array([[0.75 , 0.33333333], + [0.33333333, 0.33333333]]) + """ + + if ds.dims["ploidy"] != 2: + raise NotImplementedError("Garud H only implemented for diploid genotypes") + + if not has_windows(ds): + raise ValueError("Dataset must be windowed for Garud_h") + + variables.validate(ds, {call_genotype: variables.call_genotype_spec}) + + gt = ds[call_genotype] + + # convert sample cohorts to haplotype layout + sc = ds.sample_cohort.values + hsc = np.stack((sc, sc), axis=1).ravel() # TODO: assumes diploid + n_cohorts = sc.max() + 1 # 0-based indexing + + gh = window_statistic( + gt, + lambda gt: _Garud_h_cohorts(gt, hsc, n_cohorts), + ds.window_start.values, + ds.window_stop.values, + dtype=np.float64, + # first chunks dimension is windows, computed in window_statistic + chunks=(-1, n_cohorts, N_GARUD_H_STATS), + ) + n_windows = ds.window_start.shape[0] + assert_array_shape(gh, n_windows, n_cohorts, N_GARUD_H_STATS) + new_ds = Dataset( + { + variables.stat_Garud_h1: ( + ("windows", "cohorts"), + gh[:, :, 0], + ), + variables.stat_Garud_h12: ( + ("windows", "cohorts"), + gh[:, :, 1], + ), + variables.stat_Garud_h123: ( + ("windows", "cohorts"), + gh[:, :, 2], + ), + variables.stat_Garud_h2_h1: ( + ("windows", "cohorts"), + gh[:, :, 3], + ), + } + ) + + return conditional_merge_datasets(ds, variables.validate(new_ds), merge) diff --git a/sgkit/tests/test_popgen.py b/sgkit/tests/test_popgen.py index 8170945d5..2622c2ddf 100644 --- a/sgkit/tests/test_popgen.py +++ b/sgkit/tests/test_popgen.py @@ -9,6 +9,7 @@ from sgkit import ( Fst, + Garud_h, Tajimas_D, count_cohort_alleles, count_variant_alleles, @@ -16,6 +17,7 @@ divergence, diversity, pbs, + simulate_genotype_call_dataset, variables, ) from sgkit.window import window @@ -399,3 +401,58 @@ def test_pbs__windowed(sample_size, n_cohorts, chunks): ) np.testing.assert_allclose(stat_pbs[:-1], ska_pbs_value) + + +@pytest.mark.parametrize( + "n_variants, n_samples, n_contigs, n_cohorts", + [(3, 5, 1, 1)], +) +def test_Garud_h__no_windows(n_variants, n_samples, n_contigs, n_cohorts): + # We can't use msprime since it doesn't generate diploid data, and Garud uses phased data + ds = simulate_genotype_call_dataset( + n_variant=n_variants, n_sample=n_samples, n_contig=n_contigs + ) + subsets = np.array_split(ds.samples.values, n_cohorts) + sample_cohorts = np.concatenate( + [np.full_like(subset, i) for i, subset in enumerate(subsets)] + ) + ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples") + + with pytest.raises(ValueError, match="Dataset must be windowed for Garud_h"): + Garud_h(ds) + + +@pytest.mark.parametrize( + "n_variants, n_samples, n_contigs, n_cohorts", + [(9, 5, 1, 1), (9, 5, 1, 2)], +) +@pytest.mark.parametrize("chunks", [(-1, -1), (5, -1)]) +def test_Garud_h(n_variants, n_samples, n_contigs, n_cohorts, chunks): + ds = simulate_genotype_call_dataset( + n_variant=n_variants, n_sample=n_samples, n_contig=n_contigs + ) + ds = ds.chunk(dict(zip(["variants", "samples"], chunks))) + subsets = np.array_split(ds.samples.values, n_cohorts) + sample_cohorts = np.concatenate( + [np.full_like(subset, i) for i, subset in enumerate(subsets)] + ) + ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples") + ds = window(ds, size=3, step=3) + + gh = Garud_h(ds) + h1 = gh.stat_Garud_h1.values + h12 = gh.stat_Garud_h12.values + h123 = gh.stat_Garud_h123.values + h2_h1 = gh.stat_Garud_h2_h1.values + + # scikit-allel + for c in range(n_cohorts): + gt = ds.call_genotype.values[:, sample_cohorts == c, :] + ska_gt = allel.GenotypeArray(gt) + ska_ha = ska_gt.to_haplotypes() + ska_h = allel.moving_garud_h(ska_ha, size=3, step=3) + + np.testing.assert_allclose(h1[:, c], ska_h[0]) + np.testing.assert_allclose(h12[:, c], ska_h[1]) + np.testing.assert_allclose(h123[:, c], ska_h[2]) + np.testing.assert_allclose(h2_h1[:, c], ska_h[3]) diff --git a/sgkit/tests/test_utils.py b/sgkit/tests/test_utils.py index 8b8ebf13a..ee90832a6 100644 --- a/sgkit/tests/test_utils.py +++ b/sgkit/tests/test_utils.py @@ -13,6 +13,7 @@ check_array_like, define_variable_if_absent, encode_array, + hash_array, max_str_len, merge_datasets, split_array_chunks, @@ -208,3 +209,25 @@ def test_split_array_chunks__raise_on_blocks_lte_0(): def test_split_array_chunks__raise_on_n_lte_0(): with pytest.raises(ValueError, match=r"Number of elements .* must be >= 0"): split_array_chunks(0, 0) + + +@given(st.integers(2, 50), st.integers(1, 50)) +@settings(deadline=None) # avoid problem with numba jit compilation +def test_hash_array(n_rows, n_cols): + # construct an array with random repeated rows + x = np.random.randint(-2, 10, size=(n_rows // 2, n_cols)) + rows = np.random.choice(x.shape[0], n_rows, replace=True) + x = x[rows, :] + + # find unique column counts (exact method) + _, expected_inverse, expected_counts = np.unique( + x, axis=0, return_inverse=True, return_counts=True + ) + + # hash columns, then find unique column counts using the hash values + h = hash_array(x) + _, inverse, counts = np.unique(h, return_inverse=True, return_counts=True) + + # counts[inverse] gives the count for each column in x + # these should be the same for both ways of counting + np.testing.assert_equal(counts[inverse], expected_counts[expected_inverse]) diff --git a/sgkit/utils.py b/sgkit/utils.py index be45016a4..26d623fef 100644 --- a/sgkit/utils.py +++ b/sgkit/utils.py @@ -2,6 +2,7 @@ from typing import Any, Callable, Hashable, List, Optional, Set, Tuple, Union import numpy as np +from numba import guvectorize from xarray import Dataset from .typing import ArrayLike, DType @@ -271,3 +272,36 @@ def max_str_len(a: ArrayLike) -> ArrayLike: if isinstance(a, np.ndarray): lens = np.asarray(lens) return lens.max() + + +@guvectorize( # type: ignore + [ + "void(int8[:], int64[:])", + "void(int16[:], int64[:])", + "void(int32[:], int64[:])", + "void(int64[:], int64[:])", + ], + "(n)->()", + nopython=True, + cache=True, +) +def hash_array(x: ArrayLike, out: ArrayLike) -> None: + """Hash entries of ``x`` using the DJBX33A hash function. + + This is ~5 times faster than calling ``tobytes()`` followed + by ``hash()`` on array columns. This function also does not + hold the GIL, making it suitable for use with the Dask + threaded scheduler. + + Parameters + ---------- + x + 1D array of type integer. + + Returns + ------- + Array containing a single hash value of type int64. + """ + out[0] = 5381 + for i in range(x.shape[0]): + out[0] = out[0] * 33 + x[i] diff --git a/sgkit/variables.py b/sgkit/variables.py index dd5ad09ba..4e00b8fb5 100644 --- a/sgkit/variables.py +++ b/sgkit/variables.py @@ -318,6 +318,22 @@ def _check_field( ArrayLikeSpec("stat_diversity", ndim=2, kind="f") ) """Genetic diversity (also known as "Tajima’s pi") for cohorts.""" +stat_Garud_h1, stat_Garud_h1_spec = SgkitVariables.register_variable( + ArrayLikeSpec("stat_Garud_h1", ndim={1, 2}, kind="f") +) +"""Garud H1 statistic for cohorts.""" +stat_Garud_h12, stat_Garud_h12_spec = SgkitVariables.register_variable( + ArrayLikeSpec("stat_Garud_h12", ndim={1, 2}, kind="f") +) +"""Garud H12 statistic for cohorts.""" +stat_Garud_h123, stat_Garud_h123_spec = SgkitVariables.register_variable( + ArrayLikeSpec("stat_Garud_h123", ndim={1, 2}, kind="f") +) +"""Garud H123 statistic for cohorts.""" +stat_Garud_h2_h1, stat_Garud_h2_h1_spec = SgkitVariables.register_variable( + ArrayLikeSpec("stat_Garud_h2_h1", ndim={1, 2}, kind="f") +) +"""Garud H2/H1 statistic for cohorts.""" stat_pbs, stat_pbs_spec = SgkitVariables.register_variable( ArrayLikeSpec("stat_pbs", ndim=4, kind="f") ) diff --git a/sgkit/window.py b/sgkit/window.py index 1d2da2ef1..74948da9d 100644 --- a/sgkit/window.py +++ b/sgkit/window.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, Tuple +from typing import Any, Callable, Iterable, Tuple, Union import dask.array as da import numpy as np @@ -134,10 +134,13 @@ def window_statistic( window_starts: ArrayLike, window_stops: ArrayLike, dtype: DType, + chunks: Any = None, + new_axis: Union[None, int, Iterable[int]] = None, **kwargs: Any, ) -> da.Array: values = da.asarray(values) + desired_chunks = chunks or values.chunks window_lengths = window_stops - window_starts depth = np.max(window_lengths) @@ -184,7 +187,7 @@ def blockwise_moving_stat(x: ArrayLike, block_info: Any = None) -> ArrayLike: # depth is 0 except in first axis depth = {0: depth} # new chunks are same except in first axis - new_chunks = tuple([tuple(windows_per_chunk)] + list(values.chunks[1:])) # type: ignore + new_chunks = tuple([tuple(windows_per_chunk)] + list(desired_chunks[1:])) # type: ignore return values.map_overlap( blockwise_moving_stat, dtype=dtype, @@ -192,6 +195,7 @@ def blockwise_moving_stat(x: ArrayLike, block_info: Any = None) -> ArrayLike: depth=depth, boundary=0, trim=False, + new_axis=new_axis, )