import pysam
from Bio.Seq import Seq
from LAFITE.utils import Vividict
def split_bed_line(entry, extends=False):
"""
split bed line
"""
cells = entry.strip('\n').split("\t")
chrom = start = end = name = score = strand = thick_start = thick_end = item_rgb = block_count = block_sizes = block_starts = None
if len(cells) >= 6:
chrom = cells[0]
start = int(cells[1])
end = int(cells[2])
name = cells[3]
score = cells[4]
strand= cells[5]
if len(cells) >=12:
thick_start = int(cells[6])
thick_end = int(cells[7])
item_rgb = cells[8]
block_count = int(cells[9])
block_sizes = [int(i) for i in cells[10].rstrip(',').split(',')]
block_starts= [int(i) for i in cells[11].rstrip(',').split(',')]
return chrom,start,end,name,score,strand,thick_start,thick_end,item_rgb,block_count,block_sizes,block_starts
def bed_block_to_splicing(start, block_count, block_starts, block_sizes):
"""convert bed block to isoform splicing structure
"""
read_splicing = []
if block_count > 1:
for i in range(block_count-1):
left_sj = start + block_starts[i] + block_sizes[i]
right_sj = start + block_starts[i+1] + 1
read_splicing.extend([left_sj, right_sj])
return read_splicing
def read_grouping(bed_file, fasta):
"""
return junctions with only one long read support
"""
junction_dict = Vividict()
processed_read = Vividict()
genome = pysam.FastaFile(fasta)
with open (bed_file) as f:
for read in f:
chrom, start, end, name, score, strand, thick_start, thick_end, item_rgb, block_count, block_sizes, block_starts = split_bed_line(read) # read the each nanopore read
full_block = [start+1]
if block_count > 1:
for i in range(block_count-1):
left_sj = start + block_starts[i] + block_sizes[i]
right_sj = start + block_starts[i+1] + 1
full_block.extend([left_sj, right_sj])
tmp_sj = (chrom,strand,left_sj,right_sj)
if tmp_sj not in junction_dict[(chrom,strand)]:
if strand =='+':
intron_motif = f'{genome.fetch(chrom, left_sj, left_sj+2)}-{genome.fetch(chrom, right_sj-3, right_sj-1)}'
else:
intron_motif = f'{Seq(genome.fetch(chrom, right_sj-3, right_sj-1)).reverse_complement()}-{Seq(genome.fetch(chrom, left_sj, left_sj+2)).reverse_complement()}'
if intron_motif in ['GT-AG', 'CT-AC', 'GC-AG', 'CT-GC', 'AT-AC', 'GT-AT']:
junction_dict[(chrom,strand)][tmp_sj] = [1, intron_motif, 'canonical']
else:
junction_dict[(chrom,strand)][tmp_sj] = [1, intron_motif, 'non-canonical']
else:
junction_dict[(chrom,strand)][tmp_sj][0] += 1
full_block.append(end)
name = f'{start+1}_{name}'
processed_read[(chrom,strand)][name] = full_block
return junction_dict, processed_read
bed = '/expt/zjzace/Nanopore_subcellular/SIRV/SIRV_Set1/bam/SRR6058583.sorted.bed'
fa = '/expt/zjzace/Nanopore_subcellular/SIRV/SIRV_Set1/Raw_data/SIRV_isoforms_multi-fasta_170612a.fasta'
junction_dict, processed_read = read_grouping(bed, fa)
def polya_signal_import(polyadenylation_event):
"""
read the polyA information from nanopolish-polya mode
"""
polya_reads = {}
if polyadenylation_event:
with open (polyadenylation_event) as f:
next(f)
for line in f:
line = line.rstrip().split("\t")
if len(line) != 10:
raise ValueError("Fatal: input polyA results was not from nanopolish!")
elif line[9] == 'PASS':
polya_reads[f'{int(line[2])+1}_{line[0]}'] = True
return polya_reads
class PolyAFinder():
"""estimate read polyadenylation event by checking the polya motifs from sequence
"""
def __init__(self, processed_read, fasta, polyA_motif_file, updis = 40, downdis = 10):
self.processed_read = processed_read
self.fasta = fasta
self.polyA_motif_file = polyA_motif_file
self.updis = updis
self.downdis = downdis
def polyA_motif_import(self):
"""import given polyA motif
"""
polyA_motif_list = []
for line in open(self.polyA_motif_file):
x = line.strip().upper().replace('U', 'A')
polyA_motif_list.append(x)
return polyA_motif_list
def find_polyA_motif(self, seq, polyA_motif_list):
"""check the occurrence of given polyA motif in target sequence
"""
for motif in polyA_motif_list:
i = seq.find(motif)
if i >= 0:
return motif
return None
def polya_estimation(self):
"""estimate read polyadenylation event
"""
genome = pysam.FastaFile(self.fasta)
read_polya_dict = {}
polyA_motif_list = self.polyA_motif_import()
for (chrom, strand) in self.processed_read:
for read_id, full_block in self.processed_read[(chrom, strand)].items():
if strand == '-':
res = full_block[0]
else:
strand = '+'
res = full_block[-1]
try:
if strand == '-':
seq = f'{Seq(genome.fetch(chrom, res - self.downdis, res + self.updis)).reverse_complement()}'
else:
seq = f'{genome.fetch(chrom, res - self.updis, res + self.downdis)}'
polyA_motif = self.find_polyA_motif(seq, polyA_motif_list)
if polyA_motif:
read_polya_dict[read_id] = True
except:
pass
return read_polya_dict