Source code for pyim.align.pipelines.shear_splink

"""Module containing the ShearSplink pipelines."""

import logging
from pathlib import Path

from cutadapt import seqio
import pandas as pd
import pysam

from pyim.external.cutadapt import cutadapt, cutadapt_summary
from pyim.external.bowtie2 import bowtie2
from pyim.external.util import flatten_arguments
from pyim.model import Insertion
from pyim.util.path import shorten_path, extract_suffix

from .base import Pipeline, register_pipeline
from ..util import extract_insertions

DEFAULT_OVERLAP = 3
DEFAULT_ERROR_RATE = 0.1


class ShearSplinkPipeline(Pipeline):
    """ShearSplink pipeline.

    Analyzes (single-end) sequencing data that was prepared using the
    ShearSplink protocol. Sequence reads are expected to have the
    following structure::

        [Transposon][Genomic][Linker]

    Here, ``transposon`` refers to the flanking part of the transposon
    sequence, ``linker`` to the flanking linker sequence and ``genomic``
    to the genomic DNA located in between (which varies per insertion).
    The linker sequence is optional and may be omitted if the linker is
    not included in sequencing.

    The pipeline essentially performs the following steps:

        - If contaminants are provided, sequence reads are filtered
          (using Cutadapt) for the contaminant sequences.
        - The remaining reads are trimmed to remove the transposon and
          linker sequences, leaving only genomic sequences. Reads without
          the transposon/linker sequences are dropped, as we cannot be
          certain of their origin. (Note that the linker is optional and
          is only trimmed if a linker is given.)
        - The genomic reads are aligned to the reference genome.
        - The resulting alignment is used to identify insertions.

    Note that this pipeline does **NOT** support multiplexed datasets
    (which is the default output of the ShearSplink protocol). For
    multiplexed datasets, use the ``MultiplexedShearSplinkPipeline``.

    Parameters
    ----------
    transposon_path : Path
        Path to the (flanking) transposon sequence (fasta).
    bowtie_index_path : Path
        Path to the bowtie index.
    linker_path : Path
        Path to the linker sequence (fasta).
    contaminant_path : Path
        Path to file containing contaminant sequences (fasta). If
        provided, reads are filtered for these sequences before
        extracting genomic sequences for alignment.
    min_length : int
        Minimum length for genomic reads to be kept for alignment.
    min_support : int
        Minimum support for insertions to be kept in the final output.
    min_mapq : int
        Minimum mapping quality of alignments to be used for identifying
        insertions.
    merge_distance : int
        Maximum distance within which insertions are merged. Used to
        merge insertions that occur within close vicinity, which is
        typically due to slight variations in alignments.
    bowtie_options : Dict[str, Any]
        Dictionary of extra options for Bowtie2.
    min_overlaps : Dict[str, int]
        Minimum overlap required to recognize the transposon, linker and
        contaminant sequences (see Cutadapt's documentation for more
        information). Keys of the dictionary indicate to which sequence
        the overlap corresponds and should be one of the following:
        ``linker``, ``transposon`` or ``contaminant``.
    error_rates : Dict[str, float]
        Maximum error rate to use when recognizing transposon, linker
        and contaminant sequences (see Cutadapt's documentation for more
        information). Keys should be the same as for ``min_overlaps``.
    """

    def __init__(self,
                 transposon_path,
                 bowtie_index_path,
                 linker_path=None,
                 contaminant_path=None,
                 min_length=15,
                 min_support=2,
                 min_mapq=23,
                 merge_distance=None,
                 bowtie_options=None,
                 min_overlaps=None,
                 error_rates=None):
        super().__init__()

        self._transposon_path = transposon_path
        self._linker_path = linker_path
        self._contaminant_path = contaminant_path

        self._index_path = bowtie_index_path

        self._min_length = min_length
        self._min_support = min_support
        self._min_mapq = min_mapq

        self._merge_distance = merge_distance
        self._bowtie_options = bowtie_options or {}

        self._min_overlaps = min_overlaps or {}
        self._error_rates = error_rates or {}

    @classmethod
    def configure_args(cls, parser):
        cls._setup_base_args(parser, paired=False)

        parser.description = 'ShearSplink pipeline'

        # Paths to various sequences.
        seq_options = parser.add_argument_group('Sequences')

        seq_options.add_argument(
            '--transposon',
            type=Path,
            required=True,
            help='Fasta file containing the transposon sequence.')

        seq_options.add_argument(
            '--contaminants',
            type=Path,
            default=None,
            help='Fasta file containing contaminant sequences.')

        seq_options.add_argument(
            '--linker',
            type=Path,
            default=None,
            help='Fasta file containing the linker sequence.')

        # Trimming options (used for cutadapt).
        trim_options = parser.add_argument_group('Trimming')

        trim_options.add_argument(
            '--min_length',
            type=int,
            default=15,
            help='Minimum length for (trimmed) genomic sequences.')

        trim_options.add_argument(
            '--contaminant_error',
            default=0.1,
            type=float,
            help='Maximum error rate for matching contaminants.')

        trim_options.add_argument(
            '--contaminant_overlap',
            default=3,
            type=int,
            help='Minimum overlap for matching contaminants.')

        trim_options.add_argument(
            '--transposon_error',
            default=0.1,
            type=float,
            help='Maximum error rate for matching the transposon.')

        trim_options.add_argument(
            '--transposon_overlap',
            default=3,
            type=int,
            help='Minimum overlap for matching the transposon.')

        trim_options.add_argument(
            '--linker_error',
            default=0.1,
            type=float,
            help='Maximum error rate for matching the linker.')

        trim_options.add_argument(
            '--linker_overlap',
            default=3,
            type=int,
            help='Minimum overlap for matching the linker.')

        align_options = parser.add_argument_group('Alignment')

        align_options.add_argument(
            '--bowtie_index',
            type=Path,
            required=True,
            help='Bowtie2 index to use for alignment.')

        align_options.add_argument(
            '--local',
            default=False,
            action='store_true',
            help='Use local alignment.')

        ins_options = parser.add_argument_group('Insertions')

        ins_options.add_argument(
            '--min_mapq',
            type=int,
            default=23,
            help=('Minimum mapping quality for reads '
                  'used to identify insertions.'))

        ins_options.add_argument(
            '--merge_distance',
            type=int,
            default=None,
            help=('Distance within which insertions (from same '
                  'sample) are merged.'))

        ins_options.add_argument(
            '--min_support',
            type=int,
            default=2,
            help='Minimum support for insertions.')

    @classmethod
    def _extract_args(cls, args):
        bowtie_options = {'--local': args.local}

        min_overlaps = {
            'contaminant': args.contaminant_overlap,
            'transposon': args.transposon_overlap,
            'linker': args.linker_overlap
        }

        error_rates = {
            'contaminant': args.contaminant_error,
            'transposon': args.transposon_error,
            'linker': args.linker_error
        }

        return dict(
            transposon_path=args.transposon,
            bowtie_index_path=args.bowtie_index,
            linker_path=args.linker,
            contaminant_path=args.contaminants,
            min_length=args.min_length,
            min_support=args.min_support,
            min_mapq=args.min_mapq,
            merge_distance=args.merge_distance,
            bowtie_options=bowtie_options,
            min_overlaps=min_overlaps,
            error_rates=error_rates)

    def run(self, read_path, output_dir, read2_path=None):
        if read2_path is not None:
            raise ValueError('Pipeline does not support paired-end data')

        logger = logging.getLogger()

        # Ensure output dir exists.
        output_dir.mkdir(exist_ok=True, parents=True)

        # Extract genomic sequences and align to reference.
        genomic_path = self._extract_genomic(read_path, output_dir, logger)
        alignment_path = self._align(genomic_path, output_dir, logger)

        # Extract insertions from bam file.
        bam_file = pysam.AlignmentFile(str(alignment_path))

        try:
            insertions = extract_insertions(
                iter(bam_file),
                func=_process_alignment,
                merge_dist=self._merge_distance,
                min_mapq=self._min_mapq,
                min_support=self._min_support,
                logger=logger)
        finally:
            bam_file.close()

        # Write insertions to output file.
        insertion_path = output_dir / 'insertions.txt'

        ins_frame = Insertion.to_frame(insertions)
        ins_frame.to_csv(str(insertion_path), sep='\t', index=False)

    def _extract_genomic(self, read_path, output_dir, logger):
        """Extracts the genomic part of sequence reads."""

        # Log parameters.
        if logger is not None:
            logger.info('Extracting genomic sequences')
            logger.info(' %-18s: %s', 'Transposon',
                        shorten_path(self._transposon_path))
            logger.info(' %-18s: %s', 'Linker',
                        shorten_path(self._linker_path))
            logger.info(' %-18s: %s', 'Contaminants',
                        shorten_path(self._contaminant_path))
            logger.info(' %-18s: %s', 'Minimum length', self._min_length)

        # Get suffix to use for intermediate/genomic files.
        suffix = extract_suffix(read_path)

        # Track interim files for cleaning.
        interim_files = []

        if self._contaminant_path is not None:
            # Remove contaminants.
            contaminant_out_path = output_dir / ('trimmed_contaminant' +
                                                 suffix)
            contaminant_opts = {
                '-g': 'file:' + str(self._contaminant_path),
                '--discard-trimmed': True,
                '-O': self._min_overlaps.get('contaminant', DEFAULT_OVERLAP),
                '-e': self._error_rates.get('contaminant', DEFAULT_ERROR_RATE)
            }

            process = cutadapt(read_path, contaminant_out_path,
                               contaminant_opts)

            if logger is not None:
                summary = cutadapt_summary(process.stdout, padding=' ')
                logger.info('Trimmed contaminant sequences' + summary)

            interim_files.append(contaminant_out_path)
        else:
            contaminant_out_path = read_path

        if self._linker_path is not None:
            # Remove linker.
            linker_out_path = output_dir / ('trimmed_linker' + suffix)
            linker_opts = {
                '-a': 'file:' + str(self._linker_path),
                '--discard-untrimmed': True,
                '-O': self._min_overlaps.get('linker', DEFAULT_OVERLAP),
                '-e': self._error_rates.get('linker', DEFAULT_ERROR_RATE)
            }

            process = cutadapt(contaminant_out_path, linker_out_path,
                               linker_opts)

            if logger is not None:
                summary = cutadapt_summary(process.stdout, padding=' ')
                logger.info('Trimmed linker sequence' + summary)

            interim_files.append(linker_out_path)
        else:
            linker_out_path = contaminant_out_path

        # Trim transposon and check minimum length.
        transposon_opts = {
            '-g': 'file:' + str(self._transposon_path),
            '--discard-untrimmed': True,
            '-O': self._min_overlaps.get('transposon', DEFAULT_OVERLAP),
            '-e': self._error_rates.get('transposon', DEFAULT_ERROR_RATE)
        }

        if self._min_length is not None:
            transposon_opts['--minimum-length'] = self._min_length

        genomic_path = output_dir / ('genomic' + suffix)
        process = cutadapt(linker_out_path, genomic_path, transposon_opts)

        if logger is not None:
            summary = cutadapt_summary(process.stdout, padding=' ')
            logger.info('Trimmed transposon sequence and filtered '
                        'for length' + summary)

        # Clean-up interim files.
        for file_path in interim_files:
            file_path.unlink()

        return genomic_path

    def _align(self, read_path, output_dir, logger):
        """Aligns genomic reads to the reference genome using Bowtie2."""

        # Log parameters.
        if logger is not None:
            logger.info('Aligning to reference')
            logger.info(' %-18s: %s', 'Reference',
                        shorten_path(self._index_path))
            logger.info(' %-18s: %s', 'Bowtie options',
                        flatten_arguments(self._bowtie_options))

        alignment_path = output_dir / 'alignment.bam'

        bowtie2(
            [read_path],
            index_path=self._index_path,
            output_path=alignment_path,
            options=self._bowtie_options,
            verbose=True)

        return alignment_path
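
# Usage illustration (hypothetical paths): a minimal sketch of running the
# pipeline directly from Python. The fasta files and bowtie2 index names
# below are assumptions for the example; in practice these arguments are
# typically assembled from command-line flags via `configure_args` and
# `_extract_args`.
#
#     from pathlib import Path
#     from pyim.align.pipelines.shear_splink import ShearSplinkPipeline
#
#     pipeline = ShearSplinkPipeline(
#         transposon_path=Path('transposon.fa'),
#         bowtie_index_path=Path('indices/reference'),
#         linker_path=Path('linker.fa'),
#         min_length=15,
#         min_support=2,
#         min_mapq=23)
#     pipeline.run(
#         read_path=Path('sample.fastq.gz'),
#         output_dir=Path('out/sample'))
#
# This writes `alignment.bam` and `insertions.txt` into `out/sample`.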
register_pipeline(name='shearsplink', pipeline=ShearSplinkPipeline)


def _process_alignment(aln):
    """Analyzes an alignment to determine the transposon/linker breakpoints."""

    ref = aln.reference_name

    if aln.is_reverse:
        transposon_pos = aln.reference_end
        linker_pos = aln.reference_start
        strand = -1
    else:
        transposon_pos = aln.reference_start
        linker_pos = aln.reference_end
        strand = 1

    return (ref, transposon_pos, strand), linker_pos
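
# Illustration of the breakpoint logic above (hypothetical coordinates): for
# a forward alignment covering reference positions [100, 150) on 'chr1',
# `_process_alignment` returns (('chr1', 100, 1), 150): the transposon
# breakpoint lies at the alignment start and the linker breakpoint at its
# end. For a reverse alignment over the same interval the two ends swap,
# yielding (('chr1', 150, -1), 100).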
class MultiplexedShearSplinkPipeline(ShearSplinkPipeline):
    """ShearSplink pipeline supporting multiplexed reads.

    Analyzes multiplexed (single-end) sequencing data that was prepared
    using the ShearSplink protocol. Sequence reads are expected to have
    the following structure::

        [Barcode][Transposon][Genomic][Linker]

    Here, the ``transposon``, ``genomic`` and ``linker`` sequences are
    the same as for the ``ShearSplinkPipeline``. The ``barcode`` sequence
    is an index that indicates from which sample the read originated.
    Barcode sequences should be provided using the ``barcode_path``
    argument. The optional ``barcode_mapping`` argument can be used to
    map barcodes to sample names.

    Parameters
    ----------
    transposon_path : Path
        Path to the (flanking) transposon sequence (fasta).
    bowtie_index_path : Path
        Path to the bowtie index.
    barcode_path : Path
        Path to barcode sequences (fasta).
    barcode_mapping : Dict[str, str]
        Mapping from barcodes to sample names. (On the command line this
        is given as a path to a tsv file with ``sample`` and ``barcode``
        columns, which ``_extract_args`` converts into this mapping.)
    linker_path : Path
        Path to the linker sequence (fasta).
    contaminant_path : Path
        Path to file containing contaminant sequences (fasta). If
        provided, reads are filtered for these sequences before
        extracting genomic sequences for alignment.
    min_length : int
        Minimum length for genomic reads to be kept for alignment.
    min_support : int
        Minimum support for insertions to be kept in the final output.
    min_mapq : int
        Minimum mapping quality of alignments to be used for identifying
        insertions.
    merge_distance : int
        Maximum distance within which insertions are merged. Used to
        merge insertions that occur within close vicinity, which is
        typically due to slight variations in alignments.
    bowtie_options : Dict[str, Any]
        Dictionary of extra options for Bowtie2.
    min_overlaps : Dict[str, int]
        Minimum overlap required to recognize the transposon, linker and
        contaminant sequences (see Cutadapt's documentation for more
        information). Keys of the dictionary indicate to which sequence
        the overlap corresponds and should be one of the following:
        ``linker``, ``transposon`` or ``contaminant``.
    error_rates : Dict[str, float]
        Maximum error rate to use when recognizing transposon, linker
        and contaminant sequences (see Cutadapt's documentation for more
        information). Keys should be the same as for ``min_overlaps``.
    """

    def __init__(self,
                 transposon_path,
                 bowtie_index_path,
                 barcode_path,
                 barcode_mapping=None,
                 linker_path=None,
                 contaminant_path=None,
                 min_length=15,
                 min_support=2,
                 min_mapq=23,
                 merge_distance=0,
                 bowtie_options=None,
                 min_overlaps=None,
                 error_rates=None):
        super().__init__(
            transposon_path=transposon_path,
            bowtie_index_path=bowtie_index_path,
            linker_path=linker_path,
            contaminant_path=contaminant_path,
            min_length=min_length,
            min_support=min_support,
            min_mapq=min_mapq,
            merge_distance=merge_distance,
            bowtie_options=bowtie_options,
            min_overlaps=min_overlaps,
            error_rates=error_rates)

        self._barcode_path = barcode_path
        self._barcode_mapping = barcode_mapping

    @classmethod
    def configure_args(cls, parser):
        super().configure_args(parser)

        parser.add_argument('--barcodes', required=True, type=Path)
        parser.add_argument(
            '--barcode_mapping', required=False, type=Path, default=None)

    @classmethod
    def _extract_args(cls, args):
        arg_dict = super()._extract_args(args)

        if args.barcode_mapping is not None:
            map_df = pd.read_csv(args.barcode_mapping, sep='\t')
            arg_dict['barcode_mapping'] = dict(
                zip(map_df['barcode'], map_df['sample']))
        else:
            arg_dict['barcode_mapping'] = None

        arg_dict['barcode_path'] = args.barcodes

        return arg_dict

    def run(self, read_path, output_dir, read2_path=None):
        if read2_path is not None:
            raise ValueError('Pipeline does not support paired-end data')

        logger = logging.getLogger()

        # Ensure output dir exists.
        output_dir.mkdir(exist_ok=True, parents=True)

        # Extract genomic sequences and align to reference.
        genomic_path = self._extract_genomic(read_path, output_dir, logger)
        alignment_path = self._align(genomic_path, output_dir, logger)

        # Map reads to specific barcodes/samples.
        logger.info('Extracting barcode/sample mapping')
        logger.info(' %-18s: %s', 'Barcodes',
                    shorten_path(self._barcode_path))
        read_map = self._get_barcode_mapping(read_path)

        # Extract insertions from bam file.
        bam_file = pysam.AlignmentFile(str(alignment_path))

        try:
            insertions = extract_insertions(
                iter(bam_file),
                func=_process_alignment,
                group_func=lambda aln: read_map.get(aln.query_name, None),
                merge_dist=self._merge_distance,
                min_mapq=self._min_mapq,
                min_support=self._min_support,
                logger=logger)
        finally:
            bam_file.close()

        # Write insertions to output file.
        insertion_path = output_dir / 'insertions.txt'

        ins_frame = Insertion.to_frame(insertions)
        ins_frame.to_csv(str(insertion_path), sep='\t', index=False)

    def _get_barcode_mapping(self, read_path):
        # Read barcode sequences.
        with seqio.open(str(self._barcode_path)) as barcode_file:
            barcodes = list(barcode_file)

        # Extract read --> barcode mapping.
        with seqio.open(str(read_path)) as reads:
            return _extract_barcode_mapping(reads, barcodes,
                                            self._barcode_mapping)
register_pipeline(
    name='shearsplink-multiplexed', pipeline=MultiplexedShearSplinkPipeline)


def _extract_barcode_mapping(reads, barcodes, barcode_mapping=None):
    # Create barcode/sample dict.
    barcode_dict = {bc.name: bc.sequence for bc in barcodes}

    if barcode_mapping is not None:
        barcode_dict = {
            sample: barcode_dict[barcode]
            for barcode, sample in barcode_mapping.items()
        }

    # Build mapping.
    mapping = {}

    for read in reads:
        # Check each barcode for match in read.
        matched = [k for k, v in barcode_dict.items() if v in read.sequence]

        if len(matched) == 1:
            # Record single matches.
            name = read.name.split()[0]
            mapping[name] = matched[0]
        elif len(matched) > 1:
            logging.warning('Skipping %s due to multiple matching barcodes',
                            read.name.split()[0])

    return mapping
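
# Example of the mapping logic above (hypothetical sequences): with barcodes
# BC01 = 'AACGT' and BC02 = 'GGTCA' and barcode_mapping
# {'BC01': 's1', 'BC02': 's2'}, a read whose sequence contains 'AACGT' but
# not 'GGTCA' is recorded as {'<read name>': 's1'}. Reads matching several
# barcodes are skipped with a warning; reads matching none are simply left
# out of the mapping.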