Skip to content

Commit

Permalink
Integration tests for running VEP and fixing another annotation alt m…
Browse files Browse the repository at this point in the history
…atching edge case. (#234)
  • Loading branch information
bashir2 authored May 9, 2018
1 parent a9c86fa commit 3ddd73d
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 14 deletions.
28 changes: 27 additions & 1 deletion gcp_variant_transforms/libs/annotation/vep/vep_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ def create_runner_and_update_args(known_args, pipeline_args):
known_args.annotation_fields.append(known_args.vep_info_field)
else:
known_args.annotation_fields = [known_args.vep_info_field]
# TODO(bashir2): The VEP runner by default runs VEP with --allele_number hence
# we turn on this feature here. However, this might be inconsistent with other
# annotation fields that are originally present in input files, if they do not
# have ALLELE_NUM annotation. The fix is to make annotation ALT matching
# smarter to fall back on other matching methods if ALLELE_NUM is not present.
# When this is implemented, we may even consider removing use_allele_num flag
# and always start by checking if ALLELE_NUM is present.
known_args.use_allele_num = True
return runner


Expand Down Expand Up @@ -205,7 +213,10 @@ def _process_pipeline_args(self, pipeline_args):
self._region = self._get_flag(flags_dict, 'region')
# TODO(bahsir2): Fix the error messages of _check_flag since
# --worker_machine_type has dest='machine_type'.
self._machine_type = self._get_flag(flags_dict, 'machine_type')
try:
self._machine_type = self._get_flag(flags_dict, 'machine_type')
except ValueError:
self._machine_type = self._get_machine_type_from_fork()
self._max_num_workers = self._get_flag(
flags_dict, 'max_num_workers', 'num_workers')
if self._max_num_workers <= 0:
Expand All @@ -223,6 +234,21 @@ def _get_flag(self, pipeline_flags, *expected_flags):
raise ValueError('Could not find any of {} among pipeline flags {}'.format(
expected_flags, pipeline_flags))

def _get_machine_type_from_fork(self):
# type: () -> str
if self._vep_num_fork == 1:
return 'n1-standard-1'
elif self._vep_num_fork == 2:
return 'n1-standard-2'
elif self._vep_num_fork <= 4:
return 'n1-standard-4'
elif self._vep_num_fork <= 8:
return 'n1-standard-8'
else:
# This is just a heuristic since after a certain point having more cores
# does not help VEP performance much more because of its file I/O.
return 'n1-standard-16'

def wait_until_done(self):
"""Polls currently running operations and waits until all are done."""
while self._running_operation_ids:
Expand Down
23 changes: 13 additions & 10 deletions gcp_variant_transforms/libs/processed_variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import enum
import logging
import os
import re

from collections import defaultdict
Expand Down Expand Up @@ -416,7 +415,7 @@ def add_annotation_data(self, proc_var, annotation_field_name, data):
Allele|Consequence|IMPACT|SYMBOL|Gene|...
"""
common_prefix = self._find_common_alt_ref_prefix(proc_var)
common_prefix = self._find_common_alt_ref_prefix_char(proc_var)
alt_annotation_map = self._convert_annotation_strs_to_alt_map(
annotation_field_name, data)
for alt_bases, annotations_list in alt_annotation_map.iteritems():
Expand All @@ -436,12 +435,15 @@ def add_annotation_data(self, proc_var, annotation_field_name, data):
self._add_ambiguous_fields(annotations_list, ambiguous)
alt._info[annotation_field_name] = annotations_list

def _find_common_alt_ref_prefix(self, proc_var):
def _find_common_alt_ref_prefix_char(self, proc_var):
# type: (ProcessedVariant) -> str
alt_list = [
alt.alternate_bases for alt in proc_var._alternate_datas]
alt_list.append(proc_var.reference_bases or '')
return os.path.commonprefix(alt_list)
if not proc_var.reference_bases:
return ''
common_char = proc_var.reference_bases[0]
for alt in proc_var._alternate_datas:
if not alt.alternate_bases or alt.alternate_bases[0] != common_char:
return ''
return common_char

def _convert_annotation_strs_to_alt_map(
self, annotation_field_name, field_data):
Expand Down Expand Up @@ -561,7 +563,7 @@ def _alt_matches_annotation_alt(
"""Returns true if `alt_bases` matches `annotation_alt`
See the "VCF" and "Complex VCF entries" sections of
https://useast.ensembl.org/info/docs/tools/vep/vep_formats.html
https://ensembl.org/info/docs/tools/vep/vep_formats.html
for details of prefix matching and indels. Some examples:
REF ALT annotation-ALT
A T T
Expand All @@ -572,8 +574,9 @@ def _alt_matches_annotation_alt(
if not self._minimal_match:
# Check equality without the common prefix.
# Note according to VCF spec the length of this common prefix should be
# at most one but we have not checked/enforced that here.
# Also, this string matching should be skipped if in minimal_match mode.
# at most one. This string matching is skipped if in minimal_match mode.
# TODO(bashir2): This is a VEP specific issue and should be updated once
# we need to import annotations generated by other programs.
if alt_bases[len(common_prefix):] == annotation_alt:
return True
# Handling deletion.
Expand Down
47 changes: 47 additions & 0 deletions gcp_variant_transforms/libs/processed_variant_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,53 @@ def test_create_processed_variant_annotation_alt_prefix(self):
counter_factory.counter_map[
CEnum.ANNOTATION_ALT_MISMATCH.value].get_value(), 0)

def test_create_processed_variant_annotation_alt_long_prefix(self):
# The returned variant is ignored as we create a custom one next.
_, header_fields = self._get_sample_variant_and_header_with_csq()
variant = vcfio.Variant(
reference_name='19', start=11, end=12, reference_bases='CC',
alternate_bases=['CCT', 'CCC', 'CCCC'],
names=['rs1'], quality=2,
filters=['PASS'],
info={'CSQ': vcfio.VariantInfo(
data=[
'CT|C1|I1|S1|G1', 'CC|C2|I2|S2|G2', 'CCC|C3|I3|S3|G3'],
field_count='.')})
counter_factory = _CounterSpyFactory()
factory = processed_variant.ProcessedVariantFactory(
header_fields,
split_alternate_allele_info_fields=True,
annotation_fields=['CSQ'],
counter_factory=counter_factory)
proc_var = factory.create_processed_variant(variant)
alt1 = processed_variant.AlternateBaseData('CCT')
alt1._info = {
'CSQ': [
{processed_variant._ANNOTATION_ALT: 'CT', 'Consequence': 'C1',
'IMPACT': 'I1', 'SYMBOL': 'S1', 'Gene': 'G1'}]
}
alt2 = processed_variant.AlternateBaseData('CCC')
alt2._info = {
'CSQ': [
{processed_variant._ANNOTATION_ALT: 'CC', 'Consequence': 'C2',
'IMPACT': 'I2', 'SYMBOL': 'S2', 'Gene': 'G2'}]
}
alt3 = processed_variant.AlternateBaseData('CCCC')
alt3._info = {
'CSQ': [
{processed_variant._ANNOTATION_ALT: 'CCC', 'Consequence': 'C3',
'IMPACT': 'I3', 'SYMBOL': 'S3', 'Gene': 'G3'}]
}
self.assertEqual(proc_var.alternate_data_list, [alt1, alt2, alt3])
self.assertFalse(proc_var.non_alt_info.has_key('CSQ'))
self.assertEqual(counter_factory.counter_map[
CEnum.VARIANT.value].get_value(), 1)
self.assertEqual(counter_factory.counter_map[
CEnum.ANNOTATION_ALT_MATCH.value].get_value(), 3)
self.assertEqual(
counter_factory.counter_map[
CEnum.ANNOTATION_ALT_MISMATCH.value].get_value(), 0)

def test_create_processed_variant_annotation_alt_prefix_but_ref(self):
# The returned variant is ignored as we create a custom one next.
_, header_fields = self._get_sample_variant_and_header_with_csq()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
{
"test_name": "platinum-na12877-hg38-10k-lines-run-vep",
"table_name": "platinum_NA12877_hg38_10K_lines_run_vep",
"input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/platinum_NA12877_hg38_10K_lines_manual_vep_orig_output.vcf",
"annotation_fields": "CSQ",
"runner": "DataflowRunner",
"run_annotation_pipeline": "True",
"vep_image_uri": "gcr.io/gcp-variant-transforms-test/vep_91",
"vep_cache_path": "gs://gcp-variant-transforms-test_vep_cache/vep_cache_homo_sapiens_GRCh38_91.tar.gz",
"annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}",
"num_workers": 1,
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
"expected_result": {"num_rows": 9953}
},
{
"query": [
"SELECT COUNT(0) AS num_annotation_sets ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ"
],
"expected_result": {"num_annotation_sets": 45770}
},
{
"query": [
"SELECT COUNT(0) AS num_annotation_sets ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT"
],
"expected_result": {"num_annotation_sets": 45770}
},
{
"query": [
"SELECT SUM(start_position * number_of_annotations) AS hash_sum ",
"FROM ( ",
" SELECT start_position, reference_bases, A.alt, ",
" COUNT(0) AS number_of_annotations ",
" FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ",
" GROUP BY 1, 2, 3",
")"
],
"expected_result": {"hash_sum": 143375297338}
},
{
"query": [
"SELECT SUM(start_position * number_of_annotations) AS hash_sum ",
"FROM ( ",
" SELECT start_position, reference_bases, A.alt, ",
" COUNT(0) AS number_of_annotations ",
" FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT",
" GROUP BY 1, 2, 3",
")"
],
"expected_result": {"hash_sum": 143375297338}
},
{
"query": [
"SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ"
],
"expected_result": {"num_features": 1576}
},
{
"query": [
"SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT"
],
"expected_result": {"num_features": 1576}
},
{
"query": [
"SELECT COUNT(DISTINCT CSQ.SYMBOL) AS num_symbol ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ"
],
"expected_result": {"num_symbol": 207}
},
{
"query": [
"SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT"
],
"expected_result": {"num_symbol": 207}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"test_name": "platinum-na12877-hg38-10k-lines",
"table_name": "platinum_NA12877_hg38_10K_lines",
"input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/platinum_NA12877_hg38_10K_lines_manual_vep_orig_output.vcf",
"annotation_fields": "CSQ",
"runner": "DataflowRunner",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
"expected_result": {"num_rows": 9953}
},
{
"query": [
"SELECT COUNT(0) AS num_annotation_sets ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ"
],
"expected_result": {"num_annotation_sets": 45770}
},
{
"query": [
"SELECT SUM(start_position * number_of_annotations) AS hash_sum ",
"FROM ( ",
" SELECT start_position, reference_bases, A.alt, ",
" COUNT(0) AS number_of_annotations ",
" FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ",
" GROUP BY 1, 2, 3",
")"
],
"expected_result": {"hash_sum": 143375297338}
},
{
"query": [
"SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ"
],
"expected_result": {"num_features": 1576}
},
{
"query": [
"SELECT COUNT(DISTINCT CSQ.SYMBOL) AS num_symbol ",
"FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ"
],
"expected_result": {"num_symbol": 207}
}
]
}
11 changes: 8 additions & 3 deletions gcp_variant_transforms/testing/integration/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def __init__(self,
'--temp_location {}'.format(context.temp_location),
'--job_name {}-{}'.format(test_name, dataset_id.replace('_', '-'))]
for k, v in kwargs.iteritems():
args.append('--{} {}'.format(k, v))
value = v
if isinstance(v, basestring):
value = v.format(TABLE_NAME=self._table_name)
args.append('--{} {}'.format(k, value))

self._pipelines_api_request = {
'pipelineArgs': {
Expand All @@ -115,6 +118,7 @@ def get_name(self):
return self._name

def run(self, context):
# type: (TestContextManager) -> None
service = discovery.build(
'genomics', 'v1alpha2', credentials=context.credentials)
# The following pylint hint is needed because `pipelines` is a method that
Expand Down Expand Up @@ -279,8 +283,8 @@ def _get_args():
parser.add_argument(
'--image',
help=('The name of the container image to run the test against it, for '
'example: gcr.io/test-gcp-variant-transforms/'
'test_gcp-variant-transforms_2018-01-20-13-47-12. '
'example: gcr.io/gcp-variant-transforms-test/'
'gcp-variant-transforms:2018-01-20-13-47-12. '
'By default the production image {} is used.'
).format(DEFAULT_IMAGE_NAME),
default=DEFAULT_IMAGE_NAME,
Expand Down Expand Up @@ -361,6 +365,7 @@ def _validate_assertion_config(assertion_config):


def _run_test(test, context):
# type: (TestCase, TestContextManager) -> None
if not context.revalidation_dataset_id:
test.run(context)
test.validate_table()
Expand Down

0 comments on commit 3ddd73d

Please sign in to comment.