import pysam
from Bio.Seq import Seq
from LAFITE.utils import Vividict

split_bed_line[source]

split_bed_line(entry, extends=False)

split bed line

def split_bed_line(entry, extends=False):
	"""
	split bed line
	"""
	cells = entry.strip('\n').split("\t")
	chrom = start = end = name = score = strand = thick_start = thick_end = item_rgb = block_count = block_sizes = block_starts = None
	if len(cells) >= 6:
		chrom = cells[0]
		start = int(cells[1])
		end = int(cells[2])
		name = cells[3]
		score = cells[4]
		strand= cells[5]
	if len(cells) >=12:
		thick_start = int(cells[6])
		thick_end = int(cells[7])
		item_rgb = cells[8]
		block_count = int(cells[9])
		block_sizes = [int(i) for i in cells[10].rstrip(',').split(',')]
		block_starts= [int(i) for i in cells[11].rstrip(',').split(',')]
		
	return chrom,start,end,name,score,strand,thick_start,thick_end,item_rgb,block_count,block_sizes,block_starts

bed_block_to_splicing[source]

bed_block_to_splicing(start, block_count, block_starts, block_sizes)

convert bed block to isoform splicing structure

def bed_block_to_splicing(start, block_count, block_starts, block_sizes):
	"""convert bed block to isoform splicing structure
	"""
	read_splicing = []
	if block_count > 1:
		for i in range(block_count-1):
			left_sj = start + block_starts[i] + block_sizes[i]
			right_sj = start + block_starts[i+1] + 1
			read_splicing.extend([left_sj, right_sj])
	return read_splicing
	

read_grouping[source]

read_grouping(bed_file, fasta)

return junctions with only one long read support

def read_grouping(bed_file, fasta):
	"""
	return junctions with only one long read support
	"""
	junction_dict = Vividict()
	processed_read = Vividict()
	genome = pysam.FastaFile(fasta)
	with open (bed_file) as f:
		for read in f:
			chrom, start, end, name, score, strand, thick_start, thick_end, item_rgb, block_count, block_sizes, block_starts = split_bed_line(read) # read the each nanopore read
			full_block = [start+1]
			if block_count > 1:
				for i in range(block_count-1):
					left_sj = start + block_starts[i] + block_sizes[i]
					right_sj = start + block_starts[i+1] + 1
					full_block.extend([left_sj, right_sj])
					tmp_sj = (chrom,strand,left_sj,right_sj)
					
					if tmp_sj not in junction_dict[(chrom,strand)]:
						if strand =='+':
							intron_motif = f'{genome.fetch(chrom, left_sj, left_sj+2)}-{genome.fetch(chrom, right_sj-3, right_sj-1)}'
						else:
							intron_motif = f'{Seq(genome.fetch(chrom, right_sj-3, right_sj-1)).reverse_complement()}-{Seq(genome.fetch(chrom, left_sj, left_sj+2)).reverse_complement()}'
						if intron_motif in ['GT-AG', 'CT-AC', 'GC-AG', 'CT-GC', 'AT-AC', 'GT-AT']:
							junction_dict[(chrom,strand)][tmp_sj] = [1, intron_motif, 'canonical']
						else:
							junction_dict[(chrom,strand)][tmp_sj] = [1, intron_motif, 'non-canonical']
					else:
						junction_dict[(chrom,strand)][tmp_sj][0] += 1
			full_block.append(end)
			name = f'{start+1}_{name}'
			processed_read[(chrom,strand)][name] = full_block
			
	return junction_dict, processed_read
bed = '/expt/zjzace/Nanopore_subcellular/SIRV/SIRV_Set1/bam/SRR6058583.sorted.bed'
fa = '/expt/zjzace/Nanopore_subcellular/SIRV/SIRV_Set1/Raw_data/SIRV_isoforms_multi-fasta_170612a.fasta'
junction_dict, processed_read = read_grouping(bed, fa)

polya_signal_import[source]

polya_signal_import(polyadenylation_event)

read the polyA information from nanopolish-polya mode

def polya_signal_import(polyadenylation_event):
	"""
	read the polyA information from nanopolish-polya mode
	"""
	polya_reads = {}
	if polyadenylation_event:
		with open (polyadenylation_event) as f:
			next(f)
			for line in f:
				line = line.rstrip().split("\t")
				if len(line) != 10:
					raise ValueError("Fatal: input polyA results was not from nanopolish!")
				elif line[9] == 'PASS': 
					polya_reads[f'{int(line[2])+1}_{line[0]}'] = True
	return polya_reads

class PolyAFinder[source]

PolyAFinder(processed_read, fasta, polyA_motif_file, updis=40, downdis=10)

estimate read polyadenylation event by checking the polya motifs from sequence

class PolyAFinder():
	"""estimate read polyadenylation event by checking the polya motifs from sequence
	"""
	def __init__(self, processed_read, fasta, polyA_motif_file, updis = 40,  downdis = 10):
		self.processed_read = processed_read
		self.fasta = fasta
		self.polyA_motif_file = polyA_motif_file
		self.updis = updis
		self.downdis = downdis

	def polyA_motif_import(self):
		"""import given polyA motif
		"""
		polyA_motif_list = []
		for line in open(self.polyA_motif_file):
			x = line.strip().upper().replace('U', 'A')
			polyA_motif_list.append(x)

		return polyA_motif_list

	def find_polyA_motif(self, seq, polyA_motif_list):
		"""check the occurrence of given polyA motif in target sequence
		"""
		for motif in polyA_motif_list:
			i = seq.find(motif)
			if i >= 0:
				return motif
		return None

	def polya_estimation(self):
		"""estimate read polyadenylation event
		"""
		genome = pysam.FastaFile(self.fasta)
		read_polya_dict = {}
		polyA_motif_list = self.polyA_motif_import()

		for (chrom, strand) in self.processed_read:
			for read_id, full_block in self.processed_read[(chrom, strand)].items():

				if strand == '-':
					res = full_block[0]
				else:
					strand = '+'
					res = full_block[-1]
				try:
					if strand == '-':
						seq = f'{Seq(genome.fetch(chrom, res - self.downdis, res + self.updis)).reverse_complement()}'
					else:
						seq = f'{genome.fetch(chrom, res - self.updis, res + self.downdis)}'
					polyA_motif = self.find_polyA_motif(seq, polyA_motif_list)
		
					if polyA_motif:
						read_polya_dict[read_id] = True
				except:
					pass
		return read_polya_dict