Source code for genopandas.core.frame

"""Dataframe-related functions/classes."""

from collections import OrderedDict
import math
import re

from natsort import natsorted
import numpy as np
import pandas as pd

from .tree import GenomicIntervalTree


[docs]class GenomicDataFrame(pd.DataFrame): """DataFrame with fast indexing by genomic position. Requires columns 'chromosome', 'start' and 'end' to be present in the DataFrame, as these columns are used for indexing. Examples -------- Constructing from scratch: >>> df = pd.DataFrame.from_records( ... [('1', 10, 20, 'a'), ('2', 10, 20, 'b'), ('2', 30, 40, 'c')], ... columns=['chromosome', 'start', 'end']) >>> df.set_index(['chromosome', 'start', 'end']) >>> GenomicDataFrame(df) Reading from a GTF file: >>> GenomicDataFrame.from_gtf('/path/to/reference.gtf') Querying by a genomic range: >>> genomic_df.gloc['2'][30:50] """ _internal_names = pd.DataFrame._internal_names + ['_gloc'] _internal_names_set = set(_internal_names) _metadata = ['_chrom_lengths'] def __init__(self, *args, chrom_lengths=None, **kwargs): super().__init__(*args, **kwargs) self._gloc = None self._chrom_lengths = chrom_lengths @property def _constructor(self): return GenomicDataFrame
[docs] @classmethod def from_df(cls, df, **kwargs): """Constructs instance from dataframe containing ranged/positioned data.""" if df.index.nlevels == 3: return cls(df, **kwargs) elif df.index.nlevels == 2: return cls.from_position_df(df, **kwargs) else: raise ValueError('DataFrame should have either two index levels ' '(for positioned data) or three index levels '
'(for ranged data)')
[docs] @classmethod def from_position_df(cls, df, width=1, **kwargs): """Constructs instance from positioned dataframe.""" positions = df.index.get_level_values(1) starts = positions - (width // 2) ends = positions + math.ceil(width / 2) new_df = df.copy() new_df.index = pd.MultiIndex.from_arrays( [df.index.get_level_values(0), starts, ends], names=['chromosome', 'start', 'end'])
return cls(new_df, **kwargs)
[docs] @classmethod def from_records(cls, records, index_col, columns=None, drop_index_col=True, chrom_lengths=None, **kwargs): """Creates a GenomicDataFrame from a structured or record ndarray.""" if not 2 <= len(index_col) <= 3: raise ValueError('index_col should contain 2 entries' ' (for positioned data or 3 entries' ' (for ranged data)') df = super().from_records(records, columns=columns, **kwargs) # Convert chromosome to str. df[index_col[0]] = df[index_col[0]].astype(str) df = df.set_index(index_col, drop=drop_index_col)
return cls.from_df(df, chrom_lengths=chrom_lengths)
[docs] @classmethod def from_csv(cls, file_path, index_col, drop_index_col=True, chrom_lengths=None, **kwargs): """Creates a GenomicDataFrame from a csv file using ``pandas.read_csv``. Parameters ---------- file_path : str Path to file. index_col : List[str] Columns to use for index. Columns should be indicated by their name. Should contain two entries for positioned data, three entries for ranged data. If not given, the first three columns of the DataFrame are used by default. drop_index_col : bool Whether to drop the index columns in the DataFrame (True, default) or to drop them from the dataframe (False). chrom_lengths : Dict[str, int] Chromosome lengths. **kwargs Any extra kwargs are passed to ``pandas.read_csv``. Returns ------- GenomicDataFrame DataFrame containing the file contents. """ if not 2 <= len(index_col) <= 3: raise ValueError('index_col should contain 2 entries' ' (for positioned data or 3 entries' ' (for ranged data)') df = pd.read_csv( file_path, index_col=None, dtype={index_col[0]: str}, **kwargs) df = df.set_index(index_col, drop=drop_index_col)
return cls.from_df(df, chrom_lengths=chrom_lengths)
[docs] @classmethod def from_gtf(cls, gtf_path, filter_=None): """Creates a GenomicDataFrame from a GTF file.""" if (gtf_path.suffixes[-1] == '.gz' and gtf_path.with_suffix('.gz.tbi').exists()): gdf = cls._from_gtf_pysam(gtf_path, filter_=filter_) else: gdf = cls._from_gtf_pandas(gtf_path, filter_=filter_)
return gdf @classmethod def _from_gtf_pandas(cls, gtf_path, filter_=None): """Reads GTF directly using pandas.""" # Read gtf data. data = pd.read_csv( gtf_path, sep='\t', header=None, index_col=None, names=('contig', 'source', 'feature', 'start', 'end', 'score', 'strand', 'frame', 'attributes'), dtype={'contig': str}) # Expand attributes. regex = re.compile(r'(?P<key>\w+) "(?P<value>\w+)"') rows = (dict(regex.findall(el)) for el in data['attributes']) attr_data = pd.DataFrame.from_records(rows) data = pd.concat([data.drop('attributes', axis=1), attr_data], axis=1) data = data.set_index(['contig', 'start', 'end'], drop=False) # Filter rows if filter_ is given. if filter_ is not None: data = data.query(filter_) return cls(data) @classmethod def _from_gtf_pysam(cls, gtf_path, filter_=None): """Reads GTF more efficiently using pysam.""" try: import pysam except ImportError: raise ImportError('Pysam needs to be installed for ' 'reading GTF files') # Parse records into rows. gtf_file = pysam.TabixFile(str(gtf_path), parser=pysam.asGTF()) records = (rec for rec in gtf_file.fetch()) # Filter records if needed. if filter_ is not None: records = (rec for rec in records if filter_(rec)) # Build dataframe. def _record_to_row(record): row = { 'contig': record.contig, 'source': record.source, 'feature': record.feature, 'start': record.start, 'end': record.end, 'score': record.score, 'strand': record.strand, 'frame': record.frame } row.update(dict(record)) return row gdf = cls.from_records( (_record_to_row(rec) for rec in records), index_col=['contig', 'start', 'end'], drop_index_col=False) # Reorder columns to correspond with GTF format. columns = ('contig', 'source', 'feature', 'start', 'end', 'score', 'strand', 'frame') gdf = _reorder_columns(gdf, order=columns) return gdf @property def gloc(self): """Genomic indexer for querying the dataframe.""" if self._gloc is None: self._gloc = GenomicIndexer(self) return self._gloc @property def chromosome_lengths(self): """Chromosome lengths.""" if self._chrom_lengths is None: chrom_lengths = self._calculate_chrom_lengths() self._chrom_lengths = self._order_chrom_lengths(chrom_lengths) return self._chrom_lengths @chromosome_lengths.setter def chromosome_lengths(self, value): if not isinstance(value, OrderedDict): value = self._order_chrom_lengths(value) self._chrom_lengths = value def _calculate_chrom_lengths(self): chromosomes = self.index.get_level_values(0) ends = self.index.get_level_values(2) - 1 lengths = pd.Series(ends).groupby(chromosomes).max() return dict(zip(lengths.index, lengths.values)) @staticmethod def _order_chrom_lengths(chrom_lengths): if not isinstance(chrom_lengths, OrderedDict): order = natsorted(chrom_lengths.keys()) values = (chrom_lengths[k] for k in order) chrom_lengths = OrderedDict(zip(order, values)) return chrom_lengths @property def chromosome_offsets(self): """Chromosome offsets (used when plotting chromosomes linearly).""" # Record offsets in ordered dict. sorted_lengths = list(self.chromosome_lengths.values()) cumsums = np.concatenate([[0], np.cumsum(sorted_lengths)]) offsets = OrderedDict(zip(self.chromosome_lengths.keys(), cumsums[:-1])) # yapf: disable # Add special marker for end. offsets['_END_'] = cumsums[-1] return offsets
[docs] def reset_index(self, level=None, drop=False, col_level=0, col_fill=''): """Mirrors pd.DataFrame.reset_index, but returns a standard DataFrame instead of a GenomicDataFrame, as the (genomic) index is being dropped. """ reset_values = super().reset_index( level=level, drop=drop, col_level=col_level, col_fill=col_fill)
return pd.DataFrame(reset_values)
[docs] def rename_chromosomes(self, mapping): """Returns copy with renamed chromosomes.""" if callable(mapping): map_func = mapping else: map_func = lambda s: mapping.get(s, s) # Map chromosomes. chrom_col, start_col, end_col = self.index.names new_values = self.reset_index() new_values[chrom_col] = new_values[chrom_col].map(map_func) new_values.set_index([chrom_col, start_col, end_col], inplace=True)
return self.__class__(new_values)
[docs]class GenomicIndexer(object): """Base GenomicIndexer class used to index GenomicDataFrames.""" def __init__(self, gdf): self._gdf = gdf self._trees = None def __getitem__(self, item): """Accessor used to query the dataframe by position. If a list of chromosomes is given, the dataframe is subset to the given chromosomes. Note that chromosomes are also re-ordered to adhere to the given order. If a single chromosome is given, a GenomicSlice is returned. This slice object can be sliced to query a specific genomic range. """ if isinstance(item, list): # Add extra index level to enforce uniqueness. augmented = self._gdf.set_index( self._gdf.groupby(level=0).cumcount(), append=True) # Reindex and drop added level. subset = augmented.reindex(index=item, level=0) subset.index = subset.index.droplevel(subset.index.nlevels - 1) # Remove any duplicates. items = list(OrderedDict.fromkeys(item)) # Subset lengths. prev_lengths = subset.chromosome_lengths subset.chromosome_lengths = OrderedDict( (k, prev_lengths[k]) for k in items) # yapf: disable return subset return GenomicSlice(self, chromosome=item) @property def gdf(self): """The indexed DataFrame.""" return self._gdf @property def chromosome(self): """Chromosome values.""" return self._gdf.index.get_level_values(0) @property def chromosomes(self): """Available chromosomes.""" return list(self.chromosome_lengths.keys()) @property def chromosome_lengths(self): """Chromosome lengths.""" return self._gdf.chromosome_lengths @property def chromosome_offsets(self): """Chromosome offsets.""" return self._gdf.chromosome_offsets @property def start(self): """Start positions.""" return self._gdf.index.get_level_values(1) @property def start_offset(self): """Start positions, offset by chromosome lengths.""" return self._offset_positions(self.start) @property def end(self): """End positions.""" return self._gdf.index.get_level_values(2) @property def end_offset(self): """End positions, offset by chromosome lengths.""" return self._offset_positions(self.end) @property def position(self): """Mid positions (between start/end). Should corrrespond with original positions for expanded positioned data. """ return (self.start + self.end) // 2 @property def position_offset(self): """Mid positions (see position), offset by chromosome lengths.""" return self._offset_positions(self.position) def _offset_positions(self, positions): offsets = pd.Series(self.chromosome_offsets) return positions + offsets.loc[self.chromosome].values @property def trees(self): """Trees used for indexing the DataFrame.""" if self._trees is None: self._trees = self._build_trees() return self._trees
[docs] def rebuild(self): """Rebuilds the genomic interval trees."""
self._trees = self._build_trees() def _build_trees(self): tuples = zip(self.chromosome, self.start, self.end, range(self._gdf.shape[0])) return GenomicIntervalTree.from_tuples(tuples)
[docs] def search(self, chromosome, start, end, strict_left=False, strict_right=False): """Searches the DataFrame for rows within given range.""" overlap = self.trees.search( chromosome, start, end, strict_left=strict_left, strict_right=strict_right) indices = [interval[2] for interval in overlap]
return self._gdf.iloc[indices].sort_index() class GenomicSlice(object): """Supporting class used by the GenomicIndexer for slicing chromosomes.""" def __init__(self, indexer, chromosome): self._indexer = indexer self._chromosome = chromosome def __getitem__(self, item): if isinstance(item, slice): subset = self._indexer.search( self._chromosome, start=item.start, end=item.stop) # Subset lengths. subset.chromosome_lengths = OrderedDict( [(self._chromosome, subset.chromosome_lengths[self._chromosome])]) return subset return self._indexer.search(self._chromosome, start=item) def _reorder_columns(df, order): """Reorders dataframe columns, sorting any extra columns alphabetically.""" extra_cols = set(df.columns) - set(order) return df[list(order) + sorted(extra_cols)]