Cookbook¶
Contents
Subsequence extraction¶
myseq = s'CAATAGAGACTAAGCATTAT'
sublen = 5
stride = 2
# explicit for-loop
for subseq in myseq.split(sublen, stride):
print subseq
# pipelined
myseq |> split(sublen, stride) |> echo
k-mer extraction¶
myseq = s'CAATAGAGACTAAGCATTAT'
type K = Kmer[5]
stride = 2
# explicit for-loop
for subseq in myseq.kmers[K](stride):
print subseq
# pipelined
myseq |> kmers[K](stride) |> echo
Reverse complementation¶
# sequences
s = s'GGATC'
print ~s # GATCC
# k-mers
k = k'GGATC'
print ~k # GATCC
k-mer Hamming distance¶
k1 = k'ACGTC'
k2 = k'ACTTA'
# ^ ^
print abs(k1 - k2) # Hamming distance = 2
k-mer Hamming neighbors¶
def neighbors(kmer):
for i in range(len(kmer)):
for b in (k'A', k'C', k'G', k'T'):
if kmer[i] != b:
yield kmer |> base(i, b)
print list(neighbors(k'AGC')) # CGC, GGC, etc.
k-mer minimizer¶
def minimizer[K](s: seq):
assert len(s) >= K.len()
kmer_min = K(s)
for kmer in s[1:].kmers[K](1):
kmer = min(kmer, ~kmer)
if kmer < kmer_min: kmer_min = kmer
return kmer_min
print minimizer[Kmer[10]](s'ACGTACGTACGT')
de Bruijn edge¶
def de_bruijn_edge[K](a: K, b: K):
a = a |> base(0, k'A') # reset first base: [T]GAG -> [A]GAG
b = b >> s'A' # shift right to A: [GAG]C -> A[GAG]
return a == b # suffix of a == prefix of b
print de_bruijn_edge(k'TGAG', k'GAGC') # True
print de_bruijn_edge(k'TCAG', k'GAGC') # False
Count bases¶
type BaseCount(A: int, C: int, G: int, T: int):
def __add__(self: BaseCount, other: BaseCount):
a1, c1, g1, t1 = self
a2, c2, g2, t2 = other
return (a1 + a2, c1 + c2, g1 + g2, t1 + t2)
def count_bases(s: seq) -> BaseCount:
match s:
case s'A...': return count_bases(s[1:]) + (1,0,0,0)
case s'C...': return count_bases(s[1:]) + (0,1,0,0)
case s'G...': return count_bases(s[1:]) + (0,0,1,0)
case s'T...': return count_bases(s[1:]) + (0,0,0,1)
case _: return BaseCount(0,0,0,0)
Spaced seed search¶
def has_spaced_acgt(s: seq) -> bool:
match s:
case s'A_C_G_T...':
return True
case t if len(t) >= 8:
return has_spaced_acgt(s[1:])
case _:
return False
Reverse-complement palindrome¶
def is_own_revcomp(s: seq) -> bool:
match s:
case s'A...T' or s'T...A' or s'C...G' or s'G...C':
return is_own_revcomp(s[1:-1])
case s'':
return True
case _:
return False
Sequence alignment¶
# default parameters
s1 = s'CGCGAGTCTT'
s2 = s'CGCAGAGTT'
aln = s1 @ s2
print aln.cigar, aln.score
# custom parameters
# match = 2; mismatch = 4; gap1(k) = 2k + 4; gap2(k) = k + 13
aln = s1.align(s2, a=2, b=4, gapo=4, gape=2, gapo2=13, gape2=1)
print aln.cigar, aln.score
Reading FASTA/FASTQ¶
# iterate over everything
for r in FASTA('genome.fa'):
print r.name
print r.seq
# iterate over sequences
for s in FASTA('genome.fa') |> seqs:
print s
# iterate over everything
for r in FASTQ('reads.fq'):
print r.name
print r.read
print r.qual
# iterate over sequences
for s in FASTQ('reads.fq') |> seqs:
print s
Reading paired-end FASTQ¶
for r1, r2 in zip(FASTQ('reads_1.fq'), FASTQ('reads_2.fq')):
print r1.name, r2.name
print r1.read, r2.read
print r1.qual, r2.qual
Parallel FASTQ processing¶
def process(s: seq):
...
# OMP_NUM_THREADS environment variable controls threads
FASTQ('reads.fq') |> iter ||> process
# Sometimes batching reads into blocks can improve performance,
# especially if each is quick to process.
FASTQ('reads.fq') |> blocks(size=1000) ||> iter |> process
Reading SAM/BAM/CRAM¶
# iterate over everything
for r in SAM('alignments.sam'):
print r.name
print r.read
print r.pos
print r.mapq
print r.cigar
print r.reversed
# etc.
for r in BAM('alignments.bam'):
# ...
for r in CRAM('alignments.cram'):
# ...
# iterate over sequences
for s in SAM('alignments.sam') |> seqs:
print s
for s in BAM('alignments.bam') |> seqs:
print s
for s in CRAM('alignments.cram') |> seqs:
print s
DNA to protein translation¶
dna = s'AGGTCTAACGGC'
protein = dna |> translate
print protein # RSNG
Reading protein sequences from FASTA¶
for s in pFASTA('seqs.fasta') |> seqs:
print s