pybedtools.parallel.parallel_apply

pybedtools.parallel.parallel_apply(orig_bedtool, method, genome=None, genome_fn=None, method_args=None, method_kwargs=None, shuffle_kwargs=None, shuffle=True, reduce_func=None, processes=1, sort=False, _orig_pool=None, iterations=1000, debug=False, report_iterations=False)

Call an arbitrary BedTool method many times in parallel.
An example use-case is to generate a null distribution of intersections, and then compare this to the actual intersections.
Important: due to a known file handle leak in BedTool.__len__, it's best to simply count the number of lines in the result file, as in the reduce_func defined below. This works because BEDTools programs strip any non-interval lines from their results.
>>> import pybedtools

>>> # set up example BedTools
>>> a = pybedtools.example_bedtool('a.bed')
>>> b = pybedtools.example_bedtool('b.bed')

>>> # Method of `a` to call:
>>> method = 'intersect'

>>> # Kwargs provided to `a.intersect` each iteration
>>> method_kwargs = dict(b=b, u=True)

>>> # Function that will be called on the results of
>>> # `a.intersect(**method_kwargs)`.
>>> def reduce_func(x):
...     return sum(1 for _ in open(x.fn))

>>> # Create a small artificial genome for this test (generally you'd
>>> # use an assembly name, like "hg19"):
>>> genome = dict(chr1=(0, 1000))

>>> # Do 10 iterations using 1 process for this test (generally you'd
>>> # use 1000+ iterations, and as many processes as you have CPUs)
>>> results = pybedtools.parallel.parallel_apply(a, method, genome=genome,
...     method_kwargs=method_kwargs, iterations=10, processes=1,
...     reduce_func=reduce_func, debug=True, report_iterations=True)

>>> # get results
>>> print(list(results))
[2, 1, 2, 1, 1, 1, 1, 1, 1, 2]

>>> # We can compare this to the actual intersection:
>>> reduce_func(a.intersect(**method_kwargs))
3
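A common way to summarize such a comparison is an empirical p-value computed from the null distribution. This is not part of parallel_apply itself; the following is a sketch that reuses the values printed above, with the "+ 1" terms added so a p-value of exactly zero is never reported:

>>> # Null counts from the 10 shuffled iterations above, and the observed count:
>>> null = [2, 1, 2, 1, 1, 1, 1, 1, 1, 2]
>>> observed = 3
>>> n_as_extreme = sum(1 for x in null if x >= observed)
>>> p_value = (n_as_extreme + 1) / (len(null) + 1)
>>> round(p_value, 3)
0.091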
Alternatively, we could use the a.jaccard method, which already does the reduction to a dictionary. However, the jaccard method requires the input to be sorted. Here, we specify sort=True to sort each shuffled BedTool before calling its jaccard method.

>>> from pybedtools.parallel import parallel_apply
>>> a = pybedtools.example_bedtool('a.bed')
>>> results = parallel_apply(a, method='jaccard', method_args=(b,),
...     genome=genome, iterations=3, processes=1, sort=True, debug=True)
>>> for i in results:
...     print(sorted(i.items()))
[('intersection', 101), ('jaccard', 0.226966), ('n_intersections', 1), ('union-intersection', 445)]
[('intersection', 20), ('jaccard', 0.0296296), ('n_intersections', 1), ('union-intersection', 675)]
[('intersection', 45), ('jaccard', 0.0725806), ('n_intersections', 1), ('union-intersection', 620)]
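Since jaccard already returns a dictionary, a reduce_func can also be used to keep only the statistic of interest. A minimal sketch, reusing the objects defined above; the lambda is illustrative, and with processes > 1 a module-level function would be needed instead, since lambdas cannot be pickled:

>>> # A sketch: keep only the 'jaccard' statistic from each shuffled iteration.
>>> # With processes=1 no pickling occurs, so a lambda is fine here.
>>> results = parallel_apply(a, method='jaccard', method_args=(b,),
...     genome=genome, iterations=3, processes=1, sort=True, debug=True,
...     reduce_func=lambda d: d['jaccard'])
>>> jaccard_null = list(results)  # e.g. [0.226966, 0.0296296, 0.0725806] as above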
Parameters

orig_bedtool : BedTool

method : str
    The method of orig_bedtool to run.

method_args : tuple
    Passed directly to getattr(orig_bedtool, method)().

method_kwargs : dict
    Passed directly to getattr(orig_bedtool, method)().

shuffle : bool
    If True, then orig_bedtool will be shuffled at each iteration and that
    shuffled version's method will be called with method_args and
    method_kwargs.

shuffle_kwargs : dict
    If shuffle is True, these are passed to orig_bedtool.shuffle(). You do
    not need to pass the genome here; that's handled separately by the
    genome and genome_fn kwargs.

iterations : int
    Number of iterations to perform.
genome : string or dict
    If a string, it is assumed to be an assembly name (e.g., "hg19"); a
    dictionary of chromsizes for that assembly is retrieved and then
    converted to a filename.
genome_fn : str
    Mutually exclusive with genome; genome_fn must be an existing filename
    with the chromsizes. Use the genome kwarg instead if you'd rather
    supply an assembly or dict.

reduce_func : callable
    Function or other callable object that accepts, as its only argument,
    the results from orig_bedtool.method(). For example, if you care about
    the number of results, then you can use reduce_func=len.

processes : int
    Number of processes to run. If processes=1, then multiprocessing is
    not used (making it much easier to debug). This argument is ignored if
    _orig_pool is provided.

sort : bool
    If both shuffle and sort are True, the shuffled BedTool will then be
    sorted. Use this if method requires sorted input.

_orig_pool : multiprocessing.Pool instance
    If provided, uses _orig_pool instead of creating one. In this case,
    processes will be ignored. See the sketch after this list for an
    example of reusing a pool across calls.

debug : bool
    If True, then use the current iteration index as the seed to shuffle.

report_iterations : bool
    If True, then report the number of iterations to stderr.
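Finally, a minimal sketch combining shuffle_kwargs and _orig_pool: one worker pool is reused for two related null distributions, and chrom=True is passed through to BedTool.shuffle() so shuffled features stay on their original chromosome (i.e. bedtools shuffle -chrom). The helper count_lines, the pool size, the iteration count, and the labels are illustrative; whether your method arguments and reduce_func pickle cleanly for multiprocessing should be verified in your own environment.

import multiprocessing

import pybedtools
from pybedtools.parallel import parallel_apply


def count_lines(x):
    # Count intervals by counting lines (see the note about BedTool.__len__).
    return sum(1 for _ in open(x.fn))


if __name__ == '__main__':
    a = pybedtools.example_bedtool('a.bed')
    b = pybedtools.example_bedtool('b.bed')
    genome = dict(chr1=(0, 1000))

    # Reuse one pool across two related null distributions; `processes` is
    # ignored whenever `_orig_pool` is given.
    pool = multiprocessing.Pool(processes=4)
    try:
        nulls = {}
        for label, kwargs in [('u', dict(u=True)), ('v', dict(v=True))]:
            results = parallel_apply(
                a, 'intersect',
                genome=genome,
                method_kwargs=dict(b=b, **kwargs),
                # Passed through to BedTool.shuffle(): keep shuffled features
                # on their original chromosome (`bedtools shuffle -chrom`).
                shuffle_kwargs=dict(chrom=True),
                iterations=100,
                reduce_func=count_lines,
                _orig_pool=pool,
                debug=True,
            )
            nulls[label] = list(results)
    finally:
        pool.close()
        pool.join()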