Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions align_trim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,19 +748,41 @@ def read_pair_generator(bam, region_string=None):

def create_primer_lookup(ref_len_tuple, pools, amplicons: list[Amplicon], padding=35):
Comment thread
ChrisgKent marked this conversation as resolved.
Outdated
"""
Returns a dict of chroms, each containing a (max(pools), chrom_len) shaped array
Returns a dict of chroms, each containing a (N, chrom_len) shaped array
Comment thread
BioWilko marked this conversation as resolved.
Outdated
"""
lookups = {}
for chrom, chromlen in ref_len_tuple:
a = np.empty_like(None, shape=(max(pools), chromlen + 1))
a = np.empty_like(None, shape=(1, chromlen + 1))
for amp in amplicons:
added = False
if amp.chrom == chrom:
a[
amp.ipool,
# If amplicon clashes with any in same pool add new row
amp_slice = a[
:,
max(amp.amplicon_start - padding, 0) : min(
amp.amplicon_end + padding, chromlen
),
] = amp
]
for i, row in enumerate(amp_slice): # Check each row for collision
if row[row != None].size == 0:
a[
Comment thread
BioWilko marked this conversation as resolved.
Outdated
i,
max(amp.amplicon_start - padding, 0) : min(
amp.amplicon_end + padding, chromlen
),
] = amp
added = True
# If not added, add new row to array and add to that.
if not added:
b = np.empty_like(None, shape=(1, chromlen + 1))
b[
0,
max(amp.amplicon_start - padding, 0) : min(
amp.amplicon_end + padding, chromlen
),
] = amp
a = np.vstack((a, b))

lookups[chrom] = a
return lookups

Expand Down Expand Up @@ -878,7 +900,10 @@ def go(args):
# Create a lookup table for primer location
ref_lengths = [(r, infile.get_reference_length(r)) for r in infile.references]
primer_lookup = create_primer_lookup(
ref_len_tuple=ref_lengths, pools=pools, amplicons=amplicon_list, padding=35
ref_len_tuple=ref_lengths,
pools=pools,
amplicons=amplicon_list,
padding=args.primer_match_threshold,
)

trimmed_segments = {x: {} for x in chroms}
Expand Down Expand Up @@ -1067,7 +1092,7 @@ def main():
"-p",
type=int,
default=35,
help="Fuzzy match primer positions within this threshold",
help="Add -p bases of padding to the outside (5' end) of primer coordinates to allow fuzzy matching for reads with barcodes/adapters. (default: %(default)s)",
Comment thread
ChrisgKent marked this conversation as resolved.
Outdated
)
parser.add_argument(
"--report", "-r", type=Path, help="Output report TSV to filepath"
Expand Down
133 changes: 91 additions & 42 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_create_primer_lookup(self):
"""
Test that the primer lookup is created correctly
"""
# Create the primer lookup with 35 padding
# Create the primer lookup with 0 padding
padding = 0
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
Expand All @@ -51,38 +51,35 @@ def test_create_primer_lookup(self):
# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(max(self.pools), 29903 + 1), # +1 for half open interval
(2, 29903 + 1), # +1 for half open interval
)

# Check each amplicon is present in the lookup and in correct pool
for amplicon in self.amplicons:
# Check amplicon spans correct region, and correct pool
self.assertEqual(
set(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start : amplicon.amplicon_end
]
),
set([amplicon]),
)
contents = []
for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_start - padding : amplicon.amplicon_end + padding
]:
contents.append(set(row))
self.assertIn(set([amplicon]), contents)

# Check padding is aligned correctly
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start - padding - 1
]
)
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_end + padding # -1 +1
]
)
for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_start - padding - 1
]:
self.assertIsNot(row, amplicon)

for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_end + padding # +1 -1
]:
self.assertIsNot(row, amplicon)

def test_create_primer_lookup_padding(self):
"""
Test that the primer lookup is created correctly
"""
# Create the primer lookup with 35 padding
padding = 10
padding = 35
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=self.amplicons,
Expand All @@ -100,31 +97,83 @@ def test_create_primer_lookup_padding(self):
# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(max(self.pools), 29903 + 1), # +1 for half open interval
(2, 29903 + 1), # +1 for half open interval
)

# Check each amplicon is present in the lookup and in correct pool
# Check each amplicon is present in the lookup
for amplicon in self.amplicons:
# Check amplicon spans correct region, and correct pool
self.assertEqual(
set(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start : amplicon.amplicon_end
]
# Check amplicon spans correct region
contents = []
for row in primer_lookup[amplicon.chrom][
:,
max(0, amplicon.amplicon_start - padding) : min(
amplicon.amplicon_end + padding, 29903 + 1
),
set([amplicon]),
)
]:
contents.append(set(row))
self.assertIn(set([amplicon]), contents)

# Check padding is aligned correctly
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start - padding - 1
]
)
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_end + padding # -1 +1
]
)
for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_start - padding - 1
]:
self.assertIsNot(row, amplicon)

for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_end + padding # +1 -1
]:
self.assertIsNot(row, amplicon)

def test_create_primer_lookup_no_overlap(self):
"""
Test that the primer lookup is created correctly
"""
# Create the primer lookup with a single amplicon
padding = 0
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=[self.amplicons[0]],
pools=self.pools,
padding=padding,
)

# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(1, 29903 + 1), # +1 for half open interval
)

def test_create_primer_lookup_overlap(self):
"""
Test that the primer lookup is created correctly
"""
# Create some fake amplicon
# nCoV-2019_3 overlaps with nCoV-2019_1
scheme = Scheme.from_str(
"MN908947.3 30 54 nCoV-2019_1_LEFT_1 1 + ACCAACCAACTTTCGATCTCTTGT\n"
"MN908947.3 385 410 nCoV-2019_1_RIGHT_1 1 - CATCTTTAAGATGTTGACGTGCCTC\n"
"MN908947.3 320 342 nCoV-2019_2_LEFT_1 2 + CTGTTTTACAGGTTCGCGACGT\n"
"MN908947.3 704 726 nCoV-2019_2_RIGHT_1 2 - TAAGGATCAGTGCCAAGCTCGT\n"
"MN908947.3 385 400 nCoV-2019_3_LEFT_1 1 + CGGTAATAAAGGAGCTGGTGGC\n"
"MN908947.3 800 820 nCoV-2019_3_RIGHT_1 1 - AAGGTGTCTGCAATTCATAGCTCT\n"
)
scheme.bedlines = merge_primers(scheme.bedlines)
amplicons = create_amplicons(scheme.bedlines)

# Create the primer lookup with a single amplicon
padding = 0
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=amplicons,
pools=self.pools,
padding=padding,
)

# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(3, 29903 + 1), # +1 for half open interval
)


class TestFindPrimerWithLookup(unittest.TestCase):
Expand Down
Loading