Skip to content
This repository was archived by the owner on Oct 15, 2020. It is now read-only.

BGEN reader implementation using bgen_reader #1

Merged
merged 8 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.5.0
hooks:
- id: check-merge-conflict
- id: debug-statements
- id: mixed-line-ending
- id: check-case-conflict
- id: check-yaml
- repo: https://github.com/asottile/seed-isort-config
rev: v2.2.0
hooks:
- id: seed-isort-config
- repo: https://github.com/timothycrosley/isort
rev: 4.3.21-2 # pick the isort version you'd like to use from https://github.com/timothycrosley/isort/releases
hooks:
- id: isort
- repo: https://github.com/python/black
rev: 19.10b0
hooks:
- id: black
language_version: python3
- repo: https://gitlab.com/pycqa/flake8
rev: 3.7.9
hooks:
- id: flake8
language_version: python3
8 changes: 8 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
black
flake8
isort
mypy
pre-commit
pytest
pytest-cov
pytest-datadir
8 changes: 8 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dask[array]
dask[dataframe]
fsspec
numpy
scipy
xarray
bgen_reader
git+https://github.com/pystatgen/sgkit
68 changes: 68 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
[metadata]
name = sgkit-bgen
author = sgkit Developers
license = Apache
description = BGEN IO implementations for sgkit
long_description_content_type=text/x-rst
long_description =
**sgkit-bgen** contains block array readers for large-scale genomic data stored as BGEN
url = https://github.com/pystatgen/sgkit
classifiers =
Development Status :: 2 - Pre-Alpha
License :: OSI Approved :: Apache Software License
Operating System :: OS Independent
Intended Audience :: Science/Research
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Topic :: Scientific/Engineering

[options]
packages = sgkit_bgen
zip_safe = False # https://mypy.readthedocs.io/en/latest/installed_packages.html
include_package_data = True
python_requires = >=3.7
install_requires =
dask[array]
dask[dataframe]
fsspec
numpy
scipy
xarray
bgen_reader
setuptools >= 41.2 # For pkg_resources
setup_requires =
setuptools >= 41.2
setuptools_scm

[coverage:report]
fail_under = 100

[flake8]
ignore =
# whitespace before ':' - doesn't work well with black
E203
E402
# line too long - let black worry about that
E501
# do not assign a lambda expression, use a def
E731
# line break before binary operator
W503

[isort]
default_section = THIRDPARTY
known_first_party = sgkit
known_third_party = bgen_reader,dask,numpy,pytest,xarray
multi_line_output = 3
include_trailing_comma = True
force_grid_wrap = 0
use_parentheses = True
line_length = 88

[mypy-numpy.*]
ignore_missing_imports = True

[mypy-sgkit_bgen.tests.*]
disallow_untyped_defs = False
3 changes: 3 additions & 0 deletions sgkit_bgen/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .bgen_reader import read_bgen # noqa: F401

__all__ = ["read_bgen"]
207 changes: 207 additions & 0 deletions sgkit_bgen/bgen_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
"""BGEN reader implementation (using bgen_reader)"""
from pathlib import Path
from typing import Any, Union

import dask.array as da
import numpy as np
from bgen_reader._bgen_file import bgen_file
from bgen_reader._bgen_metafile import bgen_metafile
from bgen_reader._metafile import create_metafile
from bgen_reader._reader import _infer_metafile_filepath
from bgen_reader._samples import generate_samples, read_samples_file
from xarray import Dataset

from sgkit import create_genotype_dosage_dataset
from sgkit.typing import ArrayLike
from sgkit.utils import encode_array

PathType = Union[str, Path]


def _to_dict(df, dtype=None):
return {
c: df[c].to_dask_array(lengths=True).astype(dtype[c] if dtype else df[c].dtype)
for c in df
}


VARIANT_FIELDS = [
("id", str, "U"),
("rsid", str, "U"),
("chrom", str, "U"),
("pos", str, "int32"),
("nalleles", str, "int8"),
("allele_ids", str, "U"),
("vaddr", str, "int64"),
]
VARIANT_DF_DTYPE = dict([(f[0], f[1]) for f in VARIANT_FIELDS])
VARIANT_ARRAY_DTYPE = dict([(f[0], f[2]) for f in VARIANT_FIELDS])


class BgenReader:

name = "bgen_reader"

def __init__(self, path, persist=True, dtype=np.float32):
self.path = Path(path)

self.metafile_filepath = _infer_metafile_filepath(Path(self.path))
if not self.metafile_filepath.exists():
create_metafile(path, self.metafile_filepath, verbose=False)

with bgen_metafile(self.metafile_filepath) as mf:
self.n_variants = mf.nvariants
self.npartitions = mf.npartitions
self.partition_size = mf.partition_size

df = mf.create_variants()
if persist:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Quite cool that bgen reader is already giving back dask frames!

df = df.persist()
variant_arrs = _to_dict(df, dtype=VARIANT_ARRAY_DTYPE)

self.variant_id = variant_arrs["id"]
self.contig = variant_arrs["chrom"]
self.pos = variant_arrs["pos"]

def split_alleles(alleles, block_info=None):
if block_info is None or len(block_info) == 0:
return alleles

def split(allele_row):
alleles_list = allele_row[0].split(",")
assert len(alleles_list) == 2 # bi-allelic
return np.array(alleles_list)

return np.apply_along_axis(split, 1, alleles[:, np.newaxis])

variant_alleles = variant_arrs["allele_ids"].map_blocks(split_alleles)

def max_str_len(arr: ArrayLike) -> Any:
return arr.map_blocks(
lambda s: np.char.str_len(s.astype(str)), dtype=np.int8
).max()

max_allele_length = max(max_str_len(variant_alleles).compute())
self.variant_alleles = variant_alleles.astype(f"S{max_allele_length}")

with bgen_file(self.path) as bgen:
sample_path = self.path.with_suffix(".sample")
if sample_path.exists():
self.sample_id = read_samples_file(sample_path, verbose=False)
else:
if bgen.contain_samples:
self.sample_id = bgen.read_samples()
else:
self.sample_id = generate_samples(bgen.nsamples)

self.shape = (self.n_variants, len(self.sample_id))
self.dtype = dtype
self.ndim = 2

def __getitem__(self, idx):
if not isinstance(idx, tuple):
raise IndexError( # pragma: no cover
f"Indexer must be tuple (received {type(idx)})"
)
if len(idx) != self.ndim:
raise IndexError( # pragma: no cover
f"Indexer must be two-item tuple (received {len(idx)} slices)"
)

if idx[0].start == idx[0].stop:
return np.empty((0, 0), dtype=self.dtype)

start_partition = idx[0].start // self.partition_size
start_partition_offset = idx[0].start % self.partition_size
end_partition = (idx[0].stop - 1) // self.partition_size
end_partition_offset = (idx[0].stop - 1) % self.partition_size

all_vaddr = []
with bgen_metafile(self.metafile_filepath) as mf:
for i in range(start_partition, end_partition + 1):
partition = mf.read_partition(i)
start_offset = start_partition_offset if i == start_partition else 0
end_offset = (
end_partition_offset + 1
if i == end_partition
else self.partition_size
)
vaddr = partition["vaddr"].tolist()
all_vaddr.extend(vaddr[start_offset:end_offset])

with bgen_file(self.path) as bgen:
genotypes = [bgen.read_genotype(vaddr) for vaddr in all_vaddr]
all_probs = [genotype["probs"] for genotype in genotypes]
d = [_to_dosage(probs) for probs in all_probs]
return np.stack(d)[:, idx[1]]


def _to_dosage(probs: ArrayLike):
"""Calculate the dosage from genotype likelihoods (probabilities)"""
assert len(probs.shape) == 2 and probs.shape[1] == 3
return 2 * probs[:, -1] + probs[:, 1]


def read_bgen(
path: PathType,
chunks: Union[str, int, tuple] = "auto",
lock: bool = False,
persist: bool = True,
) -> Dataset:
"""Read BGEN dataset.

Loads a single BGEN dataset as dask arrays within a Dataset
from a bgen file.

Parameters
----------
path : PathType
Path to BGEN file.
chunks : Union[str, int, tuple], optional
Chunk size for genotype data, by default "auto"
lock : bool, optional
Whether or not to synchronize concurrent reads of
file blocks, by default False. This is passed through to
[dask.array.from_array](https://docs.dask.org/en/latest/array-api.html#dask.array.from_array).
persist : bool, optional
Whether or not to persist variant information in
memory, by default True. This is an important performance
consideration as the metadata file for this data will
be read multiple times when False.

Warnings
--------
Only bi-allelic, diploid BGEN files are currently supported.
"""

bgen_reader = BgenReader(path, persist)

variant_contig, variant_contig_names = encode_array(bgen_reader.contig.compute())
variant_contig_names = list(variant_contig_names)
variant_contig = variant_contig.astype("int16")

variant_position = np.array(bgen_reader.pos, dtype=int)
variant_alleles = np.array(bgen_reader.variant_alleles, dtype="S1")
variant_id = np.array(bgen_reader.variant_id, dtype=str)

sample_id = np.array(bgen_reader.sample_id, dtype=str)

call_dosage = da.from_array(
bgen_reader,
chunks=chunks,
lock=lock,
asarray=False,
name=f"{bgen_reader.name}:read_bgen:{path}",
)

ds = create_genotype_dosage_dataset(
variant_contig_names=variant_contig_names,
variant_contig=variant_contig,
variant_position=variant_position,
variant_alleles=variant_alleles,
sample_id=sample_id,
call_dosage=call_dosage,
variant_id=variant_id,
)

return ds
Empty file added sgkit_bgen/tests/__init__.py
Empty file.
Binary file added sgkit_bgen/tests/data/example-no-samples.bgen
Binary file not shown.
Binary file not shown.
7 changes: 7 additions & 0 deletions sgkit_bgen/tests/data/example-separate-samples.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ID
0
s1
s2
s3
s4
s5
Binary file added sgkit_bgen/tests/data/example.bgen
Binary file not shown.
5 changes: 5 additions & 0 deletions sgkit_bgen/tests/data/samples
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
sample_001
sample_002
sample_003
sample_004
sample_005
40 changes: 40 additions & 0 deletions sgkit_bgen/tests/test_bgen_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import numpy.testing as npt
import pytest
from sgkit_bgen import read_bgen


@pytest.mark.parametrize("chunks", [(100, 200), (100, 500), (199, 500), "auto"])
def test_read_bgen(shared_datadir, chunks):
path = shared_datadir / "example.bgen"
ds = read_bgen(path, chunks=chunks)

# check some of the data (in different chunks)
assert ds["call/dosage"].shape == (199, 500)
npt.assert_almost_equal(ds["call/dosage"].values[1][0], 1.987, decimal=3)
npt.assert_almost_equal(ds["call/dosage"].values[100][0], 0.160, decimal=3)


def test_read_bgen_with_sample_file(shared_datadir):
# The example file was generated using
# qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-separate-samples.bgen -os sgkit_bgen/tests/data/example-separate-samples.sample -incl-samples sgkit_bgen/tests/data/samples
# Then editing example-separate-samples.sample to change the sample IDs
path = shared_datadir / "example-separate-samples.bgen"
ds = read_bgen(path)
# Check the sample IDs are the ones from the .sample file
assert ds["sample/id"].values.tolist() == ["s1", "s2", "s3", "s4", "s5"]


def test_read_bgen_with_no_samples(shared_datadir):
# The example file was generated using
# qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-no-samples.bgen -os sgkit_bgen/tests/data/example-no-samples.sample -bgen-omit-sample-identifier-block -incl-samples sgkit_bgen/tests/data/samples
# Then deleting example-no-samples.sample
path = shared_datadir / "example-no-samples.bgen"
ds = read_bgen(path)
# Check the sample IDs are generated
assert ds["sample/id"].values.tolist() == [
"sample_0",
"sample_1",
"sample_2",
"sample_3",
"sample_4",
]