Early scalability demonstrations #345
Following up the suggestion to run a pairwise distance computation on the Ag1000G phase 2 data, with some more details on the input data. The data are available in scikit-allel-style zarrs in GCS. The full set of SNP calls and genotypes is at this path:

```python
import fsspec
import zarr

store = fsspec.get_mapper('gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1')
callset_snps = zarr.open_consolidated(store=store)
```

These data are grouped by chromosome arm. A demonstration for just a single chromosome arm would be sufficient. 2R is the largest, and so has the most SNPs. Alternatively, the whole genome is also interesting if equally convenient to compute. In case it helps, here are the genotype calls for 2R:
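(The original snippet for 2R did not survive the page capture. As a stand-in, here is a minimal hedged sketch of how the 2R genotype calls might be opened, assuming the usual scikit-allel zarr layout of `<contig>/calldata/genotype`; the exact group names are an assumption, not something confirmed in this thread.)

```python
# Hedged sketch: assumes a scikit-allel-style layout of '<contig>/calldata/genotype'.
import dask.array as da

gt_2r = callset_snps['2R/calldata/genotype']  # zarr array of shape (variants, samples, ploidy)
gt_2r_da = da.from_zarr(gt_2r)                # wrap lazily as a dask array, one chunk per zarr chunk
print(gt_2r_da.shape, gt_2r_da.chunksize)
```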
One route to bring these data into sgkit is via the open_vcfzarr function. Another route may be to use the data that @tomwhite has previously converted to the sgkit zarr representation. Either would be fine. @aktech I'll follow up by email to get you access to our jupyterhub service on Google Cloud so you can try this on a dask kubernetes cluster if useful.
FYI @alimanfoo this notebook runs PCA on the UKB PLINK data using a 50 node 8 vCPU cluster (n1-standard-8). It takes ~30 seconds to run on a 108,621 variant x 141,070 sample (~60GB) alt allele count matrix. I chose that size based on what UKB did for their own PCA calculations. For comparison, running an axis-wise reduce function takes about 3 seconds (e.g. getting the call rate for all variants). I also tried this on a 20 node cluster and saw the same PCA step take 50 seconds, so the operations don't scale perfectly -- the 50 node cluster should have only taken 20 seconds otherwise. I originally tried this with tsqr and non-random PCA but that kept blowing past memory limits, even on "highmem" instances. I suspect something in tsqr produces n_samples x n_samples results for chunks that include all samples but relatively tiny numbers of variants. I think it's an awkward fit for arrays this shape regardless, so I plan on getting comfortable with using randomized PCA (as in the notebook).
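(For readers without access to the notebook, here is a minimal hedged sketch of the randomized-PCA approach described above, using dask's compressed SVD. It is not the notebook's actual code; the matrix here is a small synthetic stand-in so the snippet runs locally.)

```python
# Hedged sketch of randomized PCA with dask; X is a synthetic (samples x variants)
# alternate-allele-count matrix standing in for the real UKB-sized data.
import dask.array as da

X = da.random.randint(0, 3, size=(5_000, 20_000), chunks=(1_000, 5_000)).astype("float32")
Xc = X - X.mean(axis=0)                                # centre each variant column
u, s, vt = da.linalg.svd_compressed(Xc, k=20, seed=0)  # randomized (compressed) SVD
scores = (u * s).compute()                             # principal component scores per sample
```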
@tomwhite Can you share where I can get this? Or would you suggest I use …?
@aktech I think the simplest thing for the moment is to use … Note that …
Oh, alright. Thanks @tomwhite, I'll give it a go.

BTW the sgkit notebooks I'm working on are here: https://github.com/tomwhite/shiny-train/tree/sgkit/notebooks/gwss. (Very much WIP.)
> The contents of …

By this did you mean modifying the function …?
> Is it possible to increase that?

Yes, since the input is zarr you should be able to call …
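(The specific call Tom names is cut off above; presumably it is something along these lines. A hedged sketch of choosing coarser chunks when wrapping an on-disk zarr array with dask; the path and chunk sizes are hypothetical.)

```python
# Hedged sketch: the on-disk zarr chunks stay as-is, but dask can read them into larger chunks.
# The path and chunk sizes here are hypothetical.
import dask.array as da

gt = da.from_zarr("shared/sgkit_data/2R/calldata/GT", chunks=(262_144, -1, -1))
print(gt.chunksize)
```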
I am not sure, it's malariagen's datalab. I am now doing this on my cluster, where I have enough disk space. I have pulled the contents of … and ran `read_vcfzarr(path)`, which gives:

```
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-23-b318af759c7a> in <module>
----> 1 read_vcfzarr(path)

~/sgkit/sgkit/io/vcfzarr_reader.py in read_vcfzarr(path)
     44
     45     # Index the contig names
---> 46     variants_chrom = da.from_zarr(vcfzarr["variants/CHROM"]).astype(str)
     47     variant_contig, variant_contig_names = encode_array(variants_chrom.compute())
     48     variant_contig = variant_contig.astype("int16")

~/.local/lib/python3.7/site-packages/zarr/hierarchy.py in __getitem__(self, item)
    347                              synchronizer=self._synchronizer)
    348         else:
--> 349             raise KeyError(item)
    350
    351     def __setitem__(self, item, value):

KeyError: 'variants/CHROM'
```

Also, after I get through the error above, I am not sure which part of the xarray dataset I need to use for the pairwise distance calculation, since pairwise distance needs a 2D matrix. Is there documentation somewhere which I can refer to for this?
Was there an example in one of @alimanfoo's notebooks for this? Maybe in the pairwise distance blogpost?
I didn't find any notebooks, either in the blog post or in the prototype.
OK. Well, we'll be comparing the genotypes anyway - I'm not sure exactly how Alistair uses pairwise distance between genotypes, but I don't think it'll matter too much for a simple benchmark. So, basically we're getting the distance between pairs of samples by comparing their genotype arrays.
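(To make the "comparing genotype arrays" step concrete, here is a hedged sketch of one way to turn a (variants, samples, ploidy) call array into the 2-D samples x variants matrix that `pairwise_distance` expects. The summing-over-ploidy encoding is just one reasonable choice, not necessarily what Alistair does, and the array here is synthetic.)

```python
# Hedged sketch: build a (samples x variants) matrix from synthetic genotype calls.
import dask.array as da
from sgkit.distance.api import pairwise_distance

gt = da.random.randint(0, 2, size=(100_000, 200, 2), chunks=(10_000, 200, 2))
alt_counts = gt.sum(axis=-1)   # per-call alternate allele count, shape (variants, samples)
x = alt_counts.T               # samples x variants, as pairwise_distance expects
dist = pairwise_distance(x)    # (samples x samples) distance matrix
```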
@tomwhite I am getting the same error while trying to read the data:

```
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-5-7e129b306df9> in <module>
----> 1 read_vcfzarr("shared/sgkit_data/2R")

~/sgkit/sgkit/io/vcfzarr_reader.py in read_vcfzarr(path)
     42
     43     # don't fix strings since it requires a pass over the whole dataset
---> 44     return _vcfzarr_to_dataset(vcfzarr, fix_strings=False)
     45
     46

~/sgkit/sgkit/io/vcfzarr_reader.py in _vcfzarr_to_dataset(vcfzarr, contig, variant_contig_names, fix_strings)
    168     if contig is None:
    169         # Get the contigs from variants/CHROM
--> 170         variants_chrom = da.from_zarr(vcfzarr["variants/CHROM"]).astype(str)
    171         variant_contig, variant_contig_names = encode_array(variants_chrom.compute())
    172         variant_contig = variant_contig.astype("int16")

~/.local/lib/python3.7/site-packages/zarr/hierarchy.py in __getitem__(self, item)
    347                              synchronizer=self._synchronizer)
    348         else:
--> 349             raise KeyError(item)
    350
    351     def __setitem__(self, item, value):

KeyError: 'variants/CHROM'
```
Off the top of my head, you need to set …
I guess you mean to use … The path is: …

```
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-46-8adb2c67c900> in <module>
----> 1 vcfzarr_to_zarr(path, "output/", contigs=["2R"], grouped_by_contig=True, consolidated=True)

~/sgkit/sgkit/io/vcfzarr_reader.py in vcfzarr_to_zarr(input, output, contigs, grouped_by_contig, consolidated, tempdir)
     74
     75     if consolidated:
---> 76         vcfzarr = zarr.open_consolidated(str(input), mode="r")
     77     else:
     78         vcfzarr = zarr.open_group(str(input), mode="r")

~/.local/lib/python3.7/site-packages/zarr/convenience.py in open_consolidated(store, metadata_key, mode, **kwargs)
   1172
   1173     # setup metadata sotre
-> 1174     meta_store = ConsolidatedMetadataStore(store, metadata_key=metadata_key)
   1175
   1176     # pass through

~/.local/lib/python3.7/site-packages/zarr/storage.py in __init__(self, store, metadata_key)
   2672
   2673         # retrieve consolidated metadata
-> 2674         meta = json_loads(store[metadata_key])
   2675
   2676         # check format of consolidated metadata

~/.local/lib/python3.7/site-packages/zarr/storage.py in __getitem__(self, key)
    824             return self._fromfile(filepath)
    825         else:
--> 826             raise KeyError(key)
    827
    828     def __setitem__(self, key, value):

KeyError: '.zmetadata'
```

The error seems to be coming from:

```python
zarr.open_consolidated("shared/sgkit_data/", mode="r")
```
Did you copy the … ?

BTW, if this doesn't work you can also load the genotype data using the approach Alistair outlines in the second comment of this issue.
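(The KeyError above is for a missing `.zmetadata` key, which is the consolidated-metadata file zarr looks for when opening with `open_consolidated`. A minimal hedged sketch of creating it, reusing the local path from earlier in the thread; whether this matches the intended workflow here is an assumption.)

```python
# Hedged sketch: write consolidated metadata (.zmetadata) for an existing zarr group
# so that zarr.open_consolidated() can find it.
import zarr

store = zarr.DirectoryStore("shared/sgkit_data/")
zarr.consolidate_metadata(store)                 # writes the .zmetadata key into the store
group = zarr.open_consolidated(store, mode="r")  # now opens without the KeyError
```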
Oh I see, I didn't realise that's required.

Ah, that's good to know, I thought this step was necessary. I am going to use this method, thanks!
By the way, I am using the following piece of code to run the scalability test:

```python
import logging

from sgkit.distance.api import pairwise_distance
import dask.array as da
from dask.distributed import Client
from bokeh.io import output_notebook
import numpy as np
import dask
import zarr

output_notebook()


def setup_logging():
    """Sets up general and dask logger"""
    logging_format = "%(asctime)s %(levelname)9s %(lineno)4s %(module)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=logging_format)
    logging.info("Logging initialised")


def create_client():
    logging.info("Setting dask config")
    dask.config.set({'temporary_directory': '/work/aktech/tmp'})
    logging.info("Creating dask client")
    client = Client()
    logging.info(f"Client created: {client}")


def get_data():
    logging.info("Getting store object")
    sgkit_data = zarr.open_group('/work/aktech/sgkit_data/output/')
    gt = sgkit_data['call_genotype']
    # Roughly 100 MB chunks, in multiples of the current chunk size
    gt_da = da.from_zarr(gt, chunks=(524288, 183, -1))
    x = gt_da[:, :, 1].T
    return x


def main():
    setup_logging()
    create_client()  # start the distributed client (defined above)
    x = get_data()
    logging.info(f'The x is: {x}')
    logging.info("Starting the pairwise calculation")
    out = pairwise_distance(x)
    logging.info("Pairwise process complete")
    np.save('output.csv', out)
    logging.info("All done!")


if __name__ == '__main__':
    main()
```

I was reading from the GCP bucket directly, but it had some SSL errors for a bit (it is working now though), so I shifted to the on-disk approach shown above. I am running this on a machine with 48 cores and 64 GB memory, and I am getting some memory issues. In particular the following:
There is a potential memory leak somewhere, but I don't know where. I am trying to debug that at the moment. @eric-czech do you have any ideas on what could potentially be leaking memory?
How many workers do you have? If you have 48 (one per core), then the worker memory limit of 7GB will exceed system memory. Perhaps you could keep reducing the number of workers, giving each more memory, to see if that helps.

The input is chunked along the variants dimension (axis 0), then transposed (so it becomes axis 1), then passed to …

(BTW you've loaded the data now, but here's a notebook for posterity to load the data into sgkit format: https://nbviewer.jupyter.org/github/tomwhite/shiny-train/blob/sgkit/notebooks/gwss/sgkit_import.ipynb)
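(To make the "fewer workers, more memory each" suggestion concrete, a hedged sketch of starting a local cluster with explicit worker counts and per-worker memory limits; the specific numbers are illustrative, not a recommendation.)

```python
# Hedged sketch: 6 workers x 8 threads on a 48-core box, ~10GB memory limit per worker.
from dask.distributed import Client

client = Client(n_workers=6, threads_per_worker=8, memory_limit="10GB")
print(client)
```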
I just saw Eric's comment here, saying that whole row blocks need to fit in memory for this implementation. So, because of the transpose, that means whole columns of the input (variants) need to fit in memory. If there are 50 million rows, then that's ~100 column blocks, so ~100 x 100MB = 10GB of memory. One option would be to rechunk to make the sample chunk size smaller (183 -> 18, say). Or try on a machine with fewer CPUs and more memory.
Although the above error is for 8 workers/8 threads, I tried with 4 workers/4 threads too, with the same result.

I thought so, but it seems the memory usage stacks up irrespective of the number of workers. I also tried with fewer workers and fewer threads to give each worker more memory, but it didn't help.
That's true, not chunking axis=1 is faster, and yes this is the expectation. The chunking you see in the code above is the result of one of the various things I was trying to make dask happy; I initially started with chunking only on axis=0, then I was getting …
I tried this too; the peak memory usage goes to 50 GB, then the worker restarts and the compute eventually fails.

I guess so. One thing I don't understand is that when I use the chunking mentioned in "2.", the peak memory still goes beyond the limits, which doesn't make sense as the chunk size is only …

Another suspect I have is the following error: …

I guess if I have a chunk size which dask is not happy with, it might rechunk (along axis=1), as the message above says, causing too many chunks in memory.
@aktech it might be worth using (Resource)Profiler (doc), and maybe also visualizing the graph just to validate the assumptions about the data orchestration (doc). Also, for your 2nd option (no chunking along axis=1), you could try to run it with just a single process (either using the local scheduler or tuning the "distributed" one), to make sure that all threads can use the same memory and, for example, not "duplicate" objects by "sharing" chunks etc. (which could explain the extra memory usage).
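(A hedged sketch of the profilers mentioned above, run with the local threaded scheduler as suggested; the array here is synthetic stand-in data, and `pairwise_distance(x)` could be swapped in for the stand-in computation.)

```python
# Hedged sketch: profile CPU and memory of a local, single-process, threaded run.
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, visualize

x = da.random.random((20_000, 2_000), chunks=(2_000, 2_000))
result = (x @ x.T).sum()                  # stand-in computation
# result.visualize() would render the task graph (requires graphviz)
with Profiler() as prof, ResourceProfiler(dt=0.5) as rprof:
    result.compute(scheduler="threads")   # local scheduler, so the diagnostic hooks apply
visualize([prof, rprof])                  # bokeh timeline of tasks and resource usage
```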
Thanks for all the details @aktech. We know rechunking in Dask can cause unpredictable memory usage, so I would strongly suggest that you use rechunker to rechunk the data on disk, then run the distance computations separately. Otherwise it's not clear if the memory issues are coming from rechunking or pairwise distance (or both!). The notebook here shows how to use rechunker on Zarr groups: https://nbviewer.jupyter.org/github/tomwhite/shiny-train/blob/sgkit/notebooks/gwss/sgkit_import.ipynb (near the bottom). As a general strategy I think it would be useful to progressively halve the large dimension (the one that is ~20 million) until you get a computation that succeeds; then from that known good point you can try different worker settings etc. to get a better idea of how the computation scales.
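(A hedged sketch of on-disk rechunking with rechunker, along the lines of the suggestion above; the paths, target chunks, and memory budget are placeholders, and the linked notebook is the authoritative example.)

```python
# Hedged sketch: rechunk the call_genotype zarr array on disk before running pairwise_distance.
# Paths, target chunks and memory budget are placeholders.
import zarr
from rechunker import rechunk

source = zarr.open_group("/work/aktech/sgkit_data/output/")["call_genotype"]
plan = rechunk(
    source,
    target_chunks=(65_536, 18, 2),        # smaller sample chunks, as suggested above
    max_mem="2GB",                        # memory budget per worker for the rechunk
    target_store="call_genotype_rechunked.zarr",
    temp_store="rechunk_tmp.zarr",
)
plan.execute()                            # runs as a dask computation
```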
Yeah, worth doing. I was using it on a smaller dataset, but this one never finished so I couldn't see that. I have been using the dask diagnostics dashboard so far.
Sure, I can give this a go.
Thanks for the pointer, I'll try this in a bit.
True, the smallest data I had success with was roughly 20% (…)
Also, another thing to note which I forgot to mention: I recall the chunk size you see in the code above was chosen to address the rechunking problem as well, since it's … With a chunk size that doesn't chunk on axis=1, I always get that warning (irrespective of the chunking on disk; I just tried chunking on disk and it gave the warning: …).

I'll try a single process soon.
Possibly useful data point to demonstrate performance: I microbenchmarked the …
Closing this as it's quite old now |
It would be very useful to have some simple demonstrations that, using sgkit and the underlying stack, we can run computations over large input datasets, showing we are making real steps towards the goal of interactive analysis of biobank-scale datasets given a sufficiently powerful HPC/cloud/GPU cluster.
The main purpose of this is to be able to give some highlights in a short report on what has been achieved from the sgkit development work to date.
By "demonstration" I mean just a simple benchmark result.
E.g., being able to say that we can run a PCA on UK biobank-sized data using an N-node cluster in Google Cloud and compute the result in M minutes.
E.g., being able to say we can run a pairwise distance computation on the MalariaGEN Ag1000G phase 2 data and compute the result in M minutes on an N-node CPU cluster in Google Cloud. Even better would be being able to say we can also run the same computation in M' minutes on an N'-node GPU cluster.
I.e., I just need a couple of data points here showing proof of concept, I don't need any detailed performance or scalability benchmarking.
Raising this issue to share suggestions for computations/datasets to use, and post back any results.