Source code for genomedata

#!/usr/bin/env python

from __future__ import absolute_import, division, print_function

"""
Genomedata is a module to store and access large-scale functional
genomics data in a format which is both space-efficient and allows
efficient random-access.

Under the surface, genomedata is implemented as a collection of HDF5 files,
but genomedata provides a transparent interface to interact with your
underlying data without having to worry about the mess of repeatedly parsing
large data files or having to keep them in memory for random access.

Copyright 2009-2014 Michael M. Hoffman <michael.hoffman@utoronto.ca>

"""


from pkg_resources import get_distribution
import sys

from numpy import square
from path import Path

from ._hdf5 import _HDF5DirectoryChromosomeList, _HDF5SingleFileChromosomeList
from ._bigwig import _BigWigChromosomeList, is_big_wig

# Allow raising a DistributionNotFound error if somehow genomedata was not
# installed
__version__ = get_distribution(__name__.split('.')[0]).version

FORMAT_VERSION = 1


# If there are fewer than this many chomosomes, the default behavior
# is to implement the Genomedata archive as a directory. If there are
# more than this many, it will be a single file by default.
FILE_MODE_CHROMS = 100


[docs]class Genome(object): """The root level of the genomedata object hierarchy. If you use this as a context manager, it will keep track of any open Chromosomes and close them (and the Genome object) for you later when the context is left:: with Genome("/path/to/genomedata") as genome: chromosome = genome["chr1"] [...] If not used as a context manager, you are responsible for closing the Genomedata archive once you are done: >>> genome = Genome("/path/to/genomedata") >>> chromosome = genome["chr1"] [...] >>> genome.close() """
[docs] def __init__(self, filename, *args, **kwargs): r"""Create a Genome object from a genomedata archive. :param filename: the root of the Genomedata object hierarchy. This can either be a .genomedata file that contains the entire genome or a directory containing multiple chromosome files. :type filename: string :param *args: args passed on to open_file if single file or to Chromosome if directory :param **kwargs: keyword args passed on to open_file if single file or to Chromosome if directory Example: >>> genome = Genome("./genomedata.ctcf.pol2b/") >>> genome Genome("./genomedata.ctcf.pol2b/") [...] >>> genome.close() >>> genome = Genome("./cat_chipseq.genomedata", mode="r") [...] >>> genome.close() """ # so that Genome.__del__() won't throw an exception if there # is an error during __init__() self._isopen = False self.filename = filename self.args = args self.kwargs = kwargs # Process path for internal use, following symbolic links # until we get to the eventual file or directory filepath = Path(filename).expand() while filepath.islink(): filepath = filepath.readlinkabs() if not filepath.exists(): raise IOError("Could not find Genomedata archive: %s" % filepath) # If it's a file we are opening if filepath.isfile(): # Check if the file type is bigWig # NB: Could consider checking by filename extension only if is_big_wig(filepath): self._chromosomes = _BigWigChromosomeList(filepath) # Otherwise assume and attempt to open the Genomedata file else: self._chromosomes = _HDF5SingleFileChromosomeList( filepath, *args, **kwargs) elif filepath.isdir(): # Genomedata directory self._chromosomes = _HDF5DirectoryChromosomeList(filepath, *args, **kwargs) else: raise ValueError("Genomedata archive must be file or directory: %s" % filepath) # a kind of refcounting for context managers self._context_count = 0 self._isopen = True format_version = self.format_version if format_version is not None and format_version > FORMAT_VERSION: raise NotImplementedError("This archive has format version %s," " but the installed Genomedata software" " unly supports format version %d" % (format_version, FORMAT_VERSION))
[docs] def __iter__(self): """Return next chromosome, in sorted order, with memoization. Example:: for chromosome in genome: print chromosome.name for supercontig, continuous in chromosome.itercontinuous(): [...] """ assert self.isopen for chromosome in self._chromosomes: yield chromosome
[docs] def __getitem__(self, name): """Return a reference to a chromosome of the given name. :param name: name of the chromosome (e.g. "chr1" if chr1.genomedata is a file in the Genomedata archive or chr1 is a top-level group in the single-file Genomedata archive) :type name: string :returns: :class:`Chromosome` Example: >>> genome["chrX"] <Chromosome 'chrX', file='/path/to/genomedata/chrX.genomedata'> >>> genome["chrZ"] KeyError: 'Could not find chromosome: chrZ' """ assert self.isopen return self._chromosomes[name]
def __contains__(self, name): """Return if there is a chromosome of the given name :param name: name of the chromosome (e.g. "chr1" if chr1.genomedata is a file in the Genomedata archive or chr1 is a top-level group in the single-file Genomedata archive) :type name: string :returns: boolean Example: >>> "chrX" in Genome True >>> "chrZ" in Genome False """ try: self[name] except KeyError: return False # Couldn't find chromosome else: return True # No errors opening chromosome def __enter__(self): assert self.isopen self._context_count += 1 return self def __exit__(self, exc_type, exc_value, exc_tb): # XXX: this and __enter__ have potential race conditions, if # _context_count is changed simultaneously by different # threads. should be synchronized self._context_count -= 1 if self._context_count == 0: self.close() def __del__(self): if self.isopen: self.close() def _create_chromosome(self, name, mode): name = "_".join(name.split()) # Remove any whitespace res = self._chromosomes.create(name) return res
[docs] def close(self): """Close this Genomedata archive and any open chromosomes If the Genomedata archive is a directory, this closes all open chromosomes. If it is a single file, this closes that file. This should only be used if Genome is not a context manager (see :class:`Genome`). The behavior is undefined if this is called while Genome is being used as a context manager. """ assert self.isopen self._chromosomes.close() self._isopen = False
def __repr__(self): items = ["'%s'" % self.filename] if self.args: items.append("*%r" % self.args) if self.kwargs: items.append("**%r" % self.kwargs) return "Genome(%s)" % ", ".join(items) def __str__(self): return repr(self)
[docs] def erase_data(self, trackname): """Erase all data for the given track across all chromosomes The Genome object must have been created with :param mode:="r+". Behavior is undefined if this is not the case. Currently sets the dirty bit, which can only be erased with genomedata-close-data """ assert self.isopen for chromosome in self: chromosome._erase_data(trackname)
[docs] def add_track_continuous(self, trackname): """Add a new track The Genome object must have been created with :param mode:="r+". Behavior is undefined if this is not the case. Currently sets the dirty bit, which can only be erased with genomedata-close-data """ # TODO: Add sensible error message for non HDF5 formats if self.format_version < 1: raise NotImplementedError("""Adding tracks is only supported \ for archives created with Genomedata version 1.2.0 or later.""") self._chromosomes.add_trackname(trackname) # Open continuous data regions per chromosome for chromosome in self: chromosome._add_track_continuous()
@property def isopen(self): """Return a boolean indicating if the Genome is still open""" return self._isopen @property def tracknames_continuous(self): """Return a list of the names of all data tracks stored.""" assert self.isopen return self._chromosomes.tracknames_continuous()
[docs] def index_continuous(self, trackname): """Return the column index of the trackname in the continuous data. :param trackname: name of data track :type trackname: string :returns: integer This is used for efficient indexing into continuous data: >>> col_index = genome.index_continuous("sample_track") >>> data = genome["chr3"][100:150, col_index] although for typical use, the track can be indexed directly: >>> data = genome["chr3"][100:150, "sample_track"] """ try: return self.tracknames_continuous.index(trackname) except ValueError: raise KeyError("Could not find continuous track: %s" % trackname)
@property def num_tracks_continuous(self): """Returns the number of continuous data tracks.""" try: return len(self.tracknames_continuous) except AttributeError: return 0 @property def format_version(self): """Genomedata format version None means there are no chromosomes in it already or there is no information available. """ assert self.isopen return self._chromosomes._format_version() # XXX: should memoize these with an off-the-shelf decorator @property def mins(self): """Return the minimum value for each track. :returns: numpy.array """ return self._chromosomes.mins @property def maxs(self): """Return a vector of the maximum value for each track. :returns: numpy.array """ return self._chromosomes.maxs @property def sums(self): """Return a vector of the sum of the values for each track. :returns: numpy.array """ return self._chromosomes.sums @property def sums_squares(self): """Return a vector of the sum of squared values for each track's data. :returns: numpy.array """ return self._chromosomes.sums_squares @property def num_datapoints(self): """Return the number of datapoints in each track. :returns: a numpy.array vector with an entry for each track. """ return self._chromosomes.num_datapoints @property def means(self): """Return a vector of the mean value of each track. :returns: numpy.array """ return self.sums / self.num_datapoints @property def vars(self): """Return a vector of the variance in the data for each track. :returns: numpy.array """ # this is an unstable way of calculating the variance, # but it should be good enough # Numerical Recipes in C, Eqn 14.1.7 # XXX: best would be to switch to the pairwise parallel method # (see Wikipedia) return (self.sums_squares / self.num_datapoints) - \ square(self.means)
def main(argv=sys.argv[1:]): pass if __name__ == "__main__": sys.exit(main())